LogicalHudiScan.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.hudi.source.COWIncrementalRelation;
import org.apache.doris.datasource.hudi.source.EmptyIncrementalRelation;
import org.apache.doris.datasource.hudi.source.IncrementalRelation;
import org.apache.doris.datasource.hudi.source.MORIncrementalRelation;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.TableSample;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.GreaterThan;
import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
import org.apache.doris.nereids.trees.expressions.LessThanEqual;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import com.google.common.collect.ImmutableSet;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
 * Logical scan node for a Hudi table.
 *
 * <p>On top of the state inherited from {@link LogicalFileScan}, this node carries the optional
 * table scan parameters and the {@link IncrementalRelation} derived from them by
 * {@link #withScanParams(HMSExternalTable, TableScanParams)}. Instances are never modified in
 * place: every {@code with*} method returns a new scan.
 */
public class LogicalHudiScan extends LogicalFileScan {
    private static final Logger LOG = LogManager.getLogger(LogicalHudiScan.class);

    // for hudi incremental read
    private final Optional<TableScanParams> scanParams;
    private final Optional<IncrementalRelation> incrementalRelation;

    /**
     * Constructor for LogicalHudiScan that takes every field explicitly.
     *
     * @param scanParams optional scan parameters; the {@code Optional} itself must not be null
     * @param incrementalRelation optional incremental relation; the {@code Optional} itself must not be null
     * @throws NullPointerException if {@code scanParams} or {@code incrementalRelation} is null
     */
    protected LogicalHudiScan(RelationId id, ExternalTable table, List<String> qualifier,
            Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
            SelectedPartitions selectedPartitions, Optional<TableSample> tableSample,
            Optional<TableSnapshot> tableSnapshot,
            Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation,
            Collection<Slot> operativeSlots) {
        super(id, table, qualifier, groupExpression, logicalProperties,
                selectedPartitions, tableSample, tableSnapshot, operativeSlots);
        Objects.requireNonNull(scanParams, "scanParams should not null");
        Objects.requireNonNull(incrementalRelation, "incrementalRelation should not null");
        this.scanParams = scanParams;
        this.incrementalRelation = incrementalRelation;
    }

    /**
     * Creates a scan without group expression, logical properties, scan params or incremental relation.
     * The selected partitions are obtained from
     * {@code HMSExternalTable.initHudiSelectedPartitions(tableSnapshot)}.
     *
     * @param table the table to scan; it is cast to {@link HMSExternalTable}, so any other
     *              {@link ExternalTable} implementation fails with a {@link ClassCastException}
     */
    public LogicalHudiScan(RelationId id, ExternalTable table, List<String> qualifier,
            Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot,
            Collection<Slot> operativeSlots) {
        this(id, table, qualifier, Optional.empty(), Optional.empty(),
                ((HMSExternalTable) table).initHudiSelectedPartitions(tableSnapshot), tableSample, tableSnapshot,
                Optional.empty(), Optional.empty(),
                operativeSlots);
    }

    /** Returns the scan parameters attached to this scan, if any. */
    public Optional<TableScanParams> getScanParams() {
        return scanParams;
    }

    /** Returns the incremental relation attached to this scan, if any. */
    public Optional<IncrementalRelation> getIncrementalRelation() {
        return incrementalRelation;
    }

    /**
     * Translates the incremental relation into commit-time range predicates, e.g.
     * incr('beginTime'='20240308110257169', 'endTime'='20240308110677278') =>
     * _hoodie_commit_time >= '20240308110257169' and _hoodie_commit_time <= '20240308110677278'
     *
     * <p>The lower bound is {@code >=} when the relation reports {@code isIncludeStartTime()},
     * otherwise {@code >}. Both bounds are compared as string literals.
     *
     * @param slots candidate slots; the first one named {@code _hoodie_commit_time} is used
     * @return the lower-bound and upper-bound predicates, or an empty set when this scan has no
     *         incremental relation or {@code slots} contains no {@code _hoodie_commit_time} slot
     */
    public Set<Expression> generateIncrementalExpression(List<Slot> slots) {
        if (!incrementalRelation.isPresent()) {
            return Collections.emptySet();
        }
        SlotReference timeField = null;
        for (Slot slot : slots) {
            if ("_hoodie_commit_time".equals(slot.getName())) {
                timeField = (SlotReference) slot;
                break;
            }
        }
        if (timeField == null) {
            return Collections.emptySet();
        }
        IncrementalRelation relation = incrementalRelation.get();
        StringLiteral upperValue = new StringLiteral(relation.getEndTs());
        StringLiteral lowerValue = new StringLiteral(relation.getStartTs());
        ComparisonPredicate less = new LessThanEqual(timeField, upperValue);
        ComparisonPredicate great = relation.isIncludeStartTime()
                ? new GreaterThanEqual(timeField, lowerValue)
                : new GreaterThan(timeField, lowerValue);
        return ImmutableSet.of(great, less);
    }

    @Override
    public String toString() {
        return Utils.toSqlString("LogicalHudiScan",
                "qualified", qualifiedName(),
                "output", getOutput()
        );
    }

    /** Returns a copy of this scan bound to {@code groupExpression}, keeping the current logical properties. */
    @Override
    public LogicalHudiScan withGroupExpression(Optional<GroupExpression> groupExpression) {
        return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier, groupExpression,
                Optional.of(getLogicalProperties()), selectedPartitions, tableSample, tableSnapshot,
                scanParams, incrementalRelation, operativeSlots);
    }

    /**
     * Returns a copy of this scan with the given group expression and logical properties.
     * {@code children} is not used by this implementation.
     */
    @Override
    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
        return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier,
                groupExpression, logicalProperties, selectedPartitions, tableSample, tableSnapshot,
                scanParams, incrementalRelation, operativeSlots);
    }

    /**
     * Returns a copy of this scan that uses {@code selectedPartitions}. The group expression is
     * cleared and the current logical properties are kept.
     */
    public LogicalHudiScan withSelectedPartitions(SelectedPartitions selectedPartitions) {
        return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier, Optional.empty(),
                Optional.of(getLogicalProperties()), selectedPartitions, tableSample, tableSnapshot,
                scanParams, incrementalRelation, operativeSlots);
    }

    /**
     * Returns a copy of this scan with a new relation id. Both the group expression and the
     * logical properties are cleared.
     */
    @Override
    public LogicalHudiScan withRelationId(RelationId relationId) {
        return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier, Optional.empty(),
                Optional.empty(), selectedPartitions, tableSample, tableSnapshot,
                scanParams, incrementalRelation, operativeSlots);
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitLogicalHudiScan(this, context);
    }

    /**
     * Returns a copy of this scan that uses {@code operativeSlots}, keeping the current group
     * expression and logical properties.
     */
    @Override
    public LogicalFileScan withOperativeSlots(Collection<Slot> operativeSlots) {
        return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier,
                groupExpression, Optional.of(getLogicalProperties()),
                selectedPartitions, tableSample, tableSnapshot, scanParams, incrementalRelation,
                operativeSlots);
    }

    /**
     * Set scan params for incremental read.
     *
     * <p>When {@code scanParams} requests an incremental read, an {@link IncrementalRelation} is
     * built from a copy of the table's hadoop properties overlaid with:
     * <ul>
     *   <li>{@code beginTime} as {@code hoodie.datasource.read.begin.instanttime},</li>
     *   <li>{@code endTime} as {@code hoodie.datasource.read.end.instanttime},</li>
     *   <li>every parameter whose key starts with {@code hoodie.}, copied verbatim (these are
     *       applied last).</li>
     * </ul>
     * The returned scan has neither group expression nor logical properties.
     *
     * @param table should be hudi table
     * @param scanParams including incremental read params; may be null, in which case the
     *                   returned scan has no scan params and no incremental relation
     * @return a new scan carrying {@code scanParams} and the derived incremental relation
     * @throws AnalysisException if the incremental relation cannot be created
     */
    public LogicalHudiScan withScanParams(HMSExternalTable table, TableScanParams scanParams) {
        Optional<IncrementalRelation> newIncrementalRelation = Optional.empty();
        if (scanParams != null && scanParams.incrementalRead()) {
            // Copy before adding query-specific options: the map handed out by the table is not
            // owned by this method, so it must not be modified in place.
            Map<String, String> optParams = new HashMap<>(table.getHadoopProperties());
            Map<String, String> userParams = scanParams.getParams();
            if (userParams.containsKey("beginTime")) {
                optParams.put("hoodie.datasource.read.begin.instanttime", userParams.get("beginTime"));
            }
            if (userParams.containsKey("endTime")) {
                optParams.put("hoodie.datasource.read.end.instanttime", userParams.get("endTime"));
            }
            userParams.forEach((k, v) -> {
                if (k.startsWith("hoodie.")) {
                    optParams.put(k, v);
                }
            });
            HoodieTableMetaClient hudiClient = table.getHudiClient();
            try {
                boolean isCowOrRoTable = table.isHoodieCowTable();
                if (isCowOrRoTable) {
                    Map<String, String> serd = table.getRemoteTable().getSd().getSerdeInfo().getParameters();
                    if ("true".equals(serd.get("hoodie.query.as.ro.table"))
                            && table.getRemoteTable().getTableName().endsWith("_ro")) {
                        // A "_ro" table flagged with hoodie.query.as.ro.table is not treated as COW
                        // here, so the MOR relation below handles it (i.e. it is read like the RT
                        // table). The rationale is not documented in this file.
                        isCowOrRoTable = false;
                        LOG.warn("Execute incremental read on RO table: {}", table.getFullQualifiers());
                    }
                }
                if (hudiClient.getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
                    // No completed commit yet.
                    newIncrementalRelation = Optional.of(new EmptyIncrementalRelation(optParams));
                } else if (isCowOrRoTable) {
                    newIncrementalRelation = Optional.of(new COWIncrementalRelation(
                            optParams, HiveMetaStoreClientHelper.getConfiguration(table), hudiClient));
                } else {
                    newIncrementalRelation = Optional.of(new MORIncrementalRelation(
                            optParams, HiveMetaStoreClientHelper.getConfiguration(table), hudiClient));
                }
            } catch (Exception e) {
                throw new AnalysisException(
                        "Failed to create incremental relation for table: " + table.getFullQualifiers(), e);
            }
        }
        return new LogicalHudiScan(relationId, table, qualifier, Optional.empty(),
                Optional.empty(), selectedPartitions, tableSample, tableSnapshot,
                Optional.ofNullable(scanParams), newIncrementalRelation, operativeSlots);
    }
}