BrokerUtil.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.fsv2.FileSystemFactory;
import org.apache.doris.fsv2.remote.RemoteFile;
import org.apache.doris.fsv2.remote.RemoteFileSystem;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TBrokerCheckPathExistRequest;
import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
import org.apache.doris.thrift.TBrokerCloseReaderRequest;
import org.apache.doris.thrift.TBrokerCloseWriterRequest;
import org.apache.doris.thrift.TBrokerDeletePathRequest;
import org.apache.doris.thrift.TBrokerFD;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TBrokerListPathRequest;
import org.apache.doris.thrift.TBrokerListResponse;
import org.apache.doris.thrift.TBrokerOpenMode;
import org.apache.doris.thrift.TBrokerOpenReaderRequest;
import org.apache.doris.thrift.TBrokerOpenReaderResponse;
import org.apache.doris.thrift.TBrokerOpenWriterRequest;
import org.apache.doris.thrift.TBrokerOpenWriterResponse;
import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerPReadRequest;
import org.apache.doris.thrift.TBrokerPWriteRequest;
import org.apache.doris.thrift.TBrokerReadResponse;
import org.apache.doris.thrift.TBrokerRenamePathRequest;
import org.apache.doris.thrift.TBrokerVersion;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class BrokerUtil {
private static final Logger LOG = LogManager.getLogger(BrokerUtil.class);
private static final int READ_BUFFER_SIZE_B = 1024 * 1024;
/**
* Parse file status in path with broker, except directory
* @param path
* @param brokerDesc
* @param fileStatuses: file path, size, isDir, isSplittable
* @throws UserException if broker op failed
*/
public static void parseFile(String path, BrokerDesc brokerDesc, List<TBrokerFileStatus> fileStatuses)
throws UserException {
List<RemoteFile> rfiles = new ArrayList<>();
try (RemoteFileSystem fileSystem = FileSystemFactory.get(brokerDesc)) {
Status st = fileSystem.globList(path, rfiles, false);
if (!st.ok()) {
throw new UserException(st.getErrMsg());
}
} catch (Exception e) {
LOG.warn("{} list path exception, path={}", brokerDesc.getName(), path, e);
throw new UserException(brokerDesc.getName() + " list path exception. path="
+ path + ", err: " + e.getMessage());
}
for (RemoteFile r : rfiles) {
if (r.isFile()) {
TBrokerFileStatus status = new TBrokerFileStatus(r.getName(), !r.isFile(), r.getSize(), r.isFile());
status.setBlockSize(r.getBlockSize());
status.setModificationTime(r.getModificationTime());
fileStatuses.add(status);
}
}
}
public static void deleteDirectoryWithFileSystem(String path, BrokerDesc brokerDesc) throws UserException {
try (RemoteFileSystem fileSystem = FileSystemFactory.get(brokerDesc.getStorageProperties())) {
Status st = fileSystem.deleteDirectory(path);
if (!st.ok()) {
throw new UserException(brokerDesc.getName() + " delete directory exception. path="
+ path + ", err: " + st.getErrMsg());
}
} catch (Exception e) {
LOG.warn("{} delete directory exception, path={}", brokerDesc.getName(), path, e);
throw new UserException(brokerDesc.getName() + " delete directory exception. path="
+ path + ", err: " + e.getMessage());
}
}
public static String printBroker(String brokerName, TNetworkAddress address) {
return brokerName + "[" + address.toString() + "]";
}
public static List<String> parseColumnsFromPath(String filePath, List<String> columnsFromPath)
throws UserException {
return parseColumnsFromPath(filePath, columnsFromPath, true, false);
}
public static List<String> parseColumnsFromPath(
String filePath,
List<String> columnsFromPath,
boolean caseSensitive,
boolean isACID)
throws UserException {
if (columnsFromPath == null || columnsFromPath.isEmpty()) {
return Collections.emptyList();
}
// if it is ACID, the path count is 3. The hdfs path is hdfs://xxx/table_name/par=xxx/delta(or base)_xxx/.
int pathCount = isACID ? 3 : 2;
if (!caseSensitive) {
for (int i = 0; i < columnsFromPath.size(); i++) {
String path = columnsFromPath.remove(i);
columnsFromPath.add(i, path.toLowerCase());
}
}
String[] strings = filePath.split("/");
if (strings.length < 2) {
throw new UserException("Fail to parse columnsFromPath, expected: "
+ columnsFromPath + ", filePath: " + filePath);
}
String[] columns = new String[columnsFromPath.size()];
int size = 0;
boolean skipOnce = true;
for (int i = strings.length - pathCount; i >= 0; i--) {
String str = strings[i];
if (str != null && str.isEmpty()) {
continue;
}
if (str == null || !str.contains("=")) {
if (!isACID && skipOnce) {
skipOnce = false;
continue;
}
throw new UserException("Fail to parse columnsFromPath, expected: "
+ columnsFromPath + ", filePath: " + filePath);
}
skipOnce = false;
String[] pair = str.split("=", 2);
if (pair.length != 2) {
throw new UserException("Fail to parse columnsFromPath, expected: "
+ columnsFromPath + ", filePath: " + filePath);
}
String parsedColumnName = caseSensitive ? pair[0] : pair[0].toLowerCase();
int index = columnsFromPath.indexOf(parsedColumnName);
if (index == -1) {
continue;
}
columns[index] = HiveMetaStoreCache.HIVE_DEFAULT_PARTITION.equals(pair[1])
? FeConstants.null_string : pair[1];
size++;
if (size >= columnsFromPath.size()) {
break;
}
}
if (size != columnsFromPath.size()) {
throw new UserException("Fail to parse columnsFromPath, expected: "
+ columnsFromPath + ", filePath: " + filePath);
}
return Lists.newArrayList(columns);
}
/**
* Read binary data from path with broker
* @param path
* @param brokerDesc
* @return byte[]
* @throws UserException if broker op failed or not only one file
*/
public static byte[] readFile(String path, BrokerDesc brokerDesc, long maxReadLen) throws UserException {
TNetworkAddress address = getAddress(brokerDesc);
TPaloBrokerService.Client client = borrowClient(address);
boolean failed = true;
TBrokerFD fd = null;
try {
// get file size
TBrokerListPathRequest request = new TBrokerListPathRequest(
TBrokerVersion.VERSION_ONE, path, false, brokerDesc.getBackendConfigProperties());
TBrokerListResponse tBrokerListResponse = null;
try {
tBrokerListResponse = client.listPath(request);
} catch (TException e) {
reopenClient(client);
tBrokerListResponse = client.listPath(request);
}
if (tBrokerListResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
throw new UserException("Broker list path failed. path=" + path + ", broker=" + address
+ ",msg=" + tBrokerListResponse.getOpStatus().getMessage());
}
List<TBrokerFileStatus> fileStatuses = tBrokerListResponse.getFiles();
if (fileStatuses.size() != 1) {
throw new UserException("Broker files num error. path=" + path + ", broker=" + address
+ ", files num: " + fileStatuses.size());
}
Preconditions.checkState(!fileStatuses.get(0).isIsDir());
long fileSize = fileStatuses.get(0).getSize();
// open reader
String clientId = NetUtils
.getHostPortInAccessibleFormat(FrontendOptions.getLocalHostAddress(), Config.rpc_port);
TBrokerOpenReaderRequest tOpenReaderRequest = new TBrokerOpenReaderRequest(
TBrokerVersion.VERSION_ONE, path, 0, clientId, brokerDesc.getBackendConfigProperties());
TBrokerOpenReaderResponse tOpenReaderResponse = null;
try {
tOpenReaderResponse = client.openReader(tOpenReaderRequest);
} catch (TException e) {
reopenClient(client);
tOpenReaderResponse = client.openReader(tOpenReaderRequest);
}
if (tOpenReaderResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
throw new UserException("Broker open reader failed. path=" + path + ", broker=" + address
+ ", msg=" + tOpenReaderResponse.getOpStatus().getMessage());
}
fd = tOpenReaderResponse.getFd();
// read
long readLen = fileSize;
if (maxReadLen > 0 && maxReadLen < fileSize) {
readLen = maxReadLen;
}
TBrokerPReadRequest tPReadRequest = new TBrokerPReadRequest(
TBrokerVersion.VERSION_ONE, fd, 0, readLen);
TBrokerReadResponse tReadResponse = null;
try {
tReadResponse = client.pread(tPReadRequest);
} catch (TException e) {
reopenClient(client);
tReadResponse = client.pread(tPReadRequest);
}
if (tReadResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
throw new UserException("Broker read failed. path=" + path + ", broker=" + address
+ ", msg=" + tReadResponse.getOpStatus().getMessage());
}
failed = false;
return tReadResponse.getData();
} catch (TException e) {
String failMsg = "Broker read file exception. path=" + path + ", broker=" + address;
LOG.warn(failMsg, e);
throw new UserException(failMsg);
} finally {
// close reader
if (fd != null) {
failed = true;
TBrokerCloseReaderRequest tCloseReaderRequest = new TBrokerCloseReaderRequest(
TBrokerVersion.VERSION_ONE, fd);
TBrokerOperationStatus tOperationStatus = null;
try {
tOperationStatus = client.closeReader(tCloseReaderRequest);
} catch (TException e) {
reopenClient(client);
try {
tOperationStatus = client.closeReader(tCloseReaderRequest);
} catch (TException ex) {
LOG.warn("Broker close reader failed. path={}, address={}", path, address, ex);
}
}
if (tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
LOG.warn("Broker close reader failed. path={}, address={}, error={}", path, address,
tOperationStatus.getMessage());
} else {
failed = false;
}
}
// return client
returnClient(client, address, failed);
}
}
/**
* Write binary data to destFilePath with broker
* @param data
* @param destFilePath
* @param brokerDesc
* @throws UserException if broker op failed
*/
public static void writeFile(byte[] data, String destFilePath, BrokerDesc brokerDesc) throws UserException {
BrokerWriter writer = new BrokerWriter(destFilePath, brokerDesc);
try {
writer.open();
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
writer.write(byteBuffer, data.length);
} finally {
writer.close();
}
}
/**
* Write srcFilePath file to destFilePath with broker
* @param srcFilePath
* @param destFilePath
* @param brokerDesc
* @throws UserException if broker op failed
*/
public static void writeFile(String srcFilePath, String destFilePath,
BrokerDesc brokerDesc) throws UserException {
FileInputStream fis = null;
FileChannel channel = null;
BrokerWriter writer = new BrokerWriter(destFilePath, brokerDesc);
ByteBuffer byteBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE_B);
try {
writer.open();
fis = new FileInputStream(srcFilePath);
channel = fis.getChannel();
while (true) {
int readSize = channel.read(byteBuffer);
if (readSize == -1) {
break;
}
byteBuffer.flip();
writer.write(byteBuffer, readSize);
byteBuffer.clear();
}
} catch (IOException e) {
String failMsg = "Read file exception. filePath=" + srcFilePath;
LOG.warn(failMsg, e);
throw new UserException(failMsg);
} finally {
// close broker file writer and local file input stream
writer.close();
try {
if (channel != null) {
channel.close();
}
if (fis != null) {
fis.close();
}
} catch (IOException e) {
LOG.warn("Close local file failed. srcPath={}", srcFilePath, e);
}
}
}
/**
* Delete path with broker
* @param path
* @param brokerDesc
* @throws UserException if broker op failed
*/
public static void deletePathWithBroker(String path, BrokerDesc brokerDesc) throws UserException {
TNetworkAddress address = getAddress(brokerDesc);
TPaloBrokerService.Client client = borrowClient(address);
boolean failed = true;
try {
TBrokerDeletePathRequest tDeletePathRequest = new TBrokerDeletePathRequest(
TBrokerVersion.VERSION_ONE, path, brokerDesc.getBackendConfigProperties());
TBrokerOperationStatus tOperationStatus = null;
try {
tOperationStatus = client.deletePath(tDeletePathRequest);
} catch (TException e) {
reopenClient(client);
tOperationStatus = client.deletePath(tDeletePathRequest);
}
if (tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
throw new UserException("Broker delete path failed. path=" + path + ", broker=" + address
+ ", msg=" + tOperationStatus.getMessage());
}
failed = false;
} catch (TException e) {
LOG.warn("Broker read path exception, path={}, address={}, exception={}", path, address, e);
throw new UserException("Broker read path exception. path=" + path + ",broker=" + address);
} finally {
returnClient(client, address, failed);
}
}
public static boolean checkPathExist(String remotePath, BrokerDesc brokerDesc) throws UserException {
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBrokerAddressAndClient(brokerDesc);
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
boolean failed = true;
try {
TBrokerCheckPathExistRequest req = new TBrokerCheckPathExistRequest(TBrokerVersion.VERSION_ONE,
remotePath, brokerDesc.getBackendConfigProperties());
TBrokerCheckPathExistResponse rep = client.checkPathExist(req);
if (rep.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
throw new UserException("Broker check path exist failed. path=" + remotePath + ", broker=" + address
+ ", msg=" + rep.getOpStatus().getMessage());
}
failed = false;
return rep.isPathExist;
} catch (TException e) {
LOG.warn("Broker check path exist failed, path={}, address={}, exception={}", remotePath, address, e);
throw new UserException("Broker check path exist exception. path=" + remotePath + ",broker=" + address);
} finally {
returnClient(client, address, failed);
}
}
public static void rename(String origFilePath, String destFilePath, BrokerDesc brokerDesc) throws UserException {
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBrokerAddressAndClient(brokerDesc);
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
boolean failed = true;
try {
TBrokerRenamePathRequest req = new TBrokerRenamePathRequest(TBrokerVersion.VERSION_ONE, origFilePath,
destFilePath, brokerDesc.getBackendConfigProperties());
TBrokerOperationStatus rep = client.renamePath(req);
if (rep.getStatusCode() != TBrokerOperationStatusCode.OK) {
throw new UserException("failed to rename " + origFilePath + " to " + destFilePath
+ ", msg: " + rep.getMessage() + ", broker: " + address);
}
failed = false;
} catch (TException e) {
LOG.warn("Broker rename file failed, origin path={}, dest path={}, address={}, exception={}",
origFilePath, destFilePath, address, e);
throw new UserException("Broker rename file exception. origin path=" + origFilePath
+ ", dest path=" + destFilePath + ", broker=" + address);
} finally {
returnClient(client, address, failed);
}
}
public static Pair<TPaloBrokerService.Client, TNetworkAddress> getBrokerAddressAndClient(BrokerDesc brokerDesc)
throws UserException {
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = Pair.of(null, null);
TNetworkAddress address = getAddress(brokerDesc);
TPaloBrokerService.Client client = borrowClient(address);
pair.first = client;
pair.second = address;
return pair;
}
public static TNetworkAddress getAddress(BrokerDesc brokerDesc) throws UserException {
FsBroker broker = null;
try {
String localIP = FrontendOptions.getLocalHostAddress();
broker = Env.getCurrentEnv().getBrokerMgr().getBroker(brokerDesc.getName(), localIP);
} catch (AnalysisException e) {
throw new UserException(e.getMessage());
}
return new TNetworkAddress(broker.host, broker.port);
}
public static TPaloBrokerService.Client borrowClient(TNetworkAddress address) throws UserException {
TPaloBrokerService.Client client = null;
try {
client = ClientPool.brokerPool.borrowObject(address);
} catch (Exception e) {
try {
client = ClientPool.brokerPool.borrowObject(address);
} catch (Exception e1) {
throw new UserException("Create connection to broker(" + address + ") failed.");
}
}
return client;
}
private static void returnClient(TPaloBrokerService.Client client, TNetworkAddress address, boolean failed) {
if (failed) {
ClientPool.brokerPool.invalidateObject(address, client);
} else {
ClientPool.brokerPool.returnObject(address, client);
}
}
private static void reopenClient(TPaloBrokerService.Client client) {
ClientPool.brokerPool.reopen(client);
}
private static class BrokerWriter {
private String brokerFilePath;
private BrokerDesc brokerDesc;
private TPaloBrokerService.Client client;
private TNetworkAddress address;
private TBrokerFD fd;
private long currentOffset;
private boolean isReady;
private boolean failed;
public BrokerWriter(String brokerFilePath, BrokerDesc brokerDesc) {
this.brokerFilePath = brokerFilePath;
this.brokerDesc = brokerDesc;
this.isReady = false;
this.failed = true;
}
public void open() throws UserException {
failed = true;
address = BrokerUtil.getAddress(brokerDesc);
client = BrokerUtil.borrowClient(address);
try {
String clientId = NetUtils
.getHostPortInAccessibleFormat(FrontendOptions.getLocalHostAddress(), Config.rpc_port);
TBrokerOpenWriterRequest tOpenWriterRequest = new TBrokerOpenWriterRequest(
TBrokerVersion.VERSION_ONE, brokerFilePath, TBrokerOpenMode.APPEND,
clientId, brokerDesc.getBackendConfigProperties());
TBrokerOpenWriterResponse tOpenWriterResponse = null;
try {
tOpenWriterResponse = client.openWriter(tOpenWriterRequest);
} catch (TException e) {
reopenClient(client);
tOpenWriterResponse = client.openWriter(tOpenWriterRequest);
}
if (tOpenWriterResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
throw new UserException("Broker open writer failed. destPath=" + brokerFilePath
+ ", broker=" + address
+ ", msg=" + tOpenWriterResponse.getOpStatus().getMessage());
}
failed = false;
fd = tOpenWriterResponse.getFd();
currentOffset = 0L;
isReady = true;
} catch (TException e) {
String failMsg = "Broker open writer exception. filePath=" + brokerFilePath + ", broker=" + address;
LOG.warn(failMsg, e);
throw new UserException(failMsg);
}
}
public void write(ByteBuffer byteBuffer, long bufferSize) throws UserException {
if (!isReady) {
throw new UserException("Broker writer is not ready. filePath="
+ brokerFilePath + ", broker=" + address);
}
failed = true;
TBrokerOperationStatus tOperationStatus = null;
TBrokerPWriteRequest tPWriteRequest = new TBrokerPWriteRequest(
TBrokerVersion.VERSION_ONE, fd, currentOffset, byteBuffer);
try {
try {
tOperationStatus = client.pwrite(tPWriteRequest);
} catch (TException e) {
reopenClient(client);
tOperationStatus = client.pwrite(tPWriteRequest);
}
if (tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
throw new UserException("Broker write failed. filePath=" + brokerFilePath + ", broker=" + address
+ ", msg=" + tOperationStatus.getMessage());
}
failed = false;
currentOffset += bufferSize;
} catch (TException e) {
String failMsg = "Broker write exception. filePath=" + brokerFilePath + ", broker=" + address;
LOG.warn(failMsg, e);
throw new UserException(failMsg);
}
}
public void close() {
// close broker writer
failed = true;
TBrokerOperationStatus tOperationStatus = null;
if (fd != null) {
TBrokerCloseWriterRequest tCloseWriterRequest = new TBrokerCloseWriterRequest(
TBrokerVersion.VERSION_ONE, fd);
try {
tOperationStatus = client.closeWriter(tCloseWriterRequest);
} catch (TException e) {
reopenClient(client);
try {
tOperationStatus = client.closeWriter(tCloseWriterRequest);
} catch (TException ex) {
LOG.warn("Broker close writer failed. filePath={}, address={}", brokerFilePath, address, ex);
}
}
if (tOperationStatus == null) {
LOG.warn("Broker close reader failed. fd={}, address={}", fd.toString(), address);
} else if (tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
LOG.warn("Broker close writer failed. filePath={}, address={}, error={}", brokerFilePath,
address, tOperationStatus.getMessage());
} else {
failed = false;
}
}
// return client
returnClient(client, address, failed);
isReady = false;
}
}
}