Coverage Report

Created: 2026-05-08 13:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stddef.h>
21
#include <stdint.h>
22
23
#include <memory>
24
#include <string>
25
#include <unordered_map>
26
#include <unordered_set>
27
#include <vector>
28
29
#include "common/factory_creator.h"
30
#include "common/global_types.h"
31
#include "common/status.h"
32
#include "core/block/block.h"
33
#include "exec/operator/file_scan_operator.h"
34
#include "exprs/vexpr_fwd.h"
35
#include "format/generic_reader.h"
36
#include "format/orc/vorc_reader.h"
37
#include "format/parquet/vparquet_reader.h"
38
#include "format/table/iceberg_reader.h"
39
#include "io/io_common.h"
40
#include "runtime/descriptors.h"
41
#include "runtime/runtime_profile.h"
42
#include "storage/olap_common.h"
43
#include "storage/olap_scan_common.h"
44
#include "storage/segment/adaptive_block_size_predictor.h"
45
#include "storage/segment/condition_cache.h"
46
47
namespace doris {
// Forward declarations for types named by this header.
class RuntimeState;
class ShardedKVCache;
class TFileRangeDesc;
class TFileScanRange;
class TFileScanRangeParams;
class VExpr;
class VExprContext;
} // namespace doris
57
58
namespace doris {
59
60
class FileScanner : public Scanner {
61
    ENABLE_FACTORY_CREATOR(FileScanner);
62
63
public:
64
    static constexpr const char* NAME = "FileScanner";
65
    static constexpr size_t ADAPTIVE_BATCH_INITIAL_PROBE_ROWS = 32;
66
67
    // sub profile name (for parquet/orc)
68
    static const std::string FileReadBytesProfile;
69
    static const std::string FileReadTimeProfile;
70
71
    FileScanner(RuntimeState* state, FileScanLocalState* parent, int64_t limit,
72
                std::shared_ptr<SplitSourceConnector> split_source, RuntimeProfile* profile,
73
                ShardedKVCache* kv_cache,
74
                const std::unordered_map<std::string, int>* colname_to_slot_id);
75
76
    Status _open_impl(RuntimeState* state) override;
77
78
    Status close(RuntimeState* state) override;
79
80
    void try_stop() override;
81
82
    Status init(RuntimeState* state, const VExprContextSPtrs& conjuncts) override;
83
84
0
    std::string get_name() override { return FileScanner::NAME; }
85
86
72
    std::string get_current_scan_range_name() override { return _current_range_path; }
87
88
    //only used for read one line.
89
    FileScanner(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams* params,
90
                const std::unordered_map<std::string, int>* colname_to_slot_id,
91
                TupleDescriptor* tuple_desc)
92
3.16k
            : Scanner(state, profile),
93
3.16k
              _params(params),
94
3.16k
              _col_name_to_slot_id(colname_to_slot_id),
95
3.16k
              _real_tuple_desc(tuple_desc) {
96
3.16k
        _configure_file_scan_handlers();
97
3.16k
    };
98
99
    Status read_lines_from_range(const TFileRangeDesc& range, const std::list<int64_t>& row_ids,
100
                                 Block* result_block, const ExternalFileMappingInfo& external_info,
101
                                 int64_t* init_reader_ms, int64_t* get_block_ms);
102
103
    Status prepare_for_read_lines(const TFileRangeDesc& range);
104
105
    void update_realtime_counters() override;
106
107
protected:
108
    Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
109
110
    Status _get_block_wrapped(RuntimeState* state, Block* block, bool* eof);
111
112
    Status _get_next_reader();
113
114
    // Build a ReaderInitContext with shared fields from FileScanner members.
115
    void _fill_base_init_context(ReaderInitContext* ctx);
116
117
    // TODO: cast input block columns type to string.
118
0
    Status _cast_src_block(Block* block) { return Status::OK(); }
119
120
    void _collect_profile_before_close() override;
121
122
    // fe will add skip_bitmap_col to _input_tuple_desc iff the target olaptable has skip_bitmap_col
123
    // and the current load is a flexible partial update
124
6.19k
    bool _should_process_skip_bitmap_col() const { return _skip_bitmap_col_idx != -1; }
125
126
protected:
127
    const TFileScanRangeParams* _params = nullptr;
128
    std::shared_ptr<SplitSourceConnector> _split_source;
129
    bool _first_scan_range = false;
130
    TFileRangeDesc _current_range;
131
132
    std::unique_ptr<GenericReader> _cur_reader;
133
    bool _cur_reader_eof = false;
134
    // File source slot descriptors
135
    std::vector<SlotDescriptor*> _file_slot_descs;
136
    // col names from _file_slot_descs
137
    std::vector<std::string> _file_col_names;
138
    // Unified column descriptors for init_reader (includes file, partition, missing, synthesized cols)
139
    std::vector<ColumnDescriptor> _column_descs;
140
141
    // Partition slot id to partition key index (for matching columns_from_path)
142
    std::unordered_map<SlotId, int> _partition_slot_index_map;
143
    // created from param.expr_of_dest_slot
144
    // For query, it saves default value expr of all dest columns, or nullptr for NULL.
145
    // For load, it saves conversion expr/default value of all dest columns.
146
    VExprContextSPtrs _dest_vexpr_ctx;
147
    // dest slot name to index in _dest_vexpr_ctx;
148
    std::unordered_map<std::string, int> _dest_slot_name_to_idx;
149
    // col name to default value expr
150
    std::unordered_map<std::string, VExprContextSPtr> _col_default_value_ctx;
151
    // the map values of dest slot id to src slot desc
152
    // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
153
    std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
154
    // dest slot desc index to src slot desc index
155
    std::unordered_map<int, int> _dest_slot_to_src_slot_index;
156
157
    std::unordered_map<std::string, uint32_t> _src_block_name_to_idx;
158
159
    // Get from GenericReader, save the existing columns in file to their type.
160
    std::unordered_map<std::string, DataTypePtr> _slot_lower_name_to_col_type;
161
    // Get from GenericReader, save columns that required by scan but not exist in file.
162
163
    // The col lowercase name of source file to type of source file.
164
    std::map<std::string, DataTypePtr> _source_file_col_name_types;
165
166
    // For load task
167
    VExprContextSPtrs _pre_conjunct_ctxs;
168
    std::unique_ptr<RowDescriptor> _src_row_desc;
169
    std::unique_ptr<RowDescriptor> _dest_row_desc;
170
    // row desc for default exprs
171
    std::unique_ptr<RowDescriptor> _default_val_row_desc;
172
    // owned by scan node
173
    ShardedKVCache* _kv_cache = nullptr;
174
175
    std::set<TSlotId> _is_file_slot;
176
    bool _scanner_eof = false;
177
    int _rows = 0;
178
    int _num_of_columns_from_file;
179
180
    bool _src_block_mem_reuse = false;
181
    bool _strict_mode;
182
183
    bool _src_block_init = false;
184
    Block* _src_block_ptr = nullptr;
185
    Block _src_block;
186
187
    VExprContextSPtrs _push_down_conjuncts;
188
    VExprContextSPtrs _runtime_filter_partition_prune_ctxs;
189
    Block _runtime_filter_partition_prune_block;
190
191
    std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
192
    std::unique_ptr<io::FileReaderStats> _file_reader_stats;
193
    std::unique_ptr<io::IOContext> _io_ctx;
194
195
    // Whether to fill partition columns from path, default is true.
196
    bool _fill_partition_from_path = true;
197
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
198
            _partition_col_descs;
199
    std::unordered_map<std::string, bool> _partition_value_is_null;
200
201
    // idx of skip_bitmap_col in _input_tuple_desc
202
    int32_t _skip_bitmap_col_idx {-1};
203
    int32_t _sequence_map_col_uid {-1};
204
    int32_t _sequence_col_uid {-1};
205
206
private:
207
    RuntimeProfile::Counter* _get_block_timer = nullptr;
208
    RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr;
209
    RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
210
    RuntimeProfile::Counter* _pre_filter_timer = nullptr;
211
    RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
212
    RuntimeProfile::Counter* _runtime_filter_partition_prune_timer = nullptr;
213
    RuntimeProfile::Counter* _empty_file_counter = nullptr;
214
    RuntimeProfile::Counter* _not_found_file_counter = nullptr;
215
    RuntimeProfile::Counter* _fully_skipped_file_counter = nullptr;
216
    RuntimeProfile::Counter* _file_counter = nullptr;
217
    RuntimeProfile::Counter* _file_read_bytes_counter = nullptr;
218
    RuntimeProfile::Counter* _file_read_calls_counter = nullptr;
219
    RuntimeProfile::Counter* _file_read_time_counter = nullptr;
220
    RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr;
221
    RuntimeProfile::Counter* _adaptive_batch_predicted_rows_counter = nullptr;
222
    RuntimeProfile::Counter* _adaptive_batch_actual_bytes_before_truncate_counter = nullptr;
223
    RuntimeProfile::Counter* _adaptive_batch_actual_bytes_after_truncate_counter = nullptr;
224
    RuntimeProfile::Counter* _adaptive_batch_probe_count_counter = nullptr;
225
226
    const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
227
    // single slot filter conjuncts
228
    std::unordered_map<int, VExprContextSPtrs> _slot_id_to_filter_conjuncts;
229
    // not single(zero or multi) slot filter conjuncts
230
    VExprContextSPtrs _not_single_slot_filter_conjuncts;
231
    // save the path of current scan range
232
    std::string _current_range_path = "";
233
234
    // Only for load scan node.
235
    const TupleDescriptor* _input_tuple_desc = nullptr;
236
    // If _input_tuple_desc is set,
237
    // the _real_tuple_desc will point to _input_tuple_desc,
238
    // otherwise, point to _output_tuple_desc
239
    const TupleDescriptor* _real_tuple_desc = nullptr;
240
241
    int64_t _last_bytes_read_from_local = 0;
242
    int64_t _last_bytes_read_from_remote = 0;
243
244
    Status (FileScanner::*_init_src_block_handler)(Block* block) = nullptr;
245
    Status (FileScanner::*_process_src_block_after_read_handler)(Block* block) = nullptr;
246
    bool (FileScanner::*_should_push_down_predicates_handler)(
247
            TFileFormatType::type format_type) const = nullptr;
248
    bool (FileScanner::*_should_enable_condition_cache_handler)() const = nullptr;
249
250
    // Condition cache for external tables
251
    uint64_t _condition_cache_digest = 0;
252
    segment_v2::ConditionCache::ExternalCacheKey _condition_cache_key;
253
    std::shared_ptr<std::vector<bool>> _condition_cache;
254
    std::shared_ptr<ConditionCacheContext> _condition_cache_ctx;
255
    int64_t _condition_cache_hit_count = 0;
256
    std::unique_ptr<AdaptiveBlockSizePredictor> _block_size_predictor;
257
258
    void _configure_file_scan_handlers();
259
260
    Status _init_expr_ctxes();
261
    Status _init_src_block(Block* block);
262
    Status _init_src_block_for_load(Block* block);
263
    Status _init_src_block_for_query(Block* block);
264
    Status _process_src_block_after_read(Block* block);
265
    Status _process_src_block_after_read_for_load(Block* block);
266
    Status _process_src_block_after_read_for_query(Block* block);
267
    Status _check_output_block_types();
268
    Status _cast_to_input_block(Block* block);
269
    Status _pre_filter_src_block();
270
    Status _convert_to_output_block(Block* block);
271
    Status _truncate_char_or_varchar_columns(Block* block);
272
    void _truncate_char_or_varchar_column(Block* block, int idx, int len);
273
    Status _generate_partition_columns();
274
275
    bool _check_partition_prune_expr(const VExprSPtr& expr);
276
    void _init_runtime_filter_partition_prune_ctxs();
277
    void _init_runtime_filter_partition_prune_block();
278
    Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
279
    Status _process_conjuncts();
280
    Status _process_late_arrival_conjuncts();
281
    void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
282
    Status _generate_truncate_columns(bool need_to_get_parsed_schema);
283
    Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);
284
    Status _init_orc_reader(FileMetaCache* file_meta_cache_ptr,
285
                            std::unique_ptr<OrcReader> orc_reader = nullptr);
286
    Status _init_parquet_reader(FileMetaCache* file_meta_cache_ptr,
287
                                std::unique_ptr<ParquetReader> parquet_reader = nullptr);
288
    std::shared_ptr<segment_v2::RowIdColumnIteratorV2> _create_row_id_column_iterator();
289
290
75.2k
    TFileFormatType::type _get_current_format_type() {
291
        // for compatibility, if format_type is not set in range, use the format type of params
292
75.2k
        const TFileRangeDesc& range = _current_range;
293
75.2k
        return range.__isset.format_type ? range.format_type : _params->format_type;
294
75.2k
    };
295
296
90.3k
    Status _init_io_ctx() {
297
90.3k
        _io_ctx.reset(new io::IOContext());
298
90.3k
        _io_ctx->query_id = &_state->query_id();
299
90.3k
        return Status::OK();
300
90.3k
    };
301
302
0
    void _reset_counter() {
303
0
        _counter.num_rows_unselected = 0;
304
0
        _counter.num_rows_filtered = 0;
305
0
    }
306
307
    bool _should_enable_condition_cache();
308
    bool _should_enable_condition_cache_for_load() const;
309
    bool _should_enable_condition_cache_for_query() const;
310
    bool _should_push_down_predicates(TFileFormatType::type format_type) const;
311
    bool _should_push_down_predicates_for_load(TFileFormatType::type format_type) const;
312
    bool _should_push_down_predicates_for_query(TFileFormatType::type format_type) const;
313
    void _init_reader_condition_cache();
314
    void _finalize_reader_condition_cache();
315
    void _reset_adaptive_batch_size_state();
316
    void _init_adaptive_batch_size_state(TFileFormatType::type format_type);
317
    bool _should_enable_adaptive_batch_size(TFileFormatType::type format_type) const;
318
    bool _should_run_adaptive_batch_size() const;
319
    size_t _predict_reader_batch_rows();
320
    void _update_adaptive_batch_size_before_truncate(const Block& block);
321
    void _update_adaptive_batch_size_after_truncate(const Block& block);
322
323
739k
    TPushAggOp::type _get_push_down_agg_type() const {
324
739k
        return _local_state == nullptr ? TPushAggOp::type::NONE
325
739k
                                       : _local_state->get_push_down_agg_type();
326
739k
    }
327
328
    // enable the file meta cache only when
329
    // 1. max_external_file_meta_cache_num is > 0
330
    // 2. the file number is less than 1/3 of cache's capacibility
331
    // Otherwise, the cache miss rate will be high
332
65.3k
    bool _should_enable_file_meta_cache() {
333
65.3k
        return ExecEnv::GetInstance()->file_meta_cache()->enabled() &&
334
65.3k
               _split_source->num_scan_ranges() < config::max_external_file_meta_cache_num / 3;
335
65.3k
    }
336
};
337
} // namespace doris