// Resource.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.DeepCopy;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.trees.plans.commands.CreateResourceCommand;
import org.apache.doris.nereids.trees.plans.commands.info.CreateResourceInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public abstract class Resource implements Writable, GsonPostProcessable {
// Logger is registered under Resource itself, so messages emitted by this base class are
// attributed to it rather than to the unrelated OdbcCatalogResource subclass.
private static final Logger LOG = LogManager.getLogger(Resource.class);
// Separator placed between a reference name and its ReferenceType name when building the
// keys of `references` (see addReference / removeReference).
public static final String REFERENCE_SPLIT = "@";
// Property-name constants. They are not read anywhere in this class; presumably they are
// consumed by subclasses or catalogs -- confirm against the callers.
public static final String INCLUDE_DATABASE_LIST = "include_database_list";
public static final String EXCLUDE_DATABASE_LIST = "exclude_database_list";
public static final String LOWER_CASE_META_NAMES = "lower_case_meta_names";
public static final String META_NAMES_MAPPING = "meta_names_mapping";
/** Kinds of resource known to this class; UNKNOWN is the fallback for unrecognized names. */
public enum ResourceType {
    UNKNOWN,
    SPARK,
    ODBC_CATALOG,
    S3,
    JDBC,
    HDFS,
    HMS,
    ES,
    AZURE;

    /**
     * Looks a type up by its constant name, ignoring case.
     *
     * @param resourceType the type name to resolve; {@code null} matches nothing
     * @return the constant whose name equals {@code resourceType} ignoring case,
     *         or {@link #UNKNOWN} when no constant matches
     */
    public static ResourceType fromString(String resourceType) {
        final ResourceType[] candidates = values();
        for (int i = 0; i < candidates.length; i++) {
            final ResourceType candidate = candidates[i];
            if (candidate.name().equalsIgnoreCase(resourceType)) {
                return candidate;
            }
        }
        return UNKNOWN;
    }
}
// Kind of object that refers to a resource. addReference / removeReference append the
// constant's name to the reference name (after REFERENCE_SPLIT) to build the key stored in
// `references`, and the constant itself is stored as that entry's value.
// Within this class only CATALOG gets special handling: notifyUpdate forwards property
// changes to catalogs referenced with that type.
public enum ReferenceType {
TVF, // table valued function
LOAD,
EXPORT,
REPOSITORY,
OUTFILE,
TABLE,
POLICY,
CATALOG
}
// Resource name; assigned by the (name, type) constructor.
@SerializedName(value = "name")
protected String name;
// Resource kind; assigned by the (name, type) constructor.
@SerializedName(value = "type")
protected ResourceType type;
// Objects currently using this resource. Key: referenceName + REFERENCE_SPLIT + type name;
// value: the ReferenceType. Mutated only through the synchronized addReference /
// removeReference methods; gsonPostProcess replaces a null map with an empty one.
@SerializedName(value = "references")
protected Map<String, ReferenceType> references = Maps.newHashMap();
// Stays -1 until fromStmt / fromCommand assigns an id obtained from Env.getNextId().
@SerializedName(value = "id")
protected long id = -1;
// Stays -1 until fromStmt / fromCommand sets it to 0.
@SerializedName(value = "version")
protected long version = -1;
// Read/write lock created in fair mode (constructor argument `true`), exposed through
// readLock / readUnlock / writeLock / writeUnlock. It carries no @SerializedName annotation.
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
/** Acquires the write side of this resource's read/write lock, blocking until it is available. */
public void writeLock() {
    this.lock.writeLock().lock();
}
/** Releases the write side of this resource's read/write lock. */
public void writeUnlock() {
    this.lock.writeLock().unlock();
}
/** Acquires the read side of this resource's read/write lock, blocking until it is available. */
public void readLock() {
    this.lock.readLock().lock();
}
/** Releases the read side of this resource's read/write lock. */
public void readUnlock() {
    this.lock.readLock().unlock();
}
/**
 * No-argument constructor, needed when an instance is rebuilt from JSON.
 *
 * <p>Every Resource subclass MUST define a default constructor as well; otherwise fields that
 * are not serialized (e.g. {@code lock}) are left {@code null} when the object is reloaded
 * from JSON. Background: https://programmerr47.medium.com/gson-unsafe-problem-d1ff29d4696f
 */
public Resource() {
    // Intentionally empty: the field initializers supply all defaults.
}
/**
 * Creates a resource with the given name and type; id and version keep their -1 defaults.
 *
 * @param name resource name
 * @param type resource kind
 */
public Resource(String name, ResourceType type) {
    this.type = type;
    this.name = name;
}
/**
 * Builds a new resource from a CREATE RESOURCE statement.
 *
 * <p>The instance is created for the statement's type and name, receives a fresh id from
 * the current Env, starts at version 0, and is then given the statement's properties.
 *
 * @param stmt the statement supplying type, name and properties
 * @return the newly created resource
 * @throws DdlException if the type has no matching subclass or setProperties rejects the input
 */
public static Resource fromStmt(CreateResourceStmt stmt) throws DdlException {
    final Resource created = getResourceInstance(stmt.getResourceType(), stmt.getResourceName());
    created.id = Env.getCurrentEnv().getNextId();
    created.version = 0;
    created.setProperties(stmt.getProperties());
    return created;
}
/**
 * Builds a new resource from a Nereids CreateResourceCommand.
 *
 * <p>Type, name and properties are read from the command's CreateResourceInfo; the new
 * instance receives a fresh id from the current Env and starts at version 0.
 *
 * @param command the command whose info describes the resource
 * @return the newly created resource
 * @throws DdlException if the type has no matching subclass or setProperties rejects the input
 */
public static Resource fromCommand(CreateResourceCommand command) throws DdlException {
    final CreateResourceInfo createInfo = command.getInfo();
    final Resource created = getResourceInstance(createInfo.getResourceType(), createInfo.getResourceName());
    created.id = Env.getCurrentEnv().getNextId();
    created.version = 0;
    created.setProperties(createInfo.getProperties());
    return created;
}
/** @return this resource's id; -1 when none has been assigned */
public long getId() {
    return id;
}
/** @return this resource's version; -1 when none has been assigned */
public long getVersion() {
    return version;
}
/**
 * Removes the reference identified by name and type, if it is recorded.
 *
 * @param referenceName name of the referencing object
 * @param type kind of the referencing object
 * @return {@code true} if the reference existed and was removed, {@code false} otherwise
 */
public synchronized boolean removeReference(String referenceName, ReferenceType type) {
    final String key = referenceName + REFERENCE_SPLIT + type.name();
    final boolean existed = references.remove(key) != null;
    if (!existed) {
        return false;
    }
    LOG.info("Reference(type={}, name={}) is removed from resource {}, current set: {}",
            type, referenceName, name, references);
    return true;
}
/**
 * Records that the object identified by name and type uses this resource.
 *
 * @param referenceName name of the referencing object
 * @param type kind of the referencing object
 * @return {@code true} if the reference was not recorded before, {@code false} if it already was
 */
public synchronized boolean addReference(String referenceName, ReferenceType type) {
    final String key = referenceName + REFERENCE_SPLIT + type.name();
    final boolean alreadyPresent = references.put(key, type) != null;
    if (alreadyPresent) {
        return false;
    }
    LOG.info("Reference(type={}, name={}) is added to resource {}, current set: {}",
            type, referenceName, name, references);
    return true;
}
/**
 * Creates an empty resource object of the requested kind.
 *
 * @param type kind of resource to create
 * @param name name passed to the subclass constructor
 * @return a new instance of the subclass that corresponds to {@code type}
 * @throws DdlException if {@code type} has no corresponding subclass (for example UNKNOWN)
 */
private static Resource getResourceInstance(ResourceType type, String name) throws DdlException {
    switch (type) {
        case SPARK:
            return new SparkResource(name);
        case ODBC_CATALOG:
            return new OdbcCatalogResource(name);
        case S3:
            return new S3Resource(name);
        case AZURE:
            return new AzureResource(name);
        case JDBC:
            return new JdbcResource(name);
        case HDFS:
            return new HdfsResource(name);
        case HMS:
            return new HMSResource(name);
        case ES:
            return new EsResource(name);
        default:
            throw new DdlException("Unknown resource type: " + type);
    }
}
/** @return the name of this resource */
public String getName() {
    return this.name;
}
/** @return the kind of this resource */
public ResourceType getType() {
    return this.type;
}
/**
 * Modify properties in child resources.
 *
 * <p>This base implementation stores nothing itself; it only hands the new properties to
 * notifyUpdate, which forwards them to the catalogs that reference this resource.
 *
 * @param properties the properties being modified
 * @throws DdlException never thrown by this base implementation
 */
public void modifyProperties(Map<String, String> properties) throws DdlException {
    this.notifyUpdate(properties);
}
/**
 * Check properties in child resources. This base implementation performs no checks.
 *
 * @param properties the properties to validate
 * @throws AnalysisException never thrown by this base implementation
 */
public void checkProperties(Map<String, String> properties) throws AnalysisException {
    // Nothing to validate at this level.
}
/**
 * Stores {@code value} under {@code key} only when the value is neither null nor empty;
 * otherwise the map is left untouched.
 *
 * @param properties map to update
 * @param key property name
 * @param value candidate value
 */
protected void replaceIfEffectiveValue(Map<String, String> properties, String key, String value) {
    if (Strings.isNullOrEmpty(value)) {
        return;
    }
    properties.put(key, value);
}
/**
 * Set and check the properties in child resources.
 *
 * <p>fromStmt and fromCommand call this on a freshly created instance, after its id and
 * version have been assigned.
 *
 * @param properties the properties supplied by the create statement or command
 * @throws DdlException declared for implementations; presumably raised when the properties
 *         are invalid -- confirm in the subclasses
 */
protected abstract void setProperties(ImmutableMap<String, String> properties) throws DdlException;
/**
 * Implemented by each child resource.
 * NOTE(review): judging by the name, this returns a copy of the resource's properties
 * rather than the live map -- confirm against the subclass implementations.
 */
public abstract Map<String, String> getCopiedProperties();
/**
 * Verifies that this resource can be dropped, i.e. that nothing references it any more.
 *
 * <p>The reference map is read while holding this object's monitor, the same monitor that
 * addReference and removeReference hold, so the emptiness check and the key listing see one
 * consistent state of the map.
 *
 * @throws DdlException if at least one reference is still recorded; the message lists them
 */
public void dropResource() throws DdlException {
    final String usedBy;
    synchronized (this) {
        if (references.isEmpty()) {
            return;
        }
        usedBy = String.join(", ", references.keySet());
    }
    throw new DdlException(String.format("Resource %s is used by: %s", name, usedBy));
}
/**
 * Fill BaseProcResult with different properties in child resources.
 * ResourceMgr.RESOURCE_PROC_NODE_TITLE_NAMES format:
 * | Name | ResourceType | Key | Value |
 *
 * @param result the proc result that the implementation fills in
 */
protected abstract void getProcNodeData(BaseProcResult result);
/** @return this resource rendered as JSON by {@code GsonUtils.GSON} */
@Override
public String toString() {
    final String json = GsonUtils.GSON.toJson(this);
    return json;
}
/**
 * Writes this resource to {@code out} as a single JSON string.
 *
 * @param out destination of the serialized form
 * @throws IOException if writing to {@code out} fails
 */
@Override
public void write(DataOutput out) throws IOException {
    Text.writeString(out, GsonUtils.GSON.toJson(this));
}
/**
 * Reads one JSON string from {@code in} and turns it back into a resource.
 *
 * @param in source of the serialized form
 * @return the resource produced by {@code GsonUtils.GSON} for that JSON
 * @throws IOException if reading from {@code in} fails
 */
public static Resource read(DataInput in) throws IOException {
    return GsonUtils.GSON.fromJson(Text.readString(in), Resource.class);
}
/**
 * Repairs state after loading from JSON: a missing reference map is replaced by an empty one.
 *
 * @throws IOException never thrown by this implementation
 */
@Override
public void gsonPostProcess() throws IOException {
    if (references != null) {
        return;
    }
    // The resource was loaded from meta written by an older version.
    references = Maps.newHashMap();
}
/**
 * Creates a deep copy of this resource through {@code DeepCopy.copy}, using
 * {@code FeConstants.meta_version}.
 *
 * @return the copy, or {@code null} when {@code DeepCopy.copy} returns null (a warning is logged)
 */
@Override
public Resource clone() {
    Resource copied = DeepCopy.copy(this, Resource.class, FeConstants.meta_version);
    if (copied == null) {
        // This base class serves every resource kind, so the message must not say "odbc".
        LOG.warn("failed to clone resource: {}", getName());
    }
    return copied;
}
/**
 * Forwards updated properties to every catalog that references this resource.
 *
 * <p>Only references of type CATALOG are considered. A referenced catalog is skipped, with a
 * warning, when the catalog manager does not know it or when it reports a different
 * resource name than this one.
 *
 * @param properties the new properties, passed to each catalog's notifyPropertiesUpdated
 */
private void notifyUpdate(Map<String, String> properties) {
    // Snapshot the CATALOG reference keys under the monitor that guards addReference /
    // removeReference, so a concurrent change cannot disturb the iteration. The monitor is
    // released before any catalog code runs. Filtering on the value (instead of grouping by
    // it) also tolerates entries whose type is null.
    final Iterable<String> catalogRefKeys;
    synchronized (this) {
        catalogRefKeys = references.entrySet().stream()
                .filter(ref -> ref.getValue() == ReferenceType.CATALOG)
                .map(Entry::getKey)
                .collect(Collectors.toList());
    }
    for (String refKey : catalogRefKeys) {
        // Keys have the form <name> + REFERENCE_SPLIT + <TYPE>. Cut at the LAST separator so
        // a name that itself contains the separator is kept whole.
        final int splitAt = refKey.lastIndexOf(REFERENCE_SPLIT);
        final String catalogName = splitAt < 0 ? refKey : refKey.substring(0, splitAt);
        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
        if (catalog == null) {
            LOG.warn("Can't find the reference catalog {} for resource {}", catalogName, name);
            continue;
        }
        if (!name.equals(catalog.getResource())) {
            LOG.warn("Failed to update catalog {} for different resource "
                    + "names(resource={}, catalog.resource={})", catalogName, name, catalog.getResource());
            continue;
        }
        catalog.notifyPropertiesUpdated(properties);
    }
}
/** Does nothing in this base class. */
public void applyDefaultProperties() {
    // Intentionally empty.
}
}