Coverage Report

Created: 2026-07-05 00:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/split_source_connector.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <functional>
21
22
#include "common/config.h"
23
#include "core/custom_allocator.h"
24
#include "runtime/runtime_state.h"
25
#include "util/client_cache.h"
26
27
namespace doris {
28
29
/*
30
 * Multiple scanners within a scan node share a split source.
31
 * Each scanner call `get_next` to get the next scan range. A fast scanner will immediately obtain
32
 * the next scan range, so there is no situation of data skewing.
33
 */
34
class SplitSourceConnector {
35
public:
36
9
    SplitSourceConnector() = default;
37
9
    virtual ~SplitSourceConnector() = default;
38
39
    /**
40
     * Get the next scan range. has_next should be to true to fetch the next scan range.
41
     * @param has_next whether exists the next scan range
42
     * @param range the obtained next scan range
43
     */
44
    virtual Status get_next(bool* has_next, TFileRangeDesc* range) = 0;
45
46
    virtual int num_scan_ranges() = 0;
47
48
    virtual TFileScanRangeParams* get_params() = 0;
49
50
    virtual bool all_scan_ranges_match(
51
            const TFileScanRangeParams& params,
52
            const std::function<bool(const TFileScanRangeParams&, const TFileRangeDesc&)>&
53
0
                    predicate) {
54
0
        (void)params;
55
0
        (void)predicate;
56
0
        return false;
57
0
    }
58
59
protected:
60
    template <typename T, typename V1 = std::vector<T>, typename V2 = std::vector<T>>
61
        requires(std::is_same_v<std::remove_cvref_t<V1>,
62
                                std::vector<T, typename V1::allocator_type>> &&
63
                 std::is_same_v<std::remove_cvref_t<V2>,
64
                                std::vector<T, typename V2::allocator_type>>)
65
7
    void _merge_ranges(V1& merged_ranges, const V2& scan_ranges) {
66
7
        if (scan_ranges.size() <= _max_scanners) {
67
3
            merged_ranges.assign(scan_ranges.begin(), scan_ranges.end());
68
3
            return;
69
3
        }
70
71
        // There is no need for the number of scanners to exceed the number of threads in thread pool.
72
        // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order.
73
        // In the insert statement, reading data in partition order can reduce the memory usage of BE
74
        // and prevent the generation of smaller tables.
75
4
        merged_ranges.resize(_max_scanners);
76
4
        int num_ranges = static_cast<int>(scan_ranges.size()) / _max_scanners;
77
4
        int num_add_one = static_cast<int>(scan_ranges.size()) - num_ranges * _max_scanners;
78
4
        int scan_index = 0;
79
4
        int range_index = 0;
80
4
        for (int i = 0; i < num_add_one; ++i) {
81
0
            merged_ranges[scan_index] = scan_ranges[range_index++];
82
0
            auto& ranges =
83
0
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
84
0
            for (int j = 0; j < num_ranges; j++) {
85
0
                auto& tmp_merged_ranges =
86
0
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
87
0
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
88
0
            }
89
0
        }
90
8
        for (int i = num_add_one; i < _max_scanners; ++i) {
91
4
            merged_ranges[scan_index] = scan_ranges[range_index++];
92
4
            auto& ranges =
93
4
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
94
8
            for (int j = 0; j < num_ranges - 1; j++) {
95
4
                auto& tmp_merged_ranges =
96
4
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
97
4
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
98
4
            }
99
4
        }
100
4
        LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << merged_ranges.size();
101
4
    }
_ZN5doris20SplitSourceConnector13_merge_rangesINS_16TScanRangeParamsESt6vectorIS2_NS_18CustomStdAllocatorIS2_NS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEES3_IS2_SaIS2_EEQaasr3stdE9is_same_vINSt12remove_cvrefIT0_E4typeES3_IT_NSD_14allocator_typeEEEsr3stdE9is_same_vINSC_IT1_E4typeES3_ISG_NSJ_14allocator_typeEEEEEvRSD_RKSJ_
Line
Count
Source
65
7
    void _merge_ranges(V1& merged_ranges, const V2& scan_ranges) {
66
7
        if (scan_ranges.size() <= _max_scanners) {
67
3
            merged_ranges.assign(scan_ranges.begin(), scan_ranges.end());
68
3
            return;
69
3
        }
70
71
        // There is no need for the number of scanners to exceed the number of threads in thread pool.
72
        // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order.
73
        // In the insert statement, reading data in partition order can reduce the memory usage of BE
74
        // and prevent the generation of smaller tables.
75
4
        merged_ranges.resize(_max_scanners);
76
4
        int num_ranges = static_cast<int>(scan_ranges.size()) / _max_scanners;
77
4
        int num_add_one = static_cast<int>(scan_ranges.size()) - num_ranges * _max_scanners;
78
4
        int scan_index = 0;
79
4
        int range_index = 0;
80
4
        for (int i = 0; i < num_add_one; ++i) {
81
0
            merged_ranges[scan_index] = scan_ranges[range_index++];
82
0
            auto& ranges =
83
0
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
84
0
            for (int j = 0; j < num_ranges; j++) {
85
0
                auto& tmp_merged_ranges =
86
0
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
87
0
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
88
0
            }
89
0
        }
90
8
        for (int i = num_add_one; i < _max_scanners; ++i) {
91
4
            merged_ranges[scan_index] = scan_ranges[range_index++];
92
4
            auto& ranges =
93
4
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
94
8
            for (int j = 0; j < num_ranges - 1; j++) {
95
4
                auto& tmp_merged_ranges =
96
4
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
97
4
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
98
4
            }
99
4
        }
100
        LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << merged_ranges.size();
101
4
    }
Unexecuted instantiation: _ZN5doris20SplitSourceConnector13_merge_rangesINS_19TScanRangeLocationsESt6vectorIS2_NS_18CustomStdAllocatorIS2_NS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEES3_IS2_SaIS2_EEQaasr3stdE9is_same_vINSt12remove_cvrefIT0_E4typeES3_IT_NSD_14allocator_typeEEEsr3stdE9is_same_vINSC_IT1_E4typeES3_ISG_NSJ_14allocator_typeEEEEEvRSD_RKSJ_
102
103
protected:
104
    int _max_scanners;
105
};
106
107
/**
108
 * The file splits are already assigned in `TFileScanRange.ranges`. Scan node has need to
109
 * fetch the scan ranges from frontend.
110
 *
111
 * In cases where the number of files is small, the splits are directly transmitted to backend.
112
 */
113
class LocalSplitSourceConnector : public SplitSourceConnector {
114
private:
115
    std::mutex _range_lock;
116
    DorisVector<TScanRangeParams> _scan_ranges;
117
    int _scan_index = 0;
118
    int _range_index = 0;
119
120
public:
121
7
    LocalSplitSourceConnector(const std::vector<TScanRangeParams>& scan_ranges, int max_scanners) {
122
7
        _max_scanners = max_scanners;
123
7
        _merge_ranges<TScanRangeParams>(_scan_ranges, scan_ranges);
124
7
    }
125
126
    Status get_next(bool* has_next, TFileRangeDesc* range) override;
127
128
3
    int num_scan_ranges() override { return static_cast<int>(_scan_ranges.size()); }
129
130
0
    TFileScanRangeParams* get_params() override {
131
0
        if (_scan_ranges.size() > 0 &&
132
0
            _scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
133
            // for compatibility.
134
0
            return &_scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params;
135
0
        }
136
0
        throw Exception(
137
0
                Status::FatalError("Unreachable, params is got by file_scan_range_params_map"));
138
0
    }
139
140
    bool all_scan_ranges_match(
141
            const TFileScanRangeParams& params,
142
            const std::function<bool(const TFileScanRangeParams&, const TFileRangeDesc&)>&
143
4
                    predicate) override {
144
4
        if (_scan_ranges.empty()) {
145
0
            return false;
146
0
        }
147
4
        for (const auto& scan_range : _scan_ranges) {
148
4
            const auto& file_scan_range = scan_range.scan_range.ext_scan_range.file_scan_range;
149
8
            for (const auto& range : file_scan_range.ranges) {
150
8
                if (!predicate(params, range)) {
151
2
                    return false;
152
2
                }
153
8
            }
154
4
        }
155
2
        return true;
156
4
    }
157
};
158
159
/**
160
 * The file splits are lazily generated in frontend, and saved as a split source in frontend.
161
 * The scan node needs to fetch file splits from the frontend service. Each split source is identified by
162
 * a unique ID, and the ID is stored in `TFileScanRange.split_source.split_source_id`
163
 *
164
 * In the case of a large number of files, backend can scan data while obtaining splits information.
165
 */
166
class RemoteSplitSourceConnector : public SplitSourceConnector {
167
private:
168
    std::mutex _range_lock;
169
    RuntimeState* _state;
170
    RuntimeProfile::Counter* _get_split_timer;
171
    int64_t _split_source_id;
172
    int _num_splits;
173
174
    DorisVector<TScanRangeLocations> _scan_ranges;
175
    bool _last_batch = false;
176
    int _scan_index = 0;
177
    int _range_index = 0;
178
179
public:
180
    RemoteSplitSourceConnector(RuntimeState* state, RuntimeProfile::Counter* get_split_timer,
181
                               int64_t split_source_id, int num_splits, int max_scanners)
182
0
            : _state(state),
183
0
              _get_split_timer(get_split_timer),
184
0
              _split_source_id(split_source_id),
185
0
              _num_splits(num_splits) {
186
0
        _max_scanners = max_scanners;
187
0
    }
188
189
    Status get_next(bool* has_next, TFileRangeDesc* range) override;
190
191
    /*
192
     * Remote split source is fetched in batch mode, and the splits are generated while scanning,
193
     * so the number of scan ranges may not be accurate.
194
     */
195
0
    int num_scan_ranges() override { return _num_splits; }
196
197
0
    TFileScanRangeParams* get_params() override {
198
0
        throw Exception(
199
0
                Status::FatalError("Unreachable, params is got by file_scan_range_params_map"));
200
0
    }
201
};
202
203
} // namespace doris