be/src/exec/scan/file_scanner_v2.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <map> |
21 | | #include <memory> |
22 | | #include <optional> |
23 | | #include <string> |
24 | | #include <unordered_map> |
25 | | #include <vector> |
26 | | |
27 | | #include "common/factory_creator.h" |
28 | | #include "common/status.h" |
29 | | #include "core/block/block.h" |
30 | | #include "exec/operator/file_scan_operator.h" |
31 | | #include "exec/scan/scanner.h" |
32 | | #include "exec/scan/split_source_connector.h" |
33 | | #include "exprs/vexpr_fwd.h" |
34 | | #include "format_v2/column_mapper.h" |
35 | | #include "format_v2/table_reader.h" |
36 | | #include "gen_cpp/Descriptors_types.h" |
37 | | #include "gen_cpp/PlanNodes_types.h" |
38 | | #include "io/io_common.h" |
39 | | #include "runtime/runtime_profile.h" |
40 | | #include "storage/segment/adaptive_block_size_predictor.h" |
41 | | |
42 | | namespace doris { |
43 | | |
44 | | class RuntimeState; |
45 | | class SlotDescriptor; |
46 | | class TFileRangeDesc; |
47 | | class TFileScanRangeParams; |
48 | | class ShardedKVCache; |
49 | | |
50 | | class FileScannerV2 final : public Scanner { |
51 | | ENABLE_FACTORY_CREATOR(FileScannerV2); |
52 | | |
53 | | public: |
54 | | static constexpr const char* NAME = "FileScannerV2"; |
55 | | static constexpr size_t ADAPTIVE_BATCH_INITIAL_PROBE_ROWS = 32; |
56 | | |
57 | | static bool is_supported(const TFileScanRangeParams& params, const TFileRangeDesc& range); |
58 | | #ifdef BE_TEST |
59 | | static Status TEST_to_file_format(TFileFormatType::type format_type, |
60 | | format::FileFormat* file_format); |
61 | | static bool TEST_is_partition_slot(const TFileScanSlotInfo& slot_info, |
62 | | const std::string& column_name); |
63 | | static bool TEST_is_data_file_slot(const TFileScanSlotInfo& slot_info, |
64 | | const std::string& column_name); |
65 | | static Status TEST_rewrite_slot_refs_to_global_index( |
66 | | VExprSPtr* expr, |
67 | | const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index); |
68 | | #endif |
69 | | |
70 | | FileScannerV2(RuntimeState* state, FileScanLocalState* parent, int64_t limit, |
71 | | std::shared_ptr<SplitSourceConnector> split_source, RuntimeProfile* profile, |
72 | | ShardedKVCache* kv_cache, |
73 | | const std::unordered_map<std::string, int>* colname_to_slot_id); |
74 | | |
75 | | Status init(RuntimeState* state, const VExprContextSPtrs& conjuncts) override; |
76 | | Status _open_impl(RuntimeState* state) override; |
77 | | Status close(RuntimeState* state) override; |
78 | | void try_stop() override; |
79 | 0 | std::string get_name() override { return FileScannerV2::NAME; } |
80 | 0 | std::string get_current_scan_range_name() override { return _current_range_path; } |
81 | | void update_realtime_counters() override; |
82 | | |
83 | | protected: |
84 | | Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; |
85 | | void _collect_profile_before_close() override; |
86 | | bool _should_update_load_counters() const override; |
87 | | |
88 | | private: |
89 | | TFileFormatType::type _get_current_format_type() const; |
90 | | Status _init_io_ctx(); |
91 | | Status _init_expr_ctxes(); |
92 | | Status _prepare_next_split(bool* eos); |
93 | | Status _init_table_reader(const TFileRangeDesc& range); |
94 | | Status _create_table_reader_for_format(const TFileRangeDesc& range, |
95 | | std::unique_ptr<format::TableReader>* reader) const; |
96 | | Status _prepare_table_reader_split(const TFileRangeDesc& range); |
97 | | bool _should_enable_file_meta_cache() const; |
98 | | std::optional<format::GlobalRowIdContext> _create_global_rowid_context( |
99 | | const TFileRangeDesc& range) const; |
100 | | Status _generate_partition_values(const TFileRangeDesc& range, |
101 | | std::map<std::string, Field>* partition_values) const; |
102 | | Status _parse_partition_value(const SlotDescriptor* slot_desc, const std::string& value, |
103 | | bool is_null, Field* field) const; |
104 | | Status _build_projected_columns(const format::TableReader& table_reader); |
105 | | Status _build_default_expr(const TFileScanSlotInfo& slot_info, VExprContextSPtr* ctx) const; |
106 | | static format::ColumnDefinition _build_table_column(const SlotDescriptor* slot_desc); |
107 | | Status _build_table_column_predicates(format::TableColumnPredicates* predicates) const; |
108 | | Status _build_table_conjuncts(VExprContextSPtrs* conjuncts) const; |
109 | | static Status _to_file_format(TFileFormatType::type format_type, |
110 | | format::FileFormat* file_format); |
111 | | void _reset_adaptive_batch_size_state(); |
112 | | void _init_adaptive_batch_size_state(TFileFormatType::type format_type); |
113 | | bool _should_enable_adaptive_batch_size(TFileFormatType::type format_type) const; |
114 | | bool _should_run_adaptive_batch_size() const; |
115 | | size_t _predict_reader_batch_rows(); |
116 | | void _update_adaptive_batch_size(const Block& block); |
117 | | void _report_file_reader_predicate_filtered_rows(); |
118 | | void _report_condition_cache_profile(); |
119 | | |
120 | | struct PartitionSlotInfo { |
121 | | const SlotDescriptor* slot_desc = nullptr; |
122 | | std::string canonical_name; |
123 | | }; |
124 | | |
125 | | const TFileScanRangeParams* _params = nullptr; |
126 | | std::shared_ptr<SplitSourceConnector> _split_source; |
127 | | bool _first_scan_range = false; |
128 | | bool _has_prepared_split = false; |
129 | | TFileRangeDesc _current_range; |
130 | | std::string _current_range_path; |
131 | | |
132 | | std::unique_ptr<format::TableReader> _table_reader; |
133 | | std::vector<format::ColumnDefinition> _projected_columns; |
134 | | // File formats without embedded schema, such as CSV, still need the FE slot descriptors in |
135 | | // file-column order. This mirrors old FileScanner::_file_slot_descs and is passed only to |
136 | | // readers that cannot derive their schema from file metadata. |
137 | | std::vector<SlotDescriptor*> _file_slot_descs; |
138 | | bool _need_global_rowid_column = false; |
139 | | std::unordered_map<int32_t, const SlotDescriptor*> _slot_id_to_desc; |
140 | | std::unordered_map<int32_t, format::GlobalIndex> _slot_id_to_global_index; |
141 | | std::unordered_map<std::string, PartitionSlotInfo> _partition_slot_descs; |
142 | | |
143 | | std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics; |
144 | | std::unique_ptr<io::FileReaderStats> _file_reader_stats; |
145 | | std::shared_ptr<io::IOContext> _io_ctx; |
146 | | ShardedKVCache* _kv_cache = nullptr; |
147 | | |
148 | | RuntimeProfile::Counter* _get_block_timer = nullptr; |
149 | | RuntimeProfile::Counter* _file_counter = nullptr; |
150 | | RuntimeProfile::Counter* _file_read_bytes_counter = nullptr; |
151 | | RuntimeProfile::Counter* _file_read_calls_counter = nullptr; |
152 | | RuntimeProfile::Counter* _file_read_time_counter = nullptr; |
153 | | RuntimeProfile::Counter* _adaptive_batch_predicted_rows_counter = nullptr; |
154 | | RuntimeProfile::Counter* _adaptive_batch_actual_bytes_counter = nullptr; |
155 | | RuntimeProfile::Counter* _adaptive_batch_probe_count_counter = nullptr; |
156 | | std::unique_ptr<AdaptiveBlockSizePredictor> _block_size_predictor; |
157 | | int64_t _reported_predicate_filtered_rows = 0; |
158 | | int64_t _reported_condition_cache_hit_count = 0; |
159 | | int64_t _reported_condition_cache_filtered_rows = 0; |
160 | | }; |
161 | | |
162 | | } // namespace doris |