Coverage Report

Created: 2026-06-25 12:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/table_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bvar/status.h>
21
22
#include <algorithm>
23
#include <exception>
24
#include <map>
25
#include <memory>
26
#include <optional>
27
#include <string>
28
#include <string_view>
29
#include <utility>
30
#include <vector>
31
32
#include "common/cast_set.h"
33
#include "common/exception.h"
34
#include "common/logging.h"
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/block/block.h"
38
#include "core/column/column_array.h"
39
#include "core/column/column_const.h"
40
#include "core/column/column_map.h"
41
#include "core/column/column_nullable.h"
42
#include "core/column/column_struct.h"
43
#include "core/column/column_vector.h"
44
#include "core/data_type/data_type.h"
45
#include "core/data_type/data_type_array.h"
46
#include "core/data_type/data_type_map.h"
47
#include "core/data_type/data_type_nullable.h"
48
#include "core/data_type/data_type_number.h"
49
#include "core/data_type/data_type_string.h"
50
#include "core/data_type/data_type_struct.h"
51
#include "core/field.h"
52
#include "exec/common/stringop_substring.h"
53
#include "exprs/vexpr.h"
54
#include "exprs/vexpr_context.h"
55
#include "exprs/vexpr_fwd.h"
56
#include "exprs/vslot_ref.h"
57
#include "format_v2/column_data.h"
58
#include "format_v2/column_mapper.h"
59
#include "format_v2/expr/cast.h"
60
#include "format_v2/expr/delete_predicate.h"
61
#include "format_v2/file_reader.h"
62
#include "format_v2/parquet/reader/column_reader.h"
63
#include "format_v2/schema_projection.h"
64
#include "gen_cpp/PlanNodes_types.h"
65
#include "runtime/descriptors.h"
66
#include "storage/segment/condition_cache.h"
67
68
namespace doris {
69
class Block;
70
class ColumnPredicate;
71
struct DeleteFileDesc;
72
class RuntimeState;
73
} // namespace doris
74
75
namespace doris::format {
76
77
// Row positions that must be dropped from the current data file. _append_delete_predicate()
// hands them to a DeletePredicate that is matched against the file's row-position column.
using DeleteRows = std::vector<std::int64_t>;
78
79
// Row-level predicates on table/global schema. They are rewritten to file-local expressions when
80
// possible, and remain the source of row-level filtering after localization.
81
struct TableFilter {
82
    VExprContextSPtr conjunct;
83
    std::vector<GlobalIndex> global_indices;
84
};
85
86
struct ScanTask {
87
76
    virtual ~ScanTask() = default;
88
89
    std::unique_ptr<io::FileDescription> data_file;
90
};
91
92
struct ProjectedColumnBuildContext {
93
    const TFileScanRangeParams* scan_params = nullptr;
94
    const TFileRangeDesc* range = nullptr;
95
    RuntimeState* runtime_state = nullptr;
96
    std::optional<ColumnDefinition> schema_column = std::nullopt;
97
    size_t next_file_column_idx = 0;
98
};
99
100
struct ReadProfile {
101
    RuntimeProfile::Counter* num_delete_files = nullptr;
102
    RuntimeProfile::Counter* num_delete_rows = nullptr;
103
    RuntimeProfile::Counter* parse_delete_file_time = nullptr;
104
    RuntimeProfile::Counter* exec_timer = nullptr;
105
    RuntimeProfile::Counter* prepare_split_timer = nullptr;
106
    RuntimeProfile::Counter* finalize_timer = nullptr;
107
    RuntimeProfile::Counter* create_reader_timer = nullptr;
108
    RuntimeProfile::Counter* pushdown_agg_timer = nullptr;
109
    RuntimeProfile::Counter* open_reader_timer = nullptr;
110
};
111
112
struct TableReadOptions {
113
    // Columns need to be read from file and output by table reader. They are all in table/global
114
    // schema semantics.
115
    const std::vector<ColumnDefinition> projected_columns;
116
    // Simple predicates for a single column, which is parsed on scan operator.
117
    const TableColumnPredicates column_predicates;
118
    // All complex conjuncts from scan operator
119
    const VExprContextSPtrs conjuncts;
120
    // File format of the underlying data files, needed for reader initialization and reader-level
121
    // filter pushdown.
122
    const FileFormat format;
123
    TFileScanRangeParams* scan_params;
124
    std::shared_ptr<io::IOContext> io_ctx;
125
    RuntimeState* runtime_state;
126
    RuntimeProfile* scanner_profile;
127
    // File formats without self-describing metadata, such as CSV, need the original FE slot
128
    // descriptors to build their file-local schema and deserialize values. Self-describing formats
129
    // ignore this field and use metadata parsed from the file footer/header.
130
    const std::vector<SlotDescriptor*>* file_slot_descs = nullptr;
131
    // Push-down aggregate type.
132
    const TPushAggOp::type push_down_agg_type = TPushAggOp::type::NONE;
133
    // Digest of stable pushed-down predicates. A zero digest disables condition cache.
134
    uint64_t condition_cache_digest = 0;
135
};
136
137
struct SplitReadOptions {
138
    // Split-level information for reader initialization, which may include file path, partition values, delete file info, etc. The content is table format specific and opaque to table reader base class; it's the responsibility of the concrete table reader implementation to parse necessary information for reader initialization and filter pushdown.
139
    std::map<std::string, Field> partition_values;
140
    ShardedKVCache* cache;
141
    TFileRangeDesc current_range;
142
    std::optional<GlobalRowIdContext> global_rowid_context;
143
};
144
145
// Base class for table-level readers.
146
// This layer owns common table-level orchestration, such as split iteration, dynamic partition
147
// pruning, delete handling and conversion from file-local blocks to table-schema blocks. Concrete
148
// table-format readers only need to provide format-specific hooks for opening readers and parsing
149
// split metadata.
150
class TableReader {
151
public:
152
86
    // Virtual so concrete table-format readers can be destroyed through a TableReader pointer.
    virtual ~TableReader() = default;
153
154
    // Initialize common runtime options for the table reader. Subclasses may call this from their
155
    // own init(options); table-format schema and split metadata are provided later per split.
156
    virtual Status init(TableReadOptions&& options);
157
158
    // Prepare for reading a new split/task:
159
    // 1. Hand the new split/task to the reader; the next open_reader() uses it to initialize
    //    the underlying file reader.
160
    // 2. Parse delete predicates from the split/task information; they are used later for
    //    dynamic filtering and delete handling.
161
    virtual Status prepare_split(const SplitReadOptions& options);
162
163
    // Public entry point for reading a table-schema block. The base class opens the current reader,
164
    // advances across EOF, and closes exhausted readers. Subclasses provide protected hooks for
165
    // table-format-specific behavior.
166
80
    virtual Status get_block(Block* block, bool* eos) {
167
80
        SCOPED_TIMER(_profile.exec_timer);
168
80
        DORIS_CHECK(block->columns() == _projected_columns.size());
169
80
        block->clear_column_data(_projected_columns.size());
170
171
90
        while (true) {
172
90
            if (*eos) {
173
0
                return Status::OK();
174
0
            }
175
90
            if (!_data_reader.reader) {
176
81
                if (_is_table_level_count_active()) {
177
6
                    RETURN_IF_ERROR(_read_table_level_count(block, eos));
178
6
                    return Status::OK();
179
6
                }
180
75
                RETURN_IF_ERROR(create_next_reader(eos));
181
75
                if (!_data_reader.reader) {
182
12
                    DCHECK(*eos);
183
12
                    return Status::OK();
184
12
                }
185
75
            }
186
187
            // Materialize a reduced row set for upper aggregate operators when aggregate
188
            // pushdown can be applied. This is not the final aggregate result: COUNT emits
189
            // `count` default rows for the upper COUNT(*), and MIN/MAX emits two rows containing
190
            // file-level min/max values for the upper MIN/MAX.
191
72
            if (!_aggregate_pushdown_tried) {
192
63
                SCOPED_TIMER(_profile.pushdown_agg_timer);
193
63
                bool pushed_down = false;
194
63
                RETURN_IF_ERROR(_try_materialize_aggregate_pushdown_rows(block, &pushed_down));
195
63
                if (pushed_down) {
196
6
                    return Status::OK();
197
6
                }
198
63
            }
199
200
66
            bool current_eof = false;
201
66
            _data_reader.block_template.clear_column_data(
202
66
                    cast_set<int64_t>(_data_reader.file_block_layout.size()));
203
66
            size_t current_rows = 0;
204
66
            RETURN_IF_ERROR(_data_reader.reader->get_block(&_data_reader.block_template,
205
66
                                                           &current_rows, &current_eof));
206
66
            if (current_rows == 0) {
207
10
                if (current_eof) {
208
10
                    _current_reader_reached_eof = true;
209
10
                    RETURN_IF_ERROR(close_current_reader());
210
10
                }
211
10
                continue;
212
10
            }
213
66
            DCHECK_EQ(_data_reader.block_template.columns(), _data_reader.file_block_layout.size())
214
0
                    << _data_reader.block_template.dump_structure();
215
56
#ifndef NDEBUG
216
56
            RETURN_IF_ERROR(_check_file_block_columns("after file reader get_block", current_rows));
217
56
#endif
218
56
            DORIS_CHECK(block->columns() == _data_reader.column_mapper->mappings().size());
219
56
            RETURN_IF_ERROR(finalize_chunk(block, current_rows));
220
56
#ifndef NDEBUG
221
56
            RETURN_IF_ERROR(
222
56
                    _check_table_block_columns("after finalize_chunk", block, current_rows));
223
56
#endif
224
56
            if (current_eof) {
225
6
                _current_reader_reached_eof = true;
226
6
                RETURN_IF_ERROR(close_current_reader());
227
6
            }
228
56
            return Status::OK();
229
56
        }
230
80
    }
231
232
    // Close the table reader and the currently active file reader. Subclasses that hold additional
233
    // table-format resources should override this and call TableReader::close() first.
234
66
    virtual Status close() {
235
66
        if (_data_reader.reader) {
236
41
            RETURN_IF_ERROR(close_current_reader());
237
41
        }
238
66
        _current_task.reset();
239
66
        _current_file_description.reset();
240
66
        _remaining_table_level_count = -1;
241
66
        return Status::OK();
242
66
    }
243
244
3
    // Returns the accumulated condition-cache hit counter of this reader.
    int64_t condition_cache_hit_count() const {
        return _condition_cache_hit_count;
    }
245
246
    // Human-readable description of the reader state. open_reader() logs it under VLOG_DEBUG
    // and the debug column checks include it in their error messages.
    virtual std::string debug_string() const;
247
248
    // Hook to annotate the projected `column` built from the FE slot `slot_info`, using the
    // shared `context`. The implementation is defined out of line and is not visible here.
    virtual Status annotate_projected_column(const TFileScanSlotInfo& slot_info,
249
                                             ProjectedColumnBuildContext* context,
250
                                             ColumnDefinition* column) const;
251
252
0
    // Hook for table formats to validate the projected columns described by `context`.
    // The base implementation accepts every projection.
    virtual Status validate_projected_columns(const ProjectedColumnBuildContext& context) const {
        static_cast<void>(context);
        return Status::OK();
    }
256
257
protected:
258
    // Parse deletion vector information from table format specific file description.
259
    virtual Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc,
260
48
                                               DeleteFileDesc* desc, bool* has_delete_file) {
261
48
        *has_delete_file = false;
262
48
        return Status::OK();
263
48
    }
264
265
    // Advance to the next reader: close the current reader first, then open the next concrete
266
    // reader. Subclasses should not duplicate this loop. get_block() expects `*eos` to be set
    // whenever this returns without an active reader.
267
    Status create_next_reader(bool* eos);
268
    // Create the concrete file reader into `*reader` (presumably for the current split/task).
    // Table formats may override it to build a format-specific reader.
    virtual Status create_file_reader(std::unique_ptr<FileReader>* reader);
269
44
    // How file columns are matched to table columns. The base class matches by name; formats
    // that map by field id override this.
    virtual TableColumnMappingMode mapping_mode() const {
        return TableColumnMappingMode::BY_NAME;
    }
270
63
    // Hook to enrich the file schema before column mapping, e.g. with field ids when the table
    // format maps columns by id. The base implementation leaves `file_schema` unchanged.
    virtual Status annotate_file_schema(std::vector<ColumnDefinition>* file_schema) {
        DORIS_CHECK(file_schema != nullptr);
        return Status::OK();
    }
274
275
    // Open the concrete reader for the current split/task and build the file-local scan request.
276
64
    virtual Status open_reader() {
277
64
        SCOPED_TIMER(_profile.open_reader_timer);
278
        // 1. Get file schema and create column mapping.
279
64
        std::vector<ColumnDefinition> file_schema;
280
64
        RETURN_IF_ERROR(_data_reader.reader->get_schema(&file_schema));
281
        // For Paimon/Hudi, FE can provide field ids through `history_schema_info`. Annotate the
282
        // file schema before column mapping when the table format maps columns by field id.
283
64
        RETURN_IF_ERROR(annotate_file_schema(&file_schema));
284
64
        _data_reader.file_schema = file_schema;
285
64
        _mapper_options.mode = mapping_mode();
286
287
64
        _data_reader.column_mapper = _data_reader.reader->create_column_mapper(_mapper_options);
288
64
        DORIS_CHECK(_data_reader.column_mapper != nullptr);
289
64
        RETURN_IF_ERROR(_data_reader.column_mapper->create_mapping(_projected_columns,
290
64
                                                                   _partition_values, file_schema));
291
64
        DORIS_CHECK(_data_reader.column_mapper->mappings().size() == _projected_columns.size());
292
293
        // 2. Build table filters based on conjuncts and column predicates.
294
64
        RETURN_IF_ERROR(_build_table_filters_from_conjuncts());
295
296
        // 3. Create file scan request based on column mapping and table filters, then open file
297
        // reader with the request. File scan request carries row-level expression filters and
298
        // file-level pruning hints. Only expression filters decide returned rows; column predicates
299
        // are pruning hints.
300
64
        auto file_request = std::make_shared<FileScanRequest>();
301
64
        RETURN_IF_ERROR(_data_reader.column_mapper->create_scan_request(
302
64
                _table_filters, _table_column_predicates, _projected_columns, file_request.get(),
303
64
                _runtime_state));
304
64
        bool constant_filter_pruned_split = false;
305
64
        RETURN_IF_ERROR(_evaluate_constant_filters(&constant_filter_pruned_split));
306
64
        if (constant_filter_pruned_split) {
307
1
            RETURN_IF_ERROR(close_current_reader());
308
1
            return Status::OK();
309
1
        }
310
63
        RETURN_IF_ERROR(customize_file_scan_request(file_request.get()));
311
63
        RETURN_IF_ERROR(_open_local_filter_exprs(*file_request));
312
63
        _data_reader.file_block_layout.clear();
313
63
        _data_reader.block_template.clear();
314
63
        _data_reader.file_block_layout.resize(file_request->local_positions.size());
315
316
        // 4. Build file block layout from file schema and column mapping. The layout describes
317
        // the block returned by file reader before table-column materialization.
318
91
        for (const auto& [file_column_id, block_position] : file_request->local_positions) {
319
91
            DORIS_CHECK(block_position.value() < _data_reader.file_block_layout.size());
320
91
            const auto* field = _find_column_definition(_data_reader.file_schema, file_column_id);
321
91
            DORIS_CHECK(field != nullptr);
322
323
91
            ColumnDefinition projected_field;
324
91
            {
325
91
                auto it = std::find_if(
326
91
                        file_request->non_predicate_columns.begin(),
327
91
                        file_request->non_predicate_columns.end(),
328
111
                        [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; });
329
91
                if (it != file_request->non_predicate_columns.end()) {
330
65
                    RETURN_IF_ERROR(project_column_definition(*field, *it, &projected_field));
331
65
                }
332
91
            }
333
91
            {
334
91
                auto it = std::find_if(
335
91
                        file_request->predicate_columns.begin(),
336
91
                        file_request->predicate_columns.end(),
337
91
                        [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; });
338
91
                if (it != file_request->predicate_columns.end()) {
339
26
                    RETURN_IF_ERROR(project_column_definition(*field, *it, &projected_field));
340
26
                }
341
91
            }
342
91
            _data_reader.file_block_layout[block_position.value()] = {
343
91
                    .file_column_id = file_column_id,
344
91
                    .name = projected_field.name,
345
91
                    .type = projected_field.type,
346
91
            };
347
91
            DORIS_CHECK(_data_reader.file_block_layout[block_position.value()].type != nullptr);
348
91
        }
349
350
        // 5. Prepare block template from file block layout. The block template stores the block
351
        // returned by file reader before table-column materialization.
352
63
        _data_reader.block_template.reserve(_data_reader.file_block_layout.size());
353
91
        for (const auto& column : _data_reader.file_block_layout) {
354
91
            _data_reader.block_template.insert(
355
91
                    {column.type->create_column(), column.type, column.name});
356
91
        }
357
63
        if (VLOG_DEBUG_IS_ON) {
358
0
            VLOG_DEBUG << "TableReader debug: " << debug_string();
359
0
        }
360
63
        RETURN_IF_ERROR(_open_mapping_exprs());
361
63
        RETURN_IF_ERROR(_data_reader.reader->open(file_request));
362
63
        RETURN_IF_ERROR(_init_reader_condition_cache(*file_request));
363
63
        return Status::OK();
364
63
    }
365
366
    // Populate `_table_filters` from the conjuncts and column predicates (step 2 of open_reader());
    // close_current_reader() clears them again.
    Status _build_table_filters_from_conjuncts();
367
    // Open the file-local filter expressions carried by `file_request`; open_reader() calls it
    // right after customize_file_scan_request().
    Status _open_local_filter_exprs(const FileScanRequest& file_request);
368
    // Set up condition-cache handling for the reader; called at the end of open_reader(), after
    // the reader has been opened with `file_request`.
    Status _init_reader_condition_cache(const FileScanRequest& file_request);
369
    // Finish condition-cache bookkeeping for the current reader; close_current_reader() calls it
    // before releasing the reader.
    void _finalize_reader_condition_cache();
370
    // Decide whether the condition cache may be used for `file_request` (implementation not
    // visible here).
    bool _should_enable_condition_cache(const FileScanRequest& file_request) const;
371
372
64
    // Try to prune the whole split with table filters whose referenced columns all map to
    // constant entries. Each such filter is evaluated on a one-row synthetic block.
    // `*can_filter_all` becomes true as soon as one of them rejects that row.
    Status _evaluate_constant_filters(bool* can_filter_all) {
        DORIS_CHECK(can_filter_all != nullptr);
        *can_filter_all = false;
        for (const auto& filter : _table_filters) {
            // RuntimeFilterExpr does not implement execute_column_impl(); it is evaluated by
            // the row-level filter path through execute_filter(). Constant split pruning uses
            // VExprContext::execute() on a one-row synthetic block, so runtime filters must not
            // be pre-executed here even when their referenced slot maps to a constant value.
            const bool constant_only = filter.conjunct != nullptr &&
                                       !filter.conjunct->root()->is_rf_wrapper() &&
                                       _table_filter_has_only_constant_entries(filter);
            if (!constant_only) {
                continue;
            }

            Block eval_block;
            RETURN_IF_ERROR(_build_constant_filter_block(filter, &eval_block));
            RowDescriptor row_desc;
            RETURN_IF_ERROR(filter.conjunct->prepare(_runtime_state, row_desc));
            RETURN_IF_ERROR(filter.conjunct->open(_runtime_state));
            int result_position = -1;
            RETURN_IF_ERROR(filter.conjunct->execute(&eval_block, &result_position));
            DORIS_CHECK(result_position >= 0);
            const auto& result = eval_block.get_by_position(result_position).column;
            if (_filter_result_filters_all(result)) {
                *can_filter_all = true;
                return Status::OK();
            }
        }
        return Status::OK();
    }
400
401
17
    bool _table_filter_has_only_constant_entries(const TableFilter& table_filter) const {
402
17
        const auto& filter_entries = _data_reader.column_mapper->filter_entries();
403
17
        for (const auto global_index : table_filter.global_indices) {
404
17
            const auto entry_it = filter_entries.find(global_index);
405
17
            if (entry_it == filter_entries.end() || !entry_it->second.is_constant()) {
406
15
                return false;
407
15
            }
408
17
        }
409
2
        return !table_filter.global_indices.empty();
410
17
    }
411
412
2
    // Build a one-row block in table-schema column order for evaluating `table_filter`.
    // Columns that the filter references and that map to constants carry the materialized
    // constant. Every other column is a default-valued constant placeholder.
    Status _build_constant_filter_block(const TableFilter& table_filter, Block* eval_block) {
        DORIS_CHECK(eval_block != nullptr);
        eval_block->clear();
        const auto& mappings = _data_reader.column_mapper->mappings();
        const auto& filter_entries = _data_reader.column_mapper->filter_entries();
        DORIS_CHECK(mappings.size() == _projected_columns.size());
        const auto& referenced = table_filter.global_indices;
        for (size_t position = 0; position < mappings.size(); ++position) {
            const auto& mapping = mappings[position];
            const auto global_index = GlobalIndex(position);
            const auto entry = filter_entries.find(global_index);
            const bool use_constant =
                    entry != filter_entries.end() && entry->second.is_constant() &&
                    std::find(referenced.begin(), referenced.end(), global_index) !=
                            referenced.end();
            if (!use_constant) {
                eval_block->insert({mapping.table_type->create_column_const_with_default_value(1),
                                    mapping.table_type, mapping.table_column_name});
                continue;
            }
            ColumnPtr constant_column;
            RETURN_IF_ERROR(_materialize_constant_filter_column(entry->second.constant_index(),
                                                                &constant_column));
            eval_block->insert(
                    {std::move(constant_column), mapping.table_type, mapping.table_column_name});
        }
        return Status::OK();
    }
440
441
2
    // Evaluate the constant expression registered under `constant_index` and store its
    // single-row result column in `*column`.
    Status _materialize_constant_filter_column(ConstantIndex constant_index, ColumnPtr* column) {
        DORIS_CHECK(column != nullptr);
        const auto& constant = _data_reader.column_mapper->constant_map().get(constant_index);
        DORIS_CHECK(constant.expr != nullptr);
        DORIS_CHECK(constant.type != nullptr);

        RowDescriptor row_desc;
        RETURN_IF_ERROR(constant.expr->prepare(_runtime_state, row_desc));
        RETURN_IF_ERROR(constant.expr->open(_runtime_state));

        // Seed the block with a single default-valued row to evaluate the expression over.
        Block one_row_block;
        one_row_block.insert({constant.type->create_column_const_with_default_value(1),
                              constant.type, "__table_reader_constant_filter"});
        int result_position = -1;
        RETURN_IF_ERROR(constant.expr->execute(&one_row_block, &result_position));
        DORIS_CHECK(result_position >= 0);
        *column = one_row_block.get_by_position(result_position).column;
        DORIS_CHECK((*column)->size() == 1);
        return Status::OK();
    }
459
460
2
    static bool _filter_result_filters_all(const ColumnPtr& filter_column) {
461
2
        DORIS_CHECK(filter_column.get() != nullptr);
462
2
        DORIS_CHECK(filter_column->size() == 1);
463
2
        return !filter_column->get_bool(0);
464
2
    }
465
466
64
    // Hook for table formats to adjust the file scan request before the reader opens.
    // The base version only appends the delete predicate, when deletes exist.
    virtual Status customize_file_scan_request(FileScanRequest* file_request) {
        RETURN_IF_ERROR(_append_delete_predicate(file_request));
        return Status::OK();
    }
469
470
178
    // Table-level COUNT pushdown is active while a non-negative remaining count is tracked;
    // -1 marks it inactive.
    bool _is_table_level_count_active() const {
        return 0 <= _remaining_table_level_count;
    }
471
472
6
    // Replace every column of `block` with a freshly created column resized to `rows`. This
    // produces the placeholder rows emitted for COUNT pushdown.
    Status _materialize_count_rows(size_t rows, Block* block) const {
        DORIS_CHECK(block != nullptr);
        DORIS_CHECK(block->columns() > 0 || rows == 0);
        const size_t column_count = block->columns();
        for (size_t position = 0; position < column_count; ++position) {
            auto placeholder = block->get_by_position(position).type->create_column();
            placeholder->resize(rows);
            block->replace_by_position(position, std::move(placeholder));
        }
        return Status::OK();
    }
482
483
6
    // Emit the next batch of placeholder rows for table-level COUNT pushdown.
    //
    // Each call emits up to one batch and decrements the remaining count. Once the count is
    // exhausted, the next call deactivates COUNT pushdown (-1), drops the current task and sets
    // `*eos`. Without a runtime state, or when the runtime batch size is not positive, all
    // remaining rows are emitted at once. A zero-row batch would never decrement the remaining
    // count, so get_block() would return empty blocks with eos=false forever.
    Status _read_table_level_count(Block* block, bool* eos) {
        DORIS_CHECK(block != nullptr);
        DORIS_CHECK(eos != nullptr);
        DORIS_CHECK(_push_down_agg_type == TPushAggOp::type::COUNT);
        DORIS_CHECK(_remaining_table_level_count >= 0);
        if (_remaining_table_level_count == 0) {
            _remaining_table_level_count = -1;
            _current_task.reset();
            *eos = true;
            return Status::OK();
        }

        int64_t batch_size = _remaining_table_level_count;
        if (_runtime_state != nullptr && _runtime_state->batch_size() > 0) {
            batch_size = static_cast<int64_t>(_runtime_state->batch_size());
        }
        const auto rows = std::min(_remaining_table_level_count, batch_size);
        RETURN_IF_ERROR(_materialize_count_rows(cast_set<size_t>(rows), block));
        _remaining_table_level_count -= rows;
        *eos = false;
        return Status::OK();
    }
504
505
    void _append_file_scan_column(FileScanRequest* request, LocalColumnId column_id,
506
24
                                  std::vector<LocalColumnIndex>* scan_columns) {
507
24
        DORIS_CHECK(request != nullptr);
508
24
        DORIS_CHECK(scan_columns != nullptr);
509
24
        FileScanRequestBuilder builder(request);
510
24
        Status status;
511
24
        if (scan_columns == &request->predicate_columns) {
512
13
            status = builder.add_predicate_column(column_id);
513
13
        } else {
514
11
            DORIS_CHECK(scan_columns == &request->non_predicate_columns);
515
11
            status = builder.add_non_predicate_column(column_id);
516
11
        }
517
24
        DORIS_CHECK(status.ok()) << status.to_string();
518
24
        if (column_id == LocalColumnId(ROW_POSITION_COLUMN_ID) &&
519
24
            _find_column_definition(_data_reader.file_schema, column_id) == nullptr) {
520
17
            _data_reader.file_schema.push_back(row_position_column_definition());
521
17
        }
522
24
    }
523
524
    // Append DeletePredicate to file scan request if there are deletes. The predicate will be evaluated in file reader level and filter out deleted rows before returning data to table reader.
525
64
    Status _append_delete_predicate(FileScanRequest* request) {
526
64
        DORIS_CHECK(request != nullptr);
527
64
        if (_delete_rows == nullptr || _delete_rows->empty()) {
528
54
            return Status::OK();
529
54
        }
530
10
        const auto row_position_column_id = LocalColumnId(ROW_POSITION_COLUMN_ID);
531
10
        _append_file_scan_column(request, row_position_column_id, &request->predicate_columns);
532
533
10
        auto delete_predicate = std::make_shared<DeletePredicate>(*_delete_rows);
534
10
        const auto block_position = request->local_positions.at(row_position_column_id);
535
10
        delete_predicate->add_child(VSlotRef::create_shared(
536
10
                cast_set<int>(block_position.value()), cast_set<int>(block_position.value()), -1,
537
10
                std::make_shared<DataTypeInt64>(), ROW_POSITION_COLUMN_NAME));
538
539
10
        request->delete_conjuncts.push_back(
540
10
                VExprContext::create_shared(std::move(delete_predicate)));
541
10
        return Status::OK();
542
64
    }
543
544
    // Close the current concrete reader. This hook is called by both create_next_reader() and
545
    // close(), so it should remain idempotent.
546
64
    virtual Status close_current_reader() {
547
64
        _finalize_reader_condition_cache();
548
64
        RETURN_IF_ERROR(_data_reader.reader->close());
549
64
        _data_reader.reader.reset();
550
64
        if (_data_reader.column_mapper != nullptr) {
551
64
            _data_reader.column_mapper->clear();
552
64
            _data_reader.column_mapper.reset();
553
64
        }
554
64
        _table_filters.clear();
555
64
        _data_reader.file_schema.clear();
556
64
        _data_reader.file_block_layout.clear();
557
64
        _data_reader.block_template.clear();
558
64
        _current_task.reset();
559
64
        _current_file_description.reset();
560
64
        _current_reader_reached_eof = false;
561
64
        return Status::OK();
562
64
    }
563
564
    // Finalize file-local block to table/global schema block.
565
56
    Status finalize_chunk(Block* block, const size_t rows) {
566
56
        SCOPED_TIMER(_profile.finalize_timer);
567
56
        size_t idx = 0;
568
84
        for (const auto& mapping : _data_reader.column_mapper->mappings()) {
569
84
            ColumnPtr column;
570
84
            RETURN_IF_ERROR(_materialize_mapping_column(mapping, &_data_reader.block_template, rows,
571
84
                                                        &column));
572
84
            block->replace_by_position(idx, IColumn::mutate(std::move(column)));
573
84
            idx++;
574
84
        }
575
56
        RETURN_IF_ERROR(materialize_virtual_columns(block));
576
        // Enforce CHAR/VARCHAR length declared by the table schema after all file-to-table
577
        // materialization has finished.
578
56
        RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
579
56
        return Status::OK();
580
56
    }
581
582
    // Materialize virtual columns in the table block, such as Iceberg _row_id and
583
    // _last_updated_sequence_number. This runs after normal column materialization so finalize
584
    // expressions can reference those virtual columns.
585
37
    virtual Status materialize_virtual_columns(Block* table_block) { return Status::OK(); }
586
587
#ifndef NDEBUG
588
56
    // Debug-only validation of the file-local block template. Every column must be non-null,
    // pass IColumn::sanity_check(), and match its declared type. Any failure, including an
    // exception thrown by those checks, is logged and returned as an InternalError that carries
    // the file schema entry for the column and this reader's debug_string().
    Status _check_file_block_columns(std::string_view stage, size_t rows) {
        auto& file_block = _data_reader.block_template;
        const auto& layout = _data_reader.file_block_layout;
        DORIS_CHECK(file_block.columns() == layout.size());
        for (size_t pos = 0; pos < file_block.columns(); ++pos) {
            const auto& layout_entry = layout[pos];
            const auto& typed_column = file_block.get_by_position(pos);
            const auto* col = typed_column.column.get();
            const auto file_type_name = [&layout_entry]() -> std::string {
                return layout_entry.type == nullptr ? "null" : layout_entry.type->get_name();
            };
            // Shared by the type-mismatch path and both exception handlers, which all report
            // the same message shape with a different `error` text.
            const auto report_invalid = [&](std::string_view error) {
                auto st = Status::InternalError(
                        "Invalid file block column {} at {}: file_column_id={}, name='{}', "
                        "type={}, column={}, column_size={}, expected_rows={}, error={}, "
                        "reader={}",
                        pos, stage, layout_entry.file_column_id.value(), layout_entry.name,
                        file_type_name(), col == nullptr ? "null" : col->get_name(),
                        col == nullptr ? 0 : col->size(), rows, error, debug_string());
                LOG(WARNING) << st;
                return st;
            };
            try {
                if (col == nullptr) {
                    auto st = Status::InternalError(
                            "Invalid file block column {} at {}: file_column_id={}, name='{}', "
                            "type={}, column=null, expected_rows={}, reader={}",
                            pos, stage, layout_entry.file_column_id.value(), layout_entry.name,
                            file_type_name(), rows, debug_string());
                    LOG(WARNING) << st;
                    return st;
                }
                col->sanity_check();
                const auto match_status = typed_column.check_type_and_column_match();
                if (!match_status.ok()) {
                    return report_invalid(match_status.to_string());
                }
            } catch (const Exception& e) {
                return report_invalid(e.to_string());
            } catch (const std::exception& e) {
                return report_invalid(e.what());
            }
        }
        return Status::OK();
    }
654
655
56
    // Debug-only validation of the table-schema block. Every column must be non-null, pass
    // IColumn::sanity_check(), and match its declared type. Any failure, including an exception
    // thrown by those checks, is logged and returned as an InternalError that carries the column
    // mapping for that position.
    Status _check_table_block_columns(std::string_view stage, const Block* block, size_t rows) {
        DORIS_CHECK(block != nullptr);
        const auto& mappings = _data_reader.column_mapper->mappings();
        DORIS_CHECK(block->columns() == mappings.size());
        for (size_t pos = 0; pos < block->columns(); ++pos) {
            const auto& mapping = mappings[pos];
            const auto& typed_column = block->get_by_position(pos);
            const auto* col = typed_column.column.get();
            const auto table_type_name = [&mapping]() -> std::string {
                return mapping.table_type == nullptr ? "null" : mapping.table_type->get_name();
            };
            // Shared by the type-mismatch path and both exception handlers, which all report
            // the same message shape with a different `error` text.
            const auto report_invalid = [&](std::string_view error) {
                auto st = Status::InternalError(
                        "Invalid table block column {} at {}: table_column='{}', global_index={}, "
                        "type={}, column={}, column_size={}, expected_rows={}, error={}, "
                        "mapping={}",
                        pos, stage, mapping.table_column_name, mapping.global_index.value(),
                        table_type_name(), col == nullptr ? "null" : col->get_name(),
                        col == nullptr ? 0 : col->size(), rows, error, mapping.debug_string());
                LOG(WARNING) << st;
                return st;
            };
            try {
                if (col == nullptr) {
                    auto st = Status::InternalError(
                            "Invalid table block column {} at {}: table_column='{}', "
                            "global_index={}, type={}, column=null, expected_rows={}, mapping={}",
                            pos, stage, mapping.table_column_name, mapping.global_index.value(),
                            table_type_name(), rows, mapping.debug_string());
                    LOG(WARNING) << st;
                    return st;
                }
                col->sanity_check();
                const auto match_status = typed_column.check_type_and_column_match();
                if (!match_status.ok()) {
                    return report_invalid(match_status.to_string());
                }
            } catch (const Exception& e) {
                return report_invalid(e.to_string());
            } catch (const std::exception& e) {
                return report_invalid(e.what());
            }
        }
        return Status::OK();
    }
715
#endif
716
717
56
    // Enforces the CHAR/VARCHAR lengths declared by the table schema on `block`. This is a no-op
    // unless a runtime state exists and its truncate_char_or_varchar_columns query option is
    // set. Columns are only truncated when _should_truncate_char_or_varchar_column() says the
    // table bound is stricter than the file-side type.
    Status _truncate_char_or_varchar_columns(Block* block) {
        DORIS_CHECK(block != nullptr);
        const bool truncation_enabled =
                _runtime_state != nullptr &&
                _runtime_state->query_options().truncate_char_or_varchar_columns;
        if (!truncation_enabled) {
            return Status::OK();
        }
        const auto& mappings = _data_reader.column_mapper->mappings();
        DORIS_CHECK(block->columns() == mappings.size());
        for (size_t col_idx = 0; col_idx < mappings.size(); ++col_idx) {
            const auto& mapping = mappings[col_idx];
            if (_should_truncate_char_or_varchar_column(mapping)) {
                const auto max_len =
                        assert_cast<const DataTypeString*>(remove_nullable(mapping.table_type).get())
                                ->len();
                _truncate_char_or_varchar_column(block, col_idx, max_len);
            }
        }
        return Status::OK();
    }
736
737
    // Return true when the table schema has a bounded CHAR/VARCHAR length that is stricter than
738
    // the file-side type. Examples:
739
    // - table VARCHAR(10), file VARCHAR(20): truncate to 10;
740
    // - table VARCHAR(10), file STRING: truncate to 10 because STRING has no declared bound;
741
    // - table STRING, any file type: no truncation because the target has no bound.
742
5
    static bool _should_truncate_char_or_varchar_column(const ColumnMapping& mapping) {
743
5
        if (mapping.table_type == nullptr) {
744
0
            return false;
745
0
        }
746
5
        const auto table_type = remove_nullable(mapping.table_type);
747
5
        const auto primitive_type = table_type->get_primitive_type();
748
5
        if (primitive_type != TYPE_VARCHAR && primitive_type != TYPE_CHAR) {
749
1
            return false;
750
1
        }
751
4
        const auto target_len = assert_cast<const DataTypeString*>(table_type.get())->len();
752
4
        if (target_len <= 0) {
753
0
            return false;
754
0
        }
755
4
        if (mapping.file_type == nullptr) {
756
0
            return true;
757
0
        }
758
4
        const auto file_type = remove_nullable(mapping.file_type);
759
4
        DORIS_CHECK(file_type != nullptr);
760
4
        int file_len = -1;
761
4
        if (file_type->get_primitive_type() == TYPE_VARCHAR ||
762
4
            file_type->get_primitive_type() == TYPE_CHAR ||
763
4
            file_type->get_primitive_type() == TYPE_STRING) {
764
3
            file_len = assert_cast<const DataTypeString*>(file_type.get())->len();
765
3
        }
766
767
4
        return file_len < 0 || target_len < file_len;
768
4
    }
769
770
    // Truncate a materialized CHAR/VARCHAR column in place by reusing the vectorized substring
771
    // implementation: substring(column, 1, len). Nullable columns are unwrapped before substring
772
    // execution and wrapped back with the original null map afterward, because substring operates
773
    // on the nested string payload only.
774
1
    // Truncates the CHAR/VARCHAR column at position `idx` of `block` to at most `len` characters
    // by running the vectorized substring implementation, substring(column, 1, len), over
    // temporary argument/result columns appended to the block. The temporaries are erased again
    // before returning. Nullable columns are unwrapped first, because substring operates on the
    // nested string payload only, and are rewrapped with the original null map afterwards.
    static void _truncate_char_or_varchar_column(Block* block, size_t idx, int len) {
        DORIS_CHECK(block != nullptr);
        auto int_type = std::make_shared<DataTypeInt32>();
        const auto num_columns_without_result = cast_set<uint32_t>(block->columns());
        auto& target = block->get_by_position(idx);
        const bool is_nullable = target.type->is_nullable();
        // The materialized column may still be a ColumnConst, for example a column absent from
        // the file that was filled with the type's default value via
        // create_column_const_with_default_value(). Expand it so the ColumnNullable cast below is
        // valid and substring receives a plain string column. `full_column` also keeps the
        // nullable wrapper alive while its nested column and null map are extracted.
        const ColumnPtr full_column = target.column->convert_to_full_column_if_const();
        ColumnPtr input_column = full_column;
        ColumnPtr null_map_column;
        if (is_nullable) {
            const auto* nullable_column = assert_cast<const ColumnNullable*>(full_column.get());
            input_column = nullable_column->get_nested_column_ptr();
            null_map_column = nullable_column->get_null_map_column_ptr();
        }
        block->replace_by_position(idx, std::move(input_column));
        block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)),
                       int_type, "const 1"});
        block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)),
                       int_type, "const len"});
        block->insert({nullptr, std::make_shared<DataTypeString>(), "result"});

        // Arguments: the (unwrapped) string column, start position 1, and the maximum length.
        ColumnNumbers temp_arguments(3);
        temp_arguments[0] = cast_set<uint32_t>(idx);
        temp_arguments[1] = num_columns_without_result;
        temp_arguments[2] = num_columns_without_result + 1;
        const uint32_t result_column_id = num_columns_without_result + 2;
        SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());

        ColumnPtr result_column = block->get_by_position(result_column_id).column;
        if (is_nullable) {
            result_column = ColumnNullable::create(std::move(result_column), null_map_column);
        }
        block->replace_by_position(idx, std::move(result_column));
        // Drop the temporary argument and result columns appended above.
        block->erase_tail(num_columns_without_result);
    }
808
809
63
    // Attempts to answer the pushed-down aggregate (_push_down_agg_type) directly from the file
    // reader. *pushed_down is set to true only when rows were materialized into `block`. The
    // reader may report NOT_IMPLEMENTED_ERROR; that is treated as "not pushed down" rather than
    // as an error. After a successful pushdown the current reader is closed.
    Status _try_materialize_aggregate_pushdown_rows(Block* block, bool* pushed_down) {
        DORIS_CHECK(block != nullptr);
        DORIS_CHECK(pushed_down != nullptr);
        *pushed_down = false;
        block->clear_column_data(_projected_columns.size());
        _aggregate_pushdown_tried = true;
        if (!_supports_aggregate_pushdown(_push_down_agg_type)) {
            return Status::OK();
        }

        FileAggregateRequest request;
        RETURN_IF_ERROR(_build_file_aggregate_request(_push_down_agg_type, &request));
        FileAggregateResult result;
        const auto reader_status = _data_reader.reader->get_aggregate_result(request, &result);
        if (reader_status.is<ErrorCode::NOT_IMPLEMENTED_ERROR>()) {
            // The file format cannot answer this aggregate; fall back to a regular scan.
            return Status::OK();
        }
        RETURN_IF_ERROR(reader_status);
        RETURN_IF_ERROR(_materialize_aggregate_pushdown_rows(_push_down_agg_type, result, block));
        *pushed_down = true;
        return close_current_reader();
    }
833
834
70
    // Decides whether `agg_type` may be answered by the file reader instead of a full scan.
    // COUNT qualifies when there are no delete rows, table filters or column predicates.
    // MIN/MAX additionally requires every column mapping to be a direct file-to-table mapping
    // accepted by _can_push_down_minmax_for_mapping().
    virtual bool _supports_aggregate_pushdown(TPushAggOp::type agg_type) const {
        const bool is_count = agg_type == TPushAggOp::type::COUNT;
        const bool is_minmax = agg_type == TPushAggOp::type::MINMAX;
        if (!is_count && !is_minmax) {
            return false;
        }
        // Deletes, filters or predicates would make the reduced rows consumed by the upper
        // aggregate differ from what a normal scan would produce.
        const bool has_delete_rows = _delete_rows != nullptr && !_delete_rows->empty();
        if (has_delete_rows || !_table_filters.empty() || !_table_column_predicates.empty()) {
            return false;
        }
        if (is_count) {
            return true;
        }
        // The two emitted MIN/MAX rows must suffice without evaluating default expressions or
        // virtual columns, so every mapping has to read a file column directly.
        return std::ranges::all_of(
                _data_reader.column_mapper->mappings(), [&](const ColumnMapping& mapping) {
                    return mapping.file_local_id.has_value() &&
                           mapping.virtual_column_type == TableVirtualColumnType::INVALID &&
                           mapping.default_expr == nullptr && mapping.file_type != nullptr &&
                           mapping.table_type != nullptr &&
                           _can_push_down_minmax_for_mapping(mapping);
                });
    }
867
868
84
    // Passes a non-null column through IColumn::mutate() and hands the result back as a
    // ColumnPtr, so callers receive a column obtained via mutate rather than the shared input.
    static ColumnPtr _detach_column(ColumnPtr column) {
        const bool has_column = column.get() != nullptr;
        DORIS_CHECK(has_column);
        return IColumn::mutate(std::move(column));
    }
872
873
58
    // Recursively rewrites *column so that its nullability matches `table_type` at every level:
    // top level, array elements, map keys/values and struct fields. Const columns are expanded
    // first, so the result is never a ColumnConst. A nullable column aligned to a non-nullable
    // type is unwrapped, and an InternalError is returned if it actually contains NULLs.
    static Status _align_column_nullability(ColumnPtr* column, const DataTypePtr& table_type) {
        DORIS_CHECK(column != nullptr);
        DORIS_CHECK(column->get() != nullptr);
        DORIS_CHECK(table_type != nullptr);
        // Must return non-const column
        *column = (*column)->convert_to_full_column_if_const();
        const bool column_is_nullable = (*column)->is_nullable();

        if (table_type->is_nullable()) {
            const auto& nested_table_type =
                    assert_cast<const DataTypeNullable&>(*table_type).get_nested_type();
            if (column_is_nullable) {
                // Keep the existing null map; only the nested payload needs aligning.
                const auto& nullable = assert_cast<const ColumnNullable&>(**column);
                ColumnPtr nested = nullable.get_nested_column_ptr();
                RETURN_IF_ERROR(_align_column_nullability(&nested, nested_table_type));
                *column = ColumnNullable::create(nested, nullable.get_null_map_column_ptr());
            } else {
                // Align the payload against the nested type, then wrap it as nullable.
                RETURN_IF_ERROR(_align_column_nullability(column, nested_table_type));
                *column = make_nullable(*column);
            }
            return Status::OK();
        }

        if (column_is_nullable) {
            const auto& nullable = assert_cast<const ColumnNullable&>(**column);
            if (nullable.has_null()) {
                return Status::InternalError(
                        "Default expression produced NULL for non-nullable table column");
            }
            ColumnPtr nested = nullable.get_nested_column_ptr();
            RETURN_IF_ERROR(_align_column_nullability(&nested, table_type));
            *column = nested;
            return Status::OK();
        }

        if (const auto* array_type = typeid_cast<const DataTypeArray*>(table_type.get())) {
            const auto& array_column = assert_cast<const ColumnArray&>(**column);
            ColumnPtr elements = array_column.get_data_ptr();
            RETURN_IF_ERROR(_align_column_nullability(&elements, array_type->get_nested_type()));
            *column = ColumnArray::create(elements, array_column.get_offsets_ptr());
            return Status::OK();
        }

        if (const auto* map_type = typeid_cast<const DataTypeMap*>(table_type.get())) {
            const auto& map_column = assert_cast<const ColumnMap&>(**column);
            ColumnPtr keys = map_column.get_keys_ptr();
            ColumnPtr values = map_column.get_values_ptr();
            RETURN_IF_ERROR(_align_column_nullability(&keys, map_type->get_key_type()));
            RETURN_IF_ERROR(_align_column_nullability(&values, map_type->get_value_type()));
            *column = ColumnMap::create(keys, values, map_column.get_offsets_ptr());
            return Status::OK();
        }

        if (const auto* struct_type = typeid_cast<const DataTypeStruct*>(table_type.get())) {
            const auto& struct_column = assert_cast<const ColumnStruct&>(**column);
            Columns fields = struct_column.get_columns_copy();
            DORIS_CHECK(fields.size() == struct_type->get_elements().size());
            for (size_t field_idx = 0; field_idx < fields.size(); ++field_idx) {
                RETURN_IF_ERROR(_align_column_nullability(&fields[field_idx],
                                                          struct_type->get_element(field_idx)));
            }
            *column = ColumnStruct::create(fields);
            return Status::OK();
        }

        // Scalar columns need no further adjustment.
        return Status::OK();
    }
935
936
    // Evaluates `default_expr` over `block` by calling its root's execute_column_impl() directly,
    // with exceptions converted to a Status. Judging by the name, this skips the root
    // result-type check of the regular execution path; confirm against VExprContext. On success,
    // *result_data receives the column together with the expression's execute_type() and
    // expr_name(). A result whose size differs from block->rows() is reported as an
    // InternalError.
    static Status _execute_default_expr_without_root_type_check(
            const VExprContextSPtr& default_expr, const Block* block,
            ColumnWithTypeAndName* result_data) {
        DORIS_CHECK(default_expr != nullptr);
        DORIS_CHECK(block != nullptr);
        DORIS_CHECK(result_data != nullptr);
        ColumnPtr evaluated;
        Status exec_status;
        RETURN_IF_CATCH_EXCEPTION({
            exec_status = default_expr->root()->execute_column_impl(
                    default_expr.get(), block, nullptr, block->rows(), evaluated);
        });
        RETURN_IF_ERROR(exec_status);
        DORIS_CHECK(evaluated.get() != nullptr);
        if (evaluated->size() == block->rows()) {
            result_data->column = evaluated;
            result_data->type = default_expr->execute_type(block);
            result_data->name = default_expr->expr_name();
            return Status::OK();
        }
        return Status::InternalError(
                "Default expr {} return column size {} not equal to expected size {}",
                default_expr->expr_name(), evaluated->size(), block->rows());
    }
960
961
    // Casts *column from `file_type` to `table_type` by building a one-off Cast expression over a
    // slot reference and evaluating it on a single-column block. This is a no-op when the two
    // types are equal. When the column is nullable but `file_type` is not, the input type is
    // widened to nullable so that it describes the actual column.
    Status _cast_column_to_type(ColumnPtr* column, const DataTypePtr& file_type,
                                const DataTypePtr& table_type,
                                const std::string& column_name) const {
        DORIS_CHECK(column != nullptr);
        DORIS_CHECK(column->get() != nullptr);
        DORIS_CHECK(file_type != nullptr);
        DORIS_CHECK(table_type != nullptr);
        if (file_type->equals(*table_type)) {
            return Status::OK();
        }

        const bool widen_to_nullable = (*column)->is_nullable() && !file_type->is_nullable();
        const DataTypePtr source_type = widen_to_nullable ? make_nullable(file_type) : file_type;

        Block input_block;
        input_block.insert({*column, source_type, column_name});

        auto cast_expr = Cast::create_shared(table_type);
        cast_expr->add_child(VSlotRef::create_shared(0, 0, -1, source_type, column_name));
        auto cast_ctx = VExprContext::create_shared(std::move(cast_expr));

        RowDescriptor empty_row_desc;
        RETURN_IF_ERROR(cast_ctx->prepare(_runtime_state, empty_row_desc));
        RETURN_IF_ERROR(cast_ctx->open(_runtime_state));

        ColumnPtr converted;
        RETURN_IF_ERROR(cast_ctx->execute(&input_block, converted));
        *column = std::move(converted);
        return Status::OK();
    }
990
991
    // Materializes a child mapping whose data is present in the file. Trivial mappings reuse
    // `file_column` unchanged. Non-trivial mappings either recurse through their child mappings
    // or, without children, are cast from file_type to table_type. In every case the result is
    // finally aligned to the table type's nullability.
    Status _materialize_present_child_mapping_column(const ColumnMapping& mapping,
                                                     const ColumnPtr& file_column,
                                                     const size_t rows, ColumnPtr* column) {
        DORIS_CHECK(column != nullptr);
        DORIS_CHECK(mapping.file_type != nullptr);
        DORIS_CHECK(mapping.table_type != nullptr);
        *column = file_column;
        if (mapping.is_trivial) {
            return _align_column_nullability(column, mapping.table_type);
        }
        if (mapping.child_mappings.empty()) {
            RETURN_IF_ERROR(_cast_column_to_type(column, mapping.file_type, mapping.table_type,
                                                 mapping.file_column_name));
        } else {
            RETURN_IF_ERROR(_materialize_complex_mapping_column(mapping, *column, rows, column));
        }
        return _align_column_nullability(column, mapping.table_type);
    }
1010
1011
    // Produces the table-schema column for `mapping` with `rows` rows and stores it in *column.
    // The cases are tried in this order:
    //   1. Non-trivial complex mapping backed by a file column: run its projection on
    //      current_block, then apply the child mappings to the projected column.
    //   2. Any other mapping with a projection: run it and detach the result.
    //   3. Mapping with a default expression: evaluate it directly on current_block when the
    //      row counts agree; otherwise evaluate it on a helper block holding a `rows`-long const
    //      default column. The result is then aligned to the table type's nullability.
    //   4. Otherwise: a const column filled with the table type's default value.
    Status _materialize_mapping_column(const ColumnMapping& mapping, Block* current_block,
                                       const size_t rows, ColumnPtr* column) {
        if (!mapping.is_trivial && mapping.file_local_id.has_value() &&
            !mapping.child_mappings.empty()) {
            // DCHECK compiles out in release builds, which would then dereference a null
            // projection; fail loudly in every build, consistent with the rest of this reader.
            DORIS_CHECK(mapping.projection != nullptr);
            int res_id = -1;
            auto st = mapping.projection->execute(current_block, &res_id);
            if (!st.ok()) {
                return Status::InternalError(
                        "Failed to execute complex mapping projection for table column '{}' "
                        "(global_index={}, file_local_id={}, rows={}): {}, mapping={}",
                        mapping.table_column_name, mapping.global_index.value(),
                        *mapping.file_local_id, rows, st.to_string(), mapping.debug_string());
            }
            ColumnPtr result_column = current_block->get_by_position(res_id).column;
            RETURN_IF_ERROR(
                    _materialize_complex_mapping_column(mapping, result_column, rows, column));
            return Status::OK();
        }
        if (mapping.projection != nullptr) {
            int res_id = -1;
            auto st = mapping.projection->execute(current_block, &res_id);
            if (!st.ok()) {
                std::string file_local_id = "null";
                if (mapping.file_local_id.has_value()) {
                    file_local_id = std::to_string(*mapping.file_local_id);
                }
                return Status::InternalError(
                        "Failed to execute mapping projection for table column '{}' "
                        "(global_index={}, file_local_id={}, rows={}): {}, mapping={}",
                        mapping.table_column_name, mapping.global_index.value(), file_local_id,
                        rows, st.to_string(), mapping.debug_string());
            }
            ColumnPtr result_column = current_block->get_by_position(res_id).column;
            *column = _detach_column(std::move(result_column));
            return Status::OK();
        }
        if (mapping.default_expr != nullptr) {
            if (current_block->rows() == rows) {
                ColumnWithTypeAndName result;
                RETURN_IF_ERROR(_execute_default_expr_without_root_type_check(
                        mapping.default_expr, current_block, &result));
                ColumnPtr result_column = result.column;
                RETURN_IF_ERROR(_align_column_nullability(&result_column, mapping.table_type));
                *column = _detach_column(std::move(result_column));
            } else {
                // current_block's row count differs from `rows`; evaluate the default
                // expression on a helper block that has exactly `rows` rows instead.
                DORIS_CHECK(mapping.constant_index.has_value());
                Block eval_block;
                eval_block.insert({mapping.table_type->create_column_const_with_default_value(rows),
                                   mapping.table_type, "__table_reader_const_rows"});
                ColumnWithTypeAndName result;
                RETURN_IF_ERROR(_execute_default_expr_without_root_type_check(
                        mapping.default_expr, &eval_block, &result));
                ColumnPtr result_column = result.column;
                RETURN_IF_ERROR(_align_column_nullability(&result_column, mapping.table_type));
                *column = _detach_column(std::move(result_column));
            }
            return Status::OK();
        }
        ColumnPtr result_column = mapping.table_type->create_column_const_with_default_value(rows);
        *column = _detach_column(std::move(result_column));
        return Status::OK();
    }
1074
1075
    // Applies the child mappings of a STRUCT/ARRAY/MAP table column (nullability of the table
    // type is ignored for dispatch). Any other table type is passed through _detach_column().
    Status _materialize_complex_mapping_column(const ColumnMapping& mapping,
                                               const ColumnPtr& file_column, const size_t rows,
                                               ColumnPtr* column) {
        DORIS_CHECK(mapping.table_type != nullptr);
        DORIS_CHECK(file_column.get() != nullptr);
        const auto nested_kind = remove_nullable(mapping.table_type)->get_primitive_type();
        if (nested_kind == TYPE_STRUCT) {
            return _materialize_struct_mapping_column(mapping, file_column, rows, column);
        }
        if (nested_kind == TYPE_ARRAY) {
            return _materialize_array_mapping_column(mapping, file_column, rows, column);
        }
        if (nested_kind == TYPE_MAP) {
            return _materialize_map_mapping_column(mapping, file_column, rows, column);
        }
        *column = _detach_column(file_column);
        return Status::OK();
    }
1097
1098
    static std::vector<const ColumnMapping*> _present_child_mappings_in_file_order(
1099
7
            const std::vector<ColumnMapping>& child_mappings) {
1100
7
        std::vector<const ColumnMapping*> result;
1101
7
        result.reserve(child_mappings.size());
1102
16
        for (const auto& child_mapping : child_mappings) {
1103
16
            if (child_mapping.file_local_id.has_value()) {
1104
11
                result.push_back(&child_mapping);
1105
11
            }
1106
16
        }
1107
7
        std::ranges::sort(result, [](const ColumnMapping* lhs, const ColumnMapping* rhs) {
1108
6
            DORIS_CHECK(lhs->file_local_id.has_value());
1109
6
            DORIS_CHECK(rhs->file_local_id.has_value());
1110
6
            return *lhs->file_local_id < *rhs->file_local_id;
1111
6
        });
1112
7
        return result;
1113
7
    }
1114
1115
    static size_t _file_child_ordinal_for_mapping(
1116
            const ColumnMapping& mapping, const ColumnMapping& child_mapping,
1117
11
            const std::vector<const ColumnMapping*>& file_ordered_children) {
1118
11
        DORIS_CHECK(child_mapping.file_local_id.has_value());
1119
11
        if (!mapping.projected_file_children.empty()) {
1120
7
            const auto child_it = std::ranges::find_if(
1121
10
                    mapping.projected_file_children, [&](const ColumnDefinition& file_child) {
1122
10
                        return file_child.file_local_id() == *child_mapping.file_local_id;
1123
10
                    });
1124
7
            DORIS_CHECK(child_it != mapping.projected_file_children.end());
1125
7
            return static_cast<size_t>(
1126
7
                    std::distance(mapping.projected_file_children.begin(), child_it));
1127
7
        }
1128
4
        const auto child_it = std::ranges::find(file_ordered_children, &child_mapping);
1129
4
        DORIS_CHECK(child_it != file_ordered_children.end());
1130
4
        return static_cast<size_t>(std::distance(file_ordered_children.begin(), child_it));
1131
11
    }
1132
1133
    static std::vector<const ColumnMapping*> _child_mappings_in_table_type_order(
1134
7
            const ColumnMapping& mapping, const DataTypeStruct& table_type) {
1135
7
        std::vector<const ColumnMapping*> result;
1136
7
        result.reserve(mapping.child_mappings.size());
1137
23
        for (size_t child_idx = 0; child_idx < table_type.get_elements().size(); ++child_idx) {
1138
16
            const auto& child_name = table_type.get_element_name(child_idx);
1139
16
            const auto child_it = std::ranges::find_if(
1140
28
                    mapping.child_mappings, [&](const ColumnMapping& child_mapping) {
1141
28
                        return child_mapping.table_column_name == child_name;
1142
28
                    });
1143
16
            DORIS_CHECK(child_it != mapping.child_mappings.end())
1144
0
                    << mapping.debug_string() << ", table_child_name=" << child_name;
1145
16
            result.push_back(&*child_it);
1146
16
        }
1147
7
        return result;
1148
7
    }
1149
1150
    // Unwraps a ColumnNullable.
    //
    // If `column` is nullable, returns its nested column and, when `null_map` is non-null,
    // stores a pointer to the column's null map in `*null_map`. A non-nullable column is
    // returned unchanged, and `*null_map` is left untouched. `column` must not be null.
    static const IColumn* _nested_column_if_nullable(const ColumnPtr& column,
                                                     const NullMap** null_map) {
        DORIS_CHECK(column.get() != nullptr);
        const auto* nullable_column = check_and_get_column<ColumnNullable>(*column);
        if (nullable_column == nullptr) {
            return column.get();
        }
        if (null_map != nullptr) {
            *null_map = &nullable_column->get_null_map_data();
        }
        return &nullable_column->get_nested_column();
    }
1161
1162
    // Converts a file struct column into the table struct layout described by `mapping`.
    //
    // Output children follow the element order of the table struct type:
    // - A child that exists in the file (has a file_local_id) is taken from the file struct
    //   at its file ordinal and converted via `_materialize_present_child_mapping_column`.
    // - A child that is absent from the file is filled with its table type's default value,
    //   expanded to `rows` rows.
    //
    // If the table type is nullable, the result is wrapped in a ColumnNullable. Its null map
    // is copied from the file column's null map, or is all zeros when the file column is not
    // nullable.
    Status _materialize_struct_mapping_column(const ColumnMapping& mapping,
                                              const ColumnPtr& file_column, const size_t rows,
                                              ColumnPtr* column) {
        DORIS_CHECK(mapping.table_type != nullptr);
        const auto* struct_type =
                assert_cast<const DataTypeStruct*>(remove_nullable(mapping.table_type).get());
        const auto materialized_file_column = file_column->convert_to_full_column_if_const();
        const NullMap* file_null_map = nullptr;
        const auto* file_struct = assert_cast<const ColumnStruct*>(
                _nested_column_if_nullable(materialized_file_column, &file_null_map));
        DORIS_CHECK(struct_type->get_elements().size() == mapping.child_mappings.size());

        Columns converted_children;
        converted_children.reserve(mapping.child_mappings.size());
        const auto children_by_file_order =
                _present_child_mappings_in_file_order(mapping.child_mappings);
        const auto children_by_table_order =
                _child_mappings_in_table_type_order(mapping, *struct_type);
        for (const ColumnMapping* child : children_by_table_order) {
            DORIS_CHECK(child != nullptr);
            if (child->file_local_id.has_value()) {
                const size_t ordinal =
                        _file_child_ordinal_for_mapping(mapping, *child, children_by_file_order);
                DORIS_CHECK(ordinal < file_struct->get_columns().size());
                ColumnPtr converted = file_struct->get_column_ptr(ordinal);
                RETURN_IF_ERROR(_materialize_present_child_mapping_column(*child, converted,
                                                                          rows, &converted));
                converted_children.push_back(std::move(converted));
            } else {
                // The child does not exist in the file: synthesize default values.
                converted_children.push_back(
                        child->table_type->create_column_const_with_default_value(rows)
                                ->convert_to_full_column_if_const());
            }
        }

        MutableColumns mutable_children;
        mutable_children.reserve(converted_children.size());
        for (auto& converted : converted_children) {
            mutable_children.push_back(IColumn::mutate(std::move(converted)));
        }
        auto struct_column = ColumnStruct::create(std::move(mutable_children));

        if (!mapping.table_type->is_nullable()) {
            *column = std::move(struct_column);
            return Status::OK();
        }
        auto null_map = ColumnUInt8::create();
        auto& null_flags = null_map->get_data();
        null_flags.resize(rows);
        if (file_null_map == nullptr) {
            std::fill(null_flags.begin(), null_flags.end(), 0);
        } else {
            DORIS_CHECK(file_null_map->size() == rows);
            null_flags.assign(file_null_map->begin(), file_null_map->end());
        }
        *column = ColumnNullable::create(std::move(struct_column), std::move(null_map));
        return Status::OK();
    }
1219
1220
    // Converts a file array column into the table array type described by `mapping`.
    //
    // The single child mapping (the element mapping) converts the flattened element column.
    // The file offsets are reused unchanged. If the table type is nullable, the result is
    // wrapped in a ColumnNullable. Its null map is copied from the file column's null map,
    // or is all zeros when the file column is not nullable.
    Status _materialize_array_mapping_column(const ColumnMapping& mapping,
                                             const ColumnPtr& file_column, const size_t rows,
                                             ColumnPtr* column) {
        // Same precondition the struct path checks; table_type is dereferenced below.
        DORIS_CHECK(mapping.table_type != nullptr);
        DORIS_CHECK(mapping.child_mappings.size() == 1);
        const auto full_file_column = file_column->convert_to_full_column_if_const();
        const NullMap* parent_null_map = nullptr;
        const auto* nested_file_column =
                _nested_column_if_nullable(full_file_column, &parent_null_map);
        const auto* file_array = assert_cast<const ColumnArray*>(nested_file_column);

        // Convert the flattened element column; its row count is the total element count,
        // not `rows`.
        ColumnPtr nested_column = file_array->get_data_ptr();
        const auto& element_mapping = mapping.child_mappings[0];
        RETURN_IF_ERROR(_materialize_present_child_mapping_column(
                element_mapping, nested_column, nested_column->size(), &nested_column));
        auto offsets_column = file_array->get_offsets_ptr()->convert_to_full_column_if_const();
        auto result = ColumnArray::create(IColumn::mutate(std::move(nested_column)),
                                          IColumn::mutate(std::move(offsets_column)));

        if (!mapping.table_type->is_nullable()) {
            *column = std::move(result);
            return Status::OK();
        }
        auto null_map = ColumnUInt8::create();
        auto& null_map_data = null_map->get_data();
        if (parent_null_map != nullptr) {
            DORIS_CHECK(parent_null_map->size() == rows);
            // assign() sizes the destination itself; no prior resize is needed.
            null_map_data.assign(parent_null_map->begin(), parent_null_map->end());
        } else {
            null_map_data.resize(rows);
            std::fill(null_map_data.begin(), null_map_data.end(), 0);
        }
        *column = ColumnNullable::create(std::move(result), std::move(null_map));
        return Status::OK();
    }
1252
1253
    // Converts a file map column into the table map type described by `mapping`.
    //
    // A child mapping with file_local_id 0 converts the key column, and one with
    // file_local_id 1 converts the value column. A side without such a mapping (or whose
    // mapping has no file_local_id) is passed through from the file unchanged. The file
    // offsets are reused. If the table type is nullable, the result is wrapped in a
    // ColumnNullable whose null map is copied from the file column (or is all zeros when the
    // file column is not nullable).
    Status _materialize_map_mapping_column(const ColumnMapping& mapping,
                                           const ColumnPtr& file_column, const size_t rows,
                                           ColumnPtr* column) {
        // Same precondition the struct path checks; table_type is dereferenced below.
        DORIS_CHECK(mapping.table_type != nullptr);
        const auto full_file_column = file_column->convert_to_full_column_if_const();
        const NullMap* parent_null_map = nullptr;
        const auto* nested_file_column =
                _nested_column_if_nullable(full_file_column, &parent_null_map);
        const auto* file_map = assert_cast<const ColumnMap*>(nested_file_column);
        ColumnPtr key_column = file_map->get_keys_ptr();
        ColumnPtr value_column = file_map->get_values_ptr();

        const ColumnMapping* key_mapping = nullptr;
        const ColumnMapping* value_mapping = nullptr;
        for (const auto& child_mapping : mapping.child_mappings) {
            if (!child_mapping.file_local_id.has_value()) {
                continue;
            }
            if (*child_mapping.file_local_id == 0) {
                key_mapping = &child_mapping;
            } else if (*child_mapping.file_local_id == 1) {
                value_mapping = &child_mapping;
            }
        }

        // Keys and values are flattened columns; their row counts are the total entry count.
        if (key_mapping != nullptr) {
            RETURN_IF_ERROR(_materialize_present_child_mapping_column(
                    *key_mapping, key_column, key_column->size(), &key_column));
        }
        if (value_mapping != nullptr) {
            RETURN_IF_ERROR(_materialize_present_child_mapping_column(
                    *value_mapping, value_column, value_column->size(), &value_column));
        }
        auto offsets_column = file_map->get_offsets_ptr()->convert_to_full_column_if_const();
        auto result = ColumnMap::create(IColumn::mutate(std::move(key_column)),
                                        IColumn::mutate(std::move(value_column)),
                                        IColumn::mutate(std::move(offsets_column)));

        if (!mapping.table_type->is_nullable()) {
            *column = std::move(result);
            return Status::OK();
        }
        auto null_map = ColumnUInt8::create();
        auto& null_map_data = null_map->get_data();
        if (parent_null_map != nullptr) {
            DORIS_CHECK(parent_null_map->size() == rows);
            // assign() sizes the destination itself; no prior resize is needed.
            null_map_data.assign(parent_null_map->begin(), parent_null_map->end());
        } else {
            null_map_data.resize(rows);
            std::fill(null_map_data.begin(), null_map_data.end(), 0);
        }
        *column = ColumnNullable::create(std::move(result), std::move(null_map));
        return Status::OK();
    }
1305
1306
63
    // Prepares and opens the `projection` and `default_expr` of every top-level column
    // mapping, using an empty RowDescriptor and `_runtime_state`. Stops at the first error.
    Status _open_mapping_exprs() {
        RowDescriptor empty_row_desc;
        auto prepare_and_open = [&](const auto& expr) -> Status {
            RETURN_IF_ERROR(expr->prepare(_runtime_state, empty_row_desc));
            RETURN_IF_ERROR(expr->open(_runtime_state));
            return Status::OK();
        };
        for (const auto& mapping : _data_reader.column_mapper->mappings()) {
            if (mapping.projection != nullptr) {
                RETURN_IF_ERROR(prepare_and_open(mapping.projection));
            }
            if (mapping.default_expr != nullptr) {
                RETURN_IF_ERROR(prepare_and_open(mapping.default_expr));
            }
        }
        return Status::OK();
    }
1320
1321
    // Fills `request` with the file-level aggregate to push down.
    //
    // COUNT carries no columns. For other supported aggregates, one entry is added per
    // top-level mapping. Its projection is the mapping's file column, narrowed to the nested
    // child path via `build_aggregate_projection` when the mapping has child mappings.
    // Every mapping must be backed by a file column.
    Status _build_file_aggregate_request(TPushAggOp::type agg_type,
                                         FileAggregateRequest* request) const {
        DORIS_CHECK(request != nullptr);
        DORIS_CHECK(_supports_aggregate_pushdown(agg_type));
        request->agg_type = agg_type;
        request->columns.clear();
        if (agg_type == TPushAggOp::type::COUNT) {
            return Status::OK();
        }

        const auto& mappings = _data_reader.column_mapper->mappings();
        request->columns.reserve(mappings.size());
        for (const auto& mapping : mappings) {
            DORIS_CHECK(mapping.file_local_id.has_value());
            FileAggregateRequest::Column entry;
            entry.projection = LocalColumnIndex::top_level(LocalColumnId(*mapping.file_local_id));
            const bool has_nested_path = !mapping.child_mappings.empty();
            if (has_nested_path) {
                RETURN_IF_ERROR(build_aggregate_projection(mapping, &entry.projection));
            }
            request->columns.push_back(std::move(entry));
        }
        return Status::OK();
    }
1342
1343
    // Turns a file-level aggregate result into rows of `block`.
    //
    // For COUNT, `file_result.count` default rows are emitted. For MIN/MAX, a file-shaped
    // block is built with two rows per file column (min first, max second). Each top-level
    // mapping is then materialized from it into the corresponding position of `block`.
    Status _materialize_aggregate_pushdown_rows(TPushAggOp::type agg_type,
                                                const FileAggregateResult& file_result,
                                                Block* block) {
        if (agg_type == TPushAggOp::type::COUNT) {
            // COUNT pushdown is not a final count value. It emits `count` default rows so the
            // upper COUNT(*) aggregate can count them and produce the final result, including
            // zero rows when count is 0.
            DORIS_CHECK(file_result.count >= 0);
            return _materialize_count_rows(cast_set<size_t>(file_result.count), block);
        }
        // MIN/MAX pushdown emits two rows, min first and max second, for each projected column.
        // The upper MIN/MAX aggregate consumes those two rows to produce the final aggregate value.
        constexpr size_t kMinMaxRows = 2;
        DORIS_CHECK(block != nullptr);
        const auto& mappings = _data_reader.column_mapper->mappings();
        const auto& layout = _data_reader.file_block_layout;
        DORIS_CHECK(file_result.columns.size() == mappings.size());
        DORIS_CHECK(block->columns() == mappings.size());

        // Start from an empty block with the file reader's output layout.
        Block file_block;
        file_block.reserve(layout.size());
        for (const auto& layout_column : layout) {
            file_block.insert(
                    {layout_column.type->create_column(), layout_column.type, layout_column.name});
        }

        // Fill the file column targeted by each result with its min and max values.
        for (size_t column_idx = 0; column_idx < file_result.columns.size(); ++column_idx) {
            const auto& result_column = file_result.columns[column_idx];
            if (!result_column.has_min || !result_column.has_max) {
                return Status::NotSupported("Missing min/max aggregate result for column {}",
                                            _projected_columns[column_idx].name);
            }
            const auto& projection = result_column.projection;
            bool found_file_column = false;
            for (size_t block_position = 0; block_position < layout.size(); ++block_position) {
                if (layout[block_position].file_column_id == projection.column_id()) {
                    found_file_column = true;
                    auto column = file_block.get_by_position(block_position)
                                          .type->create_column()
                                          ->assert_mutable();
                    RETURN_IF_ERROR(_insert_aggregate_projection_value(
                            projection, result_column.min_value, column.get()));
                    RETURN_IF_ERROR(_insert_aggregate_projection_value(
                            projection, result_column.max_value, column.get()));
                    file_block.replace_by_position(block_position, std::move(column));
                    break;
                }
            }
            DORIS_CHECK(found_file_column);
        }

        // Convert the two-row file block into the table columns of `block`.
        for (size_t column_idx = 0; column_idx < mappings.size(); ++column_idx) {
            ColumnPtr table_column;
            RETURN_IF_ERROR(_materialize_mapping_column(mappings[column_idx], &file_block,
                                                        kMinMaxRows, &table_column));
            block->replace_by_position(column_idx, std::move(table_column));
        }
        return Status::OK();
    }
1399
1400
    struct FileBlockColumn {
1401
        LocalColumnId file_column_id = LocalColumnId::invalid();
1402
        std::string name;
1403
        DataTypePtr type;
1404
    };
1405
1406
    struct DataReader {
1407
        std::unique_ptr<FileReader> reader;
1408
        std::unique_ptr<TableColumnMapper> column_mapper;
1409
        // Schema of the data file, also including virtual column (row position).
1410
        std::vector<ColumnDefinition> file_schema;
1411
        // Layout of the block returned by file reader, determined by column mapping and file
1412
        // schema. It is used for file reader to materialize columns into correct type and position.
1413
        std::vector<FileBlockColumn> file_block_layout;
1414
        Block block_template;
1415
    };
1416
    DataReader _data_reader;
1417
    std::vector<ColumnDefinition> _projected_columns;
1418
    std::unique_ptr<ScanTask> _current_task;
1419
    std::optional<io::FileDescription> _current_file_description;
1420
    // Range-level compression has higher priority than scan-param compression. TVF/load can keep
1421
    // the logical format as CSV/TEXT while carrying the concrete compression such as GZ or LZO on
1422
    // each TFileRangeDesc, matching the old FileScanner reader contract.
1423
    TFileCompressType::type _current_range_compress_type = TFileCompressType::UNKNOWN;
1424
    std::optional<TUniqueId> _current_range_load_id;
1425
    std::shared_ptr<io::FileSystemProperties> _system_properties;
1426
    // partition key -> value
1427
    std::map<std::string, Field> _partition_values;
1428
    // Predicates built from scan conjuncts before file-level localization.
1429
    std::vector<TableFilter> _table_filters;
1430
    TableColumnPredicates _table_column_predicates;
1431
    VExprContextSPtrs _conjuncts;
1432
    ReadProfile _profile;
1433
    // Parsed from row-position based delete files, including position delete and deletion vector.
1434
    DeleteRows* _delete_rows = nullptr;
1435
    TFileScanRangeParams* _scan_params;
1436
    std::shared_ptr<io::IOContext> _io_ctx;
1437
    RuntimeState* _runtime_state;
1438
    RuntimeProfile* _scanner_profile;
1439
    const std::vector<SlotDescriptor*>* _file_slot_descs = nullptr;
1440
    FileFormat _format;
1441
    TPushAggOp::type _push_down_agg_type = TPushAggOp::type::NONE;
1442
    uint64_t _condition_cache_digest = 0;
1443
    segment_v2::ConditionCache::ExternalCacheKey _condition_cache_key;
1444
    std::shared_ptr<std::vector<bool>> _condition_cache;
1445
    std::shared_ptr<ConditionCacheContext> _condition_cache_ctx;
1446
    int64_t _condition_cache_hit_count = 0;
1447
    bool _current_reader_reached_eof = false;
1448
    int64_t _remaining_table_level_count = -1;
1449
    std::optional<GlobalRowIdContext> _global_rowid_context;
1450
    bool _aggregate_pushdown_tried = false;
1451
    TableColumnMapperOptions _mapper_options;
1452
1453
private:
    // Returns a pointer to the first entry of `schema` whose file-local id equals
    // `column_id`, or nullptr when no entry matches.
    static const ColumnDefinition* _find_column_definition(
            const std::vector<ColumnDefinition>& schema, LocalColumnId column_id) {
        const auto match = std::ranges::find_if(schema, [&](const ColumnDefinition& field) {
            return field.file_local_id() == column_id.value();
        });
        return match == schema.end() ? nullptr : &*match;
    }
1463
1464
15
    static bool _can_push_down_minmax_for_mapping(const ColumnMapping& mapping) {
1465
15
        if (mapping.child_mappings.empty()) {
1466
12
            return true;
1467
12
        }
1468
3
        const auto primitive_type = remove_nullable(mapping.file_type)->get_primitive_type();
1469
3
        if (primitive_type != TYPE_STRUCT) {
1470
1
            return false;
1471
1
        }
1472
2
        size_t mapped_children = 0;
1473
2
        const ColumnMapping* mapped_child = nullptr;
1474
2
        for (const auto& child_mapping : mapping.child_mappings) {
1475
2
            if (!child_mapping.file_local_id.has_value()) {
1476
0
                continue;
1477
0
            }
1478
2
            ++mapped_children;
1479
2
            mapped_child = &child_mapping;
1480
2
        }
1481
2
        return mapped_children == 1 && mapped_child != nullptr &&
1482
2
               _can_push_down_minmax_for_mapping(*mapped_child);
1483
3
    }
1484
1485
    // Builds the file-column projection used for aggregate pushdown of `mapping`.
    //
    // A mapping without child mappings projects its whole file column
    // (project_all_children = true). Otherwise only file-backed children are projected,
    // recursively, and exactly one such child is required (checked by DORIS_CHECK).
    static Status build_aggregate_projection(const ColumnMapping& mapping,
                                             LocalColumnIndex* projection) {
        DORIS_CHECK(projection != nullptr);
        DORIS_CHECK(mapping.file_local_id.has_value());
        *projection = LocalColumnIndex::local(*mapping.file_local_id);
        projection->children.clear();
        const bool is_leaf = mapping.child_mappings.empty();
        projection->project_all_children = is_leaf;
        if (is_leaf) {
            return Status::OK();
        }
        for (const auto& child_mapping : mapping.child_mappings) {
            if (child_mapping.file_local_id.has_value()) {
                LocalColumnIndex nested;
                RETURN_IF_ERROR(build_aggregate_projection(child_mapping, &nested));
                projection->children.push_back(std::move(nested));
            }
        }
        DORIS_CHECK(projection->children.size() == 1);
        return Status::OK();
    }
1507
1508
    // Appends one aggregate value to `column`, following `projection` into nested columns.
    //
    // A nullable column receives the value in its nested column, plus a 0 (non-null) entry
    // in its null map. A leaf projection (project_all_children or no children) inserts the
    // value directly. Otherwise the column must be a single-child struct, and the value is
    // inserted into that child using the single child projection.
    static Status _insert_aggregate_projection_value(const LocalColumnIndex& projection,
                                                     const Field& value, IColumn* column) {
        DORIS_CHECK(column != nullptr);
        auto* nullable_column = check_and_get_column<ColumnNullable>(*column);
        if (nullable_column != nullptr) {
            RETURN_IF_ERROR(_insert_aggregate_projection_value(
                    projection, value, &nullable_column->get_nested_column()));
            nullable_column->get_null_map_data().push_back(0);
            return Status::OK();
        }
        const bool is_leaf_projection =
                projection.project_all_children || projection.children.empty();
        if (is_leaf_projection) {
            column->insert(value);
            return Status::OK();
        }
        auto* struct_column = assert_cast<ColumnStruct*>(column);
        DORIS_CHECK(projection.children.size() == 1);
        const LocalColumnIndex& only_child = projection.children[0];
        DORIS_CHECK(struct_column->get_columns().size() == 1);
        RETURN_IF_ERROR(_insert_aggregate_projection_value(only_child, value,
                                                           &struct_column->get_column(0)));
        return Status::OK();
    }
1529
1530
    // Parse row-position deletes from table format specific parameters, and fill in _delete_rows.
    // NOTE(review): defined out of line; presumably covers both position delete files and
    // deletion vectors (per the _delete_rows member comment) — confirm against the definition.
1531
    Status _parse_delete_predicates(const SplitReadOptions& options);
1532
};
1533
1534
} // namespace doris::format