ResourceMgr.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.analysis.AlterResourceStmt;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.analysis.DropResourceStmt;
import org.apache.doris.catalog.Resource.ResourceType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.proc.ProcNodeInterface;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.commands.CreateResourceCommand;
import org.apache.doris.nereids.trees.plans.commands.DropResourceCommand;
import org.apache.doris.nereids.trees.plans.commands.info.CreateResourceInfo;
import org.apache.doris.persist.DropResourceOperationLog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Resource manager is responsible for managing external resources used by Doris.
* For example, Spark/MapReduce used for ETL, Spark/GPU used for queries, HDFS/S3 used for external storage.
* Now only support Spark.
*/
public class ResourceMgr implements Writable {
private static final Logger LOG = LogManager.getLogger(ResourceMgr.class);
public static final ImmutableList<String> RESOURCE_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder<String>()
.add("Name").add("ResourceType").add("Item").add("Value")
.build();
// { resourceName -> Resource}
@SerializedName(value = "nameToResource")
private final Map<String, Resource> nameToResource = Maps.newConcurrentMap();
private final ResourceProcNode procNode = new ResourceProcNode();
public ResourceMgr() {
}
public void createResource(CreateResourceStmt stmt) throws DdlException {
if (stmt.getResourceType() == ResourceType.UNKNOWN) {
throw new DdlException("Only support SPARK, ODBC_CATALOG ,JDBC, S3_COOLDOWN, S3, HDFS and HMS resource.");
}
Resource resource = Resource.fromStmt(stmt);
if (createResource(resource, stmt.isIfNotExists())) {
Env.getCurrentEnv().getEditLog().logCreateResource(resource);
LOG.info("Create resource success. Resource: {}", resource.getName());
}
}
public void createResource(CreateResourceCommand command) throws DdlException {
CreateResourceInfo info = command.getInfo();
if (info.getResourceType() == ResourceType.UNKNOWN) {
throw new DdlException("Only support SPARK, ODBC_CATALOG ,JDBC, S3_COOLDOWN, S3, HDFS and HMS resource.");
}
Resource resource = Resource.fromCommand(command);
if (createResource(resource, info.isIfNotExists())) {
Env.getCurrentEnv().getEditLog().logCreateResource(resource);
LOG.info("Create resource success. Resource: {}", resource.getName());
}
}
// Return true if the resource is truly added,
// otherwise, return false or throw exception.
public boolean createResource(Resource resource, boolean ifNotExists) throws DdlException {
String resourceName = resource.getName();
if (nameToResource.putIfAbsent(resourceName, resource) != null) {
if (ifNotExists) {
return false;
}
throw new DdlException("Resource(" + resourceName + ") already exist");
}
return true;
}
public void replayCreateResource(Resource resource) {
resource.applyDefaultProperties();
nameToResource.put(resource.getName(), resource);
}
public void dropResource(DropResourceCommand dropResourceCommand) throws DdlException {
String resourceName = dropResourceCommand.getResourceName();
if (!nameToResource.containsKey(resourceName)) {
if (dropResourceCommand.isIfExists()) {
return;
}
throw new DdlException("Resource(" + resourceName + ") does not exist");
}
Resource resource = nameToResource.get(resourceName);
resource.dropResource();
// Check whether the resource is in use before deleting it, except spark resource
StoragePolicy checkedStoragePolicy = StoragePolicy.ofCheck(null);
checkedStoragePolicy.setStorageResource(resourceName);
if (Env.getCurrentEnv().getPolicyMgr().existPolicy(checkedStoragePolicy)) {
Policy policy = Env.getCurrentEnv().getPolicyMgr().getPolicy(checkedStoragePolicy);
LOG.warn("Can not drop resource, since it's used in policy {}", policy.getPolicyName());
throw new DdlException("Can not drop resource, since it's used in policy " + policy.getPolicyName());
}
nameToResource.remove(resourceName);
// log drop
Env.getCurrentEnv().getEditLog().logDropResource(new DropResourceOperationLog(resourceName));
LOG.info("Drop resource success. Resource resourceName: {}", resourceName);
}
public void dropResource(DropResourceStmt stmt) throws DdlException {
String resourceName = stmt.getResourceName();
if (!nameToResource.containsKey(resourceName)) {
if (stmt.isIfExists()) {
return;
}
throw new DdlException("Resource(" + resourceName + ") does not exist");
}
Resource resource = nameToResource.get(resourceName);
resource.dropResource();
// Check whether the resource is in use before deleting it, except spark resource
StoragePolicy checkedStoragePolicy = StoragePolicy.ofCheck(null);
checkedStoragePolicy.setStorageResource(resourceName);
if (Env.getCurrentEnv().getPolicyMgr().existPolicy(checkedStoragePolicy)) {
Policy policy = Env.getCurrentEnv().getPolicyMgr().getPolicy(checkedStoragePolicy);
LOG.warn("Can not drop resource, since it's used in policy {}", policy.getPolicyName());
throw new DdlException("Can not drop resource, since it's used in policy " + policy.getPolicyName());
}
nameToResource.remove(resourceName);
// log drop
Env.getCurrentEnv().getEditLog().logDropResource(new DropResourceOperationLog(resourceName));
LOG.info("Drop resource success. Resource resourceName: {}", resourceName);
}
// Drop resource whether successful or not
public void dropResource(Resource resource) {
String name = resource.getName();
if (nameToResource.remove(name) == null) {
LOG.info("resource " + name + " does not exists.");
}
}
public void replayDropResource(DropResourceOperationLog operationLog) {
nameToResource.remove(operationLog.getName());
}
public void alterResource(String resourceName, Map<String, String> properties) throws DdlException {
if (!nameToResource.containsKey(resourceName)) {
throw new DdlException("Resource(" + resourceName + ") dose not exist.");
}
Resource resource = nameToResource.get(resourceName);
resource.modifyProperties(properties);
// log alter
Env.getCurrentEnv().getEditLog().logAlterResource(resource);
LOG.info("Alter resource success. Resource: {}", resource);
}
public void alterResource(AlterResourceStmt stmt) throws DdlException {
String resourceName = stmt.getResourceName();
Map<String, String> properties = stmt.getProperties();
alterResource(resourceName, properties);
}
public void replayAlterResource(Resource resource) {
nameToResource.put(resource.getName(), resource);
}
public boolean containsResource(String name) {
return nameToResource.containsKey(name);
}
public Resource getResource(String name) {
// nameToResource == null iff this is in replay thread
// just return null to ignore this.
if (nameToResource == null) {
return null;
}
return nameToResource.get(name);
}
public int getResourceNum() {
return nameToResource.size();
}
public List<Resource> getResource(ResourceType type) {
return nameToResource.values().stream().filter(resource -> resource.getType() == type)
.collect(Collectors.toList());
}
public List<List<Comparable>> getResourcesInfo(PatternMatcher matcher,
String name, boolean accurateMatch, Set<String> typeSets) {
List<List<String>> targetRows = procNode.fetchResult().getRows();
List<List<Comparable>> returnRows = Lists.newArrayList();
for (List<String> row : targetRows) {
if (row == null || row.size() < 2) {
continue;
}
String resourceName = row.get(0);
String resourceType = row.get(1);
if (matcher != null && !matcher.match(resourceName)) {
continue;
}
if (name != null) {
if (accurateMatch && !resourceName.equals(name)) {
continue;
}
if (!accurateMatch && !resourceName.contains(name)) {
continue;
}
}
if (typeSets != null) {
if (!typeSets.contains(resourceType.toUpperCase())) {
continue;
}
}
List<Comparable> comparableRow = Lists.newArrayList();
for (Comparable slot : row) {
comparableRow.add(slot);
}
returnRows.add(comparableRow);
}
return returnRows;
}
public ResourceProcNode getProcNode() {
return procNode;
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static ResourceMgr read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ResourceMgr.class);
}
public class ResourceProcNode implements ProcNodeInterface {
@Override
public ProcResult fetchResult() {
BaseProcResult result = new BaseProcResult();
result.setNames(RESOURCE_PROC_NODE_TITLE_NAMES);
for (Map.Entry<String, Resource> entry : nameToResource.entrySet()) {
Resource resource = entry.getValue();
// check resource privs
if (!Env.getCurrentEnv().getAccessManager().checkResourcePriv(ConnectContext.get(), resource.getName(),
PrivPredicate.SHOW_RESOURCES)) {
continue;
}
resource.getProcNodeData(result);
}
return result;
}
}
public static int analyzeColumn(String columnName) throws AnalysisException {
for (String title : RESOURCE_PROC_NODE_TITLE_NAMES) {
if (title.equalsIgnoreCase(columnName)) {
return RESOURCE_PROC_NODE_TITLE_NAMES.indexOf(title);
}
}
throw new AnalysisException("Title name[" + columnName + "] does not exist");
}
}