Coverage Report

Created: 2026-04-14 15:19

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/parallel_scanner_builder.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/parallel_scanner_builder.h"
19
20
#include <cstddef>
21
22
#include "cloud/cloud_storage_engine.h"
23
#include "cloud/cloud_tablet_hotspot.h"
24
#include "cloud/config.h"
25
#include "common/status.h"
26
#include "exec/operator/olap_scan_operator.h"
27
#include "exec/scan/olap_scanner.h"
28
#include "storage/rowset/beta_rowset.h"
29
#include "storage/segment/segment_loader.h"
30
#include "storage/tablet/base_tablet.h"
31
32
namespace doris {
33
34
163k
// Entry point: loads rowset/segment metadata via `_load()` and then dispatches
// to one of the splitting strategies. Scanners are appended to `scanners`.
//
// Strategy selection, in priority order:
//   1. `_scan_parallelism_by_per_segment` -> one scanner per segment.
//   2. `_is_dup_mow_key`                  -> split by row ids within segments.
//   3. otherwise                          -> NotSupported (key-range split is a TODO).
Status ParallelScannerBuilder::build_scanners(std::list<ScannerSPtr>& scanners) {
    RETURN_IF_ERROR(_load());

    if (_scan_parallelism_by_per_segment) {
        return _build_scanners_by_per_segment(scanners);
    }

    if (_is_dup_mow_key) {
        // Default strategy for DUP/MOW tables: split by rowids within segments
        return _build_scanners_by_rowid(scanners);
    }

    // TODO: support to split by key range
    return Status::NotSupported("split by key range not supported yet.");
}
46
47
165k
// Builds scanners by slicing the segments of every tablet into row-id ranges so
// that each scanner covers roughly `_rows_per_scanner` rows.
//
// For each tablet, rowsets are walked in order and rows are accumulated into a
// partial read source. Whenever the accumulated row count reaches
// `_rows_per_scanner`, a scanner is emitted and accumulation restarts. Rows left
// over at the end of a tablet are emitted as one final scanner, so a scanner
// never spans two tablets (but it may span several rowsets of the same tablet).
//
// Requires `_load()` to have populated `_all_read_sources`, `_all_segments_rows`
// and `_rows_per_scanner`. Always returns Status::OK().
Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>& scanners) {
    DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);

    for (auto&& [tablet, version] : _tablets) {
        DCHECK(_all_read_sources.contains(tablet->tablet_id()));
        auto& entire_read_source = _all_read_sources[tablet->tablet_id()];

        if (config::is_cloud_mode()) {
            // FIXME(plat1ko): Avoid pointer cast
            ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
        }

        // `rs_splits` in the entire read source will be divided into several partial read
        // sources to build several parallel scanners, based on the segment row counts. All the
        // partial read sources share the same delete predicates (and delete bitmap) from their
        // corresponding entire read source.
        TabletReadSource partial_read_source;
        int64_t rows_collected = 0;
        for (auto& rs_split : entire_read_source.rs_splits) {
            auto reader = rs_split.rs_reader;
            auto rowset = reader->rowset();
            const auto rowset_id = rowset->rowset_id();

            const auto& segments_rows = _all_segments_rows[rowset_id];

            if (rowset->num_rows() == 0) {
                continue;
            }

            // Index of the first segment of this rowset that belongs to the split being built.
            int64_t segment_start = 0;
            auto split = RowSetSplits(reader->clone());

            // NOTE(review): a segment with zero rows adds no entry to
            // `split.segment_row_ranges`, which would violate the size/offset DCHECKs below.
            // Presumably non-empty rowsets never contain empty segments -- confirm.
            for (size_t i = 0; i != segments_rows.size(); ++i) {
                // Keep all row arithmetic signed to avoid mixed signed/unsigned comparisons.
                const auto rows_of_segment = static_cast<int64_t>(segments_rows[i]);
                RowRanges row_ranges;
                int64_t offset_in_segment = 0;

                // try to split large segments into RowRanges
                while (offset_in_segment < rows_of_segment) {
                    const int64_t remaining_rows = rows_of_segment - offset_in_segment;
                    int64_t rows_need = static_cast<int64_t>(_rows_per_scanner) - rows_collected;

                    // 0.9: try to avoid splitting the segments into excessively small parts.
                    // If the scanner could take at least 90% of what is left, take all of it.
                    if (rows_need >= remaining_rows * 9 / 10) {
                        rows_need = remaining_rows;
                    }
                    DCHECK_LE(rows_need, remaining_rows);

                    // RowRange stands for range: [From, To), From is inclusive, To is exclusive.
                    row_ranges.add({offset_in_segment, offset_in_segment + rows_need});
                    rows_collected += rows_need;
                    offset_in_segment += rows_need;

                    // If collected enough rows, build a new scanner
                    if (rows_collected >= _rows_per_scanner) {
                        split.segment_offsets.first = segment_start;
                        split.segment_offsets.second = i + 1;
                        split.segment_row_ranges.emplace_back(std::move(row_ranges));

                        DCHECK_EQ(split.segment_offsets.second - split.segment_offsets.first,
                                  split.segment_row_ranges.size());

                        partial_read_source.rs_splits.emplace_back(std::move(split));

                        scanners.emplace_back(_build_scanner(
                                tablet, version, _key_ranges,
                                {.rs_splits = std::move(partial_read_source.rs_splits),
                                 .delete_predicates = entire_read_source.delete_predicates,
                                 .delete_bitmap = entire_read_source.delete_bitmap}));

                        // Reset the moved-from accumulators before collecting the next scanner.
                        partial_read_source = {};
                        split = RowSetSplits(reader->clone());
                        row_ranges = RowRanges();

                        // If the current segment still has rows left, the next split starts
                        // inside it; otherwise it starts at the following segment.
                        segment_start = offset_in_segment < rows_of_segment ? i : i + 1;
                        rows_collected = 0;
                    }
                }

                // The non-empty `row_ranges` means there are some rows left in this segment not added into `split`.
                if (!row_ranges.is_empty()) {
                    DCHECK_GT(rows_collected, 0);
                    DCHECK_EQ(row_ranges.to(), rows_of_segment);
                    split.segment_row_ranges.emplace_back(std::move(row_ranges));
                }
            }

            DCHECK_LE(rows_collected, _rows_per_scanner);
            if (rows_collected > 0) {
                // Tail of this rowset: it joins the pending partial read source and may be
                // combined with rows from the next rowset of the same tablet.
                split.segment_offsets.first = segment_start;
                split.segment_offsets.second = segments_rows.size();
                DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first);
                DCHECK_EQ(split.segment_row_ranges.size(),
                          split.segment_offsets.second - split.segment_offsets.first);
                partial_read_source.rs_splits.emplace_back(std::move(split));
            }
        } // end `for (auto& rs_split : entire_read_source.rs_splits)`

        DCHECK_LE(rows_collected, _rows_per_scanner);
        if (rows_collected > 0) {
            DCHECK_GT(partial_read_source.rs_splits.size(), 0);
#ifndef NDEBUG
            for (auto& split : partial_read_source.rs_splits) {
                DCHECK(split.rs_reader != nullptr);
                DCHECK_LT(split.segment_offsets.first, split.segment_offsets.second);
                DCHECK_EQ(split.segment_row_ranges.size(),
                          split.segment_offsets.second - split.segment_offsets.first);
            }
#endif
            // Emit the remaining rows of this tablet as the last scanner for it.
            scanners.emplace_back(
                    _build_scanner(tablet, version, _key_ranges,
                                   {.rs_splits = std::move(partial_read_source.rs_splits),
                                    .delete_predicates = entire_read_source.delete_predicates,
                                    .delete_bitmap = entire_read_source.delete_bitmap}));
        }
    }

    return Status::OK();
}
166
167
// Build scanners so that each segment is exclusively scanned by a single scanner.
168
// This guarantees the number of scanners equals the number of segments across all rowsets
169
// for the involved tablets. It preserves delete predicates and key ranges, and clones
170
// RowsetReader per scanner to avoid sharing between scanners.
171
26
Status ParallelScannerBuilder::_build_scanners_by_per_segment(std::list<ScannerSPtr>& scanners) {
172
26
    DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);
173
174
71
    for (auto&& [tablet, version] : _tablets) {
175
71
        DCHECK(_all_read_sources.contains(tablet->tablet_id()));
176
71
        auto& entire_read_source = _all_read_sources[tablet->tablet_id()];
177
178
71
        if (config::is_cloud_mode()) {
179
            // FIXME(plat1ko): Avoid pointer cast
180
71
            ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
181
71
        }
182
183
        // For each RowSet split in the read source, split by segment id and build
184
        // one scanner per segment. Keep delete predicates shared.
185
145
        for (auto& rs_split : entire_read_source.rs_splits) {
186
145
            auto reader = rs_split.rs_reader;
187
145
            auto rowset = reader->rowset();
188
145
            const auto rowset_id = rowset->rowset_id();
189
145
            const auto& segments_rows = _all_segments_rows[rowset_id];
190
145
            if (segments_rows.empty() || rowset->num_rows() == 0) {
191
103
                continue;
192
103
            }
193
194
            // Build scanners for [i, i+1) segment range, without row-range slicing.
195
84
            for (int64_t i = 0; i < rowset->num_segments(); ++i) {
196
42
                RowSetSplits split(reader->clone());
197
42
                split.segment_offsets.first = i;
198
42
                split.segment_offsets.second = i + 1;
199
                // No row-ranges slicing; scan whole segment i.
200
42
                DCHECK_GE(split.segment_offsets.second, split.segment_offsets.first + 1);
201
202
42
                TabletReadSource partitial_read_source;
203
42
                partitial_read_source.rs_splits.emplace_back(std::move(split));
204
205
42
                scanners.emplace_back(
206
42
                        _build_scanner(tablet, version, _key_ranges,
207
42
                                       {.rs_splits = std::move(partitial_read_source.rs_splits),
208
42
                                        .delete_predicates = entire_read_source.delete_predicates,
209
42
                                        .delete_bitmap = entire_read_source.delete_bitmap}));
210
42
            }
211
42
        }
212
71
    }
213
214
26
    return Status::OK();
215
26
}
216
217
/**
218
 * Load rowsets of each tablet with specified version, segments of each rowset.
219
 */
220
163k
Status ParallelScannerBuilder::_load() {
221
163k
    _total_rows = 0;
222
163k
    size_t idx = 0;
223
163k
    bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache
224
163k
                                        ? _state->query_options().enable_segment_cache
225
163k
                                        : true;
226
801k
    for (auto&& [tablet, version] : _tablets) {
227
801k
        const auto tablet_id = tablet->tablet_id();
228
801k
        _all_read_sources[tablet_id] = _read_sources[idx];
229
801k
        const auto& read_source = _all_read_sources[tablet_id];
230
3.38M
        for (auto& rs_split : read_source.rs_splits) {
231
3.38M
            auto rowset = rs_split.rs_reader->rowset();
232
3.38M
            RETURN_IF_ERROR(rowset->load());
233
3.38M
            const auto rowset_id = rowset->rowset_id();
234
235
3.38M
            auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
236
3.38M
            std::vector<uint32_t> segment_rows;
237
3.38M
            RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, enable_segment_cache,
238
3.38M
                                                              &_builder_stats));
239
3.38M
            auto segment_count = rowset->num_segments();
240
4.57M
            for (int64_t i = 0; i != segment_count; i++) {
241
1.19M
                _all_segments_rows[rowset_id].emplace_back(segment_rows[i]);
242
1.19M
            }
243
3.38M
            _total_rows += rowset->num_rows();
244
3.38M
        }
245
801k
        idx++;
246
801k
    }
247
248
163k
    _rows_per_scanner = _total_rows / _max_scanners_count;
249
163k
    _rows_per_scanner = std::max<size_t>(_rows_per_scanner, _min_rows_per_scanner);
250
251
163k
    return Status::OK();
252
163k
}
253
254
std::shared_ptr<OlapScanner> ParallelScannerBuilder::_build_scanner(
255
        BaseTabletSPtr tablet, int64_t version, const std::vector<OlapScanRange*>& key_ranges,
256
632k
        TabletReadSource&& read_source) {
257
632k
    OlapScanner::Params params {_state,  _scanner_profile.get(), key_ranges, std::move(tablet),
258
632k
                                version, std::move(read_source), _limit,     _is_preaggregation};
259
632k
    return OlapScanner::create_shared(_parent, std::move(params));
260
632k
}
261
262
} // namespace doris