Coverage Report

Created: 2026-03-12 17:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stddef.h>
21
#include <stdint.h>
22
23
#include <memory>
24
#include <string>
25
#include <unordered_map>
26
#include <unordered_set>
27
#include <vector>
28
29
#include "common/factory_creator.h"
30
#include "common/global_types.h"
31
#include "common/status.h"
32
#include "core/block/block.h"
33
#include "exec/operator/file_scan_operator.h"
34
#include "exprs/vexpr_fwd.h"
35
#include "format/generic_reader.h"
36
#include "format/orc/vorc_reader.h"
37
#include "format/parquet/vparquet_reader.h"
38
#include "io/io_common.h"
39
#include "runtime/descriptors.h"
40
#include "runtime/runtime_profile.h"
41
#include "storage/olap_scan_common.h"
42
43
namespace doris {
44
class RuntimeState;
45
class TFileRangeDesc;
46
class TFileScanRange;
47
class TFileScanRangeParams;
48
49
class ShardedKVCache;
50
class VExpr;
51
class VExprContext;
52
} // namespace doris
53
54
namespace doris {
55
56
class FileScanner : public Scanner {
57
    ENABLE_FACTORY_CREATOR(FileScanner);
58
59
public:
60
    static constexpr const char* NAME = "FileScanner";
61
62
    // sub profile name (for parquet/orc)
63
    static const std::string FileReadBytesProfile;
64
    static const std::string FileReadTimeProfile;
65
66
    FileScanner(RuntimeState* state, FileScanLocalState* parent, int64_t limit,
67
                std::shared_ptr<SplitSourceConnector> split_source, RuntimeProfile* profile,
68
                ShardedKVCache* kv_cache,
69
                const std::unordered_map<std::string, int>* colname_to_slot_id);
70
71
    Status _open_impl(RuntimeState* state) override;
72
73
    Status close(RuntimeState* state) override;
74
75
    void try_stop() override;
76
77
    Status init(RuntimeState* state, const VExprContextSPtrs& conjuncts) override;
78
79
0
    // Returns FileScanner::NAME ("FileScanner") as a std::string.
    std::string get_name() override { return FileScanner::NAME; }
80
81
53
    // Returns a copy of _current_range_path (the saved path of the current scan range).
    std::string get_current_scan_range_name() override { return _current_range_path; }
82
83
    // Only used for reading a single line (presumably together with
    // prepare_for_read_lines / read_lines_from_range -- confirm against callers).
    // Forwards `state` and `profile` to Scanner and stores the `params`,
    // `colname_to_slot_id` and `tuple_desc` pointers in _params,
    // _col_name_to_slot_id and _real_tuple_desc.
    // NOTE(review): _num_of_columns_from_file and _strict_mode have no default
    // member initializer and are not set here -- confirm they are assigned before use.
    // NOTE(review): _split_source stays empty on this path, while
    // _should_enable_file_meta_cache() dereferences it unconditionally -- verify callers.
84
    FileScanner(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams* params,
85
                const std::unordered_map<std::string, int>* colname_to_slot_id,
86
                TupleDescriptor* tuple_desc)
87
4.17k
            : Scanner(state, profile),
88
4.17k
              _params(params),
89
4.17k
              _col_name_to_slot_id(colname_to_slot_id),
90
4.17k
              _real_tuple_desc(tuple_desc) {};
91
92
    Status read_lines_from_range(const TFileRangeDesc& range, const std::list<int64_t>& row_ids,
93
                                 Block* result_block, const ExternalFileMappingInfo& external_info,
94
                                 int64_t* init_reader_ms, int64_t* get_block_ms);
95
96
    Status prepare_for_read_lines(const TFileRangeDesc& range);
97
98
    void update_realtime_counters() override;
99
100
protected:
101
    Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
102
103
    Status _get_block_wrapped(RuntimeState* state, Block* block, bool* eof);
104
105
    Status _get_next_reader();
106
107
    // Placeholder: currently ignores `block` and always returns Status::OK().
    // TODO: cast input block columns type to string.
108
0
    Status _cast_src_block(Block* block) { return Status::OK(); }
109
110
    void _collect_profile_before_close() override;
111
112
    // FE adds skip_bitmap_col to _input_tuple_desc iff the target olap table has a skip_bitmap_col
113
    // and the current load is a flexible partial update.
    // Returns true when _skip_bitmap_col_idx differs from -1, its default initializer.
114
344
    bool _should_process_skip_bitmap_col() const { return _skip_bitmap_col_idx != -1; }
115
116
protected:
117
    const TFileScanRangeParams* _params = nullptr;
118
    std::shared_ptr<SplitSourceConnector> _split_source;
119
    bool _first_scan_range = false;
120
    TFileRangeDesc _current_range;
121
122
    std::unique_ptr<GenericReader> _cur_reader;
123
    bool _cur_reader_eof = false;
124
    // File source slot descriptors
125
    std::vector<SlotDescriptor*> _file_slot_descs;
126
    // col names from _file_slot_descs
127
    std::vector<std::string> _file_col_names;
128
129
    // Partition source slot descriptors
130
    std::vector<SlotDescriptor*> _partition_slot_descs;
131
    // Partition slot id to index in _partition_slot_descs
132
    std::unordered_map<SlotId, int> _partition_slot_index_map;
133
    // created from param.expr_of_dest_slot
134
    // For query, it saves default value expr of all dest columns, or nullptr for NULL.
135
    // For load, it saves conversion expr/default value of all dest columns.
136
    VExprContextSPtrs _dest_vexpr_ctx;
137
    // dest slot name to index in _dest_vexpr_ctx;
138
    std::unordered_map<std::string, int> _dest_slot_name_to_idx;
139
    // col name to default value expr
140
    std::unordered_map<std::string, VExprContextSPtr> _col_default_value_ctx;
141
    // the map values of dest slot id to src slot desc
142
    // if there is no key for the dest slot id in dest_sid_to_src_sid_without_trans, the entry is set to nullptr
143
    std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
144
    // dest slot desc index to src slot desc index
145
    std::unordered_map<int, int> _dest_slot_to_src_slot_index;
146
147
    std::unordered_map<std::string, uint32_t> _src_block_name_to_idx;
148
149
    // Get from GenericReader, save the existing columns in file to their type.
150
    std::unordered_map<std::string, DataTypePtr> _slot_lower_name_to_col_type;
151
    // Obtained from GenericReader: the columns that are required by the scan but do not exist in the file.
152
    // These columns will be filled by default value or null.
153
    std::unordered_set<std::string> _missing_cols;
154
155
    // The col lowercase name of source file to type of source file.
156
    std::map<std::string, DataTypePtr> _source_file_col_name_types;
157
158
    // For load task
159
    VExprContextSPtrs _pre_conjunct_ctxs;
160
    std::unique_ptr<RowDescriptor> _src_row_desc;
161
    std::unique_ptr<RowDescriptor> _dest_row_desc;
162
    // row desc for default exprs
163
    std::unique_ptr<RowDescriptor> _default_val_row_desc;
164
    // owned by scan node
165
    ShardedKVCache* _kv_cache = nullptr;
166
167
    std::set<TSlotId> _is_file_slot;
168
    bool _scanner_eof = false;
169
    int _rows = 0;
170
    int _num_of_columns_from_file;
171
172
    bool _src_block_mem_reuse = false;
173
    bool _strict_mode;
174
175
    bool _src_block_init = false;
176
    Block* _src_block_ptr = nullptr;
177
    Block _src_block;
178
179
    VExprContextSPtrs _push_down_conjuncts;
180
    VExprContextSPtrs _runtime_filter_partition_prune_ctxs;
181
    Block _runtime_filter_partition_prune_block;
182
183
    std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
184
    std::unique_ptr<io::FileReaderStats> _file_reader_stats;
185
    std::unique_ptr<io::IOContext> _io_ctx;
186
187
    // Whether to fill partition columns from path, default is true.
188
    bool _fill_partition_from_path = true;
189
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
190
            _partition_col_descs;
191
    std::unordered_map<std::string, bool> _partition_value_is_null;
192
    std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;
193
194
    // idx of skip_bitmap_col in _input_tuple_desc
195
    int32_t _skip_bitmap_col_idx {-1};
196
    int32_t _sequence_map_col_uid {-1};
197
    int32_t _sequence_col_uid {-1};
198
199
private:
200
    RuntimeProfile::Counter* _get_block_timer = nullptr;
201
    RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr;
202
    RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
203
    RuntimeProfile::Counter* _pre_filter_timer = nullptr;
204
    RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
205
    RuntimeProfile::Counter* _runtime_filter_partition_prune_timer = nullptr;
206
    RuntimeProfile::Counter* _empty_file_counter = nullptr;
207
    RuntimeProfile::Counter* _not_found_file_counter = nullptr;
208
    RuntimeProfile::Counter* _fully_skipped_file_counter = nullptr;
209
    RuntimeProfile::Counter* _file_counter = nullptr;
210
    RuntimeProfile::Counter* _file_read_bytes_counter = nullptr;
211
    RuntimeProfile::Counter* _file_read_calls_counter = nullptr;
212
    RuntimeProfile::Counter* _file_read_time_counter = nullptr;
213
    RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr;
214
215
    const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
216
    // single slot filter conjuncts
217
    std::unordered_map<int, VExprContextSPtrs> _slot_id_to_filter_conjuncts;
218
    // filter conjuncts that do not reference exactly one slot (zero or multiple slots)
219
    VExprContextSPtrs _not_single_slot_filter_conjuncts;
220
    // save the path of current scan range
221
    std::string _current_range_path = "";
222
223
    // Only for load scan node.
224
    const TupleDescriptor* _input_tuple_desc = nullptr;
225
    // If _input_tuple_desc is set,
226
    // the _real_tuple_desc will point to _input_tuple_desc,
227
    // otherwise, point to _output_tuple_desc
228
    const TupleDescriptor* _real_tuple_desc = nullptr;
229
230
    std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> _row_id_column_iterator_pair = {nullptr,
231
                                                                                           -1};
232
    int64_t _last_bytes_read_from_local = 0;
233
    int64_t _last_bytes_read_from_remote = 0;
234
235
private:
236
    Status _init_expr_ctxes();
237
    Status _init_src_block(Block* block);
238
    Status _check_output_block_types();
239
    Status _cast_to_input_block(Block* block);
240
    Status _fill_columns_from_path(size_t rows);
241
    Status _fill_missing_columns(size_t rows);
242
    Status _pre_filter_src_block();
243
    Status _convert_to_output_block(Block* block);
244
    Status _truncate_char_or_varchar_columns(Block* block);
245
    void _truncate_char_or_varchar_column(Block* block, int idx, int len);
246
    Status _generate_partition_columns();
247
    Status _generate_missing_columns();
248
    bool _check_partition_prune_expr(const VExprSPtr& expr);
249
    void _init_runtime_filter_partition_prune_ctxs();
250
    void _init_runtime_filter_partition_prune_block();
251
    Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
252
    Status _process_conjuncts();
253
    Status _process_late_arrival_conjuncts();
254
    void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
255
    Status _generate_truncate_columns(bool need_to_get_parsed_schema);
256
    Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);
257
    Status _init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader,
258
                            FileMetaCache* file_meta_cache_ptr);
259
    Status _init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader,
260
                                FileMetaCache* file_meta_cache_ptr);
261
    Status _create_row_id_column_iterator();
262
263
78.6k
    // Returns the file format type of the current scan range (_current_range),
    // falling back to the format type stored in _params.
    // NOTE(review): _params is dereferenced without a null check -- confirm it is
    // always set before this is called.
    // NOTE(review): the ';' after the closing brace is an empty declaration and is redundant.
    TFileFormatType::type _get_current_format_type() {
264
        // For compatibility: if format_type is not set in the range, use the format type of params.
265
78.6k
        const TFileRangeDesc& range = _current_range;
266
78.6k
        return range.__isset.format_type ? range.format_type : _params->format_type;
267
78.6k
    };
268
269
93.6k
    // Replaces _io_ctx with a newly allocated io::IOContext and points its query_id
    // at the value returned by _state->query_id(). Always returns Status::OK().
    // NOTE(review): _state is not declared in this header; presumably inherited from
    // Scanner -- confirm. The ';' after the closing brace is redundant.
    Status _init_io_ctx() {
270
93.6k
        _io_ctx.reset(new io::IOContext());
271
93.6k
        // Stores the address of the query id, not a copy of it.
        _io_ctx->query_id = &_state->query_id();
272
93.6k
        return Status::OK();
273
93.6k
    };
274
275
0
    // Sets _counter.num_rows_unselected and _counter.num_rows_filtered back to 0.
    // NOTE(review): _counter is not declared in this header; presumably a member of
    // the Scanner base class -- confirm.
    void _reset_counter() {
276
0
        _counter.num_rows_unselected = 0;
277
0
        _counter.num_rows_filtered = 0;
278
0
    }
279
280
344k
    // Returns the push-down aggregation type reported by _local_state, or
    // TPushAggOp::type::NONE when _local_state is null.
    // NOTE(review): _local_state is not declared in this header; presumably inherited
    // from Scanner -- confirm.
    TPushAggOp::type _get_push_down_agg_type() {
281
344k
        return _local_state == nullptr ? TPushAggOp::type::NONE
282
344k
                                       : _local_state->get_push_down_agg_type();
283
344k
    }
284
285
    // Enable the file meta cache only when
286
    // 1. the file meta cache reports enabled() (presumably max_external_file_meta_cache_num > 0 -- confirm)
287
    // 2. the number of scan ranges is less than 1/3 of the cache's capacity
288
    // Otherwise, the cache miss rate will be high.
    // NOTE(review): _split_source is dereferenced without a null check, and the
    // read-lines constructor leaves it empty -- verify this is never reached on that path.
289
77.3k
    bool _should_enable_file_meta_cache() {
290
77.3k
        return ExecEnv::GetInstance()->file_meta_cache()->enabled() &&
291
77.3k
               _split_source->num_scan_ranges() < config::max_external_file_meta_cache_num / 3;
292
77.3k
    }
293
};
294
} // namespace doris