be/src/exec/scan/file_scanner.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <stddef.h> |
21 | | #include <stdint.h> |
22 | | |
23 | | #include <memory> |
24 | | #include <string> |
25 | | #include <unordered_map> |
26 | | #include <unordered_set> |
27 | | #include <vector> |
28 | | |
29 | | #include "common/factory_creator.h" |
30 | | #include "common/global_types.h" |
31 | | #include "common/status.h" |
32 | | #include "core/block/block.h" |
33 | | #include "exec/operator/file_scan_operator.h" |
34 | | #include "exprs/vexpr_fwd.h" |
35 | | #include "format/generic_reader.h" |
36 | | #include "format/orc/vorc_reader.h" |
37 | | #include "format/parquet/vparquet_reader.h" |
38 | | #include "io/io_common.h" |
39 | | #include "runtime/descriptors.h" |
40 | | #include "runtime/runtime_profile.h" |
41 | | #include "storage/olap_scan_common.h" |
42 | | |
43 | | namespace doris { |
44 | | class RuntimeState; |
45 | | class TFileRangeDesc; |
46 | | class TFileScanRange; |
47 | | class TFileScanRangeParams; |
48 | | |
49 | | class ShardedKVCache; |
50 | | class VExpr; |
51 | | class VExprContext; |
52 | | } // namespace doris |
53 | | |
54 | | namespace doris { |
55 | | |
56 | | class FileScanner : public Scanner { |
57 | | ENABLE_FACTORY_CREATOR(FileScanner); |
58 | | // Scanner for file scan ranges (TFileRangeDesc); holds the current GenericReader in _cur_reader. |
59 | | public: |
60 | | static constexpr const char* NAME = "FileScanner"; |
61 | | |
62 | | // Sub-profile names (for parquet/orc). |
63 | | static const std::string FileReadBytesProfile; |
64 | | static const std::string FileReadTimeProfile; |
65 | | |
66 | | FileScanner(RuntimeState* state, FileScanLocalState* parent, int64_t limit, |
67 | | std::shared_ptr<SplitSourceConnector> split_source, RuntimeProfile* profile, |
68 | | ShardedKVCache* kv_cache, |
69 | | const std::unordered_map<std::string, int>* colname_to_slot_id); |
70 | | |
71 | | Status _open_impl(RuntimeState* state) override; |
72 | | |
73 | | Status close(RuntimeState* state) override; |
74 | | |
75 | | void try_stop() override; |
76 | | |
77 | | Status init(RuntimeState* state, const VExprContextSPtrs& conjuncts) override; |
78 | | |
79 | 0 | std::string get_name() override { return FileScanner::NAME; } |
80 | | |
81 | 53 | std::string get_current_scan_range_name() override { return _current_range_path; } |
82 | | |
83 | | // Only used for reading specific lines (read_lines_from_range); _split_source and _kv_cache stay null. |
84 | | FileScanner(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams* params, |
85 | | const std::unordered_map<std::string, int>* colname_to_slot_id, |
86 | | TupleDescriptor* tuple_desc) |
87 | 4.17k | : Scanner(state, profile), |
88 | 4.17k | _params(params), |
89 | 4.17k | _col_name_to_slot_id(colname_to_slot_id), |
90 | 4.17k | _real_tuple_desc(tuple_desc) {} |
91 | | |
92 | | Status read_lines_from_range(const TFileRangeDesc& range, const std::list<int64_t>& row_ids, |
93 | | Block* result_block, const ExternalFileMappingInfo& external_info, |
94 | | int64_t* init_reader_ms, int64_t* get_block_ms); |
95 | | |
96 | | Status prepare_for_read_lines(const TFileRangeDesc& range); |
97 | | |
98 | | void update_realtime_counters() override; |
99 | | |
100 | | protected: |
101 | | Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; |
102 | | |
103 | | Status _get_block_wrapped(RuntimeState* state, Block* block, bool* eof); |
104 | | |
105 | | Status _get_next_reader(); |
106 | | |
107 | | // TODO: cast the input block's column types to string. Currently a no-op that returns OK. |
108 | 0 | Status _cast_src_block(Block* block) { return Status::OK(); } |
109 | | |
110 | | void _collect_profile_before_close() override; |
111 | | |
112 | | // FE adds skip_bitmap_col to _input_tuple_desc iff the target olap table has a skip_bitmap_col |
113 | | // and the current load is a flexible partial update. |
114 | 344 | bool _should_process_skip_bitmap_col() const { return _skip_bitmap_col_idx != -1; } |
115 | | |
116 | | protected: |
117 | | const TFileScanRangeParams* _params = nullptr; |
118 | | std::shared_ptr<SplitSourceConnector> _split_source; |
119 | | bool _first_scan_range = false; |
120 | | TFileRangeDesc _current_range; |
121 | | |
122 | | std::unique_ptr<GenericReader> _cur_reader; |
123 | | bool _cur_reader_eof = false; |
124 | | // File source slot descriptors |
125 | | std::vector<SlotDescriptor*> _file_slot_descs; |
126 | | // Column names taken from _file_slot_descs |
127 | | std::vector<std::string> _file_col_names; |
128 | | |
129 | | // Partition source slot descriptors |
130 | | std::vector<SlotDescriptor*> _partition_slot_descs; |
131 | | // Partition slot id to index in _partition_slot_descs |
132 | | std::unordered_map<SlotId, int> _partition_slot_index_map; |
133 | | // Created from param.expr_of_dest_slot. |
134 | | // For query, it saves the default value expr of all dest columns, or nullptr for NULL. |
135 | | // For load, it saves the conversion expr/default value of all dest columns. |
136 | | VExprContextSPtrs _dest_vexpr_ctx; |
137 | | // Dest slot name to index in _dest_vexpr_ctx |
138 | | std::unordered_map<std::string, int> _dest_slot_name_to_idx; |
139 | | // Column name to default value expr |
140 | | std::unordered_map<std::string, VExprContextSPtr> _col_default_value_ctx; |
141 | | // Src slot descs ordered by dest slot id; |
142 | | // if dest_sid_to_src_sid_without_trans has no key for a dest slot id, the entry is set to nullptr |
143 | | std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest; |
144 | | // Dest slot desc index to src slot desc index |
145 | | std::unordered_map<int, int> _dest_slot_to_src_slot_index; |
146 | | |
147 | | std::unordered_map<std::string, uint32_t> _src_block_name_to_idx; |
148 | | |
149 | | // Obtained from GenericReader: maps columns that exist in the file to their type. |
150 | | std::unordered_map<std::string, DataTypePtr> _slot_lower_name_to_col_type; |
151 | | // Obtained from GenericReader: columns required by the scan but missing from the file. |
152 | | // These columns will be filled with the default value or null. |
153 | | std::unordered_set<std::string> _missing_cols; |
154 | | |
155 | | // Lowercase column name in the source file to its type in the source file. |
156 | | std::map<std::string, DataTypePtr> _source_file_col_name_types; |
157 | | |
158 | | // For load task |
159 | | VExprContextSPtrs _pre_conjunct_ctxs; |
160 | | std::unique_ptr<RowDescriptor> _src_row_desc; |
161 | | std::unique_ptr<RowDescriptor> _dest_row_desc; |
162 | | // Row desc for default exprs |
163 | | std::unique_ptr<RowDescriptor> _default_val_row_desc; |
164 | | // Owned by the scan node |
165 | | ShardedKVCache* _kv_cache = nullptr; |
166 | | |
167 | | std::set<TSlotId> _is_file_slot; |
168 | | bool _scanner_eof = false; |
169 | | int _rows = 0; |
170 | | int _num_of_columns_from_file = 0; |
171 | | |
172 | | bool _src_block_mem_reuse = false; |
173 | | bool _strict_mode = false; |
174 | | |
175 | | bool _src_block_init = false; |
176 | | Block* _src_block_ptr = nullptr; |
177 | | Block _src_block; |
178 | | |
179 | | VExprContextSPtrs _push_down_conjuncts; |
180 | | VExprContextSPtrs _runtime_filter_partition_prune_ctxs; |
181 | | Block _runtime_filter_partition_prune_block; |
182 | | |
183 | | std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics; |
184 | | std::unique_ptr<io::FileReaderStats> _file_reader_stats; |
185 | | std::unique_ptr<io::IOContext> _io_ctx; |
186 | | |
187 | | // Whether to fill partition columns from the path; defaults to true. |
188 | | bool _fill_partition_from_path = true; |
189 | | std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> |
190 | | _partition_col_descs; |
191 | | std::unordered_map<std::string, bool> _partition_value_is_null; |
192 | | std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs; |
193 | | |
194 | | // Index of skip_bitmap_col in _input_tuple_desc; -1 means "not set" |
195 | | int32_t _skip_bitmap_col_idx {-1}; |
196 | | int32_t _sequence_map_col_uid {-1}; |
197 | | int32_t _sequence_col_uid {-1}; |
198 | | |
199 | | private: |
200 | | RuntimeProfile::Counter* _get_block_timer = nullptr; |
201 | | RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr; |
202 | | RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr; |
203 | | RuntimeProfile::Counter* _pre_filter_timer = nullptr; |
204 | | RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr; |
205 | | RuntimeProfile::Counter* _runtime_filter_partition_prune_timer = nullptr; |
206 | | RuntimeProfile::Counter* _empty_file_counter = nullptr; |
207 | | RuntimeProfile::Counter* _not_found_file_counter = nullptr; |
208 | | RuntimeProfile::Counter* _fully_skipped_file_counter = nullptr; |
209 | | RuntimeProfile::Counter* _file_counter = nullptr; |
210 | | RuntimeProfile::Counter* _file_read_bytes_counter = nullptr; |
211 | | RuntimeProfile::Counter* _file_read_calls_counter = nullptr; |
212 | | RuntimeProfile::Counter* _file_read_time_counter = nullptr; |
213 | | RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr; |
214 | | |
215 | | const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr; |
216 | | // Single-slot filter conjuncts |
217 | | std::unordered_map<int, VExprContextSPtrs> _slot_id_to_filter_conjuncts; |
218 | | // Filter conjuncts that reference zero or multiple slots |
219 | | VExprContextSPtrs _not_single_slot_filter_conjuncts; |
220 | | // Path of the current scan range |
221 | | std::string _current_range_path = ""; |
222 | | |
223 | | // Only for load scan node. |
224 | | const TupleDescriptor* _input_tuple_desc = nullptr; |
225 | | // If _input_tuple_desc is set, |
226 | | // _real_tuple_desc points to _input_tuple_desc; |
227 | | // otherwise it points to _output_tuple_desc. |
228 | | const TupleDescriptor* _real_tuple_desc = nullptr; |
229 | | |
230 | | std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> _row_id_column_iterator_pair = {nullptr, |
231 | | -1}; |
232 | | int64_t _last_bytes_read_from_local = 0; |
233 | | int64_t _last_bytes_read_from_remote = 0; |
234 | | |
235 | | private: |
236 | | Status _init_expr_ctxes(); |
237 | | Status _init_src_block(Block* block); |
238 | | Status _check_output_block_types(); |
239 | | Status _cast_to_input_block(Block* block); |
240 | | Status _fill_columns_from_path(size_t rows); |
241 | | Status _fill_missing_columns(size_t rows); |
242 | | Status _pre_filter_src_block(); |
243 | | Status _convert_to_output_block(Block* block); |
244 | | Status _truncate_char_or_varchar_columns(Block* block); |
245 | | void _truncate_char_or_varchar_column(Block* block, int idx, int len); |
246 | | Status _generate_partition_columns(); |
247 | | Status _generate_missing_columns(); |
248 | | bool _check_partition_prune_expr(const VExprSPtr& expr); |
249 | | void _init_runtime_filter_partition_prune_ctxs(); |
250 | | void _init_runtime_filter_partition_prune_block(); |
251 | | Status _process_runtime_filters_partition_prune(bool& is_partition_pruned); |
252 | | Status _process_conjuncts(); |
253 | | Status _process_late_arrival_conjuncts(); |
254 | | void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids); |
255 | | Status _generate_truncate_columns(bool need_to_get_parsed_schema); |
256 | | Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema); |
257 | | Status _init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader, |
258 | | FileMetaCache* file_meta_cache_ptr); |
259 | | Status _init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader, |
260 | | FileMetaCache* file_meta_cache_ptr); |
261 | | Status _create_row_id_column_iterator(); |
262 | | |
263 | 78.6k | TFileFormatType::type _get_current_format_type() const { |
264 | | // For compatibility: if format_type is not set in the range, use the format type of params. |
265 | 78.6k | const TFileRangeDesc& range = _current_range; |
266 | 78.6k | return range.__isset.format_type ? range.format_type : _params->format_type; |
267 | 78.6k | } |
268 | | // Replaces _io_ctx with a fresh IOContext carrying this query's id; always returns OK. |
269 | 93.6k | Status _init_io_ctx() { |
270 | 93.6k | _io_ctx = std::make_unique<io::IOContext>(); |
271 | 93.6k | _io_ctx->query_id = &_state->query_id(); |
272 | 93.6k | return Status::OK(); |
273 | 93.6k | } |
274 | | // Zeroes the unselected/filtered row counters in _counter. |
275 | 0 | void _reset_counter() { |
276 | 0 | _counter.num_rows_unselected = 0; |
277 | 0 | _counter.num_rows_filtered = 0; |
278 | 0 | } |
279 | | // Push-down aggregate type reported by _local_state, or NONE when there is no local state. |
280 | 344k | TPushAggOp::type _get_push_down_agg_type() const { |
281 | 344k | return _local_state == nullptr ? TPushAggOp::type::NONE |
282 | 344k | : _local_state->get_push_down_agg_type(); |
283 | 344k | } |
284 | | |
285 | | // Enable the file meta cache only when |
286 | | // 1. max_external_file_meta_cache_num is > 0 |
287 | | // 2. the file number is less than 1/3 of the cache's capacity |
288 | | // Otherwise the cache miss rate will be high. Also stays disabled when _split_source is null. |
289 | 77.3k | bool _should_enable_file_meta_cache() const { |
290 | 77.3k | return ExecEnv::GetInstance()->file_meta_cache()->enabled() && _split_source != nullptr && |
291 | 77.3k | _split_source->num_scan_ranges() < config::max_external_file_meta_cache_num / 3; |
292 | 77.3k | } |
293 | | }; |
294 | | } // namespace doris |