Coverage Report

Created: 2026-04-10 06:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/function/function_file.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <cstring>
19
#include <memory>
20
#include <optional>
21
#include <string>
22
#include <string_view>
23
#include <vector>
24
25
#include "common/cast_set.h"
26
#include "common/status.h"
27
#include "core/assert_cast.h"
28
#include "core/block/block.h"
29
#include "core/block/column_numbers.h"
30
#include "core/block/column_with_type_and_name.h"
31
#include "core/column/column_file.h"
32
#include "core/column/column_nullable.h"
33
#include "core/column/column_string.h"
34
#include "core/data_type/data_type_file.h"
35
#include "core/data_type/file_schema_descriptor.h"
36
#include "exprs/function/function.h"
37
#include "exprs/function/simple_function_factory.h"
38
#include "io/fs/obj_storage_client.h"
39
#include "util/jsonb_writer.h"
40
#include "util/s3_uri.h"
41
#include "util/s3_util.h"
42
43
namespace doris {
44
45
class FunctionToFile : public IFunction {
46
public:
47
    static constexpr auto name = "to_file";
48
49
16
    static FunctionPtr create() { return std::make_shared<FunctionToFile>(); }
50
51
1
    String get_name() const override { return name; }
52
53
8
    bool is_variadic() const override { return false; }
54
55
7
    size_t get_number_of_arguments() const override { return 5; }
56
57
7
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
58
7
        return std::make_shared<DataTypeFile>();
59
7
    }
60
61
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
62
7
                        uint32_t result, size_t input_rows_count) const override {
63
7
        DCHECK_EQ(arguments.size(), 5);
64
65
7
        ColumnPtr uri_holder, region_holder, endpoint_holder, ak_holder, sk_holder;
66
7
        const ColumnString* uri_col =
67
7
                _unwrap_string_column(block.get_by_position(arguments[0]), uri_holder);
68
7
        const ColumnString* region_col =
69
7
                _unwrap_string_column(block.get_by_position(arguments[1]), region_holder);
70
7
        const ColumnString* endpoint_col =
71
7
                _unwrap_string_column(block.get_by_position(arguments[2]), endpoint_holder);
72
7
        const ColumnString* ak_col =
73
7
                _unwrap_string_column(block.get_by_position(arguments[3]), ak_holder);
74
7
        const ColumnString* sk_col =
75
7
                _unwrap_string_column(block.get_by_position(arguments[4]), sk_holder);
76
77
7
        using S = FileSchemaDescriptor;
78
7
        const auto& schema = S::instance();
79
7
        auto result_col = ColumnFile::create(schema);
80
7
        auto& jsonb_col = assert_cast<ColumnString&>(result_col->get_jsonb_column());
81
7
        jsonb_col.reserve(input_rows_count);
82
7
        JsonbWriter writer;
83
84
9
        for (size_t row = 0; row < input_rows_count; ++row) {
85
7
            std::string uri = uri_col->get_data_at(row).to_string();
86
7
            std::string region = region_col->get_data_at(row).to_string();
87
7
            std::string endpoint = endpoint_col->get_data_at(row).to_string();
88
7
            std::string ak = ak_col->get_data_at(row).to_string();
89
7
            std::string sk = sk_col->get_data_at(row).to_string();
90
7
            std::string file_name = S::extract_file_name(uri);
91
7
            std::string content_type =
92
7
                    S::extension_to_content_type(S::extract_file_extension(file_name));
93
94
            // Ensure endpoint has http:// prefix for S3 SDK.
95
7
            std::string normalized_endpoint = _normalize_endpoint(endpoint);
96
97
            // Validate the object exists via HEAD request and get actual size.
98
7
            S3ClientConf s3_conf;
99
7
            s3_conf.endpoint = normalized_endpoint;
100
7
            s3_conf.region = region;
101
7
            s3_conf.ak = ak;
102
7
            s3_conf.sk = sk;
103
7
            auto s3_client = S3ClientFactory::instance().create(s3_conf);
104
7
            if (!s3_client) {
105
0
                return Status::InternalError(
106
0
                        "to_file: failed to create S3 client for endpoint '{}'", endpoint);
107
0
            }
108
            // Normalize oss:// etc. to s3:// for S3URI parser and storage.
109
7
            std::string normalized_uri = _normalize_uri_scheme(uri);
110
7
            S3URI s3_uri(normalized_uri);
111
7
            RETURN_IF_ERROR(s3_uri.parse());
112
7
            auto head_resp = s3_client->head_object(
113
7
                    {.bucket = s3_uri.get_bucket(), .key = s3_uri.get_key()});
114
7
            if (head_resp.resp.status.code != 0) {
115
5
                return Status::InvalidArgument("to_file: object '{}' is not accessible: {}", uri,
116
5
                                               head_resp.resp.status.msg);
117
5
            }
118
2
            int64_t file_size = head_resp.file_size;
119
120
2
            writer.reset();
121
2
            FileMetadata metadata {
122
2
                    .uri = normalized_uri,
123
2
                    .file_name = file_name,
124
2
                    .content_type = content_type,
125
2
                    .size = file_size,
126
2
                    .region = region,
127
2
                    .endpoint = normalized_endpoint,
128
2
                    .ak = ak,
129
2
                    .sk = sk,
130
2
                    .role_arn = {},
131
2
                    .external_id = {},
132
2
            };
133
2
            S::write_file_jsonb(writer, metadata);
134
2
            jsonb_col.insert_data(writer.getOutput()->getBuffer(), writer.getOutput()->getSize());
135
2
        }
136
2
        block.replace_by_position(result, std::move(result_col));
137
2
        return Status::OK();
138
7
    }
139
140
private:
141
    static const ColumnString* _unwrap_string_column(const ColumnWithTypeAndName& col_with_type,
142
35
                                                     ColumnPtr& holder) {
143
35
        holder = col_with_type.column->convert_to_full_column_if_const();
144
35
        if (const auto* nullable = check_and_get_column<ColumnNullable>(holder.get())) {
145
0
            return &assert_cast<const ColumnString&>(nullable->get_nested_column());
146
0
        }
147
35
        return &assert_cast<const ColumnString&>(*holder);
148
35
    }
149
150
    // Ensure endpoint has http:// scheme prefix.
151
7
    static std::string _normalize_endpoint(const std::string& endpoint) {
152
7
        if (endpoint.substr(0, 7) == "http://" || endpoint.substr(0, 8) == "https://") {
153
1
            return endpoint;
154
1
        }
155
6
        return "http://" + endpoint;
156
7
    }
157
158
    // Normalize oss:// etc. to s3:// for S3URI parser and storage.
159
7
    static std::string _normalize_uri_scheme(const std::string& uri) {
160
7
        if (uri.substr(0, 6) == "oss://") {
161
0
            return "s3://" + uri.substr(6);
162
0
        }
163
7
        if (uri.substr(0, 6) == "cos://") {
164
0
            return "s3://" + uri.substr(6);
165
0
        }
166
7
        if (uri.substr(0, 6) == "obs://") {
167
0
            return "s3://" + uri.substr(6);
168
0
        }
169
7
        return uri;
170
7
    }
171
};
172
173
8
/// Registers the `to_file` SQL function (FunctionToFile) with the given factory.
void register_function_file(SimpleFunctionFactory& factory) {
    SimpleFunctionFactory& registry = factory;
    registry.register_function<FunctionToFile>();
}
176
177
} // namespace doris