ComputeGroup.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.cloud.catalog;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import lombok.Getter;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public class ComputeGroup {
private static final Logger LOG = LogManager.getLogger(ComputeGroup.class);
public static final String BALANCE_TYPE = "balance_type";
public static final String BALANCE_WARM_UP_TASK_TIMEOUT = "balance_warm_up_task_timeout";
private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
.add(BALANCE_TYPE).add(BALANCE_WARM_UP_TASK_TIMEOUT).build();
private static final Map<String, String> ALL_PROPERTIES_DEFAULT_VALUE_MAP = Maps.newHashMap();
public static final int DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT = Config.cloud_pre_heating_time_limit_sec;
public static final BalanceTypeEnum DEFAULT_COMPUTE_GROUP_BALANCE_ENUM
= BalanceTypeEnum.fromString(Config.cloud_warm_up_for_rebalance_type);
static {
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(BALANCE_TYPE, DEFAULT_COMPUTE_GROUP_BALANCE_ENUM.getValue());
if (BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(Config.cloud_warm_up_for_rebalance_type)) {
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(BALANCE_WARM_UP_TASK_TIMEOUT,
String.valueOf(DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT));
}
}
private enum PolicyTypeEnum {
ActiveStandby,
}
public enum ComputeTypeEnum {
SQL,
COMPUTE,
VIRTUAL,
}
@Getter
@Setter
public static class Policy {
PolicyTypeEnum policyType;
String activeComputeGroup;
String standbyComputeGroup;
long failoverFailureThreshold = 3;
long unhealthyNodeThresholdPercent = 100;
List<String> cacheWarmupJobIds = new ArrayList<>();
public Policy() {
policyType = PolicyTypeEnum.ActiveStandby;
}
@Override
public String toString() {
Map<String, String> showMap = new LinkedHashMap<>();
showMap.put("policyType", policyType.toString());
showMap.put("activeComputeGroup", activeComputeGroup);
showMap.put("standbyComputeGroup", standbyComputeGroup);
showMap.put("failoverFailureThreshold", String.valueOf(failoverFailureThreshold));
showMap.put("unhealthyNodeThresholdPercent", String.valueOf(unhealthyNodeThresholdPercent));
showMap.put("cacheWarmupJobIds", String.valueOf(cacheWarmupJobIds));
Gson gson = new Gson();
return gson.toJson(showMap);
}
public Cloud.ClusterPolicy toPb() {
return Cloud.ClusterPolicy.newBuilder()
.setType(Cloud.ClusterPolicy.PolicyType.ActiveStandby)
.setActiveClusterName(activeComputeGroup)
.addStandbyClusterNames(standbyComputeGroup)
.setFailoverFailureThreshold(failoverFailureThreshold)
.setUnhealthyNodeThresholdPercent(unhealthyNodeThresholdPercent)
.build();
}
}
@Getter
private String id;
@Getter
// cg can be renamed
@Setter
private String name;
@Getter
private ComputeTypeEnum type;
// record sub cluster name
@Getter
@Setter
private List<String> subComputeGroups;
@Getter
@Setter
private long unavailableSince = -1;
@Getter
@Setter
private long availableSince = -1;
@Getter
@Setter
private Policy policy;
@Getter
@Setter
private String currentClusterName;
@Getter
@Setter
private boolean needRebuildFileCache = false;
@Getter
@Setter
private Map<String, String> properties = new LinkedHashMap<>(ALL_PROPERTIES_DEFAULT_VALUE_MAP);
public ComputeGroup(String id, String name, ComputeTypeEnum type) {
this.id = id;
this.name = name;
this.type = type;
}
public boolean isVirtual() {
return type == ComputeTypeEnum.VIRTUAL;
}
public String getActiveComputeGroup() {
if (policy == null) {
return "empty_policy";
}
return policy.getActiveComputeGroup();
}
public String getStandbyComputeGroup() {
if (policy == null) {
return "empty_policy";
}
return policy.getStandbyComputeGroup();
}
private void validateTimeoutRestriction(Map<String, String> inputProperties) throws DdlException {
if (!properties.containsKey(BALANCE_TYPE)) {
return;
}
String originalBalanceType = properties.get(BALANCE_TYPE);
if (BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(originalBalanceType)) {
return;
}
if (inputProperties.containsKey(BALANCE_TYPE)
&& BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(inputProperties.get(BALANCE_TYPE))) {
return;
}
if (inputProperties.containsKey(BALANCE_WARM_UP_TASK_TIMEOUT)) {
throw new DdlException("Property " + BALANCE_WARM_UP_TASK_TIMEOUT
+ " cannot be set when current " + BALANCE_TYPE + " is " + originalBalanceType
+ ". Only async_warmup type supports timeout setting.");
}
}
/**
* Validate a single property key-value pair.
*/
private static void validateProperty(String key, String value) throws DdlException {
if (value == null || value.isEmpty()) {
return;
}
if (!ALL_PROPERTIES_NAME.contains(key)) {
throw new DdlException("Property " + key + " is not supported");
}
// Validate specific properties
if (BALANCE_TYPE.equals(key)) {
if (!BalanceTypeEnum.isValid(value)) {
throw new DdlException("Property " + BALANCE_TYPE
+ " only support without_warmup or async_warmup or sync_warmup");
}
} else if (BALANCE_WARM_UP_TASK_TIMEOUT.equals(key)) {
try {
int timeout = Integer.parseInt(value);
if (timeout <= 0) {
throw new DdlException("Property " + BALANCE_WARM_UP_TASK_TIMEOUT + " must be positive integer");
}
} catch (NumberFormatException e) {
throw new DdlException("Property " + BALANCE_WARM_UP_TASK_TIMEOUT + " must be positive integer");
}
}
}
public void checkProperties(Map<String, String> inputProperties) throws DdlException {
if (inputProperties == null || inputProperties.isEmpty()) {
return;
}
for (Map.Entry<String, String> entry : inputProperties.entrySet()) {
validateProperty(entry.getKey(), entry.getValue());
}
validateTimeoutRestriction(inputProperties);
}
public void modifyProperties(Map<String, String> inputProperties) throws DdlException {
String balanceType = inputProperties.get(BALANCE_TYPE);
if (balanceType == null) {
return;
}
if (BalanceTypeEnum.WITHOUT_WARMUP.getValue().equals(balanceType)
|| BalanceTypeEnum.SYNC_WARMUP.getValue().equals(balanceType)
|| BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.getValue().equals(balanceType)) {
// delete BALANCE_WARM_UP_TASK_TIMEOUT if exists
properties.remove(BALANCE_WARM_UP_TASK_TIMEOUT);
} else if (BalanceTypeEnum.ASYNC_WARMUP.getValue().equals(balanceType)) {
// if BALANCE_WARM_UP_TASK_TIMEOUT exists, it has been validated in validateProperty
if (!properties.containsKey(BALANCE_WARM_UP_TASK_TIMEOUT)) {
properties.put(BALANCE_WARM_UP_TASK_TIMEOUT, String.valueOf(DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT));
}
}
}
// set properties, just set in periodic instance status checker
public void setProperties(Map<String, String> propertiesInMs) {
if (propertiesInMs == null || propertiesInMs.isEmpty()) {
return;
}
for (Map.Entry<String, String> entry : propertiesInMs.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
try {
validateProperty(key, value);
} catch (DdlException e) {
LOG.warn("ignore invalid property. compute group: {}, key: {}, value: {}, error: {}",
name, key, value, e.getMessage());
continue;
}
if (value != null && !value.isEmpty()) {
properties.put(key, value);
}
}
}
public BalanceTypeEnum getBalanceType() {
String balanceType = properties.get(BALANCE_TYPE);
BalanceTypeEnum type = BalanceTypeEnum.fromString(balanceType);
if (type == null) {
return BalanceTypeEnum.ASYNC_WARMUP;
}
return type;
}
public int getBalanceWarmUpTaskTimeout() {
String timeoutStr = properties.get(BALANCE_WARM_UP_TASK_TIMEOUT);
try {
return Integer.parseInt(timeoutStr);
} catch (NumberFormatException e) {
return DEFAULT_BALANCE_WARM_UP_TASK_TIMEOUT;
}
}
@Override
public String toString() {
Map<String, String> showMap = new LinkedHashMap<>();
showMap.put("id", id);
showMap.put("name", name);
showMap.put("type", type.toString());
showMap.put("unavailableSince", String.valueOf(unavailableSince));
showMap.put("availableSince", String.valueOf(availableSince));
showMap.put("policy", policy == null ? "no_policy" : policy.toString());
showMap.put("properties", properties.toString());
Gson gson = new Gson();
return gson.toJson(showMap);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ComputeGroup)) {
return false;
}
ComputeGroup that = (ComputeGroup) o;
return unavailableSince == that.unavailableSince
&& availableSince == that.availableSince
&& id.equals(that.id)
&& name.equals(that.name)
&& type == that.type
&& (policy != null ? policy.equals(that.policy) : that.policy == null);
}
@Override
public int hashCode() {
int result = id.hashCode();
result = 31 * result + name.hashCode();
result = 31 * result + (type != null ? type.hashCode() : 0);
result = 31 * result + (int) (unavailableSince ^ (unavailableSince >>> 32));
result = 31 * result + (int) (availableSince ^ (availableSince >>> 32));
result = 31 * result + (policy != null ? policy.hashCode() : 0);
return result;
}
}