Coverage Report

Created: 2026-04-08 13:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/Metrics_types.h>
23
#include <gen_cpp/Opcodes_types.h>
24
#include <gen_cpp/PaloInternalService_types.h>
25
#include <gen_cpp/PlanNodes_types.h>
26
#include <glog/logging.h>
27
28
#include <algorithm>
29
#include <boost/iterator/iterator_facade.hpp>
30
#include <map>
31
#include <ranges>
32
#include <tuple>
33
#include <unordered_map>
34
#include <utility>
35
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/config.h"
38
#include "common/consts.h"
39
#include "common/logging.h"
40
#include "common/status.h"
41
#include "core/block/column_with_type_and_name.h"
42
#include "core/block/columns_with_type_and_name.h"
43
#include "core/column/column.h"
44
#include "core/column/column_nullable.h"
45
#include "core/column/column_vector.h"
46
#include "core/data_type/data_type.h"
47
#include "core/data_type/data_type_nullable.h"
48
#include "core/data_type/data_type_string.h"
49
#include "core/string_ref.h"
50
#include "exec/common/stringop_substring.h"
51
#include "exec/rowid_fetcher.h"
52
#include "exec/scan/scan_node.h"
53
#include "exprs/aggregate/aggregate_function.h"
54
#include "exprs/function/function.h"
55
#include "exprs/function/simple_function_factory.h"
56
#include "exprs/vexpr.h"
57
#include "exprs/vexpr_context.h"
58
#include "exprs/vexpr_fwd.h"
59
#include "exprs/vslot_ref.h"
60
#include "format/arrow/arrow_stream_reader.h"
61
#include "format/csv/csv_reader.h"
62
#include "format/json/new_json_reader.h"
63
#include "format/native/native_reader.h"
64
#include "format/orc/vorc_reader.h"
65
#include "format/parquet/vparquet_reader.h"
66
#include "format/table/hive_reader.h"
67
#include "format/table/hudi_jni_reader.h"
68
#include "format/table/hudi_reader.h"
69
#include "format/table/iceberg_reader.h"
70
#include "format/table/iceberg_sys_table_jni_reader.h"
71
#include "format/table/jdbc_jni_reader.h"
72
#include "format/table/max_compute_jni_reader.h"
73
#include "format/table/paimon_cpp_reader.h"
74
#include "format/table/paimon_jni_reader.h"
75
#include "format/table/paimon_predicate_converter.h"
76
#include "format/table/paimon_reader.h"
77
#include "format/table/remote_doris_reader.h"
78
#include "format/table/transactional_hive_reader.h"
79
#include "format/table/trino_connector_jni_reader.h"
80
#include "format/text/text_reader.h"
81
#include "io/cache/block_file_cache_profile.h"
82
#include "load/group_commit/wal/wal_reader.h"
83
#include "runtime/descriptors.h"
84
#include "runtime/runtime_profile.h"
85
#include "runtime/runtime_state.h"
86
87
namespace cctz {
88
class time_zone;
89
} // namespace cctz
90
namespace doris {
91
class ShardedKVCache;
92
} // namespace doris
93
94
namespace doris {
95
#include "common/compile_check_begin.h"
96
using namespace ErrorCode;
97
98
const std::string FileScanner::FileReadBytesProfile = "FileReadBytes";
99
const std::string FileScanner::FileReadTimeProfile = "FileReadTime";
100
101
FileScanner::FileScanner(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
102
                         std::shared_ptr<SplitSourceConnector> split_source,
103
                         RuntimeProfile* profile, ShardedKVCache* kv_cache,
104
                         const std::unordered_map<std::string, int>* colname_to_slot_id)
105
84.2k
        : Scanner(state, local_state, limit, profile),
106
84.2k
          _split_source(split_source),
107
84.2k
          _cur_reader(nullptr),
108
84.2k
          _cur_reader_eof(false),
109
84.2k
          _kv_cache(kv_cache),
110
84.2k
          _strict_mode(false),
111
84.2k
          _col_name_to_slot_id(colname_to_slot_id) {
112
84.2k
    if (state->get_query_ctx() != nullptr &&
113
84.2k
        state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
114
83.8k
        _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
115
83.8k
    } else {
116
        // old fe thrift protocol
117
338
        _params = _split_source->get_params();
118
338
    }
119
84.2k
    if (_params->__isset.strict_mode) {
120
378
        _strict_mode = _params->strict_mode;
121
378
    }
122
123
    // For load scanner, there are input and output tuple.
124
    // For query scanner, there is only output tuple
125
84.2k
    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
126
84.2k
    _real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
127
84.2k
    _is_load = (_input_tuple_desc != nullptr);
128
84.2k
}
129
130
84.0k
Status FileScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
131
84.0k
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));
132
84.0k
    _get_block_timer =
133
84.0k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerGetBlockTime", 1);
134
84.0k
    _cast_to_input_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
135
84.0k
                                                      "FileScannerCastInputBlockTime", 1);
136
84.0k
    _fill_missing_columns_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
137
84.0k
                                                       "FileScannerFillMissingColumnTime", 1);
138
84.0k
    _pre_filter_timer =
139
84.0k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerPreFilterTimer", 1);
140
84.0k
    _convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
141
84.0k
                                                          "FileScannerConvertOuputBlockTime", 1);
142
84.0k
    _runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
143
84.0k
            _local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime", 1);
144
84.0k
    _empty_file_counter =
145
84.0k
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT, 1);
146
84.0k
    _not_found_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
147
84.0k
                                                     "NotFoundFileNum", TUnit::UNIT, 1);
148
84.0k
    _fully_skipped_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
149
84.0k
                                                         "FullySkippedFileNum", TUnit::UNIT, 1);
150
84.0k
    _file_counter =
151
84.0k
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
152
153
84.0k
    _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
154
84.0k
                                                      FileReadBytesProfile, TUnit::BYTES, 1);
155
84.0k
    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
156
84.0k
                                                      "FileReadCalls", TUnit::UNIT, 1);
157
84.0k
    _file_read_time_counter =
158
84.0k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), FileReadTimeProfile, 1);
159
160
84.0k
    _runtime_filter_partition_pruned_range_counter =
161
84.0k
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
162
84.0k
                                   "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);
163
    // Keep the current file's adaptive state while also preserving the peak value across all
164
    // files handled by this scanner instance.
165
84.0k
    _adaptive_batch_predicted_rows_counter =
166
84.0k
            _local_state->scanner_profile()->AddHighWaterMarkCounter(
167
84.0k
                    "AdaptiveBatchPredictedRows", TUnit::UNIT, RuntimeProfile::ROOT_COUNTER, 1);
168
84.0k
    _adaptive_batch_actual_bytes_before_truncate_counter =
169
84.0k
            _local_state->scanner_profile()->AddHighWaterMarkCounter(
170
84.0k
                    "AdaptiveBatchActualBytesBeforeTruncate", TUnit::BYTES,
171
84.0k
                    RuntimeProfile::ROOT_COUNTER, 1);
172
84.0k
    _adaptive_batch_actual_bytes_after_truncate_counter =
173
84.0k
            _local_state->scanner_profile()->AddHighWaterMarkCounter(
174
84.0k
                    "AdaptiveBatchActualBytesAfterTruncate", TUnit::BYTES,
175
84.0k
                    RuntimeProfile::ROOT_COUNTER, 1);
176
84.0k
    _adaptive_batch_probe_count_counter = ADD_COUNTER_WITH_LEVEL(
177
84.0k
            _local_state->scanner_profile(), "AdaptiveBatchProbeCount", TUnit::UNIT, 1);
178
179
84.0k
    _file_cache_statistics.reset(new io::FileCacheStatistics());
180
84.0k
    _file_reader_stats.reset(new io::FileReaderStats());
181
182
84.0k
    RETURN_IF_ERROR(_init_io_ctx());
183
84.0k
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
184
84.0k
    _io_ctx->file_reader_stats = _file_reader_stats.get();
185
84.0k
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;
186
187
84.0k
    if (_is_load) {
188
378
        _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
189
378
                                              std::vector<TupleId>({_input_tuple_desc->id()})));
190
        // prepare pre filters
191
378
        if (_params->__isset.pre_filter_exprs_list) {
192
0
            RETURN_IF_ERROR(doris::VExpr::create_expr_trees(_params->pre_filter_exprs_list,
193
0
                                                            _pre_conjunct_ctxs));
194
378
        } else if (_params->__isset.pre_filter_exprs) {
195
0
            VExprContextSPtr context;
196
0
            RETURN_IF_ERROR(doris::VExpr::create_expr_tree(_params->pre_filter_exprs, context));
197
0
            _pre_conjunct_ctxs.emplace_back(context);
198
0
        }
199
200
378
        for (auto& conjunct : _pre_conjunct_ctxs) {
201
0
            RETURN_IF_ERROR(conjunct->prepare(_state, *_src_row_desc));
202
0
            RETURN_IF_ERROR(conjunct->open(_state));
203
0
        }
204
205
378
        _dest_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
206
378
                                               std::vector<TupleId>({_output_tuple_desc->id()})));
207
378
    }
208
209
84.0k
    _default_val_row_desc.reset(
210
84.0k
            new RowDescriptor(_state->desc_tbl(), std::vector<TupleId>({_real_tuple_desc->id()})));
211
212
84.0k
    return Status::OK();
213
84.0k
}
214
215
68.6k
bool FileScanner::_should_enable_adaptive_batch_size(TFileFormatType::type format_type) const {
216
    // Only enable for readers that support set_batch_size().
217
    // Table-format wrappers are covered because they delegate to native readers.
218
68.6k
    if (!config::enable_adaptive_batch_size || _state->preferred_block_size_bytes() <= 0) {
219
0
        return false;
220
0
    }
221
68.6k
    switch (format_type) {
222
32.5k
    case TFileFormatType::FORMAT_PARQUET:
223
56.9k
    case TFileFormatType::FORMAT_ORC:
224
57.7k
    case TFileFormatType::FORMAT_CSV_PLAIN:
225
57.7k
    case TFileFormatType::FORMAT_CSV_GZ:
226
57.7k
    case TFileFormatType::FORMAT_CSV_BZ2:
227
57.7k
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
228
57.7k
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
229
57.7k
    case TFileFormatType::FORMAT_CSV_LZOP:
230
57.7k
    case TFileFormatType::FORMAT_CSV_DEFLATE:
231
57.7k
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
232
57.7k
    case TFileFormatType::FORMAT_PROTO:
233
62.4k
    case TFileFormatType::FORMAT_TEXT:
234
62.8k
    case TFileFormatType::FORMAT_JSON:
235
62.8k
        return true;
236
5.77k
    default:
237
5.77k
        return false;
238
68.6k
    }
239
68.6k
}
240
241
382k
bool FileScanner::_should_run_adaptive_batch_size() const {
242
382k
    return _block_size_predictor != nullptr && _get_push_down_agg_type() != TPushAggOp::type::COUNT;
243
382k
}
244
245
221k
void FileScanner::_reset_adaptive_batch_size_state() {
246
221k
    _block_size_predictor.reset();
247
221k
    _adaptive_batch_output_column_ids.clear();
248
221k
    COUNTER_SET(_adaptive_batch_predicted_rows_counter, int64_t(0));
249
221k
    COUNTER_SET(_adaptive_batch_actual_bytes_before_truncate_counter, int64_t(0));
250
221k
    COUNTER_SET(_adaptive_batch_actual_bytes_after_truncate_counter, int64_t(0));
251
221k
}
252
253
68.5k
void FileScanner::_init_adaptive_batch_size_state(TFileFormatType::type format_type) {
254
68.5k
    _reset_adaptive_batch_size_state();
255
68.5k
    if (!_should_enable_adaptive_batch_size(format_type)) {
256
5.77k
        return;
257
5.77k
    }
258
259
    // External file readers do not provide reliable memory-size metadata hints. Use a small probe
260
    // batch so the predictor can learn from real FileScanner output quickly.
261
62.7k
    _block_size_predictor = std::make_unique<AdaptiveBlockSizePredictor>(
262
62.7k
            _state->preferred_block_size_bytes(),
263
62.7k
            _state->preferred_max_column_in_block_size_bytes(), 0.0,
264
62.7k
            std::unordered_map<ColumnId, double> {}, ADAPTIVE_BATCH_INITIAL_PROBE_ROWS,
265
62.7k
            _state->preferred_block_size_rows());
266
62.7k
}
267
268
165k
size_t FileScanner::_predict_reader_batch_rows() {
269
165k
    DCHECK(_block_size_predictor != nullptr);
270
165k
    size_t predicted_rows = _block_size_predictor->predict_next_rows();
271
165k
    COUNTER_SET(_adaptive_batch_predicted_rows_counter, static_cast<int64_t>(predicted_rows));
272
165k
    return predicted_rows;
273
165k
}
274
275
74.4k
void FileScanner::_ensure_adaptive_batch_output_column_ids(const Block& block) {
276
74.4k
    if (!_adaptive_batch_output_column_ids.empty()) {
277
28.2k
        DCHECK_EQ(_adaptive_batch_output_column_ids.size(), block.columns());
278
28.2k
        return;
279
28.2k
    }
280
281
46.1k
    _adaptive_batch_output_column_ids.reserve(block.columns());
282
424k
    for (size_t i = 0; i < block.columns(); ++i) {
283
378k
        _adaptive_batch_output_column_ids.push_back(static_cast<ColumnId>(i));
284
378k
    }
285
46.1k
}
286
287
94.3k
void FileScanner::_update_adaptive_batch_size_before_truncate(const Block& block) {
288
94.3k
    if (!_should_run_adaptive_batch_size()) {
289
15.3k
        return;
290
15.3k
    }
291
292
    // Learn from the logical bytes before CHAR/VARCHAR truncation. The truncated block can be
293
    // much smaller than the data the reader and FileScanner have already materialized.
294
79.0k
    COUNTER_SET(_adaptive_batch_actual_bytes_before_truncate_counter,
295
79.0k
                static_cast<int64_t>(block.bytes()));
296
79.0k
    if (block.rows() == 0) {
297
4.52k
        return;
298
4.52k
    }
299
300
74.4k
    _ensure_adaptive_batch_output_column_ids(block);
301
    // Count a probe only when we actually obtain the first non-empty sample that seeds history.
302
74.4k
    if (!_block_size_predictor->has_history()) {
303
46.1k
        COUNTER_UPDATE(_adaptive_batch_probe_count_counter, 1);
304
46.1k
    }
305
74.4k
    _block_size_predictor->update(block, _adaptive_batch_output_column_ids);
306
74.4k
}
307
308
94.3k
void FileScanner::_update_adaptive_batch_size_after_truncate(const Block& block) {
309
94.3k
    if (!_should_run_adaptive_batch_size()) {
310
15.3k
        return;
311
15.3k
    }
312
313
    // Keep the post-truncate size only for observability. It should not affect the next batch
314
    // because truncation happens after the upstream memory cost has already been paid.
315
78.9k
    COUNTER_SET(_adaptive_batch_actual_bytes_after_truncate_counter,
316
78.9k
                static_cast<int64_t>(block.bytes()));
317
78.9k
}
318
319
// check if the expr is a partition pruning expr
320
27.5k
bool FileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
321
27.5k
    if (expr->is_slot_ref()) {
322
10.8k
        auto* slot_ref = static_cast<VSlotRef*>(expr.get());
323
10.8k
        return _partition_slot_index_map.find(slot_ref->slot_id()) !=
324
10.8k
               _partition_slot_index_map.end();
325
10.8k
    }
326
16.6k
    if (expr->is_literal()) {
327
5.15k
        return true;
328
5.15k
    }
329
16.8k
    return std::ranges::all_of(expr->children(), [this](const auto& child) {
330
16.8k
        return _check_partition_prune_expr(child);
331
16.8k
    });
332
16.6k
}
333
334
10.4k
void FileScanner::_init_runtime_filter_partition_prune_ctxs() {
335
10.4k
    _runtime_filter_partition_prune_ctxs.clear();
336
10.7k
    for (auto& conjunct : _conjuncts) {
337
10.7k
        auto impl = conjunct->root()->get_impl();
338
        // If impl is not null, which means this a conjuncts from runtime filter.
339
10.7k
        auto expr = impl ? impl : conjunct->root();
340
10.7k
        if (_check_partition_prune_expr(expr)) {
341
7.59k
            _runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
342
7.59k
        }
343
10.7k
    }
344
10.4k
}
345
346
6.76k
void FileScanner::_init_runtime_filter_partition_prune_block() {
347
    // init block with empty column
348
65.0k
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
349
65.0k
        _runtime_filter_partition_prune_block.insert(
350
65.0k
                ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
351
65.0k
                                      slot_desc->get_data_type_ptr(), slot_desc->col_name()));
352
65.0k
    }
353
6.76k
}
354
355
7.34k
Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
356
7.34k
    SCOPED_TIMER(_runtime_filter_partition_prune_timer);
357
7.34k
    if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
358
4.77k
        return Status::OK();
359
4.77k
    }
360
2.57k
    size_t partition_value_column_size = 1;
361
362
    // 1. Get partition key values to string columns.
363
2.57k
    std::unordered_map<SlotId, MutableColumnPtr> partition_slot_id_to_column;
364
3.40k
    for (auto const& partition_col_desc : _partition_col_descs) {
365
3.40k
        const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
366
3.40k
        auto data_type = partition_slot_desc->get_data_type_ptr();
367
3.40k
        auto test_serde = data_type->get_serde();
368
3.40k
        auto partition_value_column = data_type->create_column();
369
3.40k
        auto* col_ptr = static_cast<IColumn*>(partition_value_column.get());
370
3.40k
        Slice slice(partition_value.data(), partition_value.size());
371
3.40k
        uint64_t num_deserialized = 0;
372
3.40k
        DataTypeSerDe::FormatOptions options {};
373
3.40k
        if (_partition_value_is_null.contains(partition_slot_desc->col_name())) {
374
            // for iceberg/paimon table
375
            // NOTICE: column is always be nullable for iceberg/paimon table now
376
1.24k
            DCHECK(data_type->is_nullable());
377
1.24k
            test_serde = test_serde->get_nested_serdes()[0];
378
1.24k
            auto* null_column = assert_cast<ColumnNullable*>(col_ptr);
379
1.24k
            if (_partition_value_is_null[partition_slot_desc->col_name()]) {
380
182
                null_column->insert_many_defaults(partition_value_column_size);
381
1.06k
            } else {
382
                // If the partition value is not null, we set null map to 0 and deserialize it normally.
383
1.06k
                null_column->get_null_map_column().insert_many_vals(0, partition_value_column_size);
384
1.06k
                RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
385
1.06k
                        null_column->get_nested_column(), slice, partition_value_column_size,
386
1.06k
                        &num_deserialized, options));
387
1.06k
            }
388
2.15k
        } else {
389
            // for hive/hudi table, the null value is set as "\\N"
390
            // TODO: this will be unified as iceberg/paimon table in the future
391
2.15k
            RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
392
2.15k
                    *col_ptr, slice, partition_value_column_size, &num_deserialized, options));
393
2.15k
        }
394
395
3.40k
        partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
396
3.40k
    }
397
398
    // 2. Fill _runtime_filter_partition_prune_block from the partition column, then execute conjuncts and filter block.
399
    // 2.1 Fill _runtime_filter_partition_prune_block from the partition column to match the conjuncts executing.
400
2.57k
    size_t index = 0;
401
2.57k
    bool first_column_filled = false;
402
6.24k
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
403
6.24k
        if (partition_slot_id_to_column.find(slot_desc->id()) !=
404
6.24k
            partition_slot_id_to_column.end()) {
405
3.40k
            auto data_type = slot_desc->get_data_type_ptr();
406
3.40k
            auto partition_value_column = std::move(partition_slot_id_to_column[slot_desc->id()]);
407
3.40k
            if (data_type->is_nullable()) {
408
3.40k
                _runtime_filter_partition_prune_block.insert(
409
3.40k
                        index, ColumnWithTypeAndName(
410
3.40k
                                       ColumnNullable::create(
411
3.40k
                                               std::move(partition_value_column),
412
3.40k
                                               ColumnUInt8::create(partition_value_column_size, 0)),
413
3.40k
                                       data_type, slot_desc->col_name()));
414
3.40k
            } else {
415
4
                _runtime_filter_partition_prune_block.insert(
416
4
                        index, ColumnWithTypeAndName(std::move(partition_value_column), data_type,
417
4
                                                     slot_desc->col_name()));
418
4
            }
419
3.40k
            if (index == 0) {
420
1.18k
                first_column_filled = true;
421
1.18k
            }
422
3.40k
        }
423
6.24k
        index++;
424
6.24k
    }
425
426
    // 2.2 Execute conjuncts.
427
2.57k
    if (!first_column_filled) {
428
        // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
429
        // The following process may be tricky and time-consuming, but we have no other way.
430
1.39k
        _runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
431
1.39k
                partition_value_column_size);
432
1.39k
    }
433
2.57k
    IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
434
2.57k
    RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
435
2.57k
                                                    &_runtime_filter_partition_prune_block,
436
2.57k
                                                    &result_filter, &can_filter_all));
437
2.57k
    return Status::OK();
438
2.57k
}
439
440
20.5k
Status FileScanner::_process_conjuncts() {
441
20.5k
    _slot_id_to_filter_conjuncts.clear();
442
20.5k
    _not_single_slot_filter_conjuncts.clear();
443
28.4k
    for (auto& conjunct : _push_down_conjuncts) {
444
28.4k
        auto impl = conjunct->root()->get_impl();
445
        // If impl is not null, which means this a conjuncts from runtime filter.
446
28.4k
        auto cur_expr = impl ? impl : conjunct->root();
447
448
28.4k
        std::vector<int> slot_ids;
449
28.4k
        _get_slot_ids(cur_expr.get(), &slot_ids);
450
28.4k
        if (slot_ids.empty()) {
451
1
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
452
1
            continue;
453
1
        }
454
28.4k
        bool single_slot = true;
455
28.5k
        for (int i = 1; i < slot_ids.size(); i++) {
456
880
            if (slot_ids[i] != slot_ids[0]) {
457
772
                single_slot = false;
458
772
                break;
459
772
            }
460
880
        }
461
28.4k
        if (single_slot) {
462
27.6k
            SlotId slot_id = slot_ids[0];
463
27.6k
            _slot_id_to_filter_conjuncts[slot_id].emplace_back(conjunct);
464
27.6k
        } else {
465
798
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
466
798
        }
467
28.4k
    }
468
20.5k
    return Status::OK();
469
20.5k
}
470
471
57.0k
Status FileScanner::_process_late_arrival_conjuncts() {
472
57.0k
    if (_push_down_conjuncts.size() < _conjuncts.size()) {
473
20.5k
        _push_down_conjuncts = _conjuncts;
474
        // Do not clear _conjuncts here!
475
        // We must keep it for fallback filtering, especially when mixing
476
        // Native readers (which use _push_down_conjuncts) and JNI readers (which rely on _conjuncts).
477
        // _conjuncts.clear();
478
20.5k
        RETURN_IF_ERROR(_process_conjuncts());
479
20.5k
    }
480
57.0k
    if (_applied_rf_num == _total_rf_num) {
481
54.8k
        _local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
482
54.8k
    }
483
57.0k
    return Status::OK();
484
57.0k
}
485
486
60.5k
void FileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
487
61.4k
    for (auto& child_expr : expr->children()) {
488
61.4k
        if (child_expr->is_slot_ref()) {
489
29.3k
            VSlotRef* slot_ref = reinterpret_cast<VSlotRef*>(child_expr.get());
490
29.3k
            SlotDescriptor* slot_desc = _state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
491
29.3k
            slot_desc->set_is_predicate(true);
492
29.3k
            slot_ids->emplace_back(slot_ref->slot_id());
493
32.1k
        } else {
494
32.1k
            _get_slot_ids(child_expr.get(), slot_ids);
495
32.1k
        }
496
61.4k
    }
497
60.5k
}
498
499
84.3k
Status FileScanner::_open_impl(RuntimeState* state) {
500
84.3k
    RETURN_IF_CANCELLED(state);
501
84.3k
    RETURN_IF_ERROR(Scanner::_open_impl(state));
502
84.3k
    if (_local_state) {
503
84.2k
        _condition_cache_digest = _local_state->get_condition_cache_digest();
504
84.2k
    }
505
84.3k
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
506
84.3k
    if (_first_scan_range) {
507
47.6k
        RETURN_IF_ERROR(_init_expr_ctxes());
508
47.6k
        if (_state->query_options().enable_runtime_filter_partition_prune &&
509
47.6k
            !_partition_slot_index_map.empty()) {
510
6.77k
            _init_runtime_filter_partition_prune_ctxs();
511
6.77k
            _init_runtime_filter_partition_prune_block();
512
6.77k
        }
513
47.6k
    } else {
514
        // there's no scan range in split source. stop scanner directly.
515
36.6k
        _scanner_eof = true;
516
36.6k
    }
517
518
84.3k
    return Status::OK();
519
84.3k
}
520
521
277k
Status FileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
522
277k
    Status st = _get_block_wrapped(state, block, eof);
523
524
277k
    if (!st.ok()) {
525
        // add cur path in error msg for easy debugging
526
59
        return std::move(st.append(". cur path: " + get_current_scan_range_name()));
527
59
    }
528
277k
    return st;
529
277k
}
530
531
// For query:
532
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
533
//                              A     B    C  D                 E
534
// _init_src_block              x     x    x  x                 x                -      x
535
// get_next_block               x     x    x  -                 -                -      x
536
// _cast_to_input_block         -     -    -  -                 -                -      -
537
// _fill_columns_from_path      -     -    -  -                 x                -      x
538
// _fill_missing_columns        -     -    -  x                 -                -      x
539
// _convert_to_output_block     -     -    -  -                 -                -      -
540
//
541
// For load:
542
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
543
//                              A     B    C  D                 E
544
// _init_src_block              x     x    x  x                 x                x      -
545
// get_next_block               x     x    x  -                 -                x      -
546
// _cast_to_input_block         x     x    x  -                 -                x      -
547
// _fill_columns_from_path      -     -    -  -                 x                x      -
548
// _fill_missing_columns        -     -    -  x                 -                x      -
549
// _convert_to_output_block     -     -    -  -                 -                -      x
550
277k
Status FileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* eof) {
551
277k
    do {
552
277k
        RETURN_IF_CANCELLED(state);
553
277k
        if (_cur_reader == nullptr || _cur_reader_eof) {
554
152k
            _finalize_reader_condition_cache();
555
            // The file may not exist because the file list is got from meta cache,
556
            // And the file may already be removed from storage.
557
            // Just ignore not found files.
558
152k
            Status st = _get_next_reader();
559
152k
            if (st.is<ErrorCode::NOT_FOUND>() && config::ignore_not_found_file_in_external_table) {
560
0
                _cur_reader_eof = true;
561
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
562
0
                continue;
563
152k
            } else if (st.is<ErrorCode::END_OF_FILE>()) {
564
12
                _cur_reader_eof = true;
565
12
                COUNTER_UPDATE(_fully_skipped_file_counter, 1);
566
12
                continue;
567
152k
            } else if (!st) {
568
3
                return st;
569
3
            }
570
152k
            _init_reader_condition_cache();
571
152k
        }
572
573
277k
        if (_scanner_eof) {
574
83.6k
            *eof = true;
575
83.6k
            return Status::OK();
576
83.6k
        }
577
578
        // Init src block for load job based on the data file schema (e.g. parquet)
579
        // For query job, simply set _src_block_ptr to block.
580
194k
        size_t read_rows = 0;
581
194k
        RETURN_IF_ERROR(_init_src_block(block));
582
194k
        if (_should_run_adaptive_batch_size()) {
583
165k
            _cur_reader->set_batch_size(_predict_reader_batch_rows());
584
165k
        }
585
194k
        if (_need_iceberg_rowid_column && _current_range.__isset.table_format_params &&
586
194k
            _current_range.table_format_params.table_format_type == "iceberg") {
587
230
            if (auto* iceberg_reader = dynamic_cast<IcebergTableReader*>(_cur_reader.get())) {
588
230
                iceberg_reader->set_row_id_column_position(_iceberg_rowid_column_pos);
589
230
            }
590
230
        }
591
194k
        {
592
194k
            SCOPED_TIMER(_get_block_timer);
593
594
            // Read next block.
595
            // Some of column in block may not be filled (column not exist in file)
596
194k
            RETURN_IF_ERROR(
597
194k
                    _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
598
194k
        }
599
        // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
600
        // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result.
601
194k
        if (read_rows > 0) {
602
98.5k
            if ((!_cur_reader->count_read_rows()) && _io_ctx) {
603
13.5k
                _io_ctx->file_reader_stats->read_rows += read_rows;
604
13.5k
            }
605
            // If the push_down_agg_type is COUNT, no need to do the rest,
606
            // because we only save a number in block.
607
98.5k
            if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
608
                // Convert the src block columns type to string in-place.
609
94.3k
                RETURN_IF_ERROR(_cast_to_input_block(block));
610
                // FileReader can fill partition and missing columns itself
611
94.3k
                if (!_cur_reader->fill_all_columns()) {
612
                    // Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
613
12.4k
                    RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
614
                    // Fill columns not exist in file with null or default value
615
12.4k
                    RETURN_IF_ERROR(_fill_missing_columns(read_rows));
616
12.4k
                }
617
                // Apply _pre_conjunct_ctxs to filter src block.
618
94.3k
                RETURN_IF_ERROR(_pre_filter_src_block());
619
620
                // Convert src block to output block (dest block), string to dest data type and apply filters.
621
94.3k
                RETURN_IF_ERROR(_convert_to_output_block(block));
622
94.3k
                _update_adaptive_batch_size_before_truncate(*block);
623
                // Truncate char columns or varchar columns if size is smaller than file columns
624
                // or not found in the file column schema.
625
94.3k
                RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
626
94.3k
                _update_adaptive_batch_size_after_truncate(*block);
627
94.3k
            }
628
98.5k
        }
629
194k
        break;
630
194k
    } while (true);
631
632
    // Update filtered rows and unselected rows for load, reset counter.
633
    // {
634
    //     state->update_num_rows_load_filtered(_counter.num_rows_filtered);
635
    //     state->update_num_rows_load_unselected(_counter.num_rows_unselected);
636
    //     _reset_counter();
637
    // }
638
194k
    return Status::OK();
639
277k
}
640
641
/**
642
 * Check whether there are complex types in parquet/orc reader in broker/stream load.
643
 * Broker/stream load will cast any type as string type, and complex types will be casted wrong.
644
 * This is a temporary method, and will be replaced by tvf.
645
 */
646
866
Status FileScanner::_check_output_block_types() {
647
866
    if (_is_load) {
648
866
        TFileFormatType::type format_type = _params->format_type;
649
866
        if (format_type == TFileFormatType::FORMAT_PARQUET ||
650
866
            format_type == TFileFormatType::FORMAT_ORC) {
651
50
            for (auto slot : _output_tuple_desc->slots()) {
652
50
                if (is_complex_type(slot->type()->get_primitive_type())) {
653
0
                    return Status::InternalError(
654
0
                            "Parquet/orc doesn't support complex types in broker/stream load, "
655
0
                            "please use tvf(table value function) to insert complex types.");
656
0
                }
657
50
            }
658
10
        }
659
866
    }
660
866
    return Status::OK();
661
866
}
662
663
194k
Status FileScanner::_init_src_block(Block* block) {
664
194k
    if (!_is_load) {
665
193k
        _src_block_ptr = block;
666
667
193k
        bool update_name_to_idx = _src_block_name_to_idx.empty();
668
193k
        _iceberg_rowid_column_pos = -1;
669
193k
        if (_need_iceberg_rowid_column && _current_range.__isset.table_format_params &&
670
193k
            _current_range.table_format_params.table_format_type == "iceberg") {
671
230
            int row_id_idx = block->get_position_by_name(BeConsts::ICEBERG_ROWID_COL);
672
230
            if (row_id_idx >= 0) {
673
230
                _iceberg_rowid_column_pos = row_id_idx;
674
230
                if (!update_name_to_idx &&
675
230
                    !_src_block_name_to_idx.contains(BeConsts::ICEBERG_ROWID_COL)) {
676
0
                    update_name_to_idx = true;
677
0
                }
678
230
            }
679
230
        }
680
681
        // Build name to index map only once on first call
682
193k
        if (update_name_to_idx) {
683
50.0k
            _src_block_name_to_idx = block->get_name_to_pos_map();
684
50.0k
        }
685
193k
        return Status::OK();
686
193k
    }
687
840
    RETURN_IF_ERROR(_check_output_block_types());
688
689
    // if (_src_block_init) {
690
    //     _src_block.clear_column_data();
691
    //     _src_block_ptr = &_src_block;
692
    //     return Status::OK();
693
    // }
694
695
840
    _src_block.clear();
696
840
    uint32_t idx = 0;
697
    // slots in _input_tuple_desc contains all slots describe in load statement, eg:
698
    // -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
699
    // _input_tuple_desc will contains: k1, k2, tmp1
700
    // and some of them are from file, such as k1 and k2, and some of them may not exist in file, such as tmp1
701
    // _input_tuple_desc also contains columns from path
702
14.3k
    for (auto& slot : _input_tuple_desc->slots()) {
703
14.3k
        DataTypePtr data_type;
704
14.3k
        auto it = _slot_lower_name_to_col_type.find(slot->col_name());
705
14.3k
        if (slot->is_skip_bitmap_col()) {
706
0
            _skip_bitmap_col_idx = idx;
707
0
        }
708
14.3k
        if (_params->__isset.sequence_map_col) {
709
0
            if (_params->sequence_map_col == slot->col_name()) {
710
0
                _sequence_map_col_uid = slot->col_unique_id();
711
0
            }
712
0
        }
713
14.3k
        data_type =
714
14.3k
                it == _slot_lower_name_to_col_type.end() ? slot->type() : make_nullable(it->second);
715
14.3k
        MutableColumnPtr data_column = data_type->create_column();
716
14.3k
        _src_block.insert(
717
14.3k
                ColumnWithTypeAndName(std::move(data_column), data_type, slot->col_name()));
718
14.3k
        _src_block_name_to_idx.emplace(slot->col_name(), idx++);
719
14.3k
    }
720
840
    if (_params->__isset.sequence_map_col) {
721
0
        for (const auto& slot : _output_tuple_desc->slots()) {
722
            // When the target table has seqeunce map column, _input_tuple_desc will not contains __DORIS_SEQUENCE_COL__,
723
            // so we should get its column unique id from _output_tuple_desc
724
0
            if (slot->is_sequence_col()) {
725
0
                _sequence_col_uid = slot->col_unique_id();
726
0
            }
727
0
        }
728
0
    }
729
840
    _src_block_ptr = &_src_block;
730
840
    _src_block_init = true;
731
840
    return Status::OK();
732
840
}
733
734
94.3k
Status FileScanner::_cast_to_input_block(Block* block) {
735
94.3k
    if (!_is_load) {
736
93.8k
        return Status::OK();
737
93.8k
    }
738
484
    SCOPED_TIMER(_cast_to_input_block_timer);
739
    // cast primitive type(PT0) to primitive type(PT1)
740
484
    uint32_t idx = 0;
741
8.95k
    for (auto& slot_desc : _input_tuple_desc->slots()) {
742
8.95k
        if (_slot_lower_name_to_col_type.find(slot_desc->col_name()) ==
743
8.95k
            _slot_lower_name_to_col_type.end()) {
744
            // skip columns which does not exist in file
745
120
            continue;
746
120
        }
747
8.83k
        auto& arg = _src_block_ptr->get_by_position(_src_block_name_to_idx[slot_desc->col_name()]);
748
8.83k
        auto return_type = slot_desc->get_data_type_ptr();
749
        // remove nullable here, let the get_function decide whether nullable
750
8.83k
        auto data_type = get_data_type_with_default_argument(remove_nullable(return_type));
751
8.83k
        ColumnsWithTypeAndName arguments {
752
8.83k
                arg, {data_type->create_column(), data_type, slot_desc->col_name()}};
753
8.83k
        auto func_cast =
754
8.83k
                SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type, {});
755
8.83k
        if (!func_cast) {
756
0
            return Status::InternalError("Function CAST[arg={}, col name={}, return={}] not found!",
757
0
                                         arg.type->get_name(), slot_desc->col_name(),
758
0
                                         return_type->get_name());
759
0
        }
760
8.83k
        idx = _src_block_name_to_idx[slot_desc->col_name()];
761
8.83k
        DCHECK(_state != nullptr);
762
8.83k
        auto ctx = FunctionContext::create_context(_state, {}, {});
763
8.83k
        RETURN_IF_ERROR(
764
8.83k
                func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, arg.column->size()));
765
8.83k
        _src_block_ptr->get_by_position(idx).type = std::move(return_type);
766
8.83k
    }
767
484
    return Status::OK();
768
484
}
769
770
12.4k
Status FileScanner::_fill_columns_from_path(size_t rows) {
771
12.4k
    if (!_fill_partition_from_path) {
772
56
        return Status::OK();
773
56
    }
774
12.4k
    DataTypeSerDe::FormatOptions _text_formatOptions;
775
12.4k
    for (auto& kv : _partition_col_descs) {
776
1.73k
        auto doris_column =
777
1.73k
                _src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]).column;
778
        // _src_block_ptr points to a mutable block created by this class itself, so const_cast can be used here.
779
1.73k
        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
780
1.73k
        auto& [value, slot_desc] = kv.second;
781
1.73k
        auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
782
1.73k
        Slice slice(value.data(), value.size());
783
1.73k
        uint64_t num_deserialized = 0;
784
1.73k
        if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows,
785
1.73k
                                                            &num_deserialized,
786
1.73k
                                                            _text_formatOptions) != Status::OK()) {
787
0
            return Status::InternalError("Failed to fill partition column: {}={}",
788
0
                                         slot_desc->col_name(), value);
789
0
        }
790
1.73k
        if (num_deserialized != rows) {
791
0
            return Status::InternalError(
792
0
                    "Failed to fill partition column: {}={} ."
793
0
                    "Number of rows expected to be written : {}, number of rows actually written : "
794
0
                    "{}",
795
0
                    slot_desc->col_name(), value, num_deserialized, rows);
796
0
        }
797
1.73k
    }
798
12.4k
    return Status::OK();
799
12.4k
}
800
801
12.4k
Status FileScanner::_fill_missing_columns(size_t rows) {
802
12.4k
    if (_missing_cols.empty()) {
803
12.4k
        return Status::OK();
804
12.4k
    }
805
806
18.4E
    SCOPED_TIMER(_fill_missing_columns_timer);
807
18.4E
    for (auto& kv : _missing_col_descs) {
808
0
        if (kv.second == nullptr) {
809
            // no default column, fill with null
810
0
            auto mutable_column = _src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first])
811
0
                                          .column->assume_mutable();
812
0
            auto* nullable_column = static_cast<ColumnNullable*>(mutable_column.get());
813
0
            nullable_column->insert_many_defaults(rows);
814
0
        } else {
815
            // fill with default value
816
0
            auto& ctx = kv.second;
817
0
            ColumnPtr result_column_ptr;
818
            // PT1 => dest primitive type
819
0
            RETURN_IF_ERROR(ctx->execute(_src_block_ptr, result_column_ptr));
820
0
            if (result_column_ptr->use_count() == 1) {
821
                // call resize because the first column of _src_block_ptr may not be filled by reader,
822
                // so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()`
823
                // has only one row.
824
0
                auto mutable_column = result_column_ptr->assume_mutable();
825
0
                mutable_column->resize(rows);
826
                // result_column_ptr maybe a ColumnConst, convert it to a normal column
827
0
                result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
828
0
                auto origin_column_type =
829
0
                        _src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]).type;
830
0
                bool is_nullable = origin_column_type->is_nullable();
831
0
                if (!_src_block_name_to_idx.contains(kv.first)) {
832
0
                    return Status::InternalError("Column {} not found in src block {}", kv.first,
833
0
                                                 _src_block_ptr->dump_structure());
834
0
                }
835
0
                _src_block_ptr->replace_by_position(
836
0
                        _src_block_name_to_idx[kv.first],
837
0
                        is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
838
0
            }
839
0
        }
840
0
    }
841
18.4E
    return Status::OK();
842
18.4E
}
843
844
94.3k
Status FileScanner::_pre_filter_src_block() {
845
94.3k
    if (!_is_load) {
846
93.8k
        return Status::OK();
847
93.8k
    }
848
486
    if (!_pre_conjunct_ctxs.empty()) {
849
0
        SCOPED_TIMER(_pre_filter_timer);
850
0
        auto origin_column_num = _src_block_ptr->columns();
851
0
        auto old_rows = _src_block_ptr->rows();
852
0
        RETURN_IF_ERROR(
853
0
                VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr, origin_column_num));
854
0
        _counter.num_rows_unselected += old_rows - _src_block_ptr->rows();
855
0
    }
856
486
    return Status::OK();
857
486
}
858
859
94.3k
Status FileScanner::_convert_to_output_block(Block* block) {
860
94.3k
    if (!_is_load) {
861
93.8k
        return Status::OK();
862
93.8k
    }
863
482
    SCOPED_TIMER(_convert_to_output_block_timer);
864
    // The block is passed from scanner context's free blocks,
865
    // which is initialized by output columns
866
    // so no need to clear it
867
    // block->clear();
868
869
482
    int ctx_idx = 0;
870
482
    size_t rows = _src_block_ptr->rows();
871
482
    auto filter_column = ColumnUInt8::create(rows, 1);
872
482
    auto& filter_map = filter_column->get_data();
873
874
    // After convert, the column_ptr should be copied into output block.
875
    // Can not use block->insert() because it may cause use_count() non-zero bug
876
482
    MutableBlock mutable_output_block =
877
482
            VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc);
878
482
    auto& mutable_output_columns = mutable_output_block.mutable_columns();
879
880
482
    std::vector<BitmapValue>* skip_bitmaps {nullptr};
881
482
    if (_should_process_skip_bitmap_col()) {
882
0
        auto* skip_bitmap_nullable_col_ptr =
883
0
                assert_cast<ColumnNullable*>(_src_block_ptr->get_by_position(_skip_bitmap_col_idx)
884
0
                                                     .column->assume_mutable()
885
0
                                                     .get());
886
0
        skip_bitmaps = &(assert_cast<ColumnBitmap*>(
887
0
                                 skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get())
888
0
                                 ->get_data());
889
        // NOTE:
890
        // - If the table has sequence type column, __DORIS_SEQUENCE_COL__ will be put in _input_tuple_desc, so whether
891
        //   __DORIS_SEQUENCE_COL__ will be marked in skip bitmap depends on whether it's specified in that row
892
        // - If the table has sequence map column, __DORIS_SEQUENCE_COL__ will not be put in _input_tuple_desc,
893
        //   so __DORIS_SEQUENCE_COL__ will be ommited if it't specified in a row and will not be marked in skip bitmap.
894
        //   So we should mark __DORIS_SEQUENCE_COL__ in skip bitmap here if the corresponding sequence map column us marked
895
0
        if (_sequence_map_col_uid != -1) {
896
0
            for (int j = 0; j < rows; ++j) {
897
0
                if ((*skip_bitmaps)[j].contains(_sequence_map_col_uid)) {
898
0
                    (*skip_bitmaps)[j].add(_sequence_col_uid);
899
0
                }
900
0
            }
901
0
        }
902
0
    }
903
904
    // for (auto slot_desc : _output_tuple_desc->slots()) {
905
9.94k
    for (int j = 0; j < mutable_output_columns.size(); ++j) {
906
9.46k
        auto* slot_desc = _output_tuple_desc->slots()[j];
907
9.46k
        int dest_index = ctx_idx;
908
9.46k
        ColumnPtr column_ptr;
909
910
9.46k
        auto& ctx = _dest_vexpr_ctx[dest_index];
911
        // PT1 => dest primitive type
912
9.46k
        RETURN_IF_ERROR(ctx->execute(_src_block_ptr, column_ptr));
913
        // column_ptr maybe a ColumnConst, convert it to a normal column
914
9.46k
        column_ptr = column_ptr->convert_to_full_column_if_const();
915
9.46k
        DCHECK(column_ptr);
916
917
        // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
918
        // is likely to be nullable
919
9.46k
        if (LIKELY(column_ptr->is_nullable())) {
920
9.33k
            const auto* nullable_column = reinterpret_cast<const ColumnNullable*>(column_ptr.get());
921
4.97M
            for (int i = 0; i < rows; ++i) {
922
4.96M
                if (filter_map[i] && nullable_column->is_null_at(i)) {
923
                    // skip checks for non-mentioned columns in flexible partial update
924
223k
                    if (skip_bitmaps == nullptr ||
925
223k
                        !skip_bitmaps->at(i).contains(slot_desc->col_unique_id())) {
926
                        // clang-format off
927
223k
                        if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
928
223k
                            !_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->is_null_at(i)) {
929
0
                            filter_map[i] = false;
930
0
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
931
0
                                [&]() -> std::string {
932
0
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
933
0
                                },
934
0
                                [&]() -> std::string {
935
0
                                    auto raw_value =
936
0
                                            _src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->get_data_at(i);
937
0
                                    std::string raw_string = raw_value.to_string();
938
0
                                    fmt::memory_buffer error_msg;
939
0
                                    fmt::format_to(error_msg,"column({}) value is incorrect while strict mode is {}, src value is {}",
940
0
                                            slot_desc->col_name(), _strict_mode, raw_string);
941
0
                                    return fmt::to_string(error_msg);
942
0
                                }));
943
223k
                        } else if (!slot_desc->is_nullable()) {
944
10
                            filter_map[i] = false;
945
10
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
946
10
                                [&]() -> std::string {
947
10
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
948
10
                                },
949
10
                                [&]() -> std::string {
950
10
                                    fmt::memory_buffer error_msg;
951
10
                                    fmt::format_to(error_msg, "column({}) values is null while columns is not nullable", slot_desc->col_name());
952
10
                                    return fmt::to_string(error_msg);
953
10
                                }));
954
10
                        }
955
                        // clang-format on
956
223k
                    }
957
223k
                }
958
4.96M
            }
959
9.33k
            if (!slot_desc->is_nullable()) {
960
410
                column_ptr = remove_nullable(column_ptr);
961
410
            }
962
9.33k
        } else if (slot_desc->is_nullable()) {
963
0
            column_ptr = make_nullable(column_ptr);
964
0
        }
965
9.46k
        mutable_output_columns[j]->insert_range_from(*column_ptr, 0, rows);
966
9.46k
        ctx_idx++;
967
9.46k
    }
968
969
    // after do the dest block insert operation, clear _src_block to remove the reference of origin column
970
482
    _src_block_ptr->clear();
971
972
482
    size_t dest_size = block->columns();
973
    // do filter
974
482
    block->insert(ColumnWithTypeAndName(std::move(filter_column), std::make_shared<DataTypeUInt8>(),
975
482
                                        "filter column"));
976
482
    RETURN_IF_ERROR(Block::filter_block(block, dest_size, dest_size));
977
978
482
    _counter.num_rows_filtered += rows - block->rows();
979
482
    return Status::OK();
980
482
}
981
982
94.3k
Status FileScanner::_truncate_char_or_varchar_columns(Block* block) {
983
    // Truncate char columns or varchar columns if size is smaller than file columns
984
    // or not found in the file column schema.
985
94.3k
    if (!_state->query_options().truncate_char_or_varchar_columns) {
986
94.3k
        return Status::OK();
987
94.3k
    }
988
8
    int idx = 0;
989
36
    for (auto* slot_desc : _real_tuple_desc->slots()) {
990
36
        const auto& type = slot_desc->type();
991
36
        if (type->get_primitive_type() != TYPE_VARCHAR && type->get_primitive_type() != TYPE_CHAR) {
992
12
            ++idx;
993
12
            continue;
994
12
        }
995
24
        auto iter = _source_file_col_name_types.find(slot_desc->col_name());
996
24
        if (iter != _source_file_col_name_types.end()) {
997
16
            const auto file_type_desc = _source_file_col_name_types[slot_desc->col_name()];
998
16
            int l = -1;
999
16
            if (auto* ftype = check_and_get_data_type<DataTypeString>(
1000
16
                        remove_nullable(file_type_desc).get())) {
1001
16
                l = ftype->len();
1002
16
            }
1003
16
            if ((assert_cast<const DataTypeString*>(remove_nullable(type).get())->len() > 0) &&
1004
16
                (assert_cast<const DataTypeString*>(remove_nullable(type).get())->len() < l ||
1005
16
                 l < 0)) {
1006
16
                _truncate_char_or_varchar_column(
1007
16
                        block, idx,
1008
16
                        assert_cast<const DataTypeString*>(remove_nullable(type).get())->len());
1009
16
            }
1010
16
        } else {
1011
8
            _truncate_char_or_varchar_column(
1012
8
                    block, idx,
1013
8
                    assert_cast<const DataTypeString*>(remove_nullable(type).get())->len());
1014
8
        }
1015
24
        ++idx;
1016
24
    }
1017
8
    return Status::OK();
1018
94.3k
}
1019
1020
// VARCHAR substring(VARCHAR str, INT pos[, INT len])
1021
24
void FileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int len) {
1022
24
    auto int_type = std::make_shared<DataTypeInt32>();
1023
24
    uint32_t num_columns_without_result = block->columns();
1024
24
    const ColumnNullable* col_nullable =
1025
24
            assert_cast<const ColumnNullable*>(block->get_by_position(idx).column.get());
1026
24
    const ColumnPtr& string_column_ptr = col_nullable->get_nested_column_ptr();
1027
24
    ColumnPtr null_map_column_ptr = col_nullable->get_null_map_column_ptr();
1028
24
    block->replace_by_position(idx, std::move(string_column_ptr));
1029
24
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)), int_type,
1030
24
                   "const 1"}); // pos is 1
1031
24
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)), int_type,
1032
24
                   fmt::format("const {}", len)});                          // len
1033
24
    block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); // result column
1034
24
    ColumnNumbers temp_arguments(3);
1035
24
    temp_arguments[0] = idx;                            // str column
1036
24
    temp_arguments[1] = num_columns_without_result;     // pos
1037
24
    temp_arguments[2] = num_columns_without_result + 1; // len
1038
24
    uint32_t result_column_id = num_columns_without_result + 2;
1039
1040
24
    SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
1041
24
    auto res = ColumnNullable::create(block->get_by_position(result_column_id).column,
1042
24
                                      null_map_column_ptr);
1043
24
    block->replace_by_position(idx, std::move(res));
1044
24
    Block::erase_useless_column(block, num_columns_without_result);
1045
24
}
1046
1047
7.47k
Status FileScanner::_create_row_id_column_iterator() {
1048
7.47k
    auto& id_file_map = _state->get_id_file_map();
1049
7.47k
    auto file_id = id_file_map->get_file_mapping_id(
1050
7.47k
            std::make_shared<FileMapping>(((FileScanLocalState*)_local_state)->parent_id(),
1051
7.47k
                                          _current_range, _should_enable_file_meta_cache()));
1052
7.47k
    _row_id_column_iterator_pair.first = std::make_shared<RowIdColumnIteratorV2>(
1053
7.47k
            IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id);
1054
7.47k
    return Status::OK();
1055
7.47k
}
1056
1057
152k
Status FileScanner::_get_next_reader() {
1058
152k
    while (true) {
1059
152k
        if (_cur_reader) {
1060
68.1k
            _cur_reader->collect_profile_before_close();
1061
68.1k
            RETURN_IF_ERROR(_cur_reader->close());
1062
68.1k
            _state->update_num_finished_scan_range(1);
1063
68.1k
        }
1064
152k
        _cur_reader.reset(nullptr);
1065
152k
        _reset_adaptive_batch_size_state();
1066
152k
        _src_block_init = false;
1067
152k
        bool has_next = _first_scan_range;
1068
152k
        if (!_first_scan_range) {
1069
105k
            RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
1070
105k
        }
1071
152k
        _first_scan_range = false;
1072
152k
        if (!has_next || _should_stop) {
1073
83.6k
            _scanner_eof = true;
1074
83.6k
            return Status::OK();
1075
83.6k
        }
1076
1077
69.1k
        const TFileRangeDesc& range = _current_range;
1078
69.1k
        _current_range_path = range.path;
1079
1080
69.1k
        if (!_partition_slot_descs.empty()) {
1081
            // we need get partition columns first for runtime filter partition pruning
1082
7.35k
            RETURN_IF_ERROR(_generate_partition_columns());
1083
1084
7.35k
            if (_state->query_options().enable_runtime_filter_partition_prune) {
1085
                // if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
1086
                // by runtime filter partition prune
1087
7.35k
                if (_push_down_conjuncts.size() < _conjuncts.size()) {
1088
                    // there are new runtime filters, need to re-init runtime filter partition pruning ctxs
1089
3.68k
                    _init_runtime_filter_partition_prune_ctxs();
1090
3.68k
                }
1091
1092
7.35k
                bool can_filter_all = false;
1093
7.35k
                RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
1094
7.35k
                if (can_filter_all) {
1095
                    // this range can be filtered out by runtime filter partition pruning
1096
                    // so we need to skip this range
1097
500
                    COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
1098
500
                    continue;
1099
500
                }
1100
7.35k
            }
1101
7.35k
        }
1102
1103
        // create reader for specific format
1104
68.6k
        Status init_status = Status::OK();
1105
68.6k
        TFileFormatType::type format_type = _get_current_format_type();
1106
        // for compatibility, this logic is deprecated in 3.1
1107
68.6k
        if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
1108
5.67k
            if (range.table_format_params.table_format_type == "paimon" &&
1109
5.67k
                !range.table_format_params.paimon_params.__isset.paimon_split) {
1110
                // use native reader
1111
0
                auto format = range.table_format_params.paimon_params.file_format;
1112
0
                if (format == "orc") {
1113
0
                    format_type = TFileFormatType::FORMAT_ORC;
1114
0
                } else if (format == "parquet") {
1115
0
                    format_type = TFileFormatType::FORMAT_PARQUET;
1116
0
                } else {
1117
0
                    return Status::InternalError("Not supported paimon file format: {}", format);
1118
0
                }
1119
0
            }
1120
5.67k
        }
1121
1122
        // JNI reader can only push down column value range
1123
68.6k
        bool push_down_predicates = !_is_load && format_type != TFileFormatType::FORMAT_JNI;
1124
68.6k
        bool need_to_get_parsed_schema = false;
1125
68.6k
        switch (format_type) {
1126
5.67k
        case TFileFormatType::FORMAT_JNI: {
1127
5.67k
            if (range.__isset.table_format_params &&
1128
5.67k
                range.table_format_params.table_format_type == "max_compute") {
1129
0
                const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
1130
0
                        _real_tuple_desc->table_desc());
1131
0
                if (!mc_desc->init_status()) {
1132
0
                    return mc_desc->init_status();
1133
0
                }
1134
0
                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
1135
0
                        mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
1136
0
                        range, _state, _profile);
1137
0
                init_status = mc_reader->init_reader();
1138
0
                _cur_reader = std::move(mc_reader);
1139
5.67k
            } else if (range.__isset.table_format_params &&
1140
5.67k
                       range.table_format_params.table_format_type == "paimon") {
1141
1.91k
                if (_state->query_options().__isset.enable_paimon_cpp_reader &&
1142
1.91k
                    _state->query_options().enable_paimon_cpp_reader) {
1143
0
                    auto cpp_reader = PaimonCppReader::create_unique(_file_slot_descs, _state,
1144
0
                                                                     _profile, range, _params);
1145
0
                    cpp_reader->set_push_down_agg_type(_get_push_down_agg_type());
1146
0
                    if (!_is_load && !_push_down_conjuncts.empty()) {
1147
0
                        PaimonPredicateConverter predicate_converter(_file_slot_descs, _state);
1148
0
                        auto predicate = predicate_converter.build(_push_down_conjuncts);
1149
0
                        if (predicate) {
1150
0
                            cpp_reader->set_predicate(std::move(predicate));
1151
0
                        }
1152
0
                    }
1153
0
                    init_status = cpp_reader->init_reader();
1154
0
                    _cur_reader = std::move(cpp_reader);
1155
1.91k
                } else {
1156
1.91k
                    _cur_reader = PaimonJniReader::create_unique(_file_slot_descs, _state, _profile,
1157
1.91k
                                                                 range, _params);
1158
1.91k
                    init_status = ((PaimonJniReader*)(_cur_reader.get()))->init_reader();
1159
1.91k
                }
1160
3.76k
            } else if (range.__isset.table_format_params &&
1161
3.76k
                       range.table_format_params.table_format_type == "hudi") {
1162
0
                _cur_reader = HudiJniReader::create_unique(*_params,
1163
0
                                                           range.table_format_params.hudi_params,
1164
0
                                                           _file_slot_descs, _state, _profile);
1165
0
                init_status = ((HudiJniReader*)_cur_reader.get())->init_reader();
1166
1167
3.76k
            } else if (range.__isset.table_format_params &&
1168
3.76k
                       range.table_format_params.table_format_type == "trino_connector") {
1169
790
                _cur_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
1170
790
                                                                     _profile, range);
1171
790
                init_status = ((TrinoConnectorJniReader*)(_cur_reader.get()))->init_reader();
1172
2.97k
            } else if (range.__isset.table_format_params &&
1173
2.97k
                       range.table_format_params.table_format_type == "jdbc") {
1174
                // Extract jdbc params from table_format_params
1175
1.73k
                std::map<std::string, std::string> jdbc_params(
1176
1.73k
                        range.table_format_params.jdbc_params.begin(),
1177
1.73k
                        range.table_format_params.jdbc_params.end());
1178
1.73k
                _cur_reader = JdbcJniReader::create_unique(_file_slot_descs, _state, _profile,
1179
1.73k
                                                           jdbc_params);
1180
1.73k
                init_status = ((JdbcJniReader*)(_cur_reader.get()))->init_reader();
1181
1.73k
            } else if (range.__isset.table_format_params &&
1182
1.24k
                       range.table_format_params.table_format_type == "iceberg") {
1183
1.24k
                _cur_reader = IcebergSysTableJniReader::create_unique(_file_slot_descs, _state,
1184
1.24k
                                                                      _profile, range, _params);
1185
1.24k
                init_status = ((IcebergSysTableJniReader*)(_cur_reader.get()))->init_reader();
1186
1.24k
            }
1187
            // Set col_name_to_block_idx for JNI readers to avoid repeated map creation
1188
5.67k
            if (_cur_reader) {
1189
5.67k
                if (auto* jni_reader = dynamic_cast<JniReader*>(_cur_reader.get())) {
1190
5.67k
                    jni_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
1191
5.67k
                }
1192
5.67k
            }
1193
5.67k
            break;
1194
5.67k
        }
1195
32.6k
        case TFileFormatType::FORMAT_PARQUET: {
1196
32.6k
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1197
32.6k
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1198
32.6k
                                               : nullptr;
1199
32.6k
            std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
1200
32.6k
                    _profile, *_params, range, _state->query_options().batch_size,
1201
32.6k
                    &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr,
1202
32.6k
                    _state->query_options().enable_parquet_lazy_mat);
1203
1204
32.6k
            if (_row_id_column_iterator_pair.second != -1) {
1205
2.13k
                RETURN_IF_ERROR(_create_row_id_column_iterator());
1206
2.13k
                parquet_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1207
2.13k
            }
1208
1209
            // ATTN: the push down agg type may be set back to NONE,
1210
            // see IcebergTableReader::init_row_filters for example.
1211
32.6k
            parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
1212
32.6k
            if (push_down_predicates) {
1213
32.6k
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1214
32.6k
            }
1215
32.6k
            RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
1216
1217
32.6k
            need_to_get_parsed_schema = true;
1218
32.6k
            break;
1219
32.6k
        }
1220
24.4k
        case TFileFormatType::FORMAT_ORC: {
1221
24.4k
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1222
24.4k
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1223
24.4k
                                               : nullptr;
1224
24.4k
            std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
1225
24.4k
                    _profile, _state, *_params, range, _state->query_options().batch_size,
1226
24.4k
                    _state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
1227
24.4k
                    _state->query_options().enable_orc_lazy_mat);
1228
24.4k
            if (_row_id_column_iterator_pair.second != -1) {
1229
5.36k
                RETURN_IF_ERROR(_create_row_id_column_iterator());
1230
5.36k
                orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1231
5.36k
            }
1232
1233
24.4k
            orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
1234
24.4k
            if (push_down_predicates) {
1235
24.4k
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1236
24.4k
            }
1237
24.4k
            RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
1238
1239
24.4k
            need_to_get_parsed_schema = true;
1240
24.4k
            break;
1241
24.4k
        }
1242
800
        case TFileFormatType::FORMAT_CSV_PLAIN:
1243
800
        case TFileFormatType::FORMAT_CSV_GZ:
1244
800
        case TFileFormatType::FORMAT_CSV_BZ2:
1245
800
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
1246
800
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
1247
800
        case TFileFormatType::FORMAT_CSV_LZOP:
1248
800
        case TFileFormatType::FORMAT_CSV_DEFLATE:
1249
800
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
1250
800
        case TFileFormatType::FORMAT_PROTO: {
1251
800
            auto reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
1252
800
                                                   _file_slot_descs, _io_ctx.get());
1253
1254
800
            init_status = reader->init_reader(_is_load);
1255
800
            _cur_reader = std::move(reader);
1256
800
            break;
1257
800
        }
1258
4.70k
        case TFileFormatType::FORMAT_TEXT: {
1259
4.70k
            auto reader = TextReader::create_unique(_state, _profile, &_counter, *_params, range,
1260
4.70k
                                                    _file_slot_descs, _io_ctx.get());
1261
4.70k
            init_status = reader->init_reader(_is_load);
1262
4.70k
            _cur_reader = std::move(reader);
1263
4.70k
            break;
1264
800
        }
1265
326
        case TFileFormatType::FORMAT_JSON: {
1266
326
            _cur_reader =
1267
326
                    NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
1268
326
                                                 _file_slot_descs, &_scanner_eof, _io_ctx.get());
1269
326
            init_status = ((NewJsonReader*)(_cur_reader.get()))
1270
326
                                  ->init_reader(_col_default_value_ctx, _is_load);
1271
326
            break;
1272
800
        }
1273
1274
0
        case TFileFormatType::FORMAT_WAL: {
1275
0
            _cur_reader = WalReader::create_unique(_state);
1276
0
            init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
1277
0
            break;
1278
800
        }
1279
0
        case TFileFormatType::FORMAT_NATIVE: {
1280
0
            auto reader =
1281
0
                    NativeReader::create_unique(_profile, *_params, range, _io_ctx.get(), _state);
1282
0
            init_status = reader->init_reader();
1283
0
            _cur_reader = std::move(reader);
1284
0
            need_to_get_parsed_schema = false;
1285
0
            break;
1286
800
        }
1287
98
        case TFileFormatType::FORMAT_ARROW: {
1288
98
            if (range.__isset.table_format_params &&
1289
98
                range.table_format_params.table_format_type == "remote_doris") {
1290
98
                _cur_reader =
1291
98
                        RemoteDorisReader::create_unique(_file_slot_descs, _state, _profile, range);
1292
98
                init_status = ((RemoteDorisReader*)(_cur_reader.get()))->init_reader();
1293
98
                if (_cur_reader) {
1294
98
                    static_cast<RemoteDorisReader*>(_cur_reader.get())
1295
98
                            ->set_col_name_to_block_idx(&_src_block_name_to_idx);
1296
98
                }
1297
98
            } else {
1298
0
                _cur_reader =
1299
0
                        ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
1300
0
                                                         range, _file_slot_descs, _io_ctx.get());
1301
0
                init_status = ((ArrowStreamReader*)(_cur_reader.get()))->init_reader();
1302
0
            }
1303
98
            break;
1304
800
        }
1305
0
        default:
1306
0
            return Status::NotSupported("Not supported create reader for file format: {}.",
1307
0
                                        to_string(_params->format_type));
1308
68.6k
        }
1309
1310
68.6k
        if (_cur_reader == nullptr) {
1311
1
            return Status::NotSupported(
1312
1
                    "Not supported create reader for table format: {} / file format: {}.",
1313
1
                    range.__isset.table_format_params ? range.table_format_params.table_format_type
1314
1
                                                      : "NotSet",
1315
1
                    to_string(_params->format_type));
1316
1
        }
1317
68.6k
        COUNTER_UPDATE(_file_counter, 1);
1318
        // The FileScanner for external table may try to open not exist files,
1319
        // Because FE file cache for external table may out of date.
1320
        // So, NOT_FOUND for FileScanner is not a fail case.
1321
        // Will remove this after file reader refactor.
1322
68.6k
        if (init_status.is<END_OF_FILE>()) {
1323
0
            COUNTER_UPDATE(_empty_file_counter, 1);
1324
0
            continue;
1325
68.6k
        } else if (init_status.is<ErrorCode::NOT_FOUND>()) {
1326
0
            if (config::ignore_not_found_file_in_external_table) {
1327
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
1328
0
                continue;
1329
0
            }
1330
0
            return Status::InternalError("failed to find reader, err: {}", init_status.to_string());
1331
68.6k
        } else if (!init_status.ok()) {
1332
0
            return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
1333
0
        }
1334
1335
68.6k
        _cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
1336
68.6k
        if (_get_push_down_agg_type() == TPushAggOp::type::COUNT &&
1337
68.6k
            range.__isset.table_format_params &&
1338
68.6k
            range.table_format_params.table_level_row_count >= 0) {
1339
            // This is a table level count push down operation, no need to call
1340
            // _set_fill_or_truncate_columns.
1341
            // in _set_fill_or_truncate_columns, we will use [range.start_offset, end offset]
1342
            // to filter the row group. But if this is count push down, the offset is undefined,
1343
            // causing incorrect row group filter and may return empty result.
1344
68.4k
        } else {
1345
68.4k
            Status status = _set_fill_or_truncate_columns(need_to_get_parsed_schema);
1346
68.4k
            if (status.is<END_OF_FILE>()) { // all parquet row groups are filtered
1347
4
                continue;
1348
68.4k
            } else if (!status.ok()) {
1349
0
                return Status::InternalError("failed to set_fill_or_truncate_columns, err: {}",
1350
0
                                             status.to_string());
1351
0
            }
1352
68.4k
        }
1353
68.6k
        _cur_reader_eof = false;
1354
68.6k
        _init_adaptive_batch_size_state(format_type);
1355
68.6k
        break;
1356
68.6k
    }
1357
68.6k
    return Status::OK();
1358
152k
}
1359
1360
Status FileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader,
1361
33.9k
                                         FileMetaCache* file_meta_cache_ptr) {
1362
33.9k
    const TFileRangeDesc& range = _current_range;
1363
33.9k
    Status init_status = Status::OK();
1364
1365
33.9k
    phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> slot_id_to_predicates =
1366
33.9k
            _local_state
1367
33.9k
                    ? _local_state->cast<FileScanLocalState>()._slot_id_to_predicates
1368
33.9k
                    : phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> {};
1369
33.9k
    if (range.__isset.table_format_params &&
1370
33.9k
        range.table_format_params.table_format_type == "iceberg") {
1371
18.0k
        std::unique_ptr<IcebergParquetReader> iceberg_reader = IcebergParquetReader::create_unique(
1372
18.0k
                std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache,
1373
18.0k
                _io_ctx.get(), file_meta_cache_ptr);
1374
18.0k
        if (_need_iceberg_rowid_column) {
1375
78
            iceberg_reader->set_need_row_id_column(true);
1376
78
        }
1377
18.0k
        if (_row_lineage_columns.row_id_column_idx != -1 ||
1378
18.0k
            _row_lineage_columns.last_updated_sequence_number_column_idx != -1) {
1379
236
            std::shared_ptr<RowLineageColumns> row_lineage_columns;
1380
236
            row_lineage_columns = std::make_shared<RowLineageColumns>();
1381
236
            row_lineage_columns->row_id_column_idx = _row_lineage_columns.row_id_column_idx;
1382
236
            row_lineage_columns->last_updated_sequence_number_column_idx =
1383
236
                    _row_lineage_columns.last_updated_sequence_number_column_idx;
1384
236
            iceberg_reader->set_row_lineage_columns(std::move(row_lineage_columns));
1385
236
        }
1386
18.0k
        iceberg_reader->set_push_down_agg_type(_get_push_down_agg_type());
1387
1388
18.0k
        init_status = iceberg_reader->init_reader(
1389
18.0k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1390
18.0k
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1391
18.0k
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1392
18.0k
                &_slot_id_to_filter_conjuncts);
1393
18.0k
        _cur_reader = std::move(iceberg_reader);
1394
18.0k
    } else if (range.__isset.table_format_params &&
1395
15.9k
               range.table_format_params.table_format_type == "paimon") {
1396
2.48k
        std::unique_ptr<PaimonParquetReader> paimon_reader = PaimonParquetReader::create_unique(
1397
2.48k
                std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache,
1398
2.48k
                _io_ctx.get(), file_meta_cache_ptr);
1399
2.48k
        init_status = paimon_reader->init_reader(
1400
2.48k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1401
2.48k
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1402
2.48k
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1403
2.48k
                &_slot_id_to_filter_conjuncts);
1404
2.48k
        RETURN_IF_ERROR(paimon_reader->init_row_filters());
1405
2.48k
        _cur_reader = std::move(paimon_reader);
1406
13.5k
    } else if (range.__isset.table_format_params &&
1407
13.5k
               range.table_format_params.table_format_type == "hudi") {
1408
0
        std::unique_ptr<HudiParquetReader> hudi_reader = HudiParquetReader::create_unique(
1409
0
                std::move(parquet_reader), _profile, _state, *_params, range, _io_ctx.get(),
1410
0
                file_meta_cache_ptr);
1411
0
        init_status = hudi_reader->init_reader(
1412
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1413
0
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1414
0
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1415
0
                &_slot_id_to_filter_conjuncts);
1416
0
        _cur_reader = std::move(hudi_reader);
1417
13.5k
    } else if (range.table_format_params.table_format_type == "hive") {
1418
10.4k
        auto hive_reader = HiveParquetReader::create_unique(std::move(parquet_reader), _profile,
1419
10.4k
                                                            _state, *_params, range, _io_ctx.get(),
1420
10.4k
                                                            &_is_file_slot, file_meta_cache_ptr);
1421
10.4k
        init_status = hive_reader->init_reader(
1422
10.4k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1423
10.4k
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1424
10.4k
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1425
10.4k
                &_slot_id_to_filter_conjuncts);
1426
10.4k
        _cur_reader = std::move(hive_reader);
1427
10.4k
    } else if (range.table_format_params.table_format_type == "tvf") {
1428
2.99k
        const FieldDescriptor* parquet_meta = nullptr;
1429
2.99k
        RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&parquet_meta));
1430
2.99k
        DCHECK(parquet_meta != nullptr);
1431
1432
        // TVF will first `get_parsed_schema` to obtain file information from BE, and FE will convert
1433
        // the column names to lowercase (because the query process is case-insensitive),
1434
        // so the lowercase file column names are used here to match the read columns.
1435
2.99k
        std::shared_ptr<TableSchemaChangeHelper::Node> tvf_info_node = nullptr;
1436
2.99k
        RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_name(
1437
2.99k
                _real_tuple_desc, *parquet_meta, tvf_info_node));
1438
2.99k
        init_status = parquet_reader->init_reader(
1439
2.99k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1440
2.99k
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1441
2.99k
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1442
2.99k
                &_slot_id_to_filter_conjuncts, tvf_info_node);
1443
2.99k
        _cur_reader = std::move(parquet_reader);
1444
2.99k
    } else if (_is_load) {
1445
4
        const FieldDescriptor* parquet_meta = nullptr;
1446
4
        RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&parquet_meta));
1447
4
        DCHECK(parquet_meta != nullptr);
1448
1449
        // Load is case-insensitive, so you to match the columns in the file.
1450
4
        std::map<std::string, std::string> file_lower_name_to_native;
1451
18
        for (const auto& parquet_field : parquet_meta->get_fields_schema()) {
1452
18
            file_lower_name_to_native.emplace(doris::to_lower(parquet_field.name),
1453
18
                                              parquet_field.name);
1454
18
        }
1455
4
        auto load_info_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1456
18
        for (const auto slot : _real_tuple_desc->slots()) {
1457
18
            if (file_lower_name_to_native.contains(slot->col_name())) {
1458
18
                load_info_node->add_children(slot->col_name(),
1459
18
                                             file_lower_name_to_native[slot->col_name()],
1460
18
                                             TableSchemaChangeHelper::ConstNode::get_instance());
1461
                // For Load, `file_scanner` will create block columns using the file type,
1462
                // there is no schema change when reading inside the struct,
1463
                // so use `TableSchemaChangeHelper::ConstNode`.
1464
18
            } else {
1465
0
                load_info_node->add_not_exist_children(slot->col_name());
1466
0
            }
1467
18
        }
1468
1469
4
        init_status = parquet_reader->init_reader(
1470
4
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1471
4
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1472
4
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1473
4
                &_slot_id_to_filter_conjuncts, load_info_node);
1474
4
        _cur_reader = std::move(parquet_reader);
1475
4
    }
1476
1477
33.9k
    return init_status;
1478
33.9k
}
1479
1480
Status FileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader,
1481
26.3k
                                     FileMetaCache* file_meta_cache_ptr) {
1482
26.3k
    const TFileRangeDesc& range = _current_range;
1483
26.3k
    Status init_status = Status::OK();
1484
1485
26.3k
    if (range.__isset.table_format_params &&
1486
26.3k
        range.table_format_params.table_format_type == "transactional_hive") {
1487
292
        std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
1488
292
                TransactionalHiveReader::create_unique(std::move(orc_reader), _profile, _state,
1489
292
                                                       *_params, range, _io_ctx.get(),
1490
292
                                                       file_meta_cache_ptr);
1491
292
        init_status = tran_orc_reader->init_reader(
1492
292
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1493
292
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1494
292
                &_slot_id_to_filter_conjuncts);
1495
292
        RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
1496
292
        _cur_reader = std::move(tran_orc_reader);
1497
26.0k
    } else if (range.__isset.table_format_params &&
1498
26.0k
               range.table_format_params.table_format_type == "iceberg") {
1499
6.47k
        std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique(
1500
6.47k
                std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(),
1501
6.47k
                file_meta_cache_ptr);
1502
6.47k
        if (_need_iceberg_rowid_column) {
1503
78
            iceberg_reader->set_need_row_id_column(true);
1504
78
        }
1505
6.47k
        if (_row_lineage_columns.row_id_column_idx != -1 ||
1506
6.47k
            _row_lineage_columns.last_updated_sequence_number_column_idx != -1) {
1507
236
            std::shared_ptr<RowLineageColumns> row_lineage_columns;
1508
236
            row_lineage_columns = std::make_shared<RowLineageColumns>();
1509
236
            row_lineage_columns->row_id_column_idx = _row_lineage_columns.row_id_column_idx;
1510
236
            row_lineage_columns->last_updated_sequence_number_column_idx =
1511
236
                    _row_lineage_columns.last_updated_sequence_number_column_idx;
1512
236
            iceberg_reader->set_row_lineage_columns(std::move(row_lineage_columns));
1513
236
        }
1514
6.47k
        init_status = iceberg_reader->init_reader(
1515
6.47k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1516
6.47k
                _default_val_row_desc.get(), _col_name_to_slot_id,
1517
6.47k
                &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
1518
6.47k
        _cur_reader = std::move(iceberg_reader);
1519
19.5k
    } else if (range.__isset.table_format_params &&
1520
19.5k
               range.table_format_params.table_format_type == "paimon") {
1521
2.08k
        std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique(
1522
2.08k
                std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(),
1523
2.08k
                file_meta_cache_ptr);
1524
1525
2.08k
        init_status = paimon_reader->init_reader(
1526
2.08k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1527
2.08k
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1528
2.08k
                &_slot_id_to_filter_conjuncts);
1529
2.08k
        RETURN_IF_ERROR(paimon_reader->init_row_filters());
1530
2.08k
        _cur_reader = std::move(paimon_reader);
1531
17.5k
    } else if (range.__isset.table_format_params &&
1532
17.5k
               range.table_format_params.table_format_type == "hudi") {
1533
0
        std::unique_ptr<HudiOrcReader> hudi_reader =
1534
0
                HudiOrcReader::create_unique(std::move(orc_reader), _profile, _state, *_params,
1535
0
                                             range, _io_ctx.get(), file_meta_cache_ptr);
1536
1537
0
        init_status = hudi_reader->init_reader(
1538
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1539
0
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1540
0
                &_slot_id_to_filter_conjuncts);
1541
0
        _cur_reader = std::move(hudi_reader);
1542
17.5k
    } else if (range.__isset.table_format_params &&
1543
17.5k
               range.table_format_params.table_format_type == "hive") {
1544
15.3k
        std::unique_ptr<HiveOrcReader> hive_reader = HiveOrcReader::create_unique(
1545
15.3k
                std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get(),
1546
15.3k
                &_is_file_slot, file_meta_cache_ptr);
1547
1548
15.3k
        init_status = hive_reader->init_reader(
1549
15.3k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1550
15.3k
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1551
15.3k
                &_slot_id_to_filter_conjuncts);
1552
15.3k
        _cur_reader = std::move(hive_reader);
1553
15.3k
    } else if (range.__isset.table_format_params &&
1554
2.13k
               range.table_format_params.table_format_type == "tvf") {
1555
2.13k
        const orc::Type* orc_type_ptr = nullptr;
1556
2.13k
        RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr));
1557
1558
2.13k
        std::shared_ptr<TableSchemaChangeHelper::Node> tvf_info_node = nullptr;
1559
2.13k
        RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_orc_name(
1560
2.13k
                _real_tuple_desc, orc_type_ptr, tvf_info_node));
1561
2.13k
        init_status = orc_reader->init_reader(
1562
2.13k
                &_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, false,
1563
2.13k
                _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1564
2.13k
                &_slot_id_to_filter_conjuncts, tvf_info_node);
1565
2.13k
        _cur_reader = std::move(orc_reader);
1566
2.13k
    } else if (_is_load) {
1567
2
        const orc::Type* orc_type_ptr = nullptr;
1568
2
        RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr));
1569
1570
2
        std::map<std::string, std::string> file_lower_name_to_native;
1571
16
        for (uint64_t idx = 0; idx < orc_type_ptr->getSubtypeCount(); idx++) {
1572
14
            file_lower_name_to_native.emplace(doris::to_lower(orc_type_ptr->getFieldName(idx)),
1573
14
                                              orc_type_ptr->getFieldName(idx));
1574
14
        }
1575
1576
2
        auto load_info_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1577
14
        for (const auto slot : _real_tuple_desc->slots()) {
1578
14
            if (file_lower_name_to_native.contains(slot->col_name())) {
1579
14
                load_info_node->add_children(slot->col_name(),
1580
14
                                             file_lower_name_to_native[slot->col_name()],
1581
14
                                             TableSchemaChangeHelper::ConstNode::get_instance());
1582
14
            } else {
1583
0
                load_info_node->add_not_exist_children(slot->col_name());
1584
0
            }
1585
14
        }
1586
2
        init_status = orc_reader->init_reader(
1587
2
                &_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, false,
1588
2
                _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1589
2
                &_slot_id_to_filter_conjuncts, load_info_node);
1590
2
        _cur_reader = std::move(orc_reader);
1591
2
    }
1592
1593
26.3k
    return init_status;
1594
26.3k
}
1595
1596
71.6k
Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema) {
1597
71.6k
    _missing_cols.clear();
1598
71.6k
    _slot_lower_name_to_col_type.clear();
1599
71.6k
    std::unordered_map<std::string, DataTypePtr> name_to_col_type;
1600
71.6k
    RETURN_IF_ERROR(_cur_reader->get_columns(&name_to_col_type, &_missing_cols));
1601
71.6k
    if (_need_iceberg_rowid_column && _current_range.__isset.table_format_params &&
1602
71.6k
        _current_range.table_format_params.table_format_type == "iceberg") {
1603
156
        _missing_cols.erase(BeConsts::ICEBERG_ROWID_COL);
1604
156
        _missing_cols.erase(to_lower(BeConsts::ICEBERG_ROWID_COL));
1605
156
    }
1606
2.30M
    for (const auto& [col_name, col_type] : name_to_col_type) {
1607
2.30M
        auto col_name_lower = to_lower(col_name);
1608
2.30M
        if (_partition_col_descs.contains(col_name_lower)) {
1609
            /*
1610
             * `_slot_lower_name_to_col_type` is used by `_init_src_block` and `_cast_to_input_block` during LOAD to
1611
             * generate columns of the corresponding type, which records the columns existing in the file.
1612
             *
1613
             * When a column in `COLUMNS FROM PATH` exists in a file column, the column type in the block will
1614
             * not match the slot type in `_output_tuple_desc`, causing an error when
1615
             * Serde `deserialize_one_cell_from_json` fills the partition values.
1616
             *
1617
             * So for partition column not need fill _slot_lower_name_to_col_type.
1618
             */
1619
3.39k
            continue;
1620
3.39k
        }
1621
2.29M
        _slot_lower_name_to_col_type.emplace(col_name_lower, col_type);
1622
2.29M
    }
1623
1624
71.6k
    if (!_fill_partition_from_path && config::enable_iceberg_partition_column_fallback) {
1625
        // check if the cols of _partition_col_descs are in _missing_cols
1626
        // if so, set _fill_partition_from_path to true and remove the col from _missing_cols
1627
3.33k
        for (const auto& [col_name, col_type] : _partition_col_descs) {
1628
3.33k
            if (_missing_cols.contains(col_name)) {
1629
0
                _fill_partition_from_path = true;
1630
0
                _missing_cols.erase(col_name);
1631
0
            }
1632
3.33k
        }
1633
2.46k
    }
1634
1635
71.6k
    RETURN_IF_ERROR(_generate_missing_columns());
1636
71.6k
    if (_fill_partition_from_path) {
1637
69.1k
        RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
1638
69.1k
    } else {
1639
        // If the partition columns are not from path, we only fill the missing columns.
1640
2.47k
        RETURN_IF_ERROR(_cur_reader->set_fill_columns({}, _missing_col_descs));
1641
2.47k
    }
1642
71.6k
    if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
1643
0
        fmt::memory_buffer col_buf;
1644
0
        for (auto& col : _missing_cols) {
1645
0
            fmt::format_to(col_buf, " {}", col);
1646
0
        }
1647
0
        VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", fmt::to_string(col_buf),
1648
0
                                   _current_range.path);
1649
0
    }
1650
1651
71.6k
    RETURN_IF_ERROR(_generate_truncate_columns(need_to_get_parsed_schema));
1652
71.6k
    return Status::OK();
1653
71.6k
}
1654
1655
71.5k
Status FileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) {
1656
71.5k
    _source_file_col_name_types.clear();
1657
    //  The col names and types of source file, such as parquet, orc files.
1658
71.5k
    if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) {
1659
8
        std::vector<std::string> source_file_col_names;
1660
8
        std::vector<DataTypePtr> source_file_col_types;
1661
8
        Status status =
1662
8
                _cur_reader->get_parsed_schema(&source_file_col_names, &source_file_col_types);
1663
8
        if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
1664
0
            return status;
1665
0
        }
1666
8
        DCHECK_EQ(source_file_col_names.size(), source_file_col_types.size());
1667
32
        for (int i = 0; i < source_file_col_names.size(); ++i) {
1668
24
            _source_file_col_name_types[to_lower(source_file_col_names[i])] =
1669
24
                    source_file_col_types[i];
1670
24
        }
1671
8
    }
1672
71.5k
    return Status::OK();
1673
71.5k
}
1674
1675
3.25k
Status FileScanner::prepare_for_read_lines(const TFileRangeDesc& range) {
1676
3.25k
    _current_range = range;
1677
1678
3.25k
    _file_cache_statistics.reset(new io::FileCacheStatistics());
1679
3.25k
    _file_reader_stats.reset(new io::FileReaderStats());
1680
1681
3.25k
    _file_read_bytes_counter =
1682
3.25k
            ADD_COUNTER_WITH_LEVEL(_profile, FileReadBytesProfile, TUnit::BYTES, 1);
1683
3.25k
    _file_read_time_counter = ADD_TIMER_WITH_LEVEL(_profile, FileReadTimeProfile, 1);
1684
1685
3.25k
    RETURN_IF_ERROR(_init_io_ctx());
1686
3.25k
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
1687
3.25k
    _io_ctx->file_reader_stats = _file_reader_stats.get();
1688
3.25k
    _default_val_row_desc.reset(new RowDescriptor((TupleDescriptor*)_real_tuple_desc));
1689
3.25k
    RETURN_IF_ERROR(_init_expr_ctxes());
1690
1691
    // Since only one column is read from the file, there is no need to filter, so set these variables to empty.
1692
3.25k
    _push_down_conjuncts.clear();
1693
3.25k
    _not_single_slot_filter_conjuncts.clear();
1694
3.25k
    _slot_id_to_filter_conjuncts.clear();
1695
3.25k
    _kv_cache = nullptr;
1696
3.25k
    return Status::OK();
1697
3.25k
}
1698
1699
Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
1700
                                          const std::list<int64_t>& row_ids, Block* result_block,
1701
                                          const ExternalFileMappingInfo& external_info,
1702
3.24k
                                          int64_t* init_reader_ms, int64_t* get_block_ms) {
1703
3.24k
    _current_range = range;
1704
3.24k
    RETURN_IF_ERROR(_generate_partition_columns());
1705
1706
3.24k
    TFileFormatType::type format_type = _get_current_format_type();
1707
3.24k
    Status init_status = Status::OK();
1708
1709
3.24k
    auto file_meta_cache_ptr = external_info.enable_file_meta_cache
1710
3.25k
                                       ? ExecEnv::GetInstance()->file_meta_cache()
1711
18.4E
                                       : nullptr;
1712
1713
3.24k
    RETURN_IF_ERROR(scope_timer_run(
1714
3.24k
            [&]() -> Status {
1715
3.24k
                switch (format_type) {
1716
3.24k
                case TFileFormatType::FORMAT_PARQUET: {
1717
3.24k
                    std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
1718
3.24k
                            _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx.get(),
1719
3.24k
                            _state, file_meta_cache_ptr, false);
1720
1721
3.24k
                    RETURN_IF_ERROR(parquet_reader->read_by_rows(row_ids));
1722
3.24k
                    RETURN_IF_ERROR(
1723
3.24k
                            _init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
1724
3.24k
                    break;
1725
3.24k
                }
1726
3.24k
                case TFileFormatType::FORMAT_ORC: {
1727
3.24k
                    std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
1728
3.24k
                            _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx.get(),
1729
3.24k
                            file_meta_cache_ptr, false);
1730
1731
3.24k
                    RETURN_IF_ERROR(orc_reader->read_by_rows(row_ids));
1732
3.24k
                    RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
1733
3.24k
                    break;
1734
3.24k
                }
1735
3.24k
                default: {
1736
3.24k
                    return Status::NotSupported(
1737
3.24k
                            "Not support create lines reader for file format: {},"
1738
3.24k
                            "only support parquet and orc.",
1739
3.24k
                            to_string(_params->format_type));
1740
3.24k
                }
1741
3.24k
                }
1742
3.24k
                return Status::OK();
1743
3.24k
            },
1744
3.24k
            init_reader_ms));
1745
1746
3.24k
    RETURN_IF_ERROR(_set_fill_or_truncate_columns(true));
1747
3.24k
    _cur_reader_eof = false;
1748
1749
3.24k
    RETURN_IF_ERROR(scope_timer_run(
1750
3.24k
            [&]() -> Status {
1751
3.24k
                while (!_cur_reader_eof) {
1752
3.24k
                    bool eof = false;
1753
3.24k
                    RETURN_IF_ERROR(_get_block_impl(_state, result_block, &eof));
1754
3.24k
                }
1755
3.24k
                return Status::OK();
1756
3.24k
            },
1757
3.24k
            get_block_ms));
1758
1759
3.24k
    _cur_reader->collect_profile_before_close();
1760
3.24k
    RETURN_IF_ERROR(_cur_reader->close());
1761
1762
3.24k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
1763
3.24k
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
1764
3.24k
    return Status::OK();
1765
3.24k
}
1766
1767
10.5k
Status FileScanner::_generate_partition_columns() {
1768
10.5k
    _partition_col_descs.clear();
1769
10.5k
    _partition_value_is_null.clear();
1770
10.5k
    const TFileRangeDesc& range = _current_range;
1771
10.5k
    if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
1772
14.8k
        for (const auto& slot_desc : _partition_slot_descs) {
1773
14.8k
            if (slot_desc) {
1774
14.8k
                auto it = _partition_slot_index_map.find(slot_desc->id());
1775
14.8k
                if (it == std::end(_partition_slot_index_map)) {
1776
0
                    return Status::InternalError("Unknown source slot descriptor, slot_id={}",
1777
0
                                                 slot_desc->id());
1778
0
                }
1779
14.8k
                const std::string& column_from_path = range.columns_from_path[it->second];
1780
14.8k
                _partition_col_descs.emplace(slot_desc->col_name(),
1781
14.8k
                                             std::make_tuple(column_from_path, slot_desc));
1782
14.8k
                if (range.__isset.columns_from_path_is_null) {
1783
3.71k
                    _partition_value_is_null.emplace(slot_desc->col_name(),
1784
3.71k
                                                     range.columns_from_path_is_null[it->second]);
1785
3.71k
                }
1786
14.8k
            }
1787
14.8k
        }
1788
8.49k
    }
1789
10.5k
    return Status::OK();
1790
10.5k
}
1791
1792
71.7k
Status FileScanner::_generate_missing_columns() {
1793
71.7k
    _missing_col_descs.clear();
1794
71.7k
    if (!_missing_cols.empty()) {
1795
23.7k
        for (auto* slot_desc : _real_tuple_desc->slots()) {
1796
23.7k
            if (!_missing_cols.contains(slot_desc->col_name())) {
1797
16.4k
                continue;
1798
16.4k
            }
1799
1800
7.30k
            auto it = _col_default_value_ctx.find(slot_desc->col_name());
1801
7.30k
            if (it == _col_default_value_ctx.end()) {
1802
0
                return Status::InternalError("failed to find default value expr for slot: {}",
1803
0
                                             slot_desc->col_name());
1804
0
            }
1805
7.30k
            _missing_col_descs.emplace(slot_desc->col_name(), it->second);
1806
7.30k
        }
1807
5.36k
    }
1808
71.7k
    return Status::OK();
1809
71.7k
}
1810
1811
50.9k
Status FileScanner::_init_expr_ctxes() {
1812
50.9k
    std::map<SlotId, int> full_src_index_map;
1813
50.9k
    std::map<SlotId, SlotDescriptor*> full_src_slot_map;
1814
50.9k
    std::map<std::string, int> partition_name_to_key_index_map;
1815
50.9k
    int index = 0;
1816
303k
    for (const auto& slot_desc : _real_tuple_desc->slots()) {
1817
303k
        full_src_slot_map.emplace(slot_desc->id(), slot_desc);
1818
303k
        full_src_index_map.emplace(slot_desc->id(), index++);
1819
303k
    }
1820
1821
    // For external table query, find the index of column in path.
1822
    // Because query doesn't always search for all columns in a table
1823
    // and the order of selected columns is random.
1824
    // All ranges in _ranges vector should have identical columns_from_path_keys
1825
    // because they are all file splits for the same external table.
1826
    // So here use the first element of _ranges to fill the partition_name_to_key_index_map
1827
50.9k
    if (_current_range.__isset.columns_from_path_keys) {
1828
50.5k
        std::vector<std::string> key_map = _current_range.columns_from_path_keys;
1829
50.5k
        if (!key_map.empty()) {
1830
30.5k
            for (size_t i = 0; i < key_map.size(); i++) {
1831
19.8k
                partition_name_to_key_index_map.emplace(key_map[i], i);
1832
19.8k
            }
1833
10.6k
        }
1834
50.5k
    }
1835
1836
50.9k
    _num_of_columns_from_file = _params->num_of_columns_from_file;
1837
1838
303k
    for (const auto& slot_info : _params->required_slots) {
1839
303k
        auto slot_id = slot_info.slot_id;
1840
303k
        auto it = full_src_slot_map.find(slot_id);
1841
303k
        if (it == std::end(full_src_slot_map)) {
1842
0
            return Status::InternalError(
1843
0
                    fmt::format("Unknown source slot descriptor, slot_id={}", slot_id));
1844
0
        }
1845
303k
        if (it->second->col_name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
1846
4.26k
            _row_id_column_iterator_pair.second = _default_val_row_desc->get_column_id(slot_id);
1847
4.26k
            continue;
1848
4.26k
        }
1849
299k
        if (it->second->col_name() == BeConsts::ICEBERG_ROWID_COL) {
1850
156
            _need_iceberg_rowid_column = true;
1851
156
            continue;
1852
156
        }
1853
1854
299k
        if (it->second->col_name() == IcebergTableReader::ROW_LINEAGE_ROW_ID) {
1855
436
            _row_lineage_columns.row_id_column_idx = _default_val_row_desc->get_column_id(slot_id);
1856
436
        }
1857
1858
299k
        if (it->second->col_name() == IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
1859
368
            _row_lineage_columns.last_updated_sequence_number_column_idx =
1860
368
                    _default_val_row_desc->get_column_id(slot_id);
1861
368
        }
1862
1863
299k
        if (slot_info.is_file_slot) {
1864
288k
            _is_file_slot.emplace(slot_id);
1865
288k
            _file_slot_descs.emplace_back(it->second);
1866
288k
            _file_col_names.push_back(it->second->col_name());
1867
288k
        }
1868
1869
299k
        if (partition_name_to_key_index_map.contains(it->second->col_name())) {
1870
14.2k
            if (slot_info.is_file_slot) {
1871
                // If there is slot which is both a partition column and a file column,
1872
                // we should not fill the partition column from path.
1873
3.60k
                _fill_partition_from_path = false;
1874
10.6k
            } else if (!_fill_partition_from_path) {
1875
                // This should not happen
1876
0
                return Status::InternalError(
1877
0
                        "Partition column {} is not a file column, but there is already a column "
1878
0
                        "which is both a partition column and a file column.",
1879
0
                        it->second->col_name());
1880
0
            }
1881
14.2k
            _partition_slot_descs.emplace_back(it->second);
1882
14.2k
            if (_is_load) {
1883
0
                auto iti = full_src_index_map.find(slot_id);
1884
0
                _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
1885
14.2k
            } else {
1886
14.2k
                auto kit = partition_name_to_key_index_map.find(it->second->col_name());
1887
14.2k
                _partition_slot_index_map.emplace(slot_id, kit->second);
1888
14.2k
            }
1889
14.2k
        }
1890
299k
    }
1891
1892
    // set column name to default value expr map
1893
304k
    for (auto* slot_desc : _real_tuple_desc->slots()) {
1894
304k
        VExprContextSPtr ctx;
1895
304k
        auto it = _params->default_value_of_src_slot.find(slot_desc->id());
1896
304k
        if (it != std::end(_params->default_value_of_src_slot)) {
1897
300k
            if (!it->second.nodes.empty()) {
1898
274k
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
1899
274k
                RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
1900
274k
                RETURN_IF_ERROR(ctx->open(_state));
1901
274k
            }
1902
            // if expr is empty, the default value will be null
1903
300k
            _col_default_value_ctx.emplace(slot_desc->col_name(), ctx);
1904
300k
        }
1905
304k
    }
1906
1907
50.9k
    if (_is_load) {
1908
        // follow desc expr map is only for load task.
1909
378
        bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
1910
378
        int idx = 0;
1911
5.93k
        for (auto* slot_desc : _output_tuple_desc->slots()) {
1912
5.93k
            auto it = _params->expr_of_dest_slot.find(slot_desc->id());
1913
5.93k
            if (it == std::end(_params->expr_of_dest_slot)) {
1914
0
                return Status::InternalError("No expr for dest slot, id={}, name={}",
1915
0
                                             slot_desc->id(), slot_desc->col_name());
1916
0
            }
1917
1918
5.93k
            VExprContextSPtr ctx;
1919
5.93k
            if (!it->second.nodes.empty()) {
1920
5.93k
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
1921
5.93k
                RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
1922
5.93k
                RETURN_IF_ERROR(ctx->open(_state));
1923
5.93k
            }
1924
5.93k
            _dest_vexpr_ctx.emplace_back(ctx);
1925
5.93k
            _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
1926
1927
5.93k
            if (has_slot_id_map) {
1928
5.93k
                auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
1929
5.93k
                if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
1930
540
                    _src_slot_descs_order_by_dest.emplace_back(nullptr);
1931
5.39k
                } else {
1932
5.39k
                    auto _src_slot_it = full_src_slot_map.find(it1->second);
1933
5.39k
                    if (_src_slot_it == std::end(full_src_slot_map)) {
1934
0
                        return Status::InternalError("No src slot {} in src slot descs",
1935
0
                                                     it1->second);
1936
0
                    }
1937
5.39k
                    _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
1938
5.39k
                                                         full_src_index_map[_src_slot_it->first]);
1939
5.39k
                    _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
1940
5.39k
                }
1941
5.93k
            }
1942
5.93k
        }
1943
378
    }
1944
50.9k
    return Status::OK();
1945
50.9k
}
1946
1947
389k
bool FileScanner::_should_enable_condition_cache() {
1948
389k
    return _condition_cache_digest != 0 && !_is_load &&
1949
389k
           (!_conjuncts.empty() || !_push_down_conjuncts.empty());
1950
389k
}
1951
1952
152k
void FileScanner::_init_reader_condition_cache() {
1953
152k
    _condition_cache = nullptr;
1954
152k
    _condition_cache_ctx = nullptr;
1955
1956
152k
    if (!_should_enable_condition_cache() || !_cur_reader) {
1957
131k
        return;
1958
131k
    }
1959
1960
    // Disable condition cache when delete operations exist (e.g. Iceberg position/equality
1961
    // deletes, Hive ACID deletes). Cached granule results may become stale if delete files
1962
    // change between queries while the data file's cache key remains the same.
1963
20.7k
    if (_cur_reader->has_delete_operations()) {
1964
2.92k
        return;
1965
2.92k
    }
1966
1967
17.8k
    auto* cache = segment_v2::ConditionCache::instance();
1968
17.8k
    _condition_cache_key = segment_v2::ConditionCache::ExternalCacheKey(
1969
17.8k
            _current_range.path,
1970
17.8k
            _current_range.__isset.modification_time ? _current_range.modification_time : 0,
1971
18.4E
            _current_range.__isset.file_size ? _current_range.file_size : -1,
1972
17.8k
            _condition_cache_digest,
1973
18.4E
            _current_range.__isset.start_offset ? _current_range.start_offset : 0,
1974
18.4E
            _current_range.__isset.size ? _current_range.size : -1);
1975
1976
17.8k
    segment_v2::ConditionCacheHandle handle;
1977
17.8k
    auto condition_cache_hit = cache->lookup(_condition_cache_key, &handle);
1978
17.8k
    if (condition_cache_hit) {
1979
7.23k
        _condition_cache = handle.get_filter_result();
1980
7.23k
        _condition_cache_hit_count++;
1981
10.6k
    } else {
1982
        // Allocate cache pre-sized to total number of granules.
1983
        // We add +1 as a safety margin: when a file is split across multiple scanners
1984
        // and the first row of this scanner's range is not aligned to a granule boundary,
1985
        // the data may span one more granule than ceil(total_rows / GRANULE_SIZE).
1986
        // The extra element costs only 1 bit and never affects correctness (an extra
1987
        // false-granule beyond the actual data range won't overlap any real row range).
1988
10.6k
        int64_t total_rows = _cur_reader->get_total_rows();
1989
10.6k
        if (total_rows > 0) {
1990
8.77k
            size_t num_granules = (total_rows + ConditionCacheContext::GRANULE_SIZE - 1) /
1991
8.77k
                                  ConditionCacheContext::GRANULE_SIZE;
1992
8.77k
            _condition_cache = std::make_shared<std::vector<bool>>(num_granules + 1, false);
1993
8.77k
        }
1994
10.6k
    }
1995
1996
17.8k
    if (_condition_cache) {
1997
        // Create context to pass to readers (native readers use it; non-native readers ignore it)
1998
16.0k
        _condition_cache_ctx = std::make_shared<ConditionCacheContext>();
1999
16.0k
        _condition_cache_ctx->is_hit = condition_cache_hit;
2000
16.0k
        _condition_cache_ctx->filter_result = _condition_cache;
2001
16.0k
        _cur_reader->set_condition_cache_context(_condition_cache_ctx);
2002
16.0k
    }
2003
17.8k
}
2004
2005
236k
void FileScanner::_finalize_reader_condition_cache() {
2006
236k
    if (!_should_enable_condition_cache() || !_condition_cache_ctx ||
2007
236k
        _condition_cache_ctx->is_hit) {
2008
227k
        _condition_cache = nullptr;
2009
227k
        _condition_cache_ctx = nullptr;
2010
227k
        return;
2011
227k
    }
2012
    // Only store the cache if the reader was fully consumed. If the scan was
2013
    // truncated early (e.g. by LIMIT), the cache is incomplete — unread granules
2014
    // would remain false and cause surviving rows to be incorrectly skipped on HIT.
2015
8.76k
    if (!_cur_reader_eof) {
2016
62
        _condition_cache = nullptr;
2017
62
        _condition_cache_ctx = nullptr;
2018
62
        return;
2019
62
    }
2020
2021
8.70k
    auto* cache = segment_v2::ConditionCache::instance();
2022
8.70k
    cache->insert(_condition_cache_key, std::move(_condition_cache));
2023
8.70k
    _condition_cache = nullptr;
2024
8.70k
    _condition_cache_ctx = nullptr;
2025
8.70k
}
2026
2027
84.3k
Status FileScanner::close(RuntimeState* state) {
2028
84.3k
    if (!_try_close()) {
2029
0
        return Status::OK();
2030
0
    }
2031
2032
84.3k
    _finalize_reader_condition_cache();
2033
2034
84.3k
    if (_cur_reader) {
2035
628
        RETURN_IF_ERROR(_cur_reader->close());
2036
628
    }
2037
2038
84.3k
    RETURN_IF_ERROR(Scanner::close(state));
2039
84.3k
    return Status::OK();
2040
84.3k
}
2041
2042
84.3k
void FileScanner::try_stop() {
2043
84.3k
    Scanner::try_stop();
2044
84.3k
    if (_io_ctx) {
2045
84.3k
        _io_ctx->should_stop = true;
2046
84.3k
    }
2047
84.3k
}
2048
2049
84.3k
void FileScanner::update_realtime_counters() {
2050
84.3k
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
2051
2052
84.3k
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
2053
84.3k
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
2054
2055
84.3k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(
2056
84.3k
            _file_reader_stats->read_rows);
2057
84.3k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
2058
84.3k
            _file_reader_stats->read_bytes);
2059
2060
84.3k
    int64_t delta_bytes_read_from_local =
2061
84.3k
            _file_cache_statistics->bytes_read_from_local - _last_bytes_read_from_local;
2062
84.3k
    int64_t delta_bytes_read_from_remote =
2063
84.3k
            _file_cache_statistics->bytes_read_from_remote - _last_bytes_read_from_remote;
2064
84.3k
    if (_file_cache_statistics->bytes_read_from_local == 0 &&
2065
84.3k
        _file_cache_statistics->bytes_read_from_remote == 0) {
2066
83.5k
        _state->get_query_ctx()
2067
83.5k
                ->resource_ctx()
2068
83.5k
                ->io_context()
2069
83.5k
                ->update_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes);
2070
83.5k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
2071
83.5k
                _file_reader_stats->read_bytes);
2072
83.5k
    } else {
2073
758
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
2074
758
                delta_bytes_read_from_local);
2075
758
        _state->get_query_ctx()
2076
758
                ->resource_ctx()
2077
758
                ->io_context()
2078
758
                ->update_scan_bytes_from_remote_storage(delta_bytes_read_from_remote);
2079
758
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
2080
758
                delta_bytes_read_from_local);
2081
758
        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
2082
758
                delta_bytes_read_from_remote);
2083
758
    }
2084
2085
84.3k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
2086
2087
84.3k
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
2088
84.3k
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
2089
2090
84.3k
    _file_reader_stats->read_bytes = 0;
2091
84.3k
    _file_reader_stats->read_rows = 0;
2092
2093
84.3k
    _last_bytes_read_from_local = _file_cache_statistics->bytes_read_from_local;
2094
84.3k
    _last_bytes_read_from_remote = _file_cache_statistics->bytes_read_from_remote;
2095
84.3k
}
2096
2097
84.2k
void FileScanner::_collect_profile_before_close() {
2098
84.2k
    Scanner::_collect_profile_before_close();
2099
84.2k
    if (config::enable_file_cache && _state->query_options().enable_file_cache &&
2100
84.2k
        _profile != nullptr) {
2101
1.06k
        io::FileCacheProfileReporter cache_profile(_profile);
2102
1.06k
        cache_profile.update(_file_cache_statistics.get());
2103
1.06k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
2104
1.06k
                _file_cache_statistics->bytes_write_into_cache);
2105
1.06k
    }
2106
2107
84.2k
    if (_cur_reader != nullptr) {
2108
628
        _cur_reader->collect_profile_before_close();
2109
628
    }
2110
2111
84.2k
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
2112
84.2k
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
2113
84.2k
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
2114
2115
84.2k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
2116
84.2k
    COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls);
2117
84.2k
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
2118
84.2k
    COUNTER_UPDATE(local_state->_condition_cache_hit_counter, _condition_cache_hit_count);
2119
84.2k
    if (_io_ctx) {
2120
84.2k
        COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
2121
84.2k
                       _io_ctx->condition_cache_filtered_rows);
2122
84.2k
    }
2123
2124
84.2k
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
2125
84.2k
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
2126
84.2k
}
2127
2128
} // namespace doris