Coverage Report

Created: 2026-07-02 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/table_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bvar/status.h>
21
22
#include <algorithm>
23
#include <exception>
24
#include <map>
25
#include <memory>
26
#include <optional>
27
#include <string>
28
#include <string_view>
29
#include <utility>
30
#include <vector>
31
32
#include "common/cast_set.h"
33
#include "common/exception.h"
34
#include "common/logging.h"
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/block/block.h"
38
#include "core/column/column_array.h"
39
#include "core/column/column_const.h"
40
#include "core/column/column_map.h"
41
#include "core/column/column_nullable.h"
42
#include "core/column/column_struct.h"
43
#include "core/column/column_vector.h"
44
#include "core/data_type/data_type.h"
45
#include "core/data_type/data_type_array.h"
46
#include "core/data_type/data_type_map.h"
47
#include "core/data_type/data_type_nullable.h"
48
#include "core/data_type/data_type_number.h"
49
#include "core/data_type/data_type_string.h"
50
#include "core/data_type/data_type_struct.h"
51
#include "core/field.h"
52
#include "exec/common/stringop_substring.h"
53
#include "exprs/vexpr.h"
54
#include "exprs/vexpr_context.h"
55
#include "exprs/vexpr_fwd.h"
56
#include "exprs/vslot_ref.h"
57
#include "format_v2/column_data.h"
58
#include "format_v2/column_mapper.h"
59
#include "format_v2/expr/cast.h"
60
#include "format_v2/expr/delete_predicate.h"
61
#include "format_v2/file_reader.h"
62
#include "format_v2/parquet/reader/column_reader.h"
63
#include "format_v2/schema_projection.h"
64
#include "gen_cpp/PlanNodes_types.h"
65
#include "runtime/descriptors.h"
66
#include "storage/segment/condition_cache.h"
67
68
namespace doris {
69
class Block;
70
class ColumnPredicate;
71
struct DeleteFileDesc;
72
class RuntimeState;
73
} // namespace doris
74
75
namespace doris::format {
76
77
using DeleteRows = std::vector<int64_t>;
78
79
// Row-level predicates on table/global schema. They are rewritten to file-local expressions when
80
// possible, and remain the source of row-level filtering after localization.
81
struct TableFilter {
82
    VExprContextSPtr conjunct;
83
    std::vector<GlobalIndex> global_indices;
84
};
85
86
struct ScanTask {
87
547
    virtual ~ScanTask() = default;
88
89
    std::unique_ptr<io::FileDescription> data_file;
90
};
91
92
struct ProjectedColumnBuildContext {
93
    const TFileScanRangeParams* scan_params = nullptr;
94
    const TFileRangeDesc* range = nullptr;
95
    RuntimeState* runtime_state = nullptr;
96
    std::optional<ColumnDefinition> schema_column = std::nullopt;
97
    size_t next_file_column_idx = 0;
98
};
99
100
struct ReadProfile {
101
    RuntimeProfile::Counter* num_delete_files = nullptr;
102
    RuntimeProfile::Counter* num_delete_rows = nullptr;
103
    RuntimeProfile::Counter* parse_delete_file_time = nullptr;
104
    RuntimeProfile::Counter* exec_timer = nullptr;
105
    RuntimeProfile::Counter* prepare_split_timer = nullptr;
106
    RuntimeProfile::Counter* finalize_timer = nullptr;
107
    RuntimeProfile::Counter* create_reader_timer = nullptr;
108
    RuntimeProfile::Counter* pushdown_agg_timer = nullptr;
109
    RuntimeProfile::Counter* open_reader_timer = nullptr;
110
};
111
112
struct TableReadOptions {
113
    // Columns need to be read from file and output by table reader. They are all in table/global
114
    // schema semantics.
115
    const std::vector<ColumnDefinition> projected_columns;
116
    // Simple predicates for a single column, which is parsed on scan operator.
117
    const TableColumnPredicates column_predicates;
118
    // All complex conjuncts from scan operator
119
    const VExprContextSPtrs conjuncts;
120
    // File format of the underlying data files, needed for reader initialization and reader-level
121
    // filter pushdown.
122
    const FileFormat format;
123
    TFileScanRangeParams* scan_params;
124
    std::shared_ptr<io::IOContext> io_ctx;
125
    RuntimeState* runtime_state;
126
    RuntimeProfile* scanner_profile;
127
    // File formats without complete self-describing metadata, such as CSV, Text, and JSON, need
128
    // the FE-planned physical file slots to build their file-local schema and deserialize values.
129
    const std::vector<SlotDescriptor*>* file_slot_descs = nullptr;
130
    // Push-down aggregate type.
131
    const TPushAggOp::type push_down_agg_type = TPushAggOp::type::NONE;
132
    // Digest of stable pushed-down predicates. A zero digest disables condition cache.
133
    uint64_t condition_cache_digest = 0;
134
};
135
136
struct SplitReadOptions {
137
    // Split-level information for reader initialization, which may include file path, partition values, delete file info, etc. The content is table format specific and opaque to table reader base class; it's the responsibility of the concrete table reader implementation to parse necessary information for reader initialization and filter pushdown.
138
    std::map<std::string, Field> partition_values;
139
    ShardedKVCache* cache;
140
    TFileRangeDesc current_range;
141
    FileFormat current_split_format = FileFormat::PARQUET;
142
    std::optional<GlobalRowIdContext> global_rowid_context;
143
};
144
145
// Base class for table-level readers.
146
// This layer owns common table-level orchestration, such as split iteration, dynamic partition
147
// pruning, delete handling and conversion from file-local blocks to table-schema blocks. Concrete
148
// table-format readers only need to provide format-specific hooks for opening readers and parsing
149
// split metadata.
150
class TableReader {
151
public:
152
565
    virtual ~TableReader() = default;
153
154
    // Initialize common runtime options for the table reader. Subclasses may call this from their
155
    // own init(options); table-format schema and split metadata are provided later per split.
156
    virtual Status init(TableReadOptions&& options);
157
158
    // FileScannerV2 adjusts this before each get_block() using an adaptive bytes-per-row estimate.
159
    // Store it here as well as forwarding to the current reader so newly opened split readers start
160
    // with the latest predicted batch size.
161
1.11k
    void set_batch_size(size_t batch_size) {
162
1.11k
        _batch_size = std::max<size_t>(1, batch_size);
163
1.11k
        if (_data_reader.reader != nullptr) {
164
648
            _data_reader.reader->set_batch_size(_batch_size);
165
648
        }
166
1.11k
    }
167
168
    // Prepare for reading a new split/task.
169
    // 1. Pass a new split/task to reader, which will be used in subsequent open_reader() to initialize the underlying file reader.
170
    // 2. Parse delete predicates from split/task information, which will be used for later dynamic filtering and delete handling.
171
    virtual Status prepare_split(const SplitReadOptions& options);
172
173
    // Public entry point for reading a table-schema block. The base class opens the current reader,
174
    // advances across EOF, and closes exhausted readers. Subclasses provide protected hooks for
175
    // table-format-specific behavior.
176
1.20k
    virtual Status get_block(Block* block, bool* eos) {
177
1.20k
        SCOPED_TIMER(_profile.exec_timer);
178
1.20k
        DORIS_CHECK(block->columns() == _projected_columns.size());
179
1.20k
        block->clear_column_data(_projected_columns.size());
180
181
1.68k
        while (true) {
182
1.68k
            if (*eos) {
183
0
                return Status::OK();
184
0
            }
185
1.68k
            if (!_data_reader.reader) {
186
1.01k
                if (_is_table_level_count_active()) {
187
6
                    RETURN_IF_ERROR(_read_table_level_count(block, eos));
188
6
                    return Status::OK();
189
6
                }
190
1.01k
                RETURN_IF_ERROR(create_next_reader(eos));
191
1.01k
                if (!_data_reader.reader) {
192
479
                    DCHECK(*eos);
193
479
                    return Status::OK();
194
479
                }
195
1.01k
            }
196
197
            // Materialize a reduced row set for upper aggregate operators when aggregate
198
            // pushdown can be applied. This is not the final aggregate result: COUNT emits
199
            // `count` default rows for the upper COUNT(*), and MIN/MAX emits two rows containing
200
            // file-level min/max values for the upper MIN/MAX.
201
1.19k
            if (!_aggregate_pushdown_tried) {
202
531
                SCOPED_TIMER(_profile.pushdown_agg_timer);
203
531
                bool pushed_down = false;
204
531
                RETURN_IF_ERROR(_try_materialize_aggregate_pushdown_rows(block, &pushed_down));
205
531
                if (pushed_down) {
206
6
                    return Status::OK();
207
6
                }
208
531
            }
209
210
1.19k
            bool current_eof = false;
211
1.19k
            _data_reader.block_template.clear_column_data(
212
1.19k
                    cast_set<int64_t>(_data_reader.file_block_layout.size()));
213
1.19k
            size_t current_rows = 0;
214
1.19k
            RETURN_IF_ERROR(_data_reader.reader->get_block(&_data_reader.block_template,
215
1.19k
                                                           &current_rows, &current_eof));
216
1.19k
            if (current_rows == 0) {
217
477
                if (current_eof) {
218
477
                    _current_reader_reached_eof = true;
219
477
                    RETURN_IF_ERROR(close_current_reader());
220
477
                }
221
477
                continue;
222
477
            }
223
1.19k
            DCHECK_EQ(_data_reader.block_template.columns(), _data_reader.file_block_layout.size())
224
0
                    << _data_reader.block_template.dump_structure();
225
713
#ifndef NDEBUG
226
713
            RETURN_IF_ERROR(_check_file_block_columns("after file reader get_block", current_rows));
227
713
#endif
228
713
            DORIS_CHECK(block->columns() == _data_reader.column_mapper->mappings().size());
229
713
            RETURN_IF_ERROR(finalize_chunk(block, current_rows));
230
713
#ifndef NDEBUG
231
713
            RETURN_IF_ERROR(
232
713
                    _check_table_block_columns("after finalize_chunk", block, current_rows));
233
713
#endif
234
713
            if (current_eof) {
235
6
                _current_reader_reached_eof = true;
236
6
                RETURN_IF_ERROR(close_current_reader());
237
6
            }
238
713
            return Status::OK();
239
713
        }
240
1.20k
    }
241
242
    // Close the table reader and the currently active file reader. Subclasses that hold additional
243
    // table-format resources should override this and call TableReader::close() first.
244
531
    virtual Status close() {
245
531
        if (_data_reader.reader) {
246
42
            RETURN_IF_ERROR(close_current_reader());
247
42
        }
248
531
        _current_task.reset();
249
531
        _current_file_description.reset();
250
531
        _remaining_table_level_count = -1;
251
531
        return Status::OK();
252
531
    }
253
254
931
    // Current value of this reader's condition-cache hit counter.
    int64_t condition_cache_hit_count() const {
        return _condition_cache_hit_count;
    }
255
256
    virtual std::string debug_string() const;
257
258
    virtual Status annotate_projected_column(const TFileScanSlotInfo& slot_info,
259
                                             ProjectedColumnBuildContext* context,
260
                                             ColumnDefinition* column) const;
261
262
464
    // Hook that lets a table format reject the projected column set. The default accepts
    // everything.
    virtual Status validate_projected_columns(const ProjectedColumnBuildContext& context) const {
        static_cast<void>(context);
        return Status::OK();
    }
266
267
protected:
268
    // Parse deletion vector information from table format specific file description.
269
    virtual Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc,
270
519
                                               DeleteFileDesc* desc, bool* has_delete_file) {
271
519
        *has_delete_file = false;
272
519
        return Status::OK();
273
519
    }
274
275
    // Advance to the next reader. This closes the current reader first and then opens the next
276
    // concrete reader. Subclasses should not duplicate this loop.
277
    Status create_next_reader(bool* eos);
278
    virtual Status create_file_reader(std::unique_ptr<FileReader>* reader);
279
512
    // Strategy for matching table columns to file columns. Columns are matched by name unless a
    // table format overrides this.
    virtual TableColumnMappingMode mapping_mode() const {
        return TableColumnMappingMode::BY_NAME;
    }
280
531
    // Hook applied to the file schema before the column mapping is built; for example, formats
    // that map by field id can attach ids here. The default leaves the schema unchanged.
    virtual Status annotate_file_schema(std::vector<ColumnDefinition>* file_schema) {
        DORIS_CHECK(file_schema != nullptr);
        return Status::OK();
    }
284
285
    // Open the concrete reader for the current split/task and build the file-local scan request.
286
532
    virtual Status open_reader() {
287
532
        SCOPED_TIMER(_profile.open_reader_timer);
288
        // 1. Get file schema and create column mapping.
289
532
        std::vector<ColumnDefinition> file_schema;
290
532
        RETURN_IF_ERROR(_data_reader.reader->get_schema(&file_schema));
291
        // For Paimon/Hudi, FE can provide field ids through `history_schema_info`. Annotate the
292
        // file schema before column mapping when the table format maps columns by field id.
293
532
        RETURN_IF_ERROR(annotate_file_schema(&file_schema));
294
532
        _data_reader.file_schema = file_schema;
295
532
        _mapper_options.mode = mapping_mode();
296
297
532
        _data_reader.column_mapper = _data_reader.reader->create_column_mapper(_mapper_options);
298
532
        DORIS_CHECK(_data_reader.column_mapper != nullptr);
299
532
        RETURN_IF_ERROR(_data_reader.column_mapper->create_mapping(_projected_columns,
300
532
                                                                   _partition_values, file_schema));
301
532
        DORIS_CHECK(_data_reader.column_mapper->mappings().size() == _projected_columns.size());
302
303
        // 2. Build table filters based on conjuncts and column predicates.
304
532
        RETURN_IF_ERROR(_build_table_filters_from_conjuncts());
305
306
        // 3. Create file scan request based on column mapping and table filters, then open file
307
        // reader with the request. File scan request carries row-level expression filters and
308
        // file-level pruning hints. Only expression filters decide returned rows; column predicates
309
        // are pruning hints.
310
532
        auto file_request = std::make_shared<FileScanRequest>();
311
532
        RETURN_IF_ERROR(_data_reader.column_mapper->create_scan_request(
312
532
                _table_filters, _table_column_predicates, _projected_columns, file_request.get(),
313
532
                _runtime_state));
314
532
        bool constant_filter_pruned_split = false;
315
532
        RETURN_IF_ERROR(_evaluate_constant_filters(&constant_filter_pruned_split));
316
532
        if (constant_filter_pruned_split) {
317
1
            RETURN_IF_ERROR(close_current_reader());
318
1
            return Status::OK();
319
1
        }
320
531
        RETURN_IF_ERROR(customize_file_scan_request(file_request.get()));
321
531
        RETURN_IF_ERROR(_open_local_filter_exprs(*file_request));
322
531
        _data_reader.file_block_layout.clear();
323
531
        _data_reader.block_template.clear();
324
531
        _data_reader.file_block_layout.resize(file_request->local_positions.size());
325
326
        // 4. Build file block layout from file schema and column mapping. The layout describes
327
        // the block returned by file reader before table-column materialization.
328
3.16k
        for (const auto& [file_column_id, block_position] : file_request->local_positions) {
329
3.16k
            DORIS_CHECK(block_position.value() < _data_reader.file_block_layout.size());
330
3.16k
            const auto* field = _find_column_definition(_data_reader.file_schema, file_column_id);
331
3.16k
            DORIS_CHECK(field != nullptr);
332
333
3.16k
            ColumnDefinition projected_field;
334
3.16k
            {
335
3.16k
                auto it = std::find_if(
336
3.16k
                        file_request->non_predicate_columns.begin(),
337
3.16k
                        file_request->non_predicate_columns.end(),
338
22.6k
                        [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; });
339
3.16k
                if (it != file_request->non_predicate_columns.end()) {
340
3.13k
                    RETURN_IF_ERROR(project_column_definition(*field, *it, &projected_field));
341
3.13k
                }
342
3.16k
            }
343
3.16k
            {
344
3.16k
                auto it = std::find_if(
345
3.16k
                        file_request->predicate_columns.begin(),
346
3.16k
                        file_request->predicate_columns.end(),
347
3.16k
                        [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; });
348
3.16k
                if (it != file_request->predicate_columns.end()) {
349
31
                    RETURN_IF_ERROR(project_column_definition(*field, *it, &projected_field));
350
31
                }
351
3.16k
            }
352
3.16k
            _data_reader.file_block_layout[block_position.value()] = {
353
3.16k
                    .file_column_id = file_column_id,
354
3.16k
                    .name = projected_field.name,
355
3.16k
                    .type = projected_field.type,
356
3.16k
            };
357
3.16k
            DORIS_CHECK(_data_reader.file_block_layout[block_position.value()].type != nullptr);
358
3.16k
        }
359
360
        // 5. Prepare block template from file block layout. The block template stores the block
361
        // returned by file reader before table-column materialization.
362
531
        _data_reader.block_template.reserve(_data_reader.file_block_layout.size());
363
3.16k
        for (const auto& column : _data_reader.file_block_layout) {
364
3.16k
            _data_reader.block_template.insert(
365
3.16k
                    {column.type->create_column(), column.type, column.name});
366
3.16k
        }
367
531
        if (VLOG_DEBUG_IS_ON) {
368
0
            VLOG_DEBUG << "TableReader debug: " << debug_string();
369
0
        }
370
531
        RETURN_IF_ERROR(_open_mapping_exprs());
371
531
        RETURN_IF_ERROR(_data_reader.reader->open(file_request));
372
531
        RETURN_IF_ERROR(_init_reader_condition_cache(*file_request));
373
531
        return Status::OK();
374
531
    }
375
376
    Status _build_table_filters_from_conjuncts();
377
    Status _open_local_filter_exprs(const FileScanRequest& file_request);
378
    Status _init_reader_condition_cache(const FileScanRequest& file_request);
379
    void _finalize_reader_condition_cache();
380
    bool _should_enable_condition_cache(const FileScanRequest& file_request) const;
381
382
532
    // Detect splits that can be skipped entirely. A table filter whose referenced columns all map
    // to constant entries is evaluated once on a one-row synthetic block. If it rejects that row,
    // *can_filter_all is set to true and evaluation stops.
    Status _evaluate_constant_filters(bool* can_filter_all) {
        DORIS_CHECK(can_filter_all != nullptr);
        *can_filter_all = false;

        // Decides whether a filter can be evaluated ahead of time against constants only.
        const auto is_constant_candidate = [this](const TableFilter& filter) {
            if (filter.conjunct == nullptr) {
                return false;
            }
            // RuntimeFilterExpr does not implement execute_column_impl(); it is evaluated by the
            // row-level filter path through execute_filter(). Constant split pruning uses
            // VExprContext::execute() on a one-row synthetic block, so runtime filters must not be
            // pre-executed here even when their referenced slot maps to a constant value.
            if (filter.conjunct->root()->is_rf_wrapper()) {
                return false;
            }
            return _table_filter_has_only_constant_entries(filter);
        };

        for (const auto& table_filter : _table_filters) {
            if (!is_constant_candidate(table_filter)) {
                continue;
            }
            Block eval_block;
            RETURN_IF_ERROR(_build_constant_filter_block(table_filter, &eval_block));
            RowDescriptor row_desc;
            RETURN_IF_ERROR(table_filter.conjunct->prepare(_runtime_state, row_desc));
            RETURN_IF_ERROR(table_filter.conjunct->open(_runtime_state));
            int result_column_id = -1;
            RETURN_IF_ERROR(table_filter.conjunct->execute(&eval_block, &result_column_id));
            DORIS_CHECK(result_column_id >= 0);
            if (_filter_result_filters_all(eval_block.get_by_position(result_column_id).column)) {
                *can_filter_all = true;
                return Status::OK();
            }
        }
        return Status::OK();
    }
410
411
27
    bool _table_filter_has_only_constant_entries(const TableFilter& table_filter) const {
412
27
        const auto& filter_entries = _data_reader.column_mapper->filter_entries();
413
27
        for (const auto global_index : table_filter.global_indices) {
414
27
            const auto entry_it = filter_entries.find(global_index);
415
27
            if (entry_it == filter_entries.end() || !entry_it->second.is_constant()) {
416
25
                return false;
417
25
            }
418
27
        }
419
2
        return !table_filter.global_indices.empty();
420
27
    }
421
422
2
    // Build the one-row block used to evaluate `table_filter` against constant mappings. The block
    // has one column per projected column, in mapping order. A column that the filter references
    // and that maps to a constant is materialized from that constant; every other column is a
    // one-row default constant of the table type.
    Status _build_constant_filter_block(const TableFilter& table_filter, Block* eval_block) {
        DORIS_CHECK(eval_block != nullptr);
        eval_block->clear();
        const auto& mappings = _data_reader.column_mapper->mappings();
        const auto& filter_entries = _data_reader.column_mapper->filter_entries();
        DORIS_CHECK(mappings.size() == _projected_columns.size());

        const auto& referenced = table_filter.global_indices;
        for (size_t column_idx = 0; column_idx < mappings.size(); ++column_idx) {
            const auto global_index = GlobalIndex(column_idx);
            const auto& mapping = mappings[column_idx];
            const auto entry_it = filter_entries.find(global_index);
            const bool referenced_by_filter =
                    std::find(referenced.begin(), referenced.end(), global_index) !=
                    referenced.end();
            const bool use_constant = referenced_by_filter && entry_it != filter_entries.end() &&
                                      entry_it->second.is_constant();
            if (!use_constant) {
                eval_block->insert({mapping.table_type->create_column_const_with_default_value(1),
                                    mapping.table_type, mapping.table_column_name});
                continue;
            }
            ColumnPtr constant_column;
            RETURN_IF_ERROR(_materialize_constant_filter_column(entry_it->second.constant_index(),
                                                                &constant_column));
            eval_block->insert(
                    {std::move(constant_column), mapping.table_type, mapping.table_column_name});
        }
        return Status::OK();
    }
450
451
2
    // Evaluate the constant expression registered under `constant_index`. The expression runs on a
    // one-row placeholder block, and the resulting single-row column is stored in `*column`.
    Status _materialize_constant_filter_column(ConstantIndex constant_index, ColumnPtr* column) {
        DORIS_CHECK(column != nullptr);
        const auto& constant_entry = _data_reader.column_mapper->constant_map().get(constant_index);
        DORIS_CHECK(constant_entry.expr != nullptr);
        DORIS_CHECK(constant_entry.type != nullptr);

        const auto& expr = constant_entry.expr;
        RowDescriptor row_desc;
        RETURN_IF_ERROR(expr->prepare(_runtime_state, row_desc));
        RETURN_IF_ERROR(expr->open(_runtime_state));

        // A one-row placeholder gives the expression exactly one row to evaluate against.
        Block placeholder;
        placeholder.insert({constant_entry.type->create_column_const_with_default_value(1),
                            constant_entry.type, "__table_reader_constant_filter"});
        int result_column_id = -1;
        RETURN_IF_ERROR(expr->execute(&placeholder, &result_column_id));
        DORIS_CHECK(result_column_id >= 0);
        *column = placeholder.get_by_position(result_column_id).column;
        DORIS_CHECK((*column)->size() == 1);
        return Status::OK();
    }
469
470
2
    static bool _filter_result_filters_all(const ColumnPtr& filter_column) {
471
2
        DORIS_CHECK(filter_column.get() != nullptr);
472
2
        DORIS_CHECK(filter_column->size() == 1);
473
2
        return !filter_column->get_bool(0);
474
2
    }
475
476
532
    // Hook for table formats to adjust the file scan request before the file reader is opened with
    // it. At this level, the only customization is attaching the delete predicate.
    virtual Status customize_file_scan_request(FileScanRequest* file_request) {
        return _append_delete_predicate(file_request);
    }
479
480
1.59k
    // Table-level COUNT mode is active while the remaining count is non-negative; -1 means
    // inactive.
    bool _is_table_level_count_active() const {
        return 0 <= _remaining_table_level_count;
    }
481
482
6
    // Replace every column of `block` with a fresh column of the same type resized to `rows`.
    // Used by table-level COUNT, where only the number of rows matters to the upper aggregate.
    Status _materialize_count_rows(size_t rows, Block* block) const {
        DORIS_CHECK(block != nullptr);
        DORIS_CHECK(block->columns() > 0 || rows == 0);
        const size_t num_columns = block->columns();
        for (size_t position = 0; position < num_columns; ++position) {
            auto filled = block->get_by_position(position).type->create_column();
            filled->resize(rows);
            block->replace_by_position(position, std::move(filled));
        }
        return Status::OK();
    }
492
493
6
    // Emit the next chunk of rows for a table-level COUNT pushdown.
    //
    // Each call materializes at most one batch of rows (see _materialize_count_rows()) and
    // subtracts them from the remaining count. Once the count reaches zero, the next call turns
    // COUNT mode off (-1), releases the current task and sets *eos.
    //
    // @param block receives the emitted rows.
    // @param eos   set to true when the count is exhausted, false otherwise.
    Status _read_table_level_count(Block* block, bool* eos) {
        DORIS_CHECK(block != nullptr);
        DORIS_CHECK(eos != nullptr);
        DORIS_CHECK(_push_down_agg_type == TPushAggOp::type::COUNT);
        DORIS_CHECK(_remaining_table_level_count >= 0);
        if (_remaining_table_level_count == 0) {
            _remaining_table_level_count = -1;
            _current_task.reset();
            *eos = true;
            return Status::OK();
        }

        // Clamp to at least one row, as set_batch_size() does. A zero or negative batch size from
        // the runtime state would otherwise emit empty blocks with eos == false forever, because
        // the remaining count would never decrease.
        const int64_t batch_size =
                _runtime_state == nullptr
                        ? _remaining_table_level_count
                        : std::max<int64_t>(1, static_cast<int64_t>(_runtime_state->batch_size()));
        const auto rows = std::min(_remaining_table_level_count, batch_size);
        RETURN_IF_ERROR(_materialize_count_rows(cast_set<size_t>(rows), block));
        _remaining_table_level_count -= rows;
        *eos = false;
        return Status::OK();
    }
514
515
    void _append_file_scan_column(FileScanRequest* request, LocalColumnId column_id,
516
24
                                  std::vector<LocalColumnIndex>* scan_columns) {
517
24
        DORIS_CHECK(request != nullptr);
518
24
        DORIS_CHECK(scan_columns != nullptr);
519
24
        FileScanRequestBuilder builder(request);
520
24
        Status status;
521
24
        if (scan_columns == &request->predicate_columns) {
522
13
            status = builder.add_predicate_column(column_id);
523
13
        } else {
524
11
            DORIS_CHECK(scan_columns == &request->non_predicate_columns);
525
11
            status = builder.add_non_predicate_column(column_id);
526
11
        }
527
24
        DORIS_CHECK(status.ok()) << status.to_string();
528
24
        if (column_id == LocalColumnId(ROW_POSITION_COLUMN_ID) &&
529
24
            _find_column_definition(_data_reader.file_schema, column_id) == nullptr) {
530
17
            _data_reader.file_schema.push_back(row_position_column_definition());
531
17
        }
532
24
    }
533
534
    // Append DeletePredicate to file scan request if there are deletes. The predicate will be evaluated in file reader level and filter out deleted rows before returning data to table reader.
535
532
    Status _append_delete_predicate(FileScanRequest* request) {
536
532
        DORIS_CHECK(request != nullptr);
537
532
        if (_delete_rows == nullptr || _delete_rows->empty()) {
538
522
            return Status::OK();
539
522
        }
540
10
        const auto row_position_column_id = LocalColumnId(ROW_POSITION_COLUMN_ID);
541
10
        _append_file_scan_column(request, row_position_column_id, &request->predicate_columns);
542
543
10
        auto delete_predicate = std::make_shared<DeletePredicate>(*_delete_rows);
544
10
        const auto block_position = request->local_positions.at(row_position_column_id);
545
10
        delete_predicate->add_child(VSlotRef::create_shared(
546
10
                cast_set<int>(block_position.value()), cast_set<int>(block_position.value()), -1,
547
10
                std::make_shared<DataTypeInt64>(), ROW_POSITION_COLUMN_NAME));
548
549
10
        request->delete_conjuncts.push_back(
550
10
                VExprContext::create_shared(std::move(delete_predicate)));
551
10
        return Status::OK();
552
532
    }
553
554
    // Close the current concrete reader. This hook is called by both create_next_reader() and
555
    // close(), so it should remain idempotent.
556
532
    virtual Status close_current_reader() {
557
532
        _finalize_reader_condition_cache();
558
532
        RETURN_IF_ERROR(_data_reader.reader->close());
559
532
        _data_reader.reader.reset();
560
532
        if (_data_reader.column_mapper != nullptr) {
561
532
            _data_reader.column_mapper->clear();
562
532
            _data_reader.column_mapper.reset();
563
532
        }
564
532
        _table_filters.clear();
565
532
        _data_reader.file_schema.clear();
566
532
        _data_reader.file_block_layout.clear();
567
532
        _data_reader.block_template.clear();
568
532
        _current_task.reset();
569
532
        _current_file_description.reset();
570
532
        _current_reader_reached_eof = false;
571
532
        return Status::OK();
572
532
    }
573
574
    // Finalize file-local block to table/global schema block.
575
713
    Status finalize_chunk(Block* block, const size_t rows) {
576
713
        SCOPED_TIMER(_profile.finalize_timer);
577
713
        size_t idx = 0;
578
6.08k
        for (const auto& mapping : _data_reader.column_mapper->mappings()) {
579
6.08k
            ColumnPtr column;
580
6.08k
            RETURN_IF_ERROR(_materialize_mapping_column(mapping, &_data_reader.block_template, rows,
581
6.08k
                                                        &column));
582
6.08k
            block->replace_by_position(idx, IColumn::mutate(std::move(column)));
583
6.08k
            idx++;
584
6.08k
        }
585
713
        RETURN_IF_ERROR(materialize_virtual_columns(block));
586
        // Enforce CHAR/VARCHAR length declared by the table schema after all file-to-table
587
        // materialization has finished.
588
713
        RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
589
713
        return Status::OK();
590
713
    }
591
592
    // Materialize virtual columns in the table block, such as Iceberg _row_id and
593
    // _last_updated_sequence_number. This runs after normal column materialization so finalize
594
    // expressions can reference those virtual columns.
595
694
    virtual Status materialize_virtual_columns(Block* table_block) { return Status::OK(); }
596
597
#ifndef NDEBUG
598
713
    // Debug-only validation of the file-local block template at `stage`.
    // The template must hold one column per file_block_layout entry. Every column must be
    // non-null, pass sanity_check(), and match its declared type. Any failure, including an
    // Exception or std::exception thrown while checking, is logged at WARNING level. It is
    // then returned as an InternalError that describes the column, the expected row count
    // and debug_string().
    Status _check_file_block_columns(std::string_view stage, size_t rows) {
        DORIS_CHECK(_data_reader.block_template.columns() == _data_reader.file_block_layout.size());
        for (size_t idx = 0; idx < _data_reader.block_template.columns(); ++idx) {
            const auto& layout_column = _data_reader.file_block_layout[idx];
            const auto& typed_column = _data_reader.block_template.get_by_position(idx);
            const auto* raw_column = typed_column.column.get();
            const auto layout_type_name = [&]() -> std::string {
                return layout_column.type == nullptr ? "null" : layout_column.type->get_name();
            };
            // Shared reporter for every failure that can name the offending column.
            const auto report = [&](const std::string& error) {
                auto st = Status::InternalError(
                        "Invalid file block column {} at {}: file_column_id={}, name='{}', "
                        "type={}, column={}, column_size={}, expected_rows={}, error={}, "
                        "reader={}",
                        idx, stage, layout_column.file_column_id.value(), layout_column.name,
                        layout_type_name(), raw_column == nullptr ? "null" : raw_column->get_name(),
                        raw_column == nullptr ? 0 : raw_column->size(), rows, error,
                        debug_string());
                LOG(WARNING) << st;
                return st;
            };
            try {
                if (raw_column == nullptr) {
                    auto st = Status::InternalError(
                            "Invalid file block column {} at {}: file_column_id={}, name='{}', "
                            "type={}, column=null, expected_rows={}, reader={}",
                            idx, stage, layout_column.file_column_id.value(), layout_column.name,
                            layout_type_name(), rows, debug_string());
                    LOG(WARNING) << st;
                    return st;
                }
                raw_column->sanity_check();
                if (const auto match_status = typed_column.check_type_and_column_match();
                    !match_status.ok()) {
                    return report(match_status.to_string());
                }
            } catch (const Exception& e) {
                return report(e.to_string());
            } catch (const std::exception& e) {
                return report(e.what());
            }
        }
        return Status::OK();
    }
664
665
713
    // Debug-only validation of the finalized table block at `stage`.
    // The block must hold one column per column mapping. Every column must be non-null,
    // pass sanity_check(), and match its declared type. Any failure, including an Exception
    // or std::exception thrown while checking, is logged at WARNING level. It is then
    // returned as an InternalError that describes the column, its mapping and the expected
    // row count.
    Status _check_table_block_columns(std::string_view stage, const Block* block, size_t rows) {
        DORIS_CHECK(block != nullptr);
        const auto& mappings = _data_reader.column_mapper->mappings();
        DORIS_CHECK(block->columns() == mappings.size());
        for (size_t idx = 0; idx < block->columns(); ++idx) {
            const auto& mapping = mappings[idx];
            const auto& typed_column = block->get_by_position(idx);
            const auto* raw_column = typed_column.column.get();
            const auto table_type_name = [&]() -> std::string {
                return mapping.table_type == nullptr ? "null" : mapping.table_type->get_name();
            };
            // Shared reporter for every failure that can name the offending column.
            const auto report = [&](const std::string& error) {
                auto st = Status::InternalError(
                        "Invalid table block column {} at {}: table_column='{}', global_index={}, "
                        "type={}, column={}, column_size={}, expected_rows={}, error={}, "
                        "mapping={}",
                        idx, stage, mapping.table_column_name, mapping.global_index.value(),
                        table_type_name(), raw_column == nullptr ? "null" : raw_column->get_name(),
                        raw_column == nullptr ? 0 : raw_column->size(), rows, error,
                        mapping.debug_string());
                LOG(WARNING) << st;
                return st;
            };
            try {
                if (raw_column == nullptr) {
                    auto st = Status::InternalError(
                            "Invalid table block column {} at {}: table_column='{}', "
                            "global_index={}, type={}, column=null, expected_rows={}, mapping={}",
                            idx, stage, mapping.table_column_name, mapping.global_index.value(),
                            table_type_name(), rows, mapping.debug_string());
                    LOG(WARNING) << st;
                    return st;
                }
                raw_column->sanity_check();
                if (const auto match_status = typed_column.check_type_and_column_match();
                    !match_status.ok()) {
                    return report(match_status.to_string());
                }
            } catch (const Exception& e) {
                return report(e.to_string());
            } catch (const std::exception& e) {
                return report(e.what());
            }
        }
        return Status::OK();
    }
725
#endif
726
727
713
    // Enforce the CHAR/VARCHAR lengths declared by the table schema on the table block.
    // This is a no-op unless a runtime state is present and its query option
    // truncate_char_or_varchar_columns is enabled. Only the columns selected by
    // _should_truncate_char_or_varchar_column are truncated, to their declared length.
    Status _truncate_char_or_varchar_columns(Block* block) {
        DORIS_CHECK(block != nullptr);
        const bool truncation_enabled =
                _runtime_state != nullptr &&
                _runtime_state->query_options().truncate_char_or_varchar_columns;
        if (!truncation_enabled) {
            return Status::OK();
        }
        const auto& mappings = _data_reader.column_mapper->mappings();
        DORIS_CHECK(block->columns() == mappings.size());
        for (size_t position = 0; position < mappings.size(); ++position) {
            const auto& mapping = mappings[position];
            if (_should_truncate_char_or_varchar_column(mapping)) {
                const auto string_type = remove_nullable(mapping.table_type);
                const auto declared_len =
                        assert_cast<const DataTypeString*>(string_type.get())->len();
                _truncate_char_or_varchar_column(block, position, declared_len);
            }
        }
        return Status::OK();
    }
746
747
    // Return true when the table schema has a bounded CHAR/VARCHAR length that is stricter than
748
    // the file-side type. Examples:
749
    // - table VARCHAR(10), file VARCHAR(20): truncate to 10;
750
    // - table VARCHAR(10), file STRING: truncate to 10 because STRING has no declared bound;
751
    // - table STRING, any file type: no truncation because the target has no bound.
752
5
    static bool _should_truncate_char_or_varchar_column(const ColumnMapping& mapping) {
753
5
        if (mapping.table_type == nullptr) {
754
0
            return false;
755
0
        }
756
5
        const auto table_type = remove_nullable(mapping.table_type);
757
5
        const auto primitive_type = table_type->get_primitive_type();
758
5
        if (primitive_type != TYPE_VARCHAR && primitive_type != TYPE_CHAR) {
759
1
            return false;
760
1
        }
761
4
        const auto target_len = assert_cast<const DataTypeString*>(table_type.get())->len();
762
4
        if (target_len <= 0) {
763
0
            return false;
764
0
        }
765
4
        if (mapping.file_type == nullptr) {
766
0
            return true;
767
0
        }
768
4
        const auto file_type = remove_nullable(mapping.file_type);
769
4
        DORIS_CHECK(file_type != nullptr);
770
4
        int file_len = -1;
771
4
        if (file_type->get_primitive_type() == TYPE_VARCHAR ||
772
4
            file_type->get_primitive_type() == TYPE_CHAR ||
773
4
            file_type->get_primitive_type() == TYPE_STRING) {
774
3
            file_len = assert_cast<const DataTypeString*>(file_type.get())->len();
775
3
        }
776
777
4
        return file_len < 0 || target_len < file_len;
778
4
    }
779
780
    // Truncate a materialized CHAR/VARCHAR column in place by reusing the vectorized substring
781
    // implementation: substring(column, 1, len). Nullable columns are unwrapped before substring
782
    // execution and wrapped back with the original null map afterward, because substring operates
783
    // on the nested string payload only.
784
1
    static void _truncate_char_or_varchar_column(Block* block, size_t idx, int len) {
785
1
        DORIS_CHECK(block != nullptr);
786
1
        auto int_type = std::make_shared<DataTypeInt32>();
787
1
        const auto num_columns_without_result = cast_set<uint32_t>(block->columns());
788
1
        auto& target = block->get_by_position(idx);
789
1
        const bool is_nullable = target.type->is_nullable();
790
1
        ColumnPtr input_column = target.column;
791
1
        ColumnPtr null_map_column;
792
1
        if (is_nullable) {
793
1
            const auto* nullable_column = assert_cast<const ColumnNullable*>(target.column.get());
794
1
            input_column = nullable_column->get_nested_column_ptr();
795
1
            null_map_column = nullable_column->get_null_map_column_ptr();
796
1
        }
797
1
        block->replace_by_position(idx, std::move(input_column));
798
1
        block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)),
799
1
                       int_type, "const 1"});
800
1
        block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)),
801
1
                       int_type, "const len"});
802
1
        block->insert({nullptr, std::make_shared<DataTypeString>(), "result"});
803
804
1
        ColumnNumbers temp_arguments(3);
805
1
        temp_arguments[0] = cast_set<uint32_t>(idx);
806
1
        temp_arguments[1] = num_columns_without_result;
807
1
        temp_arguments[2] = num_columns_without_result + 1;
808
1
        const uint32_t result_column_id = num_columns_without_result + 2;
809
1
        SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
810
811
1
        ColumnPtr result_column = block->get_by_position(result_column_id).column;
812
1
        if (is_nullable) {
813
1
            result_column = ColumnNullable::create(std::move(result_column), null_map_column);
814
1
        }
815
1
        block->replace_by_position(idx, std::move(result_column));
816
1
        block->erase_tail(num_columns_without_result);
817
1
    }
818
819
531
    // Try to answer the scan with aggregate results computed by the file reader itself.
    // The function always resets `block` via clear_column_data and marks
    // _aggregate_pushdown_tried. When pushdown is supported and the reader produces a result,
    // the reduced rows are materialized into `block`, *pushed_down becomes true, and the
    // current reader is closed. A reader answering NOT_IMPLEMENTED_ERROR is not treated as an
    // error: *pushed_down simply stays false.
    Status _try_materialize_aggregate_pushdown_rows(Block* block, bool* pushed_down) {
        DORIS_CHECK(block != nullptr);
        DORIS_CHECK(pushed_down != nullptr);
        *pushed_down = false;
        block->clear_column_data(_projected_columns.size());
        _aggregate_pushdown_tried = true;
        if (_supports_aggregate_pushdown(_push_down_agg_type)) {
            FileAggregateRequest request;
            RETURN_IF_ERROR(_build_file_aggregate_request(_push_down_agg_type, &request));
            FileAggregateResult result;
            const auto reader_status = _data_reader.reader->get_aggregate_result(request, &result);
            if (!reader_status.is<ErrorCode::NOT_IMPLEMENTED_ERROR>()) {
                RETURN_IF_ERROR(reader_status);
                RETURN_IF_ERROR(
                        _materialize_aggregate_pushdown_rows(_push_down_agg_type, result, block));
                *pushed_down = true;
                RETURN_IF_ERROR(close_current_reader());
            }
        }
        return Status::OK();
    }
843
844
538
    // Decide whether `agg_type` can be answered from file-level aggregate results.
    // Only COUNT and MIN/MAX qualify, and only when there are no delete rows, table filters or
    // column predicates. This keeps the reduced rows consumed by the upper aggregate
    // semantically equivalent to a normal scan. MIN/MAX additionally requires every column
    // mapping to be a direct file-to-table mapping that _can_push_down_minmax_for_mapping
    // accepts. A direct mapping has a file column, no virtual column, no default expression,
    // and both types set.
    virtual bool _supports_aggregate_pushdown(TPushAggOp::type agg_type) const {
        const bool is_count = agg_type == TPushAggOp::type::COUNT;
        const bool is_minmax = agg_type == TPushAggOp::type::MINMAX;
        if (!is_count && !is_minmax) {
            return false;
        }
        const bool has_delete_rows = _delete_rows != nullptr && !_delete_rows->empty();
        const bool has_filters = !_table_filters.empty() || !_table_column_predicates.empty();
        if (has_delete_rows || has_filters) {
            return false;
        }
        if (is_count) {
            return true;
        }
        // MIN/MAX: the two emitted rows must suffice for the upper aggregate without evaluating
        // default expressions or virtual columns.
        return std::ranges::all_of(
                _data_reader.column_mapper->mappings(), [&](const ColumnMapping& mapping) {
                    const bool is_direct_mapping =
                            mapping.file_local_id.has_value() &&
                            mapping.virtual_column_type == TableVirtualColumnType::INVALID &&
                            mapping.default_expr == nullptr && mapping.file_type != nullptr &&
                            mapping.table_type != nullptr;
                    return is_direct_mapping && _can_push_down_minmax_for_mapping(mapping);
                });
    }
877
878
6.08k
    // Return `column` passed through IColumn::mutate. This presumably relies on the
    // copy-on-write helper reusing an unshared column and cloning a shared one, so callers can
    // treat the result as their own. The input must be non-null.
    static ColumnPtr _detach_column(ColumnPtr column) {
        DORIS_CHECK(column.get() != nullptr);
        ColumnPtr detached = IColumn::mutate(std::move(column));
        return detached;
    }
882
883
58
    // Recursively reshape `*column` so its nullability matches `table_type` at every level:
    // the top level, array elements, map keys/values and struct fields.
    // - A non-nullable column for a nullable type is aligned against the nested type and then
    //   wrapped with make_nullable.
    // - A nullable column for a nullable type keeps its null map; its nested column is aligned
    //   against the nested type.
    // - A nullable column for a non-nullable type is unwrapped. If any row is NULL, an
    //   InternalError is returned instead.
    // Constant columns are expanded first, so the result is never a ColumnConst.
    static Status _align_column_nullability(ColumnPtr* column, const DataTypePtr& table_type) {
        DORIS_CHECK(column != nullptr);
        DORIS_CHECK(column->get() != nullptr);
        DORIS_CHECK(table_type != nullptr);
        ColumnPtr& current = *column;
        // The aligned result must be a non-const column.
        current = current->convert_to_full_column_if_const();

        if (table_type->is_nullable()) {
            const auto& nested_type =
                    assert_cast<const DataTypeNullable&>(*table_type).get_nested_type();
            if (current->is_nullable()) {
                const auto& nullable = assert_cast<const ColumnNullable&>(*current);
                ColumnPtr payload = nullable.get_nested_column_ptr();
                RETURN_IF_ERROR(_align_column_nullability(&payload, nested_type));
                current = ColumnNullable::create(payload, nullable.get_null_map_column_ptr());
            } else {
                RETURN_IF_ERROR(_align_column_nullability(&current, nested_type));
                current = make_nullable(current);
            }
            return Status::OK();
        }

        if (current->is_nullable()) {
            const auto& nullable = assert_cast<const ColumnNullable&>(*current);
            if (nullable.has_null()) {
                return Status::InternalError(
                        "Default expression produced NULL for non-nullable table column");
            }
            ColumnPtr payload = nullable.get_nested_column_ptr();
            RETURN_IF_ERROR(_align_column_nullability(&payload, table_type));
            current = payload;
            return Status::OK();
        }

        if (const auto* array_type = typeid_cast<const DataTypeArray*>(table_type.get())) {
            const auto& array_column = assert_cast<const ColumnArray&>(*current);
            ColumnPtr elements = array_column.get_data_ptr();
            RETURN_IF_ERROR(_align_column_nullability(&elements, array_type->get_nested_type()));
            current = ColumnArray::create(elements, array_column.get_offsets_ptr());
            return Status::OK();
        }

        if (const auto* map_type = typeid_cast<const DataTypeMap*>(table_type.get())) {
            const auto& map_column = assert_cast<const ColumnMap&>(*current);
            ColumnPtr keys = map_column.get_keys_ptr();
            ColumnPtr values = map_column.get_values_ptr();
            RETURN_IF_ERROR(_align_column_nullability(&keys, map_type->get_key_type()));
            RETURN_IF_ERROR(_align_column_nullability(&values, map_type->get_value_type()));
            current = ColumnMap::create(keys, values, map_column.get_offsets_ptr());
            return Status::OK();
        }

        if (const auto* struct_type = typeid_cast<const DataTypeStruct*>(table_type.get())) {
            const auto& struct_column = assert_cast<const ColumnStruct&>(*current);
            Columns fields = struct_column.get_columns_copy();
            DORIS_CHECK(fields.size() == struct_type->get_elements().size());
            for (size_t field_idx = 0; field_idx < fields.size(); ++field_idx) {
                RETURN_IF_ERROR(_align_column_nullability(&fields[field_idx],
                                                          struct_type->get_element(field_idx)));
            }
            current = ColumnStruct::create(fields);
            return Status::OK();
        }

        // Scalar non-nullable type with a non-nullable column: already aligned.
        return Status::OK();
    }
945
946
    // Evaluate `default_expr` over `block` by calling its root expression's
    // execute_column_impl directly. As the name says, this skips the root type check that the
    // regular execution path presumably performs. The call is wrapped in
    // RETURN_IF_CATCH_EXCEPTION, so thrown exceptions surface as an error Status. The produced
    // column must contain exactly block->rows() rows. The column, the expression's execute
    // type and the expression name are written into *result_data.
    static Status _execute_default_expr_without_root_type_check(
            const VExprContextSPtr& default_expr, const Block* block,
            ColumnWithTypeAndName* result_data) {
        DORIS_CHECK(default_expr != nullptr);
        DORIS_CHECK(block != nullptr);
        DORIS_CHECK(result_data != nullptr);
        const size_t expected_rows = block->rows();
        ColumnPtr produced;
        Status exec_status;
        RETURN_IF_CATCH_EXCEPTION({
            exec_status = default_expr->root()->execute_column_impl(
                    default_expr.get(), block, nullptr, expected_rows, produced);
        });
        RETURN_IF_ERROR(exec_status);
        DORIS_CHECK(produced.get() != nullptr);
        if (produced->size() != expected_rows) {
            return Status::InternalError(
                    "Default expr {} return column size {} not equal to expected size {}",
                    default_expr->expr_name(), produced->size(), expected_rows);
        }
        result_data->column = std::move(produced);
        result_data->type = default_expr->execute_type(block);
        result_data->name = default_expr->expr_name();
        return Status::OK();
    }
970
971
    // Cast `*column`, read with `file_type`, to `table_type`. This is done by evaluating a
    // one-off CAST(slot AS table_type) expression over a single-column block. The column is
    // left untouched when the two types are equal. When the column is nullable but
    // `file_type` is not, the slot is declared with the nullable form of `file_type` so the
    // declared type matches the data.
    Status _cast_column_to_type(ColumnPtr* column, const DataTypePtr& file_type,
                                const DataTypePtr& table_type,
                                const std::string& column_name) const {
        DORIS_CHECK(column != nullptr);
        DORIS_CHECK(column->get() != nullptr);
        DORIS_CHECK(file_type != nullptr);
        DORIS_CHECK(table_type != nullptr);
        if (file_type->equals(*table_type)) {
            return Status::OK();
        }

        const bool needs_nullable_slot = (*column)->is_nullable() && !file_type->is_nullable();
        const DataTypePtr slot_type = needs_nullable_slot ? make_nullable(file_type) : file_type;

        Block input_block;
        input_block.insert({*column, slot_type, column_name});
        auto cast_expr = Cast::create_shared(table_type);
        cast_expr->add_child(VSlotRef::create_shared(0, 0, -1, slot_type, column_name));
        auto cast_ctx = VExprContext::create_shared(std::move(cast_expr));

        RowDescriptor empty_row_desc;
        RETURN_IF_ERROR(cast_ctx->prepare(_runtime_state, empty_row_desc));
        RETURN_IF_ERROR(cast_ctx->open(_runtime_state));
        ColumnPtr casted;
        RETURN_IF_ERROR(cast_ctx->execute(&input_block, casted));
        *column = std::move(casted);
        return Status::OK();
    }
1000
1001
    // Turn a file-side child column into the table-side column described by `mapping`.
    // A trivial mapping reuses `file_column` as is. A non-trivial mapping with child mappings
    // is rebuilt through the complex (struct/array/map) materialization. A non-trivial
    // mapping without child mappings is cast from mapping.file_type to mapping.table_type.
    // In every case the result's nullability is finally aligned with mapping.table_type.
    Status _materialize_present_child_mapping_column(const ColumnMapping& mapping,
                                                     const ColumnPtr& file_column,
                                                     const size_t rows, ColumnPtr* column) {
        DORIS_CHECK(column != nullptr);
        DORIS_CHECK(mapping.file_type != nullptr);
        DORIS_CHECK(mapping.table_type != nullptr);
        *column = file_column;
        const bool needs_conversion = !mapping.is_trivial;
        const bool has_children = !mapping.child_mappings.empty();
        if (needs_conversion && has_children) {
            RETURN_IF_ERROR(_materialize_complex_mapping_column(mapping, *column, rows, column));
        } else if (needs_conversion) {
            RETURN_IF_ERROR(_cast_column_to_type(column, mapping.file_type, mapping.table_type,
                                                 mapping.file_column_name));
        }
        RETURN_IF_ERROR(_align_column_nullability(column, mapping.table_type));
        return Status::OK();
    }
1020
1021
    // Materialize the table-side column described by `mapping` for a chunk of `rows` rows,
    // using `current_block` (the file-local block template) as input. Resolution order:
    // 1. A non-trivial mapping backed by a file column with child mappings: run the
    //    projection, then rebuild the complex column from the child mappings.
    // 2. Any other mapping with a projection: run the projection and detach its result.
    // 3. A mapping with a default expression: evaluate it over `current_block` when that block
    //    already has `rows` rows. Otherwise evaluate it over a block holding one constant
    //    default-valued column of `rows` rows. The result's nullability is then aligned with
    //    the table type.
    // 4. Otherwise: a constant column of `rows` rows holding the table type's default value.
    // Projection failures are wrapped in an InternalError naming the table column.
    Status _materialize_mapping_column(const ColumnMapping& mapping, Block* current_block,
                                       const size_t rows, ColumnPtr* column) {
        if (!mapping.is_trivial && mapping.file_local_id.has_value() &&
            !mapping.child_mappings.empty()) {
            // The projection is dereferenced unconditionally below. DORIS_CHECK is used, as
            // for the other invariants in this file, instead of the debug-only DCHECK.
            DORIS_CHECK(mapping.projection != nullptr);
            int res_id = -1;
            auto st = mapping.projection->execute(current_block, &res_id);
            if (!st.ok()) {
                return Status::InternalError(
                        "Failed to execute complex mapping projection for table column '{}' "
                        "(global_index={}, file_local_id={}, rows={}): {}, mapping={}",
                        mapping.table_column_name, mapping.global_index.value(),
                        *mapping.file_local_id, rows, st.to_string(), mapping.debug_string());
            }
            ColumnPtr result_column = current_block->get_by_position(res_id).column;
            RETURN_IF_ERROR(
                    _materialize_complex_mapping_column(mapping, result_column, rows, column));
            return Status::OK();
        }
        if (mapping.projection != nullptr) {
            int res_id = -1;
            auto st = mapping.projection->execute(current_block, &res_id);
            if (!st.ok()) {
                std::string file_local_id = "null";
                if (mapping.file_local_id.has_value()) {
                    file_local_id = std::to_string(*mapping.file_local_id);
                }
                return Status::InternalError(
                        "Failed to execute mapping projection for table column '{}' "
                        "(global_index={}, file_local_id={}, rows={}): {}, mapping={}",
                        mapping.table_column_name, mapping.global_index.value(), file_local_id,
                        rows, st.to_string(), mapping.debug_string());
            }
            ColumnPtr result_column = current_block->get_by_position(res_id).column;
            *column = _detach_column(std::move(result_column));
            return Status::OK();
        }
        if (mapping.default_expr != nullptr) {
            if (current_block->rows() == rows) {
                ColumnWithTypeAndName result;
                RETURN_IF_ERROR(_execute_default_expr_without_root_type_check(
                        mapping.default_expr, current_block, &result));
                ColumnPtr result_column = result.column;
                RETURN_IF_ERROR(_align_column_nullability(&result_column, mapping.table_type));
                *column = _detach_column(std::move(result_column));
            } else {
                // `current_block` does not carry `rows` rows. Evaluate the default over a
                // synthetic block whose only column fixes the row count.
                DORIS_CHECK(mapping.constant_index.has_value());
                Block eval_block;
                eval_block.insert({mapping.table_type->create_column_const_with_default_value(rows),
                                   mapping.table_type, "__table_reader_const_rows"});
                ColumnWithTypeAndName result;
                RETURN_IF_ERROR(_execute_default_expr_without_root_type_check(
                        mapping.default_expr, &eval_block, &result));
                ColumnPtr result_column = result.column;
                RETURN_IF_ERROR(_align_column_nullability(&result_column, mapping.table_type));
                *column = _detach_column(std::move(result_column));
            }
            return Status::OK();
        }
        ColumnPtr result_column = mapping.table_type->create_column_const_with_default_value(rows);
        *column = _detach_column(std::move(result_column));
        return Status::OK();
    }
1084
1085
    // Rebuild a complex table column from `file_column` according to the child mappings of
    // `mapping`. Dispatch is on the table type with nullability removed. Struct, array and map
    // columns go to their dedicated materializers. Any other type has nothing to rebuild and
    // is only detached.
    Status _materialize_complex_mapping_column(const ColumnMapping& mapping,
                                               const ColumnPtr& file_column, const size_t rows,
                                               ColumnPtr* column) {
        DORIS_CHECK(mapping.table_type != nullptr);
        DORIS_CHECK(file_column.get() != nullptr);
        const auto table_kind = remove_nullable(mapping.table_type)->get_primitive_type();
        if (table_kind == TYPE_STRUCT) {
            RETURN_IF_ERROR(_materialize_struct_mapping_column(mapping, file_column, rows, column));
        } else if (table_kind == TYPE_ARRAY) {
            RETURN_IF_ERROR(_materialize_array_mapping_column(mapping, file_column, rows, column));
        } else if (table_kind == TYPE_MAP) {
            RETURN_IF_ERROR(_materialize_map_mapping_column(mapping, file_column, rows, column));
        } else {
            *column = _detach_column(file_column);
        }
        return Status::OK();
    }
1107
1108
    static std::vector<const ColumnMapping*> _present_child_mappings_in_file_order(
1109
7
            const std::vector<ColumnMapping>& child_mappings) {
1110
7
        std::vector<const ColumnMapping*> result;
1111
7
        result.reserve(child_mappings.size());
1112
16
        for (const auto& child_mapping : child_mappings) {
1113
16
            if (child_mapping.file_local_id.has_value()) {
1114
11
                result.push_back(&child_mapping);
1115
11
            }
1116
16
        }
1117
7
        std::ranges::sort(result, [](const ColumnMapping* lhs, const ColumnMapping* rhs) {
1118
6
            DORIS_CHECK(lhs->file_local_id.has_value());
1119
6
            DORIS_CHECK(rhs->file_local_id.has_value());
1120
6
            return *lhs->file_local_id < *rhs->file_local_id;
1121
6
        });
1122
7
        return result;
1123
7
    }
1124
1125
    static size_t _file_child_ordinal_for_mapping(
1126
            const ColumnMapping& mapping, const ColumnMapping& child_mapping,
1127
11
            const std::vector<const ColumnMapping*>& file_ordered_children) {
1128
11
        DORIS_CHECK(child_mapping.file_local_id.has_value());
1129
11
        if (!mapping.projected_file_children.empty()) {
1130
7
            const auto child_it = std::ranges::find_if(
1131
10
                    mapping.projected_file_children, [&](const ColumnDefinition& file_child) {
1132
10
                        return file_child.file_local_id() == *child_mapping.file_local_id;
1133
10
                    });
1134
7
            DORIS_CHECK(child_it != mapping.projected_file_children.end());
1135
7
            return static_cast<size_t>(
1136
7
                    std::distance(mapping.projected_file_children.begin(), child_it));
1137
7
        }
1138
4
        const auto child_it = std::ranges::find(file_ordered_children, &child_mapping);
1139
4
        DORIS_CHECK(child_it != file_ordered_children.end());
1140
4
        return static_cast<size_t>(std::distance(file_ordered_children.begin(), child_it));
1141
11
    }
1142
1143
    static std::vector<const ColumnMapping*> _child_mappings_in_table_type_order(
1144
7
            const ColumnMapping& mapping, const DataTypeStruct& table_type) {
1145
7
        std::vector<const ColumnMapping*> result;
1146
7
        result.reserve(mapping.child_mappings.size());
1147
23
        for (size_t child_idx = 0; child_idx < table_type.get_elements().size(); ++child_idx) {
1148
16
            const auto& child_name = table_type.get_element_name(child_idx);
1149
16
            const auto child_it = std::ranges::find_if(
1150
28
                    mapping.child_mappings, [&](const ColumnMapping& child_mapping) {
1151
28
                        return child_mapping.table_column_name == child_name;
1152
28
                    });
1153
16
            DORIS_CHECK(child_it != mapping.child_mappings.end())
1154
0
                    << mapping.debug_string() << ", table_child_name=" << child_name;
1155
16
            result.push_back(&*child_it);
1156
16
        }
1157
7
        return result;
1158
7
    }
1159
1160
    static const IColumn* _nested_column_if_nullable(const ColumnPtr& column,
1161
12
                                                     const NullMap** null_map) {
1162
12
        DORIS_CHECK(column.get() != nullptr);
1163
12
        if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
1164
8
            if (null_map != nullptr) {
1165
8
                *null_map = &nullable_column->get_null_map_data();
1166
8
            }
1167
8
            return &nullable_column->get_nested_column();
1168
8
        }
1169
4
        return column.get();
1170
12
    }
1171
1172
    Status _materialize_struct_mapping_column(const ColumnMapping& mapping,
1173
                                              const ColumnPtr& file_column, const size_t rows,
1174
7
                                              ColumnPtr* column) {
1175
7
        DORIS_CHECK(mapping.table_type != nullptr);
1176
7
        const auto* table_type =
1177
7
                assert_cast<const DataTypeStruct*>(remove_nullable(mapping.table_type).get());
1178
7
        const auto full_file_column = file_column->convert_to_full_column_if_const();
1179
7
        const NullMap* parent_null_map = nullptr;
1180
7
        const auto* nested_file_column =
1181
7
                _nested_column_if_nullable(full_file_column, &parent_null_map);
1182
7
        const auto* file_struct = assert_cast<const ColumnStruct*>(nested_file_column);
1183
7
        DORIS_CHECK(table_type->get_elements().size() == mapping.child_mappings.size());
1184
1185
7
        Columns child_columns;
1186
7
        child_columns.reserve(mapping.child_mappings.size());
1187
7
        const auto file_ordered_children =
1188
7
                _present_child_mappings_in_file_order(mapping.child_mappings);
1189
7
        const auto table_ordered_children =
1190
7
                _child_mappings_in_table_type_order(mapping, *table_type);
1191
16
        for (const auto* child_mapping : table_ordered_children) {
1192
16
            DORIS_CHECK(child_mapping != nullptr);
1193
16
            if (!child_mapping->file_local_id.has_value()) {
1194
5
                child_columns.push_back(
1195
5
                        child_mapping->table_type->create_column_const_with_default_value(rows)
1196
5
                                ->convert_to_full_column_if_const());
1197
5
                continue;
1198
5
            }
1199
11
            const auto file_child_idx =
1200
11
                    _file_child_ordinal_for_mapping(mapping, *child_mapping, file_ordered_children);
1201
11
            DORIS_CHECK(file_child_idx < file_struct->get_columns().size());
1202
11
            ColumnPtr child_column = file_struct->get_column_ptr(file_child_idx);
1203
11
            RETURN_IF_ERROR(_materialize_present_child_mapping_column(*child_mapping, child_column,
1204
11
                                                                      rows, &child_column));
1205
11
            child_columns.push_back(std::move(child_column));
1206
11
        }
1207
7
        MutableColumns mutable_child_columns;
1208
7
        mutable_child_columns.reserve(child_columns.size());
1209
16
        for (auto& child_column : child_columns) {
1210
16
            mutable_child_columns.push_back(IColumn::mutate(std::move(child_column)));
1211
16
        }
1212
7
        auto result = ColumnStruct::create(std::move(mutable_child_columns));
1213
7
        if (mapping.table_type->is_nullable()) {
1214
5
            auto null_map = ColumnUInt8::create();
1215
5
            auto& null_map_data = null_map->get_data();
1216
5
            null_map_data.resize(rows);
1217
5
            if (parent_null_map != nullptr) {
1218
5
                DORIS_CHECK(parent_null_map->size() == rows);
1219
5
                null_map_data.assign(parent_null_map->begin(), parent_null_map->end());
1220
5
            } else {
1221
0
                std::fill(null_map_data.begin(), null_map_data.end(), 0);
1222
0
            }
1223
5
            *column = ColumnNullable::create(std::move(result), std::move(null_map));
1224
5
        } else {
1225
2
            *column = std::move(result);
1226
2
        }
1227
7
        return Status::OK();
1228
7
    }
1229
1230
    Status _materialize_array_mapping_column(const ColumnMapping& mapping,
1231
                                             const ColumnPtr& file_column, const size_t rows,
1232
2
                                             ColumnPtr* column) {
1233
2
        DORIS_CHECK(mapping.child_mappings.size() == 1);
1234
2
        const auto full_file_column = file_column->convert_to_full_column_if_const();
1235
2
        const NullMap* parent_null_map = nullptr;
1236
2
        const auto* nested_file_column =
1237
2
                _nested_column_if_nullable(full_file_column, &parent_null_map);
1238
2
        const auto* file_array = assert_cast<const ColumnArray*>(nested_file_column);
1239
2
        ColumnPtr nested_column = file_array->get_data_ptr();
1240
2
        const auto& element_mapping = mapping.child_mappings[0];
1241
2
        RETURN_IF_ERROR(_materialize_present_child_mapping_column(
1242
2
                element_mapping, nested_column, nested_column->size(), &nested_column));
1243
2
        auto offsets_column = file_array->get_offsets_ptr()->convert_to_full_column_if_const();
1244
2
        auto result = ColumnArray::create(IColumn::mutate(std::move(nested_column)),
1245
2
                                          IColumn::mutate(std::move(offsets_column)));
1246
2
        if (mapping.table_type->is_nullable()) {
1247
2
            auto null_map = ColumnUInt8::create();
1248
2
            auto& null_map_data = null_map->get_data();
1249
2
            null_map_data.resize(rows);
1250
2
            if (parent_null_map != nullptr) {
1251
2
                DORIS_CHECK(parent_null_map->size() == rows);
1252
2
                null_map_data.assign(parent_null_map->begin(), parent_null_map->end());
1253
2
            } else {
1254
0
                std::fill(null_map_data.begin(), null_map_data.end(), 0);
1255
0
            }
1256
2
            *column = ColumnNullable::create(std::move(result), std::move(null_map));
1257
2
        } else {
1258
0
            *column = std::move(result);
1259
0
        }
1260
2
        return Status::OK();
1261
2
    }
1262
1263
    Status _materialize_map_mapping_column(const ColumnMapping& mapping,
1264
                                           const ColumnPtr& file_column, const size_t rows,
1265
3
                                           ColumnPtr* column) {
1266
3
        const auto full_file_column = file_column->convert_to_full_column_if_const();
1267
3
        const NullMap* parent_null_map = nullptr;
1268
3
        const auto* nested_file_column =
1269
3
                _nested_column_if_nullable(full_file_column, &parent_null_map);
1270
3
        const auto* file_map = assert_cast<const ColumnMap*>(nested_file_column);
1271
3
        ColumnPtr key_column = file_map->get_keys_ptr();
1272
3
        ColumnPtr value_column = file_map->get_values_ptr();
1273
1274
3
        const ColumnMapping* key_mapping = nullptr;
1275
3
        const ColumnMapping* value_mapping = nullptr;
1276
5
        for (const auto& child_mapping : mapping.child_mappings) {
1277
5
            if (!child_mapping.file_local_id.has_value()) {
1278
0
                continue;
1279
0
            }
1280
5
            if (*child_mapping.file_local_id == 0) {
1281
2
                key_mapping = &child_mapping;
1282
3
            } else if (*child_mapping.file_local_id == 1) {
1283
3
                value_mapping = &child_mapping;
1284
3
            }
1285
5
        }
1286
1287
3
        if (key_mapping != nullptr) {
1288
2
            RETURN_IF_ERROR(_materialize_present_child_mapping_column(
1289
2
                    *key_mapping, key_column, key_column->size(), &key_column));
1290
2
        }
1291
3
        if (value_mapping != nullptr) {
1292
3
            RETURN_IF_ERROR(_materialize_present_child_mapping_column(
1293
3
                    *value_mapping, value_column, value_column->size(), &value_column));
1294
3
        }
1295
3
        auto offsets_column = file_map->get_offsets_ptr()->convert_to_full_column_if_const();
1296
3
        auto result = ColumnMap::create(IColumn::mutate(std::move(key_column)),
1297
3
                                        IColumn::mutate(std::move(value_column)),
1298
3
                                        IColumn::mutate(std::move(offsets_column)));
1299
3
        if (mapping.table_type->is_nullable()) {
1300
1
            auto null_map = ColumnUInt8::create();
1301
1
            auto& null_map_data = null_map->get_data();
1302
1
            null_map_data.resize(rows);
1303
1
            if (parent_null_map != nullptr) {
1304
1
                DORIS_CHECK(parent_null_map->size() == rows);
1305
1
                null_map_data.assign(parent_null_map->begin(), parent_null_map->end());
1306
1
            } else {
1307
0
                std::fill(null_map_data.begin(), null_map_data.end(), 0);
1308
0
            }
1309
1
            *column = ColumnNullable::create(std::move(result), std::move(null_map));
1310
2
        } else {
1311
2
            *column = std::move(result);
1312
2
        }
1313
3
        return Status::OK();
1314
3
    }
1315
1316
530
    Status _open_mapping_exprs() {
1317
530
        RowDescriptor row_desc;
1318
3.16k
        for (const auto& mapping : _data_reader.column_mapper->mappings()) {
1319
3.16k
            if (mapping.projection != nullptr) {
1320
3.14k
                RETURN_IF_ERROR(mapping.projection->prepare(_runtime_state, row_desc));
1321
3.14k
                RETURN_IF_ERROR(mapping.projection->open(_runtime_state));
1322
3.14k
            }
1323
3.16k
            if (mapping.default_expr != nullptr) {
1324
5
                RETURN_IF_ERROR(mapping.default_expr->prepare(_runtime_state, row_desc));
1325
5
                RETURN_IF_ERROR(mapping.default_expr->open(_runtime_state));
1326
5
            }
1327
3.16k
        }
1328
530
        return Status::OK();
1329
530
    }
1330
1331
    Status _build_file_aggregate_request(TPushAggOp::type agg_type,
1332
7
                                         FileAggregateRequest* request) const {
1333
7
        DORIS_CHECK(request != nullptr);
1334
7
        DORIS_CHECK(_supports_aggregate_pushdown(agg_type));
1335
7
        request->agg_type = agg_type;
1336
7
        request->columns.clear();
1337
7
        if (agg_type == TPushAggOp::type::COUNT) {
1338
            // COUNT pushdown historically meant COUNT(*) and therefore carried no columns. For
1339
            // complex COUNT(col), materializing the full MAP/LIST/STRUCT value only to test the
1340
            // top-level NULL bit can be extremely expensive. When the scan projects exactly one
1341
            // directly-mapped complex column, pass that file column to the reader so formats such
1342
            // as Parquet can count the column shape from metadata/levels without decoding payload
1343
            // values like MAP value strings. Other COUNT cases stay on the existing row-count path
1344
            // to avoid changing count(*) semantics.
1345
2
            if (_data_reader.column_mapper->mappings().size() == 1) {
1346
2
                const auto& mapping = _data_reader.column_mapper->mappings()[0];
1347
2
                if (mapping.file_local_id.has_value() && mapping.file_type != nullptr &&
1348
2
                    is_complex_type(remove_nullable(mapping.file_type)->get_primitive_type()) &&
1349
2
                    mapping.virtual_column_type == TableVirtualColumnType::INVALID &&
1350
2
                    mapping.default_expr == nullptr) {
1351
0
                    FileAggregateRequest::Column column;
1352
0
                    column.projection =
1353
0
                            LocalColumnIndex::top_level(LocalColumnId(*mapping.file_local_id));
1354
0
                    request->columns.push_back(std::move(column));
1355
0
                }
1356
2
            }
1357
2
            return Status::OK();
1358
2
        }
1359
5
        request->columns.reserve(_data_reader.column_mapper->mappings().size());
1360
6
        for (const auto& mapping : _data_reader.column_mapper->mappings()) {
1361
6
            DORIS_CHECK(mapping.file_local_id.has_value());
1362
6
            FileAggregateRequest::Column column;
1363
6
            column.projection = LocalColumnIndex::top_level(LocalColumnId(*mapping.file_local_id));
1364
6
            if (!mapping.child_mappings.empty()) {
1365
1
                RETURN_IF_ERROR(build_aggregate_projection(mapping, &column.projection));
1366
1
            }
1367
6
            request->columns.push_back(std::move(column));
1368
6
        }
1369
5
        return Status::OK();
1370
5
    }
1371
1372
    Status _materialize_aggregate_pushdown_rows(TPushAggOp::type agg_type,
1373
                                                const FileAggregateResult& file_result,
1374
6
                                                Block* block) {
1375
6
        if (agg_type == TPushAggOp::type::COUNT) {
1376
            // COUNT pushdown is not a final count value. It emits `count` default rows so the
1377
            // upper COUNT(*) aggregate can count them and produce the final result, including
1378
            // zero rows when count is 0.
1379
2
            DORIS_CHECK(file_result.count >= 0);
1380
2
            return _materialize_count_rows(cast_set<size_t>(file_result.count), block);
1381
2
        }
1382
        // MIN/MAX pushdown emits two rows, min first and max second, for each projected column.
1383
        // The upper MIN/MAX aggregate consumes those two rows to produce the final aggregate value.
1384
4
        DORIS_CHECK(file_result.columns.size() == _data_reader.column_mapper->mappings().size());
1385
4
        DORIS_CHECK(block->columns() == _data_reader.column_mapper->mappings().size());
1386
4
        Block file_block;
1387
4
        file_block.reserve(_data_reader.file_block_layout.size());
1388
5
        for (const auto& column : _data_reader.file_block_layout) {
1389
5
            file_block.insert({column.type->create_column(), column.type, column.name});
1390
5
        }
1391
9
        for (size_t column_idx = 0; column_idx < file_result.columns.size(); ++column_idx) {
1392
5
            const auto& result_column = file_result.columns[column_idx];
1393
5
            if (!result_column.has_min || !result_column.has_max) {
1394
0
                return Status::NotSupported("Missing min/max aggregate result for column {}",
1395
0
                                            _projected_columns[column_idx].name);
1396
0
            }
1397
5
            bool found_file_column = false;
1398
6
            for (size_t block_position = 0; block_position < _data_reader.file_block_layout.size();
1399
6
                 ++block_position) {
1400
6
                if (_data_reader.file_block_layout[block_position].file_column_id ==
1401
6
                    file_result.columns[column_idx].projection.column_id()) {
1402
5
                    found_file_column = true;
1403
5
                    auto column = file_block.get_by_position(block_position)
1404
5
                                          .type->create_column()
1405
5
                                          ->assert_mutable();
1406
5
                    RETURN_IF_ERROR(_insert_aggregate_projection_value(
1407
5
                            file_result.columns[column_idx].projection, result_column.min_value,
1408
5
                            column.get()));
1409
5
                    RETURN_IF_ERROR(_insert_aggregate_projection_value(
1410
5
                            file_result.columns[column_idx].projection, result_column.max_value,
1411
5
                            column.get()));
1412
5
                    file_block.replace_by_position(block_position, std::move(column));
1413
5
                    break;
1414
5
                }
1415
6
            }
1416
5
            DORIS_CHECK(found_file_column);
1417
5
        }
1418
9
        for (size_t column_idx = 0; column_idx < _data_reader.column_mapper->mappings().size();
1419
5
             ++column_idx) {
1420
5
            ColumnPtr table_column;
1421
5
            RETURN_IF_ERROR(
1422
5
                    _materialize_mapping_column(_data_reader.column_mapper->mappings()[column_idx],
1423
5
                                                &file_block, 2, &table_column));
1424
5
            block->replace_by_position(column_idx, std::move(table_column));
1425
5
        }
1426
4
        return Status::OK();
1427
4
    }
1428
1429
    struct FileBlockColumn {
1430
        LocalColumnId file_column_id = LocalColumnId::invalid();
1431
        std::string name;
1432
        DataTypePtr type;
1433
    };
1434
1435
    struct DataReader {
1436
        std::unique_ptr<FileReader> reader;
1437
        std::unique_ptr<TableColumnMapper> column_mapper;
1438
        // Schema of the data file, also including virtual column (row position).
1439
        std::vector<ColumnDefinition> file_schema;
1440
        // Layout of the block returned by file reader, determined by column mapping and file
1441
        // schema. It is used for file reader to materialize columns into correct type and position.
1442
        std::vector<FileBlockColumn> file_block_layout;
1443
        Block block_template;
1444
    };
1445
    DataReader _data_reader;
1446
    std::vector<ColumnDefinition> _projected_columns;
1447
    std::unique_ptr<ScanTask> _current_task;
1448
    std::optional<io::FileDescription> _current_file_description;
1449
    // Range-level compression has higher priority than scan-param compression. TVF/load can keep
1450
    // the logical format as CSV/TEXT while carrying the concrete compression such as GZ or LZO on
1451
    // each TFileRangeDesc, matching the old FileScanner reader contract.
1452
    TFileCompressType::type _current_range_compress_type = TFileCompressType::UNKNOWN;
1453
    std::optional<TUniqueId> _current_range_load_id;
1454
    TFileRangeDesc _current_file_range_desc;
1455
    std::shared_ptr<io::FileSystemProperties> _system_properties;
1456
    // partition key -> value
1457
    std::map<std::string, Field> _partition_values;
1458
    // Predicates built from scan conjuncts before file-level localization.
1459
    std::vector<TableFilter> _table_filters;
1460
    TableColumnPredicates _table_column_predicates;
1461
    VExprContextSPtrs _conjuncts;
1462
    ReadProfile _profile;
1463
    // Parsed from row-position based delete files, including position delete and deletion vector.
1464
    DeleteRows* _delete_rows = nullptr;
1465
    TFileScanRangeParams* _scan_params;
1466
    std::shared_ptr<io::IOContext> _io_ctx;
1467
    RuntimeState* _runtime_state;
1468
    RuntimeProfile* _scanner_profile;
1469
    const std::vector<SlotDescriptor*>* _file_slot_descs = nullptr;
1470
    FileFormat _format;
1471
    TPushAggOp::type _push_down_agg_type = TPushAggOp::type::NONE;
1472
    size_t _batch_size = 0;
1473
    uint64_t _condition_cache_digest = 0;
1474
    segment_v2::ConditionCache::ExternalCacheKey _condition_cache_key;
1475
    std::shared_ptr<std::vector<bool>> _condition_cache;
1476
    std::shared_ptr<ConditionCacheContext> _condition_cache_ctx;
1477
    int64_t _condition_cache_hit_count = 0;
1478
    bool _current_reader_reached_eof = false;
1479
    int64_t _remaining_table_level_count = -1;
1480
    std::optional<GlobalRowIdContext> _global_rowid_context;
1481
    bool _aggregate_pushdown_tried = false;
1482
    TableColumnMapperOptions _mapper_options;
1483
1484
private:
1485
    static const ColumnDefinition* _find_column_definition(
1486
3.18k
            const std::vector<ColumnDefinition>& schema, LocalColumnId column_id) {
1487
23.0k
        for (const auto& field : schema) {
1488
23.0k
            if (field.file_local_id() == column_id.value()) {
1489
3.16k
                return &field;
1490
3.16k
            }
1491
23.0k
        }
1492
17
        return nullptr;
1493
3.18k
    }
1494
1495
15
    static bool _can_push_down_minmax_for_mapping(const ColumnMapping& mapping) {
1496
15
        if (mapping.child_mappings.empty()) {
1497
12
            return true;
1498
12
        }
1499
3
        const auto primitive_type = remove_nullable(mapping.file_type)->get_primitive_type();
1500
3
        if (primitive_type != TYPE_STRUCT) {
1501
1
            return false;
1502
1
        }
1503
2
        size_t mapped_children = 0;
1504
2
        const ColumnMapping* mapped_child = nullptr;
1505
2
        for (const auto& child_mapping : mapping.child_mappings) {
1506
2
            if (!child_mapping.file_local_id.has_value()) {
1507
0
                continue;
1508
0
            }
1509
2
            ++mapped_children;
1510
2
            mapped_child = &child_mapping;
1511
2
        }
1512
2
        return mapped_children == 1 && mapped_child != nullptr &&
1513
2
               _can_push_down_minmax_for_mapping(*mapped_child);
1514
3
    }
1515
1516
    static Status build_aggregate_projection(const ColumnMapping& mapping,
1517
2
                                             LocalColumnIndex* projection) {
1518
2
        DORIS_CHECK(projection != nullptr);
1519
2
        DORIS_CHECK(mapping.file_local_id.has_value());
1520
2
        *projection = LocalColumnIndex::local(*mapping.file_local_id);
1521
2
        projection->children.clear();
1522
2
        projection->project_all_children = true;
1523
2
        if (mapping.child_mappings.empty()) {
1524
1
            return Status::OK();
1525
1
        }
1526
1
        projection->project_all_children = false;
1527
1
        for (const auto& child_mapping : mapping.child_mappings) {
1528
1
            if (!child_mapping.file_local_id.has_value()) {
1529
0
                continue;
1530
0
            }
1531
1
            LocalColumnIndex child_projection;
1532
1
            RETURN_IF_ERROR(build_aggregate_projection(child_mapping, &child_projection));
1533
1
            projection->children.push_back(std::move(child_projection));
1534
1
        }
1535
1
        DORIS_CHECK(projection->children.size() == 1);
1536
1
        return Status::OK();
1537
1
    }
1538
1539
    static Status _insert_aggregate_projection_value(const LocalColumnIndex& projection,
1540
24
                                                     const Field& value, IColumn* column) {
1541
24
        DORIS_CHECK(column != nullptr);
1542
24
        if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
1543
12
            RETURN_IF_ERROR(_insert_aggregate_projection_value(
1544
12
                    projection, value, &nullable_column->get_nested_column()));
1545
12
            nullable_column->get_null_map_data().push_back(0);
1546
12
            return Status::OK();
1547
12
        }
1548
12
        if (projection.project_all_children || projection.children.empty()) {
1549
10
            column->insert(value);
1550
10
            return Status::OK();
1551
10
        }
1552
2
        auto* struct_column = assert_cast<ColumnStruct*>(column);
1553
2
        DORIS_CHECK(projection.children.size() == 1);
1554
2
        const auto& child_projection = projection.children[0];
1555
2
        DORIS_CHECK(struct_column->get_columns().size() == 1);
1556
2
        RETURN_IF_ERROR(_insert_aggregate_projection_value(child_projection, value,
1557
2
                                                           &struct_column->get_column(0)));
1558
2
        return Status::OK();
1559
2
    }
1560
1561
    // Parse row-position deletes from table format specific parameters, and fill in _delete_rows.
1562
    Status _parse_delete_predicates(const SplitReadOptions& options);
1563
};
1564
1565
} // namespace doris::format