BinlogGcer.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.binlog;

import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.persist.BinlogGcInfo;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.BinlogGcTask;

import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;

/**
 * A {@link MasterDaemon} that runs one round of binlog garbage collection each time
 * {@link #runAfterCatalogReady()} is invoked.
 *
 * <p>One round does the following, in order:
 * <ol>
 *   <li>asks the binlog manager for the tombstones produced by its {@code gc()} call;</li>
 *   <li>builds one {@link BinlogGcTask} per backend id from those tombstones and submits
 *       them as a single {@link AgentBatchTask} (best effort, failures are only logged);</li>
 *   <li>clears the table version map of every tombstone and writes a {@link BinlogGcInfo}
 *       to the edit log.</li>
 * </ol>
 */
public class BinlogGcer extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(BinlogGcer.class);
    private static final long GC_DURATION_MS = 15 * 1000L; // 15s

    // Signature handed to every BinlogGcTask created by this daemon.
    // NOTE(review): -1 presumably means "no meaningful signature" -- confirm against BinlogGcTask.
    private static final long BINLOG_GC_TASK_SIGNATURE = -1L;

    // TODO(Drogon): use this to control gc frequency by real gc time waste sample
    private long lastGcTime = 0L;

    /** Creates the daemon under the name "binlog-gcer" with {@link #GC_DURATION_MS} as its interval argument. */
    public BinlogGcer() {
        super("binlog-gcer", GC_DURATION_MS);
    }

    /**
     * Runs one gc round. Never throws: any {@link Throwable} raised by the round is
     * logged and swallowed.
     */
    @Override
    protected void runAfterCatalogReady() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("start binlog gcer jobs.");
        }
        try {
            List<BinlogTombstone> tombstones = Env.getCurrentEnv().getBinlogManager().gc();
            if (tombstones == null || tombstones.isEmpty()) {
                LOG.info("no gc binlog");
                return;
            }
            LOG.info("tombstones size: {}", tombstones.size());

            try {
                sendGcInfoToBe(tombstones);
            } catch (Throwable e) {
                // TODO(Drogon): retry
                // if send gc info to be failed, next gc depend on gc duration
                LOG.warn("Failed to send gc info to be", e);
            }

            // Must happen after sendGcInfoToBe(): building the backend tasks reads each
            // tombstone's table version map, which is cleared here before the tombstones
            // are written to the edit log.
            for (BinlogTombstone tombstone : tombstones) {
                tombstone.clearTableVersionMap();
            }
            BinlogGcInfo info = new BinlogGcInfo(tombstones);
            Env.getCurrentEnv().getEditLog().logGcBinlog(info);
        } catch (Throwable e) {
            LOG.warn("Failed to process one round of BinlogGcer", e);
        }
    }

    /**
     * Groups the gc work described by {@code tombstones} into one task per backend id and
     * submits all tasks as a single batch. Does nothing when there are no tombstones or
     * when no task could be built.
     *
     * @param tombstones tombstones of the current gc round; may be {@code null} or empty
     */
    private void sendGcInfoToBe(List<BinlogTombstone> tombstones) {
        if (tombstones == null || tombstones.isEmpty()) {
            return;
        }

        // backend id -> task accumulating (tablet id, version) pairs for that backend
        Map<Long, BinlogGcTask> beBinlogGcTaskMap = Maps.newHashMap();
        for (BinlogTombstone tombstone : tombstones) {
            sendDbGcInfoToBe(beBinlogGcTaskMap, tombstone);
        }

        if (beBinlogGcTaskMap.isEmpty()) {
            return;
        }

        AgentBatchTask batchTask = new AgentBatchTask();
        for (BinlogGcTask task : beBinlogGcTaskMap.values()) {
            batchTask.addTask(task);
        }
        AgentTaskExecutor.submit(batchTask);
    }

    /**
     * Adds the gc work of a single tombstone (one database) to {@code beBinlogGcTaskMap}.
     * Tables that cannot be resolved, or that are not {@link OlapTable}s, are logged and
     * skipped; a missing database skips the whole tombstone.
     *
     * @param beBinlogGcTaskMap backend id to task map, updated in place
     * @param tombstone         tombstone whose table version map is translated into tasks
     */
    private void sendDbGcInfoToBe(Map<Long, BinlogGcTask> beBinlogGcTaskMap, BinlogTombstone tombstone) {
        long dbId = tombstone.getDbId();
        Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId);
        if (db == null) {
            LOG.warn("db {} does not exist", dbId);
            return;
        }

        Map<Long, UpsertRecord.TableRecord> tableVersionMap = tombstone.getTableVersionMap();
        for (Map.Entry<Long, UpsertRecord.TableRecord> entry : tableVersionMap.entrySet()) {
            long tableId = entry.getKey();

            Table tbl;
            try {
                tbl = db.getTableOrMetaException(tableId);
            } catch (Exception e) {
                // Keep the cause in the log so a failed lookup can be diagnosed.
                LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId, e);
                continue;
            }
            if (tbl == null) {
                LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
                continue;
            }
            if (!(tbl instanceof OlapTable)) {
                LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
                continue;
            }

            sendTableGcInfoToBe(beBinlogGcTaskMap, (OlapTable) tbl, entry.getValue());
        }
    }

    /**
     * Adds, for every replica of every tablet in the VISIBLE materialized indexes of each
     * recorded partition, a (tablet id, partition version) entry to the task of the
     * replica's backend. The table's read lock is held while its metadata is walked.
     *
     * @param beBinlogGcTaskMap backend id to task map, updated in place
     * @param olapTable         table whose partitions are walked
     * @param tableRecord       per-partition versions recorded for this table
     */
    private void sendTableGcInfoToBe(Map<Long, BinlogGcTask> beBinlogGcTaskMap, OlapTable olapTable,
            UpsertRecord.TableRecord tableRecord) {

        olapTable.readLock();
        try {
            for (UpsertRecord.TableRecord.PartitionRecord partitionRecord : tableRecord.getPartitionRecords()) {
                long partitionId = partitionRecord.partitionId;
                Partition partition = olapTable.getPartition(partitionId);
                if (partition == null) {
                    LOG.warn("fail to get partition. table: {}, partition id: {}", olapTable.getName(), partitionId);
                    continue;
                }

                long version = partitionRecord.version;

                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
                    for (Tablet tablet : index.getTablets()) {
                        for (Replica replica : tablet.getReplicas()) {
                            // NOTE(review): confirm what getBackendIdWithoutException() yields when the
                            // backend cannot be resolved; such a value would get its own task here.
                            long beId = replica.getBackendIdWithoutException();

                            // Single lookup; create the per-backend task lazily on first use.
                            BinlogGcTask binlogGcTask = beBinlogGcTaskMap.get(beId);
                            if (binlogGcTask == null) {
                                binlogGcTask = new BinlogGcTask(beId, BINLOG_GC_TASK_SIGNATURE);
                                beBinlogGcTaskMap.put(beId, binlogGcTask);
                            }

                            binlogGcTask.addTask(tablet.getId(), version);
                        }
                    }
                }
            }
        } finally {
            olapTable.readUnlock();
        }
    }
}