Coverage Report

Created: 2026-06-17 07:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stddef.h>
21
#include <stdint.h>
22
23
#include <memory>
24
#include <string>
25
#include <unordered_map>
26
#include <unordered_set>
27
#include <vector>
28
29
#include "common/config.h"
30
#include "common/factory_creator.h"
31
#include "common/global_types.h"
32
#include "common/status.h"
33
#include "core/block/block.h"
34
#include "exec/operator/file_scan_operator.h"
35
#include "exprs/vexpr_fwd.h"
36
#include "format/generic_reader.h"
37
#include "format/orc/vorc_reader.h"
38
#include "format/parquet/vparquet_reader.h"
39
#include "format/table/iceberg_reader.h"
40
#include "io/io_common.h"
41
#include "runtime/descriptors.h"
42
#include "runtime/runtime_profile.h"
43
#include "storage/olap_common.h"
44
#include "storage/olap_scan_common.h"
45
#include "storage/segment/adaptive_block_size_predictor.h"
46
#include "storage/segment/condition_cache.h"
47
48
namespace doris {
49
class RuntimeState;
50
class TFileRangeDesc;
51
class TFileScanRange;
52
class TFileScanRangeParams;
53
54
class ShardedKVCache;
55
class VExpr;
56
class VExprContext;
57
} // namespace doris
58
59
namespace doris {
60
61
class FileScanner : public Scanner {
62
    ENABLE_FACTORY_CREATOR(FileScanner);
63
64
public:
65
    static constexpr const char* NAME = "FileScanner";
66
    static constexpr size_t ADAPTIVE_BATCH_INITIAL_PROBE_ROWS = 32;
67
68
    // sub profile name (for parquet/orc)
69
    static const std::string FileReadBytesProfile;
70
    static const std::string FileReadTimeProfile;
71
72
    FileScanner(RuntimeState* state, FileScanLocalState* parent, int64_t limit,
73
                std::shared_ptr<SplitSourceConnector> split_source, RuntimeProfile* profile,
74
                ShardedKVCache* kv_cache,
75
                const std::unordered_map<std::string, int>* colname_to_slot_id);
76
77
    Status _open_impl(RuntimeState* state) override;
78
79
    Status close(RuntimeState* state) override;
80
81
    void try_stop() override;
82
83
    Status init(RuntimeState* state, const VExprContextSPtrs& conjuncts) override;
84
85
0
    std::string get_name() override { return FileScanner::NAME; }
86
87
72
    // Returns a copy of the path stored in _current_range_path.
    std::string get_current_scan_range_name() override {
        std::string range_name = _current_range_path;
        return range_name;
    }
88
89
    // Only used for reading one line.
90
    // Inline constructor: forwards `state` and `profile` to the Scanner base,
    // stores the scan params, the column-name-to-slot-id map and the tuple
    // descriptor (as _real_tuple_desc), then calls
    // _configure_file_scan_handlers(). Members that do not appear in the
    // initializer list are not assigned here.
    FileScanner(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams* params,
91
                const std::unordered_map<std::string, int>* colname_to_slot_id,
92
                TupleDescriptor* tuple_desc)
93
3.24k
            : Scanner(state, profile),
94
3.24k
              _params(params),
95
3.24k
              _col_name_to_slot_id(colname_to_slot_id),
96
3.24k
              _real_tuple_desc(tuple_desc) {
97
3.24k
        _configure_file_scan_handlers();
98
3.24k
    };
99
100
    Status read_lines_from_range(const TFileRangeDesc& range, const std::list<int64_t>& row_ids,
101
                                 Block* result_block, const ExternalFileMappingInfo& external_info,
102
                                 int64_t* init_reader_ms, int64_t* get_block_ms);
103
104
    Status prepare_for_read_lines(const TFileRangeDesc& range);
105
106
    void update_realtime_counters() override;
107
108
protected:
109
    Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
110
111
    Status _get_block_wrapped(RuntimeState* state, Block* block, bool* eof);
112
113
    Status _get_next_reader();
114
115
    // Build a ReaderInitContext with shared fields from FileScanner members.
116
    void _fill_base_init_context(ReaderInitContext* ctx);
117
118
    // TODO: cast input block columns type to string.
119
0
    // Currently a no-op: leaves the block untouched and always returns Status::OK().
    Status _cast_src_block(Block* block) {
        return Status::OK();
    }
120
121
    void _collect_profile_before_close() override;
122
123
    // fe will add skip_bitmap_col to _input_tuple_desc iff the target olaptable has skip_bitmap_col
124
    // and the current load is a flexible partial update
125
8.43k
    bool _should_process_skip_bitmap_col() const { return _skip_bitmap_col_idx != -1; }
126
127
protected:
128
    const TFileScanRangeParams* _params = nullptr;
129
    std::shared_ptr<SplitSourceConnector> _split_source;
130
    bool _first_scan_range = false;
131
    TFileRangeDesc _current_range;
132
133
    std::unique_ptr<GenericReader> _cur_reader;
134
    bool _cur_reader_eof = false;
135
    // File source slot descriptors
136
    std::vector<SlotDescriptor*> _file_slot_descs;
137
    // col names from _file_slot_descs
138
    std::vector<std::string> _file_col_names;
139
    // Unified column descriptors for init_reader (includes file, partition, missing, synthesized cols)
140
    std::vector<ColumnDescriptor> _column_descs;
141
142
    // Partition slot id to partition key index (for matching columns_from_path)
143
    std::unordered_map<SlotId, int> _partition_slot_index_map;
144
    // created from param.expr_of_dest_slot
145
    // For query, it saves default value expr of all dest columns, or nullptr for NULL.
146
    // For load, it saves conversion expr/default value of all dest columns.
147
    VExprContextSPtrs _dest_vexpr_ctx;
148
    // dest slot name to index in _dest_vexpr_ctx;
149
    std::unordered_map<std::string, int> _dest_slot_name_to_idx;
150
    // col name to default value expr
151
    std::unordered_map<std::string, VExprContextSPtr> _col_default_value_ctx;
152
    // the map values of dest slot id to src slot desc
153
    // if dest_sid_to_src_sid_without_trans has no key for a dest slot id, the entry is set to nullptr
154
    std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
155
    // dest slot desc index to src slot desc index
156
    std::unordered_map<int, int> _dest_slot_to_src_slot_index;
157
158
    std::unordered_map<std::string, uint32_t> _src_block_name_to_idx;
159
160
    // Obtained from GenericReader: maps each column that exists in the file to its type.
161
    std::unordered_map<std::string, DataTypePtr> _slot_lower_name_to_col_type;
162
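    // Get from GenericReader, save columns that are required by the scan but do not exist in the file.
    // NOTE(review): no member declaration follows this comment; confirm whether it is stale.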
163
164
    // Maps the lowercase column name in the source file to that column's type in the source file.
165
    std::map<std::string, DataTypePtr> _source_file_col_name_types;
166
167
    // For load task
168
    VExprContextSPtrs _pre_conjunct_ctxs;
169
    std::unique_ptr<RowDescriptor> _src_row_desc;
170
    std::unique_ptr<RowDescriptor> _dest_row_desc;
171
    // row desc for default exprs
172
    std::unique_ptr<RowDescriptor> _default_val_row_desc;
173
    // owned by scan node
174
    ShardedKVCache* _kv_cache = nullptr;
175
176
    std::set<TSlotId> _is_file_slot;
177
    bool _scanner_eof = false;
178
    int _rows = 0;
179
    // Zero-initialized so the value is never indeterminate when a constructor does not set it.
    int _num_of_columns_from_file = 0;
180
181
    bool _src_block_mem_reuse = false;
182
    // Defaults to false so the flag is never read uninitialized when a constructor does not set it.
    bool _strict_mode = false;
183
184
    bool _src_block_init = false;
185
    Block* _src_block_ptr = nullptr;
186
    Block _src_block;
187
188
    VExprContextSPtrs _push_down_conjuncts;
189
    VExprContextSPtrs _runtime_filter_partition_prune_ctxs;
190
    Block _runtime_filter_partition_prune_block;
191
192
    std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
193
    std::unique_ptr<io::FileReaderStats> _file_reader_stats;
194
    std::shared_ptr<io::IOContext> _io_ctx;
195
196
    // Whether to fill partition columns from path, default is true.
197
    bool _fill_partition_from_path = true;
198
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
199
            _partition_col_descs;
200
    std::unordered_map<std::string, bool> _partition_value_is_null;
201
202
    // idx of skip_bitmap_col in _input_tuple_desc
203
    int32_t _skip_bitmap_col_idx {-1};
204
    int32_t _sequence_map_col_uid {-1};
205
    int32_t _sequence_col_uid {-1};
206
207
private:
208
    RuntimeProfile::Counter* _get_block_timer = nullptr;
209
    RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr;
210
    RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
211
    RuntimeProfile::Counter* _pre_filter_timer = nullptr;
212
    RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
213
    RuntimeProfile::Counter* _runtime_filter_partition_prune_timer = nullptr;
214
    RuntimeProfile::Counter* _empty_file_counter = nullptr;
215
    RuntimeProfile::Counter* _not_found_file_counter = nullptr;
216
    RuntimeProfile::Counter* _fully_skipped_file_counter = nullptr;
217
    RuntimeProfile::Counter* _file_counter = nullptr;
218
    RuntimeProfile::Counter* _file_read_bytes_counter = nullptr;
219
    RuntimeProfile::Counter* _file_read_calls_counter = nullptr;
220
    RuntimeProfile::Counter* _file_read_time_counter = nullptr;
221
    RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr;
222
    RuntimeProfile::Counter* _adaptive_batch_predicted_rows_counter = nullptr;
223
    RuntimeProfile::Counter* _adaptive_batch_actual_bytes_before_truncate_counter = nullptr;
224
    RuntimeProfile::Counter* _adaptive_batch_actual_bytes_after_truncate_counter = nullptr;
225
    RuntimeProfile::Counter* _adaptive_batch_probe_count_counter = nullptr;
226
227
    const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
228
    // single slot filter conjuncts
229
    std::unordered_map<int, VExprContextSPtrs> _slot_id_to_filter_conjuncts;
230
    // not single(zero or multi) slot filter conjuncts
231
    VExprContextSPtrs _not_single_slot_filter_conjuncts;
232
    // save the path of current scan range
233
    std::string _current_range_path = "";
234
235
    // Only for load scan node.
236
    const TupleDescriptor* _input_tuple_desc = nullptr;
237
    // If _input_tuple_desc is set,
238
    // the _real_tuple_desc will point to _input_tuple_desc,
239
    // otherwise, point to _output_tuple_desc
240
    const TupleDescriptor* _real_tuple_desc = nullptr;
241
242
    int64_t _last_bytes_read_from_local = 0;
243
    int64_t _last_bytes_read_from_remote = 0;
244
245
    Status (FileScanner::*_init_src_block_handler)(Block* block) = nullptr;
246
    Status (FileScanner::*_process_src_block_after_read_handler)(Block* block) = nullptr;
247
    bool (FileScanner::*_should_push_down_predicates_handler)(
248
            TFileFormatType::type format_type) const = nullptr;
249
    bool (FileScanner::*_should_enable_condition_cache_handler)() const = nullptr;
250
251
    // Condition cache for external tables
252
    uint64_t _condition_cache_digest = 0;
253
    segment_v2::ConditionCache::ExternalCacheKey _condition_cache_key;
254
    std::shared_ptr<std::vector<bool>> _condition_cache;
255
    std::shared_ptr<ConditionCacheContext> _condition_cache_ctx;
256
    int64_t _condition_cache_hit_count = 0;
257
    std::unique_ptr<AdaptiveBlockSizePredictor> _block_size_predictor;
258
259
    void _configure_file_scan_handlers();
260
261
    Status _init_expr_ctxes();
262
    Status _init_src_block(Block* block);
263
    Status _init_src_block_for_load(Block* block);
264
    Status _init_src_block_for_query(Block* block);
265
    Status _process_src_block_after_read(Block* block);
266
    Status _process_src_block_after_read_for_load(Block* block);
267
    Status _process_src_block_after_read_for_query(Block* block);
268
    Status _check_output_block_types();
269
    Status _cast_to_input_block(Block* block);
270
    Status _pre_filter_src_block();
271
    Status _convert_to_output_block(Block* block);
272
    Status _truncate_char_or_varchar_columns(Block* block);
273
    void _truncate_char_or_varchar_column(Block* block, int idx, int len);
274
    Status _generate_partition_columns();
275
276
    bool _check_partition_prune_expr(const VExprSPtr& expr);
277
    bool _contains_runtime_filter(const VExprContextSPtrs& conjuncts) const;
278
    void _init_runtime_filter_partition_prune_ctxs();
279
    void _init_runtime_filter_partition_prune_block();
280
    Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
281
    Status _process_conjuncts();
282
    Status _process_late_arrival_conjuncts();
283
    void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
284
    Status _generate_truncate_columns(bool need_to_get_parsed_schema);
285
    Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);
286
    Status _init_orc_reader(FileMetaCache* file_meta_cache_ptr,
287
                            std::unique_ptr<OrcReader> orc_reader = nullptr,
288
                            bool enable_file_meta_memory_cache = true);
289
    Status _init_parquet_reader(FileMetaCache* file_meta_cache_ptr,
290
                                std::unique_ptr<ParquetReader> parquet_reader = nullptr,
291
                                bool enable_file_meta_memory_cache = true);
292
    std::shared_ptr<segment_v2::RowIdColumnIteratorV2> _create_row_id_column_iterator();
293
294
77.5k
    // Returns the file format type for the current scan range: the value set on
    // _current_range when present, otherwise the one carried by _params.
    // The fallback path dereferences _params without a null check.
    TFileFormatType::type _get_current_format_type() {
295
        // for compatibility, if format_type is not set in range, use the format type of params
296
77.5k
        const TFileRangeDesc& range = _current_range;
297
77.5k
        return range.__isset.format_type ? range.format_type : _params->format_type;
298
77.5k
    };
299
300
93.0k
    // Replaces _io_ctx with a newly created io::IOContext and points its
    // query_id at the query id exposed by _state. Always returns Status::OK().
    // The context stores the address of _state's query id rather than a copy —
    // presumably _state must outlive _io_ctx; confirm against callers.
    Status _init_io_ctx() {
301
93.0k
        _io_ctx = std::make_shared<io::IOContext>();
302
93.0k
        _io_ctx->query_id = &_state->query_id();
303
93.0k
        return Status::OK();
304
93.0k
    };
305
306
0
    // Resets the unselected-row and filtered-row counts in _counter to zero.
    void _reset_counter() {
307
0
        _counter.num_rows_unselected = 0;
308
0
        _counter.num_rows_filtered = 0;
309
0
    }
310
311
    bool _should_enable_condition_cache();
312
    bool _should_enable_condition_cache_for_load() const;
313
    bool _should_enable_condition_cache_for_query() const;
314
    bool _should_push_down_predicates(TFileFormatType::type format_type) const;
315
    bool _should_push_down_predicates_for_load(TFileFormatType::type format_type) const;
316
    bool _should_push_down_predicates_for_query(TFileFormatType::type format_type) const;
317
    void _init_reader_condition_cache();
318
    void _finalize_reader_condition_cache();
319
    void _reset_adaptive_batch_size_state();
320
    void _init_adaptive_batch_size_state(TFileFormatType::type format_type);
321
    bool _should_enable_adaptive_batch_size(TFileFormatType::type format_type) const;
322
    bool _should_run_adaptive_batch_size() const;
323
    size_t _predict_reader_batch_rows();
324
    void _update_adaptive_batch_size_before_truncate(const Block& block);
325
    void _update_adaptive_batch_size_after_truncate(const Block& block);
326
327
1.08M
    // Returns the push-down aggregate type reported by _local_state, or
    // TPushAggOp::type::NONE when _local_state is null.
    TPushAggOp::type _get_push_down_agg_type() const {
328
1.08M
        return _local_state == nullptr ? TPushAggOp::type::NONE
329
1.08M
                                       : _local_state->get_push_down_agg_type();
330
1.08M
    }
331
332
    bool _should_enable_file_meta_cache() const;
333
    bool _should_enable_file_meta_cache(bool enable_file_meta_memory_cache) const;
334
    bool _should_enable_file_meta_memory_cache() const;
335
};
336
} // namespace doris