// FetchRemoteTabletSchemaUtil.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest;
import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse;
import org.apache.doris.proto.InternalService.PTabletsLocation;
import org.apache.doris.proto.OlapFile.ColumnPB;
import org.apache.doris.proto.OlapFile.TabletSchemaPB;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Pulls the columns of the specified tablets as they exist on the Backends (BE),
 * including regular columns and columns decomposed from variants.
 *
 * <p>Usage: construct with the tablets of interest, then call {@link #fetch()}.
 * The RPC timeout is read from the session variable of {@code ConnectContext.get()},
 * so the calling thread needs a connect context whenever at least one coordinator
 * Backend is available.
 */
public class FetchRemoteTabletSchemaUtil {
    private static final Logger LOG = LogManager.getLogger(FetchRemoteTabletSchemaUtil.class);

    // Tablets whose remote schema should be fetched.
    private final List<Tablet> remoteTablets;
    // Result of the most recent fetch(); empty when nothing could be fetched.
    private List<Column> tableColumns;

    /**
     * @param tablets tablets whose columns should be fetched from the Backends; must not be null
     *                by the time {@link #fetch()} is called
     */
    public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) {
        this.remoteTablets = tablets;
        this.tableColumns = Lists.newArrayList();
    }

    /**
     * Fetches the merged schema of the tablets from a coordinator Backend.
     *
     * <p>Up to two alive Backends hosting alive replicas are picked at random as coordinators;
     * the second one is only contacted when the first attempt fails.
     *
     * @return the fetched columns sorted by name, or an empty list when no coordinator
     *         could be reached or every attempt failed
     */
    public List<Column> fetch() {
        Preconditions.checkNotNull(remoteTablets);
        // Start from a clean result so that neither an earlier call nor a half-processed
        // response of a failed attempt can leak into what this call returns.
        tableColumns = Lists.newArrayList();

        // 1. Find which Backend (BE) servers the tablets are on
        Map<Long, Set<Long>> beIdToTabletId = Maps.newHashMap();
        for (Tablet tablet : remoteTablets) {
            for (Replica replica : tablet.getReplicas()) {
                // only need alive replica
                if (replica.isAlive()) {
                    Set<Long> tabletIds = beIdToTabletId.computeIfAbsent(
                            replica.getBackendIdWithoutException(), k -> Sets.newHashSet());
                    tabletIds.add(tablet.getId());
                }
            }
        }

        // 2. Randomly select 2 Backend (BE) servers to act as coordinators.
        // Coordinator BE is responsible for collecting all table columns and returning to the FE.
        // Two BE provide a retry opportunity with the second one in case the first attempt fails.
        List<PTabletsLocation> locations = Lists.newArrayList();
        List<Backend> coordinatorBackend = Lists.newArrayList();
        for (Map.Entry<Long, Set<Long>> entry : beIdToTabletId.entrySet()) {
            Long backendId = entry.getKey();
            Set<Long> tabletIds = entry.getValue();
            Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId);
            // The backend may be unknown (e.g. already dropped, or the replica reported an
            // invalid backend id); skip it instead of failing with a NullPointerException.
            if (backend == null) {
                LOG.warn("backend {} not found when fetching remote schema, skip it", backendId);
                continue;
            }
            LOG.debug("fetch schema from coord backend {}, sample tablets count {}",
                    backend.getId(), tabletIds.size());
            // only need alive be
            if (!backend.isAlive()) {
                continue;
            }
            coordinatorBackend.add(backend);
            PTabletsLocation location = PTabletsLocation.newBuilder()
                    .setHost(backend.getHost())
                    .setBrpcPort(backend.getBrpcPort())
                    .addAllTabletId(tabletIds)
                    .build();
            locations.add(location);
        }
        if (coordinatorBackend.isEmpty()) {
            // Nothing to ask: no alive backend hosts an alive replica of the tablets.
            return tableColumns;
        }

        // pick 2 random coordinator
        Collections.shuffle(coordinatorBackend);
        coordinatorBackend = coordinatorBackend.subList(0, Math.min(2, coordinatorBackend.size()));
        LOG.debug("pick coordinator backend {}", coordinatorBackend.get(0));

        // The request is identical for every coordinator, so build it once.
        PFetchRemoteSchemaRequest request = PFetchRemoteSchemaRequest.newBuilder()
                .addAllTabletLocation(locations)
                .setIsCoordinator(true)
                .build();

        // 3. Send rpc to coordinatorBackend until one succeeds or all candidates are exhausted
        for (Backend be : coordinatorBackend) {
            Future<PFetchRemoteSchemaResponse> future = null;
            try {
                future = BackendServiceProxy.getInstance()
                        .fetchRemoteTabletSchemaAsync(be.getBrpcAddress(), request);
                PFetchRemoteSchemaResponse response = future.get(
                        ConnectContext.get().getSessionVariable().fetchRemoteSchemaTimeoutSeconds,
                        TimeUnit.SECONDS);
                TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
                if (code != TStatusCode.OK) {
                    String errMsg;
                    if (!response.getStatus().getErrorMsgsList().isEmpty()) {
                        errMsg = response.getStatus().getErrorMsgsList().get(0);
                    } else {
                        errMsg = "fetchRemoteTabletSchemaAsync failed. backend address: "
                                + be.getHost() + " : " + be.getBrpcPort();
                    }
                    throw new RpcException(be.getHost(), errMsg);
                }
                fillColumns(response);
                return tableColumns;
            } catch (AnalysisException e) {
                // The response could not be converted; try the next coordinator.
                LOG.warn("fetch remote schema failed to parse response, addr {}", be.getBrpcAddress(), e);
            } catch (InterruptedException e) {
                LOG.warn("fetch remote schema future get interrupted Exception");
                future.cancel(true);
                // Restore the interrupt flag for the caller and stop retrying: any further
                // blocking wait on this thread would be interrupted immediately anyway.
                Thread.currentThread().interrupt();
                return tableColumns;
            } catch (TimeoutException e) {
                future.cancel(true);
                // continue to get result
                LOG.warn("fetch remote schema result timeout, addr {}", be.getBrpcAddress());
            } catch (RpcException e) {
                LOG.warn("fetch remote schema result rpc exception {}, e {}", be.getBrpcAddress(), e);
            } catch (ExecutionException e) {
                LOG.warn("fetch remote schema ExecutionException, addr {}, e {}", be.getBrpcAddress(), e);
            }
        }
        return tableColumns;
    }

    /**
     * Converts the merged schema of {@code response} into columns sorted by name and
     * publishes them as the fetch result.
     *
     * <p>The result is only replaced after every column was converted successfully, so a
     * failure leaves no partially filled list behind.
     *
     * @throws AnalysisException if any column of the merged schema cannot be converted
     */
    private void fillColumns(PFetchRemoteSchemaResponse response) throws AnalysisException {
        TabletSchemaPB schemaPB = response.getMergedSchema();
        List<Column> fetchedColumns = Lists.newArrayList();
        for (ColumnPB columnPB : schemaPB.getColumnList()) {
            fetchedColumns.add(initColumnFromPB(columnPB));
        }
        // sort the columns
        fetchedColumns.sort((left, right) -> left.getName().compareTo(right.getName()));
        tableColumns = fetchedColumns;
    }

    /**
     * Builds a {@link Column} from its protobuf description, resolving nested
     * array/map/struct element types recursively.
     *
     * @throws AnalysisException if the column (or one of its children) cannot be converted;
     *                           the original failure is kept as the cause
     */
    private Column initColumnFromPB(ColumnPB column) throws AnalysisException {
        try {
            AggregateType aggType = AggregateType.getAggTypeFromAggName(column.getAggregation());
            Type type = Type.getTypeFromTypeName(column.getType());
            String columnName = column.getName();
            boolean isKey = column.getIsKey();
            boolean isNullable = column.getIsNullable();
            // An empty default value in the protobuf means "no default value".
            String defaultValue = column.getDefaultValue().toStringUtf8();
            if (defaultValue.isEmpty()) {
                defaultValue = null;
            }
            // Key columns carry no aggregation type.
            if (isKey) {
                aggType = null;
            }
            type = resolveNestedType(type, column);
            return new Column(columnName, type, isKey, aggType, isNullable,
                    defaultValue, "remote schema");
        } catch (AnalysisException e) {
            // Already describes the failing (child) column; do not wrap it again.
            throw e;
        } catch (Exception e) {
            throw new AnalysisException("failed to build column '" + column.getName()
                    + "' from remote schema: " + e.getMessage(), e);
        }
    }

    /**
     * Returns the concrete type for a complex column by converting its child columns.
     *
     * <p>Non-complex types, and complex types whose number of children does not match
     * (array: 1, map: 2), are returned unchanged.
     */
    private Type resolveNestedType(Type type, ColumnPB column) throws AnalysisException {
        List<ColumnPB> children = column.getChildrenColumnsList();
        if (children == null) {
            return type;
        }
        if (type.isArrayType()) {
            if (children.size() != 1) {
                return type;
            }
            return new ArrayType(initColumnFromPB(children.get(0)).getType());
        }
        if (type.isMapType()) {
            if (children.size() != 2) {
                return type;
            }
            Column keyChild = initColumnFromPB(children.get(0));
            Column valueChild = initColumnFromPB(children.get(1));
            return new MapType(keyChild.getType(), valueChild.getType());
        }
        if (type.isStructType()) {
            List<Type> childTypes = Lists.newArrayList();
            for (ColumnPB childPB : children) {
                childTypes.add(initColumnFromPB(childPB).getType());
            }
            return new StructType(childTypes);
        }
        return type;
    }
}