DefaultConnectorValidationContext.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.connector;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.connector.api.ConnectorValidationContext;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest;
import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import com.google.protobuf.ByteString;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* Engine-side implementation of {@link ConnectorValidationContext}.
*
* <p>Provides driver validation (via {@link JdbcResource}), checksum computation,
* and deferred BE���external connectivity testing (via BRPC) as infrastructure
* services that connectors can call during pre-creation validation.</p>
*
* <p>Connectors register a BE connectivity test via {@link #requestBeConnectivityTest};
* the engine calls {@link #executePendingBeTests()} after validation to send
* the BRPC request to an alive backend.</p>
*/
public class DefaultConnectorValidationContext implements ConnectorValidationContext {
private final long catalogId;
private final CatalogProperty catalogProperty;
// Pending BE connectivity test, populated by connector during preCreateValidation().
private byte[] pendingBeTestPayload;
private int pendingBeTestConnectionType;
private String pendingBeTestQuery;
public DefaultConnectorValidationContext(long catalogId, CatalogProperty catalogProperty) {
this.catalogId = catalogId;
this.catalogProperty = catalogProperty;
}
@Override
public long getCatalogId() {
return catalogId;
}
@Override
public String getProperty(String key) {
return catalogProperty.getOrDefault(key, null);
}
@Override
public void storeProperty(String key, String value) {
catalogProperty.addProperty(key, value);
}
@Override
public String validateAndResolveDriverPath(String driverUrl) throws Exception {
return JdbcResource.getFullDriverUrl(driverUrl);
}
@Override
public String computeDriverChecksum(String driverUrl) throws Exception {
return JdbcResource.computeObjectChecksum(driverUrl);
}
@Override
public void requestBeConnectivityTest(byte[] serializedDescriptor, int connectionTypeValue,
String testQuery) {
this.pendingBeTestPayload = serializedDescriptor;
this.pendingBeTestConnectionType = connectionTypeValue;
this.pendingBeTestQuery = testQuery;
}
/**
* Executes the pending BE connectivity test, if any was registered by
* the connector during {@code preCreateValidation()}.
*
* <p>This is an engine-only method (not on the SPI interface). It finds
* an alive backend and sends a BRPC test-connection request.</p>
*/
public void executePendingBeTests() throws DdlException {
if (pendingBeTestPayload == null) {
return;
}
if (FeConstants.runningUnitTest) {
return;
}
Backend aliveBe = findAliveBackend();
TNetworkAddress address = new TNetworkAddress(aliveBe.getHost(), aliveBe.getBrpcPort());
try {
PJdbcTestConnectionRequest request = PJdbcTestConnectionRequest.newBuilder()
.setJdbcTable(ByteString.copyFrom(pendingBeTestPayload))
.setJdbcTableType(pendingBeTestConnectionType)
.setQueryStr(pendingBeTestQuery)
.build();
Future<PJdbcTestConnectionResult> future = BackendServiceProxy.getInstance()
.testJdbcConnection(address, request);
PJdbcTestConnectionResult result = future.get();
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new DdlException("BE connectivity test failed: "
+ result.getStatus().getErrorMsgs(0));
}
} catch (RpcException | ExecutionException | InterruptedException e) {
throw new DdlException("BE connectivity test failed: " + e.getMessage(), e);
}
}
private static Backend findAliveBackend() throws DdlException {
try {
for (Backend be : Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values()) {
if (be.isAlive()) {
return be;
}
}
} catch (Exception e) {
throw new DdlException("Failed to find alive backend: " + e.getMessage(), e);
}
throw new DdlException("BE connectivity test failed: No alive backends");
}
}