CosObjStorage.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.obj;
import org.apache.doris.cloud.storage.ObjectInfoAdapter;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.property.storage.COSProperties;
import com.google.common.collect.ImmutableList;
import com.qcloud.cos.exception.CosClientException;
import com.qcloud.cos.exception.CosServiceException;
import com.qcloud.cos.http.HttpMethodName;
import com.qcloud.cos.model.COSObjectSummary;
import com.qcloud.cos.model.ListObjectsRequest;
import com.qcloud.cos.model.ObjectListing;
import com.qcloud.cos.model.ObjectMetadata;
import com.tencentcloudapi.common.Credential;
import com.tencentcloudapi.common.profile.ClientProfile;
import com.tencentcloudapi.common.profile.HttpProfile;
import com.tencentcloudapi.sts.v20180813.StsClient;
import com.tencentcloudapi.sts.v20180813.models.AssumeRoleRequest;
import com.tencentcloudapi.sts.v20180813.models.AssumeRoleResponse;
import com.tencentcloudapi.sts.v20180813.models.Credentials;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
/**
* Tencent Cloud COS-specific {@link ObjStorage} implementation.
*
* <p>Inherits generic CRUD from {@link S3ObjStorage}; overrides list/head/STS/presigned
* with COS native SDK. COS V1 list uses marker-based pagination (not continuation token).
*/
public class CosObjStorage extends S3ObjStorage {
private static final Logger LOG = LogManager.getLogger(CosObjStorage.class);
private static final long SESSION_EXPIRE_SECONDS = 3600L;
private final COSProperties cosProperties;
public CosObjStorage(COSProperties properties) {
super(properties);
this.cosProperties = properties;
}
// ----------------------------------------------------------------
// STS: Tencent Cloud STS AssumeRole
// ----------------------------------------------------------------
@Override
public Triple<String, String, String> getStsToken() throws DdlException {
String roleArn = cosProperties.getOrigProps().get(ObjectInfoAdapter.STS_ROLE_ARN_KEY);
String externalId = cosProperties.getOrigProps().get(ObjectInfoAdapter.STS_EXTERNAL_ID_KEY);
if (roleArn == null || roleArn.isEmpty()) {
throw new DdlException("COS STS role ARN is not configured");
}
ClientProfile clientProfile = null;
if (Config.enable_sts_vpc) {
HttpProfile httpProfile = new HttpProfile();
httpProfile.setEndpoint("sts.internal.tencentcloudapi.com");
clientProfile = new ClientProfile();
clientProfile.setHttpProfile(httpProfile);
}
Credential credential = new Credential(cosProperties.getAccessKey(), cosProperties.getSecretKey());
StsClient stsClient = (clientProfile != null)
? new StsClient(credential, cosProperties.getRegion(), clientProfile)
: new StsClient(credential, cosProperties.getRegion());
AssumeRoleRequest req = new AssumeRoleRequest();
req.setRoleArn(roleArn);
req.setDurationSeconds((long) ObjectInfoAdapter.getDurationSeconds());
req.setRoleSessionName(ObjectInfoAdapter.getNewRoleSessionName());
if (externalId != null && !externalId.isEmpty()) {
req.setExternalId(externalId);
}
try {
AssumeRoleResponse resp = stsClient.AssumeRole(req);
Credentials cred = resp.getCredentials();
return Triple.of(cred.getTmpSecretId(), cred.getTmpSecretKey(), cred.getToken());
} catch (Exception e) {
LOG.warn("Failed to get COS STS token, roleArn={}", roleArn, e);
throw new DdlException("Failed to get COS STS token: " + e.getMessage());
}
}
// ----------------------------------------------------------------
// Presigned URL: COS native SDK
// ----------------------------------------------------------------
@Override
public String getPresignedUrl(String objectKey) throws IOException {
try (CosNativeClient nativeClient = new CosNativeClient(cosProperties)) {
Date expiry = new Date(System.currentTimeMillis() + SESSION_EXPIRE_SECONDS * 1000L);
URL url = nativeClient.get().generatePresignedUrl(
cosProperties.getBucket(), objectKey, expiry,
HttpMethodName.PUT, new HashMap<>(), new HashMap<>());
LOG.info("Generated COS presigned URL for key={}", objectKey);
return url.toString();
} catch (CosClientException e) {
throw new IOException("Failed to generate COS presigned URL: " + e.getMessage(), e);
}
}
// ----------------------------------------------------------------
// listObjectsWithPrefix: COS V1 list (marker-based pagination)
// ----------------------------------------------------------------
@Override
public ListObjectsResult listObjectsWithPrefix(
String prefix, String subPrefix, String continuationToken) throws IOException {
String fullPrefix = normalizeAndCombinePrefix(prefix, subPrefix);
try (CosNativeClient nativeClient = new CosNativeClient(cosProperties)) {
ListObjectsRequest req = new ListObjectsRequest();
req.setBucketName(cosProperties.getBucket());
req.setPrefix(fullPrefix);
req.setMaxKeys(1000);
if (!StringUtils.isEmpty(continuationToken)) {
// COS V1 uses marker for pagination, not continuationToken
req.setMarker(continuationToken);
}
ObjectListing listing = nativeClient.get().listObjects(req);
List<ObjectFile> files = new ArrayList<>();
for (COSObjectSummary s : listing.getObjectSummaries()) {
files.add(new ObjectFile(s.getKey(),
getRelativePathSafe(prefix, s.getKey()),
s.getETag(), s.getSize()));
}
// COS V1 next page marker
String nextToken = listing.isTruncated() ? listing.getNextMarker() : null;
return new ListObjectsResult(files, listing.isTruncated(), nextToken);
} catch (CosClientException e) {
LOG.warn("Failed to list COS objects, prefix={}", fullPrefix, e);
throw new IOException("Failed to list COS objects: " + e.getMessage(), e);
}
}
// ----------------------------------------------------------------
// headObjectWithMeta: COS getObjectMetadata (handles 404)
// ----------------------------------------------------------------
@Override
public ListObjectsResult headObjectWithMeta(String prefix, String subKey) throws IOException {
String fullKey = normalizeAndCombinePrefix(prefix, subKey);
try (CosNativeClient nativeClient = new CosNativeClient(cosProperties)) {
ObjectMetadata metadata = nativeClient.get().getObjectMetadata(
cosProperties.getBucket(), fullKey);
ObjectFile of = new ObjectFile(fullKey,
getRelativePathSafe(prefix, fullKey),
metadata.getETag(), metadata.getContentLength());
return new ListObjectsResult(ImmutableList.of(of), false, null);
} catch (CosServiceException e) {
if (e.getStatusCode() == 404) {
LOG.warn("Key not found in COS headObjectWithMeta, key={}", fullKey);
return new ListObjectsResult(ImmutableList.of(), false, null);
}
throw new IOException("Failed to head COS object: " + e.getMessage(), e);
} catch (CosClientException e) {
throw new IOException("Failed to head COS object: " + e.getMessage(), e);
}
}
}