IcebergTableSink.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TIcebergTableSink;
import org.apache.doris.thrift.TSortField;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

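/**
 * Data sink for writing query results into an Iceberg external table.
 * It serializes the target table's schema, partition specs, sort order, file format,
 * compression and output location into a {@link TIcebergTableSink} descriptor that
 * is delivered to the backends.
 */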
public class IcebergTableSink extends BaseExternalTableDataSink {

    private final IcebergExternalTable targetTable;
    private static final Set<TFileFormatType> supportedTypes = ImmutableSet.of(
            TFileFormatType.FORMAT_ORC,
            TFileFormatType.FORMAT_PARQUET);

    public IcebergTableSink(IcebergExternalTable targetTable) {
        super();
        this.targetTable = targetTable;
    }

    @Override
    protected Set<TFileFormatType> supportedFileFormatTypes() {
        return supportedTypes;
    }

    @Override
    public String getExplainString(String prefix, TExplainLevel explainLevel) {
        StringBuilder strBuilder = new StringBuilder();
        strBuilder.append(prefix).append("ICEBERG TABLE SINK\n");
        if (explainLevel == TExplainLevel.BRIEF) {
            return strBuilder.toString();
        }
        // TODO: explain partitions
        return strBuilder.toString();
    }

    @Override
    public void bindDataSink(Optional<InsertCommandContext> insertCtx)
            throws AnalysisException {

        TIcebergTableSink tSink = new TIcebergTableSink();

        Table icebergTable = targetTable.getIcebergTable();

        tSink.setDbName(targetTable.getDbName());
        tSink.setTbName(targetTable.getName());

        // schema
        tSink.setSchemaJson(SchemaParser.toJson(icebergTable.schema()));

        // partition spec
        if (icebergTable.spec().isPartitioned()) {
            tSink.setPartitionSpecsJson(Maps.transformValues(icebergTable.specs(), PartitionSpecParser::toJson));
            tSink.setPartitionSpecId(icebergTable.spec().specId());
        }

        // sort order
        if (icebergTable.sortOrder().isSorted()) {
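            // Only identity-transform sort fields that reference top-level columns can be
            // expressed as a TSortField; all other sort fields are skipped below.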
            SortOrder sortOrder = icebergTable.sortOrder();
            Set<Integer> baseColumnFieldIds = icebergTable.schema().columns().stream()
                    .map(Types.NestedField::fieldId)
                    .collect(ImmutableSet.toImmutableSet());
            ImmutableList.Builder<TSortField> sortFields = ImmutableList.builder();
            for (SortField sortField : sortOrder.fields()) {
                if (!sortField.transform().isIdentity()) {
                    continue;
                }
                if (!baseColumnFieldIds.contains(sortField.sourceId())) {
                    continue;
                }
                TSortField tSortField = new TSortField();
                tSortField.setSourceColumnId(sortField.sourceId());
                tSortField.setAscending(sortField.direction().equals(SortDirection.ASC));
                tSortField.setNullFirst(sortField.nullOrder().equals(NullOrder.NULLS_FIRST));
                sortFields.add(tSortField);
            }
            tSink.setSortFields(sortFields.build());
        }

        // data file format and compression codec
        tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable).name()));
        tSink.setCompressionType(getTFileCompressType(IcebergUtils.getFileCompress(icebergTable)));

        // hadoop config: table properties merged with catalog properties (catalog properties take precedence)
        HashMap<String, String> props = new HashMap<>(icebergTable.properties());
        Map<String, String> catalogProps = targetTable.getCatalog().getProperties();
        props.putAll(catalogProps);
        tSink.setHadoopConfig(props);

        // output location of the data files
        LocationPath locationPath = new LocationPath(IcebergUtils.dataLocation(icebergTable), catalogProps);
        tSink.setOutputPath(locationPath.toStorageLocation().toString());
        tSink.setOriginalOutputPath(locationPath.getPath().toString());
        TFileType fileType = locationPath.getTFileTypeForBE();
        tSink.setFileType(fileType);
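        // writing through a broker requires the broker addresses on the backend side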
        if (fileType.equals(TFileType.FILE_BROKER)) {
            tSink.setBrokerAddresses(getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
        }

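        // propagate the overwrite flag for INSERT OVERWRITE statements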
        if (insertCtx.isPresent()) {
            BaseExternalTableInsertCommandContext context = (BaseExternalTableInsertCommandContext) insertCtx.get();
            tSink.setOverwrite(context.isOverwrite());
        }
        tDataSink = new TDataSink(TDataSinkType.ICEBERG_TABLE_SINK);
        tDataSink.setIcebergTableSink(tSink);
    }
}