Coverage Report

Created: 2026-07-03 18:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/table_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bvar/status.h>
21
22
#include <algorithm>
23
#include <exception>
24
#include <map>
25
#include <memory>
26
#include <optional>
27
#include <string>
28
#include <string_view>
29
#include <utility>
30
#include <vector>
31
32
#include "common/cast_set.h"
33
#include "common/exception.h"
34
#include "common/logging.h"
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/block/block.h"
38
#include "core/column/column_array.h"
39
#include "core/column/column_const.h"
40
#include "core/column/column_map.h"
41
#include "core/column/column_nullable.h"
42
#include "core/column/column_struct.h"
43
#include "core/column/column_vector.h"
44
#include "core/data_type/data_type.h"
45
#include "core/data_type/data_type_array.h"
46
#include "core/data_type/data_type_map.h"
47
#include "core/data_type/data_type_nullable.h"
48
#include "core/data_type/data_type_number.h"
49
#include "core/data_type/data_type_string.h"
50
#include "core/data_type/data_type_struct.h"
51
#include "core/field.h"
52
#include "exec/common/stringop_substring.h"
53
#include "exprs/vexpr.h"
54
#include "exprs/vexpr_context.h"
55
#include "exprs/vexpr_fwd.h"
56
#include "exprs/vslot_ref.h"
57
#include "format_v2/column_data.h"
58
#include "format_v2/column_mapper.h"
59
#include "format_v2/expr/cast.h"
60
#include "format_v2/expr/delete_predicate.h"
61
#include "format_v2/file_reader.h"
62
#include "format_v2/parquet/reader/column_reader.h"
63
#include "format_v2/schema_projection.h"
64
#include "gen_cpp/PlanNodes_types.h"
65
#include "runtime/descriptors.h"
66
#include "storage/segment/condition_cache.h"
67
68
namespace doris {
69
class Block;
70
class ColumnPredicate;
71
struct DeleteFileDesc;
72
class RuntimeState;
73
} // namespace doris
74
75
namespace doris::format {
76
77
using DeleteRows = std::vector<int64_t>;
78
79
// Row-level predicates on table/global schema. They are rewritten to file-local expressions when
80
// possible, and remain the source of row-level filtering after localization.
81
struct TableFilter {
82
    // Filter expression context. Code in this file null-checks it before evaluation.
    VExprContextSPtr conjunct;
83
    // Global (table-schema) column indices associated with `conjunct`; used to look up the
    // column mapper's filter entries for those columns.
    std::vector<GlobalIndex> global_indices;
84
};
85
86
// Polymorphic holder for one scan task.
struct ScanTask {
87
44.0k
    // Virtual so that derived task types are destroyed correctly through a ScanTask pointer.
    virtual ~ScanTask() = default;
88
89
    // Owned description of the data file for this task.
    std::unique_ptr<io::FileDescription> data_file;
90
};
91
92
// Context handed to TableReader::annotate_projected_column() and
// TableReader::validate_projected_columns(). All pointers default to null and are not owned
// by this struct.
struct ProjectedColumnBuildContext {
93
    const TFileScanRangeParams* scan_params = nullptr;
94
    const TFileRangeDesc* range = nullptr;
95
    RuntimeState* runtime_state = nullptr;
96
    // NOTE(review): presumably the schema definition of the column being annotated — confirm.
    std::optional<ColumnDefinition> schema_column = std::nullopt;
97
    // NOTE(review): looks like a running cursor over file columns — confirm against callers.
    size_t next_file_column_idx = 0;
98
};
99
100
// Profile counters used by the table reader. Every counter defaults to null.
struct ReadProfile {
101
    RuntimeProfile::Counter* num_delete_files = nullptr;
102
    RuntimeProfile::Counter* num_delete_rows = nullptr;
103
    RuntimeProfile::Counter* parse_delete_file_time = nullptr;
104
    // Scoped around the whole of TableReader::get_block().
    RuntimeProfile::Counter* exec_timer = nullptr;
105
    RuntimeProfile::Counter* prepare_split_timer = nullptr;
106
    // Scoped around TableReader::finalize_chunk().
    RuntimeProfile::Counter* finalize_timer = nullptr;
107
    RuntimeProfile::Counter* create_reader_timer = nullptr;
108
    // Scoped around the aggregate-pushdown attempt inside get_block().
    RuntimeProfile::Counter* pushdown_agg_timer = nullptr;
109
    // Scoped around TableReader::open_reader().
    RuntimeProfile::Counter* open_reader_timer = nullptr;
110
};
111
112
struct TableReadOptions {
113
    // Columns need to be read from file and output by table reader. They are all in table/global
114
    // schema semantics.
115
    const std::vector<ColumnDefinition> projected_columns;
116
    // Simple predicates for a single column, which is parsed on scan operator.
117
    const TableColumnPredicates column_predicates;
118
    // All complex conjuncts from scan operator
119
    const VExprContextSPtrs conjuncts;
120
    // File format of the underlying data files, needed for reader initialization and reader-level
121
    // filter pushdown.
122
    const FileFormat format;
123
    TFileScanRangeParams* scan_params;
124
    std::shared_ptr<io::IOContext> io_ctx;
125
    RuntimeState* runtime_state;
126
    RuntimeProfile* scanner_profile;
127
    // File formats without complete self-describing metadata, such as CSV, Text, and JSON, need
128
    // the FE-planned physical file slots to build their file-local schema and deserialize values.
129
    const std::vector<SlotDescriptor*>* file_slot_descs = nullptr;
130
    // Push-down aggregate type.
131
    const TPushAggOp::type push_down_agg_type = TPushAggOp::type::NONE;
132
    // Digest of stable pushed-down predicates. A zero digest disables condition cache.
133
    uint64_t condition_cache_digest = 0;
134
};
135
136
struct SplitReadOptions {
137
    // Split-level information for reader initialization, which may include file path, partition values, delete file info, etc. The content is table format specific and opaque to table reader base class; it's the responsibility of the concrete table reader implementation to parse necessary information for reader initialization and filter pushdown.
138
    std::map<std::string, Field> partition_values;
139
    ShardedKVCache* cache;
140
    TFileRangeDesc current_range;
141
    FileFormat current_split_format = FileFormat::PARQUET;
142
    std::optional<GlobalRowIdContext> global_rowid_context;
143
};
144
145
// Base class for table-level readers.
146
// This layer owns common table-level orchestration, such as split iteration, dynamic partition
147
// pruning, delete handling and conversion from file-local blocks to table-schema blocks. Concrete
148
// table-format readers only need to provide format-specific hooks for opening readers and parsing
149
// split metadata.
150
class TableReader {
151
public:
152
36.8k
    virtual ~TableReader() = default;
153
154
    // Initialize common runtime options for the table reader. Subclasses may call this from their
155
    // own init(options); table-format schema and split metadata are provided later per split.
156
    virtual Status init(TableReadOptions&& options);
157
158
    // FileScannerV2 adjusts this before each get_block() using an adaptive bytes-per-row estimate.
159
    // Store it here as well as forwarding to the current reader so newly opened split readers start
160
    // with the latest predicted batch size.
161
121k
    void set_batch_size(size_t batch_size) {
162
121k
        _batch_size = std::max<size_t>(1, batch_size);
163
121k
        if (_data_reader.reader != nullptr) {
164
72.0k
            _data_reader.reader->set_batch_size(_batch_size);
165
72.0k
        }
166
121k
    }
167
168
    // Prepare for reading a new split/task.
169
    // 1. Pass a new split/task to reader, which will be used in subsequent open_reader() to initialize the underlying file reader.
170
    // 2. Parse delete predicates from split/task information, which will be used for later dynamic filtering and delete handling.
171
    virtual Status prepare_split(const SplitReadOptions& options);
172
173
    // Public entry point for reading a table-schema block. The base class opens the current reader,
174
    // advances across EOF, and closes exhausted readers. Subclasses provide protected hooks for
175
    // table-format-specific behavior.
176
118k
    virtual Status get_block(Block* block, bool* eos) {
        SCOPED_TIMER(_profile.exec_timer);
        DORIS_CHECK(block->columns() == _projected_columns.size());
        block->clear_column_data(_projected_columns.size());

        // Marks the active file reader as exhausted and closes it.
        const auto retire_current_reader = [this]() -> Status {
            _current_reader_reached_eof = true;
            return close_current_reader();
        };

        for (;;) {
            if (*eos) {
                return Status::OK();
            }
            if (_data_reader.reader == nullptr) {
                // A pending table-level COUNT is served without opening any file reader.
                if (_is_table_level_count_active()) {
                    RETURN_IF_ERROR(_read_table_level_count(block, eos));
                    return Status::OK();
                }
                RETURN_IF_ERROR(create_next_reader(eos));
                if (_data_reader.reader == nullptr) {
                    DCHECK(*eos);
                    return Status::OK();
                }
            }

            // When aggregate pushdown applies, emit a reduced row set for the upper aggregate
            // operators instead of scanning. This is not the final aggregate result: COUNT
            // emits `count` default rows for the upper COUNT(*), and MIN/MAX emits two rows
            // holding file-level min/max values for the upper MIN/MAX.
            if (!_aggregate_pushdown_tried) {
                SCOPED_TIMER(_profile.pushdown_agg_timer);
                bool rows_materialized = false;
                RETURN_IF_ERROR(
                        _try_materialize_aggregate_pushdown_rows(block, &rows_materialized));
                if (rows_materialized) {
                    return Status::OK();
                }
            }

            bool reader_eof = false;
            _data_reader.block_template.clear_column_data(
                    cast_set<int64_t>(_data_reader.file_block_layout.size()));
            size_t rows_read = 0;
            RETURN_IF_ERROR(_data_reader.reader->get_block(&_data_reader.block_template,
                                                           &rows_read, &reader_eof));
            if (rows_read == 0) {
                // Nothing to hand out: either move past an exhausted reader or retry this one.
                if (reader_eof) {
                    RETURN_IF_ERROR(retire_current_reader());
                }
                continue;
            }
            DCHECK_EQ(_data_reader.block_template.columns(), _data_reader.file_block_layout.size())
                    << _data_reader.block_template.dump_structure();
#ifndef NDEBUG
            RETURN_IF_ERROR(_check_file_block_columns("after file reader get_block", rows_read));
#endif
            DORIS_CHECK(block->columns() == _data_reader.column_mapper->mappings().size());
            RETURN_IF_ERROR(finalize_chunk(block, rows_read));
#ifndef NDEBUG
            RETURN_IF_ERROR(_check_table_block_columns("after finalize_chunk", block, rows_read));
#endif
            if (reader_eof) {
                RETURN_IF_ERROR(retire_current_reader());
            }
            return Status::OK();
        }
    }
241
242
    // Close the table reader and the currently active file reader. Subclasses that hold additional
243
    // table-format resources should override this and call TableReader::close() first.
244
32.6k
    virtual Status close() {
        // Close the active file reader, if any, before dropping per-split state.
        const bool has_open_reader = (_data_reader.reader != nullptr);
        if (has_open_reader) {
            RETURN_IF_ERROR(close_current_reader());
        }
        _current_task.reset();
        _current_file_description.reset();
        // A negative value means no table-level COUNT is pending.
        _remaining_table_level_count = -1;
        return Status::OK();
    }
253
254
65.1k
    // Returns the stored condition-cache hit counter.
    int64_t condition_cache_hit_count() const {
        return _condition_cache_hit_count;
    }
255
256
    virtual std::string debug_string() const;
257
258
    virtual Status annotate_projected_column(const TFileScanSlotInfo& slot_info,
259
                                             ProjectedColumnBuildContext* context,
260
                                             ColumnDefinition* column) const;
261
262
21.2k
    virtual Status validate_projected_columns(const ProjectedColumnBuildContext& context) const {
        // Default implementation: nothing to validate, so the context is ignored.
        static_cast<void>(context);
        return Status::OK();
    }
266
267
protected:
268
    // Parse deletion vector information from table format specific file description.
269
    virtual Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc,
                                               DeleteFileDesc* desc, bool* has_delete_file) {
        // Default implementation: reports that there is no delete file and leaves `desc`
        // untouched. `t_desc` is not inspected.
        *has_delete_file = false;
        return Status::OK();
    }
274
275
    // Advance to the next reader. This closes the current reader first and then opens the next
276
    // concrete reader. Subclasses should not duplicate this loop.
277
    Status create_next_reader(bool* eos);
278
    virtual Status create_file_reader(std::unique_ptr<FileReader>* reader);
279
3.98k
    // Column mapping mode used when building the column mapper; BY_NAME unless overridden.
    virtual TableColumnMappingMode mapping_mode() const {
        return TableColumnMappingMode::BY_NAME;
    }
280
36.1k
    virtual Status annotate_file_schema(std::vector<ColumnDefinition>* file_schema) {
        // Default implementation leaves the schema unchanged; it only requires a non-null
        // pointer.
        DORIS_CHECK(file_schema != nullptr);
        return Status::OK();
    }
284
285
    // Open the concrete reader for the current split/task and build the file-local scan request.
286
38.8k
    virtual Status open_reader() {
        SCOPED_TIMER(_profile.open_reader_timer);

        // Step 1: fetch the file schema and build the table-to-file column mapping.
        std::vector<ColumnDefinition> file_schema;
        RETURN_IF_ERROR(_data_reader.reader->get_schema(&file_schema));
        // For Paimon/Hudi, FE can provide field ids through `history_schema_info`. Annotate the
        // file schema before column mapping when the table format maps columns by field id.
        RETURN_IF_ERROR(annotate_file_schema(&file_schema));
        _data_reader.file_schema = file_schema;
        _mapper_options.mode = mapping_mode();

        _data_reader.column_mapper = _data_reader.reader->create_column_mapper(_mapper_options);
        DORIS_CHECK(_data_reader.column_mapper != nullptr);
        RETURN_IF_ERROR(_data_reader.column_mapper->create_mapping(_projected_columns,
                                                                   _partition_values, file_schema));
        DORIS_CHECK(_data_reader.column_mapper->mappings().size() == _projected_columns.size());

        // Step 2: build table filters based on conjuncts and column predicates.
        RETURN_IF_ERROR(_build_table_filters_from_conjuncts());

        // Step 3: create the file scan request from the column mapping and table filters. The
        // request carries row-level expression filters and file-level pruning hints. Only
        // expression filters decide returned rows; column predicates are pruning hints.
        auto file_request = std::make_shared<FileScanRequest>();
        RETURN_IF_ERROR(_data_reader.column_mapper->create_scan_request(
                _table_filters, _table_column_predicates, _projected_columns, file_request.get(),
                _runtime_state));

        // A constant filter that rejects every row lets us drop the split without reading it.
        bool split_pruned = false;
        RETURN_IF_ERROR(_evaluate_constant_filters(&split_pruned));
        if (split_pruned) {
            RETURN_IF_ERROR(close_current_reader());
            return Status::OK();
        }
        RETURN_IF_ERROR(customize_file_scan_request(file_request.get()));
        RETURN_IF_ERROR(_open_local_filter_exprs(*file_request));
        _data_reader.file_block_layout.clear();
        _data_reader.block_template.clear();
        _data_reader.file_block_layout.resize(file_request->local_positions.size());

        // Step 4: build the file block layout from the file schema and column mapping. The
        // layout describes the block returned by the file reader before table-column
        // materialization.
        for (const auto& [file_column_id, block_position] : file_request->local_positions) {
            DORIS_CHECK(block_position.value() < _data_reader.file_block_layout.size());
            const auto* field = _find_column_definition(_data_reader.file_schema, file_column_id);
            DORIS_CHECK(field != nullptr);

            ColumnDefinition projected_field;
            // Projects `field` using the first entry of `scan_columns` that refers to this file
            // column; does nothing when the list has no such entry.
            const auto project_if_listed = [&](const auto& scan_columns) -> Status {
                for (const LocalColumnIndex& candidate : scan_columns) {
                    if (candidate.column_id() == file_column_id) {
                        return project_column_definition(*field, candidate, &projected_field);
                    }
                }
                return Status::OK();
            };
            // Non-predicate columns are consulted first, then predicate columns; a match in the
            // second list runs the projection again into the same `projected_field`.
            RETURN_IF_ERROR(project_if_listed(file_request->non_predicate_columns));
            RETURN_IF_ERROR(project_if_listed(file_request->predicate_columns));

            _data_reader.file_block_layout[block_position.value()] = {
                    .file_column_id = file_column_id,
                    .name = projected_field.name,
                    .type = projected_field.type,
            };
            DORIS_CHECK(_data_reader.file_block_layout[block_position.value()].type != nullptr);
        }

        // Step 5: prepare the block template from the file block layout. The template holds the
        // block returned by the file reader before table-column materialization.
        _data_reader.block_template.reserve(_data_reader.file_block_layout.size());
        for (const auto& layout_column : _data_reader.file_block_layout) {
            _data_reader.block_template.insert({layout_column.type->create_column(),
                                                layout_column.type, layout_column.name});
        }
        if (VLOG_DEBUG_IS_ON) {
            VLOG_DEBUG << "TableReader debug: " << debug_string();
        }
        RETURN_IF_ERROR(_open_mapping_exprs());
        RETURN_IF_ERROR(_data_reader.reader->open(file_request));
        RETURN_IF_ERROR(_init_reader_condition_cache(*file_request));
        return Status::OK();
    }
375
376
    Status _build_table_filters_from_conjuncts();
377
    Status _open_local_filter_exprs(const FileScanRequest& file_request);
378
    Status _init_reader_condition_cache(const FileScanRequest& file_request);
379
    void _finalize_reader_condition_cache();
380
    bool _should_enable_condition_cache(const FileScanRequest& file_request) const;
381
382
38.7k
    // Evaluates every table filter that depends only on constant-mapped columns against a
    // one-row synthetic block. Sets `*can_filter_all` to true and stops at the first filter
    // whose result rejects that row; otherwise `*can_filter_all` stays false.
    Status _evaluate_constant_filters(bool* can_filter_all) {
        DORIS_CHECK(can_filter_all != nullptr);
        *can_filter_all = false;
        for (const auto& table_filter : _table_filters) {
            // RuntimeFilterExpr does not implement execute_column_impl(); it is evaluated by
            // the row-level filter path through execute_filter(). Constant split pruning uses
            // VExprContext::execute() on a one-row synthetic block, so runtime filters must not
            // be pre-executed here even when their referenced slot maps to a constant value.
            const bool evaluable = table_filter.conjunct != nullptr &&
                                   !table_filter.conjunct->root()->is_rf_wrapper() &&
                                   _table_filter_has_only_constant_entries(table_filter);
            if (!evaluable) {
                continue;
            }

            Block synthetic_block;
            RETURN_IF_ERROR(_build_constant_filter_block(table_filter, &synthetic_block));
            RowDescriptor empty_row_desc;
            RETURN_IF_ERROR(table_filter.conjunct->prepare(_runtime_state, empty_row_desc));
            RETURN_IF_ERROR(table_filter.conjunct->open(_runtime_state));
            int result_position = -1;
            RETURN_IF_ERROR(table_filter.conjunct->execute(&synthetic_block, &result_position));
            DORIS_CHECK(result_position >= 0);
            const auto& verdict = synthetic_block.get_by_position(result_position).column;
            if (_filter_result_filters_all(verdict)) {
                *can_filter_all = true;
                return Status::OK();
            }
        }
        return Status::OK();
    }
410
411
16.0k
    bool _table_filter_has_only_constant_entries(const TableFilter& table_filter) const {
412
16.0k
        const auto& filter_entries = _data_reader.column_mapper->filter_entries();
413
16.1k
        for (const auto global_index : table_filter.global_indices) {
414
16.1k
            const auto entry_it = filter_entries.find(global_index);
415
16.1k
            if (entry_it == filter_entries.end() || !entry_it->second.is_constant()) {
416
13.4k
                return false;
417
13.4k
            }
418
16.1k
        }
419
2.58k
        return !table_filter.global_indices.empty();
420
16.0k
    }
421
422
2.59k
    // Rebuilds `eval_block` with one column per mapping, in mapping order. Columns that the
    // filter references and that map to a constant get the materialized constant value; all
    // other columns get a one-row const column holding the type's default value.
    Status _build_constant_filter_block(const TableFilter& table_filter, Block* eval_block) {
        DORIS_CHECK(eval_block != nullptr);
        eval_block->clear();
        const auto& mappings = _data_reader.column_mapper->mappings();
        const auto& filter_entries = _data_reader.column_mapper->filter_entries();
        DORIS_CHECK(mappings.size() == _projected_columns.size());

        const auto& referenced = table_filter.global_indices;
        for (size_t column_idx = 0; column_idx < mappings.size(); ++column_idx) {
            const auto& mapping = mappings[column_idx];
            const auto global_index = GlobalIndex(column_idx);
            const auto entry_it = filter_entries.find(global_index);
            const bool is_referenced =
                    std::find(referenced.begin(), referenced.end(), global_index) !=
                    referenced.end();
            const bool use_constant = is_referenced && entry_it != filter_entries.end() &&
                                      entry_it->second.is_constant();
            if (!use_constant) {
                // Placeholder column that keeps block positions aligned with the mappings.
                eval_block->insert({mapping.table_type->create_column_const_with_default_value(1),
                                    mapping.table_type, mapping.table_column_name});
                continue;
            }
            ColumnPtr constant_column;
            RETURN_IF_ERROR(_materialize_constant_filter_column(entry_it->second.constant_index(),
                                                                &constant_column));
            eval_block->insert(
                    {std::move(constant_column), mapping.table_type, mapping.table_column_name});
        }
        return Status::OK();
    }
450
451
2.73k
    // Evaluates the constant expression registered under `constant_index` on a one-row scratch
    // block and stores the resulting single-row column in `*column`.
    Status _materialize_constant_filter_column(ConstantIndex constant_index, ColumnPtr* column) {
        DORIS_CHECK(column != nullptr);
        const auto& entry = _data_reader.column_mapper->constant_map().get(constant_index);
        DORIS_CHECK(entry.expr != nullptr);
        DORIS_CHECK(entry.type != nullptr);

        RowDescriptor empty_row_desc;
        RETURN_IF_ERROR(entry.expr->prepare(_runtime_state, empty_row_desc));
        RETURN_IF_ERROR(entry.expr->open(_runtime_state));

        // The expression runs against a block containing a single default-valued const column.
        Block scratch_block;
        scratch_block.insert({entry.type->create_column_const_with_default_value(1), entry.type,
                              "__table_reader_constant_filter"});
        int result_position = -1;
        RETURN_IF_ERROR(entry.expr->execute(&scratch_block, &result_position));
        DORIS_CHECK(result_position >= 0);

        *column = scratch_block.get_by_position(result_position).column;
        DORIS_CHECK((*column)->size() == 1);
        return Status::OK();
    }
469
470
2.59k
    static bool _filter_result_filters_all(const ColumnPtr& filter_column) {
471
2.59k
        DORIS_CHECK(filter_column.get() != nullptr);
472
2.59k
        DORIS_CHECK(filter_column->size() == 1);
473
2.59k
        return !filter_column->get_bool(0);
474
2.59k
    }
475
476
38.0k
    // Hook called by open_reader() on the freshly built scan request. The default
    // implementation only appends the delete predicate.
    virtual Status customize_file_scan_request(FileScanRequest* file_request) {
        return _append_delete_predicate(file_request);
    }
479
480
154k
    // A non-negative remaining count means a table-level COUNT is in progress.
    bool _is_table_level_count_active() const {
        return !(_remaining_table_level_count < 0);
    }
481
482
186
    // Replaces every column of `block` with a freshly created column of its type, resized to
    // `rows` entries.
    Status _materialize_count_rows(size_t rows, Block* block) const {
        DORIS_CHECK(block != nullptr);
        // A block without columns cannot carry a non-zero row count.
        DORIS_CHECK(block->columns() > 0 || rows == 0);
        for (size_t position = 0; position < block->columns(); ++position) {
            auto sized_column = block->get_by_position(position).type->create_column();
            sized_column->resize(rows);
            block->replace_by_position(position, std::move(sized_column));
        }
        return Status::OK();
    }
492
493
366
    // Emits the next batch of rows for a pending table-level COUNT. Once the remaining count
    // reaches zero it deactivates the count, drops the current task and reports `*eos = true`.
    Status _read_table_level_count(Block* block, bool* eos) {
        DORIS_CHECK(block != nullptr);
        DORIS_CHECK(eos != nullptr);
        DORIS_CHECK(_push_down_agg_type == TPushAggOp::type::COUNT);
        DORIS_CHECK(_remaining_table_level_count >= 0);

        if (_remaining_table_level_count == 0) {
            _remaining_table_level_count = -1;
            _current_task.reset();
            *eos = true;
            return Status::OK();
        }

        // Without a runtime state there is no batch limit, so everything left is emitted at
        // once.
        int64_t batch_limit = _remaining_table_level_count;
        if (_runtime_state != nullptr) {
            batch_limit = static_cast<int64_t>(_runtime_state->batch_size());
        }
        const auto rows_to_emit = std::min(_remaining_table_level_count, batch_limit);
        RETURN_IF_ERROR(_materialize_count_rows(cast_set<size_t>(rows_to_emit), block));
        _remaining_table_level_count -= rows_to_emit;
        *eos = false;
        return Status::OK();
    }
514
515
    void _append_file_scan_column(FileScanRequest* request, LocalColumnId column_id,
516
5.56k
                                  std::vector<LocalColumnIndex>* scan_columns) {
517
5.56k
        DORIS_CHECK(request != nullptr);
518
5.56k
        DORIS_CHECK(scan_columns != nullptr);
519
5.56k
        FileScanRequestBuilder builder(request);
520
5.56k
        Status status;
521
5.56k
        if (scan_columns == &request->predicate_columns) {
522
5.26k
            status = builder.add_predicate_column(column_id);
523
5.26k
        } else {
524
299
            DORIS_CHECK(scan_columns == &request->non_predicate_columns);
525
299
            status = builder.add_non_predicate_column(column_id);
526
299
        }
527
5.56k
        DORIS_CHECK(status.ok()) << status.to_string();
528
5.56k
        if (column_id == LocalColumnId(ROW_POSITION_COLUMN_ID) &&
529
5.56k
            _find_column_definition(_data_reader.file_schema, column_id) == nullptr) {
530
3.27k
            _data_reader.file_schema.push_back(row_position_column_definition());
531
3.27k
        }
532
5.56k
    }
533
534
    // Append DeletePredicate to file scan request if there are deletes. The predicate will be evaluated in file reader level and filter out deleted rows before returning data to table reader.
535
37.9k
    // When there are deleted rows, adds the row-position column to the request's predicate
    // columns and appends a DeletePredicate over that column to `request->delete_conjuncts`.
    // Does nothing when `_delete_rows` is null or empty.
    Status _append_delete_predicate(FileScanRequest* request) {
        DORIS_CHECK(request != nullptr);
        const bool has_deletes = (_delete_rows != nullptr) && !_delete_rows->empty();
        if (!has_deletes) {
            return Status::OK();
        }

        const auto row_position_id = LocalColumnId(ROW_POSITION_COLUMN_ID);
        _append_file_scan_column(request, row_position_id, &request->predicate_columns);

        auto predicate = std::make_shared<DeletePredicate>(*_delete_rows);
        // The slot reference reads the row-position column at its position in the file block.
        const auto position = request->local_positions.at(row_position_id);
        auto row_position_slot = VSlotRef::create_shared(
                cast_set<int>(position.value()), cast_set<int>(position.value()), -1,
                std::make_shared<DataTypeInt64>(), ROW_POSITION_COLUMN_NAME);
        predicate->add_child(row_position_slot);

        request->delete_conjuncts.push_back(VExprContext::create_shared(std::move(predicate)));
        return Status::OK();
    }
553
554
    // Close the current concrete reader. This hook is called by both create_next_reader() and
555
    // close(), so it should remain idempotent.
556
38.8k
    // Closes the active file reader, if any, and resets all per-reader state. Safe to call
    // when no reader is open: the reader is only finalized and closed when it exists, so a
    // repeated call does not dereference a null reader. Returns the reader's close() error
    // without resetting the remaining state when closing fails.
    virtual Status close_current_reader() {
        if (_data_reader.reader != nullptr) {
            // Finalize the condition cache before the reader it belongs to goes away.
            _finalize_reader_condition_cache();
            RETURN_IF_ERROR(_data_reader.reader->close());
            _data_reader.reader.reset();
        }
        if (_data_reader.column_mapper != nullptr) {
            _data_reader.column_mapper->clear();
            _data_reader.column_mapper.reset();
        }
        _table_filters.clear();
        _data_reader.file_schema.clear();
        _data_reader.file_block_layout.clear();
        _data_reader.block_template.clear();
        _current_task.reset();
        _current_file_description.reset();
        _current_reader_reached_eof = false;
        return Status::OK();
    }
573
574
    // Finalize file-local block to table/global schema block.
575
79.5k
    // Converts the file-local block in `_data_reader.block_template` into the table block:
    // materializes one column per mapping into the matching position of `block`, then applies
    // virtual-column materialization and CHAR/VARCHAR truncation.
    Status finalize_chunk(Block* block, const size_t rows) {
        SCOPED_TIMER(_profile.finalize_timer);
        const auto& mappings = _data_reader.column_mapper->mappings();
        for (size_t position = 0; position < mappings.size(); ++position) {
            ColumnPtr materialized;
            RETURN_IF_ERROR(_materialize_mapping_column(
                    mappings[position], &_data_reader.block_template, rows, &materialized));
            block->replace_by_position(position, IColumn::mutate(std::move(materialized)));
        }
        RETURN_IF_ERROR(materialize_virtual_columns(block));
        // Enforce CHAR/VARCHAR length declared by the table schema after all file-to-table
        // materialization has finished.
        RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
        return Status::OK();
    }
591
592
    // Materialize virtual columns in the table block, such as Iceberg _row_id and
593
    // _last_updated_sequence_number. This runs after normal column materialization so finalize
594
    // expressions can reference those virtual columns.
595
61.6k
    virtual Status materialize_virtual_columns(Block* table_block) {
        // Default implementation: no virtual columns, `table_block` is left untouched.
        return Status::OK();
    }
596
597
#ifndef NDEBUG
598
79.5k
    // Debug-build validation of the file-local block template: every column must be present,
    // pass its own sanity_check() and match its declared type. `stage` and `rows` only label
    // the diagnostic. Any failure (including an exception thrown by the checks) is logged at
    // WARNING level and returned as an InternalError.
    Status _check_file_block_columns(std::string_view stage, size_t rows) {
        DORIS_CHECK(_data_reader.block_template.columns() == _data_reader.file_block_layout.size());
        for (size_t pos = 0; pos < _data_reader.block_template.columns(); ++pos) {
            const auto& layout_entry = _data_reader.file_block_layout[pos];
            const auto& typed_column = _data_reader.block_template.get_by_position(pos);
            const auto* raw_column = typed_column.column.get();
            // Shared diagnostic for every failure that carries an error text. It tolerates a
            // null column so the exception handlers can use it as well.
            const auto report_failure = [&](const std::string& error) {
                auto failure = Status::InternalError(
                        "Invalid file block column {} at {}: file_column_id={}, name='{}', "
                        "type={}, column={}, column_size={}, expected_rows={}, error={}, "
                        "reader={}",
                        pos, stage, layout_entry.file_column_id.value(), layout_entry.name,
                        layout_entry.type == nullptr ? "null" : layout_entry.type->get_name(),
                        raw_column == nullptr ? "null" : raw_column->get_name(),
                        raw_column == nullptr ? 0 : raw_column->size(), rows, error,
                        debug_string());
                LOG(WARNING) << failure;
                return failure;
            };
            try {
                if (raw_column == nullptr) {
                    auto missing = Status::InternalError(
                            "Invalid file block column {} at {}: file_column_id={}, name='{}', "
                            "type={}, column=null, expected_rows={}, reader={}",
                            pos, stage, layout_entry.file_column_id.value(), layout_entry.name,
                            layout_entry.type == nullptr ? "null" : layout_entry.type->get_name(),
                            rows, debug_string());
                    LOG(WARNING) << missing;
                    return missing;
                }
                raw_column->sanity_check();
                auto match_status = typed_column.check_type_and_column_match();
                if (!match_status.ok()) {
                    return report_failure(match_status.to_string());
                }
            } catch (const Exception& e) {
                return report_failure(e.to_string());
            } catch (const std::exception& e) {
                return report_failure(e.what());
            }
        }
        return Status::OK();
    }
664
665
79.6k
    // Debug-build validation of a table block: it must have exactly one column per mapping, and
    // every column must be present, pass its own sanity_check() and match its declared type.
    // `stage` and `rows` only label the diagnostic. Any failure (including an exception thrown
    // by the checks) is logged at WARNING level and returned as an InternalError.
    Status _check_table_block_columns(std::string_view stage, const Block* block, size_t rows) {
        DORIS_CHECK(block != nullptr);
        DORIS_CHECK(block->columns() == _data_reader.column_mapper->mappings().size());
        for (size_t pos = 0; pos < block->columns(); ++pos) {
            const auto& mapping = _data_reader.column_mapper->mappings()[pos];
            const auto& typed_column = block->get_by_position(pos);
            const auto* raw_column = typed_column.column.get();
            // Shared diagnostic for every failure that carries an error text. It tolerates a
            // null column so the exception handlers can use it as well.
            const auto report_failure = [&](const std::string& error) {
                auto failure = Status::InternalError(
                        "Invalid table block column {} at {}: table_column='{}', global_index={}, "
                        "type={}, column={}, column_size={}, expected_rows={}, error={}, "
                        "mapping={}",
                        pos, stage, mapping.table_column_name, mapping.global_index.value(),
                        mapping.table_type == nullptr ? "null" : mapping.table_type->get_name(),
                        raw_column == nullptr ? "null" : raw_column->get_name(),
                        raw_column == nullptr ? 0 : raw_column->size(), rows, error,
                        mapping.debug_string());
                LOG(WARNING) << failure;
                return failure;
            };
            try {
                if (raw_column == nullptr) {
                    auto missing = Status::InternalError(
                            "Invalid table block column {} at {}: table_column='{}', "
                            "global_index={}, type={}, column=null, expected_rows={}, mapping={}",
                            pos, stage, mapping.table_column_name, mapping.global_index.value(),
                            mapping.table_type == nullptr ? "null" : mapping.table_type->get_name(),
                            rows, mapping.debug_string());
                    LOG(WARNING) << missing;
                    return missing;
                }
                raw_column->sanity_check();
                auto match_status = typed_column.check_type_and_column_match();
                if (!match_status.ok()) {
                    return report_failure(match_status.to_string());
                }
            } catch (const Exception& e) {
                return report_failure(e.to_string());
            } catch (const std::exception& e) {
                return report_failure(e.what());
            }
        }
        return Status::OK();
    }
725
#endif
726
727
79.6k
    // Apply CHAR/VARCHAR truncation to every table column that needs it.
    // Does nothing unless a runtime state is attached and its query options enable
    // truncate_char_or_varchar_columns. The block must hold one column per mapping.
    Status _truncate_char_or_varchar_columns(Block* block) {
        DORIS_CHECK(block != nullptr);
        const bool truncation_enabled =
                _runtime_state != nullptr &&
                _runtime_state->query_options().truncate_char_or_varchar_columns;
        if (!truncation_enabled) {
            return Status::OK();
        }
        const auto& mappings = _data_reader.column_mapper->mappings();
        DORIS_CHECK(block->columns() == mappings.size());
        for (size_t position = 0; position < mappings.size(); ++position) {
            const auto& mapping = mappings[position];
            if (_should_truncate_char_or_varchar_column(mapping)) {
                // Keep the unwrapped type alive while its declared length is read.
                const auto bounded_type = remove_nullable(mapping.table_type);
                const auto target_len =
                        assert_cast<const DataTypeString*>(bounded_type.get())->len();
                _truncate_char_or_varchar_column(block, position, target_len);
            }
        }
        return Status::OK();
    }
746
747
    // Return true when the table schema has a bounded CHAR/VARCHAR length that is stricter than
748
    // the file-side type. Examples:
749
    // - table VARCHAR(10), file VARCHAR(20): truncate to 10;
750
    // - table VARCHAR(10), file STRING: truncate to 10 because STRING has no declared bound;
751
    // - table STRING, any file type: no truncation because the target has no bound.
752
29
    static bool _should_truncate_char_or_varchar_column(const ColumnMapping& mapping) {
753
29
        if (mapping.table_type == nullptr) {
754
0
            return false;
755
0
        }
756
29
        const auto table_type = remove_nullable(mapping.table_type);
757
29
        const auto primitive_type = table_type->get_primitive_type();
758
29
        if (primitive_type != TYPE_VARCHAR && primitive_type != TYPE_CHAR) {
759
9
            return false;
760
9
        }
761
20
        const auto target_len = assert_cast<const DataTypeString*>(table_type.get())->len();
762
20
        if (target_len <= 0) {
763
0
            return false;
764
0
        }
765
20
        if (mapping.file_type == nullptr) {
766
0
            return true;
767
0
        }
768
20
        const auto file_type = remove_nullable(mapping.file_type);
769
20
        DORIS_CHECK(file_type != nullptr);
770
20
        int file_len = -1;
771
20
        if (file_type->get_primitive_type() == TYPE_VARCHAR ||
772
20
            file_type->get_primitive_type() == TYPE_CHAR ||
773
20
            file_type->get_primitive_type() == TYPE_STRING) {
774
19
            file_len = assert_cast<const DataTypeString*>(file_type.get())->len();
775
19
        }
776
777
20
        return file_len < 0 || target_len < file_len;
778
20
    }
779
780
    // Truncate a materialized CHAR/VARCHAR column in place by reusing the vectorized substring
781
    // implementation: substring(column, 1, len). Nullable columns are unwrapped before substring
782
    // execution and wrapped back with the original null map afterward, because substring operates
783
    // on the nested string payload only.
784
17
    static void _truncate_char_or_varchar_column(Block* block, size_t idx, int len) {
785
17
        DORIS_CHECK(block != nullptr);
786
17
        auto int_type = std::make_shared<DataTypeInt32>();
787
17
        const auto num_columns_without_result = cast_set<uint32_t>(block->columns());
788
17
        auto& target = block->get_by_position(idx);
789
17
        const bool is_nullable = target.type->is_nullable();
790
17
        ColumnPtr input_column = target.column;
791
17
        ColumnPtr null_map_column;
792
17
        if (is_nullable) {
793
17
            const auto* nullable_column = assert_cast<const ColumnNullable*>(target.column.get());
794
17
            input_column = nullable_column->get_nested_column_ptr();
795
17
            null_map_column = nullable_column->get_null_map_column_ptr();
796
17
        }
797
17
        block->replace_by_position(idx, std::move(input_column));
798
17
        block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)),
799
17
                       int_type, "const 1"});
800
17
        block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)),
801
17
                       int_type, "const len"});
802
17
        block->insert({nullptr, std::make_shared<DataTypeString>(), "result"});
803
804
17
        ColumnNumbers temp_arguments(3);
805
17
        temp_arguments[0] = cast_set<uint32_t>(idx);
806
17
        temp_arguments[1] = num_columns_without_result;
807
17
        temp_arguments[2] = num_columns_without_result + 1;
808
17
        const uint32_t result_column_id = num_columns_without_result + 2;
809
17
        SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
810
811
17
        ColumnPtr result_column = block->get_by_position(result_column_id).column;
812
17
        if (is_nullable) {
813
17
            result_column = ColumnNullable::create(std::move(result_column), null_map_column);
814
17
        }
815
17
        block->replace_by_position(idx, std::move(result_column));
816
17
        block->erase_tail(num_columns_without_result);
817
17
    }
818
819
38.0k
    // Try to answer the pushed-down aggregate directly from the current file reader.
    // *pushed_down is set to true only when the reader returned an aggregate result and the
    // rows were materialized into `block`; in that case the current reader is closed.
    // An unsupported aggregate, or a reader answering NOT_IMPLEMENTED_ERROR, returns OK with
    // *pushed_down left false. The attempt is always recorded in _aggregate_pushdown_tried.
    Status _try_materialize_aggregate_pushdown_rows(Block* block, bool* pushed_down) {
        DORIS_CHECK(block != nullptr);
        DORIS_CHECK(pushed_down != nullptr);
        *pushed_down = false;
        block->clear_column_data(_projected_columns.size());
        _aggregate_pushdown_tried = true;
        if (!_supports_aggregate_pushdown(_push_down_agg_type)) {
            return Status::OK();
        }

        FileAggregateRequest request;
        RETURN_IF_ERROR(_build_file_aggregate_request(_push_down_agg_type, &request));
        FileAggregateResult aggregate;
        const auto status = _data_reader.reader->get_aggregate_result(request, &aggregate);
        if (status.is<ErrorCode::NOT_IMPLEMENTED_ERROR>()) {
            // The reader cannot answer the aggregate; not an error for the caller.
            return Status::OK();
        }
        if (!status.ok()) {
            return status;
        }
        RETURN_IF_ERROR(
                _materialize_aggregate_pushdown_rows(_push_down_agg_type, aggregate, block));
        *pushed_down = true;
        return close_current_reader();
    }
843
844
38.0k
    // Tell whether `agg_type` can be answered by aggregate pushdown for the current file.
    // Only COUNT and MIN/MAX can be pushed down, and only when there are no delete rows, no
    // table filters and no column predicates, so the reduced rows consumed by the upper
    // aggregate remain semantically equivalent to a normal scan.
    virtual bool _supports_aggregate_pushdown(TPushAggOp::type agg_type) const {
        const bool is_count = agg_type == TPushAggOp::type::COUNT;
        const bool is_minmax = agg_type == TPushAggOp::type::MINMAX;
        if (!is_count && !is_minmax) {
            return false;
        }
        const bool has_deletes = _delete_rows != nullptr && !_delete_rows->empty();
        const bool has_predicates = !_table_filters.empty() || !_table_column_predicates.empty();
        if (has_deletes || has_predicates) {
            return false;
        }
        if (is_count) {
            return true;
        }
        // For MIN/MAX, only support direct file-to-table column mappings. The two emitted rows
        // must be enough for the upper MIN/MAX aggregate without evaluating default expressions
        // or virtual columns.
        const auto& mappings = _data_reader.column_mapper->mappings();
        return std::ranges::all_of(mappings, [&](const auto& mapping) {
            const bool is_direct_mapping =
                    mapping.file_local_id.has_value() &&
                    mapping.virtual_column_type == TableVirtualColumnType::INVALID &&
                    mapping.default_expr == nullptr && mapping.file_type != nullptr &&
                    mapping.table_type != nullptr;
            return is_direct_mapping && _can_push_down_minmax_for_mapping(mapping);
        });
    }
877
878
503k
    // Pass a non-null column through IColumn::mutate() and return the result as a ColumnPtr.
    static ColumnPtr _detach_column(ColumnPtr column) {
        DORIS_CHECK(column.get() != nullptr);
        ColumnPtr detached = IColumn::mutate(std::move(column));
        return detached;
    }
882
883
52.8k
    // Recursively make the nullability of `*column` agree with `table_type`.
    // - nullable table type: a non-nullable column is aligned against the nested type and then
    //   wrapped; a nullable column has its nested column aligned and keeps its null map;
    // - non-nullable table type with a nullable column: the wrapper is dropped, which fails
    //   with InternalError when the column actually contains a NULL;
    // - array / map / struct table types: the nested columns are aligned against the nested
    //   types and the container is rebuilt.
    // Const columns are always expanded to full columns first.
    static Status _align_column_nullability(ColumnPtr* column, const DataTypePtr& table_type) {
        DORIS_CHECK(column != nullptr);
        DORIS_CHECK(column->get() != nullptr);
        DORIS_CHECK(table_type != nullptr);
        // Must return non-const column
        *column = (*column)->convert_to_full_column_if_const();

        const bool column_is_nullable = (*column)->is_nullable();
        if (table_type->is_nullable()) {
            const auto& inner_type =
                    assert_cast<const DataTypeNullable&>(*table_type).get_nested_type();
            if (column_is_nullable) {
                const auto& wrapper = assert_cast<const ColumnNullable&>(**column);
                ColumnPtr payload = wrapper.get_nested_column_ptr();
                RETURN_IF_ERROR(_align_column_nullability(&payload, inner_type));
                *column = ColumnNullable::create(payload, wrapper.get_null_map_column_ptr());
            } else {
                RETURN_IF_ERROR(_align_column_nullability(column, inner_type));
                *column = make_nullable(*column);
            }
            return Status::OK();
        }

        if (column_is_nullable) {
            const auto& wrapper = assert_cast<const ColumnNullable&>(**column);
            if (wrapper.has_null()) {
                return Status::InternalError(
                        "Default expression produced NULL for non-nullable table column");
            }
            ColumnPtr payload = wrapper.get_nested_column_ptr();
            RETURN_IF_ERROR(_align_column_nullability(&payload, table_type));
            *column = payload;
            return Status::OK();
        }

        const auto* raw_table_type = table_type.get();
        if (const auto* as_array = typeid_cast<const DataTypeArray*>(raw_table_type)) {
            const auto& source = assert_cast<const ColumnArray&>(**column);
            ColumnPtr elements = source.get_data_ptr();
            RETURN_IF_ERROR(_align_column_nullability(&elements, as_array->get_nested_type()));
            *column = ColumnArray::create(elements, source.get_offsets_ptr());
            return Status::OK();
        }
        if (const auto* as_map = typeid_cast<const DataTypeMap*>(raw_table_type)) {
            const auto& source = assert_cast<const ColumnMap&>(**column);
            ColumnPtr keys = source.get_keys_ptr();
            ColumnPtr values = source.get_values_ptr();
            RETURN_IF_ERROR(_align_column_nullability(&keys, as_map->get_key_type()));
            RETURN_IF_ERROR(_align_column_nullability(&values, as_map->get_value_type()));
            *column = ColumnMap::create(keys, values, source.get_offsets_ptr());
            return Status::OK();
        }
        if (const auto* as_struct = typeid_cast<const DataTypeStruct*>(raw_table_type)) {
            const auto& source = assert_cast<const ColumnStruct&>(**column);
            Columns fields = source.get_columns_copy();
            DORIS_CHECK(fields.size() == as_struct->get_elements().size());
            for (size_t field_idx = 0; field_idx < fields.size(); ++field_idx) {
                RETURN_IF_ERROR(_align_column_nullability(&fields[field_idx],
                                                          as_struct->get_element(field_idx)));
            }
            *column = ColumnStruct::create(fields);
            return Status::OK();
        }
        // Any other type has no nested nullability to align.
        return Status::OK();
    }
945
946
    // Evaluate `default_expr` over `block` by calling execute_column_impl() on its root
    // expression, and fill `result_data` with the produced column, the expression's execute
    // type and its name. Exceptions raised during evaluation are converted by
    // RETURN_IF_CATCH_EXCEPTION; a result whose size differs from block->rows() is reported as
    // InternalError.
    static Status _execute_default_expr_without_root_type_check(
            const VExprContextSPtr& default_expr, const Block* block,
            ColumnWithTypeAndName* result_data) {
        DORIS_CHECK(default_expr != nullptr);
        DORIS_CHECK(block != nullptr);
        DORIS_CHECK(result_data != nullptr);
        ColumnPtr evaluated;
        Status exec_status;
        RETURN_IF_CATCH_EXCEPTION({
            exec_status = default_expr->root()->execute_column_impl(
                    default_expr.get(), block, nullptr, block->rows(), evaluated);
        });
        if (!exec_status.ok()) {
            return exec_status;
        }
        DORIS_CHECK(evaluated.get() != nullptr);
        const bool size_matches = evaluated->size() == block->rows();
        if (!size_matches) {
            return Status::InternalError(
                    "Default expr {} return column size {} not equal to expected size {}",
                    default_expr->expr_name(), evaluated->size(), block->rows());
        }
        result_data->column = evaluated;
        result_data->type = default_expr->execute_type(block);
        result_data->name = default_expr->expr_name();
        return Status::OK();
    }
970
971
    // Cast `*column` from `file_type` to `table_type` by evaluating a Cast expression over a
    // single-column temporary block; `column_name` names both the block column and the slot.
    // Returns immediately when both types are equal. On success `*column` is replaced by the
    // cast result.
    Status _cast_column_to_type(ColumnPtr* column, const DataTypePtr& file_type,
                                const DataTypePtr& table_type,
                                const std::string& column_name) const {
        DORIS_CHECK(column != nullptr);
        DORIS_CHECK(column->get() != nullptr);
        DORIS_CHECK(file_type != nullptr);
        DORIS_CHECK(table_type != nullptr);
        if (file_type->equals(*table_type)) {
            return Status::OK();
        }

        // The declared source type must match the column actually held: a nullable column with
        // a non-nullable file type is described by the nullable variant of that type.
        const bool needs_nullable_source = (*column)->is_nullable() && !file_type->is_nullable();
        DataTypePtr source_type = needs_nullable_source ? make_nullable(file_type) : file_type;

        Block single_column_block;
        single_column_block.insert({*column, source_type, column_name});

        auto cast_node = Cast::create_shared(table_type);
        cast_node->add_child(VSlotRef::create_shared(0, 0, -1, source_type, column_name));
        auto cast_context = VExprContext::create_shared(std::move(cast_node));

        RowDescriptor row_desc;
        RETURN_IF_ERROR(cast_context->prepare(_runtime_state, row_desc));
        RETURN_IF_ERROR(cast_context->open(_runtime_state));
        ColumnPtr converted;
        RETURN_IF_ERROR(cast_context->execute(&single_column_block, converted));
        *column = std::move(converted);
        return Status::OK();
    }
1000
1001
    // Materialize a child column that is present in the file.
    // A trivial mapping takes the file column as is; a non-trivial one is either rebuilt
    // through its child mappings or cast from the file type to the table type. In all cases
    // the nullability is finally aligned with the table type.
    Status _materialize_present_child_mapping_column(const ColumnMapping& mapping,
                                                     const ColumnPtr& file_column,
                                                     const size_t rows, ColumnPtr* column) {
        DORIS_CHECK(column != nullptr);
        DORIS_CHECK(mapping.file_type != nullptr);
        DORIS_CHECK(mapping.table_type != nullptr);
        *column = file_column;
        const bool needs_conversion = !mapping.is_trivial;
        if (needs_conversion && !mapping.child_mappings.empty()) {
            // The output slot is deliberately passed as the input as well.
            RETURN_IF_ERROR(_materialize_complex_mapping_column(mapping, *column, rows, column));
        } else if (needs_conversion) {
            RETURN_IF_ERROR(_cast_column_to_type(column, mapping.file_type, mapping.table_type,
                                                 mapping.file_column_name));
        }
        return _align_column_nullability(column, mapping.table_type);
    }
1020
1021
    // Materialize one table column from the current file-local block.
    //
    // Strategy, in priority order:
    // 1. non-trivial mapping with a file column and child mappings: run the projection, then
    //    rebuild the nested column through _materialize_complex_mapping_column;
    // 2. any other mapping with a projection: run it and detach its result;
    // 3. mapping with a default expression: evaluate it against current_block when that block
    //    already has `rows` rows, otherwise against a one-column constant block of `rows` rows,
    //    then align the nullability with the table type;
    // 4. otherwise: a constant column holding the table type's default value.
    //
    // On success *column holds the result. A failing projection is reported as InternalError
    // carrying the mapping description.
    Status _materialize_mapping_column(const ColumnMapping& mapping, Block* current_block,
                                       const size_t rows, ColumnPtr* column) {
        DORIS_CHECK(current_block != nullptr);
        DORIS_CHECK(column != nullptr);
        if (!mapping.is_trivial && mapping.file_local_id.has_value() &&
            !mapping.child_mappings.empty()) {
            // Checked in every build type: the projection is dereferenced right below.
            DORIS_CHECK(mapping.projection != nullptr);
            int res_id = -1;
            auto st = mapping.projection->execute(current_block, &res_id);
            if (!st.ok()) {
                return Status::InternalError(
                        "Failed to execute complex mapping projection for table column '{}' "
                        "(global_index={}, file_local_id={}, rows={}): {}, mapping={}",
                        mapping.table_column_name, mapping.global_index.value(),
                        *mapping.file_local_id, rows, st.to_string(), mapping.debug_string());
            }
            ColumnPtr result_column = current_block->get_by_position(res_id).column;
            return _materialize_complex_mapping_column(mapping, result_column, rows, column);
        }
        if (mapping.projection != nullptr) {
            int res_id = -1;
            auto st = mapping.projection->execute(current_block, &res_id);
            if (!st.ok()) {
                std::string file_local_id = "null";
                if (mapping.file_local_id.has_value()) {
                    file_local_id = std::to_string(*mapping.file_local_id);
                }
                return Status::InternalError(
                        "Failed to execute mapping projection for table column '{}' "
                        "(global_index={}, file_local_id={}, rows={}): {}, mapping={}",
                        mapping.table_column_name, mapping.global_index.value(), file_local_id,
                        rows, st.to_string(), mapping.debug_string());
            }
            ColumnPtr result_column = current_block->get_by_position(res_id).column;
            *column = _detach_column(std::move(result_column));
            return Status::OK();
        }
        if (mapping.default_expr != nullptr) {
            // Evaluates the default expression over `eval_block` and stores the aligned,
            // detached result in *column.
            const auto evaluate_default_on = [&](const Block* eval_block) -> Status {
                ColumnWithTypeAndName result;
                RETURN_IF_ERROR(_execute_default_expr_without_root_type_check(
                        mapping.default_expr, eval_block, &result));
                ColumnPtr result_column = result.column;
                RETURN_IF_ERROR(_align_column_nullability(&result_column, mapping.table_type));
                *column = _detach_column(std::move(result_column));
                return Status::OK();
            };
            if (current_block->rows() == rows) {
                return evaluate_default_on(current_block);
            }
            // The file block cannot provide the row count, so the expression is evaluated over
            // a synthetic block whose only column is a constant of `rows` rows.
            DORIS_CHECK(mapping.constant_index.has_value());
            Block eval_block;
            eval_block.insert({mapping.table_type->create_column_const_with_default_value(rows),
                               mapping.table_type, "__table_reader_const_rows"});
            return evaluate_default_on(&eval_block);
        }
        ColumnPtr result_column = mapping.table_type->create_column_const_with_default_value(rows);
        *column = _detach_column(std::move(result_column));
        return Status::OK();
    }
1084
1085
    // Dispatch a complex (nested) mapping on the primitive type of the table type with its
    // nullable wrapper removed: STRUCT, ARRAY and MAP go to their dedicated materializers, any
    // other type just detaches the file column into *column.
    Status _materialize_complex_mapping_column(const ColumnMapping& mapping,
                                               const ColumnPtr& file_column, const size_t rows,
                                               ColumnPtr* column) {
        DORIS_CHECK(mapping.table_type != nullptr);
        DORIS_CHECK(file_column.get() != nullptr);
        const auto nested_table_type = remove_nullable(mapping.table_type);
        switch (nested_table_type->get_primitive_type()) {
        case TYPE_STRUCT:
            return _materialize_struct_mapping_column(mapping, file_column, rows, column);
        case TYPE_ARRAY:
            return _materialize_array_mapping_column(mapping, file_column, rows, column);
        case TYPE_MAP:
            return _materialize_map_mapping_column(mapping, file_column, rows, column);
        default:
            *column = _detach_column(file_column);
            return Status::OK();
        }
    }
1107
1108
    static std::vector<const ColumnMapping*> _present_child_mappings_in_file_order(
1109
1.93k
            const std::vector<ColumnMapping>& child_mappings) {
1110
1.93k
        std::vector<const ColumnMapping*> result;
1111
1.93k
        result.reserve(child_mappings.size());
1112
4.99k
        for (const auto& child_mapping : child_mappings) {
1113
4.99k
            if (child_mapping.file_local_id.has_value()) {
1114
3.02k
                result.push_back(&child_mapping);
1115
3.02k
            }
1116
4.99k
        }
1117
2.29k
        std::ranges::sort(result, [](const ColumnMapping* lhs, const ColumnMapping* rhs) {
1118
2.29k
            DORIS_CHECK(lhs->file_local_id.has_value());
1119
2.29k
            DORIS_CHECK(rhs->file_local_id.has_value());
1120
2.29k
            return *lhs->file_local_id < *rhs->file_local_id;
1121
2.29k
        });
1122
1.93k
        return result;
1123
1.93k
    }
1124
1125
    static size_t _file_child_ordinal_for_mapping(
1126
            const ColumnMapping& mapping, const ColumnMapping& child_mapping,
1127
3.02k
            const std::vector<const ColumnMapping*>& file_ordered_children) {
1128
3.02k
        DORIS_CHECK(child_mapping.file_local_id.has_value());
1129
3.02k
        if (!mapping.projected_file_children.empty()) {
1130
3.02k
            const auto child_it = std::ranges::find_if(
1131
5.17k
                    mapping.projected_file_children, [&](const ColumnDefinition& file_child) {
1132
5.17k
                        return file_child.file_local_id() == *child_mapping.file_local_id;
1133
5.17k
                    });
1134
3.02k
            DORIS_CHECK(child_it != mapping.projected_file_children.end());
1135
3.02k
            return static_cast<size_t>(
1136
3.02k
                    std::distance(mapping.projected_file_children.begin(), child_it));
1137
3.02k
        }
1138
4
        const auto child_it = std::ranges::find(file_ordered_children, &child_mapping);
1139
4
        DORIS_CHECK(child_it != file_ordered_children.end());
1140
4
        return static_cast<size_t>(std::distance(file_ordered_children.begin(), child_it));
1141
3.02k
    }
1142
1143
    static std::vector<const ColumnMapping*> _child_mappings_in_table_type_order(
1144
1.93k
            const ColumnMapping& mapping, const DataTypeStruct& table_type) {
1145
1.93k
        std::vector<const ColumnMapping*> result;
1146
1.93k
        result.reserve(mapping.child_mappings.size());
1147
6.92k
        for (size_t child_idx = 0; child_idx < table_type.get_elements().size(); ++child_idx) {
1148
4.99k
            const auto& child_name = table_type.get_element_name(child_idx);
1149
4.99k
            const auto child_it = std::ranges::find_if(
1150
10.0k
                    mapping.child_mappings, [&](const ColumnMapping& child_mapping) {
1151
10.0k
                        return child_mapping.table_column_name == child_name;
1152
10.0k
                    });
1153
4.99k
            DORIS_CHECK(child_it != mapping.child_mappings.end())
1154
0
                    << mapping.debug_string() << ", table_child_name=" << child_name;
1155
4.99k
            result.push_back(&*child_it);
1156
4.99k
        }
1157
1.93k
        return result;
1158
1.93k
    }
1159
1160
    // Looks through a nullable wrapper.
    //
    // If `column` is a ColumnNullable, returns a pointer to its nested column and, when
    // `null_map` is not nullptr, stores the address of the wrapper's null map in `*null_map`.
    // Otherwise returns `column.get()` and leaves `*null_map` untouched. The returned pointer
    // is borrowed from `column`. A null `column` fails a DORIS_CHECK.
    static const IColumn* _nested_column_if_nullable(const ColumnPtr& column,
                                                     const NullMap** null_map) {
        DORIS_CHECK(column.get() != nullptr);
        const auto* as_nullable = check_and_get_column<ColumnNullable>(*column);
        if (as_nullable == nullptr) {
            return column.get();
        }
        if (null_map != nullptr) {
            *null_map = &as_nullable->get_null_map_data();
        }
        return &as_nullable->get_nested_column();
    }
1171
1172
    // Builds the table-side STRUCT column for `mapping` from the file-side `file_column`.
    //
    // Children are emitted in the order of the table struct type. A child that has a
    // file_local_id is taken from the file struct and passed through
    // _materialize_present_child_mapping_column; a child without one is filled with `rows`
    // default values of its table type. When the table type is nullable the result is wrapped
    // in a ColumnNullable whose null map is copied from the file column's null map, or is all
    // zeros when the file column is not nullable. The result is written to `*column`.
    Status _materialize_struct_mapping_column(const ColumnMapping& mapping,
                                              const ColumnPtr& file_column, const size_t rows,
                                              ColumnPtr* column) {
        DORIS_CHECK(mapping.table_type != nullptr);
        const auto* table_type =
                assert_cast<const DataTypeStruct*>(remove_nullable(mapping.table_type).get());
        // Keep the full (non-const) column alive for as long as raw pointers into it are used.
        const auto full_file_column = file_column->convert_to_full_column_if_const();
        const NullMap* parent_null_map = nullptr;
        const auto* file_struct = assert_cast<const ColumnStruct*>(
                _nested_column_if_nullable(full_file_column, &parent_null_map));
        DORIS_CHECK(table_type->get_elements().size() == mapping.child_mappings.size());

        const auto file_ordered_children =
                _present_child_mappings_in_file_order(mapping.child_mappings);
        const auto table_ordered_children =
                _child_mappings_in_table_type_order(mapping, *table_type);

        Columns table_children;
        table_children.reserve(mapping.child_mappings.size());
        for (const auto* child_mapping : table_ordered_children) {
            DORIS_CHECK(child_mapping != nullptr);
            if (child_mapping->file_local_id.has_value()) {
                const auto file_child_idx = _file_child_ordinal_for_mapping(
                        mapping, *child_mapping, file_ordered_children);
                DORIS_CHECK(file_child_idx < file_struct->get_columns().size());
                ColumnPtr present_child = file_struct->get_column_ptr(file_child_idx);
                RETURN_IF_ERROR(_materialize_present_child_mapping_column(
                        *child_mapping, present_child, rows, &present_child));
                table_children.push_back(std::move(present_child));
            } else {
                // Child is absent from the file: synthesize `rows` default values.
                table_children.push_back(
                        child_mapping->table_type->create_column_const_with_default_value(rows)
                                ->convert_to_full_column_if_const());
            }
        }

        MutableColumns struct_children;
        struct_children.reserve(table_children.size());
        for (auto& table_child : table_children) {
            struct_children.push_back(IColumn::mutate(std::move(table_child)));
        }
        auto struct_column = ColumnStruct::create(std::move(struct_children));

        if (!mapping.table_type->is_nullable()) {
            *column = std::move(struct_column);
            return Status::OK();
        }

        auto null_map = ColumnUInt8::create();
        auto& null_map_data = null_map->get_data();
        null_map_data.resize(rows);
        if (parent_null_map == nullptr) {
            std::fill(null_map_data.begin(), null_map_data.end(), 0);
        } else {
            DORIS_CHECK(parent_null_map->size() == rows);
            null_map_data.assign(parent_null_map->begin(), parent_null_map->end());
        }
        *column = ColumnNullable::create(std::move(struct_column), std::move(null_map));
        return Status::OK();
    }
1229
1230
    // Builds the table-side ARRAY column for `mapping` from the file-side `file_column`.
    //
    // The element column is passed through _materialize_present_child_mapping_column using
    // the single child mapping; the offsets are taken from the file array. When the table
    // type is nullable the result is wrapped in a ColumnNullable whose null map is copied
    // from the file column's null map, or is all zeros when the file column is not nullable.
    // The result is written to `*column`.
    //
    // `mapping.table_type` must not be null and `mapping.child_mappings` must hold exactly
    // one entry; both are enforced with DORIS_CHECK.
    Status _materialize_array_mapping_column(const ColumnMapping& mapping,
                                             const ColumnPtr& file_column, const size_t rows,
                                             ColumnPtr* column) {
        // Checked up front, as the struct variant does, because table_type is dereferenced
        // below to decide on the nullable wrapper.
        DORIS_CHECK(mapping.table_type != nullptr);
        DORIS_CHECK(mapping.child_mappings.size() == 1);
        // Keep the full (non-const) column alive for as long as raw pointers into it are used.
        const auto full_file_column = file_column->convert_to_full_column_if_const();
        const NullMap* parent_null_map = nullptr;
        const auto* nested_file_column =
                _nested_column_if_nullable(full_file_column, &parent_null_map);
        const auto* file_array = assert_cast<const ColumnArray*>(nested_file_column);
        ColumnPtr nested_column = file_array->get_data_ptr();
        const auto& element_mapping = mapping.child_mappings[0];
        // The element column has its own row count, independent of the parent's `rows`.
        RETURN_IF_ERROR(_materialize_present_child_mapping_column(
                element_mapping, nested_column, nested_column->size(), &nested_column));
        auto offsets_column = file_array->get_offsets_ptr()->convert_to_full_column_if_const();
        auto result = ColumnArray::create(IColumn::mutate(std::move(nested_column)),
                                          IColumn::mutate(std::move(offsets_column)));
        if (mapping.table_type->is_nullable()) {
            auto null_map = ColumnUInt8::create();
            auto& null_map_data = null_map->get_data();
            null_map_data.resize(rows);
            if (parent_null_map != nullptr) {
                DORIS_CHECK(parent_null_map->size() == rows);
                null_map_data.assign(parent_null_map->begin(), parent_null_map->end());
            } else {
                // File column is not nullable: every row is non-null.
                std::fill(null_map_data.begin(), null_map_data.end(), 0);
            }
            *column = ColumnNullable::create(std::move(result), std::move(null_map));
        } else {
            *column = std::move(result);
        }
        return Status::OK();
    }
1262
1263
    // Builds the table-side MAP column for `mapping` from the file-side `file_column`.
    //
    // The child mapping with file_local_id 0 is applied to the key column and the one with
    // file_local_id 1 to the value column, each via _materialize_present_child_mapping_column.
    // Child mappings without a file_local_id, or with any other id, are ignored, and a side
    // with no mapping keeps the file column unchanged. Offsets are taken from the file map.
    // When the table type is nullable the result is wrapped in a ColumnNullable whose null map
    // is copied from the file column's null map, or is all zeros when the file column is not
    // nullable. The result is written to `*column`.
    //
    // `mapping.table_type` must not be null; this is enforced with DORIS_CHECK.
    Status _materialize_map_mapping_column(const ColumnMapping& mapping,
                                           const ColumnPtr& file_column, const size_t rows,
                                           ColumnPtr* column) {
        // Checked up front, as the struct variant does, because table_type is dereferenced
        // below to decide on the nullable wrapper.
        DORIS_CHECK(mapping.table_type != nullptr);
        // Keep the full (non-const) column alive for as long as raw pointers into it are used.
        const auto full_file_column = file_column->convert_to_full_column_if_const();
        const NullMap* parent_null_map = nullptr;
        const auto* nested_file_column =
                _nested_column_if_nullable(full_file_column, &parent_null_map);
        const auto* file_map = assert_cast<const ColumnMap*>(nested_file_column);
        ColumnPtr key_column = file_map->get_keys_ptr();
        ColumnPtr value_column = file_map->get_values_ptr();

        const ColumnMapping* key_mapping = nullptr;
        const ColumnMapping* value_mapping = nullptr;
        for (const auto& child_mapping : mapping.child_mappings) {
            if (!child_mapping.file_local_id.has_value()) {
                continue;
            }
            if (*child_mapping.file_local_id == 0) {
                key_mapping = &child_mapping;
            } else if (*child_mapping.file_local_id == 1) {
                value_mapping = &child_mapping;
            }
        }

        // Keys and values have their own row counts, independent of the parent's `rows`.
        if (key_mapping != nullptr) {
            RETURN_IF_ERROR(_materialize_present_child_mapping_column(
                    *key_mapping, key_column, key_column->size(), &key_column));
        }
        if (value_mapping != nullptr) {
            RETURN_IF_ERROR(_materialize_present_child_mapping_column(
                    *value_mapping, value_column, value_column->size(), &value_column));
        }
        auto offsets_column = file_map->get_offsets_ptr()->convert_to_full_column_if_const();
        auto result = ColumnMap::create(IColumn::mutate(std::move(key_column)),
                                        IColumn::mutate(std::move(value_column)),
                                        IColumn::mutate(std::move(offsets_column)));
        if (mapping.table_type->is_nullable()) {
            auto null_map = ColumnUInt8::create();
            auto& null_map_data = null_map->get_data();
            null_map_data.resize(rows);
            if (parent_null_map != nullptr) {
                DORIS_CHECK(parent_null_map->size() == rows);
                null_map_data.assign(parent_null_map->begin(), parent_null_map->end());
            } else {
                // File column is not nullable: every row is non-null.
                std::fill(null_map_data.begin(), null_map_data.end(), 0);
            }
            *column = ColumnNullable::create(std::move(result), std::move(null_map));
        } else {
            *column = std::move(result);
        }
        return Status::OK();
    }
1315
1316
38.0k
    // Prepares and then opens the `projection` and `default_expr` of every column mapping,
    // skipping whichever of the two is null. Both are prepared against one
    // default-constructed RowDescriptor. Returns the first non-OK status encountered.
    Status _open_mapping_exprs() {
        RowDescriptor row_desc;
        const auto prepare_and_open = [&](const auto& expr_ctx) -> Status {
            if (expr_ctx == nullptr) {
                return Status::OK();
            }
            RETURN_IF_ERROR(expr_ctx->prepare(_runtime_state, row_desc));
            RETURN_IF_ERROR(expr_ctx->open(_runtime_state));
            return Status::OK();
        };
        for (const auto& mapping : _data_reader.column_mapper->mappings()) {
            RETURN_IF_ERROR(prepare_and_open(mapping.projection));
            RETURN_IF_ERROR(prepare_and_open(mapping.default_expr));
        }
        return Status::OK();
    }
1330
1331
    // Fills `request` with the aggregate type and the file columns the file reader should
    // aggregate over. Any columns already in `request` are discarded first.
    //
    // For non-COUNT aggregates, every mapping must have a file_local_id and contributes one
    // column; mappings with child mappings get a nested projection from
    // build_aggregate_projection. For COUNT, see the comment in the body.
    Status _build_file_aggregate_request(TPushAggOp::type agg_type,
                                         FileAggregateRequest* request) const {
        DORIS_CHECK(request != nullptr);
        DORIS_CHECK(_supports_aggregate_pushdown(agg_type));
        request->agg_type = agg_type;
        request->columns.clear();
        const auto& mappings = _data_reader.column_mapper->mappings();

        if (agg_type != TPushAggOp::type::COUNT) {
            request->columns.reserve(mappings.size());
            for (const auto& mapping : mappings) {
                DORIS_CHECK(mapping.file_local_id.has_value());
                FileAggregateRequest::Column requested;
                requested.projection =
                        LocalColumnIndex::top_level(LocalColumnId(*mapping.file_local_id));
                if (!mapping.child_mappings.empty()) {
                    RETURN_IF_ERROR(build_aggregate_projection(mapping, &requested.projection));
                }
                request->columns.push_back(std::move(requested));
            }
            return Status::OK();
        }

        // COUNT pushdown historically meant COUNT(*) and therefore carried no columns. For
        // complex COUNT(col), materializing the full MAP/LIST/STRUCT value only to test the
        // top-level NULL bit can be extremely expensive. When the scan projects exactly one
        // directly-mapped complex column, that file column is passed to the reader so formats
        // such as Parquet can count the column shape from metadata/levels without decoding
        // payload values like MAP value strings. Every other COUNT case stays on the existing
        // row-count path to avoid changing count(*) semantics.
        if (mappings.size() != 1) {
            return Status::OK();
        }
        const auto& mapping = mappings[0];
        const bool is_directly_mapped_complex_column =
                mapping.file_local_id.has_value() && mapping.file_type != nullptr &&
                is_complex_type(remove_nullable(mapping.file_type)->get_primitive_type()) &&
                mapping.virtual_column_type == TableVirtualColumnType::INVALID &&
                mapping.default_expr == nullptr;
        if (is_directly_mapped_complex_column) {
            FileAggregateRequest::Column requested;
            requested.projection =
                    LocalColumnIndex::top_level(LocalColumnId(*mapping.file_local_id));
            request->columns.push_back(std::move(requested));
        }
        return Status::OK();
    }
1371
1372
    // Converts a file-level aggregate result into rows of `block` for the upper aggregate.
    //
    // COUNT delegates to _materialize_count_rows with the reported count. For the other
    // aggregate types, a two-row file block (min first, max second) is built per result
    // column and then mapped to table columns, which replace the columns of `block` by
    // position. Returns NotSupported when a result column lacks a min or a max value.
    Status _materialize_aggregate_pushdown_rows(TPushAggOp::type agg_type,
                                                const FileAggregateResult& file_result,
                                                Block* block) {
        if (agg_type == TPushAggOp::type::COUNT) {
            // COUNT pushdown does not yield a final count value. It emits `count` default
            // rows so the upper COUNT(*) aggregate can count them and produce the final
            // result, including zero rows when count is 0.
            DORIS_CHECK(file_result.count >= 0);
            return _materialize_count_rows(cast_set<size_t>(file_result.count), block);
        }

        // MIN/MAX pushdown emits two rows, min first and max second, for each projected
        // column. The upper MIN/MAX aggregate consumes those two rows to produce the final
        // aggregate value.
        DORIS_CHECK(file_result.columns.size() == _data_reader.column_mapper->mappings().size());
        DORIS_CHECK(block->columns() == _data_reader.column_mapper->mappings().size());
        const auto& mappings = _data_reader.column_mapper->mappings();
        const auto& layout = _data_reader.file_block_layout;

        // Start from an empty file block that follows the reader's block layout.
        Block file_block;
        file_block.reserve(layout.size());
        for (const auto& layout_column : layout) {
            file_block.insert(
                    {layout_column.type->create_column(), layout_column.type, layout_column.name});
        }

        for (size_t result_idx = 0; result_idx < file_result.columns.size(); ++result_idx) {
            const auto& aggregate = file_result.columns[result_idx];
            if (!(aggregate.has_min && aggregate.has_max)) {
                return Status::NotSupported("Missing min/max aggregate result for column {}",
                                            _projected_columns[result_idx].name);
            }
            // Locate the layout slot whose file column id matches this result column.
            const auto layout_it = std::ranges::find_if(layout, [&](const auto& layout_column) {
                return layout_column.file_column_id == aggregate.projection.column_id();
            });
            const bool found_file_column = layout_it != layout.end();
            DORIS_CHECK(found_file_column);
            const auto block_position =
                    static_cast<size_t>(std::distance(layout.begin(), layout_it));

            auto min_max_values = file_block.get_by_position(block_position)
                                          .type->create_column()
                                          ->assert_mutable();
            RETURN_IF_ERROR(_insert_aggregate_projection_value(
                    aggregate.projection, aggregate.min_value, min_max_values.get()));
            RETURN_IF_ERROR(_insert_aggregate_projection_value(
                    aggregate.projection, aggregate.max_value, min_max_values.get()));
            file_block.replace_by_position(block_position, std::move(min_max_values));
        }

        for (size_t mapping_idx = 0; mapping_idx < mappings.size(); ++mapping_idx) {
            ColumnPtr table_column;
            // Two rows per column: the min row followed by the max row.
            RETURN_IF_ERROR(_materialize_mapping_column(mappings[mapping_idx], &file_block, 2,
                                                        &table_column));
            block->replace_by_position(mapping_idx, std::move(table_column));
        }
        return Status::OK();
    }
1428
1429
    // Describes one column of the block returned by the file reader: the file-local column
    // id it corresponds to, plus the name and data type used to create that block column.
    struct FileBlockColumn {
1430
        // Defaults to the invalid id until a real file column is assigned.
        LocalColumnId file_column_id = LocalColumnId::invalid();
1431
        std::string name;
1432
        DataTypePtr type;
1433
    };
1434
1435
    // Groups the file reader with the column mapper, file schema and block layout that
    // describe how the reader's output is interpreted.
    struct DataReader {
1436
        std::unique_ptr<FileReader> reader;
1437
        // Provides the table-to-file column mappings via mappings().
        std::unique_ptr<TableColumnMapper> column_mapper;
1438
        // Schema of the data file, also including virtual column (row position).
1439
        std::vector<ColumnDefinition> file_schema;
1440
        // Layout of the block returned by file reader, determined by column mapping and file
1441
        // schema. It is used for file reader to materialize columns into correct type and position.
1442
        std::vector<FileBlockColumn> file_block_layout;
1443
        // NOTE(review): presumably an empty block shaped like file_block_layout — confirm.
        Block block_template;
1444
    };
1445
    DataReader _data_reader;
1446
    std::vector<ColumnDefinition> _projected_columns;
1447
    std::unique_ptr<ScanTask> _current_task;
1448
    std::optional<io::FileDescription> _current_file_description;
1449
    // Range-level compression has higher priority than scan-param compression. TVF/load can keep
1450
    // the logical format as CSV/TEXT while carrying the concrete compression such as GZ or LZO on
1451
    // each TFileRangeDesc, matching the old FileScanner reader contract.
1452
    TFileCompressType::type _current_range_compress_type = TFileCompressType::UNKNOWN;
1453
    std::optional<TUniqueId> _current_range_load_id;
1454
    TFileRangeDesc _current_file_range_desc;
1455
    std::shared_ptr<io::FileSystemProperties> _system_properties;
1456
    // partition key -> value
1457
    std::map<std::string, Field> _partition_values;
1458
    // Predicates built from scan conjuncts before file-level localization.
1459
    std::vector<TableFilter> _table_filters;
1460
    TableColumnPredicates _table_column_predicates;
1461
    VExprContextSPtrs _conjuncts;
1462
    ReadProfile _profile;
1463
    // Parsed from row-position based delete files, including position delete and deletion vector.
1464
    DeleteRows* _delete_rows = nullptr;
1465
    TFileScanRangeParams* _scan_params;
1466
    std::shared_ptr<io::IOContext> _io_ctx;
1467
    RuntimeState* _runtime_state;
1468
    RuntimeProfile* _scanner_profile;
1469
    const std::vector<SlotDescriptor*>* _file_slot_descs = nullptr;
1470
    FileFormat _format;
1471
    TPushAggOp::type _push_down_agg_type = TPushAggOp::type::NONE;
1472
    size_t _batch_size = 0;
1473
    uint64_t _condition_cache_digest = 0;
1474
    segment_v2::ConditionCache::ExternalCacheKey _condition_cache_key;
1475
    std::shared_ptr<std::vector<bool>> _condition_cache;
1476
    std::shared_ptr<ConditionCacheContext> _condition_cache_ctx;
1477
    int64_t _condition_cache_hit_count = 0;
1478
    bool _current_reader_reached_eof = false;
1479
    int64_t _remaining_table_level_count = -1;
1480
    std::optional<GlobalRowIdContext> _global_rowid_context;
1481
    bool _aggregate_pushdown_tried = false;
1482
    TableColumnMapperOptions _mapper_options;
1483
1484
private:
1485
    // Returns a pointer to the first entry of `schema` whose file_local_id() equals
    // `column_id.value()`, or nullptr when there is none. The pointer refers to an element
    // of `schema`.
    static const ColumnDefinition* _find_column_definition(
            const std::vector<ColumnDefinition>& schema, LocalColumnId column_id) {
        const auto field_it = std::ranges::find_if(schema, [&](const ColumnDefinition& field) {
            return field.file_local_id() == column_id.value();
        });
        if (field_it == schema.end()) {
            return nullptr;
        }
        return &*field_it;
    }
1494
1495
15
    static bool _can_push_down_minmax_for_mapping(const ColumnMapping& mapping) {
1496
15
        if (mapping.child_mappings.empty()) {
1497
12
            return true;
1498
12
        }
1499
3
        const auto primitive_type = remove_nullable(mapping.file_type)->get_primitive_type();
1500
3
        if (primitive_type != TYPE_STRUCT) {
1501
1
            return false;
1502
1
        }
1503
2
        size_t mapped_children = 0;
1504
2
        const ColumnMapping* mapped_child = nullptr;
1505
2
        for (const auto& child_mapping : mapping.child_mappings) {
1506
2
            if (!child_mapping.file_local_id.has_value()) {
1507
0
                continue;
1508
0
            }
1509
2
            ++mapped_children;
1510
2
            mapped_child = &child_mapping;
1511
2
        }
1512
2
        return mapped_children == 1 && mapped_child != nullptr &&
1513
2
               _can_push_down_minmax_for_mapping(*mapped_child);
1514
3
    }
1515
1516
    // Fills `*projection` with the aggregate projection for `mapping`.
    //
    // The projection is reset to LocalColumnIndex::local(file_local_id) with no children.
    // A mapping without child mappings projects all children. Otherwise each child mapping
    // that has a file_local_id contributes one recursively built child projection, and
    // exactly one such child is required (DORIS_CHECK).
    static Status build_aggregate_projection(const ColumnMapping& mapping,
                                             LocalColumnIndex* projection) {
        DORIS_CHECK(projection != nullptr);
        DORIS_CHECK(mapping.file_local_id.has_value());
        *projection = LocalColumnIndex::local(*mapping.file_local_id);
        projection->children.clear();
        const bool is_leaf = mapping.child_mappings.empty();
        projection->project_all_children = is_leaf;
        if (is_leaf) {
            return Status::OK();
        }
        for (const auto& child_mapping : mapping.child_mappings) {
            if (child_mapping.file_local_id.has_value()) {
                LocalColumnIndex child_projection;
                RETURN_IF_ERROR(build_aggregate_projection(child_mapping, &child_projection));
                projection->children.push_back(std::move(child_projection));
            }
        }
        DORIS_CHECK(projection->children.size() == 1);
        return Status::OK();
    }
1538
1539
    // Appends `value` to `column` at the position described by `projection`.
    //
    // A ColumnNullable receives the value in its nested column and a 0 in its null map.
    // When the projection covers all children or has no children, the value is inserted
    // directly. Otherwise `column` is treated as a struct with exactly one child and the
    // single child projection is followed into it.
    static Status _insert_aggregate_projection_value(const LocalColumnIndex& projection,
                                                     const Field& value, IColumn* column) {
        DORIS_CHECK(column != nullptr);
        auto* as_nullable = check_and_get_column<ColumnNullable>(*column);
        if (as_nullable != nullptr) {
            RETURN_IF_ERROR(_insert_aggregate_projection_value(
                    projection, value, &as_nullable->get_nested_column()));
            // Only mark the row non-null once the nested insert has succeeded.
            as_nullable->get_null_map_data().push_back(0);
            return Status::OK();
        }
        const bool inserts_whole_value =
                projection.project_all_children || projection.children.empty();
        if (inserts_whole_value) {
            column->insert(value);
            return Status::OK();
        }
        auto* struct_column = assert_cast<ColumnStruct*>(column);
        DORIS_CHECK(projection.children.size() == 1);
        DORIS_CHECK(struct_column->get_columns().size() == 1);
        return _insert_aggregate_projection_value(projection.children[0], value,
                                                  &struct_column->get_column(0));
    }
1560
1561
    // Parse row-position deletes from table format specific parameters, and fill in _delete_rows.
1562
    Status _parse_delete_predicates(const SplitReadOptions& options);
1563
};
1564
1565
} // namespace doris::format