TableDispatchScheduler.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.clone;

import org.apache.doris.common.ThreadPoolManager;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Runs tablet scheduling work on a small pool of per-table workers.
 *
 * <p>Tablets handed to {@code enqueueTablets} are queued by table id. A table is handled by at most one worker at
 * a time, and at most {@code MAX_ACTIVE_TABLE_WORKERS} tables are handled at once. A worker repeatedly takes up to
 * {@code workerBatchSize} tablets from the head of its table's queue and passes them to the batch processor given
 * to the constructor, stopping once that queue is empty.
 *
 * <p>When a worker slot is free, the waiting table whose queue head sorts first under {@code TabletSchedCtx}'s
 * {@code compareTo} (treated as the highest priority) is activated next.
 *
 * <p>All mutable state is guarded by the instance monitor; the batch processor is always invoked without holding
 * it.
 */
class TableDispatchScheduler {
    private static final Logger LOG = LogManager.getLogger(TableDispatchScheduler.class);
    private static final int MAX_ACTIVE_TABLE_WORKERS = 16;
    private static final long WORKER_KEEP_ALIVE_SECONDS = 60;

    // table id -> tablets queued for worker consumption (worker-queued state); an entry is removed as soon as a
    // worker drains its queue
    private final Map<Long, Deque<TabletSchedCtx>> workerQueuedTabletsByTable = Maps.newHashMap();
    // ids of tables that have a submitted, not yet finished worker;
    // one table can only be handled by one worker at a time
    private final Set<Long> activeTableWorkers = Sets.newHashSet();
    private final ThreadPoolExecutor tableWorkerPool;
    private final Consumer<List<TabletSchedCtx>> tabletBatchProcessor;
    private final int workerBatchSize;

    /**
     * Creates the scheduler and its (initially thread-less) worker pool.
     *
     * @param tabletBatchProcessor called from worker threads with each batch polled for a single table
     * @param workerBatchSize maximum number of tablets per batch; values below 1 are treated as 1, because a
     *         zero-sized batch would make workers exit without consuming anything and be resubmitted endlessly
     */
    TableDispatchScheduler(Consumer<List<TabletSchedCtx>> tabletBatchProcessor, int workerBatchSize) {
        this.tabletBatchProcessor = tabletBatchProcessor;
        this.workerBatchSize = Math.max(1, workerBatchSize);
        // With an unbounded work queue, ThreadPoolExecutor never grows beyond its core size (new tasks are queued
        // instead of spawning extra threads), so the core size must equal the intended concurrency. Core threads
        // are allowed to time out so the pool shrinks back to zero threads when idle.
        this.tableWorkerPool = ThreadPoolManager.newDaemonThreadPool(MAX_ACTIVE_TABLE_WORKERS,
                MAX_ACTIVE_TABLE_WORKERS, WORKER_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
                new ThreadPoolExecutor.AbortPolicy(), "table-dispatch-scheduler-worker", true);
        this.tableWorkerPool.allowCoreThreadTimeOut(true);
    }

    /**
     * Drops every tablet that is still waiting in a per-table queue.
     *
     * <p>The set of active tables is intentionally left untouched: workers that are already running still own
     * their table, and each one removes its own entry when it finishes (after finding its queue empty). Clearing
     * that set here would let a second worker start on a table that is still being processed, and the first
     * worker's completion would then unregister the second one.
     */
    synchronized void clear() {
        workerQueuedTabletsByTable.clear();
    }

    /**
     * Appends the given tablets, in list order, to the queues of their tables and then starts workers for
     * waiting tables while worker slots are free.
     *
     * @param tabletCtxs tablets to queue
     */
    void enqueueTablets(List<TabletSchedCtx> tabletCtxs) {
        synchronized (this) {
            for (TabletSchedCtx tabletCtx : tabletCtxs) {
                workerQueuedTabletsByTable
                        .computeIfAbsent(tabletCtx.getTblId(), tableId -> new LinkedList<>())
                        .addLast(tabletCtx);
            }
        }
        // Submitted outside the lock; triggerTableWorkers locks only around its own state changes.
        triggerTableWorkers();
    }

    /**
     * Copies queued (not yet polled by a worker) tablets into {@code target} until it holds {@code limit}
     * elements. The tablets remain queued. Nothing is added if {@code target} already holds at least
     * {@code limit} elements.
     *
     * @param target list to append to
     * @param limit size at which appending stops
     */
    synchronized void appendWorkerQueuedTablets(List<TabletSchedCtx> target, int limit) {
        for (Deque<TabletSchedCtx> queue : workerQueuedTabletsByTable.values()) {
            for (TabletSchedCtx tabletCtx : queue) {
                if (target.size() >= limit) {
                    return;
                }
                target.add(tabletCtx);
            }
        }
    }

    /**
     * Returns a new list holding every tablet still waiting in a per-table queue.
     */
    synchronized List<TabletSchedCtx> getWorkerQueuedTablets() {
        List<TabletSchedCtx> queuedTablets = Lists.newArrayList();
        for (Deque<TabletSchedCtx> queue : workerQueuedTabletsByTable.values()) {
            queuedTablets.addAll(queue);
        }
        return queuedTablets;
    }

    /**
     * Activates waiting tables, highest priority first, until every worker slot is taken or no inactive table has
     * queued tablets. A table is marked active under the lock before its worker is submitted, so concurrent callers
     * can never start two workers for the same table. If submission fails, the mark is rolled back and the tablets
     * stay queued until the next trigger.
     */
    private void triggerTableWorkers() {
        while (true) {
            long tableIdToSchedule;
            synchronized (this) {
                if (activeTableWorkers.size() >= MAX_ACTIVE_TABLE_WORKERS) {
                    return;
                }

                tableIdToSchedule = pickNextTableIdToActivate();
                if (tableIdToSchedule == -1) {
                    return;
                }

                activeTableWorkers.add(tableIdToSchedule);
            }
            try {
                tableWorkerPool.execute(new TableScheduleWorker(tableIdToSchedule));
            } catch (RuntimeException e) {
                synchronized (this) {
                    activeTableWorkers.remove(tableIdToSchedule);
                }
                LOG.warn("failed to submit table schedule worker for table {}", tableIdToSchedule, e);
                return;
            }
        }
    }

    /**
     * Picks the inactive table with queued tablets whose queue head sorts first. Must be called with the instance
     * monitor held.
     *
     * @return the chosen table id, or -1 if no inactive table has queued tablets
     *         (NOTE(review): assumes -1 is never a real table id)
     */
    private long pickNextTableIdToActivate() {
        long tableId = -1;
        TabletSchedCtx candidate = null;
        for (Map.Entry<Long, Deque<TabletSchedCtx>> entry : workerQueuedTabletsByTable.entrySet()) {
            if (activeTableWorkers.contains(entry.getKey())) {
                continue;
            }
            TabletSchedCtx head = entry.getValue().peekFirst();
            if (head == null) {
                continue;
            }

            // higher priority tablet
            if (candidate == null || head.compareTo(candidate) < 0) {
                candidate = head;
                tableId = entry.getKey();
            }
        }
        return tableId;
    }

    /**
     * Removes up to {@code workerBatchSize} tablets from the head of the table's queue, dropping the table's map
     * entry once its queue is empty.
     *
     * @return the polled tablets in queue order; empty if nothing is queued for the table
     */
    private synchronized List<TabletSchedCtx> pollNextTabletCtxBatch(long tableId) {
        List<TabletSchedCtx> tabletCtxs = Lists.newArrayList();
        Deque<TabletSchedCtx> queue = workerQueuedTabletsByTable.get(tableId);
        if (queue == null) {
            return tabletCtxs;
        }
        while (tabletCtxs.size() < workerBatchSize) {
            TabletSchedCtx tabletCtx = queue.pollFirst();
            if (tabletCtx == null) {
                break;
            }
            tabletCtxs.add(tabletCtx);
        }
        if (queue.isEmpty()) {
            workerQueuedTabletsByTable.remove(tableId);
        }
        return tabletCtxs;
    }

    /**
     * Releases the table's worker slot and re-triggers scheduling. The re-trigger also reactivates this table if
     * tablets arrived after its worker observed an empty queue.
     */
    private void onTableWorkerDone(long tableId) {
        synchronized (this) {
            activeTableWorkers.remove(tableId);
        }
        triggerTableWorkers();
    }

    /**
     * Drains one table's queue batch by batch, then releases the table's slot.
     */
    private class TableScheduleWorker implements Runnable {
        private final long tableId;

        private TableScheduleWorker(long tableId) {
            this.tableId = tableId;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    List<TabletSchedCtx> tabletCtxBatch = pollNextTabletCtxBatch(tableId);
                    if (tabletCtxBatch.isEmpty()) {
                        return;
                    }
                    processBatch(tabletCtxBatch);
                }
            } finally {
                onTableWorkerDone(tableId);
            }
        }

        // Runs the processor on one batch. A failure is logged with the table id and the worker keeps draining,
        // instead of letting the exception escape to the pool with no table context. The failed batch is not
        // retried by this scheduler.
        private void processBatch(List<TabletSchedCtx> tabletCtxBatch) {
            try {
                tabletBatchProcessor.accept(tabletCtxBatch);
            } catch (RuntimeException e) {
                LOG.warn("failed to process {} tablets for table {}", tabletCtxBatch.size(), tableId, e);
            }
        }
    }
}