Coverage Report

Created: 2026-03-16 06:27

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/Metrics_types.h>
23
#include <gen_cpp/Opcodes_types.h>
24
#include <gen_cpp/PaloInternalService_types.h>
25
#include <gen_cpp/PlanNodes_types.h>
26
#include <glog/logging.h>
27
28
#include <algorithm>
29
#include <boost/iterator/iterator_facade.hpp>
30
#include <map>
31
#include <ranges>
32
#include <tuple>
33
#include <unordered_map>
34
#include <utility>
35
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/config.h"
38
#include "common/logging.h"
39
#include "common/status.h"
40
#include "core/block/column_with_type_and_name.h"
41
#include "core/block/columns_with_type_and_name.h"
42
#include "core/column/column.h"
43
#include "core/column/column_nullable.h"
44
#include "core/column/column_vector.h"
45
#include "core/data_type/data_type.h"
46
#include "core/data_type/data_type_nullable.h"
47
#include "core/data_type/data_type_string.h"
48
#include "core/string_ref.h"
49
#include "exec/common/stringop_substring.h"
50
#include "exec/rowid_fetcher.h"
51
#include "exec/scan/scan_node.h"
52
#include "exprs/aggregate/aggregate_function.h"
53
#include "exprs/function/function.h"
54
#include "exprs/function/simple_function_factory.h"
55
#include "exprs/vexpr.h"
56
#include "exprs/vexpr_context.h"
57
#include "exprs/vexpr_fwd.h"
58
#include "exprs/vslot_ref.h"
59
#include "format/arrow/arrow_stream_reader.h"
60
#include "format/csv/csv_reader.h"
61
#include "format/json/new_json_reader.h"
62
#include "format/native/native_reader.h"
63
#include "format/orc/vorc_reader.h"
64
#include "format/parquet/vparquet_reader.h"
65
#include "format/table/hive_reader.h"
66
#include "format/table/hudi_jni_reader.h"
67
#include "format/table/hudi_reader.h"
68
#include "format/table/iceberg_reader.h"
69
#include "format/table/jdbc_jni_reader.h"
70
#include "format/table/max_compute_jni_reader.h"
71
#include "format/table/paimon_cpp_reader.h"
72
#include "format/table/paimon_jni_reader.h"
73
#include "format/table/paimon_predicate_converter.h"
74
#include "format/table/paimon_reader.h"
75
#include "format/table/remote_doris_reader.h"
76
#include "format/table/transactional_hive_reader.h"
77
#include "format/table/trino_connector_jni_reader.h"
78
#include "format/text/text_reader.h"
79
#include "io/cache/block_file_cache_profile.h"
80
#include "load/group_commit/wal/wal_reader.h"
81
#include "runtime/descriptors.h"
82
#include "runtime/runtime_profile.h"
83
#include "runtime/runtime_state.h"
84
85
namespace cctz {
86
class time_zone;
87
} // namespace cctz
88
namespace doris {
89
class ShardedKVCache;
90
} // namespace doris
91
92
namespace doris {
93
#include "common/compile_check_begin.h"
94
using namespace ErrorCode;
95
96
// Name of the profile counter registered in init() (unit: TUnit::BYTES) for file read bytes.
const std::string FileScanner::FileReadBytesProfile = "FileReadBytes";
97
// Name of the profile timer registered in init() for file read time.
const std::string FileScanner::FileReadTimeProfile = "FileReadTime";
98
99
/// Builds a file scanner that reads the splits handed out by `split_source`.
///
/// The scan-range parameters come from the query context when it holds an entry for the
/// parent scan node; otherwise they are taken from the split source (old FE thrift
/// protocol). Strict mode is copied from the parameters when it is set, and the
/// input/real tuple descriptors are resolved from the descriptor table.
///
/// @param state              runtime state of the fragment; must not be null.
/// @param local_state        local state of the owning scan operator; must not be null.
/// @param limit              row limit forwarded to the base Scanner.
/// @param split_source       provider of scan ranges; ownership is shared with the caller.
/// @param profile            runtime profile forwarded to the base Scanner.
/// @param kv_cache           cache pointer stored as-is in `_kv_cache`.
/// @param colname_to_slot_id column-name to slot-id mapping stored as-is.
FileScanner::FileScanner(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
                         std::shared_ptr<SplitSourceConnector> split_source,
                         RuntimeProfile* profile, ShardedKVCache* kv_cache,
                         const std::unordered_map<std::string, int>* colname_to_slot_id)
        : Scanner(state, local_state, limit, profile),
          // `split_source` is a by-value sink parameter: move it rather than copy it again.
          _split_source(std::move(split_source)),
          _cur_reader(nullptr),
          _cur_reader_eof(false),
          _kv_cache(kv_cache),
          _strict_mode(false),
          _col_name_to_slot_id(colname_to_slot_id) {
    bool params_from_query_ctx = false;
    auto* query_ctx = state->get_query_ctx();
    if (query_ctx != nullptr) {
        // One find() replaces the previous count() + operator[] double lookup.
        auto& params_map = query_ctx->file_scan_range_params_map;
        auto params_it = params_map.find(local_state->parent_id());
        if (params_it != params_map.end()) {
            _params = &(params_it->second);
            params_from_query_ctx = true;
        }
    }
    if (!params_from_query_ctx) {
        // old fe thrift protocol
        _params = _split_source->get_params();
    }
    if (_params->__isset.strict_mode) {
        _strict_mode = _params->strict_mode;
    }

    // For load scanner, there are input and output tuple.
    // For query scanner, there is only output tuple
    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
    _real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
    _is_load = (_input_tuple_desc != nullptr);
}
127
2.67k
128
2.67k
/// Initializes the scanner: base-class init, profile timers/counters, IO context and the
/// row descriptors used later when reading and converting blocks.
///
/// For a load scanner the pre-filter expressions carried by the scan parameters are also
/// created, prepared against the source row descriptor and opened.
///
/// @return the first error reported by any step, otherwise Status::OK().
Status FileScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));

    auto* profile = _local_state->scanner_profile();

    // Timers.
    _get_block_timer = ADD_TIMER_WITH_LEVEL(profile, "FileScannerGetBlockTime", 1);
    _cast_to_input_block_timer = ADD_TIMER_WITH_LEVEL(profile, "FileScannerCastInputBlockTime", 1);
    _fill_missing_columns_timer =
            ADD_TIMER_WITH_LEVEL(profile, "FileScannerFillMissingColumnTime", 1);
    _pre_filter_timer = ADD_TIMER_WITH_LEVEL(profile, "FileScannerPreFilterTimer", 1);
    _convert_to_output_block_timer =
            ADD_TIMER_WITH_LEVEL(profile, "FileScannerConvertOuputBlockTime", 1);
    _runtime_filter_partition_prune_timer =
            ADD_TIMER_WITH_LEVEL(profile, "FileScannerRuntimeFilterPartitionPruningTime", 1);

    // Per-file counters.
    _empty_file_counter = ADD_COUNTER_WITH_LEVEL(profile, "EmptyFileNum", TUnit::UNIT, 1);
    _not_found_file_counter = ADD_COUNTER_WITH_LEVEL(profile, "NotFoundFileNum", TUnit::UNIT, 1);
    _fully_skipped_file_counter =
            ADD_COUNTER_WITH_LEVEL(profile, "FullySkippedFileNum", TUnit::UNIT, 1);
    _file_counter = ADD_COUNTER_WITH_LEVEL(profile, "FileNumber", TUnit::UNIT, 1);

    // File read statistics.
    _file_read_bytes_counter =
            ADD_COUNTER_WITH_LEVEL(profile, FileReadBytesProfile, TUnit::BYTES, 1);
    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(profile, "FileReadCalls", TUnit::UNIT, 1);
    _file_read_time_counter = ADD_TIMER_WITH_LEVEL(profile, FileReadTimeProfile, 1);

    _runtime_filter_partition_pruned_range_counter = ADD_COUNTER_WITH_LEVEL(
            profile, "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);

    _file_cache_statistics.reset(new io::FileCacheStatistics());
    _file_reader_stats.reset(new io::FileReaderStats());

    // The IO context points at the statistics objects owned by this scanner.
    RETURN_IF_ERROR(_init_io_ctx());
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
    _io_ctx->file_reader_stats = _file_reader_stats.get();
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;

    if (_is_load) {
        const std::vector<TupleId> input_tuple_ids {_input_tuple_desc->id()};
        _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(), input_tuple_ids));

        // Prepare pre filters: the list form takes precedence over the single expression.
        if (_params->__isset.pre_filter_exprs_list) {
            RETURN_IF_ERROR(doris::VExpr::create_expr_trees(_params->pre_filter_exprs_list,
                                                            _pre_conjunct_ctxs));
        } else if (_params->__isset.pre_filter_exprs) {
            VExprContextSPtr single_filter_ctx;
            RETURN_IF_ERROR(
                    doris::VExpr::create_expr_tree(_params->pre_filter_exprs, single_filter_ctx));
            _pre_conjunct_ctxs.emplace_back(single_filter_ctx);
        }

        for (auto& pre_filter_ctx : _pre_conjunct_ctxs) {
            RETURN_IF_ERROR(pre_filter_ctx->prepare(_state, *_src_row_desc));
            RETURN_IF_ERROR(pre_filter_ctx->open(_state));
        }

        const std::vector<TupleId> output_tuple_ids {_output_tuple_desc->id()};
        _dest_row_desc.reset(new RowDescriptor(_state->desc_tbl(), output_tuple_ids));
    }

    const std::vector<TupleId> real_tuple_ids {_real_tuple_desc->id()};
    _default_val_row_desc.reset(new RowDescriptor(_state->desc_tbl(), real_tuple_ids));

    return Status::OK();
}
197
198
2.67k
// check if the expr is a partition pruning expr
199
2.67k
bool FileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
200
    if (expr->is_slot_ref()) {
201
        auto* slot_ref = static_cast<VSlotRef*>(expr.get());
202
0
        return _partition_slot_index_map.find(slot_ref->slot_id()) !=
203
0
               _partition_slot_index_map.end();
204
0
    }
205
0
    if (expr->is_literal()) {
206
0
        return true;
207
0
    }
208
0
    return std::ranges::all_of(expr->children(), [this](const auto& child) {
209
0
        return _check_partition_prune_expr(child);
210
0
    });
211
0
}
212
0
213
0
void FileScanner::_init_runtime_filter_partition_prune_ctxs() {
214
0
    _runtime_filter_partition_prune_ctxs.clear();
215
    for (auto& conjunct : _conjuncts) {
216
6
        auto impl = conjunct->root()->get_impl();
217
6
        // If impl is not null, which means this a conjuncts from runtime filter.
218
6
        auto expr = impl ? impl : conjunct->root();
219
0
        if (_check_partition_prune_expr(expr)) {
220
            _runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
221
0
        }
222
0
    }
223
0
}
224
0
225
0
void FileScanner::_init_runtime_filter_partition_prune_block() {
226
6
    // init block with empty column
227
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
228
6
        _runtime_filter_partition_prune_block.insert(
229
                ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
230
22
                                      slot_desc->get_data_type_ptr(), slot_desc->col_name()));
231
22
    }
232
22
}
233
22
234
22
/// Evaluates the partition-prunable runtime filter conjuncts against the partition values
/// of the current range.
///
/// The partition values are deserialized into single-row columns, placed into
/// `_runtime_filter_partition_prune_block`, and the conjuncts in
/// `_runtime_filter_partition_prune_ctxs` are executed on that block.
///
/// @param can_filter_all output flag written by VExprContext::execute_conjuncts; it is left
///                       untouched when there is nothing to evaluate.
/// @return the first deserialization/evaluation error, otherwise Status::OK().
Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
    SCOPED_TIMER(_runtime_filter_partition_prune_timer);
    if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
        return Status::OK();
    }
    // Each partition value is materialized as a column with exactly this many rows.
    size_t num_rows = 1;

    // 1. Get partition key values to string columns.
    std::unordered_map<SlotId, MutableColumnPtr> columns_by_slot_id;
    for (auto const& desc_entry : _partition_col_descs) {
        const auto& [partition_value, partition_slot_desc] = desc_entry.second;
        auto data_type = partition_slot_desc->get_data_type_ptr();
        auto serde = data_type->get_serde();
        auto value_column = data_type->create_column();
        auto* col_ptr = static_cast<IColumn*>(value_column.get());
        Slice slice(partition_value.data(), partition_value.size());
        uint64_t num_deserialized = 0;
        DataTypeSerDe::FormatOptions options {};

        auto null_flag_it = _partition_value_is_null.find(partition_slot_desc->col_name());
        if (null_flag_it == _partition_value_is_null.end()) {
            // for hive/hudi table, the null value is set as "\\N"
            // TODO: this will be unified as iceberg/paimon table in the future
            RETURN_IF_ERROR(serde->deserialize_column_from_fixed_json(
                    *col_ptr, slice, num_rows, &num_deserialized, options));
        } else {
            // for iceberg/paimon table
            // NOTICE: the column is always nullable for iceberg/paimon table now
            DCHECK(data_type->is_nullable());
            serde = serde->get_nested_serdes()[0];
            auto* null_column = assert_cast<ColumnNullable*>(col_ptr);
            if (null_flag_it->second) {
                null_column->insert_many_defaults(num_rows);
            } else {
                // If the partition value is not null, we set null map to 0 and deserialize it normally.
                null_column->get_null_map_column().insert_many_vals(0, num_rows);
                RETURN_IF_ERROR(serde->deserialize_column_from_fixed_json(
                        null_column->get_nested_column(), slice, num_rows, &num_deserialized,
                        options));
            }
        }

        columns_by_slot_id[partition_slot_desc->id()] = std::move(value_column);
    }

    // 2. Fill _runtime_filter_partition_prune_block from the partition column, then execute conjuncts and filter block.
    // 2.1 Fill _runtime_filter_partition_prune_block from the partition column to match the conjuncts executing.
    size_t next_index = 0;
    bool first_column_filled = false;
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
        // The position advances for every slot, whether or not it is a partition column.
        const size_t index = next_index++;
        auto column_it = columns_by_slot_id.find(slot_desc->id());
        if (column_it == columns_by_slot_id.end()) {
            continue;
        }
        auto data_type = slot_desc->get_data_type_ptr();
        auto value_column = std::move(column_it->second);
        if (data_type->is_nullable()) {
            _runtime_filter_partition_prune_block.insert(
                    index,
                    ColumnWithTypeAndName(ColumnNullable::create(std::move(value_column),
                                                                 ColumnUInt8::create(num_rows, 0)),
                                          data_type, slot_desc->col_name()));
        } else {
            _runtime_filter_partition_prune_block.insert(
                    index, ColumnWithTypeAndName(std::move(value_column), data_type,
                                                 slot_desc->col_name()));
        }
        if (index == 0) {
            first_column_filled = true;
        }
    }

    // 2.2 Execute conjuncts.
    if (!first_column_filled) {
        // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
        // The following process may be tricky and time-consuming, but we have no other way.
        _runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
                num_rows);
    }
    IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
    RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
                                                    &_runtime_filter_partition_prune_block,
                                                    &result_filter, &can_filter_all));
    return Status::OK();
}
318
0
319
0
/// Splits `_push_down_conjuncts` into per-slot filters and multi-slot filters.
///
/// A conjunct goes into `_slot_id_to_filter_conjuncts[slot]` when every slot id collected by
/// `_get_slot_ids` is the same slot. It goes into `_not_single_slot_filter_conjuncts`
/// when no slot id was collected or when more than one distinct slot is referenced.
/// Both containers are cleared first.
///
/// @return always Status::OK().
Status FileScanner::_process_conjuncts() {
    _slot_id_to_filter_conjuncts.clear();
    _not_single_slot_filter_conjuncts.clear();
    for (auto& conjunct : _push_down_conjuncts) {
        auto impl = conjunct->root()->get_impl();
        // If impl is not null, which means this a conjuncts from runtime filter.
        auto cur_expr = impl ? impl : conjunct->root();

        std::vector<int> slot_ids;
        _get_slot_ids(cur_expr.get(), &slot_ids);

        // std::ranges::all_of avoids the signed/unsigned index comparison of a manual loop;
        // the emptiness check comes first so front() is never evaluated on an empty vector.
        const bool single_slot =
                !slot_ids.empty() &&
                std::ranges::all_of(slot_ids, [&slot_ids](int id) { return id == slot_ids.front(); });
        if (single_slot) {
            SlotId slot_id = slot_ids.front();
            _slot_id_to_filter_conjuncts[slot_id].emplace_back(conjunct);
        } else {
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
        }
    }
    return Status::OK();
}
349
1
350
1
/// Picks up conjuncts that arrived after the previous push-down.
///
/// When `_conjuncts` holds more entries than `_push_down_conjuncts`, the whole list becomes
/// the new push-down set, `_conjuncts` is emptied and the conjuncts are re-classified.
/// Once the applied runtime-filter count equals the total, this is recorded in the
/// scanner profile.
Status FileScanner::_process_late_arrival_conjuncts() {
    const bool has_new_conjuncts = _push_down_conjuncts.size() < _conjuncts.size();
    if (has_new_conjuncts) {
        _push_down_conjuncts = _conjuncts;
        _conjuncts.clear();
        RETURN_IF_ERROR(_process_conjuncts());
    }

    const bool all_runtime_filters_applied = (_applied_rf_num == _total_rf_num);
    if (all_runtime_filters_applied) {
        _local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
    }
    return Status::OK();
}
361
178
362
178
/// Collects, depth first, the slot ids of every slot reference found among the descendants
/// of `expr` and marks each referenced slot descriptor as a predicate slot.
///
/// Only children are inspected: if `expr` itself is a slot reference it is not recorded.
/// The same slot id is appended once per occurrence.
///
/// @param expr     root of the expression tree to walk; must not be null.
/// @param slot_ids output vector the slot ids are appended to; must not be null.
void FileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
    for (auto& child_expr : expr->children()) {
        if (!child_expr->is_slot_ref()) {
            _get_slot_ids(child_expr.get(), slot_ids);
            continue;
        }
        // is_slot_ref() guarantees the dynamic type, so a static_cast is the correct downcast
        // (reinterpret_cast does not adjust the pointer within a class hierarchy).
        auto* slot_ref = static_cast<VSlotRef*>(child_expr.get());
        SlotDescriptor* slot_desc = _state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
        slot_desc->set_is_predicate(true);
        slot_ids->emplace_back(slot_ref->slot_id());
    }
}
374
0
375
1
/// Opens the scanner and fetches the first scan range from the split source.
///
/// If there is no range at all the scanner is flagged as finished. Otherwise the expression
/// contexts are initialized and, when runtime-filter partition pruning is enabled and
/// partition slots exist, the pruning contexts and block are prepared.
Status FileScanner::_open_impl(RuntimeState* state) {
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(Scanner::_open_impl(state));
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));

    if (!_first_scan_range) {
        // there's no scan range in split source. stop scanner directly.
        _scanner_eof = true;
        return Status::OK();
    }

    RETURN_IF_ERROR(_init_expr_ctxes());
    const bool prune_enabled = _state->query_options().enable_runtime_filter_partition_prune;
    if (prune_enabled && !_partition_slot_index_map.empty()) {
        _init_runtime_filter_partition_prune_ctxs();
        _init_runtime_filter_partition_prune_block();
    }
    return Status::OK();
}
393
394
2.67k
/// Reads the next block through `_get_block_wrapped`; on failure the name of the current
/// scan range is appended to the error message to ease debugging.
Status FileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
    Status result = _get_block_wrapped(state, block, eof);
    if (result.ok()) {
        return result;
    }
    // add cur path in error msg for easy debugging
    return std::move(result.append(". cur path: " + get_current_scan_range_name()));
}
403
14
404
11.0k
// For query:
405
11.1k
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
406
//                              A     B    C  D                 E
407
// _init_src_block              x     x    x  x                 x                -      x
408
// get_next_block               x     x    x  -                 -                -      x
409
// _cast_to_input_block         -     -    -  -                 -                -      -
410
// _fill_columns_from_path      -     -    -  -                 x                -      x
411
// _fill_missing_columns        -     -    -  x                 -                -      x
412
// _convert_to_output_block     -     -    -  -                 -                -      -
413
//
414
// For load:
415
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
416
//                              A     B    C  D                 E
417
// _init_src_block              x     x    x  x                 x                x      -
418
// get_next_block               x     x    x  -                 -                x      -
419
// _cast_to_input_block         x     x    x  -                 -                x      -
420
// _fill_columns_from_path      -     -    -  -                 x                x      -
421
// _fill_missing_columns        -     -    -  x                 -                x      -
422
// _convert_to_output_block     -     -    -  -                 -                -      x
423
/// Produces the next block, switching to the next file reader whenever the current one is
/// missing or exhausted.
///
/// Files reported as NOT_FOUND (when `config::ignore_not_found_file_in_external_table` is
/// set) or END_OF_FILE while opening are counted and skipped. `*eof` is set to true once
/// the scanner has no more ranges. A block with zero read rows is returned as-is.
Status FileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* eof) {
    while (true) {
        RETURN_IF_CANCELLED(state);
        if (_cur_reader == nullptr || _cur_reader_eof) {
            // The file may not exist because the file list is got from meta cache,
            // And the file may already be removed from storage.
            // Just ignore not found files.
            Status st = _get_next_reader();
            if (st.is<ErrorCode::NOT_FOUND>() && config::ignore_not_found_file_in_external_table) {
                _cur_reader_eof = true;
                COUNTER_UPDATE(_not_found_file_counter, 1);
                continue;
            }
            if (st.is<ErrorCode::END_OF_FILE>()) {
                _cur_reader_eof = true;
                COUNTER_UPDATE(_fully_skipped_file_counter, 1);
                continue;
            }
            if (!st) {
                return st;
            }
        }

        if (_scanner_eof) {
            *eof = true;
            return Status::OK();
        }

        // Init src block for load job based on the data file schema (e.g. parquet)
        // For query job, simply set _src_block_ptr to block.
        size_t read_rows = 0;
        RETURN_IF_ERROR(_init_src_block(block));
        {
            SCOPED_TIMER(_get_block_timer);

            // Read next block.
            // Some of column in block may not be filled (column not exist in file)
            RETURN_IF_ERROR(
                    _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
        }

        // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
        // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result.
        if (read_rows == 0) {
            break;
        }
        if ((!_cur_reader->count_read_rows()) && _io_ctx) {
            _io_ctx->file_reader_stats->read_rows += read_rows;
        }

        // If the push_down_agg_type is COUNT, no need to do the rest,
        // because we only save a number in block.
        if (_get_push_down_agg_type() == TPushAggOp::type::COUNT) {
            break;
        }

        // Convert the src block columns type to string in-place.
        RETURN_IF_ERROR(_cast_to_input_block(block));
        // FileReader can fill partition and missing columns itself
        if (!_cur_reader->fill_all_columns()) {
            // Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
            RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
            // Fill columns not exist in file with null or default value
            RETURN_IF_ERROR(_fill_missing_columns(read_rows));
        }
        // Apply _pre_conjunct_ctxs to filter src block.
        RETURN_IF_ERROR(_pre_filter_src_block());
        // Convert src block to output block (dest block), string to dest data type and apply filters.
        RETURN_IF_ERROR(_convert_to_output_block(block));
        // Truncate char columns or varchar columns if size is smaller than file columns
        // or not found in the file column schema.
        RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
        break;
    }

    // Update filtered rows and unselected rows for load, reset counter.
    // {
    //     state->update_num_rows_load_filtered(_counter.num_rows_filtered);
    //     state->update_num_rows_load_unselected(_counter.num_rows_unselected);
    //     _reset_counter();
    // }
    return Status::OK();
}
499
500
8.44k
/**
501
11.1k
 * Check whether there are complex types in parquet/orc reader in broker/stream load.
502
 * Broker/stream load casts every type to string type, so complex types would be cast incorrectly.
503
 * This is a temporary method, and will be replaced by tvf.
504
 */
505
/// Rejects load jobs that read Parquet/ORC files into a complex-typed output slot.
///
/// @return InternalError if the scanner is a load scanner, the file format is Parquet or ORC
///         and any output slot has a complex primitive type; Status::OK() otherwise.
Status FileScanner::_check_output_block_types() {
    if (!_is_load) {
        return Status::OK();
    }

    const TFileFormatType::type format_type = _params->format_type;
    const bool is_parquet_or_orc = format_type == TFileFormatType::FORMAT_PARQUET ||
                                   format_type == TFileFormatType::FORMAT_ORC;
    if (!is_parquet_or_orc) {
        return Status::OK();
    }

    for (auto output_slot : _output_tuple_desc->slots()) {
        if (is_complex_type(output_slot->type()->get_primitive_type())) {
            return Status::InternalError(
                    "Parquet/orc doesn't support complex types in broker/stream load, "
                    "please use tvf(table value function) to insert complex types.");
        }
    }
    return Status::OK();
}
521
7.28k
522
7.28k
/// Prepares `_src_block_ptr`, the block the file reader writes into.
///
/// Query: the caller's `block` is used directly and the name-to-position map is built on
/// the first call only.
/// Load: `_src_block` is rebuilt with one column per input slot. A slot found in
/// `_slot_lower_name_to_col_type` gets the nullable version of that type, any other slot
/// keeps its own type. The skip-bitmap column index and the sequence column unique ids are
/// recorded along the way.
Status FileScanner::_init_src_block(Block* block) {
    if (!_is_load) {
        _src_block_ptr = block;

        // Build name to index map only once on first call
        if (_src_block_name_to_idx.empty()) {
            _src_block_name_to_idx = block->get_name_to_pos_map();
        }
        return Status::OK();
    }
    RETURN_IF_ERROR(_check_output_block_types());

    // if (_src_block_init) {
    //     _src_block.clear_column_data();
    //     _src_block_ptr = &_src_block;
    //     return Status::OK();
    // }

    _src_block.clear();
    const bool has_sequence_map_col = _params->__isset.sequence_map_col;
    uint32_t idx = 0;
    // slots in _input_tuple_desc contains all slots describe in load statement, eg:
    // -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
    // _input_tuple_desc will contains: k1, k2, tmp1
    // and some of them are from file, such as k1 and k2, and some of them may not exist in file, such as tmp1
    // _input_tuple_desc also contains columns from path
    for (auto& input_slot : _input_tuple_desc->slots()) {
        if (input_slot->is_skip_bitmap_col()) {
            _skip_bitmap_col_idx = idx;
        }
        if (has_sequence_map_col && _params->sequence_map_col == input_slot->col_name()) {
            _sequence_map_col_uid = input_slot->col_unique_id();
        }

        auto file_type_it = _slot_lower_name_to_col_type.find(input_slot->col_name());
        DataTypePtr column_type = file_type_it == _slot_lower_name_to_col_type.end()
                                          ? input_slot->type()
                                          : make_nullable(file_type_it->second);
        MutableColumnPtr empty_column = column_type->create_column();
        _src_block.insert(ColumnWithTypeAndName(std::move(empty_column), column_type,
                                                input_slot->col_name()));
        _src_block_name_to_idx.emplace(input_slot->col_name(), idx);
        ++idx;
    }

    if (has_sequence_map_col) {
        // When the target table has sequence map column, _input_tuple_desc will not contain
        // __DORIS_SEQUENCE_COL__, so we should get its column unique id from _output_tuple_desc
        for (const auto& output_slot : _output_tuple_desc->slots()) {
            if (output_slot->is_sequence_col()) {
                _sequence_col_uid = output_slot->col_unique_id();
            }
        }
    }
    _src_block_ptr = &_src_block;
    _src_block_init = true;
    return Status::OK();
}
578
7.28k
579
7.28k
/// For load jobs, casts every source-block column that exists in the file to the data type
/// of its input slot, in place. Query scanners return immediately.
///
/// @return InternalError when no CAST function can be resolved for a column, the error of
///         the cast execution if it fails, otherwise Status::OK().
Status FileScanner::_cast_to_input_block(Block* block) {
    if (!_is_load) {
        return Status::OK();
    }
    SCOPED_TIMER(_cast_to_input_block_timer);
    // cast primitive type(PT0) to primitive type(PT1)
    for (auto& slot_desc : _input_tuple_desc->slots()) {
        const auto& col_name = slot_desc->col_name();
        const bool exists_in_file =
                _slot_lower_name_to_col_type.find(col_name) != _slot_lower_name_to_col_type.end();
        if (!exists_in_file) {
            // skip columns which does not exist in file
            continue;
        }

        // The cast reads from and writes back to the same block position.
        const uint32_t idx = _src_block_name_to_idx[col_name];
        auto& arg = _src_block_ptr->get_by_position(idx);
        auto return_type = slot_desc->get_data_type_ptr();
        // remove nullable here, let the get_function decide whether nullable
        auto data_type = get_data_type_with_default_argument(remove_nullable(return_type));
        ColumnsWithTypeAndName arguments {arg,
                                          {data_type->create_column(), data_type, col_name}};
        auto func_cast =
                SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type, {});
        if (!func_cast) {
            return Status::InternalError("Function CAST[arg={}, col name={}, return={}] not found!",
                                         arg.type->get_name(), col_name, return_type->get_name());
        }

        DCHECK(_state != nullptr);
        auto ctx = FunctionContext::create_context(_state, {}, {});
        RETURN_IF_ERROR(
                func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, arg.column->size()));
        _src_block_ptr->get_by_position(idx).type = std::move(return_type);
    }
    return Status::OK();
}
614
62.4k
615
5.14k
/// Fills the partition columns of the source block with the values stored in
/// `_partition_col_descs`.
///
/// Does nothing when `_fill_partition_from_path` is false. For every entry of
/// `_partition_col_descs` the serde of the slot's data type is asked to deserialize the
/// entry's value into the matching column of `_src_block_ptr` for `rows` rows.
///
/// @param rows number of rows each partition column is expected to receive.
/// @return OK on success; InternalError when deserialization fails or when the number of
///         rows written differs from `rows`.
Status FileScanner::_fill_columns_from_path(size_t rows) {
    if (!_fill_partition_from_path) {
        return Status::OK();
    }
    DataTypeSerDe::FormatOptions _text_formatOptions;
    for (auto& kv : _partition_col_descs) {
        auto doris_column =
                _src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]).column;
        // _src_block_ptr points to a mutable block created by this class itself, so const_cast can be used here.
        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
        auto& [value, slot_desc] = kv.second;
        auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
        Slice slice(value.data(), value.size());
        uint64_t num_deserialized = 0;
        const Status fill_status = _text_serde->deserialize_column_from_fixed_json(
                *col_ptr, slice, rows, &num_deserialized, _text_formatOptions);
        if (!fill_status.ok()) {
            return Status::InternalError("Failed to fill partition column: {}={}",
                                         slot_desc->col_name(), value);
        }
        if (num_deserialized != rows) {
            // Argument order follows the message: expected row count first, actual second.
            return Status::InternalError(
                    "Failed to fill partition column: {}={} ."
                    "Number of rows expected to be written : {}, number of rows actually written : "
                    "{}",
                    slot_desc->col_name(), value, rows, num_deserialized);
        }
    }
    return Status::OK();
}
645
7
646
5.56k
/// Fills the columns listed in `_missing_col_descs` inside the source block.
///
/// An entry without an expression context gets `rows` default values inserted into its
/// column (which is treated as a ColumnNullable). An entry with an expression context is
/// evaluated against `_src_block_ptr`; when the resulting column is uniquely owned it is
/// resized to `rows`, expanded if constant, wrapped as nullable when the existing block
/// type is nullable, and stored at the column's position.
///
/// The position of a column is looked up with `find` before it is used, so a name that is
/// not registered in `_src_block_name_to_idx` yields an InternalError instead of being
/// inserted into the map with index 0.
///
/// NOTE(review): the early-return guard tests `_missing_cols` while the loop iterates
/// `_missing_col_descs`; confirm that both are always populated together.
///
/// @param rows number of rows every filled column must contain.
/// @return OK on success, InternalError for an unknown column name, or the error returned
///         by the default-value expression.
Status FileScanner::_fill_missing_columns(size_t rows) {
    if (_missing_cols.empty()) {
        return Status::OK();
    }

    SCOPED_TIMER(_fill_missing_columns_timer);
    for (auto& kv : _missing_col_descs) {
        // Read-only lookup: never inserts a key as operator[] would.
        const auto idx_it = _src_block_name_to_idx.find(kv.first);
        const bool has_src_column = idx_it != _src_block_name_to_idx.end();

        if (kv.second == nullptr) {
            // no default column, fill with null
            if (!has_src_column) {
                return Status::InternalError("Column {} not found in src block {}", kv.first,
                                             _src_block_ptr->dump_structure());
            }
            auto mutable_column =
                    _src_block_ptr->get_by_position(idx_it->second).column->assume_mutable();
            auto* nullable_column = static_cast<ColumnNullable*>(mutable_column.get());
            nullable_column->insert_many_defaults(rows);
        } else {
            // fill with default value
            auto& ctx = kv.second;
            ColumnPtr result_column_ptr;
            // PT1 => dest primitive type
            RETURN_IF_ERROR(ctx->execute(_src_block_ptr, result_column_ptr));
            if (result_column_ptr->use_count() == 1) {
                if (!has_src_column) {
                    return Status::InternalError("Column {} not found in src block {}", kv.first,
                                                 _src_block_ptr->dump_structure());
                }
                // call resize because the first column of _src_block_ptr may not be filled by reader,
                // so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()`
                // has only one row.
                auto mutable_column = result_column_ptr->assume_mutable();
                mutable_column->resize(rows);
                // result_column_ptr maybe a ColumnConst, convert it to a normal column
                result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
                auto origin_column_type = _src_block_ptr->get_by_position(idx_it->second).type;
                bool is_nullable = origin_column_type->is_nullable();
                _src_block_ptr->replace_by_position(
                        idx_it->second,
                        is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
            }
        }
    }
    return Status::OK();
}
688
0
689
0
/// Applies the pre-conjuncts to the source block.
///
/// Runs only for load jobs with a non-empty `_pre_conjunct_ctxs`. The number of rows
/// removed by the filter is added to `_counter.num_rows_unselected`.
///
/// @return OK on success, or the error returned by the filter.
Status FileScanner::_pre_filter_src_block() {
    // Nothing to do for queries or when no pre-conjunct exists.
    if (!_is_load || _pre_conjunct_ctxs.empty()) {
        return Status::OK();
    }

    SCOPED_TIMER(_pre_filter_timer);
    auto column_count = _src_block_ptr->columns();
    auto rows_before = _src_block_ptr->rows();
    RETURN_IF_ERROR(VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr, column_count));
    // Rows dropped here count as "unselected".
    _counter.num_rows_unselected += rows_before - _src_block_ptr->rows();
    return Status::OK();
}
703
28
704
5.14k
/// Converts the source block into the output block for load jobs.
///
/// For every output column the matching destination expression is evaluated against
/// `_src_block_ptr`. When the result is nullable, each still-selected row holding a null is
/// examined (unless the row's skip bitmap contains the slot's unique id):
///   - in strict mode, if the slot has a source slot and the source value is not null, the
///     row is filtered out and an error line is appended to the error file;
///   - otherwise, if the destination slot is not nullable, the row is filtered out and an
///     error line is appended to the error file.
/// The result column is then adjusted to the slot's nullability and appended to the output
/// column. Finally the source block is cleared, the filter column is applied to `block`,
/// and the number of removed rows is added to `_counter.num_rows_filtered`.
///
/// @param block output block that receives the converted columns.
/// @return OK on success, or the first error from expression evaluation, error-file
///         writing, or block filtering.
Status FileScanner::_convert_to_output_block(Block* block) {
    if (!_is_load) {
        return Status::OK();
    }
    SCOPED_TIMER(_convert_to_output_block_timer);
    // The block is passed from scanner context's free blocks,
    // which is initialized by output columns
    // so no need to clear it
    // block->clear();

    size_t rows = _src_block_ptr->rows();
    // One flag per row; a row is kept while its flag stays non-zero.
    auto filter_column = ColumnUInt8::create(rows, 1);
    auto& filter_map = filter_column->get_data();

    // After convert, the column_ptr should be copied into output block.
    // Can not use block->insert() because it may cause use_count() non-zero bug
    MutableBlock mutable_output_block =
            VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc);
    auto& mutable_output_columns = mutable_output_block.mutable_columns();

    std::vector<BitmapValue>* skip_bitmaps {nullptr};
    if (_should_process_skip_bitmap_col()) {
        auto* skip_bitmap_nullable_col_ptr =
                assert_cast<ColumnNullable*>(_src_block_ptr->get_by_position(_skip_bitmap_col_idx)
                                                     .column->assume_mutable()
                                                     .get());
        skip_bitmaps = &(assert_cast<ColumnBitmap*>(
                                 skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get())
                                 ->get_data());
        // NOTE:
        // - If the table has sequence type column, __DORIS_SEQUENCE_COL__ will be put in _input_tuple_desc, so whether
        //   __DORIS_SEQUENCE_COL__ will be marked in skip bitmap depends on whether it's specified in that row
        // - If the table has sequence map column, __DORIS_SEQUENCE_COL__ will not be put in _input_tuple_desc,
        //   so __DORIS_SEQUENCE_COL__ will be omitted from the row and will not be marked in skip bitmap.
        //   So we should mark __DORIS_SEQUENCE_COL__ in skip bitmap here if the corresponding sequence map column is marked
        if (_sequence_map_col_uid != -1) {
            for (int j = 0; j < rows; ++j) {
                if ((*skip_bitmaps)[j].contains(_sequence_map_col_uid)) {
                    (*skip_bitmaps)[j].add(_sequence_col_uid);
                }
            }
        }
    }

    // for (auto slot_desc : _output_tuple_desc->slots()) {
    for (int j = 0; j < mutable_output_columns.size(); ++j) {
        auto* slot_desc = _output_tuple_desc->slots()[j];
        // The destination-side containers are indexed by the output column position.
        const int dest_index = j;
        ColumnPtr column_ptr;

        auto& ctx = _dest_vexpr_ctx[dest_index];
        // PT1 => dest primitive type
        RETURN_IF_ERROR(ctx->execute(_src_block_ptr, column_ptr));
        // Validate the result before it is dereferenced below.
        DCHECK(column_ptr);
        // column_ptr maybe a ColumnConst, convert it to a normal column
        column_ptr = column_ptr->convert_to_full_column_if_const();
        DCHECK(column_ptr);

        // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
        // is likely to be nullable
        if (LIKELY(column_ptr->is_nullable())) {
            const auto* nullable_column = assert_cast<const ColumnNullable*>(column_ptr.get());
            for (int i = 0; i < rows; ++i) {
                if (filter_map[i] && nullable_column->is_null_at(i)) {
                    // skip checks for non-mentioned columns in flexible partial update
                    if (skip_bitmaps == nullptr ||
                        !skip_bitmaps->at(i).contains(slot_desc->col_unique_id())) {
                        // clang-format off
                        if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
                            !_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->is_null_at(i)) {
                            // The source value was present but the conversion produced null.
                            filter_map[i] = false;
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
                                [&]() -> std::string {
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
                                },
                                [&]() -> std::string {
                                    auto raw_value =
                                            _src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->get_data_at(i);
                                    std::string raw_string = raw_value.to_string();
                                    fmt::memory_buffer error_msg;
                                    fmt::format_to(error_msg,"column({}) value is incorrect while strict mode is {}, src value is {}",
                                            slot_desc->col_name(), _strict_mode, raw_string);
                                    return fmt::to_string(error_msg);
                                }));
                        } else if (!slot_desc->is_nullable()) {
                            // Null value for a destination column that cannot hold nulls.
                            filter_map[i] = false;
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
                                [&]() -> std::string {
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
                                },
                                [&]() -> std::string {
                                    fmt::memory_buffer error_msg;
                                    fmt::format_to(error_msg, "column({}) values is null while columns is not nullable", slot_desc->col_name());
                                    return fmt::to_string(error_msg);
                                }));
                        }
                        // clang-format on
                    }
                }
            }
            if (!slot_desc->is_nullable()) {
                column_ptr = remove_nullable(column_ptr);
            }
        } else if (slot_desc->is_nullable()) {
            column_ptr = make_nullable(column_ptr);
        }
        mutable_output_columns[j]->insert_range_from(*column_ptr, 0, rows);
    }

    // after do the dest block insert operation, clear _src_block to remove the reference of origin column
    _src_block_ptr->clear();

    size_t dest_size = block->columns();
    // do filter
    block->insert(ColumnWithTypeAndName(std::move(filter_column), std::make_shared<DataTypeUInt8>(),
                                        "filter column"));
    RETURN_IF_ERROR(Block::filter_block(block, dest_size, dest_size));

    _counter.num_rows_filtered += rows - block->rows();
    return Status::OK();
}
826
5.14k
827
5.14k
/// Truncates CHAR/VARCHAR columns of `block` to the length declared by their slot.
///
/// Active only when the query option `truncate_char_or_varchar_columns` is set. Slots of
/// `_real_tuple_desc` are visited in order and the running index is used as the column
/// position in `block`. A CHAR/VARCHAR slot is truncated when
///   - its name is not present in `_source_file_col_name_types`, or
///   - its declared length is positive and either smaller than the file column's string
///     length or the file column is not a string type (length treated as -1).
///
/// @param block block whose string columns are truncated in place.
/// @return always OK.
Status FileScanner::_truncate_char_or_varchar_columns(Block* block) {
    // Truncate char columns or varchar columns if size is smaller than file columns
    // or not found in the file column schema.
    if (!_state->query_options().truncate_char_or_varchar_columns) {
        return Status::OK();
    }
    int idx = 0;
    for (auto* slot_desc : _real_tuple_desc->slots()) {
        // Every slot consumes one position, whether or not it is truncated.
        const int col_idx = idx++;
        const auto& type = slot_desc->type();
        if (type->get_primitive_type() != TYPE_VARCHAR && type->get_primitive_type() != TYPE_CHAR) {
            continue;
        }

        // Length declared by the destination slot.
        const auto target_len =
                assert_cast<const DataTypeString*>(remove_nullable(type).get())->len();

        auto iter = _source_file_col_name_types.find(slot_desc->col_name());
        if (iter == _source_file_col_name_types.end()) {
            // Column is not part of the file schema: always truncate.
            _truncate_char_or_varchar_column(block, col_idx, target_len);
            continue;
        }

        // Length of the file column; stays -1 when the file column is not a string type.
        int file_len = -1;
        if (auto* ftype = check_and_get_data_type<DataTypeString>(
                    remove_nullable(iter->second).get())) {
            file_len = ftype->len();
        }
        if ((target_len > 0) && (target_len < file_len || file_len < 0)) {
            _truncate_char_or_varchar_column(block, col_idx, target_len);
        }
    }
    return Status::OK();
}
864
0
865
0
// VARCHAR substring(VARCHAR str, INT pos[, INT len])
866
5.86k
/// Truncates the nullable string column at position `idx` of `block` to `len` characters.
///
/// The column is temporarily replaced by its nested string column, three helper columns
/// (constant position 1, constant `len`, and an empty result column) are appended, and
/// `SubstringUtil::substring_execute` writes the truncated strings into the result column.
/// The result is then re-wrapped with the original null map, stored back at `idx`, and the
/// helper columns are erased.
///
/// @param block block that owns the column; restored to its original column count on exit.
/// @param idx   position of the column to truncate; the column must be a ColumnNullable.
/// @param len   length argument passed to the substring function.
void FileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int len) {
    auto int_type = std::make_shared<DataTypeInt32>();
    uint32_t num_columns_without_result = block->columns();
    const ColumnNullable* col_nullable =
            assert_cast<const ColumnNullable*>(block->get_by_position(idx).column.get());
    // Take owning copies of both parts before the nullable column is replaced below;
    // `col_nullable` must not be used after the replace_by_position call.
    ColumnPtr string_column_ptr = col_nullable->get_nested_column_ptr();
    ColumnPtr null_map_column_ptr = col_nullable->get_null_map_column_ptr();
    block->replace_by_position(idx, std::move(string_column_ptr));
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)), int_type,
                   "const 1"}); // pos is 1
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)), int_type,
                   fmt::format("const {}", len)});                          // len
    block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); // result column
    ColumnNumbers temp_arguments(3);
    temp_arguments[0] = idx;                            // str column
    temp_arguments[1] = num_columns_without_result;     // pos
    temp_arguments[2] = num_columns_without_result + 1; // len
    uint32_t result_column_id = num_columns_without_result + 2;

    SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
    // Re-attach the original null map to the truncated strings.
    auto res = ColumnNullable::create(block->get_by_position(result_column_id).column,
                                      null_map_column_ptr);
    block->replace_by_position(idx, std::move(res));
    // Drop the three helper columns appended above.
    Block::erase_useless_column(block, num_columns_without_result);
}
891
0
892
0
/// Creates the row-id column iterator for the current scan range.
///
/// A FileMapping built from the parent id of the local state, `_current_range`, and the
/// file-meta-cache flag is registered with the id-file map of `_state`; the returned file
/// id is used to construct a RowIdColumnIteratorV2 that is stored in
/// `_row_id_column_iterator_pair.first`.
///
/// @return always OK.
Status FileScanner::_create_row_id_column_iterator() {
    auto& id_file_map = _state->get_id_file_map();

    // Describe the file that is currently being scanned.
    auto file_mapping = std::make_shared<FileMapping>(
            ((FileScanLocalState*)_local_state)->parent_id(), _current_range,
            _should_enable_file_meta_cache());
    auto file_id = id_file_map->get_file_mapping_id(std::move(file_mapping));

    _row_id_column_iterator_pair.first = std::make_shared<RowIdColumnIteratorV2>(
            IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id);
    return Status::OK();
}
901
13
902
13
Status FileScanner::_get_next_reader() {
903
13
    while (true) {
904
        if (_cur_reader) {
905
5.33k
            _cur_reader->collect_profile_before_close();
906
5.33k
            RETURN_IF_ERROR(_cur_reader->close());
907
5.33k
            _state->update_num_finished_scan_range(1);
908
2.65k
        }
909
2.65k
        _cur_reader.reset(nullptr);
910
2.65k
        _src_block_init = false;
911
2.65k
        bool has_next = _first_scan_range;
912
5.33k
        if (!_first_scan_range) {
913
5.33k
            RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
914
5.33k
        }
915
5.33k
        _first_scan_range = false;
916
2.66k
        if (!has_next || _should_stop) {
917
2.66k
            _scanner_eof = true;
918
5.33k
            return Status::OK();
919
5.33k
        }
920
2.65k
921
2.65k
        const TFileRangeDesc& range = _current_range;
922
2.65k
        _current_range_path = range.path;
923
924
2.67k
        if (!_partition_slot_descs.empty()) {
925
2.67k
            // we need get partition columns first for runtime filter partition pruning
926
            RETURN_IF_ERROR(_generate_partition_columns());
927
2.67k
928
            if (_state->query_options().enable_runtime_filter_partition_prune) {
929
6
                // if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
930
                // by runtime filter partition prune
931
6
                if (_push_down_conjuncts.size() < _conjuncts.size()) {
932
                    // there are new runtime filters, need to re-init runtime filter partition pruning ctxs
933
                    _init_runtime_filter_partition_prune_ctxs();
934
6
                }
935
936
0
                bool can_filter_all = false;
937
0
                RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
938
                if (can_filter_all) {
939
6
                    // this range can be filtered out by runtime filter partition pruning
940
6
                    // so we need to skip this range
941
6
                    COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
942
                    continue;
943
                }
944
0
            }
945
0
        }
946
0
947
6
        // create reader for specific format
948
6
        Status init_status = Status::OK();
949
        TFileFormatType::type format_type = _get_current_format_type();
950
        // for compatibility, this logic is deprecated in 3.1
951
2.67k
        if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
952
2.67k
            if (range.table_format_params.table_format_type == "paimon" &&
953
                !range.table_format_params.paimon_params.__isset.paimon_split) {
954
2.67k
                // use native reader
955
0
                auto format = range.table_format_params.paimon_params.file_format;
956
0
                if (format == "orc") {
957
                    format_type = TFileFormatType::FORMAT_ORC;
958
0
                } else if (format == "parquet") {
959
0
                    format_type = TFileFormatType::FORMAT_PARQUET;
960
0
                } else {
961
0
                    return Status::InternalError("Not supported paimon file format: {}", format);
962
0
                }
963
0
            }
964
0
        }
965
0
966
0
        // JNI reader can only push down column value range
967
0
        bool push_down_predicates = !_is_load && format_type != TFileFormatType::FORMAT_JNI;
968
        bool need_to_get_parsed_schema = false;
969
        switch (format_type) {
970
2.67k
        case TFileFormatType::FORMAT_JNI: {
971
2.67k
            if (range.__isset.table_format_params &&
972
2.67k
                range.table_format_params.table_format_type == "max_compute") {
973
1
                const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
974
1
                        _real_tuple_desc->table_desc());
975
1
                if (!mc_desc->init_status()) {
976
0
                    return mc_desc->init_status();
977
0
                }
978
0
                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
979
0
                        mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
980
0
                        range, _state, _profile);
981
0
                init_status = mc_reader->init_reader();
982
0
                _cur_reader = std::move(mc_reader);
983
0
            } else if (range.__isset.table_format_params &&
984
0
                       range.table_format_params.table_format_type == "paimon") {
985
0
                if (_state->query_options().__isset.enable_paimon_cpp_reader &&
986
1
                    _state->query_options().enable_paimon_cpp_reader) {
987
1
                    auto cpp_reader = PaimonCppReader::create_unique(_file_slot_descs, _state,
988
0
                                                                     _profile, range, _params);
989
0
                    cpp_reader->set_push_down_agg_type(_get_push_down_agg_type());
990
0
                    if (!_is_load && !_push_down_conjuncts.empty()) {
991
0
                        PaimonPredicateConverter predicate_converter(_file_slot_descs, _state);
992
0
                        auto predicate = predicate_converter.build(_push_down_conjuncts);
993
0
                        if (predicate) {
994
0
                            cpp_reader->set_predicate(std::move(predicate));
995
0
                        }
996
0
                    }
997
0
                    init_status = cpp_reader->init_reader();
998
0
                    _cur_reader = std::move(cpp_reader);
999
0
                } else {
1000
0
                    _cur_reader = PaimonJniReader::create_unique(_file_slot_descs, _state, _profile,
1001
0
                                                                 range, _params);
1002
0
                    init_status = ((PaimonJniReader*)(_cur_reader.get()))->init_reader();
1003
0
                }
1004
0
            } else if (range.__isset.table_format_params &&
1005
0
                       range.table_format_params.table_format_type == "hudi") {
1006
0
                _cur_reader = HudiJniReader::create_unique(*_params,
1007
1
                                                           range.table_format_params.hudi_params,
1008
1
                                                           _file_slot_descs, _state, _profile);
1009
0
                init_status = ((HudiJniReader*)_cur_reader.get())->init_reader();
1010
0
1011
0
            } else if (range.__isset.table_format_params &&
1012
0
                       range.table_format_params.table_format_type == "trino_connector") {
1013
1
                _cur_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
1014
1
                                                                     _profile, range);
1015
0
                init_status = ((TrinoConnectorJniReader*)(_cur_reader.get()))->init_reader();
1016
0
            } else if (range.__isset.table_format_params &&
1017
0
                       range.table_format_params.table_format_type == "jdbc") {
1018
0
                // Extract jdbc params from table_format_params
1019
1
                std::map<std::string, std::string> jdbc_params(
1020
1
                        range.table_format_params.jdbc_params.begin(),
1021
0
                        range.table_format_params.jdbc_params.end());
1022
0
                _cur_reader = JdbcJniReader::create_unique(_file_slot_descs, _state, _profile,
1023
0
                                                           jdbc_params);
1024
0
                init_status = ((JdbcJniReader*)(_cur_reader.get()))->init_reader();
1025
            }
1026
1
            // Set col_name_to_block_idx for JNI readers to avoid repeated map creation
1027
0
            if (_cur_reader) {
1028
0
                if (auto* jni_reader = dynamic_cast<JniReader*>(_cur_reader.get())) {
1029
0
                    jni_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
1030
0
                }
1031
1
            }
1032
1
            break;
1033
105
        }
1034
105
        case TFileFormatType::FORMAT_PARQUET: {
1035
105
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1036
105
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1037
105
                                               : nullptr;
1038
105
            std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
1039
105
                    _profile, *_params, range, _state->query_options().batch_size,
1040
105
                    &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr,
1041
                    _state->query_options().enable_parquet_lazy_mat);
1042
105
1043
8
            if (_row_id_column_iterator_pair.second != -1) {
1044
8
                RETURN_IF_ERROR(_create_row_id_column_iterator());
1045
8
                parquet_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1046
            }
1047
1048
            // ATTN: the push down agg type may be set back to NONE,
1049
105
            // see IcebergTableReader::init_row_filters for example.
1050
105
            parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
1051
87
            if (push_down_predicates) {
1052
87
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1053
105
            }
1054
            RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
1055
105
1056
105
            need_to_get_parsed_schema = true;
1057
105
            break;
1058
102
        }
1059
102
        case TFileFormatType::FORMAT_ORC: {
1060
102
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1061
102
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1062
102
                                               : nullptr;
1063
102
            std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
1064
102
                    _profile, _state, *_params, range, _state->query_options().batch_size,
1065
102
                    _state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
1066
102
                    _state->query_options().enable_orc_lazy_mat);
1067
5
            if (_row_id_column_iterator_pair.second != -1) {
1068
5
                RETURN_IF_ERROR(_create_row_id_column_iterator());
1069
5
                orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1070
            }
1071
102
1072
102
            orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
1073
91
            if (push_down_predicates) {
1074
91
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1075
102
            }
1076
            RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
1077
102
1078
102
            need_to_get_parsed_schema = true;
1079
102
            break;
1080
1.94k
        }
1081
1.94k
        case TFileFormatType::FORMAT_CSV_PLAIN:
1082
1.94k
        case TFileFormatType::FORMAT_CSV_GZ:
1083
1.94k
        case TFileFormatType::FORMAT_CSV_BZ2:
1084
1.94k
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
1085
1.94k
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
1086
1.94k
        case TFileFormatType::FORMAT_CSV_LZOP:
1087
1.94k
        case TFileFormatType::FORMAT_CSV_DEFLATE:
1088
2.02k
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
1089
2.02k
        case TFileFormatType::FORMAT_PROTO: {
1090
2.02k
            auto reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
1091
                                                   _file_slot_descs, _io_ctx.get());
1092
2.02k
1093
2.02k
            init_status = reader->init_reader(_is_load);
1094
2.02k
            _cur_reader = std::move(reader);
1095
1.94k
            break;
1096
3
        }
1097
3
        case TFileFormatType::FORMAT_TEXT: {
1098
3
            auto reader = TextReader::create_unique(_state, _profile, &_counter, *_params, range,
1099
3
                                                    _file_slot_descs, _io_ctx.get());
1100
3
            init_status = reader->init_reader(_is_load);
1101
3
            _cur_reader = std::move(reader);
1102
1.94k
            break;
1103
424
        }
1104
424
        case TFileFormatType::FORMAT_JSON: {
1105
424
            _cur_reader =
1106
424
                    NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
1107
424
                                                 _file_slot_descs, &_scanner_eof, _io_ctx.get());
1108
424
            init_status = ((NewJsonReader*)(_cur_reader.get()))
1109
424
                                  ->init_reader(_col_default_value_ctx, _is_load);
1110
1.94k
            break;
1111
0
        }
1112
0
1113
0
        case TFileFormatType::FORMAT_WAL: {
1114
0
            _cur_reader = WalReader::create_unique(_state);
1115
            init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
1116
0
            break;
1117
0
        }
1118
0
        case TFileFormatType::FORMAT_NATIVE: {
1119
0
            auto reader =
1120
0
                    NativeReader::create_unique(_profile, *_params, range, _io_ctx.get(), _state);
1121
1.94k
            init_status = reader->init_reader();
1122
0
            _cur_reader = std::move(reader);
1123
0
            need_to_get_parsed_schema = false;
1124
0
            break;
1125
0
        }
1126
1.94k
        case TFileFormatType::FORMAT_ARROW: {
1127
3
            if (range.__isset.table_format_params &&
1128
3
                range.table_format_params.table_format_type == "remote_doris") {
1129
3
                _cur_reader =
1130
3
                        RemoteDorisReader::create_unique(_file_slot_descs, _state, _profile, range);
1131
3
                init_status = ((RemoteDorisReader*)(_cur_reader.get()))->init_reader();
1132
3
                if (_cur_reader) {
1133
3
                    static_cast<RemoteDorisReader*>(_cur_reader.get())
1134
1.94k
                            ->set_col_name_to_block_idx(&_src_block_name_to_idx);
1135
10
                }
1136
10
            } else {
1137
10
                _cur_reader =
1138
0
                        ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
1139
0
                                                         range, _file_slot_descs, _io_ctx.get());
1140
0
                init_status = ((ArrowStreamReader*)(_cur_reader.get()))->init_reader();
1141
0
            }
1142
0
            break;
1143
0
        }
1144
0
        default:
1145
10
            return Status::NotSupported("Not supported create reader for file format: {}.",
1146
10
                                        to_string(_params->format_type));
1147
10
        }
1148
10
1149
10
        if (_cur_reader == nullptr) {
1150
10
            return Status::NotSupported(
1151
10
                    "Not supported create reader for table format: {} / file format: {}.",
1152
1.94k
                    range.__isset.table_format_params ? range.table_format_params.table_format_type
1153
0
                                                      : "NotSet",
1154
0
                    to_string(_params->format_type));
1155
0
        }
1156
2.67k
        COUNTER_UPDATE(_file_counter, 1);
1157
        // The FileScanner for external table may try to open not exist files,
1158
2.67k
        // Because FE file cache for external table may out of date.
1159
1
        // So, NOT_FOUND for FileScanner is not a fail case.
1160
1
        // Will remove this after file reader refactor.
1161
1
        if (init_status.is<END_OF_FILE>()) {
1162
1
            COUNTER_UPDATE(_empty_file_counter, 1);
1163
1
            continue;
1164
1
        } else if (init_status.is<ErrorCode::NOT_FOUND>()) {
1165
2.67k
            if (config::ignore_not_found_file_in_external_table) {
1166
                COUNTER_UPDATE(_not_found_file_counter, 1);
1167
                continue;
1168
            }
1169
            return Status::InternalError("failed to find reader, err: {}", init_status.to_string());
1170
2.67k
        } else if (!init_status.ok()) {
1171
0
            return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
1172
0
        }
1173
2.67k
1174
0
        _cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
1175
0
        if (_get_push_down_agg_type() == TPushAggOp::type::COUNT &&
1176
0
            range.__isset.table_format_params &&
1177
0
            range.table_format_params.table_level_row_count >= 0) {
1178
0
            // This is a table level count push down operation, no need to call
1179
2.67k
            // _set_fill_or_truncate_columns.
1180
1
            // in _set_fill_or_truncate_columns, we will use [range.start_offset, end offset]
1181
1
            // to filter the row group. But if this is count push down, the offset is undefined,
1182
            // causing incorrect row group filter and may return empty result.
1183
2.67k
        } else {
1184
2.67k
            Status status = _set_fill_or_truncate_columns(need_to_get_parsed_schema);
1185
2.67k
            if (status.is<END_OF_FILE>()) { // all parquet row groups are filtered
1186
2.67k
                continue;
1187
            } else if (!status.ok()) {
1188
                return Status::InternalError("failed to set_fill_or_truncate_columns, err: {}",
1189
                                             status.to_string());
1190
            }
1191
        }
1192
2.67k
        _cur_reader_eof = false;
1193
2.67k
        break;
1194
2.67k
    }
1195
0
    return Status::OK();
1196
2.67k
}
1197
0
1198
0
// Initializes a reader on top of `parquet_reader` according to the table format of
// `_current_range` and stores it in `_cur_reader`.
//
// iceberg / paimon / hudi / hive: the parquet reader is handed over to the matching
// table-format reader, which is then initialized.
// tvf and load: the parquet reader itself is initialized, with a schema mapping node built
// from the file metadata.
//
// Returns the status of the selected `init_reader` call, or the first error of a preparatory
// step. If no branch matches, `_cur_reader` is left untouched and OK is returned.
Status FileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader,
                                         FileMetaCache* file_meta_cache_ptr) {
    using SlotPredicates =
            phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>;

    const TFileRangeDesc& range = _current_range;
    const bool has_table_format = range.__isset.table_format_params;
    const std::string& table_format = range.table_format_params.table_format_type;

    Status init_status = Status::OK();

    // Predicates come from the local state when there is one, otherwise the map is empty.
    SlotPredicates slot_id_to_predicates =
            _local_state ? _local_state->cast<FileScanLocalState>()._slot_id_to_predicates
                         : SlotPredicates {};

    if (has_table_format && table_format == "iceberg") {
        auto iceberg_reader = IcebergParquetReader::create_unique(
                std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache,
                _io_ctx.get(), file_meta_cache_ptr);
        init_status = iceberg_reader->init_reader(
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                &_slot_id_to_filter_conjuncts);
        _cur_reader = std::move(iceberg_reader);
    } else if (has_table_format && table_format == "paimon") {
        auto paimon_reader = PaimonParquetReader::create_unique(
                std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache,
                _io_ctx.get(), file_meta_cache_ptr);
        init_status = paimon_reader->init_reader(
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                &_slot_id_to_filter_conjuncts);
        // A row-filter failure is returned immediately, before the reader is published.
        RETURN_IF_ERROR(paimon_reader->init_row_filters());
        _cur_reader = std::move(paimon_reader);
    } else if (has_table_format && table_format == "hudi") {
        auto hudi_reader = HudiParquetReader::create_unique(
                std::move(parquet_reader), _profile, _state, *_params, range, _io_ctx.get(),
                file_meta_cache_ptr);
        init_status = hudi_reader->init_reader(
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                &_slot_id_to_filter_conjuncts);
        _cur_reader = std::move(hudi_reader);
    } else if (table_format == "hive") {
        // NOTE: unlike the branches above, this one is not gated on `__isset`.
        auto hive_reader = HiveParquetReader::create_unique(std::move(parquet_reader), _profile,
                                                            _state, *_params, range, _io_ctx.get(),
                                                            &_is_file_slot, file_meta_cache_ptr);
        init_status = hive_reader->init_reader(
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                &_slot_id_to_filter_conjuncts);
        _cur_reader = std::move(hive_reader);
    } else if (table_format == "tvf") {
        const FieldDescriptor* parquet_meta = nullptr;
        RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&parquet_meta));
        DCHECK(parquet_meta != nullptr);

        // For a TVF, BE first reports the file schema through `get_parsed_schema` and FE
        // lower-cases the column names (queries are case-insensitive), so the read columns are
        // matched against the lower-cased file column names.
        std::shared_ptr<TableSchemaChangeHelper::Node> tvf_info_node = nullptr;
        RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_name(
                _real_tuple_desc, *parquet_meta, tvf_info_node));
        init_status = parquet_reader->init_reader(
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                &_slot_id_to_filter_conjuncts, tvf_info_node);
        _cur_reader = std::move(parquet_reader);
    } else if (_is_load) {
        const FieldDescriptor* parquet_meta = nullptr;
        RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&parquet_meta));
        DCHECK(parquet_meta != nullptr);

        // Load is case-insensitive: map the lower-cased file column names back to the names
        // actually stored in the file so slots can be matched against them.
        std::map<std::string, std::string> file_lower_name_to_native;
        for (const auto& parquet_field : parquet_meta->get_fields_schema()) {
            file_lower_name_to_native.emplace(doris::to_lower(parquet_field.name),
                                              parquet_field.name);
        }

        auto load_info_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
        for (auto* slot : _real_tuple_desc->slots()) {
            const auto& slot_name = slot->col_name();
            auto native_it = file_lower_name_to_native.find(slot_name);
            if (native_it == file_lower_name_to_native.end()) {
                load_info_node->add_not_exist_children(slot_name);
                continue;
            }
            // For Load, `file_scanner` creates the block columns using the file type, so there
            // is no schema change when reading inside the struct; hence `ConstNode`.
            load_info_node->add_children(slot_name, native_it->second,
                                         TableSchemaChangeHelper::ConstNode::get_instance());
        }

        init_status = parquet_reader->init_reader(
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
                &_slot_id_to_filter_conjuncts, load_info_node);
        _cur_reader = std::move(parquet_reader);
    }

    return init_status;
}
1303
17
1304
17
// Initializes a reader on top of `orc_reader` according to the table format of
// `_current_range` and stores it in `_cur_reader`.
//
// transactional_hive / iceberg / paimon / hudi / hive: the ORC reader is handed over to the
// matching table-format reader, which is then initialized.
// tvf and load: the ORC reader itself is initialized, with a schema mapping node built from
// the file type.
//
// Returns the status of the selected `init_reader` call, or the first error of a preparatory
// step. If no branch matches, `_cur_reader` is left untouched and OK is returned.
Status FileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader,
                                     FileMetaCache* file_meta_cache_ptr) {
    const TFileRangeDesc& range = _current_range;
    const bool has_table_format = range.__isset.table_format_params;
    const std::string& table_format = range.table_format_params.table_format_type;

    Status init_status = Status::OK();

    if (has_table_format && table_format == "transactional_hive") {
        auto tran_orc_reader = TransactionalHiveReader::create_unique(
                std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get(),
                file_meta_cache_ptr);
        init_status = tran_orc_reader->init_reader(
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
                &_slot_id_to_filter_conjuncts);
        // A row-filter failure is returned immediately, before the reader is published.
        RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
        _cur_reader = std::move(tran_orc_reader);
    } else if (has_table_format && table_format == "iceberg") {
        auto iceberg_reader = IcebergOrcReader::create_unique(
                std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(),
                file_meta_cache_ptr);

        init_status = iceberg_reader->init_reader(
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
                _default_val_row_desc.get(), _col_name_to_slot_id,
                &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
        _cur_reader = std::move(iceberg_reader);
    } else if (has_table_format && table_format == "paimon") {
        auto paimon_reader = PaimonOrcReader::create_unique(
                std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(),
                file_meta_cache_ptr);

        init_status = paimon_reader->init_reader(
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
                &_slot_id_to_filter_conjuncts);
        RETURN_IF_ERROR(paimon_reader->init_row_filters());
        _cur_reader = std::move(paimon_reader);
    } else if (has_table_format && table_format == "hudi") {
        auto hudi_reader =
                HudiOrcReader::create_unique(std::move(orc_reader), _profile, _state, *_params,
                                             range, _io_ctx.get(), file_meta_cache_ptr);

        init_status = hudi_reader->init_reader(
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
                &_slot_id_to_filter_conjuncts);
        _cur_reader = std::move(hudi_reader);
    } else if (has_table_format && table_format == "hive") {
        auto hive_reader = HiveOrcReader::create_unique(
                std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get(),
                &_is_file_slot, file_meta_cache_ptr);

        init_status = hive_reader->init_reader(
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
                &_slot_id_to_filter_conjuncts);
        _cur_reader = std::move(hive_reader);
    } else if (has_table_format && table_format == "tvf") {
        const orc::Type* orc_type_ptr = nullptr;
        RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr));

        // Build the table-to-file column mapping from the ORC file type.
        std::shared_ptr<TableSchemaChangeHelper::Node> tvf_info_node = nullptr;
        RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_orc_name(
                _real_tuple_desc, orc_type_ptr, tvf_info_node));
        init_status = orc_reader->init_reader(
                &_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, false,
                _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
                &_slot_id_to_filter_conjuncts, tvf_info_node);
        _cur_reader = std::move(orc_reader);
    } else if (_is_load) {
        const orc::Type* orc_type_ptr = nullptr;
        RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr));

        // Map the lower-cased top-level field names of the file to the names stored in it.
        std::map<std::string, std::string> file_lower_name_to_native;
        const uint64_t field_count = orc_type_ptr->getSubtypeCount();
        for (uint64_t field_idx = 0; field_idx < field_count; ++field_idx) {
            const auto& native_name = orc_type_ptr->getFieldName(field_idx);
            file_lower_name_to_native.emplace(doris::to_lower(native_name), native_name);
        }

        // Slots found in the file become children backed by `ConstNode`; the others are
        // registered as not existing in the file.
        auto load_info_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
        for (auto* slot : _real_tuple_desc->slots()) {
            const auto& slot_name = slot->col_name();
            auto native_it = file_lower_name_to_native.find(slot_name);
            if (native_it == file_lower_name_to_native.end()) {
                load_info_node->add_not_exist_children(slot_name);
                continue;
            }
            load_info_node->add_children(slot_name, native_it->second,
                                         TableSchemaChangeHelper::ConstNode::get_instance());
        }
        init_status = orc_reader->init_reader(
                &_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, false,
                _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
                &_slot_id_to_filter_conjuncts, load_info_node);
        _cur_reader = std::move(orc_reader);
    }

    return init_status;
}
1408
11
1409
11
// Queries `_cur_reader` for the columns it provides and the ones it is missing, then tells the
// reader which partition / missing columns to fill and refreshes the truncate-column info.
//
// `need_to_get_parsed_schema` is forwarded to `_generate_truncate_columns`.
// Returns the first error reported by any step, otherwise OK.
Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema) {
    _missing_cols.clear();
    _slot_lower_name_to_col_type.clear();

    std::unordered_map<std::string, DataTypePtr> name_to_col_type;
    RETURN_IF_ERROR(_cur_reader->get_columns(&name_to_col_type, &_missing_cols));

    for (const auto& [col_name, col_type] : name_to_col_type) {
        auto col_name_lower = to_lower(col_name);
        // `_slot_lower_name_to_col_type` records the columns existing in the file and is used
        // by `_init_src_block` and `_cast_to_input_block` during LOAD to create columns of the
        // file type.
        //
        // A `COLUMNS FROM PATH` column that also exists in the file must NOT be recorded:
        // otherwise the column type in the block would not match the slot type in
        // `_output_tuple_desc`, and Serde `deserialize_one_cell_from_json` would fail when
        // filling the partition values.
        if (!_partition_col_descs.contains(col_name_lower)) {
            _slot_lower_name_to_col_type.emplace(col_name_lower, col_type);
        }
    }

    if (!_fill_partition_from_path && config::enable_iceberg_partition_column_fallback) {
        // Fallback: a partition column that the reader reports as missing is filled from the
        // path instead, so it is dropped from `_missing_cols`.
        for (const auto& partition_entry : _partition_col_descs) {
            const auto& partition_col_name = partition_entry.first;
            if (_missing_cols.contains(partition_col_name)) {
                _fill_partition_from_path = true;
                _missing_cols.erase(partition_col_name);
            }
        }
    }

    RETURN_IF_ERROR(_generate_missing_columns());
    if (_fill_partition_from_path) {
        RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
    } else {
        // Partition columns do not come from the path: only the missing columns are filled.
        RETURN_IF_ERROR(_cur_reader->set_fill_columns({}, _missing_col_descs));
    }

    if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
        // Log the columns the file does not contain.
        fmt::memory_buffer missing_buf;
        for (const auto& missing_col : _missing_cols) {
            fmt::format_to(missing_buf, " {}", missing_col);
        }
        VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", fmt::to_string(missing_buf),
                                   _current_range.path);
    }

    RETURN_IF_ERROR(_generate_truncate_columns(need_to_get_parsed_schema));
    return Status::OK();
}
1462
0
1463
0
// Rebuilds `_source_file_col_name_types`: the lower-cased column names of the source file
// (e.g. parquet or orc) mapped to their types, as reported by the reader's parsed schema.
//
// The map is always cleared first. It is only refilled when the query option
// `truncate_char_or_varchar_columns` is on and `need_to_get_parsed_schema` is true.
//
// Returns the reader's error if `get_parsed_schema` fails with anything other than
// NOT_IMPLEMENTED_ERROR (which is tolerated and leaves the map empty), otherwise OK.
Status FileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) {
    _source_file_col_name_types.clear();
    if (!(_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema)) {
        return Status::OK();
    }

    std::vector<std::string> source_file_col_names;
    std::vector<DataTypePtr> source_file_col_types;
    Status status = _cur_reader->get_parsed_schema(&source_file_col_names, &source_file_col_types);
    if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
        return status;
    }

    DCHECK_EQ(source_file_col_names.size(), source_file_col_types.size());
    // Bound the loop by the shorter vector so a size mismatch cannot read out of bounds when
    // the DCHECK above is compiled out.
    const size_t col_count = std::min(source_file_col_names.size(), source_file_col_types.size());
    for (size_t i = 0; i < col_count; ++i) {
        _source_file_col_name_types[to_lower(source_file_col_names[i])] = source_file_col_types[i];
    }
    return Status::OK();
}
1482
0
1483
0
// One-time setup before `read_lines_from_range` is used: records `range` as the current range,
// creates fresh statistics objects and profile counters, initializes the IO context and the
// expression contexts, and clears all filters.
//
// Returns the first error from `_init_io_ctx` or `_init_expr_ctxes`, otherwise OK.
Status FileScanner::prepare_for_read_lines(const TFileRangeDesc& range) {
    _current_range = range;

    // Fresh statistics holders; the IO context created below points at them.
    _file_cache_statistics = std::make_unique<io::FileCacheStatistics>();
    _file_reader_stats = std::make_unique<io::FileReaderStats>();

    _file_read_bytes_counter =
            ADD_COUNTER_WITH_LEVEL(_profile, FileReadBytesProfile, TUnit::BYTES, 1);
    _file_read_time_counter = ADD_TIMER_WITH_LEVEL(_profile, FileReadTimeProfile, 1);

    RETURN_IF_ERROR(_init_io_ctx());
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
    _io_ctx->file_reader_stats = _file_reader_stats.get();

    // `_init_expr_ctxes` uses `_default_val_row_desc`, so it has to be created first.
    _default_val_row_desc.reset(new RowDescriptor((TupleDescriptor*)_real_tuple_desc));
    RETURN_IF_ERROR(_init_expr_ctxes());

    // Only one column is read from the file, so no filtering is needed: drop every filter.
    _push_down_conjuncts.clear();
    _not_single_slot_filter_conjuncts.clear();
    _slot_id_to_filter_conjuncts.clear();
    _kv_cache = nullptr;
    return Status::OK();
}
1506
28
1507
// Reads the rows identified by `row_ids` from the file described by `range` into
// `result_block`.
//
// Only parquet and orc are supported; any other format yields NotSupported.
//
// @param range          file range to read; becomes `_current_range`.
// @param row_ids        rows to fetch, passed to the reader's `read_by_rows`.
// @param result_block   block receiving the rows.
// @param external_info  `enable_file_meta_cache` selects whether the process-wide file meta
//                       cache is passed to the readers.
// @param init_reader_ms passed to `scope_timer_run` for the reader creation/initialization step.
// @param get_block_ms   passed to `scope_timer_run` for the block reading step.
// @return the first error hit while creating, initializing, reading or closing the reader,
//         otherwise OK.
Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
                                          const std::list<int64_t>& row_ids, Block* result_block,
                                          const ExternalFileMappingInfo& external_info,
                                          int64_t* init_reader_ms, int64_t* get_block_ms) {
    _current_range = range;
    RETURN_IF_ERROR(_generate_partition_columns());

    const TFileFormatType::type format_type = _get_current_format_type();

    auto file_meta_cache_ptr = external_info.enable_file_meta_cache
                                       ? ExecEnv::GetInstance()->file_meta_cache()
                                       : nullptr;

    // Drop any reader left over from a previous call: `_init_parquet_reader` and
    // `_init_orc_reader` leave `_cur_reader` untouched when no branch matches, and an old
    // reader must not be mistaken for the one of this range.
    _cur_reader = nullptr;

    RETURN_IF_ERROR(scope_timer_run(
            [&]() -> Status {
                switch (format_type) {
                case TFileFormatType::FORMAT_PARQUET: {
                    std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
                            _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx.get(),
                            _state, file_meta_cache_ptr, false);

                    RETURN_IF_ERROR(parquet_reader->read_by_rows(row_ids));
                    RETURN_IF_ERROR(
                            _init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
                    break;
                }
                case TFileFormatType::FORMAT_ORC: {
                    std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
                            _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx.get(),
                            file_meta_cache_ptr, false);

                    RETURN_IF_ERROR(orc_reader->read_by_rows(row_ids));
                    RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
                    break;
                }
                default: {
                    // Report the format that was actually dispatched on.
                    return Status::NotSupported(
                            "Not support create lines reader for file format: {},"
                            "only support parquet and orc.",
                            to_string(format_type));
                }
                }
                return Status::OK();
            },
            init_reader_ms));

    // No reader was installed for this table format: fail instead of dereferencing null below.
    if (_cur_reader == nullptr) {
        return Status::NotSupported(
                "Not supported create reader for table format: {} / file format: {}.",
                range.__isset.table_format_params ? range.table_format_params.table_format_type
                                                  : "NotSet",
                to_string(format_type));
    }

    RETURN_IF_ERROR(_set_fill_or_truncate_columns(true));
    _cur_reader_eof = false;

    RETURN_IF_ERROR(scope_timer_run(
            [&]() -> Status {
                // The loop ends once `_cur_reader_eof` becomes true; the local `eof` only
                // satisfies the `_get_block_impl` signature.
                while (!_cur_reader_eof) {
                    bool eof = false;
                    RETURN_IF_ERROR(_get_block_impl(_state, result_block, &eof));
                }
                return Status::OK();
            },
            get_block_ms));

    _cur_reader->collect_profile_before_close();
    RETURN_IF_ERROR(_cur_reader->close());

    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
    return Status::OK();
}
1574
37
1575
// Rebuilds `_partition_col_descs` (column name -> (value from path, slot)) and
// `_partition_value_is_null` (column name -> null flag) for `_current_range`.
//
// Both maps are always cleared. They are only refilled when the range carries
// `columns_from_path` and there is at least one partition slot.
//
// Returns InternalError when a partition slot has no entry in `_partition_slot_index_map`, or
// when its index does not address an element of `columns_from_path` /
// `columns_from_path_is_null`; otherwise OK.
Status FileScanner::_generate_partition_columns() {
    _partition_col_descs.clear();
    _partition_value_is_null.clear();

    const TFileRangeDesc& range = _current_range;
    if (!range.__isset.columns_from_path || _partition_slot_descs.empty()) {
        return Status::OK();
    }

    for (const auto& slot_desc : _partition_slot_descs) {
        if (!slot_desc) {
            continue;
        }
        auto it = _partition_slot_index_map.find(slot_desc->id());
        if (it == std::end(_partition_slot_index_map)) {
            return Status::InternalError("Unknown source slot descriptor, slot_id={}",
                                         slot_desc->id());
        }

        // The index is computed in `_init_expr_ctxes` (by a subtraction for load, by key
        // position for queries), so validate it before indexing the thrift vectors.
        const auto path_idx = static_cast<int64_t>(it->second);
        if (path_idx < 0 || path_idx >= static_cast<int64_t>(range.columns_from_path.size())) {
            return Status::InternalError(
                    "Partition column index {} of slot {} is out of range, "
                    "columns_from_path size={}",
                    path_idx, slot_desc->id(), range.columns_from_path.size());
        }
        const auto value_pos = static_cast<size_t>(path_idx);

        const std::string& column_from_path = range.columns_from_path[value_pos];
        _partition_col_descs.emplace(slot_desc->col_name(),
                                     std::make_tuple(column_from_path, slot_desc));

        if (range.__isset.columns_from_path_is_null) {
            if (value_pos >= range.columns_from_path_is_null.size()) {
                return Status::InternalError(
                        "Partition column index {} of slot {} is out of range, "
                        "columns_from_path_is_null size={}",
                        path_idx, slot_desc->id(), range.columns_from_path_is_null.size());
            }
            _partition_value_is_null.emplace(slot_desc->col_name(),
                                             range.columns_from_path_is_null[value_pos]);
        }
    }
    return Status::OK();
}
1599
11
1600
0
// Rebuilds `_missing_col_descs`: for every slot of `_real_tuple_desc` whose column name is in
// `_missing_cols`, stores the default value expression context registered for that column.
//
// Returns InternalError if a missing column has no entry in `_col_default_value_ctx`,
// otherwise OK.
Status FileScanner::_generate_missing_columns() {
    _missing_col_descs.clear();
    if (_missing_cols.empty()) {
        return Status::OK();
    }

    for (auto* slot_desc : _real_tuple_desc->slots()) {
        const auto& slot_name = slot_desc->col_name();
        if (_missing_cols.contains(slot_name)) {
            auto default_it = _col_default_value_ctx.find(slot_name);
            if (default_it == _col_default_value_ctx.end()) {
                return Status::InternalError("failed to find default value expr for slot: {}",
                                             slot_name);
            }
            _missing_col_descs.emplace(slot_name, default_it->second);
        }
    }
    return Status::OK();
}
1618
1
1619
0
// Builds the slot and expression bookkeeping used by the scanner:
//  - classifies every required slot as a file slot and/or a partition slot,
//  - records for each partition slot the index of its value among the columns from path,
//  - creates the default value expression context of each source slot,
//  - for load tasks, creates the expression context of each destination slot and the
//    destination-to-source slot mapping.
//
// Returns InternalError for a slot id or expression that cannot be resolved, or the first
// error from creating/preparing/opening an expression; otherwise OK.
Status FileScanner::_init_expr_ctxes() {
    std::map<SlotId, int> full_src_index_map;
    std::map<SlotId, SlotDescriptor*> full_src_slot_map;
    std::map<std::string, int> partition_name_to_key_index_map;
    int index = 0;
    for (const auto& slot_desc : _real_tuple_desc->slots()) {
        full_src_slot_map.emplace(slot_desc->id(), slot_desc);
        full_src_index_map.emplace(slot_desc->id(), index++);
    }

    // For external table query, find the index of column in path.
    // Because query doesn't always search for all columns in a table
    // and the order of selected columns is random.
    // All ranges should have identical columns_from_path_keys because they are all file splits
    // of the same external table, so the keys of the current range are used to fill
    // partition_name_to_key_index_map.
    if (_current_range.__isset.columns_from_path_keys) {
        // Bind by reference: the keys are only read, there is no need to copy the vector.
        const std::vector<std::string>& path_keys = _current_range.columns_from_path_keys;
        for (size_t i = 0; i < path_keys.size(); i++) {
            partition_name_to_key_index_map.emplace(path_keys[i], static_cast<int>(i));
        }
    }

    _num_of_columns_from_file = _params->num_of_columns_from_file;

    for (const auto& slot_info : _params->required_slots) {
        auto slot_id = slot_info.slot_id;
        auto it = full_src_slot_map.find(slot_id);
        if (it == std::end(full_src_slot_map)) {
            return Status::InternalError(
                    fmt::format("Unknown source slot descriptor, slot_id={}", slot_id));
        }
        if (it->second->col_name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
            // The global row id column is neither a file slot nor a partition slot: only its
            // column id is remembered.
            _row_id_column_iterator_pair.second = _default_val_row_desc->get_column_id(slot_id);
            continue;
        }

        if (slot_info.is_file_slot) {
            _is_file_slot.emplace(slot_id);
            _file_slot_descs.emplace_back(it->second);
            _file_col_names.push_back(it->second->col_name());
        }

        if (partition_name_to_key_index_map.contains(it->second->col_name())) {
            if (slot_info.is_file_slot) {
                // If there is slot which is both a partition column and a file column,
                // we should not fill the partition column from path.
                _fill_partition_from_path = false;
            } else if (!_fill_partition_from_path) {
                // This should not happen
                return Status::InternalError(
                        "Partition column {} is not a file column, but there is already a column "
                        "which is both a partition column and a file column.",
                        it->second->col_name());
            }
            _partition_slot_descs.emplace_back(it->second);
            if (_is_load) {
                // `full_src_index_map` and `full_src_slot_map` are filled with the same keys
                // above, so this lookup succeeds whenever `it` is valid.
                auto iti = full_src_index_map.find(slot_id);
                _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
            } else {
                auto kit = partition_name_to_key_index_map.find(it->second->col_name());
                _partition_slot_index_map.emplace(slot_id, kit->second);
            }
        }
    }

    // set column name to default value expr map
    for (auto* slot_desc : _real_tuple_desc->slots()) {
        VExprContextSPtr ctx;
        auto it = _params->default_value_of_src_slot.find(slot_desc->id());
        if (it != std::end(_params->default_value_of_src_slot)) {
            if (!it->second.nodes.empty()) {
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
                RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
                RETURN_IF_ERROR(ctx->open(_state));
            }
            // if expr is empty, the default value will be null
            _col_default_value_ctx.emplace(slot_desc->col_name(), ctx);
        }
    }

    if (_is_load) {
        // follow desc expr map is only for load task.
        bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
        int idx = 0;
        for (auto* slot_desc : _output_tuple_desc->slots()) {
            auto it = _params->expr_of_dest_slot.find(slot_desc->id());
            if (it == std::end(_params->expr_of_dest_slot)) {
                return Status::InternalError("No expr for dest slot, id={}, name={}",
                                             slot_desc->id(), slot_desc->col_name());
            }

            // An empty expression leaves `ctx` null; it is still recorded so that the
            // positions in `_dest_vexpr_ctx` follow the destination slot order.
            VExprContextSPtr ctx;
            if (!it->second.nodes.empty()) {
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
                RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
                RETURN_IF_ERROR(ctx->open(_state));
            }
            _dest_vexpr_ctx.emplace_back(ctx);
            _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;

            if (has_slot_id_map) {
                auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
                if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
                    // No source slot for this destination slot.
                    _src_slot_descs_order_by_dest.emplace_back(nullptr);
                } else {
                    auto _src_slot_it = full_src_slot_map.find(it1->second);
                    if (_src_slot_it == std::end(full_src_slot_map)) {
                        return Status::InternalError("No src slot {} in src slot descs",
                                                     it1->second);
                    }
                    // Key is the position this slot is about to take in
                    // `_src_slot_descs_order_by_dest`.
                    _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
                                                         full_src_index_map[_src_slot_it->first]);
                    _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
                }
            }
        }
    }
    return Status::OK();
}
1741
18.1k
1742
18.1k
// Closes the current reader (if any) and then the base `Scanner`.
//
// Nothing is done and OK is returned when `_try_close()` returns false.
// Returns the first error from the reader's or the base scanner's `close`, otherwise OK.
Status FileScanner::close(RuntimeState* state) {
    if (_try_close()) {
        if (_cur_reader != nullptr) {
            RETURN_IF_ERROR(_cur_reader->close());
        }
        RETURN_IF_ERROR(Scanner::close(state));
    }
    return Status::OK();
}
1754
0
1755
void FileScanner::try_stop() {
1756
2.67k
    Scanner::try_stop();
1757
13
    if (_io_ctx) {
1758
13
        _io_ctx->should_stop = true;
1759
    }
1760
2.67k
}
1761
2.67k
1762
2.67k
void FileScanner::update_realtime_counters() {
1763
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
1764
2.67k
1765
2.67k
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
1766
2.67k
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
1767
2.67k
1768
2.67k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(
1769
2.67k
            _file_reader_stats->read_rows);
1770
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
1771
2.67k
            _file_reader_stats->read_bytes);
1772
2.67k
1773
    int64_t delta_bytes_read_from_local =
1774
2.67k
            _file_cache_statistics->bytes_read_from_local - _last_bytes_read_from_local;
1775
2.67k
    int64_t delta_bytes_read_from_remote =
1776
            _file_cache_statistics->bytes_read_from_remote - _last_bytes_read_from_remote;
1777
2.67k
    if (_file_cache_statistics->bytes_read_from_local == 0 &&
1778
2.67k
        _file_cache_statistics->bytes_read_from_remote == 0) {
1779
2.67k
        _state->get_query_ctx()
1780
2.67k
                ->resource_ctx()
1781
                ->io_context()
1782
2.67k
                ->update_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes);
1783
2.67k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
1784
2.67k
                _file_reader_stats->read_bytes);
1785
2.67k
    } else {
1786
2.67k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
1787
2.67k
                delta_bytes_read_from_local);
1788
2.67k
        _state->get_query_ctx()
1789
2.67k
                ->resource_ctx()
1790
2.67k
                ->io_context()
1791
2.67k
                ->update_scan_bytes_from_remote_storage(delta_bytes_read_from_remote);
1792
2.67k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
1793
2.67k
                delta_bytes_read_from_local);
1794
2.67k
        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
1795
0
                delta_bytes_read_from_remote);
1796
0
    }
1797
0
1798
0
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
1799
0
1800
0
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
1801
0
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
1802
0
1803
0
    _file_reader_stats->read_bytes = 0;
1804
0
    _file_reader_stats->read_rows = 0;
1805
0
1806
    _last_bytes_read_from_local = _file_cache_statistics->bytes_read_from_local;
1807
2.67k
    _last_bytes_read_from_remote = _file_cache_statistics->bytes_read_from_remote;
1808
}
1809
2.67k
1810
2.67k
void FileScanner::_collect_profile_before_close() {
1811
    Scanner::_collect_profile_before_close();
1812
2.67k
    if (config::enable_file_cache && _state->query_options().enable_file_cache &&
1813
2.67k
        _profile != nullptr) {
1814
        io::FileCacheProfileReporter cache_profile(_profile);
1815
2.67k
        cache_profile.update(_file_cache_statistics.get());
1816
2.67k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
1817
2.67k
                _file_cache_statistics->bytes_write_into_cache);
1818
    }
1819
2.67k
1820
2.67k
    if (_cur_reader != nullptr) {
1821
2.67k
        _cur_reader->collect_profile_before_close();
1822
2.67k
    }
1823
0
1824
0
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
1825
0
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
1826
0
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
1827
0
1828
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
1829
2.67k
    COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls);
1830
13
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
1831
13
1832
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
1833
2.67k
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
1834
2.67k
}
1835
2.67k
1836
} // namespace doris