Coverage Report

Created: 2026-03-30 20:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stddef.h>
21
#include <stdint.h>
22
23
#include <memory>
24
#include <string>
25
#include <unordered_map>
26
#include <unordered_set>
27
#include <vector>
28
29
#include "common/factory_creator.h"
30
#include "common/global_types.h"
31
#include "common/status.h"
32
#include "core/block/block.h"
33
#include "exec/operator/file_scan_operator.h"
34
#include "exprs/vexpr_fwd.h"
35
#include "format/generic_reader.h"
36
#include "format/orc/vorc_reader.h"
37
#include "format/parquet/vparquet_reader.h"
38
#include "format/table/iceberg_reader.h"
39
#include "io/io_common.h"
40
#include "runtime/descriptors.h"
41
#include "runtime/runtime_profile.h"
42
#include "storage/olap_common.h"
43
#include "storage/olap_scan_common.h"
44
#include "storage/segment/condition_cache.h"
45
46
namespace doris {
47
class RuntimeState;
48
class TFileRangeDesc;
49
class TFileScanRange;
50
class TFileScanRangeParams;
51
52
class ShardedKVCache;
53
class VExpr;
54
class VExprContext;
55
} // namespace doris
56
57
namespace doris {
58
59
class FileScanner : public Scanner {
60
    ENABLE_FACTORY_CREATOR(FileScanner);
61
62
public:
63
    static constexpr const char* NAME = "FileScanner";
64
65
    // sub profile name (for parquet/orc)
66
    static const std::string FileReadBytesProfile;
67
    static const std::string FileReadTimeProfile;
68
69
    FileScanner(RuntimeState* state, FileScanLocalState* parent, int64_t limit,
70
                std::shared_ptr<SplitSourceConnector> split_source, RuntimeProfile* profile,
71
                ShardedKVCache* kv_cache,
72
                const std::unordered_map<std::string, int>* colname_to_slot_id);
73
74
    Status _open_impl(RuntimeState* state) override;
75
76
    Status close(RuntimeState* state) override;
77
78
    void try_stop() override;
79
80
    Status init(RuntimeState* state, const VExprContextSPtrs& conjuncts) override;
81
82
0
    std::string get_name() override { return FileScanner::NAME; }
83
84
70
    std::string get_current_scan_range_name() override { return _current_range_path; }
85
86
    // Only used for reading one line.
87
    FileScanner(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams* params,
88
                const std::unordered_map<std::string, int>* colname_to_slot_id,
89
                TupleDescriptor* tuple_desc)
90
2.98k
            : Scanner(state, profile),
91
2.98k
              _params(params),
92
2.98k
              _col_name_to_slot_id(colname_to_slot_id),
93
2.98k
              _real_tuple_desc(tuple_desc) {};
94
95
    Status read_lines_from_range(const TFileRangeDesc& range, const std::list<int64_t>& row_ids,
96
                                 Block* result_block, const ExternalFileMappingInfo& external_info,
97
                                 int64_t* init_reader_ms, int64_t* get_block_ms);
98
99
    Status prepare_for_read_lines(const TFileRangeDesc& range);
100
101
    void update_realtime_counters() override;
102
103
protected:
104
    Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
105
106
    Status _get_block_wrapped(RuntimeState* state, Block* block, bool* eof);
107
108
    Status _get_next_reader();
109
110
    // TODO: cast input block columns type to string.
111
0
    Status _cast_src_block(Block* block) { return Status::OK(); }
112
113
    void _collect_profile_before_close() override;
114
115
    // fe will add skip_bitmap_col to _input_tuple_desc iff the target olaptable has skip_bitmap_col
116
    // and the current load is a flexible partial update
117
5.51k
    bool _should_process_skip_bitmap_col() const { return _skip_bitmap_col_idx != -1; }
118
119
protected:
120
    const TFileScanRangeParams* _params = nullptr;
121
    std::shared_ptr<SplitSourceConnector> _split_source;
122
    bool _first_scan_range = false;
123
    TFileRangeDesc _current_range;
124
125
    std::unique_ptr<GenericReader> _cur_reader;
126
    bool _cur_reader_eof = false;
127
    // File source slot descriptors
128
    std::vector<SlotDescriptor*> _file_slot_descs;
129
    // col names from _file_slot_descs
130
    std::vector<std::string> _file_col_names;
131
132
    // Partition source slot descriptors
133
    std::vector<SlotDescriptor*> _partition_slot_descs;
134
    // Partition slot id to index in _partition_slot_descs
135
    std::unordered_map<SlotId, int> _partition_slot_index_map;
136
    // created from param.expr_of_dest_slot
137
    // For query, it saves default value expr of all dest columns, or nullptr for NULL.
138
    // For load, it saves conversion expr/default value of all dest columns.
139
    VExprContextSPtrs _dest_vexpr_ctx;
140
    // Dest slot name to index in _dest_vexpr_ctx.
141
    std::unordered_map<std::string, int> _dest_slot_name_to_idx;
142
    // col name to default value expr
143
    std::unordered_map<std::string, VExprContextSPtr> _col_default_value_ctx;
144
    // Maps each dest slot id to its src slot desc.
145
    // If a dest slot id has no key in dest_sid_to_src_sid_without_trans, its entry is set to nullptr.
146
    std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
147
    // dest slot desc index to src slot desc index
148
    std::unordered_map<int, int> _dest_slot_to_src_slot_index;
149
150
    std::unordered_map<std::string, uint32_t> _src_block_name_to_idx;
151
152
    // Obtained from GenericReader: maps each column that exists in the file to its type.
153
    std::unordered_map<std::string, DataTypePtr> _slot_lower_name_to_col_type;
154
    // Obtained from GenericReader: columns that are required by the scan but do not exist in the file.
155
    // These columns will be filled by default value or null.
156
    std::unordered_set<std::string> _missing_cols;
157
158
    // Maps the lowercase column name in the source file to its type in the source file.
159
    std::map<std::string, DataTypePtr> _source_file_col_name_types;
160
161
    // For load task
162
    VExprContextSPtrs _pre_conjunct_ctxs;
163
    std::unique_ptr<RowDescriptor> _src_row_desc;
164
    std::unique_ptr<RowDescriptor> _dest_row_desc;
165
    // row desc for default exprs
166
    std::unique_ptr<RowDescriptor> _default_val_row_desc;
167
    // owned by scan node
168
    ShardedKVCache* _kv_cache = nullptr;
169
170
    std::set<TSlotId> _is_file_slot;
171
    bool _scanner_eof = false;
172
    int _rows = 0;
173
    int _num_of_columns_from_file;
174
175
    bool _src_block_mem_reuse = false;
176
    bool _strict_mode;
177
178
    bool _src_block_init = false;
179
    Block* _src_block_ptr = nullptr;
180
    Block _src_block;
181
182
    VExprContextSPtrs _push_down_conjuncts;
183
    VExprContextSPtrs _runtime_filter_partition_prune_ctxs;
184
    Block _runtime_filter_partition_prune_block;
185
186
    std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
187
    std::unique_ptr<io::FileReaderStats> _file_reader_stats;
188
    std::unique_ptr<io::IOContext> _io_ctx;
189
190
    // Whether to fill partition columns from path, default is true.
191
    bool _fill_partition_from_path = true;
192
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
193
            _partition_col_descs;
194
    std::unordered_map<std::string, bool> _partition_value_is_null;
195
    std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;
196
197
    // idx of skip_bitmap_col in _input_tuple_desc
198
    int32_t _skip_bitmap_col_idx {-1};
199
    int32_t _sequence_map_col_uid {-1};
200
    int32_t _sequence_col_uid {-1};
201
202
private:
203
    RuntimeProfile::Counter* _get_block_timer = nullptr;
204
    RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr;
205
    RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
206
    RuntimeProfile::Counter* _pre_filter_timer = nullptr;
207
    RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
208
    RuntimeProfile::Counter* _runtime_filter_partition_prune_timer = nullptr;
209
    RuntimeProfile::Counter* _empty_file_counter = nullptr;
210
    RuntimeProfile::Counter* _not_found_file_counter = nullptr;
211
    RuntimeProfile::Counter* _fully_skipped_file_counter = nullptr;
212
    RuntimeProfile::Counter* _file_counter = nullptr;
213
    RuntimeProfile::Counter* _file_read_bytes_counter = nullptr;
214
    RuntimeProfile::Counter* _file_read_calls_counter = nullptr;
215
    RuntimeProfile::Counter* _file_read_time_counter = nullptr;
216
    RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr;
217
218
    const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
219
    // single slot filter conjuncts
220
    std::unordered_map<int, VExprContextSPtrs> _slot_id_to_filter_conjuncts;
221
    // Filter conjuncts that reference zero or multiple slots (i.e. not exactly one slot)
222
    VExprContextSPtrs _not_single_slot_filter_conjuncts;
223
    // save the path of current scan range
224
    std::string _current_range_path = "";
225
226
    // Only for load scan node.
227
    const TupleDescriptor* _input_tuple_desc = nullptr;
228
    // If _input_tuple_desc is set,
229
    // the _real_tuple_desc will point to _input_tuple_desc,
230
    // otherwise, point to _output_tuple_desc
231
    const TupleDescriptor* _real_tuple_desc = nullptr;
232
233
    std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> _row_id_column_iterator_pair = {nullptr,
234
                                                                                           -1};
235
    bool _need_iceberg_rowid_column = false;
236
    int _iceberg_rowid_column_pos = -1;
237
    // for iceberg row lineage
238
    RowLineageColumns _row_lineage_columns;
239
    int64_t _last_bytes_read_from_local = 0;
240
    int64_t _last_bytes_read_from_remote = 0;
241
242
    // Condition cache for external tables
243
    uint64_t _condition_cache_digest = 0;
244
    segment_v2::ConditionCache::ExternalCacheKey _condition_cache_key;
245
    std::shared_ptr<std::vector<bool>> _condition_cache;
246
    std::shared_ptr<ConditionCacheContext> _condition_cache_ctx;
247
    int64_t _condition_cache_hit_count = 0;
248
249
    Status _init_expr_ctxes();
250
    Status _init_src_block(Block* block);
251
    Status _check_output_block_types();
252
    Status _cast_to_input_block(Block* block);
253
    Status _fill_columns_from_path(size_t rows);
254
    Status _fill_missing_columns(size_t rows);
255
    Status _pre_filter_src_block();
256
    Status _convert_to_output_block(Block* block);
257
    Status _truncate_char_or_varchar_columns(Block* block);
258
    void _truncate_char_or_varchar_column(Block* block, int idx, int len);
259
    Status _generate_partition_columns();
260
    Status _generate_missing_columns();
261
    bool _check_partition_prune_expr(const VExprSPtr& expr);
262
    void _init_runtime_filter_partition_prune_ctxs();
263
    void _init_runtime_filter_partition_prune_block();
264
    Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
265
    Status _process_conjuncts();
266
    Status _process_late_arrival_conjuncts();
267
    void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
268
    Status _generate_truncate_columns(bool need_to_get_parsed_schema);
269
    Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);
270
    Status _init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader,
271
                            FileMetaCache* file_meta_cache_ptr);
272
    Status _init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader,
273
                                FileMetaCache* file_meta_cache_ptr);
274
    Status _create_row_id_column_iterator();
275
276
73.2k
    TFileFormatType::type _get_current_format_type() {
277
        // for compatibility, if format_type is not set in range, use the format type of params
278
73.2k
        const TFileRangeDesc& range = _current_range;
279
73.2k
        return range.__isset.format_type ? range.format_type : _params->format_type;
280
73.2k
    };
281
282
88.5k
    Status _init_io_ctx() {
283
88.5k
        _io_ctx.reset(new io::IOContext());
284
88.5k
        _io_ctx->query_id = &_state->query_id();
285
88.5k
        return Status::OK();
286
88.5k
    };
287
288
0
    void _reset_counter() {
289
0
        _counter.num_rows_unselected = 0;
290
0
        _counter.num_rows_filtered = 0;
291
0
    }
292
293
    bool _should_enable_condition_cache();
294
    void _init_reader_condition_cache();
295
    void _finalize_reader_condition_cache();
296
297
326k
    TPushAggOp::type _get_push_down_agg_type() {
298
326k
        return _local_state == nullptr ? TPushAggOp::type::NONE
299
326k
                                       : _local_state->get_push_down_agg_type();
300
326k
    }
301
302
    // enable the file meta cache only when
303
    // 1. max_external_file_meta_cache_num is > 0
304
    // 2. the file number is less than 1/3 of the cache's capacity
305
    // Otherwise, the cache miss rate will be high
306
64.9k
    bool _should_enable_file_meta_cache() {
307
64.9k
        return ExecEnv::GetInstance()->file_meta_cache()->enabled() &&
308
64.9k
               _split_source->num_scan_ranges() < config::max_external_file_meta_cache_num / 3;
309
64.9k
    }
310
};
311
} // namespace doris