HMSTransaction.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java
// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
// and modified by Doris
package org.apache.doris.datasource.hive;
import org.apache.doris.backup.Status;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.datasource.statistics.CommonStatistics;
import org.apache.doris.fsv2.FileSystem;
import org.apache.doris.fsv2.FileSystemProvider;
import org.apache.doris.fsv2.FileSystemUtil;
import org.apache.doris.fsv2.remote.RemoteFile;
import org.apache.doris.fsv2.remote.S3FileSystem;
import org.apache.doris.fsv2.remote.SwitchingFileSystem;
import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.THiveLocationParams;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TS3MPUPendingUpload;
import org.apache.doris.thrift.TUpdateMode;
import org.apache.doris.transaction.Transaction;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.airlift.concurrent.MoreFutures;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
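/**
 * A semi-transactional wrapper around Hive Metastore writes, adapted from Trino's
 * SemiTransactionalHiveMetastore (see the header above). Table and partition changes produced
 * by an INSERT are first buffered as {@link Action}s; {@link #doCommit()} then hands them to an
 * {@link HmsCommitter}, which performs the file system moves, registers partitions, updates
 * statistics, and can abort and roll back the already-applied steps if any of them fails.
 */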
public class HMSTransaction implements Transaction {
private static final Logger LOG = LogManager.getLogger(HMSTransaction.class);
private final HiveMetadataOps hiveOps;
private final FileSystem fs;
private Optional<SummaryProfile> summaryProfile = Optional.empty();
private String queryId;
private boolean isOverwrite = false;
TFileType fileType;
private final Map<SimpleTableInfo, Action<TableAndMore>> tableActions = new HashMap<>();
private final Map<SimpleTableInfo, Map<List<String>, Action<PartitionAndMore>>>
partitionActions = new HashMap<>();
private final Map<SimpleTableInfo, List<FieldSchema>> tableColumns = new HashMap<>();
private final Executor fileSystemExecutor;
private HmsCommitter hmsCommitter;
private List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList();
private Optional<String> stagingDirectory = Optional.empty();
private boolean isMockedPartitionUpdate = false;
private static class UncompletedMpuPendingUpload {
private final TS3MPUPendingUpload s3MPUPendingUpload;
private final String path;
public UncompletedMpuPendingUpload(TS3MPUPendingUpload s3MPUPendingUpload, String path) {
this.s3MPUPendingUpload = s3MPUPendingUpload;
this.path = path;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UncompletedMpuPendingUpload that = (UncompletedMpuPendingUpload) o;
return Objects.equals(s3MPUPendingUpload, that.s3MPUPendingUpload) && Objects.equals(path,
that.path);
}
@Override
public int hashCode() {
return Objects.hash(s3MPUPendingUpload, path);
}
}
private final Set<UncompletedMpuPendingUpload> uncompletedMpuPendingUploads = new HashSet<>();
public HMSTransaction(HiveMetadataOps hiveOps, FileSystemProvider fileSystemProvider, Executor fileSystemExecutor) {
this.hiveOps = hiveOps;
this.fs = fileSystemProvider.get(null);
if (!(fs instanceof SwitchingFileSystem)) {
throw new RuntimeException("fs should be SwitchingFileSystem");
}
if (ConnectContext.get().getExecutor() != null) {
summaryProfile = Optional.of(ConnectContext.get().getExecutor().getSummaryProfile());
}
this.fileSystemExecutor = fileSystemExecutor;
}
@Override
public void commit() {
doCommit();
}
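/**
 * Merges partition updates that refer to the same partition name into a single update by
 * summing file sizes and row counts and concatenating file names and pending S3 multipart
 * uploads.
 */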
public List<THivePartitionUpdate> mergePartitions(List<THivePartitionUpdate> hivePUs) {
Map<String, THivePartitionUpdate> mm = new HashMap<>();
for (THivePartitionUpdate pu : hivePUs) {
if (mm.containsKey(pu.getName())) {
THivePartitionUpdate old = mm.get(pu.getName());
old.setFileSize(old.getFileSize() + pu.getFileSize());
old.setRowCount(old.getRowCount() + pu.getRowCount());
if (old.getS3MpuPendingUploads() != null && pu.getS3MpuPendingUploads() != null) {
old.getS3MpuPendingUploads().addAll(pu.getS3MpuPendingUploads());
}
old.getFileNames().addAll(pu.getFileNames());
} else {
mm.put(pu.getName(), pu);
}
}
return new ArrayList<>(mm.values());
}
@Override
public void rollback() {
if (hmsCommitter == null) {
return;
}
try {
hmsCommitter.abort();
hmsCommitter.rollback();
} finally {
hmsCommitter.shutdownExecutorService();
}
}
public void beginInsertTable(HiveInsertCommandContext ctx) {
queryId = ctx.getQueryId();
isOverwrite = ctx.isOverwrite();
fileType = ctx.getFileType();
if (fileType == TFileType.FILE_S3) {
stagingDirectory = Optional.empty();
} else {
stagingDirectory = Optional.of(ctx.getWritePath());
}
}
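/**
 * Converts the collected partition updates into buffered table/partition actions. For an
 * unpartitioned table, APPEND becomes an INSERT_EXISTING action and OVERWRITE recreates the
 * table; for a partitioned table, NEW/OVERWRITE add (or replace) partitions and APPEND updates
 * are turned into INSERT_EXISTING actions. An INSERT OVERWRITE of an unpartitioned table that
 * produced no data is represented by a mocked empty update so the existing data still gets
 * cleared.
 */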
public void finishInsertTable(SimpleTableInfo tableInfo) {
Table table = getTable(tableInfo);
if (hivePartitionUpdates.isEmpty() && isOverwrite && table.getPartitionKeysSize() == 0) {
// use an empty hivePartitionUpdate so the overwrite still clears the target table
isMockedPartitionUpdate = true;
THivePartitionUpdate emptyUpdate = new THivePartitionUpdate() {{
setUpdateMode(TUpdateMode.OVERWRITE);
setFileSize(0);
setRowCount(0);
setFileNames(Collections.emptyList());
if (fileType == TFileType.FILE_S3) {
setS3MpuPendingUploads(Lists.newArrayList(new TS3MPUPendingUpload()));
setLocation(new THiveLocationParams() {{
setWritePath(table.getSd().getLocation());
}
});
} else {
stagingDirectory.ifPresent((v) -> {
fs.makeDir(v);
setLocation(new THiveLocationParams() {{
setWritePath(v);
}
});
});
}
}
};
hivePartitionUpdates = Lists.newArrayList(emptyUpdate);
}
List<THivePartitionUpdate> mergedPUs = mergePartitions(hivePartitionUpdates);
for (THivePartitionUpdate pu : mergedPUs) {
if (pu.getS3MpuPendingUploads() != null) {
for (TS3MPUPendingUpload s3MPUPendingUpload : pu.getS3MpuPendingUploads()) {
uncompletedMpuPendingUploads.add(
new UncompletedMpuPendingUpload(s3MPUPendingUpload, pu.getLocation().getTargetPath()));
}
}
}
List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>();
for (THivePartitionUpdate pu : mergedPUs) {
TUpdateMode updateMode = pu.getUpdateMode();
HivePartitionStatistics hivePartitionStatistics = HivePartitionStatistics.fromCommonStatistics(
pu.getRowCount(),
pu.getFileNamesSize(),
pu.getFileSize());
String writePath = pu.getLocation().getWritePath();
if (table.getPartitionKeysSize() == 0) {
Preconditions.checkArgument(mergedPUs.size() == 1,
"When updating a non-partitioned table, multiple partitions should not be written");
switch (updateMode) {
case APPEND:
finishChangingExistingTable(
ActionType.INSERT_EXISTING,
tableInfo,
writePath,
pu.getFileNames(),
hivePartitionStatistics,
pu);
break;
case OVERWRITE:
dropTable(tableInfo);
createTable(tableInfo, table, writePath, pu.getFileNames(), hivePartitionStatistics, pu);
break;
default:
throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table");
}
} else {
switch (updateMode) {
case APPEND:
// insert into existing partition
insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
break;
case NEW:
case OVERWRITE:
StorageDescriptor sd = table.getSd();
HivePartition hivePartition = new HivePartition(
tableInfo,
false,
sd.getInputFormat(),
pu.getLocation().getTargetPath(),
HiveUtil.toPartitionValues(pu.getName()),
Maps.newHashMap(),
sd.getOutputFormat(),
sd.getSerdeInfo().getSerializationLib(),
sd.getCols()
);
if (updateMode == TUpdateMode.OVERWRITE) {
dropPartition(tableInfo, hivePartition.getPartitionValues(), true);
}
addPartition(
tableInfo, hivePartition, writePath,
pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
break;
default:
throw new RuntimeException("Not support mode:[" + updateMode + "] in partitioned table");
}
}
}
if (!insertExistsPartitions.isEmpty()) {
convertToInsertExistingPartitionAction(tableInfo, insertExistsPartitions);
}
}
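/**
 * Commits in two phases: every buffered action is first "prepared" on the committer, which
 * schedules the asynchronous file system work and queues metastore updates, and then
 * {@link HmsCommitter#doCommit()} applies them. On any failure the committer aborts and rolls
 * back whatever has already been done.
 */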
public void doCommit() {
hmsCommitter = new HmsCommitter();
try {
for (Map.Entry<SimpleTableInfo, Action<TableAndMore>> entry : tableActions.entrySet()) {
SimpleTableInfo tableInfo = entry.getKey();
Action<TableAndMore> action = entry.getValue();
switch (action.getType()) {
case INSERT_EXISTING:
hmsCommitter.prepareInsertExistingTable(tableInfo, action.getData());
break;
case ALTER:
hmsCommitter.prepareAlterTable(tableInfo, action.getData());
break;
default:
throw new UnsupportedOperationException("Unsupported table action type: " + action.getType());
}
}
for (Map.Entry<SimpleTableInfo, Map<List<String>, Action<PartitionAndMore>>> tableEntry
: partitionActions.entrySet()) {
SimpleTableInfo tableInfo = tableEntry.getKey();
for (Map.Entry<List<String>, Action<PartitionAndMore>> partitionEntry :
tableEntry.getValue().entrySet()) {
Action<PartitionAndMore> action = partitionEntry.getValue();
switch (action.getType()) {
case INSERT_EXISTING:
hmsCommitter.prepareInsertExistPartition(tableInfo, action.getData());
break;
case ADD:
hmsCommitter.prepareAddPartition(tableInfo, action.getData());
break;
case ALTER:
hmsCommitter.prepareAlterPartition(tableInfo, action.getData());
break;
default:
throw new UnsupportedOperationException(
"Unsupported partition action type: " + action.getType());
}
}
}
hmsCommitter.doCommit();
} catch (Throwable t) {
LOG.warn("Failed to commit for {}, abort it.", queryId);
try {
hmsCommitter.abort();
hmsCommitter.rollback();
} catch (RuntimeException e) {
t.addSuppressed(new Exception("Failed to roll back after commit failure", e));
}
throw t;
} finally {
hmsCommitter.runClearPathsForFinish();
hmsCommitter.shutdownExecutorService();
}
}
public void updateHivePartitionUpdates(List<THivePartitionUpdate> pus) {
synchronized (this) {
hivePartitionUpdates.addAll(pus);
}
}
// for test
public void setHivePartitionUpdates(List<THivePartitionUpdate> hivePartitionUpdates) {
this.hivePartitionUpdates = hivePartitionUpdates;
}
public long getUpdateCnt() {
return hivePartitionUpdates.stream().mapToLong(THivePartitionUpdate::getRowCount).sum();
}
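/**
 * Turns "insert into existing partition" updates into INSERT_EXISTING actions. The target
 * partitions are looked up from HMS in batches of 100; a partition that is missing from HMS
 * (for example, dropped concurrently by another engine) fails the whole operation.
 */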
private void convertToInsertExistingPartitionAction(
SimpleTableInfo tableInfo,
List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) {
Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
partitionActions.computeIfAbsent(tableInfo, k -> new HashMap<>());
for (List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitionBatch :
Iterables.partition(partitions, 100)) {
List<String> partitionNames = partitionBatch
.stream()
.map(pair -> pair.first.getName())
.collect(Collectors.toList());
// check in partitionAction
Action<PartitionAndMore> oldPartitionAction = partitionActionsForTable.get(partitionNames);
if (oldPartitionAction != null) {
switch (oldPartitionAction.getType()) {
case DROP:
case DROP_PRESERVE_DATA:
throw new RuntimeException(
"Not found partition from partition actions"
+ "for " + tableInfo + ", partitions: " + partitionNames);
case ADD:
case ALTER:
case INSERT_EXISTING:
case MERGE:
throw new UnsupportedOperationException(
"Inserting into a partition that were added, altered,"
+ "or inserted into in the same transaction is not supported");
default:
throw new IllegalStateException("Unknown action type: " + oldPartitionAction.getType());
}
}
Map<String, Partition> partitionsByNamesMap = HiveUtil.convertToNamePartitionMap(
partitionNames,
hiveOps.getClient().getPartitions(tableInfo.getDbName(), tableInfo.getTbName(), partitionNames));
for (int i = 0; i < partitionNames.size(); i++) {
String partitionName = partitionNames.get(i);
// check from hms
Partition partition = partitionsByNamesMap.get(partitionName);
if (partition == null) {
// The partition no longer exists in HMS (it may have been dropped by another engine); fail the commit.
throw new RuntimeException(
"Not found partition from hms for " + tableInfo
+ ", partitions: " + partitionNames);
}
THivePartitionUpdate pu = partitionBatch.get(i).first;
HivePartitionStatistics updateStats = partitionBatch.get(i).second;
StorageDescriptor sd = partition.getSd();
List<String> partitionValues = HiveUtil.toPartitionValues(pu.getName());
HivePartition hivePartition = new HivePartition(
tableInfo,
false,
sd.getInputFormat(),
partition.getSd().getLocation(),
partitionValues,
partition.getParameters(),
sd.getOutputFormat(),
sd.getSerdeInfo().getSerializationLib(),
sd.getCols()
);
partitionActionsForTable.put(
partitionValues,
new Action<>(
ActionType.INSERT_EXISTING,
new PartitionAndMore(
hivePartition,
pu.getLocation().getWritePath(),
pu.getName(),
pu.getFileNames(),
updateStats,
pu
))
);
}
}
}
private static void addSuppressedExceptions(
List<Throwable> suppressedExceptions,
Throwable t,
List<String> descriptions,
String description) {
descriptions.add(description);
// A limit is needed to avoid having a huge exception object. 5 was chosen arbitrarily.
if (suppressedExceptions.size() < 5) {
suppressedExceptions.add(t);
}
}
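/**
 * Applies a statistics update to a table or to a single partition. When {@code merge} is true
 * the new statistics are merged into the current ones, otherwise they replace them; {@code undo}
 * subtracts the update again, but only if {@code run} has already completed.
 */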
public static class UpdateStatisticsTask {
private final SimpleTableInfo tableInfo;
private final Optional<String> partitionName;
private final HivePartitionStatistics updatePartitionStat;
private final boolean merge;
private boolean done;
public UpdateStatisticsTask(SimpleTableInfo tableInfo, Optional<String> partitionName,
HivePartitionStatistics statistics, boolean merge) {
this.tableInfo = Objects.requireNonNull(tableInfo, "tableInfo is null");
this.partitionName = Objects.requireNonNull(partitionName, "partitionName is null");
this.updatePartitionStat = Objects.requireNonNull(statistics, "statistics is null");
this.merge = merge;
}
public void run(HiveMetadataOps hiveOps) {
if (partitionName.isPresent()) {
hiveOps.updatePartitionStatistics(tableInfo, partitionName.get(), this::updateStatistics);
} else {
hiveOps.updateTableStatistics(tableInfo, this::updateStatistics);
}
done = true;
}
public void undo(HiveMetadataOps hmsOps) {
if (!done) {
return;
}
if (partitionName.isPresent()) {
hmsOps.updatePartitionStatistics(tableInfo, partitionName.get(), this::resetStatistics);
} else {
hmsOps.updateTableStatistics(tableInfo, this::resetStatistics);
}
}
public String getDescription() {
if (partitionName.isPresent()) {
return "alter partition parameters " + tableInfo + " " + partitionName.get();
} else {
return "alter table parameters " + tableInfo;
}
}
private HivePartitionStatistics updateStatistics(HivePartitionStatistics currentStats) {
return merge ? HivePartitionStatistics.merge(currentStats, updatePartitionStat) : updatePartitionStat;
}
private HivePartitionStatistics resetStatistics(HivePartitionStatistics currentStatistics) {
return HivePartitionStatistics
.reduce(currentStatistics, updatePartitionStat, CommonStatistics.ReduceOperator.SUBTRACT);
}
}
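/**
 * Collects new partitions and registers them in HMS in batches of 20, remembering the partition
 * values that were created so that {@code rollback} can drop them again.
 */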
public static class AddPartitionsTask {
private final List<HivePartitionWithStatistics> partitions = new ArrayList<>();
private final List<List<String>> createdPartitionValues = new ArrayList<>();
public boolean isEmpty() {
return partitions.isEmpty();
}
public List<HivePartitionWithStatistics> getPartitions() {
return partitions;
}
public void clear() {
partitions.clear();
createdPartitionValues.clear();
}
public void addPartition(HivePartitionWithStatistics partition) {
partitions.add(partition);
}
public void run(HiveMetadataOps hiveOps) {
HivePartition firstPartition = partitions.get(0).getPartition();
SimpleTableInfo tableInfo = firstPartition.getTableInfo();
List<List<HivePartitionWithStatistics>> batchedPartitions = Lists.partition(partitions, 20);
for (List<HivePartitionWithStatistics> batch : batchedPartitions) {
try {
hiveOps.addPartitions(tableInfo, batch);
for (HivePartitionWithStatistics partition : batch) {
createdPartitionValues.add(partition.getPartition().getPartitionValues());
}
} catch (Throwable t) {
LOG.warn("Failed to add partition", t);
throw t;
}
}
}
public List<List<String>> rollback(HiveMetadataOps hiveOps) {
HivePartition firstPartition = partitions.get(0).getPartition();
SimpleTableInfo tableInfo = firstPartition.getTableInfo();
List<List<String>> rollbackFailedPartitions = new ArrayList<>();
for (List<String> createdPartitionValue : createdPartitionValues) {
try {
hiveOps.dropPartition(tableInfo, createdPartitionValue, false);
} catch (Throwable t) {
LOG.warn("Failed to drop partition on {}.{} when rollback",
tableInfo, rollbackFailedPartitions);
rollbackFailedPartitions.add(createdPartitionValue);
}
}
return rollbackFailedPartitions;
}
}
private static class DirectoryCleanUpTask {
private final Path path;
private final boolean deleteEmptyDir;
public DirectoryCleanUpTask(String path, boolean deleteEmptyDir) {
this.path = new Path(path);
this.deleteEmptyDir = deleteEmptyDir;
}
public Path getPath() {
return path;
}
public boolean isDeleteEmptyDir() {
return deleteEmptyDir;
}
@Override
public String toString() {
return new StringJoiner(", ", DirectoryCleanUpTask.class.getSimpleName() + "[", "]")
.add("path=" + path)
.add("deleteEmptyDir=" + deleteEmptyDir)
.toString();
}
}
private static class DeleteRecursivelyResult {
private final boolean dirNoLongerExists;
private final List<String> notDeletedEligibleItems;
public DeleteRecursivelyResult(boolean dirNoLongerExists, List<String> notDeletedEligibleItems) {
this.dirNoLongerExists = dirNoLongerExists;
this.notDeletedEligibleItems = notDeletedEligibleItems;
}
public boolean dirNotExists() {
return dirNoLongerExists;
}
public List<String> getNotDeletedEligibleItems() {
return notDeletedEligibleItems;
}
}
private static class RenameDirectoryTask {
private final String renameFrom;
private final String renameTo;
public RenameDirectoryTask(String renameFrom, String renameTo) {
this.renameFrom = renameFrom;
this.renameTo = renameTo;
}
public String getRenameFrom() {
return renameFrom;
}
public String getRenameTo() {
return renameTo;
}
@Override
public String toString() {
return new StringJoiner(", ", RenameDirectoryTask.class.getSimpleName() + "[", "]")
.add("renameFrom:" + renameFrom)
.add("renameTo:" + renameTo)
.toString();
}
}
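/**
 * Deletes the files under {@code directory} that were written by this query (or, when
 * {@code reverse} is set, the files that were not), optionally removing the directory itself
 * once everything under it is gone, and throws if any eligible item could not be deleted.
 */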
private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir, boolean reverse) {
DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, deleteEmptyDir, reverse);
if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
LOG.warn("Failed to delete directory {}. Some eligible items can't be deleted: {}.",
directory.toString(), deleteResult.getNotDeletedEligibleItems());
throw new RuntimeException(
"Failed to delete directory for files: " + deleteResult.getNotDeletedEligibleItems());
} else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
LOG.warn("Failed to delete directory {} due to dir isn't empty", directory.toString());
throw new RuntimeException("Failed to delete directory for empty dir: " + directory.toString());
}
}
private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir, boolean reverse) {
try {
Status status = fs.directoryExists(directory.toString());
if (status.getErrCode().equals(Status.ErrCode.NOT_FOUND)) {
return new DeleteRecursivelyResult(true, ImmutableList.of());
} else if (!status.ok()) {
ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
notDeletedEligibleItems.add(directory.toString() + "/*");
return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
}
} catch (Exception e) {
ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
notDeletedEligibleItems.add(directory.toString() + "/*");
return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
}
return doRecursiveDeleteFiles(directory, deleteEmptyDir, queryId, reverse);
}
private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir,
String queryId, boolean reverse) {
List<RemoteFile> allFiles = new ArrayList<>();
Set<String> allDirs = new HashSet<>();
Status statusFile = fs.listFiles(directory.toString(), true, allFiles);
Status statusDir = fs.listDirectories(directory.toString(), allDirs);
if (!statusFile.ok() || !statusDir.ok()) {
ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
notDeletedEligibleItems.add(directory + "/*");
return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
}
boolean allDescendentsDeleted = true;
ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
for (RemoteFile file : allFiles) {
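// With reverse == false only files written by this query (name starts with queryId) are
// deleted; with reverse == true only files NOT written by this query are deleted.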
if (reverse ^ file.getName().startsWith(queryId)) {
if (!deleteIfExists(file.getPath())) {
allDescendentsDeleted = false;
notDeletedEligibleItems.add(file.getPath().toString());
}
} else {
allDescendentsDeleted = false;
}
}
for (String dir : allDirs) {
DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(new Path(dir), deleteEmptyDir, queryId, reverse);
if (!subResult.dirNotExists()) {
allDescendentsDeleted = false;
}
if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
}
}
if (allDescendentsDeleted && deleteEmptyDir) {
Verify.verify(notDeletedEligibleItems.build().isEmpty());
if (!deleteDirectoryIfExists(directory)) {
return new DeleteRecursivelyResult(false, ImmutableList.of(directory + "/"));
}
// all items of the location have been deleted.
return new DeleteRecursivelyResult(true, ImmutableList.of());
}
return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
}
public boolean deleteIfExists(Path path) {
Status status = wrapperDeleteWithProfileSummary(path.toString());
if (status.ok()) {
return true;
}
return !fs.exists(path.toString()).ok();
}
public boolean deleteDirectoryIfExists(Path path) {
Status status = wrapperDeleteDirWithProfileSummary(path.toString());
if (status.ok()) {
return true;
}
return !fs.directoryExists(path.toString()).ok();
}
private static class TableAndMore {
private final Table table;
private final String currentLocation;
private final List<String> fileNames;
private final HivePartitionStatistics statisticsUpdate;
private final THivePartitionUpdate hivePartitionUpdate;
public TableAndMore(
Table table,
String currentLocation,
List<String> fileNames,
HivePartitionStatistics statisticsUpdate,
THivePartitionUpdate hivePartitionUpdate) {
this.table = Objects.requireNonNull(table, "table is null");
this.currentLocation = Objects.requireNonNull(currentLocation);
this.fileNames = Objects.requireNonNull(fileNames);
this.statisticsUpdate = Objects.requireNonNull(statisticsUpdate, "statisticsUpdate is null");
this.hivePartitionUpdate = Objects.requireNonNull(hivePartitionUpdate, "hivePartitionUpdate is null");
}
public Table getTable() {
return table;
}
public String getCurrentLocation() {
return currentLocation;
}
public List<String> getFileNames() {
return fileNames;
}
public HivePartitionStatistics getStatisticsUpdate() {
return statisticsUpdate;
}
public THivePartitionUpdate getHivePartitionUpdate() {
return hivePartitionUpdate;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("table", table)
.add("statisticsUpdate", statisticsUpdate)
.toString();
}
}
private static class PartitionAndMore {
private final HivePartition partition;
private final String currentLocation;
private final String partitionName;
private final List<String> fileNames;
private final HivePartitionStatistics statisticsUpdate;
private final THivePartitionUpdate hivePartitionUpdate;
public PartitionAndMore(
HivePartition partition,
String currentLocation,
String partitionName,
List<String> fileNames,
HivePartitionStatistics statisticsUpdate,
THivePartitionUpdate hivePartitionUpdate) {
this.partition = Objects.requireNonNull(partition, "partition is null");
this.currentLocation = Objects.requireNonNull(currentLocation, "currentLocation is null");
this.partitionName = Objects.requireNonNull(partitionName, "partition is null");
this.fileNames = Objects.requireNonNull(fileNames, "fileNames is null");
this.statisticsUpdate = Objects.requireNonNull(statisticsUpdate, "statisticsUpdate is null");
this.hivePartitionUpdate = Objects.requireNonNull(hivePartitionUpdate, "hivePartitionUpdate is null");
}
public HivePartition getPartition() {
return partition;
}
public String getCurrentLocation() {
return currentLocation;
}
public String getPartitionName() {
return partitionName;
}
public List<String> getFileNames() {
return fileNames;
}
public HivePartitionStatistics getStatisticsUpdate() {
return statisticsUpdate;
}
public THivePartitionUpdate getHivePartitionUpdate() {
return hivePartitionUpdate;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("partition", partition)
.add("currentLocation", currentLocation)
.add("fileNames", fileNames)
.toString();
}
}
private enum ActionType {
// drop a table/partition
DROP,
// drop a table/partition but will preserve data
DROP_PRESERVE_DATA,
// add a table/partition
ADD,
// drop then add a table/partition, like overwrite
ALTER,
// insert into an existing table/partition
INSERT_EXISTING,
// merge into an existing table/partition
MERGE,
}
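/**
 * A buffered metastore change. DROP and DROP_PRESERVE_DATA carry no payload; every other action
 * type carries a {@code TableAndMore} or {@code PartitionAndMore} describing the change.
 */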
public static class Action<T> {
private final ActionType type;
private final T data;
public Action(ActionType type, T data) {
this.type = Objects.requireNonNull(type, "type is null");
if (type == ActionType.DROP || type == ActionType.DROP_PRESERVE_DATA) {
Preconditions.checkArgument(data == null, "data is not null");
} else {
Objects.requireNonNull(data, "data is null");
}
this.data = data;
}
public ActionType getType() {
return type;
}
public T getData() {
Preconditions.checkState(type != ActionType.DROP);
return data;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("type", type)
.add("data", data)
.toString();
}
}
public synchronized Table getTable(SimpleTableInfo tableInfo) {
Action<TableAndMore> tableAction = tableActions.get(tableInfo);
if (tableAction == null) {
return hiveOps.getClient().getTable(tableInfo.getDbName(), tableInfo.getTbName());
}
switch (tableAction.getType()) {
case ADD:
case ALTER:
case INSERT_EXISTING:
case MERGE:
return tableAction.getData().getTable();
case DROP:
case DROP_PRESERVE_DATA:
break;
default:
throw new IllegalStateException("Unknown action type: " + tableAction.getType());
}
throw new RuntimeException("Not Found table: " + tableInfo);
}
public synchronized void finishChangingExistingTable(
ActionType actionType,
SimpleTableInfo tableInfo,
String location,
List<String> fileNames,
HivePartitionStatistics statisticsUpdate,
THivePartitionUpdate hivePartitionUpdate) {
Action<TableAndMore> oldTableAction = tableActions.get(tableInfo);
if (oldTableAction == null) {
Table table = hiveOps.getClient().getTable(tableInfo.getDbName(), tableInfo.getTbName());
tableActions.put(
tableInfo,
new Action<>(
actionType,
new TableAndMore(
table,
location,
fileNames,
statisticsUpdate,
hivePartitionUpdate)));
return;
}
switch (oldTableAction.getType()) {
case DROP:
throw new RuntimeException("Not found table: " + tableInfo);
case ADD:
case ALTER:
case INSERT_EXISTING:
case MERGE:
throw new UnsupportedOperationException(
"Inserting into an unpartitioned table that were added, altered,"
+ "or inserted into in the same transaction is not supported");
case DROP_PRESERVE_DATA:
break;
default:
throw new IllegalStateException("Unknown action type: " + oldTableAction.getType());
}
}
public synchronized void createTable(
SimpleTableInfo tableInfo,
Table table, String location, List<String> fileNames,
HivePartitionStatistics statistics,
THivePartitionUpdate hivePartitionUpdate) {
// When creating a table, it should never have partition actions. This is just a sanity check.
checkNoPartitionAction(tableInfo);
Action<TableAndMore> oldTableAction = tableActions.get(tableInfo);
TableAndMore tableAndMore = new TableAndMore(table, location, fileNames, statistics, hivePartitionUpdate);
if (oldTableAction == null) {
tableActions.put(tableInfo, new Action<>(ActionType.ADD, tableAndMore));
return;
}
switch (oldTableAction.getType()) {
case DROP:
tableActions.put(tableInfo, new Action<>(ActionType.ALTER, tableAndMore));
return;
case ADD:
case ALTER:
case INSERT_EXISTING:
case MERGE:
throw new RuntimeException("Table already exists: " + tableInfo);
case DROP_PRESERVE_DATA:
break;
default:
throw new IllegalStateException("Unknown action type: " + oldTableAction.getType());
}
}
public synchronized void dropTable(SimpleTableInfo tableInfo) {
// Dropping table with partition actions requires cleaning up staging data, which is not implemented yet.
checkNoPartitionAction(tableInfo);
Action<TableAndMore> oldTableAction = tableActions.get(tableInfo);
if (oldTableAction == null || oldTableAction.getType() == ActionType.ALTER) {
tableActions.put(tableInfo, new Action<>(ActionType.DROP, null));
return;
}
switch (oldTableAction.getType()) {
case DROP:
throw new RuntimeException("Not found table: " + tableInfo);
case ADD:
case ALTER:
case INSERT_EXISTING:
case MERGE:
throw new RuntimeException("Dropping a table added/modified in the same transaction is not supported");
case DROP_PRESERVE_DATA:
break;
default:
throw new IllegalStateException("Unknown action type: " + oldTableAction.getType());
}
}
private void checkNoPartitionAction(SimpleTableInfo tableInfo) {
Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
partitionActions.get(tableInfo);
if (partitionActionsForTable != null && !partitionActionsForTable.isEmpty()) {
throw new RuntimeException(
"Cannot make schema changes to a table with modified partitions in the same transaction");
}
}
public synchronized void addPartition(
SimpleTableInfo tableInfo,
HivePartition partition,
String currentLocation,
String partitionName,
List<String> files,
HivePartitionStatistics statistics,
THivePartitionUpdate hivePartitionUpdate) {
Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
partitionActions.computeIfAbsent(tableInfo, k -> new HashMap<>());
Action<PartitionAndMore> oldPartitionAction = partitionActionsForTable.get(partition.getPartitionValues());
if (oldPartitionAction == null) {
partitionActionsForTable.put(
partition.getPartitionValues(),
new Action<>(
ActionType.ADD,
new PartitionAndMore(partition, currentLocation, partitionName, files, statistics,
hivePartitionUpdate))
);
return;
}
switch (oldPartitionAction.getType()) {
case DROP:
case DROP_PRESERVE_DATA:
partitionActionsForTable.put(
partition.getPartitionValues(),
new Action<>(
ActionType.ALTER,
new PartitionAndMore(partition, currentLocation, partitionName, files, statistics,
hivePartitionUpdate))
);
return;
case ADD:
case ALTER:
case INSERT_EXISTING:
case MERGE:
throw new RuntimeException(
"Partition already exists for table: "
+ tableInfo + ", partition values: " + partition
.getPartitionValues());
default:
throw new IllegalStateException("Unknown action type: " + oldPartitionAction.getType());
}
}
public synchronized void dropPartition(
SimpleTableInfo tableInfo,
List<String> partitionValues,
boolean deleteData) {
Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
partitionActions.computeIfAbsent(tableInfo, k -> new HashMap<>());
Action<PartitionAndMore> oldPartitionAction = partitionActionsForTable.get(partitionValues);
if (oldPartitionAction == null) {
if (deleteData) {
partitionActionsForTable.put(partitionValues, new Action<>(ActionType.DROP, null));
} else {
partitionActionsForTable.put(partitionValues, new Action<>(ActionType.DROP_PRESERVE_DATA, null));
}
return;
}
switch (oldPartitionAction.getType()) {
case DROP:
case DROP_PRESERVE_DATA:
throw new RuntimeException(
"Not found partition from partition actions for " + tableInfo
+ ", partitions: " + partitionValues);
case ADD:
case ALTER:
case INSERT_EXISTING:
case MERGE:
throw new RuntimeException(
"Dropping a partition added in the same transaction is not supported: "
+ tableInfo + ", partition values: " + partitionValues);
default:
throw new IllegalStateException("Unknown action type: " + oldPartitionAction.getType());
}
}
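/**
 * Applies the prepared actions. The prepare* methods schedule asynchronous file renames (or the
 * completion of pending S3 multipart uploads) and queue partition additions and statistics
 * updates; doCommit() waits for the file system work and applies the metastore changes; abort()
 * and rollback() undo what has already happened, restoring renamed directories and deleting the
 * files written by this query.
 */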
class HmsCommitter {
// update statistics for an unpartitioned table or existing partitions
private final List<UpdateStatisticsTask> updateStatisticsTasks = new ArrayList<>();
ExecutorService updateStatisticsExecutor = Executors.newFixedThreadPool(16);
// add new partition
private final AddPartitionsTask addPartitionsTask = new AddPartitionsTask();
// for file system rename operation
// whether to cancel the file system tasks
private final AtomicBoolean fileSystemTaskCancelled = new AtomicBoolean(false);
// file system tasks that are executed asynchronously, including rename_file, rename_dir
private final List<CompletableFuture<?>> asyncFileSystemTaskFutures = new ArrayList<>();
// when aborted, we need to delete all files written by this query under this path, and the directory itself if requested
private final Queue<DirectoryCleanUpTask> directoryCleanUpTasksForAbort = new ConcurrentLinkedQueue<>();
// when aborted, we need to restore the renamed directories
private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = new ArrayList<>();
// when finished, we need to clear these directories
private final List<String> clearDirsForFinish = new ArrayList<>();
private final List<String> s3cleanWhenSuccess = new ArrayList<>();
public void cancelUnStartedAsyncFileSystemTask() {
fileSystemTaskCancelled.set(true);
}
private void undoUpdateStatisticsTasks() {
ImmutableList.Builder<CompletableFuture<?>> undoUpdateFutures = ImmutableList.builder();
for (UpdateStatisticsTask task : updateStatisticsTasks) {
undoUpdateFutures.add(CompletableFuture.runAsync(() -> {
try {
task.undo(hiveOps);
} catch (Throwable throwable) {
LOG.warn("Failed to rollback: {}", task.getDescription(), throwable);
}
}, updateStatisticsExecutor));
}
for (CompletableFuture<?> undoUpdateFuture : undoUpdateFutures.build()) {
MoreFutures.getFutureValue(undoUpdateFuture);
}
updateStatisticsTasks.clear();
}
private void undoAddPartitionsTask() {
if (addPartitionsTask.isEmpty()) {
return;
}
HivePartition firstPartition = addPartitionsTask.getPartitions().get(0).getPartition();
SimpleTableInfo tableInfo = firstPartition.getTableInfo();
List<List<String>> rollbackFailedPartitions = addPartitionsTask.rollback(hiveOps);
if (!rollbackFailedPartitions.isEmpty()) {
LOG.warn("Failed to rollback: add_partition for partition values {}.{}",
tableInfo, rollbackFailedPartitions);
}
addPartitionsTask.clear();
}
private void waitForAsyncFileSystemTaskSuppressThrowable() {
for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Throwable t) {
// ignore
}
}
asyncFileSystemTaskFutures.clear();
}
public void prepareInsertExistingTable(SimpleTableInfo tableInfo, TableAndMore tableAndMore) {
Table table = tableAndMore.getTable();
String targetPath = table.getSd().getLocation();
String writePath = tableAndMore.getCurrentLocation();
if (!targetPath.equals(writePath)) {
wrapperAsyncRenameWithProfileSummary(
fileSystemExecutor,
asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
writePath,
targetPath,
tableAndMore.getFileNames());
} else {
if (!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
tableAndMore.hivePartitionUpdate, targetPath);
}
}
directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false));
updateStatisticsTasks.add(
new UpdateStatisticsTask(
tableInfo,
Optional.empty(),
tableAndMore.getStatisticsUpdate(),
true
));
}
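/**
 * Prepares an overwrite of an unpartitioned table: the current table directory is renamed to a
 * temporary "_temp_<queryId>_" location (restored on abort, deleted when the commit finishes),
 * and the write path is then renamed into its place. If the data was written directly to the
 * target path on S3, the pending multipart uploads are completed instead and stale files are
 * cleaned up once the commit succeeds.
 */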
public void prepareAlterTable(SimpleTableInfo tableInfo, TableAndMore tableAndMore) {
Table table = tableAndMore.getTable();
String targetPath = table.getSd().getLocation();
String writePath = tableAndMore.getCurrentLocation();
if (!targetPath.equals(writePath)) {
Path path = new Path(targetPath);
String oldTablePath = new Path(
path.getParent(), "_temp_" + queryId + "_" + path.getName()).toString();
Status status = wrapperRenameDirWithProfileSummary(
targetPath,
oldTablePath,
() -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + targetPath + " to " + oldTablePath + status.getErrMsg());
}
clearDirsForFinish.add(oldTablePath);
status = wrapperRenameDirWithProfileSummary(
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(
new DirectoryCleanUpTask(targetPath, true)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg());
}
} else {
if (!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
s3cleanWhenSuccess.add(targetPath);
s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
tableAndMore.hivePartitionUpdate, targetPath);
}
}
updateStatisticsTasks.add(
new UpdateStatisticsTask(
tableInfo,
Optional.empty(),
tableAndMore.getStatisticsUpdate(),
false
));
}
public void prepareAddPartition(SimpleTableInfo tableInfo, PartitionAndMore partitionAndMore) {
HivePartition partition = partitionAndMore.getPartition();
String targetPath = partition.getPath();
String writePath = partitionAndMore.getCurrentLocation();
if (!targetPath.equals(writePath)) {
wrapperAsyncRenameDirWithProfileSummary(
fileSystemExecutor,
asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true)));
} else {
if (!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
partitionAndMore.hivePartitionUpdate, targetPath);
}
}
StorageDescriptor sd = getTable(tableInfo).getSd();
HivePartition hivePartition = new HivePartition(
tableInfo,
false,
sd.getInputFormat(),
targetPath,
partition.getPartitionValues(),
Maps.newHashMap(),
sd.getOutputFormat(),
sd.getSerdeInfo().getSerializationLib(),
sd.getCols()
);
HivePartitionWithStatistics partitionWithStats =
new HivePartitionWithStatistics(
partitionAndMore.getPartitionName(),
hivePartition,
partitionAndMore.getStatisticsUpdate());
addPartitionsTask.addPartition(partitionWithStats);
}
public void prepareInsertExistPartition(SimpleTableInfo tableInfo, PartitionAndMore partitionAndMore) {
HivePartition partition = partitionAndMore.getPartition();
String targetPath = partition.getPath();
String writePath = partitionAndMore.getCurrentLocation();
directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false));
if (!targetPath.equals(writePath)) {
wrapperAsyncRenameWithProfileSummary(
fileSystemExecutor,
asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
writePath,
targetPath,
partitionAndMore.getFileNames());
} else {
if (!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
partitionAndMore.hivePartitionUpdate, targetPath);
}
}
updateStatisticsTasks.add(
new UpdateStatisticsTask(
tableInfo,
Optional.of(partitionAndMore.getPartitionName()),
partitionAndMore.getStatisticsUpdate(),
true));
}
private void runDirectoryClearUpTasksForAbort() {
for (DirectoryCleanUpTask cleanUpTask : directoryCleanUpTasksForAbort) {
recursiveDeleteItems(cleanUpTask.getPath(), cleanUpTask.isDeleteEmptyDir(), false);
}
directoryCleanUpTasksForAbort.clear();
}
private void runRenameDirTasksForAbort() {
Status status;
for (RenameDirectoryTask task : renameDirectoryTasksForAbort) {
status = fs.exists(task.getRenameFrom());
if (status.ok()) {
status = wrapperRenameDirWithProfileSummary(task.getRenameFrom(), task.getRenameTo(), () -> {
});
if (!status.ok()) {
LOG.warn("Failed to abort rename dir from {} to {}:{}",
task.getRenameFrom(), task.getRenameTo(), status.getErrMsg());
}
}
}
renameDirectoryTasksForAbort.clear();
}
private void runClearPathsForFinish() {
Status status;
for (String path : clearDirsForFinish) {
status = wrapperDeleteDirWithProfileSummary(path);
if (!status.ok()) {
LOG.warn("Failed to recursively delete path {}:{}", path, status.getErrCode());
}
}
}
private void runS3cleanWhenSuccess() {
for (String path : s3cleanWhenSuccess) {
recursiveDeleteItems(new Path(path), false, true);
}
}
public void prepareAlterPartition(SimpleTableInfo tableInfo, PartitionAndMore partitionAndMore) {
HivePartition partition = partitionAndMore.getPartition();
String targetPath = partition.getPath();
String writePath = partitionAndMore.getCurrentLocation();
if (!targetPath.equals(writePath)) {
Path path = new Path(targetPath);
String oldPartitionPath = new Path(
path.getParent(), "_temp_" + queryId + "_" + path.getName()).toString();
Status status = wrapperRenameDirWithProfileSummary(
targetPath,
oldPartitionPath,
() -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldPartitionPath, targetPath)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir "
+ "from " + targetPath
+ " to " + oldPartitionPath + ":" + status.getErrMsg());
}
clearDirsForFinish.add(oldPartitionPath);
status = wrapperRenameDirWithProfileSummary(
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg());
}
} else {
if (!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
s3cleanWhenSuccess.add(targetPath);
s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
partitionAndMore.hivePartitionUpdate, targetPath);
}
}
updateStatisticsTasks.add(
new UpdateStatisticsTask(
tableInfo,
Optional.of(partitionAndMore.getPartitionName()),
partitionAndMore.getStatisticsUpdate(),
false
));
}
private void waitForAsyncFileSystemTasks() {
summaryProfile.ifPresent(SummaryProfile::setTempStartTime);
for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
MoreFutures.getFutureValue(future, RuntimeException.class);
}
summaryProfile.ifPresent(SummaryProfile::freshFilesystemOptTime);
}
private void doAddPartitionsTask() {
summaryProfile.ifPresent(profile -> {
profile.setTempStartTime();
profile.addHmsAddPartitionCnt(addPartitionsTask.getPartitions().size());
});
if (!addPartitionsTask.isEmpty()) {
addPartitionsTask.run(hiveOps);
}
summaryProfile.ifPresent(SummaryProfile::setHmsAddPartitionTime);
}
private void doUpdateStatisticsTasks() {
summaryProfile.ifPresent(profile -> {
profile.setTempStartTime();
profile.addHmsUpdatePartitionCnt(updateStatisticsTasks.size());
});
ImmutableList.Builder<CompletableFuture<?>> updateStatsFutures = ImmutableList.builder();
List<String> failedTaskDescriptions = new ArrayList<>();
List<Throwable> suppressedExceptions = new ArrayList<>();
for (UpdateStatisticsTask task : updateStatisticsTasks) {
updateStatsFutures.add(CompletableFuture.runAsync(() -> {
try {
task.run(hiveOps);
} catch (Throwable t) {
synchronized (suppressedExceptions) {
addSuppressedExceptions(
suppressedExceptions, t, failedTaskDescriptions, task.getDescription());
}
}
}, updateStatisticsExecutor));
}
for (CompletableFuture<?> executeUpdateFuture : updateStatsFutures.build()) {
MoreFutures.getFutureValue(executeUpdateFuture);
}
if (!suppressedExceptions.isEmpty()) {
StringBuilder message = new StringBuilder();
message.append("Failed to execute some updating statistics tasks: ");
Joiner.on("; ").appendTo(message, failedTaskDescriptions);
RuntimeException exception = new RuntimeException(message.toString());
suppressedExceptions.forEach(exception::addSuppressed);
throw exception;
}
summaryProfile.ifPresent(SummaryProfile::setHmsUpdatePartitionTime);
}
private void pruneAndDeleteStagingDirectories() {
stagingDirectory.ifPresent((v) -> recursiveDeleteItems(new Path(v), true, false));
}
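/**
 * On rollback, aborts every S3 multipart upload that was started but never completed so that no
 * orphaned upload parts are left behind.
 */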
private void abortMultiUploads() {
if (uncompletedMpuPendingUploads.isEmpty()) {
return;
}
for (UncompletedMpuPendingUpload uncompletedMpuPendingUpload : uncompletedMpuPendingUploads) {
S3FileSystem s3FileSystem = (S3FileSystem) ((SwitchingFileSystem) fs)
.fileSystem(uncompletedMpuPendingUpload.path);
S3Client s3Client;
try {
s3Client = (S3Client) s3FileSystem.getObjStorage().getClient();
} catch (UserException e) {
throw new RuntimeException(e);
}
asyncFileSystemTaskFutures.add(CompletableFuture.runAsync(() -> {
s3Client.abortMultipartUpload(AbortMultipartUploadRequest.builder()
.bucket(uncompletedMpuPendingUpload.s3MPUPendingUpload.getBucket())
.key(uncompletedMpuPendingUpload.s3MPUPendingUpload.getKey())
.uploadId(uncompletedMpuPendingUpload.s3MPUPendingUpload.getUploadId())
.build());
}, fileSystemExecutor));
}
uncompletedMpuPendingUploads.clear();
}
public void doNothing() {
// do nothing
// hook point used by regression tests and unit tests to inject an exception
}
public void doCommit() {
waitForAsyncFileSystemTasks();
runS3cleanWhenSuccess();
doAddPartitionsTask();
doUpdateStatisticsTasks();
// delete the write path
pruneAndDeleteStagingDirectories();
doNothing();
}
public void abort() {
cancelUnStartedAsyncFileSystemTask();
undoUpdateStatisticsTasks();
undoAddPartitionsTask();
waitForAsyncFileSystemTaskSuppressThrowable();
runDirectoryClearUpTasksForAbort();
runRenameDirTasksForAbort();
}
public void rollback() {
// delete the write path
pruneAndDeleteStagingDirectories();
// abort the in-progress multipart uploads
abortMultiUploads();
for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
MoreFutures.getFutureValue(future, RuntimeException.class);
}
asyncFileSystemTaskFutures.clear();
}
public void shutdownExecutorService() {
// Disable new tasks from being submitted
updateStatisticsExecutor.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!updateStatisticsExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
// Cancel currently executing tasks
updateStatisticsExecutor.shutdownNow();
// Wait a while for tasks to respond to being cancelled
if (!updateStatisticsExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
LOG.warn("Pool did not terminate");
}
}
} catch (InterruptedException e) {
// (Re-)Cancel if current thread also interrupted
updateStatisticsExecutor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
}
public Status wrapperRenameDirWithProfileSummary(String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
summaryProfile.ifPresent(profile -> {
profile.setTempStartTime();
profile.incRenameDirCnt();
});
Status status = fs.renameDir(origFilePath, destFilePath, runWhenPathNotExist);
summaryProfile.ifPresent(SummaryProfile::freshFilesystemOptTime);
return status;
}
public Status wrapperDeleteWithProfileSummary(String remotePath) {
summaryProfile.ifPresent(profile -> {
profile.setTempStartTime();
profile.incDeleteFileCnt();
});
Status status = fs.delete(remotePath);
summaryProfile.ifPresent(SummaryProfile::freshFilesystemOptTime);
return status;
}
public Status wrapperDeleteDirWithProfileSummary(String remotePath) {
summaryProfile.ifPresent(profile -> {
profile.setTempStartTime();
profile.incDeleteDirRecursiveCnt();
});
Status status = fs.deleteDirectory(remotePath);
summaryProfile.ifPresent(SummaryProfile::freshFilesystemOptTime);
return status;
}
public void wrapperAsyncRenameWithProfileSummary(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
List<String> fileNames) {
FileSystemUtil.asyncRenameFiles(
fs, executor, renameFileFutures, cancelled, origFilePath, destFilePath, fileNames);
summaryProfile.ifPresent(profile -> profile.addRenameFileCnt(fileNames.size()));
}
public void wrapperAsyncRenameDirWithProfileSummary(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
FileSystemUtil.asyncRenameDir(
fs, executor, renameFileFutures, cancelled, origFilePath, destFilePath, runWhenPathNotExist);
summaryProfile.ifPresent(SummaryProfile::incRenameDirCnt);
}
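/**
 * Completes the pending S3 multipart uploads for files that were written directly to the target
 * path, assembling each upload from its recorded part ETags. Completed uploads are removed from
 * the set of uploads that would otherwise be aborted on rollback.
 */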
private void s3Commit(Executor fileSystemExecutor, List<CompletableFuture<?>> asyncFileSystemTaskFutures,
AtomicBoolean fileSystemTaskCancelled, THivePartitionUpdate hivePartitionUpdate, String path) {
List<TS3MPUPendingUpload> s3MpuPendingUploads = hivePartitionUpdate.getS3MpuPendingUploads();
if (isMockedPartitionUpdate) {
return;
}
S3FileSystem s3FileSystem = (S3FileSystem) ((SwitchingFileSystem) fs).fileSystem(path);
S3Client s3Client;
try {
s3Client = (S3Client) s3FileSystem.getObjStorage().getClient();
} catch (UserException e) {
throw new RuntimeException(e);
}
for (TS3MPUPendingUpload s3MPUPendingUpload : s3MpuPendingUploads) {
asyncFileSystemTaskFutures.add(CompletableFuture.runAsync(() -> {
if (fileSystemTaskCancelled.get()) {
return;
}
List<CompletedPart> completedParts = Lists.newArrayList();
for (Map.Entry<Integer, String> entry : s3MPUPendingUpload.getEtags().entrySet()) {
completedParts.add(CompletedPart.builder().eTag(entry.getValue()).partNumber(entry.getKey())
.build());
}
s3Client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
.bucket(s3MPUPendingUpload.getBucket())
.key(s3MPUPendingUpload.getKey())
.uploadId(s3MPUPendingUpload.getUploadId())
.multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
.build());
uncompletedMpuPendingUploads.remove(new UncompletedMpuPendingUpload(s3MPUPendingUpload, path));
}, fileSystemExecutor));
}
}
}