Coverage Report

Created: 2026-06-29 16:39

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/table_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format_v2/table_reader.h"
19
20
#include <gen_cpp/ExternalTableSchema_types.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <gen_cpp/Types_types.h>
23
24
#include <algorithm>
25
#include <cstring>
26
#include <ranges>
27
#include <set>
28
#include <sstream>
29
#include <stdexcept>
30
#include <utility>
31
#include <vector>
32
33
#include "common/cast_set.h"
34
#include "common/status.h"
35
#include "core/assert_cast.h"
36
#include "core/data_type/data_type_array.h"
37
#include "core/data_type/data_type_map.h"
38
#include "core/data_type/data_type_struct.h"
39
#include "exec/common/endian.h"
40
#include "exprs/vexpr_context.h"
41
#include "exprs/vslot_ref.h"
42
#include "format/table/deletion_vector_reader.h"
43
#include "format_v2/column_mapper.h"
44
#include "format_v2/delimited_text/csv_reader.h"
45
#include "format_v2/delimited_text/text_reader.h"
46
#include "format_v2/json/json_reader.h"
47
#include "format_v2/native/native_reader.h"
48
#include "format_v2/parquet/parquet_reader.h"
49
#include "roaring/roaring64map.hh"
50
#include "storage/segment/condition_cache.h"
51
#include "util/string_util.h"
52
53
namespace doris::format {
54
namespace {
55
56
template <typename T, typename Formatter>
57
0
std::string join_table_reader_debug_strings(const std::vector<T>& values, Formatter formatter) {
58
0
    std::ostringstream out;
59
0
    out << "[";
60
0
    for (size_t i = 0; i < values.size(); ++i) {
61
0
        if (i > 0) {
62
0
            out << ", ";
63
0
        }
64
0
        out << formatter(values[i]);
65
0
    }
66
0
    out << "]";
67
0
    return out.str();
68
0
}
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_16ColumnDefinitionEZNKS0_11TableReader12debug_stringB5cxx11EvE3$_0EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_11TableFilterEZNKS0_11TableReader12debug_stringB5cxx11EvE3$_1EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_11GlobalIndexEZNS1_25table_filter_debug_stringB5cxx11ERKNS0_11TableFilterEE3$_0EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISF_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsISt10shared_ptrINS_12VExprContextEEZNKS0_11TableReader12debug_stringB5cxx11EvE3$_2EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISF_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_16ColumnDefinitionEZNKS0_11TableReader12debug_stringB5cxx11EvE3$_3EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_11TableReader15FileBlockColumnEZNKS3_12debug_stringB5cxx11EvE3$_4EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
69
70
0
std::string file_format_to_string(FileFormat format) {
71
0
    switch (format) {
72
0
    case FileFormat::PARQUET:
73
0
        return "PARQUET";
74
0
    case FileFormat::ORC:
75
0
        return "ORC";
76
0
    case FileFormat::CSV:
77
0
        return "CSV";
78
0
    case FileFormat::JSON:
79
0
        return "JSON";
80
0
    case FileFormat::TEXT:
81
0
        return "TEXT";
82
0
    case FileFormat::JNI:
83
0
        return "JNI";
84
0
    case FileFormat::NATIVE:
85
0
        return "NATIVE";
86
0
    case FileFormat::ARROW:
87
0
        return "ARROW";
88
0
    }
89
0
    return "UNKNOWN";
90
0
}
91
92
0
std::string push_down_agg_to_string(TPushAggOp::type op) {
93
0
    switch (op) {
94
0
    case TPushAggOp::NONE:
95
0
        return "NONE";
96
0
    case TPushAggOp::COUNT:
97
0
        return "COUNT";
98
0
    case TPushAggOp::MINMAX:
99
0
        return "MINMAX";
100
0
    case TPushAggOp::MIX:
101
0
        return "MIX";
102
0
    case TPushAggOp::COUNT_ON_INDEX:
103
0
        return "COUNT_ON_INDEX";
104
0
    }
105
0
    return "UNKNOWN";
106
0
}
107
108
0
std::string current_file_debug_string(const std::unique_ptr<ScanTask>& task) {
109
0
    if (task == nullptr || task->data_file == nullptr) {
110
0
        return "null";
111
0
    }
112
0
    const auto& file = *task->data_file;
113
0
    std::ostringstream out;
114
0
    out << "FileDescription{path=" << file.path << ", file_size=" << file.file_size
115
0
        << ", range_start_offset=" << file.range_start_offset << ", range_size=" << file.range_size
116
0
        << ", mtime=" << file.mtime << ", fs_name=" << file.fs_name
117
0
        << ", file_cache_admission=" << file.file_cache_admission << "}";
118
0
    return out.str();
119
0
}
120
121
0
std::string partition_values_debug_string(const std::map<std::string, Field>& partition_values) {
122
0
    std::ostringstream out;
123
0
    out << "{";
124
0
    size_t idx = 0;
125
0
    for (const auto& [key, _] : partition_values) {
126
0
        if (idx++ > 0) {
127
0
            out << ", ";
128
0
        }
129
0
        out << key;
130
0
    }
131
0
    out << "}";
132
0
    return out.str();
133
0
}
134
135
0
const schema::external::TField* get_field_ptr(const schema::external::TFieldPtr& field_ptr) {
136
0
    if (!field_ptr.__isset.field_ptr || field_ptr.field_ptr == nullptr) {
137
0
        return nullptr;
138
0
    }
139
0
    return field_ptr.field_ptr.get();
140
0
}
141
142
0
bool external_field_matches_name(const schema::external::TField& field, const std::string& name) {
143
0
    if (field.__isset.name && to_lower(field.name) == to_lower(name)) {
144
0
        return true;
145
0
    }
146
0
    return field.__isset.name_mapping &&
147
0
           std::ranges::any_of(field.name_mapping, [&](const std::string& alias) {
148
0
               return to_lower(alias) == to_lower(name);
149
0
           });
150
0
}
151
152
DataTypePtr find_struct_child_type_by_name(const DataTypeStruct& struct_type,
153
0
                                           const std::string& field_name) {
154
0
    for (size_t field_idx = 0; field_idx < struct_type.get_elements().size(); ++field_idx) {
155
0
        if (to_lower(struct_type.get_element_name(field_idx)) == to_lower(field_name)) {
156
0
            return struct_type.get_element(field_idx);
157
0
        }
158
0
    }
159
0
    return nullptr;
160
0
}
161
162
ColumnDefinition build_schema_column_from_external_field(const schema::external::TField& field,
163
0
                                                         DataTypePtr type) {
164
0
    ColumnDefinition column {
165
0
            .identifier = field.__isset.id ? Field::create_field<TYPE_INT>(field.id) : Field {},
166
0
            .name = field.__isset.name ? field.name : "",
167
0
            .name_mapping =
168
0
                    field.__isset.name_mapping ? field.name_mapping : std::vector<std::string> {},
169
0
            .type = std::move(type),
170
0
            .children = {},
171
0
            .default_expr = nullptr,
172
0
            .is_partition_key = false,
173
0
    };
174
0
    if (column.type == nullptr || !field.__isset.nestedField) {
175
0
        return column;
176
0
    }
177
178
0
    const auto nested_type = remove_nullable(column.type);
179
0
    switch (nested_type->get_primitive_type()) {
180
0
    case TYPE_STRUCT: {
181
0
        if (!field.nestedField.__isset.struct_field ||
182
0
            !field.nestedField.struct_field.__isset.fields) {
183
0
            return column;
184
0
        }
185
0
        const auto& struct_type = assert_cast<const DataTypeStruct&>(*nested_type);
186
0
        for (const auto& child_ptr : field.nestedField.struct_field.fields) {
187
0
            const auto* child_field = get_field_ptr(child_ptr);
188
0
            if (child_field == nullptr || !child_field->__isset.name) {
189
0
                continue;
190
0
            }
191
0
            auto child_type = find_struct_child_type_by_name(struct_type, child_field->name);
192
0
            if (child_type == nullptr) {
193
0
                continue;
194
0
            }
195
0
            column.children.push_back(
196
0
                    build_schema_column_from_external_field(*child_field, child_type));
197
0
        }
198
0
        break;
199
0
    }
200
0
    case TYPE_ARRAY: {
201
0
        if (!field.nestedField.__isset.array_field ||
202
0
            !field.nestedField.array_field.__isset.item_field) {
203
0
            return column;
204
0
        }
205
0
        const auto* item_field = get_field_ptr(field.nestedField.array_field.item_field);
206
0
        if (item_field == nullptr) {
207
0
            return column;
208
0
        }
209
0
        const auto& array_type = assert_cast<const DataTypeArray&>(*nested_type);
210
0
        auto child =
211
0
                build_schema_column_from_external_field(*item_field, array_type.get_nested_type());
212
0
        child.name = "element";
213
0
        if (child.has_identifier_name()) {
214
0
            child.identifier = Field::create_field<TYPE_STRING>(child.name);
215
0
        }
216
0
        column.children.push_back(std::move(child));
217
0
        break;
218
0
    }
219
0
    case TYPE_MAP: {
220
0
        if (!field.nestedField.__isset.map_field ||
221
0
            !field.nestedField.map_field.__isset.key_field ||
222
0
            !field.nestedField.map_field.__isset.value_field) {
223
0
            return column;
224
0
        }
225
0
        const auto& map_type = assert_cast<const DataTypeMap&>(*nested_type);
226
0
        const auto* key_field = get_field_ptr(field.nestedField.map_field.key_field);
227
0
        if (key_field != nullptr) {
228
0
            auto child =
229
0
                    build_schema_column_from_external_field(*key_field, map_type.get_key_type());
230
0
            child.name = "key";
231
0
            if (child.has_identifier_name()) {
232
0
                child.identifier = Field::create_field<TYPE_STRING>(child.name);
233
0
            }
234
0
            column.children.push_back(std::move(child));
235
0
        }
236
0
        const auto* value_field = get_field_ptr(field.nestedField.map_field.value_field);
237
0
        if (value_field != nullptr) {
238
0
            auto child = build_schema_column_from_external_field(*value_field,
239
0
                                                                 map_type.get_value_type());
240
0
            child.name = "value";
241
0
            if (child.has_identifier_name()) {
242
0
                child.identifier = Field::create_field<TYPE_STRING>(child.name);
243
0
            }
244
0
            column.children.push_back(std::move(child));
245
0
        }
246
0
        break;
247
0
    }
248
0
    default:
249
0
        break;
250
0
    }
251
0
    return column;
252
0
}
253
254
const schema::external::TField* find_external_root_field(const TFileScanRangeParams* params,
255
6
                                                         const ColumnDefinition& column) {
256
6
    if (params == nullptr || !params->__isset.history_schema_info ||
257
6
        params->history_schema_info.empty()) {
258
6
        return nullptr;
259
6
    }
260
0
    const auto* schema = &params->history_schema_info.front();
261
0
    if (params->__isset.current_schema_id) {
262
0
        for (const auto& candidate_schema : params->history_schema_info) {
263
0
            if (candidate_schema.__isset.schema_id &&
264
0
                candidate_schema.schema_id == params->current_schema_id) {
265
0
                schema = &candidate_schema;
266
0
                break;
267
0
            }
268
0
        }
269
0
    }
270
0
    if (!schema->__isset.root_field || !schema->root_field.__isset.fields) {
271
0
        return nullptr;
272
0
    }
273
0
    for (const auto& field_ptr : schema->root_field.fields) {
274
0
        const auto* field = get_field_ptr(field_ptr);
275
0
        if (field == nullptr) {
276
0
            continue;
277
0
        }
278
0
        if (external_field_matches_name(*field, column.name)) {
279
0
            return field;
280
0
        }
281
0
    }
282
0
    return nullptr;
283
0
}
284
285
0
std::string expr_context_debug_string(const VExprContextSPtr& context) {
286
0
    if (context == nullptr) {
287
0
        return "null";
288
0
    }
289
0
    const auto root = context->root();
290
0
    if (root == nullptr) {
291
0
        return "VExprContext{root=null}";
292
0
    }
293
0
    std::ostringstream out;
294
0
    out << "VExprContext{root_name=" << root->expr_name() << ", root_debug=" << root->debug_string()
295
0
        << "}";
296
0
    return out.str();
297
0
}
298
299
0
std::string table_filter_debug_string(const TableFilter& filter) {
300
0
    std::ostringstream out;
301
0
    out << "TableFilter{conjunct=" << expr_context_debug_string(filter.conjunct)
302
0
        << ", global_indices="
303
0
        << join_table_reader_debug_strings(
304
0
                   filter.global_indices,
305
0
                   [](GlobalIndex global_index) { return std::to_string(global_index.value()); })
306
0
        << "}";
307
0
    return out.str();
308
0
}
309
310
0
std::string table_column_predicates_debug_string(const TableColumnPredicates& predicates) {
311
0
    std::ostringstream out;
312
0
    out << "{";
313
0
    size_t idx = 0;
314
0
    for (const auto& [global_index, column_predicates] : predicates) {
315
0
        if (idx++ > 0) {
316
0
            out << ", ";
317
0
        }
318
0
        out << global_index.value() << ":{predicate_count=" << column_predicates.size() << "}";
319
0
    }
320
0
    out << "}";
321
0
    return out.str();
322
0
}
323
324
3
bool contains_runtime_filter(const VExprContextSPtrs& conjuncts) {
325
3
    return std::ranges::any_of(conjuncts, [](const auto& conjunct) {
326
3
        return conjunct != nullptr && conjunct->root() != nullptr &&
327
3
               conjunct->root()->is_rf_wrapper();
328
3
    });
329
3
}
330
331
61
void collect_global_indices(const VExprSPtr& expr, std::set<GlobalIndex>* global_indices) {
332
61
    if (expr == nullptr) {
333
0
        return;
334
0
    }
335
61
    if (expr->is_rf_wrapper()) {
336
        // RuntimeFilterExpr wraps a real predicate expression but its own thrift node can still
337
        // look like SLOT_REF. Collect indices from the wrapped predicate; do not cast the wrapper
338
        // itself to VSlotRef.
339
2
        collect_global_indices(expr->get_impl(), global_indices);
340
2
        return;
341
2
    }
342
59
    if (expr->is_slot_ref()) {
343
20
        const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
344
20
        DORIS_CHECK(slot_ref->column_id() >= 0);
345
20
        global_indices->insert(GlobalIndex(cast_set<size_t>(slot_ref->column_id())));
346
20
    }
347
59
    for (const auto& child : expr->children()) {
348
40
        collect_global_indices(child, global_indices);
349
40
    }
350
59
}
351
352
Status build_table_filters_from_conjunct(const VExprContextSPtr& conjunct, RuntimeState* state,
353
19
                                         std::vector<TableFilter>* table_filters) {
354
19
    if (conjunct == nullptr) {
355
0
        return Status::OK();
356
0
    }
357
19
    std::set<GlobalIndex> global_indices;
358
19
    collect_global_indices(conjunct->root(), &global_indices);
359
19
    if (!global_indices.empty()) {
360
19
        TableFilter table_filter;
361
19
        VExprSPtr filter_root;
362
19
        RETURN_IF_ERROR(clone_table_expr_tree(conjunct->root(), &filter_root));
363
19
        table_filter.conjunct = VExprContext::create_shared(std::move(filter_root));
364
20
        for (const auto global_index : global_indices) {
365
20
            table_filter.global_indices.push_back(global_index);
366
20
        }
367
19
        table_filters->push_back(std::move(table_filter));
368
19
    }
369
19
    return Status::OK();
370
19
}
371
372
Status parse_deletion_vector(const char* buf, size_t buffer_size, DeleteFileDesc::Format format,
373
4
                             DeleteRows* delete_rows) {
374
4
    DORIS_CHECK(buf != nullptr);
375
4
    DORIS_CHECK(delete_rows != nullptr);
376
4
    DORIS_CHECK(format == DeleteFileDesc::Format::PAIMON ||
377
4
                format == DeleteFileDesc::Format::ICEBERG);
378
379
4
    const size_t checksum_size = format == DeleteFileDesc::Format::ICEBERG ? 4 : 0;
380
4
    if (buffer_size < 8 + checksum_size) [[unlikely]] {
381
0
        return Status::DataQualityError("Deletion vector file size too small: {}", buffer_size);
382
0
    }
383
384
4
    auto total_length = BigEndian::Load32(buf);
385
4
    if (total_length + 4 + checksum_size != buffer_size) [[unlikely]] {
386
0
        return Status::DataQualityError("Deletion vector length mismatch, expected: {}, actual: {}",
387
0
                                        total_length + 4 + checksum_size, buffer_size);
388
0
    }
389
390
4
    const char* bitmap_buf = buf + 8;
391
4
    const size_t bitmap_size = buffer_size - 8 - checksum_size;
392
4
    if (format == DeleteFileDesc::Format::PAIMON) {
393
        // Paimon BitmapDeletionVector stores:
394
        //   [4-byte big-endian length][4-byte magic 0x5E43F2D0][32-bit roaring bitmap]
395
        // The length covers magic + bitmap, and does not include the leading length field.
396
1
        constexpr static char PAIMON_BITMAP_MAGIC[] = {'\x5E', '\x43', '\xF2', '\xD0'};
397
1
        if (memcmp(buf + sizeof(total_length), PAIMON_BITMAP_MAGIC, 4) != 0) [[unlikely]] {
398
0
            return Status::DataQualityError(
399
0
                    "Paimon deletion vector magic number mismatch, expected: {}, actual: {}",
400
0
                    BigEndian::Load32(PAIMON_BITMAP_MAGIC),
401
0
                    BigEndian::Load32(buf + sizeof(total_length)));
402
0
        }
403
404
1
        roaring::Roaring bitmap;
405
1
        try {
406
1
            bitmap = roaring::Roaring::readSafe(bitmap_buf, bitmap_size);
407
1
        } catch (const std::runtime_error& e) {
408
0
            return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
409
0
        }
410
411
1
        delete_rows->reserve(bitmap.cardinality());
412
3
        for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
413
2
            delete_rows->push_back(*it);
414
2
        }
415
1
        return Status::OK();
416
1
    }
417
418
3
    constexpr static char ICEBERG_DV_MAGIC[] = {'\xD1', '\xD3', '\x39', '\x64'};
419
3
    if (memcmp(buf + sizeof(total_length), ICEBERG_DV_MAGIC, 4) != 0) [[unlikely]] {
420
0
        return Status::DataQualityError(
421
0
                "Iceberg deletion vector magic number mismatch, expected: {}, actual: {}",
422
0
                BigEndian::Load32(ICEBERG_DV_MAGIC), BigEndian::Load32(buf + sizeof(total_length)));
423
0
    }
424
425
3
    roaring::Roaring64Map bitmap;
426
3
    try {
427
3
        bitmap = roaring::Roaring64Map::readSafe(bitmap_buf, bitmap_size);
428
3
    } catch (const std::runtime_error& e) {
429
0
        return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
430
0
    }
431
432
3
    delete_rows->reserve(bitmap.cardinality());
433
7
    for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
434
4
        delete_rows->push_back(cast_set<int64_t>(*it));
435
4
    }
436
3
    return Status::OK();
437
3
}
438
439
} // namespace
440
441
std::shared_ptr<io::FileSystemProperties> create_system_properties(
442
70
        const TFileScanRangeParams* scan_params) {
443
70
    auto system_properties = std::make_shared<io::FileSystemProperties>();
444
70
    if (scan_params == nullptr || !scan_params->__isset.file_type) {
445
56
        system_properties->system_type = TFileType::FILE_LOCAL;
446
56
        return system_properties;
447
56
    }
448
14
    system_properties->system_type = scan_params->file_type;
449
14
    system_properties->properties = scan_params->properties;
450
14
    system_properties->hdfs_params = scan_params->hdfs_params;
451
14
    if (scan_params->__isset.broker_addresses) {
452
0
        system_properties->broker_addresses.assign(scan_params->broker_addresses.begin(),
453
0
                                                   scan_params->broker_addresses.end());
454
0
    }
455
14
    return system_properties;
456
70
}
457
458
0
std::string TableReader::debug_string() const {
459
0
    std::ostringstream out;
460
0
    out << "TableReader{format=" << file_format_to_string(_format)
461
0
        << ", push_down_agg_type=" << push_down_agg_to_string(_push_down_agg_type)
462
0
        << ", aggregate_pushdown_tried=" << _aggregate_pushdown_tried
463
0
        << ", has_current_reader=" << (_data_reader.reader != nullptr)
464
0
        << ", has_current_task=" << (_current_task != nullptr)
465
0
        << ", current_file=" << current_file_debug_string(_current_task)
466
0
        << ", has_delete_rows=" << (_delete_rows != nullptr)
467
0
        << ", delete_row_count=" << (_delete_rows == nullptr ? 0 : _delete_rows->size())
468
0
        << ", has_system_properties=" << (_system_properties != nullptr) << ", system_type="
469
0
        << (_system_properties == nullptr ? static_cast<int>(TFileType::FILE_LOCAL)
470
0
                                          : static_cast<int>(_system_properties->system_type))
471
0
        << ", has_scan_params=" << (_scan_params != nullptr)
472
0
        << ", has_io_ctx=" << (_io_ctx != nullptr)
473
0
        << ", has_runtime_state=" << (_runtime_state != nullptr)
474
0
        << ", has_scanner_profile=" << (_scanner_profile != nullptr)
475
0
        << ", mapper_options=" << _mapper_options.debug_string() << ", projected_columns="
476
0
        << join_table_reader_debug_strings(
477
0
                   _projected_columns,
478
0
                   [](const ColumnDefinition& column) { return column.debug_string(); })
479
0
        << ", partition_values=" << partition_values_debug_string(_partition_values)
480
0
        << ", table_filters="
481
0
        << join_table_reader_debug_strings(
482
0
                   _table_filters,
483
0
                   [](const TableFilter& filter) { return table_filter_debug_string(filter); })
484
0
        << ", table_column_predicates="
485
0
        << table_column_predicates_debug_string(_table_column_predicates)
486
0
        << ", conjunct_count=" << _conjuncts.size() << ", conjuncts="
487
0
        << join_table_reader_debug_strings(_conjuncts,
488
0
                                           [](const VExprContextSPtr& conjunct) {
489
0
                                               return expr_context_debug_string(conjunct);
490
0
                                           })
491
0
        << ", file_schema="
492
0
        << join_table_reader_debug_strings(
493
0
                   _data_reader.file_schema,
494
0
                   [](const ColumnDefinition& field) { return field.debug_string(); })
495
0
        << ", file_block_layout="
496
0
        << join_table_reader_debug_strings(
497
0
                   _data_reader.file_block_layout,
498
0
                   [](const FileBlockColumn& column) {
499
0
                       std::ostringstream column_out;
500
0
                       column_out << "FileBlockColumn{file_column_id=" << column.file_column_id
501
0
                                  << ", name=" << column.name << ", type="
502
0
                                  << (column.type == nullptr ? "null" : column.type->get_name())
503
0
                                  << "}";
504
0
                       return column_out.str();
505
0
                   })
506
0
        << ", block_template_columns=" << _data_reader.block_template.columns()
507
0
        << ", column_mapper="
508
0
        << (_data_reader.column_mapper == nullptr ? "null"
509
0
                                                  : _data_reader.column_mapper->debug_string())
510
0
        << "}";
511
0
    return out.str();
512
0
}
513
514
Status TableReader::annotate_projected_column(const TFileScanSlotInfo& slot_info,
515
                                              ProjectedColumnBuildContext* context,
516
6
                                              ColumnDefinition* column) const {
517
6
    (void)slot_info;
518
6
    DORIS_CHECK(context != nullptr);
519
6
    DORIS_CHECK(column != nullptr);
520
6
    context->schema_column.reset();
521
6
    const auto* schema_field = find_external_root_field(context->scan_params, *column);
522
6
    if (schema_field == nullptr) {
523
6
        return Status::OK();
524
6
    }
525
0
    context->schema_column = build_schema_column_from_external_field(*schema_field, column->type);
526
0
    column->identifier = context->schema_column->identifier;
527
0
    column->name_mapping = context->schema_column->name_mapping;
528
0
    return Status::OK();
529
6
}
530
531
70
Status TableReader::init(TableReadOptions&& options) {
532
70
    _scan_params = options.scan_params;
533
70
    _format = options.format;
534
70
    _io_ctx = options.io_ctx;
535
70
    _runtime_state = options.runtime_state;
536
70
    _scanner_profile = options.scanner_profile;
537
70
    _file_slot_descs = options.file_slot_descs;
538
70
    _push_down_agg_type = options.push_down_agg_type;
539
70
    _condition_cache_digest = options.condition_cache_digest;
540
70
    _projected_columns = std::move(options.projected_columns);
541
70
    _system_properties = create_system_properties(_scan_params);
542
70
    _mapper_options.mode = TableColumnMappingMode::BY_NAME;
543
70
    _conjuncts = std::move(options.conjuncts);
544
70
    _table_column_predicates = std::move(options.column_predicates);
545
546
70
    if (_scanner_profile != nullptr) {
547
17
        static const char* table_profile = "TableReader";
548
17
        ADD_TIMER_WITH_LEVEL(_scanner_profile, table_profile, 1);
549
17
        _profile.num_delete_files = ADD_CHILD_COUNTER_WITH_LEVEL(_scanner_profile, "NumDeleteFiles",
550
17
                                                                 TUnit::UNIT, table_profile, 1);
551
17
        _profile.num_delete_rows = ADD_CHILD_COUNTER_WITH_LEVEL(_scanner_profile, "NumDeleteRows",
552
17
                                                                TUnit::UNIT, table_profile, 1);
553
17
        _profile.parse_delete_file_time = ADD_CHILD_TIMER_WITH_LEVEL(
554
17
                _scanner_profile, "ParseDeleteFileTime", table_profile, 1);
555
17
        _profile.exec_timer =
556
17
                ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "GetBlockTime", table_profile, 1);
557
17
        _profile.prepare_split_timer =
558
17
                ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "PrepareSplitTime", table_profile, 1);
559
17
        _profile.finalize_timer =
560
17
                ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "FinalizeBlockTime", table_profile, 1);
561
17
        _profile.create_reader_timer =
562
17
                ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "CreateReaderTime", table_profile, 1);
563
17
        _profile.pushdown_agg_timer =
564
17
                ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "PushDownAggTime", table_profile, 1);
565
17
        _profile.open_reader_timer =
566
17
                ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "OpenReaderTime", table_profile, 1);
567
17
    }
568
70
    return Status::OK();
569
70
}
570
571
64
Status TableReader::_build_table_filters_from_conjuncts() {
572
64
    _table_filters.clear();
573
64
    for (const auto& conjunct : _conjuncts) {
574
19
        RETURN_IF_ERROR(
575
19
                build_table_filters_from_conjunct(conjunct, _runtime_state, &_table_filters));
576
19
    }
577
64
    return Status::OK();
578
64
}
579
580
63
Status TableReader::_open_local_filter_exprs(const FileScanRequest& file_request) {
581
63
    RowDescriptor row_desc;
582
63
    for (const auto& conjunct : file_request.conjuncts) {
583
14
        RETURN_IF_ERROR(conjunct->prepare(_runtime_state, row_desc));
584
14
        RETURN_IF_ERROR(conjunct->open(_runtime_state));
585
14
    }
586
63
    for (const auto& delete_conjunct : file_request.delete_conjuncts) {
587
12
        RETURN_IF_ERROR(delete_conjunct->prepare(_runtime_state, row_desc));
588
12
        RETURN_IF_ERROR(delete_conjunct->open(_runtime_state));
589
12
    }
590
63
    return Status::OK();
591
63
}
592
593
63
bool TableReader::_should_enable_condition_cache(const FileScanRequest& file_request) const {
594
63
    if (_condition_cache_digest == 0 || _push_down_agg_type == TPushAggOp::type::COUNT ||
595
63
        _current_file_description == std::nullopt || _data_reader.reader == nullptr) {
596
58
        return false;
597
58
    }
598
    // Condition cache is populated by file readers after evaluating file-local row-level
599
    // conjuncts. ColumnPredicate-only scans can prune row groups/pages, but they do not produce a
600
    // per-row survivor bitmap that can safely populate the cache.
601
5
    if (file_request.conjuncts.empty()) {
602
1
        return false;
603
1
    }
604
    // Delete files/deletion vectors are table-format state. They may change independently of the
605
    // data file path/mtime/size used by the external cache key, so caching their result can become
606
    // stale. Keep delete filtering enabled, but do not read or write condition cache.
607
4
    if (_delete_rows != nullptr || !file_request.delete_conjuncts.empty()) {
608
1
        return false;
609
1
    }
610
    // Runtime filters can arrive late and their payload is not guaranteed to be represented by the
611
    // scan-local digest. Without a read-only mode, a MISS could insert a bitmap for P AND RF under
612
    // the digest for only P. This mirrors the old FileScanner guard.
613
3
    return !contains_runtime_filter(file_request.conjuncts);
614
4
}
615
616
63
Status TableReader::_init_reader_condition_cache(const FileScanRequest& file_request) {
617
63
    _condition_cache = nullptr;
618
63
    _condition_cache_ctx = nullptr;
619
63
    if (!_should_enable_condition_cache(file_request)) {
620
61
        return Status::OK();
621
61
    }
622
623
2
    auto* cache = segment_v2::ConditionCache::instance();
624
2
    if (cache == nullptr) {
625
0
        return Status::OK();
626
0
    }
627
2
    const auto& file = *_current_file_description;
628
2
    _condition_cache_key = segment_v2::ConditionCache::ExternalCacheKey(
629
2
            file.path, file.mtime, file.file_size, _condition_cache_digest, file.range_start_offset,
630
2
            file.range_size);
631
632
2
    segment_v2::ConditionCacheHandle handle;
633
2
    const bool condition_cache_hit = cache->lookup(_condition_cache_key, &handle);
634
2
    if (condition_cache_hit) {
635
0
        _condition_cache = handle.get_filter_result();
636
0
        ++_condition_cache_hit_count;
637
2
    } else {
638
2
        const int64_t total_rows = _data_reader.reader->get_total_rows();
639
2
        if (total_rows <= 0) {
640
0
            return Status::OK();
641
0
        }
642
        // Add one guard granule for split ranges that start in the middle of a granule. A guard
643
        // false bit beyond the real range never overlaps real rows, but avoids boundary overflow
644
        // when a reader marks the last partial granule.
645
2
        const size_t num_granules = (total_rows + ConditionCacheContext::GRANULE_SIZE - 1) /
646
2
                                    ConditionCacheContext::GRANULE_SIZE;
647
2
        _condition_cache = std::make_shared<std::vector<bool>>(num_granules + 1, false);
648
2
    }
649
650
2
    if (_condition_cache != nullptr) {
651
2
        _condition_cache_ctx = std::make_shared<ConditionCacheContext>();
652
2
        _condition_cache_ctx->is_hit = condition_cache_hit;
653
2
        _condition_cache_ctx->filter_result = _condition_cache;
654
2
        _data_reader.reader->set_condition_cache_context(_condition_cache_ctx);
655
2
    }
656
2
    return Status::OK();
657
2
}
658
659
64
void TableReader::_finalize_reader_condition_cache() {
660
64
    if (_condition_cache_ctx == nullptr || _condition_cache_ctx->is_hit) {
661
62
        _condition_cache = nullptr;
662
62
        _condition_cache_ctx = nullptr;
663
62
        return;
664
62
    }
665
    // LIMIT or scanner cancellation may close a reader before all selected row ranges are visited.
666
    // Unvisited granules remain false in a MISS bitmap, so inserting a partial bitmap would make a
667
    // later HIT skip valid rows. Only publish cache entries after the physical reader reaches EOF.
668
2
    if (!_current_reader_reached_eof) {
669
1
        _condition_cache = nullptr;
670
1
        _condition_cache_ctx = nullptr;
671
1
        return;
672
1
    }
673
1
    segment_v2::ConditionCache::instance()->insert(_condition_cache_key,
674
1
                                                   std::move(_condition_cache));
675
1
    _condition_cache = nullptr;
676
1
    _condition_cache_ctx = nullptr;
677
1
}
678
679
75
Status TableReader::create_next_reader(bool* eos) {
680
75
    SCOPED_TIMER(_profile.create_reader_timer);
681
75
    DCHECK(_data_reader.reader == nullptr);
682
75
    if (_current_task == nullptr) {
683
11
        *eos = true;
684
11
        return Status::OK();
685
11
    }
686
687
64
    RETURN_IF_ERROR(create_file_reader(&_data_reader.reader));
688
64
    DORIS_CHECK(_data_reader.reader != nullptr);
689
64
    if (_batch_size > 0) {
690
0
        _data_reader.reader->set_batch_size(_batch_size);
691
0
    }
692
64
    RETURN_IF_ERROR(_data_reader.reader->init(_runtime_state));
693
64
    RETURN_IF_ERROR(open_reader());
694
64
    if (_data_reader.reader == nullptr) {
695
1
        *eos = _current_task == nullptr;
696
1
        return Status::OK();
697
1
    }
698
63
    *eos = false;
699
63
    return Status::OK();
700
64
}
701
702
57
Status TableReader::create_file_reader(std::unique_ptr<FileReader>* reader) {
703
57
    DORIS_CHECK(reader != nullptr);
704
57
    if (_format == FileFormat::PARQUET) {
705
57
        const bool enable_mapping_timestamp_tz =
706
57
                _scan_params != nullptr && _scan_params->__isset.enable_mapping_timestamp_tz &&
707
57
                _scan_params->enable_mapping_timestamp_tz;
708
57
        *reader = std::make_unique<format::parquet::ParquetReader>(
709
57
                _system_properties, _current_task->data_file, _io_ctx, _scanner_profile,
710
57
                _global_rowid_context, enable_mapping_timestamp_tz);
711
57
        return Status::OK();
712
57
    }
713
0
    if (_format == FileFormat::CSV) {
714
0
        if (_file_slot_descs == nullptr) {
715
0
            return Status::InvalidArgument("CSV reader requires file slot descriptors");
716
0
        }
717
        // CSV has no embedded schema. TableReader owns table-level mapping, while CsvReader needs
718
        // only the physical file slots plus scan text parameters to build a file-local schema.
719
        // Non-file columns such as partitions/defaults/virtual row ids are intentionally excluded
720
        // from `_file_slot_descs` and are materialized during finalize_chunk().
721
0
        *reader = std::make_unique<format::csv::CsvReader>(
722
0
                _system_properties, _current_task->data_file, _io_ctx, _scanner_profile,
723
0
                _scan_params, *_file_slot_descs, _current_range_compress_type,
724
0
                _current_range_load_id);
725
0
        return Status::OK();
726
0
    }
727
0
    if (_format == FileFormat::TEXT) {
728
0
        if (_file_slot_descs == nullptr) {
729
0
            return Status::InvalidArgument("Text reader requires file slot descriptors");
730
0
        }
731
        // Text files have no embedded schema. As with CSV, TableReader handles table-level mapping
732
        // and only passes physical file slots to the v2 TextReader.
733
0
        *reader = std::make_unique<format::text::TextReader>(
734
0
                _system_properties, _current_task->data_file, _io_ctx, _scanner_profile,
735
0
                _scan_params, *_file_slot_descs, _current_range_compress_type,
736
0
                _current_range_load_id);
737
0
        return Status::OK();
738
0
    }
739
0
    if (_format == FileFormat::JSON) {
740
0
        if (_file_slot_descs == nullptr) {
741
0
            return Status::InvalidArgument("JSON reader requires file slot descriptors");
742
0
        }
743
0
        *reader = std::make_unique<format::json::JsonReader>(
744
0
                _system_properties, _current_task->data_file, _io_ctx, _scanner_profile,
745
0
                _scan_params, _current_file_range_desc, *_file_slot_descs,
746
0
                _current_range_compress_type, _current_range_load_id);
747
0
        return Status::OK();
748
0
    }
749
0
    if (_format == FileFormat::NATIVE) {
750
0
        *reader = std::make_unique<format::native::NativeReader>(
751
0
                _system_properties, _current_task->data_file, _io_ctx, _scanner_profile);
752
0
        return Status::OK();
753
0
    }
754
0
    return Status::NotSupported("TableReader does not support file format {}",
755
0
                                file_format_to_string(_format));
756
0
}
757
758
76
std::unique_ptr<io::FileDescription> create_file_description(const TFileRangeDesc& range) {
759
76
    auto file_description = std::make_unique<io::FileDescription>();
760
76
    file_description->path = range.path;
761
76
    file_description->file_size = range.__isset.file_size ? range.file_size : -1;
762
76
    file_description->mtime = range.__isset.modification_time ? range.modification_time : 0;
763
76
    file_description->range_start_offset = range.__isset.start_offset ? range.start_offset : 0;
764
76
    file_description->range_size = range.__isset.size ? range.size : -1;
765
76
    if (range.__isset.fs_name) {
766
0
        file_description->fs_name = range.fs_name;
767
0
    }
768
76
    if (range.__isset.file_cache_admission) {
769
0
        file_description->file_cache_admission = range.file_cache_admission;
770
0
    }
771
76
    return file_description;
772
76
}
773
774
76
Status TableReader::prepare_split(const SplitReadOptions& options) {
775
76
    SCOPED_TIMER(_profile.prepare_split_timer);
776
76
    _partition_values = std::move(options.partition_values);
777
76
    _current_task = std::make_unique<ScanTask>();
778
76
    _current_task->data_file = create_file_description(options.current_range);
779
76
    _current_file_description = *_current_task->data_file;
780
76
    _current_file_range_desc = options.current_range;
781
76
    _current_range_compress_type = options.current_range.__isset.compress_type
782
76
                                           ? options.current_range.compress_type
783
76
                                           : TFileCompressType::UNKNOWN;
784
76
    _current_range_load_id = options.current_range.__isset.load_id
785
76
                                     ? std::make_optional(options.current_range.load_id)
786
76
                                     : std::nullopt;
787
76
    _global_rowid_context = options.global_rowid_context;
788
76
    _delete_rows = nullptr;
789
76
    _aggregate_pushdown_tried = false;
790
76
    _remaining_table_level_count = -1;
791
76
    _current_reader_reached_eof = false;
792
76
    if (_push_down_agg_type == TPushAggOp::type::COUNT &&
793
76
        options.current_range.__isset.table_format_params &&
794
76
        options.current_range.table_format_params.__isset.table_level_row_count) {
795
6
        DORIS_CHECK(options.current_range.table_format_params.table_level_row_count >= -1);
796
6
        _remaining_table_level_count =
797
6
                options.current_range.table_format_params.table_level_row_count;
798
6
    }
799
76
    if (_is_table_level_count_active()) {
800
2
        return Status::OK();
801
2
    }
802
74
    return _parse_delete_predicates(options);
803
76
}
804
805
74
Status TableReader::_parse_delete_predicates(const SplitReadOptions& options) {
806
74
    DeleteFileDesc desc {.fs_name = options.current_range.fs_name};
807
74
    bool has_delete_file = false;
808
74
    RETURN_IF_ERROR(_parse_deletion_vector_file(options.current_range.table_format_params, &desc,
809
74
                                                &has_delete_file));
810
74
    if (has_delete_file) {
811
4
        DORIS_CHECK(options.cache != nullptr);
812
4
        Status create_status = Status::OK();
813
814
4
        _delete_rows = options.cache->get<DeleteRows>(desc.key, [&]() -> DeleteRows* {
815
4
            auto* delete_rows = new DeleteRows;
816
817
4
            DeletionVectorReader dv_reader(_runtime_state, _scanner_profile, *_scan_params, desc,
818
4
                                           _io_ctx.get());
819
4
            create_status = dv_reader.open();
820
4
            if (!create_status.ok()) [[unlikely]] {
821
0
                return nullptr;
822
0
            }
823
824
4
            size_t bytes_read = desc.size;
825
4
            std::vector<char> buffer(bytes_read);
826
4
            create_status = dv_reader.read_at(desc.start_offset, {buffer.data(), bytes_read});
827
4
            if (!create_status.ok()) [[unlikely]] {
828
0
                return nullptr;
829
0
            }
830
831
4
            const char* buf = buffer.data();
832
4
            SCOPED_TIMER(_profile.parse_delete_file_time);
833
4
            create_status = parse_deletion_vector(buf, bytes_read, desc.format, delete_rows);
834
4
            if (!create_status.ok()) [[unlikely]] {
835
0
                return nullptr;
836
0
            }
837
4
            COUNTER_UPDATE(_profile.num_delete_rows, delete_rows->size());
838
4
            return delete_rows;
839
4
        });
840
4
        RETURN_IF_ERROR(create_status);
841
4
    }
842
843
74
    return Status::OK();
844
74
}
845
} // namespace doris::format