// IcebergSplitPlanningSupport.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.source;

import org.apache.doris.analysis.Expr;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TPushAggOp;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.BaseFileScanTask;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFileIndex;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.util.ScanTaskUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
 * Planning helper for {@link IcebergScanNode}: builds the Iceberg {@link TableScan} with
 * pushed-down predicates, plans {@link FileScanTask}s (optionally via the manifest cache),
 * decides batch mode and table-level COUNT(*) pushdown, and converts scan tasks into Doris
 * {@link Split}s.
 *
 * <p>Instances cache planning state ({@code icebergTableScan}, {@code isBatchMode},
 * {@code partitionMapInfos}, LocationPath prefixes) and are NOT thread-safe; confine each
 * instance to a single planning thread.
 */
class IcebergSplitPlanningSupport {
    private static final Logger LOG = LogManager.getLogger(IcebergSplitPlanningSupport.class);
    // Below this row count a single split is enough for COUNT(*) pushdown; above it the count
    // is spread over parallel splits (see getCountPushDownSplitCount).
    private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;

    private final IcebergScanNode scanNode;
    private final IcebergSource source;
    private final Table icebergTable;
    private final ExecutionAuthenticator preExecutionAuthenticator;
    private final SessionVariable sessionVariable;
    private final ScanContext scanContext;
    private final Map<StorageProperties.Type, StorageProperties> storagePropertiesMap;
    private final int formatVersion;
    private final boolean isPartitionedTable;

    // String forms of the Iceberg expressions actually pushed into the scan (for EXPLAIN/profile).
    private final List<String> pushdownIcebergPredicates = Lists.newArrayList();
    // Distinct partitions seen while creating splits; value is the partition-value map when
    // runtime-filter partition pruning is enabled, otherwise null.
    private final Map<PartitionData, Map<String, String>> partitionMapInfos = new HashMap<>();

    private TableScan icebergTableScan;          // lazily built, cached by createTableScanInternal()
    private Boolean isBatchMode;                 // tri-state: null = not yet decided
    private boolean tableLevelPushDownCount;
    private long countFromSnapshot;
    private long targetSplitSize;
    private long manifestCacheHits;
    private long manifestCacheMisses;
    private long manifestCacheFailures;

    // Cached pieces of the first parsed LocationPath, used to cheaply derive LocationPaths for
    // sibling files under the same prefix (see createLocationPathWithCache).
    private boolean locationPathCacheInitialized;
    private StorageProperties cachedStorageProperties;
    private String cachedSchema;
    private String cachedFsIdPrefix;
    private String cachedOriginalPathPrefix;
    private String cachedNormalizedPathPrefix;
    private String cachedFsIdentifier;

    IcebergSplitPlanningSupport(
            IcebergScanNode scanNode,
            IcebergSource source,
            Table icebergTable,
            ExecutionAuthenticator preExecutionAuthenticator,
            SessionVariable sessionVariable,
            ScanContext scanContext,
            Map<StorageProperties.Type, StorageProperties> storagePropertiesMap,
            int formatVersion,
            boolean isPartitionedTable) {
        this.scanNode = scanNode;
        this.source = source;
        this.icebergTable = icebergTable;
        this.preExecutionAuthenticator = preExecutionAuthenticator;
        this.sessionVariable = sessionVariable;
        this.scanContext = scanContext;
        this.storagePropertiesMap = storagePropertiesMap;
        this.formatVersion = formatVersion;
        this.isPartitionedTable = isPartitionedTable;
    }

    /**
     * Builds (or returns the cached) TableScan under the catalog's execution authenticator.
     *
     * @throws UserException wrapping any failure from scan construction or authentication
     */
    TableScan createTableScan() throws UserException {
        try {
            return preExecutionAuthenticator.execute(this::createTableScanInternal);
        } catch (Exception e) {
            throw new UserException(e.getMessage(), e);
        }
    }

    /** Plans file scan tasks for {@code scan} under the execution authenticator. */
    CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) throws Exception {
        return preExecutionAuthenticator.execute(() -> planFileScanTaskInternal(scan));
    }

    /** Converts a planned {@link FileScanTask} into a Doris {@link Split}. */
    Split createSplit(FileScanTask fileScanTask) {
        return createIcebergSplit(fileScanTask);
    }

    /**
     * Decides (and caches) whether split generation should run in batch mode. Batch mode is
     * enabled only when the session allows it and the number of data files referenced by the
     * matching manifests reaches the session threshold. As a side effect this may detect
     * table-level COUNT(*) pushdown (see hasTableLevelPushDownCount).
     */
    boolean isBatchMode() {
        Boolean cached = isBatchMode;
        if (cached != null) {
            return cached;
        }
        TPushAggOp aggOp = scanNode.getPushDownAggNoGroupingOp();
        if (aggOp.equals(TPushAggOp.COUNT)) {
            try {
                countFromSnapshot = loadCountFromSnapshot();
            } catch (UserException e) {
                throw new RuntimeException(e);
            }
            // >= 0 means the snapshot summary alone answers COUNT(*); no file planning needed.
            if (countFromSnapshot >= 0) {
                tableLevelPushDownCount = true;
                isBatchMode = false;
                return false;
            }
        }

        final TableScan scan;
        try {
            scan = createTableScanInternal();
            if (scan.snapshot() == null) {
                // Empty table: nothing to plan, batch mode pointless.
                isBatchMode = false;
                return false;
            }
        } catch (UserException e) {
            throw new RuntimeException(e);
        }

        if (!sessionVariable.getEnableExternalTableBatchMode()) {
            isBatchMode = false;
            return false;
        }

        try {
            return preExecutionAuthenticator.execute(() -> {
                // Count files from manifest metadata only (added + existing), stopping early
                // once the batch-mode threshold is reached.
                try (CloseableIterator<ManifestFile> matchingManifest =
                        org.apache.doris.datasource.iceberg.IcebergUtils.getMatchingManifest(
                                scan.snapshot().dataManifests(icebergTable.io()),
                                icebergTable.specs(),
                                scan.filter()).iterator()) {
                    int cnt = 0;
                    while (matchingManifest.hasNext()) {
                        ManifestFile next = matchingManifest.next();
                        cnt += next.addedFilesCount() + next.existingFilesCount();
                        if (cnt >= sessionVariable.getNumFilesInBatchMode()) {
                            isBatchMode = true;
                            return true;
                        }
                    }
                }
                isBatchMode = false;
                return false;
            });
        } catch (Exception e) {
            Optional<NotSupportedException> opt = checkNotSupportedException(e);
            if (opt.isPresent()) {
                throw opt.get();
            }
            throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
        }
    }

    /** Rough split-count estimate used by the planner; at least 1. */
    int numApproximateSplits() {
        return partitionMapInfos.isEmpty() ? 1 : partitionMapInfos.size();
    }

    /** Number of distinct partitions observed while creating splits. */
    int getSelectedPartitionNum() {
        return partitionMapInfos.size();
    }

    /** Whether COUNT(*) was answered from the snapshot summary (set by isBatchMode). */
    boolean hasTableLevelPushDownCount() {
        return tableLevelPushDownCount;
    }

    long getCountFromSnapshot() {
        return countFromSnapshot;
    }

    List<String> getPushdownIcebergPredicates() {
        return pushdownIcebergPredicates;
    }

    long getManifestCacheHits() {
        return manifestCacheHits;
    }

    long getManifestCacheMisses() {
        return manifestCacheMisses;
    }

    long getManifestCacheFailures() {
        return manifestCacheFailures;
    }

    /**
     * Number of (empty) splits to emit for table-level COUNT(*) pushdown: one for small counts,
     * otherwise one per parallel instance per backend so the count is distributed.
     */
    int getCountPushDownSplitCount(int numBackends) {
        Preconditions.checkState(tableLevelPushDownCount, "Table-level count pushdown is not enabled");
        return countFromSnapshot < COUNT_WITH_PARALLEL_SPLITS
                ? 1
                : sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName()) * numBackends;
    }

    /** Adds manifest-cache hit/miss/failure counters to the query summary profile, if available. */
    void recordManifestCacheProfile() {
        if (!org.apache.doris.datasource.iceberg.IcebergUtils.isManifestCacheEnabled(source.getCatalog())) {
            return;
        }
        SummaryProfile summaryProfile = SummaryProfile.getSummaryProfile(ConnectContext.get());
        if (summaryProfile == null || summaryProfile.getExecutionSummary() == null) {
            return;
        }
        summaryProfile.getExecutionSummary().addInfoString("Manifest Cache",
                String.format("hits=%d, misses=%d, failures=%d",
                        manifestCacheHits, manifestCacheMisses, manifestCacheFailures));
    }

    /** Maps a planning exception to a UserException, surfacing NotSupportedException messages. */
    UserException translatePlanningException(Exception e) {
        Optional<NotSupportedException> opt = checkNotSupportedException(e);
        if (opt.isPresent()) {
            return new UserException(opt.get().getMessage(), opt.get());
        }
        if (e instanceof UserException) {
            return (UserException) e;
        }
        return new UserException(e.getMessage(), e);
    }

    /**
     * Builds the TableScan once and caches it: applies the user-specified snapshot/ref,
     * converts Doris conjuncts to Iceberg expressions and pushes them as filters, and
     * attaches the catalog's pre-authenticated planning thread pool.
     */
    private TableScan createTableScanInternal() throws UserException {
        if (icebergTableScan != null) {
            return icebergTableScan;
        }

        TableScan scan = icebergTable.newScan().metricsReporter(
                new org.apache.doris.datasource.iceberg.profile.IcebergMetricsReporter());

        org.apache.doris.datasource.iceberg.source.IcebergTableQueryInfo info = scanNode.getSpecifiedSnapshot();
        if (info != null) {
            if (info.getRef() != null) {
                scan = scan.useRef(info.getRef());
            } else {
                scan = scan.useSnapshot(info.getSnapshotId());
            }
        }

        List<Expression> expressions = new ArrayList<>();
        for (Expr conjunct : scanNode.getConjuncts()) {
            Expression expression = org.apache.doris.datasource.iceberg.IcebergUtils.convertToIcebergExpr(
                    conjunct, icebergTable.schema());
            // Conjuncts that cannot be expressed in Iceberg stay in Doris-side evaluation only.
            if (expression != null) {
                expressions.add(expression);
            }
        }
        for (Expression predicate : expressions) {
            scan = scan.filter(predicate);
            pushdownIcebergPredicates.add(predicate.toString());
        }

        icebergTableScan = scan.planWith(source.getCatalog().getThreadPoolWithPreAuth());
        return icebergTableScan;
    }

    /**
     * Plans file scan tasks, preferring the manifest cache when enabled; any cache-path failure
     * is counted and falls back to Iceberg's native planFiles().
     */
    private CloseableIterable<FileScanTask> planFileScanTaskInternal(TableScan scan) {
        if (!org.apache.doris.datasource.iceberg.IcebergUtils.isManifestCacheEnabled(source.getCatalog())) {
            return splitFiles(scan);
        }
        try {
            return planFileScanTaskWithManifestCache(scan);
        } catch (Exception e) {
            manifestCacheFailures++;
            LOG.warn("Plan with manifest cache failed, fallback to original scan: " + e.getMessage(), e);
            return splitFiles(scan);
        }
    }

    /**
     * Splits planned files into scan tasks. With an explicit session split size or in batch mode
     * the stream is split lazily; otherwise tasks are materialized first so the target split size
     * can be derived from the total file size.
     */
    private CloseableIterable<FileScanTask> splitFiles(TableScan scan) {
        if (sessionVariable.getFileSplitSize() > 0) {
            return TableScanUtil.splitFiles(scan.planFiles(), sessionVariable.getFileSplitSize());
        }
        if (isBatchMode()) {
            return TableScanUtil.splitFiles(scan.planFiles(), sessionVariable.getMaxSplitSize());
        }

        List<FileScanTask> fileScanTaskList = new ArrayList<>();
        try (CloseableIterable<FileScanTask> scanTasksIter = scan.planFiles()) {
            for (FileScanTask task : scanTasksIter) {
                fileScanTaskList.add(task);
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to materialize file scan tasks", e);
        }

        targetSplitSize = determineTargetFileSplitSize(fileScanTaskList);
        return TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTaskList), targetSplitSize);
    }

    /**
     * Picks the target split size: the initial (smaller) size for small scans, the max split size
     * once total file size crosses maxSplitSize * maxInitialSplitNum, then enlarged if needed to
     * respect the max-file-split-num cap (non-batch mode only).
     */
    private long determineTargetFileSplitSize(Iterable<FileScanTask> tasks) {
        long result = sessionVariable.getMaxInitialSplitSize();
        long accumulatedTotalFileSize = 0;
        boolean exceedInitialThreshold = false;
        for (FileScanTask task : tasks) {
            accumulatedTotalFileSize += ScanTaskUtil.contentSizeInBytes(task.file());
            if (!exceedInitialThreshold && accumulatedTotalFileSize
                    >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
                exceedInitialThreshold = true;
            }
        }
        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
        if (!isBatchMode()) {
            result = applyMaxFileSplitNumLimit(result, accumulatedTotalFileSize);
        }
        return result;
    }

    /**
     * Raises the split size when needed so that totalFileSize / splitSize does not exceed
     * maxFileSplitNum (ceiling division avoids undershooting the minimum size).
     */
    private long applyMaxFileSplitNumLimit(long targetFileSplitSize, long totalFileSize) {
        int maxFileSplitNum = sessionVariable.getMaxFileSplitNum();
        if (maxFileSplitNum <= 0 || totalFileSize <= 0) {
            return targetFileSplitSize;
        }
        long minSplitSizeForMaxNum = (totalFileSize + maxFileSplitNum - 1L) / (long) maxFileSplitNum;
        return Math.max(targetFileSplitSize, minSplitSizeForMaxNum);
    }

    /**
     * Re-implements Iceberg's manifest-based planning using cached manifest contents:
     * loads delete files and data files per matching manifest through the external meta cache,
     * applies partition/metrics/residual filtering, attaches matching deletes via a
     * DeleteFileIndex, and finally splits the resulting tasks.
     */
    private CloseableIterable<FileScanTask> planFileScanTaskWithManifestCache(TableScan scan) throws IOException {
        Snapshot snapshot = scan.snapshot();
        if (snapshot == null) {
            return CloseableIterable.withNoopClose(Collections.emptyList());
        }

        org.apache.doris.datasource.iceberg.IcebergExternalMetaCache cache =
                Env.getCurrentEnv().getExtMetaCacheMgr().iceberg(source.getCatalog().getId());
        if (!(source.getTargetTable() instanceof ExternalTable)) {
            throw new RuntimeException("Iceberg scan target table is not an external table");
        }
        ExternalTable targetExternalTable = (ExternalTable) source.getTargetTable();

        // AND of all convertible conjuncts; alwaysTrue() when none convert.
        Expression filterExpr = scanNode.getConjuncts().stream()
                .map(conjunct -> org.apache.doris.datasource.iceberg.IcebergUtils.convertToIcebergExpr(
                        conjunct, icebergTable.schema()))
                .filter(Objects::nonNull)
                .reduce(Expressions.alwaysTrue(), Expressions::and);

        Map<Integer, PartitionSpec> specsById = icebergTable.specs();
        boolean caseSensitive = true;

        Map<Integer, ResidualEvaluator> residualEvaluators = new HashMap<>();
        specsById.forEach((id, spec) -> residualEvaluators.put(id,
                ResidualEvaluator.of(spec, filterExpr, caseSensitive)));

        InclusiveMetricsEvaluator metricsEvaluator =
                new InclusiveMetricsEvaluator(icebergTable.schema(), filterExpr, caseSensitive);

        // Collect delete files from delete manifests whose partition range can match the filter.
        List<DeleteFile> deleteFiles = new ArrayList<>();
        List<ManifestFile> deleteManifests = snapshot.deleteManifests(icebergTable.io());
        for (ManifestFile manifest : deleteManifests) {
            if (manifest.content() != ManifestContent.DELETES) {
                continue;
            }
            PartitionSpec spec = specsById.get(manifest.partitionSpecId());
            if (spec == null) {
                continue;
            }
            ManifestEvaluator evaluator =
                    ManifestEvaluator.forPartitionFilter(filterExpr, spec, caseSensitive);
            if (!evaluator.eval(manifest)) {
                continue;
            }
            org.apache.doris.datasource.iceberg.cache.ManifestCacheValue value =
                    org.apache.doris.datasource.iceberg.cache.IcebergManifestCacheLoader.loadDeleteFilesWithCache(
                            cache, targetExternalTable, manifest, icebergTable, this::recordManifestCacheAccess);
            deleteFiles.addAll(value.getDeleteFiles());
        }

        DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(deleteFiles)
                .specsById(specsById)
                .caseSensitive(caseSensitive)
                .build();

        List<FileScanTask> tasks = new ArrayList<>();
        try (CloseableIterable<ManifestFile> dataManifests =
                     org.apache.doris.datasource.iceberg.IcebergUtils.getMatchingManifest(
                             snapshot.dataManifests(icebergTable.io()), specsById, filterExpr)) {
            for (ManifestFile manifest : dataManifests) {
                if (manifest.content() != ManifestContent.DATA) {
                    continue;
                }
                PartitionSpec spec = specsById.get(manifest.partitionSpecId());
                if (spec == null) {
                    continue;
                }
                ResidualEvaluator residualEvaluator = residualEvaluators.get(manifest.partitionSpecId());
                if (residualEvaluator == null) {
                    continue;
                }

                org.apache.doris.datasource.iceberg.cache.ManifestCacheValue value =
                        org.apache.doris.datasource.iceberg.cache.IcebergManifestCacheLoader.loadDataFilesWithCache(
                                cache, targetExternalTable, manifest, icebergTable, this::recordManifestCacheAccess);

                for (org.apache.iceberg.DataFile dataFile : value.getDataFiles()) {
                    // Skip files whose column metrics prove the filter cannot match.
                    if (metricsEvaluator != null && !metricsEvaluator.eval(dataFile)) {
                        continue;
                    }
                    // Skip files whose partition values make the residual statically false.
                    if (residualEvaluator.residualFor(dataFile.partition()).equals(Expressions.alwaysFalse())) {
                        continue;
                    }
                    // forDataFile already returns the array shape BaseFileScanTask expects;
                    // no need to round-trip through a List.
                    DeleteFile[] deletes = deleteIndex.forDataFile(dataFile.dataSequenceNumber(), dataFile);

                    tasks.add(new BaseFileScanTask(
                            dataFile,
                            deletes,
                            SchemaParser.toJson(icebergTable.schema()),
                            PartitionSpecParser.toJson(spec),
                            residualEvaluator));
                }
            }
        }

        targetSplitSize = determineTargetFileSplitSize(tasks);
        return TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks), targetSplitSize);
    }

    /**
     * Seeds the LocationPath cache from one sample path so subsequent paths under the same
     * directory prefix (the common case for a table's data files) skip full parsing.
     * Failure is non-fatal: the cache simply stays uninitialized.
     */
    private void initLocationPathCache(String samplePath) {
        try {
            LocationPath sampleLocationPath = LocationPath.of(samplePath, storagePropertiesMap);
            cachedStorageProperties = sampleLocationPath.getStorageProperties();
            cachedSchema = sampleLocationPath.getSchema();
            cachedFsIdentifier = sampleLocationPath.getFsIdentifier();

            int schemeEnd = cachedFsIdentifier.indexOf("://");
            if (schemeEnd > 0) {
                cachedFsIdPrefix = cachedFsIdentifier.substring(0, schemeEnd + 3);
            }

            String normalizedPath = sampleLocationPath.getNormalizedLocation();
            int lastSlashInOriginal = samplePath.lastIndexOf('/');
            int lastSlashInNormalized = normalizedPath.lastIndexOf('/');
            if (lastSlashInOriginal > 0 && lastSlashInNormalized > 0) {
                cachedOriginalPathPrefix = samplePath.substring(0, lastSlashInOriginal + 1);
                cachedNormalizedPathPrefix = normalizedPath.substring(0, lastSlashInNormalized + 1);
            }

            locationPathCacheInitialized = true;
        } catch (Exception e) {
            LOG.warn("Failed to initialize LocationPath cache, will use full parsing", e);
        }
    }

    /**
     * Builds a LocationPath using cached prefix/properties when possible, falling back to full
     * parsing for paths outside the cached prefix or when the cache could not be initialized.
     */
    private LocationPath createLocationPathWithCache(String path) {
        if (!locationPathCacheInitialized) {
            initLocationPathCache(path);
        }
        if (cachedOriginalPathPrefix != null && path.startsWith(cachedOriginalPathPrefix)) {
            String normalizedPath = cachedNormalizedPathPrefix + path.substring(cachedOriginalPathPrefix.length());
            return LocationPath.ofDirect(normalizedPath, cachedSchema, cachedFsIdentifier, cachedStorageProperties);
        }
        if (cachedStorageProperties != null) {
            return LocationPath.ofWithCache(path, cachedStorageProperties, cachedSchema, cachedFsIdPrefix);
        }
        return LocationPath.of(path, storagePropertiesMap);
    }

    /**
     * Converts one FileScanTask into an IcebergSplit, attaching delete-file filters and, for
     * partitioned tables, recording the partition (with values when runtime-filter partition
     * pruning is on).
     */
    private Split createIcebergSplit(FileScanTask fileScanTask) {
        String originalPath = fileScanTask.file().path().toString();
        LocationPath locationPath = createLocationPathWithCache(originalPath);
        IcebergSplit split = new IcebergSplit(
                locationPath,
                fileScanTask.start(),
                fileScanTask.length(),
                fileScanTask.file().fileSizeInBytes(),
                new String[0],
                formatVersion,
                storagePropertiesMap,
                new ArrayList<>(),
                originalPath);
        if (!fileScanTask.deletes().isEmpty()) {
            split.setDeleteFileFilters(getDeleteFileFilters(fileScanTask));
        }
        split.setTableFormatType(TableFormatType.ICEBERG);
        split.setTargetSplitSize(targetSplitSize);
        if (isPartitionedTable) {
            PartitionData partitionData = (PartitionData) fileScanTask.file().partition();
            if (sessionVariable.isEnableRuntimeFilterPartitionPrune()) {
                int specId = fileScanTask.file().specId();
                PartitionSpec partitionSpec = icebergTable.specs().get(specId);
                Preconditions.checkNotNull(partitionSpec, "Partition spec with specId %s not found for table %s",
                        specId, icebergTable.name());
                Map<String, String> partitionInfoMap = partitionMapInfos.computeIfAbsent(partitionData, k ->
                        org.apache.doris.datasource.iceberg.IcebergUtils.getPartitionInfoMap(
                                partitionData, partitionSpec, sessionVariable.getTimeZone()));
                if (partitionInfoMap != null) {
                    split.setIcebergPartitionValues(partitionInfoMap);
                }
            } else {
                // Only the distinct-partition count is needed; values are not computed.
                partitionMapInfos.put(partitionData, null);
            }
        }
        return split;
    }

    /**
     * Maps the task's delete files to Doris delete-file filters.
     *
     * @throws IllegalStateException on an unrecognized delete content type
     */
    private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask spitTask) {
        List<IcebergDeleteFileFilter> filters = new ArrayList<>();
        for (DeleteFile delete : spitTask.deletes()) {
            if (delete.content() == FileContent.POSITION_DELETES) {
                filters.add(IcebergDeleteFileFilter.createPositionDelete(delete));
            } else if (delete.content() == FileContent.EQUALITY_DELETES) {
                filters.add(IcebergDeleteFileFilter.createEqualityDelete(
                        delete.path().toString(), delete.equalityFieldIds(),
                        delete.fileSizeInBytes(), delete.format()));
            } else {
                throw new IllegalStateException("Unknown delete content: " + delete.content());
            }
        }
        return filters;
    }

    /**
     * Distributes the snapshot-level row count evenly over the splits; the last split absorbs
     * the remainder so the per-split counts sum exactly to countFromSnapshot.
     */
    void assignCountToSplits(List<Split> splits) {
        Preconditions.checkState(tableLevelPushDownCount, "Table-level count pushdown is not enabled");
        if (splits.isEmpty()) {
            return;
        }
        int size = splits.size();
        long countPerSplit = countFromSnapshot / size;
        for (int i = 0; i < size - 1; i++) {
            ((IcebergSplit) splits.get(i)).setTableLevelRowCount(countPerSplit);
        }
        ((IcebergSplit) splits.get(size - 1))
                .setTableLevelRowCount(countPerSplit + countFromSnapshot % size);
    }

    /** Callback passed to the manifest cache loader to tally hits and misses. */
    private void recordManifestCacheAccess(boolean cacheHit) {
        if (cacheHit) {
            manifestCacheHits++;
        } else {
            manifestCacheMisses++;
        }
    }

    /**
     * Consumes pre-planned rewrite tasks stashed on the statement context, if any.
     *
     * @return a copy of the stashed tasks, or null when none were stashed (callers use null as
     *         the "plan normally" signal)
     */
    List<FileScanTask> getFileScanTasksFromContext() {
        ConnectContext ctx = ConnectContext.get();
        Preconditions.checkNotNull(ctx);
        Preconditions.checkNotNull(ctx.getStatementContext());

        List<FileScanTask> tasks = ctx.getStatementContext().getAndClearIcebergRewriteFileScanTasks();
        if (tasks != null && !tasks.isEmpty()) {
            LOG.info("Retrieved {} file scan tasks from context for table {}",
                    tasks.size(), icebergTable.name());
            return new ArrayList<>(tasks);
        }
        return null;
    }

    /**
     * Derives the table row count from the snapshot summary for COUNT(*) pushdown.
     *
     * @return 0 when there is no snapshot; the exact row count when it is derivable from the
     *         summary; -1 when it is not (equality deletes present, position deletes not
     *         ignorable, or required summary properties missing/unparseable — summaries written
     *         by other engines may omit them)
     */
    private long loadCountFromSnapshot() throws UserException {
        IcebergTableQueryInfo info = scanNode.getSpecifiedSnapshot();
        Snapshot snapshot = info == null ? icebergTable.currentSnapshot() : icebergTable.snapshot(info.getSnapshotId());
        if (snapshot == null) {
            return 0;
        }

        Map<String, String> summary = snapshot.summary();
        // Null-safe: a missing property must mean "cannot push down", not an NPE that would be
        // misreported by checkNotSupportedException as a partition-evolution problem.
        String totalEqualityDeletes =
                summary.get(org.apache.doris.datasource.iceberg.IcebergUtils.TOTAL_EQUALITY_DELETES);
        if (!"0".equals(totalEqualityDeletes)) {
            return -1;
        }

        String totalPositionDeletes =
                summary.get(org.apache.doris.datasource.iceberg.IcebergUtils.TOTAL_POSITION_DELETES);
        String totalRecords =
                summary.get(org.apache.doris.datasource.iceberg.IcebergUtils.TOTAL_RECORDS);
        if (totalPositionDeletes == null || totalRecords == null) {
            return -1;
        }
        try {
            long deleteCount = Long.parseLong(totalPositionDeletes);
            if (deleteCount == 0) {
                return Long.parseLong(totalRecords);
            }
            if (sessionVariable.ignoreIcebergDanglingDelete) {
                // With dangling deletes ignored, every position delete removes exactly one row.
                return Long.parseLong(totalRecords) - deleteCount;
            }
        } catch (NumberFormatException e) {
            return -1;
        }
        return -1;
    }

    /**
     * Heuristic: Iceberg planning throws NPE for certain unsupported partition-evolution /
     * drop-column patterns; surface those as NotSupportedException instead of a raw NPE.
     * NOTE(review): this assumes all planning NPEs stem from that pattern — confirm against
     * the Iceberg versions in use.
     */
    private Optional<NotSupportedException> checkNotSupportedException(Exception e) {
        if (e instanceof NullPointerException) {
            LOG.warn("Unable to plan for iceberg table {}", source.getTargetTable().getName(), e);
            return Optional.of(new NotSupportedException(
                    "Unable to plan iceberg table due to unsupported partition evolution/drop-column pattern"));
        }
        return Optional.empty();
    }
}