be/src/format/table/paimon_cpp_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/table/paimon_cpp_reader.h" |
19 | | |
20 | | #include <algorithm> |
21 | | #include <mutex> |
22 | | #include <tuple> |
23 | | #include <unordered_map> |
24 | | #include <utility> |
25 | | |
26 | | #include "arrow/c/bridge.h" |
27 | | #include "arrow/record_batch.h" |
28 | | #include "arrow/result.h" |
29 | | #include "core/block/block.h" |
30 | | #include "core/block/column_with_type_and_name.h" |
31 | | #include "format/table/paimon_doris_file_system.h" |
32 | | #include "format/table/partition_column_filler.h" |
33 | | #include "paimon/defs.h" |
34 | | #include "paimon/memory/memory_pool.h" |
35 | | #include "paimon/read_context.h" |
36 | | #include "paimon/table/source/table_read.h" |
37 | | #include "runtime/descriptors.h" |
38 | | #include "runtime/runtime_state.h" |
39 | | #include "util/url_coding.h" |
40 | | |
41 | | namespace doris { |
42 | | |
namespace {

// Name of the Arrow field that _do_get_next_block() skips instead of copying into the block.
constexpr const char* VALUE_KIND_FIELD = "_VALUE_KIND";

} // namespace
47 | | |
48 | | PaimonCppReader::PaimonCppReader(const std::vector<SlotDescriptor*>& file_slot_descs, |
49 | | RuntimeState* state, RuntimeProfile* profile, |
50 | | const TFileRangeDesc& range, |
51 | | const TFileScanRangeParams* range_params) |
52 | 2 | : _file_slot_descs(file_slot_descs), |
53 | 2 | _state(state), |
54 | 2 | _profile(profile), |
55 | 2 | _range(range), |
56 | 2 | _range_params(range_params) { |
57 | 2 | TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz); |
58 | 2 | if (range.__isset.table_format_params && |
59 | 2 | range.table_format_params.__isset.table_level_row_count) { |
60 | 2 | _remaining_table_level_row_count = range.table_format_params.table_level_row_count; |
61 | 2 | } else { |
62 | 0 | _remaining_table_level_row_count = -1; |
63 | 0 | } |
64 | 2 | } |
65 | | |
66 | 2 | PaimonCppReader::~PaimonCppReader() = default; |
67 | | |
68 | 0 | Status PaimonCppReader::on_before_init_reader(ReaderInitContext* ctx) { |
69 | 0 | _column_descs = ctx->column_descs; |
70 | 0 | _partition_values.clear(); |
71 | 0 | _partition_value_is_null.clear(); |
72 | 0 | if (ctx->range == nullptr || ctx->tuple_descriptor == nullptr || |
73 | 0 | !ctx->range->__isset.columns_from_path_keys) { |
74 | 0 | return Status::OK(); |
75 | 0 | } |
76 | | |
77 | 0 | DORIS_CHECK(ctx->range->__isset.columns_from_path); |
78 | 0 | DORIS_CHECK(ctx->range->__isset.columns_from_path_is_null); |
79 | 0 | DORIS_CHECK(ctx->range->columns_from_path.size() == ctx->range->columns_from_path_keys.size()); |
80 | 0 | DORIS_CHECK(ctx->range->columns_from_path_is_null.size() == |
81 | 0 | ctx->range->columns_from_path_keys.size()); |
82 | |
|
83 | 0 | std::unordered_map<std::string, const SlotDescriptor*> name_to_slot; |
84 | 0 | for (auto* slot : ctx->tuple_descriptor->slots()) { |
85 | 0 | name_to_slot.emplace(slot->col_name(), slot); |
86 | 0 | } |
87 | 0 | for (size_t i = 0; i < ctx->range->columns_from_path_keys.size(); ++i) { |
88 | 0 | const auto& key = ctx->range->columns_from_path_keys[i]; |
89 | 0 | auto slot_it = name_to_slot.find(key); |
90 | 0 | if (slot_it == name_to_slot.end()) { |
91 | 0 | continue; |
92 | 0 | } |
93 | 0 | _partition_values.emplace( |
94 | 0 | key, std::make_tuple(ctx->range->columns_from_path[i], slot_it->second)); |
95 | 0 | _partition_value_is_null.emplace(key, ctx->range->columns_from_path_is_null[i]); |
96 | 0 | } |
97 | 0 | return Status::OK(); |
98 | 0 | } |
99 | | |
100 | 3 | Status PaimonCppReader::on_after_read_block(Block* block, size_t* read_rows) { |
101 | 3 | if (_column_descs == nullptr || _partition_values.empty() || *read_rows == 0 || |
102 | 3 | _push_down_agg_type == TPushAggOp::type::COUNT) { |
103 | 3 | return Status::OK(); |
104 | 3 | } |
105 | 0 | return _fill_partition_columns(block, *read_rows); |
106 | 3 | } |
107 | | |
108 | 2 | Status PaimonCppReader::init_reader() { |
109 | 2 | if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count >= 0) { |
110 | 1 | return Status::OK(); |
111 | 1 | } |
112 | 1 | return _init_paimon_reader(); |
113 | 2 | } |
114 | | |
115 | 3 | Status PaimonCppReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) { |
116 | 3 | if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count >= 0) { |
117 | 3 | auto rows = std::min(_remaining_table_level_row_count, |
118 | 3 | (int64_t)_state->query_options().batch_size); |
119 | 3 | _remaining_table_level_row_count -= rows; |
120 | 3 | auto mutable_columns_guard = block->mutate_columns_scoped(); |
121 | 3 | auto& mutate_columns = mutable_columns_guard.mutable_columns(); |
122 | 3 | for (auto& col : mutate_columns) { |
123 | 0 | col->resize(rows); |
124 | 0 | } |
125 | 3 | *read_rows = rows; |
126 | 3 | *eof = false; |
127 | 3 | if (_remaining_table_level_row_count == 0) { |
128 | 2 | *eof = true; |
129 | 2 | } |
130 | 3 | return Status::OK(); |
131 | 3 | } |
132 | | |
133 | 0 | if (!_batch_reader) { |
134 | 0 | return Status::InternalError("paimon-cpp reader is not initialized"); |
135 | 0 | } |
136 | | |
137 | 0 | if (_col_name_to_block_idx.empty()) { |
138 | 0 | _col_name_to_block_idx = block->get_name_to_pos_map(); |
139 | 0 | } |
140 | |
|
141 | 0 | auto batch_result = _batch_reader->NextBatch(); |
142 | 0 | if (!batch_result.ok()) { |
143 | 0 | return Status::InternalError("paimon-cpp read batch failed: {}", |
144 | 0 | batch_result.status().ToString()); |
145 | 0 | } |
146 | 0 | auto batch = std::move(batch_result).value(); |
147 | 0 | if (paimon::BatchReader::IsEofBatch(batch)) { |
148 | 0 | *read_rows = 0; |
149 | 0 | *eof = true; |
150 | 0 | return Status::OK(); |
151 | 0 | } |
152 | | |
153 | 0 | arrow::Result<std::shared_ptr<arrow::RecordBatch>> import_result = |
154 | 0 | arrow::ImportRecordBatch(batch.first.get(), batch.second.get()); |
155 | 0 | if (!import_result.ok()) { |
156 | 0 | return Status::InternalError("failed to import paimon-cpp arrow batch: {}", |
157 | 0 | import_result.status().message()); |
158 | 0 | } |
159 | | |
160 | 0 | auto record_batch = std::move(import_result).ValueUnsafe(); |
161 | 0 | const auto num_rows = static_cast<size_t>(record_batch->num_rows()); |
162 | 0 | const auto num_columns = record_batch->num_columns(); |
163 | 0 | auto columns_guard = block->mutate_columns_scoped(); |
164 | 0 | auto& columns = columns_guard.mutable_columns(); |
165 | 0 | for (int c = 0; c < num_columns; ++c) { |
166 | 0 | const auto& field = record_batch->schema()->field(c); |
167 | 0 | if (field->name() == VALUE_KIND_FIELD) { |
168 | 0 | continue; |
169 | 0 | } |
170 | | |
171 | 0 | auto it = _col_name_to_block_idx.find(field->name()); |
172 | 0 | if (it == _col_name_to_block_idx.end()) { |
173 | | // Skip columns that are not in the block (e.g., partition columns handled elsewhere) |
174 | 0 | continue; |
175 | 0 | } |
176 | 0 | const auto block_pos = it->second; |
177 | 0 | try { |
178 | 0 | RETURN_IF_ERROR(columns_guard.get_datatype_by_position(block_pos) |
179 | 0 | ->get_serde() |
180 | 0 | ->read_column_from_arrow(*columns[block_pos], |
181 | 0 | record_batch->column(c).get(), 0, |
182 | 0 | num_rows, _ctzz)); |
183 | 0 | } catch (Exception& e) { |
184 | 0 | return Status::InternalError("Failed to convert from arrow to block: {}", e.what()); |
185 | 0 | } |
186 | 0 | } |
187 | | |
188 | 0 | *read_rows = num_rows; |
189 | 0 | *eof = false; |
190 | 0 | return Status::OK(); |
191 | 0 | } |
192 | | |
193 | | Status PaimonCppReader::_get_columns_impl( |
194 | 0 | std::unordered_map<std::string, DataTypePtr>* name_to_type) { |
195 | 0 | for (const auto& slot : _file_slot_descs) { |
196 | 0 | name_to_type->emplace(slot->col_name(), slot->type()); |
197 | 0 | } |
198 | 0 | return Status::OK(); |
199 | 0 | } |
200 | | |
201 | 0 | Status PaimonCppReader::_fill_partition_columns(Block* block, size_t num_rows) { |
202 | 0 | if (_col_name_to_block_idx.empty()) { |
203 | 0 | _col_name_to_block_idx = block->get_name_to_pos_map(); |
204 | 0 | } |
205 | |
|
206 | 0 | for (const auto& desc : *_column_descs) { |
207 | 0 | if (desc.category != ColumnCategory::PARTITION_KEY) { |
208 | 0 | continue; |
209 | 0 | } |
210 | 0 | auto value_it = _partition_values.find(desc.name); |
211 | 0 | if (value_it == _partition_values.end()) { |
212 | 0 | continue; |
213 | 0 | } |
214 | 0 | auto col_it = _col_name_to_block_idx.find(desc.name); |
215 | 0 | if (col_it == _col_name_to_block_idx.end()) { |
216 | 0 | return Status::InternalError("Missing partition column {} in block {}", desc.name, |
217 | 0 | block->dump_structure()); |
218 | 0 | } |
219 | | |
220 | 0 | auto& column_with_type_and_name = block->get_by_position(col_it->second); |
221 | 0 | auto mutable_column = std::move(*column_with_type_and_name.column).mutate(); |
222 | 0 | const auto& [value, slot_desc] = value_it->second; |
223 | 0 | auto null_it = _partition_value_is_null.find(desc.name); |
224 | 0 | DORIS_CHECK(null_it != _partition_value_is_null.end()); |
225 | 0 | RETURN_IF_ERROR(fill_partition_column_from_path_value(*mutable_column, *slot_desc, value, |
226 | 0 | num_rows, null_it->second)); |
227 | 0 | column_with_type_and_name.column = std::move(mutable_column); |
228 | 0 | } |
229 | 0 | return Status::OK(); |
230 | 0 | } |
231 | | |
232 | 0 | Status PaimonCppReader::close() { |
233 | 0 | if (_batch_reader) { |
234 | 0 | _batch_reader->Close(); |
235 | 0 | } |
236 | 0 | return Status::OK(); |
237 | 0 | } |
238 | | |
239 | 1 | Status PaimonCppReader::_init_paimon_reader() { |
240 | 1 | register_paimon_doris_file_system(); |
241 | 1 | RETURN_IF_ERROR(_decode_split(&_split)); |
242 | | |
243 | 0 | auto table_path_opt = _resolve_table_path(); |
244 | 0 | if (!table_path_opt.has_value()) { |
245 | 0 | return Status::InternalError( |
246 | 0 | "paimon-cpp missing paimon_table; cannot resolve paimon table root path"); |
247 | 0 | } |
248 | 0 | auto options = _build_options(); |
249 | 0 | auto read_columns = _build_read_columns(); |
250 | | |
251 | | // Avoid moving strings across module boundaries to prevent allocator mismatches in ASAN builds. |
252 | 0 | std::string table_path = table_path_opt.value(); |
253 | 0 | static std::once_flag options_log_once; |
254 | 0 | std::call_once(options_log_once, [&]() { |
255 | 0 | auto has_key = [&](const char* key) { |
256 | 0 | auto it = options.find(key); |
257 | 0 | return (it != options.end() && !it->second.empty()) ? "set" : "empty"; |
258 | 0 | }; |
259 | 0 | auto value_or = [&](const char* key) { |
260 | 0 | auto it = options.find(key); |
261 | 0 | return it != options.end() ? it->second : std::string("<unset>"); |
262 | 0 | }; |
263 | 0 | LOG(INFO) << "paimon-cpp options summary: table_path=" << table_path |
264 | 0 | << " AWS_ACCESS_KEY=" << has_key("AWS_ACCESS_KEY") |
265 | 0 | << " AWS_SECRET_KEY=" << has_key("AWS_SECRET_KEY") |
266 | 0 | << " AWS_TOKEN=" << has_key("AWS_TOKEN") |
267 | 0 | << " AWS_ENDPOINT=" << value_or("AWS_ENDPOINT") |
268 | 0 | << " AWS_REGION=" << value_or("AWS_REGION") |
269 | 0 | << " use_path_style=" << value_or("use_path_style") |
270 | 0 | << " fs.oss.endpoint=" << value_or("fs.oss.endpoint") |
271 | 0 | << " fs.s3a.endpoint=" << value_or("fs.s3a.endpoint"); |
272 | 0 | }); |
273 | 0 | paimon::ReadContextBuilder builder(table_path); |
274 | 0 | if (!read_columns.empty()) { |
275 | 0 | builder.SetReadSchema(read_columns); |
276 | 0 | } |
277 | 0 | if (!options.empty()) { |
278 | 0 | builder.SetOptions(options); |
279 | 0 | } |
280 | 0 | if (_predicate) { |
281 | 0 | builder.SetPredicate(_predicate); |
282 | 0 | builder.EnablePredicateFilter(true); |
283 | 0 | } |
284 | |
|
285 | 0 | auto context_result = builder.Finish(); |
286 | 0 | if (!context_result.ok()) { |
287 | 0 | return Status::InternalError("paimon-cpp build read context failed: {}", |
288 | 0 | context_result.status().ToString()); |
289 | 0 | } |
290 | 0 | auto context = std::move(context_result).value(); |
291 | |
|
292 | 0 | auto table_read_result = paimon::TableRead::Create(std::move(context)); |
293 | 0 | if (!table_read_result.ok()) { |
294 | 0 | return Status::InternalError("paimon-cpp create table read failed: {}", |
295 | 0 | table_read_result.status().ToString()); |
296 | 0 | } |
297 | 0 | auto table_read = std::move(table_read_result).value(); |
298 | 0 | auto reader_result = table_read->CreateReader(_split); |
299 | 0 | if (!reader_result.ok()) { |
300 | 0 | return Status::InternalError("paimon-cpp create reader failed: {}", |
301 | 0 | reader_result.status().ToString()); |
302 | 0 | } |
303 | 0 | _table_read = std::move(table_read); |
304 | 0 | _batch_reader = std::move(reader_result).value(); |
305 | 0 | return Status::OK(); |
306 | 0 | } |
307 | | |
308 | 1 | Status PaimonCppReader::_decode_split(std::shared_ptr<paimon::Split>* split) { |
309 | 1 | if (!_range.__isset.table_format_params || !_range.table_format_params.__isset.paimon_params || |
310 | 1 | !_range.table_format_params.paimon_params.__isset.paimon_split) { |
311 | 1 | return Status::InternalError("paimon-cpp missing paimon_split in scan range"); |
312 | 1 | } |
313 | 0 | const auto& encoded_split = _range.table_format_params.paimon_params.paimon_split; |
314 | 0 | std::string decoded_split; |
315 | 0 | if (!base64_decode(encoded_split, &decoded_split)) { |
316 | 0 | return Status::InternalError("paimon-cpp base64 decode paimon_split failed"); |
317 | 0 | } |
318 | 0 | auto pool = paimon::GetDefaultPool(); |
319 | 0 | auto split_result = |
320 | 0 | paimon::Split::Deserialize(decoded_split.data(), decoded_split.size(), pool); |
321 | 0 | if (!split_result.ok()) { |
322 | 0 | return Status::InternalError("paimon-cpp deserialize split failed: {}", |
323 | 0 | split_result.status().ToString()); |
324 | 0 | } |
325 | 0 | *split = std::move(split_result).value(); |
326 | 0 | return Status::OK(); |
327 | 0 | } |
328 | | |
329 | 0 | std::optional<std::string> PaimonCppReader::_resolve_table_path() const { |
330 | 0 | if (_range.__isset.table_format_params && _range.table_format_params.__isset.paimon_params && |
331 | 0 | _range.table_format_params.paimon_params.__isset.paimon_table && |
332 | 0 | !_range.table_format_params.paimon_params.paimon_table.empty()) { |
333 | 0 | return _range.table_format_params.paimon_params.paimon_table; |
334 | 0 | } |
335 | 0 | return std::nullopt; |
336 | 0 | } |
337 | | |
338 | 0 | std::vector<std::string> PaimonCppReader::_build_read_columns() const { |
339 | 0 | std::vector<std::string> columns; |
340 | 0 | columns.reserve(_file_slot_descs.size()); |
341 | 0 | for (const auto& slot : _file_slot_descs) { |
342 | 0 | columns.emplace_back(slot->col_name()); |
343 | 0 | } |
344 | 0 | return columns; |
345 | 0 | } |
346 | | |
347 | 0 | std::map<std::string, std::string> PaimonCppReader::_build_options() const { |
348 | 0 | std::map<std::string, std::string> options; |
349 | 0 | if (_range_params && _range_params->__isset.paimon_options && |
350 | 0 | !_range_params->paimon_options.empty()) { |
351 | 0 | options.insert(_range_params->paimon_options.begin(), _range_params->paimon_options.end()); |
352 | 0 | } else if (_range.__isset.table_format_params && |
353 | 0 | _range.table_format_params.__isset.paimon_params && |
354 | 0 | _range.table_format_params.paimon_params.__isset.paimon_options) { |
355 | 0 | options.insert(_range.table_format_params.paimon_params.paimon_options.begin(), |
356 | 0 | _range.table_format_params.paimon_params.paimon_options.end()); |
357 | 0 | } |
358 | |
|
359 | 0 | if (_range_params && _range_params->__isset.properties && !_range_params->properties.empty()) { |
360 | 0 | for (const auto& kv : _range_params->properties) { |
361 | 0 | options[kv.first] = kv.second; |
362 | 0 | } |
363 | 0 | } else if (_range.__isset.table_format_params && |
364 | 0 | _range.table_format_params.__isset.paimon_params && |
365 | 0 | _range.table_format_params.paimon_params.__isset.hadoop_conf) { |
366 | 0 | for (const auto& kv : _range.table_format_params.paimon_params.hadoop_conf) { |
367 | 0 | options[kv.first] = kv.second; |
368 | 0 | } |
369 | 0 | } |
370 | |
|
371 | 0 | auto copy_if_missing = [&](const char* from_key, const char* to_key) { |
372 | 0 | if (options.find(to_key) != options.end()) { |
373 | 0 | return; |
374 | 0 | } |
375 | 0 | auto it = options.find(from_key); |
376 | 0 | if (it != options.end() && !it->second.empty()) { |
377 | 0 | options[to_key] = it->second; |
378 | 0 | } |
379 | 0 | }; |
380 | | |
381 | | // Map common OSS/S3 Hadoop configs to Doris S3 property keys. |
382 | 0 | copy_if_missing("fs.oss.accessKeyId", "AWS_ACCESS_KEY"); |
383 | 0 | copy_if_missing("fs.oss.accessKeySecret", "AWS_SECRET_KEY"); |
384 | 0 | copy_if_missing("fs.oss.sessionToken", "AWS_TOKEN"); |
385 | 0 | copy_if_missing("fs.oss.endpoint", "AWS_ENDPOINT"); |
386 | 0 | copy_if_missing("fs.oss.region", "AWS_REGION"); |
387 | 0 | copy_if_missing("fs.s3a.access.key", "AWS_ACCESS_KEY"); |
388 | 0 | copy_if_missing("fs.s3a.secret.key", "AWS_SECRET_KEY"); |
389 | 0 | copy_if_missing("fs.s3a.session.token", "AWS_TOKEN"); |
390 | 0 | copy_if_missing("fs.s3a.endpoint", "AWS_ENDPOINT"); |
391 | 0 | copy_if_missing("fs.s3a.region", "AWS_REGION"); |
392 | 0 | copy_if_missing("fs.s3a.path.style.access", "use_path_style"); |
393 | | |
394 | | // Backfill file.format/manifest.format from split file_format to avoid |
395 | | // paimon-cpp falling back to default manifest.format=avro. |
396 | 0 | if (_range.__isset.table_format_params && _range.table_format_params.__isset.paimon_params && |
397 | 0 | _range.table_format_params.paimon_params.__isset.file_format && |
398 | 0 | !_range.table_format_params.paimon_params.file_format.empty()) { |
399 | 0 | const auto& split_file_format = _range.table_format_params.paimon_params.file_format; |
400 | 0 | auto file_format_it = options.find(paimon::Options::FILE_FORMAT); |
401 | 0 | if (file_format_it == options.end() || file_format_it->second.empty()) { |
402 | 0 | options[paimon::Options::FILE_FORMAT] = split_file_format; |
403 | 0 | } |
404 | 0 | auto manifest_format_it = options.find(paimon::Options::MANIFEST_FORMAT); |
405 | 0 | if (manifest_format_it == options.end() || manifest_format_it->second.empty()) { |
406 | 0 | options[paimon::Options::MANIFEST_FORMAT] = split_file_format; |
407 | 0 | } |
408 | 0 | } |
409 | |
|
410 | 0 | options[paimon::Options::FILE_SYSTEM] = "doris"; |
411 | 0 | return options; |
412 | 0 | } |
413 | | |
414 | | } // namespace doris |