// StreamingJobUtils.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.util;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.jdbc.client.JdbcMySQLClient;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.common.LoadConstants;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.offset.jdbc.split.SnapshotSplit;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DistributionDescriptor;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionTableInfo;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TUniqueId;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.text.StringSubstitutor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
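/**
 * Utilities for streaming jobs: persist and restore snapshot split metadata
 * through the internal meta table, pick a backend to run a job on, and
 * generate CREATE TABLE commands for source tables discovered over JDBC.
 */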
@Log4j2
public class StreamingJobUtils {
public static final String INTERNAL_STREAMING_JOB_META_TABLE_NAME = "streaming_job_meta";
public static final String FULL_QUALIFIED_META_TBL_NAME = InternalCatalog.INTERNAL_CATALOG_NAME
+ "." + FeConstants.INTERNAL_DB_NAME + "." + INTERNAL_STREAMING_JOB_META_TABLE_NAME;
private static final String CREATE_META_TABLE = "CREATE TABLE %s(\n"
+ "id int,\n"
+ "job_id bigint,\n"
+ "table_name string,\n"
+ "chunk_list json\n"
+ ")\n"
+ "UNIQUE KEY(id, job_id)\n"
+ "DISTRIBUTED BY HASH(job_id)\n"
+ "BUCKETS 2\n"
+ "PROPERTIES ('replication_num' = '1')"; // todo: modify replication num like statistic sys tbl
private static final String BATCH_INSERT_INTO_META_TABLE_TEMPLATE =
"INSERT INTO " + FULL_QUALIFIED_META_TBL_NAME + " VALUES ";
private static final String INSERT_INTO_META_TABLE_TEMPLATE =
"('${id}', '${job_id}', '${table_name}', '${chunk_list}')";
private static final String SELECT_SPLITS_TABLE_TEMPLATE =
"SELECT table_name, chunk_list FROM " + FULL_QUALIFIED_META_TBL_NAME + " WHERE job_id = '%s' ORDER BY id ASC";
private static final ObjectMapper objectMapper = new ObjectMapper();
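/**
 * Creates the streaming-job meta table in the internal database if it does
 * not exist yet, then re-reads it to verify it is actually visible.
 */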
public static void createMetaTableIfNotExist() throws Exception {
Optional<Database> optionalDatabase =
Env.getCurrentEnv().getInternalCatalog()
.getDb(FeConstants.INTERNAL_DB_NAME);
if (!optionalDatabase.isPresent()) {
// should not happen
throw new JobException("Internal database does not exist");
}
Database database = optionalDatabase.get();
// look up by bare table name: getTableNullable matches names within the database
Table t = database.getTableNullable(INTERNAL_STREAMING_JOB_META_TABLE_NAME);
if (t == null) {
executeInsert(String.format(CREATE_META_TABLE, FULL_QUALIFIED_META_TBL_NAME));
}
// double check
t = database.getTableNullable(INTERNAL_STREAMING_JOB_META_TABLE_NAME);
if (t == null) {
throw new JobException(String.format("Table %s doesn't exist", FULL_QUALIFIED_META_TBL_NAME));
}
}
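/**
 * Reads back the snapshot splits persisted for the given job, in insertion
 * order, deserializing each chunk_list JSON column into SnapshotSplit lists
 * keyed by table name.
 */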
public static Map<String, List<SnapshotSplit>> restoreSplitsToJob(Long jobId) throws IOException {
List<ResultRow> resultRows;
String sql = String.format(SELECT_SPLITS_TABLE_TEMPLATE, jobId);
try (AutoCloseConnectContext context
= new AutoCloseConnectContext(buildConnectContext())) {
StmtExecutor stmtExecutor = new StmtExecutor(context.connectContext, sql);
resultRows = stmtExecutor.executeInternalQuery();
}
Map<String, List<SnapshotSplit>> tableSplits = new LinkedHashMap<>();
for (ResultRow row : resultRows) {
String tableName = row.get(0);
String chunkListStr = row.get(1);
List<SnapshotSplit> splits =
new ArrayList<>(Arrays.asList(objectMapper.readValue(chunkListStr, SnapshotSplit[].class)));
tableSplits.put(tableName, splits);
}
return tableSplits;
}
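/**
 * Persists the per-table snapshot splits of a job: one row per table, with
 * the splits serialized to JSON in chunk_list, written as a single batch
 * INSERT into the meta table.
 */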
public static void insertSplitsToMeta(Long jobId, Map<String, List<SnapshotSplit>> tableSplits) throws Exception {
List<String> values = new ArrayList<>();
int index = 1;
for (Map.Entry<String, List<SnapshotSplit>> entry : tableSplits.entrySet()) {
Map<String, String> params = new HashMap<>();
params.put("id", index + "");
params.put("job_id", jobId + "");
params.put("table_name", entry.getKey());
params.put("chunk_list", objectMapper.writeValueAsString(entry.getValue()));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(INSERT_INTO_META_TABLE_TEMPLATE);
values.add(sql);
index++;
}
batchInsert(values);
}
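// Joins the pre-rendered value tuples into one INSERT statement; callers
// have already substituted and serialized all values.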
private static void batchInsert(List<String> values) throws Exception {
if (values.isEmpty()) {
return;
}
String query = BATCH_INSERT_INTO_META_TABLE_TEMPLATE + String.join(",", values) + ";";
executeInsert(query);
}
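// Runs a write statement (INSERT or DDL) on a short-lived internal context.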
private static void executeInsert(String sql) throws Exception {
try (AutoCloseConnectContext context
= new AutoCloseConnectContext(buildConnectContext())) {
StmtExecutor stmtExecutor = new StmtExecutor(context.connectContext, sql);
stmtExecutor.execute();
}
}
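// Builds a one-shot internal ConnectContext running as ADMIN with a fresh
// query id; callers close it via AutoCloseConnectContext.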
private static ConnectContext buildConnectContext() {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setCurrentUserIdentity(UserIdentity.ADMIN);
ctx.getState().reset();
ctx.getState().setInternal(true);
ctx.getState().setNereids(true);
ctx.setThreadLocalInfo();
UUID uuid = UUID.randomUUID();
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
ctx.setQueryId(queryId);
return ctx;
}
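// Builds a JdbcClient from the job properties; only MYSQL sources are
// supported at the moment.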
private static JdbcClient getJdbcClient(DataSourceType sourceType, Map<String, String> properties)
throws JobException {
JdbcClientConfig config = new JdbcClientConfig();
config.setCatalog(sourceType.name());
config.setUser(properties.get(LoadConstants.USER));
config.setPassword(properties.get(LoadConstants.PASSWORD));
config.setDriverClass(properties.get(LoadConstants.DRIVER_CLASS));
config.setDriverUrl(properties.get(LoadConstants.DRIVER_URL));
config.setJdbcUrl(properties.get(LoadConstants.JDBC_URL));
switch (sourceType) {
case MYSQL:
return JdbcMySQLClient.createJdbcClient(config);
default:
throw new JobException("Unsupported source type " + sourceType);
}
}
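/**
 * Selects a load-available backend for the job. The job id is mapped onto
 * the candidate list by modulo, so a job stays on the same backend while
 * the candidate set is stable.
 */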
public static Backend selectBackend(Long jobId) throws JobException {
BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
.setEnableRoundRobin(true)
.needLoadAvailable().build();
List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1);
if (backendIds.isEmpty()) {
throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
// pick a candidate by jobId % backendSize; floorMod keeps the index
// non-negative even when the job id exceeds the int range
long backendId = backendIds.get((int) Math.floorMod(jobId, (long) backendIds.size()));
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
if (backend == null) {
throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
return backend;
}
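/**
 * Generates a CREATE TABLE command for every source table that passes the
 * include/exclude filters. Each target is a UNIQUE KEY olap table keyed on
 * the source primary key (or the first column when the source has none),
 * hash-distributed on the key columns with auto bucketing.
 */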
public static List<CreateTableCommand> generateCreateTableCmds(String targetDb, DataSourceType sourceType,
Map<String, String> properties, Map<String, String> targetProperties)
throws JobException {
List<CreateTableCommand> createtblCmds = new ArrayList<>();
String includeTables = properties.get(LoadConstants.INCLUDE_TABLES);
String excludeTables = properties.get(LoadConstants.EXCLUDE_TABLES);
List<String> includeTablesList = new ArrayList<>();
if (includeTables != null) {
includeTablesList = Arrays.asList(includeTables.split(","));
}
// parse exclude_tables into a list too: a raw String.contains check would
// also skip tables whose names are substrings of an excluded name
List<String> excludeTablesList = new ArrayList<>();
if (excludeTables != null) {
excludeTablesList = Arrays.asList(excludeTables.split(","));
}
String database = properties.get(LoadConstants.DATABASE);
JdbcClient jdbcClient = getJdbcClient(sourceType, properties);
List<String> tablesNameList = jdbcClient.getTablesNameList(database);
if (tablesNameList.isEmpty()) {
throw new JobException("No tables found in database " + database);
}
Map<String, String> tableCreateProperties = getTableCreateProperties(targetProperties);
for (String table : tablesNameList) {
if (!includeTablesList.isEmpty() && !includeTablesList.contains(table)) {
log.info("Skip table {} in database {} as it does not in include_tables {}", table, database,
includeTables);
continue;
}
if (excludeTables != null && excludeTables.contains(table)) {
log.info("Skip table {} in database {} as it in exclude_tables {}", table, database,
excludeTables);
continue;
}
List<Column> columns = jdbcClient.getColumnsFromJdbc(database, table);
List<String> primaryKeys = jdbcClient.getPrimaryKeys(database, table);
if (primaryKeys.isEmpty()) {
primaryKeys.add(columns.get(0).getName());
log.info("table {} no primary key, use first column {} to primary key", table,
columns.get(0).getName());
}
// Convert Column to ColumnDefinition
List<ColumnDefinition> columnDefinitions = columns.stream().map(col -> {
DataType dataType = DataType.fromCatalogType(col.getType());
return new ColumnDefinition(col.getName(), dataType, col.isAllowNull(), col.getComment());
}).collect(Collectors.toList());
// Create DistributionDescriptor
DistributionDescriptor distribution = new DistributionDescriptor(
true, // isHash
true, // isAutoBucket
FeConstants.default_bucket_num,
primaryKeys
);
// Create CreateTableInfo
CreateTableInfo createtblInfo = new CreateTableInfo(
true, // ifNotExists
false, // isExternal
false, // isTemp
InternalCatalog.INTERNAL_CATALOG_NAME, // ctlName
targetDb, // dbName
table, // tableName
columnDefinitions, // columns
ImmutableList.of(), // indexes
"olap", // engineName
KeysType.UNIQUE_KEYS, // keysType
primaryKeys, // keys
"", // comment
PartitionTableInfo.EMPTY, // partitionTableInfo
distribution, // distribution
ImmutableList.of(), // rollups
tableCreateProperties, // properties
ImmutableMap.of(), // extProperties
ImmutableList.of() // clusterKeyColumnNames
);
CreateTableCommand createtblCmd = new CreateTableCommand(Optional.empty(), createtblInfo);
createtblCmds.add(createtblCmd);
}
return createtblCmds;
}
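// Collects properties carrying LoadConstants.TABLE_PROPS_PREFIX and strips
// the prefix, yielding the properties passed to CREATE TABLE.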
private static Map<String, String> getTableCreateProperties(Map<String, String> properties) {
final Map<String, String> tableCreateProps = new HashMap<>();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(LoadConstants.TABLE_PROPS_PREFIX)) {
String subKey = entry.getKey().substring(LoadConstants.TABLE_PROPS_PREFIX.length());
tableCreateProps.put(subKey, entry.getValue());
}
}
return tableCreateProps;
}
}