Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include <cstring> |
19 | | #include <memory> |
20 | | #include <optional> |
21 | | #include <string> |
22 | | #include <string_view> |
23 | | #include <vector> |
24 | | |
25 | | #include "common/cast_set.h" |
26 | | #include "common/status.h" |
27 | | #include "core/assert_cast.h" |
28 | | #include "core/block/block.h" |
29 | | #include "core/block/column_numbers.h" |
30 | | #include "core/block/column_with_type_and_name.h" |
31 | | #include "core/column/column_file.h" |
32 | | #include "core/column/column_nullable.h" |
33 | | #include "core/column/column_string.h" |
34 | | #include "core/data_type/data_type_file.h" |
35 | | #include "core/data_type/file_schema_descriptor.h" |
36 | | #include "exprs/function/function.h" |
37 | | #include "exprs/function/simple_function_factory.h" |
38 | | #include "io/fs/obj_storage_client.h" |
39 | | #include "util/jsonb_writer.h" |
40 | | #include "util/s3_uri.h" |
41 | | #include "util/s3_util.h" |
42 | | |
43 | | namespace doris { |
44 | | |
45 | | class FunctionToFile : public IFunction { |
46 | | public: |
47 | | static constexpr auto name = "to_file"; |
48 | | |
49 | 16 | static FunctionPtr create() { return std::make_shared<FunctionToFile>(); } |
50 | | |
51 | 1 | String get_name() const override { return name; } |
52 | | |
53 | 8 | bool is_variadic() const override { return false; } |
54 | | |
55 | 7 | size_t get_number_of_arguments() const override { return 5; } |
56 | | |
57 | 7 | DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
58 | 7 | return std::make_shared<DataTypeFile>(); |
59 | 7 | } |
60 | | |
61 | | Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
62 | 7 | uint32_t result, size_t input_rows_count) const override { |
63 | 7 | DCHECK_EQ(arguments.size(), 5); |
64 | | |
65 | 7 | ColumnPtr uri_holder, region_holder, endpoint_holder, ak_holder, sk_holder; |
66 | 7 | const ColumnString* uri_col = |
67 | 7 | _unwrap_string_column(block.get_by_position(arguments[0]), uri_holder); |
68 | 7 | const ColumnString* region_col = |
69 | 7 | _unwrap_string_column(block.get_by_position(arguments[1]), region_holder); |
70 | 7 | const ColumnString* endpoint_col = |
71 | 7 | _unwrap_string_column(block.get_by_position(arguments[2]), endpoint_holder); |
72 | 7 | const ColumnString* ak_col = |
73 | 7 | _unwrap_string_column(block.get_by_position(arguments[3]), ak_holder); |
74 | 7 | const ColumnString* sk_col = |
75 | 7 | _unwrap_string_column(block.get_by_position(arguments[4]), sk_holder); |
76 | | |
77 | 7 | using S = FileSchemaDescriptor; |
78 | 7 | const auto& schema = S::instance(); |
79 | 7 | auto result_col = ColumnFile::create(schema); |
80 | 7 | auto& jsonb_col = assert_cast<ColumnString&>(result_col->get_jsonb_column()); |
81 | 7 | jsonb_col.reserve(input_rows_count); |
82 | 7 | JsonbWriter writer; |
83 | | |
84 | 9 | for (size_t row = 0; row < input_rows_count; ++row) { |
85 | 7 | std::string uri = uri_col->get_data_at(row).to_string(); |
86 | 7 | std::string region = region_col->get_data_at(row).to_string(); |
87 | 7 | std::string endpoint = endpoint_col->get_data_at(row).to_string(); |
88 | 7 | std::string ak = ak_col->get_data_at(row).to_string(); |
89 | 7 | std::string sk = sk_col->get_data_at(row).to_string(); |
90 | 7 | std::string file_name = S::extract_file_name(uri); |
91 | 7 | std::string content_type = |
92 | 7 | S::extension_to_content_type(S::extract_file_extension(file_name)); |
93 | | |
94 | | // Ensure endpoint has http:// prefix for S3 SDK. |
95 | 7 | std::string normalized_endpoint = _normalize_endpoint(endpoint); |
96 | | |
97 | | // Validate the object exists via HEAD request and get actual size. |
98 | 7 | S3ClientConf s3_conf; |
99 | 7 | s3_conf.endpoint = normalized_endpoint; |
100 | 7 | s3_conf.region = region; |
101 | 7 | s3_conf.ak = ak; |
102 | 7 | s3_conf.sk = sk; |
103 | 7 | auto s3_client = S3ClientFactory::instance().create(s3_conf); |
104 | 7 | if (!s3_client) { |
105 | 0 | return Status::InternalError( |
106 | 0 | "to_file: failed to create S3 client for endpoint '{}'", endpoint); |
107 | 0 | } |
108 | | // Normalize oss:// etc. to s3:// for S3URI parser and storage. |
109 | 7 | std::string normalized_uri = _normalize_uri_scheme(uri); |
110 | 7 | S3URI s3_uri(normalized_uri); |
111 | 7 | RETURN_IF_ERROR(s3_uri.parse()); |
112 | 7 | auto head_resp = s3_client->head_object( |
113 | 7 | {.bucket = s3_uri.get_bucket(), .key = s3_uri.get_key()}); |
114 | 7 | if (head_resp.resp.status.code != 0) { |
115 | 5 | return Status::InvalidArgument("to_file: object '{}' is not accessible: {}", uri, |
116 | 5 | head_resp.resp.status.msg); |
117 | 5 | } |
118 | 2 | int64_t file_size = head_resp.file_size; |
119 | | |
120 | 2 | writer.reset(); |
121 | 2 | FileMetadata metadata { |
122 | 2 | .uri = normalized_uri, |
123 | 2 | .file_name = file_name, |
124 | 2 | .content_type = content_type, |
125 | 2 | .size = file_size, |
126 | 2 | .region = region, |
127 | 2 | .endpoint = normalized_endpoint, |
128 | 2 | .ak = ak, |
129 | 2 | .sk = sk, |
130 | 2 | .role_arn = {}, |
131 | 2 | .external_id = {}, |
132 | 2 | }; |
133 | 2 | S::write_file_jsonb(writer, metadata); |
134 | 2 | jsonb_col.insert_data(writer.getOutput()->getBuffer(), writer.getOutput()->getSize()); |
135 | 2 | } |
136 | 2 | block.replace_by_position(result, std::move(result_col)); |
137 | 2 | return Status::OK(); |
138 | 7 | } |
139 | | |
140 | | private: |
141 | | static const ColumnString* _unwrap_string_column(const ColumnWithTypeAndName& col_with_type, |
142 | 35 | ColumnPtr& holder) { |
143 | 35 | holder = col_with_type.column->convert_to_full_column_if_const(); |
144 | 35 | if (const auto* nullable = check_and_get_column<ColumnNullable>(holder.get())) { |
145 | 0 | return &assert_cast<const ColumnString&>(nullable->get_nested_column()); |
146 | 0 | } |
147 | 35 | return &assert_cast<const ColumnString&>(*holder); |
148 | 35 | } |
149 | | |
150 | | // Ensure endpoint has http:// scheme prefix. |
151 | 7 | static std::string _normalize_endpoint(const std::string& endpoint) { |
152 | 7 | if (endpoint.substr(0, 7) == "http://" || endpoint.substr(0, 8) == "https://") { |
153 | 1 | return endpoint; |
154 | 1 | } |
155 | 6 | return "http://" + endpoint; |
156 | 7 | } |
157 | | |
158 | | // Normalize oss:// etc. to s3:// for S3URI parser and storage. |
159 | 7 | static std::string _normalize_uri_scheme(const std::string& uri) { |
160 | 7 | if (uri.substr(0, 6) == "oss://") { |
161 | 0 | return "s3://" + uri.substr(6); |
162 | 0 | } |
163 | 7 | if (uri.substr(0, 6) == "cos://") { |
164 | 0 | return "s3://" + uri.substr(6); |
165 | 0 | } |
166 | 7 | if (uri.substr(0, 6) == "obs://") { |
167 | 0 | return "s3://" + uri.substr(6); |
168 | 0 | } |
169 | 7 | return uri; |
170 | 7 | } |
171 | | }; |
172 | | |
173 | 8 | void register_function_file(SimpleFunctionFactory& factory) { |
174 | 8 | factory.register_function<FunctionToFile>(); |
175 | 8 | } |
176 | | |
177 | | } // namespace doris |