AlterLightSchChangeHelper.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.alter;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.persist.AlterLightSchemaChangeInfo;
import org.apache.doris.proto.InternalService.PFetchColIdsRequest;
import org.apache.doris.proto.InternalService.PFetchColIdsRequest.Builder;
import org.apache.doris.proto.InternalService.PFetchColIdsRequest.PFetchColIdParam;
import org.apache.doris.proto.InternalService.PFetchColIdsResponse;
import org.apache.doris.proto.InternalService.PFetchColIdsResponse.PFetchColIdsResultEntry;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* For alter light_schema_change table property
*/
public class AlterLightSchChangeHelper {
private static final Logger LOG = LogManager.getLogger(AlterLightSchChangeHelper.class);
private static final long DEFAULT_RPC_TIMEOUT = 900L;
private final Database db;
private final OlapTable olapTable;
private final long rpcTimoutMs;
public AlterLightSchChangeHelper(Database db, OlapTable olapTable) {
this.db = db;
this.olapTable = olapTable;
ConnectContext connectContext = ConnectContext.get();
if (connectContext == null) {
rpcTimoutMs = DEFAULT_RPC_TIMEOUT * 1000L;
} else {
rpcTimoutMs = connectContext.getExecTimeoutS() * 1000L;
}
}
/**
* 1. rpc read columnUniqueIds from BE
* 2. refresh table metadata
* 3. write edit log
*/
public void enableLightSchemaChange() throws IllegalStateException {
final AlterLightSchemaChangeInfo info = callForColumnsInfo();
updateTableMeta(info);
Env.getCurrentEnv().getEditLog().logAlterLightSchemaChange(info);
LOG.info("successfully enable `light_schema_change`, db={}, tbl={}", db.getFullName(), olapTable.getName());
}
/**
* This method creates RPC params for several BEs which target tablets lie on.
* Each param contains several target indexIds and each indexId is mapped to all the tablet ids on it.
* We should pass all tablet id for a consistency check of index schema.
*
* @return beId -> set(indexId -> tabletIds)
*/
private Map<Long, PFetchColIdsRequest> initParams() {
// params: indexId -> tabletIds
Map<Long, Map<Long, Set<Long>>> beIdToRequestInfo = new HashMap<>();
final Collection<Partition> partitions = olapTable.getAllPartitions();
for (Partition partition : partitions) {
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
buildParams(index.getId(), tablet, beIdToRequestInfo);
}
}
}
// transfer to rpc params
final Map<Long, PFetchColIdsRequest> beIdToRequest = new HashMap<>();
beIdToRequestInfo.keySet().forEach(beId -> {
final Map<Long, Set<Long>> indexIdToTabletIds = beIdToRequestInfo.get(beId);
final Builder requestBuilder = PFetchColIdsRequest.newBuilder();
for (Long indexId : indexIdToTabletIds.keySet()) {
final PFetchColIdParam.Builder paramBuilder = PFetchColIdParam.newBuilder();
final PFetchColIdParam param = paramBuilder
.setIndexId(indexId)
.addAllTabletIds(indexIdToTabletIds.get(indexId))
.build();
requestBuilder.addParams(param);
}
beIdToRequest.put(beId, requestBuilder.build());
});
return beIdToRequest;
}
private void buildParams(Long indexId, Tablet tablet, Map<Long, Map<Long, Set<Long>>> beIdToRequestInfo) {
final List<Long> backendIds = tablet.getNormalReplicaBackendIds();
for (Long backendId : backendIds) {
beIdToRequestInfo.putIfAbsent(backendId, new HashMap<>());
final Map<Long, Set<Long>> indexIdToTabletId = beIdToRequestInfo.get(backendId);
indexIdToTabletId.putIfAbsent(indexId, new HashSet<>());
indexIdToTabletId.computeIfPresent(indexId, (idxId, set) -> {
set.add(tablet.getId());
return set;
});
}
}
/**
* @return contains indexIds to each tablet schema info which consists of columnName to corresponding
* column unique id pairs
* @throws IllegalStateException as a wrapper for rpc failures
*/
public AlterLightSchemaChangeInfo callForColumnsInfo()
throws IllegalStateException {
Map<Long, PFetchColIdsRequest> beIdToRequest = initParams();
Map<Long, Future<PFetchColIdsResponse>> beIdToRespFuture = new HashMap<>();
try {
for (Long beId : beIdToRequest.keySet()) {
final Backend backend = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().get(beId);
final TNetworkAddress address =
new TNetworkAddress(Objects.requireNonNull(backend).getHost(), backend.getBrpcPort());
final Future<PFetchColIdsResponse> responseFuture = BackendServiceProxy.getInstance()
.getColumnIdsByTabletIds(address, beIdToRequest.get(beId));
beIdToRespFuture.put(beId, responseFuture);
}
} catch (RpcException | UserException e) {
throw new IllegalStateException("fetch columnIds RPC failed", e);
}
// wait for and get results
final long start = System.currentTimeMillis();
long timeoutMs = rpcTimoutMs;
final List<PFetchColIdsResponse> resultList = new ArrayList<>();
try {
for (Map.Entry<Long, Future<PFetchColIdsResponse>> entry : beIdToRespFuture.entrySet()) {
final PFetchColIdsResponse response = entry.getValue().get(timeoutMs, TimeUnit.MILLISECONDS);
if (response.getStatus().getStatusCode() != TStatusCode.OK.getValue()) {
throw new IllegalStateException(String.format("fail to get column info from be: %s, msg:%s",
entry.getKey(), response.getStatus().getErrorMsgs(0)));
}
resultList.add(response);
// refresh the timeout
final long now = System.currentTimeMillis();
final long deltaMs = now - start;
timeoutMs -= deltaMs;
Preconditions.checkState(timeoutMs >= 0,
"impossible state, timeout should happened");
}
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException("fetch columnIds RPC result failed: ", e);
} catch (TimeoutException e) {
throw new IllegalStateException("fetch columnIds RPC result timeout", e);
}
return compactToAlterLscInfo(resultList);
}
/**
* Since the result collected from several BEs may contain repeated indexes in distributed storage scenarios,
* we should do consistency check for the result for the same index, and get the unique result.
*/
private AlterLightSchemaChangeInfo compactToAlterLscInfo(List<PFetchColIdsResponse> resultList) {
final PFetchColIdsResponse.Builder builder = PFetchColIdsResponse.newBuilder();
Map<Long, Map<String, Integer>> indexIdToTabletInfo = new HashMap<>();
resultList.forEach(response -> {
for (PFetchColIdsResultEntry entry : response.getEntriesList()) {
final long indexId = entry.getIndexId();
if (!indexIdToTabletInfo.containsKey(indexId)) {
indexIdToTabletInfo.put(indexId, entry.getColNameToIdMap());
builder.addEntries(entry);
continue;
}
// check tablet schema info consistency
final Map<String, Integer> colNameToId = indexIdToTabletInfo.get(indexId);
Preconditions.checkState(colNameToId.equals(entry.getColNameToIdMap()),
"index: " + indexId + "got inconsistent schema in storage");
}
});
return new AlterLightSchemaChangeInfo(db.getId(), olapTable.getId(), indexIdToTabletInfo);
}
public void updateTableMeta(AlterLightSchemaChangeInfo info) throws IllegalStateException {
Preconditions.checkNotNull(info, "passed in info should be not null");
// update index-meta once and for all
// schema pair: <maxColId, columns>
final List<Pair<Integer, List<Column>>> schemaPairs = new ArrayList<>();
final List<Long> indexIds = new ArrayList<>();
info.getIndexIdToColumnInfo().forEach((indexId, colNameToId) -> {
final List<Column> columns = olapTable.getSchemaByIndexId(indexId, true);
Preconditions.checkState(columns.size() == colNameToId.size(),
"size mismatch for original columns meta and that in change info");
int maxColId = Column.COLUMN_UNIQUE_ID_INIT_VALUE;
final List<Column> newSchema = new ArrayList<>();
for (Column column : columns) {
final String columnName = column.getName();
final int columnId = Preconditions.checkNotNull(colNameToId.get(columnName),
"failed to fetch column id of column:{" + columnName + "}");
final Column newColumn = new Column(column);
newColumn.setUniqueId(columnId);
newSchema.add(newColumn);
maxColId = Math.max(columnId, maxColId);
}
schemaPairs.add(Pair.of(maxColId, newSchema));
indexIds.add(indexId);
});
Preconditions.checkState(schemaPairs.size() == indexIds.size(),
"impossible state, size of schemaPairs and indexIds should be the same");
// update index-meta once and for all
try {
for (int i = 0; i < indexIds.size(); i++) {
final MaterializedIndexMeta indexMeta = olapTable.getIndexMetaByIndexId(indexIds.get(i));
final Pair<Integer, List<Column>> schemaPair = schemaPairs.get(i);
indexMeta.setMaxColUniqueId(schemaPair.first);
indexMeta.setSchema(schemaPair.second);
}
} catch (IOException e) {
throw new IllegalStateException("fail to reset index schema", e);
}
// write table property
olapTable.setEnableLightSchemaChange(true);
}
}