// BinlogConfigCache.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.binlog;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * In-memory cache of binlog configs for databases and tables, plus the table type of
 * every table whose config has been loaded.
 *
 * <p>Database ids and table ids share a single config map. Entries are loaded lazily
 * from the current internal catalog on the first lookup that misses the cache. Every
 * access to the two maps is guarded by one {@link ReentrantReadWriteLock}: cache hits
 * take the read lock, loads and {@link #remove(long)} take the write lock.
 */
public class BinlogConfigCache {
    private static final Logger LOG = LogManager.getLogger(BinlogConfigCache.class);

    // db or table all use id
    private final Map<Long, BinlogConfig> dbTableBinlogEnableMap;
    // table id -> table type, filled together with the table's binlog config
    private final Map<Long, TableIf.TableType> tableTypeMap;
    private final ReentrantReadWriteLock lock;

    public BinlogConfigCache() {
        dbTableBinlogEnableMap = new HashMap<>();
        tableTypeMap = new HashMap<>();
        lock = new ReentrantReadWriteLock();
    }

    /**
     * Get the binlog config of the specified db.
     *
     * @param dbId id of the database
     * @return the cached or freshly loaded config, or null if no such database exists
     */
    public BinlogConfig getDBBinlogConfig(long dbId) {
        BinlogConfig binlogConfig = getCachedBinlogConfig(dbId);
        if (binlogConfig != null) {
            return binlogConfig;
        }

        lock.writeLock().lock();
        try {
            // Another thread may have filled the entry while we waited for the write lock.
            binlogConfig = dbTableBinlogEnableMap.get(dbId);
            if (binlogConfig != null) {
                return binlogConfig;
            }

            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
            if (db == null) {
                LOG.warn("db not found. dbId: {}", dbId);
                return null;
            }

            binlogConfig = db.getBinlogConfig();
            dbTableBinlogEnableMap.put(dbId, binlogConfig);
            return binlogConfig;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * @param dbId id of the database
     * @return whether binlog is enabled for the db; false if the db config cannot be found
     */
    public boolean isEnableDB(long dbId) {
        BinlogConfig dBinlogConfig = getDBBinlogConfig(dbId);
        if (dBinlogConfig == null) {
            return false;
        }
        return dBinlogConfig.isEnable();
    }

    /**
     * @param dbId id of the database
     * @return the db's binlog ttl in seconds, or {@code BinlogConfig.TTL_SECONDS} if the
     *         db config cannot be found
     */
    public long getDBTtlSeconds(long dbId) {
        BinlogConfig dBinlogConfig = getDBBinlogConfig(dbId);
        if (dBinlogConfig == null) {
            return BinlogConfig.TTL_SECONDS;
        }
        return dBinlogConfig.getTtlSeconds();
    }

    /**
     * Get the binlog config of the specified table, loading it from the catalog on a
     * cache miss.
     *
     * @param dbId id of the database that owns the table
     * @param tableId id of the table
     * @return the table's binlog config, or null if the db or table does not exist, the
     *         table is not an olap table, or loading failed with an exception
     */
    public BinlogConfig getTableBinlogConfig(long dbId, long tableId) {
        BinlogConfig tableBinlogConfig = getCachedBinlogConfig(tableId);
        if (tableBinlogConfig != null) {
            return tableBinlogConfig;
        }

        lock.writeLock().lock();
        try {
            // Another thread may have filled the entry while we waited for the write lock.
            tableBinlogConfig = dbTableBinlogEnableMap.get(tableId);
            if (tableBinlogConfig != null) {
                return tableBinlogConfig;
            }

            loadTableBinlogConfig(dbId, tableId);
            return dbTableBinlogEnableMap.get(tableId); // null if not exists
        } catch (Exception e) {
            // Pass the exception as the trailing argument so the cause is not lost.
            LOG.warn("fail to get table. db: {}, table id: {}", dbId, tableId, e);
            return null;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Look up a config in the cache under the read lock.
     *
     * @param id db id or table id
     * @return the cached config, or null if nothing is cached for the id
     */
    private BinlogConfig getCachedBinlogConfig(long id) {
        lock.readLock().lock();
        try {
            return dbTableBinlogEnableMap.get(id);
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Load the binlog config and table type of an olap table from the catalog into the
     * cache. Nothing is cached if the db or table is missing or the table is not an
     * olap table. Both callers in this class invoke it while holding the write lock.
     */
    private void loadTableBinlogConfig(long dbId, long tableId) {
        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
        if (db == null) {
            LOG.warn("db not found. dbId: {}", dbId);
            return;
        }

        Table table = db.getTableNullable(tableId);
        if (table == null) {
            LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
            return;
        }

        if (!(table instanceof OlapTable)) { // MTMV is an instance of OlapTable
            LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
            return;
        }

        OlapTable olapTable = (OlapTable) table;
        // get table binlog config, when table modify binlogConfig
        // it create a new binlog, not update inplace, so we don't need to clone
        // binlogConfig
        dbTableBinlogEnableMap.put(tableId, olapTable.getBinlogConfig());
        tableTypeMap.put(tableId, table.getType());
    }

    /**
     * @param dbId id of the database that owns the table
     * @param tableId id of the table
     * @return true if the table's type is {@code MATERIALIZED_VIEW}; false otherwise,
     *         including when the table type cannot be loaded
     */
    public boolean isAsyncMvTable(long dbId, long tableId) {
        TableIf.TableType tableType;
        lock.readLock().lock();
        try {
            tableType = tableTypeMap.get(tableId);
        } finally {
            lock.readLock().unlock();
        }
        if (tableType != null) {
            return tableType == TableIf.TableType.MATERIALIZED_VIEW;
        }

        lock.writeLock().lock();
        try {
            // Skip the catalog lookup if another thread cached the type in the meantime.
            if (tableTypeMap.get(tableId) == null) {
                loadTableBinlogConfig(dbId, tableId);
            }
            return tableTypeMap.get(tableId) == TableIf.TableType.MATERIALIZED_VIEW;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Check whether the table is temporary. This reads the catalog directly on every
     * call; nothing is cached.
     *
     * @param dbId id of the database that owns the table
     * @param tableId id of the table
     * @return the table's temporary flag, or false if the db or table does not exist
     */
    public boolean isTemporaryTable(long dbId, long tableId) {
        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
        if (db == null) {
            LOG.warn("db not found. dbId: {}", dbId);
            return false;
        }

        Table table = db.getTableNullable(tableId);
        if (table == null) {
            LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
            return false;
        }

        return table.isTemporary();
    }

    /**
     * @param dbId id of the database that owns the table
     * @param tableId id of the table
     * @return whether binlog is enabled for the table; false if its config cannot be found
     */
    public boolean isEnableTable(long dbId, long tableId) {
        BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId);
        if (tableBinlogConfig == null) {
            return false;
        }
        return tableBinlogConfig.isEnable();
    }

    /**
     * @param dbId id of the database that owns the table
     * @param tableId id of the table
     * @return the table's binlog ttl in seconds, or {@code BinlogConfig.TTL_SECONDS} if
     *         its config cannot be found
     */
    public long getTableTtlSeconds(long dbId, long tableId) {
        BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId);
        if (tableBinlogConfig == null) {
            return BinlogConfig.TTL_SECONDS;
        }
        return tableBinlogConfig.getTtlSeconds();
    }

    /**
     * Evict the cached binlog config of a db or table so the next lookup reloads it.
     *
     * <p>Only the config map is touched; a table type cached for the same id is kept.
     * NOTE(review): confirm whether keeping the table type entry is intentional (e.g. so
     * the type of an already dropped table can still be answered) or an oversight that
     * lets the table type map grow without bound.
     *
     * @param id db id or table id
     */
    public void remove(long id) {
        lock.writeLock().lock();
        try {
            dbTableBinlogEnableMap.remove(id);
        } finally {
            lock.writeLock().unlock();
        }
    }
}