HiveExternalMetaCache.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.backup.Status.ErrCode;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.SchemaCacheKey;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.metacache.AbstractExternalMetaCache;
import org.apache.doris.datasource.metacache.CacheSpec;
import org.apache.doris.datasource.metacache.MetaCacheEntry;
import org.apache.doris.datasource.metacache.MetaCacheEntryDef;
import org.apache.doris.fs.DirectoryLister;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.fs.FileSystemDirectoryLister;
import org.apache.doris.fs.FileSystemIOException;
import org.apache.doris.fs.RemoteIterator;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges;
import org.apache.doris.planner.ListPartitionPrunerV2;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import lombok.Data;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
/**
* Hive engine implementation of {@link AbstractExternalMetaCache}.
*
* <p>This cache consolidates schema metadata and Hive Metastore-derived runtime metadata
* under one engine so callers can use a unified invalidation path.
*
* <p>Registered entries:
* <ul>
* <li>{@code schema}: table schema cache keyed by {@link SchemaCacheKey}</li>
* <li>{@code partition_values}: partition value/index structures per table</li>
* <li>{@code partition}: single partition metadata keyed by partition values</li>
* <li>{@code file}: file listing cache for partition/table locations</li>
* </ul>
*
* <p>Invalidation behavior:
* <ul>
* <li>{@link #invalidateDb(long, String)} and {@link #invalidateTable(long, String, String)}
* clear all related entries with table/db granularity.</li>
* <li>{@link #invalidatePartitions(long, String, String, List)} supports partition-level
* invalidation (partition metadata and file listings) when specific partition names are provided,
* and falls back to table-level invalidation for empty input or unresolved table metadata. It does
* nothing when the catalog is not an HMS catalog.</li>
* </ul>
*/
public class HiveExternalMetaCache extends AbstractExternalMetaCache {
private static final Logger LOG = LogManager.getLogger(HiveExternalMetaCache.class);
// Engine name passed to the base class; also the "<engine>" segment of the
// "meta.cache.<engine>.<entry>.ttl-second" keys built in catalogPropertyCompatibilityMap().
public static final String ENGINE = "hive";
// Names of the cache entries registered in the constructor.
public static final String ENTRY_SCHEMA = "schema";
public static final String ENTRY_PARTITION_VALUES = "partition_values";
public static final String ENTRY_PARTITION = "partition";
public static final String ENTRY_FILE = "file";
// Hive's default-partition marker value. toListPartitionItem() flags values equal to it when building
// PartitionValue objects, and loadFiles() replaces it with FeConstants.null_string in file listings.
public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__";
// Prefix of the precondition messages raised by toListPartitionItem() when a partition name does not
// match the partition column types (or the types are missing).
public static final String ERR_CACHE_INCONSISTENCY = "ERR_CACHE_INCONSISTENCY: ";
// Default specs of the four entries. All share Config.external_cache_expire_time_seconds_after_access and
// each uses its own Config.max_*_num limit (presumably the maximum number of cached items - confirm in
// CacheSpec). Being static finals, these Config values are read once, when this class is initialized.
// NOTE(review): the meaning of the leading null argument is defined by CacheSpec.fromTtlValue - confirm there.
private static final CacheSpec SCHEMA_CACHE_SPEC = CacheSpec.fromTtlValue(
null, Config.external_cache_expire_time_seconds_after_access, Config.max_external_schema_cache_num);
private static final CacheSpec PARTITION_VALUES_CACHE_SPEC = CacheSpec.fromTtlValue(
null, Config.external_cache_expire_time_seconds_after_access, Config.max_hive_partition_table_cache_num);
private static final CacheSpec PARTITION_CACHE_SPEC = CacheSpec.fromTtlValue(
null, Config.external_cache_expire_time_seconds_after_access, Config.max_hive_partition_cache_num);
private static final CacheSpec FILE_CACHE_SPEC = CacheSpec.fromTtlValue(
null, Config.external_cache_expire_time_seconds_after_access, Config.max_external_file_cache_num);
// Executor used by getFilesByPartitions() for concurrent, uncached file listing.
private final ExecutorService fileListingExecutor;
// Entry definitions, created and registered in the constructor.
private final MetaCacheEntryDef<SchemaCacheKey, SchemaCacheValue> schemaEntryDef;
private final MetaCacheEntryDef<PartitionValueCacheKey, HivePartitionValues> partitionValuesEntryDef;
private final MetaCacheEntryDef<PartitionCacheKey, HivePartition> partitionEntryDef;
private final MetaCacheEntryDef<FileCacheKey, FileCacheValue> fileEntryDef;
/**
 * Creates the Hive meta cache and registers its four entries: {@value #ENTRY_SCHEMA},
 * {@value #ENTRY_PARTITION_VALUES}, {@value #ENTRY_PARTITION} and {@value #ENTRY_FILE}.
 *
 * @param refreshExecutor executor handed to {@link AbstractExternalMetaCache}
 * @param fileListingExecutor executor used for concurrent, uncached file listing
 */
public HiveExternalMetaCache(ExecutorService refreshExecutor, ExecutorService fileListingExecutor) {
    super(ENGINE, refreshExecutor);
    this.fileListingExecutor = fileListingExecutor;

    this.schemaEntryDef = MetaCacheEntryDef.of(ENTRY_SCHEMA, SchemaCacheKey.class,
            SchemaCacheValue.class, this::loadSchemaCacheValue, SCHEMA_CACHE_SPEC);
    this.partitionValuesEntryDef = MetaCacheEntryDef.of(ENTRY_PARTITION_VALUES, PartitionValueCacheKey.class,
            HivePartitionValues.class, this::loadPartitionValuesCacheValue, PARTITION_VALUES_CACHE_SPEC);
    this.partitionEntryDef = MetaCacheEntryDef.of(ENTRY_PARTITION, PartitionCacheKey.class,
            HivePartition.class, this::loadPartitionCacheValue, PARTITION_CACHE_SPEC);
    this.fileEntryDef = MetaCacheEntryDef.of(ENTRY_FILE, FileCacheKey.class,
            FileCacheValue.class, this::loadFileCacheValue, FILE_CACHE_SPEC);

    // Registered in the order: schema, partition values, partition, file.
    registerMetaCacheEntryDef(this.schemaEntryDef);
    registerMetaCacheEntryDef(this.partitionValuesEntryDef);
    registerMetaCacheEntryDef(this.partitionEntryDef);
    registerMetaCacheEntryDef(this.fileEntryDef);
}
/**
 * Invalidates the catalog's cache via {@code invalidateCatalog} and re-initializes it with a copy of the
 * catalog's current properties. An empty map is used when the catalog is not found or has no properties.
 *
 * @param catalogId id of the catalog to refresh
 */
public void refreshCatalog(long catalogId) {
    invalidateCatalog(catalogId);
    CatalogIf<?> catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
    Map<String, String> catalogProperties;
    if (catalog != null && catalog.getProperties() != null) {
        catalogProperties = Maps.newHashMap(catalog.getProperties());
    } else {
        catalogProperties = Maps.newHashMap();
    }
    initCatalog(catalogId, catalogProperties);
}
/**
 * Invalidates cached metadata of one database of a catalog.
 *
 * <p>Schema, partition-value and partition entries are matched on catalog id and local database name.
 * File entries are matched on catalog id alone, so all file listings of the catalog are dropped.
 */
@Override
public void invalidateDb(long catalogId, String dbName) {
    schemaEntry(catalogId).invalidateIf(k -> isInDatabase(k.getNameMapping(), catalogId, dbName));
    partitionValuesEntry(catalogId).invalidateIf(k -> isInDatabase(k.getNameMapping(), catalogId, dbName));
    partitionEntry(catalogId).invalidateIf(k -> isInDatabase(k.getNameMapping(), catalogId, dbName));
    fileEntry(catalogId).invalidateIf(k -> k.getCatalogId() == catalogId);
}

/** Returns whether {@code mapping} belongs to local database {@code dbName} of catalog {@code catalogId}. */
private static boolean isInDatabase(NameMapping mapping, long catalogId, String dbName) {
    return mapping.getCtlId() == catalogId && mapping.getLocalDbName().equals(dbName);
}
/**
 * Invalidates every cached entry of one table: schema, partition values, partition metadata and file
 * listings.
 *
 * <p>Name-mapped entries are matched on catalog id and local database/table names. File entries are
 * matched on catalog id and the table id derived from the local names via {@code Util.genIdByName}.
 */
@Override
public void invalidateTable(long catalogId, String dbName, String tableName) {
    schemaEntry(catalogId).invalidateIf(
            k -> isSameLocalTable(k.getNameMapping(), catalogId, dbName, tableName));
    partitionValuesEntry(catalogId).invalidateIf(
            k -> isSameLocalTable(k.getNameMapping(), catalogId, dbName, tableName));
    partitionEntry(catalogId).invalidateIf(
            k -> isSameLocalTable(k.getNameMapping(), catalogId, dbName, tableName));
    long tableId = Util.genIdByName(dbName, tableName);
    fileEntry(catalogId).invalidateIf(k -> k.getCatalogId() == catalogId && k.isSameTable(tableId));
}

/** Returns whether {@code mapping} names local table {@code dbName.tableName} of catalog {@code catalogId}. */
private static boolean isSameLocalTable(NameMapping mapping, long catalogId, String dbName, String tableName) {
    return mapping.getCtlId() == catalogId
            && mapping.getLocalDbName().equals(dbName)
            && mapping.getLocalTblName().equals(tableName);
}
/**
 * Invalidates cached metadata for specific partitions of a table.
 *
 * <p>A {@code null} or empty partition list is treated as a table-level invalidation. When the catalog
 * is not an {@link HMSExternalCatalog}, nothing is invalidated. When the database or table cannot be
 * resolved to an {@link HMSExternalTable}, the whole table is invalidated. Otherwise each listed partition
 * is handed to {@link #invalidatePartitionCache(ExternalTable, String)}.
 *
 * @param partitions Hive partition names (e.g. {@code k1=v1/k2=v2})
 */
@Override
public void invalidatePartitions(long catalogId, String dbName, String tableName, List<String> partitions) {
    if (partitions == null || partitions.isEmpty()) {
        invalidateTable(catalogId, dbName, tableName);
        return;
    }
    CatalogIf<?> catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
    if (!(catalog instanceof HMSExternalCatalog)) {
        return;
    }
    // Resolve database and table exactly once. Looking them up again after the checks could observe a
    // concurrent DROP, return null and fail with an NPE instead of falling back to table invalidation.
    org.apache.doris.datasource.ExternalDatabase<? extends ExternalTable> db =
            ((HMSExternalCatalog) catalog).getDbNullable(dbName);
    ExternalTable table = db == null ? null : db.getTableNullable(tableName);
    if (!(table instanceof HMSExternalTable)) {
        invalidateTable(catalogId, dbName, tableName);
        return;
    }
    HMSExternalTable hmsTable = (HMSExternalTable) table;
    for (String partition : partitions) {
        invalidatePartitionCache(hmsTable, partition);
    }
}
/**
 * Maps legacy catalog TTL properties to their unified {@code meta.cache.<engine>.<entry>.ttl-second} keys.
 *
 * @return a new mutable map from legacy property name to unified property name
 */
@Override
protected Map<String, String> catalogPropertyCompatibilityMap() {
    String keyPrefix = "meta.cache." + ENGINE + ".";
    String keySuffix = ".ttl-second";
    Map<String, String> legacyToUnified = Maps.newHashMap();
    legacyToUnified.put(ExternalCatalog.SCHEMA_CACHE_TTL_SECOND, keyPrefix + ENTRY_SCHEMA + keySuffix);
    legacyToUnified.put(HMSExternalCatalog.PARTITION_CACHE_TTL_SECOND,
            keyPrefix + ENTRY_PARTITION_VALUES + keySuffix);
    legacyToUnified.put(HMSExternalCatalog.FILE_META_CACHE_TTL_SECOND, keyPrefix + ENTRY_FILE + keySuffix);
    return legacyToUnified;
}
/** Returns the {@value #ENTRY_SCHEMA} entry of catalog {@code catalogId}. */
private MetaCacheEntry<SchemaCacheKey, SchemaCacheValue> schemaEntry(long catalogId) {
    return this.entry(catalogId, this.schemaEntryDef);
}

/** Returns the {@value #ENTRY_PARTITION_VALUES} entry of catalog {@code catalogId}. */
private MetaCacheEntry<PartitionValueCacheKey, HivePartitionValues> partitionValuesEntry(long catalogId) {
    return this.entry(catalogId, this.partitionValuesEntryDef);
}

/** Returns the {@value #ENTRY_PARTITION} entry of catalog {@code catalogId}. */
private MetaCacheEntry<PartitionCacheKey, HivePartition> partitionEntry(long catalogId) {
    return this.entry(catalogId, this.partitionEntryDef);
}

/** Returns the {@value #ENTRY_FILE} entry of catalog {@code catalogId}. */
private MetaCacheEntry<FileCacheKey, FileCacheValue> fileEntry(long catalogId) {
    return this.entry(catalogId, this.fileEntryDef);
}
/** Returns the {@value #ENTRY_SCHEMA} entry of {@code catalogId}, or {@code null} if it is unavailable. */
private MetaCacheEntry<SchemaCacheKey, SchemaCacheValue> schemaEntryIfInitialized(long catalogId) {
    return initializedEntryOrNull(catalogId, schemaEntryDef);
}

/** Returns the {@value #ENTRY_PARTITION_VALUES} entry of {@code catalogId}, or {@code null} if unavailable. */
private MetaCacheEntry<PartitionValueCacheKey, HivePartitionValues> partitionValuesEntryIfInitialized(
        long catalogId) {
    return initializedEntryOrNull(catalogId, partitionValuesEntryDef);
}

/** Returns the {@value #ENTRY_PARTITION} entry of {@code catalogId}, or {@code null} if it is unavailable. */
private MetaCacheEntry<PartitionCacheKey, HivePartition> partitionEntryIfInitialized(long catalogId) {
    return initializedEntryOrNull(catalogId, partitionEntryDef);
}

/** Returns the {@value #ENTRY_FILE} entry of {@code catalogId}, or {@code null} if it is unavailable. */
private MetaCacheEntry<FileCacheKey, FileCacheValue> fileEntryIfInitialized(long catalogId) {
    return initializedEntryOrNull(catalogId, fileEntryDef);
}

/**
 * Looks up the entry described by {@code entryDef} for {@code catalogId}. Returns {@code null} instead of
 * propagating the {@link IllegalStateException} that {@code entry(...)} throws. That exception presumably
 * means the catalog's cache has not been initialized yet (TODO confirm in AbstractExternalMetaCache).
 */
private <K, V> MetaCacheEntry<K, V> initializedEntryOrNull(long catalogId, MetaCacheEntryDef<K, V> entryDef) {
    try {
        return entry(catalogId, entryDef);
    } catch (IllegalStateException e) {
        return null;
    }
}
/**
 * Loader of the {@value #ENTRY_SCHEMA} entry: asks the owning {@link ExternalCatalog} for the schema.
 *
 * @throws CacheException when the catalog is not external or returns no schema for {@code key}
 */
private SchemaCacheValue loadSchemaCacheValue(SchemaCacheKey key) {
    NameMapping mapping = key.getNameMapping();
    long ctlId = mapping.getCtlId();
    CatalogIf<?> catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(ctlId);
    if (catalog instanceof ExternalCatalog) {
        return ((ExternalCatalog) catalog).getSchema(key).orElseThrow(() -> new CacheException(
                "failed to load hive schema cache value for: %s.%s.%s",
                null, ctlId, mapping.getLocalDbName(), mapping.getLocalTblName()));
    }
    throw new CacheException("catalog %s is not external when loading hive schema cache", null, ctlId);
}
/** Loader of the {@value #ENTRY_PARTITION_VALUES} entry. */
private HivePartitionValues loadPartitionValuesCacheValue(PartitionValueCacheKey cacheKey) {
    return this.loadPartitionValues(cacheKey);
}

/** Loader of the {@value #ENTRY_PARTITION} entry. */
private HivePartition loadPartitionCacheValue(PartitionCacheKey cacheKey) {
    return this.loadPartition(cacheKey);
}

/** Loader of the {@value #ENTRY_FILE} entry: lists with a new {@link FileSystemDirectoryLister}, no table. */
private FileCacheValue loadFileCacheValue(FileCacheKey cacheKey) {
    DirectoryLister lister = new FileSystemDirectoryLister();
    return this.loadFiles(cacheKey, lister, null);
}
/**
 * Resolves {@code catalogId} to its {@link HMSExternalCatalog}.
 *
 * @throws CacheException when the id does not name an HMS catalog
 */
private HMSExternalCatalog hmsCatalog(long catalogId) {
    CatalogIf<?> found = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
    if (found instanceof HMSExternalCatalog) {
        return (HMSExternalCatalog) found;
    }
    throw new CacheException("catalog %s is not hms when loading hive metastore cache", null, catalogId);
}
/**
 * Loads the partition index of one table. It lists the partition names from the metastore and assigns
 * each partition a name-derived id. It then builds the id-to-item, name-to-id and id-to-values maps.
 *
 * @param key table identity plus the partition column types used to parse partition names
 */
private HivePartitionValues loadPartitionValues(PartitionValueCacheKey key) {
    NameMapping mapping = key.nameMapping;
    HMSExternalCatalog catalog = hmsCatalog(mapping.getCtlId());
    List<String> names = catalog.getClient()
            .listPartitionNames(mapping.getRemoteDbName(), mapping.getRemoteTblName());
    if (LOG.isDebugEnabled()) {
        LOG.debug("load #{} partitions for {} in catalog {}", names.size(), key, catalog.getName());
    }
    int expectedSize = names.size();
    Map<Long, PartitionItem> itemsById = Maps.newHashMapWithExpectedSize(expectedSize);
    BiMap<String, Long> idsByName = HashBiMap.create(expectedSize);
    String dbName = mapping.getLocalDbName();
    String tblName = mapping.getLocalTblName();
    for (String name : names) {
        long id = Util.genIdByName(catalog.getName(), dbName, tblName, name);
        itemsById.put(id, toListPartitionItem(name, key.types, catalog.getName()));
        idsByName.put(name, id);
    }
    return new HivePartitionValues(itemsById, idsByName, ListPartitionPrunerV2.getPartitionValuesMap(itemsById));
}
/**
 * Converts a Hive partition name such as {@code k1=v1/k2=v2} into a single-key {@link ListPartitionItem}.
 * Values equal to {@link #HIVE_DEFAULT_PARTITION} are flagged when building each {@link PartitionValue}.
 *
 * @throws IllegalStateException when {@code types} is null or its size differs from the number of values
 * @throws CacheException when the values cannot be converted to a partition key of {@code types}
 */
private ListPartitionItem toListPartitionItem(String partitionName, List<Type> types, String catalogName) {
    List<String> rawValues = HiveUtil.toPartitionValues(partitionName);
    Preconditions.checkState(types != null,
            ERR_CACHE_INCONSISTENCY + "partition types is null for partition " + partitionName);
    Preconditions.checkState(rawValues.size() == types.size(),
            ERR_CACHE_INCONSISTENCY + partitionName + " vs. " + types);
    List<PartitionValue> keyValues = rawValues.stream()
            .map(raw -> new PartitionValue(raw, HIVE_DEFAULT_PARTITION.equals(raw)))
            .collect(Collectors.toList());
    try {
        PartitionKey listKey = PartitionKey.createListPartitionKeyWithTypes(keyValues, types, true);
        return new ListPartitionItem(Lists.newArrayList(listKey));
    } catch (AnalysisException e) {
        throw new CacheException("failed to convert hive partition %s to list partition in catalog %s",
                e, partitionName, catalogName);
    }
}
/**
 * Loads one partition from the metastore by its values. The returned {@link HivePartition} carries the
 * partition's input format, location and parameters.
 */
private HivePartition loadPartition(PartitionCacheKey key) {
    NameMapping mapping = key.nameMapping;
    HMSExternalCatalog catalog = hmsCatalog(mapping.getCtlId());
    Partition hmsPartition = catalog.getClient()
            .getPartition(mapping.getRemoteDbName(), mapping.getRemoteTblName(), key.values);
    StorageDescriptor descriptor = hmsPartition.getSd();
    String inputFormat = descriptor.getInputFormat();
    String location = descriptor.getLocation();
    if (LOG.isDebugEnabled()) {
        LOG.debug("load partition format: {}, location: {} for {} in catalog {}",
                inputFormat, location, key, catalog.getName());
    }
    return new HivePartition(mapping, false, inputFormat, location, key.values, hmsPartition.getParameters());
}
/**
 * Loads several partitions of ONE table from the metastore in a single batch call, bypassing the cache.
 *
 * <p>All keys are assumed to belong to the same table: the catalog, table and partition columns are taken
 * from the first key.
 *
 * @param keys partition keys to load; {@code null} or empty yields an empty map
 * @return the loaded partitions, keyed by the values the metastore reports. Partitions the metastore does
 *         not return are absent from the map.
 * @throws CacheException when the table cannot be resolved to an {@link HMSExternalTable}
 * @throws IllegalStateException when a key's value count differs from the number of partition columns
 */
private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<? extends PartitionCacheKey> keys) {
    Map<PartitionCacheKey, HivePartition> result = new HashMap<>();
    if (keys == null) {
        return result;
    }
    List<PartitionCacheKey> keyList = Streams.stream(keys).collect(Collectors.toList());
    if (keyList.isEmpty()) {
        return result;
    }
    NameMapping nameMapping = keyList.get(0).nameMapping;
    HMSExternalCatalog catalog = hmsCatalog(nameMapping.getCtlId());
    String localDbName = nameMapping.getLocalDbName();
    String localTblName = nameMapping.getLocalTblName();
    // Resolve the table once and fail with a descriptive error. A missing db previously surfaced as an NPE,
    // and a non-HMS table as a ClassCastException.
    org.apache.doris.datasource.ExternalDatabase<? extends ExternalTable> db = catalog.getDbNullable(localDbName);
    ExternalTable table = db == null ? null : db.getTableNullable(localTblName);
    if (!(table instanceof HMSExternalTable)) {
        throw new CacheException("table %s.%s is not a hms table in catalog %s when loading hive partitions",
                null, localDbName, localTblName, catalog.getName());
    }
    List<Column> partitionColumns = ((HMSExternalTable) table).getPartitionColumns();
    List<String> partitionNames = keyList.stream()
            .map(key -> toHivePartitionName(partitionColumns, key.getValues()))
            .collect(Collectors.toList());
    List<Partition> partitions = catalog.getClient().getPartitions(
            nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName(), partitionNames);
    for (Partition partition : partitions) {
        StorageDescriptor sd = partition.getSd();
        result.put(new PartitionCacheKey(nameMapping, partition.getValues()),
                new HivePartition(nameMapping, false,
                        sd.getInputFormat(), sd.getLocation(), partition.getValues(),
                        partition.getParameters()));
    }
    return result;
}

/**
 * Builds the escaped Hive partition name {@code col1=val1/col2=val2/...} for one set of partition values.
 * Returns an empty string when there are no partition columns.
 */
private static String toHivePartitionName(List<Column> partitionColumns, List<String> values) {
    Preconditions.checkState(values.size() == partitionColumns.size(),
            "partition values %s do not match partition columns (%s values vs. %s columns)",
            values, values.size(), partitionColumns.size());
    List<String> segments = new ArrayList<>(partitionColumns.size());
    for (int i = 0; i < partitionColumns.size(); i++) {
        segments.add(FileUtils.escapePathName(partitionColumns.get(i).getName())
                + "=" + FileUtils.escapePathName(values.get(i)));
    }
    return String.join("/", segments);
}
/**
 * Lists the files under {@code path} and wraps them, plus a copy of {@code partitionValues}, in a
 * {@link FileCacheValue}.
 *
 * <p>Two catalog properties apply:
 * <ul>
 * <li>{@code hive.recursive_directories} (default {@code "true"}) is passed to the lister as its recursion flag.</li>
 * <li>{@code hive.ignore_absent_partitions} (default {@code "true"}) decides whether a NOT_FOUND location
 * yields an empty listing or a {@link UserException}.</li>
 * </ul>
 *
 * @throws UserException when the location does not exist and absent partitions are not ignored
 */
private FileCacheValue getFileCache(HMSExternalCatalog catalog,
        LocationPath path,
        String inputFormat,
        List<String> partitionValues,
        DirectoryLister directoryLister,
        TableIf table) throws UserException {
    FileCacheValue result = new FileCacheValue();
    String location = path.getNormalizedLocation();
    FileSystemCache.FileSystemCacheKey fileSystemCacheKey = new FileSystemCache.FileSystemCacheKey(
            path.getFsIdentifier(), path.getStorageProperties());
    RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache()
            .getRemoteFileSystem(fileSystemCacheKey);
    result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location));
    Map<String, String> catalogProperties = catalog.getProperties();
    // parseBoolean has the same semantics as the Boolean.valueOf(String) it replaces, without boxing.
    boolean isRecursiveDirectories = Boolean.parseBoolean(
            catalogProperties.getOrDefault("hive.recursive_directories", "true"));
    try {
        RemoteIterator<RemoteFile> iterator = directoryLister.listFiles(fs, isRecursiveDirectories,
                table, location);
        while (iterator.hasNext()) {
            RemoteFile remoteFile = iterator.next();
            String srcPath = remoteFile.getPath().toString();
            result.addFile(remoteFile, LocationPath.of(srcPath, path.getStorageProperties()));
        }
    } catch (FileSystemIOException e) {
        boolean notFound = e.getErrorCode().isPresent() && e.getErrorCode().get().equals(ErrCode.NOT_FOUND);
        if (!notFound) {
            // Keep the cause, but name the location that failed.
            throw new RuntimeException("Failed to list files under " + location, e);
        }
        LOG.warn("File {} not exist.", location);
        boolean ignoreAbsentPartitions = Boolean.parseBoolean(
                catalogProperties.getOrDefault("hive.ignore_absent_partitions", "true"));
        if (!ignoreAbsentPartitions) {
            throw new UserException("Partition location does not exist: " + location);
        }
    }
    result.setPartitionValues(Lists.newArrayList(partitionValues));
    return result;
}
/**
 * Lists the files of one cache key. It is the loader of the {@value #ENTRY_FILE} entry and is also used
 * directly for uncached listing.
 *
 * <p>The thread context class loader is switched to the system class loader while the location is
 * resolved and listed, and the previous loader is restored afterwards. Partition values equal to
 * {@link #HIVE_DEFAULT_PARTITION} are replaced by {@link FeConstants#null_string} in the returned value.
 *
 * @throws CacheException wrapping any failure raised while listing the files
 */
private FileCacheValue loadFiles(FileCacheKey key, DirectoryLister directoryLister, TableIf table) {
    Thread current = Thread.currentThread();
    ClassLoader previousLoader = current.getContextClassLoader();
    HMSExternalCatalog catalog = hmsCatalog(key.catalogId);
    try {
        current.setContextClassLoader(ClassLoader.getSystemClassLoader());
        LocationPath location = LocationPath.of(
                key.getLocation(), catalog.getCatalogProperty().getStoragePropertiesMap());
        try {
            FileCacheValue value = getFileCache(catalog, location, key.inputFormat,
                    key.getPartitionValues(), directoryLister, table);
            for (int idx = 0; idx < value.getValuesSize(); idx++) {
                List<String> values = value.getPartitionValues();
                if (HIVE_DEFAULT_PARTITION.equals(values.get(idx))) {
                    values.set(idx, FeConstants.null_string);
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("load #{} splits for {} in catalog {}",
                        value.getFiles().size(), key, catalog.getName());
            }
            return value;
        } catch (Exception e) {
            throw new CacheException("failed to get input splits for %s in catalog %s",
                    e, key, catalog.getName());
        }
    } finally {
        current.setContextClassLoader(previousLoader);
    }
}
/**
 * Returns the partition values of {@code dorisTable} from the {@value #ENTRY_PARTITION_VALUES} entry.
 *
 * @param types partition column types, used to parse partition names if the entry has to be loaded
 */
public HivePartitionValues getPartitionValues(ExternalTable dorisTable, List<Type> types) {
    NameMapping mapping = dorisTable.getOrBuildNameMapping();
    return getPartitionValues(new PartitionValueCacheKey(mapping, types));
}

/** Returns {@code key}'s value from the {@value #ENTRY_PARTITION_VALUES} entry of its catalog. */
@VisibleForTesting
public HivePartitionValues getPartitionValues(PartitionValueCacheKey key) {
    long catalogId = key.nameMapping.getCtlId();
    return partitionValuesEntry(catalogId).get(key);
}
/**
 * Returns the file listing of each partition, in the same order as {@code partitions}.
 *
 * <p>All partitions are assumed to belong to one table. The catalog id and the table id used in the file
 * cache keys are derived from the first partition. Dummy partitions use
 * {@code FileCacheKey.createDummyCacheKey}.
 *
 * @param partitions partitions to list; an empty list yields an empty result
 * @param withCache when {@code true}, listings come from the {@value #ENTRY_FILE} cache entry and the
 *        remaining arguments are not used
 * @param concurrent when listing without cache, run one listing task per partition on the file-listing
 *        executor instead of listing sequentially on the calling thread
 * @param directoryLister lister used for uncached listing
 * @param table table handed to the lister for uncached listing
 * @return one {@link FileCacheValue} per partition
 * @throws CacheException when a concurrent listing task fails or the calling thread is interrupted while
 *         waiting for one
 */
public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
        boolean withCache,
        boolean concurrent,
        DirectoryLister directoryLister,
        TableIf table) {
    long start = System.currentTimeMillis();
    if (partitions.isEmpty()) {
        return Lists.newArrayList();
    }
    HivePartition firstPartition = partitions.get(0);
    long catalogId = firstPartition.getNameMapping().getCtlId();
    long fileId = Util.genIdByName(firstPartition.getNameMapping().getLocalDbName(),
            firstPartition.getNameMapping().getLocalTblName());
    List<FileCacheKey> keys = partitions.stream().map(p -> p.isDummyPartition()
            ? FileCacheKey.createDummyCacheKey(catalogId, fileId, p.getPath(), p.getInputFormat())
            : new FileCacheKey(catalogId, fileId, p.getPath(), p.getInputFormat(), p.getPartitionValues()))
            .collect(Collectors.toList());
    List<FileCacheValue> fileLists;
    if (withCache) {
        MetaCacheEntry<FileCacheKey, FileCacheValue> fileEntry = fileEntry(catalogId);
        fileLists = keys.stream().map(fileEntry::get).collect(Collectors.toList());
    } else if (concurrent) {
        fileLists = listFilesConcurrently(keys, directoryLister, table, catalogId);
    } else {
        fileLists = keys.stream()
                .map(key -> loadFiles(key, directoryLister, table))
                .collect(Collectors.toList());
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms",
                fileLists.stream().mapToInt(l -> l.getFiles() == null ? 0 : l.getFiles().size()).sum(),
                partitions.size(), hmsCatalog(catalogId).getName(), (System.currentTimeMillis() - start));
    }
    return fileLists;
}

/**
 * Submits one uncached listing per key to the file-listing executor and waits for all of them, keeping
 * key order.
 *
 * <p>If a task fails, submission fails, or the wait is interrupted, every task that has not completed is
 * cancelled with {@code cancel(false)}. Pending tasks are therefore not started, and running ones finish
 * without being interrupted. On interruption the thread's interrupt flag is restored before throwing.
 */
private List<FileCacheValue> listFilesConcurrently(List<FileCacheKey> keys,
        DirectoryLister directoryLister,
        TableIf table,
        long catalogId) {
    List<Future<FileCacheValue>> futures = Lists.newArrayListWithExpectedSize(keys.size());
    boolean succeeded = false;
    try {
        for (FileCacheKey key : keys) {
            futures.add(fileListingExecutor.submit(() -> loadFiles(key, directoryLister, table)));
        }
        List<FileCacheValue> fileLists = Lists.newArrayListWithExpectedSize(keys.size());
        for (Future<FileCacheValue> future : futures) {
            fileLists.add(future.get());
        }
        succeeded = true;
        return fileLists;
    } catch (ExecutionException e) {
        throw new CacheException("failed to get files from partitions in catalog %s",
                e, hmsCatalog(catalogId).getName());
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new CacheException("failed to get files from partitions in catalog %s with interrupted exception",
                e, hmsCatalog(catalogId).getName());
    } finally {
        if (!succeeded) {
            for (Future<FileCacheValue> future : futures) {
                future.cancel(false);
            }
        }
    }
}
/**
 * Returns the partition of {@code dorisTable} identified by {@code partitionValues} from the
 * {@value #ENTRY_PARTITION} entry.
 */
public HivePartition getHivePartition(ExternalTable dorisTable, List<String> partitionValues) {
    NameMapping mapping = dorisTable.getOrBuildNameMapping();
    MetaCacheEntry<PartitionCacheKey, HivePartition> cache = partitionEntry(mapping.getCtlId());
    return cache.get(new PartitionCacheKey(mapping, partitionValues));
}
/** Returns the requested partitions through the {@value #ENTRY_PARTITION} entry, in input order. */
public List<HivePartition> getAllPartitionsWithCache(ExternalTable dorisTable,
        List<List<String>> partitionValuesList) {
    return this.getAllPartitions(dorisTable, partitionValuesList, true);
}

/**
 * Loads the requested partitions straight from the metastore in one batch, bypassing the cache. The
 * result order is not tied to the input order.
 */
public List<HivePartition> getAllPartitionsWithoutCache(ExternalTable dorisTable,
        List<List<String>> partitionValuesList) {
    return this.getAllPartitions(dorisTable, partitionValuesList, false);
}
/**
 * Fetches the partitions of {@code dorisTable} identified by {@code partitionValuesList}.
 *
 * @param withCache {@code true} to read each partition through the {@value #ENTRY_PARTITION} entry (input
 *        order preserved); {@code false} to batch-load them via {@code loadPartitions}. The uncached result
 *        comes from a {@code HashMap}, so its order is not tied to the input.
 */
private List<HivePartition> getAllPartitions(ExternalTable dorisTable,
        List<List<String>> partitionValuesList,
        boolean withCache) {
    long startMillis = System.currentTimeMillis();
    NameMapping mapping = dorisTable.getOrBuildNameMapping();
    long catalogId = mapping.getCtlId();
    List<PartitionCacheKey> keys = new ArrayList<>(partitionValuesList.size());
    for (List<String> values : partitionValuesList) {
        keys.add(new PartitionCacheKey(mapping, values));
    }
    List<HivePartition> partitions;
    if (!withCache) {
        partitions = new ArrayList<>(loadPartitions(keys).values());
    } else {
        MetaCacheEntry<PartitionCacheKey, HivePartition> cache = partitionEntry(catalogId);
        partitions = new ArrayList<>(keys.size());
        for (PartitionCacheKey key : keys) {
            partitions.add(cache.get(key));
        }
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("get #{} partitions in catalog {} cost: {} ms", partitions.size(),
                hmsCatalog(catalogId).getName(), (System.currentTimeMillis() - startMillis));
    }
    return partitions;
}
/**
 * Drops the cached partition values, partition metadata and file listings of one table. Entries that
 * are not initialized for the catalog are skipped. The {@value #ENTRY_SCHEMA} entry is not touched.
 */
public void invalidateTableCache(NameMapping nameMapping) {
    long catalogId = nameMapping.getCtlId();

    MetaCacheEntry<PartitionValueCacheKey, HivePartitionValues> valuesCache =
            partitionValuesEntryIfInitialized(catalogId);
    if (valuesCache != null) {
        valuesCache.invalidateKey(new PartitionValueCacheKey(nameMapping, null));
    }

    MetaCacheEntry<PartitionCacheKey, HivePartition> partitionCache = partitionEntryIfInitialized(catalogId);
    if (partitionCache != null) {
        String dbName = nameMapping.getLocalDbName();
        String tblName = nameMapping.getLocalTblName();
        partitionCache.invalidateIf(k -> k.isSameTable(dbName, tblName));
    }

    MetaCacheEntry<FileCacheKey, FileCacheValue> fileCache = fileEntryIfInitialized(catalogId);
    if (fileCache != null) {
        long tableId = Util.genIdByName(nameMapping.getLocalDbName(), nameMapping.getLocalTblName());
        fileCache.invalidateIf(k -> k.isSameTable(tableId));
    }
}
/**
 * Invalidates the cached metadata and file listing of one partition of {@code dorisTable}.
 *
 * <p>The partition's values come from the cached {@value #ENTRY_PARTITION_VALUES} entry when it knows
 * {@code partitionName}. Otherwise they are parsed from the name with {@code HiveUtil.toPartitionValues},
 * the same parser used when that entry is loaded. This keeps invalidation effective when the values entry
 * was evicted, expired or never loaded, so stale partition and file entries are not left behind.
 *
 * <p>When the partition entry is cached, the file entry is removed by its exact key, which needs the
 * partition path. Otherwise every file entry of the table with the same partition values is removed.
 * The {@value #ENTRY_PARTITION_VALUES} entry itself is not modified. Nothing happens if any of the three
 * entries is not initialized for the catalog.
 */
public void invalidatePartitionCache(ExternalTable dorisTable, String partitionName) {
    NameMapping nameMapping = dorisTable.getOrBuildNameMapping();
    long catalogId = nameMapping.getCtlId();
    MetaCacheEntry<PartitionValueCacheKey, HivePartitionValues> partitionValuesEntry =
            partitionValuesEntryIfInitialized(catalogId);
    MetaCacheEntry<PartitionCacheKey, HivePartition> partitionEntry = partitionEntryIfInitialized(catalogId);
    MetaCacheEntry<FileCacheKey, FileCacheValue> fileEntry = fileEntryIfInitialized(catalogId);
    if (partitionValuesEntry == null || partitionEntry == null || fileEntry == null) {
        return;
    }
    List<String> resolvedValues = null;
    HivePartitionValues partitionValues =
            partitionValuesEntry.getIfPresent(new PartitionValueCacheKey(nameMapping, null));
    if (partitionValues != null) {
        Long partitionId = partitionValues.partitionNameToIdMap.get(partitionName);
        if (partitionId != null) {
            resolvedValues = partitionValues.partitionValuesMap.get(partitionId);
        }
    }
    if (resolvedValues == null) {
        resolvedValues = HiveUtil.toPartitionValues(partitionName);
    }
    final List<String> values = resolvedValues;
    long tableId = Util.genIdByName(nameMapping.getLocalDbName(), nameMapping.getLocalTblName());
    PartitionCacheKey partKey = new PartitionCacheKey(nameMapping, values);
    HivePartition partition = partitionEntry.getIfPresent(partKey);
    if (partition != null) {
        fileEntry.invalidateKey(new FileCacheKey(catalogId, tableId, partition.getPath(),
                null, partition.getPartitionValues()));
    } else {
        // File listings can be cached without the partition entry (e.g. partitions fetched without cache),
        // so match them on table id and partition values instead.
        fileEntry.invalidateIf(k -> k.isSameTable(tableId) && values.equals(k.getPartitionValues()));
    }
    partitionEntry.invalidateKey(partKey);
}
/**
 * Selectively refreshes the cache for partitions touched by a write, based on update information from BE.
 *
 * <p>Each update with a non-empty name is classified by its update mode. {@code APPEND} and
 * {@code OVERWRITE} names are appended to {@code modifiedPartNames}, {@code NEW} names to
 * {@code newPartNames}, and any other mode is logged and skipped. Both caller-supplied lists are then
 * passed to {@link #refreshAffectedPartitionsCache(HMSExternalTable, List, List)}. Does nothing when
 * {@code partitionUpdates} is {@code null} or empty.
 */
public void refreshAffectedPartitions(HMSExternalTable table,
        List<org.apache.doris.thrift.THivePartitionUpdate> partitionUpdates,
        List<String> modifiedPartNames,
        List<String> newPartNames) {
    if (partitionUpdates == null || partitionUpdates.isEmpty()) {
        return;
    }
    for (org.apache.doris.thrift.THivePartitionUpdate partitionUpdate : partitionUpdates) {
        String name = partitionUpdate.getName();
        if (Strings.isNullOrEmpty(name)) {
            continue;
        }
        switch (partitionUpdate.getUpdateMode()) {
            case NEW:
                newPartNames.add(name);
                break;
            case APPEND:
            case OVERWRITE:
                modifiedPartNames.add(name);
                break;
            default:
                LOG.warn("Unknown update mode {} for partition {}",
                        partitionUpdate.getUpdateMode(), name);
                break;
        }
    }
    refreshAffectedPartitionsCache(table, modifiedPartNames, newPartNames);
}
/**
 * Invalidates the cached partition metadata and file listing of every modified partition. It then adds
 * the modified and new partition names to the table's cached partition values, when there are any.
 */
public void refreshAffectedPartitionsCache(HMSExternalTable table,
        List<String> modifiedPartNames,
        List<String> newPartNames) {
    modifiedPartNames.forEach(name -> invalidatePartitionCache(table, name));
    List<String> touchedPartNames = new ArrayList<>(modifiedPartNames.size() + newPartNames.size());
    touchedPartNames.addAll(modifiedPartNames);
    touchedPartNames.addAll(newPartNames);
    if (!touchedPartNames.isEmpty()) {
        addPartitionsCache(table.getOrBuildNameMapping(), touchedPartNames,
                table.getPartitionColumnTypes(java.util.Optional.empty()));
    }
    LOG.info("Refreshed cache for table {}: {} modified partitions, {} new partitions",
            table.getName(), modifiedPartNames.size(), newPartNames.size());
}
/**
 * Adds partitions to a table's cached {@value #ENTRY_PARTITION_VALUES} entry without reloading it.
 *
 * <p>Does nothing when the entry is not initialized for the catalog or the table's values are not cached.
 * Names already present are skipped. The update is copy-on-write: a copy is modified and published only
 * if the cached value is still the same instance that was copied. Otherwise a concurrent update or reload
 * wins and this update is dropped. When no name is actually new, nothing is rebuilt or published.
 *
 * @param partitionNames Hive partition names to add
 * @param partitionColumnTypes partition column types used to parse the new names
 */
public void addPartitionsCache(NameMapping nameMapping,
        List<String> partitionNames,
        List<Type> partitionColumnTypes) {
    long catalogId = nameMapping.getCtlId();
    MetaCacheEntry<PartitionValueCacheKey, HivePartitionValues> partitionValuesEntry =
            partitionValuesEntryIfInitialized(catalogId);
    if (partitionValuesEntry == null) {
        return;
    }
    PartitionValueCacheKey key = new PartitionValueCacheKey(nameMapping, partitionColumnTypes);
    HivePartitionValues partitionValues = partitionValuesEntry.getIfPresent(key);
    if (partitionValues == null) {
        return;
    }
    HivePartitionValues copy = partitionValues.copy();
    Map<Long, PartitionItem> idToPartitionItemBefore = copy.getIdToPartitionItem();
    Map<String, Long> partitionNameToIdMapBefore = copy.getPartitionNameToIdMap();
    Map<Long, PartitionItem> idToPartitionItem = new HashMap<>();
    HMSExternalCatalog catalog = hmsCatalog(catalogId);
    String localDbName = nameMapping.getLocalDbName();
    String localTblName = nameMapping.getLocalTblName();
    for (String partitionName : partitionNames) {
        if (partitionNameToIdMapBefore.containsKey(partitionName)) {
            // refreshAffectedPartitionsCache passes APPEND/OVERWRITE (existing) partitions here too, so this
            // is an expected path. Log at debug level so large writes do not flood the log.
            if (LOG.isDebugEnabled()) {
                LOG.debug("addPartitionsCache partitionName:[{}] has exist in table:[{}]",
                        partitionName, localTblName);
            }
            continue;
        }
        long partitionId = Util.genIdByName(catalog.getName(), localDbName, localTblName, partitionName);
        ListPartitionItem listPartitionItem = toListPartitionItem(partitionName, key.types, catalog.getName());
        idToPartitionItemBefore.put(partitionId, listPartitionItem);
        idToPartitionItem.put(partitionId, listPartitionItem);
        partitionNameToIdMapBefore.put(partitionName, partitionId);
    }
    if (idToPartitionItem.isEmpty()) {
        // Nothing new: the cached value already covers every name, so skip rebuilding and republishing.
        return;
    }
    Map<Long, List<String>> partitionValuesMapBefore = copy.getPartitionValuesMap();
    Map<Long, List<String>> partitionValuesMap = ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
    partitionValuesMapBefore.putAll(partitionValuesMap);
    copy.rebuildSortedPartitionRanges();
    HivePartitionValues partitionValuesCur = partitionValuesEntry.getIfPresent(key);
    if (partitionValuesCur == partitionValues) {
        partitionValuesEntry.put(key, copy);
    }
}
/**
 * Removes partitions from a table's cached {@value #ENTRY_PARTITION_VALUES} entry without reloading it.
 *
 * <p>Does nothing when the entry is not initialized or the table's values are not cached. Unknown names are
 * logged and skipped. The update is copy-on-write: the modified copy is published only if the cached value
 * is still the instance that was copied. When nothing was removed, nothing is rebuilt or published.
 *
 * @param invalidPartitionCache when {@code true}, the partition metadata and file listing of each removed
 *        partition are invalidated as well
 */
public void dropPartitionsCache(ExternalTable dorisTable,
        List<String> partitionNames,
        boolean invalidPartitionCache) {
    NameMapping nameMapping = dorisTable.getOrBuildNameMapping();
    long catalogId = nameMapping.getCtlId();
    MetaCacheEntry<PartitionValueCacheKey, HivePartitionValues> partitionValuesEntry =
            partitionValuesEntryIfInitialized(catalogId);
    if (partitionValuesEntry == null) {
        return;
    }
    PartitionValueCacheKey key = new PartitionValueCacheKey(nameMapping, null);
    HivePartitionValues partitionValues = partitionValuesEntry.getIfPresent(key);
    if (partitionValues == null) {
        return;
    }
    HivePartitionValues copy = partitionValues.copy();
    Map<String, Long> partitionNameToIdMapBefore = copy.getPartitionNameToIdMap();
    Map<Long, PartitionItem> idToPartitionItemBefore = copy.getIdToPartitionItem();
    Map<Long, List<String>> partitionValuesMap = copy.getPartitionValuesMap();
    boolean removedAny = false;
    for (String partitionName : partitionNames) {
        Long partitionId = partitionNameToIdMapBefore.remove(partitionName);
        if (partitionId == null) {
            LOG.info("dropPartitionsCache partitionName:[{}] not exist in table:[{}]",
                    partitionName, nameMapping.getFullLocalName());
            continue;
        }
        removedAny = true;
        idToPartitionItemBefore.remove(partitionId);
        partitionValuesMap.remove(partitionId);
        if (invalidPartitionCache) {
            invalidatePartitionCache(dorisTable, partitionName);
        }
    }
    if (!removedAny) {
        // Nothing changed, so the cached value stays as it is.
        return;
    }
    copy.rebuildSortedPartitionRanges();
    HivePartitionValues partitionValuesCur = partitionValuesEntry.getIfPresent(key);
    if (partitionValuesCur == partitionValues) {
        partitionValuesEntry.put(key, copy);
    }
}
/**
 * Test hook: stores {@code values} under {@code key} in the partition-values cache entry of the
 * catalog identified by the key's name mapping.
 */
@VisibleForTesting
public void putPartitionValuesCacheForTest(PartitionValueCacheKey key, HivePartitionValues values) {
    NameMapping mapping = key.getNameMapping();
    partitionValuesEntry(mapping.getCtlId()).put(key, values);
}
/**
 * Computes the ACID file state of every given partition for a transaction snapshot.
 *
 * <p>For each partition:
 * <ul>
 *   <li>the remote file system is resolved through the FE file-system cache, using the partition
 *       location and the catalog's storage properties;</li>
 *   <li>{@code AcidUtil.getAcidState} runs inside {@code doAs} of a {@link HadoopAuthenticator}
 *       built from {@code AuthenticationConfig.getKerberosConfig} for that location.</li>
 * </ul>
 *
 * @param partitions     partitions to inspect; an empty list yields an empty result
 * @param txnValidIds    transaction id information passed through to {@code AcidUtil}
 * @param isFullAcid     passed through to {@code AcidUtil}
 * @param bindBrokerName not used by this method
 * @return one {@link FileCacheValue} per partition, in input order
 * @throws CacheException wrapping any exception raised while processing the partitions
 */
public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions,
        Map<String, String> txnValidIds,
        boolean isFullAcid,
        String bindBrokerName) {
    List<FileCacheValue> result = Lists.newArrayList();
    try {
        if (partitions.isEmpty()) {
            return result;
        }
        for (HivePartition hivePartition : partitions) {
            HMSExternalCatalog hms = hmsCatalog(hivePartition.getNameMapping().getCtlId());
            LocationPath partitionLocation = LocationPath.of(hivePartition.getPath(),
                    hms.getCatalogProperty().getStoragePropertiesMap());
            RemoteFileSystem remoteFs = Env.getCurrentEnv()
                    .getExtMetaCacheMgr()
                    .getFsCache()
                    .getRemoteFileSystem(new FileSystemCache.FileSystemCacheKey(
                            partitionLocation.getNormalizedLocation(),
                            partitionLocation.getStorageProperties()));
            HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(
                    AuthenticationConfig.getKerberosConfig(
                            partitionLocation.getStorageProperties().getBackendConfigProperties()));
            result.add(authenticator.doAs(() -> AcidUtil.getAcidState(
                    remoteFs,
                    hivePartition,
                    txnValidIds,
                    hms.getCatalogProperty().getStoragePropertiesMap(),
                    isFullAcid)));
        }
    } catch (Exception e) {
        throw new CacheException("Failed to get input splits %s", e, txnValidIds.toString());
    }
    return result;
}
/**
 * Key of the Hive partition values cache.
 *
 * <p>Identity (equals/hashCode) is the table's {@link NameMapping} alone. {@code types} is carried
 * with the key but does not take part in equality.
 */
@Data
public static class PartitionValueCacheKey {
    private NameMapping nameMapping;
    // Not part of cache identity.
    private List<Type> types;

    public PartitionValueCacheKey(NameMapping nameMapping, List<Type> types) {
        this.nameMapping = nameMapping;
        this.types = types;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj instanceof PartitionValueCacheKey) {
            PartitionValueCacheKey that = (PartitionValueCacheKey) obj;
            return nameMapping.equals(that.nameMapping);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return nameMapping.hashCode();
    }

    @Override
    public String toString() {
        return new StringBuilder("PartitionValueCacheKey{")
                .append("dbName='").append(nameMapping.getLocalDbName()).append('\'')
                .append(", tblName='").append(nameMapping.getLocalTblName()).append('\'')
                .append('}')
                .toString();
    }
}
/**
 * Key of a single cached Hive partition.
 *
 * <p>Its identity is the owning table's {@link NameMapping} plus the partition's values. Both fields
 * take part in equals/hashCode.
 */
@Data
public static class PartitionCacheKey {
    private NameMapping nameMapping;
    private List<String> values;

    public PartitionCacheKey(NameMapping nameMapping, List<String> values) {
        this.nameMapping = nameMapping;
        this.values = values;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj instanceof PartitionCacheKey) {
            PartitionCacheKey that = (PartitionCacheKey) obj;
            return nameMapping.equals(that.nameMapping) && Objects.equals(values, that.values);
        }
        return false;
    }

    /**
     * Returns true when this key's local database and table names equal {@code dbName} and
     * {@code tblName}.
     */
    boolean isSameTable(String dbName, String tblName) {
        NameMapping mapping = this.nameMapping;
        return mapping.getLocalDbName().equals(dbName)
                && mapping.getLocalTblName().equals(tblName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(nameMapping, values);
    }

    @Override
    public String toString() {
        return new StringBuilder("PartitionCacheKey{")
                .append("dbName='").append(nameMapping.getLocalDbName()).append('\'')
                .append(", tblName='").append(nameMapping.getLocalTblName()).append('\'')
                .append(", values=").append(values)
                .append('}')
                .toString();
    }
}
/**
 * Key of a cached file listing ({@link FileCacheValue}).
 *
 * <p>Two flavors exist:
 * <ul>
 *   <li>A regular key, identified by {@code catalogId}, {@code location} and
 *       {@code partitionValues}.</li>
 *   <li>A "dummy" key created by {@link #createDummyCacheKey}, identified by {@code catalogId} and
 *       {@code id} only. It is marked by a non-zero {@code dummyKey}.</li>
 * </ul>
 *
 * <p>A dummy key never equals a regular key. {@code inputFormat} is not part of the identity of
 * either flavor.
 */
@Data
public static class FileCacheKey {
    // Non-zero marks a dummy key; 0 means a regular key.
    private long dummyKey = 0;
    private long catalogId;
    private String location;
    // Not part of cache identity.
    private String inputFormat;
    // The values of partitions.
    protected List<String> partitionValues;
    // Compared by isSameTable(); part of the identity of dummy keys only.
    // Presumably the owning table id — confirm against callers.
    private long id;

    public FileCacheKey(long catalogId, long id, String location, String inputFormat,
            List<String> partitionValues) {
        this.catalogId = catalogId;
        this.location = location;
        this.inputFormat = inputFormat;
        this.partitionValues = partitionValues == null ? Lists.newArrayList() : partitionValues;
        this.id = id;
    }

    /**
     * Creates a dummy key whose identity is {@code (catalogId, id)} only.
     */
    public static FileCacheKey createDummyCacheKey(long catalogId, long id, String location,
            String inputFormat) {
        FileCacheKey fileCacheKey = new FileCacheKey(catalogId, id, location, inputFormat, null);
        int hash = Objects.hash(catalogId, id);
        // 0 means "regular key", so never store it as a dummy marker even if the hash happens to be 0.
        fileCacheKey.dummyKey = hash == 0 ? 1 : hash;
        return fileCacheKey;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FileCacheKey)) {
            return false;
        }
        FileCacheKey other = (FileCacheKey) obj;
        boolean thisIsDummy = dummyKey != 0;
        boolean otherIsDummy = other.dummyKey != 0;
        if (thisIsDummy || otherIsDummy) {
            // Compare the real identity rather than the int hash kept in dummyKey: different
            // (catalogId, id) pairs can hash to the same int. Requiring both sides to be dummy keys
            // keeps equals symmetric.
            return thisIsDummy && otherIsDummy
                    && catalogId == other.catalogId
                    && id == other.id;
        }
        return catalogId == other.catalogId
                && Objects.equals(location, other.location)
                && Objects.equals(partitionValues, other.partitionValues);
    }

    boolean isSameTable(long id) {
        return this.id == id;
    }

    @Override
    public int hashCode() {
        if (dummyKey != 0) {
            // Must depend only on the fields equals() compares for dummy keys.
            return Objects.hash(catalogId, id);
        }
        return Objects.hash(catalogId, location, partitionValues);
    }

    @Override
    public String toString() {
        return "FileCacheKey{" + "catalogId=" + catalogId + ", location='" + location + '\''
                + ", inputFormat='" + inputFormat + '\'' + '}';
    }
}
/**
 * A list of {@link HiveFileStatus} entries plus splittability, partition values and ACID info.
 *
 * <p>Files whose path is considered hidden (see {@link #isFileVisible}) are skipped by
 * {@link #addFile}.
 */
@Data
public static class FileCacheValue {
    private final List<HiveFileStatus> files = Lists.newArrayList();
    private boolean isSplittable;
    protected List<String> partitionValues;
    private AcidInfo acidInfo;

    /**
     * Appends a {@link HiveFileStatus} built from {@code file} unless its path is hidden.
     * The block locations, size, block size and modification time are copied from {@code file}.
     * The status path is set to {@code locationPath}.
     */
    public void addFile(RemoteFile file, LocationPath locationPath) {
        if (!isFileVisible(file.getPath())) {
            return;
        }
        HiveFileStatus fileStatus = new HiveFileStatus();
        fileStatus.setBlockLocations(file.getBlockLocations());
        fileStatus.setPath(locationPath);
        fileStatus.length = file.getSize();
        fileStatus.blockSize = file.getBlockSize();
        fileStatus.modificationTime = file.getModificationTime();
        files.add(fileStatus);
    }

    /** Number of partition values, or 0 when none are set. */
    public int getValuesSize() {
        if (partitionValues == null) {
            return 0;
        }
        return partitionValues.size();
    }

    /**
     * Returns false for a null path, or for a path whose URI string starts with '.' or '_' or
     * contains a '/' immediately followed by '.' or '_'. Returns true otherwise.
     */
    @VisibleForTesting
    public static boolean isFileVisible(Path path) {
        return path != null && !containsHiddenPath(path.toUri().toString());
    }

    // True when the string itself, or any segment after a '/', begins with '.' or '_'.
    private static boolean containsHiddenPath(String path) {
        return path.startsWith(".") || path.startsWith("_")
                || path.contains("/.") || path.contains("/_");
    }
}
/**
 * Metadata of a single listed file.
 *
 * <p>{@code FileCacheValue.addFile} fills blockLocations, path, length, blockSize and
 * modificationTime. splittable, partitionValues and acidInfo are not set there; presumably
 * callers elsewhere set them (TODO confirm).
 */
@Data
public static class HiveFileStatus {
BlockLocation[] blockLocations;
// Set from the LocationPath passed to FileCacheValue.addFile, not from the RemoteFile.
LocationPath path;
// File size, copied from RemoteFile.getSize().
long length;
long blockSize;
long modificationTime;
boolean splittable;
List<String> partitionValues;
AcidInfo acidInfo;
}
/**
 * Snapshot of a table's partitions.
 *
 * <p>It holds three maps: name to id, id to {@link PartitionItem}, and id to partition values. It
 * also holds the {@link SortedPartitionRanges} derived from them, used for binary-search filtering.
 * Callers that mutate the maps of a {@link #copy()} must call {@link #rebuildSortedPartitionRanges()}
 * afterwards, because {@code copy()} leaves the ranges unset.
 */
@Data
public static class HivePartitionValues {
    private BiMap<String, Long> partitionNameToIdMap;
    private Map<Long, PartitionItem> idToPartitionItem;
    private Map<Long, List<String>> partitionValuesMap;
    // Sorted partition ranges for binary search filtering.
    private SortedPartitionRanges<String> sortedPartitionRanges;

    public HivePartitionValues() {
    }

    public HivePartitionValues(Map<Long, PartitionItem> idToPartitionItem,
            BiMap<String, Long> partitionNameToIdMap,
            Map<Long, List<String>> partitionValuesMap) {
        this.partitionNameToIdMap = partitionNameToIdMap;
        this.idToPartitionItem = idToPartitionItem;
        this.partitionValuesMap = partitionValuesMap;
        this.sortedPartitionRanges = buildSortedPartitionRanges();
    }

    /**
     * Returns a copy with new top-level maps, so entries can be added or removed without touching
     * this instance. Map values are shared, not cloned. The sorted ranges are left null.
     */
    public HivePartitionValues copy() {
        HivePartitionValues duplicate = new HivePartitionValues();
        if (partitionNameToIdMap != null) {
            duplicate.setPartitionNameToIdMap(HashBiMap.create(partitionNameToIdMap));
        }
        if (idToPartitionItem != null) {
            duplicate.setIdToPartitionItem(Maps.newHashMap(idToPartitionItem));
        }
        if (partitionValuesMap != null) {
            duplicate.setPartitionValuesMap(Maps.newHashMap(partitionValuesMap));
        }
        return duplicate;
    }

    /** Recomputes the sorted partition ranges from the current maps. */
    public void rebuildSortedPartitionRanges() {
        this.sortedPartitionRanges = buildSortedPartitionRanges();
    }

    /** Returns the sorted partition ranges, or empty when none have been built. */
    public java.util.Optional<SortedPartitionRanges<String>> getSortedPartitionRanges() {
        return java.util.Optional.ofNullable(sortedPartitionRanges);
    }

    // Returns null when either map is null or empty. Items whose id has no name are skipped.
    private SortedPartitionRanges<String> buildSortedPartitionRanges() {
        boolean nothingToSort = partitionNameToIdMap == null || partitionNameToIdMap.isEmpty()
                || idToPartitionItem == null || idToPartitionItem.isEmpty();
        if (nothingToSort) {
            return null;
        }
        BiMap<Long, String> idToName = partitionNameToIdMap.inverse();
        Map<String, PartitionItem> itemsByName =
                Maps.newHashMapWithExpectedSize(idToPartitionItem.size());
        idToPartitionItem.forEach((partitionId, item) -> {
            String name = idToName.get(partitionId);
            if (name != null) {
                itemsByName.put(name, item);
            }
        });
        return SortedPartitionRanges.build(itemsByName);
    }
}
}