CacheBeProxy.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe.cache;
import org.apache.doris.common.Status;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.SimpleScheduler;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Encapsulates access to BE, including network and other exception handling
*/
public class CacheBeProxy extends CacheProxy {
private static final Logger LOG = LogManager.getLogger(CacheBeProxy.class);
public void updateCache(InternalService.PUpdateCacheRequest request, int timeoutMs, Status status) {
Types.PUniqueId sqlKey = request.getSqlKey();
Backend backend = CacheCoordinator.getInstance().findBackend(sqlKey);
if (backend == null) {
LOG.warn("update cache can't find backend, sqlKey {}", sqlKey);
return;
}
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
try {
Future<InternalService.PCacheResponse> future = BackendServiceProxy.getInstance()
.updateCache(address, request);
InternalService.PCacheResponse response = future.get(timeoutMs, TimeUnit.MILLISECONDS);
if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
status.updateStatus(TStatusCode.OK, "CACHE_OK");
} else {
status.updateStatus(TStatusCode.INTERNAL_ERROR, response.getStatus().toString());
}
} catch (Exception e) {
LOG.warn("update cache exception, sqlKey {}", sqlKey, e);
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
}
}
public InternalService.PFetchCacheResult fetchCache(InternalService.PFetchCacheRequest request,
int timeoutMs, Status status) {
Types.PUniqueId sqlKey = request.getSqlKey();
Backend backend = CacheCoordinator.getInstance().findBackend(sqlKey);
if (backend == null) {
return null;
}
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
try {
Future<InternalService.PFetchCacheResult> future = BackendServiceProxy.getInstance()
.fetchCache(address, request);
return future.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (RpcException e) {
LOG.warn("fetch catch rpc exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e);
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
} catch (InterruptedException e) {
LOG.warn("future get interrupted exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e);
status.updateStatus(TStatusCode.INTERNAL_ERROR, "interrupted exception");
} catch (ExecutionException e) {
LOG.warn("future get execution exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e);
status.updateStatus(TStatusCode.INTERNAL_ERROR, "execution exception");
} catch (TimeoutException e) {
LOG.warn("fetch result timeout, sqlKey {}, backend {}", sqlKey, backend.getId(), e);
status.updateStatus(TStatusCode.TIMEOUT, "query timeout");
}
return null;
}
public void clearCache(InternalService.PClearCacheRequest request) {
this.clearCache(request, CacheCoordinator.getInstance().getBackendList());
}
public void clearCache(InternalService.PClearCacheRequest request, List<Backend> beList) {
int retry;
Status status = new Status();
for (Backend backend : beList) {
retry = 1;
while (retry < 3 && !this.clearCache(request, backend, CLEAR_TIMEOUT, status)) {
retry++;
try {
Thread.sleep(1000); //sleep 1 second
} catch (Exception e) {
// CHECKSTYLE IGNORE THIS LINE
}
}
if (retry >= 3) {
String errMsg = "clear cache timeout, backend " + backend.getId();
LOG.warn(errMsg);
SimpleScheduler.addToBlacklist(backend.getId(), errMsg);
}
}
}
protected boolean clearCache(InternalService.PClearCacheRequest request,
Backend backend, int timeoutMs, Status status) {
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
try {
request = request.toBuilder().setClearType(InternalService.PClearType.CLEAR_ALL).build();
LOG.info("clear all backend cache, backendId {}", backend.getId());
Future<InternalService.PCacheResponse> future
= BackendServiceProxy.getInstance().clearCache(address, request);
InternalService.PCacheResponse response = future.get(timeoutMs, TimeUnit.MILLISECONDS);
if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
status.updateStatus(TStatusCode.OK, "CACHE_OK");
return true;
} else {
status.updateStatus(TStatusCode.INTERNAL_ERROR, response.getStatus().toString());
return false;
}
} catch (Exception e) {
LOG.warn("clear cache exception, backendId {}", backend.getId(), e);
}
return false;
}
}