Coverage Report

Created: 2026-06-09 13:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/table_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format_v2/table_reader.h"
19
20
#include <gen_cpp/PlanNodes_types.h>
21
#include <gen_cpp/Types_types.h>
22
23
#include <cstring>
24
#include <set>
25
#include <sstream>
26
#include <stdexcept>
27
#include <vector>
28
29
#include "common/cast_set.h"
30
#include "common/status.h"
31
#include "core/assert_cast.h"
32
#include "exec/common/endian.h"
33
#include "exprs/vexpr_context.h"
34
#include "exprs/vslot_ref.h"
35
#include "format_v2/parquet/parquet_reader.h"
36
#include "format_v2/column_mapper.h"
37
#include "format/table/deletion_vector_reader.h"
38
#include "io/io_common.h"
39
#include "roaring/roaring64map.hh"
40
41
namespace doris::format {
42
namespace {
43
44
// Renders `values` as "[e0, e1, ...]", converting each element with `formatter`.
// An empty vector renders as "[]". Used only to build debug strings.
template <typename T, typename Formatter>
std::string join_table_reader_debug_strings(const std::vector<T>& values, Formatter formatter) {
    std::ostringstream joined;
    joined << '[';
    const char* separator = "";
    for (const auto& value : values) {
        joined << separator << formatter(value);
        separator = ", ";
    }
    joined << ']';
    return joined.str();
}
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_16ColumnDefinitionEZNKS0_11TableReader12debug_stringB5cxx11EvE3$_0EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_11TableFilterEZNKS0_11TableReader12debug_stringB5cxx11EvE3$_1EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_11GlobalIndexEZNS1_25table_filter_debug_stringB5cxx11ERKNS0_11TableFilterEE3$_0EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISF_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsISt10shared_ptrINS_12VExprContextEEZNKS0_11TableReader12debug_stringB5cxx11EvE3$_2EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISF_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_16ColumnDefinitionEZNKS0_11TableReader12debug_stringB5cxx11EvE3$_3EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_11TableReader15FileBlockColumnEZNKS3_12debug_stringB5cxx11EvE3$_4EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
57
58
0
// Returns the upper-case name of `format` for debug output; any value not handled by the
// switch below yields "UNKNOWN".
std::string file_format_to_string(FileFormat format) {
    const char* name = "UNKNOWN";
    switch (format) {
    case FileFormat::PARQUET:
        name = "PARQUET";
        break;
    case FileFormat::ORC:
        name = "ORC";
        break;
    case FileFormat::CSV:
        name = "CSV";
        break;
    }
    return name;
}
69
70
0
// Returns the enumerator name of `op` for debug output; any value not handled by the
// switch below yields "UNKNOWN".
std::string push_down_agg_to_string(TPushAggOp::type op) {
    const char* name = "UNKNOWN";
    switch (op) {
    case TPushAggOp::NONE:
        name = "NONE";
        break;
    case TPushAggOp::COUNT:
        name = "COUNT";
        break;
    case TPushAggOp::MINMAX:
        name = "MINMAX";
        break;
    case TPushAggOp::MIX:
        name = "MIX";
        break;
    case TPushAggOp::COUNT_ON_INDEX:
        name = "COUNT_ON_INDEX";
        break;
    }
    return name;
}
85
86
0
std::string current_file_debug_string(const std::unique_ptr<ScanTask>& task) {
87
0
    if (task == nullptr || task->data_file == nullptr) {
88
0
        return "null";
89
0
    }
90
0
    const auto& file = *task->data_file;
91
0
    std::ostringstream out;
92
0
    out << "FileDescription{path=" << file.path << ", file_size=" << file.file_size
93
0
        << ", range_start_offset=" << file.range_start_offset << ", range_size=" << file.range_size
94
0
        << ", mtime=" << file.mtime << ", fs_name=" << file.fs_name
95
0
        << ", file_cache_admission=" << file.file_cache_admission << "}";
96
0
    return out.str();
97
0
}
98
99
0
std::string partition_values_debug_string(const std::map<std::string, Field>& partition_values) {
100
0
    std::ostringstream out;
101
0
    out << "{";
102
0
    size_t idx = 0;
103
0
    for (const auto& [key, _] : partition_values) {
104
0
        if (idx++ > 0) {
105
0
            out << ", ";
106
0
        }
107
0
        out << key;
108
0
    }
109
0
    out << "}";
110
0
    return out.str();
111
0
}
112
113
0
std::string expr_context_debug_string(const VExprContextSPtr& context) {
114
0
    if (context == nullptr) {
115
0
        return "null";
116
0
    }
117
0
    const auto root = context->root();
118
0
    if (root == nullptr) {
119
0
        return "VExprContext{root=null}";
120
0
    }
121
0
    std::ostringstream out;
122
0
    out << "VExprContext{root_name=" << root->expr_name() << ", root_debug=" << root->debug_string()
123
0
        << "}";
124
0
    return out.str();
125
0
}
126
127
0
std::string table_filter_debug_string(const TableFilter& filter) {
128
0
    std::ostringstream out;
129
0
    out << "TableFilter{conjunct=" << expr_context_debug_string(filter.conjunct)
130
0
        << ", global_indices="
131
0
        << join_table_reader_debug_strings(
132
0
                   filter.global_indices,
133
0
                   [](GlobalIndex global_index) { return std::to_string(global_index.value()); })
134
0
        << "}";
135
0
    return out.str();
136
0
}
137
138
0
std::string table_column_predicates_debug_string(const TableColumnPredicates& predicates) {
139
0
    std::ostringstream out;
140
0
    out << "{";
141
0
    size_t idx = 0;
142
0
    for (const auto& [global_index, column_predicates] : predicates) {
143
0
        if (idx++ > 0) {
144
0
            out << ", ";
145
0
        }
146
0
        out << global_index.value() << ":{predicate_count=" << column_predicates.size() << "}";
147
0
    }
148
0
    out << "}";
149
0
    return out.str();
150
0
}
151
152
0
void collect_global_indices(const VExprSPtr& expr, std::set<GlobalIndex>* global_indices) {
153
0
    if (expr == nullptr) {
154
0
        return;
155
0
    }
156
0
    if (expr->is_slot_ref()) {
157
0
        const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
158
0
        DORIS_CHECK(slot_ref->column_id() >= 0);
159
0
        global_indices->insert(GlobalIndex(cast_set<size_t>(slot_ref->column_id())));
160
0
    }
161
0
    for (const auto& child : expr->children()) {
162
0
        collect_global_indices(child, global_indices);
163
0
    }
164
0
}
165
166
// Turns one conjunct into at most one TableFilter and appends it to `table_filters`.
// The filter holds a cloned copy of the conjunct's expression tree plus the sorted,
// de-duplicated global column indices it references. A null conjunct, or one that
// references no slot, contributes nothing. `state` is currently unused.
Status build_table_filters_from_conjunct(const VExprContextSPtr& conjunct, RuntimeState* state,
                                         std::vector<TableFilter>* table_filters) {
    if (conjunct == nullptr) {
        return Status::OK();
    }
    std::set<GlobalIndex> referenced_indices;
    collect_global_indices(conjunct->root(), &referenced_indices);
    if (referenced_indices.empty()) {
        return Status::OK();
    }

    VExprSPtr cloned_root;
    RETURN_IF_ERROR(clone_table_expr_tree(conjunct->root(), &cloned_root));
    TableFilter filter;
    filter.conjunct = VExprContext::create_shared(std::move(cloned_root));
    filter.global_indices.assign(referenced_indices.begin(), referenced_indices.end());
    table_filters->push_back(std::move(filter));
    return Status::OK();
}
185
186
// Decodes a serialized deletion vector into the deleted row positions.
//
// Layout (length and magic are big-endian):
//   [4-byte length][4-byte magic][portable roaring bitmap][4-byte CRC, ICEBERG only]
// where `length` counts the magic plus the bitmap, but not itself nor the CRC.
//   * PAIMON:  32-bit roaring bitmap; Paimon's BitmapDeletionVector writes
//              MAGIC_NUMBER = 1581511376 (0x5E43F2D0) with DataOutputStream.writeInt,
//              i.e. bytes 5E 43 F2 D0.
//   * ICEBERG: 64-bit roaring bitmap (deletion-vector-v1), magic bytes D1 D3 39 64.
// The ICEBERG CRC is accounted for in the size checks but is not verified here.
//
// @param buf          serialized bytes; may be null only when `buffer_size` is 0.
// @param buffer_size  number of bytes available at `buf`.
// @param format       PAIMON or ICEBERG; anything else fails a DORIS_CHECK.
// @param delete_rows  output; row positions are appended in bitmap iteration order
//                     (ascending).
// @return DataQualityError when the buffer is too small, the length prefix or magic does
//         not match, or the bitmap cannot be decoded; OK otherwise.
Status parse_deletion_vector(const char* buf, size_t buffer_size, DeleteFileDesc::Format format,
                             DeleteRows* delete_rows) {
    DORIS_CHECK(delete_rows != nullptr);
    DORIS_CHECK(format == DeleteFileDesc::Format::PAIMON ||
                format == DeleteFileDesc::Format::ICEBERG);

    constexpr size_t LENGTH_SIZE = 4;
    constexpr size_t MAGIC_SIZE = 4;
    const bool is_iceberg = format == DeleteFileDesc::Format::ICEBERG;
    const size_t checksum_size = is_iceberg ? 4 : 0;

    // Validate the size before touching `buf`: an empty delete file produces an empty
    // buffer whose data() may be null, which must surface as a data error, not an abort.
    if (buffer_size < LENGTH_SIZE + MAGIC_SIZE + checksum_size) [[unlikely]] {
        return Status::DataQualityError("Deletion vector file size too small: {}", buffer_size);
    }
    DORIS_CHECK(buf != nullptr);

    const size_t total_length = BigEndian::Load32(buf);
    const size_t expected_size = total_length + LENGTH_SIZE + checksum_size;
    if (expected_size != buffer_size) [[unlikely]] {
        return Status::DataQualityError("Deletion vector length mismatch, expected: {}, actual: {}",
                                        expected_size, buffer_size);
    }

    // Each format has its own magic; checking the Iceberg magic for Paimon input would
    // reject every valid Paimon deletion vector.
    constexpr static char PAIMON_MAGIC_NUMBER[] = {'\x5E', '\x43', '\xF2', '\xD0'};
    constexpr static char ICEBERG_MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'};
    const char* expected_magic = is_iceberg ? ICEBERG_MAGIC_NUMBER : PAIMON_MAGIC_NUMBER;
    if (memcmp(buf + LENGTH_SIZE, expected_magic, MAGIC_SIZE) != 0) [[unlikely]] {
        return Status::DataQualityError("Deletion vector magic number mismatch");
    }

    const char* bitmap_buf = buf + LENGTH_SIZE + MAGIC_SIZE;
    const size_t bitmap_size = buffer_size - LENGTH_SIZE - MAGIC_SIZE - checksum_size;
    if (!is_iceberg) {
        roaring::Roaring bitmap;
        try {
            bitmap = roaring::Roaring::readSafe(bitmap_buf, bitmap_size);
        } catch (const std::runtime_error& e) {
            return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
        }

        delete_rows->reserve(bitmap.cardinality());
        for (auto it = bitmap.begin(); it != bitmap.end(); ++it) {
            delete_rows->push_back(*it);
        }
        return Status::OK();
    }

    roaring::Roaring64Map bitmap;
    try {
        bitmap = roaring::Roaring64Map::readSafe(bitmap_buf, bitmap_size);
    } catch (const std::runtime_error& e) {
        return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
    }

    delete_rows->reserve(bitmap.cardinality());
    for (auto it = bitmap.begin(); it != bitmap.end(); ++it) {
        // Positions above INT64_MAX cannot be represented; cast_set checks the conversion.
        delete_rows->push_back(cast_set<int64_t>(*it));
    }
    return Status::OK();
}
239
240
} // namespace
241
242
std::shared_ptr<io::FileSystemProperties> create_system_properties(
243
0
        const TFileScanRangeParams* scan_params) {
244
0
    auto system_properties = std::make_shared<io::FileSystemProperties>();
245
0
    if (scan_params == nullptr || !scan_params->__isset.file_type) {
246
0
        system_properties->system_type = TFileType::FILE_LOCAL;
247
0
        return system_properties;
248
0
    }
249
0
    system_properties->system_type = scan_params->file_type;
250
0
    system_properties->properties = scan_params->properties;
251
0
    system_properties->hdfs_params = scan_params->hdfs_params;
252
0
    if (scan_params->__isset.broker_addresses) {
253
0
        system_properties->broker_addresses.assign(scan_params->broker_addresses.begin(),
254
0
                                                   scan_params->broker_addresses.end());
255
0
    }
256
0
    return system_properties;
257
0
}
258
259
0
std::string TableReader::debug_string() const {
260
0
    std::ostringstream out;
261
0
    out << "TableReader{format=" << file_format_to_string(_format)
262
0
        << ", push_down_agg_type=" << push_down_agg_to_string(_push_down_agg_type)
263
0
        << ", aggregate_pushdown_tried=" << _aggregate_pushdown_tried
264
0
        << ", has_current_reader=" << (_data_reader.reader != nullptr)
265
0
        << ", has_current_task=" << (_current_task != nullptr)
266
0
        << ", current_file=" << current_file_debug_string(_current_task)
267
0
        << ", has_delete_rows=" << (_delete_rows != nullptr)
268
0
        << ", delete_row_count=" << (_delete_rows == nullptr ? 0 : _delete_rows->size())
269
0
        << ", has_profile=" << (_profile != nullptr)
270
0
        << ", has_system_properties=" << (_system_properties != nullptr) << ", system_type="
271
0
        << (_system_properties == nullptr ? static_cast<int>(TFileType::FILE_LOCAL)
272
0
                                          : static_cast<int>(_system_properties->system_type))
273
0
        << ", has_scan_params=" << (_scan_params != nullptr)
274
0
        << ", has_io_ctx=" << (_io_ctx != nullptr)
275
0
        << ", has_runtime_state=" << (_runtime_state != nullptr)
276
0
        << ", has_scanner_profile=" << (_scanner_profile != nullptr)
277
0
        << ", mapper_options=" << _mapper_options.debug_string() << ", projected_columns="
278
0
        << join_table_reader_debug_strings(
279
0
                   _projected_columns,
280
0
                   [](const ColumnDefinition& column) { return column.debug_string(); })
281
0
        << ", partition_values=" << partition_values_debug_string(_partition_values)
282
0
        << ", table_filters="
283
0
        << join_table_reader_debug_strings(
284
0
                   _table_filters,
285
0
                   [](const TableFilter& filter) { return table_filter_debug_string(filter); })
286
0
        << ", table_column_predicates="
287
0
        << table_column_predicates_debug_string(_table_column_predicates)
288
0
        << ", conjunct_count=" << _conjuncts.size() << ", conjuncts="
289
0
        << join_table_reader_debug_strings(_conjuncts,
290
0
                                           [](const VExprContextSPtr& conjunct) {
291
0
                                               return expr_context_debug_string(conjunct);
292
0
                                           })
293
0
        << ", file_schema="
294
0
        << join_table_reader_debug_strings(
295
0
                   _data_reader.file_schema,
296
0
                   [](const ColumnDefinition& field) { return field.debug_string(); })
297
0
        << ", file_block_layout="
298
0
        << join_table_reader_debug_strings(
299
0
                   _data_reader.file_block_layout,
300
0
                   [](const FileBlockColumn& column) {
301
0
                       std::ostringstream column_out;
302
0
                       column_out << "FileBlockColumn{file_column_id=" << column.file_column_id
303
0
                                  << ", name=" << column.name << ", type="
304
0
                                  << (column.type == nullptr ? "null" : column.type->get_name())
305
0
                                  << "}";
306
0
                       return column_out.str();
307
0
                   })
308
0
        << ", block_template_columns=" << _data_reader.block_template.columns()
309
0
        << ", column_mapper=" << _data_reader.column_mapper.debug_string() << "}";
310
0
    return out.str();
311
0
}
312
313
0
// Initializes the reader from `options`. Scalar settings and handles are copied, while
// column lists, profile, conjuncts and column predicates are moved out of `options`.
// Column mapping is always by name. Always returns OK.
Status TableReader::init(TableReadOptions&& options) {
    _scan_params = options.scan_params;
    _format = options.format;
    _push_down_agg_type = options.push_down_agg_type;
    _io_ctx = options.io_ctx;
    _runtime_state = options.runtime_state;
    _scanner_profile = options.scanner_profile;

    // Derived from the scan params assigned above.
    _system_properties = create_system_properties(_scan_params);

    _mapper_options.mode = TableColumnMappingMode::BY_NAME;
    _mapper_options.allow_missing_columns = options.allow_missing_columns;

    _projected_columns = std::move(options.projected_columns);
    _profile = std::move(options.profile);
    _conjuncts = std::move(options.conjuncts);
    _table_column_predicates = std::move(options.column_predicates);
    return Status::OK();
}
329
330
0
// Rebuilds `_table_filters` from scratch out of `_conjuncts`, stopping at the first
// conjunct that fails to convert.
Status TableReader::_build_table_filters_from_conjuncts() {
    _table_filters.clear();
    for (const auto& expr_context : _conjuncts) {
        RETURN_IF_ERROR(build_table_filters_from_conjunct(expr_context, _runtime_state,
                                                          &_table_filters));
    }
    return Status::OK();
}
338
339
0
// Prepares and opens every conjunct, then every delete conjunct, of `file_request`
// against a default-constructed RowDescriptor. Returns the first failure.
Status TableReader::_open_local_filter_exprs(const FileScanRequest& file_request) {
    RowDescriptor default_row_desc;
    auto prepare_and_open = [&](const auto& contexts) -> Status {
        for (const auto& context : contexts) {
            RETURN_IF_ERROR(context->prepare(_runtime_state, default_row_desc));
            RETURN_IF_ERROR(context->open(_runtime_state));
        }
        return Status::OK();
    };
    RETURN_IF_ERROR(prepare_and_open(file_request.conjuncts));
    RETURN_IF_ERROR(prepare_and_open(file_request.delete_conjuncts));
    return Status::OK();
}
351
352
0
// Creates, initializes and opens the data reader for the current split.
//
// @param eos  set to true when there is no current task (nothing left to read); set to
//             false once a reader is open. If open_reader() leaves no reader, eos reports
//             whether the current task was also cleared.
// @return NotSupported for ORC/CSV, InternalError for a format value outside the handled
//         enumerators, or the first error from init()/open_reader().
Status TableReader::create_next_reader(bool* eos) {
    DCHECK(_data_reader.reader == nullptr);
    if (_current_task == nullptr) {
        *eos = true;
        return Status::OK();
    }

    switch (_format) {
    case FileFormat::PARQUET: {
        _data_reader.reader = std::make_unique<parquet::ParquetReader>(
                _system_properties, _current_task->data_file, _io_ctx, _scanner_profile);
        break;
    }
    case FileFormat::ORC:
    case FileFormat::CSV:
        return Status::NotSupported("TableReader does not support file format {}",
                                    static_cast<int>(_format));
    }
    // The switch has no default branch. An out-of-range format would otherwise leave the
    // reader null and crash on the init() call below.
    if (_data_reader.reader == nullptr) [[unlikely]] {
        return Status::InternalError("TableReader got unknown file format {}",
                                     static_cast<int>(_format));
    }

    RETURN_IF_ERROR(_data_reader.reader->init(_runtime_state));
    RETURN_IF_ERROR(open_reader());
    // open_reader() may leave no active reader; only report eos if it also cleared the task.
    if (_data_reader.reader == nullptr) {
        *eos = _current_task == nullptr;
        return Status::OK();
    }
    *eos = false;
    return Status::OK();
}
380
381
0
std::unique_ptr<io::FileDescription> create_file_description(const TFileRangeDesc& range) {
382
0
    auto file_description = std::make_unique<io::FileDescription>();
383
0
    file_description->path = range.path;
384
0
    file_description->file_size = range.__isset.file_size ? range.file_size : -1;
385
0
    file_description->range_start_offset = range.__isset.start_offset ? range.start_offset : 0;
386
0
    file_description->range_size = range.__isset.size ? range.size : -1;
387
0
    if (range.__isset.fs_name) {
388
0
        file_description->fs_name = range.fs_name;
389
0
    }
390
0
    if (range.__isset.file_cache_admission) {
391
0
        file_description->file_cache_admission = range.file_cache_admission;
392
0
    }
393
0
    return file_description;
394
0
}
395
396
0
// Resets per-split state for the range described by `options` and loads the split's
// deletion vector, if any.
// `options` is a const reference, so partition values are copied. std::move on a member of
// a const object would silently copy anyway, so it is not used.
Status TableReader::prepare_split(const SplitReadOptions& options) {
    _partition_values = options.partition_values;
    _current_task = std::make_unique<ScanTask>();
    _current_task->data_file = create_file_description(options.current_range);
    _delete_rows = nullptr;
    _aggregate_pushdown_tried = false;
    return _parse_delete_predicates(options);
}
404
405
0
// Loads the deletion vector referenced by the current range, if it has one, into
// `_delete_rows`. The decoded rows come from `options.cache`, keyed by the delete file
// descriptor, so the file is read and parsed only when the key is not cached yet.
// Returns the first error from locating, opening, reading or parsing the delete file.
Status TableReader::_parse_delete_predicates(const SplitReadOptions& options) {
    DeleteFileDesc desc {.fs_name = options.current_range.fs_name};
    bool has_delete_file = false;
    RETURN_IF_ERROR(_parse_deletion_vector_file(options.current_range.table_format_params, &desc,
                                                &has_delete_file));
    if (!has_delete_file) {
        return Status::OK();
    }

    DORIS_CHECK(options.cache != nullptr);
    Status create_status = Status::OK();
    // NOTE(review): on failure the creator returns nullptr. If the cache stores that
    // result under desc.key, later lookups of the same key would get nullptr without an
    // error. Verify the cache's semantics.
    _delete_rows = options.cache->get<DeleteRows>(desc.key, [&]() -> DeleteRows* {
        // Owned locally so every error path frees it; ownership passes to the cache only
        // after the rows are fully decoded.
        auto delete_rows = std::make_unique<DeleteRows>();

        DeletionVectorReader dv_reader(_runtime_state, _scanner_profile, *_scan_params, desc,
                                       _io_ctx.get());
        create_status = dv_reader.open();
        if (!create_status.ok()) [[unlikely]] {
            return nullptr;
        }

        size_t bytes_read = desc.size;
        std::vector<char> buffer(bytes_read);
        create_status = dv_reader.read_at(desc.start_offset, {buffer.data(), bytes_read});
        if (!create_status.ok()) [[unlikely]] {
            return nullptr;
        }

        const char* buf = buffer.data();
        SCOPED_TIMER(_profile->parse_delete_file_time);
        create_status = parse_deletion_vector(buf, bytes_read, desc.format, delete_rows.get());
        if (!create_status.ok()) [[unlikely]] {
            return nullptr;
        }
        COUNTER_UPDATE(_profile->num_delete_rows, delete_rows->size());
        return delete_rows.release();
    });
    RETURN_IF_ERROR(create_status);
    return Status::OK();
}
445
} // namespace doris::format