Coverage Report

Created: 2026-04-10 13:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/Metrics_types.h>
23
#include <gen_cpp/Opcodes_types.h>
24
#include <gen_cpp/PaloInternalService_types.h>
25
#include <gen_cpp/PlanNodes_types.h>
26
#include <glog/logging.h>
27
28
#include <algorithm>
29
#include <boost/iterator/iterator_facade.hpp>
30
#include <map>
31
#include <ranges>
32
#include <tuple>
33
#include <unordered_map>
34
#include <utility>
35
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/config.h"
38
#include "common/consts.h"
39
#include "common/logging.h"
40
#include "common/status.h"
41
#include "core/block/column_with_type_and_name.h"
42
#include "core/block/columns_with_type_and_name.h"
43
#include "core/column/column.h"
44
#include "core/column/column_nullable.h"
45
#include "core/column/column_vector.h"
46
#include "core/data_type/data_type.h"
47
#include "core/data_type/data_type_nullable.h"
48
#include "core/data_type/data_type_string.h"
49
#include "core/string_ref.h"
50
#include "exec/common/stringop_substring.h"
51
#include "exec/rowid_fetcher.h"
52
#include "exec/scan/scan_node.h"
53
#include "exprs/aggregate/aggregate_function.h"
54
#include "exprs/function/function.h"
55
#include "exprs/function/simple_function_factory.h"
56
#include "exprs/vexpr.h"
57
#include "exprs/vexpr_context.h"
58
#include "exprs/vexpr_fwd.h"
59
#include "exprs/vslot_ref.h"
60
#include "format/arrow/arrow_stream_reader.h"
61
#include "format/csv/csv_reader.h"
62
#include "format/json/new_json_reader.h"
63
#include "format/native/native_reader.h"
64
#include "format/orc/vorc_reader.h"
65
#include "format/parquet/vparquet_reader.h"
66
#include "format/table/fileset_reader.h"
67
#include "format/table/hive_reader.h"
68
#include "format/table/hudi_jni_reader.h"
69
#include "format/table/hudi_reader.h"
70
#include "format/table/iceberg_reader.h"
71
#include "format/table/iceberg_sys_table_jni_reader.h"
72
#include "format/table/jdbc_jni_reader.h"
73
#include "format/table/max_compute_jni_reader.h"
74
#include "format/table/paimon_cpp_reader.h"
75
#include "format/table/paimon_jni_reader.h"
76
#include "format/table/paimon_predicate_converter.h"
77
#include "format/table/paimon_reader.h"
78
#include "format/table/remote_doris_reader.h"
79
#include "format/table/transactional_hive_reader.h"
80
#include "format/table/trino_connector_jni_reader.h"
81
#include "format/text/text_reader.h"
82
#include "io/cache/block_file_cache_profile.h"
83
#include "load/group_commit/wal/wal_reader.h"
84
#include "runtime/descriptors.h"
85
#include "runtime/runtime_profile.h"
86
#include "runtime/runtime_state.h"
87
88
namespace cctz {
89
class time_zone;
90
} // namespace cctz
91
namespace doris {
92
class ShardedKVCache;
93
} // namespace doris
94
95
namespace doris {
96
#include "common/compile_check_begin.h"
97
using namespace ErrorCode;
98
99
const std::string FileScanner::FileReadBytesProfile = "FileReadBytes";
100
const std::string FileScanner::FileReadTimeProfile = "FileReadTime";
101
102
FileScanner::FileScanner(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
103
                         std::shared_ptr<SplitSourceConnector> split_source,
104
                         RuntimeProfile* profile, ShardedKVCache* kv_cache,
105
                         const std::unordered_map<std::string, int>* colname_to_slot_id)
106
2
        : Scanner(state, local_state, limit, profile),
107
2
          _split_source(split_source),
108
2
          _cur_reader(nullptr),
109
2
          _cur_reader_eof(false),
110
2
          _kv_cache(kv_cache),
111
2
          _strict_mode(false),
112
2
          _col_name_to_slot_id(colname_to_slot_id) {
113
2
    if (state->get_query_ctx() != nullptr &&
114
2
        state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
115
0
        _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
116
2
    } else {
117
        // old fe thrift protocol
118
2
        _params = _split_source->get_params();
119
2
    }
120
2
    if (_params->__isset.strict_mode) {
121
0
        _strict_mode = _params->strict_mode;
122
0
    }
123
124
    // For load scanner, there are input and output tuple.
125
    // For query scanner, there is only output tuple
126
2
    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
127
2
    _real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
128
2
    _is_load = (_input_tuple_desc != nullptr);
129
2
}
130
131
2
Status FileScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
132
2
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));
133
2
    _get_block_timer =
134
2
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerGetBlockTime", 1);
135
2
    _cast_to_input_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
136
2
                                                      "FileScannerCastInputBlockTime", 1);
137
2
    _fill_missing_columns_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
138
2
                                                       "FileScannerFillMissingColumnTime", 1);
139
2
    _pre_filter_timer =
140
2
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerPreFilterTimer", 1);
141
2
    _convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
142
2
                                                          "FileScannerConvertOuputBlockTime", 1);
143
2
    _runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
144
2
            _local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime", 1);
145
2
    _empty_file_counter =
146
2
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT, 1);
147
2
    _not_found_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
148
2
                                                     "NotFoundFileNum", TUnit::UNIT, 1);
149
2
    _fully_skipped_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
150
2
                                                         "FullySkippedFileNum", TUnit::UNIT, 1);
151
2
    _file_counter =
152
2
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
153
154
2
    _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
155
2
                                                      FileReadBytesProfile, TUnit::BYTES, 1);
156
2
    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
157
2
                                                      "FileReadCalls", TUnit::UNIT, 1);
158
2
    _file_read_time_counter =
159
2
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), FileReadTimeProfile, 1);
160
161
2
    _runtime_filter_partition_pruned_range_counter =
162
2
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
163
2
                                   "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);
164
165
2
    _file_cache_statistics.reset(new io::FileCacheStatistics());
166
2
    _file_reader_stats.reset(new io::FileReaderStats());
167
168
2
    RETURN_IF_ERROR(_init_io_ctx());
169
2
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
170
2
    _io_ctx->file_reader_stats = _file_reader_stats.get();
171
2
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;
172
173
2
    if (_is_load) {
174
0
        _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
175
0
                                              std::vector<TupleId>({_input_tuple_desc->id()})));
176
        // prepare pre filters
177
0
        if (_params->__isset.pre_filter_exprs_list) {
178
0
            RETURN_IF_ERROR(doris::VExpr::create_expr_trees(_params->pre_filter_exprs_list,
179
0
                                                            _pre_conjunct_ctxs));
180
0
        } else if (_params->__isset.pre_filter_exprs) {
181
0
            VExprContextSPtr context;
182
0
            RETURN_IF_ERROR(doris::VExpr::create_expr_tree(_params->pre_filter_exprs, context));
183
0
            _pre_conjunct_ctxs.emplace_back(context);
184
0
        }
185
186
0
        for (auto& conjunct : _pre_conjunct_ctxs) {
187
0
            RETURN_IF_ERROR(conjunct->prepare(_state, *_src_row_desc));
188
0
            RETURN_IF_ERROR(conjunct->open(_state));
189
0
        }
190
191
0
        _dest_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
192
0
                                               std::vector<TupleId>({_output_tuple_desc->id()})));
193
0
    }
194
195
2
    _default_val_row_desc.reset(
196
2
            new RowDescriptor(_state->desc_tbl(), std::vector<TupleId>({_real_tuple_desc->id()})));
197
198
2
    return Status::OK();
199
2
}
200
201
// check if the expr is a partition pruning expr
202
0
bool FileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
203
0
    if (expr->is_slot_ref()) {
204
0
        auto* slot_ref = static_cast<VSlotRef*>(expr.get());
205
0
        return _partition_slot_index_map.find(slot_ref->slot_id()) !=
206
0
               _partition_slot_index_map.end();
207
0
    }
208
0
    if (expr->is_literal()) {
209
0
        return true;
210
0
    }
211
0
    return std::ranges::all_of(expr->children(), [this](const auto& child) {
212
0
        return _check_partition_prune_expr(child);
213
0
    });
214
0
}
215
216
0
void FileScanner::_init_runtime_filter_partition_prune_ctxs() {
217
0
    _runtime_filter_partition_prune_ctxs.clear();
218
0
    for (auto& conjunct : _conjuncts) {
219
0
        auto impl = conjunct->root()->get_impl();
220
        // If impl is not null, which means this a conjuncts from runtime filter.
221
0
        auto expr = impl ? impl : conjunct->root();
222
0
        if (_check_partition_prune_expr(expr)) {
223
0
            _runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
224
0
        }
225
0
    }
226
0
}
227
228
0
void FileScanner::_init_runtime_filter_partition_prune_block() {
229
    // init block with empty column
230
0
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
231
0
        _runtime_filter_partition_prune_block.insert(
232
0
                ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
233
0
                                      slot_desc->get_data_type_ptr(), slot_desc->col_name()));
234
0
    }
235
0
}
236
237
0
Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
238
0
    SCOPED_TIMER(_runtime_filter_partition_prune_timer);
239
0
    if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
240
0
        return Status::OK();
241
0
    }
242
0
    size_t partition_value_column_size = 1;
243
244
    // 1. Get partition key values to string columns.
245
0
    std::unordered_map<SlotId, MutableColumnPtr> partition_slot_id_to_column;
246
0
    for (auto const& partition_col_desc : _partition_col_descs) {
247
0
        const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
248
0
        auto data_type = partition_slot_desc->get_data_type_ptr();
249
0
        auto test_serde = data_type->get_serde();
250
0
        auto partition_value_column = data_type->create_column();
251
0
        auto* col_ptr = static_cast<IColumn*>(partition_value_column.get());
252
0
        Slice slice(partition_value.data(), partition_value.size());
253
0
        uint64_t num_deserialized = 0;
254
0
        DataTypeSerDe::FormatOptions options {};
255
0
        if (_partition_value_is_null.contains(partition_slot_desc->col_name())) {
256
            // for iceberg/paimon table
257
            // NOTICE: column is always be nullable for iceberg/paimon table now
258
0
            DCHECK(data_type->is_nullable());
259
0
            test_serde = test_serde->get_nested_serdes()[0];
260
0
            auto* null_column = assert_cast<ColumnNullable*>(col_ptr);
261
0
            if (_partition_value_is_null[partition_slot_desc->col_name()]) {
262
0
                null_column->insert_many_defaults(partition_value_column_size);
263
0
            } else {
264
                // If the partition value is not null, we set null map to 0 and deserialize it normally.
265
0
                null_column->get_null_map_column().insert_many_vals(0, partition_value_column_size);
266
0
                RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
267
0
                        null_column->get_nested_column(), slice, partition_value_column_size,
268
0
                        &num_deserialized, options));
269
0
            }
270
0
        } else {
271
            // for hive/hudi table, the null value is set as "\\N"
272
            // TODO: this will be unified as iceberg/paimon table in the future
273
0
            RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
274
0
                    *col_ptr, slice, partition_value_column_size, &num_deserialized, options));
275
0
        }
276
277
0
        partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
278
0
    }
279
280
    // 2. Fill _runtime_filter_partition_prune_block from the partition column, then execute conjuncts and filter block.
281
    // 2.1 Fill _runtime_filter_partition_prune_block from the partition column to match the conjuncts executing.
282
0
    size_t index = 0;
283
0
    bool first_column_filled = false;
284
0
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
285
0
        if (partition_slot_id_to_column.find(slot_desc->id()) !=
286
0
            partition_slot_id_to_column.end()) {
287
0
            auto data_type = slot_desc->get_data_type_ptr();
288
0
            auto partition_value_column = std::move(partition_slot_id_to_column[slot_desc->id()]);
289
0
            if (data_type->is_nullable()) {
290
0
                _runtime_filter_partition_prune_block.insert(
291
0
                        index, ColumnWithTypeAndName(
292
0
                                       ColumnNullable::create(
293
0
                                               std::move(partition_value_column),
294
0
                                               ColumnUInt8::create(partition_value_column_size, 0)),
295
0
                                       data_type, slot_desc->col_name()));
296
0
            } else {
297
0
                _runtime_filter_partition_prune_block.insert(
298
0
                        index, ColumnWithTypeAndName(std::move(partition_value_column), data_type,
299
0
                                                     slot_desc->col_name()));
300
0
            }
301
0
            if (index == 0) {
302
0
                first_column_filled = true;
303
0
            }
304
0
        }
305
0
        index++;
306
0
    }
307
308
    // 2.2 Execute conjuncts.
309
0
    if (!first_column_filled) {
310
        // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
311
        // The following process may be tricky and time-consuming, but we have no other way.
312
0
        _runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
313
0
                partition_value_column_size);
314
0
    }
315
0
    IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
316
0
    RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
317
0
                                                    &_runtime_filter_partition_prune_block,
318
0
                                                    &result_filter, &can_filter_all));
319
0
    return Status::OK();
320
0
}
321
322
1
Status FileScanner::_process_conjuncts() {
323
1
    _slot_id_to_filter_conjuncts.clear();
324
1
    _not_single_slot_filter_conjuncts.clear();
325
1
    for (auto& conjunct : _push_down_conjuncts) {
326
1
        auto impl = conjunct->root()->get_impl();
327
        // If impl is not null, which means this a conjuncts from runtime filter.
328
1
        auto cur_expr = impl ? impl : conjunct->root();
329
330
1
        std::vector<int> slot_ids;
331
1
        _get_slot_ids(cur_expr.get(), &slot_ids);
332
1
        if (slot_ids.empty()) {
333
1
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
334
1
            continue;
335
1
        }
336
0
        bool single_slot = true;
337
0
        for (int i = 1; i < slot_ids.size(); i++) {
338
0
            if (slot_ids[i] != slot_ids[0]) {
339
0
                single_slot = false;
340
0
                break;
341
0
            }
342
0
        }
343
0
        if (single_slot) {
344
0
            SlotId slot_id = slot_ids[0];
345
0
            _slot_id_to_filter_conjuncts[slot_id].emplace_back(conjunct);
346
0
        } else {
347
0
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
348
0
        }
349
0
    }
350
1
    return Status::OK();
351
1
}
352
353
1
Status FileScanner::_process_late_arrival_conjuncts() {
354
1
    if (_push_down_conjuncts.size() < _conjuncts.size()) {
355
1
        _push_down_conjuncts = _conjuncts;
356
        // Do not clear _conjuncts here!
357
        // We must keep it for fallback filtering, especially when mixing
358
        // Native readers (which use _push_down_conjuncts) and JNI readers (which rely on _conjuncts).
359
        // _conjuncts.clear();
360
1
        RETURN_IF_ERROR(_process_conjuncts());
361
1
    }
362
1
    if (_applied_rf_num == _total_rf_num) {
363
1
        _local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
364
1
    }
365
1
    return Status::OK();
366
1
}
367
368
1
void FileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
369
1
    for (auto& child_expr : expr->children()) {
370
0
        if (child_expr->is_slot_ref()) {
371
0
            VSlotRef* slot_ref = reinterpret_cast<VSlotRef*>(child_expr.get());
372
0
            SlotDescriptor* slot_desc = _state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
373
0
            slot_desc->set_is_predicate(true);
374
0
            slot_ids->emplace_back(slot_ref->slot_id());
375
0
        } else {
376
0
            _get_slot_ids(child_expr.get(), slot_ids);
377
0
        }
378
0
    }
379
1
}
380
381
0
Status FileScanner::_open_impl(RuntimeState* state) {
382
0
    RETURN_IF_CANCELLED(state);
383
0
    RETURN_IF_ERROR(Scanner::_open_impl(state));
384
0
    if (_local_state) {
385
0
        _condition_cache_digest = _local_state->get_condition_cache_digest();
386
0
    }
387
0
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
388
0
    if (_first_scan_range) {
389
0
        RETURN_IF_ERROR(_init_expr_ctxes());
390
0
        if (_state->query_options().enable_runtime_filter_partition_prune &&
391
0
            !_partition_slot_index_map.empty()) {
392
0
            _init_runtime_filter_partition_prune_ctxs();
393
0
            _init_runtime_filter_partition_prune_block();
394
0
        }
395
0
    } else {
396
        // there's no scan range in split source. stop scanner directly.
397
0
        _scanner_eof = true;
398
0
    }
399
400
0
    return Status::OK();
401
0
}
402
403
37
Status FileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
404
37
    Status st = _get_block_wrapped(state, block, eof);
405
406
37
    if (!st.ok()) {
407
        // add cur path in error msg for easy debugging
408
1
        return std::move(st.append(". cur path: " + get_current_scan_range_name()));
409
1
    }
410
36
    return st;
411
37
}
412
413
// For query:
414
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
415
//                              A     B    C  D                 E
416
// _init_src_block              x     x    x  x                 x                -      x
417
// get_next_block               x     x    x  -                 -                -      x
418
// _cast_to_input_block         -     -    -  -                 -                -      -
419
// _fill_columns_from_path      -     -    -  -                 x                -      x
420
// _fill_missing_columns        -     -    -  x                 -                -      x
421
// _convert_to_output_block     -     -    -  -                 -                -      -
422
//
423
// For load:
424
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
425
//                              A     B    C  D                 E
426
// _init_src_block              x     x    x  x                 x                x      -
427
// get_next_block               x     x    x  -                 -                x      -
428
// _cast_to_input_block         x     x    x  -                 -                x      -
429
// _fill_columns_from_path      -     -    -  -                 x                x      -
430
// _fill_missing_columns        -     -    -  x                 -                x      -
431
// _convert_to_output_block     -     -    -  -                 -                -      x
432
37
Status FileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* eof) {
433
37
    do {
434
37
        RETURN_IF_CANCELLED(state);
435
37
        if (_cur_reader == nullptr || _cur_reader_eof) {
436
1
            _finalize_reader_condition_cache();
437
            // The file may not exist because the file list is got from meta cache,
438
            // And the file may already be removed from storage.
439
            // Just ignore not found files.
440
1
            Status st = _get_next_reader();
441
1
            if (st.is<ErrorCode::NOT_FOUND>() && config::ignore_not_found_file_in_external_table) {
442
0
                _cur_reader_eof = true;
443
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
444
0
                continue;
445
1
            } else if (st.is<ErrorCode::END_OF_FILE>()) {
446
0
                _cur_reader_eof = true;
447
0
                COUNTER_UPDATE(_fully_skipped_file_counter, 1);
448
0
                continue;
449
1
            } else if (!st) {
450
1
                return st;
451
1
            }
452
0
            _init_reader_condition_cache();
453
0
        }
454
455
36
        if (_scanner_eof) {
456
0
            *eof = true;
457
0
            return Status::OK();
458
0
        }
459
460
        // Init src block for load job based on the data file schema (e.g. parquet)
461
        // For query job, simply set _src_block_ptr to block.
462
36
        size_t read_rows = 0;
463
36
        RETURN_IF_ERROR(_init_src_block(block));
464
36
        if (_need_iceberg_rowid_column && _current_range.__isset.table_format_params &&
465
36
            _current_range.table_format_params.table_format_type == "iceberg") {
466
0
            if (auto* iceberg_reader = dynamic_cast<IcebergTableReader*>(_cur_reader.get())) {
467
0
                iceberg_reader->set_row_id_column_position(_iceberg_rowid_column_pos);
468
0
            }
469
0
        }
470
36
        {
471
36
            SCOPED_TIMER(_get_block_timer);
472
473
            // Read next block.
474
            // Some of column in block may not be filled (column not exist in file)
475
36
            RETURN_IF_ERROR(
476
36
                    _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
477
36
        }
478
        // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
479
        // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result.
480
36
        if (read_rows > 0) {
481
25
            if ((!_cur_reader->count_read_rows()) && _io_ctx) {
482
0
                _io_ctx->file_reader_stats->read_rows += read_rows;
483
0
            }
484
            // If the push_down_agg_type is COUNT, no need to do the rest,
485
            // because we only save a number in block.
486
25
            if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
487
                // Convert the src block columns type to string in-place.
488
25
                RETURN_IF_ERROR(_cast_to_input_block(block));
489
                // FileReader can fill partition and missing columns itself
490
25
                if (!_cur_reader->fill_all_columns()) {
491
                    // Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
492
0
                    RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
493
                    // Fill columns not exist in file with null or default value
494
0
                    RETURN_IF_ERROR(_fill_missing_columns(read_rows));
495
0
                }
496
                // Apply _pre_conjunct_ctxs to filter src block.
497
25
                RETURN_IF_ERROR(_pre_filter_src_block());
498
499
                // Convert src block to output block (dest block), string to dest data type and apply filters.
500
25
                RETURN_IF_ERROR(_convert_to_output_block(block));
501
                // Truncate char columns or varchar columns if size is smaller than file columns
502
                // or not found in the file column schema.
503
25
                RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
504
25
            }
505
25
        }
506
36
        break;
507
36
    } while (true);
508
509
    // Update filtered rows and unselected rows for load, reset counter.
510
    // {
511
    //     state->update_num_rows_load_filtered(_counter.num_rows_filtered);
512
    //     state->update_num_rows_load_unselected(_counter.num_rows_unselected);
513
    //     _reset_counter();
514
    // }
515
36
    return Status::OK();
516
37
}
517
518
/**
519
 * Check whether there are complex types in parquet/orc reader in broker/stream load.
520
 * Broker/stream load will cast any type as string type, and complex types will be casted wrong.
521
 * This is a temporary method, and will be replaced by tvf.
522
 */
523
0
Status FileScanner::_check_output_block_types() {
524
0
    if (_is_load) {
525
0
        TFileFormatType::type format_type = _params->format_type;
526
0
        if (format_type == TFileFormatType::FORMAT_PARQUET ||
527
0
            format_type == TFileFormatType::FORMAT_ORC) {
528
0
            for (auto slot : _output_tuple_desc->slots()) {
529
0
                if (is_complex_type(slot->type()->get_primitive_type())) {
530
0
                    return Status::InternalError(
531
0
                            "Parquet/orc doesn't support complex types in broker/stream load, "
532
0
                            "please use tvf(table value function) to insert complex types.");
533
0
                }
534
0
            }
535
0
        }
536
0
    }
537
0
    return Status::OK();
538
0
}
539
540
36
Status FileScanner::_init_src_block(Block* block) {
541
36
    if (!_is_load) {
542
36
        _src_block_ptr = block;
543
544
36
        bool update_name_to_idx = _src_block_name_to_idx.empty();
545
36
        _iceberg_rowid_column_pos = -1;
546
36
        if (_need_iceberg_rowid_column && _current_range.__isset.table_format_params &&
547
36
            _current_range.table_format_params.table_format_type == "iceberg") {
548
0
            int row_id_idx = block->get_position_by_name(BeConsts::ICEBERG_ROWID_COL);
549
0
            if (row_id_idx >= 0) {
550
0
                _iceberg_rowid_column_pos = row_id_idx;
551
0
                if (!update_name_to_idx &&
552
0
                    !_src_block_name_to_idx.contains(BeConsts::ICEBERG_ROWID_COL)) {
553
0
                    update_name_to_idx = true;
554
0
                }
555
0
            }
556
0
        }
557
558
        // Build name to index map only once on first call
559
36
        if (update_name_to_idx) {
560
16
            _src_block_name_to_idx = block->get_name_to_pos_map();
561
16
        }
562
36
        return Status::OK();
563
36
    }
564
0
    RETURN_IF_ERROR(_check_output_block_types());
565
566
    // if (_src_block_init) {
567
    //     _src_block.clear_column_data();
568
    //     _src_block_ptr = &_src_block;
569
    //     return Status::OK();
570
    // }
571
572
0
    _src_block.clear();
573
0
    uint32_t idx = 0;
574
    // slots in _input_tuple_desc contains all slots describe in load statement, eg:
575
    // -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
576
    // _input_tuple_desc will contains: k1, k2, tmp1
577
    // and some of them are from file, such as k1 and k2, and some of them may not exist in file, such as tmp1
578
    // _input_tuple_desc also contains columns from path
579
0
    for (auto& slot : _input_tuple_desc->slots()) {
580
0
        DataTypePtr data_type;
581
0
        auto it = _slot_lower_name_to_col_type.find(slot->col_name());
582
0
        if (slot->is_skip_bitmap_col()) {
583
0
            _skip_bitmap_col_idx = idx;
584
0
        }
585
0
        if (_params->__isset.sequence_map_col) {
586
0
            if (_params->sequence_map_col == slot->col_name()) {
587
0
                _sequence_map_col_uid = slot->col_unique_id();
588
0
            }
589
0
        }
590
0
        data_type =
591
0
                it == _slot_lower_name_to_col_type.end() ? slot->type() : make_nullable(it->second);
592
0
        MutableColumnPtr data_column = data_type->create_column();
593
0
        _src_block.insert(
594
0
                ColumnWithTypeAndName(std::move(data_column), data_type, slot->col_name()));
595
0
        _src_block_name_to_idx.emplace(slot->col_name(), idx++);
596
0
    }
597
0
    if (_params->__isset.sequence_map_col) {
598
0
        for (const auto& slot : _output_tuple_desc->slots()) {
599
            // When the target table has seqeunce map column, _input_tuple_desc will not contains __DORIS_SEQUENCE_COL__,
600
            // so we should get its column unique id from _output_tuple_desc
601
0
            if (slot->is_sequence_col()) {
602
0
                _sequence_col_uid = slot->col_unique_id();
603
0
            }
604
0
        }
605
0
    }
606
0
    _src_block_ptr = &_src_block;
607
0
    _src_block_init = true;
608
0
    return Status::OK();
609
0
}
610
611
25
// Load-only: casts, in place, every column of _src_block_ptr that comes from the file (its name
// is present in _slot_lower_name_to_col_type) to the type of its slot in _input_tuple_desc.
// `block` is not used; the conversion is applied to _src_block_ptr.
Status FileScanner::_cast_to_input_block(Block* block) {
    if (!_is_load) {
        return Status::OK();
    }
    SCOPED_TIMER(_cast_to_input_block_timer);
    // cast primitive type(PT0) to primitive type(PT1)
    for (auto& slot_desc : _input_tuple_desc->slots()) {
        if (_slot_lower_name_to_col_type.find(slot_desc->col_name()) ==
            _slot_lower_name_to_col_type.end()) {
            // skip columns which does not exist in file
            continue;
        }
        // Look the position up once. operator[] would silently insert index 0 for a name that
        // is missing from the source block, and the cast would then hit the wrong column.
        auto idx_it = _src_block_name_to_idx.find(slot_desc->col_name());
        if (idx_it == _src_block_name_to_idx.end()) {
            return Status::InternalError("Column {} not found in src block {}",
                                         slot_desc->col_name(), _src_block_ptr->dump_structure());
        }
        const uint32_t idx = idx_it->second;
        auto& arg = _src_block_ptr->get_by_position(idx);
        auto return_type = slot_desc->get_data_type_ptr();
        // remove nullable here, let the get_function decide whether nullable
        auto data_type = get_data_type_with_default_argument(remove_nullable(return_type));
        ColumnsWithTypeAndName arguments {
                arg, {data_type->create_column(), data_type, slot_desc->col_name()}};
        auto func_cast =
                SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type, {});
        if (!func_cast) {
            return Status::InternalError("Function CAST[arg={}, col name={}, return={}] not found!",
                                         arg.type->get_name(), slot_desc->col_name(),
                                         return_type->get_name());
        }
        DCHECK(_state != nullptr);
        auto ctx = FunctionContext::create_context(_state, {}, {});
        // The cast result is written back to the same position as its argument.
        RETURN_IF_ERROR(
                func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, arg.column->size()));
        _src_block_ptr->get_by_position(idx).type = std::move(return_type);
    }
    return Status::OK();
}
646
647
0
// Fills every partition column listed in _partition_col_descs with `rows` copies of the value
// taken from the file path, deserialized with the slot type's serde. No-op unless
// _fill_partition_from_path is set.
Status FileScanner::_fill_columns_from_path(size_t rows) {
    if (!_fill_partition_from_path) {
        return Status::OK();
    }
    DataTypeSerDe::FormatOptions format_options;
    for (auto& kv : _partition_col_descs) {
        // Resolve the position without operator[], which would insert a default index for an
        // unknown name and make us write into the wrong column.
        auto idx_it = _src_block_name_to_idx.find(kv.first);
        if (idx_it == _src_block_name_to_idx.end()) {
            return Status::InternalError("Column {} not found in src block {}", kv.first,
                                         _src_block_ptr->dump_structure());
        }
        auto doris_column = _src_block_ptr->get_by_position(idx_it->second).column;
        // _src_block_ptr points to a mutable block created by this class itself, so const_cast can be used here.
        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
        auto& [value, slot_desc] = kv.second;
        auto serde = slot_desc->get_data_type_ptr()->get_serde();
        Slice slice(value.data(), value.size());
        uint64_t num_deserialized = 0;
        if (!serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows, &num_deserialized,
                                                         format_options)
                     .ok()) {
            return Status::InternalError("Failed to fill partition column: {}={}",
                                         slot_desc->col_name(), value);
        }
        if (num_deserialized != rows) {
            // Arguments follow the message: expected count (`rows`) first, then the actual one.
            return Status::InternalError(
                    "Failed to fill partition column: {}={}. "
                    "Number of rows expected to be written : {}, number of rows actually written : "
                    "{}",
                    slot_desc->col_name(), value, rows, num_deserialized);
        }
    }
    return Status::OK();
}
677
678
0
// Fills the columns of _src_block_ptr that are missing from the file (_missing_col_descs).
// Columns without a default expression get `rows` NULLs. Columns with a default expression
// get that expression's result, resized to `rows` and converted to a full (non-const) column,
// but only when the result is not shared (use_count() == 1).
Status FileScanner::_fill_missing_columns(size_t rows) {
    if (_missing_cols.empty()) {
        return Status::OK();
    }

    SCOPED_TIMER(_fill_missing_columns_timer);
    for (auto& kv : _missing_col_descs) {
        // Resolve the column position before touching the block. operator[] would insert a
        // default index for an unknown name, so the "not found" check must come first to be
        // meaningful.
        auto idx_it = _src_block_name_to_idx.find(kv.first);
        if (idx_it == _src_block_name_to_idx.end()) {
            return Status::InternalError("Column {} not found in src block {}", kv.first,
                                         _src_block_ptr->dump_structure());
        }
        const auto col_idx = idx_it->second;
        if (kv.second == nullptr) {
            // no default column, fill with null
            auto mutable_column =
                    _src_block_ptr->get_by_position(col_idx).column->assume_mutable();
            auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
            nullable_column->insert_many_defaults(rows);
        } else {
            // fill with default value
            auto& ctx = kv.second;
            ColumnPtr result_column_ptr;
            // PT1 => dest primitive type
            RETURN_IF_ERROR(ctx->execute(_src_block_ptr, result_column_ptr));
            if (result_column_ptr->use_count() == 1) {
                // call resize because the first column of _src_block_ptr may not be filled by reader,
                // so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()`
                // has only one row.
                auto mutable_column = result_column_ptr->assume_mutable();
                mutable_column->resize(rows);
                // result_column_ptr maybe a ColumnConst, convert it to a normal column
                result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
                // Keep the nullability the source block already declared for this column.
                bool is_nullable = _src_block_ptr->get_by_position(col_idx).type->is_nullable();
                _src_block_ptr->replace_by_position(
                        col_idx, is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
            }
        }
    }
    return Status::OK();
}
720
721
25
// Load-only: evaluates the pre-filter conjuncts (_pre_conjunct_ctxs) against _src_block_ptr,
// dropping rows that do not pass, and adds the number of dropped rows to
// _counter.num_rows_unselected. Does nothing when there are no pre-filter conjuncts.
Status FileScanner::_pre_filter_src_block() {
    const bool nothing_to_filter = !_is_load || _pre_conjunct_ctxs.empty();
    if (nothing_to_filter) {
        return Status::OK();
    }
    SCOPED_TIMER(_pre_filter_timer);
    auto columns_to_keep = _src_block_ptr->columns();
    auto rows_before_filter = _src_block_ptr->rows();
    RETURN_IF_ERROR(
            VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr, columns_to_keep));
    const auto rows_after_filter = _src_block_ptr->rows();
    _counter.num_rows_unselected += rows_before_filter - rows_after_filter;
    return Status::OK();
}
735
736
25
// Load-only: evaluates destination expression j over _src_block_ptr for every output column j
// and appends the result to `block`. A row is dropped (and reported through
// append_error_msg_to_file) when a destination value is NULL and either strict mode is on and
// the mapped source value is not NULL, or the destination slot is not nullable. Cells whose
// column uid is set in the row's skip bitmap (flexible partial update) are exempt from these
// checks. _src_block_ptr is cleared afterwards and the number of dropped rows is added to
// _counter.num_rows_filtered.
Status FileScanner::_convert_to_output_block(Block* block) {
    if (!_is_load) {
        return Status::OK();
    }
    SCOPED_TIMER(_convert_to_output_block_timer);
    // The block is passed from scanner context's free blocks,
    // which is initialized by output columns
    // so no need to clear it
    // block->clear();

    size_t rows = _src_block_ptr->rows();
    // filter_map[i] starts as 1 (keep) and is set to 0 when row i fails a check below.
    auto filter_column = ColumnUInt8::create(rows, 1);
    auto& filter_map = filter_column->get_data();

    // After convert, the column_ptr should be copied into output block.
    // Can not use block->insert() because it may cause use_count() non-zero bug
    MutableBlock mutable_output_block =
            VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc);
    auto& mutable_output_columns = mutable_output_block.mutable_columns();

    std::vector<BitmapValue>* skip_bitmaps {nullptr};
    if (_should_process_skip_bitmap_col()) {
        auto* skip_bitmap_nullable_col_ptr =
                assert_cast<ColumnNullable*>(_src_block_ptr->get_by_position(_skip_bitmap_col_idx)
                                                     .column->assume_mutable()
                                                     .get());
        skip_bitmaps = &(assert_cast<ColumnBitmap*>(
                                 skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get())
                                 ->get_data());
        // NOTE:
        // - If the table has a sequence type column, __DORIS_SEQUENCE_COL__ is put in
        //   _input_tuple_desc, so whether __DORIS_SEQUENCE_COL__ is marked in the skip bitmap
        //   depends on whether it is specified in that row.
        // - If the table has a sequence map column, __DORIS_SEQUENCE_COL__ is not put in
        //   _input_tuple_desc, so it will not be marked in the skip bitmap by the reader.
        //   Therefore mark __DORIS_SEQUENCE_COL__ here whenever the corresponding sequence map
        //   column is marked.
        if (_sequence_map_col_uid != -1) {
            for (size_t row = 0; row < rows; ++row) {
                auto& row_skip_bitmap = (*skip_bitmaps)[row];
                if (row_skip_bitmap.contains(_sequence_map_col_uid)) {
                    row_skip_bitmap.add(_sequence_col_uid);
                }
            }
        }
    }

    // Output column j is produced by destination expression j (_dest_vexpr_ctx[j]) and maps to
    // source position _dest_slot_to_src_slot_index[j].
    for (int j = 0; j < mutable_output_columns.size(); ++j) {
        auto* slot_desc = _output_tuple_desc->slots()[j];
        ColumnPtr column_ptr;

        auto& ctx = _dest_vexpr_ctx[j];
        // PT1 => dest primitive type
        RETURN_IF_ERROR(ctx->execute(_src_block_ptr, column_ptr));
        // Check the expression result before dereferencing it.
        DCHECK(column_ptr);
        // column_ptr maybe a ColumnConst, convert it to a normal column
        column_ptr = column_ptr->convert_to_full_column_if_const();

        // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
        // is likely to be nullable
        if (LIKELY(column_ptr->is_nullable())) {
            const auto* nullable_column = assert_cast<const ColumnNullable*>(column_ptr.get());
            for (size_t i = 0; i < rows; ++i) {
                if (filter_map[i] && nullable_column->is_null_at(i)) {
                    // skip checks for non-mentioned columns in flexible partial update
                    if (skip_bitmaps == nullptr ||
                        !skip_bitmaps->at(i).contains(slot_desc->col_unique_id())) {
                        // clang-format off
                        if (_strict_mode && (_src_slot_descs_order_by_dest[j]) &&
                            !_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[j]).column->is_null_at(i)) {
                            filter_map[i] = false;
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
                                [&]() -> std::string {
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
                                },
                                [&]() -> std::string {
                                    auto raw_value =
                                            _src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[j]).column->get_data_at(i);
                                    std::string raw_string = raw_value.to_string();
                                    fmt::memory_buffer error_msg;
                                    fmt::format_to(error_msg,"column({}) value is incorrect while strict mode is {}, src value is {}",
                                            slot_desc->col_name(), _strict_mode, raw_string);
                                    return fmt::to_string(error_msg);
                                }));
                        } else if (!slot_desc->is_nullable()) {
                            filter_map[i] = false;
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
                                [&]() -> std::string {
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
                                },
                                [&]() -> std::string {
                                    fmt::memory_buffer error_msg;
                                    fmt::format_to(error_msg, "column({}) values is null while columns is not nullable", slot_desc->col_name());
                                    return fmt::to_string(error_msg);
                                }));
                        }
                        // clang-format on
                    }
                }
            }
            if (!slot_desc->is_nullable()) {
                column_ptr = remove_nullable(column_ptr);
            }
        } else if (slot_desc->is_nullable()) {
            column_ptr = make_nullable(column_ptr);
        }
        mutable_output_columns[j]->insert_range_from(*column_ptr, 0, rows);
    }

    // after do the dest block insert operation, clear _src_block to remove the reference of origin column
    _src_block_ptr->clear();

    size_t dest_size = block->columns();
    // do filter
    block->insert(ColumnWithTypeAndName(std::move(filter_column), std::make_shared<DataTypeUInt8>(),
                                        "filter column"));
    RETURN_IF_ERROR(Block::filter_block(block, dest_size, dest_size));

    _counter.num_rows_filtered += rows - block->rows();
    return Status::OK();
}
858
859
25
// When the query option truncate_char_or_varchar_columns is enabled, truncates every
// CHAR/VARCHAR column of `block` to the length declared by its slot in _real_tuple_desc.
// Block positions follow the slot order of _real_tuple_desc. A column is truncated when its
// file type (from _source_file_col_name_types) has a larger or unknown length, or when the
// column is not found in the file schema. Slots without a positive declared length are left
// untouched.
Status FileScanner::_truncate_char_or_varchar_columns(Block* block) {
    // Truncate char columns or varchar columns if size is smaller than file columns
    // or not found in the file column schema.
    if (!_state->query_options().truncate_char_or_varchar_columns) {
        return Status::OK();
    }
    int idx = 0;
    for (auto* slot_desc : _real_tuple_desc->slots()) {
        // Every slot occupies one block position, whether or not it gets truncated.
        const int col_idx = idx++;
        const auto& type = slot_desc->type();
        const auto primitive_type = type->get_primitive_type();
        if (primitive_type != TYPE_VARCHAR && primitive_type != TYPE_CHAR) {
            continue;
        }
        const int target_len =
                assert_cast<const DataTypeString*>(remove_nullable(type).get())->len();
        if (target_len <= 0) {
            // No positive declared length to truncate to. The file-schema path already skipped
            // this case; calling substring with a non-positive length would presumably empty
            // every value.
            continue;
        }
        auto iter = _source_file_col_name_types.find(slot_desc->col_name());
        if (iter != _source_file_col_name_types.end()) {
            int file_len = -1;
            if (const auto* file_string_type = check_and_get_data_type<DataTypeString>(
                        remove_nullable(iter->second).get())) {
                file_len = file_string_type->len();
            }
            if (file_len >= 0 && file_len <= target_len) {
                // The file type's declared length does not exceed the target length.
                continue;
            }
        }
        _truncate_char_or_varchar_column(block, col_idx, target_len);
    }
    return Status::OK();
}
896
897
// VARCHAR substring(VARCHAR str, INT pos[, INT len])
898
0
void FileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int len) {
899
0
    auto int_type = std::make_shared<DataTypeInt32>();
900
0
    uint32_t num_columns_without_result = block->columns();
901
0
    const ColumnNullable* col_nullable =
902
0
            assert_cast<const ColumnNullable*>(block->get_by_position(idx).column.get());
903
0
    const ColumnPtr& string_column_ptr = col_nullable->get_nested_column_ptr();
904
0
    ColumnPtr null_map_column_ptr = col_nullable->get_null_map_column_ptr();
905
0
    block->replace_by_position(idx, std::move(string_column_ptr));
906
0
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)), int_type,
907
0
                   "const 1"}); // pos is 1
908
0
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)), int_type,
909
0
                   fmt::format("const {}", len)});                          // len
910
0
    block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); // result column
911
0
    ColumnNumbers temp_arguments(3);
912
0
    temp_arguments[0] = idx;                            // str column
913
0
    temp_arguments[1] = num_columns_without_result;     // pos
914
0
    temp_arguments[2] = num_columns_without_result + 1; // len
915
0
    uint32_t result_column_id = num_columns_without_result + 2;
916
917
0
    SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
918
0
    auto res = ColumnNullable::create(block->get_by_position(result_column_id).column,
919
0
                                      null_map_column_ptr);
920
0
    block->replace_by_position(idx, std::move(res));
921
0
    Block::erase_useless_column(block, num_columns_without_result);
922
0
}
923
924
0
// Obtains the file mapping id for _current_range from the runtime state's id-file map and
// stores a RowIdColumnIteratorV2 for that id in _row_id_column_iterator_pair.first.
Status FileScanner::_create_row_id_column_iterator() {
    auto& id_file_map = _state->get_id_file_map();
    // Named downcast instead of a C-style cast: this scanner is owned by a FileScanLocalState.
    auto* file_scan_local_state = static_cast<FileScanLocalState*>(_local_state);
    auto file_id = id_file_map->get_file_mapping_id(std::make_shared<FileMapping>(
            file_scan_local_state->parent_id(), _current_range, _should_enable_file_meta_cache()));
    _row_id_column_iterator_pair.first = std::make_shared<RowIdColumnIteratorV2>(
            IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id);
    return Status::OK();
}
933
934
1
Status FileScanner::_get_next_reader() {
935
1
    while (true) {
936
1
        if (_cur_reader) {
937
0
            _cur_reader->collect_profile_before_close();
938
0
            RETURN_IF_ERROR(_cur_reader->close());
939
0
            _state->update_num_finished_scan_range(1);
940
0
        }
941
1
        _cur_reader.reset(nullptr);
942
1
        _src_block_init = false;
943
1
        bool has_next = _first_scan_range;
944
1
        if (!_first_scan_range) {
945
1
            RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
946
1
        }
947
1
        _first_scan_range = false;
948
1
        if (!has_next || _should_stop) {
949
0
            _scanner_eof = true;
950
0
            return Status::OK();
951
0
        }
952
953
1
        const TFileRangeDesc& range = _current_range;
954
1
        _current_range_path = range.path;
955
956
1
        if (!_partition_slot_descs.empty()) {
957
            // we need get partition columns first for runtime filter partition pruning
958
0
            RETURN_IF_ERROR(_generate_partition_columns());
959
960
0
            if (_state->query_options().enable_runtime_filter_partition_prune) {
961
                // if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
962
                // by runtime filter partition prune
963
0
                if (_push_down_conjuncts.size() < _conjuncts.size()) {
964
                    // there are new runtime filters, need to re-init runtime filter partition pruning ctxs
965
0
                    _init_runtime_filter_partition_prune_ctxs();
966
0
                }
967
968
0
                bool can_filter_all = false;
969
0
                RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
970
0
                if (can_filter_all) {
971
                    // this range can be filtered out by runtime filter partition pruning
972
                    // so we need to skip this range
973
0
                    COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
974
0
                    continue;
975
0
                }
976
0
            }
977
0
        }
978
979
        // create reader for specific format
980
1
        Status init_status = Status::OK();
981
1
        TFileFormatType::type format_type = _get_current_format_type();
982
        // for compatibility, this logic is deprecated in 3.1
983
1
        if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
984
0
            if (range.table_format_params.table_format_type == "paimon" &&
985
0
                !range.table_format_params.paimon_params.__isset.paimon_split) {
986
                // use native reader
987
0
                auto format = range.table_format_params.paimon_params.file_format;
988
0
                if (format == "orc") {
989
0
                    format_type = TFileFormatType::FORMAT_ORC;
990
0
                } else if (format == "parquet") {
991
0
                    format_type = TFileFormatType::FORMAT_PARQUET;
992
0
                } else {
993
0
                    return Status::InternalError("Not supported paimon file format: {}", format);
994
0
                }
995
0
            }
996
0
        }
997
998
        // JNI reader can only push down column value range
999
1
        bool push_down_predicates = !_is_load && format_type != TFileFormatType::FORMAT_JNI &&
1000
1
                                    format_type != TFileFormatType::FORMAT_MULTIDATA;
1001
1
        bool need_to_get_parsed_schema = false;
1002
1
        switch (format_type) {
1003
1
        case TFileFormatType::FORMAT_JNI: {
1004
1
            if (range.__isset.table_format_params &&
1005
1
                range.table_format_params.table_format_type == "max_compute") {
1006
0
                const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
1007
0
                        _real_tuple_desc->table_desc());
1008
0
                if (!mc_desc->init_status()) {
1009
0
                    return mc_desc->init_status();
1010
0
                }
1011
0
                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
1012
0
                        mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
1013
0
                        range, _state, _profile);
1014
0
                init_status = mc_reader->init_reader();
1015
0
                _cur_reader = std::move(mc_reader);
1016
1
            } else if (range.__isset.table_format_params &&
1017
1
                       range.table_format_params.table_format_type == "paimon") {
1018
0
                if (_state->query_options().__isset.enable_paimon_cpp_reader &&
1019
0
                    _state->query_options().enable_paimon_cpp_reader) {
1020
0
                    auto cpp_reader = PaimonCppReader::create_unique(_file_slot_descs, _state,
1021
0
                                                                     _profile, range, _params);
1022
0
                    cpp_reader->set_push_down_agg_type(_get_push_down_agg_type());
1023
0
                    if (!_is_load && !_push_down_conjuncts.empty()) {
1024
0
                        PaimonPredicateConverter predicate_converter(_file_slot_descs, _state);
1025
0
                        auto predicate = predicate_converter.build(_push_down_conjuncts);
1026
0
                        if (predicate) {
1027
0
                            cpp_reader->set_predicate(std::move(predicate));
1028
0
                        }
1029
0
                    }
1030
0
                    init_status = cpp_reader->init_reader();
1031
0
                    _cur_reader = std::move(cpp_reader);
1032
0
                } else {
1033
0
                    _cur_reader = PaimonJniReader::create_unique(_file_slot_descs, _state, _profile,
1034
0
                                                                 range, _params);
1035
0
                    init_status = ((PaimonJniReader*)(_cur_reader.get()))->init_reader();
1036
0
                }
1037
1
            } else if (range.__isset.table_format_params &&
1038
1
                       range.table_format_params.table_format_type == "hudi") {
1039
0
                _cur_reader = HudiJniReader::create_unique(*_params,
1040
0
                                                           range.table_format_params.hudi_params,
1041
0
                                                           _file_slot_descs, _state, _profile);
1042
0
                init_status = ((HudiJniReader*)_cur_reader.get())->init_reader();
1043
1044
1
            } else if (range.__isset.table_format_params &&
1045
1
                       range.table_format_params.table_format_type == "trino_connector") {
1046
0
                _cur_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
1047
0
                                                                     _profile, range);
1048
0
                init_status = ((TrinoConnectorJniReader*)(_cur_reader.get()))->init_reader();
1049
1
            } else if (range.__isset.table_format_params &&
1050
1
                       range.table_format_params.table_format_type == "jdbc") {
1051
                // Extract jdbc params from table_format_params
1052
0
                std::map<std::string, std::string> jdbc_params(
1053
0
                        range.table_format_params.jdbc_params.begin(),
1054
0
                        range.table_format_params.jdbc_params.end());
1055
0
                _cur_reader = JdbcJniReader::create_unique(_file_slot_descs, _state, _profile,
1056
0
                                                           jdbc_params);
1057
0
                init_status = ((JdbcJniReader*)(_cur_reader.get()))->init_reader();
1058
1
            } else if (range.__isset.table_format_params &&
1059
1
                       range.table_format_params.table_format_type == "iceberg") {
1060
0
                _cur_reader = IcebergSysTableJniReader::create_unique(_file_slot_descs, _state,
1061
0
                                                                      _profile, range, _params);
1062
0
                init_status = ((IcebergSysTableJniReader*)(_cur_reader.get()))->init_reader();
1063
0
            }
1064
            // Set col_name_to_block_idx for JNI readers to avoid repeated map creation
1065
1
            if (_cur_reader) {
1066
0
                if (auto* jni_reader = dynamic_cast<JniReader*>(_cur_reader.get())) {
1067
0
                    jni_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
1068
0
                }
1069
0
            }
1070
1
            break;
1071
1
        }
1072
0
        case TFileFormatType::FORMAT_MULTIDATA: {
1073
0
            if (range.__isset.table_format_params &&
1074
0
                range.table_format_params.table_format_type == "fileset") {
1075
0
                std::map<std::string, std::string> fileset_params(
1076
0
                        range.table_format_params.fileset_params.begin(),
1077
0
                        range.table_format_params.fileset_params.end());
1078
0
                _cur_reader = FilesetReader::create_unique(_file_slot_descs, _state, _profile,
1079
0
                                                           fileset_params);
1080
0
                init_status = ((FilesetReader*)(_cur_reader.get()))->init_reader();
1081
0
                break;
1082
0
            }
1083
0
            return Status::InternalError("unsupported multidata table format: {}",
1084
0
                                         range.__isset.table_format_params
1085
0
                                                 ? range.table_format_params.table_format_type
1086
0
                                                 : "unknown");
1087
0
        }
1088
0
        case TFileFormatType::FORMAT_PARQUET: {
1089
0
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1090
0
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1091
0
                                               : nullptr;
1092
0
            std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
1093
0
                    _profile, *_params, range, _state->query_options().batch_size,
1094
0
                    &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr,
1095
0
                    _state->query_options().enable_parquet_lazy_mat);
1096
1097
0
            if (_row_id_column_iterator_pair.second != -1) {
1098
0
                RETURN_IF_ERROR(_create_row_id_column_iterator());
1099
0
                parquet_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1100
0
            }
1101
1102
            // ATTN: the push down agg type may be set back to NONE,
1103
            // see IcebergTableReader::init_row_filters for example.
1104
0
            parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
1105
0
            if (push_down_predicates) {
1106
0
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1107
0
            }
1108
0
            RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
1109
1110
0
            need_to_get_parsed_schema = true;
1111
0
            break;
1112
0
        }
1113
0
        case TFileFormatType::FORMAT_ORC: {
1114
0
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1115
0
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1116
0
                                               : nullptr;
1117
0
            std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
1118
0
                    _profile, _state, *_params, range, _state->query_options().batch_size,
1119
0
                    _state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
1120
0
                    _state->query_options().enable_orc_lazy_mat);
1121
0
            if (_row_id_column_iterator_pair.second != -1) {
1122
0
                RETURN_IF_ERROR(_create_row_id_column_iterator());
1123
0
                orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1124
0
            }
1125
1126
0
            orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
1127
0
            if (push_down_predicates) {
1128
0
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1129
0
            }
1130
0
            RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
1131
1132
0
            need_to_get_parsed_schema = true;
1133
0
            break;
1134
0
        }
1135
0
        case TFileFormatType::FORMAT_CSV_PLAIN:
1136
0
        case TFileFormatType::FORMAT_CSV_GZ:
1137
0
        case TFileFormatType::FORMAT_CSV_BZ2:
1138
0
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
1139
0
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
1140
0
        case TFileFormatType::FORMAT_CSV_LZOP:
1141
0
        case TFileFormatType::FORMAT_CSV_DEFLATE:
1142
0
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
1143
0
        case TFileFormatType::FORMAT_PROTO: {
1144
0
            auto reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
1145
0
                                                   _file_slot_descs, _io_ctx.get());
1146
1147
0
            init_status = reader->init_reader(_is_load);
1148
0
            _cur_reader = std::move(reader);
1149
0
            break;
1150
0
        }
1151
0
        case TFileFormatType::FORMAT_TEXT: {
1152
0
            auto reader = TextReader::create_unique(_state, _profile, &_counter, *_params, range,
1153
0
                                                    _file_slot_descs, _io_ctx.get());
1154
0
            init_status = reader->init_reader(_is_load);
1155
0
            _cur_reader = std::move(reader);
1156
0
            break;
1157
0
        }
1158
0
        case TFileFormatType::FORMAT_JSON: {
1159
0
            _cur_reader =
1160
0
                    NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
1161
0
                                                 _file_slot_descs, &_scanner_eof, _io_ctx.get());
1162
0
            init_status = ((NewJsonReader*)(_cur_reader.get()))
1163
0
                                  ->init_reader(_col_default_value_ctx, _is_load);
1164
0
            break;
1165
0
        }
1166
1167
0
        case TFileFormatType::FORMAT_WAL: {
1168
0
            _cur_reader = WalReader::create_unique(_state);
1169
0
            init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
1170
0
            break;
1171
0
        }
1172
0
        case TFileFormatType::FORMAT_NATIVE: {
1173
0
            auto reader =
1174
0
                    NativeReader::create_unique(_profile, *_params, range, _io_ctx.get(), _state);
1175
0
            init_status = reader->init_reader();
1176
0
            _cur_reader = std::move(reader);
1177
0
            need_to_get_parsed_schema = false;
1178
0
            break;
1179
0
        }
1180
0
        case TFileFormatType::FORMAT_ARROW: {
1181
0
            if (range.__isset.table_format_params &&
1182
0
                range.table_format_params.table_format_type == "remote_doris") {
1183
0
                _cur_reader =
1184
0
                        RemoteDorisReader::create_unique(_file_slot_descs, _state, _profile, range);
1185
0
                init_status = ((RemoteDorisReader*)(_cur_reader.get()))->init_reader();
1186
0
                if (_cur_reader) {
1187
0
                    static_cast<RemoteDorisReader*>(_cur_reader.get())
1188
0
                            ->set_col_name_to_block_idx(&_src_block_name_to_idx);
1189
0
                }
1190
0
            } else {
1191
0
                _cur_reader =
1192
0
                        ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
1193
0
                                                         range, _file_slot_descs, _io_ctx.get());
1194
0
                init_status = ((ArrowStreamReader*)(_cur_reader.get()))->init_reader();
1195
0
            }
1196
0
            break;
1197
0
        }
1198
0
        default:
1199
0
            return Status::NotSupported("Not supported create reader for file format: {}.",
1200
0
                                        to_string(_params->format_type));
1201
1
        }
1202
1203
1
        if (_cur_reader == nullptr) {
1204
1
            return Status::NotSupported(
1205
1
                    "Not supported create reader for table format: {} / file format: {}.",
1206
1
                    range.__isset.table_format_params ? range.table_format_params.table_format_type
1207
1
                                                      : "NotSet",
1208
1
                    to_string(_params->format_type));
1209
1
        }
1210
0
        COUNTER_UPDATE(_file_counter, 1);
1211
        // The FileScanner for external table may try to open not exist files,
1212
        // Because FE file cache for external table may out of date.
1213
        // So, NOT_FOUND for FileScanner is not a fail case.
1214
        // Will remove this after file reader refactor.
1215
0
        if (init_status.is<END_OF_FILE>()) {
1216
0
            COUNTER_UPDATE(_empty_file_counter, 1);
1217
0
            continue;
1218
0
        } else if (init_status.is<ErrorCode::NOT_FOUND>()) {
1219
0
            if (config::ignore_not_found_file_in_external_table) {
1220
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
1221
0
                continue;
1222
0
            }
1223
0
            return Status::InternalError("failed to find reader, err: {}", init_status.to_string());
1224
0
        } else if (!init_status.ok()) {
1225
0
            return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
1226
0
        }
1227
1228
0
        _cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
1229
0
        if (_get_push_down_agg_type() == TPushAggOp::type::COUNT &&
1230
0
            range.__isset.table_format_params &&
1231
0
            range.table_format_params.table_level_row_count >= 0) {
1232
            // This is a table level count push down operation, no need to call
1233
            // _set_fill_or_truncate_columns.
1234
            // in _set_fill_or_truncate_columns, we will use [range.start_offset, end offset]
1235
            // to filter the row group. But if this is count push down, the offset is undefined,
1236
            // causing incorrect row group filter and may return empty result.
1237
0
        } else {
1238
0
            Status status = _set_fill_or_truncate_columns(need_to_get_parsed_schema);
1239
0
            if (status.is<END_OF_FILE>()) { // all parquet row groups are filtered
1240
0
                continue;
1241
0
            } else if (!status.ok()) {
1242
0
                return Status::InternalError("failed to set_fill_or_truncate_columns, err: {}",
1243
0
                                             status.to_string());
1244
0
            }
1245
0
        }
1246
0
        _cur_reader_eof = false;
1247
0
        break;
1248
0
    }
1249
0
    return Status::OK();
1250
1
}
1251
1252
Status FileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader,
1253
14
                                         FileMetaCache* file_meta_cache_ptr) {
1254
14
    const TFileRangeDesc& range = _current_range;
1255
14
    Status init_status = Status::OK();
1256
1257
14
    phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> slot_id_to_predicates =
1258
14
            _local_state
1259
14
                    ? _local_state->cast<FileScanLocalState>()._slot_id_to_predicates
1260
14
                    : phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> {};
1261
14
    if (range.__isset.table_format_params &&
1262
14
        range.table_format_params.table_format_type == "iceberg") {
1263
0
        std::unique_ptr<IcebergParquetReader> iceberg_reader = IcebergParquetReader::create_unique(
1264
0
                std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache,
1265
0
                _io_ctx.get(), file_meta_cache_ptr);
1266
0
        if (_need_iceberg_rowid_column) {
1267
0
            iceberg_reader->set_need_row_id_column(true);
1268
0
        }
1269
0
        if (_row_lineage_columns.row_id_column_idx != -1 ||
1270
0
            _row_lineage_columns.last_updated_sequence_number_column_idx != -1) {
1271
0
            std::shared_ptr<RowLineageColumns> row_lineage_columns;
1272
0
            row_lineage_columns = std::make_shared<RowLineageColumns>();
1273
0
            row_lineage_columns->row_id_column_idx = _row_lineage_columns.row_id_column_idx;
1274
0
            row_lineage_columns->last_updated_sequence_number_column_idx =
1275
0
                    _row_lineage_columns.last_updated_sequence_number_column_idx;
1276
0
            iceberg_reader->set_row_lineage_columns(std::move(row_lineage_columns));
1277
0
        }
1278
0
        iceberg_reader->set_push_down_agg_type(_get_push_down_agg_type());
1279
1280
0
        init_status = iceberg_reader->init_reader(
1281
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1282
0
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1283
0
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1284
0
                &_slot_id_to_filter_conjuncts);
1285
0
        _cur_reader = std::move(iceberg_reader);
1286
14
    } else if (range.__isset.table_format_params &&
1287
14
               range.table_format_params.table_format_type == "paimon") {
1288
0
        std::unique_ptr<PaimonParquetReader> paimon_reader = PaimonParquetReader::create_unique(
1289
0
                std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache,
1290
0
                _io_ctx.get(), file_meta_cache_ptr);
1291
0
        init_status = paimon_reader->init_reader(
1292
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1293
0
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1294
0
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1295
0
                &_slot_id_to_filter_conjuncts);
1296
0
        RETURN_IF_ERROR(paimon_reader->init_row_filters());
1297
0
        _cur_reader = std::move(paimon_reader);
1298
14
    } else if (range.__isset.table_format_params &&
1299
14
               range.table_format_params.table_format_type == "hudi") {
1300
0
        std::unique_ptr<HudiParquetReader> hudi_reader = HudiParquetReader::create_unique(
1301
0
                std::move(parquet_reader), _profile, _state, *_params, range, _io_ctx.get(),
1302
0
                file_meta_cache_ptr);
1303
0
        init_status = hudi_reader->init_reader(
1304
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1305
0
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1306
0
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1307
0
                &_slot_id_to_filter_conjuncts);
1308
0
        _cur_reader = std::move(hudi_reader);
1309
14
    } else if (range.table_format_params.table_format_type == "hive") {
1310
14
        auto hive_reader = HiveParquetReader::create_unique(std::move(parquet_reader), _profile,
1311
14
                                                            _state, *_params, range, _io_ctx.get(),
1312
14
                                                            &_is_file_slot, file_meta_cache_ptr);
1313
14
        init_status = hive_reader->init_reader(
1314
14
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1315
14
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1316
14
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1317
14
                &_slot_id_to_filter_conjuncts);
1318
14
        _cur_reader = std::move(hive_reader);
1319
14
    } else if (range.table_format_params.table_format_type == "tvf") {
1320
0
        const FieldDescriptor* parquet_meta = nullptr;
1321
0
        RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&parquet_meta));
1322
0
        DCHECK(parquet_meta != nullptr);
1323
1324
        // TVF will first `get_parsed_schema` to obtain file information from BE, and FE will convert
1325
        // the column names to lowercase (because the query process is case-insensitive),
1326
        // so the lowercase file column names are used here to match the read columns.
1327
0
        std::shared_ptr<TableSchemaChangeHelper::Node> tvf_info_node = nullptr;
1328
0
        RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_name(
1329
0
                _real_tuple_desc, *parquet_meta, tvf_info_node));
1330
0
        init_status = parquet_reader->init_reader(
1331
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1332
0
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1333
0
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1334
0
                &_slot_id_to_filter_conjuncts, tvf_info_node);
1335
0
        _cur_reader = std::move(parquet_reader);
1336
0
    } else if (_is_load) {
1337
0
        const FieldDescriptor* parquet_meta = nullptr;
1338
0
        RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&parquet_meta));
1339
0
        DCHECK(parquet_meta != nullptr);
1340
1341
        // Load is case-insensitive, so you to match the columns in the file.
1342
0
        std::map<std::string, std::string> file_lower_name_to_native;
1343
0
        for (const auto& parquet_field : parquet_meta->get_fields_schema()) {
1344
0
            file_lower_name_to_native.emplace(doris::to_lower(parquet_field.name),
1345
0
                                              parquet_field.name);
1346
0
        }
1347
0
        auto load_info_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1348
0
        for (const auto slot : _real_tuple_desc->slots()) {
1349
0
            if (file_lower_name_to_native.contains(slot->col_name())) {
1350
0
                load_info_node->add_children(slot->col_name(),
1351
0
                                             file_lower_name_to_native[slot->col_name()],
1352
0
                                             TableSchemaChangeHelper::ConstNode::get_instance());
1353
                // For Load, `file_scanner` will create block columns using the file type,
1354
                // there is no schema change when reading inside the struct,
1355
                // so use `TableSchemaChangeHelper::ConstNode`.
1356
0
            } else {
1357
0
                load_info_node->add_not_exist_children(slot->col_name());
1358
0
            }
1359
0
        }
1360
1361
0
        init_status = parquet_reader->init_reader(
1362
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1363
0
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1364
0
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1365
0
                &_slot_id_to_filter_conjuncts, load_info_node);
1366
0
        _cur_reader = std::move(parquet_reader);
1367
0
    }
1368
1369
14
    return init_status;
1370
14
}
1371
1372
Status FileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader,
1373
11
                                     FileMetaCache* file_meta_cache_ptr) {
1374
11
    const TFileRangeDesc& range = _current_range;
1375
11
    Status init_status = Status::OK();
1376
1377
11
    if (range.__isset.table_format_params &&
1378
11
        range.table_format_params.table_format_type == "transactional_hive") {
1379
0
        std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
1380
0
                TransactionalHiveReader::create_unique(std::move(orc_reader), _profile, _state,
1381
0
                                                       *_params, range, _io_ctx.get(),
1382
0
                                                       file_meta_cache_ptr);
1383
0
        init_status = tran_orc_reader->init_reader(
1384
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1385
0
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1386
0
                &_slot_id_to_filter_conjuncts);
1387
0
        RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
1388
0
        _cur_reader = std::move(tran_orc_reader);
1389
11
    } else if (range.__isset.table_format_params &&
1390
11
               range.table_format_params.table_format_type == "iceberg") {
1391
0
        std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique(
1392
0
                std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(),
1393
0
                file_meta_cache_ptr);
1394
0
        if (_need_iceberg_rowid_column) {
1395
0
            iceberg_reader->set_need_row_id_column(true);
1396
0
        }
1397
0
        if (_row_lineage_columns.row_id_column_idx != -1 ||
1398
0
            _row_lineage_columns.last_updated_sequence_number_column_idx != -1) {
1399
0
            std::shared_ptr<RowLineageColumns> row_lineage_columns;
1400
0
            row_lineage_columns = std::make_shared<RowLineageColumns>();
1401
0
            row_lineage_columns->row_id_column_idx = _row_lineage_columns.row_id_column_idx;
1402
0
            row_lineage_columns->last_updated_sequence_number_column_idx =
1403
0
                    _row_lineage_columns.last_updated_sequence_number_column_idx;
1404
0
            iceberg_reader->set_row_lineage_columns(std::move(row_lineage_columns));
1405
0
        }
1406
0
        init_status = iceberg_reader->init_reader(
1407
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1408
0
                _default_val_row_desc.get(), _col_name_to_slot_id,
1409
0
                &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
1410
0
        _cur_reader = std::move(iceberg_reader);
1411
11
    } else if (range.__isset.table_format_params &&
1412
11
               range.table_format_params.table_format_type == "paimon") {
1413
0
        std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique(
1414
0
                std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(),
1415
0
                file_meta_cache_ptr);
1416
1417
0
        init_status = paimon_reader->init_reader(
1418
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1419
0
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1420
0
                &_slot_id_to_filter_conjuncts);
1421
0
        RETURN_IF_ERROR(paimon_reader->init_row_filters());
1422
0
        _cur_reader = std::move(paimon_reader);
1423
11
    } else if (range.__isset.table_format_params &&
1424
11
               range.table_format_params.table_format_type == "hudi") {
1425
0
        std::unique_ptr<HudiOrcReader> hudi_reader =
1426
0
                HudiOrcReader::create_unique(std::move(orc_reader), _profile, _state, *_params,
1427
0
                                             range, _io_ctx.get(), file_meta_cache_ptr);
1428
1429
0
        init_status = hudi_reader->init_reader(
1430
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1431
0
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1432
0
                &_slot_id_to_filter_conjuncts);
1433
0
        _cur_reader = std::move(hudi_reader);
1434
11
    } else if (range.__isset.table_format_params &&
1435
11
               range.table_format_params.table_format_type == "hive") {
1436
11
        std::unique_ptr<HiveOrcReader> hive_reader = HiveOrcReader::create_unique(
1437
11
                std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get(),
1438
11
                &_is_file_slot, file_meta_cache_ptr);
1439
1440
11
        init_status = hive_reader->init_reader(
1441
11
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1442
11
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1443
11
                &_slot_id_to_filter_conjuncts);
1444
11
        _cur_reader = std::move(hive_reader);
1445
11
    } else if (range.__isset.table_format_params &&
1446
0
               range.table_format_params.table_format_type == "tvf") {
1447
0
        const orc::Type* orc_type_ptr = nullptr;
1448
0
        RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr));
1449
1450
0
        std::shared_ptr<TableSchemaChangeHelper::Node> tvf_info_node = nullptr;
1451
0
        RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_orc_name(
1452
0
                _real_tuple_desc, orc_type_ptr, tvf_info_node));
1453
0
        init_status = orc_reader->init_reader(
1454
0
                &_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, false,
1455
0
                _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1456
0
                &_slot_id_to_filter_conjuncts, tvf_info_node);
1457
0
        _cur_reader = std::move(orc_reader);
1458
0
    } else if (_is_load) {
1459
0
        const orc::Type* orc_type_ptr = nullptr;
1460
0
        RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr));
1461
1462
0
        std::map<std::string, std::string> file_lower_name_to_native;
1463
0
        for (uint64_t idx = 0; idx < orc_type_ptr->getSubtypeCount(); idx++) {
1464
0
            file_lower_name_to_native.emplace(doris::to_lower(orc_type_ptr->getFieldName(idx)),
1465
0
                                              orc_type_ptr->getFieldName(idx));
1466
0
        }
1467
1468
0
        auto load_info_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1469
0
        for (const auto slot : _real_tuple_desc->slots()) {
1470
0
            if (file_lower_name_to_native.contains(slot->col_name())) {
1471
0
                load_info_node->add_children(slot->col_name(),
1472
0
                                             file_lower_name_to_native[slot->col_name()],
1473
0
                                             TableSchemaChangeHelper::ConstNode::get_instance());
1474
0
            } else {
1475
0
                load_info_node->add_not_exist_children(slot->col_name());
1476
0
            }
1477
0
        }
1478
0
        init_status = orc_reader->init_reader(
1479
0
                &_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, false,
1480
0
                _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1481
0
                &_slot_id_to_filter_conjuncts, load_info_node);
1482
0
        _cur_reader = std::move(orc_reader);
1483
0
    }
1484
1485
11
    return init_status;
1486
11
}
1487
1488
25
Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema) {
1489
25
    _missing_cols.clear();
1490
25
    _slot_lower_name_to_col_type.clear();
1491
25
    std::unordered_map<std::string, DataTypePtr> name_to_col_type;
1492
25
    RETURN_IF_ERROR(_cur_reader->get_columns(&name_to_col_type, &_missing_cols));
1493
25
    if (_need_iceberg_rowid_column && _current_range.__isset.table_format_params &&
1494
25
        _current_range.table_format_params.table_format_type == "iceberg") {
1495
0
        _missing_cols.erase(BeConsts::ICEBERG_ROWID_COL);
1496
0
        _missing_cols.erase(to_lower(BeConsts::ICEBERG_ROWID_COL));
1497
0
    }
1498
309
    for (const auto& [col_name, col_type] : name_to_col_type) {
1499
309
        auto col_name_lower = to_lower(col_name);
1500
309
        if (_partition_col_descs.contains(col_name_lower)) {
1501
            /*
1502
             * `_slot_lower_name_to_col_type` is used by `_init_src_block` and `_cast_to_input_block` during LOAD to
1503
             * generate columns of the corresponding type, which records the columns existing in the file.
1504
             *
1505
             * When a column in `COLUMNS FROM PATH` exists in a file column, the column type in the block will
1506
             * not match the slot type in `_output_tuple_desc`, causing an error when
1507
             * Serde `deserialize_one_cell_from_json` fills the partition values.
1508
             *
1509
             * So for partition column not need fill _slot_lower_name_to_col_type.
1510
             */
1511
0
            continue;
1512
0
        }
1513
309
        _slot_lower_name_to_col_type.emplace(col_name_lower, col_type);
1514
309
    }
1515
1516
25
    if (!_fill_partition_from_path && config::enable_iceberg_partition_column_fallback) {
1517
        // check if the cols of _partition_col_descs are in _missing_cols
1518
        // if so, set _fill_partition_from_path to true and remove the col from _missing_cols
1519
0
        for (const auto& [col_name, col_type] : _partition_col_descs) {
1520
0
            if (_missing_cols.contains(col_name)) {
1521
0
                _fill_partition_from_path = true;
1522
0
                _missing_cols.erase(col_name);
1523
0
            }
1524
0
        }
1525
0
    }
1526
1527
25
    RETURN_IF_ERROR(_generate_missing_columns());
1528
25
    if (_fill_partition_from_path) {
1529
25
        RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
1530
25
    } else {
1531
        // If the partition columns are not from path, we only fill the missing columns.
1532
0
        RETURN_IF_ERROR(_cur_reader->set_fill_columns({}, _missing_col_descs));
1533
0
    }
1534
25
    if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
1535
0
        fmt::memory_buffer col_buf;
1536
0
        for (auto& col : _missing_cols) {
1537
0
            fmt::format_to(col_buf, " {}", col);
1538
0
        }
1539
0
        VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", fmt::to_string(col_buf),
1540
0
                                   _current_range.path);
1541
0
    }
1542
1543
25
    RETURN_IF_ERROR(_generate_truncate_columns(need_to_get_parsed_schema));
1544
25
    return Status::OK();
1545
25
}
1546
1547
25
Status FileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) {
1548
25
    _source_file_col_name_types.clear();
1549
    //  The col names and types of source file, such as parquet, orc files.
1550
25
    if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) {
1551
0
        std::vector<std::string> source_file_col_names;
1552
0
        std::vector<DataTypePtr> source_file_col_types;
1553
0
        Status status =
1554
0
                _cur_reader->get_parsed_schema(&source_file_col_names, &source_file_col_types);
1555
0
        if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
1556
0
            return status;
1557
0
        }
1558
0
        DCHECK_EQ(source_file_col_names.size(), source_file_col_types.size());
1559
0
        for (int i = 0; i < source_file_col_names.size(); ++i) {
1560
0
            _source_file_col_name_types[to_lower(source_file_col_names[i])] =
1561
0
                    source_file_col_types[i];
1562
0
        }
1563
0
    }
1564
25
    return Status::OK();
1565
25
}
1566
1567
16
Status FileScanner::prepare_for_read_lines(const TFileRangeDesc& range) {
1568
16
    _current_range = range;
1569
1570
16
    _file_cache_statistics.reset(new io::FileCacheStatistics());
1571
16
    _file_reader_stats.reset(new io::FileReaderStats());
1572
1573
16
    _file_read_bytes_counter =
1574
16
            ADD_COUNTER_WITH_LEVEL(_profile, FileReadBytesProfile, TUnit::BYTES, 1);
1575
16
    _file_read_time_counter = ADD_TIMER_WITH_LEVEL(_profile, FileReadTimeProfile, 1);
1576
1577
16
    RETURN_IF_ERROR(_init_io_ctx());
1578
16
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
1579
16
    _io_ctx->file_reader_stats = _file_reader_stats.get();
1580
16
    _default_val_row_desc.reset(new RowDescriptor((TupleDescriptor*)_real_tuple_desc));
1581
16
    RETURN_IF_ERROR(_init_expr_ctxes());
1582
1583
    // Since only one column is read from the file, there is no need to filter, so set these variables to empty.
1584
16
    _push_down_conjuncts.clear();
1585
16
    _not_single_slot_filter_conjuncts.clear();
1586
16
    _slot_id_to_filter_conjuncts.clear();
1587
16
    _kv_cache = nullptr;
1588
16
    return Status::OK();
1589
16
}
1590
1591
Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
1592
                                          const std::list<int64_t>& row_ids, Block* result_block,
1593
                                          const ExternalFileMappingInfo& external_info,
1594
25
                                          int64_t* init_reader_ms, int64_t* get_block_ms) {
1595
25
    _current_range = range;
1596
25
    RETURN_IF_ERROR(_generate_partition_columns());
1597
1598
25
    TFileFormatType::type format_type = _get_current_format_type();
1599
25
    Status init_status = Status::OK();
1600
1601
25
    auto file_meta_cache_ptr = external_info.enable_file_meta_cache
1602
25
                                       ? ExecEnv::GetInstance()->file_meta_cache()
1603
25
                                       : nullptr;
1604
1605
25
    RETURN_IF_ERROR(scope_timer_run(
1606
25
            [&]() -> Status {
1607
25
                switch (format_type) {
1608
25
                case TFileFormatType::FORMAT_PARQUET: {
1609
25
                    std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
1610
25
                            _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx.get(),
1611
25
                            _state, file_meta_cache_ptr, false);
1612
1613
25
                    RETURN_IF_ERROR(parquet_reader->read_by_rows(row_ids));
1614
25
                    RETURN_IF_ERROR(
1615
25
                            _init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
1616
25
                    break;
1617
25
                }
1618
25
                case TFileFormatType::FORMAT_ORC: {
1619
25
                    std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
1620
25
                            _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx.get(),
1621
25
                            file_meta_cache_ptr, false);
1622
1623
25
                    RETURN_IF_ERROR(orc_reader->read_by_rows(row_ids));
1624
25
                    RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
1625
25
                    break;
1626
25
                }
1627
25
                default: {
1628
25
                    return Status::NotSupported(
1629
25
                            "Not support create lines reader for file format: {},"
1630
25
                            "only support parquet and orc.",
1631
25
                            to_string(_params->format_type));
1632
25
                }
1633
25
                }
1634
25
                return Status::OK();
1635
25
            },
1636
25
            init_reader_ms));
1637
1638
25
    RETURN_IF_ERROR(_set_fill_or_truncate_columns(true));
1639
25
    _cur_reader_eof = false;
1640
1641
25
    RETURN_IF_ERROR(scope_timer_run(
1642
25
            [&]() -> Status {
1643
25
                while (!_cur_reader_eof) {
1644
25
                    bool eof = false;
1645
25
                    RETURN_IF_ERROR(_get_block_impl(_state, result_block, &eof));
1646
25
                }
1647
25
                return Status::OK();
1648
25
            },
1649
25
            get_block_ms));
1650
1651
25
    _cur_reader->collect_profile_before_close();
1652
25
    RETURN_IF_ERROR(_cur_reader->close());
1653
1654
25
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
1655
25
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
1656
25
    return Status::OK();
1657
25
}
1658
1659
25
Status FileScanner::_generate_partition_columns() {
1660
25
    _partition_col_descs.clear();
1661
25
    _partition_value_is_null.clear();
1662
25
    const TFileRangeDesc& range = _current_range;
1663
25
    if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
1664
0
        for (const auto& slot_desc : _partition_slot_descs) {
1665
0
            if (slot_desc) {
1666
0
                auto it = _partition_slot_index_map.find(slot_desc->id());
1667
0
                if (it == std::end(_partition_slot_index_map)) {
1668
0
                    return Status::InternalError("Unknown source slot descriptor, slot_id={}",
1669
0
                                                 slot_desc->id());
1670
0
                }
1671
0
                const std::string& column_from_path = range.columns_from_path[it->second];
1672
0
                _partition_col_descs.emplace(slot_desc->col_name(),
1673
0
                                             std::make_tuple(column_from_path, slot_desc));
1674
0
                if (range.__isset.columns_from_path_is_null) {
1675
0
                    _partition_value_is_null.emplace(slot_desc->col_name(),
1676
0
                                                     range.columns_from_path_is_null[it->second]);
1677
0
                }
1678
0
            }
1679
0
        }
1680
0
    }
1681
25
    return Status::OK();
1682
25
}
1683
1684
25
// Rebuilds _missing_col_descs from scratch.
//
// For every slot of _real_tuple_desc whose column name appears in _missing_cols, the
// default-value expression context registered in _col_default_value_ctx is recorded
// under that column name (the stored context may be null when the column's default
// expression was empty). Slots not listed in _missing_cols are ignored.
//
// Returns InternalError if a missing column has no entry in _col_default_value_ctx.
Status FileScanner::_generate_missing_columns() {
    _missing_col_descs.clear();

    // Nothing is missing: leave the (now empty) map as-is.
    if (_missing_cols.empty()) {
        return Status::OK();
    }

    for (auto* slot_desc : _real_tuple_desc->slots()) {
        const auto& col_name = slot_desc->col_name();
        if (_missing_cols.contains(col_name)) {
            auto default_it = _col_default_value_ctx.find(col_name);
            if (default_it == _col_default_value_ctx.end()) {
                return Status::InternalError("failed to find default value expr for slot: {}",
                                             slot_desc->col_name());
            }
            _missing_col_descs.emplace(col_name, default_it->second);
        }
    }
    return Status::OK();
}
1702
1703
16
// Prepares the slot/expression bookkeeping the scanner needs before reading.
//
//  1. Indexes every source slot of _real_tuple_desc by slot id (descriptor and ordinal).
//  2. Maps partition key names from _current_range.columns_from_path_keys to their index.
//  3. Classifies every required slot in _params->required_slots:
//       - global row-id / Iceberg row-id columns are recorded and otherwise skipped;
//       - Iceberg row-lineage columns record their position in _default_val_row_desc;
//       - file slots are appended to _is_file_slot / _file_slot_descs / _file_col_names;
//       - partition slots are appended to _partition_slot_descs together with the index
//         used later to fetch their value from the range.
//  4. Prepares and opens the default-value expression of every source slot that has one.
//  5. For load tasks, prepares and opens the destination-slot expressions and builds the
//     dest -> src slot mapping when dest_sid_to_src_sid_without_trans is set.
//
// Returns InternalError when a required or mapped source slot is unknown, when a
// partition column is inconsistent with an earlier file+partition column, or when a load
// destination slot has no expression; otherwise propagates expr create/prepare/open errors.
Status FileScanner::_init_expr_ctxes() {
    std::map<SlotId, int> full_src_index_map;
    std::map<SlotId, SlotDescriptor*> full_src_slot_map;
    std::map<std::string, int> partition_name_to_key_index_map;
    int index = 0;
    for (const auto& slot_desc : _real_tuple_desc->slots()) {
        full_src_slot_map.emplace(slot_desc->id(), slot_desc);
        full_src_index_map.emplace(slot_desc->id(), index++);
    }

    // For external table query, find the index of column in path.
    // Because query doesn't always search for all columns in a table
    // and the order of selected columns is random.
    // All ranges of this scanner should have identical columns_from_path_keys
    // because they are all file splits for the same external table,
    // so the keys of the current range are used to fill partition_name_to_key_index_map.
    if (_current_range.__isset.columns_from_path_keys) {
        // Bind by reference: the keys are only read here, so no copy is needed.
        const auto& key_map = _current_range.columns_from_path_keys;
        for (size_t i = 0; i < key_map.size(); i++) {
            partition_name_to_key_index_map.emplace(key_map[i], static_cast<int>(i));
        }
    }

    _num_of_columns_from_file = _params->num_of_columns_from_file;

    for (const auto& slot_info : _params->required_slots) {
        auto slot_id = slot_info.slot_id;
        auto it = full_src_slot_map.find(slot_id);
        if (it == std::end(full_src_slot_map)) {
            return Status::InternalError("Unknown source slot descriptor, slot_id={}", slot_id);
        }
        SlotDescriptor* slot_desc = it->second;
        const auto& col_name = slot_desc->col_name();

        // Global row-id column: only its position in the default-value row is recorded.
        if (col_name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
            _row_id_column_iterator_pair.second = _default_val_row_desc->get_column_id(slot_id);
            continue;
        }
        // Iceberg row-id column: flag it and skip the file/partition classification below.
        if (col_name == BeConsts::ICEBERG_ROWID_COL) {
            _need_iceberg_rowid_column = true;
            continue;
        }

        // Iceberg row-lineage columns: remember their positions, then fall through so they
        // are still classified as file/partition slots below.
        if (col_name == IcebergTableReader::ROW_LINEAGE_ROW_ID) {
            _row_lineage_columns.row_id_column_idx = _default_val_row_desc->get_column_id(slot_id);
        }
        if (col_name == IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
            _row_lineage_columns.last_updated_sequence_number_column_idx =
                    _default_val_row_desc->get_column_id(slot_id);
        }

        if (slot_info.is_file_slot) {
            _is_file_slot.emplace(slot_id);
            _file_slot_descs.emplace_back(slot_desc);
            _file_col_names.push_back(col_name);
        }

        // Single lookup: kit is reused below for the non-load index.
        auto kit = partition_name_to_key_index_map.find(col_name);
        if (kit != partition_name_to_key_index_map.end()) {
            if (slot_info.is_file_slot) {
                // If there is slot which is both a partition column and a file column,
                // we should not fill the partition column from path.
                _fill_partition_from_path = false;
            } else if (!_fill_partition_from_path) {
                // This should not happen
                return Status::InternalError(
                        "Partition column {} is not a file column, but there is already a column "
                        "which is both a partition column and a file column.",
                        col_name);
            }
            _partition_slot_descs.emplace_back(slot_desc);
            if (_is_load) {
                // The index is taken relative to _num_of_columns_from_file (presumably
                // partition columns follow the file columns in the source tuple — confirm).
                // slot_id is guaranteed to be present: full_src_index_map and
                // full_src_slot_map were filled from the same slots.
                auto iti = full_src_index_map.find(slot_id);
                _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
            } else {
                _partition_slot_index_map.emplace(slot_id, kit->second);
            }
        }
    }

    // set column name to default value expr map
    for (auto* slot_desc : _real_tuple_desc->slots()) {
        VExprContextSPtr ctx;
        auto it = _params->default_value_of_src_slot.find(slot_desc->id());
        if (it != std::end(_params->default_value_of_src_slot)) {
            if (!it->second.nodes.empty()) {
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
                RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
                RETURN_IF_ERROR(ctx->open(_state));
            }
            // if expr is empty, the default value will be null
            _col_default_value_ctx.emplace(slot_desc->col_name(), ctx);
        }
    }

    if (_is_load) {
        // follow desc expr map is only for load task.
        bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
        int idx = 0;
        for (auto* slot_desc : _output_tuple_desc->slots()) {
            auto it = _params->expr_of_dest_slot.find(slot_desc->id());
            if (it == std::end(_params->expr_of_dest_slot)) {
                return Status::InternalError("No expr for dest slot, id={}, name={}",
                                             slot_desc->id(), slot_desc->col_name());
            }

            VExprContextSPtr ctx;
            if (!it->second.nodes.empty()) {
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
                RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
                RETURN_IF_ERROR(ctx->open(_state));
            }
            _dest_vexpr_ctx.emplace_back(ctx);
            _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;

            if (has_slot_id_map) {
                auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
                if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
                    // Destination slot has no direct source slot.
                    _src_slot_descs_order_by_dest.emplace_back(nullptr);
                } else {
                    auto _src_slot_it = full_src_slot_map.find(it1->second);
                    if (_src_slot_it == std::end(full_src_slot_map)) {
                        return Status::InternalError("No src slot {} in src slot descs",
                                                     it1->second);
                    }
                    _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
                                                         full_src_index_map[_src_slot_it->first]);
                    _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
                }
            }
        }
    }
    return Status::OK();
}
1838
1839
3
bool FileScanner::_should_enable_condition_cache() {
1840
3
    return _condition_cache_digest != 0 && !_is_load &&
1841
3
           (!_conjuncts.empty() || !_push_down_conjuncts.empty());
1842
3
}
1843
1844
0
void FileScanner::_init_reader_condition_cache() {
1845
0
    _condition_cache = nullptr;
1846
0
    _condition_cache_ctx = nullptr;
1847
1848
0
    if (!_should_enable_condition_cache() || !_cur_reader) {
1849
0
        return;
1850
0
    }
1851
1852
    // Disable condition cache when delete operations exist (e.g. Iceberg position/equality
1853
    // deletes, Hive ACID deletes). Cached granule results may become stale if delete files
1854
    // change between queries while the data file's cache key remains the same.
1855
0
    if (_cur_reader->has_delete_operations()) {
1856
0
        return;
1857
0
    }
1858
1859
0
    auto* cache = segment_v2::ConditionCache::instance();
1860
0
    _condition_cache_key = segment_v2::ConditionCache::ExternalCacheKey(
1861
0
            _current_range.path,
1862
0
            _current_range.__isset.modification_time ? _current_range.modification_time : 0,
1863
0
            _current_range.__isset.file_size ? _current_range.file_size : -1,
1864
0
            _condition_cache_digest,
1865
0
            _current_range.__isset.start_offset ? _current_range.start_offset : 0,
1866
0
            _current_range.__isset.size ? _current_range.size : -1);
1867
1868
0
    segment_v2::ConditionCacheHandle handle;
1869
0
    auto condition_cache_hit = cache->lookup(_condition_cache_key, &handle);
1870
0
    if (condition_cache_hit) {
1871
0
        _condition_cache = handle.get_filter_result();
1872
0
        _condition_cache_hit_count++;
1873
0
    } else {
1874
        // Allocate cache pre-sized to total number of granules.
1875
        // We add +1 as a safety margin: when a file is split across multiple scanners
1876
        // and the first row of this scanner's range is not aligned to a granule boundary,
1877
        // the data may span one more granule than ceil(total_rows / GRANULE_SIZE).
1878
        // The extra element costs only 1 bit and never affects correctness (an extra
1879
        // false-granule beyond the actual data range won't overlap any real row range).
1880
0
        int64_t total_rows = _cur_reader->get_total_rows();
1881
0
        if (total_rows > 0) {
1882
0
            size_t num_granules = (total_rows + ConditionCacheContext::GRANULE_SIZE - 1) /
1883
0
                                  ConditionCacheContext::GRANULE_SIZE;
1884
0
            _condition_cache = std::make_shared<std::vector<bool>>(num_granules + 1, false);
1885
0
        }
1886
0
    }
1887
1888
0
    if (_condition_cache) {
1889
        // Create context to pass to readers (native readers use it; non-native readers ignore it)
1890
0
        _condition_cache_ctx = std::make_shared<ConditionCacheContext>();
1891
0
        _condition_cache_ctx->is_hit = condition_cache_hit;
1892
0
        _condition_cache_ctx->filter_result = _condition_cache;
1893
0
        _cur_reader->set_condition_cache_context(_condition_cache_ctx);
1894
0
    }
1895
0
}
1896
1897
3
void FileScanner::_finalize_reader_condition_cache() {
1898
3
    if (!_should_enable_condition_cache() || !_condition_cache_ctx ||
1899
3
        _condition_cache_ctx->is_hit) {
1900
3
        _condition_cache = nullptr;
1901
3
        _condition_cache_ctx = nullptr;
1902
3
        return;
1903
3
    }
1904
    // Only store the cache if the reader was fully consumed. If the scan was
1905
    // truncated early (e.g. by LIMIT), the cache is incomplete — unread granules
1906
    // would remain false and cause surviving rows to be incorrectly skipped on HIT.
1907
0
    if (!_cur_reader_eof) {
1908
0
        _condition_cache = nullptr;
1909
0
        _condition_cache_ctx = nullptr;
1910
0
        return;
1911
0
    }
1912
1913
0
    auto* cache = segment_v2::ConditionCache::instance();
1914
0
    cache->insert(_condition_cache_key, std::move(_condition_cache));
1915
0
    _condition_cache = nullptr;
1916
0
    _condition_cache_ctx = nullptr;
1917
0
}
1918
1919
2
// Closes the scanner.
//
// The body runs only when _try_close() returns true (presumably it guards against double
// close — confirm). It finalizes the condition cache for the current reader, closes the
// reader if any, and then always closes the base Scanner.
//
// Returns the reader's close error if there was one, otherwise the result of
// Scanner::close().
Status FileScanner::close(RuntimeState* state) {
    if (!_try_close()) {
        return Status::OK();
    }

    _finalize_reader_condition_cache();

    Status reader_status = Status::OK();
    if (_cur_reader) {
        reader_status = _cur_reader->close();
    }

    // Close the base scanner even when the reader failed to close. Returning early here
    // would skip Scanner::close() permanently, since _try_close() short-circuits any
    // later close() call.
    Status base_status = Scanner::close(state);

    // Report the reader failure first because it happened first.
    if (!reader_status.ok()) {
        return reader_status;
    }
    return base_status;
}
1933
1934
0
void FileScanner::try_stop() {
1935
0
    Scanner::try_stop();
1936
0
    if (_io_ctx) {
1937
0
        _io_ctx->should_stop = true;
1938
0
    }
1939
0
}
1940
1941
0
void FileScanner::update_realtime_counters() {
1942
0
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
1943
1944
0
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
1945
0
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
1946
1947
0
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(
1948
0
            _file_reader_stats->read_rows);
1949
0
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
1950
0
            _file_reader_stats->read_bytes);
1951
1952
0
    int64_t delta_bytes_read_from_local =
1953
0
            _file_cache_statistics->bytes_read_from_local - _last_bytes_read_from_local;
1954
0
    int64_t delta_bytes_read_from_remote =
1955
0
            _file_cache_statistics->bytes_read_from_remote - _last_bytes_read_from_remote;
1956
0
    if (_file_cache_statistics->bytes_read_from_local == 0 &&
1957
0
        _file_cache_statistics->bytes_read_from_remote == 0) {
1958
0
        _state->get_query_ctx()
1959
0
                ->resource_ctx()
1960
0
                ->io_context()
1961
0
                ->update_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes);
1962
0
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
1963
0
                _file_reader_stats->read_bytes);
1964
0
    } else {
1965
0
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
1966
0
                delta_bytes_read_from_local);
1967
0
        _state->get_query_ctx()
1968
0
                ->resource_ctx()
1969
0
                ->io_context()
1970
0
                ->update_scan_bytes_from_remote_storage(delta_bytes_read_from_remote);
1971
0
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
1972
0
                delta_bytes_read_from_local);
1973
0
        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
1974
0
                delta_bytes_read_from_remote);
1975
0
    }
1976
1977
0
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
1978
1979
0
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
1980
0
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
1981
1982
0
    _file_reader_stats->read_bytes = 0;
1983
0
    _file_reader_stats->read_rows = 0;
1984
1985
0
    _last_bytes_read_from_local = _file_cache_statistics->bytes_read_from_local;
1986
0
    _last_bytes_read_from_remote = _file_cache_statistics->bytes_read_from_remote;
1987
0
}
1988
1989
0
void FileScanner::_collect_profile_before_close() {
1990
0
    Scanner::_collect_profile_before_close();
1991
0
    if (config::enable_file_cache && _state->query_options().enable_file_cache &&
1992
0
        _profile != nullptr) {
1993
0
        io::FileCacheProfileReporter cache_profile(_profile);
1994
0
        cache_profile.update(_file_cache_statistics.get());
1995
0
        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
1996
0
                _file_cache_statistics->bytes_write_into_cache);
1997
0
    }
1998
1999
0
    if (_cur_reader != nullptr) {
2000
0
        _cur_reader->collect_profile_before_close();
2001
0
    }
2002
2003
0
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
2004
0
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
2005
0
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
2006
2007
0
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
2008
0
    COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls);
2009
0
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
2010
0
    COUNTER_UPDATE(local_state->_condition_cache_hit_counter, _condition_cache_hit_count);
2011
0
    if (_io_ctx) {
2012
0
        COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
2013
0
                       _io_ctx->condition_cache_filtered_rows);
2014
0
    }
2015
2016
0
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
2017
0
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
2018
0
}
2019
2020
} // namespace doris