BeSelectionPolicy.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.system;

import org.apache.doris.common.Config;
import org.apache.doris.qe.SimpleScheduler;
import org.apache.doris.resource.Tag;
import org.apache.doris.thrift.TStorageMedium;

import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Selection policy used to narrow a collection of {@link Backend} (BE) nodes down to the
 * candidates that satisfy a caller's requirements.
 *
 * <p>Instances are created only through {@link Builder}; the constructor is private. The policy
 * fields are public and mutable, so code holding a policy may read and update them directly.
 *
 * <p>Some fields ({@code allowOnSameHost}, {@code enableRoundRobin}, {@code nextRoundRobinIndex})
 * are not consulted anywhere in this class; they are carried for whoever consumes the policy.
 */
public class BeSelectionPolicy {
    private static final Logger LOG = LogManager.getLogger(BeSelectionPolicy.class);

    // If true, a backend must report isScheduleAvailable() to match.
    public boolean needScheduleAvailable = false;
    // If true, a backend must report isQueryAvailable() to match.
    public boolean needQueryAvailable = false;
    // If true, a backend must report isLoadAvailable() to match.
    public boolean needLoadAvailable = false;
    // Resource tag. Empty means no need to consider resource tag.
    public Set<Tag> resourceTags = Sets.newHashSet();
    // storage medium. null means no need to consider storage medium.
    public TStorageMedium storageMedium = null;
    // Check if disk usage reaches limit. false means no need to check.
    public boolean checkDiskUsage = false;
    // If set to false, do not select backends on same host.
    public boolean allowOnSameHost = false;

    // If false, compute nodes never match. If true, compute nodes are picked first and
    // mix nodes are only used to fill up to expectBeNum.
    public boolean preferComputeNode = false;
    // Number of backends wanted when preferComputeNode is set.
    // -1 means "as many as there are matching compute nodes".
    public int expectBeNum = 0;

    public boolean enableRoundRobin = false;
    // if enable round robin, choose next be from nextRoundRobinIndex
    // call SystemInfoService::selectBackendIdsByPolicy will update nextRoundRobinIndex
    public int nextRoundRobinIndex = -1;

    // Hosts to prefer. If at least one matching backend is on one of these hosts, only those
    // backends are kept; otherwise this preference is ignored.
    public List<String> preferredLocations = new ArrayList<>();

    // If true, a backend must report isAlive() to match.
    public boolean requireAliveBe = false;

    private BeSelectionPolicy() {

    }

    /**
     * Fluent builder for {@link BeSelectionPolicy}.
     *
     * <p>The builder mutates one underlying policy instance and {@link #build()} returns that same
     * instance, so calling builder methods after {@code build()} also changes the returned policy.
     */
    public static class Builder {
        private final BeSelectionPolicy policy;

        public Builder() {
            policy = new BeSelectionPolicy();
        }

        /** Requires matching backends to be schedule-available. */
        public Builder needScheduleAvailable() {
            policy.needScheduleAvailable = true;
            return this;
        }

        /** Requires matching backends to be query-available. */
        public Builder needQueryAvailable() {
            policy.needQueryAvailable = true;
            return this;
        }

        /** Requires matching backends to be load-available. */
        public Builder needLoadAvailable() {
            policy.needLoadAvailable = true;
            return this;
        }

        /**
         * Adds resource tags to the policy.
         *
         * @param tags tags to add; {@code null} is treated as "nothing to add"
         */
        public Builder addTags(Set<Tag> tags) {
            if (tags != null) {
                policy.resourceTags.addAll(tags);
            }
            return this;
        }

        /**
         * Sets the required storage medium.
         *
         * @param medium the medium, or {@code null} to not consider storage medium
         */
        public Builder setStorageMedium(TStorageMedium medium) {
            policy.storageMedium = medium;
            return this;
        }

        /** Enables the disk usage limit check. */
        public Builder needCheckDiskUsage() {
            policy.checkDiskUsage = true;
            return this;
        }

        /** Sets {@code allowOnSameHost} to true. */
        public Builder allowOnSameHost() {
            policy.allowOnSameHost = true;
            return this;
        }

        /** Sets whether compute nodes are eligible and preferred. */
        public Builder preferComputeNode(boolean prefer) {
            policy.preferComputeNode = prefer;
            return this;
        }

        /**
         * Sets the number of backends wanted when compute nodes are preferred.
         *
         * @param expectBeNum wanted count; -1 means the number of matching compute nodes
         */
        public Builder assignExpectBeNum(int expectBeNum) {
            policy.expectBeNum = expectBeNum;
            return this;
        }

        /**
         * Adds preferred hosts.
         *
         * @param preferredLocations hosts to add; {@code null} is treated as "nothing to add"
         */
        public Builder addPreLocations(List<String> preferredLocations) {
            if (preferredLocations != null) {
                policy.preferredLocations.addAll(preferredLocations);
            }
            return this;
        }

        public Builder setEnableRoundRobin(boolean enableRoundRobin) {
            policy.enableRoundRobin = enableRoundRobin;
            return this;
        }

        public Builder setNextRoundRobinIndex(int nextRoundRobinIndex) {
            policy.nextRoundRobinIndex = nextRoundRobinIndex;
            return this;
        }

        /** Requires matching backends to be alive. */
        public Builder setRequireAliveBe() {
            policy.requireAliveBe = true;
            return this;
        }

        /** Returns the policy configured so far (the builder's own instance, not a copy). */
        public BeSelectionPolicy build() {
            return policy;
        }
    }

    /**
     * Checks a single backend against every per-backend rule of this policy.
     *
     * @param backend the backend to test
     * @return true if the backend passes the compute-node rule, the availability / tag /
     *         storage-medium / alive rules, and (when enabled) the disk usage rule
     */
    private boolean isMatch(Backend backend) {
        // Compute node is only used when preferComputeNode is set.
        if (!preferComputeNode && backend.isComputeNode()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Backend [{}] is not match by ComputeNode rule, policy: [{}]", backend.getHost(), this);
            }
            return false;
        }

        // Each clause is "rule is enabled AND backend violates it"; any violation rejects.
        boolean violatesOtherRules = (needScheduleAvailable && !backend.isScheduleAvailable())
                || (needQueryAvailable && !backend.isQueryAvailable())
                || (needLoadAvailable && !backend.isLoadAvailable())
                || (!resourceTags.isEmpty() && !resourceTags.contains(backend.getLocationTag()))
                || (storageMedium != null && !backend.hasSpecifiedStorageMedium(storageMedium))
                || (requireAliveBe && !backend.isAlive());
        if (violatesOtherRules) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Backend [{}] is not match by Other rules, policy: [{}]", backend.getHost(), this);
            }
            return false;
        }

        if (checkDiskUsage) {
            // Without a storage medium the overall disk limit is checked,
            // otherwise only the limit for that medium.
            if (storageMedium == null && backend.diskExceedLimit()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Backend [{}] is not match by diskExceedLimit rule, policy: [{}]", backend.getHost(),
                            this);
                }
                return false;
            }
            if (storageMedium != null && backend.diskExceedLimitByStorageMedium(storageMedium)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Backend [{}] is not match by diskExceedLimitByStorageMedium rule, policy: [{}]",
                            backend.getHost(), this);
                }
                return false;
            }
        }
        return true;
    }

    /**
     * Filters the given backends by this policy and returns the candidates in random order.
     *
     * <p>Steps: keep backends accepted by {@code isMatch}; if any of them is on a preferred
     * host, keep only those; when compute nodes are preferred and at least one matched, take all
     * compute nodes and fill with mix nodes up to the expected number; finally drop backends
     * that {@link SimpleScheduler#isAvailable} rejects unless the black list is disabled in
     * {@link Config}.
     *
     * @param backends the backends to choose from; not modified
     * @return a new, shuffled list of candidate backends (possibly empty)
     */
    public List<Backend> getCandidateBackends(Collection<Backend> backends) {
        // Collect into an explicit ArrayList: the list is shuffled in place below, and
        // Collectors.toList() gives no guarantee that its result is mutable.
        List<Backend> filterBackends = backends.stream().filter(this::isMatch)
                .collect(Collectors.toCollection(ArrayList::new));

        // If preLocations were chosen, use the preLocation backends. Otherwise we just ignore this filter.
        if (!filterBackends.isEmpty() && !preferredLocations.isEmpty()) {
            // Hash lookup instead of a linear List.contains() per backend.
            Set<String> preferredHosts = Sets.newHashSet(preferredLocations);
            List<Backend> preLocationFilterBackends = filterBackends.stream()
                    .filter(backend -> preferredHosts.contains(backend.getHost()))
                    .collect(Collectors.toCollection(ArrayList::new));
            if (!preLocationFilterBackends.isEmpty()) {
                filterBackends = preLocationFilterBackends;
            }
        }

        // Shuffle before picking so that the mix nodes used as filler are chosen at random.
        Collections.shuffle(filterBackends);

        // Compute nodes can only be present when preferComputeNode is set (see isMatch),
        // so the count is only needed in that case.
        int numComputeNode = preferComputeNode
                ? (int) filterBackends.stream().filter(Backend::isComputeNode).count()
                : 0;

        List<Backend> candidates = new ArrayList<>();
        if (preferComputeNode && numComputeNode > 0) {
            int realExpectBeNum = expectBeNum == -1 ? numComputeNode : expectBeNum;
            // pick compute node first
            for (Backend backend : filterBackends) {
                if (backend.isComputeNode()) {
                    candidates.add(backend);
                }
            }
            // fill with some mix node.
            for (Backend backend : filterBackends) {
                if (candidates.size() >= realExpectBeNum) {
                    break;
                }
                if (backend.isMixNode()) {
                    candidates.add(backend);
                }
            }
        } else {
            candidates.addAll(filterBackends);
        }

        // filter out backends in black list
        if (!Config.disable_backend_black_list) {
            candidates.removeIf(b -> !SimpleScheduler.isAvailable(b));
        }
        Collections.shuffle(candidates);
        return candidates;
    }

    /**
     * Returns a short description of the compute-node, availability, tag and storage-medium
     * settings. Other fields are not included.
     */
    @Override
    public String toString() {
        return String.format("computeNode=%s | query=%s | load=%s | schedule=%s | tags=%s | medium=%s",
                preferComputeNode, needQueryAvailable, needLoadAvailable, needScheduleAvailable,
                resourceTags.stream().map(tag -> tag.toString()).collect(Collectors.joining(",")), storageMedium);
    }
}