// LocalParallelPlanningSplitProducer.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.source;

import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.spi.Split;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Local FE async planning producer that reuses the current Iceberg planning flow.
 *
 * <p>{@link #start(int, SplitSink)} creates the table scan synchronously (so scan-creation
 * errors surface to the caller), then plans file scan tasks asynchronously on the context's
 * schedule executor, streaming one split per {@link FileScanTask} into the given sink.
 * {@link #stop()} terminates a running planning task cooperatively (via the {@code stopped}
 * flag checked in the planning loop) and by interrupting the async task.
 */
public class LocalParallelPlanningSplitProducer implements PlanningSplitProducer {
    private static final Logger LOG = LogManager.getLogger(LocalParallelPlanningSplitProducer.class);

    /**
     * Runtime context required by local split planning.
     */
    public interface PlanningContext {
        boolean isBatchMode();

        int numApproximateSplits();

        TableScan createTableScan() throws UserException;

        CloseableIterable<FileScanTask> planFileScanTask(TableScan scan);

        Split createSplit(FileScanTask task);

        void recordManifestCacheProfile();

        Optional<NotSupportedException> checkNotSupportedException(Exception e);

        ExecutionAuthenticator getExecutionAuthenticator();

        Executor getScheduleExecutor();
    }

    private final PlanningContext context;
    // Cooperative cancellation flag: set by stop(), polled by the planning loop.
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    // Holds the in-flight async planning task so stop() can cancel/interrupt it.
    private final AtomicReference<CompletableFuture<Void>> runningTask = new AtomicReference<>();

    public LocalParallelPlanningSplitProducer(PlanningContext context) {
        this.context = Preconditions.checkNotNull(context, "planningContext is null");
    }

    @Override
    public boolean isBatchMode() {
        return context.isBatchMode();
    }

    @Override
    public int numApproximateSplits() {
        return context.numApproximateSplits();
    }

    /**
     * Starts split production: builds the table scan on the caller's thread, then hands the
     * planning/iteration work off to the schedule executor.
     *
     * @param numBackends number of backends (not used by the local producer)
     * @param sink        destination for produced splits; receives finish() on success or
     *                    fail() on error
     * @throws UserException if the table scan cannot be created
     */
    @Override
    public void start(int numBackends, SplitSink sink) throws UserException {
        stopped.set(false);
        final TableScan scan;
        try {
            scan = context.getExecutionAuthenticator().execute(() -> context.createTableScan());
        } catch (Exception e) {
            // messageOf() guards against exceptions whose getMessage() is null,
            // which would otherwise produce a UserException with no message at all.
            throw new UserException(messageOf(e), e);
        }
        runningTask.set(CompletableFuture.runAsync(
                () -> produceSplits(scan, sink), context.getScheduleExecutor()));
    }

    /**
     * Async body of {@link #start}: plans file scan tasks, pushes one split per task into
     * {@code sink}, and stops early when cancelled or when the sink needs no more splits.
     * Always closes the planned task iterable, even on failure or early stop.
     */
    private void produceSplits(TableScan scan, SplitSink sink) {
        // Captured so the finally block can close the iterable opened inside the lambda.
        AtomicReference<CloseableIterable<FileScanTask>> taskRef = new AtomicReference<>();
        try {
            context.getExecutionAuthenticator().execute(() -> {
                CloseableIterable<FileScanTask> fileScanTasks = context.planFileScanTask(scan);
                taskRef.set(fileScanTasks);
                CloseableIterator<FileScanTask> iterator = fileScanTasks.iterator();
                while (!stopped.get() && sink.needMore() && iterator.hasNext()) {
                    sink.addBatch(Lists.newArrayList(context.createSplit(iterator.next())));
                }
                return null;
            });
            sink.finish();
            context.recordManifestCacheProfile();
        } catch (Exception e) {
            if (stopped.get()) {
                // Expected when stop() interrupted us; terminate silently.
                LOG.debug("Iceberg local split planning is stopped");
                return;
            }
            Optional<NotSupportedException> opt = context.checkNotSupportedException(e);
            if (opt.isPresent()) {
                sink.fail(new UserException(opt.get().getMessage(), opt.get()));
            } else {
                sink.fail(new UserException(messageOf(e), e));
            }
        } finally {
            closeTaskIterable(taskRef.get());
        }
    }

    /**
     * Requests termination of the running planning task. Idempotent: only the first call
     * takes effect; subsequent calls return immediately.
     */
    @Override
    public void stop() {
        if (!stopped.compareAndSet(false, true)) {
            return;
        }
        CompletableFuture<Void> task = runningTask.get();
        if (task != null) {
            // Interrupt the async task; the catch block in produceSplits treats the
            // resulting exception as an expected stop because `stopped` is already set.
            task.cancel(true);
        }
    }

    /** Null-safe exception message: falls back to toString() when getMessage() is null. */
    private static String messageOf(Exception e) {
        return e.getMessage() == null ? e.toString() : e.getMessage();
    }

    /** Best-effort close of the planned task iterable; failures are logged, not thrown. */
    private void closeTaskIterable(CloseableIterable<FileScanTask> tasks) {
        if (tasks == null) {
            return;
        }
        try {
            tasks.close();
        } catch (IOException e) {
            LOG.warn("close file scan task iterable failed: {}", e.getMessage(), e);
        }
    }
}