FESessionMgr.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService.HostInfo;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TFrontendReportAliveSessionRequest;
import org.apache.doris.thrift.TFrontendReportAliveSessionResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.concurrent.ExecutorService;

/*
 * FESessionMgr is for collecting alive sessions from frontends.
 * Only run on master FE.
 */
public class FESessionMgr extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(FESessionMgr.class);

    private final ExecutorService executor;

    private int clusterId;

    private String token;

    public FESessionMgr() {
        super("fe-session-mgr", Config.alive_session_update_interval_second * 1000);
        this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.fe_session_mgr_threads_num,
            Config.fe_session_mgr_blocking_queue_size, "all-fe-session-mgr-pool", false);
    }

    @Override
    protected void runAfterCatalogReady() {
        List<Frontend> frontends = Env.getCurrentEnv().getFrontends(null);
        for (Frontend frontend : frontends) {
            if (!frontend.isAlive()) {
                continue;
            }
            FEAliveSessionHandler handler = new FEAliveSessionHandler(frontend, clusterId, token);
            executor.submit(handler);
        }
    }

    public void setClusterId(int clusterId) {
        this.clusterId = clusterId;
    }

    public void setToken(String token) {
        this.token = token;
    }

    private class FEAliveSessionHandler implements Runnable {
        private final Frontend fe;
        private final int clusterId;
        private final String token;

        public FEAliveSessionHandler(Frontend fe, int clusterId, String token) {
            this.fe = fe;
            this.clusterId = clusterId;
            this.token = token;
        }

        @Override
        public void run() {
            Env env = Env.getCurrentEnv();
            HostInfo selfNode = env.getSelfNode();
            if (fe.getHost().equals(selfNode.getHost())) {
                if (env.isReady()) {
                    List<String> sessionIds = env.getAllAliveSessionIds();
                    for (String sessionId : sessionIds) {
                        Env.getCurrentEnv().checkAndRefreshSession(sessionId);
                    }
                } else {
                    LOG.info("Master FE is not ready");
                }
            } else {
                getAliveSessionAndRefresh();
            }
        }

        private void getAliveSessionAndRefresh() {
            FrontendService.Client client = null;
            TNetworkAddress addr = new TNetworkAddress(fe.getHost(), fe.getRpcPort());
            boolean ok = false;
            try {
                client = ClientPool.frontendPool.borrowObject(addr);
                TFrontendReportAliveSessionRequest request = new TFrontendReportAliveSessionRequest(clusterId, token);
                TFrontendReportAliveSessionResult result = client.getAliveSessions(request);
                ok = true;
                if (result.getStatus() == TStatusCode.OK) {
                    List<String> sessionIds = result.getSessionIdList();
                    for (String sessionId : sessionIds) {
                        Env.getCurrentEnv().checkAndRefreshSession(sessionId);
                    }
                } else {
                    LOG.warn("Error occurred when get alive session from " + fe.getHost()
                            + ", msg = " + result.getMsg());
                }
            } catch (Exception e) {
                LOG.warn("Error occurred when get alive session from " + fe.getHost()
                        + ", msg = " + e.getMessage());
            } finally {
                if (ok) {
                    ClientPool.frontendPool.returnObject(addr, client);
                } else {
                    ClientPool.frontendPool.invalidateObject(addr, client);
                }
            }
        }
    }
}