DefaultAWSGlueMetastore.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// Copied from
// https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore/blob/branch-3.4.0/
//

package com.amazonaws.glue.catalog.metastore;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.glue.catalog.converters.PartitionNameParser;
import com.amazonaws.glue.catalog.util.MetastoreClientUtils;
import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.model.BatchCreatePartitionRequest;
import com.amazonaws.services.glue.model.BatchGetPartitionRequest;
import com.amazonaws.services.glue.model.BatchGetPartitionResult;
import com.amazonaws.services.glue.model.ColumnStatistics;
import com.amazonaws.services.glue.model.ColumnStatisticsError;
import com.amazonaws.services.glue.model.CreateDatabaseRequest;
import com.amazonaws.services.glue.model.CreateTableRequest;
import com.amazonaws.services.glue.model.CreateUserDefinedFunctionRequest;
import com.amazonaws.services.glue.model.Database;
import com.amazonaws.services.glue.model.DatabaseInput;
import com.amazonaws.services.glue.model.DeleteColumnStatisticsForPartitionRequest;
import com.amazonaws.services.glue.model.DeleteColumnStatisticsForTableRequest;
import com.amazonaws.services.glue.model.DeleteDatabaseRequest;
import com.amazonaws.services.glue.model.DeletePartitionRequest;
import com.amazonaws.services.glue.model.DeleteTableRequest;
import com.amazonaws.services.glue.model.DeleteUserDefinedFunctionRequest;
import com.amazonaws.services.glue.model.GetColumnStatisticsForPartitionRequest;
import com.amazonaws.services.glue.model.GetColumnStatisticsForPartitionResult;
import com.amazonaws.services.glue.model.GetColumnStatisticsForTableRequest;
import com.amazonaws.services.glue.model.GetColumnStatisticsForTableResult;
import com.amazonaws.services.glue.model.GetDatabaseRequest;
import com.amazonaws.services.glue.model.GetDatabaseResult;
import com.amazonaws.services.glue.model.GetDatabasesRequest;
import com.amazonaws.services.glue.model.GetDatabasesResult;
import com.amazonaws.services.glue.model.GetPartitionRequest;
import com.amazonaws.services.glue.model.GetPartitionsRequest;
import com.amazonaws.services.glue.model.GetPartitionsResult;
import com.amazonaws.services.glue.model.GetTableRequest;
import com.amazonaws.services.glue.model.GetTableResult;
import com.amazonaws.services.glue.model.GetTablesRequest;
import com.amazonaws.services.glue.model.GetTablesResult;
import com.amazonaws.services.glue.model.GetUserDefinedFunctionRequest;
import com.amazonaws.services.glue.model.GetUserDefinedFunctionsRequest;
import com.amazonaws.services.glue.model.GetUserDefinedFunctionsResult;
import com.amazonaws.services.glue.model.Partition;
import com.amazonaws.services.glue.model.PartitionError;
import com.amazonaws.services.glue.model.PartitionInput;
import com.amazonaws.services.glue.model.PartitionValueList;
import com.amazonaws.services.glue.model.Segment;
import com.amazonaws.services.glue.model.Table;
import com.amazonaws.services.glue.model.TableInput;
import com.amazonaws.services.glue.model.UpdateColumnStatisticsForPartitionRequest;
import com.amazonaws.services.glue.model.UpdateColumnStatisticsForPartitionResult;
import com.amazonaws.services.glue.model.UpdateColumnStatisticsForTableRequest;
import com.amazonaws.services.glue.model.UpdateColumnStatisticsForTableResult;
import com.amazonaws.services.glue.model.UpdateDatabaseRequest;
import com.amazonaws.services.glue.model.UpdatePartitionRequest;
import com.amazonaws.services.glue.model.UpdateTableRequest;
import com.amazonaws.services.glue.model.UpdateUserDefinedFunctionRequest;
import com.amazonaws.services.glue.model.UserDefinedFunction;
import com.amazonaws.services.glue.model.UserDefinedFunctionInput;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.thrift.TException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DefaultAWSGlueMetastore implements AWSGlueMetastore {

    /**
     * Batch size used by {@link #getPartitionsByNames}: the requested partition list is split
     * into chunks of at most this many entries, one BatchGetPartition call per chunk.
     */
    public static final int BATCH_GET_PARTITIONS_MAX_REQUEST_SIZE = 1000;
    /**
     * Based on the maxResults parameter at https://docs.aws.amazon.com/glue/latest/webapi/API_GetPartitions.html
     * <p>
     * {@link #getPartitions} fetches serially when the requested maximum is within this size and
     * switches to the segmented, parallel fetch otherwise.
     */
    public static final int GET_PARTITIONS_MAX_SIZE = 1000;
    /**
     * Default number of Glue Segments, used when {@link #NUM_PARTITION_SEGMENTS_CONF} is not set.
     * A segment defines a non-overlapping region of a table's partitions,
     * allowing multiple requests to be executed in parallel.
     */
    public static final int DEFAULT_NUM_PARTITION_SEGMENTS = 5;
    /**
     * Currently the upper limit allowed by Glue is 10.
     * https://docs.aws.amazon.com/glue/latest/webapi/API_Segment.html
     */
    public static final int MAX_NUM_PARTITION_SEGMENTS = 10;
    /** Configuration key holding the number of segments used for parallel partition fetches. */
    public static final String NUM_PARTITION_SEGMENTS_CONF = "aws.glue.partition.num.segments";
    /** Configuration key naming the ExecutorServiceFactory class used by {@link #getExecutorService}. */
    public static final String CUSTOM_EXECUTOR_FACTORY_CONF = "hive.metastore.executorservice.factory.class";

    /**
     * Based on the ColumnNames parameter at https://docs.aws.amazon.com/glue/latest/webapi/API_GetColumnStatisticsForPartition.html
     * <p>
     * Column names are split into pages of this size for the get-column-statistics calls.
     */
    public static final int GET_COLUMNS_STAT_MAX_SIZE = 100;
    /** Column statistics are split into pages of this size for the update-column-statistics calls. */
    public static final int UPDATE_COLUMNS_STAT_MAX_SIZE = 25;

    /**
     * To be used with UpdateTable.
     * <p>
     * EnvironmentContext property key: when its value equals {@code StatsSetupConst.TRUE},
     * the three-argument {@code updateTable} sets the skip-archive flag on the request.
     */
    public static final String SKIP_AWS_GLUE_ARCHIVE = "skipAWSGlueArchive";

    // Size of the shared, static thread pool below.
    private static final int NUM_EXECUTOR_THREADS = 5;
    // Name format for the daemon threads of the shared pool.
    static final String GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT = "glue-metastore-delegate-%d";
    // Shared by all instances; used by the column-statistics methods of this class.
    private static final ExecutorService GLUE_METASTORE_DELEGATE_THREAD_POOL = Executors.newFixedThreadPool(
            NUM_EXECUTOR_THREADS,
            new ThreadFactoryBuilder()
                    .setNameFormat(GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT)
                    .setDaemon(true).build()
    );

    // Configuration this metastore was built from.
    private final Configuration conf;
    // Client used for every Glue call.
    private final AWSGlue glueClient;
    // Catalog id attached to every request; resolved from the configuration in the constructor.
    private final String catalogId;
    // Per-instance executor (from getExecutorService) used for the parallel partition fetches.
    private final ExecutorService executorService;
    // Number of segments a parallel partition fetch is divided into.
    private final int numPartitionSegments;

    /**
     * Obtains the executor used by this metastore for parallel Glue calls.
     * <p>
     * The factory class is looked up under {@link #CUSTOM_EXECUTOR_FACTORY_CONF} (defaulting to
     * {@code DefaultExecutorServiceFactory}), instantiated reflectively, and asked for an executor.
     *
     * @param conf configuration to read the factory class from; also handed to the factory
     * @return the executor service produced by the factory
     */
    protected ExecutorService getExecutorService(Configuration conf) {
        Class<?> configuredClass = conf.getClass(CUSTOM_EXECUTOR_FACTORY_CONF, DefaultExecutorServiceFactory.class);
        Class<? extends ExecutorServiceFactory> factoryClass =
                configuredClass.asSubclass(ExecutorServiceFactory.class);
        ExecutorServiceFactory executorFactory = ReflectionUtils.newInstance(factoryClass, conf);
        return executorFactory.getExecutorService(conf);
    }

    /**
     * Creates a metastore backed by the given Glue client.
     * <p>
     * The number of partition segments is read from {@link #NUM_PARTITION_SEGMENTS_CONF}
     * (default {@link #DEFAULT_NUM_PARTITION_SEGMENTS}) and must lie between 1 and
     * {@link #MAX_NUM_PARTITION_SEGMENTS}, inclusive.
     *
     * @param conf       configuration; must not be null
     * @param glueClient Glue client used for all calls; must not be null
     * @throws NullPointerException     if {@code conf} or {@code glueClient} is null
     * @throws IllegalArgumentException if the configured number of segments is out of range
     */
    public DefaultAWSGlueMetastore(Configuration conf, AWSGlue glueClient) {
        checkNotNull(conf, "Hive Config cannot be null");
        checkNotNull(glueClient, "glueClient cannot be null");
        this.numPartitionSegments = conf.getInt(NUM_PARTITION_SEGMENTS_CONF, DEFAULT_NUM_PARTITION_SEGMENTS);
        // A non-positive value would create no segments at all, so the parallel partition fetch
        // would silently return an empty list; reject it up front instead.
        checkArgument(numPartitionSegments >= 1,
                String.format("Hive Config [%s] must be at least 1", NUM_PARTITION_SEGMENTS_CONF));
        checkArgument(numPartitionSegments <= MAX_NUM_PARTITION_SEGMENTS,
                String.format("Hive Config [%s] can't exceed %d", NUM_PARTITION_SEGMENTS_CONF, MAX_NUM_PARTITION_SEGMENTS));
        this.conf = conf;
        this.glueClient = glueClient;
        this.catalogId = MetastoreClientUtils.getCatalogId(conf);
        this.executorService = getExecutorService(conf);
    }

    // ======================= Database =======================

    /**
     * Issues a CreateDatabase request against the configured catalog.
     *
     * @param databaseInput definition of the database to create
     */
    @Override
    public void createDatabase(DatabaseInput databaseInput) {
        CreateDatabaseRequest request = new CreateDatabaseRequest()
                .withCatalogId(catalogId)
                .withDatabaseInput(databaseInput);
        glueClient.createDatabase(request);
    }

    /**
     * Fetches a single database from the configured catalog.
     *
     * @param dbName name of the database
     * @return the database carried by the GetDatabase result
     */
    @Override
    public Database getDatabase(String dbName) {
        GetDatabaseRequest request = new GetDatabaseRequest()
                .withName(dbName)
                .withCatalogId(catalogId);
        return glueClient.getDatabase(request).getDatabase();
    }

    /**
     * Lists every database in the configured catalog, following the pagination token
     * until the service stops returning one.
     *
     * @return all databases, in the order the pages were returned
     */
    @Override
    public List<Database> getAllDatabases() {
        List<Database> databases = new ArrayList<>();
        String pageToken = null;
        while (true) {
            GetDatabasesResult page = glueClient.getDatabases(
                    new GetDatabasesRequest().withCatalogId(catalogId).withNextToken(pageToken));
            pageToken = page.getNextToken();
            databases.addAll(page.getDatabaseList());
            if (pageToken == null) {
                // No further pages.
                return databases;
            }
        }
    }

    /**
     * Issues an UpdateDatabase request against the configured catalog.
     *
     * @param databaseName  name of the database to update
     * @param databaseInput new definition of the database
     */
    @Override
    public void updateDatabase(String databaseName, DatabaseInput databaseInput) {
        UpdateDatabaseRequest request = new UpdateDatabaseRequest()
                .withCatalogId(catalogId)
                .withName(databaseName)
                .withDatabaseInput(databaseInput);
        glueClient.updateDatabase(request);
    }

    /**
     * Issues a DeleteDatabase request against the configured catalog.
     *
     * @param dbName name of the database to delete
     */
    @Override
    public void deleteDatabase(String dbName) {
        DeleteDatabaseRequest request = new DeleteDatabaseRequest()
                .withCatalogId(catalogId)
                .withName(dbName);
        glueClient.deleteDatabase(request);
    }

    // ======================== Table ========================

    /**
     * Issues a CreateTable request against the configured catalog.
     *
     * @param dbName     database that will contain the table
     * @param tableInput definition of the table to create
     */
    @Override
    public void createTable(String dbName, TableInput tableInput) {
        CreateTableRequest request = new CreateTableRequest()
                .withCatalogId(catalogId)
                .withDatabaseName(dbName)
                .withTableInput(tableInput);
        glueClient.createTable(request);
    }

    /**
     * Fetches a single table from the configured catalog.
     *
     * @param dbName    database containing the table
     * @param tableName name of the table
     * @return the table carried by the GetTable result
     */
    @Override
    public Table getTable(String dbName, String tableName) {
        GetTableRequest request = new GetTableRequest()
                .withCatalogId(catalogId)
                .withDatabaseName(dbName)
                .withName(tableName);
        return glueClient.getTable(request).getTable();
    }

    /**
     * Lists the tables of a database, following the pagination token until the service
     * stops returning one.
     *
     * @param dbname       database to list tables from
     * @param tablePattern expression passed to GetTables to filter the tables
     * @return all returned tables, in the order the pages were returned
     */
    @Override
    public List<Table> getTables(String dbname, String tablePattern) {
        List<Table> tables = new ArrayList<>();
        String pageToken = null;
        while (true) {
            GetTablesResult page = glueClient.getTables(new GetTablesRequest()
                    .withCatalogId(catalogId)
                    .withDatabaseName(dbname)
                    .withExpression(tablePattern)
                    .withNextToken(pageToken));
            tables.addAll(page.getTableList());
            pageToken = page.getNextToken();
            if (pageToken == null) {
                // No further pages.
                return tables;
            }
        }
    }

    /**
     * Issues an UpdateTable request against the configured catalog.
     *
     * @param dbName     database containing the table
     * @param tableInput new definition of the table
     */
    @Override
    public void updateTable(String dbName, TableInput tableInput) {
        UpdateTableRequest request = new UpdateTableRequest()
                .withCatalogId(catalogId)
                .withDatabaseName(dbName)
                .withTableInput(tableInput);
        glueClient.updateTable(request);
    }

    /**
     * Issues an UpdateTable request against the configured catalog, setting the skip-archive
     * flag according to the {@link #SKIP_AWS_GLUE_ARCHIVE} property of the environment context.
     *
     * @param dbName             database containing the table
     * @param tableInput         new definition of the table
     * @param environmentContext context whose properties may carry the skip-archive flag; may be null
     */
    @Override
    public void updateTable(String dbName, TableInput tableInput, EnvironmentContext environmentContext) {
        boolean skip = skipArchive(environmentContext);
        UpdateTableRequest request = new UpdateTableRequest()
                .withCatalogId(catalogId)
                .withDatabaseName(dbName)
                .withTableInput(tableInput)
                .withSkipArchive(skip);
        glueClient.updateTable(request);
    }

    /**
     * Tells whether the environment context asks to skip archiving.
     *
     * @param environmentContext context to inspect; may be null
     * @return true only when the context is non-null, has properties set, and its
     *         {@link #SKIP_AWS_GLUE_ARCHIVE} property equals {@code StatsSetupConst.TRUE}
     */
    private boolean skipArchive(EnvironmentContext environmentContext) {
        if (environmentContext == null || !environmentContext.isSetProperties()) {
            return false;
        }
        return StatsSetupConst.TRUE.equals(environmentContext.getProperties().get(SKIP_AWS_GLUE_ARCHIVE));
    }

    /**
     * Issues a DeleteTable request against the configured catalog.
     *
     * @param dbName    database containing the table
     * @param tableName name of the table to delete
     */
    @Override
    public void deleteTable(String dbName, String tableName) {
        DeleteTableRequest request = new DeleteTableRequest()
                .withCatalogId(catalogId)
                .withDatabaseName(dbName)
                .withName(tableName);
        glueClient.deleteTable(request);
    }

    // =========================== Partition ===========================

    /**
     * Fetches a single partition identified by its values.
     *
     * @param dbName          database containing the table
     * @param tableName       table containing the partition
     * @param partitionValues values identifying the partition
     * @return the partition carried by the GetPartition result
     */
    @Override
    public Partition getPartition(String dbName, String tableName, List<String> partitionValues) {
        GetPartitionRequest partitionRequest = new GetPartitionRequest()
                .withCatalogId(catalogId)
                .withDatabaseName(dbName)
                .withTableName(tableName)
                .withPartitionValues(partitionValues);
        Partition partition = glueClient.getPartition(partitionRequest).getPartition();
        return partition;
    }

    /**
     * Fetches the given partitions with BatchGetPartition calls of at most
     * {@link #BATCH_GET_PARTITIONS_MAX_REQUEST_SIZE} entries each, run on the instance executor.
     * <p>
     * If the calling thread is interrupted while waiting, the interrupt flag is restored and the
     * partitions gathered so far are returned.
     *
     * @param dbName          database containing the table
     * @param tableName       table containing the partitions
     * @param partitionsToGet value lists identifying the partitions to fetch
     * @return the partitions returned by the batches, in batch order
     * @throws AmazonServiceException if a batch call failed with one; any other failure cause is
     *                                rethrown through {@code Throwables.propagate}
     */
    @Override
    public List<Partition> getPartitionsByNames(String dbName, String tableName,
            List<PartitionValueList> partitionsToGet) {

        // Submit one task per chunk.
        List<Future<BatchGetPartitionResult>> pendingBatches = new ArrayList<>();
        for (List<PartitionValueList> chunk
                : Lists.partition(partitionsToGet, BATCH_GET_PARTITIONS_MAX_REQUEST_SIZE)) {
            final BatchGetPartitionRequest batchRequest = new BatchGetPartitionRequest()
                    .withCatalogId(catalogId)
                    .withDatabaseName(dbName)
                    .withTableName(tableName)
                    .withPartitionsToGet(chunk);
            Callable<BatchGetPartitionResult> fetchBatch = () -> glueClient.batchGetPartition(batchRequest);
            pendingBatches.add(this.executorService.submit(fetchBatch));
        }

        // Gather the results in submission order.
        List<Partition> fetched = new ArrayList<>();
        try {
            for (Future<BatchGetPartitionResult> pendingBatch : pendingBatches) {
                fetched.addAll(pendingBatch.get().getPartitions());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            Throwables.propagateIfInstanceOf(cause, AmazonServiceException.class);
            Throwables.propagate(cause);
        }
        return fetched;
    }

    /**
     * Fetches partitions matching an expression, up to a maximum count.
     *
     * @param dbName     database containing the table
     * @param tableName  table to fetch partitions from
     * @param expression filter expression passed to GetPartitions
     * @param max        maximum number of partitions; 0 yields an empty list, a negative value
     *                   means no limit
     * @return the matching partitions; fetched serially when {@code max} is between 1 and
     *         {@link #GET_PARTITIONS_MAX_SIZE}, and through the segmented parallel fetch otherwise
     * @throws TException declared by the interface
     */
    @Override
    public List<Partition> getPartitions(String dbName, String tableName, String expression,
            long max) throws TException {
        if (max == 0) {
            return Collections.emptyList();
        }
        boolean smallRequest = max > 0 && max <= GET_PARTITIONS_MAX_SIZE;
        if (smallRequest) {
            // Few partitions are wanted, so a single serial fetch is enough.
            return getCatalogPartitions(dbName, tableName, expression, max, null);
        }
        return getPartitionsParallel(dbName, tableName, expression, max);
    }

    /**
     * Fetches partitions by splitting the table into {@code numPartitionSegments} segments and
     * fetching each segment on the instance executor.
     * <p>
     * Segment results are concatenated in segment order; when {@code max} is positive the result
     * is cut off at {@code max} entries. If the calling thread is interrupted while waiting, the
     * interrupt flag is restored and the partitions gathered so far are returned.
     *
     * @param databaseName database containing the table
     * @param tableName    table to fetch partitions from
     * @param expression   filter expression passed to GetPartitions
     * @param max          maximum number of partitions; non-positive means no limit
     * @return the gathered partitions
     * @throws AmazonServiceException if a segment fetch failed with one; any other failure cause
     *                                is rethrown through {@code Throwables.propagate}
     * @throws TException             declared for callers
     */
    private List<Partition> getPartitionsParallel(
            final String databaseName,
            final String tableName,
            final String expression,
            final long max) throws TException {
        // One task per segment, submitted in segment order.
        List<Future<List<Partition>>> segmentFutures = new ArrayList<>();
        for (int segmentNumber = 0; segmentNumber < numPartitionSegments; segmentNumber++) {
            final Segment segment = new Segment()
                    .withSegmentNumber(segmentNumber)
                    .withTotalSegments(numPartitionSegments);
            Callable<List<Partition>> fetchSegment =
                    () -> getCatalogPartitions(databaseName, tableName, expression, max, segment);
            segmentFutures.add(this.executorService.submit(fetchSegment));
        }

        // Gather segment results until the limit (if any) is reached.
        List<Partition> collected = new ArrayList<>();
        try {
            for (Future<List<Partition>> segmentFuture : segmentFutures) {
                List<Partition> fromSegment = segmentFuture.get();
                boolean limitReached = collected.size() + fromSegment.size() >= max && max > 0;
                if (!limitReached) {
                    collected.addAll(fromSegment);
                    continue;
                }
                // Take only what is still needed from this segment and stop.
                long stillNeeded = max - collected.size();
                collected.addAll(fromSegment.subList(0, (int) stillNeeded));
                break;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            Throwables.propagateIfInstanceOf(cause, AmazonServiceException.class);
            Throwables.propagate(cause);
        }
        return collected;
    }


    /**
     * Pages through GetPartitions for one table (optionally restricted to one segment) until the
     * service returns no further token or, when {@code max} is positive, until {@code max}
     * partitions have been gathered.
     *
     * @param databaseName database containing the table
     * @param tableName    table to fetch partitions from
     * @param expression   filter expression passed to GetPartitions
     * @param max          maximum number of partitions; non-positive means no limit
     * @param segment      segment to restrict the scan to, or null
     * @return the gathered partitions, at most {@code max} when {@code max} is positive
     */
    private List<Partition> getCatalogPartitions(String databaseName, String tableName, String expression,
            long max, Segment segment) {
        List<Partition> collected = new ArrayList<>();
        String pageToken = null;
        while (true) {
            GetPartitionsResult page = glueClient.getPartitions(new GetPartitionsRequest()
                    .withCatalogId(catalogId)
                    .withDatabaseName(databaseName)
                    .withTableName(tableName)
                    .withExpression(expression)
                    .withSegment(segment)
                    .withNextToken(pageToken));
            List<Partition> pagePartitions = page.getPartitions();
            if (max > 0 && (collected.size() + pagePartitions.size()) >= max) {
                // This page reaches the limit: keep only what is still needed.
                long stillNeeded = max - collected.size();
                collected.addAll(pagePartitions.subList(0, (int) stillNeeded));
                return collected;
            }
            collected.addAll(pagePartitions);
            pageToken = page.getNextToken();
            if (pageToken == null) {
                return collected;
            }
        }
    }

    /**
     * Issues an UpdatePartition request against the configured catalog.
     *
     * @param dbName          database containing the table
     * @param tableName       table containing the partition
     * @param partitionValues values identifying the partition to update
     * @param partitionInput  new definition of the partition
     */
    @Override
    public void updatePartition(String dbName, String tableName, List<String> partitionValues,
            PartitionInput partitionInput) {
        UpdatePartitionRequest request = new UpdatePartitionRequest()
                .withCatalogId(catalogId)
                .withDatabaseName(dbName)
                .withTableName(tableName)
                .withPartitionValueList(partitionValues)
                .withPartitionInput(partitionInput);
        glueClient.updatePartition(request);
    }

    /**
     * Issues a DeletePartition request against the configured catalog.
     *
     * @param dbName          database containing the table
     * @param tableName       table containing the partition
     * @param partitionValues values identifying the partition to delete
     */
    @Override
    public void deletePartition(String dbName, String tableName, List<String> partitionValues) {
        DeletePartitionRequest deleteRequest = new DeletePartitionRequest()
                .withCatalogId(catalogId)
                .withDatabaseName(dbName)
                .withTableName(tableName)
                .withPartitionValues(partitionValues);
        glueClient.deletePartition(deleteRequest);
    }

    /**
     * Creates partitions with a single BatchCreatePartition request.
     *
     * @param dbName          database containing the table
     * @param tableName       table to add the partitions to
     * @param partitionInputs definitions of the partitions to create
     * @return the errors reported by the BatchCreatePartition result
     */
    @Override
    public List<PartitionError> createPartitions(String dbName, String tableName,
            List<PartitionInput> partitionInputs) {
        BatchCreatePartitionRequest batchRequest = new BatchCreatePartitionRequest()
                .withCatalogId(catalogId)
                .withDatabaseName(dbName)
                .withTableName(tableName)
                .withPartitionInputList(partitionInputs);
        List<PartitionError> errors = glueClient.batchCreatePartition(batchRequest).getErrors();
        return errors;
    }

    // ====================== User Defined Function ======================

    /**
     * Issues a CreateUserDefinedFunction request against the configured catalog.
     *
     * @param dbName        database that will contain the function
     * @param functionInput definition of the function to create
     */
    @Override
    public void createUserDefinedFunction(String dbName, UserDefinedFunctionInput functionInput) {
        CreateUserDefinedFunctionRequest request = new CreateUserDefinedFunctionRequest()
                .withCatalogId(catalogId)
                .withDatabaseName(dbName)
                .withFunctionInput(functionInput);
        glueClient.createUserDefinedFunction(request);
    }

    /**
     * Fetches a single user-defined function from the configured catalog.
     *
     * @param dbName       database containing the function
     * @param functionName name of the function
     * @return the function carried by the GetUserDefinedFunction result
     */
    @Override
    public UserDefinedFunction getUserDefinedFunction(String dbName, String functionName) {
        GetUserDefinedFunctionRequest request = new GetUserDefinedFunctionRequest()
                .withCatalogId(catalogId)
                .withDatabaseName(dbName)
                .withFunctionName(functionName);
        UserDefinedFunction function = glueClient.getUserDefinedFunction(request).getUserDefinedFunction();
        return function;
    }

    /**
     * Lists the user-defined functions of one database that match a pattern, following the
     * pagination token until the service stops returning one.
     *
     * @param dbName  database to list functions from
     * @param pattern pattern passed to GetUserDefinedFunctions
     * @return all returned functions, in the order the pages were returned
     */
    @Override
    public List<UserDefinedFunction> getUserDefinedFunctions(String dbName, String pattern) {
        List<UserDefinedFunction> functions = new ArrayList<>();
        String pageToken = null;
        while (true) {
            GetUserDefinedFunctionsResult page = glueClient.getUserDefinedFunctions(
                    new GetUserDefinedFunctionsRequest()
                            .withCatalogId(catalogId)
                            .withDatabaseName(dbName)
                            .withPattern(pattern)
                            .withNextToken(pageToken));
            pageToken = page.getNextToken();
            functions.addAll(page.getUserDefinedFunctions());
            if (pageToken == null) {
                // No further pages.
                return functions;
            }
        }
    }

    /**
     * Lists the user-defined functions matching a pattern without naming a database in the
     * request, following the pagination token until the service stops returning one.
     *
     * @param pattern pattern passed to GetUserDefinedFunctions
     * @return all returned functions, in the order the pages were returned
     */
    @Override
    public List<UserDefinedFunction> getUserDefinedFunctions(String pattern) {
        List<UserDefinedFunction> functions = new ArrayList<>();
        String pageToken = null;
        while (true) {
            GetUserDefinedFunctionsResult page = glueClient.getUserDefinedFunctions(
                    new GetUserDefinedFunctionsRequest()
                            .withCatalogId(catalogId)
                            .withPattern(pattern)
                            .withNextToken(pageToken));
            pageToken = page.getNextToken();
            functions.addAll(page.getUserDefinedFunctions());
            if (pageToken == null) {
                // No further pages.
                return functions;
            }
        }
    }

    /**
     * Issues a DeleteUserDefinedFunction request against the configured catalog.
     *
     * @param dbName       database containing the function
     * @param functionName name of the function to delete
     */
    @Override
    public void deleteUserDefinedFunction(String dbName, String functionName) {
        DeleteUserDefinedFunctionRequest request = new DeleteUserDefinedFunctionRequest()
                .withCatalogId(catalogId)
                .withDatabaseName(dbName)
                .withFunctionName(functionName);
        glueClient.deleteUserDefinedFunction(request);
    }

    /**
     * Issues an UpdateUserDefinedFunction request against the configured catalog.
     *
     * @param dbName        database containing the function
     * @param functionName  name of the function to update
     * @param functionInput new definition of the function
     */
    @Override
    public void updateUserDefinedFunction(String dbName, String functionName, UserDefinedFunctionInput functionInput) {
        UpdateUserDefinedFunctionRequest request = new UpdateUserDefinedFunctionRequest()
                .withCatalogId(catalogId)
                .withDatabaseName(dbName)
                .withFunctionName(functionName)
                .withFunctionInput(functionInput);
        glueClient.updateUserDefinedFunction(request);
    }

    /**
     * Issues a DeleteColumnStatisticsForPartition request for one column of one partition.
     *
     * @param dbName          database containing the table
     * @param tableName       table containing the partition
     * @param partitionValues values identifying the partition
     * @param colName         column whose statistics are deleted
     */
    @Override
    public void deletePartitionColumnStatistics(String dbName, String tableName, List<String> partitionValues, String colName) {
        DeleteColumnStatisticsForPartitionRequest deleteRequest = new DeleteColumnStatisticsForPartitionRequest()
                .withColumnName(colName)
                .withPartitionValues(partitionValues)
                .withTableName(tableName)
                .withDatabaseName(dbName)
                .withCatalogId(catalogId);
        glueClient.deleteColumnStatisticsForPartition(deleteRequest);
    }

    /**
     * Issues a DeleteColumnStatisticsForTable request for one column of a table.
     *
     * @param dbName    database containing the table
     * @param tableName table whose column statistics are deleted
     * @param colName   column whose statistics are deleted
     */
    @Override
    public void deleteTableColumnStatistics(String dbName, String tableName, String colName) {
        DeleteColumnStatisticsForTableRequest deleteRequest = new DeleteColumnStatisticsForTableRequest()
                .withColumnName(colName)
                .withTableName(tableName)
                .withDatabaseName(dbName)
                .withCatalogId(catalogId);
        glueClient.deleteColumnStatisticsForTable(deleteRequest);
    }

    /**
     * Fetches column statistics for several partitions.
     * <p>
     * Column names are split into pages of {@link #GET_COLUMNS_STAT_MAX_SIZE}. The requests for
     * every (partition, column page) pair are submitted to the shared thread pool before any
     * result is awaited, so the pool can work on different partitions at the same time rather
     * than handling one partition after another. If the method exits with an exception, tasks
     * that have not started yet are cancelled.
     * <p>
     * If the calling thread is interrupted while waiting for a page, the interrupt flag is
     * restored and that page is left out of the result.
     *
     * @param dbName          database containing the table
     * @param tableName       table containing the partitions
     * @param partitionValues partition names; each is converted to partition values with
     *                        {@code PartitionNameParser.getPartitionValuesFromName}
     * @param columnNames     columns to fetch statistics for
     * @return a map from each partition name to the statistics gathered for it
     * @throws AmazonServiceException if a call failed with one; any other failure cause is
     *                                rethrown through {@code Throwables.propagate}
     */
    @Override
    public Map<String, List<ColumnStatistics>> getPartitionColumnStatistics(String dbName, String tableName, List<String> partitionValues, List<String> columnNames) {
        List<List<String>> pagedColNames = Lists.partition(columnNames, GET_COLUMNS_STAT_MAX_SIZE);
        // pendingByPartition.get(i) holds the page futures of the i-th entry of partitionValues.
        List<List<Future<GetColumnStatisticsForPartitionResult>>> pendingByPartition =
                new ArrayList<>(partitionValues.size());
        boolean completed = false;
        try {
            // Phase 1: submit everything so the pool is kept busy across partitions.
            for (String partName : partitionValues) {
                List<String> partValues = PartitionNameParser.getPartitionValuesFromName(partName);
                List<Future<GetColumnStatisticsForPartitionResult>> pagedResult =
                        new ArrayList<>(pagedColNames.size());
                pendingByPartition.add(pagedResult);
                for (List<String> cols : pagedColNames) {
                    final GetColumnStatisticsForPartitionRequest request = new GetColumnStatisticsForPartitionRequest()
                            .withCatalogId(catalogId)
                            .withDatabaseName(dbName)
                            .withTableName(tableName)
                            .withPartitionValues(partValues)
                            .withColumnNames(cols);
                    Callable<GetColumnStatisticsForPartitionResult> task =
                            () -> glueClient.getColumnStatisticsForPartition(request);
                    pagedResult.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(task));
                }
            }

            // Phase 2: gather the pages partition by partition, in submission order.
            Map<String, List<ColumnStatistics>> partitionStatistics = new HashMap<>();
            int partitionIndex = 0;
            for (String partName : partitionValues) {
                List<ColumnStatistics> result = new ArrayList<>();
                for (Future<GetColumnStatisticsForPartitionResult> page : pendingByPartition.get(partitionIndex)) {
                    try {
                        result.addAll(page.get().getColumnStatisticsList());
                    } catch (ExecutionException e) {
                        Throwables.propagateIfInstanceOf(e.getCause(), AmazonServiceException.class);
                        Throwables.propagate(e.getCause());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                partitionStatistics.put(partName, result);
                partitionIndex++;
            }
            completed = true;
            return partitionStatistics;
        } finally {
            if (!completed) {
                // The call is failing: drop tasks that are still queued, leave running ones alone.
                for (List<Future<GetColumnStatisticsForPartitionResult>> pagedResult : pendingByPartition) {
                    for (Future<GetColumnStatisticsForPartitionResult> page : pagedResult) {
                        page.cancel(false);
                    }
                }
            }
        }
    }

    /**
     * Fetches table-level column statistics, splitting the column names into pages of
     * {@link #GET_COLUMNS_STAT_MAX_SIZE} that are requested on the shared thread pool.
     * <p>
     * If the calling thread is interrupted while waiting for a page, the interrupt flag is
     * restored and that page is left out of the result.
     *
     * @param dbName    database containing the table
     * @param tableName table to fetch statistics for
     * @param colNames  columns to fetch statistics for
     * @return the statistics gathered from all pages, in page order
     * @throws AmazonServiceException if a call failed with one; any other failure cause is
     *                                rethrown through {@code Throwables.propagate}
     */
    @Override
    public List<ColumnStatistics> getTableColumnStatistics(String dbName, String tableName, List<String> colNames) {
        // Submit one task per page of column names.
        List<Future<GetColumnStatisticsForTableResult>> pendingPages = new ArrayList<>();
        for (List<String> columnPage : Lists.partition(colNames, GET_COLUMNS_STAT_MAX_SIZE)) {
            final GetColumnStatisticsForTableRequest pageRequest = new GetColumnStatisticsForTableRequest()
                    .withColumnNames(columnPage)
                    .withTableName(tableName)
                    .withDatabaseName(dbName)
                    .withCatalogId(catalogId);
            Callable<GetColumnStatisticsForTableResult> fetchPage =
                    () -> glueClient.getColumnStatisticsForTable(pageRequest);
            pendingPages.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(fetchPage));
        }

        // Gather the pages in submission order.
        List<ColumnStatistics> statistics = new ArrayList<>();
        for (Future<GetColumnStatisticsForTableResult> pendingPage : pendingPages) {
            try {
                statistics.addAll(pendingPage.get().getColumnStatisticsList());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                Throwables.propagateIfInstanceOf(cause, AmazonServiceException.class);
                Throwables.propagate(cause);
            }
        }
        return statistics;
    }

    /**
     * Updates column statistics of one partition, splitting the statistics into pages of
     * {@link #UPDATE_COLUMNS_STAT_MAX_SIZE} that are sent on the shared thread pool.
     * <p>
     * If the calling thread is interrupted while waiting, the interrupt flag is restored and the
     * errors gathered so far are returned.
     *
     * @param dbName           database containing the table
     * @param tableName        table containing the partition
     * @param partitionValues  values identifying the partition
     * @param columnStatistics statistics to write
     * @return the errors reported by the update results, in page order
     * @throws AmazonServiceException if a call failed with one; any other failure cause is
     *                                rethrown through {@code Throwables.propagate}
     */
    @Override
    public List<ColumnStatisticsError> updatePartitionColumnStatistics(
            String dbName,
            String tableName,
            List<String> partitionValues,
            List<ColumnStatistics> columnStatistics) {

        // Submit one task per page of statistics.
        List<Future<UpdateColumnStatisticsForPartitionResult>> pendingPages = new ArrayList<>();
        for (List<ColumnStatistics> statisticsPage : Lists.partition(columnStatistics, UPDATE_COLUMNS_STAT_MAX_SIZE)) {
            final UpdateColumnStatisticsForPartitionRequest pageRequest = new UpdateColumnStatisticsForPartitionRequest()
                    .withColumnStatisticsList(statisticsPage)
                    .withPartitionValues(partitionValues)
                    .withTableName(tableName)
                    .withDatabaseName(dbName)
                    .withCatalogId(catalogId);
            Callable<UpdateColumnStatisticsForPartitionResult> updatePage =
                    () -> glueClient.updateColumnStatisticsForPartition(pageRequest);
            pendingPages.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(updatePage));
        }

        // Wait for every page; a failed task fails the whole call.
        List<ColumnStatisticsError> errors = new ArrayList<>();
        try {
            for (Future<UpdateColumnStatisticsForPartitionResult> pendingPage : pendingPages) {
                List<ColumnStatisticsError> pageErrors = pendingPage.get().getErrors();
                if (pageErrors != null) {
                    errors.addAll(pageErrors);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            Throwables.propagateIfInstanceOf(cause, AmazonServiceException.class);
            Throwables.propagate(cause);
        }
        return errors;
    }

    /**
     * Updates table-level column statistics, splitting the statistics into pages of
     * {@link #UPDATE_COLUMNS_STAT_MAX_SIZE} that are sent on the shared thread pool.
     * <p>
     * If the calling thread is interrupted while waiting, the interrupt flag is restored and the
     * errors gathered so far are returned.
     *
     * @param dbName           database containing the table
     * @param tableName        table whose statistics are updated
     * @param columnStatistics statistics to write
     * @return the errors reported by the update results, in page order
     * @throws AmazonServiceException if a call failed with one; any other failure cause is
     *                                rethrown through {@code Throwables.propagate}
     */
    @Override
    public List<ColumnStatisticsError> updateTableColumnStatistics(
            String dbName,
            String tableName,
            List<ColumnStatistics> columnStatistics) {

        // Submit one task per page of statistics.
        List<Future<UpdateColumnStatisticsForTableResult>> pendingPages = new ArrayList<>();
        for (List<ColumnStatistics> statisticsPage : Lists.partition(columnStatistics, UPDATE_COLUMNS_STAT_MAX_SIZE)) {
            final UpdateColumnStatisticsForTableRequest pageRequest = new UpdateColumnStatisticsForTableRequest()
                    .withColumnStatisticsList(statisticsPage)
                    .withTableName(tableName)
                    .withDatabaseName(dbName)
                    .withCatalogId(catalogId);
            Callable<UpdateColumnStatisticsForTableResult> updatePage =
                    () -> glueClient.updateColumnStatisticsForTable(pageRequest);
            pendingPages.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(updatePage));
        }

        // Wait for every page; a failed task fails the whole call.
        List<ColumnStatisticsError> errors = new ArrayList<>();
        try {
            for (Future<UpdateColumnStatisticsForTableResult> pendingPage : pendingPages) {
                List<ColumnStatisticsError> pageErrors = pendingPage.get().getErrors();
                if (pageErrors != null) {
                    errors.addAll(pageErrors);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            Throwables.propagateIfInstanceOf(cause, AmazonServiceException.class);
            Throwables.propagate(cause);
        }
        return errors;
    }
}