Coverage Report

Created: 2026-05-09 09:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/Metrics_types.h>
23
#include <gen_cpp/Opcodes_types.h>
24
#include <gen_cpp/PaloInternalService_types.h>
25
#include <gen_cpp/PlanNodes_types.h>
26
#include <glog/logging.h>
27
28
#include <algorithm>
29
#include <boost/iterator/iterator_facade.hpp>
30
#include <map>
31
#include <ranges>
32
#include <tuple>
33
#include <unordered_map>
34
#include <utility>
35
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/config.h"
38
#include "common/consts.h"
39
#include "common/logging.h"
40
#include "common/status.h"
41
#include "core/block/column_with_type_and_name.h"
42
#include "core/block/columns_with_type_and_name.h"
43
#include "core/column/column.h"
44
#include "core/column/column_nullable.h"
45
#include "core/column/column_vector.h"
46
#include "core/data_type/data_type.h"
47
#include "core/data_type/data_type_nullable.h"
48
#include "core/data_type/data_type_string.h"
49
#include "core/string_ref.h"
50
#include "exec/common/stringop_substring.h"
51
#include "exec/rowid_fetcher.h"
52
#include "exec/scan/scan_node.h"
53
#include "exprs/aggregate/aggregate_function.h"
54
#include "exprs/function/function.h"
55
#include "exprs/function/simple_function_factory.h"
56
#include "exprs/vexpr.h"
57
#include "exprs/vexpr_context.h"
58
#include "exprs/vexpr_fwd.h"
59
#include "exprs/vslot_ref.h"
60
#include "format/arrow/arrow_stream_reader.h"
61
#include "format/count_reader.h"
62
#include "format/csv/csv_reader.h"
63
#include "format/json/new_json_reader.h"
64
#include "format/native/native_reader.h"
65
#include "format/orc/vorc_reader.h"
66
#include "format/parquet/vparquet_reader.h"
67
#include "format/table/es/es_http_reader.h"
68
#include "format/table/hive_reader.h"
69
#include "format/table/hudi_jni_reader.h"
70
#include "format/table/hudi_reader.h"
71
#include "format/table/iceberg_reader.h"
72
#include "format/table/iceberg_sys_table_jni_reader.h"
73
#include "format/table/jdbc_jni_reader.h"
74
#include "format/table/max_compute_jni_reader.h"
75
#include "format/table/paimon_cpp_reader.h"
76
#include "format/table/paimon_jni_reader.h"
77
#include "format/table/paimon_predicate_converter.h"
78
#include "format/table/paimon_reader.h"
79
#include "format/table/remote_doris_reader.h"
80
#include "format/table/transactional_hive_reader.h"
81
#include "format/table/trino_connector_jni_reader.h"
82
#include "format/text/text_reader.h"
83
#ifdef BUILD_RUST_READERS
84
#include "format/lance/lance_rust_reader.h"
85
#endif
86
#include "io/cache/block_file_cache_profile.h"
87
#include "load/group_commit/wal/wal_reader.h"
88
#include "runtime/descriptors.h"
89
#include "runtime/runtime_profile.h"
90
#include "runtime/runtime_state.h"
91
92
// Forward declarations: these types are named in this file without including their headers.
namespace cctz { class time_zone; } // namespace cctz
namespace doris { class ShardedKVCache; } // namespace doris
98
99
namespace doris {
100
using namespace ErrorCode;
101
102
const std::string FileScanner::FileReadBytesProfile = "FileReadBytes";
103
const std::string FileScanner::FileReadTimeProfile = "FileReadTime";
104
105
// Builds a file scanner for one scan operator instance.
//
// Scan-range parameters are looked up in the query context's `file_scan_range_params_map`,
// keyed by the parent operator id. When no entry exists (old FE thrift protocol) they come from
// the split source instead. `_params` is a non-owning pointer into whichever of the two holds
// them.
//
// A non-null source tuple descriptor (`_params->src_tuple_id`) marks a load scanner, which has
// an input and an output tuple. Otherwise this is a query scanner that only has the output
// tuple, and `_real_tuple_desc` falls back to `_output_tuple_desc`.
FileScanner::FileScanner(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
                         std::shared_ptr<SplitSourceConnector> split_source,
                         RuntimeProfile* profile, ShardedKVCache* kv_cache,
                         const std::unordered_map<std::string, int>* colname_to_slot_id)
        : Scanner(state, local_state, limit, profile),
          // Sink parameter: move it rather than copying (avoids an extra atomic refcount bump).
          _split_source(std::move(split_source)),
          _cur_reader(nullptr),
          _cur_reader_eof(false),
          _kv_cache(kv_cache),
          _strict_mode(false),
          _col_name_to_slot_id(colname_to_slot_id) {
    _params = nullptr;
    if (auto query_ctx = state->get_query_ctx(); query_ctx != nullptr) {
        auto& params_map = query_ctx->file_scan_range_params_map;
        // One lookup instead of count() followed by operator[].
        if (auto it = params_map.find(local_state->parent_id()); it != params_map.end()) {
            _params = &it->second;
        }
    }
    if (_params == nullptr) {
        // old fe thrift protocol
        _params = _split_source->get_params();
    }
    if (_params->__isset.strict_mode) {
        _strict_mode = _params->strict_mode;
    }

    // For load scanner, there are input and output tuple.
    // For query scanner, there is only output tuple
    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
    _real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
    _is_load = (_input_tuple_desc != nullptr);
    _configure_file_scan_handlers();
}
134
135
90.1k
void FileScanner::_configure_file_scan_handlers() {
136
90.1k
    if (_is_load) {
137
2.54k
        _init_src_block_handler = &FileScanner::_init_src_block_for_load;
138
2.54k
        _process_src_block_after_read_handler =
139
2.54k
                &FileScanner::_process_src_block_after_read_for_load;
140
2.54k
        _should_push_down_predicates_handler = &FileScanner::_should_push_down_predicates_for_load;
141
2.54k
        _should_enable_condition_cache_handler =
142
2.54k
                &FileScanner::_should_enable_condition_cache_for_load;
143
87.6k
    } else {
144
87.6k
        _init_src_block_handler = &FileScanner::_init_src_block_for_query;
145
87.6k
        _process_src_block_after_read_handler =
146
87.6k
                &FileScanner::_process_src_block_after_read_for_query;
147
87.6k
        _should_push_down_predicates_handler = &FileScanner::_should_push_down_predicates_for_query;
148
87.6k
        _should_enable_condition_cache_handler =
149
87.6k
                &FileScanner::_should_enable_condition_cache_for_query;
150
87.6k
    }
151
90.1k
}
152
153
86.8k
// Second-phase setup, run after construction. It:
//   * registers this scanner's timers and counters on the scanner profile;
//   * creates the file-cache and file-reader statistics and wires them into the IO context;
//   * for load scans, builds the source row descriptor, prepares and opens the pre-filter
//     conjuncts, and builds the destination row descriptor;
//   * builds the row descriptor over `_real_tuple_desc` that is used for default values.
// Returns the first error from the base-class init, the IO-context init, or from creating,
// preparing or opening a pre-filter expression.
Status FileScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));
    _get_block_timer =
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerGetBlockTime", 1);
    _cast_to_input_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
                                                      "FileScannerCastInputBlockTime", 1);
    _fill_missing_columns_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
                                                       "FileScannerFillMissingColumnTime", 1);
    _pre_filter_timer =
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerPreFilterTimer", 1);
    // NOTE: "Ouput" is a misspelling, but the counter name is intentionally left unchanged
    // because external profile consumers may match on it.
    _convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
                                                          "FileScannerConvertOuputBlockTime", 1);
    _runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
            _local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime", 1);
    _empty_file_counter =
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT, 1);
    _not_found_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
                                                     "NotFoundFileNum", TUnit::UNIT, 1);
    _fully_skipped_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
                                                         "FullySkippedFileNum", TUnit::UNIT, 1);
    _file_counter =
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);

    _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
                                                      FileReadBytesProfile, TUnit::BYTES, 1);
    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
                                                      "FileReadCalls", TUnit::UNIT, 1);
    _file_read_time_counter =
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), FileReadTimeProfile, 1);

    _runtime_filter_partition_pruned_range_counter =
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
                                   "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);
    // Keep the current file's adaptive state while also preserving the peak value across all
    // files handled by this scanner instance.
    _adaptive_batch_predicted_rows_counter =
            _local_state->scanner_profile()->AddHighWaterMarkCounter(
                    "AdaptiveBatchPredictedRows", TUnit::UNIT, RuntimeProfile::ROOT_COUNTER, 1);
    _adaptive_batch_actual_bytes_before_truncate_counter =
            _local_state->scanner_profile()->AddHighWaterMarkCounter(
                    "AdaptiveBatchActualBytesBeforeTruncate", TUnit::BYTES,
                    RuntimeProfile::ROOT_COUNTER, 1);
    _adaptive_batch_actual_bytes_after_truncate_counter =
            _local_state->scanner_profile()->AddHighWaterMarkCounter(
                    "AdaptiveBatchActualBytesAfterTruncate", TUnit::BYTES,
                    RuntimeProfile::ROOT_COUNTER, 1);
    _adaptive_batch_probe_count_counter = ADD_COUNTER_WITH_LEVEL(
            _local_state->scanner_profile(), "AdaptiveBatchProbeCount", TUnit::UNIT, 1);

    _file_cache_statistics = std::make_unique<io::FileCacheStatistics>();
    _file_reader_stats = std::make_unique<io::FileReaderStats>();

    RETURN_IF_ERROR(_init_io_ctx());
    // The IO context only borrows the statistics objects owned by this scanner.
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
    _io_ctx->file_reader_stats = _file_reader_stats.get();
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;

    if (_is_load) {
        _src_row_desc = std::make_unique<RowDescriptor>(
                _state->desc_tbl(), std::vector<TupleId>({_input_tuple_desc->id()}));
        // Prepare pre-filters: the list form takes precedence; the single-expression form is
        // only consulted when the list is not set.
        if (_params->__isset.pre_filter_exprs_list) {
            RETURN_IF_ERROR(doris::VExpr::create_expr_trees(_params->pre_filter_exprs_list,
                                                            _pre_conjunct_ctxs));
        } else if (_params->__isset.pre_filter_exprs) {
            VExprContextSPtr context;
            RETURN_IF_ERROR(doris::VExpr::create_expr_tree(_params->pre_filter_exprs, context));
            _pre_conjunct_ctxs.emplace_back(std::move(context));
        }

        // Pre-filters are evaluated against the source (input) row layout.
        for (auto& conjunct : _pre_conjunct_ctxs) {
            RETURN_IF_ERROR(conjunct->prepare(_state, *_src_row_desc));
            RETURN_IF_ERROR(conjunct->open(_state));
        }

        _dest_row_desc = std::make_unique<RowDescriptor>(
                _state->desc_tbl(), std::vector<TupleId>({_output_tuple_desc->id()}));
    }

    _default_val_row_desc = std::make_unique<RowDescriptor>(
            _state->desc_tbl(), std::vector<TupleId>({_real_tuple_desc->id()}));

    return Status::OK();
}
237
238
71.9k
// Returns whether adaptive batch sizing may be used for files of `format_type`.
// The feature is gated by config::enable_adaptive_batch_size and limited to formats whose
// readers support set_batch_size(). Table-format wrappers are covered because they delegate to
// those native readers.
bool FileScanner::_should_enable_adaptive_batch_size(TFileFormatType::type format_type) const {
    if (config::enable_adaptive_batch_size) {
        switch (format_type) {
        // Columnar formats.
        case TFileFormatType::FORMAT_PARQUET:
        case TFileFormatType::FORMAT_ORC:
        // Delimited text, plain or compressed.
        case TFileFormatType::FORMAT_CSV_PLAIN:
        case TFileFormatType::FORMAT_CSV_GZ:
        case TFileFormatType::FORMAT_CSV_BZ2:
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
        case TFileFormatType::FORMAT_CSV_LZOP:
        case TFileFormatType::FORMAT_CSV_DEFLATE:
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
        case TFileFormatType::FORMAT_TEXT:
        // Other supported sources.
        case TFileFormatType::FORMAT_PROTO:
        case TFileFormatType::FORMAT_JSON:
        case TFileFormatType::FORMAT_JNI:
            return true;
        default:
            break;
        }
    }
    return false;
}
264
265
494k
bool FileScanner::_should_run_adaptive_batch_size() const {
266
    // Skip adaptive batch sizing for pushed-down COUNT(*): the reader is wrapped by CountReader
267
    // and only emits a single aggregated row count instead of materializing real columns, so
268
    // there is no per-row byte cost to learn from and no benefit in tuning the batch size.
269
494k
    return _block_size_predictor != nullptr && _get_push_down_agg_type() != TPushAggOp::type::COUNT;
270
494k
}
271
272
230k
void FileScanner::_reset_adaptive_batch_size_state() {
273
230k
    _block_size_predictor.reset();
274
230k
    COUNTER_SET(_adaptive_batch_predicted_rows_counter, int64_t(0));
275
230k
    COUNTER_SET(_adaptive_batch_actual_bytes_before_truncate_counter, int64_t(0));
276
230k
    COUNTER_SET(_adaptive_batch_actual_bytes_after_truncate_counter, int64_t(0));
277
230k
}
278
279
71.7k
// (Re)initialises adaptive batch sizing for a newly opened file of `format_type`.
// The previous file's state is always cleared. A new predictor is created only when the
// format supports adaptive sizing.
void FileScanner::_init_adaptive_batch_size_state(TFileFormatType::type format_type) {
    _reset_adaptive_batch_size_state();
    if (_should_enable_adaptive_batch_size(format_type)) {
        // External file readers do not provide reliable memory-size metadata hints, so start
        // from a small probe batch and let the predictor learn quickly from real FileScanner
        // output.
        const auto preferred_block_bytes = _state->preferred_block_size_bytes();
        const auto state_batch_size = _state->batch_size();
        _block_size_predictor = std::make_unique<AdaptiveBlockSizePredictor>(
                preferred_block_bytes, 0.0, ADAPTIVE_BATCH_INITIAL_PROBE_ROWS, state_batch_size);
    }
}
291
292
217k
size_t FileScanner::_predict_reader_batch_rows() {
293
217k
    DCHECK(_block_size_predictor != nullptr);
294
217k
    size_t predicted_rows = _block_size_predictor->predict_next_rows();
295
217k
    COUNTER_SET(_adaptive_batch_predicted_rows_counter, static_cast<int64_t>(predicted_rows));
296
217k
    return predicted_rows;
297
217k
}
298
299
128k
void FileScanner::_update_adaptive_batch_size_before_truncate(const Block& block) {
300
128k
    if (!_should_run_adaptive_batch_size()) {
301
9.94k
        return;
302
9.94k
    }
303
304
    // Learn from the logical bytes before CHAR/VARCHAR truncation. The truncated block can be
305
    // much smaller than the data the reader and FileScanner have already materialized.
306
118k
    COUNTER_SET(_adaptive_batch_actual_bytes_before_truncate_counter,
307
118k
                static_cast<int64_t>(block.bytes()));
308
118k
    if (block.rows() == 0) {
309
14.2k
        return;
310
14.2k
    }
311
312
    // Count a probe only when we actually obtain the first non-empty sample that seeds history.
313
104k
    if (!_block_size_predictor->has_history()) {
314
52.6k
        COUNTER_UPDATE(_adaptive_batch_probe_count_counter, 1);
315
52.6k
    }
316
104k
    _block_size_predictor->update(block);
317
104k
}
318
319
128k
void FileScanner::_update_adaptive_batch_size_after_truncate(const Block& block) {
320
128k
    if (!_should_run_adaptive_batch_size()) {
321
9.94k
        return;
322
9.94k
    }
323
324
    // Keep the post-truncate size only for observability. It should not affect the next batch
325
    // because truncation happens after the upstream memory cost has already been paid.
326
118k
    COUNTER_SET(_adaptive_batch_actual_bytes_after_truncate_counter,
327
118k
                static_cast<int64_t>(block.bytes()));
328
118k
}
329
330
// check if the expr is a partition pruning expr
331
28.0k
bool FileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
332
28.0k
    if (expr->is_slot_ref()) {
333
11.0k
        auto* slot_ref = static_cast<VSlotRef*>(expr.get());
334
11.0k
        return _partition_slot_index_map.find(slot_ref->slot_id()) !=
335
11.0k
               _partition_slot_index_map.end();
336
11.0k
    }
337
17.0k
    if (expr->is_literal()) {
338
5.33k
        return true;
339
5.33k
    }
340
17.2k
    return std::ranges::all_of(expr->children(), [this](const auto& child) {
341
17.2k
        return _check_partition_prune_expr(child);
342
17.2k
    });
343
17.0k
}
344
345
10.7k
void FileScanner::_init_runtime_filter_partition_prune_ctxs() {
346
10.7k
    _runtime_filter_partition_prune_ctxs.clear();
347
10.9k
    for (auto& conjunct : _conjuncts) {
348
10.9k
        auto impl = conjunct->root()->get_impl();
349
        // If impl is not null, which means this a conjuncts from runtime filter.
350
10.9k
        auto expr = impl ? impl : conjunct->root();
351
10.9k
        if (_check_partition_prune_expr(expr)) {
352
7.76k
            _runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
353
7.76k
        }
354
10.9k
    }
355
10.7k
}
356
357
6.94k
void FileScanner::_init_runtime_filter_partition_prune_block() {
358
    // init block with empty column
359
65.5k
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
360
65.5k
        _runtime_filter_partition_prune_block.insert(
361
65.5k
                ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
362
65.5k
                                      slot_desc->get_data_type_ptr(), slot_desc->col_name()));
363
65.5k
    }
364
6.94k
}
365
366
7.70k
// Evaluates the partition-only runtime-filter conjuncts against the partition values of the
// current scan range.
//
// Each partition value is materialized as a one-row column of its slot's type. Those columns
// are placed at their slot positions in `_runtime_filter_partition_prune_block`, and the
// conjuncts are executed over that block.
//
// @param can_filter_all  Output. It is passed straight to VExprContext::execute_conjuncts().
//                        Presumably it becomes true when the conjuncts reject the single row,
//                        i.e. the whole range can be pruned — confirm against execute_conjuncts.
//                        It is left untouched when there is nothing to evaluate.
// @return An error if a partition value fails to deserialize or a conjunct fails to execute.
Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
    SCOPED_TIMER(_runtime_filter_partition_prune_timer);
    // Nothing to do without partition-only conjuncts or without partition values for this range.
    if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
        return Status::OK();
    }
    // Every partition value is materialized as exactly one row.
    size_t partition_value_column_size = 1;

    // 1. Deserialize each partition key value (held as text) into a one-row column of the
    //    partition slot's data type, keyed by slot id.
    std::unordered_map<SlotId, MutableColumnPtr> partition_slot_id_to_column;
    for (auto const& partition_col_desc : _partition_col_descs) {
        const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
        auto data_type = partition_slot_desc->get_data_type_ptr();
        auto test_serde = data_type->get_serde();
        auto partition_value_column = data_type->create_column();
        auto* col_ptr = static_cast<IColumn*>(partition_value_column.get());
        Slice slice(partition_value.data(), partition_value.size());
        uint64_t num_deserialized = 0;
        DataTypeSerDe::FormatOptions options {};
        if (_partition_value_is_null.contains(partition_slot_desc->col_name())) {
            // For iceberg/paimon tables, nullness is tracked explicitly in
            // _partition_value_is_null.
            // NOTICE: for iceberg/paimon tables the column is currently always nullable.
            DCHECK(data_type->is_nullable());
            // The null map is filled by hand below, so only the nested value is deserialized.
            test_serde = test_serde->get_nested_serdes()[0];
            auto* null_column = assert_cast<ColumnNullable*>(col_ptr);
            if (_partition_value_is_null[partition_slot_desc->col_name()]) {
                null_column->insert_many_defaults(partition_value_column_size);
            } else {
                // If the partition value is not null, set the null map to 0 and deserialize it normally.
                null_column->get_null_map_column().insert_many_vals(0, partition_value_column_size);
                RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
                        null_column->get_nested_column(), slice, partition_value_column_size,
                        &num_deserialized, options));
            }
        } else {
            // For hive/hudi tables, a null value is encoded as "\\N" in the partition value.
            // TODO: this will be unified with iceberg/paimon tables in the future.
            RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
                    *col_ptr, slice, partition_value_column_size, &num_deserialized, options));
        }

        partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
    }

    // 2. Fill _runtime_filter_partition_prune_block from the partition columns, then execute the
    //    conjuncts against it.
    // 2.1 Put each partition column at its slot's position so the conjuncts' column positions
    //     line up with the block.
    // NOTE(review): the block is a member, seeded with one empty column per slot, and
    // Block::insert(index, ...) adds a column at `index`. Confirm whether this shifts the
    // existing columns (growing the block on every call), or whether the caller resets the block
    // between scan ranges.
    size_t index = 0;
    bool first_column_filled = false;
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
        if (partition_slot_id_to_column.find(slot_desc->id()) !=
            partition_slot_id_to_column.end()) {
            auto data_type = slot_desc->get_data_type_ptr();
            auto partition_value_column = std::move(partition_slot_id_to_column[slot_desc->id()]);
            if (data_type->is_nullable()) {
                // NOTE(review): for a nullable type, create_column() above already produced a
                // ColumnNullable (see the assert_cast in step 1). This wraps it again with an
                // all-zero null map; presumably ColumnNullable::create flattens a nullable nested
                // column — confirm.
                _runtime_filter_partition_prune_block.insert(
                        index, ColumnWithTypeAndName(
                                       ColumnNullable::create(
                                               std::move(partition_value_column),
                                               ColumnUInt8::create(partition_value_column_size, 0)),
                                       data_type, slot_desc->col_name()));
            } else {
                _runtime_filter_partition_prune_block.insert(
                        index, ColumnWithTypeAndName(std::move(partition_value_column), data_type,
                                                     slot_desc->col_name()));
            }
            if (index == 0) {
                first_column_filled = true;
            }
        }
        index++;
    }

    // 2.2 Execute conjuncts.
    if (!first_column_filled) {
        // VExprContext's conjunct execution only filters when block->rows() > 0, and the block's
        // row count comes from column 0. If slot 0 is not a partition column, its column is still
        // empty, so resize it to one row. This is awkward and costs extra work, but there is no
        // other way.
        _runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
                partition_value_column_size);
    }
    // Start with every row kept; execute_conjuncts narrows the filter.
    IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
    RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
                                                    &_runtime_filter_partition_prune_block,
                                                    &result_filter, &can_filter_all));
    return Status::OK();
}
450
451
20.9k
// Buckets `_push_down_conjuncts` for predicate pushdown into readers:
//   * a conjunct whose slot references all point to one slot goes to
//     `_slot_id_to_filter_conjuncts[slot_id]`;
//   * a conjunct that references no slot, or more than one distinct slot, goes to
//     `_not_single_slot_filter_conjuncts`.
// As a side effect, _get_slot_ids() marks every referenced slot descriptor as a predicate slot.
// Both output containers are cleared first, so the result reflects only the current
// `_push_down_conjuncts`.
Status FileScanner::_process_conjuncts() {
    _slot_id_to_filter_conjuncts.clear();
    _not_single_slot_filter_conjuncts.clear();
    for (auto& conjunct : _push_down_conjuncts) {
        auto impl = conjunct->root()->get_impl();
        // A non-null impl means the conjunct comes from a runtime filter; inspect the filter's
        // own expression rather than the wrapper.
        auto cur_expr = impl ? impl : conjunct->root();

        std::vector<int> slot_ids;
        _get_slot_ids(cur_expr.get(), &slot_ids);

        // "Single slot" means there is at least one slot reference and all of them refer to the
        // same slot. std::ranges::all_of also avoids the signed/unsigned index comparison.
        const bool single_slot =
                !slot_ids.empty() && std::ranges::all_of(slot_ids, [&slot_ids](int slot_id) {
                    return slot_id == slot_ids.front();
                });
        if (single_slot) {
            SlotId slot_id = slot_ids.front();
            _slot_id_to_filter_conjuncts[slot_id].emplace_back(conjunct);
        } else {
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
        }
    }
    return Status::OK();
}
481
482
57.7k
// Refreshes `_push_down_conjuncts` when `_conjuncts` has grown since the last call (presumably
// because late-arriving runtime filters were appended), and re-buckets them via
// _process_conjuncts(). Also records "ApplyAllRuntimeFilters" on the profile once every runtime
// filter has been applied.
Status FileScanner::_process_late_arrival_conjuncts() {
    const bool has_new_conjuncts = _conjuncts.size() > _push_down_conjuncts.size();
    if (has_new_conjuncts) {
        // Copy rather than move: _conjuncts must stay intact for fallback filtering. This
        // matters especially when mixing native readers (which use _push_down_conjuncts) with
        // JNI readers (which rely on _conjuncts). Do not clear _conjuncts here.
        _push_down_conjuncts = _conjuncts;
        RETURN_IF_ERROR(_process_conjuncts());
    }

    const bool all_runtime_filters_applied = _applied_rf_num == _total_rf_num;
    if (all_runtime_filters_applied) {
        _local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
    }
    return Status::OK();
}
496
497
62.9k
// Recursively appends, depth-first and with duplicates kept, the slot ids referenced by the
// descendants of `expr` to `slot_ids`. Each referenced slot descriptor is also flagged as used
// by a predicate.
// Note that `expr` itself is not inspected, only its children: a bare slot-ref expression
// yields no ids.
void FileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
    for (auto& child_expr : expr->children()) {
        if (!child_expr->is_slot_ref()) {
            _get_slot_ids(child_expr.get(), slot_ids);
            continue;
        }
        // static_cast is the correct base-to-derived conversion (as in
        // _check_partition_prune_expr); reinterpret_cast would skip any pointer adjustment.
        auto* slot_ref = static_cast<VSlotRef*>(child_expr.get());
        SlotDescriptor* slot_desc = _state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
        DCHECK(slot_desc != nullptr) << "slot descriptor not found for slot id "
                                     << slot_ref->slot_id();
        // Guard against a missing descriptor in release builds instead of dereferencing null.
        if (slot_desc != nullptr) {
            slot_desc->set_is_predicate(true);
        }
        slot_ids->emplace_back(slot_ref->slot_id());
    }
}
509
510
87.2k
// Opens the scanner. It runs the base-class open, captures the condition-cache digest when a
// local state is present, and fetches the first scan range from the split source.
// If there is no scan range at all, the scanner is marked EOF immediately. Otherwise the
// expression contexts are initialised. When runtime-filter partition pruning is enabled and
// partition slots exist, the pruning conjuncts and block are prepared as well.
Status FileScanner::_open_impl(RuntimeState* state) {
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(Scanner::_open_impl(state));
    if (_local_state != nullptr) {
        _condition_cache_digest = _local_state->get_condition_cache_digest();
    }

    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
    if (!_first_scan_range) {
        // There's no scan range in the split source: stop the scanner directly.
        _scanner_eof = true;
        return Status::OK();
    }

    RETURN_IF_ERROR(_init_expr_ctxes());
    const bool use_runtime_filter_partition_prune =
            _state->query_options().enable_runtime_filter_partition_prune &&
            !_partition_slot_index_map.empty();
    if (use_runtime_filter_partition_prune) {
        _init_runtime_filter_partition_prune_ctxs();
        _init_runtime_filter_partition_prune_block();
    }
    return Status::OK();
}
531
532
323k
// Reads the next block via _get_block_wrapped(). On failure, appends the name of the scan range
// currently being read to the error message, so the failing file is easy to identify.
Status FileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
    Status status = _get_block_wrapped(state, block, eof);
    if (status.ok()) {
        return status;
    }
    const std::string path_suffix = ". cur path: " + get_current_scan_range_name();
    return std::move(status.append(path_suffix));
}
541
542
// For query:
543
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
544
//                              A     B    C  D                 E
545
// _init_src_block              x     x    x  x                 x                -      x
546
// get_next_block               x     x    x  -                 -                -      x
547
// _cast_to_input_block         -     -    -  -                 -                -      -
548
// _fill_columns_from_path      -     -    -  -                 x                -      x
549
// _fill_missing_columns        -     -    -  x                 -                -      x
550
// _convert_to_output_block     -     -    -  -                 -                -      -
551
//
552
// For load:
553
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
554
//                              A     B    C  D                 E
555
// _init_src_block              x     x    x  x                 x                x      -
556
// get_next_block               x     x    x  -                 -                x      -
557
// _cast_to_input_block         x     x    x  -                 -                x      -
558
// _fill_columns_from_path      -     -    -  -                 x                x      -
559
// _fill_missing_columns        -     -    -  x                 -                x      -
560
// _convert_to_output_block     -     -    -  -                 -                -      x
561
323k
Status FileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* eof) {
562
323k
    do {
563
323k
        RETURN_IF_CANCELLED(state);
564
323k
        if (_cur_reader == nullptr || _cur_reader_eof) {
565
158k
            _finalize_reader_condition_cache();
566
            // The file may not exist because the file list is got from meta cache,
567
            // And the file may already be removed from storage.
568
            // Just ignore not found files.
569
158k
            Status st = _get_next_reader();
570
158k
            if (st.is<ErrorCode::NOT_FOUND>() && config::ignore_not_found_file_in_external_table) {
571
0
                _cur_reader_eof = true;
572
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
573
0
                continue;
574
158k
            } else if (st.is<ErrorCode::END_OF_FILE>()) {
575
24
                _cur_reader_eof = true;
576
24
                COUNTER_UPDATE(_fully_skipped_file_counter, 1);
577
24
                continue;
578
158k
            } else if (!st) {
579
2
                return st;
580
2
            }
581
158k
            _init_reader_condition_cache();
582
158k
        }
583
584
323k
        if (_scanner_eof) {
585
86.5k
            *eof = true;
586
86.5k
            return Status::OK();
587
86.5k
        }
588
589
        // Init src block for load job based on the data file schema (e.g. parquet)
590
        // For query job, simply set _src_block_ptr to block.
591
236k
        size_t read_rows = 0;
592
236k
        RETURN_IF_ERROR(_init_src_block(block));
593
594
236k
        if (_should_run_adaptive_batch_size()) {
595
217k
            _cur_reader->set_batch_size(_predict_reader_batch_rows());
596
217k
        }
597
236k
        {
598
236k
            SCOPED_TIMER(_get_block_timer);
599
600
            // Read next block.
601
            // Some of column in block may not be filled (column not exist in file)
602
236k
            RETURN_IF_ERROR(
603
236k
                    _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
604
236k
        }
605
        // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
606
        // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result.
607
236k
        if (read_rows > 0) {
608
132k
            if ((!_cur_reader->count_read_rows()) && _io_ctx) {
609
20.4k
                _io_ctx->file_reader_stats->read_rows += read_rows;
610
20.4k
            }
611
            // If the push_down_agg_type is COUNT, no need to do the rest,
612
            // because we only save a number in block.
613
132k
            if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
614
128k
                RETURN_IF_ERROR(_process_src_block_after_read(block));
615
128k
            }
616
132k
        }
617
236k
        break;
618
236k
    } while (true);
619
620
    // Update filtered rows and unselected rows for load, reset counter.
621
    // {
622
    //     state->update_num_rows_load_filtered(_counter.num_rows_filtered);
623
    //     state->update_num_rows_load_unselected(_counter.num_rows_unselected);
624
    //     _reset_counter();
625
    // }
626
236k
    return Status::OK();
627
323k
}
628
629
/**
630
 * Check whether there are complex types in parquet/orc reader in broker/stream load.
631
 * Broker/stream load will cast any type as string type, and complex types will be casted wrong.
632
 * This is a temporary method, and will be replaced by tvf.
633
 */
634
8.71k
Status FileScanner::_check_output_block_types() {
635
    // Only called from _init_src_block_for_load, so _is_load is always true.
636
8.71k
    TFileFormatType::type format_type = _params->format_type;
637
8.71k
    if (format_type == TFileFormatType::FORMAT_PARQUET ||
638
8.71k
        format_type == TFileFormatType::FORMAT_ORC) {
639
1.84k
        for (auto slot : _output_tuple_desc->slots()) {
640
1.84k
            if (is_complex_type(slot->type()->get_primitive_type())) {
641
0
                return Status::InternalError(
642
0
                        "Parquet/orc doesn't support complex types in broker/stream load, "
643
0
                        "please use tvf(table value function) to insert complex types.");
644
0
            }
645
1.84k
        }
646
64
    }
647
8.71k
    return Status::OK();
648
8.71k
}
649
650
236k
Status FileScanner::_init_src_block(Block* block) {
651
236k
    DCHECK(_init_src_block_handler != nullptr);
652
236k
    return (this->*_init_src_block_handler)(block);
653
236k
}
654
655
227k
Status FileScanner::_init_src_block_for_query(Block* block) {
656
227k
    _src_block_ptr = block;
657
658
    // Build name to index map only once on first call.
659
227k
    if (_src_block_name_to_idx.empty()) {
660
51.0k
        _src_block_name_to_idx = block->get_name_to_pos_map();
661
51.0k
    }
662
227k
    return Status::OK();
663
227k
}
664
665
8.71k
Status FileScanner::_init_src_block_for_load(Block* block) {
666
8.71k
    static_cast<void>(block);
667
8.71k
    RETURN_IF_ERROR(_check_output_block_types());
668
669
    // if (_src_block_init) {
670
    //     _src_block.clear_column_data();
671
    //     _src_block_ptr = &_src_block;
672
    //     return Status::OK();
673
    // }
674
675
8.71k
    _src_block.clear();
676
8.71k
    uint32_t idx = 0;
677
    // slots in _input_tuple_desc contains all slots describe in load statement, eg:
678
    // -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
679
    // _input_tuple_desc will contains: k1, k2, tmp1
680
    // and some of them are from file, such as k1 and k2, and some of them may not exist in file, such as tmp1
681
    // _input_tuple_desc also contains columns from path
682
100k
    for (auto& slot : _input_tuple_desc->slots()) {
683
100k
        DataTypePtr data_type;
684
100k
        auto it = _slot_lower_name_to_col_type.find(slot->col_name());
685
100k
        if (slot->is_skip_bitmap_col()) {
686
358
            _skip_bitmap_col_idx = idx;
687
358
        }
688
100k
        if (_params->__isset.sequence_map_col) {
689
1.29k
            if (_params->sequence_map_col == slot->col_name()) {
690
203
                _sequence_map_col_uid = slot->col_unique_id();
691
203
            }
692
1.29k
        }
693
100k
        data_type =
694
100k
                it == _slot_lower_name_to_col_type.end() ? slot->type() : make_nullable(it->second);
695
100k
        MutableColumnPtr data_column = data_type->create_column();
696
100k
        _src_block.insert(
697
100k
                ColumnWithTypeAndName(std::move(data_column), data_type, slot->col_name()));
698
100k
        _src_block_name_to_idx.emplace(slot->col_name(), idx++);
699
100k
    }
700
8.71k
    if (_params->__isset.sequence_map_col) {
701
1.81k
        for (const auto& slot : _output_tuple_desc->slots()) {
702
            // When the target table has seqeunce map column, _input_tuple_desc will not contains __DORIS_SEQUENCE_COL__,
703
            // so we should get its column unique id from _output_tuple_desc
704
1.81k
            if (slot->is_sequence_col()) {
705
211
                _sequence_col_uid = slot->col_unique_id();
706
211
            }
707
1.81k
        }
708
247
    }
709
8.71k
    _src_block_ptr = &_src_block;
710
8.71k
    _src_block_init = true;
711
8.71k
    return Status::OK();
712
8.71k
}
713
714
6.19k
Status FileScanner::_cast_to_input_block(Block* block) {
715
    // Only called from _process_src_block_after_read_for_load, so _is_load is always true.
716
6.19k
    SCOPED_TIMER(_cast_to_input_block_timer);
717
    // cast primitive type(PT0) to primitive type(PT1)
718
6.19k
    uint32_t idx = 0;
719
78.1k
    for (auto& slot_desc : _input_tuple_desc->slots()) {
720
78.1k
        if (_slot_lower_name_to_col_type.find(slot_desc->col_name()) ==
721
78.1k
            _slot_lower_name_to_col_type.end()) {
722
            // skip columns which does not exist in file
723
587
            continue;
724
587
        }
725
77.5k
        auto& arg = _src_block_ptr->get_by_position(_src_block_name_to_idx[slot_desc->col_name()]);
726
77.5k
        auto return_type = slot_desc->get_data_type_ptr();
727
        // remove nullable here, let the get_function decide whether nullable
728
77.5k
        auto data_type = get_data_type_with_default_argument(remove_nullable(return_type));
729
77.5k
        ColumnsWithTypeAndName arguments {
730
77.5k
                arg, {data_type->create_column(), data_type, slot_desc->col_name()}};
731
77.5k
        auto func_cast =
732
77.5k
                SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type, {});
733
77.5k
        if (!func_cast) {
734
0
            return Status::InternalError("Function CAST[arg={}, col name={}, return={}] not found!",
735
0
                                         arg.type->get_name(), slot_desc->col_name(),
736
0
                                         return_type->get_name());
737
0
        }
738
77.5k
        idx = _src_block_name_to_idx[slot_desc->col_name()];
739
77.5k
        DCHECK(_state != nullptr);
740
77.5k
        auto ctx = FunctionContext::create_context(_state, {}, {});
741
77.5k
        RETURN_IF_ERROR(
742
77.5k
                func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, arg.column->size()));
743
77.5k
        _src_block_ptr->get_by_position(idx).type = std::move(return_type);
744
77.5k
    }
745
6.19k
    return Status::OK();
746
6.19k
}
747
748
6.19k
Status FileScanner::_pre_filter_src_block() {
749
    // Only called from _process_src_block_after_read_for_load, so _is_load is always true.
750
6.19k
    if (!_pre_conjunct_ctxs.empty()) {
751
29
        SCOPED_TIMER(_pre_filter_timer);
752
29
        auto origin_column_num = _src_block_ptr->columns();
753
29
        auto old_rows = _src_block_ptr->rows();
754
29
        RETURN_IF_ERROR(
755
29
                VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr, origin_column_num));
756
29
        _counter.num_rows_unselected += old_rows - _src_block_ptr->rows();
757
29
    }
758
6.19k
    return Status::OK();
759
6.19k
}
760
761
6.19k
Status FileScanner::_convert_to_output_block(Block* block) {
762
    // Only called from _process_src_block_after_read_for_load, so _is_load is always true.
763
6.19k
    SCOPED_TIMER(_convert_to_output_block_timer);
764
    // The block is passed from scanner context's free blocks,
765
    // which is initialized by output columns
766
    // so no need to clear it
767
    // block->clear();
768
769
6.19k
    int ctx_idx = 0;
770
6.19k
    size_t rows = _src_block_ptr->rows();
771
6.19k
    auto filter_column = ColumnUInt8::create(rows, 1);
772
6.19k
    auto& filter_map = filter_column->get_data();
773
774
    // After convert, the column_ptr should be copied into output block.
775
    // Can not use block->insert() because it may cause use_count() non-zero bug
776
6.19k
    MutableBlock mutable_output_block =
777
6.19k
            VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc);
778
6.19k
    auto& mutable_output_columns = mutable_output_block.mutable_columns();
779
780
6.19k
    std::vector<BitmapValue>* skip_bitmaps {nullptr};
781
6.19k
    if (_should_process_skip_bitmap_col()) {
782
201
        auto* skip_bitmap_nullable_col_ptr =
783
201
                assert_cast<ColumnNullable*>(_src_block_ptr->get_by_position(_skip_bitmap_col_idx)
784
201
                                                     .column->assume_mutable()
785
201
                                                     .get());
786
201
        skip_bitmaps = &(assert_cast<ColumnBitmap*>(
787
201
                                 skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get())
788
201
                                 ->get_data());
789
        // NOTE:
790
        // - If the table has sequence type column, __DORIS_SEQUENCE_COL__ will be put in _input_tuple_desc, so whether
791
        //   __DORIS_SEQUENCE_COL__ will be marked in skip bitmap depends on whether it's specified in that row
792
        // - If the table has sequence map column, __DORIS_SEQUENCE_COL__ will not be put in _input_tuple_desc,
793
        //   so __DORIS_SEQUENCE_COL__ will be ommited if it't specified in a row and will not be marked in skip bitmap.
794
        //   So we should mark __DORIS_SEQUENCE_COL__ in skip bitmap here if the corresponding sequence map column us marked
795
201
        if (_sequence_map_col_uid != -1) {
796
352
            for (int j = 0; j < rows; ++j) {
797
328
                if ((*skip_bitmaps)[j].contains(_sequence_map_col_uid)) {
798
137
                    (*skip_bitmaps)[j].add(_sequence_col_uid);
799
137
                }
800
328
            }
801
24
        }
802
201
    }
803
804
    // for (auto slot_desc : _output_tuple_desc->slots()) {
805
89.6k
    for (int j = 0; j < mutable_output_columns.size(); ++j) {
806
83.5k
        auto* slot_desc = _output_tuple_desc->slots()[j];
807
83.5k
        int dest_index = ctx_idx;
808
83.5k
        ColumnPtr column_ptr;
809
810
83.5k
        auto& ctx = _dest_vexpr_ctx[dest_index];
811
        // PT1 => dest primitive type
812
83.5k
        RETURN_IF_ERROR(ctx->execute(_src_block_ptr, column_ptr));
813
        // column_ptr maybe a ColumnConst, convert it to a normal column
814
83.5k
        column_ptr = column_ptr->convert_to_full_column_if_const();
815
83.5k
        DCHECK(column_ptr);
816
817
        // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
818
        // is likely to be nullable
819
83.5k
        if (LIKELY(column_ptr->is_nullable())) {
820
77.9k
            const auto* nullable_column = reinterpret_cast<const ColumnNullable*>(column_ptr.get());
821
178M
            for (int i = 0; i < rows; ++i) {
822
178M
                if (filter_map[i] && nullable_column->is_null_at(i)) {
823
                    // skip checks for non-mentioned columns in flexible partial update
824
2.00M
                    if (skip_bitmaps == nullptr ||
825
2.00M
                        !skip_bitmaps->at(i).contains(slot_desc->col_unique_id())) {
826
                        // clang-format off
827
1.95M
                        if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
828
1.95M
                            !_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->is_null_at(i)) {
829
5.49k
                            filter_map[i] = false;
830
5.49k
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
831
5.49k
                                [&]() -> std::string {
832
5.49k
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
833
5.49k
                                },
834
5.49k
                                [&]() -> std::string {
835
5.49k
                                    auto raw_value =
836
5.49k
                                            _src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->get_data_at(i);
837
5.49k
                                    std::string raw_string = raw_value.to_string();
838
5.49k
                                    fmt::memory_buffer error_msg;
839
5.49k
                                    fmt::format_to(error_msg,"column({}) value is incorrect while strict mode is {}, src value is {}",
840
5.49k
                                            slot_desc->col_name(), _strict_mode, raw_string);
841
5.49k
                                    return fmt::to_string(error_msg);
842
5.49k
                                }));
843
1.94M
                        } else if (!slot_desc->is_nullable()) {
844
648
                            filter_map[i] = false;
845
648
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
846
648
                                [&]() -> std::string {
847
648
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
848
648
                                },
849
648
                                [&]() -> std::string {
850
648
                                    fmt::memory_buffer error_msg;
851
648
                                    fmt::format_to(error_msg, "column({}) values is null while columns is not nullable", slot_desc->col_name());
852
648
                                    return fmt::to_string(error_msg);
853
648
                                }));
854
648
                        }
855
                        // clang-format on
856
1.95M
                    }
857
2.00M
                }
858
178M
            }
859
77.9k
            if (!slot_desc->is_nullable()) {
860
43.9k
                column_ptr = remove_nullable(column_ptr);
861
43.9k
            }
862
77.9k
        } else if (slot_desc->is_nullable()) {
863
115
            column_ptr = make_nullable(column_ptr);
864
115
        }
865
83.4k
        mutable_output_columns[j]->insert_range_from(*column_ptr, 0, rows);
866
83.4k
        ctx_idx++;
867
83.4k
    }
868
869
    // after do the dest block insert operation, clear _src_block to remove the reference of origin column
870
6.19k
    _src_block_ptr->clear();
871
872
6.19k
    size_t dest_size = block->columns();
873
    // do filter
874
6.19k
    block->insert(ColumnWithTypeAndName(std::move(filter_column), std::make_shared<DataTypeUInt8>(),
875
6.19k
                                        "filter column"));
876
6.19k
    RETURN_IF_ERROR(Block::filter_block(block, dest_size, dest_size));
877
878
6.19k
    _counter.num_rows_filtered += rows - block->rows();
879
6.19k
    return Status::OK();
880
6.19k
}
881
882
128k
Status FileScanner::_process_src_block_after_read(Block* block) {
883
128k
    DCHECK(_process_src_block_after_read_handler != nullptr);
884
128k
    return (this->*_process_src_block_after_read_handler)(block);
885
128k
}
886
887
122k
Status FileScanner::_process_src_block_after_read_for_query(Block* block) {
888
122k
    _update_adaptive_batch_size_before_truncate(*block);
889
890
    // Truncate CHAR/VARCHAR columns when target size is smaller than file schema.
891
    // This is needed for external table queries with truncate_char_or_varchar_columns=true.
892
122k
    RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
893
894
122k
    _update_adaptive_batch_size_after_truncate(*block);
895
122k
    return Status::OK();
896
122k
}
897
898
6.19k
Status FileScanner::_process_src_block_after_read_for_load(Block* block) {
899
    // Convert the src block columns type in-place.
900
6.19k
    RETURN_IF_ERROR(_cast_to_input_block(block));
901
    // All readers fill partition and missing columns inside get_next_block:
902
    //   ORC/Parquet: in _do_get_next_block via on_fill_partition_columns/on_fill_missing_columns
903
    //   CSV/JSON/others: in on_after_read_block via fill_remaining_columns
904
    // Assert all columns have consistent row counts.
905
6.19k
    if (_src_block_ptr->columns() > 0) {
906
6.19k
        size_t rows = _src_block_ptr->get_by_position(0).column->size();
907
78.1k
        for (size_t i = 1; i < _src_block_ptr->columns(); ++i) {
908
71.9k
            DCHECK_EQ(_src_block_ptr->get_by_position(i).column->size(), rows)
909
0
                    << "Column " << _src_block_ptr->get_by_position(i).name << " has "
910
0
                    << _src_block_ptr->get_by_position(i).column->size() << " rows, expected "
911
0
                    << rows;
912
71.9k
        }
913
6.19k
    }
914
    // Apply _pre_conjunct_ctxs to filter src block.
915
6.19k
    RETURN_IF_ERROR(_pre_filter_src_block());
916
917
    // Convert src block to output block (dest block), then apply filters.
918
6.19k
    RETURN_IF_ERROR(_convert_to_output_block(block));
919
920
6.19k
    _update_adaptive_batch_size_before_truncate(*block);
921
922
    // Truncate CHAR/VARCHAR columns when target size is smaller than file schema.
923
6.19k
    RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
924
925
6.19k
    _update_adaptive_batch_size_after_truncate(*block);
926
6.19k
    return Status::OK();
927
6.19k
}
928
929
128k
Status FileScanner::_truncate_char_or_varchar_columns(Block* block) {
930
    // Truncate char columns or varchar columns if size is smaller than file columns
931
    // or not found in the file column schema.
932
128k
    if (!_state->query_options().truncate_char_or_varchar_columns) {
933
128k
        return Status::OK();
934
128k
    }
935
10
    int idx = 0;
936
36
    for (auto* slot_desc : _real_tuple_desc->slots()) {
937
36
        const auto& type = slot_desc->type();
938
36
        if (type->get_primitive_type() != TYPE_VARCHAR && type->get_primitive_type() != TYPE_CHAR) {
939
12
            ++idx;
940
12
            continue;
941
12
        }
942
24
        auto iter = _source_file_col_name_types.find(slot_desc->col_name());
943
24
        if (iter != _source_file_col_name_types.end()) {
944
16
            const auto file_type_desc = _source_file_col_name_types[slot_desc->col_name()];
945
16
            int l = -1;
946
16
            if (auto* ftype = check_and_get_data_type<DataTypeString>(
947
16
                        remove_nullable(file_type_desc).get())) {
948
16
                l = ftype->len();
949
16
            }
950
16
            if ((assert_cast<const DataTypeString*>(remove_nullable(type).get())->len() > 0) &&
951
16
                (assert_cast<const DataTypeString*>(remove_nullable(type).get())->len() < l ||
952
16
                 l < 0)) {
953
16
                _truncate_char_or_varchar_column(
954
16
                        block, idx,
955
16
                        assert_cast<const DataTypeString*>(remove_nullable(type).get())->len());
956
16
            }
957
16
        } else {
958
8
            _truncate_char_or_varchar_column(
959
8
                    block, idx,
960
8
                    assert_cast<const DataTypeString*>(remove_nullable(type).get())->len());
961
8
        }
962
24
        ++idx;
963
24
    }
964
10
    return Status::OK();
965
128k
}
966
967
// VARCHAR substring(VARCHAR str, INT pos[, INT len])
968
24
void FileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int len) {
969
24
    auto int_type = std::make_shared<DataTypeInt32>();
970
24
    uint32_t num_columns_without_result = block->columns();
971
24
    const ColumnNullable* col_nullable =
972
24
            assert_cast<const ColumnNullable*>(block->get_by_position(idx).column.get());
973
24
    const ColumnPtr& string_column_ptr = col_nullable->get_nested_column_ptr();
974
24
    ColumnPtr null_map_column_ptr = col_nullable->get_null_map_column_ptr();
975
24
    block->replace_by_position(idx, std::move(string_column_ptr));
976
24
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)), int_type,
977
24
                   "const 1"}); // pos is 1
978
24
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)), int_type,
979
24
                   fmt::format("const {}", len)});                          // len
980
24
    block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); // result column
981
24
    ColumnNumbers temp_arguments(3);
982
24
    temp_arguments[0] = idx;                            // str column
983
24
    temp_arguments[1] = num_columns_without_result;     // pos
984
24
    temp_arguments[2] = num_columns_without_result + 1; // len
985
24
    uint32_t result_column_id = num_columns_without_result + 2;
986
987
24
    SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
988
24
    auto res = ColumnNullable::create(block->get_by_position(result_column_id).column,
989
24
                                      null_map_column_ptr);
990
24
    block->replace_by_position(idx, std::move(res));
991
24
    Block::erase_useless_column(block, num_columns_without_result);
992
24
}
993
994
7.46k
std::shared_ptr<segment_v2::RowIdColumnIteratorV2> FileScanner::_create_row_id_column_iterator() {
995
7.46k
    auto& id_file_map = _state->get_id_file_map();
996
7.46k
    auto file_id = id_file_map->get_file_mapping_id(
997
7.46k
            std::make_shared<FileMapping>(((FileScanLocalState*)_local_state)->parent_id(),
998
7.46k
                                          _current_range, _should_enable_file_meta_cache()));
999
7.46k
    return std::make_shared<RowIdColumnIteratorV2>(IdManager::ID_VERSION,
1000
7.46k
                                                   BackendOptions::get_backend_id(), file_id);
1001
7.46k
}
1002
1003
74.4k
void FileScanner::_fill_base_init_context(ReaderInitContext* ctx) {
1004
74.4k
    ctx->column_descs = &_column_descs;
1005
74.4k
    ctx->col_name_to_block_idx = &_src_block_name_to_idx;
1006
74.4k
    ctx->state = _state;
1007
74.4k
    ctx->tuple_descriptor = _real_tuple_desc;
1008
74.4k
    ctx->row_descriptor = _default_val_row_desc.get();
1009
74.4k
    ctx->params = _params;
1010
74.4k
    ctx->range = &_current_range;
1011
74.4k
    ctx->table_info_node = TableSchemaChangeHelper::ConstNode::get_instance();
1012
74.4k
    ctx->push_down_agg_type = _get_push_down_agg_type();
1013
74.4k
}
1014
1015
158k
Status FileScanner::_get_next_reader() {
1016
158k
    while (true) {
1017
158k
        if (_cur_reader) {
1018
71.3k
            _cur_reader->collect_profile_before_close();
1019
71.3k
            RETURN_IF_ERROR(_cur_reader->close());
1020
71.3k
            _state->update_num_finished_scan_range(1);
1021
71.3k
        }
1022
158k
        _cur_reader.reset(nullptr);
1023
158k
        _reset_adaptive_batch_size_state();
1024
158k
        _src_block_init = false;
1025
158k
        bool has_next = _first_scan_range;
1026
158k
        if (!_first_scan_range) {
1027
107k
            RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
1028
107k
        }
1029
158k
        _first_scan_range = false;
1030
158k
        if (!has_next || _should_stop) {
1031
86.5k
            _scanner_eof = true;
1032
86.5k
            return Status::OK();
1033
86.5k
        }
1034
1035
72.1k
        const TFileRangeDesc& range = _current_range;
1036
72.1k
        _current_range_path = range.path;
1037
1038
72.1k
        if (!_partition_slot_index_map.empty()) {
1039
            // we need get partition columns first for runtime filter partition pruning
1040
7.70k
            RETURN_IF_ERROR(_generate_partition_columns());
1041
1042
7.70k
            if (_state->query_options().enable_runtime_filter_partition_prune) {
1043
                // if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
1044
                // by runtime filter partition prune
1045
7.70k
                if (_push_down_conjuncts.size() < _conjuncts.size()) {
1046
                    // there are new runtime filters, need to re-init runtime filter partition pruning ctxs
1047
3.77k
                    _init_runtime_filter_partition_prune_ctxs();
1048
3.77k
                }
1049
1050
7.70k
                bool can_filter_all = false;
1051
7.70k
                RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
1052
7.70k
                if (can_filter_all) {
1053
                    // this range can be filtered out by runtime filter partition pruning
1054
                    // so we need to skip this range
1055
80
                    COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
1056
80
                    continue;
1057
80
                }
1058
7.70k
            }
1059
7.70k
        }
1060
1061
        // create reader for specific format
1062
72.0k
        Status init_status = Status::OK();
1063
72.0k
        TFileFormatType::type format_type = _get_current_format_type();
1064
        // for compatibility, this logic is deprecated in 3.1
1065
72.0k
        if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
1066
5.18k
            if (range.table_format_params.table_format_type == "paimon" &&
1067
5.18k
                !range.table_format_params.paimon_params.__isset.paimon_split) {
1068
                // use native reader
1069
0
                auto format = range.table_format_params.paimon_params.file_format;
1070
0
                if (format == "orc") {
1071
0
                    format_type = TFileFormatType::FORMAT_ORC;
1072
0
                } else if (format == "parquet") {
1073
0
                    format_type = TFileFormatType::FORMAT_PARQUET;
1074
0
                } else {
1075
0
                    return Status::InternalError("Not supported paimon file format: {}", format);
1076
0
                }
1077
0
            }
1078
5.18k
        }
1079
1080
72.0k
        bool push_down_predicates = _should_push_down_predicates(format_type);
1081
72.0k
        bool need_to_get_parsed_schema = false;
1082
72.0k
        switch (format_type) {
1083
5.18k
        case TFileFormatType::FORMAT_JNI: {
1084
5.18k
            ReaderInitContext jni_ctx;
1085
5.18k
            _fill_base_init_context(&jni_ctx);
1086
1087
5.18k
            if (range.__isset.table_format_params &&
1088
5.18k
                range.table_format_params.table_format_type == "max_compute") {
1089
0
                const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
1090
0
                        _real_tuple_desc->table_desc());
1091
0
                if (!mc_desc->init_status()) {
1092
0
                    return mc_desc->init_status();
1093
0
                }
1094
0
                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
1095
0
                        mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
1096
0
                        range, _state, _profile);
1097
0
                init_status = static_cast<GenericReader*>(mc_reader.get())->init_reader(&jni_ctx);
1098
0
                _cur_reader = std::move(mc_reader);
1099
5.18k
            } else if (range.__isset.table_format_params &&
1100
5.18k
                       range.table_format_params.table_format_type == "paimon") {
1101
1.91k
                if (_state->query_options().__isset.enable_paimon_cpp_reader &&
1102
1.91k
                    _state->query_options().enable_paimon_cpp_reader) {
1103
0
                    auto cpp_reader = PaimonCppReader::create_unique(_file_slot_descs, _state,
1104
0
                                                                     _profile, range, _params);
1105
0
                    if (!_is_load && !_push_down_conjuncts.empty()) {
1106
0
                        PaimonPredicateConverter predicate_converter(_file_slot_descs, _state);
1107
0
                        auto predicate = predicate_converter.build(_push_down_conjuncts);
1108
0
                        if (predicate) {
1109
0
                            cpp_reader->set_predicate(std::move(predicate));
1110
0
                        }
1111
0
                    }
1112
0
                    init_status =
1113
0
                            static_cast<GenericReader*>(cpp_reader.get())->init_reader(&jni_ctx);
1114
0
                    _cur_reader = std::move(cpp_reader);
1115
1.91k
                } else {
1116
1.91k
                    auto paimon_reader = PaimonJniReader::create_unique(_file_slot_descs, _state,
1117
1.91k
                                                                        _profile, range, _params);
1118
1.91k
                    init_status =
1119
1.91k
                            static_cast<GenericReader*>(paimon_reader.get())->init_reader(&jni_ctx);
1120
1.91k
                    _cur_reader = std::move(paimon_reader);
1121
1.91k
                }
1122
3.27k
            } else if (range.__isset.table_format_params &&
1123
3.27k
                       range.table_format_params.table_format_type == "hudi") {
1124
0
                auto hudi_reader = HudiJniReader::create_unique(
1125
0
                        *_params, range.table_format_params.hudi_params, _file_slot_descs, _state,
1126
0
                        _profile);
1127
0
                init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&jni_ctx);
1128
0
                _cur_reader = std::move(hudi_reader);
1129
3.27k
            } else if (range.__isset.table_format_params &&
1130
3.27k
                       range.table_format_params.table_format_type == "trino_connector") {
1131
790
                auto trino_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
1132
790
                                                                           _profile, range);
1133
790
                init_status =
1134
790
                        static_cast<GenericReader*>(trino_reader.get())->init_reader(&jni_ctx);
1135
790
                _cur_reader = std::move(trino_reader);
1136
2.48k
            } else if (range.__isset.table_format_params &&
1137
2.48k
                       range.table_format_params.table_format_type == "jdbc") {
1138
                // Extract jdbc params from table_format_params
1139
1.24k
                std::map<std::string, std::string> jdbc_params(
1140
1.24k
                        range.table_format_params.jdbc_params.begin(),
1141
1.24k
                        range.table_format_params.jdbc_params.end());
1142
1.24k
                auto jdbc_reader = JdbcJniReader::create_unique(_file_slot_descs, _state, _profile,
1143
1.24k
                                                                jdbc_params);
1144
1.24k
                init_status = static_cast<GenericReader*>(jdbc_reader.get())->init_reader(&jni_ctx);
1145
1.24k
                _cur_reader = std::move(jdbc_reader);
1146
1.24k
            } else if (range.__isset.table_format_params &&
1147
1.24k
                       range.table_format_params.table_format_type == "iceberg") {
1148
1.24k
                auto iceberg_sys_reader = IcebergSysTableJniReader::create_unique(
1149
1.24k
                        _file_slot_descs, _state, _profile, range, _params);
1150
1.24k
                init_status = static_cast<GenericReader*>(iceberg_sys_reader.get())
1151
1.24k
                                      ->init_reader(&jni_ctx);
1152
1.24k
                _cur_reader = std::move(iceberg_sys_reader);
1153
1.24k
            }
1154
            // Set col_name_to_block_idx for JNI readers to avoid repeated map creation
1155
5.18k
            if (_cur_reader) {
1156
5.18k
                if (auto* jni_reader = dynamic_cast<JniReader*>(_cur_reader.get())) {
1157
5.18k
                    jni_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
1158
5.18k
                }
1159
5.18k
            }
1160
5.18k
            break;
1161
5.18k
        }
1162
33.2k
        case TFileFormatType::FORMAT_PARQUET: {
1163
33.2k
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1164
33.2k
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1165
33.2k
                                               : nullptr;
1166
33.2k
            if (push_down_predicates) {
1167
33.2k
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1168
33.2k
            }
1169
33.2k
            RETURN_IF_ERROR(_init_parquet_reader(file_meta_cache_ptr));
1170
1171
33.2k
            need_to_get_parsed_schema = true;
1172
33.2k
            break;
1173
33.2k
        }
1174
24.5k
        case TFileFormatType::FORMAT_ORC: {
1175
24.5k
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1176
24.5k
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1177
24.5k
                                               : nullptr;
1178
24.5k
            if (push_down_predicates) {
1179
24.5k
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1180
24.5k
            }
1181
24.5k
            RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr));
1182
1183
24.5k
            need_to_get_parsed_schema = true;
1184
24.5k
            break;
1185
24.5k
        }
1186
2.71k
        case TFileFormatType::FORMAT_CSV_PLAIN:
1187
2.71k
        case TFileFormatType::FORMAT_CSV_GZ:
1188
2.71k
        case TFileFormatType::FORMAT_CSV_BZ2:
1189
2.71k
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
1190
2.71k
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
1191
2.71k
        case TFileFormatType::FORMAT_CSV_LZOP:
1192
2.71k
        case TFileFormatType::FORMAT_CSV_DEFLATE:
1193
2.71k
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
1194
2.79k
        case TFileFormatType::FORMAT_PROTO: {
1195
2.79k
            auto reader =
1196
2.79k
                    CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
1197
2.79k
                                             _file_slot_descs, _state->batch_size(), _io_ctx.get());
1198
2.79k
            CsvInitContext csv_ctx;
1199
2.79k
            _fill_base_init_context(&csv_ctx);
1200
2.79k
            csv_ctx.is_load = _is_load;
1201
2.79k
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&csv_ctx);
1202
2.79k
            _cur_reader = std::move(reader);
1203
2.79k
            break;
1204
2.71k
        }
1205
4.58k
        case TFileFormatType::FORMAT_TEXT: {
1206
4.58k
            auto reader = TextReader::create_unique(_state, _profile, &_counter, *_params, range,
1207
4.58k
                                                    _file_slot_descs, _state->batch_size(),
1208
4.58k
                                                    _io_ctx.get());
1209
4.58k
            CsvInitContext text_ctx;
1210
4.58k
            _fill_base_init_context(&text_ctx);
1211
4.58k
            text_ctx.is_load = _is_load;
1212
4.58k
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&text_ctx);
1213
4.58k
            _cur_reader = std::move(reader);
1214
4.58k
            break;
1215
2.71k
        }
1216
821
        case TFileFormatType::FORMAT_JSON: {
1217
821
            _cur_reader = NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
1218
821
                                                       _file_slot_descs, &_scanner_eof,
1219
821
                                                       _state->batch_size(), _io_ctx.get());
1220
821
            JsonInitContext json_ctx;
1221
821
            _fill_base_init_context(&json_ctx);
1222
821
            json_ctx.col_default_value_ctx = &_col_default_value_ctx;
1223
821
            json_ctx.is_load = _is_load;
1224
821
            init_status = _cur_reader->init_reader(&json_ctx);
1225
821
            break;
1226
2.71k
        }
1227
1228
0
        case TFileFormatType::FORMAT_WAL: {
1229
0
            _cur_reader = WalReader::create_unique(_state);
1230
0
            WalInitContext wal_ctx;
1231
0
            _fill_base_init_context(&wal_ctx);
1232
0
            wal_ctx.output_tuple_descriptor = _output_tuple_desc;
1233
0
            init_status = _cur_reader->init_reader(&wal_ctx);
1234
0
            break;
1235
2.71k
        }
1236
3
        case TFileFormatType::FORMAT_NATIVE: {
1237
3
            auto reader =
1238
3
                    NativeReader::create_unique(_profile, *_params, range, _io_ctx.get(), _state);
1239
3
            ReaderInitContext native_ctx;
1240
3
            _fill_base_init_context(&native_ctx);
1241
3
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&native_ctx);
1242
3
            _cur_reader = std::move(reader);
1243
3
            need_to_get_parsed_schema = false;
1244
3
            break;
1245
2.71k
        }
1246
108
        case TFileFormatType::FORMAT_ARROW: {
1247
108
            ReaderInitContext arrow_ctx;
1248
108
            _fill_base_init_context(&arrow_ctx);
1249
1250
108
            if (range.__isset.table_format_params &&
1251
108
                range.table_format_params.table_format_type == "remote_doris") {
1252
98
                auto doris_reader =
1253
98
                        RemoteDorisReader::create_unique(_file_slot_descs, _state, _profile, range);
1254
98
                init_status =
1255
98
                        static_cast<GenericReader*>(doris_reader.get())->init_reader(&arrow_ctx);
1256
98
                if (doris_reader) {
1257
98
                    doris_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
1258
98
                }
1259
98
                _cur_reader = std::move(doris_reader);
1260
98
            } else {
1261
10
                auto arrow_reader =
1262
10
                        ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
1263
10
                                                         range, _file_slot_descs, _io_ctx.get());
1264
10
                init_status =
1265
10
                        static_cast<GenericReader*>(arrow_reader.get())->init_reader(&arrow_ctx);
1266
10
                _cur_reader = std::move(arrow_reader);
1267
10
            }
1268
108
            break;
1269
2.71k
        }
1270
0
#ifdef BUILD_RUST_READERS
1271
28
        case TFileFormatType::FORMAT_LANCE: {
1272
28
            auto lance_reader = LanceRustReader::create_unique(_file_slot_descs, _state, _profile,
1273
28
                                                               range, _params);
1274
28
            init_status = lance_reader->init_reader();
1275
28
            _cur_reader = std::move(lance_reader);
1276
28
            need_to_get_parsed_schema = true;
1277
28
            break;
1278
2.71k
        }
1279
0
#endif
1280
672
        case TFileFormatType::FORMAT_ES_HTTP: {
1281
672
            _cur_reader = EsHttpReader::create_unique(_file_slot_descs, _state, _profile, range,
1282
672
                                                      *_params, _real_tuple_desc);
1283
672
            init_status = static_cast<EsHttpReader*>(_cur_reader.get())->init_reader();
1284
672
            break;
1285
2.71k
        }
1286
0
        default:
1287
0
            return Status::NotSupported("Not supported create reader for file format: {}.",
1288
0
                                        to_string(_params->format_type));
1289
72.0k
        }
1290
1291
71.9k
        if (_cur_reader == nullptr) {
1292
0
            return Status::NotSupported(
1293
0
                    "Not supported create reader for table format: {} / file format: {}.",
1294
0
                    range.__isset.table_format_params ? range.table_format_params.table_format_type
1295
0
                                                      : "NotSet",
1296
0
                    to_string(_params->format_type));
1297
0
        }
1298
71.9k
        COUNTER_UPDATE(_file_counter, 1);
1299
        // The FileScanner for external table may try to open not exist files,
1300
        // Because FE file cache for external table may out of date.
1301
        // So, NOT_FOUND for FileScanner is not a fail case.
1302
        // Will remove this after file reader refactor.
1303
71.9k
        if (init_status.is<END_OF_FILE>()) {
1304
0
            COUNTER_UPDATE(_empty_file_counter, 1);
1305
0
            continue;
1306
71.9k
        } else if (init_status.is<ErrorCode::NOT_FOUND>()) {
1307
0
            if (config::ignore_not_found_file_in_external_table) {
1308
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
1309
0
                continue;
1310
0
            }
1311
0
            return Status::InternalError("failed to find reader, err: {}", init_status.to_string());
1312
71.9k
        } else if (!init_status.ok()) {
1313
1
            return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
1314
1
        }
1315
1316
        // For table-level COUNT pushdown, offsets are undefined so we must skip
1317
        // _set_fill_or_truncate_columns (it uses [start_offset, end_offset] to
1318
        // filter row groups, which would produce incorrect empty results).
1319
71.9k
        bool is_table_level_count = _get_push_down_agg_type() == TPushAggOp::type::COUNT &&
1320
71.9k
                                    range.__isset.table_format_params &&
1321
71.9k
                                    range.table_format_params.table_level_row_count >= 0;
1322
71.9k
        if (!is_table_level_count) {
1323
71.6k
            Status status = _set_fill_or_truncate_columns(need_to_get_parsed_schema);
1324
71.6k
            if (status.is<END_OF_FILE>()) { // all parquet row groups are filtered
1325
0
                continue;
1326
71.6k
            } else if (!status.ok()) {
1327
0
                return Status::InternalError("failed to set_fill_or_truncate_columns, err: {}",
1328
0
                                             status.to_string());
1329
0
            }
1330
71.6k
        }
1331
1332
        // Unified COUNT(*) pushdown: replace the real reader with CountReader
1333
        // decorator if the reader accepts COUNT and can provide a total row count.
1334
71.9k
        if (_cur_reader->get_push_down_agg_type() == TPushAggOp::type::COUNT) {
1335
1.83k
            int64_t total_rows = -1;
1336
1.83k
            if (is_table_level_count) {
1337
                // FE-provided count (may account for table-format deletions)
1338
210
                total_rows = range.table_format_params.table_level_row_count;
1339
1.62k
            } else if (_cur_reader->supports_count_pushdown()) {
1340
                // File metadata count (ORC footer / Parquet row groups)
1341
588
                total_rows = _cur_reader->get_total_rows();
1342
588
            }
1343
1.83k
            if (total_rows >= 0) {
1344
798
                auto batch_size = _state->batch_size();
1345
798
                _cur_reader = std::make_unique<CountReader>(total_rows, batch_size,
1346
798
                                                            std::move(_cur_reader));
1347
798
            }
1348
1.83k
        }
1349
71.9k
        _cur_reader_eof = false;
1350
71.9k
        _init_adaptive_batch_size_state(format_type);
1351
71.9k
        break;
1352
71.9k
    }
1353
71.9k
    return Status::OK();
1354
158k
}
1355
1356
// Creates and initializes the Parquet reader for `_current_range` and stores it
// in `_cur_reader`.
//
// The reader variant is chosen from `range.table_format_params.table_format_type`
// (only consulted when `__isset.table_format_params` is true, consistently with
// _init_orc_reader):
//   iceberg -> IcebergParquetReader
//   paimon  -> PaimonParquetReader
//   hudi    -> HudiParquetReader
//   hive    -> HiveParquetReader
//   tvf     -> plain ParquetReader
// Otherwise, load jobs (`_is_load`) get a plain ParquetReader.
//
// `parquet_reader`, when non-null, is a pre-built plain reader (e.g. built by
// read_lines_from_range). It is reused only by the "tvf" and load paths; the
// table-format branches construct their own reader and the supplied one is
// destroyed when this function returns.
//
// If no branch matches, `_cur_reader` is not modified and OK is returned.
//
// Returns the status reported by the chosen reader's init_reader().
Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr,
                                         std::unique_ptr<ParquetReader> parquet_reader) {
    const TFileRangeDesc& range = _current_range;
    Status init_status = Status::OK();

    // Copied out of the local state rather than referenced (empty when there is
    // no local state). NOTE(review): presumably so the reader owns a private map;
    // confirm before turning this into a pointer.
    phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> slot_id_to_predicates =
            _local_state
                    ? _local_state->cast<FileScanLocalState>()._slot_id_to_predicates
                    : phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> {};

    // Build unified ParquetInitContext (shared by all Parquet reader variants)
    ParquetInitContext pctx;
    _fill_base_init_context(&pctx);
    pctx.conjuncts = &_push_down_conjuncts;
    pctx.slot_id_to_predicates = &slot_id_to_predicates;
    pctx.colname_to_slot_id = _col_name_to_slot_id;
    pctx.not_single_slot_filter_conjuncts = &_not_single_slot_filter_conjuncts;
    pctx.slot_id_to_filter_conjuncts = &_slot_id_to_filter_conjuncts;

    // Every table-format comparison below requires the params to be set.
    const bool has_table_format = range.__isset.table_format_params;
    const std::string& table_format_type = range.table_format_params.table_format_type;

    // Row-id iterator factory handed to readers that support it.
    auto row_id_iterator_factory = [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
        return _create_row_id_column_iterator();
    };

    // Builds the plain ParquetReader used by the "tvf" and load paths when the
    // caller did not supply one.
    auto ensure_plain_reader = [&]() {
        if (!parquet_reader) {
            parquet_reader = ParquetReader::create_unique(
                    _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(),
                    _io_ctx.get(), _state, file_meta_cache_ptr,
                    _state->query_options().enable_parquet_lazy_mat);
        }
    };

    if (has_table_format && table_format_type == "iceberg") {
        // IcebergParquetReader IS-A ParquetReader (CRTP mixin), no wrapping needed
        std::unique_ptr<IcebergParquetReader> iceberg_reader = IcebergParquetReader::create_unique(
                _kv_cache, _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(),
                _io_ctx.get(), _state, file_meta_cache_ptr);
        iceberg_reader->set_create_row_id_column_iterator_func(row_id_iterator_factory);
        init_status = static_cast<GenericReader*>(iceberg_reader.get())->init_reader(&pctx);
        _cur_reader = std::move(iceberg_reader);
    } else if (has_table_format && table_format_type == "paimon") {
        // PaimonParquetReader IS-A ParquetReader, no wrapping needed
        auto paimon_reader = PaimonParquetReader::create_unique(
                _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), _kv_cache,
                _io_ctx.get(), _state, file_meta_cache_ptr);
        init_status = static_cast<GenericReader*>(paimon_reader.get())->init_reader(&pctx);
        _cur_reader = std::move(paimon_reader);
    } else if (has_table_format && table_format_type == "hudi") {
        // HudiParquetReader IS-A ParquetReader, no wrapping needed
        auto hudi_reader = HudiParquetReader::create_unique(
                _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(),
                _io_ctx.get(), _state, file_meta_cache_ptr);
        init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&pctx);
        _cur_reader = std::move(hudi_reader);
    } else if (has_table_format && table_format_type == "hive") {
        auto hive_reader = HiveParquetReader::create_unique(
                _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(),
                _io_ctx.get(), _state, &_is_file_slot, file_meta_cache_ptr,
                _state->query_options().enable_parquet_lazy_mat);
        hive_reader->set_create_row_id_column_iterator_func(row_id_iterator_factory);
        init_status = static_cast<GenericReader*>(hive_reader.get())->init_reader(&pctx);
        _cur_reader = std::move(hive_reader);
    } else if (has_table_format && table_format_type == "tvf") {
        ensure_plain_reader();
        parquet_reader->set_create_row_id_column_iterator_func(row_id_iterator_factory);
        init_status = static_cast<GenericReader*>(parquet_reader.get())->init_reader(&pctx);
        _cur_reader = std::move(parquet_reader);
    } else if (_is_load) {
        ensure_plain_reader();
        init_status = static_cast<GenericReader*>(parquet_reader.get())->init_reader(&pctx);
        _cur_reader = std::move(parquet_reader);
    }
    // else: no reader is created here; `_cur_reader` is left unchanged.

    return init_status;
}
1440
1441
// Creates and initializes the ORC reader for `_current_range` and stores it in
// `_cur_reader`.
//
// Reader selection by `range.table_format_params.table_format_type` (every
// check requires `__isset.table_format_params`):
//   transactional_hive -> TransactionalHiveReader
//   iceberg            -> IcebergOrcReader
//   paimon             -> PaimonOrcReader
//   hudi               -> HudiOrcReader
//   hive               -> HiveOrcReader
//   tvf                -> plain OrcReader (reuses `orc_reader` when non-null)
// Otherwise, load jobs (`_is_load`) get a plain OrcReader, again reusing
// `orc_reader` when non-null. If nothing matches, `_cur_reader` is not touched.
//
// Returns the status reported by the selected reader's init_reader().
Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr,
                                     std::unique_ptr<OrcReader> orc_reader) {
    const TFileRangeDesc& range = _current_range;

    // Context shared by every ORC reader variant below.
    OrcInitContext orc_ctx;
    _fill_base_init_context(&orc_ctx);
    orc_ctx.conjuncts = &_push_down_conjuncts;
    orc_ctx.not_single_slot_filter_conjuncts = &_not_single_slot_filter_conjuncts;
    orc_ctx.slot_id_to_filter_conjuncts = &_slot_id_to_filter_conjuncts;

    // Row-id iterator factory handed to readers that support it.
    auto row_id_iterator_factory = [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
        return _create_row_id_column_iterator();
    };

    // Builds a plain OrcReader only when the caller did not supply one.
    auto ensure_plain_reader = [&]() {
        if (orc_reader == nullptr) {
            orc_reader = OrcReader::create_unique(_profile, _state, *_params, range,
                                                  _state->batch_size(), _state->timezone(),
                                                  _io_ctx.get(), file_meta_cache_ptr,
                                                  _state->query_options().enable_orc_lazy_mat);
        }
    };

    const bool has_format = range.__isset.table_format_params;
    const std::string& format = range.table_format_params.table_format_type;

    Status init_status = Status::OK();
    if (has_format && format == "transactional_hive") {
        // TransactionalHiveReader IS-A OrcReader, so it is stored directly.
        auto acid_reader = TransactionalHiveReader::create_unique(
                _profile, _state, *_params, range, _state->batch_size(), _state->timezone(),
                _io_ctx.get(), file_meta_cache_ptr);
        acid_reader->set_create_row_id_column_iterator_func(row_id_iterator_factory);
        init_status = static_cast<GenericReader*>(acid_reader.get())->init_reader(&orc_ctx);
        _cur_reader = std::move(acid_reader);
    } else if (has_format && format == "iceberg") {
        // IcebergOrcReader IS-A OrcReader (CRTP mixin), so it is stored directly.
        auto iceberg_orc = IcebergOrcReader::create_unique(
                _kv_cache, _profile, _state, *_params, range, _state->batch_size(),
                _state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
        iceberg_orc->set_create_row_id_column_iterator_func(row_id_iterator_factory);
        init_status = static_cast<GenericReader*>(iceberg_orc.get())->init_reader(&orc_ctx);
        _cur_reader = std::move(iceberg_orc);
    } else if (has_format && format == "paimon") {
        // PaimonOrcReader IS-A OrcReader, so it is stored directly.
        auto paimon_orc = PaimonOrcReader::create_unique(
                _profile, _state, *_params, range, _state->batch_size(), _state->timezone(),
                _kv_cache, _io_ctx.get(), file_meta_cache_ptr);
        init_status = static_cast<GenericReader*>(paimon_orc.get())->init_reader(&orc_ctx);
        _cur_reader = std::move(paimon_orc);
    } else if (has_format && format == "hudi") {
        // HudiOrcReader IS-A OrcReader, so it is stored directly.
        auto hudi_orc = HudiOrcReader::create_unique(_profile, _state, *_params, range,
                                                     _state->batch_size(), _state->timezone(),
                                                     _io_ctx.get(), file_meta_cache_ptr);
        init_status = static_cast<GenericReader*>(hudi_orc.get())->init_reader(&orc_ctx);
        _cur_reader = std::move(hudi_orc);
    } else if (has_format && format == "hive") {
        auto hive_orc = HiveOrcReader::create_unique(
                _profile, _state, *_params, range, _state->batch_size(), _state->timezone(),
                _io_ctx.get(), &_is_file_slot, file_meta_cache_ptr,
                _state->query_options().enable_orc_lazy_mat);
        hive_orc->set_create_row_id_column_iterator_func(row_id_iterator_factory);
        init_status = static_cast<GenericReader*>(hive_orc.get())->init_reader(&orc_ctx);
        _cur_reader = std::move(hive_orc);
    } else if (has_format && format == "tvf") {
        ensure_plain_reader();
        orc_reader->set_create_row_id_column_iterator_func(row_id_iterator_factory);
        init_status = static_cast<GenericReader*>(orc_reader.get())->init_reader(&orc_ctx);
        _cur_reader = std::move(orc_reader);
    } else if (_is_load) {
        ensure_plain_reader();
        init_status = static_cast<GenericReader*>(orc_reader.get())->init_reader(&orc_ctx);
        _cur_reader = std::move(orc_reader);
    }

    return init_status;
}
1537
1538
74.8k
// Rebuilds `_slot_lower_name_to_col_type` from the columns reported by the
// current reader's get_columns(), keyed by lower-cased column name, and then
// refreshes the source-file column types via _generate_truncate_columns().
//
// Returns the first non-OK status from get_columns() or
// _generate_truncate_columns(), otherwise OK.
Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema) {
    _slot_lower_name_to_col_type.clear();

    std::unordered_map<std::string, DataTypePtr> file_columns;
    RETURN_IF_ERROR(_cur_reader->get_columns(&file_columns));

    for (const auto& entry : file_columns) {
        auto lower_name = to_lower(entry.first);
        /*
         * `_slot_lower_name_to_col_type` is used by `_init_src_block` and `_cast_to_input_block`
         * during LOAD to generate columns of the corresponding type; it records the columns that
         * exist in the file.
         *
         * When a column in `COLUMNS FROM PATH` also exists as a file column, the column type in
         * the block would not match the slot type in `_output_tuple_desc`, causing an error when
         * Serde `deserialize_one_cell_from_json` fills the partition values.
         *
         * So partition columns are intentionally left out of `_slot_lower_name_to_col_type`.
         */
        if (!_partition_col_descs.contains(lower_name)) {
            _slot_lower_name_to_col_type.emplace(std::move(lower_name), entry.second);
        }
    }

    return _generate_truncate_columns(need_to_get_parsed_schema);
}
1565
1566
74.8k
// Refreshes `_source_file_col_name_types`: the column names (lower-cased) and
// types of the source file as reported by the current reader's
// get_parsed_schema().
//
// The map is always cleared first. It is only repopulated when the query option
// `truncate_char_or_varchar_columns` is enabled and the caller asked for the
// parsed schema (`need_to_get_parsed_schema`).
//
// A NOT_IMPLEMENTED_ERROR from get_parsed_schema() is tolerated; any other
// error is returned to the caller.
Status FileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) {
    _source_file_col_name_types.clear();
    if (!_state->query_options().truncate_char_or_varchar_columns || !need_to_get_parsed_schema) {
        return Status::OK();
    }

    // The col names and types of the source file, such as parquet, orc files.
    std::vector<std::string> source_file_col_names;
    std::vector<DataTypePtr> source_file_col_types;
    Status status = _cur_reader->get_parsed_schema(&source_file_col_names, &source_file_col_types);
    if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
        return status;
    }

    DCHECK_EQ(source_file_col_names.size(), source_file_col_types.size());
    // DCHECK_EQ is compiled out in release builds, so bound the loop by the
    // shorter vector to never index past the end of either one.
    const size_t num_cols = std::min(source_file_col_names.size(), source_file_col_types.size());
    for (size_t i = 0; i < num_cols; ++i) {
        _source_file_col_name_types[to_lower(source_file_col_names[i])] = source_file_col_types[i];
    }
    return Status::OK();
}
1585
1586
3.16k
// Prepares this scanner to fetch specific rows (by row id) from `range`.
//
// Installs `range` as the current range and creates fresh file-cache and
// file-reader statistics. Registers the read-bytes / read-time profile counters
// and initializes the IO context (wired to the new statistics) and the
// expression contexts. Finally it clears all pushed-down filtering state and
// `_kv_cache`.
Status FileScanner::prepare_for_read_lines(const TFileRangeDesc& range) {
    _current_range = range;

    // Per-range statistics; the IO context below points at these objects.
    _file_cache_statistics = std::make_unique<io::FileCacheStatistics>();
    _file_reader_stats = std::make_unique<io::FileReaderStats>();

    // Profile counters, registered in the same order as before.
    _file_read_bytes_counter =
            ADD_COUNTER_WITH_LEVEL(_profile, FileReadBytesProfile, TUnit::BYTES, 1);
    _file_read_time_counter = ADD_TIMER_WITH_LEVEL(_profile, FileReadTimeProfile, 1);

    RETURN_IF_ERROR(_init_io_ctx());
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
    _io_ctx->file_reader_stats = _file_reader_stats.get();

    _default_val_row_desc =
            std::make_unique<RowDescriptor>(const_cast<TupleDescriptor*>(_real_tuple_desc));
    RETURN_IF_ERROR(_init_expr_ctxes());

    // Since only one column is read from the file there is nothing to filter,
    // so drop whatever _init_expr_ctxes() may have set up.
    _push_down_conjuncts.clear();
    _not_single_slot_filter_conjuncts.clear();
    _slot_id_to_filter_conjuncts.clear();
    _kv_cache = nullptr;

    return Status::OK();
}
1609
1610
Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
1611
                                          const std::list<int64_t>& row_ids, Block* result_block,
1612
                                          const ExternalFileMappingInfo& external_info,
1613
3.14k
                                          int64_t* init_reader_ms, int64_t* get_block_ms) {
1614
3.14k
    _current_range = range;
1615
3.14k
    RETURN_IF_ERROR(_generate_partition_columns());
1616
1617
3.14k
    TFileFormatType::type format_type = _get_current_format_type();
1618
3.14k
    Status init_status = Status::OK();
1619
1620
3.14k
    auto file_meta_cache_ptr = external_info.enable_file_meta_cache
1621
3.15k
                                       ? ExecEnv::GetInstance()->file_meta_cache()
1622
18.4E
                                       : nullptr;
1623
1624
3.14k
    RETURN_IF_ERROR(scope_timer_run(
1625
3.14k
            [&]() -> Status {
1626
3.14k
                switch (format_type) {
1627
3.14k
                case TFileFormatType::FORMAT_PARQUET: {
1628
3.14k
                    std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
1629
3.14k
                            _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx.get(),
1630
3.14k
                            _state, file_meta_cache_ptr, false);
1631
3.14k
                    RETURN_IF_ERROR(
1632
3.14k
                            _init_parquet_reader(file_meta_cache_ptr, std::move(parquet_reader)));
1633
                    // _init_parquet_reader may create a new table-format specific reader
1634
                    // (e.g., HiveParquetReader) that replaces the original parquet_reader.
1635
                    // We need to re-apply read_by_rows to the actual _cur_reader.
1636
3.14k
                    RETURN_IF_ERROR(_cur_reader->read_by_rows(row_ids));
1637
3.14k
                    break;
1638
3.14k
                }
1639
3.14k
                case TFileFormatType::FORMAT_ORC: {
1640
3.14k
                    std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
1641
3.14k
                            _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx.get(),
1642
3.14k
                            file_meta_cache_ptr, false);
1643
3.14k
                    RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr, std::move(orc_reader)));
1644
                    // Same as above: re-apply read_by_rows to the actual _cur_reader.
1645
3.14k
                    RETURN_IF_ERROR(_cur_reader->read_by_rows(row_ids));
1646
3.14k
                    break;
1647
3.14k
                }
1648
3.14k
                default: {
1649
3.14k
                    return Status::NotSupported(
1650
3.14k
                            "Not support create lines reader for file format: {},"
1651
3.14k
                            "only support parquet and orc.",
1652
3.14k
                            to_string(_params->format_type));
1653
3.14k
                }
1654
3.14k
                }
1655
3.14k
                return Status::OK();
1656
3.14k
            },
1657
3.14k
            init_reader_ms));
1658
1659
3.14k
    RETURN_IF_ERROR(_set_fill_or_truncate_columns(true));
1660
3.14k
    _cur_reader_eof = false;
1661
1662
3.14k
    RETURN_IF_ERROR(scope_timer_run(
1663
3.14k
            [&]() -> Status {
1664
3.14k
                while (!_cur_reader_eof) {
1665
3.14k
                    bool eof = false;
1666
3.14k
                    RETURN_IF_ERROR(_get_block_impl(_state, result_block, &eof));
1667
3.14k
                }
1668
3.14k
                return Status::OK();
1669
3.14k
            },
1670
3.14k
            get_block_ms));
1671
1672
3.14k
    _cur_reader->collect_profile_before_close();
1673
3.14k
    RETURN_IF_ERROR(_cur_reader->close());
1674
1675
3.14k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
1676
3.14k
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
1677
3.14k
    return Status::OK();
1678
3.14k
}
1679
1680
10.8k
Status FileScanner::_generate_partition_columns() {
1681
10.8k
    _partition_col_descs.clear();
1682
10.8k
    _partition_value_is_null.clear();
1683
10.8k
    const TFileRangeDesc& range = _current_range;
1684
10.8k
    if (!range.__isset.columns_from_path_keys) {
1685
1.84k
        return Status::OK();
1686
1.84k
    }
1687
1688
9.02k
    std::unordered_map<std::string, int> partition_name_to_key_index;
1689
9.02k
    int index = 0;
1690
17.2k
    for (const auto& key : range.columns_from_path_keys) {
1691
17.2k
        partition_name_to_key_index.emplace(key, index++);
1692
17.2k
    }
1693
1694
    // Iterate _column_descs to find PARTITION_KEY columns instead of _partition_slot_descs.
1695
87.2k
    for (const auto& col_desc : _column_descs) {
1696
87.2k
        if (col_desc.category != ColumnCategory::PARTITION_KEY) {
1697
76.1k
            continue;
1698
76.1k
        }
1699
11.0k
        auto pit = partition_name_to_key_index.find(col_desc.name);
1700
11.1k
        if (pit != partition_name_to_key_index.end()) {
1701
11.1k
            int values_index = pit->second;
1702
11.1k
            if (range.__isset.columns_from_path && values_index < range.columns_from_path.size()) {
1703
11.1k
                _partition_col_descs.emplace(
1704
11.1k
                        col_desc.name,
1705
11.1k
                        std::make_tuple(range.columns_from_path[values_index], col_desc.slot_desc));
1706
11.1k
                if (range.__isset.columns_from_path_is_null) {
1707
0
                    _partition_value_is_null.emplace(col_desc.name,
1708
0
                                                     range.columns_from_path_is_null[values_index]);
1709
0
                }
1710
11.1k
            }
1711
11.1k
        }
1712
11.0k
    }
1713
9.02k
    return Status::OK();
1714
10.8k
}
1715
1716
53.8k
Status FileScanner::_init_expr_ctxes() {
1717
53.8k
    std::map<SlotId, int> full_src_index_map;
1718
53.8k
    std::map<SlotId, SlotDescriptor*> full_src_slot_map;
1719
53.8k
    std::map<std::string, int> partition_name_to_key_index_map;
1720
53.8k
    int index = 0;
1721
330k
    for (const auto& slot_desc : _real_tuple_desc->slots()) {
1722
330k
        full_src_slot_map.emplace(slot_desc->id(), slot_desc);
1723
330k
        full_src_index_map.emplace(slot_desc->id(), index++);
1724
330k
    }
1725
1726
    // For external table query, find the index of column in path.
1727
    // Because query doesn't always search for all columns in a table
1728
    // and the order of selected columns is random.
1729
    // All ranges in _ranges vector should have identical columns_from_path_keys
1730
    // because they are all file splits for the same external table.
1731
    // So here use the first element of _ranges to fill the partition_name_to_key_index_map
1732
53.8k
    if (_current_range.__isset.columns_from_path_keys) {
1733
14.6k
        std::vector<std::string> key_map = _current_range.columns_from_path_keys;
1734
14.6k
        if (!key_map.empty()) {
1735
30.6k
            for (size_t i = 0; i < key_map.size(); i++) {
1736
19.9k
                partition_name_to_key_index_map.emplace(key_map[i], i);
1737
19.9k
            }
1738
10.7k
        }
1739
14.6k
    }
1740
1741
53.8k
    _num_of_columns_from_file = _params->num_of_columns_from_file;
1742
330k
    for (const auto& slot_info : _params->required_slots) {
1743
330k
        auto slot_id = slot_info.slot_id;
1744
330k
        auto it = full_src_slot_map.find(slot_id);
1745
330k
        if (it == std::end(full_src_slot_map)) {
1746
0
            return Status::InternalError(
1747
0
                    fmt::format("Unknown source slot descriptor, slot_id={}", slot_id));
1748
0
        }
1749
1750
330k
        ColumnDescriptor col_desc;
1751
330k
        col_desc.name = it->second->col_name();
1752
330k
        col_desc.slot_desc = it->second;
1753
1754
        // Read category from Thrift if available (new FE), otherwise fall back
1755
        // to slot_info.is_file_slot + partition_name_to_key_index_map for broker/stream load
1756
        // where the FE does not set TColumnCategory.
1757
330k
        if (slot_info.__isset.category) {
1758
283k
            switch (slot_info.category) {
1759
269k
            case TColumnCategory::REGULAR:
1760
269k
                col_desc.category = ColumnCategory::REGULAR;
1761
269k
                break;
1762
8.49k
            case TColumnCategory::PARTITION_KEY:
1763
8.49k
                col_desc.category = ColumnCategory::PARTITION_KEY;
1764
8.49k
                break;
1765
4.42k
            case TColumnCategory::SYNTHESIZED:
1766
4.42k
                col_desc.category = ColumnCategory::SYNTHESIZED;
1767
4.42k
                break;
1768
812
            case TColumnCategory::GENERATED:
1769
812
                col_desc.category = ColumnCategory::GENERATED;
1770
812
                break;
1771
283k
            }
1772
283k
        } else if (partition_name_to_key_index_map.contains(it->second->col_name()) &&
1773
46.8k
                   !slot_info.is_file_slot) {
1774
2.11k
            col_desc.category = ColumnCategory::PARTITION_KEY;
1775
2.11k
        }
1776
1777
        // Derive is_file_slot from category
1778
330k
        bool is_file_slot = (col_desc.category == ColumnCategory::REGULAR ||
1779
330k
                             col_desc.category == ColumnCategory::GENERATED);
1780
1781
330k
        if (partition_name_to_key_index_map.contains(it->second->col_name())) {
1782
14.4k
            if (_is_load) {
1783
11
                auto iti = full_src_index_map.find(slot_id);
1784
11
                _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
1785
14.3k
            } else {
1786
14.3k
                auto kit = partition_name_to_key_index_map.find(it->second->col_name());
1787
14.3k
                _partition_slot_index_map.emplace(slot_id, kit->second);
1788
14.3k
            }
1789
14.4k
        }
1790
1791
330k
        if (is_file_slot) {
1792
314k
            _is_file_slot.emplace(slot_id);
1793
314k
            _file_slot_descs.emplace_back(it->second);
1794
314k
            _file_col_names.push_back(it->second->col_name());
1795
314k
        }
1796
1797
330k
        _column_descs.push_back(col_desc);
1798
330k
    }
1799
1800
    // set column name to default value expr map
1801
    // new inline TFileScanSlotInfo.default_value_expr (preferred)
1802
331k
    for (const auto& slot_info : _params->required_slots) {
1803
331k
        auto slot_id = slot_info.slot_id;
1804
331k
        auto it = full_src_slot_map.find(slot_id);
1805
331k
        if (it == std::end(full_src_slot_map)) {
1806
0
            continue;
1807
0
        }
1808
331k
        const std::string& col_name = it->second->col_name();
1809
1810
331k
        VExprContextSPtr ctx;
1811
331k
        bool has_default = false;
1812
1813
        // Prefer inline default_value_expr from TFileScanSlotInfo (new FE)
1814
331k
        if (slot_info.__isset.default_value_expr && !slot_info.default_value_expr.nodes.empty()) {
1815
297k
            RETURN_IF_ERROR(VExpr::create_expr_tree(slot_info.default_value_expr, ctx));
1816
297k
            RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
1817
297k
            RETURN_IF_ERROR(ctx->open(_state));
1818
297k
            has_default = true;
1819
297k
        } else if (slot_info.__isset.default_value_expr) {
1820
            // Empty nodes means null default (same as legacy empty TExpr)
1821
5.34k
            has_default = true;
1822
5.34k
        }
1823
1824
        // Fall back to legacy default_value_of_src_slot map for mixed-version FE/BE
1825
        // and callers that have not preserved inline default_value_expr yet.
1826
331k
        if (!has_default) {
1827
28.5k
            auto legacy_it = _params->default_value_of_src_slot.find(slot_id);
1828
28.5k
            if (legacy_it != std::end(_params->default_value_of_src_slot)) {
1829
23.7k
                if (!legacy_it->second.nodes.empty()) {
1830
0
                    RETURN_IF_ERROR(VExpr::create_expr_tree(legacy_it->second, ctx));
1831
0
                    RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
1832
0
                    RETURN_IF_ERROR(ctx->open(_state));
1833
0
                }
1834
23.7k
                has_default = true;
1835
23.7k
            }
1836
28.5k
        }
1837
1838
331k
        if (has_default) {
1839
            // if expr is empty, the default value will be null
1840
326k
            _col_default_value_ctx.emplace(col_name, ctx);
1841
326k
        }
1842
331k
    }
1843
1844
    // Populate default_expr in each ColumnDescriptor from _col_default_value_ctx.
1845
    // This makes default values available to readers via column_descs, eliminating the
1846
    // need for the separate _generate_missing_columns roundtrip.
1847
331k
    for (auto& col_desc : _column_descs) {
1848
331k
        auto it = _col_default_value_ctx.find(col_desc.name);
1849
331k
        if (it != _col_default_value_ctx.end()) {
1850
326k
            col_desc.default_expr = it->second;
1851
326k
        }
1852
331k
    }
1853
1854
53.9k
    if (_is_load) {
1855
        // follow desc expr map is only for load task.
1856
2.53k
        bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
1857
2.53k
        int idx = 0;
1858
29.3k
        for (auto* slot_desc : _output_tuple_desc->slots()) {
1859
29.3k
            auto it = _params->expr_of_dest_slot.find(slot_desc->id());
1860
29.3k
            if (it == std::end(_params->expr_of_dest_slot)) {
1861
0
                return Status::InternalError("No expr for dest slot, id={}, name={}",
1862
0
                                             slot_desc->id(), slot_desc->col_name());
1863
0
            }
1864
1865
29.3k
            VExprContextSPtr ctx;
1866
29.3k
            if (!it->second.nodes.empty()) {
1867
29.3k
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
1868
29.3k
                RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
1869
29.3k
                RETURN_IF_ERROR(ctx->open(_state));
1870
29.3k
            }
1871
29.3k
            _dest_vexpr_ctx.emplace_back(ctx);
1872
29.3k
            _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
1873
1874
29.3k
            if (has_slot_id_map) {
1875
29.3k
                auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
1876
29.3k
                if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
1877
6.51k
                    _src_slot_descs_order_by_dest.emplace_back(nullptr);
1878
22.7k
                } else {
1879
22.7k
                    auto _src_slot_it = full_src_slot_map.find(it1->second);
1880
22.7k
                    if (_src_slot_it == std::end(full_src_slot_map)) {
1881
0
                        return Status::InternalError("No src slot {} in src slot descs",
1882
0
                                                     it1->second);
1883
0
                    }
1884
22.7k
                    _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
1885
22.7k
                                                         full_src_index_map[_src_slot_it->first]);
1886
22.7k
                    _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
1887
22.7k
                }
1888
29.3k
            }
1889
29.3k
        }
1890
2.53k
    }
1891
53.9k
    return Status::OK();
1892
53.9k
}
1893
1894
404k
bool FileScanner::_should_enable_condition_cache() {
1895
404k
    DCHECK(_should_enable_condition_cache_handler != nullptr);
1896
404k
    return _condition_cache_digest != 0 && (this->*_should_enable_condition_cache_handler)() &&
1897
404k
           (!_conjuncts.empty() || !_push_down_conjuncts.empty());
1898
404k
}
1899
1900
0
bool FileScanner::_should_enable_condition_cache_for_load() const {
1901
0
    return false;
1902
0
}
1903
1904
336k
bool FileScanner::_should_enable_condition_cache_for_query() const {
1905
336k
    return true;
1906
336k
}
1907
1908
71.9k
bool FileScanner::_should_push_down_predicates(TFileFormatType::type format_type) const {
1909
71.9k
    DCHECK(_should_push_down_predicates_handler != nullptr);
1910
71.9k
    return (this->*_should_push_down_predicates_handler)(format_type);
1911
71.9k
}
1912
1913
2.54k
bool FileScanner::_should_push_down_predicates_for_load(TFileFormatType::type format_type) const {
1914
2.54k
    static_cast<void>(format_type);
1915
2.54k
    return false;
1916
2.54k
}
1917
1918
69.4k
bool FileScanner::_should_push_down_predicates_for_query(TFileFormatType::type format_type) const {
1919
    // JNI readers handle predicate conversion in their own paths.
1920
69.4k
    return format_type != TFileFormatType::FORMAT_JNI;
1921
69.4k
}
1922
1923
158k
void FileScanner::_init_reader_condition_cache() {
1924
158k
    _condition_cache = nullptr;
1925
158k
    _condition_cache_ctx = nullptr;
1926
1927
158k
    if (!_should_enable_condition_cache() || !_cur_reader) {
1928
136k
        return;
1929
136k
    }
1930
1931
    // Disable condition cache when delete operations exist (e.g. Iceberg position/equality
1932
    // deletes, Hive ACID deletes). Cached granule results may become stale if delete files
1933
    // change between queries while the data file's cache key remains the same.
1934
21.3k
    if (_cur_reader->has_delete_operations()) {
1935
2.19k
        return;
1936
2.19k
    }
1937
1938
19.1k
    auto* cache = segment_v2::ConditionCache::instance();
1939
19.1k
    _condition_cache_key = segment_v2::ConditionCache::ExternalCacheKey(
1940
19.1k
            _current_range.path,
1941
18.4E
            _current_range.__isset.modification_time ? _current_range.modification_time : 0,
1942
19.1k
            _current_range.__isset.file_size ? _current_range.file_size : -1,
1943
19.1k
            _condition_cache_digest,
1944
18.4E
            _current_range.__isset.start_offset ? _current_range.start_offset : 0,
1945
18.4E
            _current_range.__isset.size ? _current_range.size : -1);
1946
1947
19.1k
    segment_v2::ConditionCacheHandle handle;
1948
19.1k
    auto condition_cache_hit = cache->lookup(_condition_cache_key, &handle);
1949
19.1k
    if (condition_cache_hit) {
1950
7.74k
        _condition_cache = handle.get_filter_result();
1951
7.74k
        _condition_cache_hit_count++;
1952
11.3k
    } else {
1953
        // Allocate cache pre-sized to total number of granules.
1954
        // We add +1 as a safety margin: when a file is split across multiple scanners
1955
        // and the first row of this scanner's range is not aligned to a granule boundary,
1956
        // the data may span one more granule than ceil(total_rows / GRANULE_SIZE).
1957
        // The extra element costs only 1 bit and never affects correctness (an extra
1958
        // false-granule beyond the actual data range won't overlap any real row range).
1959
11.3k
        int64_t total_rows = _cur_reader->get_total_rows();
1960
11.3k
        if (total_rows > 0) {
1961
9.40k
            size_t num_granules = (total_rows + ConditionCacheContext::GRANULE_SIZE - 1) /
1962
9.40k
                                  ConditionCacheContext::GRANULE_SIZE;
1963
9.40k
            _condition_cache = std::make_shared<std::vector<bool>>(num_granules + 1, false);
1964
9.40k
        }
1965
11.3k
    }
1966
1967
19.1k
    if (_condition_cache) {
1968
        // Create context to pass to readers (native readers use it; non-native readers ignore it)
1969
17.1k
        _condition_cache_ctx = std::make_shared<ConditionCacheContext>();
1970
17.1k
        _condition_cache_ctx->is_hit = condition_cache_hit;
1971
17.1k
        _condition_cache_ctx->filter_result = _condition_cache;
1972
17.1k
        _cur_reader->set_condition_cache_context(_condition_cache_ctx);
1973
17.1k
    }
1974
19.1k
}
1975
1976
245k
void FileScanner::_finalize_reader_condition_cache() {
1977
245k
    if (!_should_enable_condition_cache() || !_condition_cache_ctx ||
1978
245k
        _condition_cache_ctx->is_hit) {
1979
236k
        _condition_cache = nullptr;
1980
236k
        _condition_cache_ctx = nullptr;
1981
236k
        return;
1982
236k
    }
1983
    // Only store the cache if the reader was fully consumed. If the scan was
1984
    // truncated early (e.g. by LIMIT), the cache is incomplete — unread granules
1985
    // would remain false and cause surviving rows to be incorrectly skipped on HIT.
1986
9.42k
    if (!_cur_reader_eof) {
1987
58
        _condition_cache = nullptr;
1988
58
        _condition_cache_ctx = nullptr;
1989
58
        return;
1990
58
    }
1991
1992
9.37k
    auto* cache = segment_v2::ConditionCache::instance();
1993
9.37k
    cache->insert(_condition_cache_key, std::move(_condition_cache));
1994
9.37k
    _condition_cache = nullptr;
1995
9.37k
    _condition_cache_ctx = nullptr;
1996
9.37k
}
1997
1998
87.2k
Status FileScanner::close(RuntimeState* state) {
1999
87.2k
    if (!_try_close()) {
2000
0
        return Status::OK();
2001
0
    }
2002
2003
87.2k
    _finalize_reader_condition_cache();
2004
2005
87.2k
    if (_cur_reader) {
2006
663
        RETURN_IF_ERROR(_cur_reader->close());
2007
663
    }
2008
2009
87.2k
    RETURN_IF_ERROR(Scanner::close(state));
2010
87.2k
    return Status::OK();
2011
87.2k
}
2012
2013
87.2k
void FileScanner::try_stop() {
2014
87.2k
    Scanner::try_stop();
2015
87.2k
    if (_io_ctx) {
2016
87.2k
        _io_ctx->should_stop = true;
2017
87.2k
    }
2018
87.2k
}
2019
2020
141k
void FileScanner::update_realtime_counters() {
2021
141k
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
2022
2023
141k
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
2024
141k
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
2025
2026
141k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(
2027
141k
            _file_reader_stats->read_rows);
2028
141k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
2029
141k
            _file_reader_stats->read_bytes);
2030
2031
141k
    int64_t delta_bytes_read_from_local =
2032
141k
            _file_cache_statistics->bytes_read_from_local - _last_bytes_read_from_local;
2033
141k
    int64_t delta_bytes_read_from_remote =
2034
141k
            _file_cache_statistics->bytes_read_from_remote - _last_bytes_read_from_remote;
2035
141k
    if (_file_cache_statistics->bytes_read_from_local == 0 &&
2036
141k
        _file_cache_statistics->bytes_read_from_remote == 0) {
2037
136k
        _state->get_query_ctx()
2038
136k
                ->resource_ctx()
2039
136k
                ->io_context()
2040
136k
                ->update_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes);
2041
136k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
2042
136k
                _file_reader_stats->read_bytes);
2043
136k
    } else {
2044
5.47k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
2045
5.47k
                delta_bytes_read_from_local);
2046
5.47k
        _state->get_query_ctx()
2047
5.47k
                ->resource_ctx()
2048
5.47k
                ->io_context()
2049
5.47k
                ->update_scan_bytes_from_remote_storage(delta_bytes_read_from_remote);
2050
5.47k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
2051
5.47k
                delta_bytes_read_from_local);
2052
5.47k
        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
2053
5.47k
                delta_bytes_read_from_remote);
2054
5.47k
    }
2055
2056
141k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
2057
2058
141k
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
2059
141k
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
2060
2061
141k
    _file_reader_stats->read_bytes = 0;
2062
141k
    _file_reader_stats->read_rows = 0;
2063
2064
141k
    _last_bytes_read_from_local = _file_cache_statistics->bytes_read_from_local;
2065
141k
    _last_bytes_read_from_remote = _file_cache_statistics->bytes_read_from_remote;
2066
141k
}
2067
2068
87.1k
void FileScanner::_collect_profile_before_close() {
2069
87.1k
    Scanner::_collect_profile_before_close();
2070
87.1k
    if (config::enable_file_cache && _state->query_options().enable_file_cache &&
2071
87.1k
        _profile != nullptr) {
2072
1.08k
        io::FileCacheProfileReporter cache_profile(_profile);
2073
1.08k
        cache_profile.update(_file_cache_statistics.get());
2074
1.08k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
2075
1.08k
                _file_cache_statistics->bytes_write_into_cache);
2076
1.08k
    }
2077
2078
87.1k
    if (_cur_reader != nullptr) {
2079
663
        _cur_reader->collect_profile_before_close();
2080
663
    }
2081
2082
87.1k
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
2083
87.1k
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
2084
87.1k
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
2085
2086
87.1k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
2087
87.1k
    COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls);
2088
87.1k
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
2089
87.1k
    COUNTER_UPDATE(local_state->_condition_cache_hit_counter, _condition_cache_hit_count);
2090
87.1k
    if (_io_ctx) {
2091
87.1k
        COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
2092
87.1k
                       _io_ctx->condition_cache_filtered_rows);
2093
87.1k
    }
2094
2095
87.1k
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
2096
87.1k
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
2097
87.1k
}
2098
2099
} // namespace doris