PaimonTableCache.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.paimon;

import com.google.common.base.Objects;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PaimonTableCache {
    private static final Logger LOG = LoggerFactory.getLogger(PaimonTableCache.class);
    public static final long maxExternalSchemaCacheNum = 50;
    public static final long externalCacheExpireTimeMinutesAfterAccess = 100;
    private static Optional<Configuration> configurationOpt = Optional.empty();

    private static LoadingCache<PaimonTableCacheKey, TableExt> tableCache = CacheBuilder.newBuilder()
            .maximumSize(maxExternalSchemaCacheNum)
            .expireAfterAccess(externalCacheExpireTimeMinutesAfterAccess, TimeUnit.MINUTES)
            .build(new CacheLoader<PaimonTableCacheKey, TableExt>() {
                @Override
                public TableExt load(PaimonTableCacheKey key) {
                    return loadTable(key);
                }
            });

    private static TableExt loadTable(PaimonTableCacheKey key) {
        try {
            LOG.info("load table:{}", key);
            Catalog catalog = createCatalog(key.getPaimonOptionParams(), key.getHadoopOptionParams());
            Table table = catalog.getTable(Identifier.create(key.getDbName(), key.getTblName()));
            return new TableExt(table, System.currentTimeMillis());
        } catch (Catalog.TableNotExistException e) {
            LOG.warn("failed to create paimon table ", e);
            throw new RuntimeException(e);
        }
    }

    private static Catalog createCatalog(
            Map<String, String> paimonOptionParams,
            Map<String, String> hadoopOptionParams) {
        Options options = new Options();
        paimonOptionParams.forEach(options::set);
        Configuration configuration;
        if (configurationOpt.isPresent()) {
            configuration = new Configuration(configurationOpt.get());
            hadoopOptionParams.forEach(configuration::set);
        } else {
            configuration = new Configuration();
            hadoopOptionParams.forEach(configuration::set);
            String hadoopConfigPath = options.getString("hadoop-conf-dir", (String) null);
            if (hadoopConfigPath != null) {
                String coreSiteFile = String.format("%score-site.xml", hadoopConfigPath);
                Path coreSitePath = new Path(coreSiteFile);
                String hdfsSiteFile = String.format("%shdfs-site.xml", hadoopConfigPath);
                Path hdfsSitePath = new Path(hdfsSiteFile);
                configuration.addResource(coreSitePath);
                configuration.addResource(hdfsSitePath);
            }
            String hiveConfigPath = options.getString("hive-conf-dir", (String) null);
            if (hiveConfigPath != null) {
                String hiveSiteFile = String.format("%shive-site.xml", hiveConfigPath);
                Path hiveSitePath = new Path(hiveSiteFile);
                configuration.addResource(hiveSitePath);
            }
            configurationOpt = Optional.of(new Configuration(configuration));
        }
        paimonOptionParams.forEach(configuration::set);
        CatalogContext context = CatalogContext.create(options, configuration);
        return CatalogFactory.createCatalog(context);
    }

    public static void invalidateTableCache(PaimonTableCacheKey key) {
        tableCache.invalidate(key);
    }

    public static TableExt getTable(PaimonTableCacheKey key) {
        try {
            return tableCache.get(key);
        } catch (ExecutionException e) {
            throw new RuntimeException("failed to get table for:" + key);
        }
    }

    public static class TableExt {
        private Table table;
        private long createTime;

        public TableExt(Table table, long createTime) {
            this.table = table;
            this.createTime = createTime;
        }

        public Table getTable() {
            return table;
        }

        public long getCreateTime() {
            return createTime;
        }
    }

    public static class PaimonTableCacheKey {
        private long ctlId;
        private long dbId;
        private long tblId;
        private Map<String, String> paimonOptionParams;
        private Map<String, String> hadoopOptionParams;
        private String dbName;
        private String tblName;

        public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
                Map<String, String> paimonOptionParams,
                Map<String, String> hadoopOptionParams,
                String dbName, String tblName) {
            this.ctlId = ctlId;
            this.dbId = dbId;
            this.tblId = tblId;
            this.paimonOptionParams = paimonOptionParams;
            this.hadoopOptionParams = hadoopOptionParams;
            this.dbName = dbName;
            this.tblName = tblName;
        }

        public long getCtlId() {
            return ctlId;
        }

        public long getDbId() {
            return dbId;
        }

        public long getTblId() {
            return tblId;
        }

        public Map<String, String> getPaimonOptionParams() {
            return paimonOptionParams;
        }

        public Map<String, String> getHadoopOptionParams() {
            return hadoopOptionParams;
        }

        public String getDbName() {
            return dbName;
        }

        public String getTblName() {
            return tblName;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            PaimonTableCacheKey that = (PaimonTableCacheKey) o;
            return ctlId == that.ctlId && dbId == that.dbId && tblId == that.tblId;
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(ctlId, dbId, tblId);
        }

        @Override
        public String toString() {
            return "PaimonTableCacheKey{"
                    + "ctlId=" + ctlId
                    + ", dbId=" + dbId
                    + ", tblId=" + tblId
                    + ", paimonOptionParams=" + paimonOptionParams
                    + ", hadoopOptionParams=" + hadoopOptionParams
                    + ", dbName='" + dbName + '\''
                    + ", tblName='" + tblName + '\''
                    + '}';
        }
    }
}