TableStreamManager.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog.stream;

import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.persist.PruneTableStreamPartitionOffsetInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;

import com.google.common.base.Preconditions;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * Keeps an index of the table streams registered under each database and periodically prunes stream
 * partition offsets that no longer correspond to a partition of the stream's base table.
 *
 * <p>The index ({@code dbStreamMap}) is written as JSON by {@link #write(DataOutput)} and guarded by
 * {@code rwLock}. The periodic pruning is driven by {@link #runAfterCatalogReady()} and is journaled through
 * the edit log so that {@link #replayPruneTableStreamPartitionOffsets(PruneTableStreamPartitionOffsetInfo)}
 * can re-apply it.
 */
public class TableStreamManager extends MasterDaemon implements Writable, GsonPostProcessable {
    private static final Logger LOG = LogManager.getLogger(TableStreamManager.class);
    // database id -> ids of the table streams registered under that database; guarded by rwLock
    @SerializedName(value = "dbStreamMap")
    private Map<Long, Set<Long>> dbStreamMap;
    // protects dbStreamMap; re-created in gsonPostProcess() after deserialization
    protected MonitoredReentrantReadWriteLock rwLock;

    /**
     * Creates an empty manager whose daemon interval is taken from
     * {@code Config.table_stream_partition_offset_cleanup_interval_second}.
     */
    public TableStreamManager() {
        super("table-stream-cleanup", Config.table_stream_partition_offset_cleanup_interval_second * 1000L);
        this.rwLock = new MonitoredReentrantReadWriteLock(true);
        this.dbStreamMap = new HashMap<>();
    }

    /**
     * Runs one cleanup pass, but only when {@code Config.enable_table_stream} is set.
     */
    @Override
    protected void runAfterCatalogReady() {
        if (Config.enable_table_stream) {
            cleanupStalePartitionOffsets();
        }
    }

    /**
     * Registers a stream under the id of its database.
     *
     * @param stream the stream to register; must not be null
     */
    public void addTableStream(BaseTableStream stream) {
        Preconditions.checkNotNull(stream, "stream must not be null");
        rwLock.writeLock().lock();
        try {
            dbStreamMap.computeIfAbsent(stream.getDatabase().getId(), k -> new HashSet<>()).add(stream.getId());
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    /**
     * Unregisters a single stream. Does nothing when the stream's database has no registered streams.
     *
     * @param stream the stream to unregister; must not be null
     */
    public void removeStaleDbAndStream(BaseTableStream stream) {
        Preconditions.checkNotNull(stream, "stream must not be null");
        rwLock.writeLock().lock();
        try {
            Set<Long> streamIds = dbStreamMap.get(stream.getDatabase().getId());
            if (streamIds != null) {
                streamIds.remove(stream.getId());
            }
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    /**
     * Unregisters whole databases and individual streams in one locked step.
     *
     * @param staleDbIds ids of databases whose entire entry is dropped from the index
     * @param staleStreamIds pairs of (database id, stream id) to drop from the index
     */
    public void removeStaleDbAndStream(List<Long> staleDbIds, List<Pair<Long, Long>> staleStreamIds) {
        if (staleStreamIds.isEmpty() && staleDbIds.isEmpty()) {
            // nothing to do, so avoid taking the write lock
            return;
        }
        rwLock.writeLock().lock();
        try {
            for (Long dbId : staleDbIds) {
                dbStreamMap.remove(dbId);
            }
            for (Pair<Long, Long> dbAndStream : staleStreamIds) {
                Set<Long> streamIds = dbStreamMap.get(dbAndStream.first);
                if (streamIds != null) {
                    streamIds.remove(dbAndStream.second);
                }
            }
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    /**
     * Serializes this manager as a JSON string.
     *
     * @param out the destination
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        String json = GsonUtils.GSON.toJson(this);
        Text.writeString(out, json);
    }

    /**
     * Reads a manager previously written by {@link #write(DataOutput)}.
     *
     * @param in the source
     * @return the deserialized manager
     * @throws IOException if reading fails
     */
    public static TableStreamManager read(DataInput in) throws IOException {
        String json = Text.readString(in);
        return GsonUtils.GSON.fromJson(json, TableStreamManager.class);
    }

    /**
     * Returns a private copy of the stream ids registered under the given database.
     *
     * @param db the database to look up
     * @return a new mutable set; empty when the database has no registered streams
     */
    public Set<Long> getTableStreamIds(DatabaseIf db) {
        rwLock.readLock().lock();
        try {
            Set<Long> registered = dbStreamMap.get(db.getId());
            if (registered == null) {
                return new HashSet<>();
            }
            return new HashSet<>(registered);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    /**
     * Runs one cleanup pass over a snapshot of the index.
     *
     * <p>Databases that no longer exist, and registered ids that no longer resolve to an
     * {@link OlapTableStream}, are dropped from the index. For every remaining stream the stale partition
     * offsets are pruned, and all prunes of the pass are written to the edit log as a single record.
     */
    public void cleanupStalePartitionOffsets() {
        List<Long> staleDbIds = new ArrayList<>();
        List<Pair<Long, Long>> staleStreamIds = new ArrayList<>();
        List<PruneTableStreamPartitionOffsetInfo.Entry> pruneEntries = new ArrayList<>();
        for (Map.Entry<Long, Set<Long>> entry : copyDbStreamMap().entrySet()) {
            Optional<Database> db = Env.getCurrentInternalCatalog().getDb(entry.getKey());
            if (!db.isPresent()) {
                staleDbIds.add(entry.getKey());
                continue;
            }
            for (Long tableId : entry.getValue()) {
                Optional<Table> table = db.get().getTable(tableId);
                if (!table.isPresent() || !(table.get() instanceof OlapTableStream)) {
                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
                    continue;
                }
                try {
                    cleanupStalePartitionOffsets(db.get(), (OlapTableStream) table.get())
                            .ifPresent(pruneEntries::add);
                } catch (RuntimeException e) {
                    // One failing stream must not abort the pass: prunes already applied in memory to
                    // earlier streams still have to reach the edit log below.
                    LOG.warn("failed to clean stale partition offsets of stream {} in db {}",
                            tableId, db.get().getFullName(), e);
                }
            }
        }
        removeStaleDbAndStream(staleDbIds, staleStreamIds);
        if (!pruneEntries.isEmpty()) {
            Env.getCurrentEnv().getEditLog().logPruneTableStreamPartitionOffsets(
                    new PruneTableStreamPartitionOffsetInfo(pruneEntries));
        }
    }

    /**
     * Prunes the stale partition offsets of one stream.
     *
     * <p>The base table's partition ids are snapshotted under its read lock, then the stream is pruned
     * under its own write lock. Both locks are acquired with a timeout; when either is busy the stream is
     * skipped for this pass.
     *
     * @param db the database the stream was resolved from; used for logging and for the journal entry
     * @param stream the stream to prune
     * @return the journal entry describing what was pruned, or empty when nothing was pruned
     */
    private Optional<PruneTableStreamPartitionOffsetInfo.Entry> cleanupStalePartitionOffsets(
            Database db, OlapTableStream stream) {
        if (stream.isDisabled() || stream.isStale()) {
            return Optional.empty();
        }
        OlapTable baseTable = stream.getBaseTableNullable();
        if (baseTable == null) {
            return Optional.empty();
        }
        Set<Long> validPartitionIds;
        if (!baseTable.tryReadLock(Table.TRY_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("skip cleaning stream {} because base table {} read lock is busy",
                        stream.getName(), baseTable.getName());
            }
            return Optional.empty();
        }
        try {
            if (baseTable.isDropped) {
                return Optional.empty();
            }
            // copy so the ids stay usable after the base table lock is released
            validPartitionIds = new HashSet<>(baseTable.getPartitionIds());
        } finally {
            baseTable.readUnlock();
        }
        if (!stream.tryWriteLockIfExist(Table.TRY_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("skip cleaning stream {} because stream write lock is busy", stream.getName());
            }
            return Optional.empty();
        }
        try {
            // re-check: the stream state may have changed before the write lock was obtained
            if (stream.isDisabled() || stream.isStale()) {
                return Optional.empty();
            }
            Set<Long> stalePartitionIds = stream.unprotectedCollectStalePartitionOffsetIds(validPartitionIds);
            if (stalePartitionIds.isEmpty()) {
                return Optional.empty();
            }
            int removedPartitionCount = stream.unprotectedPrunePartitionOffsets(stalePartitionIds);
            // From here on the in-memory state has changed, so only the already-resolved db is used;
            // a failing lookup at this point would lose the journal entry for this prune.
            if (removedPartitionCount > 0) {
                LOG.info("cleaned {} stale partition offset entries from stream {}.{} ({})",
                        removedPartitionCount, db.getFullName(), stream.getName(), stream.getId());
            }
            return Optional.of(new PruneTableStreamPartitionOffsetInfo.Entry(
                    db.getId(), stream.getId(), stalePartitionIds));
        } finally {
            stream.writeUnlock();
        }
    }

    /**
     * Re-applies a journaled prune record. Null or empty records are ignored.
     *
     * @param info the journaled record
     */
    public void replayPruneTableStreamPartitionOffsets(PruneTableStreamPartitionOffsetInfo info) {
        if (info == null || info.getEntries() == null || info.getEntries().isEmpty()) {
            return;
        }
        for (PruneTableStreamPartitionOffsetInfo.Entry entry : info.getEntries()) {
            replayPruneTableStreamPartitionOffsets(entry);
        }
    }

    /**
     * Re-applies the prune of a single stream.
     *
     * <p>The entry is skipped when its database or stream no longer exists or is not an
     * {@link OlapTableStream}. Unlike the periodic cleanup, the stream write lock is acquired without a
     * timeout, so lock contention cannot cause a journaled prune to be dropped.
     *
     * @param entry the journaled prune of one stream
     */
    private void replayPruneTableStreamPartitionOffsets(PruneTableStreamPartitionOffsetInfo.Entry entry) {
        if (entry == null || entry.getPartitionIds() == null || entry.getPartitionIds().isEmpty()) {
            return;
        }
        Optional<Database> db = Env.getCurrentInternalCatalog().getDb(entry.getDbId());
        if (!db.isPresent()) {
            LOG.info("skip replay pruning partition offsets because db {} does not exist", entry.getDbId());
            return;
        }
        Optional<Table> table = db.get().getTable(entry.getStreamId());
        if (!table.isPresent()) {
            LOG.info("skip replay pruning partition offsets because stream {}.{} does not exist",
                    entry.getDbId(), entry.getStreamId());
            return;
        }
        if (!(table.get() instanceof OlapTableStream)) {
            LOG.info("skip replay pruning partition offsets because table {}.{} is not an olap table stream",
                    entry.getDbId(), entry.getStreamId());
            return;
        }
        OlapTableStream stream = (OlapTableStream) table.get();
        // Block until the lock is available: a timed try-lock here would silently skip the entry under
        // contention. A false result is expected to mean the stream has been dropped in the meantime.
        if (!stream.writeLockIfExist()) {
            LOG.info("skip replay pruning partition offsets because stream {}.{} has been dropped",
                    db.get().getFullName(), stream.getName());
            return;
        }
        try {
            stream.unprotectedPrunePartitionOffsets(entry.getPartitionIds());
        } finally {
            stream.writeUnlock();
        }
    }

    /**
     * Takes a deep copy of the index under the read lock so callers can iterate without holding it.
     *
     * @return a snapshot whose map and sets are independent of the live index
     */
    private Map<Long, Set<Long>> copyDbStreamMap() {
        Map<Long, Set<Long>> copiedMap = new HashMap<>();
        rwLock.readLock().lock();
        try {
            for (Map.Entry<Long, Set<Long>> e : dbStreamMap.entrySet()) {
                copiedMap.put(e.getKey(), new HashSet<>(e.getValue()));
            }
        } finally {
            rwLock.readLock().unlock();
        }
        return copiedMap;
    }

    /**
     * Appends one metadata row per registered stream that still exists.
     *
     * <p>Streams whose database or table can no longer be resolved, or that were dropped before their read
     * lock could be taken, are skipped.
     *
     * @param dataBatch the list the rows are appended to
     */
    public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) {
        for (Map.Entry<Long, Set<Long>> entry : copyDbStreamMap().entrySet()) {
            Optional<Database> db = Env.getCurrentInternalCatalog().getDb(entry.getKey());
            if (!db.isPresent()) {
                continue;
            }
            for (Long tableId : entry.getValue()) {
                Optional<BaseTableStream> found = findRegisteredStream(db.get(), tableId);
                if (!found.isPresent()) {
                    continue;
                }
                BaseTableStream stream = found.get();
                if (!stream.readLockIfExist()) {
                    continue;
                }
                try {
                    dataBatch.add(buildStreamRow(db.get(), stream));
                } finally {
                    stream.readUnlock();
                }
            }
        }
    }

    /**
     * Lets every registered stream that still exists append its consumption rows.
     *
     * <p>Ids that no longer resolve to a table stream are logged and skipped, so a single inconsistent
     * entry does not fail the whole metadata query.
     *
     * @param dataBatch the list the rows are appended to
     */
    public void fillStreamConsumptionValuesMetadataResult(List<TRow> dataBatch) {
        for (Map.Entry<Long, Set<Long>> entry : copyDbStreamMap().entrySet()) {
            Optional<Database> db = Env.getCurrentInternalCatalog().getDb(entry.getKey());
            if (!db.isPresent()) {
                continue;
            }
            for (Long tableId : entry.getValue()) {
                Optional<BaseTableStream> found = findRegisteredStream(db.get(), tableId);
                if (!found.isPresent()) {
                    continue;
                }
                BaseTableStream stream = found.get();
                if (!stream.readLockIfExist()) {
                    continue;
                }
                try {
                    stream.fillTableStreamConsumptionInfo(dataBatch);
                } finally {
                    stream.readUnlock();
                }
            }
        }
    }

    /**
     * Resolves a registered stream id to its stream, logging why it cannot be resolved.
     *
     * @param db the database the id is registered under
     * @param streamId the registered id
     * @return the stream, or empty when the id resolves to no table or to a table that is not a stream
     */
    private Optional<BaseTableStream> findRegisteredStream(Database db, Long streamId) {
        Optional<Table> table = db.getTable(streamId);
        if (!table.isPresent()) {
            // The index is only reconciled by the periodic cleanup, so a missing table is logged at
            // debug level rather than as a warning.
            if (LOG.isDebugEnabled()) {
                LOG.debug("invalid stream id: {}, db: {}", streamId, db.getFullName());
            }
            return Optional.empty();
        }
        if (!(table.get() instanceof BaseTableStream)) {
            LOG.warn("invalid not table stream type table: {}", table.get().getName());
            return Optional.empty();
        }
        return Optional.of((BaseTableStream) table.get());
    }

    /**
     * Builds the metadata row of one stream. The caller is expected to hold the stream's read lock.
     *
     * @param db the database the stream was resolved from
     * @param stream the stream to describe
     * @return the row, with columns in the order marked by the inline comments
     */
    private TRow buildStreamRow(Database db, BaseTableStream stream) {
        String baseTableName = "N/A";
        String baseTableDb = "N/A";
        String baseTableCtl = "N/A";
        String baseTableType = "N/A";
        TableIf baseTable = stream.getBaseTableNullable();
        if (baseTable != null) {
            // qualifiers are read as [catalog, database, table]
            List<String> baseTableQualifiers = baseTable.getFullQualifiers();
            baseTableName = baseTableQualifiers.get(2);
            baseTableDb = baseTableQualifiers.get(1);
            baseTableCtl = baseTableQualifiers.get(0);
            baseTableType = baseTable.getType().name();
        }

        TRow trow = new TRow();
        // DB_NAME
        trow.addToColumnValue(new TCell().setStringVal(db.getFullName()));
        // STREAM_NAME
        trow.addToColumnValue(new TCell().setStringVal(stream.getName()));
        // STREAM_ID
        trow.addToColumnValue(new TCell().setLongVal(stream.getId()));
        // STREAM_TYPE
        trow.addToColumnValue(new TCell().setStringVal(stream.getTableStreamType()));
        // CONSUME_TYPE
        trow.addToColumnValue(new TCell().setStringVal(stream.getConsumeTypeString()));
        // STREAM_COMMENT
        trow.addToColumnValue(new TCell().setStringVal(stream.getComment()));
        // BASE_TABLE_NAME
        trow.addToColumnValue(new TCell().setStringVal(baseTableName));
        // BASE_TABLE_DB
        trow.addToColumnValue(new TCell().setStringVal(baseTableDb));
        // BASE_TABLE_CTL
        trow.addToColumnValue(new TCell().setStringVal(baseTableCtl));
        // BASE_TABLE_TYPE
        trow.addToColumnValue(new TCell().setStringVal(baseTableType));
        // ENABLED
        trow.addToColumnValue(new TCell().setBoolVal(!stream.isDisabled()));
        // IS_STALE
        trow.addToColumnValue(new TCell().setBoolVal(stream.isStale()));
        // STALE_REASON
        trow.addToColumnValue(new TCell().setStringVal(stream.getStaleReason()));
        return trow;
    }

    /**
     * Restores the state that is not part of the JSON form: the lock and the daemon interval. Also
     * installs an empty index when the JSON carried none.
     *
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override
    public void gsonPostProcess() throws IOException {
        this.rwLock = new MonitoredReentrantReadWriteLock(true);
        this.intervalMs = Config.table_stream_partition_offset_cleanup_interval_second * 1000L;
        if (this.dbStreamMap == null) {
            this.dbStreamMap = new HashMap<>();
        }
    }
}