MORIncrementalRelation.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hudi.source;

import org.apache.doris.spi.Split;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.GlobPattern;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.storage.StoragePathInfo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

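/**
 * Incremental relation over a Hudi Merge-on-Read (MOR) table: resolves the
 * user-supplied instant range against the completed commits-and-compactions
 * timeline and exposes the file slices written in that range.
 *
 * <p>Minimal usage sketch; the instant times below are illustrative, and the
 * Hadoop {@code Configuration} and {@code HoodieTableMetaClient} are assumed
 * to be already built for the target table:
 * <pre>{@code
 * Map<String, String> opts = new HashMap<>();
 * opts.put("hoodie.datasource.read.begin.instanttime", "20240101000000000");
 * opts.put("hoodie.datasource.read.end.instanttime", "20240102000000000");
 * IncrementalRelation relation = new MORIncrementalRelation(opts, conf, metaClient);
 * List<FileSlice> slices = relation.collectFileSlices();
 * }</pre>
 */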
public class MORIncrementalRelation implements IncrementalRelation {
    private final Map<String, String> optParams;
    private final HoodieTableMetaClient metaClient;
    private final HoodieTimeline timeline;
    private final HollowCommitHandling hollowCommitHandling;
    private String startTimestamp;
    private final String endTimestamp;
    private final boolean startInstantArchived;
    private final boolean endInstantArchived;
    private final List<HoodieInstant> includedCommits;
    private final List<HoodieCommitMetadata> commitsMetadata;
    private final List<StoragePathInfo> affectedFilesInCommits;
    private final boolean fullTableScan;
    private final String globPattern;
    private final boolean includeStartTime;
    private final String startTs;
    private final String endTs;

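    /**
     * Validates the requested instant range and pre-computes the included
     * commits, their metadata, and the files they touched.
     *
     * @throws HoodieException if the timeline is empty, meta fields are
     *         disabled, the begin instant time is missing, or transition-time
     *         ordering is combined with a full table scan
     * @throws IOException if commit metadata or affected files cannot be read
     */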
    public MORIncrementalRelation(Map<String, String> optParams, Configuration configuration,
            HoodieTableMetaClient metaClient)
            throws HoodieException, IOException {
        this.optParams = optParams;
        this.metaClient = metaClient;
        timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
        if (timeline.empty()) {
            throw new HoodieException("No instants to incrementally pull");
        }
        if (!metaClient.getTableConfig().populateMetaFields()) {
            throw new HoodieException("Incremental queries are not supported when meta fields are disabled");
        }
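        // Policy for timeline "holes" (hollow commits that complete out of order);
        // the default FAIL aborts the incremental read when holes are detected.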
        hollowCommitHandling = HollowCommitHandling.valueOf(
                optParams.getOrDefault("hoodie.read.timeline.holes.resolution.policy", "FAIL"));

        startTimestamp = optParams.get("hoodie.datasource.read.begin.instanttime");
        if (startTimestamp == null) {
            throw new HoodieException("Specify the begin instant time to pull from using "
                    + "option hoodie.datasource.read.begin.instanttime");
        }
        if (EARLIEST_TIME.equals(startTimestamp)) {
            startTimestamp = "000";
        }
        endTimestamp = optParams.getOrDefault("hoodie.datasource.read.end.instanttime",
                hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME
                        ? timeline.lastInstant().get().getStateTransitionTime()
                        : timeline.lastInstant().get().getTimestamp());

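        // A boundary is "archived" when it predates the start of the active timeline.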
        startInstantArchived = timeline.isBeforeTimelineStarts(startTimestamp);
        endInstantArchived = timeline.isBeforeTimelineStarts(endTimestamp);

        includedCommits = getIncludedCommits();
        commitsMetadata = getCommitsMetadata();
        affectedFilesInCommits = HoodieInputFormatUtils.listAffectedFilesForCommits(configuration,
                metaClient.getBasePathV2(), commitsMetadata);
        fullTableScan = shouldFullTableScan();
        if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME && fullTableScan) {
            throw new HoodieException("Cannot use stateTransitionTime when full table scan is enabled");
        }
        globPattern = optParams.getOrDefault("hoodie.datasource.read.incr.path.glob", "");

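        // Normalize the range exposed to readers: an archived start is reported
        // as exclusive, otherwise the boundaries snap to the included commits.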
        if (startInstantArchived) {
            includeStartTime = false;
            startTs = startTimestamp;
        } else {
            includeStartTime = true;
            startTs = includedCommits.isEmpty() ? startTimestamp : includedCommits.get(0).getTimestamp();
        }
        endTs = endInstantArchived || includedCommits.isEmpty() ? endTimestamp
                : includedCommits.get(includedCommits.size() - 1).getTimestamp();
    }

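    // Pins the resolved range back into the read options so downstream readers
    // see the effective start/end instants and whether the start is inclusive.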
    @Override
    public Map<String, String> getHoodieParams() {
        optParams.put("hoodie.datasource.read.incr.operation", "true");
        optParams.put("hoodie.datasource.read.begin.instanttime", startTs);
        optParams.put("hoodie.datasource.read.end.instanttime", endTs);
        optParams.put("hoodie.datasource.read.incr.includeStartTime", includeStartTime ? "true" : "false");
        return optParams;
    }

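    // Determines which completed instants this incremental read covers.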
    private List<HoodieInstant> getIncludedCommits() {
        if (!startInstantArchived || !endInstantArchived) {
            // At least one boundary is on the active timeline, so restrict the
            // scan to the instants inside [startTimestamp, endTimestamp].
            if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) {
                return timeline.findInstantsInRangeByStateTransitionTime(startTimestamp, endTimestamp).getInstants();
            } else {
                return timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants();
            }
        } else {
            return timeline.getInstants();
        }
    }

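    // Reads the commit metadata for every included instant off the timeline.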
    private List<HoodieCommitMetadata> getCommitsMetadata() throws IOException {
        List<HoodieCommitMetadata> result = new ArrayList<>();
        for (HoodieInstant commit : includedCommits) {
            result.add(TimelineUtils.getCommitMetadata(commit, timeline));
        }
        return result;
    }

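    // Fall back to a full table scan when the user opted in and a range boundary
    // is archived, or when a file touched by the included commits no longer
    // exists (e.g. it was cleaned up).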
    private boolean shouldFullTableScan() throws IOException {
        boolean should = Boolean.parseBoolean(
                optParams.getOrDefault("hoodie.datasource.read.incr.fallback.fulltablescan.enable", "false")) && (
                startInstantArchived || endInstantArchived);
        if (should) {
            return true;
        }
        for (StoragePathInfo pathInfo : affectedFilesInCommits) {
            if (!metaClient.getRawHoodieStorage().exists(pathInfo.getPath())) {
                return true;
            }
        }
        return false;
    }

    @Override
    public boolean fallbackFullTableScan() {
        return fullTableScan;
    }

    @Override
    public boolean isIncludeStartTime() {
        return includeStartTime;
    }

    @Override
    public String getStartTs() {
        return startTs;
    }

    @Override
    public String getEndTs() {
        return endTs;
    }

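    /**
     * Builds a file-system view over the instants in range and returns the
     * latest merged file slice per file group in every partition written by
     * the included commits, optionally filtered by the path glob.
     */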
    @Override
    public List<FileSlice> collectFileSlices() throws HoodieException {
        if (includedCommits.isEmpty()) {
            return Collections.emptyList();
        } else if (fullTableScan) {
            throw new HoodieException("Fallback to full table scan");
        }
        HoodieTimeline scanTimeline;
        if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) {
            scanTimeline = metaClient.getCommitsAndCompactionTimeline()
                    .findInstantsInRangeByStateTransitionTime(startTimestamp, endTimestamp);
        } else {
            scanTimeline = TimelineUtils.handleHollowCommitIfNeeded(
                            metaClient.getCommitsAndCompactionTimeline(), metaClient, hollowCommitHandling)
                    .findInstantsInRange(startTimestamp, endTimestamp);
        }
        String latestCommit = includedCommits.get(includedCommits.size() - 1).getTimestamp();
        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, scanTimeline,
                affectedFilesInCommits);
        Stream<FileSlice> fileSlices = HoodieTableMetadataUtil.getWritePartitionPaths(commitsMetadata)
                .stream().flatMap(relativePartitionPath ->
                        fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit));
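        // An empty glob disables path filtering; otherwise keep only slices whose
        // base file (or latest log file) path matches the pattern.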
        if ("".equals(globPattern)) {
            return fileSlices.collect(Collectors.toList());
        }
        GlobPattern globMatcher = new GlobPattern("*" + globPattern);
        return fileSlices.filter(fileSlice -> globMatcher.matches(fileSlice.getBaseFile().map(BaseFile::getPath)
                .or(fileSlice.getLatestLogFile().map(f -> f.getPath().toString())).get())).collect(Collectors.toList());
    }

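    // This relation exposes file slices rather than pre-planned splits.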
    @Override
    public List<Split> collectSplits() throws HoodieException {
        throw new UnsupportedOperationException();
    }
}