be/src/exec/scan/parallel_scanner_builder.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/parallel_scanner_builder.h" |
19 | | |
20 | | #include <cstddef> |
21 | | |
22 | | #include "cloud/cloud_storage_engine.h" |
23 | | #include "cloud/cloud_tablet_hotspot.h" |
24 | | #include "cloud/config.h" |
25 | | #include "common/status.h" |
26 | | #include "exec/operator/olap_scan_operator.h" |
27 | | #include "exec/scan/olap_scanner.h" |
28 | | #include "storage/rowset/beta_rowset.h" |
29 | | #include "storage/segment/segment_loader.h" |
30 | | #include "storage/tablet/base_tablet.h" |
31 | | |
32 | | namespace doris { |
33 | | |
34 | | namespace { |
35 | | |
36 | | io::FileCacheStatistics take_initial_file_cache_stats( |
37 | 326k | std::unordered_map<int64_t, io::FileCacheStatistics>* preload_stats, int64_t tablet_id) { |
38 | 326k | auto it = preload_stats->find(tablet_id); |
39 | 326k | if (it == preload_stats->end()) { |
40 | 53.5k | return {}; |
41 | 53.5k | } |
42 | 272k | auto stats = std::move(it->second); |
43 | 272k | preload_stats->erase(it); |
44 | 272k | return stats; |
45 | 326k | } |
46 | | |
47 | | } // namespace |
48 | | |
49 | 103k | Status ParallelScannerBuilder::build_scanners(std::list<ScannerSPtr>& scanners) { |
50 | 103k | RETURN_IF_ERROR(_load()); |
51 | 103k | if (_scan_parallelism_by_per_segment) { |
52 | 22 | return _build_scanners_by_per_segment(scanners); |
53 | 104k | } else if (_is_dup_mow_key) { |
54 | | // Default strategy for DUP/MOW tables: split by rowids within segments |
55 | 104k | return _build_scanners_by_rowid(scanners); |
56 | 18.4E | } else { |
57 | | // TODO: support to split by key range |
58 | 18.4E | return Status::NotSupported("split by key range not supported yet."); |
59 | 18.4E | } |
60 | 103k | } |
61 | | |
62 | 104k | Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>& scanners) { |
63 | 104k | DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner); |
64 | | |
65 | 461k | for (auto&& [tablet, version] : _tablets) { |
66 | 461k | DCHECK(_all_read_sources.contains(tablet->tablet_id())); |
67 | 461k | auto& entire_read_source = _all_read_sources[tablet->tablet_id()]; |
68 | | |
69 | 461k | if (config::is_cloud_mode()) { |
70 | | // FIXME(plat1ko): Avoid pointer cast |
71 | 458k | ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet); |
72 | 458k | } |
73 | | |
74 | | // `rs_splits` in `entire read source` will be devided into several partitial read sources |
75 | | // to build several parallel scanners, based on segment rows number. All the partitial read sources |
76 | | // share the same delete predicates from their corresponding entire read source. |
77 | 461k | TabletReadSource partitial_read_source; |
78 | 461k | int64_t rows_collected = 0; |
79 | 2.17M | for (auto& rs_split : entire_read_source.rs_splits) { |
80 | 2.17M | auto reader = rs_split.rs_reader; |
81 | 2.17M | auto rowset = reader->rowset(); |
82 | 2.17M | const auto rowset_id = rowset->rowset_id(); |
83 | | |
84 | 2.17M | const auto& segments_rows = _all_segments_rows[rowset_id]; |
85 | | |
86 | 2.17M | if (rowset->num_rows() == 0) { |
87 | 1.49M | continue; |
88 | 1.49M | } |
89 | | |
90 | 679k | int64_t segment_start = 0; |
91 | 679k | auto split = RowSetSplits(reader->clone()); |
92 | | |
93 | 1.35M | for (size_t i = 0; i != segments_rows.size(); ++i) { |
94 | 678k | const size_t rows_of_segment = segments_rows[i]; |
95 | 678k | RowRanges row_ranges; |
96 | 678k | int64_t offset_in_segment = 0; |
97 | | |
98 | | // try to split large segments into RowRanges |
99 | 1.40M | while (offset_in_segment < rows_of_segment) { |
100 | 730k | const int64_t remaining_rows = rows_of_segment - offset_in_segment; |
101 | 730k | auto rows_need = _rows_per_scanner - rows_collected; |
102 | | |
103 | | // 0.9: try to avoid splitting the segments into excessively small parts. |
104 | 730k | if (rows_need >= remaining_rows * 9 / 10) { |
105 | 677k | rows_need = remaining_rows; |
106 | 677k | } |
107 | 730k | DCHECK_LE(rows_need, remaining_rows); |
108 | | |
109 | | // RowRange stands for range: [From, To), From is inclusive, To is exclusive. |
110 | 730k | row_ranges.add({offset_in_segment, |
111 | 730k | offset_in_segment + static_cast<int64_t>(rows_need)}); |
112 | 730k | rows_collected += rows_need; |
113 | 730k | offset_in_segment += rows_need; |
114 | | |
115 | | // If collected enough rows, build a new scanner |
116 | 730k | if (rows_collected >= _rows_per_scanner) { |
117 | 54.0k | split.segment_offsets.first = segment_start, |
118 | 54.0k | split.segment_offsets.second = i + 1; |
119 | 54.0k | split.segment_row_ranges.emplace_back(std::move(row_ranges)); |
120 | | |
121 | 54.0k | DCHECK_EQ(split.segment_offsets.second - split.segment_offsets.first, |
122 | 54.0k | split.segment_row_ranges.size()); |
123 | | |
124 | 54.0k | partitial_read_source.rs_splits.emplace_back(std::move(split)); |
125 | | |
126 | 54.0k | scanners.emplace_back(_build_scanner( |
127 | 54.0k | tablet, version, _key_ranges, |
128 | 54.0k | {.rs_splits = std::move(partitial_read_source.rs_splits), |
129 | 54.0k | .delete_predicates = entire_read_source.delete_predicates, |
130 | 54.0k | .delete_bitmap = entire_read_source.delete_bitmap}, |
131 | 54.0k | take_initial_file_cache_stats(&_tablet_preload_file_cache_stats, |
132 | 54.0k | tablet->tablet_id()))); |
133 | | |
134 | 54.0k | partitial_read_source = {}; |
135 | 54.0k | split = RowSetSplits(reader->clone()); |
136 | 54.0k | row_ranges = RowRanges(); |
137 | | |
138 | 54.0k | segment_start = offset_in_segment < rows_of_segment ? i : i + 1; |
139 | 54.0k | rows_collected = 0; |
140 | 54.0k | } |
141 | 730k | } |
142 | | |
143 | | // The non-empty `row_ranges` means there are some rows left in this segment not added into `split`. |
144 | 678k | if (!row_ranges.is_empty()) { |
145 | 675k | DCHECK_GT(rows_collected, 0); |
146 | 675k | DCHECK_EQ(row_ranges.to(), rows_of_segment); |
147 | 675k | split.segment_row_ranges.emplace_back(std::move(row_ranges)); |
148 | 675k | } |
149 | 678k | } |
150 | | |
151 | 679k | DCHECK_LE(rows_collected, _rows_per_scanner); |
152 | 679k | if (rows_collected > 0) { |
153 | 675k | split.segment_offsets.first = segment_start; |
154 | 675k | split.segment_offsets.second = segments_rows.size(); |
155 | 675k | DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first); |
156 | 675k | DCHECK_EQ(split.segment_row_ranges.size(), |
157 | 675k | split.segment_offsets.second - split.segment_offsets.first); |
158 | 675k | partitial_read_source.rs_splits.emplace_back(std::move(split)); |
159 | 675k | } |
160 | 679k | } // end `for (auto& rowset : rowsets)` |
161 | | |
162 | 461k | DCHECK_LE(rows_collected, _rows_per_scanner); |
163 | 461k | if (rows_collected > 0) { |
164 | 272k | DCHECK_GT(partitial_read_source.rs_splits.size(), 0); |
165 | 272k | #ifndef NDEBUG |
166 | 595k | for (auto& split : partitial_read_source.rs_splits) { |
167 | 595k | DCHECK(split.rs_reader != nullptr); |
168 | 595k | DCHECK_LT(split.segment_offsets.first, split.segment_offsets.second); |
169 | 595k | DCHECK_EQ(split.segment_row_ranges.size(), |
170 | 595k | split.segment_offsets.second - split.segment_offsets.first); |
171 | 595k | } |
172 | 272k | #endif |
173 | 272k | scanners.emplace_back( |
174 | 272k | _build_scanner(tablet, version, _key_ranges, |
175 | 272k | {.rs_splits = std::move(partitial_read_source.rs_splits), |
176 | 272k | .delete_predicates = entire_read_source.delete_predicates, |
177 | 272k | .delete_bitmap = entire_read_source.delete_bitmap}, |
178 | 272k | take_initial_file_cache_stats(&_tablet_preload_file_cache_stats, |
179 | 272k | tablet->tablet_id()))); |
180 | 272k | } |
181 | 461k | } |
182 | | |
183 | 104k | return Status::OK(); |
184 | 104k | } |
185 | | |
186 | | // Build scanners so that each segment is exclusively scanned by a single scanner. |
187 | | // This guarantees the number of scanners equals the number of segments across all rowsets |
188 | | // for the involved tablets. It preserves delete predicates and key ranges, and clones |
189 | | // RowsetReader per scanner to avoid sharing between scanners. |
190 | 22 | Status ParallelScannerBuilder::_build_scanners_by_per_segment(std::list<ScannerSPtr>& scanners) { |
191 | 22 | DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner); |
192 | | |
193 | 120 | for (auto&& [tablet, version] : _tablets) { |
194 | 120 | DCHECK(_all_read_sources.contains(tablet->tablet_id())); |
195 | 120 | auto& entire_read_source = _all_read_sources[tablet->tablet_id()]; |
196 | | |
197 | 120 | if (config::is_cloud_mode()) { |
198 | | // FIXME(plat1ko): Avoid pointer cast |
199 | 120 | ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet); |
200 | 120 | } |
201 | | |
202 | | // For each RowSet split in the read source, split by segment id and build |
203 | | // one scanner per segment. Keep delete predicates shared. |
204 | 240 | for (auto& rs_split : entire_read_source.rs_splits) { |
205 | 240 | auto reader = rs_split.rs_reader; |
206 | 240 | auto rowset = reader->rowset(); |
207 | 240 | const auto rowset_id = rowset->rowset_id(); |
208 | 240 | const auto& segments_rows = _all_segments_rows[rowset_id]; |
209 | 240 | if (segments_rows.empty() || rowset->num_rows() == 0) { |
210 | 191 | continue; |
211 | 191 | } |
212 | | |
213 | | // Build scanners for [i, i+1) segment range, without row-range slicing. |
214 | 98 | for (int64_t i = 0; i < rowset->num_segments(); ++i) { |
215 | 49 | RowSetSplits split(reader->clone()); |
216 | 49 | split.segment_offsets.first = i; |
217 | 49 | split.segment_offsets.second = i + 1; |
218 | | // No row-ranges slicing; scan whole segment i. |
219 | 49 | DCHECK_GE(split.segment_offsets.second, split.segment_offsets.first + 1); |
220 | | |
221 | 49 | TabletReadSource partitial_read_source; |
222 | 49 | partitial_read_source.rs_splits.emplace_back(std::move(split)); |
223 | | |
224 | 49 | scanners.emplace_back(_build_scanner( |
225 | 49 | tablet, version, _key_ranges, |
226 | 49 | {.rs_splits = std::move(partitial_read_source.rs_splits), |
227 | 49 | .delete_predicates = entire_read_source.delete_predicates, |
228 | 49 | .delete_bitmap = entire_read_source.delete_bitmap}, |
229 | 49 | take_initial_file_cache_stats(&_tablet_preload_file_cache_stats, |
230 | 49 | tablet->tablet_id()))); |
231 | 49 | } |
232 | 49 | } |
233 | 120 | } |
234 | | |
235 | 22 | return Status::OK(); |
236 | 22 | } |
237 | | |
238 | | /** |
239 | | * Load rowsets of each tablet with specified version, segments of each rowset. |
240 | | */ |
241 | 102k | Status ParallelScannerBuilder::_load() { |
242 | 102k | _total_rows = 0; |
243 | 102k | size_t idx = 0; |
244 | 102k | bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache |
245 | 103k | ? _state->query_options().enable_segment_cache |
246 | 18.4E | : true; |
247 | 459k | for (auto&& [tablet, version] : _tablets) { |
248 | 459k | const auto tablet_id = tablet->tablet_id(); |
249 | 459k | _all_read_sources[tablet_id] = _read_sources[idx]; |
250 | 459k | const auto& read_source = _all_read_sources[tablet_id]; |
251 | 2.16M | for (auto& rs_split : read_source.rs_splits) { |
252 | 2.16M | auto rowset = rs_split.rs_reader->rowset(); |
253 | 2.16M | RETURN_IF_ERROR(rowset->load()); |
254 | 2.16M | const auto rowset_id = rowset->rowset_id(); |
255 | | |
256 | 2.16M | auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset); |
257 | 2.16M | std::vector<uint32_t> segment_rows; |
258 | 2.16M | OlapReaderStatistics preload_stats; |
259 | 2.16M | RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, enable_segment_cache, |
260 | 2.16M | &preload_stats)); |
261 | 2.16M | _tablet_preload_file_cache_stats[tablet_id].merge_from(preload_stats.file_cache_stats); |
262 | 2.16M | auto segment_count = rowset->num_segments(); |
263 | 2.84M | for (int64_t i = 0; i != segment_count; i++) { |
264 | 675k | _all_segments_rows[rowset_id].emplace_back(segment_rows[i]); |
265 | 675k | } |
266 | 2.16M | _total_rows += rowset->num_rows(); |
267 | 2.16M | } |
268 | 459k | idx++; |
269 | 459k | } |
270 | | |
271 | 102k | _rows_per_scanner = _total_rows / _max_scanners_count; |
272 | 102k | _rows_per_scanner = std::max<size_t>(_rows_per_scanner, _min_rows_per_scanner); |
273 | | |
274 | 102k | return Status::OK(); |
275 | 102k | } |
276 | | |
277 | | std::shared_ptr<OlapScanner> ParallelScannerBuilder::_build_scanner( |
278 | | BaseTabletSPtr tablet, int64_t version, const std::vector<OlapScanRange*>& key_ranges, |
279 | 325k | TabletReadSource&& read_source, io::FileCacheStatistics&& initial_file_cache_stats) { |
280 | 325k | OlapScanner::Params params { |
281 | 325k | .state = _state, |
282 | 325k | .profile = _scanner_profile.get(), |
283 | 325k | .key_ranges = key_ranges, |
284 | 325k | .tablet = std::move(tablet), |
285 | 325k | .version = version, |
286 | 325k | .read_source = std::move(read_source), |
287 | 325k | .initial_file_cache_stats = std::move(initial_file_cache_stats), |
288 | 325k | .limit = _limit, |
289 | 325k | .aggregation = _is_preaggregation, |
290 | 325k | .read_row_binlog = false, |
291 | 325k | .binlog_scan_type = TBinlogScanType::NONE, |
292 | 325k | .start_tso = std::nullopt, |
293 | 325k | .end_tso = std::nullopt, |
294 | 325k | }; |
295 | 325k | return OlapScanner::create_shared(_parent, std::move(params)); |
296 | 325k | } |
297 | | |
298 | | } // namespace doris |