Coverage Report

Created: 2026-04-05 12:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/Metrics_types.h>
23
#include <gen_cpp/Opcodes_types.h>
24
#include <gen_cpp/PaloInternalService_types.h>
25
#include <gen_cpp/PlanNodes_types.h>
26
#include <glog/logging.h>
27
28
#include <algorithm>
29
#include <boost/iterator/iterator_facade.hpp>
30
#include <map>
31
#include <ranges>
32
#include <tuple>
33
#include <unordered_map>
34
#include <utility>
35
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/config.h"
38
#include "common/consts.h"
39
#include "common/logging.h"
40
#include "common/status.h"
41
#include "core/block/column_with_type_and_name.h"
42
#include "core/block/columns_with_type_and_name.h"
43
#include "core/column/column.h"
44
#include "core/column/column_nullable.h"
45
#include "core/column/column_vector.h"
46
#include "core/data_type/data_type.h"
47
#include "core/data_type/data_type_nullable.h"
48
#include "core/data_type/data_type_string.h"
49
#include "core/string_ref.h"
50
#include "exec/common/stringop_substring.h"
51
#include "exec/rowid_fetcher.h"
52
#include "exec/scan/scan_node.h"
53
#include "exprs/aggregate/aggregate_function.h"
54
#include "exprs/function/function.h"
55
#include "exprs/function/simple_function_factory.h"
56
#include "exprs/vexpr.h"
57
#include "exprs/vexpr_context.h"
58
#include "exprs/vexpr_fwd.h"
59
#include "exprs/vslot_ref.h"
60
#include "format/arrow/arrow_stream_reader.h"
61
#include "format/count_reader.h"
62
#include "format/csv/csv_reader.h"
63
#include "format/json/new_json_reader.h"
64
#include "format/native/native_reader.h"
65
#include "format/orc/vorc_reader.h"
66
#include "format/parquet/vparquet_reader.h"
67
#include "format/table/hive_reader.h"
68
#include "format/table/hudi_jni_reader.h"
69
#include "format/table/hudi_reader.h"
70
#include "format/table/iceberg_reader.h"
71
#include "format/table/iceberg_sys_table_jni_reader.h"
72
#include "format/table/jdbc_jni_reader.h"
73
#include "format/table/max_compute_jni_reader.h"
74
#include "format/table/paimon_cpp_reader.h"
75
#include "format/table/paimon_jni_reader.h"
76
#include "format/table/paimon_predicate_converter.h"
77
#include "format/table/paimon_reader.h"
78
#include "format/table/remote_doris_reader.h"
79
#include "format/table/transactional_hive_reader.h"
80
#include "format/table/trino_connector_jni_reader.h"
81
#include "format/text/text_reader.h"
82
#include "io/cache/block_file_cache_profile.h"
83
#include "load/group_commit/wal/wal_reader.h"
84
#include "runtime/descriptors.h"
85
#include "runtime/runtime_profile.h"
86
#include "runtime/runtime_state.h"
87
88
namespace cctz {
89
class time_zone;
90
} // namespace cctz
91
namespace doris {
92
class ShardedKVCache;
93
} // namespace doris
94
95
namespace doris {
96
#include "common/compile_check_begin.h"
97
using namespace ErrorCode;
98
99
const std::string FileScanner::FileReadBytesProfile = "FileReadBytes";
100
const std::string FileScanner::FileReadTimeProfile = "FileReadTime";
101
102
FileScanner::FileScanner(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
103
                         std::shared_ptr<SplitSourceConnector> split_source,
104
                         RuntimeProfile* profile, ShardedKVCache* kv_cache,
105
                         const std::unordered_map<std::string, int>* colname_to_slot_id)
106
14
        : Scanner(state, local_state, limit, profile),
107
14
          _split_source(split_source),
108
14
          _cur_reader(nullptr),
109
14
          _cur_reader_eof(false),
110
14
          _kv_cache(kv_cache),
111
14
          _strict_mode(false),
112
14
          _col_name_to_slot_id(colname_to_slot_id) {
113
14
    if (state->get_query_ctx() != nullptr &&
114
14
        state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
115
4
        _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
116
10
    } else {
117
        // old fe thrift protocol
118
10
        _params = _split_source->get_params();
119
10
    }
120
14
    if (_params->__isset.strict_mode) {
121
8
        _strict_mode = _params->strict_mode;
122
8
    }
123
124
    // For load scanner, there are input and output tuple.
125
    // For query scanner, there is only output tuple
126
14
    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
127
14
    _real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
128
14
    _is_load = (_input_tuple_desc != nullptr);
129
14
    _configure_file_scan_handlers();
130
14
}
131
132
30
void FileScanner::_configure_file_scan_handlers() {
133
30
    if (_is_load) {
134
10
        _init_src_block_handler = &FileScanner::_init_src_block_for_load;
135
10
        _process_src_block_after_read_handler =
136
10
                &FileScanner::_process_src_block_after_read_for_load;
137
10
        _should_push_down_predicates_handler = &FileScanner::_should_push_down_predicates_for_load;
138
10
        _should_enable_condition_cache_handler =
139
10
                &FileScanner::_should_enable_condition_cache_for_load;
140
20
    } else {
141
20
        _init_src_block_handler = &FileScanner::_init_src_block_for_query;
142
20
        _process_src_block_after_read_handler =
143
20
                &FileScanner::_process_src_block_after_read_for_query;
144
20
        _should_push_down_predicates_handler = &FileScanner::_should_push_down_predicates_for_query;
145
20
        _should_enable_condition_cache_handler =
146
20
                &FileScanner::_should_enable_condition_cache_for_query;
147
20
    }
148
30
}
149
150
14
Status FileScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
151
14
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));
152
14
    _get_block_timer =
153
14
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerGetBlockTime", 1);
154
14
    _cast_to_input_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
155
14
                                                      "FileScannerCastInputBlockTime", 1);
156
14
    _fill_missing_columns_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
157
14
                                                       "FileScannerFillMissingColumnTime", 1);
158
14
    _pre_filter_timer =
159
14
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerPreFilterTimer", 1);
160
14
    _convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
161
14
                                                          "FileScannerConvertOuputBlockTime", 1);
162
14
    _runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
163
14
            _local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime", 1);
164
14
    _empty_file_counter =
165
14
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT, 1);
166
14
    _not_found_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
167
14
                                                     "NotFoundFileNum", TUnit::UNIT, 1);
168
14
    _fully_skipped_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
169
14
                                                         "FullySkippedFileNum", TUnit::UNIT, 1);
170
14
    _file_counter =
171
14
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
172
173
14
    _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
174
14
                                                      FileReadBytesProfile, TUnit::BYTES, 1);
175
14
    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
176
14
                                                      "FileReadCalls", TUnit::UNIT, 1);
177
14
    _file_read_time_counter =
178
14
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), FileReadTimeProfile, 1);
179
180
14
    _runtime_filter_partition_pruned_range_counter =
181
14
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
182
14
                                   "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);
183
184
14
    _file_cache_statistics.reset(new io::FileCacheStatistics());
185
14
    _file_reader_stats.reset(new io::FileReaderStats());
186
187
14
    RETURN_IF_ERROR(_init_io_ctx());
188
14
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
189
14
    _io_ctx->file_reader_stats = _file_reader_stats.get();
190
14
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;
191
192
14
    if (_is_load) {
193
8
        _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
194
8
                                              std::vector<TupleId>({_input_tuple_desc->id()})));
195
        // prepare pre filters
196
8
        if (_params->__isset.pre_filter_exprs_list) {
197
0
            RETURN_IF_ERROR(doris::VExpr::create_expr_trees(_params->pre_filter_exprs_list,
198
0
                                                            _pre_conjunct_ctxs));
199
8
        } else if (_params->__isset.pre_filter_exprs) {
200
0
            VExprContextSPtr context;
201
0
            RETURN_IF_ERROR(doris::VExpr::create_expr_tree(_params->pre_filter_exprs, context));
202
0
            _pre_conjunct_ctxs.emplace_back(context);
203
0
        }
204
205
8
        for (auto& conjunct : _pre_conjunct_ctxs) {
206
0
            RETURN_IF_ERROR(conjunct->prepare(_state, *_src_row_desc));
207
0
            RETURN_IF_ERROR(conjunct->open(_state));
208
0
        }
209
210
8
        _dest_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
211
8
                                               std::vector<TupleId>({_output_tuple_desc->id()})));
212
8
    }
213
214
14
    _default_val_row_desc.reset(
215
14
            new RowDescriptor(_state->desc_tbl(), std::vector<TupleId>({_real_tuple_desc->id()})));
216
217
14
    return Status::OK();
218
14
}
219
220
// check if the expr is a partition pruning expr
221
0
bool FileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
222
0
    if (expr->is_slot_ref()) {
223
0
        auto* slot_ref = static_cast<VSlotRef*>(expr.get());
224
0
        return _partition_slot_index_map.find(slot_ref->slot_id()) !=
225
0
               _partition_slot_index_map.end();
226
0
    }
227
0
    if (expr->is_literal()) {
228
0
        return true;
229
0
    }
230
0
    return std::ranges::all_of(expr->children(), [this](const auto& child) {
231
0
        return _check_partition_prune_expr(child);
232
0
    });
233
0
}
234
235
0
void FileScanner::_init_runtime_filter_partition_prune_ctxs() {
236
0
    _runtime_filter_partition_prune_ctxs.clear();
237
0
    for (auto& conjunct : _conjuncts) {
238
0
        auto impl = conjunct->root()->get_impl();
239
        // If impl is not null, which means this a conjuncts from runtime filter.
240
0
        auto expr = impl ? impl : conjunct->root();
241
0
        if (_check_partition_prune_expr(expr)) {
242
0
            _runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
243
0
        }
244
0
    }
245
0
}
246
247
0
void FileScanner::_init_runtime_filter_partition_prune_block() {
248
    // init block with empty column
249
0
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
250
0
        _runtime_filter_partition_prune_block.insert(
251
0
                ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
252
0
                                      slot_desc->get_data_type_ptr(), slot_desc->col_name()));
253
0
    }
254
0
}
255
256
0
Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
257
0
    SCOPED_TIMER(_runtime_filter_partition_prune_timer);
258
0
    if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
259
0
        return Status::OK();
260
0
    }
261
0
    size_t partition_value_column_size = 1;
262
263
    // 1. Get partition key values to string columns.
264
0
    std::unordered_map<SlotId, MutableColumnPtr> partition_slot_id_to_column;
265
0
    for (auto const& partition_col_desc : _partition_col_descs) {
266
0
        const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
267
0
        auto data_type = partition_slot_desc->get_data_type_ptr();
268
0
        auto test_serde = data_type->get_serde();
269
0
        auto partition_value_column = data_type->create_column();
270
0
        auto* col_ptr = static_cast<IColumn*>(partition_value_column.get());
271
0
        Slice slice(partition_value.data(), partition_value.size());
272
0
        uint64_t num_deserialized = 0;
273
0
        DataTypeSerDe::FormatOptions options {};
274
0
        if (_partition_value_is_null.contains(partition_slot_desc->col_name())) {
275
            // for iceberg/paimon table
276
            // NOTICE: column is always be nullable for iceberg/paimon table now
277
0
            DCHECK(data_type->is_nullable());
278
0
            test_serde = test_serde->get_nested_serdes()[0];
279
0
            auto* null_column = assert_cast<ColumnNullable*>(col_ptr);
280
0
            if (_partition_value_is_null[partition_slot_desc->col_name()]) {
281
0
                null_column->insert_many_defaults(partition_value_column_size);
282
0
            } else {
283
                // If the partition value is not null, we set null map to 0 and deserialize it normally.
284
0
                null_column->get_null_map_column().insert_many_vals(0, partition_value_column_size);
285
0
                RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
286
0
                        null_column->get_nested_column(), slice, partition_value_column_size,
287
0
                        &num_deserialized, options));
288
0
            }
289
0
        } else {
290
            // for hive/hudi table, the null value is set as "\\N"
291
            // TODO: this will be unified as iceberg/paimon table in the future
292
0
            RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
293
0
                    *col_ptr, slice, partition_value_column_size, &num_deserialized, options));
294
0
        }
295
296
0
        partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
297
0
    }
298
299
    // 2. Fill _runtime_filter_partition_prune_block from the partition column, then execute conjuncts and filter block.
300
    // 2.1 Fill _runtime_filter_partition_prune_block from the partition column to match the conjuncts executing.
301
0
    size_t index = 0;
302
0
    bool first_column_filled = false;
303
0
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
304
0
        if (partition_slot_id_to_column.find(slot_desc->id()) !=
305
0
            partition_slot_id_to_column.end()) {
306
0
            auto data_type = slot_desc->get_data_type_ptr();
307
0
            auto partition_value_column = std::move(partition_slot_id_to_column[slot_desc->id()]);
308
0
            if (data_type->is_nullable()) {
309
0
                _runtime_filter_partition_prune_block.insert(
310
0
                        index, ColumnWithTypeAndName(
311
0
                                       ColumnNullable::create(
312
0
                                               std::move(partition_value_column),
313
0
                                               ColumnUInt8::create(partition_value_column_size, 0)),
314
0
                                       data_type, slot_desc->col_name()));
315
0
            } else {
316
0
                _runtime_filter_partition_prune_block.insert(
317
0
                        index, ColumnWithTypeAndName(std::move(partition_value_column), data_type,
318
0
                                                     slot_desc->col_name()));
319
0
            }
320
0
            if (index == 0) {
321
0
                first_column_filled = true;
322
0
            }
323
0
        }
324
0
        index++;
325
0
    }
326
327
    // 2.2 Execute conjuncts.
328
0
    if (!first_column_filled) {
329
        // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
330
        // The following process may be tricky and time-consuming, but we have no other way.
331
0
        _runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
332
0
                partition_value_column_size);
333
0
    }
334
0
    IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
335
0
    RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
336
0
                                                    &_runtime_filter_partition_prune_block,
337
0
                                                    &result_filter, &can_filter_all));
338
0
    return Status::OK();
339
0
}
340
341
1
Status FileScanner::_process_conjuncts() {
342
1
    _slot_id_to_filter_conjuncts.clear();
343
1
    _not_single_slot_filter_conjuncts.clear();
344
1
    for (auto& conjunct : _push_down_conjuncts) {
345
1
        auto impl = conjunct->root()->get_impl();
346
        // If impl is not null, which means this a conjuncts from runtime filter.
347
1
        auto cur_expr = impl ? impl : conjunct->root();
348
349
1
        std::vector<int> slot_ids;
350
1
        _get_slot_ids(cur_expr.get(), &slot_ids);
351
1
        if (slot_ids.empty()) {
352
1
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
353
1
            continue;
354
1
        }
355
0
        bool single_slot = true;
356
0
        for (int i = 1; i < slot_ids.size(); i++) {
357
0
            if (slot_ids[i] != slot_ids[0]) {
358
0
                single_slot = false;
359
0
                break;
360
0
            }
361
0
        }
362
0
        if (single_slot) {
363
0
            SlotId slot_id = slot_ids[0];
364
0
            _slot_id_to_filter_conjuncts[slot_id].emplace_back(conjunct);
365
0
        } else {
366
0
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
367
0
        }
368
0
    }
369
1
    return Status::OK();
370
1
}
371
372
1
Status FileScanner::_process_late_arrival_conjuncts() {
373
1
    if (_push_down_conjuncts.size() < _conjuncts.size()) {
374
1
        _push_down_conjuncts = _conjuncts;
375
        // Do not clear _conjuncts here!
376
        // We must keep it for fallback filtering, especially when mixing
377
        // Native readers (which use _push_down_conjuncts) and JNI readers (which rely on _conjuncts).
378
        // _conjuncts.clear();
379
1
        RETURN_IF_ERROR(_process_conjuncts());
380
1
    }
381
1
    if (_applied_rf_num == _total_rf_num) {
382
1
        _local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
383
1
    }
384
1
    return Status::OK();
385
1
}
386
387
1
void FileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
388
1
    for (auto& child_expr : expr->children()) {
389
0
        if (child_expr->is_slot_ref()) {
390
0
            VSlotRef* slot_ref = reinterpret_cast<VSlotRef*>(child_expr.get());
391
0
            SlotDescriptor* slot_desc = _state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
392
0
            slot_desc->set_is_predicate(true);
393
0
            slot_ids->emplace_back(slot_ref->slot_id());
394
0
        } else {
395
0
            _get_slot_ids(child_expr.get(), slot_ids);
396
0
        }
397
0
    }
398
1
}
399
400
12
Status FileScanner::_open_impl(RuntimeState* state) {
401
12
    RETURN_IF_CANCELLED(state);
402
12
    RETURN_IF_ERROR(Scanner::_open_impl(state));
403
12
    if (_local_state) {
404
12
        _condition_cache_digest = _local_state->get_condition_cache_digest();
405
12
    }
406
12
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
407
12
    if (_first_scan_range) {
408
12
        RETURN_IF_ERROR(_init_expr_ctxes());
409
12
        if (_state->query_options().enable_runtime_filter_partition_prune &&
410
12
            !_partition_slot_index_map.empty()) {
411
0
            _init_runtime_filter_partition_prune_ctxs();
412
0
            _init_runtime_filter_partition_prune_block();
413
0
        }
414
12
    } else {
415
        // there's no scan range in split source. stop scanner directly.
416
0
        _scanner_eof = true;
417
0
    }
418
419
12
    return Status::OK();
420
12
}
421
422
271
Status FileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
423
271
    Status st = _get_block_wrapped(state, block, eof);
424
425
271
    if (!st.ok()) {
426
        // add cur path in error msg for easy debugging
427
1
        return std::move(st.append(". cur path: " + get_current_scan_range_name()));
428
1
    }
429
270
    return st;
430
271
}
431
432
// For query:
433
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
434
//                              A     B    C  D                 E
435
// _init_src_block              x     x    x  x                 x                -      x
436
// get_next_block               x     x    x  -                 -                -      x
437
// _cast_to_input_block         -     -    -  -                 -                -      -
438
// _fill_columns_from_path      -     -    -  -                 x                -      x
439
// _fill_missing_columns        -     -    -  x                 -                -      x
440
// _convert_to_output_block     -     -    -  -                 -                -      -
441
//
442
// For load:
443
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
444
//                              A     B    C  D                 E
445
// _init_src_block              x     x    x  x                 x                x      -
446
// get_next_block               x     x    x  -                 -                x      -
447
// _cast_to_input_block         x     x    x  -                 -                x      -
448
// _fill_columns_from_path      -     -    -  -                 x                x      -
449
// _fill_missing_columns        -     -    -  x                 -                x      -
450
// _convert_to_output_block     -     -    -  -                 -                -      x
451
271
Status FileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* eof) {
452
271
    do {
453
271
        RETURN_IF_CANCELLED(state);
454
271
        if (_cur_reader == nullptr || _cur_reader_eof) {
455
25
            _finalize_reader_condition_cache();
456
            // The file may not exist because the file list is got from meta cache,
457
            // And the file may already be removed from storage.
458
            // Just ignore not found files.
459
25
            Status st = _get_next_reader();
460
25
            if (st.is<ErrorCode::NOT_FOUND>() && config::ignore_not_found_file_in_external_table) {
461
0
                _cur_reader_eof = true;
462
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
463
0
                continue;
464
25
            } else if (st.is<ErrorCode::END_OF_FILE>()) {
465
0
                _cur_reader_eof = true;
466
0
                COUNTER_UPDATE(_fully_skipped_file_counter, 1);
467
0
                continue;
468
25
            } else if (!st) {
469
1
                return st;
470
1
            }
471
24
            _init_reader_condition_cache();
472
24
        }
473
474
270
        if (_scanner_eof) {
475
12
            *eof = true;
476
12
            return Status::OK();
477
12
        }
478
479
        // Init src block for load job based on the data file schema (e.g. parquet)
480
        // For query job, simply set _src_block_ptr to block.
481
258
        size_t read_rows = 0;
482
258
        RETURN_IF_ERROR(_init_src_block(block));
483
258
        {
484
258
            SCOPED_TIMER(_get_block_timer);
485
486
            // Read next block.
487
            // Some of column in block may not be filled (column not exist in file)
488
258
            RETURN_IF_ERROR(
489
258
                    _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
490
258
        }
491
        // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
492
        // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result.
493
258
        if (read_rows > 0) {
494
235
            if ((!_cur_reader->count_read_rows()) && _io_ctx) {
495
210
                _io_ctx->file_reader_stats->read_rows += read_rows;
496
210
            }
497
            // If the push_down_agg_type is COUNT, no need to do the rest,
498
            // because we only save a number in block.
499
235
            if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
500
235
                RETURN_IF_ERROR(_process_src_block_after_read(block));
501
235
            }
502
235
        }
503
258
        break;
504
258
    } while (true);
505
506
    // Update filtered rows and unselected rows for load, reset counter.
507
    // {
508
    //     state->update_num_rows_load_filtered(_counter.num_rows_filtered);
509
    //     state->update_num_rows_load_unselected(_counter.num_rows_unselected);
510
    //     _reset_counter();
511
    // }
512
258
    return Status::OK();
513
271
}
514
515
/**
516
 * Check whether there are complex types in parquet/orc reader in broker/stream load.
517
 * Broker/stream load will cast any type as string type, and complex types will be casted wrong.
518
 * This is a temporary method, and will be replaced by tvf.
519
 */
520
16
Status FileScanner::_check_output_block_types() {
521
    // Only called from _init_src_block_for_load, so _is_load is always true.
522
16
    TFileFormatType::type format_type = _params->format_type;
523
16
    if (format_type == TFileFormatType::FORMAT_PARQUET ||
524
16
        format_type == TFileFormatType::FORMAT_ORC) {
525
0
        for (auto slot : _output_tuple_desc->slots()) {
526
0
            if (is_complex_type(slot->type()->get_primitive_type())) {
527
0
                return Status::InternalError(
528
0
                        "Parquet/orc doesn't support complex types in broker/stream load, "
529
0
                        "please use tvf(table value function) to insert complex types.");
530
0
            }
531
0
        }
532
0
    }
533
16
    return Status::OK();
534
16
}
535
536
258
Status FileScanner::_init_src_block(Block* block) {
537
258
    DCHECK(_init_src_block_handler != nullptr);
538
258
    return (this->*_init_src_block_handler)(block);
539
258
}
540
541
242
Status FileScanner::_init_src_block_for_query(Block* block) {
542
242
    _src_block_ptr = block;
543
544
    // Build name to index map only once on first call.
545
242
    if (_src_block_name_to_idx.empty()) {
546
20
        _src_block_name_to_idx = block->get_name_to_pos_map();
547
20
    }
548
242
    return Status::OK();
549
242
}
550
551
16
Status FileScanner::_init_src_block_for_load(Block* block) {
552
16
    static_cast<void>(block);
553
16
    RETURN_IF_ERROR(_check_output_block_types());
554
555
    // if (_src_block_init) {
556
    //     _src_block.clear_column_data();
557
    //     _src_block_ptr = &_src_block;
558
    //     return Status::OK();
559
    // }
560
561
16
    _src_block.clear();
562
16
    uint32_t idx = 0;
563
    // slots in _input_tuple_desc contains all slots describe in load statement, eg:
564
    // -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
565
    // _input_tuple_desc will contains: k1, k2, tmp1
566
    // and some of them are from file, such as k1 and k2, and some of them may not exist in file, such as tmp1
567
    // _input_tuple_desc also contains columns from path
568
564
    for (auto& slot : _input_tuple_desc->slots()) {
569
564
        DataTypePtr data_type;
570
564
        auto it = _slot_lower_name_to_col_type.find(slot->col_name());
571
564
        if (slot->is_skip_bitmap_col()) {
572
0
            _skip_bitmap_col_idx = idx;
573
0
        }
574
564
        if (_params->__isset.sequence_map_col) {
575
0
            if (_params->sequence_map_col == slot->col_name()) {
576
0
                _sequence_map_col_uid = slot->col_unique_id();
577
0
            }
578
0
        }
579
564
        data_type =
580
564
                it == _slot_lower_name_to_col_type.end() ? slot->type() : make_nullable(it->second);
581
564
        MutableColumnPtr data_column = data_type->create_column();
582
564
        _src_block.insert(
583
564
                ColumnWithTypeAndName(std::move(data_column), data_type, slot->col_name()));
584
564
        _src_block_name_to_idx.emplace(slot->col_name(), idx++);
585
564
    }
586
16
    if (_params->__isset.sequence_map_col) {
587
0
        for (const auto& slot : _output_tuple_desc->slots()) {
588
            // When the target table has seqeunce map column, _input_tuple_desc will not contains __DORIS_SEQUENCE_COL__,
589
            // so we should get its column unique id from _output_tuple_desc
590
0
            if (slot->is_sequence_col()) {
591
0
                _sequence_col_uid = slot->col_unique_id();
592
0
            }
593
0
        }
594
0
    }
595
16
    _src_block_ptr = &_src_block;
596
16
    _src_block_init = true;
597
16
    return Status::OK();
598
16
}
599
600
8
Status FileScanner::_cast_to_input_block(Block* block) {
601
    // Only called from _process_src_block_after_read_for_load, so _is_load is always true.
602
8
    SCOPED_TIMER(_cast_to_input_block_timer);
603
    // cast primitive type(PT0) to primitive type(PT1)
604
8
    uint32_t idx = 0;
605
282
    for (auto& slot_desc : _input_tuple_desc->slots()) {
606
282
        if (_slot_lower_name_to_col_type.find(slot_desc->col_name()) ==
607
282
            _slot_lower_name_to_col_type.end()) {
608
            // skip columns which does not exist in file
609
0
            continue;
610
0
        }
611
282
        auto& arg = _src_block_ptr->get_by_position(_src_block_name_to_idx[slot_desc->col_name()]);
612
282
        auto return_type = slot_desc->get_data_type_ptr();
613
        // remove nullable here, let the get_function decide whether nullable
614
282
        auto data_type = get_data_type_with_default_argument(remove_nullable(return_type));
615
282
        ColumnsWithTypeAndName arguments {
616
282
                arg, {data_type->create_column(), data_type, slot_desc->col_name()}};
617
282
        auto func_cast =
618
282
                SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type, {});
619
282
        if (!func_cast) {
620
0
            return Status::InternalError("Function CAST[arg={}, col name={}, return={}] not found!",
621
0
                                         arg.type->get_name(), slot_desc->col_name(),
622
0
                                         return_type->get_name());
623
0
        }
624
282
        idx = _src_block_name_to_idx[slot_desc->col_name()];
625
282
        DCHECK(_state != nullptr);
626
282
        auto ctx = FunctionContext::create_context(_state, {}, {});
627
282
        RETURN_IF_ERROR(
628
282
                func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, arg.column->size()));
629
282
        _src_block_ptr->get_by_position(idx).type = std::move(return_type);
630
282
    }
631
8
    return Status::OK();
632
8
}
633
634
8
Status FileScanner::_pre_filter_src_block() {
635
    // Only called from _process_src_block_after_read_for_load, so _is_load is always true.
636
8
    if (!_pre_conjunct_ctxs.empty()) {
637
0
        SCOPED_TIMER(_pre_filter_timer);
638
0
        auto origin_column_num = _src_block_ptr->columns();
639
0
        auto old_rows = _src_block_ptr->rows();
640
0
        RETURN_IF_ERROR(
641
0
                VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr, origin_column_num));
642
0
        _counter.num_rows_unselected += old_rows - _src_block_ptr->rows();
643
0
    }
644
8
    return Status::OK();
645
8
}
646
647
8
Status FileScanner::_convert_to_output_block(Block* block) {
648
    // Only called from _process_src_block_after_read_for_load, so _is_load is always true.
649
8
    SCOPED_TIMER(_convert_to_output_block_timer);
650
    // The block is passed from scanner context's free blocks,
651
    // which is initialized by output columns
652
    // so no need to clear it
653
    // block->clear();
654
655
8
    int ctx_idx = 0;
656
8
    size_t rows = _src_block_ptr->rows();
657
8
    auto filter_column = ColumnUInt8::create(rows, 1);
658
8
    auto& filter_map = filter_column->get_data();
659
660
    // After convert, the column_ptr should be copied into output block.
661
    // Can not use block->insert() because it may cause use_count() non-zero bug
662
8
    MutableBlock mutable_output_block =
663
8
            VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc);
664
8
    auto& mutable_output_columns = mutable_output_block.mutable_columns();
665
666
8
    std::vector<BitmapValue>* skip_bitmaps {nullptr};
667
8
    if (_should_process_skip_bitmap_col()) {
668
0
        auto* skip_bitmap_nullable_col_ptr =
669
0
                assert_cast<ColumnNullable*>(_src_block_ptr->get_by_position(_skip_bitmap_col_idx)
670
0
                                                     .column->assume_mutable()
671
0
                                                     .get());
672
0
        skip_bitmaps = &(assert_cast<ColumnBitmap*>(
673
0
                                 skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get())
674
0
                                 ->get_data());
675
        // NOTE:
676
        // - If the table has sequence type column, __DORIS_SEQUENCE_COL__ will be put in _input_tuple_desc, so whether
677
        //   __DORIS_SEQUENCE_COL__ will be marked in skip bitmap depends on whether it's specified in that row
678
        // - If the table has sequence map column, __DORIS_SEQUENCE_COL__ will not be put in _input_tuple_desc,
679
        //   so __DORIS_SEQUENCE_COL__ will be ommited if it't specified in a row and will not be marked in skip bitmap.
680
        //   So we should mark __DORIS_SEQUENCE_COL__ in skip bitmap here if the corresponding sequence map column us marked
681
0
        if (_sequence_map_col_uid != -1) {
682
0
            for (int j = 0; j < rows; ++j) {
683
0
                if ((*skip_bitmaps)[j].contains(_sequence_map_col_uid)) {
684
0
                    (*skip_bitmaps)[j].add(_sequence_col_uid);
685
0
                }
686
0
            }
687
0
        }
688
0
    }
689
690
    // for (auto slot_desc : _output_tuple_desc->slots()) {
691
352
    for (int j = 0; j < mutable_output_columns.size(); ++j) {
692
344
        auto* slot_desc = _output_tuple_desc->slots()[j];
693
344
        int dest_index = ctx_idx;
694
344
        ColumnPtr column_ptr;
695
696
344
        auto& ctx = _dest_vexpr_ctx[dest_index];
697
        // PT1 => dest primitive type
698
344
        RETURN_IF_ERROR(ctx->execute(_src_block_ptr, column_ptr));
699
        // column_ptr maybe a ColumnConst, convert it to a normal column
700
344
        column_ptr = column_ptr->convert_to_full_column_if_const();
701
344
        DCHECK(column_ptr);
702
703
        // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
704
        // is likely to be nullable
705
344
        if (LIKELY(column_ptr->is_nullable())) {
706
344
            const auto* nullable_column = reinterpret_cast<const ColumnNullable*>(column_ptr.get());
707
28.6k
            for (int i = 0; i < rows; ++i) {
708
28.2k
                if (filter_map[i] && nullable_column->is_null_at(i)) {
709
                    // skip checks for non-mentioned columns in flexible partial update
710
19.5k
                    if (skip_bitmaps == nullptr ||
711
19.5k
                        !skip_bitmaps->at(i).contains(slot_desc->col_unique_id())) {
712
                        // clang-format off
713
19.5k
                        if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
714
19.5k
                            !_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->is_null_at(i)) {
715
0
                            filter_map[i] = false;
716
0
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
717
0
                                [&]() -> std::string {
718
0
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
719
0
                                },
720
0
                                [&]() -> std::string {
721
0
                                    auto raw_value =
722
0
                                            _src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->get_data_at(i);
723
0
                                    std::string raw_string = raw_value.to_string();
724
0
                                    fmt::memory_buffer error_msg;
725
0
                                    fmt::format_to(error_msg,"column({}) value is incorrect while strict mode is {}, src value is {}",
726
0
                                            slot_desc->col_name(), _strict_mode, raw_string);
727
0
                                    return fmt::to_string(error_msg);
728
0
                                }));
729
19.5k
                        } else if (!slot_desc->is_nullable()) {
730
0
                            filter_map[i] = false;
731
0
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
732
0
                                [&]() -> std::string {
733
0
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
734
0
                                },
735
0
                                [&]() -> std::string {
736
0
                                    fmt::memory_buffer error_msg;
737
0
                                    fmt::format_to(error_msg, "column({}) values is null while columns is not nullable", slot_desc->col_name());
738
0
                                    return fmt::to_string(error_msg);
739
0
                                }));
740
0
                        }
741
                        // clang-format on
742
19.5k
                    }
743
19.5k
                }
744
28.2k
            }
745
344
            if (!slot_desc->is_nullable()) {
746
0
                column_ptr = remove_nullable(column_ptr);
747
0
            }
748
344
        } else if (slot_desc->is_nullable()) {
749
0
            column_ptr = make_nullable(column_ptr);
750
0
        }
751
344
        mutable_output_columns[j]->insert_range_from(*column_ptr, 0, rows);
752
344
        ctx_idx++;
753
344
    }
754
755
    // after do the dest block insert operation, clear _src_block to remove the reference of origin column
756
8
    _src_block_ptr->clear();
757
758
8
    size_t dest_size = block->columns();
759
    // do filter
760
8
    block->insert(ColumnWithTypeAndName(std::move(filter_column), std::make_shared<DataTypeUInt8>(),
761
8
                                        "filter column"));
762
8
    RETURN_IF_ERROR(Block::filter_block(block, dest_size, dest_size));
763
764
8
    _counter.num_rows_filtered += rows - block->rows();
765
8
    return Status::OK();
766
8
}
767
768
235
Status FileScanner::_process_src_block_after_read(Block* block) {
769
235
    DCHECK(_process_src_block_after_read_handler != nullptr);
770
235
    return (this->*_process_src_block_after_read_handler)(block);
771
235
}
772
773
227
Status FileScanner::_process_src_block_after_read_for_query(Block* block) {
774
    // Truncate CHAR/VARCHAR columns when target size is smaller than file schema.
775
    // This is needed for external table queries with truncate_char_or_varchar_columns=true.
776
227
    RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
777
227
    return Status::OK();
778
227
}
779
780
8
Status FileScanner::_fill_columns_from_path(size_t rows) {
781
8
    if (_partition_col_descs.empty()) {
782
8
        return Status::OK();
783
8
    }
784
0
    DataTypeSerDe::FormatOptions text_format_options;
785
0
    for (auto& kv : _partition_col_descs) {
786
0
        auto doris_column =
787
0
                _src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]).column;
788
0
        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
789
0
        auto& [value, slot_desc] = kv.second;
790
0
        auto text_serde = slot_desc->get_data_type_ptr()->get_serde();
791
0
        Slice slice(value.data(), value.size());
792
0
        uint64_t num_deserialized = 0;
793
0
        if (_partition_value_is_null.contains(kv.first) && _partition_value_is_null[kv.first]) {
794
0
            col_ptr->insert_many_defaults(rows);
795
0
        } else if (text_serde->deserialize_column_from_fixed_json(
796
0
                           *col_ptr, slice, rows, &num_deserialized, text_format_options) !=
797
0
                   Status::OK()) {
798
0
            return Status::InternalError("Failed to fill partition column: {}={}",
799
0
                                         slot_desc->col_name(), value);
800
0
        } else if (num_deserialized != rows) {
801
0
            return Status::InternalError(
802
0
                    "Failed to fill partition column: {}={}. "
803
0
                    "Number of rows expected: {}, actual: {}",
804
0
                    slot_desc->col_name(), value, rows, num_deserialized);
805
0
        }
806
0
    }
807
0
    return Status::OK();
808
0
}
809
810
8
Status FileScanner::_fill_missing_columns(size_t rows) {
811
    // For columns in the table that are not from the file and not partition columns,
812
    // fill with default values or NULL.
813
282
    for (const auto& col_desc : _column_descs) {
814
282
        if (col_desc.category != ColumnCategory::REGULAR &&
815
282
            col_desc.category != ColumnCategory::GENERATED) {
816
0
            continue;
817
0
        }
818
282
        if (_is_file_slot.contains(col_desc.slot_desc->id())) {
819
282
            continue;
820
282
        }
821
0
        auto it = _src_block_name_to_idx.find(col_desc.name);
822
0
        if (it == _src_block_name_to_idx.end()) {
823
0
            continue;
824
0
        }
825
0
        auto doris_column = _src_block_ptr->get_by_position(it->second).column;
826
0
        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
827
0
        if (col_ptr->size() >= rows) {
828
0
            continue;
829
0
        }
830
0
        size_t need_rows = rows - col_ptr->size();
831
0
        if (col_desc.default_expr != nullptr) {
832
0
            Block default_block;
833
0
            default_block.insert(
834
0
                    ColumnWithTypeAndName(col_desc.slot_desc->get_data_type_ptr()->create_column(),
835
0
                                          col_desc.slot_desc->get_data_type_ptr(), col_desc.name));
836
0
            int result_column_id = 0;
837
0
            RETURN_IF_ERROR(col_desc.default_expr->execute(&default_block, &result_column_id));
838
0
            auto& default_col = default_block.get_by_position(result_column_id).column;
839
0
            for (size_t i = 0; i < need_rows; ++i) {
840
0
                col_ptr->insert_from(*default_col, 0);
841
0
            }
842
0
        } else {
843
0
            col_ptr->insert_many_defaults(need_rows);
844
0
        }
845
0
    }
846
8
    return Status::OK();
847
8
}
848
849
8
Status FileScanner::_process_src_block_after_read_for_load(Block* block) {
850
    // Convert the src block columns type in-place.
851
8
    RETURN_IF_ERROR(_cast_to_input_block(block));
852
    // Compute row count from file columns (partition columns may be empty at this point).
853
8
    size_t rows = 0;
854
290
    for (size_t i = 0; i < _src_block_ptr->columns(); ++i) {
855
282
        size_t s = _src_block_ptr->get_by_position(i).column->size();
856
282
        if (s > rows) {
857
8
            rows = s;
858
8
        }
859
282
    }
860
    // Fill partition columns from path for readers that do not handle them internally
861
    // (e.g., CSV, JSON readers in broker/stream load).
862
8
    RETURN_IF_ERROR(_fill_columns_from_path(rows));
863
    // Fill missing columns (non-file, non-partition) with default values or NULL.
864
8
    RETURN_IF_ERROR(_fill_missing_columns(rows));
865
    // Apply _pre_conjunct_ctxs to filter src block.
866
8
    RETURN_IF_ERROR(_pre_filter_src_block());
867
868
    // Convert src block to output block (dest block), then apply filters.
869
8
    RETURN_IF_ERROR(_convert_to_output_block(block));
870
    // Truncate CHAR/VARCHAR columns when target size is smaller than file schema.
871
8
    RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
872
8
    return Status::OK();
873
8
}
874
875
235
Status FileScanner::_truncate_char_or_varchar_columns(Block* block) {
876
    // Truncate char columns or varchar columns if size is smaller than file columns
877
    // or not found in the file column schema.
878
235
    if (!_state->query_options().truncate_char_or_varchar_columns) {
879
235
        return Status::OK();
880
235
    }
881
0
    int idx = 0;
882
0
    for (auto* slot_desc : _real_tuple_desc->slots()) {
883
0
        const auto& type = slot_desc->type();
884
0
        if (type->get_primitive_type() != TYPE_VARCHAR && type->get_primitive_type() != TYPE_CHAR) {
885
0
            ++idx;
886
0
            continue;
887
0
        }
888
0
        auto iter = _source_file_col_name_types.find(slot_desc->col_name());
889
0
        if (iter != _source_file_col_name_types.end()) {
890
0
            const auto file_type_desc = _source_file_col_name_types[slot_desc->col_name()];
891
0
            int l = -1;
892
0
            if (auto* ftype = check_and_get_data_type<DataTypeString>(
893
0
                        remove_nullable(file_type_desc).get())) {
894
0
                l = ftype->len();
895
0
            }
896
0
            if ((assert_cast<const DataTypeString*>(remove_nullable(type).get())->len() > 0) &&
897
0
                (assert_cast<const DataTypeString*>(remove_nullable(type).get())->len() < l ||
898
0
                 l < 0)) {
899
0
                _truncate_char_or_varchar_column(
900
0
                        block, idx,
901
0
                        assert_cast<const DataTypeString*>(remove_nullable(type).get())->len());
902
0
            }
903
0
        } else {
904
0
            _truncate_char_or_varchar_column(
905
0
                    block, idx,
906
0
                    assert_cast<const DataTypeString*>(remove_nullable(type).get())->len());
907
0
        }
908
0
        ++idx;
909
0
    }
910
0
    return Status::OK();
911
235
}
912
913
// VARCHAR substring(VARCHAR str, INT pos[, INT len])
914
0
void FileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int len) {
915
0
    auto int_type = std::make_shared<DataTypeInt32>();
916
0
    uint32_t num_columns_without_result = block->columns();
917
0
    const ColumnNullable* col_nullable =
918
0
            assert_cast<const ColumnNullable*>(block->get_by_position(idx).column.get());
919
0
    const ColumnPtr& string_column_ptr = col_nullable->get_nested_column_ptr();
920
0
    ColumnPtr null_map_column_ptr = col_nullable->get_null_map_column_ptr();
921
0
    block->replace_by_position(idx, std::move(string_column_ptr));
922
0
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)), int_type,
923
0
                   "const 1"}); // pos is 1
924
0
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)), int_type,
925
0
                   fmt::format("const {}", len)});                          // len
926
0
    block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); // result column
927
0
    ColumnNumbers temp_arguments(3);
928
0
    temp_arguments[0] = idx;                            // str column
929
0
    temp_arguments[1] = num_columns_without_result;     // pos
930
0
    temp_arguments[2] = num_columns_without_result + 1; // len
931
0
    uint32_t result_column_id = num_columns_without_result + 2;
932
933
0
    SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
934
0
    auto res = ColumnNullable::create(block->get_by_position(result_column_id).column,
935
0
                                      null_map_column_ptr);
936
0
    block->replace_by_position(idx, std::move(res));
937
0
    Block::erase_useless_column(block, num_columns_without_result);
938
0
}
939
940
0
Status FileScanner::_create_row_id_column_iterator() {
941
0
    auto& id_file_map = _state->get_id_file_map();
942
0
    auto file_id = id_file_map->get_file_mapping_id(
943
0
            std::make_shared<FileMapping>(((FileScanLocalState*)_local_state)->parent_id(),
944
0
                                          _current_range, _should_enable_file_meta_cache()));
945
0
    _row_id_column_iterator_pair.first = std::make_shared<RowIdColumnIteratorV2>(
946
0
            IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id);
947
0
    return Status::OK();
948
0
}
949
950
38
void FileScanner::_fill_base_init_context(ReaderInitContext* ctx) {
951
38
    ctx->column_descs = &_column_descs;
952
38
    ctx->col_name_to_block_idx = &_src_block_name_to_idx;
953
38
    ctx->state = _state;
954
38
    ctx->tuple_descriptor = _real_tuple_desc;
955
38
    ctx->row_descriptor = _default_val_row_desc.get();
956
38
    ctx->params = _params;
957
38
    ctx->range = &_current_range;
958
38
    ctx->table_info_node = TableSchemaChangeHelper::ConstNode::get_instance();
959
38
    ctx->push_down_agg_type = _get_push_down_agg_type();
960
38
}
961
962
25
Status FileScanner::_get_next_reader() {
963
25
    while (true) {
964
25
        if (_cur_reader) {
965
12
            _cur_reader->collect_profile_before_close();
966
12
            RETURN_IF_ERROR(_cur_reader->close());
967
12
            _state->update_num_finished_scan_range(1);
968
12
        }
969
25
        _cur_reader.reset(nullptr);
970
25
        _src_block_init = false;
971
25
        bool has_next = _first_scan_range;
972
25
        if (!_first_scan_range) {
973
13
            RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
974
13
        }
975
25
        _first_scan_range = false;
976
25
        if (!has_next || _should_stop) {
977
12
            _scanner_eof = true;
978
12
            return Status::OK();
979
12
        }
980
981
13
        const TFileRangeDesc& range = _current_range;
982
13
        _current_range_path = range.path;
983
984
13
        if (!_partition_slot_index_map.empty()) {
985
            // we need get partition columns first for runtime filter partition pruning
986
0
            RETURN_IF_ERROR(_generate_partition_columns());
987
988
0
            if (_state->query_options().enable_runtime_filter_partition_prune) {
989
                // if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
990
                // by runtime filter partition prune
991
0
                if (_push_down_conjuncts.size() < _conjuncts.size()) {
992
                    // there are new runtime filters, need to re-init runtime filter partition pruning ctxs
993
0
                    _init_runtime_filter_partition_prune_ctxs();
994
0
                }
995
996
0
                bool can_filter_all = false;
997
0
                RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
998
0
                if (can_filter_all) {
999
                    // this range can be filtered out by runtime filter partition pruning
1000
                    // so we need to skip this range
1001
0
                    COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
1002
0
                    continue;
1003
0
                }
1004
0
            }
1005
0
        }
1006
1007
        // create reader for specific format
1008
13
        Status init_status = Status::OK();
1009
13
        TFileFormatType::type format_type = _get_current_format_type();
1010
        // for compatibility, this logic is deprecated in 3.1
1011
13
        if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
1012
0
            if (range.table_format_params.table_format_type == "paimon" &&
1013
0
                !range.table_format_params.paimon_params.__isset.paimon_split) {
1014
                // use native reader
1015
0
                auto format = range.table_format_params.paimon_params.file_format;
1016
0
                if (format == "orc") {
1017
0
                    format_type = TFileFormatType::FORMAT_ORC;
1018
0
                } else if (format == "parquet") {
1019
0
                    format_type = TFileFormatType::FORMAT_PARQUET;
1020
0
                } else {
1021
0
                    return Status::InternalError("Not supported paimon file format: {}", format);
1022
0
                }
1023
0
            }
1024
0
        }
1025
1026
13
        bool push_down_predicates = _should_push_down_predicates(format_type);
1027
13
        bool need_to_get_parsed_schema = false;
1028
13
        switch (format_type) {
1029
1
        case TFileFormatType::FORMAT_JNI: {
1030
1
            ReaderInitContext jni_ctx;
1031
1
            _fill_base_init_context(&jni_ctx);
1032
1033
1
            if (range.__isset.table_format_params &&
1034
1
                range.table_format_params.table_format_type == "max_compute") {
1035
0
                const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
1036
0
                        _real_tuple_desc->table_desc());
1037
0
                if (!mc_desc->init_status()) {
1038
0
                    return mc_desc->init_status();
1039
0
                }
1040
0
                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
1041
0
                        mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
1042
0
                        range, _state, _profile);
1043
0
                init_status = static_cast<GenericReader*>(mc_reader.get())->init_reader(&jni_ctx);
1044
0
                _cur_reader = std::move(mc_reader);
1045
1
            } else if (range.__isset.table_format_params &&
1046
1
                       range.table_format_params.table_format_type == "paimon") {
1047
0
                if (_state->query_options().__isset.enable_paimon_cpp_reader &&
1048
0
                    _state->query_options().enable_paimon_cpp_reader) {
1049
0
                    auto cpp_reader = PaimonCppReader::create_unique(_file_slot_descs, _state,
1050
0
                                                                     _profile, range, _params);
1051
0
                    cpp_reader->set_push_down_agg_type(_get_push_down_agg_type());
1052
0
                    if (!_is_load && !_push_down_conjuncts.empty()) {
1053
0
                        PaimonPredicateConverter predicate_converter(_file_slot_descs, _state);
1054
0
                        auto predicate = predicate_converter.build(_push_down_conjuncts);
1055
0
                        if (predicate) {
1056
0
                            cpp_reader->set_predicate(std::move(predicate));
1057
0
                        }
1058
0
                    }
1059
0
                    init_status =
1060
0
                            static_cast<GenericReader*>(cpp_reader.get())->init_reader(&jni_ctx);
1061
0
                    _cur_reader = std::move(cpp_reader);
1062
0
                } else {
1063
0
                    auto paimon_reader = PaimonJniReader::create_unique(_file_slot_descs, _state,
1064
0
                                                                        _profile, range, _params);
1065
0
                    init_status =
1066
0
                            static_cast<GenericReader*>(paimon_reader.get())->init_reader(&jni_ctx);
1067
0
                    _cur_reader = std::move(paimon_reader);
1068
0
                }
1069
1
            } else if (range.__isset.table_format_params &&
1070
1
                       range.table_format_params.table_format_type == "hudi") {
1071
0
                auto hudi_reader = HudiJniReader::create_unique(
1072
0
                        *_params, range.table_format_params.hudi_params, _file_slot_descs, _state,
1073
0
                        _profile);
1074
0
                init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&jni_ctx);
1075
0
                _cur_reader = std::move(hudi_reader);
1076
1
            } else if (range.__isset.table_format_params &&
1077
1
                       range.table_format_params.table_format_type == "trino_connector") {
1078
0
                auto trino_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
1079
0
                                                                           _profile, range);
1080
0
                init_status =
1081
0
                        static_cast<GenericReader*>(trino_reader.get())->init_reader(&jni_ctx);
1082
0
                _cur_reader = std::move(trino_reader);
1083
1
            } else if (range.__isset.table_format_params &&
1084
1
                       range.table_format_params.table_format_type == "jdbc") {
1085
                // Extract jdbc params from table_format_params
1086
0
                std::map<std::string, std::string> jdbc_params(
1087
0
                        range.table_format_params.jdbc_params.begin(),
1088
0
                        range.table_format_params.jdbc_params.end());
1089
0
                auto jdbc_reader = JdbcJniReader::create_unique(_file_slot_descs, _state, _profile,
1090
0
                                                                jdbc_params);
1091
0
                init_status = static_cast<GenericReader*>(jdbc_reader.get())->init_reader(&jni_ctx);
1092
0
                _cur_reader = std::move(jdbc_reader);
1093
1
            } else if (range.__isset.table_format_params &&
1094
1
                       range.table_format_params.table_format_type == "iceberg") {
1095
0
                auto iceberg_sys_reader = IcebergSysTableJniReader::create_unique(
1096
0
                        _file_slot_descs, _state, _profile, range, _params);
1097
0
                init_status = static_cast<GenericReader*>(iceberg_sys_reader.get())
1098
0
                                      ->init_reader(&jni_ctx);
1099
0
                _cur_reader = std::move(iceberg_sys_reader);
1100
0
            }
1101
            // Set col_name_to_block_idx for JNI readers to avoid repeated map creation
1102
1
            if (_cur_reader) {
1103
0
                if (auto* jni_reader = dynamic_cast<JniReader*>(_cur_reader.get())) {
1104
0
                    jni_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
1105
0
                }
1106
0
            }
1107
1
            break;
1108
1
        }
1109
0
        case TFileFormatType::FORMAT_PARQUET: {
1110
0
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1111
0
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1112
0
                                               : nullptr;
1113
0
            if (push_down_predicates) {
1114
0
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1115
0
            }
1116
0
            RETURN_IF_ERROR(_init_parquet_reader(file_meta_cache_ptr));
1117
1118
0
            need_to_get_parsed_schema = true;
1119
0
            break;
1120
0
        }
1121
0
        case TFileFormatType::FORMAT_ORC: {
1122
0
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1123
0
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1124
0
                                               : nullptr;
1125
0
            if (push_down_predicates) {
1126
0
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1127
0
            }
1128
0
            RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr));
1129
1130
0
            need_to_get_parsed_schema = true;
1131
0
            break;
1132
0
        }
1133
8
        case TFileFormatType::FORMAT_CSV_PLAIN:
1134
8
        case TFileFormatType::FORMAT_CSV_GZ:
1135
8
        case TFileFormatType::FORMAT_CSV_BZ2:
1136
8
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
1137
8
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
1138
8
        case TFileFormatType::FORMAT_CSV_LZOP:
1139
8
        case TFileFormatType::FORMAT_CSV_DEFLATE:
1140
8
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
1141
8
        case TFileFormatType::FORMAT_PROTO: {
1142
8
            auto reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
1143
8
                                                   _file_slot_descs, _io_ctx.get());
1144
8
            CsvInitContext csv_ctx;
1145
8
            _fill_base_init_context(&csv_ctx);
1146
8
            csv_ctx.is_load = _is_load;
1147
8
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&csv_ctx);
1148
8
            _cur_reader = std::move(reader);
1149
8
            break;
1150
8
        }
1151
0
        case TFileFormatType::FORMAT_TEXT: {
1152
0
            auto reader = TextReader::create_unique(_state, _profile, &_counter, *_params, range,
1153
0
                                                    _file_slot_descs, _io_ctx.get());
1154
0
            CsvInitContext text_ctx;
1155
0
            _fill_base_init_context(&text_ctx);
1156
0
            text_ctx.is_load = _is_load;
1157
0
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&text_ctx);
1158
0
            _cur_reader = std::move(reader);
1159
0
            break;
1160
8
        }
1161
0
        case TFileFormatType::FORMAT_JSON: {
1162
0
            _cur_reader =
1163
0
                    NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
1164
0
                                                 _file_slot_descs, &_scanner_eof, _io_ctx.get());
1165
0
            JsonInitContext json_ctx;
1166
0
            _fill_base_init_context(&json_ctx);
1167
0
            json_ctx.col_default_value_ctx = &_col_default_value_ctx;
1168
0
            json_ctx.is_load = _is_load;
1169
0
            init_status = _cur_reader->init_reader(&json_ctx);
1170
0
            break;
1171
8
        }
1172
1173
4
        case TFileFormatType::FORMAT_WAL: {
1174
4
            _cur_reader = WalReader::create_unique(_state);
1175
4
            WalInitContext wal_ctx;
1176
4
            _fill_base_init_context(&wal_ctx);
1177
4
            wal_ctx.output_tuple_descriptor = _output_tuple_desc;
1178
4
            init_status = _cur_reader->init_reader(&wal_ctx);
1179
4
            break;
1180
8
        }
1181
0
        case TFileFormatType::FORMAT_NATIVE: {
1182
0
            auto reader =
1183
0
                    NativeReader::create_unique(_profile, *_params, range, _io_ctx.get(), _state);
1184
0
            ReaderInitContext native_ctx;
1185
0
            _fill_base_init_context(&native_ctx);
1186
0
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&native_ctx);
1187
0
            _cur_reader = std::move(reader);
1188
0
            need_to_get_parsed_schema = false;
1189
0
            break;
1190
8
        }
1191
0
        case TFileFormatType::FORMAT_ARROW: {
1192
0
            ReaderInitContext arrow_ctx;
1193
0
            _fill_base_init_context(&arrow_ctx);
1194
1195
0
            if (range.__isset.table_format_params &&
1196
0
                range.table_format_params.table_format_type == "remote_doris") {
1197
0
                auto doris_reader =
1198
0
                        RemoteDorisReader::create_unique(_file_slot_descs, _state, _profile, range);
1199
0
                init_status =
1200
0
                        static_cast<GenericReader*>(doris_reader.get())->init_reader(&arrow_ctx);
1201
0
                if (doris_reader) {
1202
0
                    doris_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
1203
0
                }
1204
0
                _cur_reader = std::move(doris_reader);
1205
0
            } else {
1206
0
                auto arrow_reader =
1207
0
                        ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
1208
0
                                                         range, _file_slot_descs, _io_ctx.get());
1209
0
                init_status =
1210
0
                        static_cast<GenericReader*>(arrow_reader.get())->init_reader(&arrow_ctx);
1211
0
                _cur_reader = std::move(arrow_reader);
1212
0
            }
1213
0
            break;
1214
8
        }
1215
0
        default:
1216
0
            return Status::NotSupported("Not supported create reader for file format: {}.",
1217
0
                                        to_string(_params->format_type));
1218
13
        }
1219
1220
13
        if (_cur_reader == nullptr) {
1221
1
            return Status::NotSupported(
1222
1
                    "Not supported create reader for table format: {} / file format: {}.",
1223
1
                    range.__isset.table_format_params ? range.table_format_params.table_format_type
1224
1
                                                      : "NotSet",
1225
1
                    to_string(_params->format_type));
1226
1
        }
1227
12
        COUNTER_UPDATE(_file_counter, 1);
1228
        // The FileScanner for external table may try to open not exist files,
1229
        // Because FE file cache for external table may out of date.
1230
        // So, NOT_FOUND for FileScanner is not a fail case.
1231
        // Will remove this after file reader refactor.
1232
12
        if (init_status.is<END_OF_FILE>()) {
1233
0
            COUNTER_UPDATE(_empty_file_counter, 1);
1234
0
            continue;
1235
12
        } else if (init_status.is<ErrorCode::NOT_FOUND>()) {
1236
0
            if (config::ignore_not_found_file_in_external_table) {
1237
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
1238
0
                continue;
1239
0
            }
1240
0
            return Status::InternalError("failed to find reader, err: {}", init_status.to_string());
1241
12
        } else if (!init_status.ok()) {
1242
0
            return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
1243
0
        }
1244
1245
        // For table-level COUNT pushdown, offsets are undefined so we must skip
1246
        // _set_fill_or_truncate_columns (it uses [start_offset, end_offset] to
1247
        // filter row groups, which would produce incorrect empty results).
1248
12
        bool is_table_level_count = _get_push_down_agg_type() == TPushAggOp::type::COUNT &&
1249
12
                                    range.__isset.table_format_params &&
1250
12
                                    range.table_format_params.table_level_row_count >= 0;
1251
12
        if (!is_table_level_count) {
1252
12
            Status status = _set_fill_or_truncate_columns(need_to_get_parsed_schema);
1253
12
            if (status.is<END_OF_FILE>()) { // all parquet row groups are filtered
1254
0
                continue;
1255
12
            } else if (!status.ok()) {
1256
0
                return Status::InternalError("failed to set_fill_or_truncate_columns, err: {}",
1257
0
                                             status.to_string());
1258
0
            }
1259
12
        }
1260
1261
        // Unified COUNT(*) pushdown: replace the real reader with CountReader
1262
        // decorator if the reader accepts COUNT and can provide a total row count.
1263
12
        if (_cur_reader->get_push_down_agg_type() == TPushAggOp::type::COUNT) {
1264
0
            int64_t total_rows = -1;
1265
0
            if (is_table_level_count) {
1266
                // FE-provided count (may account for table-format deletions)
1267
0
                total_rows = range.table_format_params.table_level_row_count;
1268
0
            } else if (_cur_reader->supports_count_pushdown()) {
1269
                // File metadata count (ORC footer / Parquet row groups)
1270
0
                total_rows = _cur_reader->get_total_rows();
1271
0
            }
1272
0
            if (total_rows >= 0) {
1273
0
                auto batch_size = _state->query_options().batch_size;
1274
0
                _cur_reader = std::make_unique<CountReader>(total_rows, batch_size,
1275
0
                                                            std::move(_cur_reader));
1276
0
            }
1277
0
        }
1278
12
        _cur_reader_eof = false;
1279
12
        break;
1280
12
    }
1281
12
    return Status::OK();
1282
25
}
1283
1284
Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr,
1285
14
                                         std::unique_ptr<ParquetReader> parquet_reader) {
1286
14
    const TFileRangeDesc& range = _current_range;
1287
14
    Status init_status = Status::OK();
1288
1289
14
    phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> slot_id_to_predicates =
1290
14
            _local_state
1291
14
                    ? _local_state->cast<FileScanLocalState>()._slot_id_to_predicates
1292
14
                    : phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> {};
1293
1294
    // Build unified ParquetInitContext (shared by all Parquet reader variants)
1295
14
    ParquetInitContext pctx;
1296
14
    _fill_base_init_context(&pctx);
1297
14
    pctx.conjuncts = &_push_down_conjuncts;
1298
14
    pctx.slot_id_to_predicates = &slot_id_to_predicates;
1299
14
    pctx.colname_to_slot_id = _col_name_to_slot_id;
1300
14
    pctx.not_single_slot_filter_conjuncts = &_not_single_slot_filter_conjuncts;
1301
14
    pctx.slot_id_to_filter_conjuncts = &_slot_id_to_filter_conjuncts;
1302
1303
14
    if (range.__isset.table_format_params &&
1304
14
        range.table_format_params.table_format_type == "iceberg") {
1305
        // IcebergParquetReader IS-A ParquetReader (CRTP mixin), no wrapping needed
1306
0
        std::unique_ptr<IcebergParquetReader> iceberg_reader = IcebergParquetReader::create_unique(
1307
0
                _kv_cache, _profile, *_params, range, _state->query_options().batch_size,
1308
0
                &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr);
1309
1310
        // Transfer properties
1311
0
        if (_row_id_column_iterator_pair.second != -1) {
1312
0
            RETURN_IF_ERROR(_create_row_id_column_iterator());
1313
0
            iceberg_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1314
0
        }
1315
0
        if (_row_lineage_columns.row_id_column_idx != -1 ||
1316
0
            _row_lineage_columns.last_updated_sequence_number_column_idx != -1) {
1317
0
            auto row_lineage_columns = std::make_shared<RowLineageColumns>();
1318
0
            row_lineage_columns->row_id_column_idx = _row_lineage_columns.row_id_column_idx;
1319
0
            row_lineage_columns->last_updated_sequence_number_column_idx =
1320
0
                    _row_lineage_columns.last_updated_sequence_number_column_idx;
1321
0
            const auto& iceberg_params = range.table_format_params.iceberg_params;
1322
0
            row_lineage_columns->first_row_id =
1323
0
                    iceberg_params.__isset.first_row_id ? iceberg_params.first_row_id : -1;
1324
0
            row_lineage_columns->last_updated_sequence_number =
1325
0
                    iceberg_params.__isset.last_updated_sequence_number
1326
0
                            ? iceberg_params.last_updated_sequence_number
1327
0
                            : -1;
1328
0
            iceberg_reader->set_row_lineage_columns(std::move(row_lineage_columns));
1329
0
        }
1330
0
        iceberg_reader->set_push_down_agg_type(_get_push_down_agg_type());
1331
1332
0
        init_status = static_cast<GenericReader*>(iceberg_reader.get())->init_reader(&pctx);
1333
0
        _cur_reader = std::move(iceberg_reader);
1334
14
    } else if (range.__isset.table_format_params &&
1335
14
               range.table_format_params.table_format_type == "paimon") {
1336
        // PaimonParquetReader IS-A ParquetReader, no wrapping needed
1337
0
        auto paimon_reader = PaimonParquetReader::create_unique(
1338
0
                _profile, *_params, range, _state->query_options().batch_size,
1339
0
                &_state->timezone_obj(), _kv_cache, _io_ctx.get(), _state, file_meta_cache_ptr);
1340
0
        if (_row_id_column_iterator_pair.second != -1) {
1341
0
            RETURN_IF_ERROR(_create_row_id_column_iterator());
1342
0
            paimon_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1343
0
        }
1344
0
        paimon_reader->set_push_down_agg_type(_get_push_down_agg_type());
1345
0
        init_status = static_cast<GenericReader*>(paimon_reader.get())->init_reader(&pctx);
1346
0
        _cur_reader = std::move(paimon_reader);
1347
14
    } else if (range.__isset.table_format_params &&
1348
14
               range.table_format_params.table_format_type == "hudi") {
1349
        // HudiParquetReader IS-A ParquetReader, no wrapping needed
1350
0
        auto hudi_reader = HudiParquetReader::create_unique(
1351
0
                _profile, *_params, range, _state->query_options().batch_size,
1352
0
                &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr);
1353
0
        if (_row_id_column_iterator_pair.second != -1) {
1354
0
            RETURN_IF_ERROR(_create_row_id_column_iterator());
1355
0
            hudi_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1356
0
        }
1357
0
        hudi_reader->set_push_down_agg_type(_get_push_down_agg_type());
1358
0
        init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&pctx);
1359
0
        _cur_reader = std::move(hudi_reader);
1360
14
    } else if (range.table_format_params.table_format_type == "hive") {
1361
14
        auto hive_reader = HiveParquetReader::create_unique(
1362
14
                _profile, *_params, range, _state->query_options().batch_size,
1363
14
                &_state->timezone_obj(), _io_ctx.get(), _state, &_is_file_slot, file_meta_cache_ptr,
1364
14
                _state->query_options().enable_parquet_lazy_mat);
1365
14
        if (_row_id_column_iterator_pair.second != -1) {
1366
0
            RETURN_IF_ERROR(_create_row_id_column_iterator());
1367
0
            hive_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1368
0
        }
1369
14
        hive_reader->set_push_down_agg_type(_get_push_down_agg_type());
1370
14
        init_status = static_cast<GenericReader*>(hive_reader.get())->init_reader(&pctx);
1371
14
        _cur_reader = std::move(hive_reader);
1372
14
    } else if (range.table_format_params.table_format_type == "tvf") {
1373
0
        if (!parquet_reader) {
1374
0
            parquet_reader = ParquetReader::create_unique(
1375
0
                    _profile, *_params, range, _state->query_options().batch_size,
1376
0
                    &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr,
1377
0
                    _state->query_options().enable_parquet_lazy_mat);
1378
0
        }
1379
0
        parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
1380
0
        if (_row_id_column_iterator_pair.second != -1) {
1381
0
            RETURN_IF_ERROR(_create_row_id_column_iterator());
1382
0
            parquet_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1383
0
        }
1384
0
        init_status = static_cast<GenericReader*>(parquet_reader.get())->init_reader(&pctx);
1385
0
        _cur_reader = std::move(parquet_reader);
1386
0
    } else if (_is_load) {
1387
0
        if (!parquet_reader) {
1388
0
            parquet_reader = ParquetReader::create_unique(
1389
0
                    _profile, *_params, range, _state->query_options().batch_size,
1390
0
                    &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr,
1391
0
                    _state->query_options().enable_parquet_lazy_mat);
1392
0
        }
1393
0
        parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
1394
0
        init_status = static_cast<GenericReader*>(parquet_reader.get())->init_reader(&pctx);
1395
0
        _cur_reader = std::move(parquet_reader);
1396
0
    }
1397
1398
14
    return init_status;
1399
14
}
1400
1401
Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr,
1402
11
                                     std::unique_ptr<OrcReader> orc_reader) {
1403
11
    const TFileRangeDesc& range = _current_range;
1404
11
    Status init_status = Status::OK();
1405
1406
    // Build unified OrcInitContext (shared by all ORC reader variants)
1407
11
    OrcInitContext octx;
1408
11
    _fill_base_init_context(&octx);
1409
11
    octx.conjuncts = &_push_down_conjuncts;
1410
11
    octx.not_single_slot_filter_conjuncts = &_not_single_slot_filter_conjuncts;
1411
11
    octx.slot_id_to_filter_conjuncts = &_slot_id_to_filter_conjuncts;
1412
1413
11
    if (range.__isset.table_format_params &&
1414
11
        range.table_format_params.table_format_type == "transactional_hive") {
1415
        // TransactionalHiveReader IS-A OrcReader, no wrapping needed
1416
0
        auto tran_orc_reader = TransactionalHiveReader::create_unique(
1417
0
                _profile, _state, *_params, range, _state->query_options().batch_size,
1418
0
                _state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
1419
0
        if (_row_id_column_iterator_pair.second != -1) {
1420
0
            RETURN_IF_ERROR(_create_row_id_column_iterator());
1421
0
            tran_orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1422
0
        }
1423
0
        tran_orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
1424
0
        init_status = static_cast<GenericReader*>(tran_orc_reader.get())->init_reader(&octx);
1425
1426
0
        _cur_reader = std::move(tran_orc_reader);
1427
11
    } else if (range.__isset.table_format_params &&
1428
11
               range.table_format_params.table_format_type == "iceberg") {
1429
        // IcebergOrcReader IS-A OrcReader (CRTP mixin), no wrapping needed
1430
0
        std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique(
1431
0
                _kv_cache, _profile, _state, *_params, range, _state->query_options().batch_size,
1432
0
                _state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
1433
1434
        // Transfer properties
1435
0
        if (_row_id_column_iterator_pair.second != -1) {
1436
0
            RETURN_IF_ERROR(_create_row_id_column_iterator());
1437
0
            iceberg_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1438
0
        }
1439
0
        if (_row_lineage_columns.row_id_column_idx != -1 ||
1440
0
            _row_lineage_columns.last_updated_sequence_number_column_idx != -1) {
1441
0
            auto row_lineage_columns = std::make_shared<RowLineageColumns>();
1442
0
            row_lineage_columns->row_id_column_idx = _row_lineage_columns.row_id_column_idx;
1443
0
            row_lineage_columns->last_updated_sequence_number_column_idx =
1444
0
                    _row_lineage_columns.last_updated_sequence_number_column_idx;
1445
0
            const auto& iceberg_params = range.table_format_params.iceberg_params;
1446
0
            row_lineage_columns->first_row_id =
1447
0
                    iceberg_params.__isset.first_row_id ? iceberg_params.first_row_id : -1;
1448
0
            row_lineage_columns->last_updated_sequence_number =
1449
0
                    iceberg_params.__isset.last_updated_sequence_number
1450
0
                            ? iceberg_params.last_updated_sequence_number
1451
0
                            : -1;
1452
0
            iceberg_reader->set_row_lineage_columns(std::move(row_lineage_columns));
1453
0
        }
1454
0
        iceberg_reader->set_push_down_agg_type(_get_push_down_agg_type());
1455
0
        init_status = static_cast<GenericReader*>(iceberg_reader.get())->init_reader(&octx);
1456
1457
0
        _cur_reader = std::move(iceberg_reader);
1458
11
    } else if (range.__isset.table_format_params &&
1459
11
               range.table_format_params.table_format_type == "paimon") {
1460
        // PaimonOrcReader IS-A OrcReader, no wrapping needed
1461
0
        auto paimon_reader = PaimonOrcReader::create_unique(
1462
0
                _profile, _state, *_params, range, _state->query_options().batch_size,
1463
0
                _state->timezone(), _kv_cache, _io_ctx.get(), file_meta_cache_ptr);
1464
0
        if (_row_id_column_iterator_pair.second != -1) {
1465
0
            RETURN_IF_ERROR(_create_row_id_column_iterator());
1466
0
            paimon_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1467
0
        }
1468
0
        paimon_reader->set_push_down_agg_type(_get_push_down_agg_type());
1469
0
        init_status = static_cast<GenericReader*>(paimon_reader.get())->init_reader(&octx);
1470
1471
0
        _cur_reader = std::move(paimon_reader);
1472
11
    } else if (range.__isset.table_format_params &&
1473
11
               range.table_format_params.table_format_type == "hudi") {
1474
        // HudiOrcReader IS-A OrcReader, no wrapping needed
1475
0
        auto hudi_reader = HudiOrcReader::create_unique(
1476
0
                _profile, _state, *_params, range, _state->query_options().batch_size,
1477
0
                _state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
1478
0
        if (_row_id_column_iterator_pair.second != -1) {
1479
0
            RETURN_IF_ERROR(_create_row_id_column_iterator());
1480
0
            hudi_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1481
0
        }
1482
0
        hudi_reader->set_push_down_agg_type(_get_push_down_agg_type());
1483
0
        init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&octx);
1484
1485
0
        _cur_reader = std::move(hudi_reader);
1486
11
    } else if (range.__isset.table_format_params &&
1487
11
               range.table_format_params.table_format_type == "hive") {
1488
11
        auto hive_reader = HiveOrcReader::create_unique(
1489
11
                _profile, _state, *_params, range, _state->query_options().batch_size,
1490
11
                _state->timezone(), _io_ctx.get(), &_is_file_slot, file_meta_cache_ptr,
1491
11
                _state->query_options().enable_orc_lazy_mat);
1492
11
        if (_row_id_column_iterator_pair.second != -1) {
1493
0
            RETURN_IF_ERROR(_create_row_id_column_iterator());
1494
0
            hive_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1495
0
        }
1496
11
        hive_reader->set_push_down_agg_type(_get_push_down_agg_type());
1497
11
        init_status = static_cast<GenericReader*>(hive_reader.get())->init_reader(&octx);
1498
1499
11
        _cur_reader = std::move(hive_reader);
1500
11
    } else if (range.__isset.table_format_params &&
1501
0
               range.table_format_params.table_format_type == "tvf") {
1502
0
        if (!orc_reader) {
1503
0
            orc_reader = OrcReader::create_unique(
1504
0
                    _profile, _state, *_params, range, _state->query_options().batch_size,
1505
0
                    _state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
1506
0
                    _state->query_options().enable_orc_lazy_mat);
1507
0
        }
1508
0
        orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
1509
0
        if (_row_id_column_iterator_pair.second != -1) {
1510
0
            RETURN_IF_ERROR(_create_row_id_column_iterator());
1511
0
            orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1512
0
        }
1513
0
        init_status = static_cast<GenericReader*>(orc_reader.get())->init_reader(&octx);
1514
0
        _cur_reader = std::move(orc_reader);
1515
0
    } else if (_is_load) {
1516
0
        if (!orc_reader) {
1517
0
            orc_reader = OrcReader::create_unique(
1518
0
                    _profile, _state, *_params, range, _state->query_options().batch_size,
1519
0
                    _state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
1520
0
                    _state->query_options().enable_orc_lazy_mat);
1521
0
        }
1522
0
        orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
1523
0
        init_status = static_cast<GenericReader*>(orc_reader.get())->init_reader(&octx);
1524
0
        _cur_reader = std::move(orc_reader);
1525
0
    }
1526
1527
11
    return init_status;
1528
11
}
1529
1530
37
Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema) {
1531
37
    _slot_lower_name_to_col_type.clear();
1532
1533
37
    std::unordered_map<std::string, DataTypePtr> name_to_col_type;
1534
37
    RETURN_IF_ERROR(_cur_reader->get_columns(&name_to_col_type));
1535
1536
591
    for (const auto& [col_name, col_type] : name_to_col_type) {
1537
591
        auto col_name_lower = to_lower(col_name);
1538
591
        if (_partition_col_descs.contains(col_name_lower)) {
1539
            /*
1540
             * `_slot_lower_name_to_col_type` is used by `_init_src_block` and `_cast_to_input_block` during LOAD to
1541
             * generate columns of the corresponding type, which records the columns existing in the file.
1542
             *
1543
             * When a column in `COLUMNS FROM PATH` exists in a file column, the column type in the block will
1544
             * not match the slot type in `_output_tuple_desc`, causing an error when
1545
             * Serde `deserialize_one_cell_from_json` fills the partition values.
1546
             *
1547
             * So for partition column not need fill _slot_lower_name_to_col_type.
1548
             */
1549
0
            continue;
1550
0
        }
1551
591
        _slot_lower_name_to_col_type.emplace(col_name_lower, col_type);
1552
591
    }
1553
1554
37
    RETURN_IF_ERROR(_generate_truncate_columns(need_to_get_parsed_schema));
1555
37
    return Status::OK();
1556
37
}
1557
1558
37
Status FileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) {
1559
37
    _source_file_col_name_types.clear();
1560
    //  The col names and types of source file, such as parquet, orc files.
1561
37
    if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) {
1562
0
        std::vector<std::string> source_file_col_names;
1563
0
        std::vector<DataTypePtr> source_file_col_types;
1564
0
        Status status =
1565
0
                _cur_reader->get_parsed_schema(&source_file_col_names, &source_file_col_types);
1566
0
        if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
1567
0
            return status;
1568
0
        }
1569
0
        DCHECK_EQ(source_file_col_names.size(), source_file_col_types.size());
1570
0
        for (int i = 0; i < source_file_col_names.size(); ++i) {
1571
0
            _source_file_col_name_types[to_lower(source_file_col_names[i])] =
1572
0
                    source_file_col_types[i];
1573
0
        }
1574
0
    }
1575
37
    return Status::OK();
1576
37
}
1577
1578
16
Status FileScanner::prepare_for_read_lines(const TFileRangeDesc& range) {
1579
16
    _current_range = range;
1580
1581
16
    _file_cache_statistics.reset(new io::FileCacheStatistics());
1582
16
    _file_reader_stats.reset(new io::FileReaderStats());
1583
1584
16
    _file_read_bytes_counter =
1585
16
            ADD_COUNTER_WITH_LEVEL(_profile, FileReadBytesProfile, TUnit::BYTES, 1);
1586
16
    _file_read_time_counter = ADD_TIMER_WITH_LEVEL(_profile, FileReadTimeProfile, 1);
1587
1588
16
    RETURN_IF_ERROR(_init_io_ctx());
1589
16
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
1590
16
    _io_ctx->file_reader_stats = _file_reader_stats.get();
1591
16
    _default_val_row_desc.reset(new RowDescriptor((TupleDescriptor*)_real_tuple_desc));
1592
16
    RETURN_IF_ERROR(_init_expr_ctxes());
1593
1594
    // Since only one column is read from the file, there is no need to filter, so set these variables to empty.
1595
16
    _push_down_conjuncts.clear();
1596
16
    _not_single_slot_filter_conjuncts.clear();
1597
16
    _slot_id_to_filter_conjuncts.clear();
1598
16
    _kv_cache = nullptr;
1599
16
    return Status::OK();
1600
16
}
1601
1602
Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
1603
                                          const std::list<int64_t>& row_ids, Block* result_block,
1604
                                          const ExternalFileMappingInfo& external_info,
1605
25
                                          int64_t* init_reader_ms, int64_t* get_block_ms) {
1606
25
    _current_range = range;
1607
25
    RETURN_IF_ERROR(_generate_partition_columns());
1608
1609
25
    TFileFormatType::type format_type = _get_current_format_type();
1610
25
    Status init_status = Status::OK();
1611
1612
25
    auto file_meta_cache_ptr = external_info.enable_file_meta_cache
1613
25
                                       ? ExecEnv::GetInstance()->file_meta_cache()
1614
25
                                       : nullptr;
1615
1616
25
    RETURN_IF_ERROR(scope_timer_run(
1617
25
            [&]() -> Status {
1618
25
                switch (format_type) {
1619
25
                case TFileFormatType::FORMAT_PARQUET: {
1620
25
                    std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
1621
25
                            _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx.get(),
1622
25
                            _state, file_meta_cache_ptr, false);
1623
25
                    RETURN_IF_ERROR(
1624
25
                            _init_parquet_reader(file_meta_cache_ptr, std::move(parquet_reader)));
1625
                    // _init_parquet_reader may create a new table-format specific reader
1626
                    // (e.g., HiveParquetReader) that replaces the original parquet_reader.
1627
                    // We need to re-apply read_by_rows to the actual _cur_reader.
1628
25
                    RETURN_IF_ERROR(_cur_reader->read_by_rows(row_ids));
1629
25
                    break;
1630
25
                }
1631
25
                case TFileFormatType::FORMAT_ORC: {
1632
25
                    std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
1633
25
                            _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx.get(),
1634
25
                            file_meta_cache_ptr, false);
1635
25
                    RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr, std::move(orc_reader)));
1636
                    // Same as above: re-apply read_by_rows to the actual _cur_reader.
1637
25
                    RETURN_IF_ERROR(_cur_reader->read_by_rows(row_ids));
1638
25
                    break;
1639
25
                }
1640
25
                default: {
1641
25
                    return Status::NotSupported(
1642
25
                            "Not support create lines reader for file format: {},"
1643
25
                            "only support parquet and orc.",
1644
25
                            to_string(_params->format_type));
1645
25
                }
1646
25
                }
1647
25
                return Status::OK();
1648
25
            },
1649
25
            init_reader_ms));
1650
1651
25
    RETURN_IF_ERROR(_set_fill_or_truncate_columns(true));
1652
25
    _cur_reader_eof = false;
1653
1654
25
    RETURN_IF_ERROR(scope_timer_run(
1655
25
            [&]() -> Status {
1656
25
                while (!_cur_reader_eof) {
1657
25
                    bool eof = false;
1658
25
                    RETURN_IF_ERROR(_get_block_impl(_state, result_block, &eof));
1659
25
                }
1660
25
                return Status::OK();
1661
25
            },
1662
25
            get_block_ms));
1663
1664
25
    _cur_reader->collect_profile_before_close();
1665
25
    RETURN_IF_ERROR(_cur_reader->close());
1666
1667
25
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
1668
25
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
1669
25
    return Status::OK();
1670
25
}
1671
1672
25
Status FileScanner::_generate_partition_columns() {
1673
25
    _partition_col_descs.clear();
1674
25
    _partition_value_is_null.clear();
1675
25
    const TFileRangeDesc& range = _current_range;
1676
25
    if (!range.__isset.columns_from_path_keys) {
1677
25
        return Status::OK();
1678
25
    }
1679
1680
0
    std::unordered_map<std::string, int> partition_name_to_key_index;
1681
0
    int index = 0;
1682
0
    for (const auto& key : range.columns_from_path_keys) {
1683
0
        partition_name_to_key_index.emplace(key, index++);
1684
0
    }
1685
1686
    // Iterate _column_descs to find PARTITION_KEY columns instead of _partition_slot_descs.
1687
0
    for (const auto& col_desc : _column_descs) {
1688
0
        if (col_desc.category != ColumnCategory::PARTITION_KEY) {
1689
0
            continue;
1690
0
        }
1691
0
        auto pit = partition_name_to_key_index.find(col_desc.name);
1692
0
        if (pit != partition_name_to_key_index.end()) {
1693
0
            int values_index = pit->second;
1694
0
            if (range.__isset.columns_from_path && values_index < range.columns_from_path.size()) {
1695
0
                _partition_col_descs.emplace(
1696
0
                        col_desc.name,
1697
0
                        std::make_tuple(range.columns_from_path[values_index], col_desc.slot_desc));
1698
0
                if (range.__isset.columns_from_path_is_null) {
1699
0
                    _partition_value_is_null.emplace(col_desc.name,
1700
0
                                                     range.columns_from_path_is_null[values_index]);
1701
0
                }
1702
0
            }
1703
0
        }
1704
0
    }
1705
0
    return Status::OK();
1706
25
}
1707
1708
28
Status FileScanner::_init_expr_ctxes() {
1709
28
    std::map<SlotId, int> full_src_index_map;
1710
28
    std::map<SlotId, SlotDescriptor*> full_src_slot_map;
1711
28
    std::map<std::string, int> partition_name_to_key_index_map;
1712
28
    int index = 0;
1713
459
    for (const auto& slot_desc : _real_tuple_desc->slots()) {
1714
459
        full_src_slot_map.emplace(slot_desc->id(), slot_desc);
1715
459
        full_src_index_map.emplace(slot_desc->id(), index++);
1716
459
    }
1717
1718
    // For external table query, find the index of column in path.
1719
    // Because query doesn't always search for all columns in a table
1720
    // and the order of selected columns is random.
1721
    // All ranges in _ranges vector should have identical columns_from_path_keys
1722
    // because they are all file splits for the same external table.
1723
    // So here use the first element of _ranges to fill the partition_name_to_key_index_map
1724
28
    if (_current_range.__isset.columns_from_path_keys) {
1725
0
        std::vector<std::string> key_map = _current_range.columns_from_path_keys;
1726
0
        if (!key_map.empty()) {
1727
0
            for (size_t i = 0; i < key_map.size(); i++) {
1728
0
                partition_name_to_key_index_map.emplace(key_map[i], i);
1729
0
            }
1730
0
        }
1731
0
    }
1732
1733
28
    _num_of_columns_from_file = _params->num_of_columns_from_file;
1734
467
    for (const auto& slot_info : _params->required_slots) {
1735
467
        auto slot_id = slot_info.slot_id;
1736
467
        auto it = full_src_slot_map.find(slot_id);
1737
467
        if (it == std::end(full_src_slot_map)) {
1738
0
            return Status::InternalError(
1739
0
                    fmt::format("Unknown source slot descriptor, slot_id={}", slot_id));
1740
0
        }
1741
1742
467
        ColumnDescriptor col_desc;
1743
467
        col_desc.name = it->second->col_name();
1744
467
        col_desc.slot_desc = it->second;
1745
1746
        // Read category from Thrift if available (new FE), otherwise fall back
1747
        // to slot_info.is_file_slot + partition_name_to_key_index_map for broker/stream load
1748
        // where the FE does not set TColumnCategory.
1749
467
        if (slot_info.__isset.category) {
1750
68
            switch (slot_info.category) {
1751
68
            case TColumnCategory::REGULAR:
1752
68
                col_desc.category = ColumnCategory::REGULAR;
1753
68
                break;
1754
0
            case TColumnCategory::PARTITION_KEY:
1755
0
                col_desc.category = ColumnCategory::PARTITION_KEY;
1756
0
                break;
1757
0
            case TColumnCategory::SYNTHESIZED:
1758
0
                col_desc.category = ColumnCategory::SYNTHESIZED;
1759
0
                break;
1760
0
            case TColumnCategory::GENERATED:
1761
0
                col_desc.category = ColumnCategory::GENERATED;
1762
0
                break;
1763
68
            }
1764
399
        } else if (partition_name_to_key_index_map.contains(it->second->col_name()) &&
1765
399
                   !slot_info.is_file_slot) {
1766
0
            col_desc.category = ColumnCategory::PARTITION_KEY;
1767
0
        }
1768
1769
467
        if (it->second->col_name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
1770
0
            _row_id_column_iterator_pair.second = _default_val_row_desc->get_column_id(slot_id);
1771
0
            continue;
1772
0
        }
1773
1774
467
        bool is_row_lineage_col = false;
1775
467
        if (it->second->col_name() == IcebergTableReader::ROW_LINEAGE_ROW_ID) {
1776
0
            _row_lineage_columns.row_id_column_idx = _default_val_row_desc->get_column_id(slot_id);
1777
0
            is_row_lineage_col = true;
1778
0
        }
1779
1780
467
        if (it->second->col_name() == IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
1781
0
            _row_lineage_columns.last_updated_sequence_number_column_idx =
1782
0
                    _default_val_row_desc->get_column_id(slot_id);
1783
0
            is_row_lineage_col = true;
1784
0
        }
1785
467
        if (is_row_lineage_col) {
1786
0
            col_desc.category = ColumnCategory::SYNTHESIZED;
1787
0
        }
1788
1789
        // Derive is_file_slot from category
1790
467
        bool is_file_slot = (col_desc.category == ColumnCategory::REGULAR ||
1791
467
                             col_desc.category == ColumnCategory::GENERATED);
1792
1793
467
        if (partition_name_to_key_index_map.contains(it->second->col_name())) {
1794
0
            if (_is_load) {
1795
0
                auto iti = full_src_index_map.find(slot_id);
1796
0
                _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
1797
0
            } else {
1798
0
                auto kit = partition_name_to_key_index_map.find(it->second->col_name());
1799
0
                _partition_slot_index_map.emplace(slot_id, kit->second);
1800
0
            }
1801
0
        }
1802
1803
467
        if (is_file_slot) {
1804
467
            _is_file_slot.emplace(slot_id);
1805
467
            _file_slot_descs.emplace_back(it->second);
1806
467
            _file_col_names.push_back(it->second->col_name());
1807
467
        }
1808
1809
467
        _column_descs.push_back(col_desc);
1810
467
    }
1811
1812
    // set column name to default value expr map
1813
    // new inline TFileScanSlotInfo.default_value_expr (preferred)
1814
467
    for (const auto& slot_info : _params->required_slots) {
1815
467
        auto slot_id = slot_info.slot_id;
1816
467
        auto it = full_src_slot_map.find(slot_id);
1817
467
        if (it == std::end(full_src_slot_map)) {
1818
0
            continue;
1819
0
        }
1820
467
        const std::string& col_name = it->second->col_name();
1821
1822
467
        VExprContextSPtr ctx;
1823
467
        bool has_default = false;
1824
1825
        // Prefer inline default_value_expr from TFileScanSlotInfo (new FE)
1826
467
        if (slot_info.__isset.default_value_expr && !slot_info.default_value_expr.nodes.empty()) {
1827
282
            RETURN_IF_ERROR(VExpr::create_expr_tree(slot_info.default_value_expr, ctx));
1828
282
            RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
1829
282
            RETURN_IF_ERROR(ctx->open(_state));
1830
282
            has_default = true;
1831
282
        } else if (slot_info.__isset.default_value_expr) {
1832
            // Empty nodes means null default (same as legacy empty TExpr)
1833
0
            has_default = true;
1834
0
        }
1835
1836
        // // Fall back to legacy default_value_of_src_slot map (old FE)
1837
        // if (!has_default) {
1838
        //     auto legacy_it = _params->default_value_of_src_slot.find(slot_id);
1839
        //     if (legacy_it != std::end(_params->default_value_of_src_slot)) {
1840
        //         if (!legacy_it->second.nodes.empty()) {
1841
        //             RETURN_IF_ERROR(VExpr::create_expr_tree(legacy_it->second, ctx));
1842
        //             RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
1843
        //             RETURN_IF_ERROR(ctx->open(_state));
1844
        //         }
1845
        //         has_default = true;
1846
        //     }
1847
        // }
1848
1849
467
        if (has_default) {
1850
            // if expr is empty, the default value will be null
1851
282
            _col_default_value_ctx.emplace(col_name, ctx);
1852
282
        }
1853
467
    }
1854
1855
    // Populate default_expr in each ColumnDescriptor from _col_default_value_ctx.
1856
    // This makes default values available to readers via column_descs, eliminating the
1857
    // need for the separate _generate_missing_columns roundtrip.
1858
467
    for (auto& col_desc : _column_descs) {
1859
467
        auto it = _col_default_value_ctx.find(col_desc.name);
1860
467
        if (it != _col_default_value_ctx.end()) {
1861
282
            col_desc.default_expr = it->second;
1862
282
        }
1863
467
    }
1864
1865
28
    if (_is_load) {
1866
        // follow desc expr map is only for load task.
1867
8
        bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
1868
8
        int idx = 0;
1869
344
        for (auto* slot_desc : _output_tuple_desc->slots()) {
1870
344
            auto it = _params->expr_of_dest_slot.find(slot_desc->id());
1871
344
            if (it == std::end(_params->expr_of_dest_slot)) {
1872
0
                return Status::InternalError("No expr for dest slot, id={}, name={}",
1873
0
                                             slot_desc->id(), slot_desc->col_name());
1874
0
            }
1875
1876
344
            VExprContextSPtr ctx;
1877
344
            if (!it->second.nodes.empty()) {
1878
344
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
1879
344
                RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
1880
344
                RETURN_IF_ERROR(ctx->open(_state));
1881
344
            }
1882
344
            _dest_vexpr_ctx.emplace_back(ctx);
1883
344
            _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
1884
1885
344
            if (has_slot_id_map) {
1886
344
                auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
1887
344
                if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
1888
62
                    _src_slot_descs_order_by_dest.emplace_back(nullptr);
1889
282
                } else {
1890
282
                    auto _src_slot_it = full_src_slot_map.find(it1->second);
1891
282
                    if (_src_slot_it == std::end(full_src_slot_map)) {
1892
0
                        return Status::InternalError("No src slot {} in src slot descs",
1893
0
                                                     it1->second);
1894
0
                    }
1895
282
                    _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
1896
282
                                                         full_src_index_map[_src_slot_it->first]);
1897
282
                    _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
1898
282
                }
1899
344
            }
1900
344
        }
1901
8
    }
1902
28
    return Status::OK();
1903
28
}
1904
1905
63
bool FileScanner::_should_enable_condition_cache() {
1906
63
    DCHECK(_should_enable_condition_cache_handler != nullptr);
1907
63
    return _condition_cache_digest != 0 && (this->*_should_enable_condition_cache_handler)() &&
1908
63
           (!_conjuncts.empty() || !_push_down_conjuncts.empty());
1909
63
}
1910
1911
0
bool FileScanner::_should_enable_condition_cache_for_load() const {
1912
0
    return false;
1913
0
}
1914
1915
0
bool FileScanner::_should_enable_condition_cache_for_query() const {
1916
0
    return true;
1917
0
}
1918
1919
13
bool FileScanner::_should_push_down_predicates(TFileFormatType::type format_type) const {
1920
13
    DCHECK(_should_push_down_predicates_handler != nullptr);
1921
13
    return (this->*_should_push_down_predicates_handler)(format_type);
1922
13
}
1923
1924
9
bool FileScanner::_should_push_down_predicates_for_load(TFileFormatType::type format_type) const {
1925
9
    static_cast<void>(format_type);
1926
9
    return false;
1927
9
}
1928
1929
4
bool FileScanner::_should_push_down_predicates_for_query(TFileFormatType::type format_type) const {
1930
    // JNI readers handle predicate conversion in their own paths.
1931
4
    return format_type != TFileFormatType::FORMAT_JNI;
1932
4
}
1933
1934
24
void FileScanner::_init_reader_condition_cache() {
1935
24
    _condition_cache = nullptr;
1936
24
    _condition_cache_ctx = nullptr;
1937
1938
24
    if (!_should_enable_condition_cache() || !_cur_reader) {
1939
24
        return;
1940
24
    }
1941
1942
    // Disable condition cache when delete operations exist (e.g. Iceberg position/equality
1943
    // deletes, Hive ACID deletes). Cached granule results may become stale if delete files
1944
    // change between queries while the data file's cache key remains the same.
1945
0
    if (_cur_reader->has_delete_operations()) {
1946
0
        return;
1947
0
    }
1948
1949
0
    auto* cache = segment_v2::ConditionCache::instance();
1950
0
    _condition_cache_key = segment_v2::ConditionCache::ExternalCacheKey(
1951
0
            _current_range.path,
1952
0
            _current_range.__isset.modification_time ? _current_range.modification_time : 0,
1953
0
            _current_range.__isset.file_size ? _current_range.file_size : -1,
1954
0
            _condition_cache_digest,
1955
0
            _current_range.__isset.start_offset ? _current_range.start_offset : 0,
1956
0
            _current_range.__isset.size ? _current_range.size : -1);
1957
1958
0
    segment_v2::ConditionCacheHandle handle;
1959
0
    auto condition_cache_hit = cache->lookup(_condition_cache_key, &handle);
1960
0
    if (condition_cache_hit) {
1961
0
        _condition_cache = handle.get_filter_result();
1962
0
        _condition_cache_hit_count++;
1963
0
    } else {
1964
        // Allocate cache pre-sized to total number of granules.
1965
        // We add +1 as a safety margin: when a file is split across multiple scanners
1966
        // and the first row of this scanner's range is not aligned to a granule boundary,
1967
        // the data may span one more granule than ceil(total_rows / GRANULE_SIZE).
1968
        // The extra element costs only 1 bit and never affects correctness (an extra
1969
        // false-granule beyond the actual data range won't overlap any real row range).
1970
0
        int64_t total_rows = _cur_reader->get_total_rows();
1971
0
        if (total_rows > 0) {
1972
0
            size_t num_granules = (total_rows + ConditionCacheContext::GRANULE_SIZE - 1) /
1973
0
                                  ConditionCacheContext::GRANULE_SIZE;
1974
0
            _condition_cache = std::make_shared<std::vector<bool>>(num_granules + 1, false);
1975
0
        }
1976
0
    }
1977
1978
0
    if (_condition_cache) {
1979
        // Create context to pass to readers (native readers use it; non-native readers ignore it)
1980
0
        _condition_cache_ctx = std::make_shared<ConditionCacheContext>();
1981
0
        _condition_cache_ctx->is_hit = condition_cache_hit;
1982
0
        _condition_cache_ctx->filter_result = _condition_cache;
1983
0
        _cur_reader->set_condition_cache_context(_condition_cache_ctx);
1984
0
    }
1985
0
}
1986
1987
39
void FileScanner::_finalize_reader_condition_cache() {
1988
39
    if (!_should_enable_condition_cache() || !_condition_cache_ctx ||
1989
39
        _condition_cache_ctx->is_hit) {
1990
39
        _condition_cache = nullptr;
1991
39
        _condition_cache_ctx = nullptr;
1992
39
        return;
1993
39
    }
1994
    // Only store the cache if the reader was fully consumed. If the scan was
1995
    // truncated early (e.g. by LIMIT), the cache is incomplete — unread granules
1996
    // would remain false and cause surviving rows to be incorrectly skipped on HIT.
1997
0
    if (!_cur_reader_eof) {
1998
0
        _condition_cache = nullptr;
1999
0
        _condition_cache_ctx = nullptr;
2000
0
        return;
2001
0
    }
2002
2003
0
    auto* cache = segment_v2::ConditionCache::instance();
2004
0
    cache->insert(_condition_cache_key, std::move(_condition_cache));
2005
0
    _condition_cache = nullptr;
2006
0
    _condition_cache_ctx = nullptr;
2007
0
}
2008
2009
14
Status FileScanner::close(RuntimeState* state) {
2010
14
    if (!_try_close()) {
2011
0
        return Status::OK();
2012
0
    }
2013
2014
14
    _finalize_reader_condition_cache();
2015
2016
14
    if (_cur_reader) {
2017
0
        RETURN_IF_ERROR(_cur_reader->close());
2018
0
    }
2019
2020
14
    RETURN_IF_ERROR(Scanner::close(state));
2021
14
    return Status::OK();
2022
14
}
2023
2024
12
void FileScanner::try_stop() {
2025
12
    Scanner::try_stop();
2026
12
    if (_io_ctx) {
2027
12
        _io_ctx->should_stop = true;
2028
12
    }
2029
12
}
2030
2031
12
void FileScanner::update_realtime_counters() {
2032
12
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
2033
2034
12
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
2035
12
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
2036
2037
12
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(
2038
12
            _file_reader_stats->read_rows);
2039
12
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
2040
12
            _file_reader_stats->read_bytes);
2041
2042
12
    int64_t delta_bytes_read_from_local =
2043
12
            _file_cache_statistics->bytes_read_from_local - _last_bytes_read_from_local;
2044
12
    int64_t delta_bytes_read_from_remote =
2045
12
            _file_cache_statistics->bytes_read_from_remote - _last_bytes_read_from_remote;
2046
12
    if (_file_cache_statistics->bytes_read_from_local == 0 &&
2047
12
        _file_cache_statistics->bytes_read_from_remote == 0) {
2048
12
        _state->get_query_ctx()
2049
12
                ->resource_ctx()
2050
12
                ->io_context()
2051
12
                ->update_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes);
2052
12
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
2053
12
                _file_reader_stats->read_bytes);
2054
12
    } else {
2055
0
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
2056
0
                delta_bytes_read_from_local);
2057
0
        _state->get_query_ctx()
2058
0
                ->resource_ctx()
2059
0
                ->io_context()
2060
0
                ->update_scan_bytes_from_remote_storage(delta_bytes_read_from_remote);
2061
0
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
2062
0
                delta_bytes_read_from_local);
2063
0
        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
2064
0
                delta_bytes_read_from_remote);
2065
0
    }
2066
2067
12
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
2068
2069
12
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
2070
12
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
2071
2072
12
    _file_reader_stats->read_bytes = 0;
2073
12
    _file_reader_stats->read_rows = 0;
2074
2075
12
    _last_bytes_read_from_local = _file_cache_statistics->bytes_read_from_local;
2076
12
    _last_bytes_read_from_remote = _file_cache_statistics->bytes_read_from_remote;
2077
12
}
2078
2079
12
void FileScanner::_collect_profile_before_close() {
2080
12
    Scanner::_collect_profile_before_close();
2081
12
    if (config::enable_file_cache && _state->query_options().enable_file_cache &&
2082
12
        _profile != nullptr) {
2083
0
        io::FileCacheProfileReporter cache_profile(_profile);
2084
0
        cache_profile.update(_file_cache_statistics.get());
2085
0
        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
2086
0
                _file_cache_statistics->bytes_write_into_cache);
2087
0
    }
2088
2089
12
    if (_cur_reader != nullptr) {
2090
0
        _cur_reader->collect_profile_before_close();
2091
0
    }
2092
2093
12
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
2094
12
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
2095
12
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
2096
2097
12
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
2098
12
    COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls);
2099
12
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
2100
12
    COUNTER_UPDATE(local_state->_condition_cache_hit_counter, _condition_cache_hit_count);
2101
12
    if (_io_ctx) {
2102
12
        COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
2103
12
                       _io_ctx->condition_cache_filtered_rows);
2104
12
    }
2105
2106
12
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
2107
12
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
2108
12
}
2109
2110
} // namespace doris