ColumnIdFlushDaemon.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.alter.AlterLightSchChangeHelper;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.persist.AlterLightSchemaChangeInfo;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
/**
* note(tsy): this class is temporary, make table before 1.2 to enable light schema change
*/
public class ColumnIdFlushDaemon extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(ColumnIdFlushDaemon.class);
/**
* db name -> (tbl name -> status)
*/
private final Map<String, Map<String, FlushStatus>> resultCollector;
private final ReadWriteLock rwLock;
private final BiConsumer<Database, OlapTable> flushFunc;
public ColumnIdFlushDaemon() {
super("colum-id-flusher", TimeUnit.HOURS.toMillis(1));
resultCollector = Maps.newHashMap();
rwLock = new ReentrantReadWriteLock();
if (Config.enable_convert_light_weight_schema_change) {
flushFunc = this::doFlush;
} else {
flushFunc = (db, table) -> record(db.getFullName(), table.getName(), FlushStatus.init());
}
}
@Override
protected void runAfterCatalogReady() {
flush();
}
private void flush() {
List<Database> dbs = Env.getCurrentEnv().getInternalCatalog().getDbs();
for (Database db : dbs) {
rwLock.writeLock().lock();
try {
db.getTables()
.stream()
.filter(table -> table instanceof OlapTable)
.map(table -> (OlapTable) table)
.filter(olapTable -> !olapTable.getTableProperty().getUseSchemaLightChange())
.forEach(table -> flushFunc.accept(db, table));
} finally {
rwLock.writeLock().unlock();
}
try {
// avoid too often to call be
sleep(3000);
} catch (InterruptedException ignore) {
// do nothing
}
}
}
private void doFlush(Database db, OlapTable table) {
record(db.getFullName(), table.getName(), FlushStatus.init());
AlterLightSchChangeHelper schChangeHelper = new AlterLightSchChangeHelper(db, table);
AlterLightSchemaChangeInfo changeInfo;
try {
changeInfo = schChangeHelper.callForColumnsInfo();
} catch (IllegalStateException e) {
record(db.getFullName(), table.getName(), FlushStatus.failed(e.getMessage()));
return;
}
table.writeLock();
try {
if (table.getTableProperty().getUseSchemaLightChange()) {
removeRecord(db.getFullName(), table.getName());
return;
}
schChangeHelper.updateTableMeta(changeInfo);
Env.getCurrentEnv().getEditLog().logAlterLightSchemaChange(changeInfo);
LOG.info("successfully enable `light_schema_change`, db={}, tbl={}",
db.getFullName(), table.getName());
removeRecord(db.getFullName(), table.getName());
} catch (IllegalStateException e) {
record(db.getFullName(), table.getName(), FlushStatus.failed(e.getMessage()));
} finally {
table.writeUnlock();
}
}
private void record(String dbName, String tableName, FlushStatus status) {
resultCollector.putIfAbsent(dbName, Maps.newHashMap());
Map<String, FlushStatus> tableToStatus = resultCollector.get(dbName);
tableToStatus.put(tableName, status);
}
private void removeRecord(String dbName, String tableName) {
Map<String, FlushStatus> tableToStatus;
if (resultCollector.containsKey(dbName)
&& (tableToStatus = resultCollector.get(dbName)).containsKey(tableName)) {
tableToStatus.remove(tableName);
if (tableToStatus.isEmpty()) {
resultCollector.remove(dbName);
}
}
}
public Map<String, Map<String, FlushStatus>> getResultCollector() {
return resultCollector;
}
public void readLock() {
rwLock.readLock().lock();
}
public void readUnlock() {
rwLock.readLock().unlock();
}
public static class FlushStatus {
private FlushStatus() {
this.success = true;
this.msg = "Waiting to be converted";
}
private FlushStatus(String msg) {
this.success = false;
this.msg = msg;
}
public static FlushStatus init() {
return new FlushStatus();
}
public static FlushStatus failed(String reason) {
return new FlushStatus(reason);
}
public boolean isSuccess() {
return success;
}
public String getMsg() {
return msg;
}
private final boolean success;
private final String msg;
}
}