COWIncrementalRelation.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hudi.source;

import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.spi.Split;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobPattern;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.StoragePath;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;

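/**
 * Incremental relation for Hudi Copy-on-Write tables. The constructor resolves the commits that
 * fall between the configured begin/end instant times, collects the base files touched by those
 * commits (excluding replaced file groups and honoring the optional path glob filter), and
 * decides whether the query has to fall back to a full table scan. {@link #collectSplits()} then
 * exposes the surviving base files as Doris splits.
 */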
public class COWIncrementalRelation implements IncrementalRelation {
    private final Map<String, String> optParams;
    private final HoodieTableMetaClient metaClient;
    private final HollowCommitHandling hollowCommitHandling;
    private final boolean startInstantArchived;
    private final boolean endInstantArchived;
    private final boolean fullTableScan;
    private final FileSystem fs;
    private final Map<String, HoodieWriteStat> fileToWriteStat;
    private final Collection<String> filteredRegularFullPaths;
    private final Collection<String> filteredMetaBootstrapFullPaths;

    private final boolean includeStartTime;
    private final String startTs;
    private final String endTs;

    public COWIncrementalRelation(Map<String, String> optParams, Configuration configuration,
            HoodieTableMetaClient metaClient)
            throws HoodieException, IOException {
        this.optParams = optParams;
        this.metaClient = metaClient;
        hollowCommitHandling = HollowCommitHandling.valueOf(
                optParams.getOrDefault("hoodie.read.timeline.holes.resolution.policy", "FAIL"));
        HoodieTimeline commitTimeline = TimelineUtils.handleHollowCommitIfNeeded(
                metaClient.getCommitTimeline().filterCompletedInstants(), metaClient, hollowCommitHandling);
        if (commitTimeline.empty()) {
            throw new HoodieException("No instants to incrementally pull");
        }
        if (!metaClient.getTableConfig().populateMetaFields()) {
            throw new HoodieException("Incremental queries are not supported when meta fields are disabled");
        }

        String startInstantTime = optParams.get("hoodie.datasource.read.begin.instanttime");
        if (startInstantTime == null) {
            throw new HoodieException("Specify the begin instant time to pull from using "
                    + "option hoodie.datasource.read.begin.instanttime");
        }
        if (EARLIEST_TIME.equals(startInstantTime)) {
            startInstantTime = "000";
        }
        String endInstantTime = optParams.getOrDefault("hoodie.datasource.read.end.instanttime",
                hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME
                        ? commitTimeline.lastInstant().get().getStateTransitionTime()
                        : commitTimeline.lastInstant().get().getTimestamp());
        startInstantArchived = commitTimeline.isBeforeTimelineStarts(startInstantTime);
        endInstantArchived = commitTimeline.isBeforeTimelineStarts(endInstantTime);

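        // Restrict the completed commit timeline to the requested range, either by state
        // transition time or by instant timestamp, depending on the hollow commit handling mode.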
        HoodieTimeline commitsTimelineToReturn;
        if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) {
            commitsTimelineToReturn = commitTimeline.findInstantsInRangeByStateTransitionTime(startInstantTime,
                    endInstantTime);
        } else {
            commitsTimelineToReturn = commitTimeline.findInstantsInRange(startInstantTime, endInstantTime);
        }
        List<HoodieInstant> commitsToReturn = commitsTimelineToReturn.getInstants();

        // todo: support configuration hoodie.datasource.read.incr.filters
        StoragePath basePath = metaClient.getBasePathV2();
        Map<String, String> regularFileIdToFullPath = new HashMap<>();
        Map<String, String> metaBootstrapFileIdToFullPath = new HashMap<>();
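        // Record the file ids replaced by completed replacecommit instants so that base files
        // belonging to replaced file groups are excluded from the incremental result below.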
        HoodieTimeline replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline();
        Map<String, String> replacedFile = new HashMap<>();
        for (HoodieInstant instant : replacedTimeline.getInstants()) {
            HoodieReplaceCommitMetadata.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(),
                    HoodieReplaceCommitMetadata.class).getPartitionToReplaceFileIds().forEach(
                            (key, value) -> value.forEach(
                                    e -> replacedFile.put(e, FSUtils.constructAbsolutePath(basePath, key).toString())));
        }

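        // For every commit in range, index its write stats by absolute base file path and map
        // each file id to its latest base file path, keeping the metadata bootstrap commit
        // separate from regular commits.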
        fileToWriteStat = new HashMap<>();
        for (HoodieInstant commit : commitsToReturn) {
            HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
                    commitTimeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
            metadata.getPartitionToWriteStats().forEach((partition, stats) -> {
                for (HoodieWriteStat stat : stats) {
                    fileToWriteStat.put(FSUtils.constructAbsolutePath(basePath, stat.getPath()).toString(), stat);
                }
            });
            if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS.equals(commit.getTimestamp())) {
                metadata.getFileIdAndFullPaths(basePath).forEach((k, v) -> {
                    if (!(replacedFile.containsKey(k) && v.startsWith(replacedFile.get(k)))) {
                        metaBootstrapFileIdToFullPath.put(k, v);
                    }
                });
            } else {
                metadata.getFileIdAndFullPaths(basePath).forEach((k, v) -> {
                    if (!(replacedFile.containsKey(k) && v.startsWith(replacedFile.get(k)))) {
                        regularFileIdToFullPath.put(k, v);
                    }
                });
            }
        }

        if (!metaBootstrapFileIdToFullPath.isEmpty()) {
            // filter out meta bootstrap files that have had more commits since metadata bootstrap
            metaBootstrapFileIdToFullPath.entrySet().removeIf(e -> regularFileIdToFullPath.containsKey(e.getKey()));
        }
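        // An optional glob pattern (hoodie.datasource.read.incr.path.glob) further narrows the
        // set of base files returned by the incremental query.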
        String pathGlobPattern = optParams.getOrDefault("hoodie.datasource.read.incr.path.glob", "");
        if ("".equals(pathGlobPattern)) {
            filteredRegularFullPaths = regularFileIdToFullPath.values();
            filteredMetaBootstrapFullPaths = metaBootstrapFileIdToFullPath.values();
        } else {
            GlobPattern globMatcher = new GlobPattern("*" + pathGlobPattern);
            filteredRegularFullPaths = regularFileIdToFullPath.values().stream().filter(globMatcher::matches)
                    .collect(Collectors.toList());
            filteredMetaBootstrapFullPaths = metaBootstrapFileIdToFullPath.values().stream()
                    .filter(globMatcher::matches).collect(Collectors.toList());
        }

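        // Decide whether to fall back to a full table scan. When the query can be served
        // incrementally, report the first/last commit actually in range as the effective
        // start/end timestamps.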
        fs = new Path(basePath.toUri().getPath()).getFileSystem(configuration);
        fullTableScan = shouldFullTableScan();
        includeStartTime = !fullTableScan;
        if (fullTableScan || commitsToReturn.isEmpty()) {
            startTs = startInstantTime;
            endTs = endInstantTime;
        } else {
            startTs = commitsToReturn.get(0).getTimestamp();
            endTs = commitsToReturn.get(commitsToReturn.size() - 1).getTimestamp();
        }
    }

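    /**
     * A full table scan is only required when the fallback option
     * (hoodie.datasource.read.incr.fallback.fulltablescan.enable) is enabled and either boundary
     * instant is archived, or one of the incremental base files no longer exists on the file
     * system (e.g. it has already been cleaned).
     */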
    private boolean shouldFullTableScan() throws HoodieException, IOException {
        boolean fallbackToFullTableScan = Boolean.parseBoolean(
                optParams.getOrDefault("hoodie.datasource.read.incr.fallback.fulltablescan.enable", "false"));
        if (fallbackToFullTableScan && (startInstantArchived || endInstantArchived)) {
            if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) {
                throw new HoodieException("Cannot use stateTransitionTime while enables full table scan");
            }
            return true;
        }
        if (fallbackToFullTableScan) {
            for (String path : filteredMetaBootstrapFullPaths) {
                if (!fs.exists(new Path(path))) {
                    return true;
                }
            }
            for (String path : filteredRegularFullPaths) {
                if (!fs.exists(new Path(path))) {
                    return true;
                }
            }
        }
        return false;
    }

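    /**
     * File slices are not tracked for Copy-on-Write incremental reads; use {@link #collectSplits()}
     * instead.
     */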
    @Override
    public List<FileSlice> collectFileSlices() throws HoodieException {
        throw new UnsupportedOperationException();
    }

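    /**
     * Converts every filtered base file into a {@link HudiSplit} covering the whole file.
     *
     * @throws HoodieException if the relation has fallen back to a full table scan
     */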
    @Override
    public List<Split> collectSplits() throws HoodieException {
        if (fullTableScan) {
            throw new HoodieException("Fallback to full table scan");
        }
        if (filteredRegularFullPaths.isEmpty() && filteredMetaBootstrapFullPaths.isEmpty()) {
            return Collections.emptyList();
        }
        List<Split> splits = new ArrayList<>();
        Option<String[]> partitionColumns = metaClient.getTableConfig().getPartitionFields();
        List<String> partitionNames = partitionColumns.isPresent() ? Arrays.asList(partitionColumns.get())
                : Collections.emptyList();

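        // Build a split spanning the whole base file; partition values are parsed from the
        // write stat's relative partition path.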
        Consumer<String> generatorSplit = baseFile -> {
            HoodieWriteStat stat = fileToWriteStat.get(baseFile);
            LocationPath locationPath = new LocationPath(baseFile, optParams);
            HudiSplit hudiSplit = new HudiSplit(locationPath, 0,
                    stat.getFileSizeInBytes(), stat.getFileSizeInBytes(), new String[0],
                    HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath()));
            hudiSplit.setTableFormatType(TableFormatType.HUDI);
            splits.add(hudiSplit);
        };

        for (String baseFile : filteredMetaBootstrapFullPaths) {
            generatorSplit.accept(baseFile);
        }
        for (String baseFile : filteredRegularFullPaths) {
            generatorSplit.accept(baseFile);
        }
        return splits;
    }

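    /**
     * Returns the read options enriched with the resolved incremental range: the effective
     * begin/end instant times and whether the begin instant is inclusive.
     */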
    @Override
    public Map<String, String> getHoodieParams() {
        optParams.put("hoodie.datasource.read.incr.operation", "true");
        optParams.put("hoodie.datasource.read.begin.instanttime", startTs);
        optParams.put("hoodie.datasource.read.end.instanttime", endTs);
        optParams.put("hoodie.datasource.read.incr.includeStartTime", includeStartTime ? "true" : "false");
        return optParams;
    }

    @Override
    public boolean fallbackFullTableScan() {
        return fullTableScan;
    }

    @Override
    public boolean isIncludeStartTime() {
        return includeStartTime;
    }

    @Override
    public String getStartTs() {
        return startTs;
    }

    @Override
    public String getEndTs() {
        return endTs;
    }
}