BatchCreatePartitionsHelper.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// Copied from
// https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore/blob/branch-3.4.0/
//

package com.amazonaws.glue.catalog.util;

import com.amazonaws.glue.catalog.converters.CatalogToHiveConverter;
import com.amazonaws.glue.catalog.converters.CatalogToHiveConverterFactory;
import com.amazonaws.glue.catalog.converters.GlueInputConverter;
import com.amazonaws.glue.catalog.metastore.AWSGlueMetastore;
import static com.amazonaws.glue.catalog.util.PartitionUtils.isInvalidUserInputException;
import com.amazonaws.services.glue.model.EntityNotFoundException;
import com.amazonaws.services.glue.model.Partition;
import com.amazonaws.services.glue.model.PartitionError;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.log4j.Logger;
import shade.doris.hive.org.apache.thrift.TException;

import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * Issues a single batch "create partitions" request against the Glue metastore client and
 * sorts the requested partitions into those that were created and those that failed.
 *
 * <p>Typical usage is {@code new BatchCreatePartitionsHelper(...).createPartitions()} followed by
 * calls to {@link #getPartitionsCreated()}, {@link #getPartitionsFailed()} and
 * {@link #getFirstTException()}.
 *
 * <p>Instances hold mutable result state and perform no synchronization of their own.
 */
public final class BatchCreatePartitionsHelper {

  private static final Logger logger = Logger.getLogger(BatchCreatePartitionsHelper.class);

  private final AWSGlueMetastore glueClient;
  private final String databaseName;
  private final String tableName;
  private final List<Partition> partitions;
  private final boolean ifNotExists;
  // Stored for callers' convenience; not read by any method of this class.
  private final String catalogId;
  private final CatalogToHiveConverter catalogToHiveConverter;

  // Result state, populated by createPartitions(). Both stay null until that method runs.
  private Map<PartitionKey, Partition> partitionMap;
  private List<Partition> partitionsFailed;
  private TException firstTException;

  /**
   * @param glueClient   metastore client used for the create and the follow-up get calls
   * @param databaseName database that owns the table
   * @param tableName    table the partitions are added to
   * @param catalogId    catalog identifier; stored but not otherwise used by this class
   * @param partitions   partitions to create
   * @param ifNotExists  when {@code true}, a per-partition {@link AlreadyExistsException} is not
   *                     treated as a failure
   */
  public BatchCreatePartitionsHelper(AWSGlueMetastore glueClient, String databaseName, String tableName, String catalogId,
          List<Partition> partitions, boolean ifNotExists) {
    this.glueClient = glueClient;
    this.databaseName = databaseName;
    this.tableName = tableName;
    this.catalogId = catalogId;
    this.partitions = partitions;
    this.ifNotExists = ifNotExists;
    this.catalogToHiveConverter = CatalogToHiveConverterFactory.getCatalogToHiveConverter();
  }

  /**
   * Sends the batch create request and records the outcome on this helper.
   *
   * <p>If the request itself throws, the exception is converted to a {@link TException} and
   * stored. When {@code isInvalidUserInputException} reports {@code true} for it, every partition
   * is marked failed; otherwise each partition is looked up individually to find out whether it
   * was created despite the error.
   *
   * @return this helper, so result getters can be chained
   */
  public BatchCreatePartitionsHelper createPartitions() {
    partitionMap = PartitionUtils.buildPartitionMap(partitions);
    partitionsFailed = Lists.newArrayList();

    try {
      List<PartitionError> result =
              glueClient.createPartitions(databaseName, tableName,
                      GlueInputConverter.convertToPartitionInputs(partitionMap.values()));
      processResult(result);
    } catch (Exception e) {
      logger.error("Exception thrown while creating partitions in DataCatalog: ", e);
      firstTException = catalogToHiveConverter.wrapInHiveException(e);
      if (isInvalidUserInputException(e)) {
        setAllFailed();
      } else {
        checkIfPartitionsCreated();
      }
    }
    return this;
  }

  /** Marks every requested partition as failed and none as created. */
  private void setAllFailed() {
    // Copy instead of aliasing the caller's list, so that the list handed out by
    // getPartitionsFailed() is never the very same object the caller passed in.
    partitionsFailed = Lists.newArrayList(partitions);
    partitionMap.clear();
  }

  /**
   * Moves every partition named in {@code partitionErrors} out of the "created" map and, unless
   * the error is a tolerated {@link AlreadyExistsException}, into the failed list.
   *
   * @param partitionErrors per-partition errors returned by the batch call; may be null or empty
   */
  private void processResult(List<PartitionError> partitionErrors) {
    if (partitionErrors == null || partitionErrors.isEmpty()) {
      return;
    }

    logger.error(String.format("BatchCreatePartitions failed to create %d out of %d partitions. \n",
            partitionErrors.size(), partitionMap.size()));

    for (PartitionError partitionError : partitionErrors) {
      // Whatever the error is, the partition was not created by this call, so drop it from the map.
      Partition partitionFailed = partitionMap.remove(new PartitionKey(partitionError.getPartitionValues()));

      TException exception = catalogToHiveConverter.errorDetailToHiveException(partitionError.getErrorDetail());
      if (ifNotExists && exception instanceof AlreadyExistsException) {
        // AlreadyExistsException is allowed, so we shouldn't add the partition to partitionsFailed list
        continue;
      }
      logger.error(exception);
      if (firstTException == null) {
        firstTException = exception;
      }
      if (partitionFailed != null) {
        partitionsFailed.add(partitionFailed);
      } else {
        // The error refers to values that match no requested partition (or the same values were
        // reported twice). Keep the exception, but never put a null into the failed list.
        logger.warn(String.format("Partition error for %s does not match any requested partition.",
                StringUtils.join(partitionError.getPartitionValues(), "/")));
      }
    }
  }

  /**
   * Looks up each requested partition individually and marks the ones that cannot be found as
   * failed. Used when the batch call threw and its outcome is therefore unknown.
   */
  private void checkIfPartitionsCreated() {
    for (Partition partition : partitions) {
      if (!partitionExists(partition)) {
        partitionsFailed.add(partition);
        partitionMap.remove(new PartitionKey(partition));
      }
    }
  }

  /**
   * @param partition partition whose values are looked up
   * @return {@code true} if the lookup returned a non-null partition; {@code false} if it was not
   *         found or the lookup itself failed
   */
  private boolean partitionExists(Partition partition) {
    try {
      Partition partitionReturned = glueClient.getPartition(databaseName, tableName, partition.getValues());
      return partitionReturned != null; //probably always true here
    } catch (EntityNotFoundException e) {
      // here we assume namespace and table exist. It is assured by calling "isInvalidUserInputException" method above
      return false;
    } catch (Exception e) {
      logger.error(String.format("Get partition request %s failed. ", StringUtils.join(partition.getValues(), "/")), e);
      // partition status unknown, we assume that the partition was not created
      return false;
    }
  }

  /**
   * @return the first exception recorded while creating partitions, or {@code null} if none was
   */
  public TException getFirstTException() {
    return firstTException;
  }

  /**
   * @return the partitions still considered created; an empty list if {@link #createPartitions()}
   *         has not been called yet
   */
  public Collection<Partition> getPartitionsCreated() {
    if (partitionMap == null) {
      return Lists.<Partition>newArrayList();
    }
    return partitionMap.values();
  }

  /**
   * @return the partitions that failed to be created; an empty list if
   *         {@link #createPartitions()} has not been called yet
   */
  public List<Partition> getPartitionsFailed() {
    if (partitionsFailed == null) {
      return Lists.<Partition>newArrayList();
    }
    return partitionsFailed;
  }

}