Coverage Report

Created: 2026-04-07 22:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/Metrics_types.h>
23
#include <gen_cpp/Opcodes_types.h>
24
#include <gen_cpp/PaloInternalService_types.h>
25
#include <gen_cpp/PlanNodes_types.h>
26
#include <glog/logging.h>
27
28
#include <algorithm>
29
#include <boost/iterator/iterator_facade.hpp>
30
#include <map>
31
#include <ranges>
32
#include <tuple>
33
#include <unordered_map>
34
#include <utility>
35
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/config.h"
38
#include "common/consts.h"
39
#include "common/logging.h"
40
#include "common/status.h"
41
#include "core/block/column_with_type_and_name.h"
42
#include "core/block/columns_with_type_and_name.h"
43
#include "core/column/column.h"
44
#include "core/column/column_nullable.h"
45
#include "core/column/column_vector.h"
46
#include "core/data_type/data_type.h"
47
#include "core/data_type/data_type_nullable.h"
48
#include "core/data_type/data_type_string.h"
49
#include "core/string_ref.h"
50
#include "exec/common/stringop_substring.h"
51
#include "exec/rowid_fetcher.h"
52
#include "exec/scan/scan_node.h"
53
#include "exprs/aggregate/aggregate_function.h"
54
#include "exprs/function/function.h"
55
#include "exprs/function/simple_function_factory.h"
56
#include "exprs/vexpr.h"
57
#include "exprs/vexpr_context.h"
58
#include "exprs/vexpr_fwd.h"
59
#include "exprs/vslot_ref.h"
60
#include "format/arrow/arrow_stream_reader.h"
61
#include "format/csv/csv_reader.h"
62
#include "format/json/new_json_reader.h"
63
#include "format/native/native_reader.h"
64
#include "format/orc/vorc_reader.h"
65
#include "format/parquet/vparquet_reader.h"
66
#include "format/table/hive_reader.h"
67
#include "format/table/hudi_jni_reader.h"
68
#include "format/table/hudi_reader.h"
69
#include "format/table/iceberg_reader.h"
70
#include "format/table/iceberg_sys_table_jni_reader.h"
71
#include "format/table/jdbc_jni_reader.h"
72
#include "format/table/max_compute_jni_reader.h"
73
#include "format/table/paimon_cpp_reader.h"
74
#include "format/table/paimon_jni_reader.h"
75
#include "format/table/paimon_predicate_converter.h"
76
#include "format/table/paimon_reader.h"
77
#include "format/table/remote_doris_reader.h"
78
#include "format/table/transactional_hive_reader.h"
79
#include "format/table/trino_connector_jni_reader.h"
80
#include "format/text/text_reader.h"
81
#ifdef BUILD_RUST_READERS
82
#include "format/lance/lance_rust_reader.h"
83
#endif
84
#include "io/cache/block_file_cache_profile.h"
85
#include "load/group_commit/wal/wal_reader.h"
86
#include "runtime/descriptors.h"
87
#include "runtime/runtime_profile.h"
88
#include "runtime/runtime_state.h"
89
90
namespace cctz {
91
class time_zone;
92
} // namespace cctz
93
namespace doris {
94
class ShardedKVCache;
95
} // namespace doris
96
97
namespace doris {
98
#include "common/compile_check_begin.h"
99
using namespace ErrorCode;
100
101
const std::string FileScanner::FileReadBytesProfile = "FileReadBytes";
102
const std::string FileScanner::FileReadTimeProfile = "FileReadTime";
103
104
FileScanner::FileScanner(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
105
                         std::shared_ptr<SplitSourceConnector> split_source,
106
                         RuntimeProfile* profile, ShardedKVCache* kv_cache,
107
                         const std::unordered_map<std::string, int>* colname_to_slot_id)
108
86.7k
        : Scanner(state, local_state, limit, profile),
109
86.7k
          _split_source(split_source),
110
86.7k
          _cur_reader(nullptr),
111
86.7k
          _cur_reader_eof(false),
112
86.7k
          _kv_cache(kv_cache),
113
86.7k
          _strict_mode(false),
114
86.7k
          _col_name_to_slot_id(colname_to_slot_id) {
115
86.7k
    if (state->get_query_ctx() != nullptr &&
116
86.7k
        state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
117
84.3k
        _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
118
84.3k
    } else {
119
        // old fe thrift protocol
120
2.45k
        _params = _split_source->get_params();
121
2.45k
    }
122
86.7k
    if (_params->__isset.strict_mode) {
123
2.48k
        _strict_mode = _params->strict_mode;
124
2.48k
    }
125
126
    // For load scanner, there are input and output tuple.
127
    // For query scanner, there is only output tuple
128
86.7k
    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
129
86.7k
    _real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
130
86.7k
    _is_load = (_input_tuple_desc != nullptr);
131
86.7k
}
132
133
86.6k
Status FileScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
134
86.6k
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));
135
86.6k
    _get_block_timer =
136
86.6k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerGetBlockTime", 1);
137
86.6k
    _cast_to_input_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
138
86.6k
                                                      "FileScannerCastInputBlockTime", 1);
139
86.6k
    _fill_missing_columns_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
140
86.6k
                                                       "FileScannerFillMissingColumnTime", 1);
141
86.6k
    _pre_filter_timer =
142
86.6k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerPreFilterTimer", 1);
143
86.6k
    _convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
144
86.6k
                                                          "FileScannerConvertOuputBlockTime", 1);
145
86.6k
    _runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
146
86.6k
            _local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime", 1);
147
86.6k
    _empty_file_counter =
148
86.6k
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT, 1);
149
86.6k
    _not_found_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
150
86.6k
                                                     "NotFoundFileNum", TUnit::UNIT, 1);
151
86.6k
    _fully_skipped_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
152
86.6k
                                                         "FullySkippedFileNum", TUnit::UNIT, 1);
153
86.6k
    _file_counter =
154
86.6k
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
155
156
86.6k
    _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
157
86.6k
                                                      FileReadBytesProfile, TUnit::BYTES, 1);
158
86.6k
    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
159
86.6k
                                                      "FileReadCalls", TUnit::UNIT, 1);
160
86.6k
    _file_read_time_counter =
161
86.6k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), FileReadTimeProfile, 1);
162
163
86.6k
    _runtime_filter_partition_pruned_range_counter =
164
86.6k
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
165
86.6k
                                   "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);
166
167
86.6k
    _file_cache_statistics.reset(new io::FileCacheStatistics());
168
86.6k
    _file_reader_stats.reset(new io::FileReaderStats());
169
170
86.6k
    RETURN_IF_ERROR(_init_io_ctx());
171
86.6k
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
172
86.6k
    _io_ctx->file_reader_stats = _file_reader_stats.get();
173
86.6k
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;
174
175
86.6k
    if (_is_load) {
176
2.48k
        _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
177
2.48k
                                              std::vector<TupleId>({_input_tuple_desc->id()})));
178
        // prepare pre filters
179
2.48k
        if (_params->__isset.pre_filter_exprs_list) {
180
4
            RETURN_IF_ERROR(doris::VExpr::create_expr_trees(_params->pre_filter_exprs_list,
181
4
                                                            _pre_conjunct_ctxs));
182
2.47k
        } else if (_params->__isset.pre_filter_exprs) {
183
0
            VExprContextSPtr context;
184
0
            RETURN_IF_ERROR(doris::VExpr::create_expr_tree(_params->pre_filter_exprs, context));
185
0
            _pre_conjunct_ctxs.emplace_back(context);
186
0
        }
187
188
2.48k
        for (auto& conjunct : _pre_conjunct_ctxs) {
189
5
            RETURN_IF_ERROR(conjunct->prepare(_state, *_src_row_desc));
190
5
            RETURN_IF_ERROR(conjunct->open(_state));
191
5
        }
192
193
2.48k
        _dest_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
194
2.48k
                                               std::vector<TupleId>({_output_tuple_desc->id()})));
195
2.48k
    }
196
197
86.6k
    _default_val_row_desc.reset(
198
86.6k
            new RowDescriptor(_state->desc_tbl(), std::vector<TupleId>({_real_tuple_desc->id()})));
199
200
86.6k
    return Status::OK();
201
86.6k
}
202
203
// check if the expr is a partition pruning expr
204
27.4k
bool FileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
205
27.4k
    if (expr->is_slot_ref()) {
206
10.8k
        auto* slot_ref = static_cast<VSlotRef*>(expr.get());
207
10.8k
        return _partition_slot_index_map.find(slot_ref->slot_id()) !=
208
10.8k
               _partition_slot_index_map.end();
209
10.8k
    }
210
16.6k
    if (expr->is_literal()) {
211
5.14k
        return true;
212
5.14k
    }
213
16.8k
    return std::ranges::all_of(expr->children(), [this](const auto& child) {
214
16.8k
        return _check_partition_prune_expr(child);
215
16.8k
    });
216
16.6k
}
217
218
10.4k
void FileScanner::_init_runtime_filter_partition_prune_ctxs() {
219
10.4k
    _runtime_filter_partition_prune_ctxs.clear();
220
10.7k
    for (auto& conjunct : _conjuncts) {
221
10.7k
        auto impl = conjunct->root()->get_impl();
222
        // If impl is not null, which means this a conjuncts from runtime filter.
223
10.7k
        auto expr = impl ? impl : conjunct->root();
224
10.7k
        if (_check_partition_prune_expr(expr)) {
225
7.57k
            _runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
226
7.57k
        }
227
10.7k
    }
228
10.4k
}
229
230
6.77k
void FileScanner::_init_runtime_filter_partition_prune_block() {
231
    // init block with empty column
232
65.0k
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
233
65.0k
        _runtime_filter_partition_prune_block.insert(
234
65.0k
                ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
235
65.0k
                                      slot_desc->get_data_type_ptr(), slot_desc->col_name()));
236
65.0k
    }
237
6.77k
}
238
239
7.34k
Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
240
7.34k
    SCOPED_TIMER(_runtime_filter_partition_prune_timer);
241
7.34k
    if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
242
4.77k
        return Status::OK();
243
4.77k
    }
244
2.57k
    size_t partition_value_column_size = 1;
245
246
    // 1. Get partition key values to string columns.
247
2.57k
    std::unordered_map<SlotId, MutableColumnPtr> partition_slot_id_to_column;
248
3.40k
    for (auto const& partition_col_desc : _partition_col_descs) {
249
3.40k
        const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
250
3.40k
        auto data_type = partition_slot_desc->get_data_type_ptr();
251
3.40k
        auto test_serde = data_type->get_serde();
252
3.40k
        auto partition_value_column = data_type->create_column();
253
3.40k
        auto* col_ptr = static_cast<IColumn*>(partition_value_column.get());
254
3.40k
        Slice slice(partition_value.data(), partition_value.size());
255
3.40k
        uint64_t num_deserialized = 0;
256
3.40k
        DataTypeSerDe::FormatOptions options {};
257
3.40k
        if (_partition_value_is_null.contains(partition_slot_desc->col_name())) {
258
            // for iceberg/paimon table
259
            // NOTICE: column is always be nullable for iceberg/paimon table now
260
1.24k
            DCHECK(data_type->is_nullable());
261
1.24k
            test_serde = test_serde->get_nested_serdes()[0];
262
1.24k
            auto* null_column = assert_cast<ColumnNullable*>(col_ptr);
263
1.24k
            if (_partition_value_is_null[partition_slot_desc->col_name()]) {
264
182
                null_column->insert_many_defaults(partition_value_column_size);
265
1.06k
            } else {
266
                // If the partition value is not null, we set null map to 0 and deserialize it normally.
267
1.06k
                null_column->get_null_map_column().insert_many_vals(0, partition_value_column_size);
268
1.06k
                RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
269
1.06k
                        null_column->get_nested_column(), slice, partition_value_column_size,
270
1.06k
                        &num_deserialized, options));
271
1.06k
            }
272
2.16k
        } else {
273
            // for hive/hudi table, the null value is set as "\\N"
274
            // TODO: this will be unified as iceberg/paimon table in the future
275
2.16k
            RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
276
2.16k
                    *col_ptr, slice, partition_value_column_size, &num_deserialized, options));
277
2.16k
        }
278
279
3.40k
        partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
280
3.40k
    }
281
282
    // 2. Fill _runtime_filter_partition_prune_block from the partition column, then execute conjuncts and filter block.
283
    // 2.1 Fill _runtime_filter_partition_prune_block from the partition column to match the conjuncts executing.
284
2.57k
    size_t index = 0;
285
2.57k
    bool first_column_filled = false;
286
6.25k
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
287
6.25k
        if (partition_slot_id_to_column.find(slot_desc->id()) !=
288
6.25k
            partition_slot_id_to_column.end()) {
289
3.40k
            auto data_type = slot_desc->get_data_type_ptr();
290
3.40k
            auto partition_value_column = std::move(partition_slot_id_to_column[slot_desc->id()]);
291
3.40k
            if (data_type->is_nullable()) {
292
3.40k
                _runtime_filter_partition_prune_block.insert(
293
3.40k
                        index, ColumnWithTypeAndName(
294
3.40k
                                       ColumnNullable::create(
295
3.40k
                                               std::move(partition_value_column),
296
3.40k
                                               ColumnUInt8::create(partition_value_column_size, 0)),
297
3.40k
                                       data_type, slot_desc->col_name()));
298
3.40k
            } else {
299
4
                _runtime_filter_partition_prune_block.insert(
300
4
                        index, ColumnWithTypeAndName(std::move(partition_value_column), data_type,
301
4
                                                     slot_desc->col_name()));
302
4
            }
303
3.40k
            if (index == 0) {
304
1.18k
                first_column_filled = true;
305
1.18k
            }
306
3.40k
        }
307
6.25k
        index++;
308
6.25k
    }
309
310
    // 2.2 Execute conjuncts.
311
2.57k
    if (!first_column_filled) {
312
        // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
313
        // The following process may be tricky and time-consuming, but we have no other way.
314
1.39k
        _runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
315
1.39k
                partition_value_column_size);
316
1.39k
    }
317
2.57k
    IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
318
2.57k
    RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
319
2.57k
                                                    &_runtime_filter_partition_prune_block,
320
2.57k
                                                    &result_filter, &can_filter_all));
321
2.57k
    return Status::OK();
322
2.57k
}
323
324
20.5k
Status FileScanner::_process_conjuncts() {
325
20.5k
    _slot_id_to_filter_conjuncts.clear();
326
20.5k
    _not_single_slot_filter_conjuncts.clear();
327
28.3k
    for (auto& conjunct : _push_down_conjuncts) {
328
28.3k
        auto impl = conjunct->root()->get_impl();
329
        // If impl is not null, which means this a conjuncts from runtime filter.
330
28.3k
        auto cur_expr = impl ? impl : conjunct->root();
331
332
28.3k
        std::vector<int> slot_ids;
333
28.3k
        _get_slot_ids(cur_expr.get(), &slot_ids);
334
28.3k
        if (slot_ids.empty()) {
335
1
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
336
1
            continue;
337
1
        }
338
28.3k
        bool single_slot = true;
339
28.4k
        for (int i = 1; i < slot_ids.size(); i++) {
340
876
            if (slot_ids[i] != slot_ids[0]) {
341
768
                single_slot = false;
342
768
                break;
343
768
            }
344
876
        }
345
28.3k
        if (single_slot) {
346
27.5k
            SlotId slot_id = slot_ids[0];
347
27.5k
            _slot_id_to_filter_conjuncts[slot_id].emplace_back(conjunct);
348
27.5k
        } else {
349
774
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
350
774
        }
351
28.3k
    }
352
20.5k
    return Status::OK();
353
20.5k
}
354
355
57.2k
Status FileScanner::_process_late_arrival_conjuncts() {
356
57.2k
    if (_push_down_conjuncts.size() < _conjuncts.size()) {
357
20.5k
        _push_down_conjuncts = _conjuncts;
358
        // Do not clear _conjuncts here!
359
        // We must keep it for fallback filtering, especially when mixing
360
        // Native readers (which use _push_down_conjuncts) and JNI readers (which rely on _conjuncts).
361
        // _conjuncts.clear();
362
20.5k
        RETURN_IF_ERROR(_process_conjuncts());
363
20.5k
    }
364
57.2k
    if (_applied_rf_num == _total_rf_num) {
365
55.0k
        _local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
366
55.0k
    }
367
57.2k
    return Status::OK();
368
57.2k
}
369
370
60.4k
void FileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
371
61.3k
    for (auto& child_expr : expr->children()) {
372
61.3k
        if (child_expr->is_slot_ref()) {
373
29.2k
            VSlotRef* slot_ref = reinterpret_cast<VSlotRef*>(child_expr.get());
374
29.2k
            SlotDescriptor* slot_desc = _state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
375
29.2k
            slot_desc->set_is_predicate(true);
376
29.2k
            slot_ids->emplace_back(slot_ref->slot_id());
377
32.0k
        } else {
378
32.0k
            _get_slot_ids(child_expr.get(), slot_ids);
379
32.0k
        }
380
61.3k
    }
381
60.4k
}
382
383
86.8k
Status FileScanner::_open_impl(RuntimeState* state) {
384
86.8k
    RETURN_IF_CANCELLED(state);
385
86.8k
    RETURN_IF_ERROR(Scanner::_open_impl(state));
386
86.8k
    if (_local_state) {
387
86.7k
        _condition_cache_digest = _local_state->get_condition_cache_digest();
388
86.7k
    }
389
86.8k
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
390
86.8k
    if (_first_scan_range) {
391
50.2k
        RETURN_IF_ERROR(_init_expr_ctxes());
392
50.2k
        if (_state->query_options().enable_runtime_filter_partition_prune &&
393
50.2k
            !_partition_slot_index_map.empty()) {
394
6.77k
            _init_runtime_filter_partition_prune_ctxs();
395
6.77k
            _init_runtime_filter_partition_prune_block();
396
6.77k
        }
397
50.2k
    } else {
398
        // there's no scan range in split source. stop scanner directly.
399
36.5k
        _scanner_eof = true;
400
36.5k
    }
401
402
86.8k
    return Status::OK();
403
86.8k
}
404
405
263k
Status FileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
406
263k
    Status st = _get_block_wrapped(state, block, eof);
407
408
263k
    if (!st.ok()) {
409
        // add cur path in error msg for easy debugging
410
76
        return std::move(st.append(". cur path: " + get_current_scan_range_name()));
411
76
    }
412
263k
    return st;
413
263k
}
414
415
// For query:
416
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
417
//                              A     B    C  D                 E
418
// _init_src_block              x     x    x  x                 x                -      x
419
// get_next_block               x     x    x  -                 -                -      x
420
// _cast_to_input_block         -     -    -  -                 -                -      -
421
// _fill_columns_from_path      -     -    -  -                 x                -      x
422
// _fill_missing_columns        -     -    -  x                 -                -      x
423
// _convert_to_output_block     -     -    -  -                 -                -      -
424
//
425
// For load:
426
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
427
//                              A     B    C  D                 E
428
// _init_src_block              x     x    x  x                 x                x      -
429
// get_next_block               x     x    x  -                 -                x      -
430
// _cast_to_input_block         x     x    x  -                 -                x      -
431
// _fill_columns_from_path      -     -    -  -                 x                x      -
432
// _fill_missing_columns        -     -    -  x                 -                x      -
433
// _convert_to_output_block     -     -    -  -                 -                -      x
434
263k
Status FileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* eof) {
435
263k
    do {
436
263k
        RETURN_IF_CANCELLED(state);
437
263k
        if (_cur_reader == nullptr || _cur_reader_eof) {
438
157k
            _finalize_reader_condition_cache();
439
            // The file may not exist because the file list is got from meta cache,
440
            // And the file may already be removed from storage.
441
            // Just ignore not found files.
442
157k
            Status st = _get_next_reader();
443
157k
            if (st.is<ErrorCode::NOT_FOUND>() && config::ignore_not_found_file_in_external_table) {
444
0
                _cur_reader_eof = true;
445
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
446
0
                continue;
447
157k
            } else if (st.is<ErrorCode::END_OF_FILE>()) {
448
12
                _cur_reader_eof = true;
449
12
                COUNTER_UPDATE(_fully_skipped_file_counter, 1);
450
12
                continue;
451
157k
            } else if (!st) {
452
4
                return st;
453
4
            }
454
157k
            _init_reader_condition_cache();
455
157k
        }
456
457
263k
        if (_scanner_eof) {
458
86.2k
            *eof = true;
459
86.2k
            return Status::OK();
460
86.2k
        }
461
462
        // Init src block for load job based on the data file schema (e.g. parquet)
463
        // For query job, simply set _src_block_ptr to block.
464
176k
        size_t read_rows = 0;
465
176k
        RETURN_IF_ERROR(_init_src_block(block));
466
176k
        if (_need_iceberg_rowid_column && _current_range.__isset.table_format_params &&
467
176k
            _current_range.table_format_params.table_format_type == "iceberg") {
468
230
            if (auto* iceberg_reader = dynamic_cast<IcebergTableReader*>(_cur_reader.get())) {
469
230
                iceberg_reader->set_row_id_column_position(_iceberg_rowid_column_pos);
470
230
            }
471
230
        }
472
176k
        {
473
176k
            SCOPED_TIMER(_get_block_timer);
474
475
            // Read next block.
476
            // Some of column in block may not be filled (column not exist in file)
477
176k
            RETURN_IF_ERROR(
478
176k
                    _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
479
176k
        }
480
        // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
481
        // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result.
482
176k
        if (read_rows > 0) {
483
113k
            if ((!_cur_reader->count_read_rows()) && _io_ctx) {
484
18.0k
                _io_ctx->file_reader_stats->read_rows += read_rows;
485
18.0k
            }
486
            // If the push_down_agg_type is COUNT, no need to do the rest,
487
            // because we only save a number in block.
488
113k
            if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
489
                // Convert the src block columns type to string in-place.
490
109k
                RETURN_IF_ERROR(_cast_to_input_block(block));
491
                // FileReader can fill partition and missing columns itself
492
109k
                if (!_cur_reader->fill_all_columns()) {
493
                    // Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
494
16.9k
                    RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
495
                    // Fill columns not exist in file with null or default value
496
16.9k
                    RETURN_IF_ERROR(_fill_missing_columns(read_rows));
497
16.9k
                }
498
                // Apply _pre_conjunct_ctxs to filter src block.
499
109k
                RETURN_IF_ERROR(_pre_filter_src_block());
500
501
                // Convert src block to output block (dest block), string to dest data type and apply filters.
502
109k
                RETURN_IF_ERROR(_convert_to_output_block(block));
503
                // Truncate char columns or varchar columns if size is smaller than file columns
504
                // or not found in the file column schema.
505
109k
                RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
506
109k
            }
507
113k
        }
508
176k
        break;
509
176k
    } while (true);
510
511
    // Update filtered rows and unselected rows for load, reset counter.
512
    // {
513
    //     state->update_num_rows_load_filtered(_counter.num_rows_filtered);
514
    //     state->update_num_rows_load_unselected(_counter.num_rows_unselected);
515
    //     _reset_counter();
516
    // }
517
176k
    return Status::OK();
518
263k
}
519
520
/**
521
 * Check whether there are complex types in parquet/orc reader in broker/stream load.
522
 * Broker/stream load will cast any type as string type, and complex types will be casted wrong.
523
 * This is a temporary method, and will be replaced by tvf.
524
 */
525
7.88k
Status FileScanner::_check_output_block_types() {
526
7.88k
    if (_is_load) {
527
7.88k
        TFileFormatType::type format_type = _params->format_type;
528
7.88k
        if (format_type == TFileFormatType::FORMAT_PARQUET ||
529
7.88k
            format_type == TFileFormatType::FORMAT_ORC) {
530
1.67k
            for (auto slot : _output_tuple_desc->slots()) {
531
1.67k
                if (is_complex_type(slot->type()->get_primitive_type())) {
532
0
                    return Status::InternalError(
533
0
                            "Parquet/orc doesn't support complex types in broker/stream load, "
534
0
                            "please use tvf(table value function) to insert complex types.");
535
0
                }
536
1.67k
            }
537
96
        }
538
7.88k
    }
539
7.88k
    return Status::OK();
540
7.88k
}
541
542
176k
Status FileScanner::_init_src_block(Block* block) {
543
176k
    if (!_is_load) {
544
168k
        _src_block_ptr = block;
545
546
168k
        bool update_name_to_idx = _src_block_name_to_idx.empty();
547
168k
        _iceberg_rowid_column_pos = -1;
548
168k
        if (_need_iceberg_rowid_column && _current_range.__isset.table_format_params &&
549
168k
            _current_range.table_format_params.table_format_type == "iceberg") {
550
230
            int row_id_idx = block->get_position_by_name(BeConsts::ICEBERG_ROWID_COL);
551
230
            if (row_id_idx >= 0) {
552
230
                _iceberg_rowid_column_pos = row_id_idx;
553
230
                if (!update_name_to_idx &&
554
230
                    !_src_block_name_to_idx.contains(BeConsts::ICEBERG_ROWID_COL)) {
555
0
                    update_name_to_idx = true;
556
0
                }
557
230
            }
558
230
        }
559
560
        // Build name to index map only once on first call
561
168k
        if (update_name_to_idx) {
562
50.4k
            _src_block_name_to_idx = block->get_name_to_pos_map();
563
50.4k
        }
564
168k
        return Status::OK();
565
168k
    }
566
7.87k
    RETURN_IF_ERROR(_check_output_block_types());
567
568
    // if (_src_block_init) {
569
    //     _src_block.clear_column_data();
570
    //     _src_block_ptr = &_src_block;
571
    //     return Status::OK();
572
    // }
573
574
7.87k
    _src_block.clear();
575
7.87k
    uint32_t idx = 0;
576
    // slots in _input_tuple_desc contains all slots describe in load statement, eg:
577
    // -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
578
    // _input_tuple_desc will contains: k1, k2, tmp1
579
    // and some of them are from file, such as k1 and k2, and some of them may not exist in file, such as tmp1
580
    // _input_tuple_desc also contains columns from path
581
88.8k
    for (auto& slot : _input_tuple_desc->slots()) {
582
88.8k
        DataTypePtr data_type;
583
88.8k
        auto it = _slot_lower_name_to_col_type.find(slot->col_name());
584
88.8k
        if (slot->is_skip_bitmap_col()) {
585
314
            _skip_bitmap_col_idx = idx;
586
314
        }
587
88.8k
        if (_params->__isset.sequence_map_col) {
588
1.25k
            if (_params->sequence_map_col == slot->col_name()) {
589
198
                _sequence_map_col_uid = slot->col_unique_id();
590
198
            }
591
1.25k
        }
592
88.8k
        data_type =
593
88.8k
                it == _slot_lower_name_to_col_type.end() ? slot->type() : make_nullable(it->second);
594
88.8k
        MutableColumnPtr data_column = data_type->create_column();
595
88.8k
        _src_block.insert(
596
88.8k
                ColumnWithTypeAndName(std::move(data_column), data_type, slot->col_name()));
597
88.8k
        _src_block_name_to_idx.emplace(slot->col_name(), idx++);
598
88.8k
    }
599
7.87k
    if (_params->__isset.sequence_map_col) {
600
1.76k
        for (const auto& slot : _output_tuple_desc->slots()) {
601
            // When the target table has seqeunce map column, _input_tuple_desc will not contains __DORIS_SEQUENCE_COL__,
602
            // so we should get its column unique id from _output_tuple_desc
603
1.76k
            if (slot->is_sequence_col()) {
604
206
                _sequence_col_uid = slot->col_unique_id();
605
206
            }
606
1.76k
        }
607
242
    }
608
7.87k
    _src_block_ptr = &_src_block;
609
7.87k
    _src_block_init = true;
610
7.87k
    return Status::OK();
611
7.87k
}
612
613
109k
Status FileScanner::_cast_to_input_block(Block* block) {
614
109k
    if (!_is_load) {
615
103k
        return Status::OK();
616
103k
    }
617
5.41k
    SCOPED_TIMER(_cast_to_input_block_timer);
618
    // cast primitive type(PT0) to primitive type(PT1)
619
5.41k
    uint32_t idx = 0;
620
66.4k
    for (auto& slot_desc : _input_tuple_desc->slots()) {
621
66.4k
        if (_slot_lower_name_to_col_type.find(slot_desc->col_name()) ==
622
66.4k
            _slot_lower_name_to_col_type.end()) {
623
            // skip columns which does not exist in file
624
456
            continue;
625
456
        }
626
65.9k
        auto& arg = _src_block_ptr->get_by_position(_src_block_name_to_idx[slot_desc->col_name()]);
627
65.9k
        auto return_type = slot_desc->get_data_type_ptr();
628
        // remove nullable here, let the get_function decide whether nullable
629
65.9k
        auto data_type = get_data_type_with_default_argument(remove_nullable(return_type));
630
65.9k
        ColumnsWithTypeAndName arguments {
631
65.9k
                arg, {data_type->create_column(), data_type, slot_desc->col_name()}};
632
65.9k
        auto func_cast =
633
65.9k
                SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type, {});
634
65.9k
        if (!func_cast) {
635
0
            return Status::InternalError("Function CAST[arg={}, col name={}, return={}] not found!",
636
0
                                         arg.type->get_name(), slot_desc->col_name(),
637
0
                                         return_type->get_name());
638
0
        }
639
65.9k
        idx = _src_block_name_to_idx[slot_desc->col_name()];
640
65.9k
        DCHECK(_state != nullptr);
641
65.9k
        auto ctx = FunctionContext::create_context(_state, {}, {});
642
65.9k
        RETURN_IF_ERROR(
643
65.9k
                func_cast->execute(ctx.get(), *_src_block_ptr, {idx}, idx, arg.column->size()));
644
65.9k
        _src_block_ptr->get_by_position(idx).type = std::move(return_type);
645
65.9k
    }
646
5.41k
    return Status::OK();
647
5.41k
}
648
649
16.9k
Status FileScanner::_fill_columns_from_path(size_t rows) {
650
16.9k
    if (!_fill_partition_from_path) {
651
56
        return Status::OK();
652
56
    }
653
16.8k
    DataTypeSerDe::FormatOptions _text_formatOptions;
654
16.8k
    for (auto& kv : _partition_col_descs) {
655
1.61k
        auto doris_column =
656
1.61k
                _src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]).column;
657
        // _src_block_ptr points to a mutable block created by this class itself, so const_cast can be used here.
658
1.61k
        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
659
1.61k
        auto& [value, slot_desc] = kv.second;
660
1.61k
        auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
661
1.61k
        Slice slice(value.data(), value.size());
662
1.61k
        uint64_t num_deserialized = 0;
663
1.61k
        if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows,
664
1.61k
                                                            &num_deserialized,
665
1.61k
                                                            _text_formatOptions) != Status::OK()) {
666
0
            return Status::InternalError("Failed to fill partition column: {}={}",
667
0
                                         slot_desc->col_name(), value);
668
0
        }
669
1.61k
        if (num_deserialized != rows) {
670
0
            return Status::InternalError(
671
0
                    "Failed to fill partition column: {}={} ."
672
0
                    "Number of rows expected to be written : {}, number of rows actually written : "
673
0
                    "{}",
674
0
                    slot_desc->col_name(), value, num_deserialized, rows);
675
0
        }
676
1.61k
    }
677
16.8k
    return Status::OK();
678
16.8k
}
679
680
16.9k
Status FileScanner::_fill_missing_columns(size_t rows) {
681
16.9k
    if (_missing_cols.empty()) {
682
16.9k
        return Status::OK();
683
16.9k
    }
684
685
0
    SCOPED_TIMER(_fill_missing_columns_timer);
686
0
    for (auto& kv : _missing_col_descs) {
687
0
        if (kv.second == nullptr) {
688
            // no default column, fill with null
689
0
            auto mutable_column = _src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first])
690
0
                                          .column->assume_mutable();
691
0
            auto* nullable_column = static_cast<ColumnNullable*>(mutable_column.get());
692
0
            nullable_column->insert_many_defaults(rows);
693
0
        } else {
694
            // fill with default value
695
0
            auto& ctx = kv.second;
696
0
            ColumnPtr result_column_ptr;
697
            // PT1 => dest primitive type
698
0
            RETURN_IF_ERROR(ctx->execute(_src_block_ptr, result_column_ptr));
699
0
            if (result_column_ptr->use_count() == 1) {
700
                // call resize because the first column of _src_block_ptr may not be filled by reader,
701
                // so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()`
702
                // has only one row.
703
0
                auto mutable_column = result_column_ptr->assume_mutable();
704
0
                mutable_column->resize(rows);
705
                // result_column_ptr maybe a ColumnConst, convert it to a normal column
706
0
                result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
707
0
                auto origin_column_type =
708
0
                        _src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]).type;
709
0
                bool is_nullable = origin_column_type->is_nullable();
710
0
                if (!_src_block_name_to_idx.contains(kv.first)) {
711
0
                    return Status::InternalError("Column {} not found in src block {}", kv.first,
712
0
                                                 _src_block_ptr->dump_structure());
713
0
                }
714
0
                _src_block_ptr->replace_by_position(
715
0
                        _src_block_name_to_idx[kv.first],
716
0
                        is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
717
0
            }
718
0
        }
719
0
    }
720
0
    return Status::OK();
721
0
}
722
723
109k
Status FileScanner::_pre_filter_src_block() {
724
109k
    if (!_is_load) {
725
103k
        return Status::OK();
726
103k
    }
727
5.41k
    if (!_pre_conjunct_ctxs.empty()) {
728
28
        SCOPED_TIMER(_pre_filter_timer);
729
28
        auto origin_column_num = _src_block_ptr->columns();
730
28
        auto old_rows = _src_block_ptr->rows();
731
28
        RETURN_IF_ERROR(
732
28
                VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr, origin_column_num));
733
28
        _counter.num_rows_unselected += old_rows - _src_block_ptr->rows();
734
28
    }
735
5.41k
    return Status::OK();
736
5.41k
}
737
738
109k
Status FileScanner::_convert_to_output_block(Block* block) {
739
109k
    if (!_is_load) {
740
103k
        return Status::OK();
741
103k
    }
742
5.41k
    SCOPED_TIMER(_convert_to_output_block_timer);
743
    // The block is passed from scanner context's free blocks,
744
    // which is initialized by output columns
745
    // so no need to clear it
746
    // block->clear();
747
748
5.41k
    int ctx_idx = 0;
749
5.41k
    size_t rows = _src_block_ptr->rows();
750
5.41k
    auto filter_column = ColumnUInt8::create(rows, 1);
751
5.41k
    auto& filter_map = filter_column->get_data();
752
753
    // After convert, the column_ptr should be copied into output block.
754
    // Can not use block->insert() because it may cause use_count() non-zero bug
755
5.41k
    MutableBlock mutable_output_block =
756
5.41k
            VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc);
757
5.41k
    auto& mutable_output_columns = mutable_output_block.mutable_columns();
758
759
5.41k
    std::vector<BitmapValue>* skip_bitmaps {nullptr};
760
5.41k
    if (_should_process_skip_bitmap_col()) {
761
157
        auto* skip_bitmap_nullable_col_ptr =
762
157
                assert_cast<ColumnNullable*>(_src_block_ptr->get_by_position(_skip_bitmap_col_idx)
763
157
                                                     .column->assume_mutable()
764
157
                                                     .get());
765
157
        skip_bitmaps = &(assert_cast<ColumnBitmap*>(
766
157
                                 skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get())
767
157
                                 ->get_data());
768
        // NOTE:
769
        // - If the table has sequence type column, __DORIS_SEQUENCE_COL__ will be put in _input_tuple_desc, so whether
770
        //   __DORIS_SEQUENCE_COL__ will be marked in skip bitmap depends on whether it's specified in that row
771
        // - If the table has sequence map column, __DORIS_SEQUENCE_COL__ will not be put in _input_tuple_desc,
772
        //   so __DORIS_SEQUENCE_COL__ will be ommited if it't specified in a row and will not be marked in skip bitmap.
773
        //   So we should mark __DORIS_SEQUENCE_COL__ in skip bitmap here if the corresponding sequence map column us marked
774
157
        if (_sequence_map_col_uid != -1) {
775
347
            for (int j = 0; j < rows; ++j) {
776
328
                if ((*skip_bitmaps)[j].contains(_sequence_map_col_uid)) {
777
137
                    (*skip_bitmaps)[j].add(_sequence_col_uid);
778
137
                }
779
328
            }
780
19
        }
781
157
    }
782
783
    // for (auto slot_desc : _output_tuple_desc->slots()) {
784
77.8k
    for (int j = 0; j < mutable_output_columns.size(); ++j) {
785
72.4k
        auto* slot_desc = _output_tuple_desc->slots()[j];
786
72.4k
        int dest_index = ctx_idx;
787
72.4k
        ColumnPtr column_ptr;
788
789
72.4k
        auto& ctx = _dest_vexpr_ctx[dest_index];
790
        // PT1 => dest primitive type
791
72.4k
        RETURN_IF_ERROR(ctx->execute(_src_block_ptr, column_ptr));
792
        // column_ptr maybe a ColumnConst, convert it to a normal column
793
72.4k
        column_ptr = column_ptr->convert_to_full_column_if_const();
794
72.4k
        DCHECK(column_ptr);
795
796
        // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
797
        // is likely to be nullable
798
72.4k
        if (LIKELY(column_ptr->is_nullable())) {
799
67.3k
            const auto* nullable_column = reinterpret_cast<const ColumnNullable*>(column_ptr.get());
800
177M
            for (int i = 0; i < rows; ++i) {
801
177M
                if (filter_map[i] && nullable_column->is_null_at(i)) {
802
                    // skip checks for non-mentioned columns in flexible partial update
803
2.01M
                    if (skip_bitmaps == nullptr ||
804
2.01M
                        !skip_bitmaps->at(i).contains(slot_desc->col_unique_id())) {
805
                        // clang-format off
806
1.96M
                        if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
807
1.96M
                            !_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->is_null_at(i)) {
808
5.48k
                            filter_map[i] = false;
809
5.48k
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
810
5.48k
                                [&]() -> std::string {
811
5.48k
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
812
5.48k
                                },
813
5.48k
                                [&]() -> std::string {
814
5.48k
                                    auto raw_value =
815
5.48k
                                            _src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->get_data_at(i);
816
5.48k
                                    std::string raw_string = raw_value.to_string();
817
5.48k
                                    fmt::memory_buffer error_msg;
818
5.48k
                                    fmt::format_to(error_msg,"column({}) value is incorrect while strict mode is {}, src value is {}",
819
5.48k
                                            slot_desc->col_name(), _strict_mode, raw_string);
820
5.48k
                                    return fmt::to_string(error_msg);
821
5.48k
                                }));
822
1.95M
                        } else if (!slot_desc->is_nullable()) {
823
648
                            filter_map[i] = false;
824
648
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
825
648
                                [&]() -> std::string {
826
648
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
827
648
                                },
828
648
                                [&]() -> std::string {
829
648
                                    fmt::memory_buffer error_msg;
830
648
                                    fmt::format_to(error_msg, "column({}) values is null while columns is not nullable", slot_desc->col_name());
831
648
                                    return fmt::to_string(error_msg);
832
648
                                }));
833
648
                        }
834
                        // clang-format on
835
1.96M
                    }
836
2.01M
                }
837
177M
            }
838
67.3k
            if (!slot_desc->is_nullable()) {
839
42.2k
                column_ptr = remove_nullable(column_ptr);
840
42.2k
            }
841
67.3k
        } else if (slot_desc->is_nullable()) {
842
115
            column_ptr = make_nullable(column_ptr);
843
115
        }
844
72.4k
        mutable_output_columns[j]->insert_range_from(*column_ptr, 0, rows);
845
72.4k
        ctx_idx++;
846
72.4k
    }
847
848
    // after do the dest block insert operation, clear _src_block to remove the reference of origin column
849
5.41k
    _src_block_ptr->clear();
850
851
5.41k
    size_t dest_size = block->columns();
852
    // do filter
853
5.41k
    block->insert(ColumnWithTypeAndName(std::move(filter_column), std::make_shared<DataTypeUInt8>(),
854
5.41k
                                        "filter column"));
855
5.41k
    RETURN_IF_ERROR(Block::filter_block(block, dest_size, dest_size));
856
857
5.41k
    _counter.num_rows_filtered += rows - block->rows();
858
5.41k
    return Status::OK();
859
5.41k
}
860
861
109k
Status FileScanner::_truncate_char_or_varchar_columns(Block* block) {
862
    // Truncate char columns or varchar columns if size is smaller than file columns
863
    // or not found in the file column schema.
864
109k
    if (!_state->query_options().truncate_char_or_varchar_columns) {
865
109k
        return Status::OK();
866
109k
    }
867
12
    int idx = 0;
868
36
    for (auto* slot_desc : _real_tuple_desc->slots()) {
869
36
        const auto& type = slot_desc->type();
870
36
        if (type->get_primitive_type() != TYPE_VARCHAR && type->get_primitive_type() != TYPE_CHAR) {
871
12
            ++idx;
872
12
            continue;
873
12
        }
874
24
        auto iter = _source_file_col_name_types.find(slot_desc->col_name());
875
24
        if (iter != _source_file_col_name_types.end()) {
876
16
            const auto file_type_desc = _source_file_col_name_types[slot_desc->col_name()];
877
16
            int l = -1;
878
16
            if (auto* ftype = check_and_get_data_type<DataTypeString>(
879
16
                        remove_nullable(file_type_desc).get())) {
880
16
                l = ftype->len();
881
16
            }
882
16
            if ((assert_cast<const DataTypeString*>(remove_nullable(type).get())->len() > 0) &&
883
16
                (assert_cast<const DataTypeString*>(remove_nullable(type).get())->len() < l ||
884
16
                 l < 0)) {
885
16
                _truncate_char_or_varchar_column(
886
16
                        block, idx,
887
16
                        assert_cast<const DataTypeString*>(remove_nullable(type).get())->len());
888
16
            }
889
16
        } else {
890
8
            _truncate_char_or_varchar_column(
891
8
                    block, idx,
892
8
                    assert_cast<const DataTypeString*>(remove_nullable(type).get())->len());
893
8
        }
894
24
        ++idx;
895
24
    }
896
12
    return Status::OK();
897
109k
}
898
899
// VARCHAR substring(VARCHAR str, INT pos[, INT len])
900
24
void FileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int len) {
901
24
    auto int_type = std::make_shared<DataTypeInt32>();
902
24
    uint32_t num_columns_without_result = block->columns();
903
24
    const ColumnNullable* col_nullable =
904
24
            assert_cast<const ColumnNullable*>(block->get_by_position(idx).column.get());
905
24
    const ColumnPtr& string_column_ptr = col_nullable->get_nested_column_ptr();
906
24
    ColumnPtr null_map_column_ptr = col_nullable->get_null_map_column_ptr();
907
24
    block->replace_by_position(idx, std::move(string_column_ptr));
908
24
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)), int_type,
909
24
                   "const 1"}); // pos is 1
910
24
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)), int_type,
911
24
                   fmt::format("const {}", len)});                          // len
912
24
    block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); // result column
913
24
    ColumnNumbers temp_arguments(3);
914
24
    temp_arguments[0] = idx;                            // str column
915
24
    temp_arguments[1] = num_columns_without_result;     // pos
916
24
    temp_arguments[2] = num_columns_without_result + 1; // len
917
24
    uint32_t result_column_id = num_columns_without_result + 2;
918
919
24
    SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
920
24
    auto res = ColumnNullable::create(block->get_by_position(result_column_id).column,
921
24
                                      null_map_column_ptr);
922
24
    block->replace_by_position(idx, std::move(res));
923
24
    Block::erase_useless_column(block, num_columns_without_result);
924
24
}
925
926
7.54k
Status FileScanner::_create_row_id_column_iterator() {
927
7.54k
    auto& id_file_map = _state->get_id_file_map();
928
7.54k
    auto file_id = id_file_map->get_file_mapping_id(
929
7.54k
            std::make_shared<FileMapping>(((FileScanLocalState*)_local_state)->parent_id(),
930
7.54k
                                          _current_range, _should_enable_file_meta_cache()));
931
7.54k
    _row_id_column_iterator_pair.first = std::make_shared<RowIdColumnIteratorV2>(
932
7.54k
            IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id);
933
7.54k
    return Status::OK();
934
7.54k
}
935
936
157k
Status FileScanner::_get_next_reader() {
937
157k
    while (true) {
938
157k
        if (_cur_reader) {
939
70.7k
            _cur_reader->collect_profile_before_close();
940
70.7k
            RETURN_IF_ERROR(_cur_reader->close());
941
70.7k
            _state->update_num_finished_scan_range(1);
942
70.7k
        }
943
157k
        _cur_reader.reset(nullptr);
944
157k
        _src_block_init = false;
945
157k
        bool has_next = _first_scan_range;
946
157k
        if (!_first_scan_range) {
947
107k
            RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
948
107k
        }
949
157k
        _first_scan_range = false;
950
157k
        if (!has_next || _should_stop) {
951
86.2k
            _scanner_eof = true;
952
86.2k
            return Status::OK();
953
86.2k
        }
954
955
71.7k
        const TFileRangeDesc& range = _current_range;
956
71.7k
        _current_range_path = range.path;
957
958
71.7k
        if (!_partition_slot_descs.empty()) {
959
            // we need get partition columns first for runtime filter partition pruning
960
7.35k
            RETURN_IF_ERROR(_generate_partition_columns());
961
962
7.35k
            if (_state->query_options().enable_runtime_filter_partition_prune) {
963
                // if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
964
                // by runtime filter partition prune
965
7.34k
                if (_push_down_conjuncts.size() < _conjuncts.size()) {
966
                    // there are new runtime filters, need to re-init runtime filter partition pruning ctxs
967
3.68k
                    _init_runtime_filter_partition_prune_ctxs();
968
3.68k
                }
969
970
7.34k
                bool can_filter_all = false;
971
7.34k
                RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
972
7.34k
                if (can_filter_all) {
973
                    // this range can be filtered out by runtime filter partition pruning
974
                    // so we need to skip this range
975
454
                    COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
976
454
                    continue;
977
454
                }
978
7.34k
            }
979
7.35k
        }
980
981
        // create reader for specific format
982
71.2k
        Status init_status = Status::OK();
983
71.2k
        TFileFormatType::type format_type = _get_current_format_type();
984
        // for compatibility, this logic is deprecated in 3.1
985
71.2k
        if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
986
5.68k
            if (range.table_format_params.table_format_type == "paimon" &&
987
5.68k
                !range.table_format_params.paimon_params.__isset.paimon_split) {
988
                // use native reader
989
0
                auto format = range.table_format_params.paimon_params.file_format;
990
0
                if (format == "orc") {
991
0
                    format_type = TFileFormatType::FORMAT_ORC;
992
0
                } else if (format == "parquet") {
993
0
                    format_type = TFileFormatType::FORMAT_PARQUET;
994
0
                } else {
995
0
                    return Status::InternalError("Not supported paimon file format: {}", format);
996
0
                }
997
0
            }
998
5.68k
        }
999
1000
        // JNI reader can only push down column value range
1001
71.2k
        bool push_down_predicates = !_is_load && format_type != TFileFormatType::FORMAT_JNI;
1002
71.2k
        bool need_to_get_parsed_schema = false;
1003
71.2k
        switch (format_type) {
1004
5.68k
        case TFileFormatType::FORMAT_JNI: {
1005
5.68k
            if (range.__isset.table_format_params &&
1006
5.68k
                range.table_format_params.table_format_type == "max_compute") {
1007
0
                const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
1008
0
                        _real_tuple_desc->table_desc());
1009
0
                if (!mc_desc->init_status()) {
1010
0
                    return mc_desc->init_status();
1011
0
                }
1012
0
                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
1013
0
                        mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
1014
0
                        range, _state, _profile);
1015
0
                init_status = mc_reader->init_reader();
1016
0
                _cur_reader = std::move(mc_reader);
1017
5.68k
            } else if (range.__isset.table_format_params &&
1018
5.68k
                       range.table_format_params.table_format_type == "paimon") {
1019
1.91k
                if (_state->query_options().__isset.enable_paimon_cpp_reader &&
1020
1.91k
                    _state->query_options().enable_paimon_cpp_reader) {
1021
0
                    auto cpp_reader = PaimonCppReader::create_unique(_file_slot_descs, _state,
1022
0
                                                                     _profile, range, _params);
1023
0
                    cpp_reader->set_push_down_agg_type(_get_push_down_agg_type());
1024
0
                    if (!_is_load && !_push_down_conjuncts.empty()) {
1025
0
                        PaimonPredicateConverter predicate_converter(_file_slot_descs, _state);
1026
0
                        auto predicate = predicate_converter.build(_push_down_conjuncts);
1027
0
                        if (predicate) {
1028
0
                            cpp_reader->set_predicate(std::move(predicate));
1029
0
                        }
1030
0
                    }
1031
0
                    init_status = cpp_reader->init_reader();
1032
0
                    _cur_reader = std::move(cpp_reader);
1033
1.91k
                } else {
1034
1.91k
                    _cur_reader = PaimonJniReader::create_unique(_file_slot_descs, _state, _profile,
1035
1.91k
                                                                 range, _params);
1036
1.91k
                    init_status = ((PaimonJniReader*)(_cur_reader.get()))->init_reader();
1037
1.91k
                }
1038
3.76k
            } else if (range.__isset.table_format_params &&
1039
3.76k
                       range.table_format_params.table_format_type == "hudi") {
1040
0
                _cur_reader = HudiJniReader::create_unique(*_params,
1041
0
                                                           range.table_format_params.hudi_params,
1042
0
                                                           _file_slot_descs, _state, _profile);
1043
0
                init_status = ((HudiJniReader*)_cur_reader.get())->init_reader();
1044
1045
3.76k
            } else if (range.__isset.table_format_params &&
1046
3.76k
                       range.table_format_params.table_format_type == "trino_connector") {
1047
790
                _cur_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
1048
790
                                                                     _profile, range);
1049
790
                init_status = ((TrinoConnectorJniReader*)(_cur_reader.get()))->init_reader();
1050
2.97k
            } else if (range.__isset.table_format_params &&
1051
2.97k
                       range.table_format_params.table_format_type == "jdbc") {
1052
                // Extract jdbc params from table_format_params
1053
1.73k
                std::map<std::string, std::string> jdbc_params(
1054
1.73k
                        range.table_format_params.jdbc_params.begin(),
1055
1.73k
                        range.table_format_params.jdbc_params.end());
1056
1.73k
                _cur_reader = JdbcJniReader::create_unique(_file_slot_descs, _state, _profile,
1057
1.73k
                                                           jdbc_params);
1058
1.73k
                init_status = ((JdbcJniReader*)(_cur_reader.get()))->init_reader();
1059
1.73k
            } else if (range.__isset.table_format_params &&
1060
1.24k
                       range.table_format_params.table_format_type == "iceberg") {
1061
1.24k
                _cur_reader = IcebergSysTableJniReader::create_unique(_file_slot_descs, _state,
1062
1.24k
                                                                      _profile, range, _params);
1063
1.24k
                init_status = ((IcebergSysTableJniReader*)(_cur_reader.get()))->init_reader();
1064
1.24k
            }
1065
            // Set col_name_to_block_idx for JNI readers to avoid repeated map creation
1066
5.68k
            if (_cur_reader) {
1067
5.67k
                if (auto* jni_reader = dynamic_cast<JniReader*>(_cur_reader.get())) {
1068
5.67k
                    jni_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
1069
5.67k
                }
1070
5.67k
            }
1071
5.68k
            break;
1072
5.68k
        }
1073
32.7k
        case TFileFormatType::FORMAT_PARQUET: {
1074
32.7k
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1075
32.7k
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1076
32.7k
                                               : nullptr;
1077
32.7k
            std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
1078
32.7k
                    _profile, *_params, range, _state->query_options().batch_size,
1079
32.7k
                    &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr,
1080
32.7k
                    _state->query_options().enable_parquet_lazy_mat);
1081
1082
32.7k
            if (_row_id_column_iterator_pair.second != -1) {
1083
2.14k
                RETURN_IF_ERROR(_create_row_id_column_iterator());
1084
2.14k
                parquet_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1085
2.14k
            }
1086
1087
            // ATTN: the push down agg type may be set back to NONE,
1088
            // see IcebergTableReader::init_row_filters for example.
1089
32.7k
            parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
1090
32.7k
            if (push_down_predicates) {
1091
32.6k
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1092
32.6k
            }
1093
32.7k
            RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
1094
1095
32.7k
            need_to_get_parsed_schema = true;
1096
32.7k
            break;
1097
32.7k
        }
1098
24.6k
        case TFileFormatType::FORMAT_ORC: {
1099
24.6k
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1100
24.6k
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1101
24.6k
                                               : nullptr;
1102
24.6k
            std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
1103
24.6k
                    _profile, _state, *_params, range, _state->query_options().batch_size,
1104
24.6k
                    _state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
1105
24.6k
                    _state->query_options().enable_orc_lazy_mat);
1106
24.6k
            if (_row_id_column_iterator_pair.second != -1) {
1107
5.40k
                RETURN_IF_ERROR(_create_row_id_column_iterator());
1108
5.40k
                orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
1109
5.40k
            }
1110
1111
24.6k
            orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
1112
24.6k
            if (push_down_predicates) {
1113
24.5k
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1114
24.5k
            }
1115
24.6k
            RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
1116
1117
24.6k
            need_to_get_parsed_schema = true;
1118
24.6k
            break;
1119
24.6k
        }
1120
2.69k
        case TFileFormatType::FORMAT_CSV_PLAIN:
1121
2.69k
        case TFileFormatType::FORMAT_CSV_GZ:
1122
2.69k
        case TFileFormatType::FORMAT_CSV_BZ2:
1123
2.69k
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
1124
2.69k
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
1125
2.69k
        case TFileFormatType::FORMAT_CSV_LZOP:
1126
2.69k
        case TFileFormatType::FORMAT_CSV_DEFLATE:
1127
2.69k
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
1128
2.77k
        case TFileFormatType::FORMAT_PROTO: {
1129
2.77k
            auto reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
1130
2.77k
                                                   _file_slot_descs, _io_ctx.get());
1131
1132
2.77k
            init_status = reader->init_reader(_is_load);
1133
2.77k
            _cur_reader = std::move(reader);
1134
2.77k
            break;
1135
2.69k
        }
1136
4.63k
        case TFileFormatType::FORMAT_TEXT: {
1137
4.63k
            auto reader = TextReader::create_unique(_state, _profile, &_counter, *_params, range,
1138
4.63k
                                                    _file_slot_descs, _io_ctx.get());
1139
4.63k
            init_status = reader->init_reader(_is_load);
1140
4.63k
            _cur_reader = std::move(reader);
1141
4.63k
            break;
1142
2.69k
        }
1143
759
        case TFileFormatType::FORMAT_JSON: {
1144
759
            _cur_reader =
1145
759
                    NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
1146
759
                                                 _file_slot_descs, &_scanner_eof, _io_ctx.get());
1147
759
            init_status = ((NewJsonReader*)(_cur_reader.get()))
1148
759
                                  ->init_reader(_col_default_value_ctx, _is_load);
1149
759
            break;
1150
2.69k
        }
1151
1152
0
        case TFileFormatType::FORMAT_WAL: {
1153
0
            _cur_reader = WalReader::create_unique(_state);
1154
0
            init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
1155
0
            break;
1156
2.69k
        }
1157
3
        case TFileFormatType::FORMAT_NATIVE: {
1158
3
            auto reader =
1159
3
                    NativeReader::create_unique(_profile, *_params, range, _io_ctx.get(), _state);
1160
3
            init_status = reader->init_reader();
1161
3
            _cur_reader = std::move(reader);
1162
3
            need_to_get_parsed_schema = false;
1163
3
            break;
1164
2.69k
        }
1165
108
        case TFileFormatType::FORMAT_ARROW: {
1166
108
            if (range.__isset.table_format_params &&
1167
108
                range.table_format_params.table_format_type == "remote_doris") {
1168
98
                _cur_reader =
1169
98
                        RemoteDorisReader::create_unique(_file_slot_descs, _state, _profile, range);
1170
98
                init_status = ((RemoteDorisReader*)(_cur_reader.get()))->init_reader();
1171
98
                if (_cur_reader) {
1172
98
                    static_cast<RemoteDorisReader*>(_cur_reader.get())
1173
98
                            ->set_col_name_to_block_idx(&_src_block_name_to_idx);
1174
98
                }
1175
98
            } else {
1176
10
                _cur_reader =
1177
10
                        ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
1178
10
                                                         range, _file_slot_descs, _io_ctx.get());
1179
10
                init_status = ((ArrowStreamReader*)(_cur_reader.get()))->init_reader();
1180
10
            }
1181
108
            break;
1182
2.69k
        }
1183
#ifdef BUILD_RUST_READERS
1184
        case TFileFormatType::FORMAT_LANCE: {
1185
            auto lance_reader = LanceRustReader::create_unique(_file_slot_descs, _state, _profile,
1186
                                                               range, _params);
1187
            init_status = lance_reader->init_reader();
1188
            _cur_reader = std::move(lance_reader);
1189
            need_to_get_parsed_schema = true;
1190
            break;
1191
        }
1192
#endif
1193
0
        default:
1194
0
            return Status::NotSupported("Not supported create reader for file format: {}.",
1195
0
                                        to_string(_params->format_type));
1196
71.2k
        }
1197
1198
71.1k
        if (_cur_reader == nullptr) {
1199
1
            return Status::NotSupported(
1200
1
                    "Not supported create reader for table format: {} / file format: {}.",
1201
1
                    range.__isset.table_format_params ? range.table_format_params.table_format_type
1202
1
                                                      : "NotSet",
1203
1
                    to_string(_params->format_type));
1204
1
        }
1205
71.1k
        COUNTER_UPDATE(_file_counter, 1);
1206
        // The FileScanner for external table may try to open not exist files,
1207
        // Because FE file cache for external table may out of date.
1208
        // So, NOT_FOUND for FileScanner is not a fail case.
1209
        // Will remove this after file reader refactor.
1210
71.1k
        if (init_status.is<END_OF_FILE>()) {
1211
0
            COUNTER_UPDATE(_empty_file_counter, 1);
1212
0
            continue;
1213
71.1k
        } else if (init_status.is<ErrorCode::NOT_FOUND>()) {
1214
0
            if (config::ignore_not_found_file_in_external_table) {
1215
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
1216
0
                continue;
1217
0
            }
1218
0
            return Status::InternalError("failed to find reader, err: {}", init_status.to_string());
1219
71.1k
        } else if (!init_status.ok()) {
1220
1
            return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
1221
1
        }
1222
1223
71.1k
        _cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
1224
71.1k
        if (_get_push_down_agg_type() == TPushAggOp::type::COUNT &&
1225
71.1k
            range.__isset.table_format_params &&
1226
71.1k
            range.table_format_params.table_level_row_count >= 0) {
1227
            // This is a table level count push down operation, no need to call
1228
            // _set_fill_or_truncate_columns.
1229
            // in _set_fill_or_truncate_columns, we will use [range.start_offset, end offset]
1230
            // to filter the row group. But if this is count push down, the offset is undefined,
1231
            // causing incorrect row group filter and may return empty result.
1232
70.9k
        } else {
1233
70.9k
            Status status = _set_fill_or_truncate_columns(need_to_get_parsed_schema);
1234
70.9k
            if (status.is<END_OF_FILE>()) { // all parquet row groups are filtered
1235
0
                continue;
1236
70.9k
            } else if (!status.ok()) {
1237
0
                return Status::InternalError("failed to set_fill_or_truncate_columns, err: {}",
1238
0
                                             status.to_string());
1239
0
            }
1240
70.9k
        }
1241
71.1k
        _cur_reader_eof = false;
1242
71.1k
        break;
1243
71.1k
    }
1244
71.1k
    return Status::OK();
1245
157k
}
1246
1247
Status FileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader,
1248
34.0k
                                         FileMetaCache* file_meta_cache_ptr) {
1249
34.0k
    const TFileRangeDesc& range = _current_range;
1250
34.0k
    Status init_status = Status::OK();
1251
1252
34.0k
    phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> slot_id_to_predicates =
1253
34.0k
            _local_state
1254
34.0k
                    ? _local_state->cast<FileScanLocalState>()._slot_id_to_predicates
1255
34.0k
                    : phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> {};
1256
34.0k
    if (range.__isset.table_format_params &&
1257
34.0k
        range.table_format_params.table_format_type == "iceberg") {
1258
18.0k
        std::unique_ptr<IcebergParquetReader> iceberg_reader = IcebergParquetReader::create_unique(
1259
18.0k
                std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache,
1260
18.0k
                _io_ctx.get(), file_meta_cache_ptr);
1261
18.0k
        if (_need_iceberg_rowid_column) {
1262
78
            iceberg_reader->set_need_row_id_column(true);
1263
78
        }
1264
18.0k
        if (_row_lineage_columns.row_id_column_idx != -1 ||
1265
18.0k
            _row_lineage_columns.last_updated_sequence_number_column_idx != -1) {
1266
236
            std::shared_ptr<RowLineageColumns> row_lineage_columns;
1267
236
            row_lineage_columns = std::make_shared<RowLineageColumns>();
1268
236
            row_lineage_columns->row_id_column_idx = _row_lineage_columns.row_id_column_idx;
1269
236
            row_lineage_columns->last_updated_sequence_number_column_idx =
1270
236
                    _row_lineage_columns.last_updated_sequence_number_column_idx;
1271
236
            iceberg_reader->set_row_lineage_columns(std::move(row_lineage_columns));
1272
236
        }
1273
18.0k
        iceberg_reader->set_push_down_agg_type(_get_push_down_agg_type());
1274
1275
18.0k
        init_status = iceberg_reader->init_reader(
1276
18.0k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1277
18.0k
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1278
18.0k
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1279
18.0k
                &_slot_id_to_filter_conjuncts);
1280
18.0k
        _cur_reader = std::move(iceberg_reader);
1281
18.0k
    } else if (range.__isset.table_format_params &&
1282
16.0k
               range.table_format_params.table_format_type == "paimon") {
1283
2.48k
        std::unique_ptr<PaimonParquetReader> paimon_reader = PaimonParquetReader::create_unique(
1284
2.48k
                std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache,
1285
2.48k
                _io_ctx.get(), file_meta_cache_ptr);
1286
2.48k
        init_status = paimon_reader->init_reader(
1287
2.48k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1288
2.48k
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1289
2.48k
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1290
2.48k
                &_slot_id_to_filter_conjuncts);
1291
2.48k
        RETURN_IF_ERROR(paimon_reader->init_row_filters());
1292
2.48k
        _cur_reader = std::move(paimon_reader);
1293
13.5k
    } else if (range.__isset.table_format_params &&
1294
13.5k
               range.table_format_params.table_format_type == "hudi") {
1295
0
        std::unique_ptr<HudiParquetReader> hudi_reader = HudiParquetReader::create_unique(
1296
0
                std::move(parquet_reader), _profile, _state, *_params, range, _io_ctx.get(),
1297
0
                file_meta_cache_ptr);
1298
0
        init_status = hudi_reader->init_reader(
1299
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1300
0
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1301
0
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1302
0
                &_slot_id_to_filter_conjuncts);
1303
0
        _cur_reader = std::move(hudi_reader);
1304
13.5k
    } else if (range.table_format_params.table_format_type == "hive") {
1305
10.4k
        auto hive_reader = HiveParquetReader::create_unique(std::move(parquet_reader), _profile,
1306
10.4k
                                                            _state, *_params, range, _io_ctx.get(),
1307
10.4k
                                                            &_is_file_slot, file_meta_cache_ptr);
1308
10.4k
        init_status = hive_reader->init_reader(
1309
10.4k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1310
10.4k
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1311
10.4k
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1312
10.4k
                &_slot_id_to_filter_conjuncts);
1313
10.4k
        _cur_reader = std::move(hive_reader);
1314
10.4k
    } else if (range.table_format_params.table_format_type == "tvf") {
1315
3.04k
        const FieldDescriptor* parquet_meta = nullptr;
1316
3.04k
        RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&parquet_meta));
1317
3.04k
        DCHECK(parquet_meta != nullptr);
1318
1319
        // TVF will first `get_parsed_schema` to obtain file information from BE, and FE will convert
1320
        // the column names to lowercase (because the query process is case-insensitive),
1321
        // so the lowercase file column names are used here to match the read columns.
1322
3.04k
        std::shared_ptr<TableSchemaChangeHelper::Node> tvf_info_node = nullptr;
1323
3.04k
        RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_name(
1324
3.04k
                _real_tuple_desc, *parquet_meta, tvf_info_node));
1325
3.04k
        init_status = parquet_reader->init_reader(
1326
3.04k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1327
3.04k
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1328
3.04k
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1329
3.04k
                &_slot_id_to_filter_conjuncts, tvf_info_node);
1330
3.04k
        _cur_reader = std::move(parquet_reader);
1331
3.04k
    } else if (_is_load) {
1332
22
        const FieldDescriptor* parquet_meta = nullptr;
1333
22
        RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&parquet_meta));
1334
22
        DCHECK(parquet_meta != nullptr);
1335
1336
        // Load is case-insensitive, so you to match the columns in the file.
1337
22
        std::map<std::string, std::string> file_lower_name_to_native;
1338
434
        for (const auto& parquet_field : parquet_meta->get_fields_schema()) {
1339
434
            file_lower_name_to_native.emplace(doris::to_lower(parquet_field.name),
1340
434
                                              parquet_field.name);
1341
434
        }
1342
22
        auto load_info_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1343
423
        for (const auto slot : _real_tuple_desc->slots()) {
1344
423
            if (file_lower_name_to_native.contains(slot->col_name())) {
1345
422
                load_info_node->add_children(slot->col_name(),
1346
422
                                             file_lower_name_to_native[slot->col_name()],
1347
422
                                             TableSchemaChangeHelper::ConstNode::get_instance());
1348
                // For Load, `file_scanner` will create block columns using the file type,
1349
                // there is no schema change when reading inside the struct,
1350
                // so use `TableSchemaChangeHelper::ConstNode`.
1351
422
            } else {
1352
1
                load_info_node->add_not_exist_children(slot->col_name());
1353
1
            }
1354
423
        }
1355
1356
22
        init_status = parquet_reader->init_reader(
1357
22
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
1358
22
                slot_id_to_predicates, _real_tuple_desc, _default_val_row_desc.get(),
1359
22
                _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
1360
22
                &_slot_id_to_filter_conjuncts, load_info_node);
1361
22
        _cur_reader = std::move(parquet_reader);
1362
22
    }
1363
1364
34.0k
    return init_status;
1365
34.0k
}
1366
1367
Status FileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader,
1368
26.4k
                                     FileMetaCache* file_meta_cache_ptr) {
1369
26.4k
    const TFileRangeDesc& range = _current_range;
1370
26.4k
    Status init_status = Status::OK();
1371
1372
26.4k
    if (range.__isset.table_format_params &&
1373
26.4k
        range.table_format_params.table_format_type == "transactional_hive") {
1374
292
        std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
1375
292
                TransactionalHiveReader::create_unique(std::move(orc_reader), _profile, _state,
1376
292
                                                       *_params, range, _io_ctx.get(),
1377
292
                                                       file_meta_cache_ptr);
1378
292
        init_status = tran_orc_reader->init_reader(
1379
292
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1380
292
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1381
292
                &_slot_id_to_filter_conjuncts);
1382
292
        RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
1383
292
        _cur_reader = std::move(tran_orc_reader);
1384
26.1k
    } else if (range.__isset.table_format_params &&
1385
26.1k
               range.table_format_params.table_format_type == "iceberg") {
1386
6.47k
        std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique(
1387
6.47k
                std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(),
1388
6.47k
                file_meta_cache_ptr);
1389
6.47k
        if (_need_iceberg_rowid_column) {
1390
78
            iceberg_reader->set_need_row_id_column(true);
1391
78
        }
1392
6.47k
        if (_row_lineage_columns.row_id_column_idx != -1 ||
1393
6.47k
            _row_lineage_columns.last_updated_sequence_number_column_idx != -1) {
1394
236
            std::shared_ptr<RowLineageColumns> row_lineage_columns;
1395
236
            row_lineage_columns = std::make_shared<RowLineageColumns>();
1396
236
            row_lineage_columns->row_id_column_idx = _row_lineage_columns.row_id_column_idx;
1397
236
            row_lineage_columns->last_updated_sequence_number_column_idx =
1398
236
                    _row_lineage_columns.last_updated_sequence_number_column_idx;
1399
236
            iceberg_reader->set_row_lineage_columns(std::move(row_lineage_columns));
1400
236
        }
1401
6.47k
        init_status = iceberg_reader->init_reader(
1402
6.47k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1403
6.47k
                _default_val_row_desc.get(), _col_name_to_slot_id,
1404
6.47k
                &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
1405
6.47k
        _cur_reader = std::move(iceberg_reader);
1406
19.7k
    } else if (range.__isset.table_format_params &&
1407
19.7k
               range.table_format_params.table_format_type == "paimon") {
1408
2.08k
        std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique(
1409
2.08k
                std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(),
1410
2.08k
                file_meta_cache_ptr);
1411
1412
2.08k
        init_status = paimon_reader->init_reader(
1413
2.08k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1414
2.08k
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1415
2.08k
                &_slot_id_to_filter_conjuncts);
1416
2.08k
        RETURN_IF_ERROR(paimon_reader->init_row_filters());
1417
2.08k
        _cur_reader = std::move(paimon_reader);
1418
17.6k
    } else if (range.__isset.table_format_params &&
1419
17.6k
               range.table_format_params.table_format_type == "hudi") {
1420
0
        std::unique_ptr<HudiOrcReader> hudi_reader =
1421
0
                HudiOrcReader::create_unique(std::move(orc_reader), _profile, _state, *_params,
1422
0
                                             range, _io_ctx.get(), file_meta_cache_ptr);
1423
1424
0
        init_status = hudi_reader->init_reader(
1425
0
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1426
0
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1427
0
                &_slot_id_to_filter_conjuncts);
1428
0
        _cur_reader = std::move(hudi_reader);
1429
17.6k
    } else if (range.__isset.table_format_params &&
1430
17.6k
               range.table_format_params.table_format_type == "hive") {
1431
15.4k
        std::unique_ptr<HiveOrcReader> hive_reader = HiveOrcReader::create_unique(
1432
15.4k
                std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get(),
1433
15.4k
                &_is_file_slot, file_meta_cache_ptr);
1434
1435
15.4k
        init_status = hive_reader->init_reader(
1436
15.4k
                _file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, _real_tuple_desc,
1437
15.4k
                _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1438
15.4k
                &_slot_id_to_filter_conjuncts);
1439
15.4k
        _cur_reader = std::move(hive_reader);
1440
15.4k
    } else if (range.__isset.table_format_params &&
1441
2.23k
               range.table_format_params.table_format_type == "tvf") {
1442
2.20k
        const orc::Type* orc_type_ptr = nullptr;
1443
2.20k
        RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr));
1444
1445
2.20k
        std::shared_ptr<TableSchemaChangeHelper::Node> tvf_info_node = nullptr;
1446
2.20k
        RETURN_IF_ERROR(TableSchemaChangeHelper::BuildTableInfoUtil::by_orc_name(
1447
2.20k
                _real_tuple_desc, orc_type_ptr, tvf_info_node));
1448
2.20k
        init_status = orc_reader->init_reader(
1449
2.20k
                &_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, false,
1450
2.20k
                _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1451
2.20k
                &_slot_id_to_filter_conjuncts, tvf_info_node);
1452
2.20k
        _cur_reader = std::move(orc_reader);
1453
2.20k
    } else if (_is_load) {
1454
13
        const orc::Type* orc_type_ptr = nullptr;
1455
13
        RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr));
1456
1457
13
        std::map<std::string, std::string> file_lower_name_to_native;
1458
375
        for (uint64_t idx = 0; idx < orc_type_ptr->getSubtypeCount(); idx++) {
1459
362
            file_lower_name_to_native.emplace(doris::to_lower(orc_type_ptr->getFieldName(idx)),
1460
362
                                              orc_type_ptr->getFieldName(idx));
1461
362
        }
1462
1463
13
        auto load_info_node = std::make_shared<TableSchemaChangeHelper::StructNode>();
1464
362
        for (const auto slot : _real_tuple_desc->slots()) {
1465
362
            if (file_lower_name_to_native.contains(slot->col_name())) {
1466
362
                load_info_node->add_children(slot->col_name(),
1467
362
                                             file_lower_name_to_native[slot->col_name()],
1468
362
                                             TableSchemaChangeHelper::ConstNode::get_instance());
1469
362
            } else {
1470
0
                load_info_node->add_not_exist_children(slot->col_name());
1471
0
            }
1472
362
        }
1473
13
        init_status = orc_reader->init_reader(
1474
13
                &_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts, false,
1475
13
                _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts,
1476
13
                &_slot_id_to_filter_conjuncts, load_info_node);
1477
13
        _cur_reader = std::move(orc_reader);
1478
13
    }
1479
1480
26.4k
    return init_status;
1481
26.4k
}
1482
1483
74.1k
Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema) {
1484
74.1k
    _missing_cols.clear();
1485
74.1k
    _slot_lower_name_to_col_type.clear();
1486
74.1k
    std::unordered_map<std::string, DataTypePtr> name_to_col_type;
1487
74.1k
    RETURN_IF_ERROR(_cur_reader->get_columns(&name_to_col_type, &_missing_cols));
1488
74.1k
    if (_need_iceberg_rowid_column && _current_range.__isset.table_format_params &&
1489
74.1k
        _current_range.table_format_params.table_format_type == "iceberg") {
1490
156
        _missing_cols.erase(BeConsts::ICEBERG_ROWID_COL);
1491
156
        _missing_cols.erase(to_lower(BeConsts::ICEBERG_ROWID_COL));
1492
156
    }
1493
2.31M
    for (const auto& [col_name, col_type] : name_to_col_type) {
1494
2.31M
        auto col_name_lower = to_lower(col_name);
1495
2.31M
        if (_partition_col_descs.contains(col_name_lower)) {
1496
            /*
1497
             * `_slot_lower_name_to_col_type` is used by `_init_src_block` and `_cast_to_input_block` during LOAD to
1498
             * generate columns of the corresponding type, which records the columns existing in the file.
1499
             *
1500
             * When a column in `COLUMNS FROM PATH` exists in a file column, the column type in the block will
1501
             * not match the slot type in `_output_tuple_desc`, causing an error when
1502
             * Serde `deserialize_one_cell_from_json` fills the partition values.
1503
             *
1504
             * So for partition column not need fill _slot_lower_name_to_col_type.
1505
             */
1506
3.39k
            continue;
1507
3.39k
        }
1508
2.31M
        _slot_lower_name_to_col_type.emplace(col_name_lower, col_type);
1509
2.31M
    }
1510
1511
74.1k
    if (!_fill_partition_from_path && config::enable_iceberg_partition_column_fallback) {
1512
        // check if the cols of _partition_col_descs are in _missing_cols
1513
        // if so, set _fill_partition_from_path to true and remove the col from _missing_cols
1514
3.32k
        for (const auto& [col_name, col_type] : _partition_col_descs) {
1515
3.32k
            if (_missing_cols.contains(col_name)) {
1516
0
                _fill_partition_from_path = true;
1517
0
                _missing_cols.erase(col_name);
1518
0
            }
1519
3.32k
        }
1520
2.46k
    }
1521
1522
74.1k
    RETURN_IF_ERROR(_generate_missing_columns());
1523
74.1k
    if (_fill_partition_from_path) {
1524
71.6k
        RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
1525
71.6k
    } else {
1526
        // If the partition columns are not from path, we only fill the missing columns.
1527
2.50k
        RETURN_IF_ERROR(_cur_reader->set_fill_columns({}, _missing_col_descs));
1528
2.50k
    }
1529
74.1k
    if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
1530
0
        fmt::memory_buffer col_buf;
1531
0
        for (auto& col : _missing_cols) {
1532
0
            fmt::format_to(col_buf, " {}", col);
1533
0
        }
1534
0
        VLOG_NOTICE << fmt::format("Unknown columns:{} in file {}", fmt::to_string(col_buf),
1535
0
                                   _current_range.path);
1536
0
    }
1537
1538
74.1k
    RETURN_IF_ERROR(_generate_truncate_columns(need_to_get_parsed_schema));
1539
74.1k
    return Status::OK();
1540
74.1k
}
1541
1542
74.1k
Status FileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) {
1543
74.1k
    _source_file_col_name_types.clear();
1544
    //  The col names and types of source file, such as parquet, orc files.
1545
74.1k
    if (_state->query_options().truncate_char_or_varchar_columns && need_to_get_parsed_schema) {
1546
8
        std::vector<std::string> source_file_col_names;
1547
8
        std::vector<DataTypePtr> source_file_col_types;
1548
8
        Status status =
1549
8
                _cur_reader->get_parsed_schema(&source_file_col_names, &source_file_col_types);
1550
8
        if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
1551
0
            return status;
1552
0
        }
1553
8
        DCHECK_EQ(source_file_col_names.size(), source_file_col_types.size());
1554
32
        for (int i = 0; i < source_file_col_names.size(); ++i) {
1555
24
            _source_file_col_name_types[to_lower(source_file_col_names[i])] =
1556
24
                    source_file_col_types[i];
1557
24
        }
1558
8
    }
1559
74.1k
    return Status::OK();
1560
74.1k
}
1561
1562
3.22k
Status FileScanner::prepare_for_read_lines(const TFileRangeDesc& range) {
1563
3.22k
    _current_range = range;
1564
1565
3.22k
    _file_cache_statistics.reset(new io::FileCacheStatistics());
1566
3.22k
    _file_reader_stats.reset(new io::FileReaderStats());
1567
1568
3.22k
    _file_read_bytes_counter =
1569
3.22k
            ADD_COUNTER_WITH_LEVEL(_profile, FileReadBytesProfile, TUnit::BYTES, 1);
1570
3.22k
    _file_read_time_counter = ADD_TIMER_WITH_LEVEL(_profile, FileReadTimeProfile, 1);
1571
1572
3.22k
    RETURN_IF_ERROR(_init_io_ctx());
1573
3.22k
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
1574
3.22k
    _io_ctx->file_reader_stats = _file_reader_stats.get();
1575
3.22k
    _default_val_row_desc.reset(new RowDescriptor((TupleDescriptor*)_real_tuple_desc));
1576
3.22k
    RETURN_IF_ERROR(_init_expr_ctxes());
1577
1578
    // Since only one column is read from the file, there is no need to filter, so set these variables to empty.
1579
3.22k
    _push_down_conjuncts.clear();
1580
3.22k
    _not_single_slot_filter_conjuncts.clear();
1581
3.22k
    _slot_id_to_filter_conjuncts.clear();
1582
3.22k
    _kv_cache = nullptr;
1583
3.22k
    return Status::OK();
1584
3.22k
}
1585
1586
Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
1587
                                          const std::list<int64_t>& row_ids, Block* result_block,
1588
                                          const ExternalFileMappingInfo& external_info,
1589
3.22k
                                          int64_t* init_reader_ms, int64_t* get_block_ms) {
1590
3.22k
    _current_range = range;
1591
3.22k
    RETURN_IF_ERROR(_generate_partition_columns());
1592
1593
3.22k
    TFileFormatType::type format_type = _get_current_format_type();
1594
3.22k
    Status init_status = Status::OK();
1595
1596
3.22k
    auto file_meta_cache_ptr = external_info.enable_file_meta_cache
1597
3.22k
                                       ? ExecEnv::GetInstance()->file_meta_cache()
1598
3.22k
                                       : nullptr;
1599
1600
3.22k
    RETURN_IF_ERROR(scope_timer_run(
1601
3.22k
            [&]() -> Status {
1602
3.22k
                switch (format_type) {
1603
3.22k
                case TFileFormatType::FORMAT_PARQUET: {
1604
3.22k
                    std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
1605
3.22k
                            _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx.get(),
1606
3.22k
                            _state, file_meta_cache_ptr, false);
1607
1608
3.22k
                    RETURN_IF_ERROR(parquet_reader->read_by_rows(row_ids));
1609
3.22k
                    RETURN_IF_ERROR(
1610
3.22k
                            _init_parquet_reader(std::move(parquet_reader), file_meta_cache_ptr));
1611
3.22k
                    break;
1612
3.22k
                }
1613
3.22k
                case TFileFormatType::FORMAT_ORC: {
1614
3.22k
                    std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
1615
3.22k
                            _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx.get(),
1616
3.22k
                            file_meta_cache_ptr, false);
1617
1618
3.22k
                    RETURN_IF_ERROR(orc_reader->read_by_rows(row_ids));
1619
3.22k
                    RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader), file_meta_cache_ptr));
1620
3.22k
                    break;
1621
3.22k
                }
1622
3.22k
                default: {
1623
3.22k
                    return Status::NotSupported(
1624
3.22k
                            "Not support create lines reader for file format: {},"
1625
3.22k
                            "only support parquet and orc.",
1626
3.22k
                            to_string(_params->format_type));
1627
3.22k
                }
1628
3.22k
                }
1629
3.22k
                return Status::OK();
1630
3.22k
            },
1631
3.22k
            init_reader_ms));
1632
1633
3.22k
    RETURN_IF_ERROR(_set_fill_or_truncate_columns(true));
1634
3.22k
    _cur_reader_eof = false;
1635
1636
3.22k
    RETURN_IF_ERROR(scope_timer_run(
1637
3.22k
            [&]() -> Status {
1638
3.22k
                while (!_cur_reader_eof) {
1639
3.22k
                    bool eof = false;
1640
3.22k
                    RETURN_IF_ERROR(_get_block_impl(_state, result_block, &eof));
1641
3.22k
                }
1642
3.22k
                return Status::OK();
1643
3.22k
            },
1644
3.22k
            get_block_ms));
1645
1646
3.22k
    _cur_reader->collect_profile_before_close();
1647
3.22k
    RETURN_IF_ERROR(_cur_reader->close());
1648
1649
3.22k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
1650
3.22k
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
1651
3.22k
    return Status::OK();
1652
3.22k
}
1653
1654
10.5k
Status FileScanner::_generate_partition_columns() {
1655
10.5k
    _partition_col_descs.clear();
1656
10.5k
    _partition_value_is_null.clear();
1657
10.5k
    const TFileRangeDesc& range = _current_range;
1658
10.5k
    if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
1659
14.8k
        for (const auto& slot_desc : _partition_slot_descs) {
1660
14.9k
            if (slot_desc) {
1661
14.9k
                auto it = _partition_slot_index_map.find(slot_desc->id());
1662
14.9k
                if (it == std::end(_partition_slot_index_map)) {
1663
0
                    return Status::InternalError("Unknown source slot descriptor, slot_id={}",
1664
0
                                                 slot_desc->id());
1665
0
                }
1666
14.9k
                const std::string& column_from_path = range.columns_from_path[it->second];
1667
14.9k
                _partition_col_descs.emplace(slot_desc->col_name(),
1668
14.9k
                                             std::make_tuple(column_from_path, slot_desc));
1669
14.9k
                if (range.__isset.columns_from_path_is_null) {
1670
3.70k
                    _partition_value_is_null.emplace(slot_desc->col_name(),
1671
3.70k
                                                     range.columns_from_path_is_null[it->second]);
1672
3.70k
                }
1673
14.9k
            }
1674
14.8k
        }
1675
8.50k
    }
1676
10.5k
    return Status::OK();
1677
10.5k
}
1678
1679
74.1k
Status FileScanner::_generate_missing_columns() {
1680
74.1k
    _missing_col_descs.clear();
1681
74.1k
    if (!_missing_cols.empty()) {
1682
23.7k
        for (auto* slot_desc : _real_tuple_desc->slots()) {
1683
23.7k
            if (!_missing_cols.contains(slot_desc->col_name())) {
1684
16.4k
                continue;
1685
16.4k
            }
1686
1687
7.30k
            auto it = _col_default_value_ctx.find(slot_desc->col_name());
1688
7.30k
            if (it == _col_default_value_ctx.end()) {
1689
0
                return Status::InternalError("failed to find default value expr for slot: {}",
1690
0
                                             slot_desc->col_name());
1691
0
            }
1692
7.30k
            _missing_col_descs.emplace(slot_desc->col_name(), it->second);
1693
7.30k
        }
1694
5.36k
    }
1695
74.1k
    return Status::OK();
1696
74.1k
}
1697
1698
53.4k
Status FileScanner::_init_expr_ctxes() {
1699
53.4k
    std::map<SlotId, int> full_src_index_map;
1700
53.4k
    std::map<SlotId, SlotDescriptor*> full_src_slot_map;
1701
53.4k
    std::map<std::string, int> partition_name_to_key_index_map;
1702
53.4k
    int index = 0;
1703
324k
    for (const auto& slot_desc : _real_tuple_desc->slots()) {
1704
324k
        full_src_slot_map.emplace(slot_desc->id(), slot_desc);
1705
324k
        full_src_index_map.emplace(slot_desc->id(), index++);
1706
324k
    }
1707
1708
    // For external table query, find the index of column in path.
1709
    // Because query doesn't always search for all columns in a table
1710
    // and the order of selected columns is random.
1711
    // All ranges in _ranges vector should have identical columns_from_path_keys
1712
    // because they are all file splits for the same external table.
1713
    // So here use the first element of _ranges to fill the partition_name_to_key_index_map
1714
53.4k
    if (_current_range.__isset.columns_from_path_keys) {
1715
50.8k
        std::vector<std::string> key_map = _current_range.columns_from_path_keys;
1716
50.8k
        if (!key_map.empty()) {
1717
30.4k
            for (size_t i = 0; i < key_map.size(); i++) {
1718
19.8k
                partition_name_to_key_index_map.emplace(key_map[i], i);
1719
19.8k
            }
1720
10.6k
        }
1721
50.8k
    }
1722
1723
53.4k
    _num_of_columns_from_file = _params->num_of_columns_from_file;
1724
1725
324k
    for (const auto& slot_info : _params->required_slots) {
1726
324k
        auto slot_id = slot_info.slot_id;
1727
324k
        auto it = full_src_slot_map.find(slot_id);
1728
324k
        if (it == std::end(full_src_slot_map)) {
1729
0
            return Status::InternalError(
1730
0
                    fmt::format("Unknown source slot descriptor, slot_id={}", slot_id));
1731
0
        }
1732
324k
        if (it->second->col_name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
1733
4.27k
            _row_id_column_iterator_pair.second = _default_val_row_desc->get_column_id(slot_id);
1734
4.27k
            continue;
1735
4.27k
        }
1736
319k
        if (it->second->col_name() == BeConsts::ICEBERG_ROWID_COL) {
1737
156
            _need_iceberg_rowid_column = true;
1738
156
            continue;
1739
156
        }
1740
1741
319k
        if (it->second->col_name() == IcebergTableReader::ROW_LINEAGE_ROW_ID) {
1742
438
            _row_lineage_columns.row_id_column_idx = _default_val_row_desc->get_column_id(slot_id);
1743
438
        }
1744
1745
319k
        if (it->second->col_name() == IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
1746
370
            _row_lineage_columns.last_updated_sequence_number_column_idx =
1747
370
                    _default_val_row_desc->get_column_id(slot_id);
1748
370
        }
1749
1750
319k
        if (slot_info.is_file_slot) {
1751
308k
            _is_file_slot.emplace(slot_id);
1752
308k
            _file_slot_descs.emplace_back(it->second);
1753
308k
            _file_col_names.push_back(it->second->col_name());
1754
308k
        }
1755
1756
319k
        if (partition_name_to_key_index_map.contains(it->second->col_name())) {
1757
14.2k
            if (slot_info.is_file_slot) {
1758
                // If there is slot which is both a partition column and a file column,
1759
                // we should not fill the partition column from path.
1760
3.61k
                _fill_partition_from_path = false;
1761
10.6k
            } else if (!_fill_partition_from_path) {
1762
                // This should not happen
1763
0
                return Status::InternalError(
1764
0
                        "Partition column {} is not a file column, but there is already a column "
1765
0
                        "which is both a partition column and a file column.",
1766
0
                        it->second->col_name());
1767
0
            }
1768
14.2k
            _partition_slot_descs.emplace_back(it->second);
1769
14.2k
            if (_is_load) {
1770
11
                auto iti = full_src_index_map.find(slot_id);
1771
11
                _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
1772
14.2k
            } else {
1773
14.2k
                auto kit = partition_name_to_key_index_map.find(it->second->col_name());
1774
14.2k
                _partition_slot_index_map.emplace(slot_id, kit->second);
1775
14.2k
            }
1776
14.2k
        }
1777
319k
    }
1778
1779
    // set column name to default value expr map
1780
325k
    for (auto* slot_desc : _real_tuple_desc->slots()) {
1781
325k
        VExprContextSPtr ctx;
1782
325k
        auto it = _params->default_value_of_src_slot.find(slot_desc->id());
1783
325k
        if (it != std::end(_params->default_value_of_src_slot)) {
1784
320k
            if (!it->second.nodes.empty()) {
1785
291k
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
1786
291k
                RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
1787
291k
                RETURN_IF_ERROR(ctx->open(_state));
1788
291k
            }
1789
            // if expr is empty, the default value will be null
1790
320k
            _col_default_value_ctx.emplace(slot_desc->col_name(), ctx);
1791
320k
        }
1792
325k
    }
1793
1794
53.4k
    if (_is_load) {
1795
        // follow desc expr map is only for load task.
1796
2.48k
        bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
1797
2.48k
        int idx = 0;
1798
28.9k
        for (auto* slot_desc : _output_tuple_desc->slots()) {
1799
28.9k
            auto it = _params->expr_of_dest_slot.find(slot_desc->id());
1800
28.9k
            if (it == std::end(_params->expr_of_dest_slot)) {
1801
0
                return Status::InternalError("No expr for dest slot, id={}, name={}",
1802
0
                                             slot_desc->id(), slot_desc->col_name());
1803
0
            }
1804
1805
28.9k
            VExprContextSPtr ctx;
1806
28.9k
            if (!it->second.nodes.empty()) {
1807
28.9k
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
1808
28.9k
                RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
1809
28.9k
                RETURN_IF_ERROR(ctx->open(_state));
1810
28.9k
            }
1811
28.9k
            _dest_vexpr_ctx.emplace_back(ctx);
1812
28.9k
            _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
1813
1814
28.9k
            if (has_slot_id_map) {
1815
28.8k
                auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
1816
28.8k
                if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
1817
6.39k
                    _src_slot_descs_order_by_dest.emplace_back(nullptr);
1818
22.5k
                } else {
1819
22.5k
                    auto _src_slot_it = full_src_slot_map.find(it1->second);
1820
22.5k
                    if (_src_slot_it == std::end(full_src_slot_map)) {
1821
0
                        return Status::InternalError("No src slot {} in src slot descs",
1822
0
                                                     it1->second);
1823
0
                    }
1824
22.5k
                    _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
1825
22.5k
                                                         full_src_index_map[_src_slot_it->first]);
1826
22.5k
                    _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
1827
22.5k
                }
1828
28.8k
            }
1829
28.9k
        }
1830
2.48k
    }
1831
53.4k
    return Status::OK();
1832
53.4k
}
1833
1834
401k
bool FileScanner::_should_enable_condition_cache() {
1835
401k
    return _condition_cache_digest != 0 && !_is_load &&
1836
401k
           (!_conjuncts.empty() || !_push_down_conjuncts.empty());
1837
401k
}
1838
1839
157k
void FileScanner::_init_reader_condition_cache() {
1840
157k
    _condition_cache = nullptr;
1841
157k
    _condition_cache_ctx = nullptr;
1842
1843
157k
    if (!_should_enable_condition_cache() || !_cur_reader) {
1844
136k
        return;
1845
136k
    }
1846
1847
    // Disable condition cache when delete operations exist (e.g. Iceberg position/equality
1848
    // deletes, Hive ACID deletes). Cached granule results may become stale if delete files
1849
    // change between queries while the data file's cache key remains the same.
1850
20.7k
    if (_cur_reader->has_delete_operations()) {
1851
2.93k
        return;
1852
2.93k
    }
1853
1854
17.7k
    auto* cache = segment_v2::ConditionCache::instance();
1855
17.7k
    _condition_cache_key = segment_v2::ConditionCache::ExternalCacheKey(
1856
17.7k
            _current_range.path,
1857
18.4E
            _current_range.__isset.modification_time ? _current_range.modification_time : 0,
1858
18.4E
            _current_range.__isset.file_size ? _current_range.file_size : -1,
1859
17.7k
            _condition_cache_digest,
1860
18.4E
            _current_range.__isset.start_offset ? _current_range.start_offset : 0,
1861
18.4E
            _current_range.__isset.size ? _current_range.size : -1);
1862
1863
17.7k
    segment_v2::ConditionCacheHandle handle;
1864
17.7k
    auto condition_cache_hit = cache->lookup(_condition_cache_key, &handle);
1865
17.7k
    if (condition_cache_hit) {
1866
7.20k
        _condition_cache = handle.get_filter_result();
1867
7.20k
        _condition_cache_hit_count++;
1868
10.5k
    } else {
1869
        // Allocate cache pre-sized to total number of granules.
1870
        // We add +1 as a safety margin: when a file is split across multiple scanners
1871
        // and the first row of this scanner's range is not aligned to a granule boundary,
1872
        // the data may span one more granule than ceil(total_rows / GRANULE_SIZE).
1873
        // The extra element costs only 1 bit and never affects correctness (an extra
1874
        // false-granule beyond the actual data range won't overlap any real row range).
1875
10.5k
        int64_t total_rows = _cur_reader->get_total_rows();
1876
10.5k
        if (total_rows > 0) {
1877
8.79k
            size_t num_granules = (total_rows + ConditionCacheContext::GRANULE_SIZE - 1) /
1878
8.79k
                                  ConditionCacheContext::GRANULE_SIZE;
1879
8.79k
            _condition_cache = std::make_shared<std::vector<bool>>(num_granules + 1, false);
1880
8.79k
        }
1881
10.5k
    }
1882
1883
17.7k
    if (_condition_cache) {
1884
        // Create context to pass to readers (native readers use it; non-native readers ignore it)
1885
16.0k
        _condition_cache_ctx = std::make_shared<ConditionCacheContext>();
1886
16.0k
        _condition_cache_ctx->is_hit = condition_cache_hit;
1887
16.0k
        _condition_cache_ctx->filter_result = _condition_cache;
1888
16.0k
        _cur_reader->set_condition_cache_context(_condition_cache_ctx);
1889
16.0k
    }
1890
17.7k
}
1891
1892
244k
void FileScanner::_finalize_reader_condition_cache() {
1893
244k
    if (!_should_enable_condition_cache() || !_condition_cache_ctx ||
1894
244k
        _condition_cache_ctx->is_hit) {
1895
235k
        _condition_cache = nullptr;
1896
235k
        _condition_cache_ctx = nullptr;
1897
235k
        return;
1898
235k
    }
1899
    // Only store the cache if the reader was fully consumed. If the scan was
1900
    // truncated early (e.g. by LIMIT), the cache is incomplete — unread granules
1901
    // would remain false and cause surviving rows to be incorrectly skipped on HIT.
1902
8.79k
    if (!_cur_reader_eof) {
1903
54
        _condition_cache = nullptr;
1904
54
        _condition_cache_ctx = nullptr;
1905
54
        return;
1906
54
    }
1907
1908
8.73k
    auto* cache = segment_v2::ConditionCache::instance();
1909
8.73k
    cache->insert(_condition_cache_key, std::move(_condition_cache));
1910
8.73k
    _condition_cache = nullptr;
1911
8.73k
    _condition_cache_ctx = nullptr;
1912
8.73k
}
1913
1914
86.9k
Status FileScanner::close(RuntimeState* state) {
1915
86.9k
    if (!_try_close()) {
1916
0
        return Status::OK();
1917
0
    }
1918
1919
86.9k
    _finalize_reader_condition_cache();
1920
1921
86.9k
    if (_cur_reader) {
1922
607
        RETURN_IF_ERROR(_cur_reader->close());
1923
607
    }
1924
1925
86.9k
    RETURN_IF_ERROR(Scanner::close(state));
1926
86.9k
    return Status::OK();
1927
86.9k
}
1928
1929
86.9k
void FileScanner::try_stop() {
1930
86.9k
    Scanner::try_stop();
1931
86.9k
    if (_io_ctx) {
1932
86.9k
        _io_ctx->should_stop = true;
1933
86.9k
    }
1934
86.9k
}
1935
1936
86.8k
void FileScanner::update_realtime_counters() {
1937
86.8k
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
1938
1939
86.8k
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
1940
86.8k
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
1941
1942
86.8k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(
1943
86.8k
            _file_reader_stats->read_rows);
1944
86.8k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
1945
86.8k
            _file_reader_stats->read_bytes);
1946
1947
86.8k
    int64_t delta_bytes_read_from_local =
1948
86.8k
            _file_cache_statistics->bytes_read_from_local - _last_bytes_read_from_local;
1949
86.8k
    int64_t delta_bytes_read_from_remote =
1950
86.8k
            _file_cache_statistics->bytes_read_from_remote - _last_bytes_read_from_remote;
1951
86.8k
    if (_file_cache_statistics->bytes_read_from_local == 0 &&
1952
86.8k
        _file_cache_statistics->bytes_read_from_remote == 0) {
1953
86.0k
        _state->get_query_ctx()
1954
86.0k
                ->resource_ctx()
1955
86.0k
                ->io_context()
1956
86.0k
                ->update_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes);
1957
86.0k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
1958
86.0k
                _file_reader_stats->read_bytes);
1959
86.0k
    } else {
1960
718
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
1961
718
                delta_bytes_read_from_local);
1962
718
        _state->get_query_ctx()
1963
718
                ->resource_ctx()
1964
718
                ->io_context()
1965
718
                ->update_scan_bytes_from_remote_storage(delta_bytes_read_from_remote);
1966
718
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
1967
718
                delta_bytes_read_from_local);
1968
718
        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
1969
718
                delta_bytes_read_from_remote);
1970
718
    }
1971
1972
86.8k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
1973
1974
86.8k
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
1975
86.8k
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
1976
1977
86.8k
    _file_reader_stats->read_bytes = 0;
1978
86.8k
    _file_reader_stats->read_rows = 0;
1979
1980
86.8k
    _last_bytes_read_from_local = _file_cache_statistics->bytes_read_from_local;
1981
86.8k
    _last_bytes_read_from_remote = _file_cache_statistics->bytes_read_from_remote;
1982
86.8k
}
1983
1984
86.7k
void FileScanner::_collect_profile_before_close() {
1985
86.7k
    Scanner::_collect_profile_before_close();
1986
86.8k
    if (config::enable_file_cache && _state->query_options().enable_file_cache &&
1987
86.7k
        _profile != nullptr) {
1988
1.06k
        io::FileCacheProfileReporter cache_profile(_profile);
1989
1.06k
        cache_profile.update(_file_cache_statistics.get());
1990
1.06k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
1991
1.06k
                _file_cache_statistics->bytes_write_into_cache);
1992
1.06k
    }
1993
1994
86.7k
    if (_cur_reader != nullptr) {
1995
607
        _cur_reader->collect_profile_before_close();
1996
607
    }
1997
1998
86.7k
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
1999
86.7k
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
2000
86.7k
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
2001
2002
86.7k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
2003
86.7k
    COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls);
2004
86.7k
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
2005
86.7k
    COUNTER_UPDATE(local_state->_condition_cache_hit_counter, _condition_cache_hit_count);
2006
86.8k
    if (_io_ctx) {
2007
86.8k
        COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
2008
86.8k
                       _io_ctx->condition_cache_filtered_rows);
2009
86.8k
    }
2010
2011
86.7k
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
2012
86.7k
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
2013
86.7k
}
2014
2015
} // namespace doris