DeleteHandler.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load;

import org.apache.doris.analysis.DeleteStmt;
import org.apache.doris.analysis.Predicate;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DeleteHandler implements Writable {
    private static final Logger LOG = LogManager.getLogger(DeleteHandler.class);

    // TransactionId -> DeleteJob
    private final Map<Long, DeleteJob> idToDeleteJob;

    // Db -> DeleteInfo list
    @SerializedName(value = "dbToDeleteInfos")
    private final Map<Long, List<DeleteInfo>> dbToDeleteInfos;

    private final ReentrantReadWriteLock lock;

    public DeleteHandler() {
        idToDeleteJob = Maps.newConcurrentMap();
        dbToDeleteInfos = Maps.newConcurrentMap();
        lock = new ReentrantReadWriteLock();
    }

    public void readLock() {
        lock.readLock().lock();
    }

    public void readUnlock() {
        lock.readLock().unlock();
    }

    private void writeLock() {
        lock.writeLock().lock();
    }

    private void writeUnlock() {
        lock.writeLock().unlock();
    }

    /**
     * use for Nereids process empty relation
     */
    public void processEmptyRelation(QueryState execState) {
        String sb = "{'label':'" + DeleteJob.DELETE_PREFIX + UUID.randomUUID()
                + "', 'txnId':'" + -1
                + "', 'status':'" + TransactionStatus.VISIBLE.name() + "'}";
        execState.setOk(0, 0, sb);
    }

    /**
     * used for Nereids planner
     */
    public void process(Database targetDb, OlapTable targetTbl, List<Partition> selectedPartitions,
            List<Predicate> deleteConditions, QueryState execState, List<String> partitionNames) {
        DeleteJob deleteJob = null;
        try {
            targetTbl.readLock();
            try {
                if (targetTbl.getState() != OlapTable.OlapTableState.NORMAL) {
                    // table under alter operation can also do delete.
                    // just add a comment here to notice.
                }
                deleteJob = DeleteJob.newBuilder()
                        .buildWithNereids(new DeleteJob.BuildParams(
                                targetDb,
                                targetTbl,
                                partitionNames,
                                selectedPartitions,
                                deleteConditions));

                long txnId = deleteJob.beginTxn();
                TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
                        .getTransactionState(targetDb.getId(), txnId);
                // must call this to make sure we only handle the tablet in the mIndex we saw here.
                // table may be under schema change or rollup, and the newly created tablets will not be checked later,
                // to make sure that the delete transaction can be done successfully.
                deleteJob.addTableIndexes(txnState);
                idToDeleteJob.put(txnId, deleteJob);
                deleteJob.dispatch();
            } finally {
                targetTbl.readUnlock();
            }
            deleteJob.await();
            String commitMsg = deleteJob.commit();
            execState.setOk(0, 0, commitMsg);
        } catch (Exception ex) {
            if (deleteJob != null) {
                deleteJob.cancel(ex.getMessage());
            }
            execState.setError(ex.getMessage());
        } finally {
            if (!FeConstants.runningUnitTest) {
                clearJob(deleteJob);
            }
        }
    }

    /**
     * used for legacy planner
     */
    public void process(DeleteStmt stmt, QueryState execState) throws DdlException {
        Database targetDb = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbName());
        OlapTable targetTbl = targetDb.getOlapTableOrDdlException(stmt.getTableName());
        DeleteJob deleteJob = null;
        try {
            targetTbl.readLock();
            try {
                if (targetTbl.getState() != OlapTable.OlapTableState.NORMAL) {
                    // table under alter operation can also do delete.
                    // just add a comment here to notice.
                }
                deleteJob = DeleteJob.newBuilder()
                        .buildWith(new DeleteJob.BuildParams(
                                targetDb,
                                targetTbl,
                                stmt.getPartitionNames(),
                                stmt.getDeleteConditions()));

                long txnId = deleteJob.beginTxn();
                TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
                        .getTransactionState(targetDb.getId(), txnId);
                // must call this to make sure we only handle the tablet in the mIndex we saw here.
                // table may be under schema change or rollup, and the newly created tablets will not be checked later,
                // to make sure that the delete transaction can be done successfully.
                txnState.addTableIndexes(targetTbl);
                idToDeleteJob.put(txnId, deleteJob);
                deleteJob.dispatch();
            } finally {
                targetTbl.readUnlock();
            }
            deleteJob.await();
            String commitMsg = deleteJob.commit();
            execState.setOk(0, 0, commitMsg);
        } catch (Exception ex) {
            if (deleteJob != null) {
                deleteJob.cancel(ex.getMessage());
            }
            execState.setError(ex.getMessage());
        } finally {
            if (!FeConstants.runningUnitTest) {
                clearJob(deleteJob);
            }
        }
    }

    /**
     * This method should always be called in the end of the delete process to clean the job.
     * Better put it in finally block.
     *
     * @param job
     */
    private void clearJob(DeleteJob job) {
        if (job == null) {
            return;
        }
        long signature = job.getTransactionId();
        idToDeleteJob.remove(signature);
        job.cleanUp();
        // do not remove callback from GlobalTransactionMgr's callback factory here.
        // the callback will be removed after transaction is aborted or visible.
    }

    public void recordFinishedJob(DeleteJob job) {
        if (job != null) {
            long dbId = job.getDeleteInfo().getDbId();
            LOG.info("record finished deleteJob, transactionId {}, dbId {}",
                    job.getTransactionId(), dbId);
            dbToDeleteInfos.putIfAbsent(dbId, Lists.newArrayList());
            List<DeleteInfo> deleteInfoList = dbToDeleteInfos.get(dbId);
            writeLock();
            try {
                deleteInfoList.add(job.getDeleteInfo());
            } finally {
                writeUnlock();
            }
        }
    }

    public DeleteJob getDeleteJob(long transactionId) {
        return idToDeleteJob.get(transactionId);
    }

    // show delete stmt
    public List<List<Comparable>> getDeleteInfosByDb(long dbId) {
        LinkedList<List<Comparable>> infos = new LinkedList<>();
        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
        if (db == null) {
            return infos;
        }

        String dbName = db.getFullName();
        List<DeleteInfo> deleteInfoList = new ArrayList<>();
        if (dbId == -1) {
            for (Long tempDbId : dbToDeleteInfos.keySet()) {
                if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
                        InternalCatalog.INTERNAL_CATALOG_NAME,
                        Env.getCurrentEnv().getCatalogMgr().getDbNullable(tempDbId).getFullName(),
                        PrivPredicate.LOAD)) {
                    continue;
                }

                deleteInfoList.addAll(dbToDeleteInfos.get(tempDbId));
            }
        } else {
            deleteInfoList = dbToDeleteInfos.get(dbId);
        }

        readLock();
        try {
            if (deleteInfoList == null) {
                return infos;
            }

            for (DeleteInfo deleteInfo : deleteInfoList) {
                String tableName = deleteInfo.getTableName();
                if (!Env.getCurrentEnv().getAccessManager()
                        .checkTblPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName,
                            tableName, PrivPredicate.LOAD)) {
                    continue;
                }

                List<Comparable> info = Lists.newArrayList();
                if (Util.isTempTable(tableName)) {
                    if (!Util.isTempTableInCurrentSession(tableName)) {
                        continue;
                    }
                    info.add(Util.getTempTableDisplayName(tableName));
                } else {
                    info.add(deleteInfo.getTableName());
                }

                if (deleteInfo.isNoPartitionSpecified()) {
                    info.add("*");
                } else {
                    info.add(Joiner.on(", ").join(deleteInfo.getPartitionNames()));
                }

                info.add(TimeUtils.longToTimeString(deleteInfo.getCreateTimeMs()));
                String conds = Joiner.on(", ").join(deleteInfo.getDeleteConditions());
                info.add(conds);

                info.add("FINISHED");
                infos.add(info);
            }
        } finally {
            readUnlock();
        }

        // sort by createTimeMs
        ListComparator<List<Comparable>> comparator = new ListComparator<>(2);
        infos.sort(comparator);
        return infos;
    }

    public void replayDelete(DeleteInfo deleteInfo, Env env) {
        // add to deleteInfos
        long dbId = deleteInfo.getDbId();
        LOG.info("replay delete, dbId {}", dbId);
        dbToDeleteInfos.putIfAbsent(dbId, Lists.newArrayList());
        List<DeleteInfo> deleteInfoList = dbToDeleteInfos.get(dbId);
        writeLock();
        try {
            deleteInfoList.add(deleteInfo);
        } finally {
            writeUnlock();
        }
    }

    // for delete handler, we only persist those delete already finished.
    @Override
    public void write(DataOutput out) throws IOException {
        removeOldDeleteInfos();
    }

    public static DeleteHandler read(DataInput in) throws IOException {
        String json = Text.readString(in);
        DeleteHandler deleteHandler = GsonUtils.GSON.fromJson(json, DeleteHandler.class);
        deleteHandler.removeOldDeleteInfos();
        return deleteHandler;
    }

    public void removeOldDeleteInfos() {
        long curTime = System.currentTimeMillis();
        int counter = 0;
        Iterator<Entry<Long, List<DeleteInfo>>> iter1 = dbToDeleteInfos.entrySet().iterator();
        while (iter1.hasNext()) {
            List<DeleteInfo> deleteInfoList = iter1.next().getValue();

            writeLock();
            try {
                Iterator<DeleteInfo> iter2 = deleteInfoList.iterator();
                while (iter2.hasNext()) {
                    DeleteInfo deleteInfo = iter2.next();
                    if ((curTime - deleteInfo.getCreateTimeMs()) / 1000
                            > Config.streaming_label_keep_max_second) {
                        iter2.remove();
                        ++counter;
                    }
                }
            } finally {
                writeUnlock();
            }

            if (deleteInfoList.isEmpty()) {
                iter1.remove();
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("remove expired delete job info num: {}", counter);
        }
    }
}