MasterCatalogExecutor.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.UserException;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
import org.apache.doris.thrift.TInitExternalCtlMetaResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* The client for Observer FE to forward external datasource object init request to master.
* Including init ExternalCatalog, ExternalDatabase and ExternalTable.
* This client will wait for the journal ID replayed at this Observer FE before return.
*/
public class MasterCatalogExecutor {
private static final Logger LOG = LogManager.getLogger(MasterCatalogExecutor.class);
public static final String STATUS_OK = "OK";
private int waitTimeoutMs;
public MasterCatalogExecutor(int waitTimeoutMs) {
this.waitTimeoutMs = waitTimeoutMs;
}
public void forward(long catalogId, long dbId) throws Exception {
Env.getCurrentEnv().checkReadyOrThrow();
String masterHost = Env.getCurrentEnv().getMasterHost();
int masterRpcPort = Env.getCurrentEnv().getMasterRpcPort();
TNetworkAddress thriftAddress = new TNetworkAddress(masterHost, masterRpcPort);
FrontendService.Client client = null;
try {
client = ClientPool.frontendPool.borrowObject(thriftAddress, waitTimeoutMs);
} catch (Exception e) {
throw new Exception("Failed to get master client.", e);
}
TInitExternalCtlMetaRequest request = new TInitExternalCtlMetaRequest();
request.setCatalogId(catalogId);
if (dbId != -1) {
request.setDbId(dbId);
}
boolean isReturnToPool = false;
try {
TInitExternalCtlMetaResult result = client.initExternalCtlMeta(request);
if (!result.getStatus().equalsIgnoreCase(STATUS_OK)) {
throw new UserException(result.getStatus());
} else {
// DO NOT wait on journal replayed, this may cause deadlock.
// 1. hold table read lock
// 2. wait on journal replayed
// 3. previous journal (eg, txn journal) replayed need to hold table write lock
// 4. deadlock
// But no waiting on journal replayed may cause some request on non-master FE failed for some time.
// There is no good solution for this.
// In feature version, this whole process is refactored, so we temporarily remove this waiting.
// Env.getCurrentEnv().getJournalObservable().waitOn(result.maxJournalId, timeoutMs);
isReturnToPool = true;
}
} catch (Exception e) {
LOG.warn("Failed to finish forward init operation, please try again. ", e);
throw e;
} finally {
if (isReturnToPool) {
ClientPool.frontendPool.returnObject(thriftAddress, client);
} else {
ClientPool.frontendPool.invalidateObject(thriftAddress, client);
}
}
}
}