BrokerFileSystem.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fsv2.remote;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.storage.BrokerProperties;
import org.apache.doris.fs.operations.BrokerFileOperations;
import org.apache.doris.fs.operations.OpParams;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TBrokerCheckPathExistRequest;
import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
import org.apache.doris.thrift.TBrokerDeletePathRequest;
import org.apache.doris.thrift.TBrokerFD;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TBrokerIsSplittableRequest;
import org.apache.doris.thrift.TBrokerIsSplittableResponse;
import org.apache.doris.thrift.TBrokerListPathRequest;
import org.apache.doris.thrift.TBrokerListResponse;
import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerPReadRequest;
import org.apache.doris.thrift.TBrokerPWriteRequest;
import org.apache.doris.thrift.TBrokerReadResponse;
import org.apache.doris.thrift.TBrokerRenamePathRequest;
import org.apache.doris.thrift.TBrokerVersion;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
public class BrokerFileSystem extends RemoteFileSystem {
private static final Logger LOG = LogManager.getLogger(BrokerFileSystem.class);
private final BrokerFileOperations operations;
public BrokerFileSystem(String name, Map<String, String> properties) {
super(name, StorageBackend.StorageType.BROKER);
properties.putAll(PropertyConverter.convertToHadoopFSProperties(properties));
this.properties = properties;
this.operations = new BrokerFileOperations(name, properties);
// support broker properties in future
this.storageProperties = new BrokerProperties(properties);
}
public Pair<TPaloBrokerService.Client, TNetworkAddress> getBroker() {
Pair<TPaloBrokerService.Client, TNetworkAddress> result = Pair.of(null, null);
FsBroker broker;
try {
String localIP = FrontendOptions.getLocalHostAddress();
broker = Env.getCurrentEnv().getBrokerMgr().getBroker(name, localIP);
} catch (AnalysisException e) {
LOG.warn("failed to get a broker address: " + e.getMessage());
return null;
}
TNetworkAddress address = new TNetworkAddress(broker.host, broker.port);
TPaloBrokerService.Client client;
try {
client = ClientPool.brokerPool.borrowObject(address);
} catch (Exception e) {
LOG.warn("failed to get broker client: " + e.getMessage());
return null;
}
result.first = client;
result.second = address;
LOG.info("get broker: {}", BrokerUtil.printBroker(name, address));
return result;
}
@Override
public Status exists(String remotePath) {
// 1. get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {
return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client");
}
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
// check path
boolean needReturn = true;
try {
TBrokerCheckPathExistRequest req = new TBrokerCheckPathExistRequest(TBrokerVersion.VERSION_ONE,
remotePath, properties);
TBrokerCheckPathExistResponse rep = client.checkPathExist(req);
TBrokerOperationStatus opst = rep.getOpStatus();
if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) {
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to check remote path exist: " + remotePath
+ ", broker: " + BrokerUtil.printBroker(name, address)
+ ". msg: " + opst.getMessage());
}
if (!rep.isIsPathExist()) {
return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath);
}
return Status.OK;
} catch (TException e) {
needReturn = false;
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to check remote path exist: " + remotePath
+ ", broker: " + BrokerUtil.printBroker(name, address)
+ ". msg: " + e.getMessage());
} finally {
if (needReturn) {
ClientPool.brokerPool.returnObject(address, client);
} else {
ClientPool.brokerPool.invalidateObject(address, client);
}
}
}
@Override
public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
if (LOG.isDebugEnabled()) {
LOG.debug("download from {} to {}, file size: {}.", remoteFilePath, localFilePath, fileSize);
}
long start = System.currentTimeMillis();
// 1. get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {
return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client");
}
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
// 2. open file reader with broker
TBrokerFD fd = new TBrokerFD();
Status opStatus = operations.openReader(OpParams.of(client, address, remoteFilePath, fd));
if (!opStatus.ok()) {
return opStatus;
}
LOG.info("finished to open reader. fd: {}. download {} to {}.",
fd, remoteFilePath, localFilePath);
Preconditions.checkNotNull(fd);
// 3. delete local file if exist
File localFile = new File(localFilePath);
if (localFile.exists()) {
try {
Files.walk(Paths.get(localFilePath), FileVisitOption.FOLLOW_LINKS)
.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
} catch (IOException e) {
return new Status(Status.ErrCode.COMMON_ERROR, "failed to delete exist local file: " + localFilePath);
}
}
// 4. create local file
Status status = Status.OK;
try {
if (!localFile.createNewFile()) {
return new Status(Status.ErrCode.COMMON_ERROR, "failed to create local file: " + localFilePath);
}
} catch (IOException e) {
return new Status(Status.ErrCode.COMMON_ERROR, "failed to create local file: "
+ localFilePath + ", msg: " + e.getMessage());
}
// 5. read remote file with broker and write to local
String lastErrMsg = null;
try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(localFile))) {
final long bufSize = 1024 * 1024; // 1MB
long leftSize = fileSize;
long readOffset = 0;
while (leftSize > 0) {
long readLen = Math.min(leftSize, bufSize);
TBrokerReadResponse rep = null;
// We only retry if we encounter a timeout thrift exception.
int tryTimes = 0;
while (tryTimes < 3) {
try {
TBrokerPReadRequest req = new TBrokerPReadRequest(TBrokerVersion.VERSION_ONE,
fd, readOffset, readLen);
rep = client.pread(req);
if (rep.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
// pread return failure.
lastErrMsg = String.format("failed to read via broker %s. "
+ "current read offset: %d, read length: %d,"
+ " file size: %d, file: %s, err code: %d, msg: %s",
BrokerUtil.printBroker(name, address),
readOffset, readLen, fileSize,
remoteFilePath, rep.getOpStatus().getStatusCode().getValue(),
rep.getOpStatus().getMessage());
LOG.warn(lastErrMsg);
status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
}
if (rep.opStatus.statusCode != TBrokerOperationStatusCode.END_OF_FILE) {
if (LOG.isDebugEnabled()) {
LOG.debug("download. readLen: {}, read data len: {}, left size:{}. total size: {}",
readLen, rep.getData().length, leftSize, fileSize);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("read eof: " + remoteFilePath);
}
}
break;
} catch (TTransportException e) {
if (e.getType() == TTransportException.TIMED_OUT) {
// we only retry when we encounter timeout exception.
lastErrMsg = String.format("failed to read via broker %s. "
+ "current read offset: %d, read length: %d,"
+ " file size: %d, file: %s, timeout.",
BrokerUtil.printBroker(name, address),
readOffset, readLen, fileSize,
remoteFilePath);
tryTimes++;
continue;
}
lastErrMsg = String.format("failed to read via broker %s. "
+ "current read offset: %d, read length: %d,"
+ " file size: %d, file: %s. msg: %s",
BrokerUtil.printBroker(name, address),
readOffset, readLen, fileSize,
remoteFilePath, e.getMessage());
LOG.warn(lastErrMsg);
status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
break;
} catch (TException e) {
lastErrMsg = String.format("failed to read via broker %s. "
+ "current read offset: %d, read length: %d,"
+ " file size: %d, file: %s. msg: %s",
BrokerUtil.printBroker(name, address),
readOffset, readLen, fileSize,
remoteFilePath, e.getMessage());
LOG.warn(lastErrMsg);
status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
break;
}
} // end of retry loop
if (status.ok() && tryTimes < 3) {
// read succeed, write to local file
Preconditions.checkNotNull(rep);
// NOTICE(cmy): Sometimes the actual read length does not equal to the expected read length,
// even if the broker's read buffer size is large enough.
// I don't know why, but have to adapt to it.
if (rep.getData().length != readLen) {
LOG.warn("the actual read length does not equal to "
+ "the expected read length: {} vs. {}, file: {}, broker: {}",
rep.getData().length, readLen, remoteFilePath,
BrokerUtil.printBroker(name, address));
}
out.write(rep.getData());
readOffset += rep.getData().length;
leftSize -= rep.getData().length;
} else {
status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
break;
}
} // end of reading remote file
} catch (IOException e) {
return new Status(Status.ErrCode.COMMON_ERROR, "Got exception: " + e.getMessage() + ", broker: "
+ BrokerUtil.printBroker(name, address));
} finally {
// close broker reader
Status closeStatus = operations.closeReader(OpParams.of(client, address, fd));
if (!closeStatus.ok()) {
LOG.warn(closeStatus.getErrMsg());
if (status.ok()) {
// we return close write error only if no other error has been encountered.
status = closeStatus;
}
ClientPool.brokerPool.invalidateObject(address, client);
} else {
ClientPool.brokerPool.returnObject(address, client);
}
}
LOG.info("finished to download from {} to {} with size: {}. cost {} ms",
remoteFilePath, localFilePath, fileSize, (System.currentTimeMillis() - start));
return status;
}
// directly upload the content to remote file
@Override
public Status directUpload(String content, String remoteFile) {
// 1. get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {
return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client");
}
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
TBrokerFD fd = new TBrokerFD();
Status status = Status.OK;
try {
// 2. open file writer with broker
status = operations.openWriter(OpParams.of(client, address, remoteFile, fd));
if (!status.ok()) {
return status;
}
// 3. write content
try {
ByteBuffer bb = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
TBrokerPWriteRequest req = new TBrokerPWriteRequest(TBrokerVersion.VERSION_ONE, fd, 0, bb);
TBrokerOperationStatus opst = client.pwrite(req);
if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) {
// pwrite return failure.
status = new Status(Status.ErrCode.COMMON_ERROR, "write failed: " + opst.getMessage()
+ ", broker: " + BrokerUtil.printBroker(name, address));
}
} catch (TException e) {
status = new Status(Status.ErrCode.BAD_CONNECTION, "write exception: " + e.getMessage()
+ ", broker: " + BrokerUtil.printBroker(name, address));
}
} finally {
Status closeStatus = operations.closeWriter(OpParams.of(client, address, fd));
if (closeStatus.getErrCode() == Status.ErrCode.BAD_CONNECTION
|| status.getErrCode() == Status.ErrCode.BAD_CONNECTION) {
ClientPool.brokerPool.invalidateObject(address, client);
} else {
ClientPool.brokerPool.returnObject(address, client);
}
}
return status;
}
@Override
public Status upload(String localPath, String remotePath) {
long start = System.currentTimeMillis();
// 1. get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {
return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client");
}
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
// 2. open file write with broker
TBrokerFD fd = new TBrokerFD();
Status status = operations.openWriter(OpParams.of(client, address, remotePath, fd));
if (!status.ok()) {
return status;
}
// 3. read local file and write to remote with broker
File localFile = new File(localPath);
long fileLength = localFile.length();
byte[] readBuf = new byte[1024];
try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(localFile))) {
// save the last err msg
String lastErrMsg = null;
// save the current write offset of remote file
long writeOffset = 0;
// read local file, 1MB at a time
int bytesRead;
while ((bytesRead = in.read(readBuf)) != -1) {
ByteBuffer bb = ByteBuffer.wrap(readBuf, 0, bytesRead);
// We only retry if we encounter a timeout thrift exception.
int tryTimes = 0;
while (tryTimes < 3) {
try {
TBrokerPWriteRequest req
= new TBrokerPWriteRequest(TBrokerVersion.VERSION_ONE, fd, writeOffset, bb);
TBrokerOperationStatus opst = client.pwrite(req);
if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) {
// pwrite return failure.
lastErrMsg = String.format("failed to write via broker %s. "
+ "current write offset: %d, write length: %d,"
+ " file length: %d, file: %s, err code: %d, msg: %s",
BrokerUtil.printBroker(name, address),
writeOffset, bytesRead, fileLength,
remotePath, opst.getStatusCode().getValue(), opst.getMessage());
LOG.warn(lastErrMsg);
status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
}
break;
} catch (TTransportException e) {
if (e.getType() == TTransportException.TIMED_OUT) {
// we only retry when we encounter timeout exception.
lastErrMsg = String.format("failed to write via broker %s. "
+ "current write offset: %d, write length: %d,"
+ " file length: %d, file: %s. timeout",
BrokerUtil.printBroker(name, address),
writeOffset, bytesRead, fileLength,
remotePath);
tryTimes++;
continue;
}
lastErrMsg = String.format("failed to write via broker %s. "
+ "current write offset: %d, write length: %d,"
+ " file length: %d, file: %s. encounter TTransportException: %s",
BrokerUtil.printBroker(name, address),
writeOffset, bytesRead, fileLength,
remotePath, e.getMessage());
LOG.warn(lastErrMsg, e);
status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
break;
} catch (TException e) {
lastErrMsg = String.format("failed to write via broker %s. "
+ "current write offset: %d, write length: %d,"
+ " file length: %d, file: %s. encounter TException: %s",
BrokerUtil.printBroker(name, address),
writeOffset, bytesRead, fileLength,
remotePath, e.getMessage());
LOG.warn(lastErrMsg, e);
status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
break;
}
}
if (status.ok() && tryTimes < 3) {
// write succeed, update current write offset
writeOffset += bytesRead;
} else {
status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
break;
}
} // end of read local file loop
} catch (FileNotFoundException e1) {
return new Status(Status.ErrCode.COMMON_ERROR, "encounter file not found exception: " + e1.getMessage()
+ ", broker: " + BrokerUtil.printBroker(name, address));
} catch (IOException e1) {
return new Status(Status.ErrCode.COMMON_ERROR, "encounter io exception: " + e1.getMessage()
+ ", broker: " + BrokerUtil.printBroker(name, address));
} finally {
// close write
Status closeStatus = operations.closeWriter(OpParams.of(client, address, fd));
if (!closeStatus.ok()) {
LOG.warn(closeStatus.getErrMsg());
if (status.ok()) {
// we return close write error only if no other error has been encountered.
status = closeStatus;
}
ClientPool.brokerPool.invalidateObject(address, client);
} else {
ClientPool.brokerPool.returnObject(address, client);
}
}
if (status.ok()) {
LOG.info("finished to upload {} to remote path {}. cost: {} ms",
localPath, remotePath, (System.currentTimeMillis() - start));
}
return status;
}
@Override
public Status rename(String origFilePath, String destFilePath) {
long start = System.currentTimeMillis();
// 1. get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {
return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client");
}
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
// 2. rename
boolean needReturn = true;
try {
TBrokerRenamePathRequest req = new TBrokerRenamePathRequest(TBrokerVersion.VERSION_ONE,
origFilePath, destFilePath, properties);
TBrokerOperationStatus ost = client.renamePath(req);
if (ost.getStatusCode() != TBrokerOperationStatusCode.OK) {
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to rename " + origFilePath + " to " + destFilePath + ", msg: " + ost.getMessage()
+ ", broker: " + BrokerUtil.printBroker(name, address));
}
} catch (TException e) {
needReturn = false;
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to rename " + origFilePath + " to " + destFilePath + ", msg: " + e.getMessage()
+ ", broker: " + BrokerUtil.printBroker(name, address));
} finally {
if (needReturn) {
ClientPool.brokerPool.returnObject(address, client);
} else {
ClientPool.brokerPool.invalidateObject(address, client);
}
}
LOG.info("finished to rename {} to {}. cost: {} ms",
origFilePath, destFilePath, (System.currentTimeMillis() - start));
return Status.OK;
}
@Override
public Status delete(String remotePath) {
// get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {
return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client");
}
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
// delete
boolean needReturn = true;
try {
TBrokerDeletePathRequest req = new TBrokerDeletePathRequest(TBrokerVersion.VERSION_ONE,
remotePath, properties);
TBrokerOperationStatus opst = client.deletePath(req);
if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) {
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to delete remote path: " + remotePath + ". msg: " + opst.getMessage()
+ ", broker: " + BrokerUtil.printBroker(name, address));
}
LOG.info("finished to delete remote path {}.", remotePath);
} catch (TException e) {
needReturn = false;
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to delete remote path: " + remotePath + ". msg: " + e.getMessage()
+ ", broker: " + BrokerUtil.printBroker(name, address));
} finally {
if (needReturn) {
ClientPool.brokerPool.returnObject(address, client);
} else {
ClientPool.brokerPool.invalidateObject(address, client);
}
}
return Status.OK;
}
@Override
public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) {
// get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {
return new Status(Status.ErrCode.BAD_CONNECTION, "failed to get broker client");
}
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
// invoke broker 'listLocatedFiles' interface
boolean needReturn = true;
try {
TBrokerListPathRequest req = new TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath,
recursive, properties);
req.setOnlyFiles(true);
TBrokerListResponse response = client.listLocatedFiles(req);
TBrokerOperationStatus operationStatus = response.getOpStatus();
if (operationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to listLocatedFiles, remote path: " + remotePath + ". msg: "
+ operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
}
List<TBrokerFileStatus> fileStatus = response.getFiles();
for (TBrokerFileStatus tFile : fileStatus) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(tFile.path);
RemoteFile file = new RemoteFile(path.getName(), path, !tFile.isDir, tFile.isDir, tFile.size,
tFile.getBlockSize(), tFile.getModificationTime(), null /* blockLocations is null*/);
result.add(file);
}
LOG.info("finished to listLocatedFiles, remote path {}. get files: {}", remotePath, result);
return Status.OK;
} catch (TException e) {
needReturn = false;
return new Status(Status.ErrCode.COMMON_ERROR, "failed to listLocatedFiles, remote path: "
+ remotePath + ". msg: " + e.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
} finally {
if (needReturn) {
ClientPool.brokerPool.returnObject(address, client);
} else {
ClientPool.brokerPool.invalidateObject(address, client);
}
}
}
public boolean isSplittable(String remotePath, String inputFormat) throws UserException {
// get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {
throw new UserException("failed to get broker client");
}
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
// invoke 'isSplittable' interface
boolean needReturn = true;
try {
TBrokerIsSplittableRequest req = new TBrokerIsSplittableRequest().setVersion(TBrokerVersion.VERSION_ONE)
.setPath(remotePath).setInputFormat(inputFormat).setProperties(properties);
TBrokerIsSplittableResponse response = client.isSplittable(req);
TBrokerOperationStatus operationStatus = response.getOpStatus();
if (operationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
throw new UserException("failed to get path isSplittable, remote path: " + remotePath + ". msg: "
+ operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
}
boolean result = response.isSplittable();
LOG.info("finished to get path isSplittable, remote path {} with format {}, isSplittable: {}",
remotePath, inputFormat, result);
return result;
} catch (TException e) {
needReturn = false;
throw new UserException("failed to get path isSplittable, remote path: "
+ remotePath + ". msg: " + e.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
} finally {
if (needReturn) {
ClientPool.brokerPool.returnObject(address, client);
} else {
ClientPool.brokerPool.invalidateObject(address, client);
}
}
}
// List files in remotePath
@Override
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
// get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {
return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client");
}
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
// list
boolean needReturn = true;
try {
TBrokerListPathRequest req = new TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath,
false /* not recursive */, properties);
req.setFileNameOnly(fileNameOnly);
TBrokerListResponse rep = client.listPath(req);
TBrokerOperationStatus opst = rep.getOpStatus();
if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) {
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to list remote path: " + remotePath + ". msg: " + opst.getMessage()
+ ", broker: " + BrokerUtil.printBroker(name, address));
}
List<TBrokerFileStatus> fileStatus = rep.getFiles();
for (TBrokerFileStatus tFile : fileStatus) {
RemoteFile file = new RemoteFile(tFile.path, !tFile.isDir, tFile.size, 0, tFile.getModificationTime());
result.add(file);
}
LOG.info("finished to list remote path {}. get files: {}", remotePath, result);
} catch (TException e) {
needReturn = false;
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to list remote path: " + remotePath + ". msg: " + e.getMessage()
+ ", broker: " + BrokerUtil.printBroker(name, address));
} finally {
if (needReturn) {
ClientPool.brokerPool.returnObject(address, client);
} else {
ClientPool.brokerPool.invalidateObject(address, client);
}
}
return Status.OK;
}
@Override
public Status makeDir(String remotePath) {
return new Status(Status.ErrCode.COMMON_ERROR, "mkdir is not implemented.");
}
}