NereidsLoadScanProvider.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.load;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.IsNull;
import org.apache.doris.nereids.trees.expressions.Multiply;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.Function;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
import org.apache.doris.nereids.trees.plans.commands.info.DefaultValue;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.VarcharType;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TUniqueKeyUpdateMode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
 * Processes column mapping expressions, delete conditions and sequence columns
 * for a load task, producing a {@link NereidsParamCreateContext}.
 */
public class NereidsLoadScanProvider {
private static final Logger LOG = LogManager.getLogger(NereidsLoadScanProvider.class);
private NereidsFileGroupInfo fileGroupInfo;
private Set<String> partialUpdateInputColumns;
public NereidsLoadScanProvider(NereidsFileGroupInfo fileGroupInfo, Set<String> partialUpdateInputColumns) {
this.fileGroupInfo = fileGroupInfo;
this.partialUpdateInputColumns = partialUpdateInputColumns;
}
    /**
     * Creates a NereidsParamCreateContext containing column mapping expressions and scan slots.
     */
public NereidsParamCreateContext createLoadContext() throws UserException {
NereidsParamCreateContext context = new NereidsParamCreateContext();
context.fileGroup = fileGroupInfo.getFileGroup();
NereidsLoadTaskInfo.NereidsImportColumnDescs columnDescs = new NereidsLoadTaskInfo.NereidsImportColumnDescs();
columnDescs.descs = context.fileGroup.getColumnExprList();
handleDeleteCondition(columnDescs.descs, context.fileGroup);
handleSequenceColumn(columnDescs.descs, context.fileGroup);
fillContextExprMap(columnDescs.descs, context);
return context;
}
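    /**
     * For MERGE loads, maps the hidden delete sign column to the user-specified delete
     * condition; for DELETE loads, sets the delete sign to a constant 1 so every loaded
     * row is marked as deleted.
     */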
private void handleDeleteCondition(List<NereidsImportColumnDesc> columnDescList, NereidsBrokerFileGroup fileGroup) {
if (fileGroup.getMergeType() == LoadTask.MergeType.MERGE) {
columnDescList.add(
NereidsImportColumnDesc.newDeleteSignImportColumnDesc(fileGroup.getDeleteCondition()));
} else if (fileGroup.getMergeType() == LoadTask.MergeType.DELETE) {
columnDescList.add(NereidsImportColumnDesc.newDeleteSignImportColumnDesc(new IntegerLiteral(1)));
}
}
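    /**
     * Adds a mapping for the hidden sequence column when the target OlapTable has one.
     * If the table uses a sequence map column, __DORIS_SEQUENCE_COL__ is mapped to that
     * column when the user specified it (or specified no columns at all); otherwise the
     * sequence column specified in the file group is used.
     */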
private void handleSequenceColumn(List<NereidsImportColumnDesc> columnDescList, NereidsBrokerFileGroup fileGroup)
throws UserException {
TableIf targetTable = fileGroupInfo.getTargetTable();
if (targetTable instanceof OlapTable && ((OlapTable) targetTable).hasSequenceCol()) {
OlapTable olapTable = (OlapTable) targetTable;
String sequenceCol = olapTable.getSequenceMapCol();
if (sequenceCol != null) {
String finalSequenceCol = sequenceCol;
Optional<NereidsImportColumnDesc> foundCol = columnDescList.stream()
.filter(c -> c.getColumnName().equalsIgnoreCase(finalSequenceCol)).findAny();
                // if the column desc list is empty, it means this is not a partial update load and
                // the user did not specify column names.
if (foundCol.isPresent() || shouldAddSequenceColumn(columnDescList)) {
columnDescList.add(new NereidsImportColumnDesc(Column.SEQUENCE_COL,
new UnboundSlot(sequenceCol)));
} else if (!fileGroupInfo.isFixedPartialUpdate()) {
Column seqCol = olapTable.getFullSchema().stream()
.filter(col -> col.getName().equals(olapTable.getSequenceMapCol()))
.findFirst().get();
if (seqCol.getDefaultValue() == null
|| !seqCol.getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP)) {
throw new UserException("Table " + olapTable.getName()
+ " has sequence column, need to specify the sequence column");
}
}
} else if (!fileGroupInfo.isFlexiblePartialUpdate()) {
sequenceCol = fileGroup.getSequenceCol();
columnDescList.add(new NereidsImportColumnDesc(Column.SEQUENCE_COL, new UnboundSlot(sequenceCol)));
}
}
}
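    /**
     * Fills context.exprMap and context.scanSlots: rewrites column mapping expressions,
     * skips temporary mapping columns that are not in the schema, validates HLL and
     * hadoop-function mappings, and creates a scan SlotReference for every source column.
     */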
private void fillContextExprMap(List<NereidsImportColumnDesc> columnDescList, NereidsParamCreateContext context)
throws UserException {
NereidsBrokerFileGroup fileGroup = context.fileGroup;
context.exprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
context.scanSlots = new ArrayList<>(columnDescList.size());
Table tbl = fileGroupInfo.getTargetTable();
        // rewrite the column list: inline references to earlier mapping columns so that
        // each mapping expression only refers to source columns
List<NereidsImportColumnDesc> columnDescs = new ArrayList<>(columnDescList.size());
Map<UnboundSlot, Expression> replaceMap = Maps.newHashMap();
for (NereidsImportColumnDesc desc : columnDescList) {
if (!desc.isColumn()) {
NereidsImportColumnDesc newDesc = desc.withExpr(ExpressionUtils.replace(desc.getExpr(), replaceMap));
columnDescs.add(newDesc);
replaceMap.put(new UnboundSlot(newDesc.getColumnName()), newDesc.getExpr());
} else {
columnDescs.add(desc);
}
}
        // We make a copy of the columnExprs so that our subsequent changes
        // to the columnExprs will not affect the original columnExprs.
        // Skip the mapping columns that do not exist in the schema.
        // eg: the original column list is:
        // (k1, k2, tmpk3 = k1 + k2, k3 = tmpk3)
        // after the rewrite above, it becomes
        // (k1, k2, tmpk3 = k1 + k2, k3 = k1 + k2)
        // so "tmpk3 = k1 + k2" is not needed anymore and can be skipped.
List<NereidsImportColumnDesc> copiedColumnExprs = new ArrayList<>(columnDescs.size());
for (NereidsImportColumnDesc importColumnDesc : columnDescs) {
String mappingColumnName = importColumnDesc.getColumnName();
if (importColumnDesc.isColumn() || tbl.getColumn(mappingColumnName) != null) {
copiedColumnExprs.add(importColumnDesc);
}
}
// check whether the OlapTable has sequenceCol and skipBitmapCol
boolean hasSequenceCol = false;
boolean hasSequenceMapCol = false;
boolean hasSkipBitmapColumn = false;
if (tbl instanceof OlapTable) {
OlapTable olapTable = (OlapTable) tbl;
hasSequenceCol = olapTable.hasSequenceCol();
hasSequenceMapCol = (olapTable.getSequenceMapCol() != null);
hasSkipBitmapColumn = olapTable.hasSkipBitmapColumn();
}
        // If the user does not specify the file field names, generate them from the table's
        // base schema so that the following process can be unified.
boolean specifyFileFieldNames = copiedColumnExprs.stream().anyMatch(p -> p.isColumn());
if (!specifyFileFieldNames) {
List<Column> columns = tbl.getBaseSchema(false);
for (Column column : columns) {
NereidsImportColumnDesc columnDesc;
if (fileGroup.getFileFormatProperties().getFileFormatType() == TFileFormatType.FORMAT_JSON) {
columnDesc = new NereidsImportColumnDesc(column.getName());
} else {
columnDesc = new NereidsImportColumnDesc(column.getName().toLowerCase());
}
if (LOG.isDebugEnabled()) {
LOG.debug("add base column {} to stream load task", column.getName());
}
copiedColumnExprs.add(columnDesc);
}
List<String> hiddenColumns = fileGroupInfo.getHiddenColumns();
if (hasSkipBitmapColumn
&& fileGroupInfo.getUniqueKeyUpdateMode() == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
Preconditions.checkArgument(hiddenColumns == null);
if (LOG.isDebugEnabled()) {
LOG.debug("add hidden column {} to stream load task", Column.DELETE_SIGN);
}
copiedColumnExprs.add(new NereidsImportColumnDesc(Column.DELETE_SIGN));
if (LOG.isDebugEnabled()) {
LOG.debug("add hidden column {} to stream load task", Column.SKIP_BITMAP_COL);
}
                // allow specifying __DORIS_SEQUENCE_COL__ if the table has a sequence column
if (hasSequenceCol && !hasSequenceMapCol) {
copiedColumnExprs.add(new NereidsImportColumnDesc(Column.SEQUENCE_COL));
if (LOG.isDebugEnabled()) {
LOG.debug("add hidden column {} to stream load task", Column.SEQUENCE_COL);
}
}
copiedColumnExprs.add(new NereidsImportColumnDesc(Column.SKIP_BITMAP_COL));
}
if (hiddenColumns != null) {
for (String columnName : hiddenColumns) {
Column column = tbl.getColumn(columnName);
if (column != null && !column.isVisible()) {
NereidsImportColumnDesc columnDesc = new NereidsImportColumnDesc(column.getName());
if (LOG.isDebugEnabled()) {
LOG.debug("add hidden column {} to stream load task", column.getName());
}
copiedColumnExprs.add(columnDesc);
}
}
}
}
        // generate a map for easy lookup
Map<String, Expression> columnExprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (NereidsImportColumnDesc importColumnDesc : copiedColumnExprs) {
columnExprMap.put(importColumnDesc.getColumnName(), importColumnDesc.getExpr());
}
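        // colToType records the target type of each base-schema column; it is used later
        // to type the scan slots for Arrow-format files.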
HashMap<String, Type> colToType = new HashMap<>();
// check default value and auto-increment column
for (Column column : tbl.getBaseSchema()) {
if (fileGroupInfo.getUniqueKeyUpdateMode() == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS
&& !partialUpdateInputColumns.contains(column.getName())) {
continue;
}
String columnName = column.getName();
colToType.put(columnName, column.getType());
            Expression expression = null;
            if (column.getGeneratedColumnInfo() != null) {
                // the generated column will be handled by bindSink
            } else if (columnExprMap.get(columnName) != null) {
                expression = columnExprMap.get(columnName);
            } else {
                // other columns with default values will be handled by bindSink
            }
if (expression != null) {
// check hll_hash
if (column.getDataType() == PrimitiveType.HLL) {
if (!(expression instanceof UnboundFunction)) {
throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like "
+ columnName + "=" + FunctionSet.HLL_HASH + "(xxx)");
}
UnboundFunction function = (UnboundFunction) expression;
String functionName = function.getName();
if (!functionName.equalsIgnoreCase(FunctionSet.HLL_HASH)
&& !functionName.equalsIgnoreCase("hll_empty")
&& !functionName.equalsIgnoreCase(FunctionSet.HLL_FROM_BASE64)) {
throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like "
+ columnName + "=" + FunctionSet.HLL_HASH + "(xxx) or "
+ columnName + "=" + FunctionSet.HLL_FROM_BASE64 + "(xxx) or "
+ columnName + "=hll_empty()");
}
}
if (fileGroup.isNegative() && column.getAggregationType() != null
&& column.getAggregationType() == AggregateType.SUM) {
expression = new Multiply(expression, new IntegerLiteral(-1));
}
                // Bitmap and QuantileState compatibility need to be checked after binding.
                // For the jsonb type, jsonb_parse_xxx is used to parse the src string to jsonb;
                // if the input string is not valid json, it returns null. This also needs to be
                // handled after binding.
expression = ExpressionUtils.replace(expression, replaceMap);
replaceMap.put(new UnboundSlot(columnName), expression);
context.exprMap.put(column.getName(), expression);
}
}
Map<String, Pair<String, List<String>>> columnToHadoopFunction = fileGroup.getColumnToHadoopFunction();
// validate hadoop functions
if (columnToHadoopFunction != null) {
Map<String, String> columnNameMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (NereidsImportColumnDesc importColumnDesc : copiedColumnExprs) {
if (importColumnDesc.isColumn()) {
columnNameMap.put(importColumnDesc.getColumnName(), importColumnDesc.getColumnName());
}
}
for (Map.Entry<String, Pair<String, List<String>>> entry : columnToHadoopFunction.entrySet()) {
String mappingColumnName = entry.getKey();
Column mappingColumn = tbl.getColumn(mappingColumnName);
if (mappingColumn == null) {
throw new DdlException("Mapping column is not in table. column: " + mappingColumnName);
}
Pair<String, List<String>> function = entry.getValue();
try {
NereidsDataDescription.validateMappingFunction(function.first, function.second, columnNameMap,
mappingColumn, false);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
}
}
// create scan SlotReferences and transform hadoop functions
boolean hasColumnFromTable = false;
for (NereidsImportColumnDesc importColumnDesc : copiedColumnExprs) {
            // make the column name's case match the real column name in the table
String columnName = importColumnDesc.getColumnName();
Column tblColumn = tbl.getColumn(columnName);
if (tblColumn != null) {
hasColumnFromTable = true;
}
String realColName;
if (tblColumn == null || tblColumn.getName() == null || importColumnDesc.getExpr() == null) {
realColName = columnName;
} else {
realColName = tblColumn.getName();
}
if (importColumnDesc.getExpr() != null) {
if (tblColumn.getGeneratedColumnInfo() == null) {
Expression expr = transformHadoopFunctionExpr(tbl, realColName, importColumnDesc.getExpr());
context.exprMap.put(realColName, expr);
}
} else {
Column slotColumn;
if (fileGroup.getFileFormatProperties().getFileFormatType() == TFileFormatType.FORMAT_ARROW) {
slotColumn = new Column(realColName, colToType.get(realColName), true);
} else {
if (fileGroupInfo.getUniqueKeyUpdateMode() == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS
&& hasSkipBitmapColumn) {
                        // in flexible partial update, the unique ids of missing columns are stored
                        // in the skip bitmap column
int colUniqueId = tblColumn.getUniqueId();
if (realColName.equals(Column.SKIP_BITMAP_COL)) {
                            // don't change the skip_bitmap_col's type to varchar because we will fill this
                            // column in NewJsonReader manually rather than reading it from files as varchar
                            // and then converting it to its real type
slotColumn = new Column(realColName, PrimitiveType.BITMAP, true);
} else {
                            // columns default to varchar type
slotColumn = new Column(realColName, PrimitiveType.VARCHAR, true);
}
                        // In flexible partial update, every row can update different columns, so key
                        // column integrity must be checked for every row in XXXReader on the BE rather
                        // than checked once on the FE for all rows as in fixed-columns partial update.
                        // So we need to mark whether a slot is a key column here.
slotColumn.setIsKey(tblColumn.isKey());
slotColumn.setIsAutoInc(tblColumn.isAutoInc());
slotColumn.setUniqueId(colUniqueId);
} else {
slotColumn = new Column(realColName, PrimitiveType.VARCHAR, true);
}
}
context.scanSlots.add(SlotReference.fromColumn(tbl, slotColumn, tbl.getFullQualifiers()));
}
}
if (!hasColumnFromTable) {
            // we should add at least one column from the target table to make bindSink happy
Column column = null;
for (Column col : tbl.getBaseSchema()) {
if (col.getGeneratedColumnInfo() == null) {
column = col;
break;
}
}
if (column == null) {
throw new DdlException(String.format("can't find non-generated column in table %s", tbl.getName()));
}
context.exprMap.put(column.getName(), new NullLiteral(DataType.fromCatalogType(column.getType())));
}
}
    /**
     * Returns true if the sequence column is not explicitly set and the column list is empty
     * or only contains the delete sign column.
     */
private boolean shouldAddSequenceColumn(List<NereidsImportColumnDesc> columnDescList) {
if (columnDescList.isEmpty()) {
return true;
}
return columnDescList.size() == 1 && columnDescList.get(0).getColumnName().equalsIgnoreCase(Column.DELETE_SIGN);
}
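    /**
     * Resolves a file format name to a TFileFormatType, defaulting to plain CSV when
     * no format is specified.
     */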
private TFileFormatType formatType(String fileFormat) throws UserException {
if (fileFormat == null) {
            // no format specified; default to plain CSV
return TFileFormatType.FORMAT_CSV_PLAIN;
}
TFileFormatType formatType = Util.getFileFormatTypeFromName(fileFormat);
if (formatType == TFileFormatType.FORMAT_UNKNOWN) {
throw new UserException("Not supported file format: " + fileFormat);
}
return formatType;
}
    /**
     * When doing a schema change, there may be some 'shadow' columns, with the prefix
     * '__doris_shadow_' in their names. These columns are invisible to the user, but we need
     * to generate data for them, so we add column mappings for these columns.
     * eg:
     * the base schema is (A, B, C), and B is under schema change, so there is a shadow column
     * '__doris_shadow_B'. The final column mapping should look like:
     * (A, B, C, __doris_shadow_B = substitute(B));
     */
private List<NereidsImportColumnDesc> getSchemaChangeShadowColumnDesc(Table tbl,
Map<String, Expression> columnExprMap) {
List<NereidsImportColumnDesc> shadowColumnDescs = Lists.newArrayList();
for (Column column : tbl.getFullSchema()) {
if (!column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
continue;
}
String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX);
if (columnExprMap.containsKey(originCol)) {
Expression mappingExpr = columnExprMap.get(originCol);
if (mappingExpr != null) {
/*
* eg:
* (A, C) SET (B = func(xx))
* ->
* (A, C) SET (B = func(xx), __doris_shadow_B = func(xx))
*/
NereidsImportColumnDesc importColumnDesc = new NereidsImportColumnDesc(column.getName(),
mappingExpr);
shadowColumnDescs.add(importColumnDesc);
} else {
/*
* eg:
* (A, B, C)
* ->
* (A, B, C) SET (__doris_shadow_B = B)
*/
UnboundSlot slot = new UnboundSlot(originCol);
// TODO: check if it's OK to remove setType
// slot.setType(column.getType());
NereidsImportColumnDesc importColumnDesc = new NereidsImportColumnDesc(column.getName(), slot);
shadowColumnDescs.add(importColumnDesc);
}
} else {
/*
                 * There is a case where the user does not specify the related origin column, eg:
                 * COLUMNS (A, C): B is not specified, but B is being modified,
                 * so there is a shadow column '__doris_shadow_B'.
                 * We can not simply add a mapping function "__doris_shadow_B = substitute(B)",
                 * because Doris can not find column B.
                 * In this case, __doris_shadow_B can use its default value, so there is no need
                 * to add it to the column mapping.
*/
// do nothing
}
}
return shadowColumnDescs;
}
    /**
     * This method is used to transform hadoop functions.
     * The hadoop functions include: replace_value, strftime, time_format, alignment_timestamp,
     * default_value and now. It rewrites those functions with real function names and params.
     * For any other function, the expr just passes through and the original expr is returned.
     */
private Expression transformHadoopFunctionExpr(Table tbl, String columnName, Expression originExpr)
throws UserException {
Column column = tbl.getColumn(columnName);
if (column == null) {
// the unknown column will be checked later.
return originExpr;
}
        // To be compatible with older load versions
if (originExpr instanceof Function) {
Function funcExpr = (Function) originExpr;
String funcName = funcExpr.getName();
if (funcName.equalsIgnoreCase("replace_value")) {
List<Expression> exprs = Lists.newArrayList();
UnboundSlot slot = new UnboundSlot(columnName);
                // We will convert this to IF(`col` != child0, `col`, child1);
                // we use NE because the if function's return type must equal `col`'s type.
/*
* We will convert this based on different cases:
* case 1: k1 = replace_value(null, anyval);
* to: k1 = if (k1 is not null, k1, anyval);
*
* case 2: k1 = replace_value(anyval1, anyval2);
* to: k1 = if (k1 is not null, if(k1 != anyval1, k1, anyval2), null);
*/
if (funcExpr.child(0) instanceof NullLiteral) {
// case 1
exprs.add(new Not(new IsNull(slot)));
exprs.add(slot);
if (funcExpr.children().size() > 1) {
exprs.add(funcExpr.child(1));
} else {
if (column.getDefaultValue() != null) {
if (column.getDefaultValueExprDef() != null) {
String exprSql = column.getDefaultValueExpr().toSql();
exprs.add(NereidsLoadUtils.parseExpressionSeq(exprSql).get(0));
} else {
exprs.add(new StringLiteral(column.getDefaultValue()));
}
} else {
if (column.isAllowNull()) {
exprs.add(new NullLiteral(VarcharType.SYSTEM_DEFAULT));
} else {
throw new UserException("Column(" + columnName + ") has no default value.");
}
}
}
} else {
// case 2
exprs.add(new Not(new IsNull(slot)));
List<Expression> innerIfExprs = Lists.newArrayList();
innerIfExprs.add(new Not(new EqualTo(slot, funcExpr.child(0))));
innerIfExprs.add(slot);
if (funcExpr.children().size() > 1) {
innerIfExprs.add(funcExpr.child(1));
} else {
if (column.getDefaultValue() != null) {
if (column.getDefaultValueExprDef() != null) {
String exprSql = column.getDefaultValueExpr().toSql();
innerIfExprs.add(NereidsLoadUtils.parseExpressionSeq(exprSql).get(0));
} else {
innerIfExprs.add(new StringLiteral(column.getDefaultValue()));
}
} else {
if (column.isAllowNull()) {
innerIfExprs.add(new NullLiteral(VarcharType.SYSTEM_DEFAULT));
} else {
throw new UserException("Column(" + columnName + ") has no default value.");
}
}
}
UnboundFunction innerIfFn = new UnboundFunction("if", innerIfExprs);
exprs.add(innerIfFn);
exprs.add(new NullLiteral(VarcharType.SYSTEM_DEFAULT));
}
if (LOG.isDebugEnabled()) {
LOG.debug("replace_value expr: {}", exprs);
}
UnboundFunction newFn = new UnboundFunction("if", exprs);
return newFn;
} else if (funcName.equalsIgnoreCase("strftime")) {
                // strftime(fmt, val) -> FROM_UNIXTIME(val)
return new UnboundFunction("from_unixtime", Lists.newArrayList(funcExpr.child(1)));
} else if (funcName.equalsIgnoreCase("time_format")) {
                // time_format(out_fmt, in_fmt, val) -> DATE_FORMAT(STR_TO_DATE(val, in_fmt), out_fmt)
List<Expression> strToDateExprs = Lists.newArrayList(funcExpr.child(2), funcExpr.child(1));
UnboundFunction strToDateFuncExpr = new UnboundFunction("str_to_date", strToDateExprs);
List<Expression> dateFormatArgs = Lists.newArrayList(strToDateFuncExpr, funcExpr.child(0));
UnboundFunction dateFormatFunc = new UnboundFunction("date_format", dateFormatArgs);
return dateFormatFunc;
} else if (funcName.equalsIgnoreCase("alignment_timestamp")) {
                /*
                 * change to:
                 * UNIX_TIMESTAMP(DATE_FORMAT(FROM_UNIXTIME(ts), fmt))
                 * where fmt is determined by the precision, eg: "%Y-01-01 00:00:00" for year
                 */
// FROM_UNIXTIME
UnboundFunction fromUnixFunc = new UnboundFunction("from_unixtime",
Lists.newArrayList(funcExpr.child(1)));
// DATE_FORMAT
StringLiteral precision = (StringLiteral) funcExpr.child(0);
StringLiteral format;
if (precision.getStringValue().equalsIgnoreCase("year")) {
format = new StringLiteral("%Y-01-01 00:00:00");
} else if (precision.getStringValue().equalsIgnoreCase("month")) {
format = new StringLiteral("%Y-%m-01 00:00:00");
} else if (precision.getStringValue().equalsIgnoreCase("day")) {
format = new StringLiteral("%Y-%m-%d 00:00:00");
} else if (precision.getStringValue().equalsIgnoreCase("hour")) {
format = new StringLiteral("%Y-%m-%d %H:00:00");
} else {
throw new UserException("Unknown precision(" + precision.getStringValue() + ")");
}
List<Expression> dateFormatArgs = Lists.newArrayList(fromUnixFunc, format);
UnboundFunction dateFormatFunc = new UnboundFunction("date_format", dateFormatArgs);
// UNIX_TIMESTAMP
List<Expression> unixTimeArgs = Lists.newArrayList();
unixTimeArgs.add(dateFormatFunc);
UnboundFunction unixTimeFunc = new UnboundFunction("unix_timestamp", unixTimeArgs);
return unixTimeFunc;
} else if (funcName.equalsIgnoreCase("default_value")) {
return funcExpr.child(0);
} else if (funcName.equalsIgnoreCase("now")) {
UnboundFunction newFunc = new UnboundFunction("now", Lists.newArrayList());
return newFunc;
} else if (funcName.equalsIgnoreCase("substitute")) {
return funcExpr.child(0);
}
}
return originExpr;
}
}