FollowerColumnSender.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Frontend;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryColumn;
import org.apache.doris.thrift.TSyncQueryColumns;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
public class FollowerColumnSender extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(FollowerColumnSender.class);
public static final long INTERVAL = 60000;
public FollowerColumnSender() {
super("Follower Column Sender", INTERVAL);
}
@Override
protected void runAfterCatalogReady() {
if (!StatisticsUtil.enableAutoAnalyze()) {
return;
}
if (Env.getCurrentEnv().isMaster()) {
return;
}
if (Env.isCheckpointThread()) {
return;
}
send();
}
protected void send() {
if (Env.getCurrentEnv().isMaster()) {
return;
}
Env currentEnv = Env.getCurrentEnv();
AnalysisManager analysisManager = currentEnv.getAnalysisManager();
if (analysisManager.highPriorityColumns.isEmpty() && analysisManager.midPriorityColumns.isEmpty()) {
return;
}
Set<TQueryColumn> highs = getNeedAnalyzeColumns(analysisManager.highPriorityColumns);
Set<TQueryColumn> mids = getNeedAnalyzeColumns(analysisManager.midPriorityColumns);
mids.removeAll(highs);
TSyncQueryColumns queryColumns = new TSyncQueryColumns();
queryColumns.highPriorityColumns = new ArrayList<>(highs);
queryColumns.midPriorityColumns = new ArrayList<>(mids);
Frontend master = null;
try {
InetSocketAddress masterAddress = currentEnv.getHaProtocol().getLeader();
for (Frontend fe : currentEnv.getFrontends(FrontendNodeType.FOLLOWER)) {
InetSocketAddress socketAddress = new InetSocketAddress(fe.getHost(), fe.getEditLogPort());
if (socketAddress.equals(masterAddress)) {
master = fe;
break;
}
}
} catch (Exception e) {
LOG.warn("Failed to find master FE.", e);
return;
}
if (master == null) {
LOG.warn("No master found in cluster.");
return;
}
TNetworkAddress address = new TNetworkAddress(master.getHost(), master.getRpcPort());
FrontendService.Client client = null;
try {
client = ClientPool.frontendPool.borrowObject(address);
client.syncQueryColumns(queryColumns);
LOG.info("Send {} high priority columns and {} mid priority columns to master.",
highs.size(), mids.size());
} catch (Throwable t) {
LOG.warn("Failed to sync stats to master: {}", address, t);
} finally {
if (client != null) {
ClientPool.frontendPool.returnObject(address, client);
}
}
}
protected Set<TQueryColumn> getNeedAnalyzeColumns(Queue<QueryColumn> columnQueue) {
Set<TQueryColumn> ret = Sets.newHashSet();
TableIf table;
int size = columnQueue.size();
for (int i = 0; i < size; i++) {
QueryColumn column = columnQueue.poll();
if (column == null) {
continue;
}
try {
table = StatisticsUtil.findTable(column.catalogId, column.dbId, column.tblId);
} catch (Exception e) {
LOG.warn("Failed to find table for column {}", column.colName);
continue;
}
if (StatisticsUtil.isUnsupportedType(table.getColumn(column.colName).getType())) {
continue;
}
Set<Pair<String, String>> columnIndexPairs = table.getColumnIndexPairs(
Collections.singleton(column.colName));
for (Pair<String, String> pair : columnIndexPairs) {
if (StatisticsUtil.needAnalyzeColumn(table, pair)) {
ret.add(column.toThrift());
break;
}
}
}
return ret;
}
protected List<TQueryColumn> convertSetToList(Set<TQueryColumn> set) {
return new ArrayList<>(set);
}
}