Coverage Report

Created: 2026-03-17 07:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/Metrics_types.h>
23
#include <gen_cpp/Opcodes_types.h>
24
#include <gen_cpp/PaloInternalService_types.h>
25
#include <gen_cpp/PlanNodes_types.h>
26
#include <glog/logging.h>
27
28
#include <algorithm>
29
#include <boost/iterator/iterator_facade.hpp>
30
#include <map>
31
#include <ranges>
32
#include <tuple>
33
#include <unordered_map>
34
#include <utility>
35
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/config.h"
38
#include "common/logging.h"
39
#include "common/status.h"
40
#include "core/block/column_with_type_and_name.h"
41
#include "core/block/columns_with_type_and_name.h"
42
#include "core/column/column.h"
43
#include "core/column/column_nullable.h"
44
#include "core/column/column_vector.h"
45
#include "core/data_type/data_type.h"
46
#include "core/data_type/data_type_nullable.h"
47
#include "core/data_type/data_type_string.h"
48
#include "core/string_ref.h"
49
#include "exec/common/stringop_substring.h"
50
#include "exec/rowid_fetcher.h"
51
#include "exec/scan/scan_node.h"
52
#include "exprs/aggregate/aggregate_function.h"
53
#include "exprs/function/function.h"
54
#include "exprs/function/simple_function_factory.h"
55
#include "exprs/vexpr.h"
56
#include "exprs/vexpr_context.h"
57
#include "exprs/vexpr_fwd.h"
58
#include "exprs/vslot_ref.h"
59
#include "format/arrow/arrow_stream_reader.h"
60
#include "format/csv/csv_reader.h"
61
#include "format/json/new_json_reader.h"
62
#include "format/native/native_reader.h"
63
#include "format/orc/vorc_reader.h"
64
#include "format/parquet/vparquet_reader.h"
65
#include "format/table/hive_reader.h"
66
#include "format/table/hudi_jni_reader.h"
67
#include "format/table/hudi_reader.h"
68
#include "format/table/iceberg_reader.h"
69
#include "format/table/jdbc_jni_reader.h"
70
#include "format/table/max_compute_jni_reader.h"
71
#include "format/table/paimon_cpp_reader.h"
72
#include "format/table/paimon_jni_reader.h"
73
#include "format/table/paimon_predicate_converter.h"
74
#include "format/table/paimon_reader.h"
75
#include "format/table/remote_doris_reader.h"
76
#include "format/table/transactional_hive_reader.h"
77
#include "format/table/trino_connector_jni_reader.h"
78
#include "format/text/text_reader.h"
79
#include "io/cache/block_file_cache_profile.h"
80
#include "load/group_commit/wal/wal_reader.h"
81
#include "runtime/descriptors.h"
82
#include "runtime/runtime_profile.h"
83
#include "runtime/runtime_state.h"
84
85
namespace cctz {
86
class time_zone;
87
} // namespace cctz
88
namespace doris {
89
class ShardedKVCache;
90
} // namespace doris
91
92
namespace doris {
93
#include "common/compile_check_begin.h"
94
using namespace ErrorCode;
95
96
const std::string FileScanner::FileReadBytesProfile = "FileReadBytes";
97
const std::string FileScanner::FileReadTimeProfile = "FileReadTime";
98
99
FileScanner::FileScanner(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
100
                         std::shared_ptr<SplitSourceConnector> split_source,
101
                         RuntimeProfile* profile, ShardedKVCache* kv_cache,
102
                         const std::unordered_map<std::string, int>* colname_to_slot_id)
103
92.1k
        : Scanner(state, local_state, limit, profile),
104
92.1k
          _split_source(split_source),
105
92.1k
          _cur_reader(nullptr),
106
92.1k
          _cur_reader_eof(false),
107
92.1k
          _kv_cache(kv_cache),
108
92.1k
          _strict_mode(false),
109
92.1k
          _col_name_to_slot_id(colname_to_slot_id) {
110
92.1k
    if (state->get_query_ctx() != nullptr &&
111
92.1k
        state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
112
91.8k
        _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
113
91.8k
    } else {
114
        // old fe thrift protocol
115
325
        _params = _split_source->get_params();
116
325
    }
117
92.1k
    if (_params->__isset.strict_mode) {
118
342
        _strict_mode = _params->strict_mode;
119
342
    }
120
121
    // For load scanner, there are input and output tuple.
122
    // For query scanner, there is only output tuple
123
92.1k
    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
124
92.1k
    _real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
125
92.1k
    _is_load = (_input_tuple_desc != nullptr);
126
92.1k
}
127
128
92.0k
Status FileScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
129
92.0k
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));
130
92.0k
    _get_block_timer =
131
92.0k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerGetBlockTime", 1);
132
92.0k
    _cast_to_input_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
133
92.0k
                                                      "FileScannerCastInputBlockTime", 1);
134
92.0k
    _fill_missing_columns_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
135
92.0k
                                                       "FileScannerFillMissingColumnTime", 1);
136
92.0k
    _pre_filter_timer =
137
92.0k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerPreFilterTimer", 1);
138
92.0k
    _convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
139
92.0k
                                                          "FileScannerConvertOuputBlockTime", 1);
140
92.0k
    _runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
141
92.0k
            _local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime", 1);
142
92.0k
    _empty_file_counter =
143
92.0k
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT, 1);
144
92.0k
    _not_found_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
145
92.0k
                                                     "NotFoundFileNum", TUnit::UNIT, 1);
146
92.0k
    _fully_skipped_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
147
92.0k
                                                         "FullySkippedFileNum", TUnit::UNIT, 1);
148
92.0k
    _file_counter =
149
92.0k
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
150
151
92.0k
    _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
152
92.0k
                                                      FileReadBytesProfile, TUnit::BYTES, 1);
153
92.0k
    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
154
92.0k
                                                      "FileReadCalls", TUnit::UNIT, 1);
155
92.0k
    _file_read_time_counter =
156
92.0k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), FileReadTimeProfile, 1);
157
158
92.0k
    _runtime_filter_partition_pruned_range_counter =
159
92.0k
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
160
92.0k
                                   "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);
161
162
92.0k
    _file_cache_statistics.reset(new io::FileCacheStatistics());
163
92.0k
    _file_reader_stats.reset(new io::FileReaderStats());
164
165
92.0k
    RETURN_IF_ERROR(_init_io_ctx());
166
92.0k
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
167
92.0k
    _io_ctx->file_reader_stats = _file_reader_stats.get();
168
92.0k
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;
169
170
92.0k
    if (_is_load) {
171
342
        _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
172
342
                                              std::vector<TupleId>({_input_tuple_desc->id()})));
173
        // prepare pre filters
174
342
        if (_params->__isset.pre_filter_exprs_list) {
175
0
            RETURN_IF_ERROR(doris::VExpr::create_expr_trees(_params->pre_filter_exprs_list,
176
0
                                                            _pre_conjunct_ctxs));
177
342
        } else if (_params->__isset.pre_filter_exprs) {
178
0
            VExprContextSPtr context;
179
0
            RETURN_IF_ERROR(doris::VExpr::create_expr_tree(_params->pre_filter_exprs, context));
180
0
            _pre_conjunct_ctxs.emplace_back(context);
181
0
        }
182
183
342
        for (auto& conjunct : _pre_conjunct_ctxs) {
184
0
            RETURN_IF_ERROR(conjunct->prepare(_state, *_src_row_desc));
185
0
            RETURN_IF_ERROR(conjunct->open(_state));
186
0
        }
187
188
342
        _dest_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
189
342
                                               std::vector<TupleId>({_output_tuple_desc->id()})));
190
342
    }
191
192
92.0k
    _default_val_row_desc.reset(
193
92.0k
            new RowDescriptor(_state->desc_tbl(), std::vector<TupleId>({_real_tuple_desc->id()})));
194
195
92.0k
    return Status::OK();
196
92.0k
}
197
198
// check if the expr is a partition pruning expr
199
34.2k
bool FileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
200
34.2k
    if (expr->is_slot_ref()) {
201
13.7k
        auto* slot_ref = static_cast<VSlotRef*>(expr.get());
202
13.7k
        return _partition_slot_index_map.find(slot_ref->slot_id()) !=
203
13.7k
               _partition_slot_index_map.end();
204
13.7k
    }
205
20.5k
    if (expr->is_literal()) {
206
5.93k
        return true;
207
5.93k
    }
208
20.8k
    return std::ranges::all_of(expr->children(), [this](const auto& child) {
209
20.8k
        return _check_partition_prune_expr(child);
210
20.8k
    });
211
20.5k
}
212
213
13.0k
void FileScanner::_init_runtime_filter_partition_prune_ctxs() {
214
13.0k
    _runtime_filter_partition_prune_ctxs.clear();
215
13.4k
    for (auto& conjunct : _conjuncts) {
216
13.4k
        auto impl = conjunct->root()->get_impl();
217
        // If impl is not null, which means this a conjuncts from runtime filter.
218
13.4k
        auto expr = impl ? impl : conjunct->root();
219
13.4k
        if (_check_partition_prune_expr(expr)) {
220
9.11k
            _runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
221
9.11k
        }
222
13.4k
    }
223
13.0k
}
224
225
8.36k
void FileScanner::_init_runtime_filter_partition_prune_block() {
226
    // init block with empty column
227
73.4k
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
228
73.4k
        _runtime_filter_partition_prune_block.insert(
229
73.4k
                ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
230
73.4k
                                      slot_desc->get_data_type_ptr(), slot_desc->col_name()));
231
73.4k
    }
232
8.36k
}
233
234
8.46k
Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
235
8.46k
    SCOPED_TIMER(_runtime_filter_partition_prune_timer);
236
8.46k
    if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
237
5.29k
        return Status::OK();
238
5.29k
    }
239
3.17k
    size_t partition_value_column_size = 1;
240
241
    // 1. Get partition key values to string columns.
242
3.17k
    std::unordered_map<SlotId, MutableColumnPtr> partition_slot_id_to_column;
243
4.16k
    for (auto const& partition_col_desc : _partition_col_descs) {
244
4.16k
        const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
245
4.16k
        auto data_type = partition_slot_desc->get_data_type_ptr();
246
4.16k
        auto test_serde = data_type->get_serde();
247
4.16k
        auto partition_value_column = data_type->create_column();
248
4.16k
        auto* col_ptr = static_cast<IColumn*>(partition_value_column.get());
249
4.16k
        Slice slice(partition_value.data(), partition_value.size());
250
4.16k
        uint64_t num_deserialized = 0;
251
4.16k
        DataTypeSerDe::FormatOptions options {};
252
4.16k
        if (_partition_value_is_null.contains(partition_slot_desc->col_name())) {
253
            // for iceberg/paimon table
254
            // NOTICE: column is always be nullable for iceberg/paimon table now
255
1.12k
            DCHECK(data_type->is_nullable());
256
1.12k
            test_serde = test_serde->get_nested_serdes()[0];
257
1.12k
            auto* null_column = assert_cast<ColumnNullable*>(col_ptr);
258
1.12k
            if (_partition_value_is_null[partition_slot_desc->col_name()]) {
259
158
                null_column->insert_many_defaults(partition_value_column_size);
260
966
            } else {
261
                // If the partition value is not null, we set null map to 0 and deserialize it normally.
262
966
                null_column->get_null_map_column().insert_many_vals(0, partition_value_column_size);
263
966
                RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
264
966
                        null_column->get_nested_column(), slice, partition_value_column_size,
265
966
                        &num_deserialized, options));
266
966
            }
267
3.03k
        } else {
268
            // for hive/hudi table, the null value is set as "\\N"
269
            // TODO: this will be unified as iceberg/paimon table in the future
270
3.03k
            RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
271
3.03k
                    *col_ptr, slice, partition_value_column_size, &num_deserialized, options));
272
3.03k
        }
273
274
4.16k
        partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
275
4.16k
    }
276
277
    // 2. Fill _runtime_filter_partition_prune_block from the partition column, then execute conjuncts and filter block.
278
    // 2.1 Fill _runtime_filter_partition_prune_block from the partition column to match the conjuncts executing.
279
3.17k
    size_t index = 0;
280
3.17k
    bool first_column_filled = false;
281
8.09k
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
282
8.09k
        if (partition_slot_id_to_column.find(slot_desc->id()) !=
283
8.09k
            partition_slot_id_to_column.end()) {
284
4.16k
            auto data_type = slot_desc->get_data_type_ptr();
285
4.16k
            auto partition_value_column = std::move(partition_slot_id_to_column[slot_desc->id()]);
286
4.16k
            if (data_type->is_nullable()) {
287
4.16k
                _runtime_filter_partition_prune_block.insert(
288
4.16k
                        index, ColumnWithTypeAndName(
289
4.16k
                                       ColumnNullable::create(
290
4.16k
                                               std::move(partition_value_column),
291
4.16k
                                               ColumnUInt8::create(partition_value_column_size, 0)),
292
4.16k
                                       data_type, slot_desc->col_name()));
293
4.16k
            } else {
294
4
                _runtime_filter_partition_prune_block.insert(
295
4
                        index, ColumnWithTypeAndName(std::move(partition_value_column), data_type,
296
4
                                                     slot_desc->col_name()));
297
4
            }
298
4.16k
            if (index == 0) {
299
1.29k
                first_column_filled = true;
300
1.29k
            }
301
4.16k
        }
302
8.09k
        index++;
303
8.09k
    }
304
305
    // 2.2 Execute conjuncts.
306
3.17k
    if (!first_column_filled) {
307
        // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
308
        // The following process may be tricky and time-consuming, but we have no other way.
309
1.88k
        _runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
310
1.88k
                partition_value_column_size);
311
1.88k
    }
312
3.17k
    IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
313
3.17k
    RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
314
3.17k
                                                    &_runtime_filter_partition_prune_block,
315
3.17k
                                                    &result_filter, &can_filter_all));
316
3.17k
    return Status::OK();
317
3.17k
}
318
319
35.0k
Status FileScanner::_process_conjuncts() {
320
35.0k
    _slot_id_to_filter_conjuncts.clear();
321
35.0k
    _not_single_slot_filter_conjuncts.clear();
322
48.7k
    for (auto& conjunct : _push_down_conjuncts) {
323
48.7k
        auto impl = conjunct->root()->get_impl();
324
        // If impl is not null, which means this a conjuncts from runtime filter.
325
48.7k
        auto cur_expr = impl ? impl : conjunct->root();
326
327
48.7k
        std::vector<int> slot_ids;
328
48.7k
        _get_slot_ids(cur_expr.get(), &slot_ids);
329
48.7k
        if (slot_ids.empty()) {
330
0
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
331
0
            continue;
332
0
        }
333
48.7k
        bool single_slot = true;
334
48.9k
        for (int i = 1; i < slot_ids.size(); i++) {
335
1.29k
            if (slot_ids[i] != slot_ids[0]) {
336
1.07k
                single_slot = false;
337
1.07k
                break;
338
1.07k
            }
339
1.29k
        }
340
48.7k
        if (single_slot) {
341
47.6k
            SlotId slot_id = slot_ids[0];
342
47.6k
            _slot_id_to_filter_conjuncts[slot_id].emplace_back(conjunct);
343
47.6k
        } else {
344
1.09k
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
345
1.09k
        }
346
48.7k
    }
347
35.0k
    return Status::OK();
348
35.0k
}
349
350
65.6k
Status FileScanner::_process_late_arrival_conjuncts() {
351
65.6k
    if (_push_down_conjuncts.size() < _conjuncts.size()) {
352
35.0k
        _push_down_conjuncts = _conjuncts;
353
35.0k
        _conjuncts.clear();
354
35.0k
        RETURN_IF_ERROR(_process_conjuncts());
355
35.0k
    }
356
65.6k
    if (_applied_rf_num == _total_rf_num) {
357
63.3k
        _local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
358
63.3k
    }
359
65.6k
    return Status::OK();
360
65.6k
}
361
362
98.8k
void FileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
363
100k
    for (auto& child_expr : expr->children()) {
364
100k
        if (child_expr->is_slot_ref()) {
365
50.0k
            VSlotRef* slot_ref = reinterpret_cast<VSlotRef*>(child_expr.get());
366
50.0k
            SlotDescriptor* slot_desc = _state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
367
50.0k
            slot_desc->set_is_predicate(true);
368
50.0k
            slot_ids->emplace_back(slot_ref->slot_id());
369
50.1k
        } else {
370
50.1k
            _get_slot_ids(child_expr.get(), slot_ids);
371
50.1k
        }
372
100k
    }
373
98.8k
}
374
375
92.2k
Status FileScanner::_open_impl(RuntimeState* state) {
376
92.2k
    RETURN_IF_CANCELLED(state);
377
92.2k
    RETURN_IF_ERROR(Scanner::_open_impl(state));
378
92.2k
    if (_local_state) {
379
92.2k
        _condition_cache_digest = _local_state->get_condition_cache_digest();
380
92.2k
    }
381
92.2k
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
382
92.2k
    if (_first_scan_range) {
383
71.3k
        RETURN_IF_ERROR(_init_expr_ctxes());
384
71.3k
        if (_state->query_options().enable_runtime_filter_partition_prune &&
385
71.3k
            !_partition_slot_index_map.empty()) {
386
8.36k
            _init_runtime_filter_partition_prune_ctxs();
387
8.36k
            _init_runtime_filter_partition_prune_block();
388
8.36k
        }
389
71.3k
    } else {
390
        // there's no scan range in split source. stop scanner directly.
391
20.9k
        _scanner_eof = true;
392
20.9k
    }
393
394
92.2k
    return Status::OK();
395
92.2k
}
396
397
290k
Status FileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
398
290k
    Status st = _get_block_wrapped(state, block, eof);
399
400
290k
    if (!st.ok()) {
401
        // add cur path in error msg for easy debugging
402
61
        return std::move(st.append(". cur path: " + get_current_scan_range_name()));
403
61
    }
404
290k
    return st;
405
290k
}
406
407
// For query:
408
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
409
//                              A     B    C  D                 E
410
// _init_src_block              x     x    x  x                 x                -      x
411
// get_next_block               x     x    x  -                 -                -      x
412
// _cast_to_input_block         -     -    -  -                 -                -      -
413
// _fill_columns_from_path      -     -    -  -                 x                -      x
414
// _fill_missing_columns        -     -    -  x                 -                -      x
415
// _convert_to_output_block     -     -    -  -                 -                -      -
416
//
417
// For load:
418
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
419
//                              A     B    C  D                 E
420
// _init_src_block              x     x    x  x                 x                x      -
421
// get_next_block               x     x    x  -                 -                x      -
422
// _cast_to_input_block         x     x    x  -                 -                x      -
423
// _fill_columns_from_path      -     -    -  -                 x                x      -
424
// _fill_missing_columns        -     -    -  x                 -                x      -
425
// _convert_to_output_block     -     -    -  -                 -                -      x
426
290k
Status FileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* eof) {
427
290k
    do {
428
290k
        RETURN_IF_CANCELLED(state);
429
290k
        if (_cur_reader == nullptr || _cur_reader_eof) {
430
168k
            _finalize_reader_condition_cache();
431
            // The file may not exist because the file list is got from meta cache,
432
            // And the file may already be removed from storage.
433
            // Just ignore not found files.
434
168k
            Status st = _get_next_reader();
435
168k
            if (st.is<ErrorCode::NOT_FOUND>() && config::ignore_not_found_file_in_external_table) {
436
0
                _cur_reader_eof = true;
437
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
438
0
                continue;
439
168k
            } else if (st.is<ErrorCode::END_OF_FILE>()) {
440
12
                _cur_reader_eof = true;
441
12
                COUNTER_UPDATE(_fully_skipped_file_counter, 1);
442
12
                continue;
443
168k
            } else if (!st) {
444
5
                return st;
445
5
            }
446
168k
            _init_reader_condition_cache();
447
168k
        }
448
449
290k
        if (_scanner_eof) {
450
91.4k
            *eof = true;
451
91.4k
            return Status::OK();
452
91.4k
        }
453
454
        // Init src block for load job based on the data file schema (e.g. parquet)
455
        // For query job, simply set _src_block_ptr to block.
456
198k
        size_t read_rows = 0;
457
198k
        RETURN_IF_ERROR(_init_src_block(block));
458
198k
        {
459
198k
            SCOPED_TIMER(_get_block_timer);
460
461
            // Read next block.
462
            // Some of column in block may not be filled (column not exist in file)
463
198k
            RETURN_IF_ERROR(
464
198k
                    _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
465
198k
        }
466
        // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
467
        // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result.
468
198k
        if (read_rows > 0) {
469
129k
            if ((!_cur_reader->count_read_rows()) && _io_ctx) {
470
12.3k
                _io_ctx->file_reader_stats->read_rows += read_rows;
471
12.3k
            }
472
            // If the push_down_agg_type is COUNT, no need to do the rest,
473
            // because we only save a number in block.
474
129k
            if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
475
                // Convert the src block columns type to string in-place.
476
125k
                RETURN_IF_ERROR(_cast_to_input_block(block));
477
                // FileReader can fill partition and missing columns itself
478
125k
                if (!_cur_reader->fill_all_columns()) {
479
                    // Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
480
11.5k
                    RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
481
                    // Fill columns not exist in file with null or default value
482
11.5k
                    RETURN_IF_ERROR(_fill_missing_columns(read_rows));
483
11.5k
                }
484
                // Apply _pre_conjunct_ctxs to filter src block.
485
125k
                RETURN_IF_ERROR(_pre_filter_src_block());
486
487
                // Convert src block to output block (dest block), string to dest data type and apply filters.
488
125k
                RETURN_IF_ERROR(_convert_to_output_block(block));
489
                // Truncate char columns or varchar columns if size is smaller than file columns
490
                // or not found in the file column schema.
491
125k
                RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
492
125k
            }
493
129k
        }
494
198k
        break;
495
198k
    } while (true);
496
497
    // Update filtered rows and unselected rows for load, reset counter.
498
    // {
499
    //     state->update_num_rows_load_filtered(_counter.num_rows_filtered);
500
    //     state->update_num_rows_load_unselected(_counter.num_rows_unselected);
501
    //     _reset_counter();
502
    // }
503
198k
    return Status::OK();
504
290k
}
505
506
/**
507
 * Check whether there are complex types in parquet/orc reader in broker/stream load.
508
 * Broker/stream load will cast any type as string type, and complex types will be casted wrong.
509
 * This is a temporary method, and will be replaced by tvf.
510
 */
511
690
Status FileScanner::_check_output_block_types() {
512
690
    if (_is_load) {
513
690
        TFileFormatType::type format_type = _params->format_type;
514
690
        if (format_type == TFileFormatType::FORMAT_PARQUET ||
515
690
            format_type == TFileFormatType::FORMAT_ORC) {
516
46
            for (auto slot : _output_tuple_desc->slots()) {
517
46
                if (is_complex_type(slot->type()->get_primitive_type())) {
518
0
                    return Status::InternalError(
519
0
                            "Parquet/orc doesn't support complex types in broker/stream load, "
520
0
                            "please use tvf(table value function) to insert complex types.");
521
0
                }
522
46
            }
523
8
        }
524
690
    }
525
690
    return Status::OK();
526
690
}
527
528
198k
Status FileScanner::_init_src_block(Block* block) {
529
198k
    if (!_is_load) {
530
198k
        _src_block_ptr = block;
531
532
        // Build name to index map only once on first call
533
198k
        if (_src_block_name_to_idx.empty()) {
534
74.7k
            _src_block_name_to_idx = block->get_name_to_pos_map();
535
74.7k
        }
536
198k
        return Status::OK();
537
198k
    }
538
694
    RETURN_IF_ERROR(_check_output_block_types());
539
540
    // if (_src_block_init) {
541
    //     _src_block.clear_column_data();
542
    //     _src_block_ptr = &_src_block;
543
    //     return Status::OK();
544
    // }
545
546
694
    _src_block.clear();
547
694
    uint32_t idx = 0;
548
    // slots in _input_tuple_desc contains all slots describe in load statement, eg:
549
    // -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
550
    // _input_tuple_desc will contains: k1, k2, tmp1
551
    // and some of them are from file, such as k1 and k2, and some of them may not exist in file, such as tmp1
552
    // _input_tuple_desc also contains columns from path
553
10.2k
    for (auto& slot : _input_tuple_desc->slots()) {
554
10.2k
        DataTypePtr data_type;
555
10.2k
        auto it = _slot_lower_name_to_col_type.find(slot->col_name());
556
10.2k
        if (slot->is_skip_bitmap_col()) {
557
0
            _skip_bitmap_col_idx = idx;
558
0
        }
559
10.2k
        if (_params->__isset.sequence_map_col) {
560
0
            if (_params->sequence_map_col == slot->col_name()) {
561
0
                _sequence_map_col_uid = slot->col_unique_id();
562
0
            }
563
0
        }
564
10.2k
        data_type =
565
10.2k
                it == _slot_lower_name_to_col_type.end() ? slot->type() : make_nullable(it->second);
566
10.2k
        MutableColumnPtr data_column = data_type->create_column();
567
10.2k
        _src_block.insert(
568
10.2k
                ColumnWithTypeAndName(std::move(data_column), data_type, slot->col_name()));
569
10.2k
        _src_block_name_to_idx.emplace(slot->col_name(), idx++);
570
10.2k
    }
571
694
    if (_params->__isset.sequence_map_col) {
572
0
        for (const auto& slot : _output_tuple_desc->slots()) {
573
            // When the target table has seqeunce map column, _input_tuple_desc will not contains __DORIS_SEQUENCE_COL__,
574
            // so we should get its column unique id from _output_tuple_desc
575
0
            if (slot->is_sequence_col()) {
576
0
                _sequence_col_uid = slot->col_unique_id();
577
0
            }
578
0
        }
579
0
    }
580
694
    _src_block_ptr = &_src_block;
581
694
    _src_block_init = true;
582
694
    return Status::OK();
583
694
}
584
585
125k
Status FileScanner::_cast_to_input_block(Block* block) {
586
125k
    if (!_is_load) {
587
125k
        return Status::OK();
588
125k
    }
589
340
    SCOPED_TIMER(_cast_to_input_block_timer);
590
    // cast primitive type(PT0) to primitive type(PT1)
591
340
    uint32_t idx = 0;
592
5.19k
    for (auto& slot_desc : _input_tuple_desc->slots()) {
593
5.19k
        if (_slot_lower_name_to_col_type.find(slot_desc->col_name()) ==
594
5.19k
            _slot_lower_name_to_col_type.end()) {
595
            // skip columns which does not exist in file
596
90
            continue;
597
90
        }
598
5.10k
        auto& arg = _src_block_ptr->get_by_position(_src_block_name_to_idx[slot_desc->col_name()]);
599
5.10k
        auto return_type = slot_desc->get_data_type_ptr();
600
        // remove nullable here, let the get_function decide whether nullable
601
5.10k
        auto data_type = get_data_type_with_default_argument(remove_nullable(return_type));
602
5.10k
        ColumnsWithTypeAndName arguments {
603
5.10k
                arg, {data_type->create_column(), data_type, slot_desc->col_name()}};
604
5.10k
        auto func_cast =
605
5.10k
                SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type, {});
606
5.10k
        if (!func_cast) {
607
0
            return Status::InternalError("Function CAST[arg={}, col name={}, return={}] not found!",
608
0
                                         arg.type->get_name(), slot_desc->col_name(),
609
0
                                         return_type->get_name());
610
0
        }
611
5.10k
        idx = _src_block_name_to_idx[slot_desc->col_name()];
612
5.10k
        DCHECK(_state != nullptr);
613
5.10k
        auto ctx = FunctionContext::create_context(_state, {}, {});
614
5.10k
        RETURN_IF_ERROR(
615
5.10k
                func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, arg.column->size()));
616
5.10k
        _src_block_ptr->get_by_position(idx).type = std::move(return_type);
617
5.10k
    }
618
340
    return Status::OK();
619
340
}
620
621
11.5k
Status FileScanner::_fill_columns_from_path(size_t rows) {
622
11.5k
    if (!_fill_partition_from_path) {
623
6
        return Status::OK();
624
6
    }
625
11.4k
    DataTypeSerDe::FormatOptions _text_formatOptions;
626
11.4k
    for (auto& kv : _partition_col_descs) {
627
2.03k
        auto doris_column =
628
2.03k
                _src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]).column;
629
        // _src_block_ptr points to a mutable block created by this class itself, so const_cast can be used here.
630
2.03k
        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
631
2.03k
        auto& [value, slot_desc] = kv.second;
632
2.03k
        auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
633
2.03k
        Slice slice(value.data(), value.size());
634
2.03k
        uint64_t num_deserialized = 0;
635
2.03k
        if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows,
636
2.03k
                                                            &num_deserialized,
637
2.03k
                                                            _text_formatOptions) != Status::OK()) {
638
0
            return Status::InternalError("Failed to fill partition column: {}={}",
639
0
                                         slot_desc->col_name(), value);
640
0
        }
641
2.03k
        if (num_deserialized != rows) {
642
0
            return Status::InternalError(
643
0
                    "Failed to fill partition column: {}={} ."
644
0
                    "Number of rows expected to be written : {}, number of rows actually written : "
645
0
                    "{}",
646
0
                    slot_desc->col_name(), value, num_deserialized, rows);
647
0
        }
648
2.03k
    }
649
11.4k
    return Status::OK();
650
11.4k
}
651
652
11.5k
Status FileScanner::_fill_missing_columns(size_t rows) {
653
11.5k
    if (_missing_cols.empty()) {
654
11.5k
        return Status::OK();
655
11.5k
    }
656
657
0
    SCOPED_TIMER(_fill_missing_columns_timer);
658
0
    for (auto& kv : _missing_col_descs) {
659
0
        if (kv.second == nullptr) {
660
            // no default column, fill with null
661
0
            auto mutable_column = _src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first])
662
0
                                          .column->assume_mutable();
663
0
            auto* nullable_column = static_cast<ColumnNullable*>(mutable_column.get());
664
0
            nullable_column->insert_many_defaults(rows);
665
0
        } else {
666
            // fill with default value
667
0
            auto& ctx = kv.second;
668
0
            ColumnPtr result_column_ptr;
669
            // PT1 => dest primitive type
670
0
            RETURN_IF_ERROR(ctx->execute(_src_block_ptr, result_column_ptr));
671
0
            if (result_column_ptr->use_count() == 1) {
672
                // call resize because the first column of _src_block_ptr may not be filled by reader,
673
                // so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()`
674
                // has only one row.
675
0
                auto mutable_column = result_column_ptr->assume_mutable();
676
0
                mutable_column->resize(rows);
677
                // result_column_ptr maybe a ColumnConst, convert it to a normal column
678
0
                result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
679
0
                auto origin_column_type =
680
0
                        _src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]).type;
681
0
                bool is_nullable = origin_column_type->is_nullable();
682
0
                if (!_src_block_name_to_idx.contains(kv.first)) {
683
0
                    return Status::InternalError("Column {} not found in src block {}", kv.first,
684
0
                                                 _src_block_ptr->dump_structure());
685
0
                }
686
0
                _src_block_ptr->replace_by_position(
687
0
                        _src_block_name_to_idx[kv.first],
688
0
                        is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
689
0
            }
690
0
        }
691
0
    }
692
0
    return Status::OK();
693
0
}
694
695
125k
Status FileScanner::_pre_filter_src_block() {
696
125k
    if (!_is_load) {
697
125k
        return Status::OK();
698
125k
    }
699
342
    if (!_pre_conjunct_ctxs.empty()) {
700
0
        SCOPED_TIMER(_pre_filter_timer);
701
0
        auto origin_column_num = _src_block_ptr->columns();
702
0
        auto old_rows = _src_block_ptr->rows();
703
0
        RETURN_IF_ERROR(
704
0
                VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr, origin_column_num));
705
0
        _counter.num_rows_unselected += old_rows - _src_block_ptr->rows();
706
0
    }
707
342
    return Status::OK();
708
342
}
709
710
125k
Status FileScanner::_convert_to_output_block(Block* block) {
711
125k
    if (!_is_load) {
712
125k
        return Status::OK();
713
125k
    }
714
348
    SCOPED_TIMER(_convert_to_output_block_timer);
715
    // The block is passed from scanner context's free blocks,
716
    // which is initialized by output columns
717
    // so no need to clear it
718
    // block->clear();
719
720
348
    int ctx_idx = 0;
721
348
    size_t rows = _src_block_ptr->rows();
722
348
    auto filter_column = ColumnUInt8::create(rows, 1);
723
348
    auto& filter_map = filter_column->get_data();
724
725
    // After convert, the column_ptr should be copied into output block.
726
    // Can not use block->insert() because it may cause use_count() non-zero bug
727
348
    MutableBlock mutable_output_block =
728
348
            VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc);
729
348
    auto& mutable_output_columns = mutable_output_block.mutable_columns();
730
731
348
    std::vector<BitmapValue>* skip_bitmaps {nullptr};
732
348
    if (_should_process_skip_bitmap_col()) {
733
0
        auto* skip_bitmap_nullable_col_ptr =
734
0
                assert_cast<ColumnNullable*>(_src_block_ptr->get_by_position(_skip_bitmap_col_idx)
735
0
                                                     .column->assume_mutable()
736
0
                                                     .get());
737
0
        skip_bitmaps = &(assert_cast<ColumnBitmap*>(
738
0
                                 skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get())
739
0
                                 ->get_data());
740
        // NOTE:
741
        // - If the table has sequence type column, __DORIS_SEQUENCE_COL__ will be put in _input_tuple_desc, so whether
742
        //   __DORIS_SEQUENCE_COL__ will be marked in skip bitmap depends on whether it's specified in that row
743
        // - If the table has sequence map column, __DORIS_SEQUENCE_COL__ will not be put in _input_tuple_desc,
744
        //   so __DORIS_SEQUENCE_COL__ will be ommited if it't specified in a row and will not be marked in skip bitmap.
745
        //   So we should mark __DORIS_SEQUENCE_COL__ in skip bitmap here if the corresponding sequence map column us marked
746
0
        if (_sequence_map_col_uid != -1) {
747
0
            for (int j = 0; j < rows; ++j) {
748
0
                if ((*skip_bitmaps)[j].contains(_sequence_map_col_uid)) {
749
0
                    (*skip_bitmaps)[j].add(_sequence_col_uid);
750
0
                }
751
0
            }
752
0
        }
753
0
    }
754
755
    // for (auto slot_desc : _output_tuple_desc->slots()) {
756
5.95k
    for (int j = 0; j < mutable_output_columns.size(); ++j) {
757
5.60k
        auto* slot_desc = _output_tuple_desc->slots()[j];
758
5.60k
        int dest_index = ctx_idx;
759
5.60k
        ColumnPtr column_ptr;
760
761
5.60k
        auto& ctx = _dest_vexpr_ctx[dest_index];
762
        // PT1 => dest primitive type
763
5.60k
        RETURN_IF_ERROR(ctx->execute(_src_block_ptr, column_ptr));
764
        // column_ptr maybe a ColumnConst, convert it to a normal column
765
5.60k
        column_ptr = column_ptr->convert_to_full_column_if_const();
766
5.60k
        DCHECK(column_ptr);
767
768
        // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
769
        // is likely to be nullable
770
5.60k
        if (LIKELY(column_ptr->is_nullable())) {
771
5.50k
            const auto* nullable_column = reinterpret_cast<const ColumnNullable*>(column_ptr.get());
772
4.94M
            for (int i = 0; i < rows; ++i) {
773
4.94M
                if (filter_map[i] && nullable_column->is_null_at(i)) {
774
                    // skip checks for non-mentioned columns in flexible partial update
775
227k
                    if (skip_bitmaps == nullptr ||
776
227k
                        !skip_bitmaps->at(i).contains(slot_desc->col_unique_id())) {
777
                        // clang-format off
778
227k
                        if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
779
227k
                            !_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->is_null_at(i)) {
780
0
                            filter_map[i] = false;
781
0
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
782
0
                                [&]() -> std::string {
783
0
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
784
0
                                },
785
0
                                [&]() -> std::string {
786
0
                                    auto raw_value =
787
0
                                            _src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->get_data_at(i);
788
0
                                    std::string raw_string = raw_value.to_string();
789
0
                                    fmt::memory_buffer error_msg;
790
0
                                    fmt::format_to(error_msg,"column({}) value is incorrect while strict mode is {}, src value is {}",
791
0
                                            slot_desc->col_name(), _strict_mode, raw_string);
792
0
                                    return fmt::to_string(error_msg);
793
0
                                }));
794
227k
                        } else if (!slot_desc->is_nullable()) {
795
10
                            filter_map[i] = false;
796
10
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
797
10
                                [&]() -> std::string {
798
10
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
799
10
                                },
800
10
                                [&]() -> std::string {
801
10
                                    fmt::memory_buffer error_msg;
802
10
                                    fmt::format_to(error_msg, "column({}) values is null while columns is not nullable", slot_desc->col_name());
803
10
                                    return fmt::to_string(error_msg);
804
10
                                }));
805
10
                        }
806
                        // clang-format on
807
227k
                    }
808
227k
                }
809
4.94M
            }
810
5.50k
            if (!slot_desc->is_nullable()) {
811
328
                column_ptr = remove_nullable(column_ptr);
812
328
            }
813
5.50k
        } else if (slot_desc->is_nullable()) {
814
0
            column_ptr = make_nullable(column_ptr);
815
0
        }
816
5.60k
        mutable_output_columns[j]->insert_range_from(*column_ptr, 0, rows);
817
5.60k
        ctx_idx++;
818
5.60k
    }
819
820
    // after do the dest block insert operation, clear _src_block to remove the reference of origin column
821
348
    _src_block_ptr->clear();
822
823
348
    size_t dest_size = block->columns();
824
    // do filter
825
348
    block->insert(ColumnWithTypeAndName(std::move(filter_column), std::make_shared<DataTypeUInt8>(),
826
348
                                        "filter column"));
827
348
    RETURN_IF_ERROR(Block::filter_block(block, dest_size, dest_size));
828
829
348
    _counter.num_rows_filtered += rows - block->rows();
830
348
    return Status::OK();
831
348
}
832
833
125k
Status FileScanner::_truncate_char_or_varchar_columns(Block* block) {
834
    // Truncate char columns or varchar columns if size is smaller than file columns
835
    // or not found in the file column schema.
836
125k
    if (!_state->query_options().truncate_char_or_varchar_columns) {
837
125k
        return Status::OK();
838
125k
    }
839
20
    int idx = 0;
840
72
    for (auto* slot_desc : _real_tuple_desc->slots()) {
841
72
        const auto& type = slot_desc->type();
842
72
        if (type->get_primitive_type() != TYPE_VARCHAR && type->get_primitive_type() != TYPE_CHAR) {
843
24
            ++idx;
844
24
            continue;
845
24
        }
846
48
        auto iter = _source_file_col_name_types.find(slot_desc->col_name());
847
48
        if (iter != _source_file_col_name_types.end()) {
848
32
            const auto file_type_desc = _source_file_col_name_types[slot_desc->col_name()];
849
32
            int l = -1;
850
32
            if (auto* ftype = check_and_get_data_type<DataTypeString>(
851
32
                        remove_nullable(file_type_desc).get())) {
852
32
                l = ftype->len();
853
32
            }
854
32
            if ((assert_cast<const DataTypeString*>(remove_nullable(type).get())->len() > 0) &&
855
32
                (assert_cast<const DataTypeString*>(remove_nullable(type).get())->len() < l ||
856
32
                 l < 0)) {
857
32
                _truncate_char_or_varchar_column(
858
32
                        block, idx,
859
32
                        assert_cast<const DataTypeString*>(remove_nullable(type).get())->len());
860
32
            }
861
32
        } else {
862
16
            _truncate_char_or_varchar_column(
863
16
                    block, idx,
864
16
                    assert_cast<const DataTypeString*>(remove_nullable(type).get())->len());
865
16
        }
866
48
        ++idx;
867
48
    }
868
20
    return Status::OK();
869
125k
}
870
871
// VARCHAR substring(VARCHAR str, INT pos[, INT len])
872
48
void FileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int len) {
873
48
    auto int_type = std::make_shared<DataTypeInt32>();
874
48
    uint32_t num_columns_without_result = block->columns();
875
48
    const ColumnNullable* col_nullable =
876
48
            assert_cast<const ColumnNullable*>(block->get_by_position(idx).column.get());
877
48
    const ColumnPtr& string_column_ptr = col_nullable->get_nested_column_ptr();
878
48
    ColumnPtr null_map_column_ptr = col_nullable->get_null_map_column_ptr();
879
48
    block->replace_by_position(idx, std::move(string_column_ptr));
880
48
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)), int_type,
881
48
                   "const 1"}); // pos is 1
882
48
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)), int_type,
883
48
                   fmt::format("const {}", len)});                          // len
884
48
    block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); // result column
885
48
    ColumnNumbers temp_arguments(3);
886
48
    temp_arguments[0] = idx;                            // str column
887
48
    temp_arguments[1] = num_columns_without_result;     // pos
888
48
    temp_arguments[2] = num_columns_without_result + 1; // len
889
48
    uint32_t result_column_id = num_columns_without_result + 2;
890
891
48
    SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
892
48
    auto res = ColumnNullable::create(block->get_by_position(result_column_id).column,
893
48
                                      null_map_column_ptr);
894
48
    block->replace_by_position(idx, std::move(res));
895
48
    Block::erase_useless_column(block, num_columns_without_result);
896
48
}
897
898
11.8k
Status FileScanner::_create_row_id_column_iterator() {
899
11.8k
    auto& id_file_map = _state->get_id_file_map();
900
11.8k
    auto file_id = id_file_map->get_file_mapping_id(
901
11.8k
            std::make_shared<FileMapping>(((FileScanLocalState*)_local_state)->parent_id(),
902
11.8k
                                          _current_range, _should_enable_file_meta_cache()));
903
11.8k
    _row_id_column_iterator_pair.first = std::make_shared<RowIdColumnIteratorV2>(
904
11.8k
            IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id);
905
11.8k
    return Status::OK();
906
11.8k
}
907
908
168k
Status FileScanner::_get_next_reader() {
909
168k
    while (true) {
910
168k
        if (_cur_reader) {
911
76.1k
            _cur_reader->collect_profile_before_close();
912
76.1k
            RETURN_IF_ERROR(_cur_reader->close());
913
76.1k
            _state->update_num_finished_scan_range(1);
914
76.1k
        }
915
168k
        _cur_reader.reset(nullptr);
916
168k
        _src_block_init = false;
917
168k
        bool has_next = _first_scan_range;
918
168k
        if (!_first_scan_range) {
919
97.5k
            RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
920
97.5k
        }
921
168k
        _first_scan_range = false;
922
168k
        if (!has_next || _should_stop) {
923
91.4k
            _scanner_eof = true;
924
91.4k
            return Status::OK();
925
91.4k
        }
926
927
77.4k
        const TFileRangeDesc& range = _current_range;
928
77.4k
        _current_range_path = range.path;
929
930
77.4k
        if (!_partition_slot_descs.empty()) {
931
            // we need get partition columns first for runtime filter partition pruning
932
8.47k
            RETURN_IF_ERROR(_generate_partition_columns());
933
934
8.47k
            if (_state->query_options().enable_runtime_filter_partition_prune) {
935
                // if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
936
                // by runtime filter partition prune
937
8.46k
                if (_push_down_conjuncts.size() < _conjuncts.size()) {
938
                    // there are new runtime filters, need to re-init runtime filter partition pruning ctxs
939
4.67k
                    _init_runtime_filter_partition_prune_ctxs();
940
4.67k
                }
941
942
8.46k
                bool can_filter_all = false;
943
8.46k
                RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
944
8.46k
                if (can_filter_all) {
945
                    // this range can be filtered out by runtime filter partition pruning
946
                    // so we need to skip this range
947
426
                    COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
948
426
                    continue;
949
426
                }
950
8.46k
            }
951
8.47k
        }
952
953
        // create reader for specific format
954
76.9k
        Status init_status = Status::OK();
955
76.9k
        TFileFormatType::type format_type = _get_current_format_type();
956
        // for compatibility, this logic is deprecated in 3.1
957
76.9k
        if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
958
5.00k
            if (range.table_format_params.table_format_type == "paimon" &&
959
5.00k
                !range.table_format_params.paimon_params.__isset.paimon_split) {
960
                // use native reader
961
0
                auto format = range.table_format_params.paimon_params.file_format;
962
0
                if (format == "orc") {
963
0
                    format_type = TFileFormatType::FORMAT_ORC;
964
0
                } else if (format == "parquet") {
965
0
                    format_type = TFileFormatType::FORMAT_PARQUET;
966
0
                } else {
967
0
                    return Status::InternalError("Not supported paimon file format: {}", format);
968
0
                }
969
0
            }
970
5.00k
        }
971
972
        // JNI reader can only push down column value range
973
76.9k
        bool push_down_predicates = !_is_load && format_type != TFileFormatType::FORMAT_JNI;
974
76.9k
        bool need_to_get_parsed_schema = false;
975
76.9k
        switch (format_type) {
976
5.00k
        case TFileFormatType::FORMAT_JNI: {
977
5.00k
            if (range.__isset.table_format_params &&
978
5.00k
                range.table_format_params.table_format_type == "max_compute") {
979
0
                const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
980
0
                        _real_tuple_desc->table_desc());
981
0
                if (!mc_desc->init_status()) {
982
0
                    return mc_desc->init_status();
983
0
                }
984
0
                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
985
0
                        mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
986
0
                        range, _state, _profile);
987
0
                init_status = mc_reader->init_reader();
988
0
                _cur_reader = std::move(mc_reader);
989
5.00k
            } else if (range.__isset.table_format_params &&
990
5.00k
                       range.table_format_params.table_format_type == "paimon") {
991
1.89k
                if (_state->query_options().__isset.enable_paimon_cpp_reader &&
992
1.89k
                    _state->query_options().enable_paimon_cpp_reader) {
993
0
                    auto cpp_reader = PaimonCppReader::create_unique(_file_slot_descs, _state,
994
0
                                                                     _profile, range, _params);
995
0
                    cpp_reader->set_push_down_agg_type(_get_push_down_agg_type());
996
0
                    if (!_is_load && !_push_down_conjuncts.empty()) {
997
0
                        PaimonPredicateConverter predicate_converter(_file_slot_descs, _state);
998
0
                        auto predicate = predicate_converter.build(_push_down_conjuncts);
999
0
                        if (predicate) {
1000
0
                            cpp_reader->set_predicate(std::move(predicate));
1001
0
                        }
1002
0
                    }
1003
0
                    init_status = cpp_reader->init_reader();
1004
0
                    _cur_reader = std::move(cpp_reader);
1005
1.89k
                } else {
1006
1.89k
                    _cur_reader = PaimonJniReader::create_unique(_file_slot_descs, _state, _profile,
1007
1.89k
                                                                 range, _params);
1008
1.89k
                    init_status = ((PaimonJniReader*)(_cur_reader.get()))->init_reader();
1009
1.89k
                }
1010
3.11k
            } else if (range.__isset.table_format_params &&
1011
3.11k
                       range.table_format_params.table_format_type == "hudi") {
1012
0
                _cur_reader = HudiJniReader::create_unique(*_params,
1013
0
                                                           range.table_format_params.hudi_params,
1014
0
                                                           _file_slot_descs, _state, _profile);
1015
0
                init_status = ((HudiJniReader*)_cur_reader.get())->init_reader();
1016
1017
3.11k
            } else if (range.__isset.table_format_params &&
1018
3.11k
                       range.table_format_params.table_format_type == "trino_connector") {
1019
790
                _cur_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
1020
790
                                                                     _profile, range);
1021
790
                init_status = ((TrinoConnectorJniReader*)(_cur_reader.get()))->init_reader();
1022
2.32k
            } else if (range.__isset.table_format_params &&
1023
2.32k
                       range.table_format_params.table_format_type == "jdbc") {
1024
                // Extract jdbc params from table_format_params
1025
2.32k
                std::map<std::string, std::string> jdbc_params(
1026
2.32k
                        range.table_format_params.jdbc_params.begin(),
1027
2.32k
                        range.table_format_params.jdbc_params.end());
1028
2.32k
                _cur_reader = JdbcJniReader::create_unique(_file_slot_descs, _state, _profile,
1029
2.32k
                                                           jdbc_params);
1030
2.32k
                init_status = ((JdbcJniReader*)(_cur_reader.get()))->init_reader();
1031
2.32k
            }
1032
            // Set col_name_to_block_idx for JNI readers to avoid repeated map creation
1033
5.00k
            if (_cur_reader) {
1034
4.99k
                if (auto* jni_reader = dynamic_cast<JniReader*>(_cur_reader.get())) {
1035
4.99k
                    jni_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
1036
4.99k
                }
1037
4.99k
            }
1038
5.00k
            break;
1039
5.00k
        }
1040
34.2k
        case TFileFormatType::FORMAT_PARQUET: {
1041
34.2k
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1042
34.2k
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1043
18.4E
                                               : nullptr;
1044
34.2k
            std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
1045
34.2k
                    _profile, *_params, range, _state->query_options().batch_size,
1046
34.2k
                    &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr,
1047
34.2k
                    _state->query_options().enable_parquet_lazy_mat);
1048
1049
34.2k
            if (_row_id_column_iterator_pair.second != -1) {
1050
2.44k
                RETURN_IF_ERROR(_create_row_id_column_iterator());
1051
2.44k
                parquet_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1052
2.44k
            }
1053
1054
            // ATTN: the push down agg type may be set back to NONE,
1055
            // see IcebergTableReader::init_row_filters for example.
1056
34.2k
            parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
1057
34.2k
            if (push_down_predicates) {
1058
34.2k
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1059
34.2k
            }
1060
34.2k
            RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
1061
1062
34.2k
            need_to_get_parsed_schema = true;
1063
34.2k
            break;
1064
34.2k
        }
1065
31.5k
        case TFileFormatType::FORMAT_ORC: {
1066
31.5k
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1067
31.5k
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1068
31.5k
                                               : nullptr;
1069
31.5k
            std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
1070
31.5k
                    _profile, _state, *_params, range, _state->query_options().batch_size,
1071
31.5k
                    _state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
1072
31.5k
                    _state->query_options().enable_orc_lazy_mat);
1073
31.5k
            if (_row_id_column_iterator_pair.second != -1) {
1074
9.44k
                RETURN_IF_ERROR(_create_row_id_column_iterator());
1075
9.44k
                orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1076
9.44k
            }
1077
1078
31.5k
            orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
1079
31.5k
            if (push_down_predicates) {
1080
31.5k
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1081
31.5k
            }
1082
31.5k
            RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
1083
1084
31.5k
            need_to_get_parsed_schema = true;
1085
31.5k
            break;
1086
31.5k
        }
1087
816
        case TFileFormatType::FORMAT_CSV_PLAIN:
1088
816
        case TFileFormatType::FORMAT_CSV_GZ:
1089
816
        case TFileFormatType::FORMAT_CSV_BZ2:
1090
816
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
1091
816
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
1092
816
        case TFileFormatType::FORMAT_CSV_LZOP:
1093
816
        case TFileFormatType::FORMAT_CSV_DEFLATE:
1094
816
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
1095
816
        case TFileFormatType::FORMAT_PROTO: {
1096
816
            auto reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
1097
816
                                                   _file_slot_descs, _io_ctx.get());
1098
1099
816
            init_status = reader->init_reader(_is_load);
1100
816
            _cur_reader = std::move(reader);
1101
816
            break;
1102
816
        }
1103
5.05k
        case TFileFormatType::FORMAT_TEXT: {
1104
5.05k
            auto reader = TextReader::create_unique(_state, _profile, &_counter, *_params, range,
1105
5.05k
                                                    _file_slot_descs, _io_ctx.get());
1106
5.05k
            init_status = reader->init_reader(_is_load);
1107
5.05k
            _cur_reader = std::move(reader);
1108
5.05k
            break;
1109
816
        }
1110
306
        case TFileFormatType::FORMAT_JSON: {
1111
306
            _cur_reader =
1112
306
                    NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
1113
306
                                                 _file_slot_descs, &_scanner_eof, _io_ctx.get());
1114
306
            init_status = ((NewJsonReader*)(_cur_reader.get()))
1115
306
                                  ->init_reader(_col_default_value_ctx, _is_load);
1116
306
            break;
1117
816
        }
1118
1119
0
        case TFileFormatType::FORMAT_WAL: {
1120
0
            _cur_reader = WalReader::create_unique(_state);
1121
0
            init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
1122
0
            break;
1123
816
        }
1124
0
        case TFileFormatType::FORMAT_NATIVE: {
1125
0
            auto reader =
1126
0
                    NativeReader::create_unique(_profile, *_params, range, _io_ctx.get(), _state);
1127
0
            init_status = reader->init_reader();
1128
0
            _cur_reader = std::move(reader);
1129
0
            need_to_get_parsed_schema = false;
1130
0
            break;
1131
816
        }
1132
98
        case TFileFormatType::FORMAT_ARROW: {
1133
98
            if (range.__isset.table_format_params &&
1134
98
                range.table_format_params.table_format_type == "remote_doris") {
1135
98
                _cur_reader =
1136
98
                        RemoteDorisReader::create_unique(_file_slot_descs, _state, _profile, range);
1137
98
                init_status = ((RemoteDorisReader*)(_cur_reader.get()))->init_reader();
1138
98
                if (_cur_reader) {
1139
98
                    static_cast<RemoteDorisReader*>(_cur_reader.get())
1140
98
                            ->set_col_name_to_block_idx(&_src_block_name_to_idx);
1141
98
                }
1142
98
            } else {
1143
0
                _cur_reader =
1144
0
                        ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
1145
0
                                                         range, _file_slot_descs, _io_ctx.get());
1146
0
                init_status = ((ArrowStreamReader*)(_cur_reader.get()))->init_reader();
1147
0
            }
1148
98
            break;
1149
816
        }
1150
0
        default:
1151
0
            return Status::NotSupported("Not supported create reader for file format: {}.",
1152
0
                                        to_string(_params->format_type));
1153
76.9k
        }
1154
1155
76.9k
        if (_cur_reader == nullptr) {
1156
1
            return Status::NotSupported(
1157
1
                    "Not supported create reader for table format: {} / file format: {}.",
1158
1
                    range.__isset.table_format_params ? range.table_format_params.table_format_type
1159
1
                                                      : "NotSet",
1160
1
                    to_string(_params->format_type));
1161
1
        }
1162
76.9k
        COUNTER_UPDATE(_file_counter, 1);
1163
        // The FileScanner for external table may try to open not exist files,
1164
        // Because FE file cache for external table may out of date.
1165
        // So, NOT_FOUND for FileScanner is not a fail case.
1166
        // Will remove this after file reader refactor.
1167
76.9k
        if (init_status.is<END_OF_FILE>()) {
1168
0
            COUNTER_UPDATE(_empty_file_counter, 1);
1169
0
            continue;
1170
76.9k
        } else if (init_status.is<ErrorCode::NOT_FOUND>()) {
1171
0
            if (config::ignore_not_found_file_in_external_table) {
1172
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
1173
0
                continue;
1174
0
            }
1175
0
            return Status::InternalError("failed to find reader, err: {}", init_status.to_string());
1176
76.9k
        } else if (!init_status.ok()) {
1177
2
            return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
1178
2
        }
1179
1180
76.9k
        _cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
1181
76.9k
        if (_get_push_down_agg_type() == TPushAggOp::type::COUNT &&
1182
76.9k
            range.__isset.table_format_params &&
1183
76.9k
            range.table_format_params.table_level_row_count >= 0) {
1184
            // This is a table level count push down operation, no need to call
1185
            // _set_fill_or_truncate_columns.
1186
            // in _set_fill_or_truncate_columns, we will use [range.start_offset, end offset]
1187
            // to filter the row group. But if this is count push down, the offset is undefined,
1188
            // causing incorrect row group filter and may return empty result.
1189
76.7k
        } else {
1190
76.7k
            Status status = _set_fill_or_truncate_columns(need_to_get_parsed_schema);
1191
76.7k
            if (status.is<END_OF_FILE>()) { // all parquet row groups are filtered
1192
4
                continue;
1193
76.7k
            } else if (!status.ok()) {
1194
0
                return Status::InternalError("failed to set_fill_or_truncate_columns, err: {}",
1195
0
                                             status.to_string());
1196
0
            }
1197
76.7k
        }
1198
76.9k
        _cur_reader_eof = false;
1199
76.9k
        break;
1200
76.9k
    }
1201
76.8k
    return Status::OK();
1202
168k
}
1203
1204
Status FileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader,
1205
35.5k
                                         FileMetaCache* file_meta_cache_ptr) {
1206
35.5k
    const TFileRangeDesc& range = _current_range;
1207
35.5k
    Status init_status = Status::OK();
1208
1209
35.5k
    phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> slot_id_to_predicates =
1210
35.5k
            _local_state
1211
35.5k
                    ? _local_state->cast<FileScanLocalState>()._slot_id_to_predicates
1212
35.5k
                    : phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> {};
1213
35.5k
    if (range.__isset.table_format_params &&
1214
35.5k
        range.table_format_params.table_format_type == "iceberg") {
1215
17.5k
        std::unique_ptr<IcebergParquetReader> iceberg_reader = IcebergParquetReader::create_unique(
1216
17.5k
                std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache,
1217
17.5k
                _io_ctx.get(), file_meta_cache_ptr);
1218
17.5k
        init_status = iceberg_reader->init_reader(
1219
17.5k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1220
17.5k
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1221
17.5k
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1222
17.5k
                &_slot_id_to_filter_conjuncts);
1223
17.5k
        _cur_reader = std::move(iceberg_reader);
1224
18.0k
    } else if (range.__isset.table_format_params &&
1225
18.0k
               range.table_format_params.table_format_type == "paimon") {
1226
2.63k
        std::unique_ptr<PaimonParquetReader> paimon_reader = PaimonParquetReader::create_unique(
1227
2.63k
                std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache,
1228
2.63k
                _io_ctx.get(), file_meta_cache_ptr);
1229
2.63k
        init_status = paimon_reader->init_reader(
1230
2.63k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1231
2.63k
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1232
2.63k
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1233
2.63k
                &_slot_id_to_filter_conjuncts);
1234
2.63k
        RETURN_IF_ERROR(paimon_reader->init_row_filters());
1235
2.63k
        _cur_reader = std::move(paimon_reader);
1236
15.3k
    } else if (range.__isset.table_format_params &&
1237
15.3k
               range.table_format_params.table_format_type == "hudi") {
1238
0
        std::unique_ptr<HudiParquetReader> hudi_reader = HudiParquetReader::create_unique(
1239
0
                std::move(parquet_reader), _profile, _state, *_params, range, _io_ctx.get(),
1240
0
                file_meta_cache_ptr);
1241
0
        init_status = hudi_reader->init_reader(
1242
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1243
0
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1244
0
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1245
0
                &_slot_id_to_filter_conjuncts);
1246
0
        _cur_reader = std::move(hudi_reader);
1247
15.3k
    } else if (range.table_format_params.table_format_type == "hive") {
1248
12.3k
        auto hive_reader = HiveParquetReader::create_unique(std::move(parquet_reader), _profile,
1249
12.3k
                                                            _state, *_params, range, _io_ctx.get(),
1250
12.3k
                                                            &_is_file_slot, file_meta_cache_ptr);
1251
12.3k
        init_status = hive_reader->init_reader(
1252
12.3k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1253
12.3k
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1254
12.3k
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1255
12.3k
                &_slot_id_to_filter_conjuncts);
1256
12.3k
        _cur_reader = std::move(hive_reader);
1257
12.3k
    } else if (range.table_format_params.table_format_type == "tvf") {
1258
3.02k
        const FieldDescriptor* parquet_meta = nullptr;
1259
3.02k
        RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&parquet_meta));
1260
3.02k
        DCHECK(parquet_meta != nullptr);
1261
1262
        // TVF will first `get_parsed_schema` to obtain file information from BE, and FE will convert
1263
        // the column names to lowercase (because the query process is case-insensitive),
1264
        // so the lowercase file column names are used here to match the read columns.
1265
3.02k
        std::shared_ptr<TableSchemaChangeHelper::Node> tvf_info_node = nullptr;
1266
3.02k
        RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_name(
1267
3.02k
                _real_tuple_desc, *parquet_meta, tvf_info_node));
1268
3.02k
        init_status = parquet_reader->init_reader(
1269
3.02k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1270
3.02k
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1271
3.02k
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1272
3.02k
                &_slot_id_to_filter_conjuncts, tvf_info_node);
1273
3.02k
        _cur_reader = std::move(parquet_reader);
1274
3.02k
    } else if (_is_load) {
1275
4
        const FieldDescriptor* parquet_meta = nullptr;
1276
4
        RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&parquet_meta));
1277
4
        DCHECK(parquet_meta != nullptr);
1278
1279
        // Load is case-insensitive, so you to match the columns in the file.
1280
4
        std::map<std::string, std::string> file_lower_name_to_native;
1281
18
        for (const auto& parquet_field : parquet_meta->get_fields_schema()) {
1282
18
            file_lower_name_to_native.emplace(doris::to_lower(parquet_field.name),
1283
18
                                              parquet_field.name);
1284
18
        }
1285
4
        auto load_info_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1286
18
        for (const auto slot : _real_tuple_desc->slots()) {
1287
18
            if (file_lower_name_to_native.contains(slot->col_name())) {
1288
18
                load_info_node->add_children(slot->col_name(),
1289
18
                                             file_lower_name_to_native[slot->col_name()],
1290
18
                                             TableSchemaChangeHelper::ConstNode::get_instance());
1291
                // For Load, `file_scanner` will create block columns using the file type,
1292
                // there is no schema change when reading inside the struct,
1293
                // so use `TableSchemaChangeHelper::ConstNode`.
1294
18
            } else {
1295
0
                load_info_node->add_not_exist_children(slot->col_name());
1296
0
            }
1297
18
        }
1298
1299
4
        init_status = parquet_reader->init_reader(
1300
4
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1301
4
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1302
4
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1303
4
                &_slot_id_to_filter_conjuncts, load_info_node);
1304
4
        _cur_reader = std::move(parquet_reader);
1305
4
    }
1306
1307
35.5k
    return init_status;
1308
35.5k
}
1309
1310
Status FileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader,
1311
34.4k
                                     FileMetaCache* file_meta_cache_ptr) {
1312
34.4k
    const TFileRangeDesc& range = _current_range;
1313
34.4k
    Status init_status = Status::OK();
1314
1315
34.4k
    if (range.__isset.table_format_params &&
1316
34.4k
        range.table_format_params.table_format_type == "transactional_hive") {
1317
292
        std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
1318
292
                TransactionalHiveReader::create_unique(std::move(orc_reader), _profile, _state,
1319
292
                                                       *_params, range, _io_ctx.get(),
1320
292
                                                       file_meta_cache_ptr);
1321
292
        init_status = tran_orc_reader->init_reader(
1322
292
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1323
292
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1324
292
                &_slot_id_to_filter_conjuncts);
1325
292
        RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
1326
292
        _cur_reader = std::move(tran_orc_reader);
1327
34.1k
    } else if (range.__isset.table_format_params &&
1328
34.1k
               range.table_format_params.table_format_type == "iceberg") {
1329
6.00k
        std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique(
1330
6.00k
                std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(),
1331
6.00k
                file_meta_cache_ptr);
1332
1333
6.00k
        init_status = iceberg_reader->init_reader(
1334
6.00k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1335
6.00k
                _default_val_row_desc.get(), _col_name_to_slot_id,
1336
6.00k
                &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
1337
6.00k
        _cur_reader = std::move(iceberg_reader);
1338
28.1k
    } else if (range.__isset.table_format_params &&
1339
28.1k
               range.table_format_params.table_format_type == "paimon") {
1340
2.27k
        std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique(
1341
2.27k
                std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(),
1342
2.27k
                file_meta_cache_ptr);
1343
1344
2.27k
        init_status = paimon_reader->init_reader(
1345
2.27k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1346
2.27k
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1347
2.27k
                &_slot_id_to_filter_conjuncts);
1348
2.27k
        RETURN_IF_ERROR(paimon_reader->init_row_filters());
1349
2.27k
        _cur_reader = std::move(paimon_reader);
1350
25.8k
    } else if (range.__isset.table_format_params &&
1351
25.8k
               range.table_format_params.table_format_type == "hudi") {
1352
0
        std::unique_ptr<HudiOrcReader> hudi_reader =
1353
0
                HudiOrcReader::create_unique(std::move(orc_reader), _profile, _state, *_params,
1354
0
                                             range, _io_ctx.get(), file_meta_cache_ptr);
1355
1356
0
        init_status = hudi_reader->init_reader(
1357
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1358
0
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1359
0
                &_slot_id_to_filter_conjuncts);
1360
0
        _cur_reader = std::move(hudi_reader);
1361
25.8k
    } else if (range.__isset.table_format_params &&
1362
25.8k
               range.table_format_params.table_format_type == "hive") {
1363
23.7k
        std::unique_ptr<HiveOrcReader> hive_reader = HiveOrcReader::create_unique(
1364
23.7k
                std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get(),
1365
23.7k
                &_is_file_slot, file_meta_cache_ptr);
1366
1367
23.7k
        init_status = hive_reader->init_reader(
1368
23.7k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1369
23.7k
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1370
23.7k
                &_slot_id_to_filter_conjuncts);
1371
23.7k
        _cur_reader = std::move(hive_reader);
1372
23.7k
    } else if (range.__isset.table_format_params &&
1373
2.12k
               range.table_format_params.table_format_type == "tvf") {
1374
2.10k
        const orc::Type* orc_type_ptr = nullptr;
1375
2.10k
        RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr));
1376
1377
2.10k
        std::shared_ptr<TableSchemaChangeHelper::Node> tvf_info_node = nullptr;
1378
2.10k
        RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_orc_name(
1379
2.10k
                _real_tuple_desc, orc_type_ptr, tvf_info_node));
1380
2.10k
        init_status = orc_reader->init_reader(
1381
2.10k
                &_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, false,
1382
2.10k
                _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1383
2.10k
                &_slot_id_to_filter_conjuncts, tvf_info_node);
1384
2.10k
        _cur_reader = std::move(orc_reader);
1385
2.10k
    } else if (_is_load) {
1386
2
        const orc::Type* orc_type_ptr = nullptr;
1387
2
        RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr));
1388
1389
2
        std::map<std::string, std::string> file_lower_name_to_native;
1390
16
        for (uint64_t idx = 0; idx < orc_type_ptr->getSubtypeCount(); idx++) {
1391
14
            file_lower_name_to_native.emplace(doris::to_lower(orc_type_ptr->getFieldName(idx)),
1392
14
                                              orc_type_ptr->getFieldName(idx));
1393
14
        }
1394
1395
2
        auto load_info_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1396
14
        for (const auto slot : _real_tuple_desc->slots()) {
1397
14
            if (file_lower_name_to_native.contains(slot->col_name())) {
1398
14
                load_info_node->add_children(slot->col_name(),
1399
14
                                             file_lower_name_to_native[slot->col_name()],
1400
14
                                             TableSchemaChangeHelper::ConstNode::get_instance());
1401
14
            } else {
1402
0
                load_info_node->add_not_exist_children(slot->col_name());
1403
0
            }
1404
14
        }
1405
2
        init_status = orc_reader->init_reader(
1406
2
                &_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, false,
1407
2
                _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1408
2
                &_slot_id_to_filter_conjuncts, load_info_node);
1409
2
        _cur_reader = std::move(orc_reader);
1410
2
    }
1411
1412
34.4k
    return init_status;
1413
34.4k
}
1414
1415
80.9k
Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema) {
1416
80.9k
    _missing_cols.clear();
1417
80.9k
    _slot_lower_name_to_col_type.clear();
1418
80.9k
    std::unordered_map<std::string, DataTypePtr> name_to_col_type;
1419
80.9k
    RETURN_IF_ERROR(_cur_reader->get_columns(&name_to_col_type, &_missing_cols));
1420
3.52M
    for (const auto& [col_name, col_type] : name_to_col_type) {
1421
3.52M
        auto col_name_lower = to_lower(col_name);
1422
3.52M
        if (_partition_col_descs.contains(col_name_lower)) {
1423
            /*
1424
             * `_slot_lower_name_to_col_type` is used by `_init_src_block` and `_cast_to_input_block` during LOAD to
1425
             * generate columns of the corresponding type, which records the columns existing in the file.
1426
             *
1427
             * When a column in `COLUMNS FROM PATH` exists in a file column, the column type in the block will
1428
             * not match the slot type in `_output_tuple_desc`, causing an error when
1429
             * Serde `deserialize_one_cell_from_json` fills the partition values.
1430
             *
1431
             * So for partition column not need fill _slot_lower_name_to_col_type.
1432
             */
1433
2.85k
            continue;
1434
2.85k
        }
1435
3.51M
        _slot_lower_name_to_col_type.emplace(col_name_lower, col_type);
1436
3.51M
    }
1437
1438
80.9k
    if (!_fill_partition_from_path && config::enable_iceberg_partition_column_fallback) {
1439
        // check if the cols of _partition_col_descs are in _missing_cols
1440
        // if so, set _fill_partition_from_path to true and remove the col from _missing_cols
1441
2.72k
        for (const auto& [col_name, col_type] : _partition_col_descs) {
1442
2.72k
            if (_missing_cols.contains(col_name)) {
1443
0
                _fill_partition_from_path = true;
1444
0
                _missing_cols.erase(col_name);
1445
0
            }
1446
2.72k
        }
1447
2.28k
    }
1448
1449
80.9k
    RETURN_IF_ERROR(_generate_missing_columns());
1450
80.9k
    if (_fill_partition_from_path) {
1451
78.5k
        RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
1452
78.5k
    } else {
1453
        // If the partition columns are not from path, we only fill the missing columns.
1454
2.31k
        RETURN_IF_ERROR(_cur_reader->set_fill_columns({}, _missing_col_descs));
1455
2.31k
    }
1456
80.9k
    if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
1457
0
        fmt::memory_buffer col_buf;
1458
0
        for (auto& col : _missing_cols) {
1459
0
            fmt::format_to(col_buf, " {}", col);
1460
0
        }
1461
0
        VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", fmt::to_string(col_buf),
1462
0
                                   _current_range.path);
1463
0
    }
1464
1465
80.9k
    RETURN_IF_ERROR(_generate_truncate_columns(need_to_get_parsed_schema));
1466
80.9k
    return Status::OK();
1467
80.9k
}
1468
1469
80.8k
Status FileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) {
1470
80.8k
    _source_file_col_name_types.clear();
1471
    //  The col names and types of source file, such as parquet, orc files.
1472
80.8k
    if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) {
1473
16
        std::vector<std::string> source_file_col_names;
1474
16
        std::vector<DataTypePtr> source_file_col_types;
1475
16
        Status status =
1476
16
                _cur_reader->get_parsed_schema(&source_file_col_names, &source_file_col_types);
1477
16
        if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
1478
0
            return status;
1479
0
        }
1480
16
        DCHECK_EQ(source_file_col_names.size(), source_file_col_types.size());
1481
64
        for (int i = 0; i < source_file_col_names.size(); ++i) {
1482
48
            _source_file_col_name_types[to_lower(source_file_col_names[i])] =
1483
48
                    source_file_col_types[i];
1484
48
        }
1485
16
    }
1486
80.8k
    return Status::OK();
1487
80.8k
}
1488
1489
4.21k
Status FileScanner::prepare_for_read_lines(const TFileRangeDesc& range) {
1490
4.21k
    _current_range = range;
1491
1492
4.21k
    _file_cache_statistics.reset(new io::FileCacheStatistics());
1493
4.21k
    _file_reader_stats.reset(new io::FileReaderStats());
1494
1495
4.21k
    _file_read_bytes_counter =
1496
4.21k
            ADD_COUNTER_WITH_LEVEL(_profile, FileReadBytesProfile, TUnit::BYTES, 1);
1497
4.21k
    _file_read_time_counter = ADD_TIMER_WITH_LEVEL(_profile, FileReadTimeProfile, 1);
1498
1499
4.21k
    RETURN_IF_ERROR(_init_io_ctx());
1500
4.21k
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
1501
4.21k
    _io_ctx->file_reader_stats = _file_reader_stats.get();
1502
4.21k
    _default_val_row_desc.reset(new RowDescriptor((TupleDescriptor*)_real_tuple_desc));
1503
4.21k
    RETURN_IF_ERROR(_init_expr_ctxes());
1504
1505
    // Since only one column is read from the file, there is no need to filter, so set these variables to empty.
1506
4.21k
    _push_down_conjuncts.clear();
1507
4.21k
    _not_single_slot_filter_conjuncts.clear();
1508
4.21k
    _slot_id_to_filter_conjuncts.clear();
1509
4.21k
    _kv_cache = nullptr;
1510
4.21k
    return Status::OK();
1511
4.21k
}
1512
1513
Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
1514
                                          const std::list<int64_t>& row_ids, Block* result_block,
1515
                                          const ExternalFileMappingInfo& external_info,
1516
4.19k
                                          int64_t* init_reader_ms, int64_t* get_block_ms) {
1517
4.19k
    _current_range = range;
1518
4.19k
    RETURN_IF_ERROR(_generate_partition_columns());
1519
1520
4.19k
    TFileFormatType::type format_type = _get_current_format_type();
1521
4.19k
    Status init_status = Status::OK();
1522
1523
4.19k
    auto file_meta_cache_ptr = external_info.enable_file_meta_cache
1524
4.19k
                                       ? ExecEnv::GetInstance()->file_meta_cache()
1525
4.19k
                                       : nullptr;
1526
1527
4.19k
    RETURN_IF_ERROR(scope_timer_run(
1528
4.19k
            [&]() -> Status {
1529
4.19k
                switch (format_type) {
1530
4.19k
                case TFileFormatType::FORMAT_PARQUET: {
1531
4.19k
                    std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
1532
4.19k
                            _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx.get(),
1533
4.19k
                            _state, file_meta_cache_ptr, false);
1534
1535
4.19k
                    RETURN_IF_ERROR(parquet_reader->read_by_rows(row_ids));
1536
4.19k
                    RETURN_IF_ERROR(
1537
4.19k
                            _init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
1538
4.19k
                    break;
1539
4.19k
                }
1540
4.19k
                case TFileFormatType::FORMAT_ORC: {
1541
4.19k
                    std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
1542
4.19k
                            _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx.get(),
1543
4.19k
                            file_meta_cache_ptr, false);
1544
1545
4.19k
                    RETURN_IF_ERROR(orc_reader->read_by_rows(row_ids));
1546
4.19k
                    RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
1547
4.19k
                    break;
1548
4.19k
                }
1549
4.19k
                default: {
1550
4.19k
                    return Status::NotSupported(
1551
4.19k
                            "Not support create lines reader for file format: {},"
1552
4.19k
                            "only support parquet and orc.",
1553
4.19k
                            to_string(_params->format_type));
1554
4.19k
                }
1555
4.19k
                }
1556
4.19k
                return Status::OK();
1557
4.19k
            },
1558
4.19k
            init_reader_ms));
1559
1560
4.19k
    RETURN_IF_ERROR(_set_fill_or_truncate_columns(true));
1561
4.19k
    _cur_reader_eof = false;
1562
1563
4.19k
    RETURN_IF_ERROR(scope_timer_run(
1564
4.19k
            [&]() -> Status {
1565
4.19k
                while (!_cur_reader_eof) {
1566
4.19k
                    bool eof = false;
1567
4.19k
                    RETURN_IF_ERROR(_get_block_impl(_state, result_block, &eof));
1568
4.19k
                }
1569
4.19k
                return Status::OK();
1570
4.19k
            },
1571
4.19k
            get_block_ms));
1572
1573
4.19k
    _cur_reader->collect_profile_before_close();
1574
4.19k
    RETURN_IF_ERROR(_cur_reader->close());
1575
1576
4.19k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
1577
4.19k
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
1578
4.19k
    return Status::OK();
1579
4.19k
}
1580
1581
12.6k
Status FileScanner::_generate_partition_columns() {
1582
12.6k
    _partition_col_descs.clear();
1583
12.6k
    _partition_value_is_null.clear();
1584
12.6k
    const TFileRangeDesc& range = _current_range;
1585
12.6k
    if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
1586
17.9k
        for (const auto& slot_desc : _partition_slot_descs) {
1587
17.9k
            if (slot_desc) {
1588
17.9k
                auto it = _partition_slot_index_map.find(slot_desc->id());
1589
17.9k
                if (it == std::end(_partition_slot_index_map)) {
1590
0
                    return Status::InternalError("Unknown source slot descriptor, slot_id={}",
1591
0
                                                 slot_desc->id());
1592
0
                }
1593
17.9k
                const std::string& column_from_path = range.columns_from_path[it->second];
1594
17.9k
                _partition_col_descs.emplace(slot_desc->col_name(),
1595
17.9k
                                             std::make_tuple(column_from_path, slot_desc));
1596
17.9k
                if (range.__isset.columns_from_path_is_null) {
1597
3.08k
                    _partition_value_is_null.emplace(slot_desc->col_name(),
1598
3.08k
                                                     range.columns_from_path_is_null[it->second]);
1599
3.08k
                }
1600
17.9k
            }
1601
17.9k
        }
1602
10.6k
    }
1603
12.6k
    return Status::OK();
1604
12.6k
}
1605
1606
80.9k
Status FileScanner::_generate_missing_columns() {
1607
80.9k
    _missing_col_descs.clear();
1608
80.9k
    if (!_missing_cols.empty()) {
1609
22.2k
        for (auto* slot_desc : _real_tuple_desc->slots()) {
1610
22.2k
            if (!_missing_cols.contains(slot_desc->col_name())) {
1611
15.6k
                continue;
1612
15.6k
            }
1613
1614
6.59k
            auto it = _col_default_value_ctx.find(slot_desc->col_name());
1615
6.59k
            if (it == _col_default_value_ctx.end()) {
1616
0
                return Status::InternalError("failed to find default value expr for slot: {}",
1617
0
                                             slot_desc->col_name());
1618
0
            }
1619
6.59k
            _missing_col_descs.emplace(slot_desc->col_name(), it->second);
1620
6.59k
        }
1621
4.94k
    }
1622
80.9k
    return Status::OK();
1623
80.9k
}
1624
1625
75.5k
Status FileScanner::_init_expr_ctxes() {
1626
75.5k
    std::map<SlotId, int> full_src_index_map;
1627
75.5k
    std::map<SlotId, SlotDescriptor*> full_src_slot_map;
1628
75.5k
    std::map<std::string, int> partition_name_to_key_index_map;
1629
75.5k
    int index = 0;
1630
497k
    for (const auto& slot_desc : _real_tuple_desc->slots()) {
1631
497k
        full_src_slot_map.emplace(slot_desc->id(), slot_desc);
1632
497k
        full_src_index_map.emplace(slot_desc->id(), index++);
1633
497k
    }
1634
1635
    // For external table query, find the index of column in path.
1636
    // Because query doesn't always search for all columns in a table
1637
    // and the order of selected columns is random.
1638
    // All ranges in _ranges vector should have identical columns_from_path_keys
1639
    // because they are all file splits for the same external table.
1640
    // So here use the first element of _ranges to fill the partition_name_to_key_index_map
1641
75.5k
    if (_current_range.__isset.columns_from_path_keys) {
1642
75.2k
        std::vector<std::string> key_map = _current_range.columns_from_path_keys;
1643
75.2k
        if (!key_map.empty()) {
1644
57.1k
            for (size_t i = 0; i < key_map.size(); i++) {
1645
37.4k
                partition_name_to_key_index_map.emplace(key_map[i], i);
1646
37.4k
            }
1647
19.7k
        }
1648
75.2k
    }
1649
1650
75.5k
    _num_of_columns_from_file = _params->num_of_columns_from_file;
1651
1652
497k
    for (const auto& slot_info : _params->required_slots) {
1653
497k
        auto slot_id = slot_info.slot_id;
1654
497k
        auto it = full_src_slot_map.find(slot_id);
1655
497k
        if (it == std::end(full_src_slot_map)) {
1656
0
            return Status::InternalError(
1657
0
                    fmt::format("Unknown source slot descriptor, slot_id={}", slot_id));
1658
0
        }
1659
497k
        if (it->second->col_name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
1660
11.3k
            _row_id_column_iterator_pair.second = _default_val_row_desc->get_column_id(slot_id);
1661
11.3k
            continue;
1662
11.3k
        }
1663
1664
485k
        if (slot_info.is_file_slot) {
1665
470k
            _is_file_slot.emplace(slot_id);
1666
470k
            _file_slot_descs.emplace_back(it->second);
1667
470k
            _file_col_names.push_back(it->second->col_name());
1668
470k
        }
1669
1670
485k
        if (partition_name_to_key_index_map.contains(it->second->col_name())) {
1671
17.8k
            if (slot_info.is_file_slot) {
1672
                // If there is slot which is both a partition column and a file column,
1673
                // we should not fill the partition column from path.
1674
3.04k
                _fill_partition_from_path = false;
1675
14.7k
            } else if (!_fill_partition_from_path) {
1676
                // This should not happen
1677
0
                return Status::InternalError(
1678
0
                        "Partition column {} is not a file column, but there is already a column "
1679
0
                        "which is both a partition column and a file column.",
1680
0
                        it->second->col_name());
1681
0
            }
1682
17.8k
            _partition_slot_descs.emplace_back(it->second);
1683
17.8k
            if (_is_load) {
1684
0
                auto iti = full_src_index_map.find(slot_id);
1685
0
                _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
1686
17.8k
            } else {
1687
17.8k
                auto kit = partition_name_to_key_index_map.find(it->second->col_name());
1688
17.8k
                _partition_slot_index_map.emplace(slot_id, kit->second);
1689
17.8k
            }
1690
17.8k
        }
1691
485k
    }
1692
1693
    // set column name to default value expr map
1694
499k
    for (auto* slot_desc : _real_tuple_desc->slots()) {
1695
499k
        VExprContextSPtr ctx;
1696
499k
        auto it = _params->default_value_of_src_slot.find(slot_desc->id());
1697
499k
        if (it != std::end(_params->default_value_of_src_slot)) {
1698
487k
            if (!it->second.nodes.empty()) {
1699
443k
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
1700
443k
                RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
1701
443k
                RETURN_IF_ERROR(ctx->open(_state));
1702
443k
            }
1703
            // if expr is empty, the default value will be null
1704
487k
            _col_default_value_ctx.emplace(slot_desc->col_name(), ctx);
1705
487k
        }
1706
499k
    }
1707
1708
75.5k
    if (_is_load) {
1709
        // follow desc expr map is only for load task.
1710
342
        bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
1711
342
        int idx = 0;
1712
5.51k
        for (auto* slot_desc : _output_tuple_desc->slots()) {
1713
5.51k
            auto it = _params->expr_of_dest_slot.find(slot_desc->id());
1714
5.51k
            if (it == std::end(_params->expr_of_dest_slot)) {
1715
0
                return Status::InternalError("No expr for dest slot, id={}, name={}",
1716
0
                                             slot_desc->id(), slot_desc->col_name());
1717
0
            }
1718
1719
5.51k
            VExprContextSPtr ctx;
1720
5.51k
            if (!it->second.nodes.empty()) {
1721
5.51k
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
1722
5.51k
                RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
1723
5.51k
                RETURN_IF_ERROR(ctx->open(_state));
1724
5.51k
            }
1725
5.51k
            _dest_vexpr_ctx.emplace_back(ctx);
1726
5.51k
            _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
1727
1728
5.51k
            if (has_slot_id_map) {
1729
5.51k
                auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
1730
5.51k
                if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
1731
448
                    _src_slot_descs_order_by_dest.emplace_back(nullptr);
1732
5.06k
                } else {
1733
5.06k
                    auto _src_slot_it = full_src_slot_map.find(it1->second);
1734
5.06k
                    if (_src_slot_it == std::end(full_src_slot_map)) {
1735
0
                        return Status::InternalError("No src slot {} in src slot descs",
1736
0
                                                     it1->second);
1737
0
                    }
1738
5.06k
                    _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
1739
5.06k
                                                         full_src_index_map[_src_slot_it->first]);
1740
5.06k
                    _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
1741
5.06k
                }
1742
5.51k
            }
1743
5.51k
        }
1744
342
    }
1745
75.5k
    return Status::OK();
1746
75.5k
}
1747
1748
429k
bool FileScanner::_should_enable_condition_cache() {
1749
429k
    return _condition_cache_digest != 0 && !_is_load &&
1750
429k
           (!_conjuncts.empty() || !_push_down_conjuncts.empty());
1751
429k
}
1752
1753
168k
void FileScanner::_init_reader_condition_cache() {
1754
168k
    _condition_cache = nullptr;
1755
168k
    _condition_cache_ctx = nullptr;
1756
1757
168k
    if (!_should_enable_condition_cache() || !_cur_reader) {
1758
144k
        return;
1759
144k
    }
1760
1761
    // Disable condition cache when delete operations exist (e.g. Iceberg position/equality
1762
    // deletes, Hive ACID deletes). Cached granule results may become stale if delete files
1763
    // change between queries while the data file's cache key remains the same.
1764
23.8k
    if (_cur_reader->has_delete_operations()) {
1765
2.95k
        return;
1766
2.95k
    }
1767
1768
20.8k
    auto* cache = segment_v2::ConditionCache::instance();
1769
20.8k
    _condition_cache_key = segment_v2::ConditionCache::ExternalCacheKey(
1770
20.8k
            _current_range.path,
1771
18.4E
            _current_range.__isset.modification_time ? _current_range.modification_time : 0,
1772
18.4E
            _current_range.__isset.file_size ? _current_range.file_size : -1,
1773
20.8k
            _condition_cache_digest,
1774
18.4E
            _current_range.__isset.start_offset ? _current_range.start_offset : 0,
1775
18.4E
            _current_range.__isset.size ? _current_range.size : -1);
1776
1777
20.8k
    segment_v2::ConditionCacheHandle handle;
1778
20.8k
    auto condition_cache_hit = cache->lookup(_condition_cache_key, &handle);
1779
20.8k
    if (condition_cache_hit) {
1780
8.71k
        _condition_cache = handle.get_filter_result();
1781
8.71k
        _condition_cache_hit_count++;
1782
12.1k
    } else {
1783
        // Allocate cache pre-sized to total number of granules.
1784
        // We add +1 as a safety margin: when a file is split across multiple scanners
1785
        // and the first row of this scanner's range is not aligned to a granule boundary,
1786
        // the data may span one more granule than ceil(total_rows / GRANULE_SIZE).
1787
        // The extra element costs only 1 bit and never affects correctness (an extra
1788
        // false-granule beyond the actual data range won't overlap any real row range).
1789
12.1k
        int64_t total_rows = _cur_reader->get_total_rows();
1790
12.1k
        if (total_rows > 0) {
1791
10.0k
            size_t num_granules = (total_rows + ConditionCacheContext::GRANULE_SIZE - 1) /
1792
10.0k
                                  ConditionCacheContext::GRANULE_SIZE;
1793
10.0k
            _condition_cache = std::make_shared<std::vector<bool>>(num_granules + 1, false);
1794
10.0k
        }
1795
12.1k
    }
1796
1797
20.8k
    if (_condition_cache) {
1798
        // Create context to pass to readers (native readers use it; non-native readers ignore it)
1799
18.7k
        _condition_cache_ctx = std::make_shared<ConditionCacheContext>();
1800
18.7k
        _condition_cache_ctx->is_hit = condition_cache_hit;
1801
18.7k
        _condition_cache_ctx->filter_result = _condition_cache;
1802
18.7k
        _cur_reader->set_condition_cache_context(_condition_cache_ctx);
1803
18.7k
    }
1804
20.8k
}
1805
1806
260k
void FileScanner::_finalize_reader_condition_cache() {
1807
260k
    if (!_should_enable_condition_cache() || !_condition_cache_ctx ||
1808
260k
        _condition_cache_ctx->is_hit) {
1809
250k
        _condition_cache = nullptr;
1810
250k
        _condition_cache_ctx = nullptr;
1811
250k
        return;
1812
250k
    }
1813
    // Only store the cache if the reader was fully consumed. If the scan was
1814
    // truncated early (e.g. by LIMIT), the cache is incomplete — unread granules
1815
    // would remain false and cause surviving rows to be incorrectly skipped on HIT.
1816
10.0k
    if (!_cur_reader_eof) {
1817
70
        _condition_cache = nullptr;
1818
70
        _condition_cache_ctx = nullptr;
1819
70
        return;
1820
70
    }
1821
1822
10.0k
    auto* cache = segment_v2::ConditionCache::instance();
1823
10.0k
    cache->insert(_condition_cache_key, std::move(_condition_cache));
1824
10.0k
    _condition_cache = nullptr;
1825
10.0k
    _condition_cache_ctx = nullptr;
1826
10.0k
}
1827
1828
92.3k
Status FileScanner::close(RuntimeState* state) {
1829
92.3k
    if (!_try_close()) {
1830
0
        return Status::OK();
1831
0
    }
1832
1833
92.3k
    _finalize_reader_condition_cache();
1834
1835
92.3k
    if (_cur_reader) {
1836
840
        RETURN_IF_ERROR(_cur_reader->close());
1837
840
    }
1838
1839
92.3k
    RETURN_IF_ERROR(Scanner::close(state));
1840
92.3k
    return Status::OK();
1841
92.3k
}
1842
1843
92.3k
void FileScanner::try_stop() {
1844
92.3k
    Scanner::try_stop();
1845
92.3k
    if (_io_ctx) {
1846
92.3k
        _io_ctx->should_stop = true;
1847
92.3k
    }
1848
92.3k
}
1849
1850
92.2k
void FileScanner::update_realtime_counters() {
1851
92.2k
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
1852
1853
92.2k
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
1854
92.2k
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
1855
1856
92.2k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(
1857
92.2k
            _file_reader_stats->read_rows);
1858
92.2k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
1859
92.2k
            _file_reader_stats->read_bytes);
1860
1861
92.2k
    int64_t delta_bytes_read_from_local =
1862
92.2k
            _file_cache_statistics->bytes_read_from_local - _last_bytes_read_from_local;
1863
92.2k
    int64_t delta_bytes_read_from_remote =
1864
92.2k
            _file_cache_statistics->bytes_read_from_remote - _last_bytes_read_from_remote;
1865
92.2k
    if (_file_cache_statistics->bytes_read_from_local == 0 &&
1866
92.2k
        _file_cache_statistics->bytes_read_from_remote == 0) {
1867
91.4k
        _state->get_query_ctx()
1868
91.4k
                ->resource_ctx()
1869
91.4k
                ->io_context()
1870
91.4k
                ->update_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes);
1871
91.4k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
1872
91.4k
                _file_reader_stats->read_bytes);
1873
91.4k
    } else {
1874
802
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
1875
802
                delta_bytes_read_from_local);
1876
802
        _state->get_query_ctx()
1877
802
                ->resource_ctx()
1878
802
                ->io_context()
1879
802
                ->update_scan_bytes_from_remote_storage(delta_bytes_read_from_remote);
1880
802
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
1881
802
                delta_bytes_read_from_local);
1882
802
        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
1883
802
                delta_bytes_read_from_remote);
1884
802
    }
1885
1886
92.2k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
1887
1888
92.2k
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
1889
92.2k
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
1890
1891
92.2k
    _file_reader_stats->read_bytes = 0;
1892
92.2k
    _file_reader_stats->read_rows = 0;
1893
1894
92.2k
    _last_bytes_read_from_local = _file_cache_statistics->bytes_read_from_local;
1895
92.2k
    _last_bytes_read_from_remote = _file_cache_statistics->bytes_read_from_remote;
1896
92.2k
}
1897
1898
92.1k
void FileScanner::_collect_profile_before_close() {
1899
92.1k
    Scanner::_collect_profile_before_close();
1900
92.2k
    if (config::enable_file_cache && _state->query_options().enable_file_cache &&
1901
92.1k
        _profile != nullptr) {
1902
1.05k
        io::FileCacheProfileReporter cache_profile(_profile);
1903
1.05k
        cache_profile.update(_file_cache_statistics.get());
1904
1.05k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
1905
1.05k
                _file_cache_statistics->bytes_write_into_cache);
1906
1.05k
    }
1907
1908
92.1k
    if (_cur_reader != nullptr) {
1909
838
        _cur_reader->collect_profile_before_close();
1910
838
    }
1911
1912
92.1k
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
1913
92.1k
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
1914
92.1k
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
1915
1916
92.1k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
1917
92.1k
    COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls);
1918
92.1k
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
1919
92.1k
    COUNTER_UPDATE(local_state->_condition_cache_hit_counter, _condition_cache_hit_count);
1920
92.2k
    if (_io_ctx) {
1921
92.2k
        COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
1922
92.2k
                       _io_ctx->condition_cache_filtered_rows);
1923
92.2k
    }
1924
1925
92.1k
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
1926
92.1k
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
1927
92.1k
}
1928
1929
} // namespace doris