Coverage Report

Created: 2026-06-04 16:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/paimon_cpp_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/table/paimon_cpp_reader.h"
19
20
#include <algorithm>
21
#include <mutex>
22
#include <tuple>
23
#include <unordered_map>
24
#include <utility>
25
26
#include "arrow/c/bridge.h"
27
#include "arrow/record_batch.h"
28
#include "arrow/result.h"
29
#include "core/block/block.h"
30
#include "core/block/column_with_type_and_name.h"
31
#include "format/table/paimon_doris_file_system.h"
32
#include "format/table/partition_column_filler.h"
33
#include "paimon/defs.h"
34
#include "paimon/memory/memory_pool.h"
35
#include "paimon/read_context.h"
36
#include "paimon/table/source/table_read.h"
37
#include "runtime/descriptors.h"
38
#include "runtime/runtime_state.h"
39
#include "util/url_coding.h"
40
41
namespace doris {
42
43
namespace {
// Arrow field name that _do_get_next_block skips when copying paimon-cpp batches
// into the Doris block (presumably Paimon's row-kind column; it has no block slot).
constexpr char VALUE_KIND_FIELD[] = "_VALUE_KIND";
} // namespace
47
48
PaimonCppReader::PaimonCppReader(const std::vector<SlotDescriptor*>& file_slot_descs,
                                 RuntimeState* state, RuntimeProfile* profile,
                                 const TFileRangeDesc& range,
                                 const TFileScanRangeParams* range_params)
        : _file_slot_descs(file_slot_descs),
          _state(state),
          _profile(profile),
          _range(range),
          _range_params(range_params) {
    // Resolve the default time zone once; _do_get_next_block hands it to the
    // Arrow -> Doris column conversion.
    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz);

    // A table-level row count in the scan range enables the COUNT(*) shortcut in
    // init_reader/_do_get_next_block; -1 marks it as unavailable.
    const bool has_table_level_row_count =
            range.__isset.table_format_params &&
            range.table_format_params.__isset.table_level_row_count;
    _remaining_table_level_row_count =
            has_table_level_row_count ? range.table_format_params.table_level_row_count : -1;
}
65
66
2
// Defaulted out of line: no cleanup beyond the members' own destructors.
// Explicitly closing the paimon-cpp batch reader is the job of close().
PaimonCppReader::~PaimonCppReader() = default;
67
68
0
Status PaimonCppReader::on_before_init_reader(ReaderInitContext* ctx) {
    // Reset the partition state, then repopulate it from the range's path columns.
    _column_descs = ctx->column_descs;
    _partition_values.clear();
    _partition_value_is_null.clear();

    const auto& range = ctx->range;
    const bool has_path_columns = range != nullptr && ctx->tuple_descriptor != nullptr &&
                                  range->__isset.columns_from_path_keys;
    if (!has_path_columns) {
        return Status::OK();
    }

    // Keys, values and (optional) null flags are parallel lists and must line up.
    DORIS_CHECK(range->__isset.columns_from_path);
    const auto& keys = range->columns_from_path_keys;
    const auto& values = range->columns_from_path;
    DORIS_CHECK(values.size() == keys.size());
    const bool has_null_flags = range->__isset.columns_from_path_is_null;
    if (has_null_flags) {
        DORIS_CHECK(range->columns_from_path_is_null.size() == keys.size());
    }

    // Index the tuple's slots by column name so each path key can find its slot.
    std::unordered_map<std::string, const SlotDescriptor*> slot_by_name;
    for (auto* slot : ctx->tuple_descriptor->slots()) {
        slot_by_name.emplace(slot->col_name(), slot);
    }

    for (size_t idx = 0; idx < keys.size(); ++idx) {
        const auto& key = keys[idx];
        const auto found = slot_by_name.find(key);
        if (found == slot_by_name.end()) {
            // Path keys without a matching tuple slot are not materialized.
            continue;
        }
        const bool is_null = has_null_flags && range->columns_from_path_is_null[idx];
        _partition_values.emplace(key, std::make_tuple(values[idx], found->second));
        _partition_value_is_null.emplace(key, is_null);
    }
    return Status::OK();
}
102
103
3
Status PaimonCppReader::on_after_read_block(Block* block, size_t* read_rows) {
    // Fill partition columns only when column descriptors and partition values are
    // available, rows were read, and this is not a COUNT(*) push-down read.
    const bool should_fill = _column_descs != nullptr && !_partition_values.empty() &&
                             *read_rows != 0 && _push_down_agg_type != TPushAggOp::type::COUNT;
    if (should_fill) {
        return _fill_partition_columns(block, *read_rows);
    }
    return Status::OK();
}
110
111
2
Status PaimonCppReader::init_reader() {
    // With COUNT(*) push-down and a known table-level row count, _do_get_next_block
    // synthesizes the rows itself, so no paimon-cpp reader is created.
    const bool count_from_row_count = _push_down_agg_type == TPushAggOp::type::COUNT &&
                                      _remaining_table_level_row_count >= 0;
    if (!count_from_row_count) {
        return _init_paimon_reader();
    }
    return Status::OK();
}
117
118
3
Status PaimonCppReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) {
    // COUNT(*) push-down with a known table-level row count: emit batches of
    // default-valued rows until the remaining count is exhausted, without reading data.
    if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count >= 0) {
        // Clamp to at least one row per batch. A zero batch_size would otherwise emit
        // empty batches forever, and a negative one would grow the remaining count.
        const int64_t batch_size =
                std::max<int64_t>(1, static_cast<int64_t>(_state->query_options().batch_size));
        const int64_t rows = std::min(_remaining_table_level_row_count, batch_size);
        _remaining_table_level_row_count -= rows;
        auto mutable_columns_guard = block->mutate_columns_scoped();
        auto& mutate_columns = mutable_columns_guard.mutable_columns();
        for (auto& col : mutate_columns) {
            col->resize(static_cast<size_t>(rows));
        }
        *read_rows = static_cast<size_t>(rows);
        *eof = _remaining_table_level_row_count == 0;
        return Status::OK();
    }

    if (!_batch_reader) {
        return Status::InternalError("paimon-cpp reader is not initialized");
    }

    // Lazily build the column-name -> block-position index (shared with
    // _fill_partition_columns).
    if (_col_name_to_block_idx.empty()) {
        _col_name_to_block_idx = block->get_name_to_pos_map();
    }

    auto batch_result = _batch_reader->NextBatch();
    if (!batch_result.ok()) {
        return Status::InternalError("paimon-cpp read batch failed: {}",
                                     batch_result.status().ToString());
    }
    auto batch = std::move(batch_result).value();
    if (paimon::BatchReader::IsEofBatch(batch)) {
        *read_rows = 0;
        *eof = true;
        return Status::OK();
    }

    // Arrow's C bridge moves the ArrowArray contents into the resulting record batch
    // and releases the ArrowSchema.
    arrow::Result<std::shared_ptr<arrow::RecordBatch>> import_result =
            arrow::ImportRecordBatch(batch.first.get(), batch.second.get());
    if (!import_result.ok()) {
        return Status::InternalError("failed to import paimon-cpp arrow batch: {}",
                                     import_result.status().ToString());
    }

    auto record_batch = std::move(import_result).ValueUnsafe();
    const auto num_rows = static_cast<size_t>(record_batch->num_rows());
    const auto num_columns = record_batch->num_columns();
    const auto& schema = record_batch->schema();
    auto columns_guard = block->mutate_columns_scoped();
    auto& columns = columns_guard.mutable_columns();
    for (int c = 0; c < num_columns; ++c) {
        const auto& field_name = schema->field(c)->name();
        if (field_name == VALUE_KIND_FIELD) {
            continue;
        }

        auto it = _col_name_to_block_idx.find(field_name);
        if (it == _col_name_to_block_idx.end()) {
            // Skip columns that are not in the block (e.g., partition columns handled elsewhere)
            continue;
        }
        const auto block_pos = it->second;
        try {
            RETURN_IF_ERROR(columns_guard.get_datatype_by_position(block_pos)
                                    ->get_serde()
                                    ->read_column_from_arrow(*columns[block_pos],
                                                             record_batch->column(c).get(), 0,
                                                             num_rows, _ctzz));
        } catch (const Exception& e) {
            return Status::InternalError("Failed to convert from arrow to block: {}", e.what());
        }
    }

    *read_rows = num_rows;
    *eof = false;
    return Status::OK();
}
195
196
Status PaimonCppReader::_get_columns_impl(
        std::unordered_map<std::string, DataTypePtr>* name_to_type) {
    // Report each file slot's name and Doris type; emplace() keeps any entry the
    // caller already placed under the same name.
    std::for_each(_file_slot_descs.begin(), _file_slot_descs.end(),
                  [name_to_type](const auto& desc) {
                      name_to_type->emplace(desc->col_name(), desc->type());
                  });
    return Status::OK();
}
203
204
0
Status PaimonCppReader::_fill_partition_columns(Block* block, size_t num_rows) {
    // Lazily build the column-name -> block-position index (shared with _do_get_next_block).
    if (_col_name_to_block_idx.empty()) {
        _col_name_to_block_idx = block->get_name_to_pos_map();
    }

    for (const auto& desc : *_column_descs) {
        const bool is_partition_key = desc.category == ColumnCategory::PARTITION_KEY;
        if (!is_partition_key) {
            continue;
        }
        const auto value_it = _partition_values.find(desc.name);
        if (value_it == _partition_values.end()) {
            // No path value was captured for this key in on_before_init_reader.
            continue;
        }
        const auto pos_it = _col_name_to_block_idx.find(desc.name);
        if (pos_it == _col_name_to_block_idx.end()) {
            return Status::InternalError("Missing partition column {} in block {}", desc.name,
                                         block->dump_structure());
        }

        // Take the column out of the block, fill num_rows copies of the path value, and
        // put it back.
        auto& target = block->get_by_position(pos_it->second);
        auto mutable_column = std::move(*target.column).mutate();
        const auto& [value, slot_desc] = value_it->second;
        const auto null_it = _partition_value_is_null.find(desc.name);
        DORIS_CHECK(null_it != _partition_value_is_null.end());
        const bool value_is_null = null_it->second;
        RETURN_IF_ERROR(fill_partition_column_from_path_value(*mutable_column, *slot_desc, value,
                                                              num_rows, value_is_null));
        target.column = std::move(mutable_column);
    }
    return Status::OK();
}
234
235
0
Status PaimonCppReader::close() {
    // Idempotent: the batch reader is closed at most once and released afterwards.
    // A repeated close() is therefore a no-op, and a later read reports "not
    // initialized" instead of using a closed paimon-cpp reader.
    if (_batch_reader) {
        _batch_reader->Close();
        _batch_reader.reset();
    }
    // Release the table read only after the batch reader it created.
    _table_read.reset();
    return Status::OK();
}
241
242
1
Status PaimonCppReader::_init_paimon_reader() {
    // Make the Doris-backed file system available to paimon-cpp. _build_options
    // selects it via paimon::Options::FILE_SYSTEM.
    register_paimon_doris_file_system();
    RETURN_IF_ERROR(_decode_split(&_split));

    const auto resolved_path = _resolve_table_path();
    if (!resolved_path) {
        return Status::InternalError(
                "paimon-cpp missing paimon_table; cannot resolve paimon table root path");
    }
    auto paimon_options = _build_options();
    auto read_columns = _build_read_columns();

    // Avoid moving strings across module boundaries to prevent allocator mismatches in ASAN builds.
    std::string table_path = *resolved_path;

    // Log a one-time (per process) summary of the effective storage options. Credential
    // keys are only reported as set/empty, never by value.
    static std::once_flag options_log_once;
    std::call_once(options_log_once, [&]() {
        const auto presence = [&](const char* key) {
            const auto found = paimon_options.find(key);
            return (found != paimon_options.end() && !found->second.empty()) ? "set" : "empty";
        };
        const auto value_of = [&](const char* key) {
            const auto found = paimon_options.find(key);
            return found != paimon_options.end() ? found->second : std::string("<unset>");
        };
        LOG(INFO) << "paimon-cpp options summary: table_path=" << table_path
                  << " AWS_ACCESS_KEY=" << presence("AWS_ACCESS_KEY")
                  << " AWS_SECRET_KEY=" << presence("AWS_SECRET_KEY")
                  << " AWS_TOKEN=" << presence("AWS_TOKEN")
                  << " AWS_ENDPOINT=" << value_of("AWS_ENDPOINT")
                  << " AWS_REGION=" << value_of("AWS_REGION")
                  << " use_path_style=" << value_of("use_path_style")
                  << " fs.oss.endpoint=" << value_of("fs.oss.endpoint")
                  << " fs.s3a.endpoint=" << value_of("fs.s3a.endpoint");
    });

    // Assemble the read context: projection, options and (optionally) predicate filtering.
    paimon::ReadContextBuilder builder(table_path);
    if (!read_columns.empty()) {
        builder.SetReadSchema(read_columns);
    }
    if (!paimon_options.empty()) {
        builder.SetOptions(paimon_options);
    }
    if (_predicate) {
        builder.SetPredicate(_predicate);
        builder.EnablePredicateFilter(true);
    }

    auto context_result = builder.Finish();
    if (!context_result.ok()) {
        return Status::InternalError("paimon-cpp build read context failed: {}",
                                     context_result.status().ToString());
    }
    auto read_context = std::move(context_result).value();

    auto table_read_result = paimon::TableRead::Create(std::move(read_context));
    if (!table_read_result.ok()) {
        return Status::InternalError("paimon-cpp create table read failed: {}",
                                     table_read_result.status().ToString());
    }
    auto created_table_read = std::move(table_read_result).value();

    auto reader_result = created_table_read->CreateReader(_split);
    if (!reader_result.ok()) {
        return Status::InternalError("paimon-cpp create reader failed: {}",
                                     reader_result.status().ToString());
    }
    // Keep the table read alive alongside the batch reader it produced.
    _table_read = std::move(created_table_read);
    _batch_reader = std::move(reader_result).value();
    return Status::OK();
}
310
311
1
Status PaimonCppReader::_decode_split(std::shared_ptr<paimon::Split>* split) {
    // The scan range carries the serialized paimon split as a base64 string in
    // table_format_params.paimon_params.paimon_split.
    const bool has_split = _range.__isset.table_format_params &&
                           _range.table_format_params.__isset.paimon_params &&
                           _range.table_format_params.paimon_params.__isset.paimon_split;
    if (!has_split) {
        return Status::InternalError("paimon-cpp missing paimon_split in scan range");
    }

    std::string raw_split;
    if (!base64_decode(_range.table_format_params.paimon_params.paimon_split, &raw_split)) {
        return Status::InternalError("paimon-cpp base64 decode paimon_split failed");
    }

    auto pool = paimon::GetDefaultPool();
    auto deserialized = paimon::Split::Deserialize(raw_split.data(), raw_split.size(), pool);
    if (!deserialized.ok()) {
        return Status::InternalError("paimon-cpp deserialize split failed: {}",
                                     deserialized.status().ToString());
    }
    *split = std::move(deserialized).value();
    return Status::OK();
}
331
332
0
std::optional<std::string> PaimonCppReader::_resolve_table_path() const {
333
0
    if (_range.__isset.table_format_params && _range.table_format_params.__isset.paimon_params &&
334
0
        _range.table_format_params.paimon_params.__isset.paimon_table &&
335
0
        !_range.table_format_params.paimon_params.paimon_table.empty()) {
336
0
        return _range.table_format_params.paimon_params.paimon_table;
337
0
    }
338
0
    return std::nullopt;
339
0
}
340
341
0
std::vector<std::string> PaimonCppReader::_build_read_columns() const {
342
0
    std::vector<std::string> columns;
343
0
    columns.reserve(_file_slot_descs.size());
344
0
    for (const auto& slot : _file_slot_descs) {
345
0
        columns.emplace_back(slot->col_name());
346
0
    }
347
0
    return columns;
348
0
}
349
350
0
std::map<std::string, std::string> PaimonCppReader::_build_options() const {
351
0
    std::map<std::string, std::string> options;
352
0
    if (_range_params && _range_params->__isset.paimon_options &&
353
0
        !_range_params->paimon_options.empty()) {
354
0
        options.insert(_range_params->paimon_options.begin(), _range_params->paimon_options.end());
355
0
    } else if (_range.__isset.table_format_params &&
356
0
               _range.table_format_params.__isset.paimon_params &&
357
0
               _range.table_format_params.paimon_params.__isset.paimon_options) {
358
0
        options.insert(_range.table_format_params.paimon_params.paimon_options.begin(),
359
0
                       _range.table_format_params.paimon_params.paimon_options.end());
360
0
    }
361
362
0
    if (_range_params && _range_params->__isset.properties && !_range_params->properties.empty()) {
363
0
        for (const auto& kv : _range_params->properties) {
364
0
            options[kv.first] = kv.second;
365
0
        }
366
0
    } else if (_range.__isset.table_format_params &&
367
0
               _range.table_format_params.__isset.paimon_params &&
368
0
               _range.table_format_params.paimon_params.__isset.hadoop_conf) {
369
0
        for (const auto& kv : _range.table_format_params.paimon_params.hadoop_conf) {
370
0
            options[kv.first] = kv.second;
371
0
        }
372
0
    }
373
374
0
    auto copy_if_missing = [&](const char* from_key, const char* to_key) {
375
0
        if (options.find(to_key) != options.end()) {
376
0
            return;
377
0
        }
378
0
        auto it = options.find(from_key);
379
0
        if (it != options.end() && !it->second.empty()) {
380
0
            options[to_key] = it->second;
381
0
        }
382
0
    };
383
384
    // Map common OSS/S3 Hadoop configs to Doris S3 property keys.
385
0
    copy_if_missing("fs.oss.accessKeyId", "AWS_ACCESS_KEY");
386
0
    copy_if_missing("fs.oss.accessKeySecret", "AWS_SECRET_KEY");
387
0
    copy_if_missing("fs.oss.sessionToken", "AWS_TOKEN");
388
0
    copy_if_missing("fs.oss.endpoint", "AWS_ENDPOINT");
389
0
    copy_if_missing("fs.oss.region", "AWS_REGION");
390
0
    copy_if_missing("fs.s3a.access.key", "AWS_ACCESS_KEY");
391
0
    copy_if_missing("fs.s3a.secret.key", "AWS_SECRET_KEY");
392
0
    copy_if_missing("fs.s3a.session.token", "AWS_TOKEN");
393
0
    copy_if_missing("fs.s3a.endpoint", "AWS_ENDPOINT");
394
0
    copy_if_missing("fs.s3a.region", "AWS_REGION");
395
0
    copy_if_missing("fs.s3a.path.style.access", "use_path_style");
396
397
    // Backfill file.format/manifest.format from split file_format to avoid
398
    // paimon-cpp falling back to default manifest.format=avro.
399
0
    if (_range.__isset.table_format_params && _range.table_format_params.__isset.paimon_params &&
400
0
        _range.table_format_params.paimon_params.__isset.file_format &&
401
0
        !_range.table_format_params.paimon_params.file_format.empty()) {
402
0
        const auto& split_file_format = _range.table_format_params.paimon_params.file_format;
403
0
        auto file_format_it = options.find(paimon::Options::FILE_FORMAT);
404
0
        if (file_format_it == options.end() || file_format_it->second.empty()) {
405
0
            options[paimon::Options::FILE_FORMAT] = split_file_format;
406
0
        }
407
0
        auto manifest_format_it = options.find(paimon::Options::MANIFEST_FORMAT);
408
0
        if (manifest_format_it == options.end() || manifest_format_it->second.empty()) {
409
0
            options[paimon::Options::MANIFEST_FORMAT] = split_file_format;
410
0
        }
411
0
    }
412
413
0
    options[paimon::Options::FILE_SYSTEM] = "doris";
414
0
    return options;
415
0
}
416
417
} // namespace doris