Coverage Report

Created: 2026-04-22 11:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/Metrics_types.h>
23
#include <gen_cpp/Opcodes_types.h>
24
#include <gen_cpp/PaloInternalService_types.h>
25
#include <gen_cpp/PlanNodes_types.h>
26
#include <glog/logging.h>
27
28
#include <algorithm>
29
#include <boost/iterator/iterator_facade.hpp>
30
#include <map>
31
#include <ranges>
32
#include <tuple>
33
#include <unordered_map>
34
#include <utility>
35
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/config.h"
38
#include "common/consts.h"
39
#include "common/logging.h"
40
#include "common/status.h"
41
#include "core/block/column_with_type_and_name.h"
42
#include "core/block/columns_with_type_and_name.h"
43
#include "core/column/column.h"
44
#include "core/column/column_nullable.h"
45
#include "core/column/column_vector.h"
46
#include "core/data_type/data_type.h"
47
#include "core/data_type/data_type_nullable.h"
48
#include "core/data_type/data_type_string.h"
49
#include "core/string_ref.h"
50
#include "exec/common/stringop_substring.h"
51
#include "exec/rowid_fetcher.h"
52
#include "exec/scan/scan_node.h"
53
#include "exprs/aggregate/aggregate_function.h"
54
#include "exprs/function/function.h"
55
#include "exprs/function/simple_function_factory.h"
56
#include "exprs/vexpr.h"
57
#include "exprs/vexpr_context.h"
58
#include "exprs/vexpr_fwd.h"
59
#include "exprs/vslot_ref.h"
60
#include "format/arrow/arrow_stream_reader.h"
61
#include "format/count_reader.h"
62
#include "format/csv/csv_reader.h"
63
#include "format/json/new_json_reader.h"
64
#include "format/native/native_reader.h"
65
#include "format/orc/vorc_reader.h"
66
#include "format/parquet/vparquet_reader.h"
67
#include "format/table/hive_reader.h"
68
#include "format/table/hudi_jni_reader.h"
69
#include "format/table/hudi_reader.h"
70
#include "format/table/iceberg_reader.h"
71
#include "format/table/iceberg_sys_table_jni_reader.h"
72
#include "format/table/jdbc_jni_reader.h"
73
#include "format/table/max_compute_jni_reader.h"
74
#include "format/table/paimon_cpp_reader.h"
75
#include "format/table/paimon_jni_reader.h"
76
#include "format/table/paimon_predicate_converter.h"
77
#include "format/table/paimon_reader.h"
78
#include "format/table/remote_doris_reader.h"
79
#include "format/table/transactional_hive_reader.h"
80
#include "format/table/trino_connector_jni_reader.h"
81
#include "format/text/text_reader.h"
82
#ifdef BUILD_RUST_READERS
83
#include "format/lance/lance_rust_reader.h"
84
#endif
85
#include "io/cache/block_file_cache_profile.h"
86
#include "load/group_commit/wal/wal_reader.h"
87
#include "runtime/descriptors.h"
88
#include "runtime/runtime_profile.h"
89
#include "runtime/runtime_state.h"
90
91
namespace cctz {
92
class time_zone;
93
} // namespace cctz
94
namespace doris {
95
class ShardedKVCache;
96
} // namespace doris
97
98
namespace doris {
99
using namespace ErrorCode;
100
101
// Profile counter name used by init() when it registers the bytes-read counter (TUnit::BYTES).
const std::string FileScanner::FileReadBytesProfile = "FileReadBytes";
102
// Profile timer name used by init() when it registers the file read timer.
const std::string FileScanner::FileReadTimeProfile = "FileReadTime";
103
104
/// Constructs a file scanner bound to one split source.
///
/// Resolves the scan-range parameters (the entry registered on the query context for this
/// scan node is preferred; the split source is the fallback for the old FE thrift protocol),
/// picks up `strict_mode` when the parameters carry it, decides whether this scanner serves a
/// load or a query, and binds the matching handler set.
///
/// @param state              runtime state; supplies the query context and descriptor table.
/// @param local_state        local state of the owning scan operator; supplies the parent id.
/// @param limit              forwarded to the Scanner base.
/// @param split_source       source of scan ranges; ownership is shared with the caller.
/// @param profile            forwarded to the Scanner base.
/// @param kv_cache           stored as-is in `_kv_cache`.
/// @param colname_to_slot_id stored as-is in `_col_name_to_slot_id`.
FileScanner::FileScanner(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
                         std::shared_ptr<SplitSourceConnector> split_source,
                         RuntimeProfile* profile, ShardedKVCache* kv_cache,
                         const std::unordered_map<std::string, int>* colname_to_slot_id)
        : Scanner(state, local_state, limit, profile),
          // `split_source` is a by-value sink parameter: move it instead of copying so the
          // shared_ptr reference count is not bumped and dropped again.
          _split_source(std::move(split_source)),
          _cur_reader(nullptr),
          _cur_reader_eof(false),
          _kv_cache(kv_cache),
          _strict_mode(false),
          _col_name_to_slot_id(colname_to_slot_id) {
    // Prefer the params registered on the query context for this scan node. A single find()
    // replaces the former count() + operator[] pair (one lookup instead of two).
    _params = nullptr;
    if (auto query_ctx = state->get_query_ctx(); query_ctx != nullptr) {
        auto& params_map = query_ctx->file_scan_range_params_map;
        if (auto it = params_map.find(local_state->parent_id()); it != params_map.end()) {
            _params = &(it->second);
        }
    }
    if (_params == nullptr) {
        // old fe thrift protocol
        _params = _split_source->get_params();
    }

    if (_params->__isset.strict_mode) {
        _strict_mode = _params->strict_mode;
    }

    // For load scanner, there are input and output tuple.
    // For query scanner, there is only output tuple
    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
    _is_load = (_input_tuple_desc != nullptr);
    _real_tuple_desc = _is_load ? _input_tuple_desc : _output_tuple_desc;
    _configure_file_scan_handlers();
}
133
134
89.5k
void FileScanner::_configure_file_scan_handlers() {
135
89.5k
    if (_is_load) {
136
2.49k
        _init_src_block_handler = &FileScanner::_init_src_block_for_load;
137
2.49k
        _process_src_block_after_read_handler =
138
2.49k
                &FileScanner::_process_src_block_after_read_for_load;
139
2.49k
        _should_push_down_predicates_handler = &FileScanner::_should_push_down_predicates_for_load;
140
2.49k
        _should_enable_condition_cache_handler =
141
2.49k
                &FileScanner::_should_enable_condition_cache_for_load;
142
87.0k
    } else {
143
87.0k
        _init_src_block_handler = &FileScanner::_init_src_block_for_query;
144
87.0k
        _process_src_block_after_read_handler =
145
87.0k
                &FileScanner::_process_src_block_after_read_for_query;
146
87.0k
        _should_push_down_predicates_handler = &FileScanner::_should_push_down_predicates_for_query;
147
87.0k
        _should_enable_condition_cache_handler =
148
87.0k
                &FileScanner::_should_enable_condition_cache_for_query;
149
87.0k
    }
150
89.5k
}
151
152
86.2k
/// Initializes the scanner: runs the base-class init, registers the profile timers and
/// counters, creates the IO statistics objects and the IO context, and, for load jobs,
/// builds the source/destination row descriptors and prepares the pre-filter conjuncts.
///
/// @param state     forwarded to Scanner::init.
/// @param conjuncts forwarded to Scanner::init.
/// @return the first error reported by any step, otherwise Status::OK().
Status FileScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));

    // Every timer/counter below is registered on the scanner profile; fetch it once.
    auto* scanner_profile = _local_state->scanner_profile();

    _get_block_timer = ADD_TIMER_WITH_LEVEL(scanner_profile, "FileScannerGetBlockTime", 1);
    _cast_to_input_block_timer =
            ADD_TIMER_WITH_LEVEL(scanner_profile, "FileScannerCastInputBlockTime", 1);
    _fill_missing_columns_timer =
            ADD_TIMER_WITH_LEVEL(scanner_profile, "FileScannerFillMissingColumnTime", 1);
    _pre_filter_timer = ADD_TIMER_WITH_LEVEL(scanner_profile, "FileScannerPreFilterTimer", 1);
    _convert_to_output_block_timer =
            ADD_TIMER_WITH_LEVEL(scanner_profile, "FileScannerConvertOuputBlockTime", 1);
    _runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
            scanner_profile, "FileScannerRuntimeFilterPartitionPruningTime", 1);

    _empty_file_counter = ADD_COUNTER_WITH_LEVEL(scanner_profile, "EmptyFileNum", TUnit::UNIT, 1);
    _not_found_file_counter =
            ADD_COUNTER_WITH_LEVEL(scanner_profile, "NotFoundFileNum", TUnit::UNIT, 1);
    _fully_skipped_file_counter =
            ADD_COUNTER_WITH_LEVEL(scanner_profile, "FullySkippedFileNum", TUnit::UNIT, 1);
    _file_counter = ADD_COUNTER_WITH_LEVEL(scanner_profile, "FileNumber", TUnit::UNIT, 1);

    _file_read_bytes_counter =
            ADD_COUNTER_WITH_LEVEL(scanner_profile, FileReadBytesProfile, TUnit::BYTES, 1);
    _file_read_calls_counter =
            ADD_COUNTER_WITH_LEVEL(scanner_profile, "FileReadCalls", TUnit::UNIT, 1);
    _file_read_time_counter = ADD_TIMER_WITH_LEVEL(scanner_profile, FileReadTimeProfile, 1);

    _runtime_filter_partition_pruned_range_counter = ADD_COUNTER_WITH_LEVEL(
            scanner_profile, "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);

    // The statistics objects are owned by the scanner; the IO context only borrows them.
    _file_cache_statistics.reset(new io::FileCacheStatistics());
    _file_reader_stats.reset(new io::FileReaderStats());

    RETURN_IF_ERROR(_init_io_ctx());
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
    _io_ctx->file_reader_stats = _file_reader_stats.get();
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;

    if (_is_load) {
        _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
                                              std::vector<TupleId> {_input_tuple_desc->id()}));

        // Build the pre filters: the list form wins over the single-expression form.
        if (_params->__isset.pre_filter_exprs_list) {
            RETURN_IF_ERROR(doris::VExpr::create_expr_trees(_params->pre_filter_exprs_list,
                                                            _pre_conjunct_ctxs));
        } else if (_params->__isset.pre_filter_exprs) {
            VExprContextSPtr single_ctx;
            RETURN_IF_ERROR(doris::VExpr::create_expr_tree(_params->pre_filter_exprs, single_ctx));
            _pre_conjunct_ctxs.emplace_back(single_ctx);
        }

        // Pre filters are evaluated against the source (input tuple) row layout.
        for (auto& pre_conjunct : _pre_conjunct_ctxs) {
            RETURN_IF_ERROR(pre_conjunct->prepare(_state, *_src_row_desc));
            RETURN_IF_ERROR(pre_conjunct->open(_state));
        }

        _dest_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
                                               std::vector<TupleId> {_output_tuple_desc->id()}));
    }

    _default_val_row_desc.reset(
            new RowDescriptor(_state->desc_tbl(), std::vector<TupleId> {_real_tuple_desc->id()}));

    return Status::OK();
}
221
222
// check if the expr is a partition pruning expr
223
27.6k
bool FileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
224
27.6k
    if (expr->is_slot_ref()) {
225
10.8k
        auto* slot_ref = static_cast<VSlotRef*>(expr.get());
226
10.8k
        return _partition_slot_index_map.find(slot_ref->slot_id()) !=
227
10.8k
               _partition_slot_index_map.end();
228
10.8k
    }
229
16.7k
    if (expr->is_literal()) {
230
5.22k
        return true;
231
5.22k
    }
232
16.9k
    return std::ranges::all_of(expr->children(), [this](const auto& child) {
233
16.9k
        return _check_partition_prune_expr(child);
234
16.9k
    });
235
16.7k
}
236
237
10.6k
void FileScanner::_init_runtime_filter_partition_prune_ctxs() {
238
10.6k
    _runtime_filter_partition_prune_ctxs.clear();
239
10.7k
    for (auto& conjunct : _conjuncts) {
240
10.7k
        auto impl = conjunct->root()->get_impl();
241
        // If impl is not null, which means this a conjuncts from runtime filter.
242
10.7k
        auto expr = impl ? impl : conjunct->root();
243
10.7k
        if (_check_partition_prune_expr(expr)) {
244
7.60k
            _runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
245
7.60k
        }
246
10.7k
    }
247
10.6k
}
248
249
6.91k
void FileScanner::_init_runtime_filter_partition_prune_block() {
250
    // init block with empty column
251
65.5k
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
252
65.5k
        _runtime_filter_partition_prune_block.insert(
253
65.5k
                ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
254
65.5k
                                      slot_desc->get_data_type_ptr(), slot_desc->col_name()));
255
65.5k
    }
256
6.91k
}
257
258
7.68k
/// Evaluates the partition-pruning conjuncts against a one-row block built from the
/// partition values in `_partition_col_descs`.
///
/// @param can_filter_all passed to VExprContext::execute_conjuncts, which reports through it
///                       whether the row was filtered out; left untouched on the early return.
/// @return the first deserialization/execution error, otherwise Status::OK().
Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
    SCOPED_TIMER(_runtime_filter_partition_prune_timer);
    if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
        return Status::OK();
    }
    // The pruning block always holds exactly one row.
    const size_t partition_value_column_size = 1;

    // 1. Deserialize every partition value into a one-row column, keyed by slot id.
    std::unordered_map<SlotId, MutableColumnPtr> partition_slot_id_to_column;
    for (const auto& desc_entry : _partition_col_descs) {
        const auto& [partition_value, partition_slot_desc] = desc_entry.second;
        auto data_type = partition_slot_desc->get_data_type_ptr();
        auto value_serde = data_type->get_serde();
        auto value_column = data_type->create_column();
        auto* raw_column = static_cast<IColumn*>(value_column.get());
        Slice value_slice(partition_value.data(), partition_value.size());
        uint64_t num_deserialized = 0;
        DataTypeSerDe::FormatOptions options {};

        const auto null_flag_it = _partition_value_is_null.find(partition_slot_desc->col_name());
        if (null_flag_it != _partition_value_is_null.end()) {
            // for iceberg/paimon table
            // NOTICE: column is always be nullable for iceberg/paimon table now
            DCHECK(data_type->is_nullable());
            value_serde = value_serde->get_nested_serdes()[0];
            auto* nullable_column = assert_cast<ColumnNullable*>(raw_column);
            if (null_flag_it->second) {
                nullable_column->insert_many_defaults(partition_value_column_size);
            } else {
                // The value is not null: mark the row as non-null and deserialize into the
                // nested column.
                nullable_column->get_null_map_column().insert_many_vals(
                        0, partition_value_column_size);
                RETURN_IF_ERROR(value_serde->deserialize_column_from_fixed_json(
                        nullable_column->get_nested_column(), value_slice,
                        partition_value_column_size, &num_deserialized, options));
            }
        } else {
            // for hive/hudi table, the null value is set as "\\N"
            // TODO: this will be unified as iceberg/paimon table in the future
            RETURN_IF_ERROR(value_serde->deserialize_column_from_fixed_json(
                    *raw_column, value_slice, partition_value_column_size, &num_deserialized,
                    options));
        }

        partition_slot_id_to_column[partition_slot_desc->id()] = std::move(value_column);
    }

    // 2. Fill _runtime_filter_partition_prune_block from the partition columns, then execute
    //    the conjuncts on it.
    // 2.1 Place each partition column at the position of its slot so the conjuncts resolve it.
    // NOTE(review): insert(index, ...) looks like it adds a column at `index` rather than
    // replacing the placeholder created at init time - presumably the block is reset between
    // calls elsewhere; confirm.
    size_t next_position = 0;
    bool first_column_filled = false;
    for (const auto* slot_desc : _real_tuple_desc->slots()) {
        const size_t index = next_position++;
        const auto column_it = partition_slot_id_to_column.find(slot_desc->id());
        if (column_it == partition_slot_id_to_column.end()) {
            continue;
        }
        auto data_type = slot_desc->get_data_type_ptr();
        auto partition_value_column = std::move(column_it->second);
        if (data_type->is_nullable()) {
            _runtime_filter_partition_prune_block.insert(
                    index,
                    ColumnWithTypeAndName(
                            ColumnNullable::create(
                                    std::move(partition_value_column),
                                    ColumnUInt8::create(partition_value_column_size, 0)),
                            data_type, slot_desc->col_name()));
        } else {
            _runtime_filter_partition_prune_block.insert(
                    index, ColumnWithTypeAndName(std::move(partition_value_column), data_type,
                                                 slot_desc->col_name()));
        }
        if (index == 0) {
            first_column_filled = true;
        }
    }

    // 2.2 Execute conjuncts.
    if (!first_column_filled) {
        // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
        // The following process may be tricky and time-consuming, but we have no other way.
        _runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
                partition_value_column_size);
    }
    IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
    RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
                                                    &_runtime_filter_partition_prune_block,
                                                    &result_filter, &can_filter_all));
    return Status::OK();
}
342
343
20.9k
/// Classifies every conjunct in `_push_down_conjuncts`.
///
/// A conjunct whose collected slot ids are non-empty and all equal goes into
/// `_slot_id_to_filter_conjuncts` under that slot id; every other conjunct (no slot ids, or
/// more than one distinct slot id) goes into `_not_single_slot_filter_conjuncts`.
/// Both containers are cleared first, so the method can be re-run when conjuncts change.
///
/// @return always Status::OK().
Status FileScanner::_process_conjuncts() {
    _slot_id_to_filter_conjuncts.clear();
    _not_single_slot_filter_conjuncts.clear();
    for (auto& conjunct : _push_down_conjuncts) {
        auto impl = conjunct->root()->get_impl();
        // If impl is not null, which means this a conjuncts from runtime filter.
        auto cur_expr = impl ? impl : conjunct->root();

        std::vector<int> slot_ids;
        _get_slot_ids(cur_expr.get(), &slot_ids);

        // "Single slot" means at least one slot id was found and all of them are identical.
        // all_of replaces the former `int i < slot_ids.size()` index loop, which mixed signed
        // and unsigned operands.
        bool single_slot = false;
        if (!slot_ids.empty()) {
            const int first_slot_id = slot_ids.front();
            single_slot = std::ranges::all_of(
                    slot_ids, [first_slot_id](int slot_id) { return slot_id == first_slot_id; });
        }

        if (single_slot) {
            const SlotId slot_id = slot_ids.front();
            _slot_id_to_filter_conjuncts[slot_id].emplace_back(conjunct);
        } else {
            _not_single_slot_filter_conjuncts.emplace_back(conjunct);
        }
    }
    return Status::OK();
}
373
374
57.7k
/// Refreshes the push-down conjuncts when `_conjuncts` has grown beyond
/// `_push_down_conjuncts`, and records in the scanner profile when the applied
/// runtime-filter count equals the total.
///
/// @return the error from `_process_conjuncts()`, otherwise Status::OK().
Status FileScanner::_process_late_arrival_conjuncts() {
    const bool has_new_conjuncts = _push_down_conjuncts.size() < _conjuncts.size();
    if (has_new_conjuncts) {
        // Copy, do not move: _conjuncts must stay populated for fallback filtering,
        // especially when mixing Native readers (which use _push_down_conjuncts) and
        // JNI readers (which rely on _conjuncts).
        _push_down_conjuncts = _conjuncts;
        RETURN_IF_ERROR(_process_conjuncts());
    }

    const bool all_runtime_filters_applied = (_applied_rf_num == _total_rf_num);
    if (all_runtime_filters_applied) {
        _local_state->scanner_profile()->add_info_string("ApplyAllRuntimeFilters", "True");
    }
    return Status::OK();
}
388
389
62.6k
/// Appends the slot id of every slot reference found among the descendants of `expr` to
/// `slot_ids` (depth-first, in child order), and marks each referenced slot descriptor as a
/// predicate slot.
///
/// Only the children of `expr` are inspected; `expr` itself is never tested, so a root that
/// is a bare slot reference contributes nothing.
///
/// @param expr     root of the (sub)tree to walk; must not be null.
/// @param slot_ids output vector; ids are appended, duplicates are kept.
void FileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
    for (auto& child_expr : expr->children()) {
        if (!child_expr->is_slot_ref()) {
            _get_slot_ids(child_expr.get(), slot_ids);
            continue;
        }
        // is_slot_ref() guarantees the dynamic type, so a static_cast is the correct downcast
        // (reinterpret_cast does not adjust the pointer for base-class offsets).
        auto* slot_ref = static_cast<VSlotRef*>(child_expr.get());
        const auto slot_id = slot_ref->slot_id();
        SlotDescriptor* slot_desc = _state->desc_tbl().get_slot_descriptor(slot_id);
        // Guard against a slot id that has no descriptor instead of dereferencing blindly;
        // the id is still reported so conjunct classification is unaffected.
        if (slot_desc != nullptr) {
            slot_desc->set_is_predicate(true);
        }
        slot_ids->emplace_back(slot_id);
    }
}
401
402
86.4k
/// Opens the scanner: runs the base-class open, fetches the first scan range from the split
/// source and, when one exists, initializes the expression contexts and (if enabled and
/// partition slots exist) the runtime-filter partition-pruning state. Without a first scan
/// range the scanner is marked as finished.
///
/// @param state runtime state, used for the cancellation check and the base-class open.
/// @return the first error encountered, otherwise Status::OK().
Status FileScanner::_open_impl(RuntimeState* state) {
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(Scanner::_open_impl(state));
    if (_local_state) {
        _condition_cache_digest = _local_state->get_condition_cache_digest();
    }

    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
    if (!_first_scan_range) {
        // there's no scan range in split source. stop scanner directly.
        _scanner_eof = true;
        return Status::OK();
    }

    RETURN_IF_ERROR(_init_expr_ctxes());
    const bool prune_partitions = _state->query_options().enable_runtime_filter_partition_prune &&
                                  !_partition_slot_index_map.empty();
    if (prune_partitions) {
        _init_runtime_filter_partition_prune_ctxs();
        _init_runtime_filter_partition_prune_block();
    }
    return Status::OK();
}
423
424
257k
/// Thin wrapper around `_get_block_wrapped` that, on failure, appends the name of the current
/// scan range to the error message.
///
/// @param state forwarded to `_get_block_wrapped`.
/// @param block forwarded to `_get_block_wrapped`.
/// @param eof   forwarded to `_get_block_wrapped`.
/// @return the wrapped call's status, with ". cur path: <name>" appended when it is not OK.
Status FileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
    Status result = _get_block_wrapped(state, block, eof);
    if (result.ok()) {
        return result;
    }
    // add cur path in error msg for easy debugging
    return std::move(result.append(". cur path: " + get_current_scan_range_name()));
}
433
434
// For query:
435
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
436
//                              A     B    C  D                 E
437
// _init_src_block              x     x    x  x                 x                -      x
438
// get_next_block               x     x    x  -                 -                -      x
439
// _cast_to_input_block         -     -    -  -                 -                -      -
440
// _fill_columns_from_path      -     -    -  -                 x                -      x
441
// _fill_missing_columns        -     -    -  x                 -                -      x
442
// _convert_to_output_block     -     -    -  -                 -                -      -
443
//
444
// For load:
445
//                              [exist cols]  [non-exist cols]  [col from path]  input  output
446
//                              A     B    C  D                 E
447
// _init_src_block              x     x    x  x                 x                x      -
448
// get_next_block               x     x    x  -                 -                x      -
449
// _cast_to_input_block         x     x    x  -                 -                x      -
450
// _fill_columns_from_path      -     -    -  -                 x                x      -
451
// _fill_missing_columns        -     -    -  x                 -                x      -
452
// _convert_to_output_block     -     -    -  -                 -                -      x
453
257k
Status FileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* eof) {
454
257k
    do {
455
257k
        RETURN_IF_CANCELLED(state);
456
257k
        if (_cur_reader == nullptr || _cur_reader_eof) {
457
157k
            _finalize_reader_condition_cache();
458
            // The file may not exist because the file list is got from meta cache,
459
            // And the file may already be removed from storage.
460
            // Just ignore not found files.
461
157k
            Status st = _get_next_reader();
462
157k
            if (st.is<ErrorCode::NOT_FOUND>() && config::ignore_not_found_file_in_external_table) {
463
0
                _cur_reader_eof = true;
464
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
465
0
                continue;
466
157k
            } else if (st.is<ErrorCode::END_OF_FILE>()) {
467
16
                _cur_reader_eof = true;
468
16
                COUNTER_UPDATE(_fully_skipped_file_counter, 1);
469
16
                continue;
470
157k
            } else if (!st) {
471
2
                return st;
472
2
            }
473
157k
            _init_reader_condition_cache();
474
157k
        }
475
476
257k
        if (_scanner_eof) {
477
85.8k
            *eof = true;
478
85.8k
            return Status::OK();
479
85.8k
        }
480
481
        // Init src block for load job based on the data file schema (e.g. parquet)
482
        // For query job, simply set _src_block_ptr to block.
483
171k
        size_t read_rows = 0;
484
171k
        RETURN_IF_ERROR(_init_src_block(block));
485
171k
        {
486
171k
            SCOPED_TIMER(_get_block_timer);
487
488
            // Read next block.
489
            // Some of column in block may not be filled (column not exist in file)
490
171k
            RETURN_IF_ERROR(
491
171k
                    _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
492
171k
        }
493
        // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
494
        // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result.
495
171k
        if (read_rows > 0) {
496
109k
            if ((!_cur_reader->count_read_rows()) && _io_ctx) {
497
17.4k
                _io_ctx->file_reader_stats->read_rows += read_rows;
498
17.4k
            }
499
            // If the push_down_agg_type is COUNT, no need to do the rest,
500
            // because we only save a number in block.
501
109k
            if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
502
105k
                RETURN_IF_ERROR(_process_src_block_after_read(block));
503
105k
            }
504
109k
        }
505
171k
        break;
506
171k
    } while (true);
507
508
    // Update filtered rows and unselected rows for load, reset counter.
509
    // {
510
    //     state->update_num_rows_load_filtered(_counter.num_rows_filtered);
511
    //     state->update_num_rows_load_unselected(_counter.num_rows_unselected);
512
    //     _reset_counter();
513
    // }
514
171k
    return Status::OK();
515
257k
}
516
517
/**
518
 * Check whether there are complex types in parquet/orc reader in broker/stream load.
519
 * Broker/stream load will cast any type as string type, and complex types will be casted wrong.
520
 * This is a temporary method, and will be replaced by tvf.
521
 */
522
7.85k
Status FileScanner::_check_output_block_types() {
523
    // Only called from _init_src_block_for_load, so _is_load is always true.
524
7.85k
    TFileFormatType::type format_type = _params->format_type;
525
7.85k
    if (format_type == TFileFormatType::FORMAT_PARQUET ||
526
7.85k
        format_type == TFileFormatType::FORMAT_ORC) {
527
1.26k
        for (auto slot : _output_tuple_desc->slots()) {
528
1.26k
            if (is_complex_type(slot->type()->get_primitive_type())) {
529
0
                return Status::InternalError(
530
0
                        "Parquet/orc doesn't support complex types in broker/stream load, "
531
0
                        "please use tvf(table value function) to insert complex types.");
532
0
            }
533
1.26k
        }
534
50
    }
535
7.85k
    return Status::OK();
536
7.85k
}
537
538
171k
/// Dispatches to the load or query source-block initializer bound by
/// `_configure_file_scan_handlers()`.
///
/// @param block forwarded unchanged to the bound handler.
/// @return the handler's status.
Status FileScanner::_init_src_block(Block* block) {
    const auto handler = _init_src_block_handler;
    DCHECK(handler != nullptr);
    return (this->*handler)(block);
}
542
543
163k
/// Query variant: the caller's block is used directly as the source block.
///
/// @param block block to read into; becomes `_src_block_ptr`.
/// @return always Status::OK().
Status FileScanner::_init_src_block_for_query(Block* block) {
    _src_block_ptr = block;

    // The name-to-index map is built once, on the first call that finds it empty.
    if (!_src_block_name_to_idx.empty()) {
        return Status::OK();
    }
    _src_block_name_to_idx = block->get_name_to_pos_map();
    return Status::OK();
}
552
553
7.85k
/// Load variant: rebuilds `_src_block` with one empty column per slot of `_input_tuple_desc`
/// and points `_src_block_ptr` at it. Along the way it records the skip-bitmap column index,
/// the sequence-map column unique id and the sequence column unique id.
///
/// @param block unused; the load path reads into the scanner-owned `_src_block`.
/// @return the error from `_check_output_block_types()`, otherwise Status::OK().
Status FileScanner::_init_src_block_for_load(Block* block) {
    static_cast<void>(block);
    RETURN_IF_ERROR(_check_output_block_types());

    // if (_src_block_init) {
    //     _src_block.clear_column_data();
    //     _src_block_ptr = &_src_block;
    //     return Status::OK();
    // }

    _src_block.clear();
    const bool has_sequence_map_col = _params->__isset.sequence_map_col;

    // slots in _input_tuple_desc contains all slots describe in load statement, eg:
    // -H "columns: k1, k2, tmp1, k3 = tmp1 + 1"
    // _input_tuple_desc will contains: k1, k2, tmp1
    // and some of them are from file, such as k1 and k2, and some of them may not exist in file, such as tmp1
    // _input_tuple_desc also contains columns from path
    uint32_t column_idx = 0;
    for (auto& slot : _input_tuple_desc->slots()) {
        if (slot->is_skip_bitmap_col()) {
            _skip_bitmap_col_idx = column_idx;
        }
        if (has_sequence_map_col && _params->sequence_map_col == slot->col_name()) {
            _sequence_map_col_uid = slot->col_unique_id();
        }

        // Columns found in the file use the file's type made nullable; all others use the
        // slot's own type.
        const auto file_type_it = _slot_lower_name_to_col_type.find(slot->col_name());
        DataTypePtr column_type = file_type_it == _slot_lower_name_to_col_type.end()
                                          ? slot->type()
                                          : make_nullable(file_type_it->second);
        MutableColumnPtr empty_column = column_type->create_column();
        _src_block.insert(
                ColumnWithTypeAndName(std::move(empty_column), column_type, slot->col_name()));
        _src_block_name_to_idx.emplace(slot->col_name(), column_idx++);
    }

    if (has_sequence_map_col) {
        // When the target table has sequence map column, _input_tuple_desc will not contains
        // __DORIS_SEQUENCE_COL__, so we should get its column unique id from _output_tuple_desc
        for (const auto& output_slot : _output_tuple_desc->slots()) {
            if (output_slot->is_sequence_col()) {
                _sequence_col_uid = output_slot->col_unique_id();
            }
        }
    }

    _src_block_ptr = &_src_block;
    _src_block_init = true;
    return Status::OK();
}
601
602
5.38k
/// Casts, in place, every source-block column that exists in the file to the type declared by
/// the matching slot of `_input_tuple_desc`. Columns absent from
/// `_slot_lower_name_to_col_type` are left untouched.
///
/// @param block not referenced in this function; the cast operates on `_src_block_ptr`.
/// @return InternalError when no CAST function is found, the error from executing the cast,
///         otherwise Status::OK().
Status FileScanner::_cast_to_input_block(Block* block) {
    // Only called from _process_src_block_after_read_for_load, so _is_load is always true.
    SCOPED_TIMER(_cast_to_input_block_timer);
    // cast primitive type(PT0) to primitive type(PT1)
    for (auto& slot_desc : _input_tuple_desc->slots()) {
        const bool exists_in_file = _slot_lower_name_to_col_type.find(slot_desc->col_name()) !=
                                    _slot_lower_name_to_col_type.end();
        if (!exists_in_file) {
            // skip columns which does not exist in file
            continue;
        }

        const uint32_t column_idx = _src_block_name_to_idx[slot_desc->col_name()];
        auto& source_column = _src_block_ptr->get_by_position(column_idx);
        auto target_type = slot_desc->get_data_type_ptr();
        // remove nullable here, let the get_function decide whether nullable
        auto cast_arg_type = get_data_type_with_default_argument(remove_nullable(target_type));
        ColumnsWithTypeAndName cast_arguments {
                source_column,
                {cast_arg_type->create_column(), cast_arg_type, slot_desc->col_name()}};
        auto cast_function = SimpleFunctionFactory::instance().get_function("CAST", cast_arguments,
                                                                            target_type, {});
        if (!cast_function) {
            return Status::InternalError("Function CAST[arg={}, col name={}, return={}] not found!",
                                         source_column.type->get_name(), slot_desc->col_name(),
                                         target_type->get_name());
        }

        DCHECK(_state != nullptr);
        auto function_ctx = FunctionContext::create_context(_state, {}, {});
        // The same position is both the cast argument and the result, so the column is
        // replaced in place.
        RETURN_IF_ERROR(cast_function->execute(function_ctx.get(), *_src_block_ptr, {column_idx},
                                               column_idx, source_column.column->size()));
        _src_block_ptr->get_by_position(column_idx).type = std::move(target_type);
    }
    return Status::OK();
}
635
636
5.38k
Status FileScanner::_pre_filter_src_block() {
    // Load-only path: the sole caller is _process_src_block_after_read_for_load,
    // so _is_load is always true here.
    if (_pre_conjunct_ctxs.empty()) {
        // No pre-filter conjuncts were supplied; the source block is left untouched.
        return Status::OK();
    }

    SCOPED_TIMER(_pre_filter_timer);
    const auto rows_before = _src_block_ptr->rows();
    const auto columns_to_keep = _src_block_ptr->columns();
    RETURN_IF_ERROR(
            VExprContext::filter_block(_pre_conjunct_ctxs, _src_block_ptr, columns_to_keep));
    // Rows dropped by the pre-filter are counted as "unselected".
    _counter.num_rows_unselected += rows_before - _src_block_ptr->rows();
    return Status::OK();
}
648
649
5.38k
// Converts the source block into the destination (output) block for load jobs.
//
// For every output column the matching destination expression is evaluated on
// the source block, rows that violate strict mode or a NOT NULL constraint are
// marked in a filter column (and reported through append_error_msg_to_file),
// and the converted column is appended to `block`. Finally the source block is
// cleared and the filter is applied to `block`.
//
// @param block  output block obtained from the scanner context; it is reused,
//               not cleared, and receives the converted columns.
// @return       Status::OK() on success, or the first error returned by
//               expression execution, error-file writing or block filtering.
Status FileScanner::_convert_to_output_block(Block* block) {
    // Only called from _process_src_block_after_read_for_load, so _is_load is always true.
    SCOPED_TIMER(_convert_to_output_block_timer);
    // The block is passed from scanner context's free blocks,
    // which is initialized by output columns
    // so no need to clear it
    // block->clear();

    size_t rows = _src_block_ptr->rows();
    // One byte per row, initialised to 1 (keep). Rows that fail validation are set to 0.
    auto filter_column = ColumnUInt8::create(rows, 1);
    auto& filter_map = filter_column->get_data();

    // After convert, the column_ptr should be copied into output block.
    // Can not use block->insert() because it may cause use_count() non-zero bug
    MutableBlock mutable_output_block =
            VectorizedUtils::build_mutable_mem_reuse_block(block, *_dest_row_desc);
    auto& mutable_output_columns = mutable_output_block.mutable_columns();

    std::vector<BitmapValue>* skip_bitmaps {nullptr};
    if (_should_process_skip_bitmap_col()) {
        auto* skip_bitmap_nullable_col_ptr =
                assert_cast<ColumnNullable*>(_src_block_ptr->get_by_position(_skip_bitmap_col_idx)
                                                     .column->assume_mutable()
                                                     .get());
        skip_bitmaps = &(assert_cast<ColumnBitmap*>(
                                 skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get())
                                 ->get_data());
        // NOTE:
        // - If the table has sequence type column, __DORIS_SEQUENCE_COL__ will be put in _input_tuple_desc, so whether
        //   __DORIS_SEQUENCE_COL__ will be marked in skip bitmap depends on whether it's specified in that row
        // - If the table has sequence map column, __DORIS_SEQUENCE_COL__ will not be put in _input_tuple_desc,
        //   so __DORIS_SEQUENCE_COL__ will be omitted even if it's specified in a row and will not be marked in skip bitmap.
        //   So we should mark __DORIS_SEQUENCE_COL__ in skip bitmap here if the corresponding sequence map column is marked
        if (_sequence_map_col_uid != -1) {
            // size_t index: `rows` is size_t, so an int index would be a
            // signed/unsigned comparison and could overflow on huge blocks.
            for (size_t row = 0; row < rows; ++row) {
                if ((*skip_bitmaps)[row].contains(_sequence_map_col_uid)) {
                    (*skip_bitmaps)[row].add(_sequence_col_uid);
                }
            }
        }
    }

    // for (auto slot_desc : _output_tuple_desc->slots()) {
    for (int j = 0; j < mutable_output_columns.size(); ++j) {
        auto* slot_desc = _output_tuple_desc->slots()[j];
        // Output column j is produced by destination expression j.
        int dest_index = j;
        ColumnPtr column_ptr;

        auto& ctx = _dest_vexpr_ctx[dest_index];
        // PT1 => dest primitive type
        RETURN_IF_ERROR(ctx->execute(_src_block_ptr, column_ptr));
        // Validate the expression result before it is dereferenced below.
        DCHECK(column_ptr);
        // column_ptr maybe a ColumnConst, convert it to a normal column
        column_ptr = column_ptr->convert_to_full_column_if_const();

        // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
        // is likely to be nullable
        if (LIKELY(column_ptr->is_nullable())) {
            // Checked downcast, consistent with the other column casts in this function.
            const auto* nullable_column = assert_cast<const ColumnNullable*>(column_ptr.get());
            for (size_t i = 0; i < rows; ++i) {
                if (filter_map[i] && nullable_column->is_null_at(i)) {
                    // skip checks for non-mentioned columns in flexible partial update
                    if (skip_bitmaps == nullptr ||
                        !skip_bitmaps->at(i).contains(slot_desc->col_unique_id())) {
                        // clang-format off
                        if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
                            !_src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->is_null_at(i)) {
                            // Strict mode: the source value was non-null but the
                            // converted value is null, so the row is rejected.
                            filter_map[i] = false;
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
                                [&]() -> std::string {
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
                                },
                                [&]() -> std::string {
                                    auto raw_value =
                                            _src_block_ptr->get_by_position(_dest_slot_to_src_slot_index[dest_index]).column->get_data_at(i);
                                    std::string raw_string = raw_value.to_string();
                                    fmt::memory_buffer error_msg;
                                    fmt::format_to(error_msg,"column({}) value is incorrect while strict mode is {}, src value is {}",
                                            slot_desc->col_name(), _strict_mode, raw_string);
                                    return fmt::to_string(error_msg);
                                }));
                        } else if (!slot_desc->is_nullable()) {
                            // Null value for a NOT NULL destination column: reject the row.
                            filter_map[i] = false;
                            RETURN_IF_ERROR(_state->append_error_msg_to_file(
                                [&]() -> std::string {
                                    return _src_block_ptr->dump_one_line(i, _num_of_columns_from_file);
                                },
                                [&]() -> std::string {
                                    fmt::memory_buffer error_msg;
                                    fmt::format_to(error_msg, "column({}) values is null while columns is not nullable", slot_desc->col_name());
                                    return fmt::to_string(error_msg);
                                }));
                        }
                        // clang-format on
                    }
                }
            }
            if (!slot_desc->is_nullable()) {
                column_ptr = remove_nullable(column_ptr);
            }
        } else if (slot_desc->is_nullable()) {
            column_ptr = make_nullable(column_ptr);
        }
        mutable_output_columns[j]->insert_range_from(*column_ptr, 0, rows);
    }

    // after do the dest block insert operation, clear _src_block to remove the reference of origin column
    _src_block_ptr->clear();

    size_t dest_size = block->columns();
    // do filter
    block->insert(ColumnWithTypeAndName(std::move(filter_column), std::make_shared<DataTypeUInt8>(),
                                        "filter column"));
    RETURN_IF_ERROR(Block::filter_block(block, dest_size, dest_size));

    _counter.num_rows_filtered += rows - block->rows();
    return Status::OK();
}
769
770
104k
Status FileScanner::_process_src_block_after_read(Block* block) {
    // Dispatch through the member-function pointer stored in
    // _process_src_block_after_read_handler; it must be set before this is called.
    DCHECK(_process_src_block_after_read_handler != nullptr);
    const auto handler = _process_src_block_after_read_handler;
    return (this->*handler)(block);
}
774
775
99.6k
Status FileScanner::_process_src_block_after_read_for_query(Block* block) {
    // The only post-read step on this path is CHAR/VARCHAR truncation, applied when
    // the target size is smaller than the file schema (external table queries with
    // truncate_char_or_varchar_columns=true). Its status is the result of this call.
    return _truncate_char_or_varchar_columns(block);
}
781
782
5.38k
Status FileScanner::_process_src_block_after_read_for_load(Block* block) {
    // Step 1: cast the source block columns to the input slot types, in place.
    RETURN_IF_ERROR(_cast_to_input_block(block));

    // Debug-only sanity check. All readers fill partition and missing columns inside
    // get_next_block:
    //   ORC/Parquet: in _do_get_next_block via on_fill_partition_columns/on_fill_missing_columns
    //   CSV/JSON/others: in on_after_read_block via fill_remaining_columns
    // so every column of the source block must have the same number of rows.
    const size_t column_count = _src_block_ptr->columns();
    if (column_count > 0) {
        size_t rows = _src_block_ptr->get_by_position(0).column->size();
        for (size_t i = 1; i < column_count; ++i) {
            DCHECK_EQ(_src_block_ptr->get_by_position(i).column->size(), rows)
                    << "Column " << _src_block_ptr->get_by_position(i).name << " has "
                    << _src_block_ptr->get_by_position(i).column->size() << " rows, expected "
                    << rows;
        }
    }

    // Step 2: filter the source block with _pre_conjunct_ctxs.
    RETURN_IF_ERROR(_pre_filter_src_block());

    // Step 3: convert the source block into the output (dest) block and apply filters.
    RETURN_IF_ERROR(_convert_to_output_block(block));

    // Step 4: truncate CHAR/VARCHAR columns when the target size is smaller than the
    // file schema; the outcome of this step is the outcome of the whole call.
    return _truncate_char_or_varchar_columns(block);
}
807
808
104k
// Truncates CHAR/VARCHAR columns of `block` to the length declared by the
// corresponding slot of _real_tuple_desc.
//
// Does nothing unless the query option truncate_char_or_varchar_columns is set.
// A column is truncated when
//   - it is not found in _source_file_col_name_types, or
//   - the slot length is positive and either smaller than the file column's
//     string length or the file length is unknown (negative / not a string type).
//
// @param block  block whose column positions follow the slot order of _real_tuple_desc.
// @return       always Status::OK().
Status FileScanner::_truncate_char_or_varchar_columns(Block* block) {
    // Truncate char columns or varchar columns if size is smaller than file columns
    // or not found in the file column schema.
    if (!_state->query_options().truncate_char_or_varchar_columns) {
        return Status::OK();
    }
    // Position of the current slot in the block; advanced once at the top of every
    // iteration so every path (including `continue`) stays in sync with the slot.
    int idx = -1;
    for (auto* slot_desc : _real_tuple_desc->slots()) {
        ++idx;
        const auto& type = slot_desc->type();
        const auto primitive_type = type->get_primitive_type();
        if (primitive_type != TYPE_VARCHAR && primitive_type != TYPE_CHAR) {
            continue;
        }

        // Declared length of the target column, computed once per slot.
        const int target_len =
                assert_cast<const DataTypeString*>(remove_nullable(type).get())->len();

        auto iter = _source_file_col_name_types.find(slot_desc->col_name());
        if (iter == _source_file_col_name_types.end()) {
            // Column is not part of the file schema: always truncate to the slot length.
            // NOTE(review): unlike the branch below, this path does not check
            // target_len > 0 — confirm that is intended.
            _truncate_char_or_varchar_column(block, idx, target_len);
            continue;
        }

        // Reuse the iterator instead of a second operator[] lookup, and avoid
        // copying the stored type pointer.
        int file_len = -1;
        if (auto* ftype = check_and_get_data_type<DataTypeString>(
                    remove_nullable(iter->second).get())) {
            file_len = ftype->len();
        }
        if (target_len > 0 && (target_len < file_len || file_len < 0)) {
            _truncate_char_or_varchar_column(block, idx, target_len);
        }
    }
    return Status::OK();
}
845
846
// VARCHAR substring(VARCHAR str, INT pos[, INT len])
847
24
// Truncates the string column at position `idx` of `block` to at most `len`
// by evaluating substring(col, 1, len) through SubstringUtil::substring_execute.
//
// The column at `idx` is cast to ColumnNullable: its nested column is fed to
// substring and the original null map is re-attached to the result. Temporary
// columns appended to the block (pos, len, result) are erased before returning.
// NOTE(review): the assert_cast assumes the column at `idx` is always nullable —
// confirm against callers.
//
// @param block  block holding the column to truncate; modified in place.
// @param idx    position of the string column inside `block`.
// @param len    length passed as substring's `len` argument.
void FileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int len) {
    auto int_type = std::make_shared<DataTypeInt32>();
    uint32_t num_columns_without_result = block->columns();
    const ColumnNullable* col_nullable =
            assert_cast<const ColumnNullable*>(block->get_by_position(idx).column.get());
    // Take owning copies of both sub-columns before the nullable wrapper is replaced
    // below. Holding only a reference into the wrapper would refer to memory owned by
    // the very column that replace_by_position() releases.
    ColumnPtr string_column_ptr = col_nullable->get_nested_column_ptr();
    ColumnPtr null_map_column_ptr = col_nullable->get_null_map_column_ptr();
    // Temporarily expose the raw string column at `idx` as the substring argument.
    block->replace_by_position(idx, std::move(string_column_ptr));
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)), int_type,
                   "const 1"}); // pos is 1
    block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)), int_type,
                   fmt::format("const {}", len)});                          // len
    block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); // result column
    ColumnNumbers temp_arguments(3);
    temp_arguments[0] = idx;                            // str column
    temp_arguments[1] = num_columns_without_result;     // pos
    temp_arguments[2] = num_columns_without_result + 1; // len
    uint32_t result_column_id = num_columns_without_result + 2;

    SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows());
    // Re-wrap the truncated strings with the original null map.
    auto res = ColumnNullable::create(block->get_by_position(result_column_id).column,
                                      null_map_column_ptr);
    block->replace_by_position(idx, std::move(res));
    // Drop the three temporary columns appended above.
    Block::erase_useless_column(block, num_columns_without_result);
}
872
873
7.52k
std::shared_ptr<segment_v2::RowIdColumnIteratorV2> FileScanner::_create_row_id_column_iterator() {
874
7.52k
    auto& id_file_map = _state->get_id_file_map();
875
7.52k
    auto file_id = id_file_map->get_file_mapping_id(
876
7.52k
            std::make_shared<FileMapping>(((FileScanLocalState*)_local_state)->parent_id(),
877
7.52k
                                          _current_range, _should_enable_file_meta_cache()));
878
7.52k
    return std::make_shared<RowIdColumnIteratorV2>(IdManager::ID_VERSION,
879
7.52k
                                                   BackendOptions::get_backend_id(), file_id);
880
7.52k
}
881
882
74.4k
void FileScanner::_fill_base_init_context(ReaderInitContext* ctx) {
    // Populate the fields shared by every format-specific reader init context.
    // Pointers handed out here refer to scanner members; nothing is copied.

    // Runtime state and scan parameters.
    ctx->state = _state;
    ctx->params = _params;
    ctx->range = &_current_range;

    // Schema / descriptor information.
    ctx->tuple_descriptor = _real_tuple_desc;
    ctx->row_descriptor = _default_val_row_desc.get();
    ctx->column_descs = &_column_descs;
    ctx->col_name_to_block_idx = &_src_block_name_to_idx;
    ctx->table_info_node = TableSchemaChangeHelper::ConstNode::get_instance();

    // Aggregate push-down type for this scan.
    ctx->push_down_agg_type = _get_push_down_agg_type();
}
893
894
157k
Status FileScanner::_get_next_reader() {
895
157k
    while (true) {
896
157k
        if (_cur_reader) {
897
70.6k
            _cur_reader->collect_profile_before_close();
898
70.6k
            RETURN_IF_ERROR(_cur_reader->close());
899
70.6k
            _state->update_num_finished_scan_range(1);
900
70.6k
        }
901
157k
        _cur_reader.reset(nullptr);
902
157k
        _src_block_init = false;
903
157k
        bool has_next = _first_scan_range;
904
157k
        if (!_first_scan_range) {
905
107k
            RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
906
107k
        }
907
157k
        _first_scan_range = false;
908
157k
        if (!has_next || _should_stop) {
909
85.8k
            _scanner_eof = true;
910
85.8k
            return Status::OK();
911
85.8k
        }
912
913
71.3k
        const TFileRangeDesc& range = _current_range;
914
71.3k
        _current_range_path = range.path;
915
916
71.3k
        if (!_partition_slot_index_map.empty()) {
917
            // we need get partition columns first for runtime filter partition pruning
918
7.69k
            RETURN_IF_ERROR(_generate_partition_columns());
919
920
7.69k
            if (_state->query_options().enable_runtime_filter_partition_prune) {
921
                // if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
922
                // by runtime filter partition prune
923
7.67k
                if (_push_down_conjuncts.size() < _conjuncts.size()) {
924
                    // there are new runtime filters, need to re-init runtime filter partition pruning ctxs
925
3.74k
                    _init_runtime_filter_partition_prune_ctxs();
926
3.74k
                }
927
928
7.67k
                bool can_filter_all = false;
929
7.67k
                RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
930
7.67k
                if (can_filter_all) {
931
                    // this range can be filtered out by runtime filter partition pruning
932
                    // so we need to skip this range
933
52
                    COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
934
52
                    continue;
935
52
                }
936
7.67k
            }
937
7.69k
        }
938
939
        // create reader for specific format
940
71.2k
        Status init_status = Status::OK();
941
71.2k
        TFileFormatType::type format_type = _get_current_format_type();
942
        // for compatibility, this logic is deprecated in 3.1
943
71.2k
        if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
944
5.18k
            if (range.table_format_params.table_format_type == "paimon" &&
945
5.18k
                !range.table_format_params.paimon_params.__isset.paimon_split) {
946
                // use native reader
947
0
                auto format = range.table_format_params.paimon_params.file_format;
948
0
                if (format == "orc") {
949
0
                    format_type = TFileFormatType::FORMAT_ORC;
950
0
                } else if (format == "parquet") {
951
0
                    format_type = TFileFormatType::FORMAT_PARQUET;
952
0
                } else {
953
0
                    return Status::InternalError("Not supported paimon file format: {}", format);
954
0
                }
955
0
            }
956
5.18k
        }
957
958
71.2k
        bool push_down_predicates = _should_push_down_predicates(format_type);
959
71.2k
        bool need_to_get_parsed_schema = false;
960
71.2k
        switch (format_type) {
961
5.18k
        case TFileFormatType::FORMAT_JNI: {
962
5.18k
            ReaderInitContext jni_ctx;
963
5.18k
            _fill_base_init_context(&jni_ctx);
964
965
5.18k
            if (range.__isset.table_format_params &&
966
5.18k
                range.table_format_params.table_format_type == "max_compute") {
967
0
                const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
968
0
                        _real_tuple_desc->table_desc());
969
0
                if (!mc_desc->init_status()) {
970
0
                    return mc_desc->init_status();
971
0
                }
972
0
                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
973
0
                        mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
974
0
                        range, _state, _profile);
975
0
                init_status = static_cast<GenericReader*>(mc_reader.get())->init_reader(&jni_ctx);
976
0
                _cur_reader = std::move(mc_reader);
977
5.18k
            } else if (range.__isset.table_format_params &&
978
5.18k
                       range.table_format_params.table_format_type == "paimon") {
979
1.91k
                if (_state->query_options().__isset.enable_paimon_cpp_reader &&
980
1.91k
                    _state->query_options().enable_paimon_cpp_reader) {
981
0
                    auto cpp_reader = PaimonCppReader::create_unique(_file_slot_descs, _state,
982
0
                                                                     _profile, range, _params);
983
0
                    if (!_is_load && !_push_down_conjuncts.empty()) {
984
0
                        PaimonPredicateConverter predicate_converter(_file_slot_descs, _state);
985
0
                        auto predicate = predicate_converter.build(_push_down_conjuncts);
986
0
                        if (predicate) {
987
0
                            cpp_reader->set_predicate(std::move(predicate));
988
0
                        }
989
0
                    }
990
0
                    init_status =
991
0
                            static_cast<GenericReader*>(cpp_reader.get())->init_reader(&jni_ctx);
992
0
                    _cur_reader = std::move(cpp_reader);
993
1.91k
                } else {
994
1.91k
                    auto paimon_reader = PaimonJniReader::create_unique(_file_slot_descs, _state,
995
1.91k
                                                                        _profile, range, _params);
996
1.91k
                    init_status =
997
1.91k
                            static_cast<GenericReader*>(paimon_reader.get())->init_reader(&jni_ctx);
998
1.91k
                    _cur_reader = std::move(paimon_reader);
999
1.91k
                }
1000
3.27k
            } else if (range.__isset.table_format_params &&
1001
3.27k
                       range.table_format_params.table_format_type == "hudi") {
1002
0
                auto hudi_reader = HudiJniReader::create_unique(
1003
0
                        *_params, range.table_format_params.hudi_params, _file_slot_descs, _state,
1004
0
                        _profile);
1005
0
                init_status = static_cast<GenericReader*>(hudi_reader.get())->init_reader(&jni_ctx);
1006
0
                _cur_reader = std::move(hudi_reader);
1007
3.27k
            } else if (range.__isset.table_format_params &&
1008
3.27k
                       range.table_format_params.table_format_type == "trino_connector") {
1009
788
                auto trino_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
1010
788
                                                                           _profile, range);
1011
788
                init_status =
1012
788
                        static_cast<GenericReader*>(trino_reader.get())->init_reader(&jni_ctx);
1013
788
                _cur_reader = std::move(trino_reader);
1014
2.48k
            } else if (range.__isset.table_format_params &&
1015
2.48k
                       range.table_format_params.table_format_type == "jdbc") {
1016
                // Extract jdbc params from table_format_params
1017
1.24k
                std::map<std::string, std::string> jdbc_params(
1018
1.24k
                        range.table_format_params.jdbc_params.begin(),
1019
1.24k
                        range.table_format_params.jdbc_params.end());
1020
1.24k
                auto jdbc_reader = JdbcJniReader::create_unique(_file_slot_descs, _state, _profile,
1021
1.24k
                                                                jdbc_params);
1022
1.24k
                init_status = static_cast<GenericReader*>(jdbc_reader.get())->init_reader(&jni_ctx);
1023
1.24k
                _cur_reader = std::move(jdbc_reader);
1024
1.24k
            } else if (range.__isset.table_format_params &&
1025
1.24k
                       range.table_format_params.table_format_type == "iceberg") {
1026
1.24k
                auto iceberg_sys_reader = IcebergSysTableJniReader::create_unique(
1027
1.24k
                        _file_slot_descs, _state, _profile, range, _params);
1028
1.24k
                init_status = static_cast<GenericReader*>(iceberg_sys_reader.get())
1029
1.24k
                                      ->init_reader(&jni_ctx);
1030
1.24k
                _cur_reader = std::move(iceberg_sys_reader);
1031
1.24k
            }
1032
            // Set col_name_to_block_idx for JNI readers to avoid repeated map creation
1033
5.18k
            if (_cur_reader) {
1034
5.18k
                if (auto* jni_reader = dynamic_cast<JniReader*>(_cur_reader.get())) {
1035
5.18k
                    jni_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
1036
5.18k
                }
1037
5.18k
            }
1038
5.18k
            break;
1039
5.18k
        }
1040
33.2k
        case TFileFormatType::FORMAT_PARQUET: {
1041
33.2k
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1042
33.2k
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1043
33.2k
                                               : nullptr;
1044
33.2k
            if (push_down_predicates) {
1045
33.1k
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1046
33.1k
            }
1047
33.2k
            RETURN_IF_ERROR(_init_parquet_reader(file_meta_cache_ptr));
1048
1049
33.1k
            need_to_get_parsed_schema = true;
1050
33.1k
            break;
1051
33.2k
        }
1052
24.5k
        case TFileFormatType::FORMAT_ORC: {
1053
24.5k
            auto file_meta_cache_ptr = _should_enable_file_meta_cache()
1054
24.5k
                                               ? ExecEnv::GetInstance()->file_meta_cache()
1055
24.5k
                                               : nullptr;
1056
24.5k
            if (push_down_predicates) {
1057
24.5k
                RETURN_IF_ERROR(_process_late_arrival_conjuncts());
1058
24.5k
            }
1059
24.5k
            RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr));
1060
1061
24.5k
            need_to_get_parsed_schema = true;
1062
24.5k
            break;
1063
24.5k
        }
1064
2.71k
        case TFileFormatType::FORMAT_CSV_PLAIN:
1065
2.71k
        case TFileFormatType::FORMAT_CSV_GZ:
1066
2.71k
        case TFileFormatType::FORMAT_CSV_BZ2:
1067
2.71k
        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
1068
2.71k
        case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
1069
2.71k
        case TFileFormatType::FORMAT_CSV_LZOP:
1070
2.71k
        case TFileFormatType::FORMAT_CSV_DEFLATE:
1071
2.71k
        case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
1072
2.78k
        case TFileFormatType::FORMAT_PROTO: {
1073
2.78k
            auto reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
1074
2.78k
                                                   _file_slot_descs, _io_ctx.get());
1075
2.78k
            CsvInitContext csv_ctx;
1076
2.78k
            _fill_base_init_context(&csv_ctx);
1077
2.78k
            csv_ctx.is_load = _is_load;
1078
2.78k
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&csv_ctx);
1079
2.78k
            _cur_reader = std::move(reader);
1080
2.78k
            break;
1081
2.71k
        }
1082
4.65k
        case TFileFormatType::FORMAT_TEXT: {
1083
4.65k
            auto reader = TextReader::create_unique(_state, _profile, &_counter, *_params, range,
1084
4.65k
                                                    _file_slot_descs, _io_ctx.get());
1085
4.65k
            CsvInitContext text_ctx;
1086
4.65k
            _fill_base_init_context(&text_ctx);
1087
4.65k
            text_ctx.is_load = _is_load;
1088
4.65k
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&text_ctx);
1089
4.65k
            _cur_reader = std::move(reader);
1090
4.65k
            break;
1091
2.71k
        }
1092
753
        case TFileFormatType::FORMAT_JSON: {
1093
753
            _cur_reader =
1094
753
                    NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
1095
753
                                                 _file_slot_descs, &_scanner_eof, _io_ctx.get());
1096
753
            JsonInitContext json_ctx;
1097
753
            _fill_base_init_context(&json_ctx);
1098
753
            json_ctx.col_default_value_ctx = &_col_default_value_ctx;
1099
753
            json_ctx.is_load = _is_load;
1100
753
            init_status = _cur_reader->init_reader(&json_ctx);
1101
753
            break;
1102
2.71k
        }
1103
1104
0
        case TFileFormatType::FORMAT_WAL: {
1105
0
            _cur_reader = WalReader::create_unique(_state);
1106
0
            WalInitContext wal_ctx;
1107
0
            _fill_base_init_context(&wal_ctx);
1108
0
            wal_ctx.output_tuple_descriptor = _output_tuple_desc;
1109
0
            init_status = _cur_reader->init_reader(&wal_ctx);
1110
0
            break;
1111
2.71k
        }
1112
3
        case TFileFormatType::FORMAT_NATIVE: {
1113
3
            auto reader =
1114
3
                    NativeReader::create_unique(_profile, *_params, range, _io_ctx.get(), _state);
1115
3
            ReaderInitContext native_ctx;
1116
3
            _fill_base_init_context(&native_ctx);
1117
3
            init_status = static_cast<GenericReader*>(reader.get())->init_reader(&native_ctx);
1118
3
            _cur_reader = std::move(reader);
1119
3
            need_to_get_parsed_schema = false;
1120
3
            break;
1121
2.71k
        }
1122
108
        case TFileFormatType::FORMAT_ARROW: {
1123
108
            ReaderInitContext arrow_ctx;
1124
108
            _fill_base_init_context(&arrow_ctx);
1125
1126
108
            if (range.__isset.table_format_params &&
1127
108
                range.table_format_params.table_format_type == "remote_doris") {
1128
98
                auto doris_reader =
1129
98
                        RemoteDorisReader::create_unique(_file_slot_descs, _state, _profile, range);
1130
98
                init_status =
1131
98
                        static_cast<GenericReader*>(doris_reader.get())->init_reader(&arrow_ctx);
1132
98
                if (doris_reader) {
1133
98
                    doris_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
1134
98
                }
1135
98
                _cur_reader = std::move(doris_reader);
1136
98
            } else {
1137
10
                auto arrow_reader =
1138
10
                        ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
1139
10
                                                         range, _file_slot_descs, _io_ctx.get());
1140
10
                init_status =
1141
10
                        static_cast<GenericReader*>(arrow_reader.get())->init_reader(&arrow_ctx);
1142
10
                _cur_reader = std::move(arrow_reader);
1143
10
            }
1144
108
            break;
1145
2.71k
        }
1146
#ifdef BUILD_RUST_READERS
1147
        case TFileFormatType::FORMAT_LANCE: {
1148
            auto lance_reader = LanceRustReader::create_unique(_file_slot_descs, _state, _profile,
1149
                                                               range, _params);
1150
            init_status = lance_reader->init_reader();
1151
            _cur_reader = std::move(lance_reader);
1152
            need_to_get_parsed_schema = true;
1153
            break;
1154
        }
1155
#endif
1156
0
        default:
1157
0
            return Status::NotSupported("Not supported create reader for file format: {}.",
1158
0
                                        to_string(_params->format_type));
1159
71.2k
        }
1160
1161
71.2k
        if (_cur_reader == nullptr) {
1162
1
            return Status::NotSupported(
1163
1
                    "Not supported create reader for table format: {} / file format: {}.",
1164
1
                    range.__isset.table_format_params ? range.table_format_params.table_format_type
1165
1
                                                      : "NotSet",
1166
1
                    to_string(_params->format_type));
1167
1
        }
1168
71.2k
        COUNTER_UPDATE(_file_counter, 1);
1169
        // The FileScanner for external table may try to open not exist files,
1170
        // Because FE file cache for external table may out of date.
1171
        // So, NOT_FOUND for FileScanner is not a fail case.
1172
        // Will remove this after file reader refactor.
1173
71.2k
        if (init_status.is<END_OF_FILE>()) {
1174
0
            COUNTER_UPDATE(_empty_file_counter, 1);
1175
0
            continue;
1176
71.2k
        } else if (init_status.is<ErrorCode::NOT_FOUND>()) {
1177
0
            if (config::ignore_not_found_file_in_external_table) {
1178
0
                COUNTER_UPDATE(_not_found_file_counter, 1);
1179
0
                continue;
1180
0
            }
1181
0
            return Status::InternalError("failed to find reader, err: {}", init_status.to_string());
1182
71.2k
        } else if (!init_status.ok()) {
1183
1
            return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
1184
1
        }
1185
1186
        // For table-level COUNT pushdown, offsets are undefined so we must skip
1187
        // _set_fill_or_truncate_columns (it uses [start_offset, end_offset] to
1188
        // filter row groups, which would produce incorrect empty results).
1189
71.2k
        bool is_table_level_count = _get_push_down_agg_type() == TPushAggOp::type::COUNT &&
1190
71.2k
                                    range.__isset.table_format_params &&
1191
71.2k
                                    range.table_format_params.table_level_row_count >= 0;
1192
71.2k
        if (!is_table_level_count) {
1193
70.9k
            Status status = _set_fill_or_truncate_columns(need_to_get_parsed_schema);
1194
70.9k
            if (status.is<END_OF_FILE>()) { // all parquet row groups are filtered
1195
0
                continue;
1196
70.9k
            } else if (!status.ok()) {
1197
0
                return Status::InternalError("failed to set_fill_or_truncate_columns, err: {}",
1198
0
                                             status.to_string());
1199
0
            }
1200
70.9k
        }
1201
1202
        // Unified COUNT(*) pushdown: replace the real reader with CountReader
1203
        // decorator if the reader accepts COUNT and can provide a total row count.
1204
71.2k
        if (_cur_reader->get_push_down_agg_type() == TPushAggOp::type::COUNT) {
1205
1.83k
            int64_t total_rows = -1;
1206
1.83k
            if (is_table_level_count) {
1207
                // FE-provided count (may account for table-format deletions)
1208
210
                total_rows = range.table_format_params.table_level_row_count;
1209
1.62k
            } else if (_cur_reader->supports_count_pushdown()) {
1210
                // File metadata count (ORC footer / Parquet row groups)
1211
586
                total_rows = _cur_reader->get_total_rows();
1212
586
            }
1213
1.83k
            if (total_rows >= 0) {
1214
796
                auto batch_size = _state->query_options().batch_size;
1215
796
                _cur_reader = std::make_unique<CountReader>(total_rows, batch_size,
1216
796
                                                            std::move(_cur_reader));
1217
796
            }
1218
1.83k
        }
1219
71.2k
        _cur_reader_eof = false;
1220
71.2k
        break;
1221
71.2k
    }
1222
71.2k
    return Status::OK();
1223
157k
}
1224
1225
// Builds the Parquet reader variant that matches the table format of `_current_range`,
// initializes it with a shared ParquetInitContext and installs it as `_cur_reader`.
//
// `file_meta_cache_ptr` is forwarded to whichever reader gets constructed.
// `parquet_reader` is an optional caller-built plain ParquetReader. It is only consumed by
// the "tvf" and load branches (a plain reader is created there when it is null); the
// iceberg / paimon / hudi / hive branches always construct their own reader.
//
// Returns the status of the selected reader's init_reader() call. When no branch matches,
// `_cur_reader` is left untouched and OK is returned.
Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr,
                                         std::unique_ptr<ParquetReader> parquet_reader) {
    const TFileRangeDesc& range = _current_range;
    const bool has_table_format = range.__isset.table_format_params;
    const auto& table_format = range.table_format_params.table_format_type;

    // Copy of the per-slot predicates owned by the local state (empty without a local state).
    // NOTE(review): pctx below stores the address of this function-local map; presumably the
    // readers only consume it during init_reader() - confirm they do not keep the pointer.
    phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> slot_id_to_predicates;
    if (_local_state) {
        slot_id_to_predicates = _local_state->cast<FileScanLocalState>()._slot_id_to_predicates;
    }

    // One init context shared by every Parquet reader variant.
    ParquetInitContext pctx;
    _fill_base_init_context(&pctx);
    pctx.conjuncts = &_push_down_conjuncts;
    pctx.slot_id_to_predicates = &slot_id_to_predicates;
    pctx.colname_to_slot_id = _col_name_to_slot_id;
    pctx.not_single_slot_filter_conjuncts = &_not_single_slot_filter_conjuncts;
    pctx.slot_id_to_filter_conjuncts = &_slot_id_to_filter_conjuncts;

    Status init_status = Status::OK();

    // Factory handed to the readers that need a row-id column iterator.
    auto row_id_iterator_factory = [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
        return _create_row_id_column_iterator();
    };

    // Initializes `reader` through the GenericReader interface, then hands ownership to
    // `_cur_reader` (the reader is installed even when initialization fails).
    auto init_and_install = [&](auto reader) {
        init_status = static_cast<GenericReader*>(reader.get())->init_reader(&pctx);
        _cur_reader = std::move(reader);
    };

    // Creates the plain ParquetReader unless the caller already supplied one.
    auto ensure_plain_reader = [&]() {
        if (!parquet_reader) {
            parquet_reader = ParquetReader::create_unique(
                    _profile, *_params, range, _state->query_options().batch_size,
                    &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr,
                    _state->query_options().enable_parquet_lazy_mat);
        }
    };

    if (has_table_format && table_format == "iceberg") {
        // IcebergParquetReader is itself a ParquetReader, so no wrapper is required.
        std::unique_ptr<IcebergParquetReader> iceberg_reader = IcebergParquetReader::create_unique(
                _kv_cache, _profile, *_params, range, _state->query_options().batch_size,
                &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr);
        iceberg_reader->set_create_row_id_column_iterator_func(row_id_iterator_factory);
        init_and_install(std::move(iceberg_reader));
    } else if (has_table_format && table_format == "paimon") {
        // PaimonParquetReader is itself a ParquetReader, so no wrapper is required.
        auto paimon_reader = PaimonParquetReader::create_unique(
                _profile, *_params, range, _state->query_options().batch_size,
                &_state->timezone_obj(), _kv_cache, _io_ctx.get(), _state, file_meta_cache_ptr);
        init_and_install(std::move(paimon_reader));
    } else if (has_table_format && table_format == "hudi") {
        // HudiParquetReader is itself a ParquetReader, so no wrapper is required.
        auto hudi_reader = HudiParquetReader::create_unique(
                _profile, *_params, range, _state->query_options().batch_size,
                &_state->timezone_obj(), _io_ctx.get(), _state, file_meta_cache_ptr);
        init_and_install(std::move(hudi_reader));
    } else if (table_format == "hive") {
        // NOTE(review): unlike the branches above, "hive" and "tvf" are matched without
        // consulting __isset.table_format_params; kept as is.
        auto hive_reader = HiveParquetReader::create_unique(
                _profile, *_params, range, _state->query_options().batch_size,
                &_state->timezone_obj(), _io_ctx.get(), _state, &_is_file_slot, file_meta_cache_ptr,
                _state->query_options().enable_parquet_lazy_mat);
        hive_reader->set_create_row_id_column_iterator_func(row_id_iterator_factory);
        init_and_install(std::move(hive_reader));
    } else if (table_format == "tvf") {
        ensure_plain_reader();
        parquet_reader->set_create_row_id_column_iterator_func(row_id_iterator_factory);
        init_and_install(std::move(parquet_reader));
    } else if (_is_load) {
        // Load tasks use the plain reader without a row-id iterator factory.
        ensure_plain_reader();
        init_and_install(std::move(parquet_reader));
    }

    return init_status;
}
1309
1310
// Builds the ORC reader variant that matches the table format of `_current_range`,
// initializes it with a shared OrcInitContext and installs it as `_cur_reader`.
//
// `file_meta_cache_ptr` is forwarded to whichever reader gets constructed.
// `orc_reader` is an optional caller-built plain OrcReader. It is only consumed by the
// "tvf" and load branches (a plain reader is created there when it is null); every other
// branch constructs its own table-format specific reader.
//
// Returns the status of the selected reader's init_reader() call. When no branch matches,
// `_cur_reader` is left untouched and OK is returned.
Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr,
                                     std::unique_ptr<OrcReader> orc_reader) {
    const TFileRangeDesc& range = _current_range;

    // One init context shared by every ORC reader variant.
    OrcInitContext octx;
    _fill_base_init_context(&octx);
    octx.conjuncts = &_push_down_conjuncts;
    octx.not_single_slot_filter_conjuncts = &_not_single_slot_filter_conjuncts;
    octx.slot_id_to_filter_conjuncts = &_slot_id_to_filter_conjuncts;

    // True when the range carries table format params naming exactly `name`.
    const auto is_table_format = [&range](const char* name) {
        return range.__isset.table_format_params &&
               range.table_format_params.table_format_type == name;
    };

    // Factory handed to the readers that need a row-id column iterator.
    auto make_row_id_iterator = [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> {
        return _create_row_id_column_iterator();
    };

    Status init_status = Status::OK();

    // Initializes `reader` through the GenericReader interface, then hands ownership to
    // `_cur_reader` (the reader is installed even when initialization fails).
    auto install = [&](auto reader) {
        init_status = static_cast<GenericReader*>(reader.get())->init_reader(&octx);
        _cur_reader = std::move(reader);
    };

    const auto batch_size = _state->query_options().batch_size;
    const auto& timezone = _state->timezone();

    // Creates the plain OrcReader unless the caller already supplied one.
    auto ensure_plain_reader = [&]() {
        if (!orc_reader) {
            orc_reader = OrcReader::create_unique(_profile, _state, *_params, range, batch_size,
                                                  timezone, _io_ctx.get(), file_meta_cache_ptr,
                                                  _state->query_options().enable_orc_lazy_mat);
        }
    };

    if (is_table_format("transactional_hive")) {
        // TransactionalHiveReader is itself an OrcReader, so no wrapper is required.
        auto tran_orc_reader = TransactionalHiveReader::create_unique(
                _profile, _state, *_params, range, batch_size, timezone, _io_ctx.get(),
                file_meta_cache_ptr);
        tran_orc_reader->set_create_row_id_column_iterator_func(make_row_id_iterator);
        install(std::move(tran_orc_reader));
    } else if (is_table_format("iceberg")) {
        // IcebergOrcReader is itself an OrcReader, so no wrapper is required.
        std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique(
                _kv_cache, _profile, _state, *_params, range, batch_size, timezone, _io_ctx.get(),
                file_meta_cache_ptr);
        iceberg_reader->set_create_row_id_column_iterator_func(make_row_id_iterator);
        install(std::move(iceberg_reader));
    } else if (is_table_format("paimon")) {
        // PaimonOrcReader is itself an OrcReader, so no wrapper is required.
        auto paimon_reader = PaimonOrcReader::create_unique(_profile, _state, *_params, range,
                                                            batch_size, timezone, _kv_cache,
                                                            _io_ctx.get(), file_meta_cache_ptr);
        install(std::move(paimon_reader));
    } else if (is_table_format("hudi")) {
        // HudiOrcReader is itself an OrcReader, so no wrapper is required.
        auto hudi_reader =
                HudiOrcReader::create_unique(_profile, _state, *_params, range, batch_size,
                                             timezone, _io_ctx.get(), file_meta_cache_ptr);
        install(std::move(hudi_reader));
    } else if (is_table_format("hive")) {
        auto hive_reader = HiveOrcReader::create_unique(
                _profile, _state, *_params, range, batch_size, timezone, _io_ctx.get(),
                &_is_file_slot, file_meta_cache_ptr, _state->query_options().enable_orc_lazy_mat);
        hive_reader->set_create_row_id_column_iterator_func(make_row_id_iterator);
        install(std::move(hive_reader));
    } else if (is_table_format("tvf")) {
        ensure_plain_reader();
        orc_reader->set_create_row_id_column_iterator_func(make_row_id_iterator);
        install(std::move(orc_reader));
    } else if (_is_load) {
        // Load tasks use the plain reader without a row-id iterator factory.
        ensure_plain_reader();
        install(std::move(orc_reader));
    }

    return init_status;
}
1406
1407
74.1k
// Rebuilds `_slot_lower_name_to_col_type` from the columns reported by the current reader
// and then refreshes the truncate-column information.
//
// `need_to_get_parsed_schema` is forwarded unchanged to _generate_truncate_columns().
// Returns the first error from get_columns() or _generate_truncate_columns(), OK otherwise.
Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema) {
    _slot_lower_name_to_col_type.clear();

    std::unordered_map<std::string, DataTypePtr> file_columns;
    RETURN_IF_ERROR(_cur_reader->get_columns(&file_columns));

    for (const auto& file_column : file_columns) {
        auto lower_name = to_lower(file_column.first);
        // `_slot_lower_name_to_col_type` records the columns that exist in the file; it is
        // used by `_init_src_block` and `_cast_to_input_block` during LOAD to generate
        // columns of the matching type.
        //
        // A `COLUMNS FROM PATH` column that also exists in the file is deliberately left
        // out: otherwise the block column type would not match the slot type in
        // `_output_tuple_desc`, and filling the partition values through Serde
        // `deserialize_one_cell_from_json` would fail.
        if (!_partition_col_descs.contains(lower_name)) {
            _slot_lower_name_to_col_type.emplace(lower_name, file_column.second);
        }
    }

    return _generate_truncate_columns(need_to_get_parsed_schema);
}
1434
1435
74.1k
// Rebuilds `_source_file_col_name_types`: the lower-cased column names of the source file
// (such as parquet or orc files) mapped to their types.
//
// The map is always cleared first. It is only repopulated when the query option
// `truncate_char_or_varchar_columns` is set and `need_to_get_parsed_schema` is true.
//
// Returns the error from get_parsed_schema(), except NOT_IMPLEMENTED_ERROR, which is
// tolerated and leaves the map with whatever the reader put into the output vectors
// (nothing, for a reader that does not implement the call). Returns OK otherwise.
Status FileScanner::_generate_truncate_columns(bool need_to_get_parsed_schema) {
    _source_file_col_name_types.clear();
    if (!_state->query_options().truncate_char_or_varchar_columns || !need_to_get_parsed_schema) {
        return Status::OK();
    }

    std::vector<std::string> source_file_col_names;
    std::vector<DataTypePtr> source_file_col_types;
    Status status = _cur_reader->get_parsed_schema(&source_file_col_names, &source_file_col_types);
    if (!status.ok() && status.code() != TStatusCode::NOT_IMPLEMENTED_ERROR) {
        return status;
    }

    DCHECK_EQ(source_file_col_names.size(), source_file_col_types.size());
    // DCHECK is a no-op in release builds, so bound the loop by the shorter vector rather
    // than indexing past the end if a reader ever returns mismatched sizes.
    const size_t num_columns = std::min(source_file_col_names.size(), source_file_col_types.size());
    for (size_t i = 0; i < num_columns; ++i) {
        _source_file_col_name_types[to_lower(source_file_col_names[i])] = source_file_col_types[i];
    }
    return Status::OK();
}
1454
1455
3.23k
// Prepares this scanner for read_lines_from_range(): stores `range` as the current range,
// creates fresh IO statistics and profile counters, initializes the IO context and the
// expression contexts, and clears all filter state.
//
// Returns the first error from _init_io_ctx() or _init_expr_ctxes(), OK otherwise.
Status FileScanner::prepare_for_read_lines(const TFileRangeDesc& range) {
    _current_range = range;

    // Fresh statistics objects; the IO context is pointed at them below.
    _file_cache_statistics = std::make_unique<io::FileCacheStatistics>();
    _file_reader_stats = std::make_unique<io::FileReaderStats>();

    _file_read_bytes_counter =
            ADD_COUNTER_WITH_LEVEL(_profile, FileReadBytesProfile, TUnit::BYTES, 1);
    _file_read_time_counter = ADD_TIMER_WITH_LEVEL(_profile, FileReadTimeProfile, 1);

    RETURN_IF_ERROR(_init_io_ctx());
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
    _io_ctx->file_reader_stats = _file_reader_stats.get();

    // The row descriptor must exist before _init_expr_ctxes() runs, since that call
    // prepares the default value expressions against `*_default_val_row_desc`.
    _default_val_row_desc = std::make_unique<RowDescriptor>((TupleDescriptor*)_real_tuple_desc);
    RETURN_IF_ERROR(_init_expr_ctxes());

    // Only one column is read from the file here, so nothing needs filtering: drop every
    // pushed-down / per-slot conjunct and the kv cache.
    _push_down_conjuncts.clear();
    _not_single_slot_filter_conjuncts.clear();
    _slot_id_to_filter_conjuncts.clear();
    _kv_cache = nullptr;
    return Status::OK();
}
1478
1479
// Reads the rows identified by `row_ids` from the file described by `range` into
// `result_block`.
//
// Parameters:
//   range          - file range to read; becomes `_current_range`.
//   row_ids        - rows to fetch, passed to the reader's read_by_rows().
//   result_block   - output block filled through _get_block_impl().
//   external_info  - only `enable_file_meta_cache` is consulted here, to decide whether
//                    the process-wide file meta cache is handed to the reader.
//   init_reader_ms - receives the time measured by scope_timer_run around reader creation.
//   get_block_ms   - receives the time measured by scope_timer_run around the read loop.
//
// Only parquet and orc are supported; any other format yields NotSupported. NotSupported is
// also returned when no reader variant matches the range's table format. Any error from
// the reader or the helper calls is propagated.
Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
                                          const std::list<int64_t>& row_ids, Block* result_block,
                                          const ExternalFileMappingInfo& external_info,
                                          int64_t* init_reader_ms, int64_t* get_block_ms) {
    _current_range = range;
    RETURN_IF_ERROR(_generate_partition_columns());

    TFileFormatType::type format_type = _get_current_format_type();

    auto file_meta_cache_ptr = external_info.enable_file_meta_cache
                                       ? ExecEnv::GetInstance()->file_meta_cache()
                                       : nullptr;

    // _init_parquet_reader / _init_orc_reader return OK without touching `_cur_reader` when
    // no reader variant matches the range. Drop the reader left over from a previous call
    // so that this case is detected below instead of silently reusing a stale reader.
    _cur_reader.reset();

    // Applies `row_ids` to whichever reader was installed as `_cur_reader`, or reports
    // NotSupported when none was installed (same message as the regular create-reader path).
    const auto apply_row_ids = [&]() -> Status {
        if (_cur_reader == nullptr) {
            return Status::NotSupported(
                    "Not supported create reader for table format: {} / file format: {}.",
                    range.__isset.table_format_params ? range.table_format_params.table_format_type
                                                      : "NotSet",
                    to_string(_params->format_type));
        }
        RETURN_IF_ERROR(_cur_reader->read_by_rows(row_ids));
        return Status::OK();
    };

    RETURN_IF_ERROR(scope_timer_run(
            [&]() -> Status {
                switch (format_type) {
                case TFileFormatType::FORMAT_PARQUET: {
                    std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
                            _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx.get(),
                            _state, file_meta_cache_ptr, false);
                    RETURN_IF_ERROR(
                            _init_parquet_reader(file_meta_cache_ptr, std::move(parquet_reader)));
                    // _init_parquet_reader may have replaced the plain reader with a
                    // table-format specific one (e.g. HiveParquetReader), so the row ids
                    // are applied to the actual `_cur_reader`.
                    RETURN_IF_ERROR(apply_row_ids());
                    break;
                }
                case TFileFormatType::FORMAT_ORC: {
                    std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
                            _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx.get(),
                            file_meta_cache_ptr, false);
                    RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr, std::move(orc_reader)));
                    // Same as above: apply the row ids to the actual `_cur_reader`.
                    RETURN_IF_ERROR(apply_row_ids());
                    break;
                }
                default: {
                    return Status::NotSupported(
                            "Not support create lines reader for file format: {},"
                            "only support parquet and orc.",
                            to_string(_params->format_type));
                }
                }
                return Status::OK();
            },
            init_reader_ms));

    RETURN_IF_ERROR(_set_fill_or_truncate_columns(true));
    _cur_reader_eof = false;

    RETURN_IF_ERROR(scope_timer_run(
            [&]() -> Status {
                // The loop is driven by the `_cur_reader_eof` member; the local `eof` only
                // satisfies the _get_block_impl() signature.
                while (!_cur_reader_eof) {
                    bool eof = false;
                    RETURN_IF_ERROR(_get_block_impl(_state, result_block, &eof));
                }
                return Status::OK();
            },
            get_block_ms));

    _cur_reader->collect_profile_before_close();
    RETURN_IF_ERROR(_cur_reader->close());

    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
    return Status::OK();
}
1548
1549
10.9k
// Rebuilds `_partition_col_descs` and `_partition_value_is_null` for `_current_range`.
//
// Both maps are always cleared first. When the range carries `columns_from_path_keys`,
// every PARTITION_KEY entry of `_column_descs` whose name appears among those keys is
// mapped to (path value, slot descriptor), provided the range also carries a value at the
// key's position. The is-null flag is recorded only when the range provides one at that
// position.
//
// Always returns OK.
Status FileScanner::_generate_partition_columns() {
    _partition_col_descs.clear();
    _partition_value_is_null.clear();
    const TFileRangeDesc& range = _current_range;
    if (!range.__isset.columns_from_path_keys) {
        return Status::OK();
    }

    // Partition key name -> position in columns_from_path_keys. The counter advances for
    // every key, so a duplicated name keeps the position of its first occurrence.
    std::unordered_map<std::string, size_t> partition_name_to_key_index;
    size_t index = 0;
    for (const auto& key : range.columns_from_path_keys) {
        partition_name_to_key_index.emplace(key, index++);
    }

    // Iterate _column_descs to find PARTITION_KEY columns instead of _partition_slot_descs.
    for (const auto& col_desc : _column_descs) {
        if (col_desc.category != ColumnCategory::PARTITION_KEY) {
            continue;
        }
        const auto pit = partition_name_to_key_index.find(col_desc.name);
        if (pit == partition_name_to_key_index.end()) {
            continue;
        }
        const size_t values_index = pit->second;
        if (!range.__isset.columns_from_path || values_index >= range.columns_from_path.size()) {
            continue;
        }
        _partition_col_descs.emplace(
                col_desc.name,
                std::make_tuple(range.columns_from_path[values_index], col_desc.slot_desc));
        // The is-null list is indexed independently of the value list, so check its bounds
        // too instead of reading past the end when it is shorter.
        if (range.__isset.columns_from_path_is_null &&
            values_index < range.columns_from_path_is_null.size()) {
            _partition_value_is_null.emplace(col_desc.name,
                                             range.columns_from_path_is_null[values_index]);
        }
    }
    return Status::OK();
}
1584
1585
53.0k
Status FileScanner::_init_expr_ctxes() {
1586
53.0k
    std::map<SlotId, int> full_src_index_map;
1587
53.0k
    std::map<SlotId, SlotDescriptor*> full_src_slot_map;
1588
53.0k
    std::map<std::string, int> partition_name_to_key_index_map;
1589
53.0k
    int index = 0;
1590
324k
    for (const auto& slot_desc : _real_tuple_desc->slots()) {
1591
324k
        full_src_slot_map.emplace(slot_desc->id(), slot_desc);
1592
324k
        full_src_index_map.emplace(slot_desc->id(), index++);
1593
324k
    }
1594
1595
    // For external table query, find the index of column in path.
1596
    // Because query doesn't always search for all columns in a table
1597
    // and the order of selected columns is random.
1598
    // All ranges in _ranges vector should have identical columns_from_path_keys
1599
    // because they are all file splits for the same external table.
1600
    // So here use the first element of _ranges to fill the partition_name_to_key_index_map
1601
53.0k
    if (_current_range.__isset.columns_from_path_keys) {
1602
14.7k
        std::vector<std::string> key_map = _current_range.columns_from_path_keys;
1603
14.7k
        if (!key_map.empty()) {
1604
30.8k
            for (size_t i = 0; i < key_map.size(); i++) {
1605
20.0k
                partition_name_to_key_index_map.emplace(key_map[i], i);
1606
20.0k
            }
1607
10.7k
        }
1608
14.7k
    }
1609
1610
53.0k
    _num_of_columns_from_file = _params->num_of_columns_from_file;
1611
324k
    for (const auto& slot_info : _params->required_slots) {
1612
324k
        auto slot_id = slot_info.slot_id;
1613
324k
        auto it = full_src_slot_map.find(slot_id);
1614
324k
        if (it == std::end(full_src_slot_map)) {
1615
0
            return Status::InternalError(
1616
0
                    fmt::format("Unknown source slot descriptor, slot_id={}", slot_id));
1617
0
        }
1618
1619
324k
        ColumnDescriptor col_desc;
1620
324k
        col_desc.name = it->second->col_name();
1621
324k
        col_desc.slot_desc = it->second;
1622
1623
        // Read category from Thrift if available (new FE), otherwise fall back
1624
        // to slot_info.is_file_slot + partition_name_to_key_index_map for broker/stream load
1625
        // where the FE does not set TColumnCategory.
1626
324k
        if (slot_info.__isset.category) {
1627
277k
            switch (slot_info.category) {
1628
263k
            case TColumnCategory::REGULAR:
1629
263k
                col_desc.category = ColumnCategory::REGULAR;
1630
263k
                break;
1631
8.49k
            case TColumnCategory::PARTITION_KEY:
1632
8.49k
                col_desc.category = ColumnCategory::PARTITION_KEY;
1633
8.49k
                break;
1634
4.44k
            case TColumnCategory::SYNTHESIZED:
1635
4.44k
                col_desc.category = ColumnCategory::SYNTHESIZED;
1636
4.44k
                break;
1637
794
            case TColumnCategory::GENERATED:
1638
794
                col_desc.category = ColumnCategory::GENERATED;
1639
794
                break;
1640
277k
            }
1641
277k
        } else if (partition_name_to_key_index_map.contains(it->second->col_name()) &&
1642
47.4k
                   !slot_info.is_file_slot) {
1643
2.13k
            col_desc.category = ColumnCategory::PARTITION_KEY;
1644
2.13k
        }
1645
1646
        // Derive is_file_slot from category
1647
324k
        bool is_file_slot = (col_desc.category == ColumnCategory::REGULAR ||
1648
324k
                             col_desc.category == ColumnCategory::GENERATED);
1649
1650
324k
        if (partition_name_to_key_index_map.contains(it->second->col_name())) {
1651
14.3k
            if (_is_load) {
1652
11
                auto iti = full_src_index_map.find(slot_id);
1653
11
                _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
1654
14.3k
            } else {
1655
14.3k
                auto kit = partition_name_to_key_index_map.find(it->second->col_name());
1656
14.3k
                _partition_slot_index_map.emplace(slot_id, kit->second);
1657
14.3k
            }
1658
14.3k
        }
1659
1660
324k
        if (is_file_slot) {
1661
309k
            _is_file_slot.emplace(slot_id);
1662
309k
            _file_slot_descs.emplace_back(it->second);
1663
309k
            _file_col_names.push_back(it->second->col_name());
1664
309k
        }
1665
1666
324k
        _column_descs.push_back(col_desc);
1667
324k
    }
1668
1669
    // set column name to default value expr map
1670
    // new inline TFileScanSlotInfo.default_value_expr (preferred)
1671
325k
    for (const auto& slot_info : _params->required_slots) {
1672
325k
        auto slot_id = slot_info.slot_id;
1673
325k
        auto it = full_src_slot_map.find(slot_id);
1674
325k
        if (it == std::end(full_src_slot_map)) {
1675
0
            continue;
1676
0
        }
1677
325k
        const std::string& col_name = it->second->col_name();
1678
1679
325k
        VExprContextSPtr ctx;
1680
325k
        bool has_default = false;
1681
1682
        // Prefer inline default_value_expr from TFileScanSlotInfo (new FE)
1683
325k
        if (slot_info.__isset.default_value_expr && !slot_info.default_value_expr.nodes.empty()) {
1684
291k
            RETURN_IF_ERROR(VExpr::create_expr_tree(slot_info.default_value_expr, ctx));
1685
291k
            RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
1686
291k
            RETURN_IF_ERROR(ctx->open(_state));
1687
291k
            has_default = true;
1688
291k
        } else if (slot_info.__isset.default_value_expr) {
1689
            // Empty nodes means null default (same as legacy empty TExpr)
1690
5.28k
            has_default = true;
1691
5.28k
        }
1692
1693
        // Fall back to legacy default_value_of_src_slot map for mixed-version FE/BE
1694
        // and callers that have not preserved inline default_value_expr yet.
1695
325k
        if (!has_default) {
1696
28.9k
            auto legacy_it = _params->default_value_of_src_slot.find(slot_id);
1697
28.9k
            if (legacy_it != std::end(_params->default_value_of_src_slot)) {
1698
24.1k
                if (!legacy_it->second.nodes.empty()) {
1699
0
                    RETURN_IF_ERROR(VExpr::create_expr_tree(legacy_it->second, ctx));
1700
0
                    RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
1701
0
                    RETURN_IF_ERROR(ctx->open(_state));
1702
0
                }
1703
24.1k
                has_default = true;
1704
24.1k
            }
1705
28.9k
        }
1706
1707
325k
        if (has_default) {
1708
            // if expr is empty, the default value will be null
1709
321k
            _col_default_value_ctx.emplace(col_name, ctx);
1710
321k
        }
1711
325k
    }
1712
1713
    // Populate default_expr in each ColumnDescriptor from _col_default_value_ctx.
1714
    // This makes default values available to readers via column_descs, eliminating the
1715
    // need for the separate _generate_missing_columns roundtrip.
1716
325k
    for (auto& col_desc : _column_descs) {
1717
325k
        auto it = _col_default_value_ctx.find(col_desc.name);
1718
325k
        if (it != _col_default_value_ctx.end()) {
1719
320k
            col_desc.default_expr = it->second;
1720
320k
        }
1721
325k
    }
1722
1723
53.2k
    if (_is_load) {
1724
        // follow desc expr map is only for load task.
1725
2.49k
        bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
1726
2.49k
        int idx = 0;
1727
29.1k
        for (auto* slot_desc : _output_tuple_desc->slots()) {
1728
29.1k
            auto it = _params->expr_of_dest_slot.find(slot_desc->id());
1729
29.1k
            if (it == std::end(_params->expr_of_dest_slot)) {
1730
0
                return Status::InternalError("No expr for dest slot, id={}, name={}",
1731
0
                                             slot_desc->id(), slot_desc->col_name());
1732
0
            }
1733
1734
29.1k
            VExprContextSPtr ctx;
1735
29.1k
            if (!it->second.nodes.empty()) {
1736
29.1k
                RETURN_IF_ERROR(VExpr::create_expr_tree(it->second, ctx));
1737
29.1k
                RETURN_IF_ERROR(ctx->prepare(_state, *_src_row_desc));
1738
29.1k
                RETURN_IF_ERROR(ctx->open(_state));
1739
29.1k
            }
1740
29.1k
            _dest_vexpr_ctx.emplace_back(ctx);
1741
29.1k
            _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
1742
1743
29.1k
            if (has_slot_id_map) {
1744
29.0k
                auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
1745
29.0k
                if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
1746
6.40k
                    _src_slot_descs_order_by_dest.emplace_back(nullptr);
1747
22.6k
                } else {
1748
22.6k
                    auto _src_slot_it = full_src_slot_map.find(it1->second);
1749
22.6k
                    if (_src_slot_it == std::end(full_src_slot_map)) {
1750
0
                        return Status::InternalError("No src slot {} in src slot descs",
1751
0
                                                     it1->second);
1752
0
                    }
1753
22.6k
                    _dest_slot_to_src_slot_index.emplace(_src_slot_descs_order_by_dest.size(),
1754
22.6k
                                                         full_src_index_map[_src_slot_it->first]);
1755
22.6k
                    _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
1756
22.6k
                }
1757
29.0k
            }
1758
29.1k
        }
1759
2.49k
    }
1760
53.2k
    return Status::OK();
1761
53.2k
}
1762
1763
400k
bool FileScanner::_should_enable_condition_cache() {
1764
400k
    DCHECK(_should_enable_condition_cache_handler != nullptr);
1765
400k
    return _condition_cache_digest != 0 && (this->*_should_enable_condition_cache_handler)() &&
1766
400k
           (!_conjuncts.empty() || !_push_down_conjuncts.empty());
1767
400k
}
1768
1769
0
// Condition-cache policy variant for load tasks: the cache is never enabled.
bool FileScanner::_should_enable_condition_cache_for_load() const {
    constexpr bool kEnabledForLoad = false;
    return kEnabledForLoad;
}
1772
1773
332k
// Condition-cache policy variant for queries: always allowed at this level.
// The caller still applies its own digest/conjunct checks.
bool FileScanner::_should_enable_condition_cache_for_query() const {
    constexpr bool kEnabledForQuery = true;
    return kEnabledForQuery;
}
1776
1777
71.2k
// Forwards the predicate push-down decision for `format_type` to the handler
// stored in `_should_push_down_predicates_handler`, which must be set.
bool FileScanner::_should_push_down_predicates(TFileFormatType::type format_type) const {
    DCHECK(_should_push_down_predicates_handler != nullptr);
    const auto handler = _should_push_down_predicates_handler;
    return (this->*handler)(format_type);
}
1781
1782
2.49k
bool FileScanner::_should_push_down_predicates_for_load(TFileFormatType::type format_type) const {
1783
2.49k
    static_cast<void>(format_type);
1784
2.49k
    return false;
1785
2.49k
}
1786
1787
68.7k
// Push-down policy variant for queries: every format except JNI gets push-down.
bool FileScanner::_should_push_down_predicates_for_query(TFileFormatType::type format_type) const {
    // JNI readers handle predicate conversion in their own paths.
    const bool is_jni_format = (format_type == TFileFormatType::FORMAT_JNI);
    return !is_jni_format;
}
1791
1792
156k
void FileScanner::_init_reader_condition_cache() {
1793
156k
    _condition_cache = nullptr;
1794
156k
    _condition_cache_ctx = nullptr;
1795
1796
156k
    if (!_should_enable_condition_cache() || !_cur_reader) {
1797
135k
        return;
1798
135k
    }
1799
1800
    // Disable condition cache when delete operations exist (e.g. Iceberg position/equality
1801
    // deletes, Hive ACID deletes). Cached granule results may become stale if delete files
1802
    // change between queries while the data file's cache key remains the same.
1803
21.1k
    if (_cur_reader->has_delete_operations()) {
1804
2.19k
        return;
1805
2.19k
    }
1806
1807
18.9k
    auto* cache = segment_v2::ConditionCache::instance();
1808
18.9k
    _condition_cache_key = segment_v2::ConditionCache::ExternalCacheKey(
1809
18.9k
            _current_range.path,
1810
18.4E
            _current_range.__isset.modification_time ? _current_range.modification_time : 0,
1811
18.4E
            _current_range.__isset.file_size ? _current_range.file_size : -1,
1812
18.9k
            _condition_cache_digest,
1813
18.4E
            _current_range.__isset.start_offset ? _current_range.start_offset : 0,
1814
18.4E
            _current_range.__isset.size ? _current_range.size : -1);
1815
1816
18.9k
    segment_v2::ConditionCacheHandle handle;
1817
18.9k
    auto condition_cache_hit = cache->lookup(_condition_cache_key, &handle);
1818
18.9k
    if (condition_cache_hit) {
1819
7.76k
        _condition_cache = handle.get_filter_result();
1820
7.76k
        _condition_cache_hit_count++;
1821
11.1k
    } else {
1822
        // Allocate cache pre-sized to total number of granules.
1823
        // We add +1 as a safety margin: when a file is split across multiple scanners
1824
        // and the first row of this scanner's range is not aligned to a granule boundary,
1825
        // the data may span one more granule than ceil(total_rows / GRANULE_SIZE).
1826
        // The extra element costs only 1 bit and never affects correctness (an extra
1827
        // false-granule beyond the actual data range won't overlap any real row range).
1828
11.1k
        int64_t total_rows = _cur_reader->get_total_rows();
1829
11.1k
        if (total_rows > 0) {
1830
9.40k
            size_t num_granules = (total_rows + ConditionCacheContext::GRANULE_SIZE - 1) /
1831
9.40k
                                  ConditionCacheContext::GRANULE_SIZE;
1832
9.40k
            _condition_cache = std::make_shared<std::vector<bool>>(num_granules + 1, false);
1833
9.40k
        }
1834
11.1k
    }
1835
1836
18.9k
    if (_condition_cache) {
1837
        // Create context to pass to readers (native readers use it; non-native readers ignore it)
1838
17.1k
        _condition_cache_ctx = std::make_shared<ConditionCacheContext>();
1839
17.1k
        _condition_cache_ctx->is_hit = condition_cache_hit;
1840
17.1k
        _condition_cache_ctx->filter_result = _condition_cache;
1841
17.1k
        _cur_reader->set_condition_cache_context(_condition_cache_ctx);
1842
17.1k
    }
1843
18.9k
}
1844
1845
243k
void FileScanner::_finalize_reader_condition_cache() {
1846
243k
    if (!_should_enable_condition_cache() || !_condition_cache_ctx ||
1847
243k
        _condition_cache_ctx->is_hit) {
1848
234k
        _condition_cache = nullptr;
1849
234k
        _condition_cache_ctx = nullptr;
1850
234k
        return;
1851
234k
    }
1852
    // Only store the cache if the reader was fully consumed. If the scan was
1853
    // truncated early (e.g. by LIMIT), the cache is incomplete — unread granules
1854
    // would remain false and cause surviving rows to be incorrectly skipped on HIT.
1855
9.44k
    if (!_cur_reader_eof) {
1856
44
        _condition_cache = nullptr;
1857
44
        _condition_cache_ctx = nullptr;
1858
44
        return;
1859
44
    }
1860
1861
9.40k
    auto* cache = segment_v2::ConditionCache::instance();
1862
9.40k
    cache->insert(_condition_cache_key, std::move(_condition_cache));
1863
9.40k
    _condition_cache = nullptr;
1864
9.40k
    _condition_cache_ctx = nullptr;
1865
9.40k
}
1866
1867
86.5k
// Closes the scanner: finalizes the condition cache for the current reader,
// closes that reader if there is one, then runs the base Scanner::close().
// Returns OK without doing anything when `_try_close()` reports false; otherwise
// returns the first error raised by the reader or the base class.
Status FileScanner::close(RuntimeState* state) {
    if (_try_close()) {
        _finalize_reader_condition_cache();

        if (_cur_reader != nullptr) {
            RETURN_IF_ERROR(_cur_reader->close());
        }

        RETURN_IF_ERROR(Scanner::close(state));
    }
    return Status::OK();
}
1881
1882
86.5k
void FileScanner::try_stop() {
1883
86.5k
    Scanner::try_stop();
1884
86.5k
    if (_io_ctx) {
1885
86.5k
        _io_ctx->should_stop = true;
1886
86.5k
    }
1887
86.5k
}
1888
1889
137k
void FileScanner::update_realtime_counters() {
1890
137k
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
1891
1892
137k
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
1893
137k
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
1894
1895
137k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(
1896
137k
            _file_reader_stats->read_rows);
1897
137k
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
1898
137k
            _file_reader_stats->read_bytes);
1899
1900
137k
    int64_t delta_bytes_read_from_local =
1901
137k
            _file_cache_statistics->bytes_read_from_local - _last_bytes_read_from_local;
1902
137k
    int64_t delta_bytes_read_from_remote =
1903
137k
            _file_cache_statistics->bytes_read_from_remote - _last_bytes_read_from_remote;
1904
137k
    if (_file_cache_statistics->bytes_read_from_local == 0 &&
1905
137k
        _file_cache_statistics->bytes_read_from_remote == 0) {
1906
131k
        _state->get_query_ctx()
1907
131k
                ->resource_ctx()
1908
131k
                ->io_context()
1909
131k
                ->update_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes);
1910
131k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
1911
131k
                _file_reader_stats->read_bytes);
1912
131k
    } else {
1913
5.91k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
1914
5.91k
                delta_bytes_read_from_local);
1915
5.91k
        _state->get_query_ctx()
1916
5.91k
                ->resource_ctx()
1917
5.91k
                ->io_context()
1918
5.91k
                ->update_scan_bytes_from_remote_storage(delta_bytes_read_from_remote);
1919
5.91k
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
1920
5.91k
                delta_bytes_read_from_local);
1921
5.91k
        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
1922
5.91k
                delta_bytes_read_from_remote);
1923
5.91k
    }
1924
1925
137k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
1926
1927
137k
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
1928
137k
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
1929
1930
137k
    _file_reader_stats->read_bytes = 0;
1931
137k
    _file_reader_stats->read_rows = 0;
1932
1933
137k
    _last_bytes_read_from_local = _file_cache_statistics->bytes_read_from_local;
1934
137k
    _last_bytes_read_from_remote = _file_cache_statistics->bytes_read_from_remote;
1935
137k
}
1936
1937
86.4k
void FileScanner::_collect_profile_before_close() {
1938
86.4k
    Scanner::_collect_profile_before_close();
1939
86.4k
    if (config::enable_file_cache && _state->query_options().enable_file_cache &&
1940
86.4k
        _profile != nullptr) {
1941
1.36k
        io::FileCacheProfileReporter cache_profile(_profile);
1942
1.36k
        cache_profile.update(_file_cache_statistics.get());
1943
1.36k
        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
1944
1.36k
                _file_cache_statistics->bytes_write_into_cache);
1945
1.36k
    }
1946
1947
86.4k
    if (_cur_reader != nullptr) {
1948
639
        _cur_reader->collect_profile_before_close();
1949
639
    }
1950
1951
86.4k
    FileScanLocalState* local_state = static_cast<FileScanLocalState*>(_local_state);
1952
86.4k
    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
1953
86.4k
    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
1954
1955
86.4k
    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
1956
86.4k
    COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls);
1957
86.4k
    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
1958
86.4k
    COUNTER_UPDATE(local_state->_condition_cache_hit_counter, _condition_cache_hit_count);
1959
86.4k
    if (_io_ctx) {
1960
86.4k
        COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
1961
86.4k
                       _io_ctx->condition_cache_filtered_rows);
1962
86.4k
    }
1963
1964
86.4k
    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
1965
86.4k
    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
1966
86.4k
}
1967
1968
} // namespace doris