IcebergMetricsReporter.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.profile;

import org.apache.doris.common.profile.RuntimeProfile;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.metrics.CounterResult;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.ScanMetricsResult;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.metrics.TimerResult;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * MetricsReporter implementation that forwards Iceberg scan metrics into Doris
 * profiles.
 */
public class IcebergMetricsReporter implements MetricsReporter {

    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    @Override
    public void report(MetricsReport report) {
        if (!(report instanceof ScanReport)) {
            return;
        }

        SummaryProfile summaryProfile = SummaryProfile.getSummaryProfile(ConnectContext.get());
        if (summaryProfile == null) {
            return;
        }

        RuntimeProfile executionSummary = summaryProfile.getExecutionSummary();
        if (executionSummary == null) {
            return;
        }

        ScanReport scanReport = (ScanReport) report;
        ScanMetricsResult metrics = scanReport.scanMetrics();
        if (metrics == null) {
            return;
        }

        RuntimeProfile icebergGroup = executionSummary.getChildMap().get(SummaryProfile.ICEBERG_SCAN_METRICS);
        if (icebergGroup == null) {
            icebergGroup = new RuntimeProfile(SummaryProfile.ICEBERG_SCAN_METRICS);
            executionSummary.addChild(icebergGroup, true);
        }

        RuntimeProfile scanProfile = new RuntimeProfile(buildScanProfileName(scanReport));
        appendScanDetails(scanProfile, scanReport, metrics);
        icebergGroup.addChild(scanProfile, true);
    }

    private String sanitize(String value) {
        if (Strings.isNullOrEmpty(value)) {
            return "";
        }
        return WHITESPACE.matcher(value).replaceAll(" ").trim();
    }

    private String buildScanProfileName(ScanReport report) {
        return "Table Scan (" + report.tableName() + ")";
    }

    private void appendScanDetails(RuntimeProfile scanProfile, ScanReport report, ScanMetricsResult metrics) {
        scanProfile.addInfoString("table", report.tableName());
        scanProfile.addInfoString("snapshot", String.valueOf(report.snapshotId()));
        String filter = sanitize(report.filter() == null ? null : report.filter().toString());
        if (!Strings.isNullOrEmpty(filter)) {
            scanProfile.addInfoString("filter", filter);
        }
        if (!report.projectedFieldNames().isEmpty()) {
            scanProfile.addInfoString("columns", Joiner.on('|').join(report.projectedFieldNames()));
        }

        appendTimer(scanProfile, "planning", metrics.totalPlanningDuration());
        appendCounter(scanProfile, "data_files", metrics.resultDataFiles());
        appendCounter(scanProfile, "delete_files", metrics.resultDeleteFiles());
        appendCounter(scanProfile, "skipped_data_files", metrics.skippedDataFiles());
        appendCounter(scanProfile, "skipped_delete_files", metrics.skippedDeleteFiles());
        appendCounter(scanProfile, "total_size", metrics.totalFileSizeInBytes());
        appendCounter(scanProfile, "total_delete_size", metrics.totalDeleteFileSizeInBytes());
        appendCounter(scanProfile, "scanned_manifests", metrics.scannedDataManifests());
        appendCounter(scanProfile, "skipped_manifests", metrics.skippedDataManifests());
        appendCounter(scanProfile, "scanned_delete_manifests", metrics.scannedDeleteManifests());
        appendCounter(scanProfile, "skipped_delete_manifests", metrics.skippedDeleteManifests());
        appendCounter(scanProfile, "indexed_delete_files", metrics.indexedDeleteFiles());
        appendCounter(scanProfile, "equality_delete_files", metrics.equalityDeleteFiles());
        appendCounter(scanProfile, "positional_delete_files", metrics.positionalDeleteFiles());

        appendMetadata(scanProfile, report.metadata());
    }

    private void appendMetadata(RuntimeProfile scanProfile, Map<String, String> metadata) {
        if (metadata == null || metadata.isEmpty()) {
            return;
        }
        List<String> importantKeys = ImmutableList.of("scan-state", "scan-id");
        List<String> captured = new ArrayList<>();
        for (String key : importantKeys) {
            if (metadata.containsKey(key)) {
                captured.add(key + "=" + metadata.get(key));
            }
        }
        if (!captured.isEmpty()) {
            scanProfile.addInfoString("metadata", "{" + String.join(", ", captured) + "}");
        }
    }

    private void appendTimer(RuntimeProfile scanProfile, String name, TimerResult timerResult) {
        if (timerResult == null) {
            return;
        }
        scanProfile.addInfoString(name, formatTimer(timerResult));
    }

    private void appendCounter(RuntimeProfile scanProfile, String name, CounterResult counterResult) {
        if (counterResult == null) {
            return;
        }
        scanProfile.addInfoString(name, formatCounter(counterResult));
    }

    private String formatCounter(CounterResult counterResult) {
        long value = counterResult.value();
        if (counterResult.unit() == MetricsContext.Unit.BYTES) {
            return DebugUtil.printByteWithUnit(value);
        }
        return Long.toString(value);
    }

    private String formatTimer(TimerResult timerResult) {
        Duration duration = timerResult.totalDuration();
        long millis = duration.toMillis();
        String pretty = DebugUtil.getPrettyStringMs(millis);
        return pretty + " (" + timerResult.count() + " ops)";
    }
}