Coverage Report

Created: 2026-07-05 03:19

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/parallel_scanner_builder.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/parallel_scanner_builder.h"
19
20
#include <cstddef>
21
22
#include "cloud/cloud_storage_engine.h"
23
#include "cloud/cloud_tablet_hotspot.h"
24
#include "cloud/config.h"
25
#include "common/status.h"
26
#include "exec/operator/olap_scan_operator.h"
27
#include "exec/scan/olap_scanner.h"
28
#include "storage/rowset/beta_rowset.h"
29
#include "storage/segment/segment_loader.h"
30
#include "storage/tablet/base_tablet.h"
31
32
namespace doris {
33
34
namespace {
35
36
// Hands over the file cache statistics recorded for `tablet_id` in `preload_stats`.
// The entry is removed from the map, so a later call for the same tablet returns a
// default-constructed value. Tablets without an entry also yield a default value.
io::FileCacheStatistics take_initial_file_cache_stats(
        std::unordered_map<int64_t, io::FileCacheStatistics>* preload_stats, int64_t tablet_id) {
    // `extract` unlinks the node (if any) in one step; an empty handle means "not found".
    auto node = preload_stats->extract(tablet_id);
    if (node.empty()) {
        return {};
    }
    return std::move(node.mapped());
}
46
47
} // namespace
48
49
103k
// Entry point: loads rowset/segment metadata via `_load()`, then appends the parallel
// scanners to `scanners` using the strategy selected by the builder's flags.
// Returns the first error from `_load()` or from the chosen strategy.
Status ParallelScannerBuilder::build_scanners(std::list<ScannerSPtr>& scanners) {
    RETURN_IF_ERROR(_load());

    // Explicitly requested: one scanner per segment.
    if (_scan_parallelism_by_per_segment) {
        return _build_scanners_by_per_segment(scanners);
    }

    // TODO: support to split by key range
    if (!_is_dup_mow_key) {
        return Status::NotSupported("split by key range not supported yet.");
    }

    // Default strategy for DUP/MOW tables: split by rowids within segments
    return _build_scanners_by_rowid(scanners);
}
61
62
104k
// Builds scanners by cutting the rows of every tablet into chunks of roughly
// `_rows_per_scanner` rows, expressed as row ranges inside segments.
//
// For each tablet the rowsets of its read source are walked in order. Rows are
// accumulated in `rows_collected`; once at least `_rows_per_scanner` rows are collected
// a scanner is emitted and the counter restarts. The counter is kept across rowsets of
// the same tablet, so one scanner may cover splits from several rowsets. Whatever is
// left at the end of a tablet becomes one final scanner. Scanners never span tablets.
//
// Every emitted scanner gets a cloned rowset reader and shares the delete predicates
// and delete bitmap of the tablet's entire read source.
//
// `scanners` receives the new scanners (appended). Always returns Status::OK().
Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>& scanners) {
    DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);

    for (auto&& [tablet, version] : _tablets) {
        DCHECK(_all_read_sources.contains(tablet->tablet_id()));
        auto& entire_read_source = _all_read_sources[tablet->tablet_id()];

        if (config::is_cloud_mode()) {
            // FIXME(plat1ko): Avoid pointer cast
            ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
        }

        // `rs_splits` in the entire read source will be divided into several partial read
        // sources to build several parallel scanners, based on the number of rows per segment.
        // All the partial read sources share the same delete predicates from their
        // corresponding entire read source.
        TabletReadSource partial_read_source;
        int64_t rows_collected = 0;
        for (auto& rs_split : entire_read_source.rs_splits) {
            // Bind by reference: `rs_split` outlives this iteration, no need to copy the pointer.
            const auto& reader = rs_split.rs_reader;
            auto rowset = reader->rowset();

            // Skip empty rowsets before touching `_all_segments_rows`: `operator[]` would
            // otherwise insert an empty entry for a rowset that has none yet.
            if (rowset->num_rows() == 0) {
                continue;
            }

            const auto rowset_id = rowset->rowset_id();
            const auto& segments_rows = _all_segments_rows[rowset_id];

            // First segment (of this rowset) covered by the split currently being filled.
            int64_t segment_start = 0;
            auto split = RowSetSplits(reader->clone());

            for (size_t i = 0; i != segments_rows.size(); ++i) {
                const size_t rows_of_segment = segments_rows[i];
                RowRanges row_ranges;
                int64_t offset_in_segment = 0;

                // try to split large segments into RowRanges
                while (offset_in_segment < rows_of_segment) {
                    const int64_t remaining_rows = rows_of_segment - offset_in_segment;
                    auto rows_need = _rows_per_scanner - rows_collected;

                    // 0.9: try to avoid splitting the segments into excessively small parts.
                    // If the scanner needs at least 90% of what is left, take all of it.
                    if (rows_need >= remaining_rows * 9 / 10) {
                        rows_need = remaining_rows;
                    }
                    DCHECK_LE(rows_need, remaining_rows);

                    // RowRange stands for range: [From, To), From is inclusive, To is exclusive.
                    row_ranges.add({offset_in_segment,
                                    offset_in_segment + static_cast<int64_t>(rows_need)});
                    rows_collected += rows_need;
                    offset_in_segment += rows_need;

                    // If collected enough rows, build a new scanner
                    if (rows_collected >= _rows_per_scanner) {
                        split.segment_offsets.first = segment_start;
                        split.segment_offsets.second = i + 1;
                        split.segment_row_ranges.emplace_back(std::move(row_ranges));

                        DCHECK_EQ(split.segment_offsets.second - split.segment_offsets.first,
                                  split.segment_row_ranges.size());

                        partial_read_source.rs_splits.emplace_back(std::move(split));

                        scanners.emplace_back(_build_scanner(
                                tablet, version, _key_ranges,
                                {.rs_splits = std::move(partial_read_source.rs_splits),
                                 .delete_predicates = entire_read_source.delete_predicates,
                                 .delete_bitmap = entire_read_source.delete_bitmap},
                                take_initial_file_cache_stats(&_tablet_preload_file_cache_stats,
                                                              tablet->tablet_id())));

                        // Start over with fresh, non-moved-from state for the next scanner.
                        partial_read_source = {};
                        split = RowSetSplits(reader->clone());
                        row_ranges = RowRanges();

                        // If this segment still has rows left, the next split starts inside
                        // it; otherwise it starts at the following segment.
                        segment_start = offset_in_segment < rows_of_segment ? i : i + 1;
                        rows_collected = 0;
                    }
                }

                // The non-empty `row_ranges` means there are some rows left in this segment not added into `split`.
                if (!row_ranges.is_empty()) {
                    DCHECK_GT(rows_collected, 0);
                    DCHECK_EQ(row_ranges.to(), rows_of_segment);
                    split.segment_row_ranges.emplace_back(std::move(row_ranges));
                }
            }

            DCHECK_LE(rows_collected, _rows_per_scanner);
            // Rows are still pending: keep this rowset's tail split for the next scanner.
            if (rows_collected > 0) {
                split.segment_offsets.first = segment_start;
                split.segment_offsets.second = segments_rows.size();
                DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first);
                DCHECK_EQ(split.segment_row_ranges.size(),
                          split.segment_offsets.second - split.segment_offsets.first);
                partial_read_source.rs_splits.emplace_back(std::move(split));
            }
        } // end `for (auto& rs_split : entire_read_source.rs_splits)`

        DCHECK_LE(rows_collected, _rows_per_scanner);
        // Flush the rows that did not fill a whole scanner into one last scanner.
        if (rows_collected > 0) {
            DCHECK_GT(partial_read_source.rs_splits.size(), 0);
#ifndef NDEBUG
            for (auto& split : partial_read_source.rs_splits) {
                DCHECK(split.rs_reader != nullptr);
                DCHECK_LT(split.segment_offsets.first, split.segment_offsets.second);
                DCHECK_EQ(split.segment_row_ranges.size(),
                          split.segment_offsets.second - split.segment_offsets.first);
            }
#endif
            scanners.emplace_back(
                    _build_scanner(tablet, version, _key_ranges,
                                   {.rs_splits = std::move(partial_read_source.rs_splits),
                                    .delete_predicates = entire_read_source.delete_predicates,
                                    .delete_bitmap = entire_read_source.delete_bitmap},
                                   take_initial_file_cache_stats(&_tablet_preload_file_cache_stats,
                                                                 tablet->tablet_id())));
        }
    }

    return Status::OK();
}
185
186
// Build scanners so that each segment is exclusively scanned by a single scanner.
187
// This guarantees the number of scanners equals the number of segments across all rowsets
188
// for the involved tablets. It preserves delete predicates and key ranges, and clones
189
// RowsetReader per scanner to avoid sharing between scanners.
190
22
Status ParallelScannerBuilder::_build_scanners_by_per_segment(std::list<ScannerSPtr>& scanners) {
191
22
    DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);
192
193
120
    for (auto&& [tablet, version] : _tablets) {
194
120
        DCHECK(_all_read_sources.contains(tablet->tablet_id()));
195
120
        auto& entire_read_source = _all_read_sources[tablet->tablet_id()];
196
197
120
        if (config::is_cloud_mode()) {
198
            // FIXME(plat1ko): Avoid pointer cast
199
120
            ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
200
120
        }
201
202
        // For each RowSet split in the read source, split by segment id and build
203
        // one scanner per segment. Keep delete predicates shared.
204
240
        for (auto& rs_split : entire_read_source.rs_splits) {
205
240
            auto reader = rs_split.rs_reader;
206
240
            auto rowset = reader->rowset();
207
240
            const auto rowset_id = rowset->rowset_id();
208
240
            const auto& segments_rows = _all_segments_rows[rowset_id];
209
240
            if (segments_rows.empty() || rowset->num_rows() == 0) {
210
191
                continue;
211
191
            }
212
213
            // Build scanners for [i, i+1) segment range, without row-range slicing.
214
98
            for (int64_t i = 0; i < rowset->num_segments(); ++i) {
215
49
                RowSetSplits split(reader->clone());
216
49
                split.segment_offsets.first = i;
217
49
                split.segment_offsets.second = i + 1;
218
                // No row-ranges slicing; scan whole segment i.
219
49
                DCHECK_GE(split.segment_offsets.second, split.segment_offsets.first + 1);
220
221
49
                TabletReadSource partitial_read_source;
222
49
                partitial_read_source.rs_splits.emplace_back(std::move(split));
223
224
49
                scanners.emplace_back(_build_scanner(
225
49
                        tablet, version, _key_ranges,
226
49
                        {.rs_splits = std::move(partitial_read_source.rs_splits),
227
49
                         .delete_predicates = entire_read_source.delete_predicates,
228
49
                         .delete_bitmap = entire_read_source.delete_bitmap},
229
49
                        take_initial_file_cache_stats(&_tablet_preload_file_cache_stats,
230
49
                                                      tablet->tablet_id())));
231
49
            }
232
49
        }
233
120
    }
234
235
22
    return Status::OK();
236
22
}
237
238
/**
239
 * Load rowsets of each tablet with specified version, segments of each rowset.
240
 */
241
102k
Status ParallelScannerBuilder::_load() {
242
102k
    _total_rows = 0;
243
102k
    size_t idx = 0;
244
102k
    bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache
245
103k
                                        ? _state->query_options().enable_segment_cache
246
18.4E
                                        : true;
247
459k
    for (auto&& [tablet, version] : _tablets) {
248
459k
        const auto tablet_id = tablet->tablet_id();
249
459k
        _all_read_sources[tablet_id] = _read_sources[idx];
250
459k
        const auto& read_source = _all_read_sources[tablet_id];
251
2.16M
        for (auto& rs_split : read_source.rs_splits) {
252
2.16M
            auto rowset = rs_split.rs_reader->rowset();
253
2.16M
            RETURN_IF_ERROR(rowset->load());
254
2.16M
            const auto rowset_id = rowset->rowset_id();
255
256
2.16M
            auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
257
2.16M
            std::vector<uint32_t> segment_rows;
258
2.16M
            OlapReaderStatistics preload_stats;
259
2.16M
            RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, enable_segment_cache,
260
2.16M
                                                              &preload_stats));
261
2.16M
            _tablet_preload_file_cache_stats[tablet_id].merge_from(preload_stats.file_cache_stats);
262
2.16M
            auto segment_count = rowset->num_segments();
263
2.84M
            for (int64_t i = 0; i != segment_count; i++) {
264
675k
                _all_segments_rows[rowset_id].emplace_back(segment_rows[i]);
265
675k
            }
266
2.16M
            _total_rows += rowset->num_rows();
267
2.16M
        }
268
459k
        idx++;
269
459k
    }
270
271
102k
    _rows_per_scanner = _total_rows / _max_scanners_count;
272
102k
    _rows_per_scanner = std::max<size_t>(_rows_per_scanner, _min_rows_per_scanner);
273
274
102k
    return Status::OK();
275
102k
}
276
277
std::shared_ptr<OlapScanner> ParallelScannerBuilder::_build_scanner(
278
        BaseTabletSPtr tablet, int64_t version, const std::vector<OlapScanRange*>& key_ranges,
279
325k
        TabletReadSource&& read_source, io::FileCacheStatistics&& initial_file_cache_stats) {
280
325k
    OlapScanner::Params params {
281
325k
            .state = _state,
282
325k
            .profile = _scanner_profile.get(),
283
325k
            .key_ranges = key_ranges,
284
325k
            .tablet = std::move(tablet),
285
325k
            .version = version,
286
325k
            .read_source = std::move(read_source),
287
325k
            .initial_file_cache_stats = std::move(initial_file_cache_stats),
288
325k
            .limit = _limit,
289
325k
            .aggregation = _is_preaggregation,
290
325k
            .read_row_binlog = false,
291
325k
            .binlog_scan_type = TBinlogScanType::NONE,
292
325k
            .start_tso = std::nullopt,
293
325k
            .end_tso = std::nullopt,
294
325k
    };
295
325k
    return OlapScanner::create_shared(_parent, std::move(params));
296
325k
}
297
298
} // namespace doris