// PaimonTransaction.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.paimon;

import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TCommitMessage;
import org.apache.doris.transaction.Transaction;

import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.table.InnerTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.paimon.table.sink.StreamTableCommit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * {@link Transaction} implementation that commits Paimon {@link CommitMessage}s on the FE.
 *
 * <p>Serialized commit messages are accumulated through {@link #updateCommitMessages(List)}
 * (de-duplicated by payload content) and are committed in one shot by {@link #commit()} under the
 * identifier set via {@link #setTransactionId(long)}.
 *
 * <p>Access to the accumulated message list is guarded by {@code synchronized (this)}; the other
 * fields ({@code table}, {@code transactionId}, {@code commitUser}, {@code hadoopUser}) are plain
 * fields without synchronization.
 */
public class PaimonTransaction implements Transaction {
    private static final Logger LOG = LogManager.getLogger(PaimonTransaction.class);

    /** Framing header size: 4 magic bytes ("DPCM") + 4 version bytes + 4 body-length bytes. */
    private static final int PAYLOAD_HEADER_LENGTH = 12;
    /** Offset of the big-endian serializer version inside the framing header. */
    private static final int PAYLOAD_VERSION_OFFSET = 4;
    /** Offset of the big-endian body length inside the framing header. */
    private static final int PAYLOAD_LENGTH_OFFSET = 8;
    /** Serializer versions tried, in order, when the framed path does not yield a result. */
    private static final int[] FALLBACK_SERIALIZER_VERSIONS = {11, 10, 9, 8, 7, 6, 5, 4, 3};
    /** Upper bound on the number of payload bytes echoed into diagnostics. */
    private static final int MAX_LOGGED_PAYLOAD_BYTES = 32;

    private final PaimonMetadataOps ops;
    private PaimonExternalTable table;
    private long transactionId = -1L;
    private String commitUser = "";
    // NOTE(review): written by beginInsert but not read anywhere in this class.
    private String hadoopUser = "";

    private final List<TCommitMessage> commitMessages = Lists.newArrayList();
    // Base64 renderings of payloads already accepted; used only for de-duplication.
    private final Set<String> commitPayloadSet = new HashSet<>();

    public PaimonTransaction(PaimonMetadataOps ops) {
        this.ops = ops;
    }

    /**
     * Records commit messages for the later {@link #commit()}.
     *
     * <p>Null entries, entries without a payload, and entries with an empty payload are ignored.
     * A message whose payload bytes were already recorded is ignored as well.
     *
     * @param messages messages to record; {@code null} or empty is a no-op
     */
    public void updateCommitMessages(List<TCommitMessage> messages) {
        if (messages == null || messages.isEmpty()) {
            return;
        }
        synchronized (this) {
            for (TCommitMessage message : messages) {
                if (message == null || !message.isSetPayload()) {
                    continue;
                }
                byte[] payload = message.getPayload();
                if (payload == null || payload.length == 0) {
                    continue;
                }
                String key = Base64.getEncoder().encodeToString(payload);
                if (commitPayloadSet.add(key)) {
                    commitMessages.add(message);
                }
            }
        }
    }

    /**
     * Binds this transaction to {@code table} and remembers the current session user, with
     * anything from the first {@code '@'} onwards stripped.
     *
     * @param table target table of the insert
     * @param insertCtx insert context; not used by this implementation
     */
    public void beginInsert(PaimonExternalTable table, Optional<InsertCommandContext> insertCtx) {
        this.table = table;
        ConnectContext ctx = ConnectContext.get();
        if (ctx == null) {
            return;
        }
        String user = ctx.getQualifiedUser();
        if (user == null) {
            // No identity attached to the session: keep the default instead of failing with an NPE.
            return;
        }
        int atPos = user.indexOf('@');
        if (atPos > 0) {
            user = user.substring(0, atPos);
        }
        this.hadoopUser = user;
    }

    /** No-op: nothing has to be finalized per insert; the work happens in {@link #commit()}. */
    public void finishInsert(PaimonExternalTable table, Optional<InsertCommandContext> insertCtx) {
    }

    /**
     * Commits every recorded commit message to the bound Paimon table.
     *
     * <p>Returns silently when no message was recorded. Otherwise the table and a positive
     * transaction id must have been set beforehand.
     *
     * @throws UserException if the table or transaction id is missing, or if anything fails while
     *         deserializing or committing (the original failure is kept as the cause)
     */
    @Override
    public void commit() throws UserException {
        final List<TCommitMessage> rawMessages;
        synchronized (this) {
            // Snapshot under the lock so the commit itself runs without holding it.
            rawMessages = Lists.newArrayList(commitMessages);
        }
        if (rawMessages.isEmpty()) {
            return;
        }
        if (table == null) {
            throw new UserException("Missing paimon table for transaction");
        }
        if (transactionId <= 0) {
            throw new UserException("Missing transaction id for paimon commit");
        }

        try {
            ExecutionAuthenticator authenticator = ops.dorisCatalog.getExecutionAuthenticator();
            authenticator.execute(() -> {
                commitToPaimon(rawMessages);
                return null;
            });
        } catch (Exception e) {
            throw new UserException("Failed to commit paimon transaction on FE", e);
        }
    }

    /** Resolves the Paimon table, decodes {@code rawMessages} and commits them under {@code transactionId}. */
    private void commitToPaimon(List<TCommitMessage> rawMessages) throws Exception {
        org.apache.paimon.table.Table paimonTable =
                table.getPaimonTable(MvccUtil.getSnapshotFromContext(table));
        if (!(paimonTable instanceof InnerTable)) {
            throw new RuntimeException("Paimon table does not support commit: "
                    + (paimonTable == null ? "null" : paimonTable.getClass()));
        }

        List<CommitMessage> allMessages = decodeCommitMessages(rawMessages);
        LOG.info("paimon: rawMessages size={}, allMessages size={}", rawMessages.size(), allMessages.size());
        if (allMessages.isEmpty()) {
            throw new RuntimeException("allMessages is empty! rawMessages size=" + rawMessages.size());
        }

        // try-with-resources: a failure in close() must not mask a failure of the commit itself.
        try (StreamTableCommit committer = ((InnerTable) paimonTable).newCommit(commitUser)) {
            Map<Long, List<CommitMessage>> commitMap = new HashMap<>();
            commitMap.put(transactionId, allMessages);
            committer.filterAndCommit(commitMap);
        }
    }

    /** Deserializes every non-empty payload in {@code rawMessages} into one flat list. */
    private static List<CommitMessage> decodeCommitMessages(List<TCommitMessage> rawMessages)
            throws IOException {
        List<CommitMessage> decoded = new ArrayList<>();
        for (TCommitMessage msg : rawMessages) {
            if (msg == null || !msg.isSetPayload()) {
                continue;
            }
            byte[] payload = msg.getPayload();
            if (payload == null || payload.length == 0) {
                continue;
            }
            decoded.addAll(deserializeCommitMessagePayload(payload));
        }
        return decoded;
    }

    /**
     * Only logs the rollback.
     *
     * <p>NOTE(review): recorded commit messages are neither aborted nor cleared here; confirm
     * whether files referenced by them need explicit cleanup.
     */
    @Override
    public void rollback() {
        LOG.info("Rollback PaimonTransaction for table {}", table == null ? "null" : table.getName());
    }

    /** Always returns {@code 0}; this class does not track an update count. */
    public long getUpdateCnt() {
        return 0L;
    }

    /**
     * Sets the transaction id used as the commit identifier and derives the commit user
     * ({@code "doris_txn_" + transactionId}) from it.
     */
    public void setTransactionId(long transactionId) {
        this.transactionId = transactionId;
        this.commitUser = "doris_txn_" + transactionId;
    }

    public String getCommitUser() {
        return commitUser;
    }

    /**
     * Decodes one serialized payload into Paimon commit messages.
     *
     * <p>Expected layout: the ASCII magic {@code "DPCM"}, a big-endian 32-bit serializer version,
     * a big-endian 32-bit body length, then the body. If the header is missing or inconsistent,
     * or the framed body cannot be deserialized with the declared version, the <em>whole</em>
     * payload is retried against {@link #FALLBACK_SERIALIZER_VERSIONS} in order.
     *
     * <p>NOTE(review): after a header match the fallback still parses the payload including its
     * header bytes; confirm this best-effort retry is intended for framed payloads.
     *
     * @param payload serialized commit message bytes
     * @return the deserialized commit messages, never {@code null}
     * @throws IOException if {@code payload} is {@code null} or no attempt produced a result; the
     *         last fallback failure is the cause and a framed-path failure is attached as suppressed
     */
    private static List<CommitMessage> deserializeCommitMessagePayload(byte[] payload) throws IOException {
        if (payload == null) {
            throw new IOException("Paimon commit message payload is null");
        }
        CommitMessageSerializer serializer = new CommitMessageSerializer();
        Exception framedFailure = null;

        if (hasFramingHeader(payload)) {
            int version = readIntBigEndian(payload, PAYLOAD_VERSION_OFFSET);
            int len = readIntBigEndian(payload, PAYLOAD_LENGTH_OFFSET);
            // Compare against the remaining size: "HEADER + len" would overflow for a huge len,
            // pass the check, and trigger an enormous allocation.
            if (len >= 0 && len <= payload.length - PAYLOAD_HEADER_LENGTH) {
                byte[] raw = new byte[len];
                System.arraycopy(payload, PAYLOAD_HEADER_LENGTH, raw, 0, len);
                try {
                    List<CommitMessage> msgs = serializer.deserializeList(version, new DataInputDeserializer(raw));
                    if (msgs != null) {
                        return msgs;
                    }
                    LOG.warn("paimon: deserialized msg list is null");
                } catch (Exception e) {
                    framedFailure = e;
                    LOG.warn("Deserialize paimon commit message failed for header version {}", version, e);
                }
            } else {
                LOG.warn("paimon: payload version or length mismatch: version={}, len={}, payload.length={}",
                        version, len, payload.length);
            }
        } else {
            // Only a bounded prefix is logged; payloads can be arbitrarily large.
            LOG.warn("paimon: payload header mismatch or too short: length={}, payloadPrefix={}",
                    payload.length, describePayloadPrefix(payload));
        }

        Exception last = null;
        for (int candidate : FALLBACK_SERIALIZER_VERSIONS) {
            try {
                List<CommitMessage> msgs =
                        serializer.deserializeList(candidate, new DataInputDeserializer(payload));
                if (msgs != null) {
                    return msgs;
                }
            } catch (Exception e) {
                last = e;
            }
        }
        IOException ioe = new IOException(
                "Failed to deserialize paimon commit message payload for all candidate versions", last);
        if (framedFailure != null) {
            ioe.addSuppressed(framedFailure);
        }
        throw ioe;
    }

    /** Returns whether {@code payload} is long enough for a header and starts with "DPCM". */
    private static boolean hasFramingHeader(byte[] payload) {
        return payload.length >= PAYLOAD_HEADER_LENGTH
                && payload[0] == 'D' && payload[1] == 'P' && payload[2] == 'C' && payload[3] == 'M';
    }

    /** Reads a big-endian 32-bit integer from {@code bytes} starting at {@code offset}. */
    private static int readIntBigEndian(byte[] bytes, int offset) {
        return ((bytes[offset] & 0xFF) << 24) | ((bytes[offset + 1] & 0xFF) << 16)
                | ((bytes[offset + 2] & 0xFF) << 8) | (bytes[offset + 3] & 0xFF);
    }

    /** Renders at most {@link #MAX_LOGGED_PAYLOAD_BYTES} leading bytes of {@code payload}. */
    private static String describePayloadPrefix(byte[] payload) {
        int shown = Math.min(payload.length, MAX_LOGGED_PAYLOAD_BYTES);
        return java.util.Arrays.toString(java.util.Arrays.copyOf(payload, shown));
    }
}