Coverage Report

Created: 2026-06-30 15:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/Metrics_types.h>
23
#include <gen_cpp/Opcodes_types.h>
24
#include <gen_cpp/PaloInternalService_types.h>
25
#include <gen_cpp/PlanNodes_types.h>
26
#include <glog/logging.h>
27
28
#include <algorithm>
29
#include <boost/iterator/iterator_facade.hpp>
30
#include <map>
31
#include <ranges>
32
#include <tuple>
33
#include <unordered_map>
34
#include <utility>
35
36
#include "common/cast_set.h"
37
#include "common/compiler_util.h" // IWYU pragma: keep
38
#include "common/config.h"
39
#include "common/consts.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "core/block/column_with_type_and_name.h"
43
#include "core/block/columns_with_type_and_name.h"
44
#include "core/column/column.h"
45
#include "core/column/column_nullable.h"
46
#include "core/column/column_vector.h"
47
#include "core/data_type/data_type.h"
48
#include "core/data_type/data_type_nullable.h"
49
#include "core/data_type/data_type_string.h"
50
#include "core/string_ref.h"
51
#include "exec/common/stringop_substring.h"
52
#include "exec/rowid_fetcher.h"
53
#include "exec/scan/scan_node.h"
54
#include "exprs/aggregate/aggregate_function.h"
55
#include "exprs/function/function.h"
56
#include "exprs/function/simple_function_factory.h"
57
#include "exprs/vexpr.h"
58
#include "exprs/vexpr_context.h"
59
#include "exprs/vexpr_fwd.h"
60
#include "exprs/vslot_ref.h"
61
#include "format/arrow/arrow_stream_reader.h"
62
#include "format/count_reader.h"
63
#include "format/csv/csv_reader.h"
64
#include "format/json/new_json_reader.h"
65
#include "format/native/native_reader.h"
66
#include "format/orc/vorc_reader.h"
67
#include "format/parquet/vparquet_reader.h"
68
#include "format/table/es/es_http_reader.h"
69
#include "format/table/hive_reader.h"
70
#include "format/table/hudi_jni_reader.h"
71
#include "format/table/hudi_reader.h"
72
#include "format/table/iceberg_reader.h"
73
#include "format/table/iceberg_sys_table_jni_reader.h"
74
#include "format/table/jdbc_jni_reader.h"
75
#include "format/table/max_compute_jni_reader.h"
76
#include "format/table/paimon_cpp_reader.h"
77
#include "format/table/paimon_jni_reader.h"
78
#include "format/table/paimon_predicate_converter.h"
79
#include "format/table/paimon_reader.h"
80
#include "format/table/partition_column_filler.h"
81
#include "format/table/remote_doris_reader.h"
82
#include "format/table/transactional_hive_reader.h"
83
#include "format/table/trino_connector_jni_reader.h"
84
#include "format/text/text_reader.h"
85
#ifdef BUILD_RUST_READERS
86
#include "format/lance/lance_rust_reader.h"
87
#endif
88
#include "io/cache/block_file_cache_profile.h"
89
#include "load/group_commit/wal/wal_reader.h"
90
#include "runtime/descriptors.h"
91
#include "runtime/runtime_profile.h"
92
#include "runtime/runtime_state.h"
93
94
namespace cctz {
95
class time_zone;
96
} // namespace cctz
97
namespace doris {
98
class ShardedKVCache;
99
} // namespace doris
100
101
namespace doris {
102
using namespace ErrorCode;
103
104
// Names of the scanner-profile counters for bytes read from files and time spent reading them.
// They are used as counter names in FileScanner::init(). NOTE(review): presumably exposed as
// class-level constants so other components can look these counters up by the same name.
const std::string FileScanner::FileReadBytesProfile = "FileReadBytes";
const std::string FileScanner::FileReadTimeProfile = "FileReadTime";
106
107
// Builds a file scanner bound to one scan node.
//
// The scan range params are taken from the query context's map entry for this node's parent id
// when present. Otherwise (old FE thrift protocol) they come from the split source. The scanner is
// in "load" mode when the params name a source tuple present in the descriptor table. In that mode
// rows are read into the input tuple layout. Otherwise the output tuple is read directly.
FileScanner::FileScanner(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
                         std::shared_ptr<SplitSourceConnector> split_source,
                         RuntimeProfile* profile, ShardedKVCache* kv_cache,
                         const std::unordered_map<std::string, int>* colname_to_slot_id)
        : Scanner(state, local_state, limit, profile),
          // The parameter is taken by value; move it to avoid an extra refcount round-trip.
          _split_source(std::move(split_source)),
          _cur_reader(nullptr),
          _cur_reader_eof(false),
          _kv_cache(kv_cache),
          _strict_mode(false),
          _col_name_to_slot_id(colname_to_slot_id) {
    // Look the params up with a single find() instead of count() followed by operator[].
    _params = nullptr;
    auto query_ctx = state->get_query_ctx();
    if (query_ctx != nullptr) {
        auto& params_map = query_ctx->file_scan_range_params_map;
        if (auto it = params_map.find(local_state->parent_id()); it != params_map.end()) {
            _params = &(it->second);
        }
    }
    if (_params == nullptr) {
        // old fe thrift protocol
        _params = _split_source->get_params();
    }
    if (_params->__isset.strict_mode) {
        _strict_mode = _params->strict_mode;
    }

    // For load scanner, there are input and output tuple.
    // For query scanner, there is only output tuple
    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
    _real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
    _is_load = (_input_tuple_desc != nullptr);
    _configure_file_scan_handlers();
}
136
137
93.2k
void FileScanner::_configure_file_scan_handlers() {
138
93.2k
    if (_is_load) {
139
4.71k
        _init_src_block_handler = &FileScanner::_init_src_block_for_load;
140
4.71k
        _process_src_block_after_read_handler =
141
4.71k
                &FileScanner::_process_src_block_after_read_for_load;
142
4.71k
        _should_push_down_predicates_handler = &FileScanner::_should_push_down_predicates_for_load;
143
4.71k
        _should_enable_condition_cache_handler =
144
4.71k
                &FileScanner::_should_enable_condition_cache_for_load;
145
88.5k
    } else {
146
88.5k
        _init_src_block_handler = &FileScanner::_init_src_block_for_query;
147
88.5k
        _process_src_block_after_read_handler =
148
88.5k
                &FileScanner::_process_src_block_after_read_for_query;
149
88.5k
        _should_push_down_predicates_handler = &FileScanner::_should_push_down_predicates_for_query;
150
88.5k
        _should_enable_condition_cache_handler =
151
88.5k
                &FileScanner::_should_enable_condition_cache_for_query;
152
88.5k
    }
153
93.2k
}
154
155
90.0k
// Initializes the scanner after construction:
//  - registers the scanner-level timers and counters on the scanner profile,
//  - wires IO statistics into the IO context,
//  - in load mode, builds the source/destination row descriptors and prepares/opens the
//    pre-filter conjuncts,
//  - builds the row descriptor used for default values.
Status FileScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));

    // Every scanner-level metric below is registered at level 1; registration order is kept.
    auto scanner_prof = _local_state->scanner_profile();

    // Per-stage timers.
    _get_block_timer = ADD_TIMER_WITH_LEVEL(scanner_prof, "FileScannerGetBlockTime", 1);
    _cast_to_input_block_timer =
            ADD_TIMER_WITH_LEVEL(scanner_prof, "FileScannerCastInputBlockTime", 1);
    _fill_missing_columns_timer =
            ADD_TIMER_WITH_LEVEL(scanner_prof, "FileScannerFillMissingColumnTime", 1);
    _pre_filter_timer = ADD_TIMER_WITH_LEVEL(scanner_prof, "FileScannerPreFilterTimer", 1);
    _convert_to_output_block_timer =
            ADD_TIMER_WITH_LEVEL(scanner_prof, "FileScannerConvertOuputBlockTime", 1);
    _runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
            scanner_prof, "FileScannerRuntimeFilterPartitionPruningTime", 1);

    // File-level counters.
    _empty_file_counter = ADD_COUNTER_WITH_LEVEL(scanner_prof, "EmptyFileNum", TUnit::UNIT, 1);
    _not_found_file_counter =
            ADD_COUNTER_WITH_LEVEL(scanner_prof, "NotFoundFileNum", TUnit::UNIT, 1);
    _fully_skipped_file_counter =
            ADD_COUNTER_WITH_LEVEL(scanner_prof, "FullySkippedFileNum", TUnit::UNIT, 1);
    _file_counter = ADD_COUNTER_WITH_LEVEL(scanner_prof, "FileNumber", TUnit::UNIT, 1);

    // File I/O counters.
    _file_read_bytes_counter =
            ADD_COUNTER_WITH_LEVEL(scanner_prof, FileReadBytesProfile, TUnit::BYTES, 1);
    _file_read_calls_counter =
            ADD_COUNTER_WITH_LEVEL(scanner_prof, "FileReadCalls", TUnit::UNIT, 1);
    _file_read_time_counter = ADD_TIMER_WITH_LEVEL(scanner_prof, FileReadTimeProfile, 1);

    _runtime_filter_partition_pruned_range_counter = ADD_COUNTER_WITH_LEVEL(
            scanner_prof, "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);

    // High-water-mark counters: they hold the current file's adaptive-batch value while also
    // preserving the peak value across all files handled by this scanner instance.
    _adaptive_batch_predicted_rows_counter = scanner_prof->AddHighWaterMarkCounter(
            "AdaptiveBatchPredictedRows", TUnit::UNIT, RuntimeProfile::ROOT_COUNTER, 1);
    _adaptive_batch_actual_bytes_before_truncate_counter = scanner_prof->AddHighWaterMarkCounter(
            "AdaptiveBatchActualBytesBeforeTruncate", TUnit::BYTES, RuntimeProfile::ROOT_COUNTER,
            1);
    _adaptive_batch_actual_bytes_after_truncate_counter = scanner_prof->AddHighWaterMarkCounter(
            "AdaptiveBatchActualBytesAfterTruncate", TUnit::BYTES, RuntimeProfile::ROOT_COUNTER,
            1);
    _adaptive_batch_probe_count_counter =
            ADD_COUNTER_WITH_LEVEL(scanner_prof, "AdaptiveBatchProbeCount", TUnit::UNIT, 1);

    _file_cache_statistics.reset(new io::FileCacheStatistics());
    _file_reader_stats.reset(new io::FileReaderStats());

    RETURN_IF_ERROR(_init_io_ctx());
    // Point the IO context's statistics sinks at the objects owned by this scanner.
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
    _io_ctx->file_reader_stats = _file_reader_stats.get();
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;

    if (_is_load) {
        _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
                                              std::vector<TupleId>({_input_tuple_desc->id()})));

        // Pre-filters: the list form takes precedence over the legacy single-expr form.
        if (_params->__isset.pre_filter_exprs_list) {
            RETURN_IF_ERROR(doris::VExpr::create_expr_trees(_params->pre_filter_exprs_list,
                                                            _pre_conjunct_ctxs));
        } else if (_params->__isset.pre_filter_exprs) {
            VExprContextSPtr pre_filter_ctx;
            RETURN_IF_ERROR(
                    doris::VExpr::create_expr_tree(_params->pre_filter_exprs, pre_filter_ctx));
            _pre_conjunct_ctxs.emplace_back(pre_filter_ctx);
        }

        // Pre-filters are prepared against the source (input) row layout.
        for (auto& pre_ctx : _pre_conjunct_ctxs) {
            RETURN_IF_ERROR(pre_ctx->prepare(_state, *_src_row_desc));
            RETURN_IF_ERROR(pre_ctx->open(_state));
        }

        _dest_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
                                               std::vector<TupleId>({_output_tuple_desc->id()})));
    }

    _default_val_row_desc.reset(
            new RowDescriptor(_state->desc_tbl(), std::vector<TupleId>({_real_tuple_desc->id()})));

    return Status::OK();
}
239
240
74.4k
// Returns whether adaptive batch sizing may be used for `format_type`.
// It is gated globally by config::enable_adaptive_batch_size. Per format, it is limited to readers
// that support set_batch_size(); table-format wrappers are covered because they delegate to
// native readers.
bool FileScanner::_should_enable_adaptive_batch_size(TFileFormatType::type format_type) const {
    if (config::enable_adaptive_batch_size) {
        switch (format_type) {
        case TFileFormatType::FORMAT_PARQUET:
        case TFileFormatType::FORMAT_ORC:
        case TFileFormatType::FORMAT_CSV_PLAIN:
        case TFileFormatType::FORMAT_CSV_GZ:
        case TFileFormatType::FORMAT_CSV_BZ2:
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
        case TFileFormatType::FORMAT_CSV_LZOP:
        case TFileFormatType::FORMAT_CSV_DEFLATE:
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
        case TFileFormatType::FORMAT_PROTO:
        case TFileFormatType::FORMAT_TEXT:
        case TFileFormatType::FORMAT_JSON:
        case TFileFormatType::FORMAT_JNI:
            return true;
        default:
            break;
        }
    }
    return false;
}
266
267
499k
bool FileScanner::_should_run_adaptive_batch_size() const {
268
    // Skip adaptive batch sizing for pushed-down COUNT(*): the reader is wrapped by CountReader
269
    // and only emits a single aggregated row count instead of materializing real columns, so
270
    // there is no per-row byte cost to learn from and no benefit in tuning the batch size.
271
499k
    return _block_size_predictor != nullptr && _get_push_down_agg_type() != TPushAggOp::type::COUNT;
272
499k
}
273
274
238k
void FileScanner::_reset_adaptive_batch_size_state() {
275
238k
    _block_size_predictor.reset();
276
238k
    COUNTER_SET(_adaptive_batch_predicted_rows_counter, int64_t(0));
277
238k
    COUNTER_SET(_adaptive_batch_actual_bytes_before_truncate_counter, int64_t(0));
278
238k
    COUNTER_SET(_adaptive_batch_actual_bytes_after_truncate_counter, int64_t(0));
279
238k
}
280
281
74.2k
// Resets adaptive batch-size state, then creates a fresh predictor if `format_type` supports
// adaptive sizing.
void FileScanner::_init_adaptive_batch_size_state(TFileFormatType::type format_type) {
    _reset_adaptive_batch_size_state();
    if (_should_enable_adaptive_batch_size(format_type)) {
        // External file readers do not provide reliable memory-size metadata hints. A small probe
        // batch (ADAPTIVE_BATCH_INITIAL_PROBE_ROWS) lets the predictor learn quickly from real
        // FileScanner output.
        _block_size_predictor = std::make_unique<AdaptiveBlockSizePredictor>(
                _state->preferred_block_size_bytes(), 0.0, ADAPTIVE_BATCH_INITIAL_PROBE_ROWS,
                _state->batch_size());
    }
}
293
294
220k
size_t FileScanner::_predict_reader_batch_rows() {
295
220k
    DCHECK(_block_size_predictor != nullptr);
296
220k
    size_t predicted_rows = _block_size_predictor->predict_next_rows();
297
220k
    COUNTER_SET(_adaptive_batch_predicted_rows_counter, static_cast<int64_t>(predicted_rows));
298
220k
    return predicted_rows;
299
220k
}
300
301
130k
void FileScanner::_update_adaptive_batch_size_before_truncate(const Block& block) {
302
130k
    if (!_should_run_adaptive_batch_size()) {
303
9.98k
        return;
304
9.98k
    }
305
306
    // Learn from the logical bytes before CHAR/VARCHAR truncation. The truncated block can be
307
    // much smaller than the data the reader and FileScanner have already materialized.
308
120k
    COUNTER_SET(_adaptive_batch_actual_bytes_before_truncate_counter,
309
120k
                static_cast<int64_t>(block.bytes()));
310
120k
    if (block.rows() == 0) {
311
12.9k
        return;
312
12.9k
    }
313
314
    // Count a probe only when we actually obtain the first non-empty sample that seeds history.
315
107k
    if (!_block_size_predictor->has_history()) {
316
55.5k
        COUNTER_UPDATE(_adaptive_batch_probe_count_counter, 1);
317
55.5k
    }
318
107k
    _block_size_predictor->update(block);
319
107k
}
320
321
130k
void FileScanner::_update_adaptive_batch_size_after_truncate(const Block& block) {
322
130k
    if (!_should_run_adaptive_batch_size()) {
323
9.98k
        return;
324
9.98k
    }
325
326
    // Keep the post-truncate size only for observability. It should not affect the next batch
327
    // because truncation happens after the upstream memory cost has already been paid.
328
120k
    COUNTER_SET(_adaptive_batch_actual_bytes_after_truncate_counter,
329
120k
                static_cast<int64_t>(block.bytes()));
330
120k
}
331
332
// check if the expr is a partition pruning expr
333
30.1k
bool FileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
334
30.1k
    if (expr->is_slot_ref()) {
335
11.7k
        auto* slot_ref = static_cast<VSlotRef*>(expr.get());
336
11.7k
        return _partition_slot_index_map.find(slot_ref->slot_id()) !=
337
11.7k
               _partition_slot_index_map.end();
338
11.7k
    }
339
18.3k
    if (expr->is_literal()) {
340
5.83k
        return true;
341
5.83k
    }
342
18.5k
    return std::ranges::all_of(expr->children(), [this](const auto& child) {
343
18.5k
        return _check_partition_prune_expr(child);
344
18.5k
    });
345
18.3k
}
346
347
120k
bool FileScanner::_contains_runtime_filter(const VExprContextSPtrs& conjuncts) const {
348
120k
    return std::ranges::any_of(
349
129k
            conjuncts, [](const auto& conjunct) { return conjunct->root()->is_rf_wrapper(); });
350
120k
}
351
352
11.3k
void FileScanner::_init_runtime_filter_partition_prune_ctxs() {
353
11.3k
    _runtime_filter_partition_prune_ctxs.clear();
354
11.6k
    for (auto& conjunct : _conjuncts) {
355
11.6k
        auto impl = conjunct->root()->get_impl();
356
        // If impl is not null, which means this a conjuncts from runtime filter.
357
11.6k
        auto expr = impl ? impl : conjunct->root();
358
11.6k
        if (_check_partition_prune_expr(expr)) {
359
8.40k
            _runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
360
8.40k
        }
361
11.6k
    }
362
11.3k
}
363
364
7.40k
void FileScanner::_init_runtime_filter_partition_prune_block() {
365
    // init block with empty column
366
67.7k
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
367
67.7k
        _runtime_filter_partition_prune_block.insert(
368
67.7k
                ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
369
67.7k
                                      slot_desc->get_data_type_ptr(), slot_desc->col_name()));
370
67.7k
    }
371
7.40k
}
372
373
8.63k
// Evaluates the partition-only conjuncts against this range's partition values.
// Sets `can_filter_all` (via VExprContext::execute_conjuncts) when the whole range can be
// skipped. Does nothing when there are no such conjuncts or no partition columns.
Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
    SCOPED_TIMER(_runtime_filter_partition_prune_timer);
    if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
        return Status::OK();
    }
    // Each partition value is materialized as a single-row column.
    size_t partition_value_column_size = 1;

    // 1. Materialize each partition key value into a column of its slot's data type.
    std::unordered_map<SlotId, MutableColumnPtr> partition_slot_id_to_column;
    partition_slot_id_to_column.reserve(_partition_col_descs.size());
    for (auto const& partition_col_desc : _partition_col_descs) {
        const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
        auto data_type = partition_slot_desc->get_data_type_ptr();
        auto partition_value_column = data_type->create_column();
        auto null_it = _partition_value_is_null.find(partition_slot_desc->col_name());
        DORIS_CHECK(null_it != _partition_value_is_null.end());
        RETURN_IF_ERROR(fill_partition_column_from_path_value(
                *partition_value_column, *partition_slot_desc, partition_value,
                partition_value_column_size, null_it->second));

        partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
    }

    // 2. Fill _runtime_filter_partition_prune_block from the partition columns, then execute the
    //    conjuncts against it.
    // 2.1 Place each partition column at its slot's position so the conjuncts see the layout
    //     they were bound to. A single find() is reused to move the column out.
    size_t index = 0;
    bool first_column_filled = false;
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
        if (auto it = partition_slot_id_to_column.find(slot_desc->id());
            it != partition_slot_id_to_column.end()) {
            _runtime_filter_partition_prune_block.replace_by_position(index,
                                                                      std::move(it->second));
            if (index == 0) {
                first_column_filled = true;
            }
        }
        index++;
    }

    // 2.2 Execute conjuncts.
    if (!first_column_filled) {
        // VExprContext.execute has an optimization: filtering is executed only when
        // block->rows() > 0. Resize column 0 so the block reports the expected row count.
        // The following process may be tricky and time-consuming, but we have no other way.
        auto column = IColumn::mutate(
                std::move(_runtime_filter_partition_prune_block.get_by_position(0).column));
        column->resize(partition_value_column_size);
        _runtime_filter_partition_prune_block.replace_by_position(0, std::move(column));
    }
    IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
    RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
                                                    &_runtime_filter_partition_prune_block,
                                                    &result_filter, &can_filter_all));
    return Status::OK();
}
427
428
20.8k
// Classifies _push_down_conjuncts by the slots they reference:
//  - conjuncts referencing exactly one distinct slot go to _slot_id_to_filter_conjuncts[slot];
//  - conjuncts referencing no slot, or more than one distinct slot, go to
//    _not_single_slot_filter_conjuncts.
// Both containers are cleared first.
Status FileScanner::_process_conjuncts() {
    _slot_id_to_filter_conjuncts.clear();
    _not_single_slot_filter_conjuncts.clear();
    for (auto& conjunct : _push_down_conjuncts) {
        auto impl = conjunct->root()->get_impl();
        // If impl is not null, this conjunct comes from a runtime filter.
        auto cur_expr = impl ? impl : conjunct->root();

        std::vector<int> slot_ids;
        _get_slot_ids(cur_expr.get(), &slot_ids);
        if (slot_ids.empty()) {
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
            continue;
        }

        // Single-slot iff every collected reference names the same slot. Using all_of avoids the
        // signed/unsigned comparison of an `int` index against `size()`.
        const SlotId first_slot = slot_ids.front();
        const bool single_slot = std::ranges::all_of(
                slot_ids, [first_slot](int slot_id) { return slot_id == first_slot; });
        if (single_slot) {
            _slot_id_to_filter_conjuncts[first_slot].emplace_back(conjunct);
        } else {
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
        }
    }
    return Status::OK();
}
458
459
57.5k
// Re-syncs the push-down conjunct set when _conjuncts has grown beyond it, then re-classifies it.
// NOTE(review): "late arrival" presumably refers to runtime filters applied after open — confirm.
// Also records "ApplyAllRuntimeFilters" in the profile once every runtime filter is applied.
Status FileScanner::_process_late_arrival_conjuncts() {
    const bool has_new_conjuncts = _push_down_conjuncts.size() < _conjuncts.size();
    if (has_new_conjuncts) {
        // Copy, do not move or clear: _conjuncts must stay populated for fallback filtering,
        // especially when mixing native readers (which use _push_down_conjuncts) and JNI
        // readers (which rely on _conjuncts).
        _push_down_conjuncts = _conjuncts;
        RETURN_IF_ERROR(_process_conjuncts());
    }

    const bool all_rf_applied = _applied_rf_num == _total_rf_num;
    if (all_rf_applied) {
        _local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
    }
    return Status::OK();
}
473
474
61.9k
// Recursively appends to `slot_ids` the slot ids of every slot ref found below `expr`, in
// depth-first order. Only descendants are inspected; `expr` itself is not. Each referenced slot
// descriptor is also marked as a predicate column.
void FileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
    for (auto& child_expr : expr->children()) {
        if (!child_expr->is_slot_ref()) {
            _get_slot_ids(child_expr.get(), slot_ids);
            continue;
        }
        // VSlotRef derives from VExpr, so static_cast is the correct downcast (as in
        // _check_partition_prune_expr). reinterpret_cast does not account for base-subobject
        // offsets.
        auto* slot_ref = static_cast<VSlotRef*>(child_expr.get());
        SlotDescriptor* slot_desc = _state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
        // A slot missing from the descriptor table is a planner bug: flag it in debug builds
        // instead of dereferencing null in release builds.
        DCHECK(slot_desc != nullptr);
        if (slot_desc != nullptr) {
            slot_desc->set_is_predicate(true);
        }
        slot_ids->emplace_back(slot_ref->slot_id());
    }
}
486
487
90.1k
// Opens the scanner. It fetches the first scan range from the split source and, if one exists,
// initializes expression contexts and (optionally) runtime-filter partition pruning. With no scan
// range at all, the scanner is marked EOF immediately.
Status FileScanner::_open_impl(RuntimeState* state) {
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(Scanner::_open_impl(state));
    if (_local_state != nullptr) {
        _condition_cache_digest = _local_state->get_condition_cache_digest();
    }
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));

    if (!_first_scan_range) {
        // there's no scan range in split source. stop scanner directly.
        _scanner_eof = true;
        return Status::OK();
    }

    RETURN_IF_ERROR(_init_expr_ctxes());
    const bool use_rf_partition_prune =
            _state->query_options().enable_runtime_filter_partition_prune &&
            !_partition_slot_index_map.empty();
    if (use_rf_partition_prune) {
        _init_runtime_filter_partition_prune_ctxs();
        _init_runtime_filter_partition_prune_block();
    }
    return Status::OK();
}
508
509
328k
// Reads the next block via _get_block_wrapped(). On failure, the current scan range name is
// appended to the error message to make debugging easier.
Status FileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
    Status status = _get_block_wrapped(state, block, eof);
    if (status.ok()) {
        return status;
    }
    return std::move(status.append(". cur path: " + get_current_scan_range_name()));
}
518
519
// For query:
520
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
521
//                              A     B    C  D                 E
522
// _init_src_block              x     x    x  x                 x                -      x
523
// get_next_block               x     x    x  -                 -                -      x
524
// _cast_to_input_block         -     -    -  -                 -                -      -
525
// _fill_columns_from_path      -     -    -  -                 x                -      x
526
// _fill_missing_columns        -     -    -  x                 -                -      x
527
// _convert_to_output_block     -     -    -  -                 -                -      -
528
//
529
// For load:
530
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
531
//                              A     B    C  D                 E
532
// _init_src_block              x     x    x  x                 x                x      -
533
// get_next_block               x     x    x  -                 -                x      -
534
// _cast_to_input_block         x     x    x  -                 -                x      -
535
// _fill_columns_from_path      -     -    -  -                 x                x      -
536
// _fill_missing_columns        -     -    -  x                 -                x      -
537
// _convert_to_output_block     -     -    -  -                 -                -      x
538
328k
// Reads the next batch into `block`, opening the next scan range's reader when needed.
//
// A new reader is requested whenever there is none yet or the current one is exhausted.
// Skipped without failing the scan:
//   - NOT_FOUND from _get_next_reader(), when config::ignore_not_found_file_in_external_table
//     is set (counted in _not_found_file_counter);
//   - END_OF_FILE from _get_next_reader() (counted in _fully_skipped_file_counter).
// Any other error from _get_next_reader() is returned unchanged.
// `*eof` is set to true only when `_scanner_eof` is set. Otherwise the function returns after
// one read, which may have produced zero rows.
Status FileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* eof) {
    while (true) {
        RETURN_IF_CANCELLED(state);

        const bool need_new_reader = (_cur_reader == nullptr) || _cur_reader_eof;
        if (need_new_reader) {
            _finalize_reader_condition_cache();
            // The file list may come from the meta cache, so a file can already have been
            // removed from storage; such files are ignored when the config allows it.
            Status open_status = _get_next_reader();
            const bool skip_missing_file = open_status.is<ErrorCode::NOT_FOUND>() &&
                                           config::ignore_not_found_file_in_external_table;
            if (skip_missing_file) {
                _cur_reader_eof = true;
                COUNTER_UPDATE(_not_found_file_counter, 1);
                continue;
            }
            if (open_status.is<ErrorCode::END_OF_FILE>()) {
                // Reader reported END_OF_FILE while opening: count the file as fully skipped.
                _cur_reader_eof = true;
                COUNTER_UPDATE(_fully_skipped_file_counter, 1);
                continue;
            }
            if (!open_status) {
                return open_status;
            }
            _init_reader_condition_cache();
        }

        if (_scanner_eof) {
            *eof = true;
            return Status::OK();
        }

        // Load jobs build a source block from the data file schema (e.g. parquet);
        // query jobs simply point _src_block_ptr at `block`.
        RETURN_IF_ERROR(_init_src_block(block));
        if (_should_run_adaptive_batch_size()) {
            _cur_reader->set_batch_size(_predict_reader_batch_rows());
        }

        size_t rows_read = 0;
        {
            SCOPED_TIMER(_get_block_timer);
            // Columns that do not exist in the file may be left unfilled by the reader.
            RETURN_IF_ERROR(
                    _cur_reader->get_next_block(_src_block_ptr, &rows_read, &_cur_reader_eof));
        }

        // Rely on rows_read instead of _src_block_ptr->rows(): the first column of the source
        // block may not be filled by get_next_block(), so rows() can be wrong.
        if (rows_read > 0) {
            if (!_cur_reader->count_read_rows() && _io_ctx) {
                _io_ctx->file_reader_stats->read_rows += rows_read;
            }
            // With COUNT push-down the block only carries a number; nothing else to do.
            if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
                RETURN_IF_ERROR(_process_src_block_after_read(block));
            }
        }
        return Status::OK();
    }
}
605
606
/**
607
 * Check whether there are complex types in parquet/orc reader in broker/stream load.
608
 * Broker/stream load usually casts file columns from string slots. Complex parquet/orc columns are
609
 * supported only when the input slot that reads the file column keeps a complex type.
610
 */
611
13.1k
Status FileScanner::_check_output_block_types() {
612
    // Only called from _init_src_block_for_load, so _is_load is always true.
613
13.1k
    TFileFormatType::type format_type = _params->format_type;
614
13.1k
    if (format_type == TFileFormatType::FORMAT_PARQUET ||
615
13.1k
        format_type == TFileFormatType::FORMAT_ORC) {
616
1.91k
        for (auto slot : _output_tuple_desc->slots()) {
617
1.91k
            if (is_complex_type(slot->type()->get_primitive_type())) {
618
54
                const auto& output_col_name = slot->col_name_lower_case();
619
216
                for (auto input_slot : _input_tuple_desc->slots()) {
620
216
                    if (input_slot->col_name_lower_case() != output_col_name) {
621
189
                        continue;
622
189
                    }
623
27
                    auto file_col_type = _slot_lower_name_to_col_type.find(output_col_name);
624
27
                    if (file_col_type != _slot_lower_name_to_col_type.end() &&
625
27
                        !is_complex_type(file_col_type->second->get_primitive_type())) {
626
0
                        return Status::InternalError(
627
0
                                "Parquet/orc complex types in broker/stream load require complex "
628
0
                                "file columns, but column '{}' is read as {}.",
629
0
                                slot->col_name(), file_col_type->second->get_name());
630
0
                    }
631
27
                    if (!is_complex_type(input_slot->type()->get_primitive_type())) {
632
0
                        return Status::InternalError(
633
0
                                "Parquet/orc complex types in broker/stream load require complex "
634
0
                                "input slots, but column '{}' is planned as {}.",
635
0
                                slot->col_name(), input_slot->type()->get_name());
636
0
                    }
637
27
                }
638
54
            }
639
1.91k
        }
640
1.80k
        for (auto input_slot : _input_tuple_desc->slots()) {
641
1.80k
            if (!is_complex_type(input_slot->type()->get_primitive_type())) {
642
1.74k
                continue;
643
1.74k
            }
644
54
            auto file_col_type =
645
54
                    _slot_lower_name_to_col_type.find(input_slot->col_name_lower_case());
646
54
            if (file_col_type != _slot_lower_name_to_col_type.end() &&
647
54
                !is_complex_type(file_col_type->second->get_primitive_type())) {
648
0
                return Status::InternalError(
649
0
                        "Parquet/orc complex types in broker/stream load require complex "
650
0
                        "file columns, but column '{}' is read as {}.",
651
0
                        input_slot->col_name(), file_col_type->second->get_name());
652
0
            }
653
54
        }
654
82
    }
655
13.1k
    return Status::OK();
656
13.1k
}
657
658
238k
// Prepares `_src_block_ptr` for the next read by invoking the member-function handler stored
// in `_init_src_block_handler` (presumably _init_src_block_for_query or
// _init_src_block_for_load; the assignment is not visible here).
Status FileScanner::_init_src_block(Block* block) {
    DCHECK(_init_src_block_handler != nullptr);
    const auto handler = _init_src_block_handler;
    return (this->*handler)(block);
}
662
663
225k
// Query path: the reader writes straight into the caller's block, so it doubles as the
// source block. The column-name -> position map is captured from the first block only.
Status FileScanner::_init_src_block_for_query(Block* block) {
    _src_block_ptr = block;
    if (!_src_block_name_to_idx.empty()) {
        return Status::OK();
    }
    _src_block_name_to_idx = block->get_name_to_pos_map();
    return Status::OK();
}
672
673
13.1k
// Load path: rebuilds `_src_block` with one empty column per input slot and points
// `_src_block_ptr` at it. The caller's `block` is not used.
//
// Input slots contain every column described in the load statement, e.g. for
//   -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
// they are k1, k2 and tmp1. Some come from the file (k1, k2), some do not (tmp1), and
// columns from the path are included too. A slot found in `_slot_lower_name_to_col_type`
// gets the nullable file column type; otherwise the slot's own type is used.
//
// Also records the skip bitmap column position and, when a sequence map column is
// configured, the unique ids of that column and of the sequence column.
Status FileScanner::_init_src_block_for_load(Block* block) {
    static_cast<void>(block);
    RETURN_IF_ERROR(_check_output_block_types());

    _src_block.clear();
    const bool has_sequence_map_col = _params->__isset.sequence_map_col;
    uint32_t position = 0;
    for (auto* input_slot : _input_tuple_desc->slots()) {
        const auto& col_name = input_slot->col_name();
        if (input_slot->is_skip_bitmap_col()) {
            _skip_bitmap_col_idx = position;
        }
        if (has_sequence_map_col && _params->sequence_map_col == col_name) {
            _sequence_map_col_uid = input_slot->col_unique_id();
        }

        const auto file_type_it = _slot_lower_name_to_col_type.find(col_name);
        const DataTypePtr column_type = (file_type_it != _slot_lower_name_to_col_type.end())
                                                ? make_nullable(file_type_it->second)
                                                : input_slot->type();
        MutableColumnPtr empty_column = column_type->create_column();
        _src_block.insert(ColumnWithTypeAndName(std::move(empty_column), column_type, col_name));
        _src_block_name_to_idx.emplace(col_name, position);
        ++position;
    }

    if (has_sequence_map_col) {
        // When the target table has a sequence map column, __DORIS_SEQUENCE_COL__ is not part
        // of _input_tuple_desc, so its unique id has to be taken from _output_tuple_desc.
        for (auto* output_slot : _output_tuple_desc->slots()) {
            if (output_slot->is_sequence_col()) {
                _sequence_col_uid = output_slot->col_unique_id();
            }
        }
    }

    _src_block_ptr = &_src_block;
    _src_block_init = true;
    return Status::OK();
}
721
722
8.47k
// Casts, in place in `_src_block_ptr`, every column that exists in the file to the data type
// of its input slot (PT0 -> PT1). Columns not present in the file are left untouched.
// Only called from _process_src_block_after_read_for_load, so this always runs on the load
// path. `block` is unused.
// @return InternalError if a CAST function cannot be resolved or a file column has no
//         position in the source block; otherwise the first cast error, or OK.
Status FileScanner::_cast_to_input_block(Block* block) {
    SCOPED_TIMER(_cast_to_input_block_timer);
    for (auto& slot_desc : _input_tuple_desc->slots()) {
        const auto& col_name = slot_desc->col_name();
        if (_slot_lower_name_to_col_type.find(col_name) == _slot_lower_name_to_col_type.end()) {
            // Skip columns which do not exist in the file.
            continue;
        }
        // Resolve the position once with find(): operator[] would silently insert position 0
        // for an unknown name and make the cast overwrite the wrong column.
        const auto idx_it = _src_block_name_to_idx.find(col_name);
        if (idx_it == _src_block_name_to_idx.end()) {
            return Status::InternalError("Column {} not found in src block", col_name);
        }
        const uint32_t idx = idx_it->second;

        auto& arg = _src_block_ptr->get_by_position(idx);
        auto return_type = slot_desc->get_data_type_ptr();
        // Remove nullable here and let get_function decide whether the result is nullable.
        auto data_type = get_data_type_with_default_argument(remove_nullable(return_type));
        ColumnsWithTypeAndName arguments {arg, {data_type->create_column(), data_type, col_name}};
        auto func_cast =
                SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type, {});
        if (!func_cast) {
            return Status::InternalError("Function CAST[arg={}, col name={}, return={}] not found!",
                                         arg.type->get_name(), slot_desc->col_name(),
                                         return_type->get_name());
        }
        DCHECK(_state != nullptr);
        auto ctx = FunctionContext::create_context(_state, {}, {});
        // The cast result replaces the source column at the same position.
        RETURN_IF_ERROR(
                func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, arg.column->size()));
        _src_block_ptr->get_by_position(idx).type = std::move(return_type);
    }
    return Status::OK();
}
755
756
8.47k
// Filters the source block with `_pre_conjunct_ctxs` and adds the number of removed rows to
// `_counter.num_rows_unselected`. Does nothing when no pre-filter conjuncts exist.
// Only called from _process_src_block_after_read_for_load, so this always runs on the load
// path.
Status FileScanner::_pre_filter_src_block() {
    if (_pre_conjunct_ctxs.empty()) {
        return Status::OK();
    }
    SCOPED_TIMER(_pre_filter_timer);
    const auto column_count = _src_block_ptr->columns();
    const auto rows_before = _src_block_ptr->rows();
    RETURN_IF_ERROR(VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr, column_count));
    _counter.num_rows_unselected += rows_before - _src_block_ptr->rows();
    return Status::OK();
}
768
769
8.47k
// Converts the source block into the output (dest) block and filters rejected rows.
//
// Output column j is computed by `_dest_vexpr_ctx[j]` over the source block. For a nullable
// result, a NULL in a still-selected row rejects that row (the row's error is written via
// append_error_msg_to_file) when either:
//   - strict mode is on, the column has a source slot, and that source value is not NULL; or
//   - the output slot is not nullable.
// With a skip bitmap column (flexible partial update), columns marked in a row's skip bitmap
// are exempt from these checks. Rejected rows are removed from `block` at the end and counted
// in `_counter.num_rows_filtered`. `_src_block_ptr` is cleared afterwards.
// Only called from _process_src_block_after_read_for_load, so this always runs on the load
// path.
Status FileScanner::_convert_to_output_block(Block* block) {
    SCOPED_TIMER(_convert_to_output_block_timer);
    // The block is passed from the scanner context's free blocks, which are initialized with
    // the output columns, so there is no need to clear it here.

    size_t rows = _src_block_ptr->rows();
    auto filter_column = ColumnUInt8::create(rows, 1);
    auto& filter_map = filter_column->get_data();

    // After conversion, column_ptr is copied into the output block. block->insert() cannot be
    // used because it may cause a use_count() non-zero bug.
    auto scoped_mutable_output_block =
            VectorizedUtils::build_scoped_mutable_mem_reuse_block(block, *_dest_row_desc);
    auto& mutable_output_block = scoped_mutable_output_block.mutable_block();
    auto& mutable_output_columns = mutable_output_block.mutable_columns();

    std::vector<BitmapValue>* skip_bitmaps {nullptr};
    MutableColumnPtr skip_bitmap_column;
    if (_should_process_skip_bitmap_col()) {
        skip_bitmap_column = IColumn::mutate(
                std::move(_src_block_ptr->get_by_position(_skip_bitmap_col_idx).column));
        auto* skip_bitmap_nullable_col_ptr = assert_cast<ColumnNullable*>(skip_bitmap_column.get());
        skip_bitmaps = &(assert_cast<ColumnBitmap*>(
                                 skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get())
                                 ->get_data());
        // NOTE:
        // - If the table has a sequence type column, __DORIS_SEQUENCE_COL__ is put in
        //   _input_tuple_desc, so whether it is marked in the skip bitmap depends on whether it
        //   is specified in that row.
        // - If the table has a sequence map column, __DORIS_SEQUENCE_COL__ is not put in
        //   _input_tuple_desc, so it is never marked in the skip bitmap by the reader. Mark it
        //   here for every row whose sequence map column is marked.
        if (_sequence_map_col_uid != -1) {
            for (int j = 0; j < rows; ++j) {
                if ((*skip_bitmaps)[j].contains(_sequence_map_col_uid)) {
                    (*skip_bitmaps)[j].add(_sequence_col_uid);
                }
            }
        }
        _src_block_ptr->replace_by_position(_skip_bitmap_col_idx, std::move(skip_bitmap_column));
    }

    for (int j = 0; j < mutable_output_columns.size(); ++j) {
        auto* slot_desc = _output_tuple_desc->slots()[j];
        // Output columns and destination expression contexts are positional.
        int dest_index = j;
        ColumnPtr column_ptr;

        auto& ctx = _dest_vexpr_ctx[dest_index];
        // PT1 => dest primitive type
        RETURN_IF_ERROR(ctx->execute(_src_block_ptr, column_ptr));
        // column_ptr may be a ColumnConst; convert it to a normal column.
        column_ptr = column_ptr->convert_to_full_column_if_const();
        DCHECK(column_ptr);

        // Because src_slot_desc is always nullable, the column_ptr produced by the dest expr
        // is likely to be nullable.
        if (LIKELY(is_column_nullable(*column_ptr))) {
            const auto* nullable_column = assert_cast<const ColumnNullable*>(column_ptr.get());
            for (int i = 0; i < rows; ++i) {
                if (filter_map[i] && nullable_column->is_null_at(i)) {
                    // skip checks for non-mentioned columns in flexible partial update
                    if (skip_bitmaps == nullptr ||
                        !skip_bitmaps->at(i).contains(slot_desc->col_unique_id())) {
                        // clang-format off
                        if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
                            !_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->is_null_at(i)) {
                            filter_map[i] = false;
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
                                [&]() -> std::string {
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
                                },
                                [&]() -> std::string {
                                    auto raw_value =
                                            _src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->get_data_at(i);
                                    std::string raw_string = raw_value.to_string();
                                    fmt::memory_buffer error_msg;
                                    fmt::format_to(error_msg,"column({}) value is incorrect while strict mode is {}, src value is {}",
                                            slot_desc->col_name(), _strict_mode, raw_string);
                                    return fmt::to_string(error_msg);
                                }));
                        } else if (!slot_desc->is_nullable()) {
                            filter_map[i] = false;
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
                                [&]() -> std::string {
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
                                },
                                [&]() -> std::string {
                                    fmt::memory_buffer error_msg;
                                    fmt::format_to(error_msg, "column({}) values is null while columns is not nullable", slot_desc->col_name());
                                    return fmt::to_string(error_msg);
                                }));
                        }
                        // clang-format on
                    }
                }
            }
            if (!slot_desc->is_nullable()) {
                column_ptr = remove_nullable(column_ptr);
            }
        } else if (slot_desc->is_nullable()) {
            column_ptr = make_nullable(column_ptr);
        }
        mutable_output_columns[j]->insert_range_from(*column_ptr, 0, rows);
    }
    scoped_mutable_output_block.restore();

    // After inserting into the dest block, clear _src_block to drop references to the
    // original columns.
    _src_block_ptr->clear();

    size_t dest_size = block->columns();
    // Apply the row filter.
    block->insert(ColumnWithTypeAndName(std::move(filter_column), std::make_shared<DataTypeUInt8>(),
                                        "filter column"));
    RETURN_IF_ERROR(Block::filter_block(block, dest_size, dest_size));

    _counter.num_rows_filtered += rows - block->rows();
    return Status::OK();
}
892
893
130k
// Post-processes a freshly read block by invoking the member-function handler stored in
// `_process_src_block_after_read_handler` (presumably the _for_query or _for_load variant;
// the assignment is not visible here).
Status FileScanner::_process_src_block_after_read(Block* block) {
    DCHECK(_process_src_block_after_read_handler != nullptr);
    const auto handler = _process_src_block_after_read_handler;
    return (this->*handler)(block);
}
897
898
121k
// Query path post-processing. The only transformation is CHAR/VARCHAR truncation, wrapped by
// the adaptive batch size bookkeeping hooks.
Status FileScanner::_process_src_block_after_read_for_query(Block* block) {
    Block& read_block = *block;
    _update_adaptive_batch_size_before_truncate(read_block);
    // Truncate CHAR/VARCHAR columns whose target size is smaller than the file schema's
    // (needed for external table queries with truncate_char_or_varchar_columns=true).
    RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
    _update_adaptive_batch_size_after_truncate(read_block);
    return Status::OK();
}
908
909
8.47k
// Load path post-processing of a freshly read source block:
//   1. cast file columns in place to their input slot types;
//   2. pre-filter with `_pre_conjunct_ctxs`;
//   3. convert into the output block and drop rejected rows;
//   4. truncate CHAR/VARCHAR columns, wrapped by adaptive batch size bookkeeping.
Status FileScanner::_process_src_block_after_read_for_load(Block* block) {
    RETURN_IF_ERROR(_cast_to_input_block(block));

    // All readers fill partition and missing columns inside get_next_block
    //   ORC/Parquet: in _do_get_next_block via on_fill_partition_columns/on_fill_missing_columns
    //   CSV/JSON/others: in on_after_read_block via fill_remaining_columns
    // so every column must now hold the same number of rows (checked in debug builds).
    const size_t num_columns = _src_block_ptr->columns();
    if (num_columns > 0) {
        const size_t expected_rows = _src_block_ptr->get_by_position(0).column->size();
        for (size_t pos = 1; pos < num_columns; ++pos) {
            DCHECK_EQ(_src_block_ptr->get_by_position(pos).column->size(), expected_rows)
                    << "Column " << _src_block_ptr->get_by_position(pos).name << " has "
                    << _src_block_ptr->get_by_position(pos).column->size() << " rows, expected "
                    << expected_rows;
        }
    }

    RETURN_IF_ERROR(_pre_filter_src_block());
    RETURN_IF_ERROR(_convert_to_output_block(block));

    _update_adaptive_batch_size_before_truncate(*block);
    // Truncate CHAR/VARCHAR columns whose target size is smaller than the file schema's.
    RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
    _update_adaptive_batch_size_after_truncate(*block);
    return Status::OK();
}
939
940
130k
// Truncates CHAR/VARCHAR columns of `block` to their declared length. Runs only when the
// query option truncate_char_or_varchar_columns is enabled.
//
// Column positions in `block` are taken from the slot ordinal in `_real_tuple_desc`
// (NOTE(review): assumes the block layout follows that slot order, as the original did).
// A column with a positive declared length is truncated when:
//   - it is not part of the file schema (`_source_file_col_name_types`), or
//   - the file column has no known string length, or a longer one than declared.
// A non-positive declared length is never used as a truncation bound. The file-column branch
// always had this guard; it now also applies to columns missing from the file, which
// previously were truncated to that length unconditionally.
Status FileScanner::_truncate_char_or_varchar_columns(Block* block) {
    if (!_state->query_options().truncate_char_or_varchar_columns) {
        return Status::OK();
    }
    int idx = 0;
    for (auto* slot_desc : _real_tuple_desc->slots()) {
        const auto& type = slot_desc->type();
        const auto primitive_type = type->get_primitive_type();
        if (primitive_type == TYPE_VARCHAR || primitive_type == TYPE_CHAR) {
            const int target_len =
                    assert_cast<const DataTypeString*>(remove_nullable(type).get())->len();
            if (target_len > 0) {
                bool need_truncate = true;
                const auto iter = _source_file_col_name_types.find(slot_desc->col_name());
                if (iter != _source_file_col_name_types.end()) {
                    // -1: the file column carries no string length (or is not a string type).
                    int file_len = -1;
                    const DataTypePtr file_type = remove_nullable(iter->second);
                    if (const auto* file_string_type =
                                check_and_get_data_type<DataTypeString>(file_type.get())) {
                        file_len = file_string_type->len();
                    }
                    need_truncate = file_len < 0 || target_len < file_len;
                }
                if (need_truncate) {
                    _truncate_char_or_varchar_column(block, idx, target_len);
                }
            }
        }
        ++idx;
    }
    return Status::OK();
}
977
978
// VARCHAR substring(VARCHAR str, INT pos[, INT len])
979
24
void FileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int len) {
980
24
    auto int_type = std::make_shared<DataTypeInt32>();
981
24
    uint32_t num_columns_without_result = block->columns();
982
24
    const ColumnNullable* col_nullable =
983
24
            assert_cast<const ColumnNullable*>(block->get_by_position(idx).column.get());
984
24
    const ColumnPtr& string_column_ptr = col_nullable->get_nested_column_ptr();
985
24
    ColumnPtr null_map_column_ptr = col_nullable->get_null_map_column_ptr();
986
24
    block->replace_by_position(idx, std::move(string_column_ptr));
987
24
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)), int_type,
988
24
                   "const 1"}); // pos is 1
989
24
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)), int_type,
990
24
                   fmt::format("const {}", len)});                          // len
991
24
    block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); // result column
992
24
    ColumnNumbers temp_arguments(3);
993
24
    temp_arguments[0] = idx;                            // str column
994
24
    temp_arguments[1] = num_columns_without_result;     // pos
995
24
    temp_arguments[2] = num_columns_without_result + 1; // len
996
24
    uint32_t result_column_id = num_columns_without_result + 2;
997
998
24
    SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
999
24
    auto res = ColumnNullable::create(block->get_by_position(result_column_id).column,
1000
24
                                      null_map_column_ptr);
1001
24
    block->replace_by_position(idx, std::move(res));
1002
24
    Block::erase_useless_column(block, num_columns_without_result);
1003
24
}
1004
1005
7.49k
std::shared_ptr<segment_v2::RowIdColumnIteratorV2> FileScanner::_create_row_id_column_iterator() {
1006
7.49k
    auto& id_file_map = _state->get_id_file_map();
1007
7.49k
    auto file_id = id_file_map->get_file_mapping_id(
1008
7.49k
            std::make_shared<FileMapping>(((FileScanLocalState*)_local_state)->parent_id(),
1009
7.49k
                                          _current_range, _should_enable_file_meta_cache()));
1010
7.49k
    return std::make_shared<RowIdColumnIteratorV2>(IdManager::ID_VERSION,
1011
7.49k
                                                   BackendOptions::get_backend_id(), file_id);
1012
7.49k
}
1013
1014
77.0k
void FileScanner::_fill_base_init_context(ReaderInitContext* ctx) {
1015
77.0k
    ctx->column_descs = &_column_descs;
1016
77.0k
    ctx->col_name_to_block_idx = &_src_block_name_to_idx;
1017
77.0k
    ctx->state = _state;
1018
77.0k
    ctx->tuple_descriptor = _real_tuple_desc;
1019
77.0k
    ctx->row_descriptor = _default_val_row_desc.get();
1020
77.0k
    ctx->params = _params;
1021
77.0k
    ctx->range = &_current_range;
1022
77.0k
    ctx->table_info_node = TableSchemaChangeHelper::ConstNode::get_instance();
1023
77.0k
    ctx->push_down_agg_type = _get_push_down_agg_type();
1024
77.0k
}
1025
1026
163k
Status FileScanner::_get_next_reader() {
1027
164k
    while (true) {
1028
164k
        if (_cur_reader) {
1029
73.7k
            _cur_reader->collect_profile_before_close();
1030
73.7k
            RETURN_IF_ERROR(_cur_reader->close());
1031
73.7k
            _state->update_num_finished_scan_range(1);
1032
73.7k
        }
1033
164k
        _cur_reader.reset(nullptr);
1034
164k
        _reset_adaptive_batch_size_state();
1035
164k
        _src_block_init = false;
1036
164k
        bool has_next = _first_scan_range;
1037
164k
        if (!_first_scan_range) {
1038
110k
            RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
1039
110k
        }
1040
164k
        _first_scan_range = false;
1041
164k
        if (!has_next || _should_stop) {
1042
89.4k
            _scanner_eof = true;
1043
89.4k
            return Status::OK();
1044
89.4k
        }
1045
1046
75.0k
        const TFileRangeDesc& range = _current_range;
1047
75.0k
        _current_range_path = range.path;
1048
1049
75.0k
        if (!_partition_slot_index_map.empty()) {
1050
            // we need get partition columns first for runtime filter partition pruning
1051
9.85k
            RETURN_IF_ERROR(_generate_partition_columns());
1052
1053
9.85k
            if (_state->query_options().enable_runtime_filter_partition_prune) {
1054
                // if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
1055
                // by runtime filter partition prune
1056
8.63k
                if (_push_down_conjuncts.size() < _conjuncts.size()) {
1057
                    // there are new runtime filters, need to re-init runtime filter partition pruning ctxs
1058
4.00k
                    _init_runtime_filter_partition_prune_ctxs();
1059
4.00k
                }
1060
1061
8.63k
                bool can_filter_all = false;
1062
8.63k
                RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
1063
8.63k
                if (can_filter_all) {
1064
                    // this range can be filtered out by runtime filter partition pruning
1065
                    // so we need to skip this range
1066
546
                    COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
1067
546
                    continue;
1068
546
                }
1069
8.63k
            }
1070
9.85k
        }
1071
1072
        // create reader for specific format
1073
74.4k
        Status init_status = Status::OK();
1074
74.4k
        TFileFormatType::type format_type = _get_current_format_type();
1075
        // for compatibility, this logic is deprecated in 3.1
1076
74.4k
        if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
1077
5.27k
            if (range.table_format_params.table_format_type == "paimon" &&
1078
5.27k
                !range.table_format_params.paimon_params.__isset.paimon_split) {
1079
                // use native reader
1080
0
                auto format = range.table_format_params.paimon_params.file_format;
1081
0
                if (format == "orc") {
1082
0
                    format_type = TFileFormatType::FORMAT_ORC;
1083
0
                } else if (format == "parquet") {
1084
0
                    format_type = TFileFormatType::FORMAT_PARQUET;
1085
0
                } else {
1086
0
                    return Status::InternalError("Not supported paimon file format: {}", format);
1087
0
                }
1088
0
            }
1089
5.27k
        }
1090
1091
74.4k
        bool push_down_predicates = _should_push_down_predicates(format_type);
1092
74.4k
        bool need_to_get_parsed_schema = false;
1093
74.4k
        switch (format_type) {
1094
5.27k
        case TFileFormatType::FORMAT_JNI: {
1095
5.27k
            ReaderInitContext jni_ctx;
1096
5.27k
            _fill_base_init_context(&jni_ctx);
1097
1098
5.27k
            if (range.__isset.table_format_params &&
1099
5.27k
                range.table_format_params.table_format_type == "max_compute") {
1100
0
                const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
1101
0
                        _real_tuple_desc->table_desc());
1102
0
                if (!mc_desc->init_status()) {
1103
0
                    return mc_desc->init_status();
1104
0
                }
1105
0
                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
1106
0
                        mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
1107
0
                        range, _state, _profile);
1108
0
                init_status = static_cast<GenericReader*>(mc_reader.get())->init_reader(&jni_ctx);
1109
0
                _cur_reader = std::move(mc_reader);
1110
5.27k
            } else if (range.__isset.table_format_params &&
1111
5.27k
                       range.table_format_params.table_format_type == "paimon") {
1112
1.98k
                if (_state->query_options().__isset.enable_paimon_cpp_reader &&
1113
1.98k
                    _state->query_options().enable_paimon_cpp_reader) {
1114
0
                    auto cpp_reader = PaimonCppReader::create_unique(_file_slot_descs, _state,
1115
0
                                                                     _profile, range, _params);
1116
0
                    if (!_is_load && !_push_down_conjuncts.empty()) {
1117
0
                        PaimonPredicateConverter predicate_converter(_file_slot_descs, _state);
1118
0
                        auto predicate = predicate_converter.build(_push_down_conjuncts);
1119
0
                        if (predicate) {
1120
0
                            cpp_reader->set_predicate(std::move(predicate));
1121
0
                        }
1122
0
                    }
1123
0
                    init_status =
1124
0
                            static_cast<GenericReader*>(cpp_reader.get())->init_reader(&jni_ctx);
1125
0
                    _cur_reader = std::move(cpp_reader);
1126
1.98k
                } else {
1127
1.98k
                    auto paimon_reader = PaimonJniReader::create_unique(_file_slot_descs, _state,
1128
1.98k
                                                                        _profile, range, _params);
1129
1.98k
                    init_status =
1130
1.98k
                            static_cast<GenericReader*>(paimon_reader.get())->init_reader(&jni_ctx);
1131
1.98k
                    _cur_reader = std::move(paimon_reader);
1132
1.98k
                }
1133
3.28k
            } else if (range.__isset.table_format_params &&
1134
3.28k
                       range.table_format_params.table_format_type == "hudi") {
1135
0
                auto hudi_reader = HudiJniReader::create_unique(
1136
0
                        *_params, range.table_format_params.hudi_params, _file_slot_descs, _state,
1137
0
                        _profile);
1138
0
                init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&jni_ctx);
1139
0
                _cur_reader = std::move(hudi_reader);
1140
3.28k
            } else if (range.__isset.table_format_params &&
1141
3.28k
                       range.table_format_params.table_format_type == "trino_connector") {
1142
790
                auto trino_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
1143
790
                                                                           _profile, range);
1144
790
                init_status =
1145
790
                        static_cast<GenericReader*>(trino_reader.get())->init_reader(&jni_ctx);
1146
790
                _cur_reader = std::move(trino_reader);
1147
2.49k
            } else if (range.__isset.table_format_params &&
1148
2.49k
                       range.table_format_params.table_format_type == "jdbc") {
1149
                // Extract jdbc params from table_format_params
1150
1.24k
                std::map<std::string, std::string> jdbc_params(
1151
1.24k
                        range.table_format_params.jdbc_params.begin(),
1152
1.24k
                        range.table_format_params.jdbc_params.end());
1153
1.24k
                auto jdbc_reader = JdbcJniReader::create_unique(_file_slot_descs, _state, _profile,
1154
1.24k
                                                                jdbc_params);
1155
1.24k
                init_status = static_cast<GenericReader*>(jdbc_reader.get())->init_reader(&jni_ctx);
1156
1.24k
                _cur_reader = std::move(jdbc_reader);
1157
1.24k
            } else if (range.__isset.table_format_params &&
1158
1.24k
                       range.table_format_params.table_format_type == "iceberg") {
1159
1.24k
                auto iceberg_sys_reader = IcebergSysTableJniReader::create_unique(
1160
1.24k
                        _file_slot_descs, _state, _profile, range, _params);
1161
1.24k
                init_status = static_cast<GenericReader*>(iceberg_sys_reader.get())
1162
1.24k
                                      ->init_reader(&jni_ctx);
1163
1.24k
                _cur_reader = std::move(iceberg_sys_reader);
1164
1.24k
            }
1165
            // Set col_name_to_block_idx for JNI readers to avoid repeated map creation
1166
5.27k
            if (_cur_reader) {
1167
5.26k
                if (auto* jni_reader = dynamic_cast<JniReader*>(_cur_reader.get())) {
1168
5.25k
                    jni_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
1169
5.25k
                }
1170
5.26k
            }
1171
5.27k
            break;
1172
5.27k
        }
1173
32.8k
        case TFileFormatType::FORMAT_PARQUET: {
1174
32.8k
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1175
32.8k
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1176
18.4E
                                               : nullptr;
1177
32.8k
            if (push_down_predicates) {
1178
32.8k
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1179
32.8k
            }
1180
32.8k
            RETURN_IF_ERROR(_init_parquet_reader(file_meta_cache_ptr));
1181
1182
32.8k
            need_to_get_parsed_schema = true;
1183
32.8k
            break;
1184
32.8k
        }
1185
24.7k
        case TFileFormatType::FORMAT_ORC: {
1186
24.7k
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1187
24.7k
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1188
24.7k
                                               : nullptr;
1189
24.7k
            if (push_down_predicates) {
1190
24.6k
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1191
24.6k
            }
1192
24.7k
            RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr));
1193
1194
24.7k
            need_to_get_parsed_schema = true;
1195
24.7k
            break;
1196
24.7k
        }
1197
2.77k
        case TFileFormatType::FORMAT_CSV_PLAIN:
1198
2.77k
        case TFileFormatType::FORMAT_CSV_GZ:
1199
2.77k
        case TFileFormatType::FORMAT_CSV_BZ2:
1200
2.77k
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
1201
2.77k
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
1202
2.77k
        case TFileFormatType::FORMAT_CSV_LZOP:
1203
2.77k
        case TFileFormatType::FORMAT_CSV_DEFLATE:
1204
2.77k
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
1205
2.85k
        case TFileFormatType::FORMAT_PROTO: {
1206
2.85k
            auto reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
1207
2.85k
                                                   _file_slot_descs, _state->batch_size(), nullptr,
1208
2.85k
                                                   _io_ctx);
1209
2.85k
            CsvInitContext csv_ctx;
1210
2.85k
            _fill_base_init_context(&csv_ctx);
1211
2.85k
            csv_ctx.is_load = _is_load;
1212
2.85k
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&csv_ctx);
1213
2.85k
            _cur_reader = std::move(reader);
1214
2.85k
            break;
1215
2.77k
        }
1216
4.67k
        case TFileFormatType::FORMAT_TEXT: {
1217
4.67k
            auto reader = TextReader::create_unique(_state, _profile, &_counter, *_params, range,
1218
4.67k
                                                    _file_slot_descs, _state->batch_size(), nullptr,
1219
4.67k
                                                    _io_ctx);
1220
4.67k
            CsvInitContext text_ctx;
1221
4.67k
            _fill_base_init_context(&text_ctx);
1222
4.67k
            text_ctx.is_load = _is_load;
1223
4.67k
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&text_ctx);
1224
4.67k
            _cur_reader = std::move(reader);
1225
4.67k
            break;
1226
2.77k
        }
1227
3.30k
        case TFileFormatType::FORMAT_JSON: {
1228
3.30k
            _cur_reader = NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
1229
3.30k
                                                       _file_slot_descs, &_scanner_eof,
1230
3.30k
                                                       _state->batch_size(), nullptr, _io_ctx);
1231
3.30k
            JsonInitContext json_ctx;
1232
3.30k
            _fill_base_init_context(&json_ctx);
1233
3.30k
            json_ctx.col_default_value_ctx = &_col_default_value_ctx;
1234
3.30k
            json_ctx.is_load = _is_load;
1235
3.30k
            init_status = _cur_reader->init_reader(&json_ctx);
1236
3.30k
            break;
1237
2.77k
        }
1238
1239
0
        case TFileFormatType::FORMAT_WAL: {
1240
0
            _cur_reader = WalReader::create_unique(_state);
1241
0
            WalInitContext wal_ctx;
1242
0
            _fill_base_init_context(&wal_ctx);
1243
0
            wal_ctx.output_tuple_descriptor = _output_tuple_desc;
1244
0
            init_status = _cur_reader->init_reader(&wal_ctx);
1245
0
            break;
1246
2.77k
        }
1247
3
        case TFileFormatType::FORMAT_NATIVE: {
1248
3
            auto reader = NativeReader::create_unique(_profile, *_params, range, _io_ctx, _state);
1249
3
            ReaderInitContext native_ctx;
1250
3
            _fill_base_init_context(&native_ctx);
1251
3
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&native_ctx);
1252
3
            _cur_reader = std::move(reader);
1253
3
            need_to_get_parsed_schema = false;
1254
3
            break;
1255
2.77k
        }
1256
108
        case TFileFormatType::FORMAT_ARROW: {
1257
108
            ReaderInitContext arrow_ctx;
1258
108
            _fill_base_init_context(&arrow_ctx);
1259
1260
108
            if (range.__isset.table_format_params &&
1261
108
                range.table_format_params.table_format_type == "remote_doris") {
1262
98
                auto doris_reader =
1263
98
                        RemoteDorisReader::create_unique(_file_slot_descs, _state, _profile, range);
1264
98
                init_status =
1265
98
                        static_cast<GenericReader*>(doris_reader.get())->init_reader(&arrow_ctx);
1266
98
                if (doris_reader) {
1267
98
                    doris_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
1268
98
                }
1269
98
                _cur_reader = std::move(doris_reader);
1270
98
            } else {
1271
10
                auto arrow_reader =
1272
10
                        ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
1273
10
                                                         range, _file_slot_descs, _io_ctx.get());
1274
10
                init_status =
1275
10
                        static_cast<GenericReader*>(arrow_reader.get())->init_reader(&arrow_ctx);
1276
10
                _cur_reader = std::move(arrow_reader);
1277
10
            }
1278
108
            break;
1279
2.77k
        }
1280
0
#ifdef BUILD_RUST_READERS
1281
28
        case TFileFormatType::FORMAT_LANCE: {
1282
28
            auto lance_reader = LanceRustReader::create_unique(_file_slot_descs, _state, _profile,
1283
28
                                                               range, _params);
1284
28
            init_status = lance_reader->init_reader();
1285
28
            _cur_reader = std::move(lance_reader);
1286
28
            need_to_get_parsed_schema = true;
1287
28
            break;
1288
2.77k
        }
1289
0
#endif
1290
672
        case TFileFormatType::FORMAT_ES_HTTP: {
1291
672
            _cur_reader = EsHttpReader::create_unique(_file_slot_descs, _state, _profile, range,
1292
672
                                                      *_params, _real_tuple_desc);
1293
672
            init_status = static_cast<EsHttpReader*>(_cur_reader.get())->init_reader();
1294
672
            break;
1295
2.77k
        }
1296
0
        default:
1297
0
            return Status::NotSupported("Not supported create reader for file format: {}.",
1298
0
                                        to_string(_params->format_type));
1299
74.4k
        }
1300
1301
74.4k
        if (_cur_reader == nullptr) {
1302
0
            return Status::NotSupported(
1303
0
                    "Not supported create reader for table format: {} / file format: {}.",
1304
0
                    range.__isset.table_format_params ? range.table_format_params.table_format_type
1305
0
                                                      : "NotSet",
1306
0
                    to_string(_params->format_type));
1307
0
        }
1308
74.4k
        COUNTER_UPDATE(_file_counter, 1);
1309
        // The FileScanner for external table may try to open not exist files,
1310
        // Because FE file cache for external table may out of date.
1311
        // So, NOT_FOUND for FileScanner is not a fail case.
1312
        // Will remove this after file reader refactor.
1313
74.4k
        if (init_status.is<END_OF_FILE>()) {
1314
0
            COUNTER_UPDATE(_empty_file_counter, 1);
1315
0
            continue;
1316
74.4k
        } else if (init_status.is<ErrorCode::NOT_FOUND>()) {
1317
0
            if (config::ignore_not_found_file_in_external_table) {
1318
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
1319
0
                continue;
1320
0
            }
1321
0
            return Status::InternalError("failed to find reader, err: {}", init_status.to_string());
1322
74.4k
        } else if (!init_status.ok()) {
1323
1
            return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
1324
1
        }
1325
1326
        // For table-level COUNT pushdown, offsets are undefined so we must skip
1327
        // _set_fill_or_truncate_columns (it uses [start_offset, end_offset] to
1328
        // filter row groups, which would produce incorrect empty results).
1329
74.4k
        bool is_table_level_count = _get_push_down_agg_type() == TPushAggOp::type::COUNT &&
1330
74.4k
                                    range.__isset.table_format_params &&
1331
74.4k
                                    range.table_format_params.table_level_row_count >= 0;
1332
74.4k
        if (!is_table_level_count) {
1333
74.1k
            Status status = _set_fill_or_truncate_columns(need_to_get_parsed_schema);
1334
74.1k
            if (status.is<END_OF_FILE>()) { // all parquet row groups are filtered
1335
0
                continue;
1336
74.1k
            } else if (!status.ok()) {
1337
0
                return Status::InternalError("failed to set_fill_or_truncate_columns, err: {}",
1338
0
                                             status.to_string());
1339
0
            }
1340
74.1k
        }
1341
1342
        // Unified COUNT(*) pushdown: replace the real reader with CountReader
1343
        // decorator if the reader accepts COUNT and can provide a total row count.
1344
74.4k
        if (_cur_reader->get_push_down_agg_type() == TPushAggOp::type::COUNT) {
1345
1.85k
            int64_t total_rows = -1;
1346
1.85k
            if (is_table_level_count) {
1347
                // FE-provided count (may account for table-format deletions)
1348
210
                total_rows = range.table_format_params.table_level_row_count;
1349
1.64k
            } else if (_cur_reader->supports_count_pushdown()) {
1350
                // File metadata count (ORC footer / Parquet row groups)
1351
588
                total_rows = _cur_reader->get_total_rows();
1352
588
            }
1353
1.85k
            if (total_rows >= 0) {
1354
798
                auto batch_size = _state->batch_size();
1355
798
                _cur_reader = std::make_unique<CountReader>(total_rows, batch_size,
1356
798
                                                            std::move(_cur_reader));
1357
798
            }
1358
1.85k
        }
1359
74.4k
        _cur_reader_eof = false;
1360
74.4k
        _init_adaptive_batch_size_state(format_type);
1361
74.4k
        break;
1362
74.4k
    }
1363
74.4k
    return Status::OK();
1364
163k
}
1365
1366
Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr,
1367
34.1k
                                         std::unique_ptr<ParquetReader> parquet_reader) {
1368
34.1k
    const TFileRangeDesc& range = _current_range;
1369
34.1k
    Status init_status = Status::OK();
1370
1371
34.1k
    phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> slot_id_to_predicates =
1372
34.1k
            _local_state
1373
34.1k
                    ? _local_state->cast<FileScanLocalState>()._slot_id_to_predicates
1374
34.1k
                    : phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> {};
1375
1376
    // Build unified ParquetInitContext (shared by all Parquet reader variants)
1377
34.1k
    ParquetInitContext pctx;
1378
34.1k
    _fill_base_init_context(&pctx);
1379
34.1k
    pctx.conjuncts = &_push_down_conjuncts;
1380
34.1k
    pctx.slot_id_to_predicates = &slot_id_to_predicates;
1381
34.1k
    pctx.colname_to_slot_id = _col_name_to_slot_id;
1382
34.1k
    pctx.not_single_slot_filter_conjuncts = &_not_single_slot_filter_conjuncts;
1383
34.1k
    pctx.slot_id_to_filter_conjuncts = &_slot_id_to_filter_conjuncts;
1384
1385
34.1k
    if (range.__isset.table_format_params &&
1386
34.1k
        range.table_format_params.table_format_type == "iceberg") {
1387
        // IcebergParquetReader IS-A ParquetReader (CRTP mixin), no wrapping needed
1388
18.0k
        std::unique_ptr<IcebergParquetReader> iceberg_reader = IcebergParquetReader::create_unique(
1389
18.0k
                _kv_cache, _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(),
1390
18.0k
                _io_ctx, _state, file_meta_cache_ptr);
1391
18.0k
        iceberg_reader->set_create_row_id_column_iterator_func(
1392
18.0k
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
1393
246
                    return _create_row_id_column_iterator();
1394
246
                });
1395
18.0k
        init_status = static_cast<GenericReader*>(iceberg_reader.get())->init_reader(&pctx);
1396
18.0k
        _cur_reader = std::move(iceberg_reader);
1397
18.0k
    } else if (range.__isset.table_format_params &&
1398
16.1k
               range.table_format_params.table_format_type == "paimon") {
1399
        // PaimonParquetReader IS-A ParquetReader, no wrapping needed
1400
2.45k
        auto paimon_reader = PaimonParquetReader::create_unique(
1401
2.45k
                _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), _kv_cache,
1402
2.45k
                _io_ctx, _state, file_meta_cache_ptr);
1403
2.45k
        init_status = static_cast<GenericReader*>(paimon_reader.get())->init_reader(&pctx);
1404
2.45k
        _cur_reader = std::move(paimon_reader);
1405
13.6k
    } else if (range.__isset.table_format_params &&
1406
13.6k
               range.table_format_params.table_format_type == "hudi") {
1407
        // HudiParquetReader IS-A ParquetReader, no wrapping needed
1408
0
        auto hudi_reader = HudiParquetReader::create_unique(
1409
0
                _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), _io_ctx,
1410
0
                _state, file_meta_cache_ptr);
1411
0
        init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&pctx);
1412
0
        _cur_reader = std::move(hudi_reader);
1413
13.6k
    } else if (range.table_format_params.table_format_type == "hive") {
1414
10.4k
        auto hive_reader = HiveParquetReader::create_unique(
1415
10.4k
                _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), _io_ctx,
1416
10.4k
                _state, &_is_file_slot, file_meta_cache_ptr,
1417
10.4k
                _state->query_options().enable_parquet_lazy_mat);
1418
10.4k
        hive_reader->set_create_row_id_column_iterator_func(
1419
10.4k
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
1420
1.83k
                    return _create_row_id_column_iterator();
1421
1.83k
                });
1422
10.4k
        init_status = static_cast<GenericReader*>(hive_reader.get())->init_reader(&pctx);
1423
10.4k
        _cur_reader = std::move(hive_reader);
1424
10.4k
    } else if (range.table_format_params.table_format_type == "tvf") {
1425
3.15k
        if (!parquet_reader) {
1426
3.08k
            parquet_reader = ParquetReader::create_unique(
1427
3.08k
                    _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(),
1428
3.08k
                    _io_ctx, _state, file_meta_cache_ptr,
1429
3.08k
                    _state->query_options().enable_parquet_lazy_mat);
1430
3.08k
        }
1431
3.15k
        parquet_reader->set_create_row_id_column_iterator_func(
1432
3.15k
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
1433
53
                    return _create_row_id_column_iterator();
1434
53
                });
1435
3.15k
        init_status = static_cast<GenericReader*>(parquet_reader.get())->init_reader(&pctx);
1436
3.15k
        _cur_reader = std::move(parquet_reader);
1437
3.15k
    } else if (_is_load) {
1438
37
        if (!parquet_reader) {
1439
37
            parquet_reader = ParquetReader::create_unique(
1440
37
                    _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(),
1441
37
                    _io_ctx, _state, file_meta_cache_ptr,
1442
37
                    _state->query_options().enable_parquet_lazy_mat);
1443
37
        }
1444
37
        init_status = static_cast<GenericReader*>(parquet_reader.get())->init_reader(&pctx);
1445
37
        _cur_reader = std::move(parquet_reader);
1446
37
    }
1447
1448
34.1k
    return init_status;
1449
34.1k
}
1450
1451
Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr,
1452
26.6k
                                     std::unique_ptr<OrcReader> orc_reader) {
1453
26.6k
    const TFileRangeDesc& range = _current_range;
1454
26.6k
    Status init_status = Status::OK();
1455
1456
    // Build unified OrcInitContext (shared by all ORC reader variants)
1457
26.6k
    OrcInitContext octx;
1458
26.6k
    _fill_base_init_context(&octx);
1459
26.6k
    octx.conjuncts = &_push_down_conjuncts;
1460
26.6k
    octx.not_single_slot_filter_conjuncts = &_not_single_slot_filter_conjuncts;
1461
26.6k
    octx.slot_id_to_filter_conjuncts = &_slot_id_to_filter_conjuncts;
1462
1463
26.6k
    if (range.__isset.table_format_params &&
1464
26.6k
        range.table_format_params.table_format_type == "transactional_hive") {
1465
        // TransactionalHiveReader IS-A OrcReader, no wrapping needed
1466
290
        auto tran_orc_reader = TransactionalHiveReader::create_unique(
1467
290
                _profile, _state, *_params, range, _state->batch_size(), _state->timezone(),
1468
290
                _io_ctx, file_meta_cache_ptr);
1469
290
        tran_orc_reader->set_create_row_id_column_iterator_func(
1470
290
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
1471
54
                    return _create_row_id_column_iterator();
1472
54
                });
1473
290
        init_status = static_cast<GenericReader*>(tran_orc_reader.get())->init_reader(&octx);
1474
1475
290
        _cur_reader = std::move(tran_orc_reader);
1476
26.3k
    } else if (range.__isset.table_format_params &&
1477
26.3k
               range.table_format_params.table_format_type == "iceberg") {
1478
        // IcebergOrcReader IS-A OrcReader (CRTP mixin), no wrapping needed
1479
6.56k
        std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique(
1480
6.56k
                _kv_cache, _profile, _state, *_params, range, _state->batch_size(),
1481
6.56k
                _state->timezone(), _io_ctx, file_meta_cache_ptr);
1482
6.56k
        iceberg_reader->set_create_row_id_column_iterator_func(
1483
6.56k
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
1484
232
                    return _create_row_id_column_iterator();
1485
232
                });
1486
6.56k
        init_status = static_cast<GenericReader*>(iceberg_reader.get())->init_reader(&octx);
1487
1488
6.56k
        _cur_reader = std::move(iceberg_reader);
1489
19.7k
    } else if (range.__isset.table_format_params &&
1490
19.7k
               range.table_format_params.table_format_type == "paimon") {
1491
        // PaimonOrcReader IS-A OrcReader, no wrapping needed
1492
2.11k
        auto paimon_reader = PaimonOrcReader::create_unique(
1493
2.11k
                _profile, _state, *_params, range, _state->batch_size(), _state->timezone(),
1494
2.11k
                _kv_cache, _io_ctx, file_meta_cache_ptr);
1495
2.11k
        init_status = static_cast<GenericReader*>(paimon_reader.get())->init_reader(&octx);
1496
1497
2.11k
        _cur_reader = std::move(paimon_reader);
1498
17.6k
    } else if (range.__isset.table_format_params &&
1499
17.6k
               range.table_format_params.table_format_type == "hudi") {
1500
        // HudiOrcReader IS-A OrcReader, no wrapping needed
1501
0
        auto hudi_reader = HudiOrcReader::create_unique(_profile, _state, *_params, range,
1502
0
                                                        _state->batch_size(), _state->timezone(),
1503
0
                                                        _io_ctx, file_meta_cache_ptr);
1504
0
        init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&octx);
1505
1506
0
        _cur_reader = std::move(hudi_reader);
1507
17.6k
    } else if (range.__isset.table_format_params &&
1508
17.6k
               range.table_format_params.table_format_type == "hive") {
1509
15.3k
        auto hive_reader = HiveOrcReader::create_unique(
1510
15.3k
                _profile, _state, *_params, range, _state->batch_size(), _state->timezone(),
1511
15.3k
                _io_ctx, &_is_file_slot, file_meta_cache_ptr,
1512
15.3k
                _state->query_options().enable_orc_lazy_mat);
1513
15.3k
        hive_reader->set_create_row_id_column_iterator_func(
1514
15.3k
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
1515
4.97k
                    return _create_row_id_column_iterator();
1516
4.97k
                });
1517
15.3k
        init_status = static_cast<GenericReader*>(hive_reader.get())->init_reader(&octx);
1518
1519
15.3k
        _cur_reader = std::move(hive_reader);
1520
15.3k
    } else if (range.__isset.table_format_params &&
1521
2.31k
               range.table_format_params.table_format_type == "tvf") {
1522
2.24k
        if (!orc_reader) {
1523
2.08k
            orc_reader = OrcReader::create_unique(
1524
2.08k
                    _profile, _state, *_params, range, _state->batch_size(), _state->timezone(),
1525
2.08k
                    _io_ctx, file_meta_cache_ptr, _state->query_options().enable_orc_lazy_mat);
1526
2.08k
        }
1527
2.24k
        orc_reader->set_create_row_id_column_iterator_func(
1528
2.24k
                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
1529
99
                    return _create_row_id_column_iterator();
1530
99
                });
1531
2.24k
        init_status = static_cast<GenericReader*>(orc_reader.get())->init_reader(&octx);
1532
2.24k
        _cur_reader = std::move(orc_reader);
1533
2.24k
    } else if (_is_load) {
1534
26
        if (!orc_reader) {
1535
26
            orc_reader = OrcReader::create_unique(
1536
26
                    _profile, _state, *_params, range, _state->batch_size(), _state->timezone(),
1537
26
                    _io_ctx, file_meta_cache_ptr, _state->query_options().enable_orc_lazy_mat);
1538
26
        }
1539
26
        init_status = static_cast<GenericReader*>(orc_reader.get())->init_reader(&octx);
1540
26
        _cur_reader = std::move(orc_reader);
1541
26
    }
1542
1543
26.6k
    return init_status;
1544
26.6k
}
1545
1546
77.4k
// Rebuilds `_slot_lower_name_to_col_type` from the columns the current reader reports
// (keyed by lower-cased column name, with partition columns left out), then refreshes the
// source-file schema bookkeeping through `_generate_truncate_columns`.
Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema) {
    _slot_lower_name_to_col_type.clear();

    std::unordered_map<std::string, DataTypePtr> file_columns;
    RETURN_IF_ERROR(_cur_reader->get_columns(&file_columns));

    for (const auto& file_column : file_columns) {
        auto lowered_name = to_lower(file_column.first);
        // `_slot_lower_name_to_col_type` is consumed by `_init_src_block` and
        // `_cast_to_input_block` during LOAD to build columns of the file's own types.
        // A `COLUMNS FROM PATH` column that also exists in the file would otherwise get the
        // file type in the block, which then mismatches the slot type in `_output_tuple_desc`
        // and breaks Serde `deserialize_one_cell_from_json` when partition values are filled.
        // Partition columns are therefore never recorded here.
        const bool is_partition_column = _partition_col_descs.contains(lowered_name);
        if (!is_partition_column) {
            _slot_lower_name_to_col_type.emplace(lowered_name, file_column.second);
        }
    }

    RETURN_IF_ERROR(_generate_truncate_columns(need_to_get_parsed_schema));
    return Status::OK();
}
1573
1574
77.4k
// Rebuilds `_source_file_col_name_types` (lower-cased source-file column name -> parsed type,
// e.g. for parquet/orc files).
//
// The map is only filled when the `truncate_char_or_varchar_columns` query option is set AND
// the caller asks for the parsed schema; otherwise it is left empty. A reader that answers
// `get_parsed_schema` with NOT_IMPLEMENTED_ERROR is tolerated, and any other error is returned.
Status FileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) {
    _source_file_col_name_types.clear();
    if (!_state->query_options().truncate_char_or_varchar_columns || !need_to_get_parsed_schema) {
        return Status::OK();
    }

    // The col names and types of source file, such as parquet, orc files.
    std::vector<std::string> source_file_col_names;
    std::vector<DataTypePtr> source_file_col_types;
    Status status = _cur_reader->get_parsed_schema(&source_file_col_names, &source_file_col_types);
    if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
        return status;
    }

    DCHECK_EQ(source_file_col_names.size(), source_file_col_types.size());
    // DCHECK is compiled out in release builds: only pair up indices present in both vectors
    // so a size mismatch can never read past the end of `source_file_col_types`.
    const size_t num_cols = std::min(source_file_col_names.size(), source_file_col_types.size());
    for (size_t i = 0; i < num_cols; ++i) {
        _source_file_col_name_types[to_lower(source_file_col_names[i])] = source_file_col_types[i];
    }
    return Status::OK();
}
1593
1594
3.25k
// Prepares this scanner for `read_lines_from_range`. It:
//   * stores `range` as the current range;
//   * installs new file-cache and file-reader statistics objects and the file read bytes/time
//     profile counters;
//   * initializes the IO context and points it at those statistics;
//   * builds the default-value row descriptor and the expression contexts.
// All filtering state is then cleared and the KV cache detached, because point reads fetch
// a single column and apply no filters.
Status FileScanner::prepare_for_read_lines(const TFileRangeDesc& range) {
    _current_range = range;

    _file_cache_statistics.reset(new io::FileCacheStatistics());
    _file_reader_stats.reset(new io::FileReaderStats());
    _file_read_bytes_counter =
            ADD_COUNTER_WITH_LEVEL(_profile, FileReadBytesProfile, TUnit::BYTES, 1);
    _file_read_time_counter = ADD_TIMER_WITH_LEVEL(_profile, FileReadTimeProfile, 1);

    RETURN_IF_ERROR(_init_io_ctx());
    // Route the IO statistics of this read into the freshly created objects.
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
    _io_ctx->file_reader_stats = _file_reader_stats.get();

    _default_val_row_desc.reset(new RowDescriptor((TupleDescriptor*)_real_tuple_desc));
    RETURN_IF_ERROR(_init_expr_ctxes());

    // Only one column is read from the file, so nothing needs filtering: drop every
    // predicate container and the KV cache.
    _kv_cache = nullptr;
    _slot_id_to_filter_conjuncts.clear();
    _not_single_slot_filter_conjuncts.clear();
    _push_down_conjuncts.clear();
    return Status::OK();
}
1617
1618
// Reads the rows listed in `row_ids` from `range` into `result_block`.
//
// `prepare_for_read_lines` is expected to have run first. Only Parquet and ORC ranges are
// supported; other formats return NotSupported. The reader creation/initialization time is
// reported through `init_reader_ms` and the block-reading time through `get_block_ms`
// (both via `scope_timer_run`). Afterwards the reader is closed and the file read
// bytes/time counters are updated.
Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
                                          const std::list<int64_t>& row_ids, Block* result_block,
                                          const ExternalFileMappingInfo& external_info,
                                          int64_t* init_reader_ms, int64_t* get_block_ms) {
    _current_range = range;
    RETURN_IF_ERROR(_generate_partition_columns());

    TFileFormatType::type format_type = _get_current_format_type();

    auto file_meta_cache_ptr = external_info.enable_file_meta_cache
                                       ? ExecEnv::GetInstance()->file_meta_cache()
                                       : nullptr;

    RETURN_IF_ERROR(scope_timer_run(
            [&]() -> Status {
                switch (format_type) {
                case TFileFormatType::FORMAT_PARQUET: {
                    std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
                            _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx, _state,
                            file_meta_cache_ptr, false);
                    RETURN_IF_ERROR(
                            _init_parquet_reader(file_meta_cache_ptr, std::move(parquet_reader)));
                    break;
                }
                case TFileFormatType::FORMAT_ORC: {
                    std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
                            _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx,
                            file_meta_cache_ptr, false);
                    RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr, std::move(orc_reader)));
                    break;
                }
                default:
                    // Report the format that was actually rejected (the per-range format),
                    // not the scan-level default in `_params`.
                    return Status::NotSupported(
                            "Not support create lines reader for file format: {}, "
                            "only support parquet and orc.",
                            to_string(format_type));
                }
                // _init_parquet_reader/_init_orc_reader may replace the reader they were given
                // with a table-format specific one (e.g. HiveParquetReader), so read_by_rows
                // must be applied to whatever ended up in _cur_reader.
                if (_cur_reader == nullptr) {
                    return Status::InternalError(
                            "Failed to create lines reader for file format: {}",
                            to_string(format_type));
                }
                RETURN_IF_ERROR(_cur_reader->read_by_rows(row_ids));
                return Status::OK();
            },
            init_reader_ms));

    RETURN_IF_ERROR(_set_fill_or_truncate_columns(true));
    _cur_reader_eof = false;

    RETURN_IF_ERROR(scope_timer_run(
            [&]() -> Status {
                // This loop relies on _get_block_impl setting _cur_reader_eof once the
                // reader is exhausted.
                while (!_cur_reader_eof) {
                    bool eof = false;
                    RETURN_IF_ERROR(_get_block_impl(_state, result_block, &eof));
                }
                return Status::OK();
            },
            get_block_ms));

    _cur_reader->collect_profile_before_close();
    RETURN_IF_ERROR(_cur_reader->close());

    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
    return Status::OK();
}
1687
1688
13.1k
// Rebuilds `_partition_col_descs` and `_partition_value_is_null` for `_current_range`.
//
// For every PARTITION_KEY entry of `_column_descs` whose name appears among the range's
// `columns_from_path_keys`, records the matching path value together with the column's slot
// descriptor. It also records whether that value is flagged null; the flag is false when the
// range carries no null flags. A range without `columns_from_path_keys` leaves both maps empty.
Status FileScanner::_generate_partition_columns() {
    _partition_col_descs.clear();
    _partition_value_is_null.clear();

    const TFileRangeDesc& range = _current_range;
    if (!range.__isset.columns_from_path_keys) {
        return Status::OK();
    }
    DORIS_CHECK(range.__isset.columns_from_path);
    DORIS_CHECK(range.columns_from_path.size() == range.columns_from_path_keys.size());
    const bool has_null_flags = range.__isset.columns_from_path_is_null;
    if (has_null_flags) {
        DORIS_CHECK(range.columns_from_path_is_null.size() == range.columns_from_path_keys.size());
    }

    // Position of each path key; for duplicated keys the first occurrence wins.
    const auto& path_keys = range.columns_from_path_keys;
    std::unordered_map<std::string, int> key_position;
    for (size_t pos = 0; pos < path_keys.size(); ++pos) {
        key_position.emplace(path_keys[pos], static_cast<int>(pos));
    }

    // Partition columns are discovered from `_column_descs` rather than `_partition_slot_descs`.
    for (const auto& col_desc : _column_descs) {
        if (col_desc.category != ColumnCategory::PARTITION_KEY) {
            continue;
        }
        const auto found = key_position.find(col_desc.name);
        if (found == key_position.end()) {
            continue;
        }
        const auto value_pos = cast_set<size_t>(found->second);
        const bool is_null = has_null_flags ? range.columns_from_path_is_null[value_pos] : false;
        _partition_col_descs.emplace(
                col_desc.name,
                std::make_tuple(range.columns_from_path[value_pos], col_desc.slot_desc));
        _partition_value_is_null.emplace(col_desc.name, is_null);
    }
    return Status::OK();
}
1726
1727
56.8k
// Builds the scanner's column and expression state.
//
// From `_params->required_slots` (resolved against `_real_tuple_desc`), it fills:
//   * `_column_descs` (with category and default expr);
//   * `_is_file_slot`, `_file_slot_descs` and `_file_col_names` for REGULAR/GENERATED columns;
//   * `_partition_slot_index_map` for columns named in the range's path keys.
// It fills `_col_default_value_ctx` from each slot's inline `default_value_expr`, falling back
// to the legacy `default_value_of_src_slot` map.
// For load tasks only, it also fills `_dest_vexpr_ctx`, `_dest_slot_name_to_idx`,
// `_src_slot_descs_order_by_dest` and `_dest_slot_to_src_slot_index`.
//
// Returns InternalError when a required slot, a destination slot's expr or a mapped source
// slot cannot be resolved. Failures from creating, preparing or opening expressions are
// propagated.
Status FileScanner::_init_expr_ctxes() {
    std::map<SlotId, int> full_src_index_map;
    std::map<SlotId, SlotDescriptor*> full_src_slot_map;
    std::map<std::string, int> partition_name_to_key_index_map;
    int index = 0;
    for (const auto& slot_desc : _real_tuple_desc->slots()) {
        full_src_slot_map.emplace(slot_desc->id(), slot_desc);
        full_src_index_map.emplace(slot_desc->id(), index++);
    }

    // For external table query, find the index of column in path.
    // Because query doesn't always search for all columns in a table
    // and the order of selected columns is random.
    // All ranges should have identical columns_from_path_keys because they are all file
    // splits for the same external table, so the keys of `_current_range` are used here.
    if (_current_range.__isset.columns_from_path_keys) {
        // Only read from here on, so bind by reference instead of copying the vector.
        const auto& key_map = _current_range.columns_from_path_keys;
        for (size_t i = 0; i < key_map.size(); i++) {
            partition_name_to_key_index_map.emplace(key_map[i], static_cast<int>(i));
        }
    }

    _num_of_columns_from_file = _params->num_of_columns_from_file;
    for (const auto& slot_info : _params->required_slots) {
        auto slot_id = slot_info.slot_id;
        auto it = full_src_slot_map.find(slot_id);
        if (it == std::end(full_src_slot_map)) {
            return Status::InternalError(
                    fmt::format("Unknown source slot descriptor, slot_id={}", slot_id));
        }

        ColumnDescriptor col_desc;
        col_desc.name = it->second->col_name();
        col_desc.slot_desc = it->second;

        // Read category from Thrift if available (new FE), otherwise fall back
        // to slot_info.is_file_slot + partition_name_to_key_index_map for broker/stream load
        // where the FE does not set TColumnCategory.
        if (slot_info.__isset.category) {
            switch (slot_info.category) {
            case TColumnCategory::REGULAR:
                col_desc.category = ColumnCategory::REGULAR;
                break;
            case TColumnCategory::PARTITION_KEY:
                col_desc.category = ColumnCategory::PARTITION_KEY;
                break;
            case TColumnCategory::SYNTHESIZED:
                col_desc.category = ColumnCategory::SYNTHESIZED;
                break;
            case TColumnCategory::GENERATED:
                col_desc.category = ColumnCategory::GENERATED;
                break;
            }
        } else if (partition_name_to_key_index_map.contains(it->second->col_name()) &&
                   !slot_info.is_file_slot) {
            col_desc.category = ColumnCategory::PARTITION_KEY;
        }

        // Derive is_file_slot from category
        bool is_file_slot = (col_desc.category == ColumnCategory::REGULAR ||
                             col_desc.category == ColumnCategory::GENERATED);

        if (partition_name_to_key_index_map.contains(it->second->col_name())) {
            if (_is_load) {
                // For loads the partition value index is the slot's position after the
                // columns read from the file.
                auto iti = full_src_index_map.find(slot_id);
                _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
            } else {
                auto kit = partition_name_to_key_index_map.find(it->second->col_name());
                _partition_slot_index_map.emplace(slot_id, kit->second);
            }
        }

        if (is_file_slot) {
            _is_file_slot.emplace(slot_id);
            _file_slot_descs.emplace_back(it->second);
            _file_col_names.push_back(it->second->col_name());
        }

        _column_descs.push_back(col_desc);
    }

    // set column name to default value expr map
    // new inline TFileScanSlotInfo.default_value_expr (preferred)
    for (const auto& slot_info : _params->required_slots) {
        auto slot_id = slot_info.slot_id;
        auto it = full_src_slot_map.find(slot_id);
        if (it == std::end(full_src_slot_map)) {
            continue;
        }
        const std::string& col_name = it->second->col_name();

        VExprContextSPtr ctx;
        bool has_default = false;

        // Prefer inline default_value_expr from TFileScanSlotInfo (new FE)
        if (slot_info.__isset.default_value_expr && !slot_info.default_value_expr.nodes.empty()) {
            RETURN_IF_ERROR(VExpr::create_expr_tree(slot_info.default_value_expr, ctx));
            RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
            RETURN_IF_ERROR(ctx->open(_state));
            has_default = true;
        } else if (slot_info.__isset.default_value_expr) {
            // Empty nodes means null default (same as legacy empty TExpr)
            has_default = true;
        }

        // Fall back to legacy default_value_of_src_slot map for mixed-version FE/BE
        // and callers that have not preserved inline default_value_expr yet.
        if (!has_default) {
            auto legacy_it = _params->default_value_of_src_slot.find(slot_id);
            if (legacy_it != std::end(_params->default_value_of_src_slot)) {
                if (!legacy_it->second.nodes.empty()) {
                    RETURN_IF_ERROR(VExpr::create_expr_tree(legacy_it->second, ctx));
                    RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
                    RETURN_IF_ERROR(ctx->open(_state));
                }
                has_default = true;
            }
        }

        if (has_default) {
            // if expr is empty, the default value will be null
            _col_default_value_ctx.emplace(col_name, ctx);
        }
    }

    // Populate default_expr in each ColumnDescriptor from _col_default_value_ctx.
    // This makes default values available to readers via column_descs, eliminating the
    // need for the separate _generate_missing_columns roundtrip.
    for (auto& col_desc : _column_descs) {
        auto it = _col_default_value_ctx.find(col_desc.name);
        if (it != _col_default_value_ctx.end()) {
            col_desc.default_expr = it->second;
        }
    }

    if (_is_load) {
        // follow desc expr map is only for load task.
        bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
        int idx = 0;
        for (auto* slot_desc : _output_tuple_desc->slots()) {
            auto it = _params->expr_of_dest_slot.find(slot_desc->id());
            if (it == std::end(_params->expr_of_dest_slot)) {
                return Status::InternalError("No expr for dest slot, id={}, name={}",
                                             slot_desc->id(), slot_desc->col_name());
            }

            VExprContextSPtr ctx;
            if (!it->second.nodes.empty()) {
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
                RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
                RETURN_IF_ERROR(ctx->open(_state));
            }
            _dest_vexpr_ctx.emplace_back(ctx);
            _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;

            if (has_slot_id_map) {
                auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
                if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
                    _src_slot_descs_order_by_dest.emplace_back(nullptr);
                } else {
                    auto _src_slot_it = full_src_slot_map.find(it1->second);
                    if (_src_slot_it == std::end(full_src_slot_map)) {
                        return Status::InternalError("No src slot {} in src slot descs",
                                                     it1->second);
                    }
                    _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
                                                         full_src_index_map[_src_slot_it->first]);
                    _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
                }
            }
        }
    }
    return Status::OK();
}
1904
1905
417k
bool FileScanner::_should_enable_condition_cache() {
1906
417k
    DCHECK(_should_enable_condition_cache_handler != nullptr);
1907
417k
    if (_condition_cache_digest == 0 || !(this->*_should_enable_condition_cache_handler)()) {
1908
78.8k
        return false;
1909
78.8k
    }
1910
1911
    // Condition cache starts as all-false and is turned true only by native readers when a
1912
    // row-level predicate leaves at least one row in the granule. COUNT pushdown may replace the
1913
    // native reader with CountReader, which only emits row counts and never runs that marking path.
1914
338k
    if (_get_push_down_agg_type() == TPushAggOp::type::COUNT) {
1915
14.1k
        return false;
1916
14.1k
    }
1917
1918
    // The cache is populated by native readers while evaluating pushed-down predicates.
1919
    // Scanner-only predicates cannot mark reader granules, so there is nothing useful to cache.
1920
324k
    if (_push_down_conjuncts.empty()) {
1921
260k
        return false;
1922
260k
    }
1923
1924
    // Runtime filters are query-local dynamic predicates. Some ready RF implementations can hash
1925
    // their payload into get_digest(), but FileScanner cannot rely on that for all RFs reaching the
1926
    // native reader. In particular, ScanLocalState computes _condition_cache_digest during open(),
1927
    // while FileScanner may append late-arrival RFs in _process_late_arrival_conjuncts()
1928
    // immediately before initializing Parquet/ORC readers.
1929
    //
1930
    // Reading a weaker cache entry would be safe by itself: if a cached bitmap only represented
1931
    // static predicate P, false granules for P are also false for P AND RF. The unsafe part is
1932
    // writing. On cache miss, native readers mark survivor granules using all pushed-down
1933
    // predicates, including late RFs. Without a read-only cache mode, this would insert a bitmap for
1934
    // P AND RF under a digest that only represents P.
1935
    //
1936
    // Example:
1937
    //   Q1 static predicate: k = 1, late RF payload: partition_key IN ('2024-02-01')
1938
    //   Q2 static predicate: k = 1, late RF payload: partition_key IN ('2024-03-01')
1939
    // If both scans share the same file/range/digest, reusing Q1's bitmap for Q2 can skip row
1940
    // ranges according to the wrong RF payload. Keep RF predicate pushdown enabled for reader-side
1941
    // filtering, but do not persist its result in condition cache.
1942
63.7k
    return !_contains_runtime_filter(_conjuncts) && !_contains_runtime_filter(_push_down_conjuncts);
1943
324k
}
1944
1945
0
bool FileScanner::_should_enable_condition_cache_for_load() const {
1946
0
    return false;
1947
0
}
1948
1949
339k
bool FileScanner::_should_enable_condition_cache_for_query() const {
1950
339k
    return true;
1951
339k
}
1952
1953
74.4k
bool FileScanner::_should_push_down_predicates(TFileFormatType::type format_type) const {
1954
74.4k
    DCHECK(_should_push_down_predicates_handler != nullptr);
1955
74.4k
    return (this->*_should_push_down_predicates_handler)(format_type);
1956
74.4k
}
1957
1958
4.71k
bool FileScanner::_should_push_down_predicates_for_load(TFileFormatType::type format_type) const {
1959
4.71k
    static_cast<void>(format_type);
1960
4.71k
    return false;
1961
4.71k
}
1962
1963
69.7k
bool FileScanner::_should_push_down_predicates_for_query(TFileFormatType::type format_type) const {
1964
    // JNI readers handle predicate conversion in their own paths.
1965
69.7k
    return format_type != TFileFormatType::FORMAT_JNI;
1966
69.7k
}
1967
1968
163k
void FileScanner::_init_reader_condition_cache() {
1969
163k
    _condition_cache = nullptr;
1970
163k
    _condition_cache_ctx = nullptr;
1971
1972
163k
    if (!_should_enable_condition_cache() || !_cur_reader) {
1973
146k
        return;
1974
146k
    }
1975
1976
    // Disable condition cache when delete operations exist (e.g. Iceberg position/equality
1977
    // deletes, Hive ACID deletes). Cached granule results may become stale if delete files
1978
    // change between queries while the data file's cache key remains the same.
1979
17.3k
    if (_cur_reader->has_delete_operations()) {
1980
2.19k
        return;
1981
2.19k
    }
1982
1983
15.1k
    auto* cache = segment_v2::ConditionCache::instance();
1984
15.1k
    _condition_cache_key = segment_v2::ConditionCache::ExternalCacheKey(
1985
15.1k
            _current_range.path,
1986
18.4E
            _current_range.__isset.modification_time ? _current_range.modification_time : 0,
1987
18.4E
            _current_range.__isset.file_size ? _current_range.file_size : -1,
1988
15.1k
            _condition_cache_digest,
1989
18.4E
            _current_range.__isset.start_offset ? _current_range.start_offset : 0,
1990
18.4E
            _current_range.__isset.size ? _current_range.size : -1);
1991
1992
15.1k
    segment_v2::ConditionCacheHandle handle;
1993
15.1k
    auto condition_cache_hit = cache->lookup(_condition_cache_key, &handle);
1994
15.1k
    if (condition_cache_hit) {
1995
6.67k
        _condition_cache = handle.get_filter_result();
1996
6.67k
        _condition_cache_hit_count++;
1997
8.44k
    } else {
1998
        // Allocate cache pre-sized to total number of granules.
1999
        // We add +1 as a safety margin: when a file is split across multiple scanners
2000
        // and the first row of this scanner's range is not aligned to a granule boundary,
2001
        // the data may span one more granule than ceil(total_rows / GRANULE_SIZE).
2002
        // The extra element costs only 1 bit and never affects correctness (an extra
2003
        // false-granule beyond the actual data range won't overlap any real row range).
2004
8.44k
        int64_t total_rows = _cur_reader->get_total_rows();
2005
8.44k
        if (total_rows > 0) {
2006
8.27k
            size_t num_granules = (total_rows + ConditionCacheContext::GRANULE_SIZE - 1) /
2007
8.27k
                                  ConditionCacheContext::GRANULE_SIZE;
2008
8.27k
            _condition_cache = std::make_shared<std::vector<bool>>(num_granules + 1, false);
2009
8.27k
        }
2010
8.44k
    }
2011
2012
15.1k
    if (_condition_cache) {
2013
        // Create context to pass to readers (native readers use it; non-native readers ignore it)
2014
14.9k
        _condition_cache_ctx = std::make_shared<ConditionCacheContext>();
2015
14.9k
        _condition_cache_ctx->is_hit = condition_cache_hit;
2016
14.9k
        _condition_cache_ctx->filter_result = _condition_cache;
2017
14.9k
        _cur_reader->set_condition_cache_context(_condition_cache_ctx);
2018
14.9k
    }
2019
15.1k
}
2020
2021
254k
void FileScanner::_finalize_reader_condition_cache() {
2022
254k
    if (!_should_enable_condition_cache() || !_condition_cache_ctx ||
2023
254k
        _condition_cache_ctx->is_hit) {
2024
245k
        _condition_cache = nullptr;
2025
245k
        _condition_cache_ctx = nullptr;
2026
245k
        return;
2027
245k
    }
2028
    // Only store the cache if the reader was fully consumed. If the scan was
2029
    // truncated early (e.g. by LIMIT), the cache is incomplete — unread granules
2030
    // would remain false and cause surviving rows to be incorrectly skipped on HIT.
2031
8.27k
    if (!_cur_reader_eof) {
2032
70
        _condition_cache = nullptr;
2033
70
        _condition_cache_ctx = nullptr;
2034
70
        return;
2035
70
    }
2036
2037
8.20k
    auto* cache = segment_v2::ConditionCache::instance();
2038
8.20k
    cache->insert(_condition_cache_key, std::move(_condition_cache));
2039
8.20k
    _condition_cache = nullptr;
2040
8.20k
    _condition_cache_ctx = nullptr;
2041
8.20k
}
2042
2043
90.2k
// Closes the scanner. When `_try_close()` returns false (presumably because close already
// happened — confirm), nothing is done and OK is returned. Otherwise it:
//   1. flushes any pending condition-cache entry;
//   2. closes the current reader, if there is one;
//   3. closes the base Scanner.
// The first error encountered is returned.
Status FileScanner::close(RuntimeState* state) {
    if (_try_close()) {
        _finalize_reader_condition_cache();
        if (_cur_reader) {
            RETURN_IF_ERROR(_cur_reader->close());
        }
        RETURN_IF_ERROR(Scanner::close(state));
    }
    return Status::OK();
}
2057
2058
90.2k
void FileScanner::try_stop() {
2059
90.2k
    Scanner::try_stop();
2060
90.2k
    if (_io_ctx) {
2061
90.2k
        _io_ctx->should_stop = true;
2062
90.2k
    }
2063
90.2k
}
2064
2065
146k
void FileScanner::update_realtime_counters() {
2066
146k
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
2067
2068
146k
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
2069
146k
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
2070
2071
146k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(
2072
146k
            _file_reader_stats->read_rows);
2073
146k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
2074
146k
            _file_reader_stats->read_bytes);
2075
2076
146k
    int64_t delta_bytes_read_from_local =
2077
146k
            _file_cache_statistics->bytes_read_from_local - _last_bytes_read_from_local;
2078
146k
    int64_t delta_bytes_read_from_remote =
2079
146k
            _file_cache_statistics->bytes_read_from_remote - _last_bytes_read_from_remote;
2080
146k
    if (_file_cache_statistics->bytes_read_from_local == 0 &&
2081
146k
        _file_cache_statistics->bytes_read_from_remote == 0) {
2082
141k
        _state->get_query_ctx()
2083
141k
                ->resource_ctx()
2084
141k
                ->io_context()
2085
141k
                ->update_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes);
2086
141k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
2087
141k
                _file_reader_stats->read_bytes);
2088
141k
    } else {
2089
5.88k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
2090
5.88k
                delta_bytes_read_from_local);
2091
5.88k
        _state->get_query_ctx()
2092
5.88k
                ->resource_ctx()
2093
5.88k
                ->io_context()
2094
5.88k
                ->update_scan_bytes_from_remote_storage(delta_bytes_read_from_remote);
2095
5.88k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
2096
5.88k
                delta_bytes_read_from_local);
2097
5.88k
        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
2098
5.88k
                delta_bytes_read_from_remote);
2099
5.88k
    }
2100
2101
146k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
2102
2103
146k
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
2104
146k
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
2105
2106
146k
    _file_reader_stats->read_bytes = 0;
2107
146k
    _file_reader_stats->read_rows = 0;
2108
2109
146k
    _last_bytes_read_from_local = _file_cache_statistics->bytes_read_from_local;
2110
146k
    _last_bytes_read_from_remote = _file_cache_statistics->bytes_read_from_remote;
2111
146k
}
2112
2113
90.1k
bool FileScanner::_should_update_load_counters() const {
2114
90.1k
    if (_is_load) {
2115
4.70k
        return true;
2116
4.70k
    }
2117
    // TVF based loads (e.g. http_stream, group commit relay) plan the load source as a
2118
    // tvf query scan without src tuple desc, so _is_load is false. But rows filtered by
2119
    // the load's WHERE clause still need to be reported as unselected rows. FILE_STREAM
2120
    // is only reachable from such load entries, never from normal queries, so use it to
2121
    // identify these scanners.
2122
85.4k
    return (_params->__isset.file_type && _params->file_type == TFileType::FILE_STREAM) ||
2123
85.4k
           (_current_range.__isset.file_type && _current_range.file_type == TFileType::FILE_STREAM);
2124
90.1k
}
2125
2126
90.0k
void FileScanner::_collect_profile_before_close() {
2127
90.0k
    Scanner::_collect_profile_before_close();
2128
90.1k
    if (config::enable_file_cache && _state->query_options().enable_file_cache &&
2129
90.0k
        _profile != nullptr) {
2130
1.37k
        io::FileCacheProfileReporter cache_profile(_profile);
2131
1.37k
        cache_profile.update(_file_cache_statistics.get());
2132
1.37k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
2133
1.37k
                _file_cache_statistics->bytes_write_into_cache);
2134
1.37k
    }
2135
2136
90.0k
    if (_cur_reader != nullptr) {
2137
725
        _cur_reader->collect_profile_before_close();
2138
725
    }
2139
2140
90.0k
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
2141
90.0k
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
2142
90.0k
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
2143
2144
90.0k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
2145
90.0k
    COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls);
2146
90.0k
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
2147
90.0k
    COUNTER_UPDATE(local_state->_condition_cache_hit_counter, _condition_cache_hit_count);
2148
90.1k
    if (_io_ctx) {
2149
90.1k
        COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
2150
90.1k
                       _io_ctx->condition_cache_filtered_rows);
2151
90.1k
    }
2152
2153
90.0k
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
2154
90.0k
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
2155
90.0k
}
2156
2157
} // namespace doris