be/src/exec/scan/file_scanner.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <stddef.h>
#include <stdint.h>

#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/factory_creator.h"
#include "common/global_types.h"
#include "common/status.h"
#include "core/block/block.h"
#include "exec/operator/file_scan_operator.h"
#include "exprs/vexpr_fwd.h"
#include "format/generic_reader.h"
#include "format/orc/vorc_reader.h"
#include "format/parquet/vparquet_reader.h"
#include "format/table/iceberg_reader.h"
#include "io/io_common.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_profile.h"
#include "storage/olap_common.h"
#include "storage/olap_scan_common.h"
#include "storage/segment/condition_cache.h"
45 | | |
// Forward declarations: FileScanner only refers to these types by pointer or
// reference in this header, so their full definitions are not required here.
namespace doris {
// Runtime / thrift-generated descriptors.
class RuntimeState;
class TFileRangeDesc;
class TFileScanRangeParams;
class TFileScanRange;

// Expression and cache types.
class ShardedKVCache;
class VExpr;
class VExprContext;
} // namespace doris
56 | | |
57 | | namespace doris { |
58 | | |
59 | | class FileScanner : public Scanner { |
60 | | ENABLE_FACTORY_CREATOR(FileScanner); |
61 | | |
62 | | public: |
63 | | static constexpr const char* NAME = "FileScanner"; |
64 | | |
65 | | // sub profile name (for parquet/orc) |
66 | | static const std::string FileReadBytesProfile; |
67 | | static const std::string FileReadTimeProfile; |
68 | | |
69 | | FileScanner(RuntimeState* state, FileScanLocalState* parent, int64_t limit, |
70 | | std::shared_ptr<SplitSourceConnector> split_source, RuntimeProfile* profile, |
71 | | ShardedKVCache* kv_cache, |
72 | | const std::unordered_map<std::string, int>* colname_to_slot_id); |
73 | | |
74 | | Status _open_impl(RuntimeState* state) override; |
75 | | |
76 | | Status close(RuntimeState* state) override; |
77 | | |
78 | | void try_stop() override; |
79 | | |
80 | | Status init(RuntimeState* state, const VExprContextSPtrs& conjuncts) override; |
81 | | |
82 | 0 | std::string get_name() override { return FileScanner::NAME; } |
83 | | |
84 | 70 | std::string get_current_scan_range_name() override { return _current_range_path; } |
85 | | |
86 | | //only used for read one line. |
87 | | FileScanner(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams* params, |
88 | | const std::unordered_map<std::string, int>* colname_to_slot_id, |
89 | | TupleDescriptor* tuple_desc) |
90 | 2.98k | : Scanner(state, profile), |
91 | 2.98k | _params(params), |
92 | 2.98k | _col_name_to_slot_id(colname_to_slot_id), |
93 | 2.98k | _real_tuple_desc(tuple_desc) {}; |
94 | | |
95 | | Status read_lines_from_range(const TFileRangeDesc& range, const std::list<int64_t>& row_ids, |
96 | | Block* result_block, const ExternalFileMappingInfo& external_info, |
97 | | int64_t* init_reader_ms, int64_t* get_block_ms); |
98 | | |
99 | | Status prepare_for_read_lines(const TFileRangeDesc& range); |
100 | | |
101 | | void update_realtime_counters() override; |
102 | | |
103 | | protected: |
104 | | Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; |
105 | | |
106 | | Status _get_block_wrapped(RuntimeState* state, Block* block, bool* eof); |
107 | | |
108 | | Status _get_next_reader(); |
109 | | |
110 | | // TODO: cast input block columns type to string. |
111 | 0 | Status _cast_src_block(Block* block) { return Status::OK(); } |
112 | | |
113 | | void _collect_profile_before_close() override; |
114 | | |
115 | | // fe will add skip_bitmap_col to _input_tuple_desc iff the target olaptable has skip_bitmap_col |
116 | | // and the current load is a flexible partial update |
117 | 5.51k | bool _should_process_skip_bitmap_col() const { return _skip_bitmap_col_idx != -1; } |
118 | | |
119 | | protected: |
120 | | const TFileScanRangeParams* _params = nullptr; |
121 | | std::shared_ptr<SplitSourceConnector> _split_source; |
122 | | bool _first_scan_range = false; |
123 | | TFileRangeDesc _current_range; |
124 | | |
125 | | std::unique_ptr<GenericReader> _cur_reader; |
126 | | bool _cur_reader_eof = false; |
127 | | // File source slot descriptors |
128 | | std::vector<SlotDescriptor*> _file_slot_descs; |
129 | | // col names from _file_slot_descs |
130 | | std::vector<std::string> _file_col_names; |
131 | | |
132 | | // Partition source slot descriptors |
133 | | std::vector<SlotDescriptor*> _partition_slot_descs; |
134 | | // Partition slot id to index in _partition_slot_descs |
135 | | std::unordered_map<SlotId, int> _partition_slot_index_map; |
136 | | // created from param.expr_of_dest_slot |
137 | | // For query, it saves default value expr of all dest columns, or nullptr for NULL. |
138 | | // For load, it saves conversion expr/default value of all dest columns. |
139 | | VExprContextSPtrs _dest_vexpr_ctx; |
140 | | // dest slot name to index in _dest_vexpr_ctx; |
141 | | std::unordered_map<std::string, int> _dest_slot_name_to_idx; |
142 | | // col name to default value expr |
143 | | std::unordered_map<std::string, VExprContextSPtr> _col_default_value_ctx; |
144 | | // the map values of dest slot id to src slot desc |
145 | | // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr |
146 | | std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest; |
147 | | // dest slot desc index to src slot desc index |
148 | | std::unordered_map<int, int> _dest_slot_to_src_slot_index; |
149 | | |
150 | | std::unordered_map<std::string, uint32_t> _src_block_name_to_idx; |
151 | | |
152 | | // Get from GenericReader, save the existing columns in file to their type. |
153 | | std::unordered_map<std::string, DataTypePtr> _slot_lower_name_to_col_type; |
154 | | // Get from GenericReader, save columns that required by scan but not exist in file. |
155 | | // These columns will be filled by default value or null. |
156 | | std::unordered_set<std::string> _missing_cols; |
157 | | |
158 | | // The col lowercase name of source file to type of source file. |
159 | | std::map<std::string, DataTypePtr> _source_file_col_name_types; |
160 | | |
161 | | // For load task |
162 | | VExprContextSPtrs _pre_conjunct_ctxs; |
163 | | std::unique_ptr<RowDescriptor> _src_row_desc; |
164 | | std::unique_ptr<RowDescriptor> _dest_row_desc; |
165 | | // row desc for default exprs |
166 | | std::unique_ptr<RowDescriptor> _default_val_row_desc; |
167 | | // owned by scan node |
168 | | ShardedKVCache* _kv_cache = nullptr; |
169 | | |
170 | | std::set<TSlotId> _is_file_slot; |
171 | | bool _scanner_eof = false; |
172 | | int _rows = 0; |
173 | | int _num_of_columns_from_file; |
174 | | |
175 | | bool _src_block_mem_reuse = false; |
176 | | bool _strict_mode; |
177 | | |
178 | | bool _src_block_init = false; |
179 | | Block* _src_block_ptr = nullptr; |
180 | | Block _src_block; |
181 | | |
182 | | VExprContextSPtrs _push_down_conjuncts; |
183 | | VExprContextSPtrs _runtime_filter_partition_prune_ctxs; |
184 | | Block _runtime_filter_partition_prune_block; |
185 | | |
186 | | std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics; |
187 | | std::unique_ptr<io::FileReaderStats> _file_reader_stats; |
188 | | std::unique_ptr<io::IOContext> _io_ctx; |
189 | | |
190 | | // Whether to fill partition columns from path, default is true. |
191 | | bool _fill_partition_from_path = true; |
192 | | std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> |
193 | | _partition_col_descs; |
194 | | std::unordered_map<std::string, bool> _partition_value_is_null; |
195 | | std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs; |
196 | | |
197 | | // idx of skip_bitmap_col in _input_tuple_desc |
198 | | int32_t _skip_bitmap_col_idx {-1}; |
199 | | int32_t _sequence_map_col_uid {-1}; |
200 | | int32_t _sequence_col_uid {-1}; |
201 | | |
202 | | private: |
203 | | RuntimeProfile::Counter* _get_block_timer = nullptr; |
204 | | RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr; |
205 | | RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr; |
206 | | RuntimeProfile::Counter* _pre_filter_timer = nullptr; |
207 | | RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr; |
208 | | RuntimeProfile::Counter* _runtime_filter_partition_prune_timer = nullptr; |
209 | | RuntimeProfile::Counter* _empty_file_counter = nullptr; |
210 | | RuntimeProfile::Counter* _not_found_file_counter = nullptr; |
211 | | RuntimeProfile::Counter* _fully_skipped_file_counter = nullptr; |
212 | | RuntimeProfile::Counter* _file_counter = nullptr; |
213 | | RuntimeProfile::Counter* _file_read_bytes_counter = nullptr; |
214 | | RuntimeProfile::Counter* _file_read_calls_counter = nullptr; |
215 | | RuntimeProfile::Counter* _file_read_time_counter = nullptr; |
216 | | RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr; |
217 | | |
218 | | const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr; |
219 | | // single slot filter conjuncts |
220 | | std::unordered_map<int, VExprContextSPtrs> _slot_id_to_filter_conjuncts; |
221 | | // not single(zero or multi) slot filter conjuncts |
222 | | VExprContextSPtrs _not_single_slot_filter_conjuncts; |
223 | | // save the path of current scan range |
224 | | std::string _current_range_path = ""; |
225 | | |
226 | | // Only for load scan node. |
227 | | const TupleDescriptor* _input_tuple_desc = nullptr; |
228 | | // If _input_tuple_desc is set, |
229 | | // the _real_tuple_desc will point to _input_tuple_desc, |
230 | | // otherwise, point to _output_tuple_desc |
231 | | const TupleDescriptor* _real_tuple_desc = nullptr; |
232 | | |
233 | | std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> _row_id_column_iterator_pair = {nullptr, |
234 | | -1}; |
235 | | bool _need_iceberg_rowid_column = false; |
236 | | int _iceberg_rowid_column_pos = -1; |
237 | | // for iceberg row lineage |
238 | | RowLineageColumns _row_lineage_columns; |
239 | | int64_t _last_bytes_read_from_local = 0; |
240 | | int64_t _last_bytes_read_from_remote = 0; |
241 | | |
242 | | // Condition cache for external tables |
243 | | uint64_t _condition_cache_digest = 0; |
244 | | segment_v2::ConditionCache::ExternalCacheKey _condition_cache_key; |
245 | | std::shared_ptr<std::vector<bool>> _condition_cache; |
246 | | std::shared_ptr<ConditionCacheContext> _condition_cache_ctx; |
247 | | int64_t _condition_cache_hit_count = 0; |
248 | | |
249 | | Status _init_expr_ctxes(); |
250 | | Status _init_src_block(Block* block); |
251 | | Status _check_output_block_types(); |
252 | | Status _cast_to_input_block(Block* block); |
253 | | Status _fill_columns_from_path(size_t rows); |
254 | | Status _fill_missing_columns(size_t rows); |
255 | | Status _pre_filter_src_block(); |
256 | | Status _convert_to_output_block(Block* block); |
257 | | Status _truncate_char_or_varchar_columns(Block* block); |
258 | | void _truncate_char_or_varchar_column(Block* block, int idx, int len); |
259 | | Status _generate_partition_columns(); |
260 | | Status _generate_missing_columns(); |
261 | | bool _check_partition_prune_expr(const VExprSPtr& expr); |
262 | | void _init_runtime_filter_partition_prune_ctxs(); |
263 | | void _init_runtime_filter_partition_prune_block(); |
264 | | Status _process_runtime_filters_partition_prune(bool& is_partition_pruned); |
265 | | Status _process_conjuncts(); |
266 | | Status _process_late_arrival_conjuncts(); |
267 | | void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids); |
268 | | Status _generate_truncate_columns(bool need_to_get_parsed_schema); |
269 | | Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema); |
270 | | Status _init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader, |
271 | | FileMetaCache* file_meta_cache_ptr); |
272 | | Status _init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader, |
273 | | FileMetaCache* file_meta_cache_ptr); |
274 | | Status _create_row_id_column_iterator(); |
275 | | |
276 | 73.2k | TFileFormatType::type _get_current_format_type() { |
277 | | // for compatibility, if format_type is not set in range, use the format type of params |
278 | 73.2k | const TFileRangeDesc& range = _current_range; |
279 | 73.2k | return range.__isset.format_type ? range.format_type : _params->format_type; |
280 | 73.2k | }; |
281 | | |
282 | 88.5k | Status _init_io_ctx() { |
283 | 88.5k | _io_ctx.reset(new io::IOContext()); |
284 | 88.5k | _io_ctx->query_id = &_state->query_id(); |
285 | 88.5k | return Status::OK(); |
286 | 88.5k | }; |
287 | | |
288 | 0 | void _reset_counter() { |
289 | 0 | _counter.num_rows_unselected = 0; |
290 | 0 | _counter.num_rows_filtered = 0; |
291 | 0 | } |
292 | | |
293 | | bool _should_enable_condition_cache(); |
294 | | void _init_reader_condition_cache(); |
295 | | void _finalize_reader_condition_cache(); |
296 | | |
297 | 326k | TPushAggOp::type _get_push_down_agg_type() { |
298 | 326k | return _local_state == nullptr ? TPushAggOp::type::NONE |
299 | 326k | : _local_state->get_push_down_agg_type(); |
300 | 326k | } |
301 | | |
302 | | // enable the file meta cache only when |
303 | | // 1. max_external_file_meta_cache_num is > 0 |
304 | | // 2. the file number is less than 1/3 of cache's capacibility |
305 | | // Otherwise, the cache miss rate will be high |
306 | 64.9k | bool _should_enable_file_meta_cache() { |
307 | 64.9k | return ExecEnv::GetInstance()->file_meta_cache()->enabled() && |
308 | 64.9k | _split_source->num_scan_ranges() < config::max_external_file_meta_cache_num / 3; |
309 | 64.9k | } |
310 | | }; |
311 | | } // namespace doris |