be/src/exec/scan/parallel_scanner_builder.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/parallel_scanner_builder.h" |
19 | | |
20 | | #include <cstddef> |
21 | | |
22 | | #include "cloud/cloud_storage_engine.h" |
23 | | #include "cloud/cloud_tablet_hotspot.h" |
24 | | #include "cloud/config.h" |
25 | | #include "common/status.h" |
26 | | #include "exec/operator/olap_scan_operator.h" |
27 | | #include "exec/scan/olap_scanner.h" |
28 | | #include "storage/rowset/beta_rowset.h" |
29 | | #include "storage/segment/segment_loader.h" |
30 | | #include "storage/tablet/base_tablet.h" |
31 | | |
32 | | namespace doris { |
33 | | |
34 | 0 | Status ParallelScannerBuilder::build_scanners(std::list<ScannerSPtr>& scanners) { |
35 | 0 | RETURN_IF_ERROR(_load()); |
36 | 0 | if (_scan_parallelism_by_per_segment) { |
37 | 0 | return _build_scanners_by_per_segment(scanners); |
38 | 0 | } else if (_is_dup_mow_key) { |
39 | | // Default strategy for DUP/MOW tables: split by rowids within segments |
40 | 0 | return _build_scanners_by_rowid(scanners); |
41 | 0 | } else { |
42 | | // TODO: support to split by key range |
43 | 0 | return Status::NotSupported("split by key range not supported yet."); |
44 | 0 | } |
45 | 0 | } |
46 | | |
47 | 0 | Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>& scanners) { |
48 | 0 | DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner); |
49 | |
|
50 | 0 | for (auto&& [tablet, version] : _tablets) { |
51 | 0 | DCHECK(_all_read_sources.contains(tablet->tablet_id())); |
52 | 0 | auto& entire_read_source = _all_read_sources[tablet->tablet_id()]; |
53 | |
|
54 | 0 | if (config::is_cloud_mode()) { |
55 | | // FIXME(plat1ko): Avoid pointer cast |
56 | 0 | ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet); |
57 | 0 | } |
58 | | |
59 | | // `rs_splits` in `entire read source` will be devided into several partitial read sources |
60 | | // to build several parallel scanners, based on segment rows number. All the partitial read sources |
61 | | // share the same delete predicates from their corresponding entire read source. |
62 | 0 | TabletReadSource partitial_read_source; |
63 | 0 | int64_t rows_collected = 0; |
64 | 0 | for (auto& rs_split : entire_read_source.rs_splits) { |
65 | 0 | auto reader = rs_split.rs_reader; |
66 | 0 | auto rowset = reader->rowset(); |
67 | 0 | const auto rowset_id = rowset->rowset_id(); |
68 | |
|
69 | 0 | const auto& segments_rows = _all_segments_rows[rowset_id]; |
70 | |
|
71 | 0 | if (rowset->num_rows() == 0) { |
72 | 0 | continue; |
73 | 0 | } |
74 | | |
75 | 0 | int64_t segment_start = 0; |
76 | 0 | auto split = RowSetSplits(reader->clone()); |
77 | |
|
78 | 0 | for (size_t i = 0; i != segments_rows.size(); ++i) { |
79 | 0 | const size_t rows_of_segment = segments_rows[i]; |
80 | 0 | RowRanges row_ranges; |
81 | 0 | int64_t offset_in_segment = 0; |
82 | | |
83 | | // try to split large segments into RowRanges |
84 | 0 | while (offset_in_segment < rows_of_segment) { |
85 | 0 | const int64_t remaining_rows = rows_of_segment - offset_in_segment; |
86 | 0 | auto rows_need = _rows_per_scanner - rows_collected; |
87 | | |
88 | | // 0.9: try to avoid splitting the segments into excessively small parts. |
89 | 0 | if (rows_need >= remaining_rows * 9 / 10) { |
90 | 0 | rows_need = remaining_rows; |
91 | 0 | } |
92 | 0 | DCHECK_LE(rows_need, remaining_rows); |
93 | | |
94 | | // RowRange stands for range: [From, To), From is inclusive, To is exclusive. |
95 | 0 | row_ranges.add({offset_in_segment, |
96 | 0 | offset_in_segment + static_cast<int64_t>(rows_need)}); |
97 | 0 | rows_collected += rows_need; |
98 | 0 | offset_in_segment += rows_need; |
99 | | |
100 | | // If collected enough rows, build a new scanner |
101 | 0 | if (rows_collected >= _rows_per_scanner) { |
102 | 0 | split.segment_offsets.first = segment_start, |
103 | 0 | split.segment_offsets.second = i + 1; |
104 | 0 | split.segment_row_ranges.emplace_back(std::move(row_ranges)); |
105 | |
|
106 | 0 | DCHECK_EQ(split.segment_offsets.second - split.segment_offsets.first, |
107 | 0 | split.segment_row_ranges.size()); |
108 | |
|
109 | 0 | partitial_read_source.rs_splits.emplace_back(std::move(split)); |
110 | |
|
111 | 0 | scanners.emplace_back(_build_scanner( |
112 | 0 | tablet, version, _key_ranges, |
113 | 0 | {.rs_splits = std::move(partitial_read_source.rs_splits), |
114 | 0 | .delete_predicates = entire_read_source.delete_predicates, |
115 | 0 | .delete_bitmap = entire_read_source.delete_bitmap})); |
116 | |
|
117 | 0 | partitial_read_source = {}; |
118 | 0 | split = RowSetSplits(reader->clone()); |
119 | 0 | row_ranges = RowRanges(); |
120 | |
|
121 | 0 | segment_start = offset_in_segment < rows_of_segment ? i : i + 1; |
122 | 0 | rows_collected = 0; |
123 | 0 | } |
124 | 0 | } |
125 | | |
126 | | // The non-empty `row_ranges` means there are some rows left in this segment not added into `split`. |
127 | 0 | if (!row_ranges.is_empty()) { |
128 | 0 | DCHECK_GT(rows_collected, 0); |
129 | 0 | DCHECK_EQ(row_ranges.to(), rows_of_segment); |
130 | 0 | split.segment_row_ranges.emplace_back(std::move(row_ranges)); |
131 | 0 | } |
132 | 0 | } |
133 | |
|
134 | 0 | DCHECK_LE(rows_collected, _rows_per_scanner); |
135 | 0 | if (rows_collected > 0) { |
136 | 0 | split.segment_offsets.first = segment_start; |
137 | 0 | split.segment_offsets.second = segments_rows.size(); |
138 | 0 | DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first); |
139 | 0 | DCHECK_EQ(split.segment_row_ranges.size(), |
140 | 0 | split.segment_offsets.second - split.segment_offsets.first); |
141 | 0 | partitial_read_source.rs_splits.emplace_back(std::move(split)); |
142 | 0 | } |
143 | 0 | } // end `for (auto& rowset : rowsets)` |
144 | |
|
145 | 0 | DCHECK_LE(rows_collected, _rows_per_scanner); |
146 | 0 | if (rows_collected > 0) { |
147 | 0 | DCHECK_GT(partitial_read_source.rs_splits.size(), 0); |
148 | 0 | #ifndef NDEBUG |
149 | 0 | for (auto& split : partitial_read_source.rs_splits) { |
150 | 0 | DCHECK(split.rs_reader != nullptr); |
151 | 0 | DCHECK_LT(split.segment_offsets.first, split.segment_offsets.second); |
152 | 0 | DCHECK_EQ(split.segment_row_ranges.size(), |
153 | 0 | split.segment_offsets.second - split.segment_offsets.first); |
154 | 0 | } |
155 | 0 | #endif |
156 | 0 | scanners.emplace_back( |
157 | 0 | _build_scanner(tablet, version, _key_ranges, |
158 | 0 | {.rs_splits = std::move(partitial_read_source.rs_splits), |
159 | 0 | .delete_predicates = entire_read_source.delete_predicates, |
160 | 0 | .delete_bitmap = entire_read_source.delete_bitmap})); |
161 | 0 | } |
162 | 0 | } |
163 | |
|
164 | 0 | return Status::OK(); |
165 | 0 | } |
166 | | |
167 | | // Build scanners so that each segment is exclusively scanned by a single scanner. |
168 | | // This guarantees the number of scanners equals the number of segments across all rowsets |
169 | | // for the involved tablets. It preserves delete predicates and key ranges, and clones |
170 | | // RowsetReader per scanner to avoid sharing between scanners. |
171 | 0 | Status ParallelScannerBuilder::_build_scanners_by_per_segment(std::list<ScannerSPtr>& scanners) { |
172 | 0 | DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner); |
173 | |
|
174 | 0 | for (auto&& [tablet, version] : _tablets) { |
175 | 0 | DCHECK(_all_read_sources.contains(tablet->tablet_id())); |
176 | 0 | auto& entire_read_source = _all_read_sources[tablet->tablet_id()]; |
177 | |
|
178 | 0 | if (config::is_cloud_mode()) { |
179 | | // FIXME(plat1ko): Avoid pointer cast |
180 | 0 | ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet); |
181 | 0 | } |
182 | | |
183 | | // For each RowSet split in the read source, split by segment id and build |
184 | | // one scanner per segment. Keep delete predicates shared. |
185 | 0 | for (auto& rs_split : entire_read_source.rs_splits) { |
186 | 0 | auto reader = rs_split.rs_reader; |
187 | 0 | auto rowset = reader->rowset(); |
188 | 0 | const auto rowset_id = rowset->rowset_id(); |
189 | 0 | const auto& segments_rows = _all_segments_rows[rowset_id]; |
190 | 0 | if (segments_rows.empty() || rowset->num_rows() == 0) { |
191 | 0 | continue; |
192 | 0 | } |
193 | | |
194 | | // Build scanners for [i, i+1) segment range, without row-range slicing. |
195 | 0 | for (int64_t i = 0; i < rowset->num_segments(); ++i) { |
196 | 0 | RowSetSplits split(reader->clone()); |
197 | 0 | split.segment_offsets.first = i; |
198 | 0 | split.segment_offsets.second = i + 1; |
199 | | // No row-ranges slicing; scan whole segment i. |
200 | 0 | DCHECK_GE(split.segment_offsets.second, split.segment_offsets.first + 1); |
201 | |
|
202 | 0 | TabletReadSource partitial_read_source; |
203 | 0 | partitial_read_source.rs_splits.emplace_back(std::move(split)); |
204 | |
|
205 | 0 | scanners.emplace_back( |
206 | 0 | _build_scanner(tablet, version, _key_ranges, |
207 | 0 | {.rs_splits = std::move(partitial_read_source.rs_splits), |
208 | 0 | .delete_predicates = entire_read_source.delete_predicates, |
209 | 0 | .delete_bitmap = entire_read_source.delete_bitmap})); |
210 | 0 | } |
211 | 0 | } |
212 | 0 | } |
213 | |
|
214 | 0 | return Status::OK(); |
215 | 0 | } |
216 | | |
217 | | /** |
218 | | * Load rowsets of each tablet with specified version, segments of each rowset. |
219 | | */ |
220 | 0 | Status ParallelScannerBuilder::_load() { |
221 | 0 | _total_rows = 0; |
222 | 0 | size_t idx = 0; |
223 | 0 | bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache |
224 | 0 | ? _state->query_options().enable_segment_cache |
225 | 0 | : true; |
226 | 0 | for (auto&& [tablet, version] : _tablets) { |
227 | 0 | const auto tablet_id = tablet->tablet_id(); |
228 | 0 | _all_read_sources[tablet_id] = _read_sources[idx]; |
229 | 0 | const auto& read_source = _all_read_sources[tablet_id]; |
230 | 0 | for (auto& rs_split : read_source.rs_splits) { |
231 | 0 | auto rowset = rs_split.rs_reader->rowset(); |
232 | 0 | RETURN_IF_ERROR(rowset->load()); |
233 | 0 | const auto rowset_id = rowset->rowset_id(); |
234 | |
|
235 | 0 | auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset); |
236 | 0 | std::vector<uint32_t> segment_rows; |
237 | 0 | RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, enable_segment_cache, |
238 | 0 | &_builder_stats)); |
239 | 0 | auto segment_count = rowset->num_segments(); |
240 | 0 | for (int64_t i = 0; i != segment_count; i++) { |
241 | 0 | _all_segments_rows[rowset_id].emplace_back(segment_rows[i]); |
242 | 0 | } |
243 | 0 | _total_rows += rowset->num_rows(); |
244 | 0 | } |
245 | 0 | idx++; |
246 | 0 | } |
247 | | |
248 | 0 | _rows_per_scanner = _total_rows / _max_scanners_count; |
249 | 0 | _rows_per_scanner = std::max<size_t>(_rows_per_scanner, _min_rows_per_scanner); |
250 | |
|
251 | 0 | return Status::OK(); |
252 | 0 | } |
253 | | |
254 | | std::shared_ptr<OlapScanner> ParallelScannerBuilder::_build_scanner( |
255 | | BaseTabletSPtr tablet, int64_t version, const std::vector<OlapScanRange*>& key_ranges, |
256 | 0 | TabletReadSource&& read_source) { |
257 | 0 | OlapScanner::Params params {_state, _scanner_profile.get(), key_ranges, std::move(tablet), |
258 | 0 | version, std::move(read_source), _limit, _is_preaggregation}; |
259 | 0 | return OlapScanner::create_shared(_parent, std::move(params)); |
260 | 0 | } |
261 | | |
262 | | } // namespace doris |