TopicPublisherThread.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.publish;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TTopicInfoType;
import org.apache.doris.thrift.TWorkloadGroupInfo;
import org.apache.doris.thrift.TopicInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
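/**
 * Master daemon that periodically collects topic info (e.g. workload groups) from the
 * registered {@link TopicPublisher}s and pushes it to every alive backend via RPC.
 */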
public class TopicPublisherThread extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(TopicPublisherThread.class);
private SystemInfoService clusterInfoService;
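    // thread pool used to push topic info to all backends in parallel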
private ExecutorService executor = ThreadPoolManager
.newDaemonFixedThreadPool(6, 256, "topic-publish-thread", true);
public TopicPublisherThread(String name, long intervalMs,
SystemInfoService clusterInfoService) {
super(name, intervalMs);
this.clusterInfoService = clusterInfoService;
}
private List<TopicPublisher> topicPublisherList = new ArrayList<>();
public void addToTopicPublisherList(TopicPublisher topicPublisher) {
this.topicPublisherList.add(topicPublisher);
}
@Override
protected void runAfterCatalogReady() {
LOG.info("[topic_publish]begin publish topic info");
// step 1: get all publish topic info
TPublishTopicRequest request = new TPublishTopicRequest();
for (TopicPublisher topicPublisher : topicPublisherList) {
topicPublisher.getTopicInfo(request);
}
        // Even if the request contains no workload group or schedule policy, we still need to
        // send the (possibly empty) RPC, because an empty request may mean that a workload
        // group/policy has been dropped.
        // step 2: publish topic info to all alive BEs
List<Backend> nodesToPublish = new ArrayList<>();
try {
for (Backend be : clusterInfoService.getAllBackendsByAllCluster().values()) {
if (be.isAlive()) {
nodesToPublish.add(be);
}
}
} catch (Exception e) {
LOG.warn("get backends failed", e);
return;
}
if (nodesToPublish.isEmpty()) {
LOG.info("no alive backend, skip publish topic");
return;
}
AckResponseHandler handler = new AckResponseHandler(nodesToPublish);
for (Backend be : nodesToPublish) {
executor.submit(new TopicPublishWorker(request, be, handler));
}
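        // step 3: wait for all backends to acknowledge before finishing this round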
try {
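            // wait for at most two-thirds of the publish interval so this round does not overlap
            // with the next one; fall back to 3s if the configured interval is not positive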
int timeoutMs = Config.publish_topic_info_interval_ms / 3 * 2;
timeoutMs = timeoutMs <= 0 ? 3000 : timeoutMs;
if (!handler.awaitAllInMs(timeoutMs)) {
Backend[] backends = handler.pendingNodes();
if (backends.length > 0) {
LOG.warn("timed out waiting for all nodes to publish. (pending nodes: {})",
Arrays.toString(backends));
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
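    /**
     * Worker that sends the publish RPC to a single backend and always reports back to the
     * shared {@link ResponseHandler}, whether the RPC succeeds or fails.
     */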
public class TopicPublishWorker implements Runnable {
private TPublishTopicRequest request;
private Backend be;
private ResponseHandler handler;
public TopicPublishWorker(TPublishTopicRequest request, Backend node, ResponseHandler handler) {
this.request = request;
this.be = node;
this.handler = handler;
}
@Override
public void run() {
long beginTime = System.currentTimeMillis();
BackendService.Client client = null;
TNetworkAddress address = null;
boolean ok = false;
String logStr = "";
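            // build a short summary (topic type -> number of infos) for the log messages below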
try {
for (Map.Entry<TTopicInfoType, List<TopicInfo>> entry : request.getTopicMap().entrySet()) {
logStr += " " + entry.getKey() + "=" + entry.getValue().size() + " ";
}
} catch (Exception e) {
LOG.warn("[topic_publish]make log detail for publish failed:", e);
}
try {
address = new TNetworkAddress(be.getHost(), be.getBePort());
client = ClientPool.backendPool.borrowObject(address);
                // check whether the workload group tag matches the current BE's compute group
TPublishTopicRequest copiedRequest = request.deepCopy();
if (copiedRequest.isSetTopicMap()) {
Map<TTopicInfoType, List<TopicInfo>> topicMap = copiedRequest.getTopicMap();
List<TopicInfo> topicInfoList = topicMap.get(TTopicInfoType.WORKLOAD_GROUP);
if (topicInfoList != null) {
String beComputeGroup = be.getComputeGroup();
Iterator<TopicInfo> topicIter = topicInfoList.iterator();
while (topicIter.hasNext()) {
TopicInfo topicInfo = topicIter.next();
if (topicInfo.isSetWorkloadGroupInfo()) {
TWorkloadGroupInfo tWgInfo = topicInfo.getWorkloadGroupInfo();
if (tWgInfo.isSetTag() && !tWgInfo.getTag().equals(beComputeGroup)) {
                                    // currently a TopicInfo cannot contain both a policy and a
                                    // workload group, so it is safe to remove the whole TopicInfo
topicIter.remove();
}
}
}
}
}
client.publishTopicInfo(copiedRequest);
ok = true;
LOG.info("[topic_publish]publish topic info to be {} success, time cost={} ms, details:{}",
be.getHost(), (System.currentTimeMillis() - beginTime), logStr);
} catch (Exception e) {
LOG.warn("[topic_publish]publish topic info to be {} error happens: , time cost={} ms, details:{}",
be.getHost(), (System.currentTimeMillis() - beginTime), logStr, e);
} finally {
try {
if (ok) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
}
} catch (Throwable e) {
LOG.warn("[topic_publish]recycle topic publish client failed. related backend[{}]", be.getHost(),
e);
}
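                // always notify the handler so the publisher thread is not left waiting for this BE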
handler.onResponse(be);
}
}
}
}