Coverage Report

Created: 2026-06-25 13:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/table_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format_v2/table_reader.h"
19
20
#include <gen_cpp/ExternalTableSchema_types.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <gen_cpp/Types_types.h>
23
24
#include <algorithm>
25
#include <cstring>
26
#include <ranges>
27
#include <set>
28
#include <sstream>
29
#include <stdexcept>
30
#include <utility>
31
#include <vector>
32
33
#include "common/cast_set.h"
34
#include "common/status.h"
35
#include "core/assert_cast.h"
36
#include "core/data_type/data_type_array.h"
37
#include "core/data_type/data_type_map.h"
38
#include "core/data_type/data_type_struct.h"
39
#include "exec/common/endian.h"
40
#include "exprs/vexpr_context.h"
41
#include "exprs/vslot_ref.h"
42
#include "format/table/deletion_vector_reader.h"
43
#include "format_v2/column_mapper.h"
44
#include "format_v2/delimited_text/csv_reader.h"
45
#include "format_v2/delimited_text/text_reader.h"
46
#include "format_v2/json/json_reader.h"
47
#include "format_v2/parquet/parquet_reader.h"
48
#include "roaring/roaring64map.hh"
49
#include "storage/segment/condition_cache.h"
50
#include "util/string_util.h"
51
52
namespace doris::format {
53
namespace {
54
55
template <typename T, typename Formatter>
56
0
std::string join_table_reader_debug_strings(const std::vector<T>& values, Formatter formatter) {
57
0
    std::ostringstream out;
58
0
    out << "[";
59
0
    for (size_t i = 0; i < values.size(); ++i) {
60
0
        if (i > 0) {
61
0
            out << ", ";
62
0
        }
63
0
        out << formatter(values[i]);
64
0
    }
65
0
    out << "]";
66
0
    return out.str();
67
0
}
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_16ColumnDefinitionEZNKS0_11TableReader12debug_stringB5cxx11EvE3$_0EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_11TableFilterEZNKS0_11TableReader12debug_stringB5cxx11EvE3$_1EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_11GlobalIndexEZNS1_25table_filter_debug_stringB5cxx11ERKNS0_11TableFilterEE3$_0EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISF_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsISt10shared_ptrINS_12VExprContextEEZNKS0_11TableReader12debug_stringB5cxx11EvE3$_2EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISF_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_16ColumnDefinitionEZNKS0_11TableReader12debug_stringB5cxx11EvE3$_3EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
Unexecuted instantiation: table_reader.cpp:_ZN5doris6format12_GLOBAL__N_131join_table_reader_debug_stringsINS0_11TableReader15FileBlockColumnEZNKS3_12debug_stringB5cxx11EvE3$_4EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
68
69
0
std::string file_format_to_string(FileFormat format) {
70
0
    switch (format) {
71
0
    case FileFormat::PARQUET:
72
0
        return "PARQUET";
73
0
    case FileFormat::ORC:
74
0
        return "ORC";
75
0
    case FileFormat::CSV:
76
0
        return "CSV";
77
0
    case FileFormat::JSON:
78
0
        return "JSON";
79
0
    case FileFormat::TEXT:
80
0
        return "TEXT";
81
0
    case FileFormat::JNI:
82
0
        return "JNI";
83
0
    }
84
0
    return "UNKNOWN";
85
0
}
86
87
0
std::string push_down_agg_to_string(TPushAggOp::type op) {
88
0
    switch (op) {
89
0
    case TPushAggOp::NONE:
90
0
        return "NONE";
91
0
    case TPushAggOp::COUNT:
92
0
        return "COUNT";
93
0
    case TPushAggOp::MINMAX:
94
0
        return "MINMAX";
95
0
    case TPushAggOp::MIX:
96
0
        return "MIX";
97
0
    case TPushAggOp::COUNT_ON_INDEX:
98
0
        return "COUNT_ON_INDEX";
99
0
    }
100
0
    return "UNKNOWN";
101
0
}
102
103
0
std::string current_file_debug_string(const std::unique_ptr<ScanTask>& task) {
104
0
    if (task == nullptr || task->data_file == nullptr) {
105
0
        return "null";
106
0
    }
107
0
    const auto& file = *task->data_file;
108
0
    std::ostringstream out;
109
0
    out << "FileDescription{path=" << file.path << ", file_size=" << file.file_size
110
0
        << ", range_start_offset=" << file.range_start_offset << ", range_size=" << file.range_size
111
0
        << ", mtime=" << file.mtime << ", fs_name=" << file.fs_name
112
0
        << ", file_cache_admission=" << file.file_cache_admission << "}";
113
0
    return out.str();
114
0
}
115
116
0
std::string partition_values_debug_string(const std::map<std::string, Field>& partition_values) {
117
0
    std::ostringstream out;
118
0
    out << "{";
119
0
    size_t idx = 0;
120
0
    for (const auto& [key, _] : partition_values) {
121
0
        if (idx++ > 0) {
122
0
            out << ", ";
123
0
        }
124
0
        out << key;
125
0
    }
126
0
    out << "}";
127
0
    return out.str();
128
0
}
129
130
0
const schema::external::TField* get_field_ptr(const schema::external::TFieldPtr& field_ptr) {
131
0
    if (!field_ptr.__isset.field_ptr || field_ptr.field_ptr == nullptr) {
132
0
        return nullptr;
133
0
    }
134
0
    return field_ptr.field_ptr.get();
135
0
}
136
137
0
bool external_field_matches_name(const schema::external::TField& field, const std::string& name) {
138
0
    if (field.__isset.name && to_lower(field.name) == to_lower(name)) {
139
0
        return true;
140
0
    }
141
0
    return field.__isset.name_mapping &&
142
0
           std::ranges::any_of(field.name_mapping, [&](const std::string& alias) {
143
0
               return to_lower(alias) == to_lower(name);
144
0
           });
145
0
}
146
147
DataTypePtr find_struct_child_type_by_name(const DataTypeStruct& struct_type,
148
0
                                           const std::string& field_name) {
149
0
    for (size_t field_idx = 0; field_idx < struct_type.get_elements().size(); ++field_idx) {
150
0
        if (to_lower(struct_type.get_element_name(field_idx)) == to_lower(field_name)) {
151
0
            return struct_type.get_element(field_idx);
152
0
        }
153
0
    }
154
0
    return nullptr;
155
0
}
156
157
ColumnDefinition build_schema_column_from_external_field(const schema::external::TField& field,
158
0
                                                         DataTypePtr type) {
159
0
    ColumnDefinition column {
160
0
            .identifier = field.__isset.id ? Field::create_field<TYPE_INT>(field.id) : Field {},
161
0
            .name = field.__isset.name ? field.name : "",
162
0
            .name_mapping =
163
0
                    field.__isset.name_mapping ? field.name_mapping : std::vector<std::string> {},
164
0
            .type = std::move(type),
165
0
            .children = {},
166
0
            .default_expr = nullptr,
167
0
            .is_partition_key = false,
168
0
    };
169
0
    if (column.type == nullptr || !field.__isset.nestedField) {
170
0
        return column;
171
0
    }
172
173
0
    const auto nested_type = remove_nullable(column.type);
174
0
    switch (nested_type->get_primitive_type()) {
175
0
    case TYPE_STRUCT: {
176
0
        if (!field.nestedField.__isset.struct_field ||
177
0
            !field.nestedField.struct_field.__isset.fields) {
178
0
            return column;
179
0
        }
180
0
        const auto& struct_type = assert_cast<const DataTypeStruct&>(*nested_type);
181
0
        for (const auto& child_ptr : field.nestedField.struct_field.fields) {
182
0
            const auto* child_field = get_field_ptr(child_ptr);
183
0
            if (child_field == nullptr || !child_field->__isset.name) {
184
0
                continue;
185
0
            }
186
0
            auto child_type = find_struct_child_type_by_name(struct_type, child_field->name);
187
0
            if (child_type == nullptr) {
188
0
                continue;
189
0
            }
190
0
            column.children.push_back(
191
0
                    build_schema_column_from_external_field(*child_field, child_type));
192
0
        }
193
0
        break;
194
0
    }
195
0
    case TYPE_ARRAY: {
196
0
        if (!field.nestedField.__isset.array_field ||
197
0
            !field.nestedField.array_field.__isset.item_field) {
198
0
            return column;
199
0
        }
200
0
        const auto* item_field = get_field_ptr(field.nestedField.array_field.item_field);
201
0
        if (item_field == nullptr) {
202
0
            return column;
203
0
        }
204
0
        const auto& array_type = assert_cast<const DataTypeArray&>(*nested_type);
205
0
        auto child =
206
0
                build_schema_column_from_external_field(*item_field, array_type.get_nested_type());
207
0
        child.name = "element";
208
0
        if (child.has_identifier_name()) {
209
0
            child.identifier = Field::create_field<TYPE_STRING>(child.name);
210
0
        }
211
0
        column.children.push_back(std::move(child));
212
0
        break;
213
0
    }
214
0
    case TYPE_MAP: {
215
0
        if (!field.nestedField.__isset.map_field ||
216
0
            !field.nestedField.map_field.__isset.key_field ||
217
0
            !field.nestedField.map_field.__isset.value_field) {
218
0
            return column;
219
0
        }
220
0
        const auto& map_type = assert_cast<const DataTypeMap&>(*nested_type);
221
0
        const auto* key_field = get_field_ptr(field.nestedField.map_field.key_field);
222
0
        if (key_field != nullptr) {
223
0
            auto child =
224
0
                    build_schema_column_from_external_field(*key_field, map_type.get_key_type());
225
0
            child.name = "key";
226
0
            if (child.has_identifier_name()) {
227
0
                child.identifier = Field::create_field<TYPE_STRING>(child.name);
228
0
            }
229
0
            column.children.push_back(std::move(child));
230
0
        }
231
0
        const auto* value_field = get_field_ptr(field.nestedField.map_field.value_field);
232
0
        if (value_field != nullptr) {
233
0
            auto child = build_schema_column_from_external_field(*value_field,
234
0
                                                                 map_type.get_value_type());
235
0
            child.name = "value";
236
0
            if (child.has_identifier_name()) {
237
0
                child.identifier = Field::create_field<TYPE_STRING>(child.name);
238
0
            }
239
0
            column.children.push_back(std::move(child));
240
0
        }
241
0
        break;
242
0
    }
243
0
    default:
244
0
        break;
245
0
    }
246
0
    return column;
247
0
}
248
249
const schema::external::TField* find_external_root_field(const TFileScanRangeParams* params,
250
6
                                                         const ColumnDefinition& column) {
251
6
    if (params == nullptr || !params->__isset.history_schema_info ||
252
6
        params->history_schema_info.empty()) {
253
6
        return nullptr;
254
6
    }
255
0
    const auto* schema = &params->history_schema_info.front();
256
0
    if (params->__isset.current_schema_id) {
257
0
        for (const auto& candidate_schema : params->history_schema_info) {
258
0
            if (candidate_schema.__isset.schema_id &&
259
0
                candidate_schema.schema_id == params->current_schema_id) {
260
0
                schema = &candidate_schema;
261
0
                break;
262
0
            }
263
0
        }
264
0
    }
265
0
    if (!schema->__isset.root_field || !schema->root_field.__isset.fields) {
266
0
        return nullptr;
267
0
    }
268
0
    for (const auto& field_ptr : schema->root_field.fields) {
269
0
        const auto* field = get_field_ptr(field_ptr);
270
0
        if (field == nullptr) {
271
0
            continue;
272
0
        }
273
0
        if (external_field_matches_name(*field, column.name)) {
274
0
            return field;
275
0
        }
276
0
    }
277
0
    return nullptr;
278
0
}
279
280
0
std::string expr_context_debug_string(const VExprContextSPtr& context) {
281
0
    if (context == nullptr) {
282
0
        return "null";
283
0
    }
284
0
    const auto root = context->root();
285
0
    if (root == nullptr) {
286
0
        return "VExprContext{root=null}";
287
0
    }
288
0
    std::ostringstream out;
289
0
    out << "VExprContext{root_name=" << root->expr_name() << ", root_debug=" << root->debug_string()
290
0
        << "}";
291
0
    return out.str();
292
0
}
293
294
0
std::string table_filter_debug_string(const TableFilter& filter) {
295
0
    std::ostringstream out;
296
0
    out << "TableFilter{conjunct=" << expr_context_debug_string(filter.conjunct)
297
0
        << ", global_indices="
298
0
        << join_table_reader_debug_strings(
299
0
                   filter.global_indices,
300
0
                   [](GlobalIndex global_index) { return std::to_string(global_index.value()); })
301
0
        << "}";
302
0
    return out.str();
303
0
}
304
305
0
std::string table_column_predicates_debug_string(const TableColumnPredicates& predicates) {
306
0
    std::ostringstream out;
307
0
    out << "{";
308
0
    size_t idx = 0;
309
0
    for (const auto& [global_index, column_predicates] : predicates) {
310
0
        if (idx++ > 0) {
311
0
            out << ", ";
312
0
        }
313
0
        out << global_index.value() << ":{predicate_count=" << column_predicates.size() << "}";
314
0
    }
315
0
    out << "}";
316
0
    return out.str();
317
0
}
318
319
3
bool contains_runtime_filter(const VExprContextSPtrs& conjuncts) {
320
3
    return std::ranges::any_of(conjuncts, [](const auto& conjunct) {
321
3
        return conjunct != nullptr && conjunct->root() != nullptr &&
322
3
               conjunct->root()->is_rf_wrapper();
323
3
    });
324
3
}
325
326
61
void collect_global_indices(const VExprSPtr& expr, std::set<GlobalIndex>* global_indices) {
327
61
    if (expr == nullptr) {
328
0
        return;
329
0
    }
330
61
    if (expr->is_rf_wrapper()) {
331
        // RuntimeFilterExpr wraps a real predicate expression but its own thrift node can still
332
        // look like SLOT_REF. Collect indices from the wrapped predicate; do not cast the wrapper
333
        // itself to VSlotRef.
334
2
        collect_global_indices(expr->get_impl(), global_indices);
335
2
        return;
336
2
    }
337
59
    if (expr->is_slot_ref()) {
338
20
        const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
339
20
        DORIS_CHECK(slot_ref->column_id() >= 0);
340
20
        global_indices->insert(GlobalIndex(cast_set<size_t>(slot_ref->column_id())));
341
20
    }
342
59
    for (const auto& child : expr->children()) {
343
40
        collect_global_indices(child, global_indices);
344
40
    }
345
59
}
346
347
Status build_table_filters_from_conjunct(const VExprContextSPtr& conjunct, RuntimeState* state,
348
19
                                         std::vector<TableFilter>* table_filters) {
349
19
    if (conjunct == nullptr) {
350
0
        return Status::OK();
351
0
    }
352
19
    std::set<GlobalIndex> global_indices;
353
19
    collect_global_indices(conjunct->root(), &global_indices);
354
19
    if (!global_indices.empty()) {
355
19
        TableFilter table_filter;
356
19
        VExprSPtr filter_root;
357
19
        RETURN_IF_ERROR(clone_table_expr_tree(conjunct->root(), &filter_root));
358
19
        table_filter.conjunct = VExprContext::create_shared(std::move(filter_root));
359
20
        for (const auto global_index : global_indices) {
360
20
            table_filter.global_indices.push_back(global_index);
361
20
        }
362
19
        table_filters->push_back(std::move(table_filter));
363
19
    }
364
19
    return Status::OK();
365
19
}
366
367
Status parse_deletion_vector(const char* buf, size_t buffer_size, DeleteFileDesc::Format format,
368
4
                             DeleteRows* delete_rows) {
369
4
    DORIS_CHECK(buf != nullptr);
370
4
    DORIS_CHECK(delete_rows != nullptr);
371
4
    DORIS_CHECK(format == DeleteFileDesc::Format::PAIMON ||
372
4
                format == DeleteFileDesc::Format::ICEBERG);
373
374
4
    const size_t checksum_size = format == DeleteFileDesc::Format::ICEBERG ? 4 : 0;
375
4
    if (buffer_size < 8 + checksum_size) [[unlikely]] {
376
0
        return Status::DataQualityError("Deletion vector file size too small: {}", buffer_size);
377
0
    }
378
379
4
    auto total_length = BigEndian::Load32(buf);
380
4
    if (total_length + 4 + checksum_size != buffer_size) [[unlikely]] {
381
0
        return Status::DataQualityError("Deletion vector length mismatch, expected: {}, actual: {}",
382
0
                                        total_length + 4 + checksum_size, buffer_size);
383
0
    }
384
385
4
    const char* bitmap_buf = buf + 8;
386
4
    const size_t bitmap_size = buffer_size - 8 - checksum_size;
387
4
    if (format == DeleteFileDesc::Format::PAIMON) {
388
        // Paimon BitmapDeletionVector stores:
389
        //   [4-byte big-endian length][4-byte magic 0x5E43F2D0][32-bit roaring bitmap]
390
        // The length covers magic + bitmap, and does not include the leading length field.
391
1
        constexpr static char PAIMON_BITMAP_MAGIC[] = {'\x5E', '\x43', '\xF2', '\xD0'};
392
1
        if (memcmp(buf + sizeof(total_length), PAIMON_BITMAP_MAGIC, 4) != 0) [[unlikely]] {
393
0
            return Status::DataQualityError(
394
0
                    "Paimon deletion vector magic number mismatch, expected: {}, actual: {}",
395
0
                    BigEndian::Load32(PAIMON_BITMAP_MAGIC),
396
0
                    BigEndian::Load32(buf + sizeof(total_length)));
397
0
        }
398
399
1
        roaring::Roaring bitmap;
400
1
        try {
401
1
            bitmap = roaring::Roaring::readSafe(bitmap_buf, bitmap_size);
402
1
        } catch (const std::runtime_error& e) {
403
0
            return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
404
0
        }
405
406
1
        delete_rows->reserve(bitmap.cardinality());
407
3
        for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
408
2
            delete_rows->push_back(*it);
409
2
        }
410
1
        return Status::OK();
411
1
    }
412
413
3
    constexpr static char ICEBERG_DV_MAGIC[] = {'\xD1', '\xD3', '\x39', '\x64'};
414
3
    if (memcmp(buf + sizeof(total_length), ICEBERG_DV_MAGIC, 4) != 0) [[unlikely]] {
415
0
        return Status::DataQualityError(
416
0
                "Iceberg deletion vector magic number mismatch, expected: {}, actual: {}",
417
0
                BigEndian::Load32(ICEBERG_DV_MAGIC), BigEndian::Load32(buf + sizeof(total_length)));
418
0
    }
419
420
3
    roaring::Roaring64Map bitmap;
421
3
    try {
422
3
        bitmap = roaring::Roaring64Map::readSafe(bitmap_buf, bitmap_size);
423
3
    } catch (const std::runtime_error& e) {
424
0
        return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
425
0
    }
426
427
3
    delete_rows->reserve(bitmap.cardinality());
428
7
    for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
429
4
        delete_rows->push_back(cast_set<int64_t>(*it));
430
4
    }
431
3
    return Status::OK();
432
3
}
433
434
} // namespace
435
436
std::shared_ptr<io::FileSystemProperties> create_system_properties(
437
70
        const TFileScanRangeParams* scan_params) {
438
70
    auto system_properties = std::make_shared<io::FileSystemProperties>();
439
70
    if (scan_params == nullptr || !scan_params->__isset.file_type) {
440
56
        system_properties->system_type = TFileType::FILE_LOCAL;
441
56
        return system_properties;
442
56
    }
443
14
    system_properties->system_type = scan_params->file_type;
444
14
    system_properties->properties = scan_params->properties;
445
14
    system_properties->hdfs_params = scan_params->hdfs_params;
446
14
    if (scan_params->__isset.broker_addresses) {
447
0
        system_properties->broker_addresses.assign(scan_params->broker_addresses.begin(),
448
0
                                                   scan_params->broker_addresses.end());
449
0
    }
450
14
    return system_properties;
451
70
}
452
453
0
std::string TableReader::debug_string() const {
454
0
    std::ostringstream out;
455
0
    out << "TableReader{format=" << file_format_to_string(_format)
456
0
        << ", push_down_agg_type=" << push_down_agg_to_string(_push_down_agg_type)
457
0
        << ", aggregate_pushdown_tried=" << _aggregate_pushdown_tried
458
0
        << ", has_current_reader=" << (_data_reader.reader != nullptr)
459
0
        << ", has_current_task=" << (_current_task != nullptr)
460
0
        << ", current_file=" << current_file_debug_string(_current_task)
461
0
        << ", has_delete_rows=" << (_delete_rows != nullptr)
462
0
        << ", delete_row_count=" << (_delete_rows == nullptr ? 0 : _delete_rows->size())
463
0
        << ", has_system_properties=" << (_system_properties != nullptr) << ", system_type="
464
0
        << (_system_properties == nullptr ? static_cast<int>(TFileType::FILE_LOCAL)
465
0
                                          : static_cast<int>(_system_properties->system_type))
466
0
        << ", has_scan_params=" << (_scan_params != nullptr)
467
0
        << ", has_io_ctx=" << (_io_ctx != nullptr)
468
0
        << ", has_runtime_state=" << (_runtime_state != nullptr)
469
0
        << ", has_scanner_profile=" << (_scanner_profile != nullptr)
470
0
        << ", mapper_options=" << _mapper_options.debug_string() << ", projected_columns="
471
0
        << join_table_reader_debug_strings(
472
0
                   _projected_columns,
473
0
                   [](const ColumnDefinition& column) { return column.debug_string(); })
474
0
        << ", partition_values=" << partition_values_debug_string(_partition_values)
475
0
        << ", table_filters="
476
0
        << join_table_reader_debug_strings(
477
0
                   _table_filters,
478
0
                   [](const TableFilter& filter) { return table_filter_debug_string(filter); })
479
0
        << ", table_column_predicates="
480
0
        << table_column_predicates_debug_string(_table_column_predicates)
481
0
        << ", conjunct_count=" << _conjuncts.size() << ", conjuncts="
482
0
        << join_table_reader_debug_strings(_conjuncts,
483
0
                                           [](const VExprContextSPtr& conjunct) {
484
0
                                               return expr_context_debug_string(conjunct);
485
0
                                           })
486
0
        << ", file_schema="
487
0
        << join_table_reader_debug_strings(
488
0
                   _data_reader.file_schema,
489
0
                   [](const ColumnDefinition& field) { return field.debug_string(); })
490
0
        << ", file_block_layout="
491
0
        << join_table_reader_debug_strings(
492
0
                   _data_reader.file_block_layout,
493
0
                   [](const FileBlockColumn& column) {
494
0
                       std::ostringstream column_out;
495
0
                       column_out << "FileBlockColumn{file_column_id=" << column.file_column_id
496
0
                                  << ", name=" << column.name << ", type="
497
0
                                  << (column.type == nullptr ? "null" : column.type->get_name())
498
0
                                  << "}";
499
0
                       return column_out.str();
500
0
                   })
501
0
        << ", block_template_columns=" << _data_reader.block_template.columns()
502
0
        << ", column_mapper="
503
0
        << (_data_reader.column_mapper == nullptr ? "null"
504
0
                                                  : _data_reader.column_mapper->debug_string())
505
0
        << "}";
506
0
    return out.str();
507
0
}
508
509
Status TableReader::annotate_projected_column(const TFileScanSlotInfo& slot_info,
510
                                              ProjectedColumnBuildContext* context,
511
6
                                              ColumnDefinition* column) const {
512
6
    (void)slot_info;
513
6
    DORIS_CHECK(context != nullptr);
514
6
    DORIS_CHECK(column != nullptr);
515
6
    context->schema_column.reset();
516
6
    const auto* schema_field = find_external_root_field(context->scan_params, *column);
517
6
    if (schema_field == nullptr) {
518
6
        return Status::OK();
519
6
    }
520
0
    context->schema_column = build_schema_column_from_external_field(*schema_field, column->type);
521
0
    column->identifier = context->schema_column->identifier;
522
0
    column->name_mapping = context->schema_column->name_mapping;
523
0
    return Status::OK();
524
6
}
525
526
70
Status TableReader::init(TableReadOptions&& options) {
527
70
    _scan_params = options.scan_params;
528
70
    _format = options.format;
529
70
    _io_ctx = options.io_ctx;
530
70
    _runtime_state = options.runtime_state;
531
70
    _scanner_profile = options.scanner_profile;
532
70
    _file_slot_descs = options.file_slot_descs;
533
70
    _push_down_agg_type = options.push_down_agg_type;
534
70
    _condition_cache_digest = options.condition_cache_digest;
535
70
    _projected_columns = std::move(options.projected_columns);
536
70
    _system_properties = create_system_properties(_scan_params);
537
70
    _mapper_options.mode = TableColumnMappingMode::BY_NAME;
538
70
    _conjuncts = std::move(options.conjuncts);
539
70
    _table_column_predicates = std::move(options.column_predicates);
540
541
70
    if (_scanner_profile != nullptr) {
542
17
        static const char* table_profile = "TableReader";
543
17
        ADD_TIMER_WITH_LEVEL(_scanner_profile, table_profile, 1);
544
17
        _profile.num_delete_files = ADD_CHILD_COUNTER_WITH_LEVEL(_scanner_profile, "NumDeleteFiles",
545
17
                                                                 TUnit::UNIT, table_profile, 1);
546
17
        _profile.num_delete_rows = ADD_CHILD_COUNTER_WITH_LEVEL(_scanner_profile, "NumDeleteRows",
547
17
                                                                TUnit::UNIT, table_profile, 1);
548
17
        _profile.parse_delete_file_time = ADD_CHILD_TIMER_WITH_LEVEL(
549
17
                _scanner_profile, "ParseDeleteFileTime", table_profile, 1);
550
17
        _profile.exec_timer =
551
17
                ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "GetBlockTime", table_profile, 1);
552
17
        _profile.prepare_split_timer =
553
17
                ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "PrepareSplitTime", table_profile, 1);
554
17
        _profile.finalize_timer =
555
17
                ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "FinalizeBlockTime", table_profile, 1);
556
17
        _profile.create_reader_timer =
557
17
                ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "CreateReaderTime", table_profile, 1);
558
17
        _profile.pushdown_agg_timer =
559
17
                ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "PushDownAggTime", table_profile, 1);
560
17
        _profile.open_reader_timer =
561
17
                ADD_CHILD_TIMER_WITH_LEVEL(_scanner_profile, "OpenReaderTime", table_profile, 1);
562
17
    }
563
70
    return Status::OK();
564
70
}
565
566
64
Status TableReader::_build_table_filters_from_conjuncts() {
567
64
    _table_filters.clear();
568
64
    for (const auto& conjunct : _conjuncts) {
569
19
        RETURN_IF_ERROR(
570
19
                build_table_filters_from_conjunct(conjunct, _runtime_state, &_table_filters));
571
19
    }
572
64
    return Status::OK();
573
64
}
574
575
63
Status TableReader::_open_local_filter_exprs(const FileScanRequest& file_request) {
576
63
    RowDescriptor row_desc;
577
63
    for (const auto& conjunct : file_request.conjuncts) {
578
14
        RETURN_IF_ERROR(conjunct->prepare(_runtime_state, row_desc));
579
14
        RETURN_IF_ERROR(conjunct->open(_runtime_state));
580
14
    }
581
63
    for (const auto& delete_conjunct : file_request.delete_conjuncts) {
582
12
        RETURN_IF_ERROR(delete_conjunct->prepare(_runtime_state, row_desc));
583
12
        RETURN_IF_ERROR(delete_conjunct->open(_runtime_state));
584
12
    }
585
63
    return Status::OK();
586
63
}
587
588
63
bool TableReader::_should_enable_condition_cache(const FileScanRequest& file_request) const {
589
63
    if (_condition_cache_digest == 0 || _push_down_agg_type == TPushAggOp::type::COUNT ||
590
63
        _current_file_description == std::nullopt || _data_reader.reader == nullptr) {
591
58
        return false;
592
58
    }
593
    // Condition cache is populated by file readers after evaluating file-local row-level
594
    // conjuncts. ColumnPredicate-only scans can prune row groups/pages, but they do not produce a
595
    // per-row survivor bitmap that can safely populate the cache.
596
5
    if (file_request.conjuncts.empty()) {
597
1
        return false;
598
1
    }
599
    // Delete files/deletion vectors are table-format state. They may change independently of the
600
    // data file path/mtime/size used by the external cache key, so caching their result can become
601
    // stale. Keep delete filtering enabled, but do not read or write condition cache.
602
4
    if (_delete_rows != nullptr || !file_request.delete_conjuncts.empty()) {
603
1
        return false;
604
1
    }
605
    // Runtime filters can arrive late and their payload is not guaranteed to be represented by the
606
    // scan-local digest. Without a read-only mode, a MISS could insert a bitmap for P AND RF under
607
    // the digest for only P. This mirrors the old FileScanner guard.
608
3
    return !contains_runtime_filter(file_request.conjuncts);
609
4
}
610
611
63
Status TableReader::_init_reader_condition_cache(const FileScanRequest& file_request) {
612
63
    _condition_cache = nullptr;
613
63
    _condition_cache_ctx = nullptr;
614
63
    if (!_should_enable_condition_cache(file_request)) {
615
61
        return Status::OK();
616
61
    }
617
618
2
    auto* cache = segment_v2::ConditionCache::instance();
619
2
    if (cache == nullptr) {
620
0
        return Status::OK();
621
0
    }
622
2
    const auto& file = *_current_file_description;
623
2
    _condition_cache_key = segment_v2::ConditionCache::ExternalCacheKey(
624
2
            file.path, file.mtime, file.file_size, _condition_cache_digest, file.range_start_offset,
625
2
            file.range_size);
626
627
2
    segment_v2::ConditionCacheHandle handle;
628
2
    const bool condition_cache_hit = cache->lookup(_condition_cache_key, &handle);
629
2
    if (condition_cache_hit) {
630
0
        _condition_cache = handle.get_filter_result();
631
0
        ++_condition_cache_hit_count;
632
2
    } else {
633
2
        const int64_t total_rows = _data_reader.reader->get_total_rows();
634
2
        if (total_rows <= 0) {
635
0
            return Status::OK();
636
0
        }
637
        // Add one guard granule for split ranges that start in the middle of a granule. A guard
638
        // false bit beyond the real range never overlaps real rows, but avoids boundary overflow
639
        // when a reader marks the last partial granule.
640
2
        const size_t num_granules = (total_rows + ConditionCacheContext::GRANULE_SIZE - 1) /
641
2
                                    ConditionCacheContext::GRANULE_SIZE;
642
2
        _condition_cache = std::make_shared<std::vector<bool>>(num_granules + 1, false);
643
2
    }
644
645
2
    if (_condition_cache != nullptr) {
646
2
        _condition_cache_ctx = std::make_shared<ConditionCacheContext>();
647
2
        _condition_cache_ctx->is_hit = condition_cache_hit;
648
2
        _condition_cache_ctx->filter_result = _condition_cache;
649
2
        _data_reader.reader->set_condition_cache_context(_condition_cache_ctx);
650
2
    }
651
2
    return Status::OK();
652
2
}
653
654
64
void TableReader::_finalize_reader_condition_cache() {
655
64
    if (_condition_cache_ctx == nullptr || _condition_cache_ctx->is_hit) {
656
62
        _condition_cache = nullptr;
657
62
        _condition_cache_ctx = nullptr;
658
62
        return;
659
62
    }
660
    // LIMIT or scanner cancellation may close a reader before all selected row ranges are visited.
661
    // Unvisited granules remain false in a MISS bitmap, so inserting a partial bitmap would make a
662
    // later HIT skip valid rows. Only publish cache entries after the physical reader reaches EOF.
663
2
    if (!_current_reader_reached_eof) {
664
1
        _condition_cache = nullptr;
665
1
        _condition_cache_ctx = nullptr;
666
1
        return;
667
1
    }
668
1
    segment_v2::ConditionCache::instance()->insert(_condition_cache_key,
669
1
                                                   std::move(_condition_cache));
670
1
    _condition_cache = nullptr;
671
1
    _condition_cache_ctx = nullptr;
672
1
}
673
674
75
// Creates, initializes and opens the file reader for the currently prepared scan task.
//
// @param eos  set to true when there is no task to read (either none was prepared, or no reader
//             and no task remain after open_reader()); set to false when a reader is ready.
// @return the first non-OK status from reader creation, init or open; otherwise OK.
Status TableReader::create_next_reader(bool* eos) {
    SCOPED_TIMER(_profile.create_reader_timer);
    DCHECK(_data_reader.reader == nullptr);

    // Nothing has been prepared (or everything was consumed): report end of stream.
    if (_current_task == nullptr) {
        *eos = true;
        return Status::OK();
    }

    RETURN_IF_ERROR(create_file_reader(&_data_reader.reader));
    DORIS_CHECK(_data_reader.reader != nullptr);
    RETURN_IF_ERROR(_data_reader.reader->init(_runtime_state));
    RETURN_IF_ERROR(open_reader());

    // The reader is re-checked after open_reader(), so open_reader() may apparently leave it
    // unset; in that case end of stream is reported only if the task is gone as well. With a
    // live reader there is data to read, so eos is false.
    *eos = _data_reader.reader == nullptr && _current_task == nullptr;
    return Status::OK();
}
693
694
57
// Instantiates the format-specific FileReader for the current scan task.
//
// @param reader  non-null output slot that receives the newly constructed reader.
// @return OK on success; InvalidArgument when a schemaless format (CSV/TEXT/JSON) is requested
//         without file slot descriptors; NotSupported for any other file format.
Status TableReader::create_file_reader(std::unique_ptr<FileReader>* reader) {
    DORIS_CHECK(reader != nullptr);

    switch (_format) {
    case FileFormat::PARQUET: {
        // The timestamp-tz mapping is on only when the scan params exist, carry the field, and
        // the field itself is true.
        bool map_timestamp_tz = false;
        if (_scan_params != nullptr && _scan_params->__isset.enable_mapping_timestamp_tz) {
            map_timestamp_tz = _scan_params->enable_mapping_timestamp_tz;
        }
        *reader = std::make_unique<format::parquet::ParquetReader>(
                _system_properties, _current_task->data_file, _io_ctx, _scanner_profile,
                _global_rowid_context, map_timestamp_tz);
        return Status::OK();
    }
    case FileFormat::CSV: {
        if (_file_slot_descs == nullptr) {
            return Status::InvalidArgument("CSV reader requires file slot descriptors");
        }
        // CSV has no embedded schema. TableReader owns table-level mapping, while CsvReader needs
        // only the physical file slots plus scan text parameters to build a file-local schema.
        // Non-file columns such as partitions/defaults/virtual row ids are intentionally excluded
        // from `_file_slot_descs` and are materialized during finalize_chunk().
        *reader = std::make_unique<format::csv::CsvReader>(
                _system_properties, _current_task->data_file, _io_ctx, _scanner_profile,
                _scan_params, *_file_slot_descs, _current_range_compress_type,
                _current_range_load_id);
        return Status::OK();
    }
    case FileFormat::TEXT: {
        if (_file_slot_descs == nullptr) {
            return Status::InvalidArgument("Text reader requires file slot descriptors");
        }
        // Text files have no embedded schema. As with CSV, TableReader handles table-level mapping
        // and only passes physical file slots to the v2 TextReader.
        *reader = std::make_unique<format::text::TextReader>(
                _system_properties, _current_task->data_file, _io_ctx, _scanner_profile,
                _scan_params, *_file_slot_descs, _current_range_compress_type,
                _current_range_load_id);
        return Status::OK();
    }
    case FileFormat::JSON: {
        if (_file_slot_descs == nullptr) {
            return Status::InvalidArgument("JSON reader requires file slot descriptors");
        }
        // Unlike CSV/TEXT, the JSON reader additionally receives the current file range desc.
        *reader = std::make_unique<format::json::JsonReader>(
                _system_properties, _current_task->data_file, _io_ctx, _scanner_profile,
                _scan_params, _current_file_range_desc, *_file_slot_descs,
                _current_range_compress_type, _current_range_load_id);
        return Status::OK();
    }
    default:
        break;
    }

    return Status::NotSupported("TableReader does not support file format {}",
                                file_format_to_string(_format));
}
744
745
76
std::unique_ptr<io::FileDescription> create_file_description(const TFileRangeDesc& range) {
746
76
    auto file_description = std::make_unique<io::FileDescription>();
747
76
    file_description->path = range.path;
748
76
    file_description->file_size = range.__isset.file_size ? range.file_size : -1;
749
76
    file_description->mtime = range.__isset.modification_time ? range.modification_time : 0;
750
76
    file_description->range_start_offset = range.__isset.start_offset ? range.start_offset : 0;
751
76
    file_description->range_size = range.__isset.size ? range.size : -1;
752
76
    if (range.__isset.fs_name) {
753
0
        file_description->fs_name = range.fs_name;
754
0
    }
755
76
    if (range.__isset.file_cache_admission) {
756
0
        file_description->file_cache_admission = range.file_cache_admission;
757
0
    }
758
76
    return file_description;
759
76
}
760
761
76
// Resets per-split state and records everything needed to read the split described by
// `options.current_range`.
//
// When COUNT pushdown is requested and the range carries a table-level row count, that count is
// stored; if the table-level count is then active, delete predicates are not parsed. Otherwise
// the result of parsing delete predicates is returned.
//
// @param options  split description (range, partition values, row-id context, cache).
// @return OK, or the error produced while parsing delete predicates.
Status TableReader::prepare_split(const SplitReadOptions& options) {
    SCOPED_TIMER(_profile.prepare_split_timer);
    const auto& range = options.current_range;

    // NOTE(review): `options` is a const reference, so unless `partition_values` is declared
    // mutable this std::move degrades to a copy — confirm which one is intended.
    _partition_values = std::move(options.partition_values);

    // Describe the data file of this split and keep a by-value copy of the description.
    _current_task = std::make_unique<ScanTask>();
    _current_task->data_file = create_file_description(range);
    _current_file_description = *_current_task->data_file;
    _current_file_range_desc = range;

    if (range.__isset.compress_type) {
        _current_range_compress_type = range.compress_type;
    } else {
        _current_range_compress_type = TFileCompressType::UNKNOWN;
    }

    if (range.__isset.load_id) {
        _current_range_load_id = std::make_optional(range.load_id);
    } else {
        _current_range_load_id = std::nullopt;
    }

    // Per-split state starts from a clean slate.
    _global_rowid_context = options.global_rowid_context;
    _delete_rows = nullptr;
    _aggregate_pushdown_tried = false;
    _remaining_table_level_count = -1;
    _current_reader_reached_eof = false;

    const bool has_table_level_count = _push_down_agg_type == TPushAggOp::type::COUNT &&
                                       range.__isset.table_format_params &&
                                       range.table_format_params.__isset.table_level_row_count;
    if (has_table_level_count) {
        DORIS_CHECK(options.current_range.table_format_params.table_level_row_count >= -1);
        _remaining_table_level_count = range.table_format_params.table_level_row_count;
    }

    // With an active table-level count, delete predicates are not parsed for this split.
    if (_is_table_level_count_active()) {
        return Status::OK();
    }
    return _parse_delete_predicates(options);
}
791
792
74
// Loads the deletion vector attached to the current split, if any, into `_delete_rows`.
//
// The deletion-vector descriptor is extracted from the range's table format params. When one is
// present, the parsed rows are obtained through `options.cache`, keyed by `desc.key`; the loader
// lambda below opens the file, reads `desc.size` bytes at `desc.start_offset` and parses them.
//
// @param options  split options; `options.cache` must be non-null when a delete file exists.
// @return OK when there is no delete file or it was loaded; otherwise the first error reported
//         while extracting the descriptor, opening, reading or parsing the deletion vector.
Status TableReader::_parse_delete_predicates(const SplitReadOptions& options) {
    DeleteFileDesc desc {.fs_name = options.current_range.fs_name};
    bool has_delete_file = false;
    RETURN_IF_ERROR(_parse_deletion_vector_file(options.current_range.table_format_params, &desc,
                                                &has_delete_file));
    if (!has_delete_file) {
        return Status::OK();
    }

    DORIS_CHECK(options.cache != nullptr);
    // The loader can only return a pointer, so failures are reported through this status.
    Status create_status = Status::OK();

    _delete_rows = options.cache->get<DeleteRows>(desc.key, [&]() -> DeleteRows* {
        // Held by unique_ptr so that every early `return nullptr` below frees the allocation;
        // a raw `new` here would leak on each failure path.
        auto delete_rows = std::make_unique<DeleteRows>();

        DeletionVectorReader dv_reader(_runtime_state, _scanner_profile, *_scan_params, desc,
                                       _io_ctx.get());
        create_status = dv_reader.open();
        if (!create_status.ok()) [[unlikely]] {
            return nullptr;
        }

        size_t bytes_read = desc.size;
        std::vector<char> buffer(bytes_read);
        create_status = dv_reader.read_at(desc.start_offset, {buffer.data(), bytes_read});
        if (!create_status.ok()) [[unlikely]] {
            return nullptr;
        }

        const char* buf = buffer.data();
        SCOPED_TIMER(_profile.parse_delete_file_time);
        create_status = parse_deletion_vector(buf, bytes_read, desc.format, delete_rows.get());
        if (!create_status.ok()) [[unlikely]] {
            return nullptr;
        }
        COUNTER_UPDATE(_profile.num_delete_rows, delete_rows->size());
        // Success: hand the raw pointer to the caller of the loader, as the original did.
        return delete_rows.release();
    });
    RETURN_IF_ERROR(create_status);

    return Status::OK();
}
832
} // namespace doris::format