CloudEnv.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.cloud.catalog;
import org.apache.doris.analysis.CancelCloudWarmUpStmt;
import org.apache.doris.analysis.CreateStageStmt;
import org.apache.doris.analysis.DropStageStmt;
import org.apache.doris.analysis.ResourceTypeEnum;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.cloud.CacheHotspotManager;
import org.apache.doris.cloud.CloudWarmUpJob;
import org.apache.doris.cloud.CloudWarmUpJob.JobState;
import org.apache.doris.cloud.datasource.CloudInternalCatalog;
import org.apache.doris.cloud.load.CleanCopyJobScheduler;
import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.proto.Cloud.NodeInfoPB;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.commands.CreateStageCommand;
import org.apache.doris.nereids.trees.plans.commands.DropStageCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
public class CloudEnv extends Env {
private static final Logger LOG = LogManager.getLogger(CloudEnv.class);
private CloudInstanceStatusChecker cloudInstanceStatusChecker;
private CloudClusterChecker cloudClusterCheck;
private CloudUpgradeMgr upgradeMgr;
private CloudTabletRebalancer cloudTabletRebalancer;
private CacheHotspotManager cacheHotspotMgr;
private boolean enableStorageVault;
private CleanCopyJobScheduler cleanCopyJobScheduler;
private String cloudInstanceId;
public CloudEnv(boolean isCheckpointCatalog) {
super(isCheckpointCatalog);
this.cleanCopyJobScheduler = new CleanCopyJobScheduler();
this.loadManager = ((CloudEnvFactory) EnvFactory.getInstance())
.createLoadManager(loadJobScheduler, cleanCopyJobScheduler);
this.cloudClusterCheck = new CloudClusterChecker((CloudSystemInfoService) systemInfo);
this.cloudInstanceStatusChecker = new CloudInstanceStatusChecker((CloudSystemInfoService) systemInfo);
this.cloudTabletRebalancer = new CloudTabletRebalancer((CloudSystemInfoService) systemInfo);
this.cacheHotspotMgr = new CacheHotspotManager((CloudSystemInfoService) systemInfo);
this.upgradeMgr = new CloudUpgradeMgr((CloudSystemInfoService) systemInfo);
}
public CloudTabletRebalancer getCloudTabletRebalancer() {
return this.cloudTabletRebalancer;
}
public CloudUpgradeMgr getCloudUpgradeMgr() {
return this.upgradeMgr;
}
public CloudClusterChecker getCloudClusterChecker() {
return this.cloudClusterCheck;
}
public String getCloudInstanceId() {
return cloudInstanceId;
}
private void setCloudInstanceId(String cloudInstanceId) {
this.cloudInstanceId = cloudInstanceId;
}
@Override
public void initialize(String[] args) throws Exception {
if (Strings.isNullOrEmpty(Config.cloud_unique_id) && Config.cluster_id == -1) {
throw new UserException("cluster_id must be specified in fe.conf if deployed "
+ "in cloud mode, because FE should known to which it belongs");
}
if (Config.cluster_id != -1) {
setCloudInstanceId(String.valueOf(Config.cluster_id));
}
if (Strings.isNullOrEmpty(Config.cloud_unique_id) && !Strings.isNullOrEmpty(cloudInstanceId)) {
Config.cloud_unique_id = "1:" + cloudInstanceId + ":fe";
LOG.info("cloud_unique_id is empty, setting it to: {}", Config.cloud_unique_id);
}
LOG.info("Initializing CloudEnv with cloud_unique_id: {}, cluster_id: {}, cloudInstanceId: {}",
Config.cloud_unique_id, Config.cluster_id, cloudInstanceId);
super.initialize(args);
}
@Override
protected void startMasterOnlyDaemonThreads() {
LOG.info("start cloud Master only daemon threads");
super.startMasterOnlyDaemonThreads();
cleanCopyJobScheduler.start();
cloudClusterCheck.start();
cloudTabletRebalancer.start();
if (Config.enable_fetch_cluster_cache_hotspot) {
cacheHotspotMgr.start();
}
upgradeMgr.start();
}
@Override
protected void startNonMasterDaemonThreads() {
LOG.info("start cloud Non Master only daemon threads");
super.startNonMasterDaemonThreads();
cloudInstanceStatusChecker.start();
}
public static String genFeNodeNameFromMeta(String host, int port, long timeMs) {
return host + "_" + port + "_" + timeMs;
}
public CacheHotspotManager getCacheHotspotMgr() {
return cacheHotspotMgr;
}
private CloudSystemInfoService getCloudSystemInfoService() {
return (CloudSystemInfoService) systemInfo;
}
private Cloud.NodeInfoPB getLocalTypeFromMetaService() {
// get helperNodes from ms
Cloud.GetClusterResponse response = getCloudSystemInfoService()
.getCloudCluster(Config.cloud_sql_server_cluster_name, Config.cloud_sql_server_cluster_id, "");
if (!response.hasStatus() || !response.getStatus().hasCode()
|| response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
LOG.warn("failed to get cloud cluster due to incomplete response, "
+ "cloud_unique_id={}, clusterId={}, response={}",
Config.cloud_unique_id, Config.cloud_sql_server_cluster_id, response);
return null;
}
LOG.info("get cluster response from meta service {}", response);
// Note: get_cluster interface cluster(option -> repeated), so it has at least one cluster.
if (response.getClusterCount() == 0) {
LOG.warn("meta service error , return cluster zero, plz check it, "
+ "cloud_unique_id={}, clusterId={}, response={}",
Config.cloud_unique_id, Config.cloud_sql_server_cluster_id, response);
return null;
}
this.enableStorageVault = response.getEnableStorageVault();
List<Cloud.NodeInfoPB> allNodes = response.getCluster(0).getNodesList()
.stream().filter(NodeInfoPB::hasNodeType).collect(Collectors.toList());
helperNodes.clear();
Optional<Cloud.NodeInfoPB> firstNonObserverNode = allNodes.stream()
.filter(nodeInfoPB -> nodeInfoPB.getNodeType() != NodeInfoPB.NodeType.FE_OBSERVER).findFirst();
firstNonObserverNode.ifPresent(nodeInfoPB -> helperNodes.add(new HostInfo(
Config.enable_fqdn_mode ? nodeInfoPB.getHost()
: nodeInfoPB.getIp(),
nodeInfoPB.getEditLogPort())));
Preconditions.checkState(helperNodes.size() == 1);
Optional<NodeInfoPB> local = allNodes.stream().filter(n -> ((Config.enable_fqdn_mode ? n.getHost() : n.getIp())
+ "_" + n.getEditLogPort()).equals(selfNode.getIdent())).findAny();
return local.orElse(null);
}
private void tryAddMyselfToMS() {
try {
try {
getCloudSystemInfoService().tryCreateInstance(getCloudInstanceId(),
getCloudInstanceId(), false);
} catch (Exception e) {
return;
}
addFrontend(FrontendNodeType.MASTER, selfNode.getHost(), selfNode.getPort());
} catch (DdlException e) {
LOG.warn("get ddl exception ", e);
}
}
protected void getClusterIdAndRole() throws IOException {
NodeInfoPB.NodeType type = NodeInfoPB.NodeType.UNKNOWN;
// cloud mode
while (true) {
Cloud.NodeInfoPB nodeInfoPB = null;
try {
nodeInfoPB = getLocalTypeFromMetaService();
} catch (Exception e) {
LOG.warn("failed to get local fe's type, sleep {} s, try again. exception: {}",
Config.resource_not_ready_sleep_seconds, e.getMessage());
}
if (nodeInfoPB == null) {
LOG.warn("failed to get local fe's type, sleep {} s, try again.",
Config.resource_not_ready_sleep_seconds);
if (isStartFromEmpty()) {
tryAddMyselfToMS();
}
try {
Thread.sleep(Config.resource_not_ready_sleep_seconds * 1000);
} catch (InterruptedException e) {
LOG.info("interrupted by {}", e);
}
continue;
}
type = nodeInfoPB.getNodeType();
break;
}
try {
String instanceId;
instanceId = getCloudSystemInfoService().getInstanceId(Config.cloud_unique_id);
setCloudInstanceId(instanceId);
} catch (IOException e) {
LOG.error("Failed to get instance ID from cloud_unique_id: {}", Config.cloud_unique_id, e);
throw e;
}
LOG.info("current fe's role is {}", type == NodeInfoPB.NodeType.FE_MASTER ? "MASTER" :
type == NodeInfoPB.NodeType.FE_FOLLOWER ? "FOLLOWER" :
type == NodeInfoPB.NodeType.FE_OBSERVER ? "OBSERVER" : "UNKNOWN");
if (type == NodeInfoPB.NodeType.UNKNOWN) {
LOG.warn("type current not support, please check it");
System.exit(-1);
}
super.getClusterIdAndRole();
}
@Override
public long loadTransactionState(DataInputStream dis, long checksum) throws IOException {
// for CloudGlobalTransactionMgr do nothing.
return checksum;
}
@Override
public long saveTransactionState(CountingDataOutputStream dos, long checksum) throws IOException {
return checksum;
}
public void checkCloudClusterPriv(String clusterName) throws DdlException {
// check resource usage privilege
if (!Env.getCurrentEnv().getAccessManager().checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(),
clusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) {
throw new DdlException("USAGE denied to user "
+ ConnectContext.get().getCurrentUserIdentity().getQualifiedUser() + "'@'" + ConnectContext.get()
.getRemoteIP()
+ "' for cloud cluster '" + clusterName + "'", ErrorCode.ERR_CLUSTER_NO_PERMISSIONS);
}
if (!getCloudSystemInfoService().getCloudClusterNames().contains(clusterName)) {
if (LOG.isDebugEnabled()) {
LOG.debug("current instance does not have a cluster name :{}", clusterName);
}
throw new DdlException(String.format("Compute Group %s not exist", clusterName),
ErrorCode.ERR_CLOUD_CLUSTER_ERROR);
}
}
public void changeCloudCluster(String clusterName, ConnectContext ctx) throws DdlException {
checkCloudClusterPriv(clusterName);
getCloudSystemInfoService().waitForAutoStart(clusterName);
try {
getCloudSystemInfoService().addCloudCluster(clusterName, "");
} catch (UserException e) {
throw new DdlException(e.getMessage(), e.getMysqlErrorCode());
}
ctx.setCloudCluster(clusterName);
ctx.getState().setOk();
}
public String analyzeCloudCluster(String name, ConnectContext ctx) throws DdlException {
String[] res = name.split("@");
if (res.length != 1 && res.length != 2) {
LOG.warn("invalid database name {}", name);
throw new DdlException("Invalid database name: " + name, ErrorCode.ERR_BAD_DB_ERROR);
}
if (res.length == 1) {
return name;
}
changeCloudCluster(res[1], ctx);
return res[0];
}
public void replayUpdateCloudReplica(UpdateCloudReplicaInfo info) throws MetaNotFoundException {
((CloudInternalCatalog) getInternalCatalog()).replayUpdateCloudReplica(info);
}
public boolean getEnableStorageVault() {
return this.enableStorageVault;
}
public void createStage(CreateStageStmt stmt) throws DdlException {
if (Config.isNotCloudMode()) {
throw new DdlException("stage is only supported in cloud mode");
}
if (!stmt.isDryRun()) {
((CloudInternalCatalog) getInternalCatalog()).createStage(stmt.toStageProto(), stmt.isIfNotExists());
}
}
public void createStage(CreateStageCommand command) throws DdlException {
if (Config.isNotCloudMode()) {
throw new DdlException("stage is only supported in cloud mode");
}
if (!command.isDryRun()) {
((CloudInternalCatalog) getInternalCatalog()).createStage(command.toStageProto(), command.isIfNotExists());
}
}
public void dropStage(DropStageStmt stmt) throws DdlException {
if (Config.isNotCloudMode()) {
throw new DdlException("stage is only supported in cloud mode");
}
((CloudInternalCatalog) getInternalCatalog()).dropStage(Cloud.StagePB.StageType.EXTERNAL,
null, null, stmt.getStageName(), null, stmt.isIfExists());
}
public void dropStage(DropStageCommand command) throws DdlException {
if (Config.isNotCloudMode()) {
throw new DdlException("stage is only supported in cloud mode");
}
((CloudInternalCatalog) getInternalCatalog()).dropStage(Cloud.StagePB.StageType.EXTERNAL,
null, null, command.getStageName(), null, command.isIfExists());
}
public long loadCloudWarmUpJob(DataInputStream dis, long checksum) throws Exception {
int size = dis.readInt();
long newChecksum = checksum ^ size;
if (size > 0) {
// There should be no old cloudWarmUp jobs, if exist throw exception, should not use this FE version
throw new IOException("There are [" + size + "] cloud warm up jobs."
+ " Please downgrade FE to an older version and handle residual jobs");
}
// finished or cancelled jobs
size = dis.readInt();
newChecksum ^= size;
if (size > 0) {
throw new IOException("There are [" + size + "] old finished or cancelled cloud warm up jobs."
+ " Please downgrade FE to an older version and handle residual jobs");
}
size = dis.readInt();
newChecksum ^= size;
for (int i = 0; i < size; i++) {
CloudWarmUpJob cloudWarmUpJob = CloudWarmUpJob.read(dis);
if (cloudWarmUpJob.isExpire() || cloudWarmUpJob.getJobState() == JobState.DELETED) {
LOG.info("cloud warm up job is expired, {}, ignore it", cloudWarmUpJob.getJobId());
continue;
}
this.getCacheHotspotMgr().addCloudWarmUpJob(cloudWarmUpJob);
}
LOG.info("finished load cloud warm up job from image");
return newChecksum;
}
public long saveCloudWarmUpJob(CountingDataOutputStream dos, long checksum) throws IOException {
Map<Long, CloudWarmUpJob> cloudWarmUpJobs;
cloudWarmUpJobs = this.getCacheHotspotMgr().getCloudWarmUpJobs();
/*
* reference: Env.java:saveAlterJob
* alter jobs == 0
* If the FE version upgrade from old version, if it have alter jobs, the FE will failed during start process
*
* the number of old version alter jobs has to be 0
*/
int size = 0;
checksum ^= size;
dos.writeInt(size);
checksum ^= size;
dos.writeInt(size);
size = cloudWarmUpJobs.size();
checksum ^= size;
dos.writeInt(size);
for (CloudWarmUpJob cloudWarmUpJob : cloudWarmUpJobs.values()) {
cloudWarmUpJob.write(dos);
}
return checksum;
}
public void cancelCloudWarmUp(CancelCloudWarmUpStmt stmt) throws DdlException {
getCacheHotspotMgr().cancel(stmt);
}
@Override
public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException {
getCloudSystemInfoService().addFrontend(role, host, editLogPort);
}
@Override
public void dropFrontend(FrontendNodeType role, String host, int port) throws DdlException {
if (port == selfNode.getPort() && feType == FrontendNodeType.MASTER
&& selfNode.getHost().equals(host)) {
throw new DdlException("can not drop current master node.");
}
Frontend frontend = checkFeExist(host, port);
if (frontend == null) {
throw new DdlException("frontend does not exist[" + NetUtils
.getHostPortInAccessibleFormat(host, port) + "]");
}
if (frontend.getRole() != role) {
throw new DdlException(role.toString() + " does not exist[" + NetUtils
.getHostPortInAccessibleFormat(host, port) + "]");
}
if (Strings.isNullOrEmpty(frontend.getCloudUniqueId())) {
throw new DdlException("Frontend does not have a cloudUniqueId, wait for a minute.");
}
getCloudSystemInfoService().dropFrontend(frontend);
}
@Override
public void modifyFrontendHostName(String srcHost, int srcPort, String destHost) throws DdlException {
throw new DdlException("Modifying frontend hostname is not supported in cloud mode");
}
}