be/src/exec/scan/parallel_scanner_builder.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/parallel_scanner_builder.h" |
19 | | |
20 | | #include <cstddef> |
21 | | |
22 | | #include "cloud/cloud_storage_engine.h" |
23 | | #include "cloud/cloud_tablet_hotspot.h" |
24 | | #include "cloud/config.h" |
25 | | #include "common/status.h" |
26 | | #include "exec/operator/olap_scan_operator.h" |
27 | | #include "exec/scan/olap_scanner.h" |
28 | | #include "storage/rowset/beta_rowset.h" |
29 | | #include "storage/segment/segment_loader.h" |
30 | | #include "storage/tablet/base_tablet.h" |
31 | | |
32 | | namespace doris { |
33 | | |
34 | 128k | Status ParallelScannerBuilder::build_scanners(std::list<ScannerSPtr>& scanners) { |
35 | 128k | RETURN_IF_ERROR(_load()); |
36 | 128k | if (_scan_parallelism_by_per_segment) { |
37 | 38 | return _build_scanners_by_per_segment(scanners); |
38 | 130k | } else if (_is_dup_mow_key) { |
39 | | // Default strategy for DUP/MOW tables: split by rowids within segments |
40 | 130k | return _build_scanners_by_rowid(scanners); |
41 | 18.4E | } else { |
42 | | // TODO: support to split by key range |
43 | 18.4E | return Status::NotSupported("split by key range not supported yet."); |
44 | 18.4E | } |
45 | 128k | } |
46 | | |
47 | 130k | Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>& scanners) { |
48 | 130k | DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner); |
49 | | |
50 | 559k | for (auto&& [tablet, version] : _tablets) { |
51 | 559k | DCHECK(_all_read_sources.contains(tablet->tablet_id())); |
52 | 559k | auto& entire_read_source = _all_read_sources[tablet->tablet_id()]; |
53 | | |
54 | 559k | if (config::is_cloud_mode()) { |
55 | | // FIXME(plat1ko): Avoid pointer cast |
56 | 556k | ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet); |
57 | 556k | } |
58 | | |
59 | | // `rs_splits` in `entire read source` will be devided into several partitial read sources |
60 | | // to build several parallel scanners, based on segment rows number. All the partitial read sources |
61 | | // share the same delete predicates from their corresponding entire read source. |
62 | 559k | TabletReadSource partitial_read_source; |
63 | 559k | int64_t rows_collected = 0; |
64 | 1.94M | for (auto& rs_split : entire_read_source.rs_splits) { |
65 | 1.94M | auto reader = rs_split.rs_reader; |
66 | 1.94M | auto rowset = reader->rowset(); |
67 | 1.94M | const auto rowset_id = rowset->rowset_id(); |
68 | | |
69 | 1.94M | const auto& segments_rows = _all_segments_rows[rowset_id]; |
70 | | |
71 | 1.94M | if (rowset->num_rows() == 0) { |
72 | 1.24M | continue; |
73 | 1.24M | } |
74 | | |
75 | 700k | int64_t segment_start = 0; |
76 | 700k | auto split = RowSetSplits(reader->clone()); |
77 | | |
78 | 1.40M | for (size_t i = 0; i != segments_rows.size(); ++i) { |
79 | 700k | const size_t rows_of_segment = segments_rows[i]; |
80 | 700k | RowRanges row_ranges; |
81 | 700k | int64_t offset_in_segment = 0; |
82 | | |
83 | | // try to split large segments into RowRanges |
84 | 1.46M | while (offset_in_segment < rows_of_segment) { |
85 | 759k | const int64_t remaining_rows = rows_of_segment - offset_in_segment; |
86 | 759k | auto rows_need = _rows_per_scanner - rows_collected; |
87 | | |
88 | | // 0.9: try to avoid splitting the segments into excessively small parts. |
89 | 759k | if (rows_need >= remaining_rows * 9 / 10) { |
90 | 700k | rows_need = remaining_rows; |
91 | 700k | } |
92 | 759k | DCHECK_LE(rows_need, remaining_rows); |
93 | | |
94 | | // RowRange stands for range: [From, To), From is inclusive, To is exclusive. |
95 | 759k | row_ranges.add({offset_in_segment, |
96 | 759k | offset_in_segment + static_cast<int64_t>(rows_need)}); |
97 | 759k | rows_collected += rows_need; |
98 | 759k | offset_in_segment += rows_need; |
99 | | |
100 | | // If collected enough rows, build a new scanner |
101 | 759k | if (rows_collected >= _rows_per_scanner) { |
102 | 64.8k | split.segment_offsets.first = segment_start, |
103 | 64.8k | split.segment_offsets.second = i + 1; |
104 | 64.8k | split.segment_row_ranges.emplace_back(std::move(row_ranges)); |
105 | | |
106 | 64.8k | DCHECK_EQ(split.segment_offsets.second - split.segment_offsets.first, |
107 | 64.8k | split.segment_row_ranges.size()); |
108 | | |
109 | 64.8k | partitial_read_source.rs_splits.emplace_back(std::move(split)); |
110 | | |
111 | 64.8k | scanners.emplace_back(_build_scanner( |
112 | 64.8k | tablet, version, _key_ranges, |
113 | 64.8k | {.rs_splits = std::move(partitial_read_source.rs_splits), |
114 | 64.8k | .delete_predicates = entire_read_source.delete_predicates, |
115 | 64.8k | .delete_bitmap = entire_read_source.delete_bitmap})); |
116 | | |
117 | 64.8k | partitial_read_source = {}; |
118 | 64.8k | split = RowSetSplits(reader->clone()); |
119 | 64.8k | row_ranges = RowRanges(); |
120 | | |
121 | 64.8k | segment_start = offset_in_segment < rows_of_segment ? i : i + 1; |
122 | 64.8k | rows_collected = 0; |
123 | 64.8k | } |
124 | 759k | } |
125 | | |
126 | | // The non-empty `row_ranges` means there are some rows left in this segment not added into `split`. |
127 | 700k | if (!row_ranges.is_empty()) { |
128 | 694k | DCHECK_GT(rows_collected, 0); |
129 | 694k | DCHECK_EQ(row_ranges.to(), rows_of_segment); |
130 | 694k | split.segment_row_ranges.emplace_back(std::move(row_ranges)); |
131 | 694k | } |
132 | 700k | } |
133 | | |
134 | 700k | DCHECK_LE(rows_collected, _rows_per_scanner); |
135 | 700k | if (rows_collected > 0) { |
136 | 694k | split.segment_offsets.first = segment_start; |
137 | 694k | split.segment_offsets.second = segments_rows.size(); |
138 | 694k | DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first); |
139 | 694k | DCHECK_EQ(split.segment_row_ranges.size(), |
140 | 694k | split.segment_offsets.second - split.segment_offsets.first); |
141 | 694k | partitial_read_source.rs_splits.emplace_back(std::move(split)); |
142 | 694k | } |
143 | 700k | } // end `for (auto& rowset : rowsets)` |
144 | | |
145 | 559k | DCHECK_LE(rows_collected, _rows_per_scanner); |
146 | 559k | if (rows_collected > 0) { |
147 | 296k | DCHECK_GT(partitial_read_source.rs_splits.size(), 0); |
148 | 296k | #ifndef NDEBUG |
149 | 601k | for (auto& split : partitial_read_source.rs_splits) { |
150 | 601k | DCHECK(split.rs_reader != nullptr); |
151 | 601k | DCHECK_LT(split.segment_offsets.first, split.segment_offsets.second); |
152 | 601k | DCHECK_EQ(split.segment_row_ranges.size(), |
153 | 601k | split.segment_offsets.second - split.segment_offsets.first); |
154 | 601k | } |
155 | 296k | #endif |
156 | 296k | scanners.emplace_back( |
157 | 296k | _build_scanner(tablet, version, _key_ranges, |
158 | 296k | {.rs_splits = std::move(partitial_read_source.rs_splits), |
159 | 296k | .delete_predicates = entire_read_source.delete_predicates, |
160 | 296k | .delete_bitmap = entire_read_source.delete_bitmap})); |
161 | 296k | } |
162 | 559k | } |
163 | | |
164 | 130k | return Status::OK(); |
165 | 130k | } |
166 | | |
167 | | // Build scanners so that each segment is exclusively scanned by a single scanner. |
168 | | // This guarantees the number of scanners equals the number of segments across all rowsets |
169 | | // for the involved tablets. It preserves delete predicates and key ranges, and clones |
170 | | // RowsetReader per scanner to avoid sharing between scanners. |
171 | 38 | Status ParallelScannerBuilder::_build_scanners_by_per_segment(std::list<ScannerSPtr>& scanners) { |
172 | 38 | DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner); |
173 | | |
174 | 92 | for (auto&& [tablet, version] : _tablets) { |
175 | 92 | DCHECK(_all_read_sources.contains(tablet->tablet_id())); |
176 | 92 | auto& entire_read_source = _all_read_sources[tablet->tablet_id()]; |
177 | | |
178 | 92 | if (config::is_cloud_mode()) { |
179 | | // FIXME(plat1ko): Avoid pointer cast |
180 | 92 | ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet); |
181 | 92 | } |
182 | | |
183 | | // For each RowSet split in the read source, split by segment id and build |
184 | | // one scanner per segment. Keep delete predicates shared. |
185 | 184 | for (auto& rs_split : entire_read_source.rs_splits) { |
186 | 184 | auto reader = rs_split.rs_reader; |
187 | 184 | auto rowset = reader->rowset(); |
188 | 184 | const auto rowset_id = rowset->rowset_id(); |
189 | 184 | const auto& segments_rows = _all_segments_rows[rowset_id]; |
190 | 184 | if (segments_rows.empty() || rowset->num_rows() == 0) { |
191 | 133 | continue; |
192 | 133 | } |
193 | | |
194 | | // Build scanners for [i, i+1) segment range, without row-range slicing. |
195 | 102 | for (int64_t i = 0; i < rowset->num_segments(); ++i) { |
196 | 51 | RowSetSplits split(reader->clone()); |
197 | 51 | split.segment_offsets.first = i; |
198 | 51 | split.segment_offsets.second = i + 1; |
199 | | // No row-ranges slicing; scan whole segment i. |
200 | 51 | DCHECK_GE(split.segment_offsets.second, split.segment_offsets.first + 1); |
201 | | |
202 | 51 | TabletReadSource partitial_read_source; |
203 | 51 | partitial_read_source.rs_splits.emplace_back(std::move(split)); |
204 | | |
205 | 51 | scanners.emplace_back( |
206 | 51 | _build_scanner(tablet, version, _key_ranges, |
207 | 51 | {.rs_splits = std::move(partitial_read_source.rs_splits), |
208 | 51 | .delete_predicates = entire_read_source.delete_predicates, |
209 | 51 | .delete_bitmap = entire_read_source.delete_bitmap})); |
210 | 51 | } |
211 | 51 | } |
212 | 92 | } |
213 | | |
214 | 38 | return Status::OK(); |
215 | 38 | } |
216 | | |
217 | | /** |
218 | | * Load rowsets of each tablet with specified version, segments of each rowset. |
219 | | */ |
220 | 129k | Status ParallelScannerBuilder::_load() { |
221 | 129k | _total_rows = 0; |
222 | 129k | size_t idx = 0; |
223 | 129k | bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache |
224 | 129k | ? _state->query_options().enable_segment_cache |
225 | 129k | : true; |
226 | 557k | for (auto&& [tablet, version] : _tablets) { |
227 | 557k | const auto tablet_id = tablet->tablet_id(); |
228 | 557k | _all_read_sources[tablet_id] = _read_sources[idx]; |
229 | 557k | const auto& read_source = _all_read_sources[tablet_id]; |
230 | 1.93M | for (auto& rs_split : read_source.rs_splits) { |
231 | 1.93M | auto rowset = rs_split.rs_reader->rowset(); |
232 | 1.93M | RETURN_IF_ERROR(rowset->load()); |
233 | 1.93M | const auto rowset_id = rowset->rowset_id(); |
234 | | |
235 | 1.93M | auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset); |
236 | 1.93M | std::vector<uint32_t> segment_rows; |
237 | 1.93M | RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, enable_segment_cache, |
238 | 1.93M | &_builder_stats)); |
239 | 1.93M | auto segment_count = rowset->num_segments(); |
240 | 2.63M | for (int64_t i = 0; i != segment_count; i++) { |
241 | 697k | _all_segments_rows[rowset_id].emplace_back(segment_rows[i]); |
242 | 697k | } |
243 | 1.93M | _total_rows += rowset->num_rows(); |
244 | 1.93M | } |
245 | 557k | idx++; |
246 | 557k | } |
247 | | |
248 | 129k | _rows_per_scanner = _total_rows / _max_scanners_count; |
249 | 129k | _rows_per_scanner = std::max<size_t>(_rows_per_scanner, _min_rows_per_scanner); |
250 | | |
251 | 129k | return Status::OK(); |
252 | 129k | } |
253 | | |
254 | | std::shared_ptr<OlapScanner> ParallelScannerBuilder::_build_scanner( |
255 | | BaseTabletSPtr tablet, int64_t version, const std::vector<OlapScanRange*>& key_ranges, |
256 | 360k | TabletReadSource&& read_source) { |
257 | 360k | OlapScanner::Params params {_state, _scanner_profile.get(), key_ranges, std::move(tablet), |
258 | 360k | version, std::move(read_source), _limit, _is_preaggregation}; |
259 | 360k | return OlapScanner::create_shared(_parent, std::move(params)); |
260 | 360k | } |
261 | | |
262 | | } // namespace doris |