SimpleScheduler.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Reference;
import org.apache.doris.common.UserException;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRangeLocation;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
 * Chooses backends for scan ranges and keeps an in-memory backend black list.
 *
 * <p>Failures are reported through {@link #addToBlacklist(Long, String)}. A backend is only treated as
 * blacked after {@link BlackListInfo} decides the configured failure threshold has been reached. A background
 * {@code UpdateBlacklistThread}, started by {@link #init()}, prunes the black list once per second.
 */
public class SimpleScheduler {
    private static final Logger LOG = LogManager.getLogger(SimpleScheduler.class);

    /**
     * Black-list bookkeeping for a single backend. All mutable state is guarded by {@code lock}.
     */
    public static class BlackListInfo {
        private final Lock lock = new ReentrantLock();
        // Record the reason why this backend is added to black list, will be updated only once.
        private String reasonForBlackList = "";
        // Record the first time this backend is tried to be added to black list.
        private long firstRecordBlackTimestampMs = 0L;
        // Record the last time this backend is tried to be added to black list.
        private long lastRecordBlackTimestampMs = 0L;
        // Record the timestamp this backend is really regarded as blacked.
        private long lastBlackTimestampMs = 0L;
        // Record the count of this backend is tried to be added to black list.
        private long recordBlackListCount = 0L;
        // Record the backend id; only used for diagnostics in toString().
        private final long backendId;

        /**
         * Creates an entry without a known backend id (reported as 0 in {@link #toString()}).
         */
        public BlackListInfo() {
            this(0L);
        }

        /**
         * Creates an entry for the given backend.
         *
         * @param backendId id of the tracked backend, reported by {@link #toString()}
         */
        public BlackListInfo(long backendId) {
            this.backendId = backendId;
        }

        /**
         * Records one attempt to black-list this backend. The backend is only really black-listed once the
         * condition in {@code shouldBeBlackListed} is met.
         *
         * @param reason reason for this attempt; only the first non-empty reason is kept
         */
        public void tryAddBlackList(String reason) {
            lock.lock();
            try {
                recordAddBlackList(reason);
                if (shouldBeBlackListed()) {
                    doAddBlackList();
                }
            } finally {
                lock.unlock();
            }
        }

        // Just update the fields. Caller must hold lock.
        private void recordAddBlackList(String reason) {
            long now = System.currentTimeMillis();
            if (firstRecordBlackTimestampMs <= 0) {
                firstRecordBlackTimestampMs = now;
            }
            lastRecordBlackTimestampMs = now;
            // Restart the counter if the time interval is too long
            if (lastRecordBlackTimestampMs - firstRecordBlackTimestampMs
                    >= Config.do_add_backend_black_list_threshold_seconds * 1000L) {
                firstRecordBlackTimestampMs = lastRecordBlackTimestampMs;
                recordBlackListCount = 0L;
            }
            recordBlackListCount++;
            if (Strings.isNullOrEmpty(reasonForBlackList) && !Strings.isNullOrEmpty(reason)) {
                reasonForBlackList = reason;
            }
        }

        // True when enough attempts were recorded within the configured window. Caller must hold lock.
        private boolean shouldBeBlackListed() {
            if (lastRecordBlackTimestampMs <= 0 || firstRecordBlackTimestampMs <= 0) {
                return false;
            }
            if (recordBlackListCount < Config.do_add_backend_black_list_threshold_count) {
                return false;
            }
            return lastRecordBlackTimestampMs - firstRecordBlackTimestampMs
                    < Config.do_add_backend_black_list_threshold_seconds * 1000L;
        }

        // Mark the backend as really blacked and log where it happened. Caller must hold lock
        // (toString() re-acquires it, which is fine because the lock is reentrant).
        private void doAddBlackList() {
            lastBlackTimestampMs = System.currentTimeMillis();
            Exception e = new Exception();
            String stack = org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace(e);
            LOG.warn("Backend is added to black list.\nInformation:\n{}\nStack:\n{}", toString(), stack);
        }

        /**
         * Returns whether this entry may be dropped from the black list.
         *
         * @return false if the backend was never really blacked; otherwise true once at least
         *         {@code Config.stay_in_backend_black_list_threshold_seconds} have passed since it was blacked
         */
        public boolean shouldBeRemoved() {
            lock.lock();
            try {
                if (lastBlackTimestampMs <= 0) {
                    return false;
                }
                long currentTimeStamp = System.currentTimeMillis();
                // Regard the backend as normal again once it has stayed blacked for the configured duration.
                return currentTimeStamp - lastBlackTimestampMs
                        >= Config.stay_in_backend_black_list_threshold_seconds * 1000L;
            } finally {
                lock.unlock();
            }
        }

        /**
         * @return true if the black-list threshold has been reached at least once for this entry
         */
        public boolean isBlacked() {
            lock.lock();
            try {
                return lastBlackTimestampMs > 0;
            } finally {
                lock.unlock();
            }
        }

        /**
         * @return the first non-empty reason recorded for this backend, or "" if none
         */
        public String getReasonForBlackList() {
            lock.lock();
            try {
                return reasonForBlackList;
            } finally {
                lock.unlock();
            }
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            // Acquire immediately before try so the lock can never leak if anything below throws.
            lock.lock();
            try {
                // Convert to human readable time
                LocalDateTime firstRecordBlackTimes = toLocalDateTime(firstRecordBlackTimestampMs);
                LocalDateTime lastRecordBlackTimes = toLocalDateTime(lastRecordBlackTimestampMs);
                LocalDateTime lastBlackTimes = toLocalDateTime(lastBlackTimestampMs);
                sb.append("\nbackendID: ").append(backendId).append("\n");
                sb.append("reasonForBlackList: ").append(reasonForBlackList).append("\n");
                sb.append("firstRecordBlackTimestampMs: ").append(firstRecordBlackTimes).append("\n");
                sb.append("lastRecordBlackTimestampMs: ").append(lastRecordBlackTimes).append("\n");
                sb.append("lastBlackTimestampMs: ").append(lastBlackTimes).append("\n");
                sb.append("recordBlackListCount: ").append(recordBlackListCount).append("\n");
                sb.append("Config.do_add_backend_black_list_threshold_seconds: ")
                        .append(Config.do_add_backend_black_list_threshold_seconds).append("\n");
                sb.append("Config.stay_in_backend_black_list_threshold_seconds: ")
                        .append(Config.stay_in_backend_black_list_threshold_seconds).append("\n");
                return sb.toString();
            } finally {
                lock.unlock();
            }
        }

        // Convert an epoch-millisecond timestamp to local date-time in the JVM default zone.
        private static LocalDateTime toLocalDateTime(long epochMs) {
            return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMs), ZoneId.systemDefault());
        }
    }

    // Round-robin cursor shared by getHost(backends, ...) and getHostByCurrentBackend().
    private static AtomicLong nextId = new AtomicLong(0);
    // backend id -> black-list bookkeeping of that backend.
    // Entries are inserted by addToBlacklist() from any thread and removed by UpdateBlacklistThread,
    // so a concurrent map is used; the mutable state inside each BlackListInfo is guarded by its own lock.
    private static Map<Long, BlackListInfo> blacklistBackends = Maps.newConcurrentMap();
    private static UpdateBlacklistThread updateBlacklistThread;

    /**
     * Starts the background thread that prunes the black list.
     */
    public static void init() {
        updateBlacklistThread = new UpdateBlacklistThread();
        updateBlacklistThread.start();
    }

    /**
     * Chooses a host for a scan range, preferring {@code backendId}.
     *
     * <p>If the preferred backend is not available, the first other location (in list order) whose
     * backend is available is chosen instead.
     *
     * @param backendId preferred backend id
     * @param locations candidate locations of the scan range
     * @param backends candidate backends keyed by id
     * @param backendIdRef receives the id of the chosen backend
     * @return address of the chosen backend
     * @throws UserException if inputs are empty or no candidate backend is available
     */
    public static TNetworkAddress getHost(long backendId,
                                          List<TScanRangeLocation> locations,
                                          ImmutableMap<Long, Backend> backends,
                                          Reference<Long> backendIdRef)
            throws UserException {
        if (CollectionUtils.isEmpty(locations) || backends == null || backends.isEmpty()) {
            throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("getHost backendID={}, backendSize={}", backendId, backends.size());
        }
        Backend backend = backends.get(backendId);
        if (isAvailable(backend)) {
            backendIdRef.setRef(backendId);
            return new TNetworkAddress(backend.getHost(), backend.getBePort());
        }
        TScanRangeLocation fallback = findFallbackLocation(backendId, locations, backends);
        if (fallback == null) {
            // no backend returned
            throw noBackendForLocations(locations, backends);
        }
        Backend fallbackBackend = backends.get(fallback.backend_id);
        backendIdRef.setRef(fallback.backend_id);
        return new TNetworkAddress(fallbackBackend.getHost(), fallbackBackend.getBePort());
    }

    /**
     * Chooses a location for a scan range, preferring {@code minLocation}.
     *
     * <p>If the backend of {@code minLocation} is not available, the first other location (in list order)
     * whose backend is available is returned instead.
     *
     * @param minLocation preferred location
     * @param locations candidate locations of the scan range
     * @param backends candidate backends keyed by id
     * @param backendIdRef receives the backend id of the chosen location
     * @return the chosen location
     * @throws UserException if inputs are empty or no candidate backend is available
     */
    public static TScanRangeLocation getLocation(TScanRangeLocation minLocation,
                                          List<TScanRangeLocation> locations,
                                          ImmutableMap<Long, Backend> backends,
                                          Reference<Long> backendIdRef)
            throws UserException {
        if (CollectionUtils.isEmpty(locations) || backends == null || backends.isEmpty()) {
            throw new UserException("scan range location or candidate backends is empty");
        }
        Backend backend = backends.get(minLocation.backend_id);
        if (isAvailable(backend)) {
            backendIdRef.setRef(minLocation.backend_id);
            return minLocation;
        }
        TScanRangeLocation fallback = findFallbackLocation(minLocation.backend_id, locations, backends);
        if (fallback == null) {
            // no backend returned
            throw noBackendForLocations(locations, backends);
        }
        backendIdRef.setRef(fallback.backend_id);
        return fallback;
    }

    // Return the first location (in list order) whose backend differs from excludedBackendId and is available,
    // or null if there is none.
    private static TScanRangeLocation findFallbackLocation(long excludedBackendId,
                                                           List<TScanRangeLocation> locations,
                                                           ImmutableMap<Long, Backend> backends) {
        for (TScanRangeLocation location : locations) {
            if (location.backend_id == excludedBackendId) {
                continue;
            }
            // choose the first alive backend(in analysis stage, the locations are random)
            if (isAvailable(backends.get(location.backend_id))) {
                return location;
            }
        }
        return null;
    }

    // Build the exception thrown when none of the given locations has a usable backend.
    private static UserException noBackendForLocations(List<TScanRangeLocation> locations,
                                                       ImmutableMap<Long, Backend> backends) {
        return new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG
                + getBackendErrorMsg(locations.stream().map(l -> l.backend_id).collect(Collectors.toList()),
                backends, locations.size()));
    }

    /**
     * Chooses an available backend in round-robin order.
     *
     * <p>The scan starts at the shared cursor position and wraps around to the beginning of the map.
     *
     * @param backends candidate backends keyed by id
     * @param backendIdRef receives the id of the chosen backend
     * @return address of the chosen backend
     * @throws UserException if {@code backends} is empty or none of them is available
     */
    public static TNetworkAddress getHost(ImmutableMap<Long, Backend> backends,
                                          Reference<Long> backendIdRef)
            throws UserException {
        if (backends.isEmpty()) {
            throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
        }
        // floorMod keeps the start index non-negative even after the shared counter wraps.
        long id = Math.floorMod(nextId.getAndIncrement(), backends.size());
        Map.Entry<Long, Backend> backendEntry = backends.entrySet().stream().skip(id).filter(
                e -> isAvailable(e.getValue())).findFirst().orElse(null);
        if (backendEntry == null && id > 0) {
            // wrap around: look at the entries before the start index
            backendEntry = backends.entrySet().stream().limit(id).filter(
                    e -> isAvailable(e.getValue())).findFirst().orElse(null);
        }
        if (backendEntry != null) {
            Backend backend = backendEntry.getValue();
            backendIdRef.setRef(backendEntry.getKey());
            return new TNetworkAddress(backend.getHost(), backend.getBePort());
        }
        // no backend returned
        throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG
                + getBackendErrorMsg(Lists.newArrayList(backends.keySet()), backends, 3));
    }

    // get the reason why backends can not be chosen, for at most `limit` of the given ids.
    private static String getBackendErrorMsg(List<Long> backendIds, ImmutableMap<Long, Backend> backends, int limit) {
        List<String> res = Lists.newArrayList();
        for (int i = 0; i < backendIds.size() && i < limit; i++) {
            long beId = backendIds.get(i);
            Backend be = backends.get(beId);
            BlackListInfo blackListInfo = blacklistBackends.get(beId);
            if (be == null) {
                res.add(beId + ": not exist");
            } else if (!be.isAlive()) {
                res.add(beId + ": not alive");
            } else if (blackListInfo != null && blackListInfo.isBlacked()) {
                res.add(beId + ": in black list(" + blackListInfo.getReasonForBlackList() + ")");
            } else if (!be.isQueryAvailable()) {
                res.add(beId + ": disable query");
            } else {
                res.add(beId + ": unknown");
            }
        }
        return res.toString();
    }

    /**
     * Records a failure of the given backend. The backend is only really black-listed once the
     * configured threshold is reached (see {@link BlackListInfo#tryAddBlackList(String)}).
     *
     * <p>Ignored (with a warning) when {@code backendID} is null, the black list is disabled, or in cloud mode.
     *
     * @param backendID id of the failing backend
     * @param reason human-readable failure reason
     */
    public static void addToBlacklist(Long backendID, String reason) {
        if (backendID == null || Config.disable_backend_black_list || Config.isCloudMode()) {
            LOG.warn("ignore backend black list for backend: {}, disabled: {}, is cloud: {}",
                    backendID, Config.disable_backend_black_list, Config.isCloudMode());
            return;
        }
        // A single computeIfAbsent call always yields a non-null entry. The previous putIfAbsent + get
        // pair could observe null if UpdateBlacklistThread removed the entry in between.
        BlackListInfo blackListInfo = blacklistBackends.computeIfAbsent(backendID, beId -> new BlackListInfo(beId));
        // Recorded outside computeIfAbsent so logging in tryAddBlackList does not run inside the map update.
        blackListInfo.tryAddBlackList(reason);
    }

    /**
     * @param backend backend to check, may be null
     * @return true if the backend exists, is query-available and is not currently blacked
     */
    public static boolean isAvailable(Backend backend) {
        if (backend == null) {
            return false;
        }
        if (!backend.isQueryAvailable()) {
            return false;
        }
        BlackListInfo blackListInfo = blacklistBackends.get(backend.getId());
        return blackListInfo == null || !blackListInfo.isBlacked();
    }

    // Daemon loop that prunes blacklistBackends once per second.
    private static class UpdateBlacklistThread implements Runnable {
        private static final Logger LOG = LogManager.getLogger(UpdateBlacklistThread.class);
        private final Thread thread;

        public UpdateBlacklistThread() {
            thread = new Thread(this, "UpdateBlacklistThread");
            thread.setDaemon(true);
        }

        public void start() {
            thread.start();
        }

        @Override
        public void run() {
            if (LOG.isDebugEnabled()) {
                LOG.debug("UpdateBlacklistThread is start to run");
            }
            while (true) {
                try {
                    Thread.sleep(1000L);
                    SystemInfoService clusterInfoService = Env.getCurrentSystemInfo();
                    Iterator<Map.Entry<Long, SimpleScheduler.BlackListInfo>>
                            iterator = blacklistBackends.entrySet().iterator();
                    while (iterator.hasNext()) {
                        Map.Entry<Long, SimpleScheduler.BlackListInfo> entry = iterator.next();
                        Long backendId = entry.getKey();
                        Backend backend = clusterInfoService.getBackend(backendId);
                        // remove from blacklist if backend does not exist anymore
                        if (backend == null) {
                            iterator.remove();
                            LOG.info("remove backend {} from black list because it does not exist", backendId);
                        } else {
                            BlackListInfo blackListInfo = entry.getValue();
                            if (backend.isAlive() || blackListInfo.shouldBeRemoved()) {
                                iterator.remove();
                                LOG.info("remove backend {} from black list. backend is alive: {}",
                                        backendId, backend.isAlive());
                            } else {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("blacklistBackends {}", blackListInfo.toString());
                                }
                            }
                        }
                    }
                } catch (Throwable ex) {
                    // Any failure (including an interrupted sleep) is logged and the loop keeps running,
                    // so stale entries are still pruned afterwards.
                    LOG.warn("blacklist thread exception", ex);
                }
            }
        }
    }

    /**
     * Picks one address in round-robin order.
     *
     * @param addressToBackendID candidate addresses mapped to their backend ids
     * @return the chosen address, or null if the map is null or empty
     */
    public static TNetworkAddress getHostByCurrentBackend(Map<TNetworkAddress, Long> addressToBackendID) {
        if (addressToBackendID == null || addressToBackendID.isEmpty()) {
            // Previously an empty map caused an ArithmeticException (modulo by zero).
            return null;
        }
        long id = Math.floorMod(nextId.getAndIncrement(), addressToBackendID.size());
        return addressToBackendID.keySet().stream().skip(id).findFirst().orElse(null);
    }
}