Coverage Report

Created: 2026-04-13 15:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/split_source_connector.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include "common/config.h"
21
#include "core/custom_allocator.h"
22
#include "runtime/runtime_state.h"
23
#include "util/client_cache.h"
24
25
namespace doris {
26
27
/*
28
 * Multiple scanners within a scan node share a split source.
29
 * Each scanner calls `get_next` to get the next scan range. A fast scanner will immediately obtain
30
 * the next scan range, so data skew between scanners does not occur.
31
 */
32
class SplitSourceConnector {
33
public:
34
5
    SplitSourceConnector() = default;
35
5
    virtual ~SplitSourceConnector() = default;
36
37
    /**
38
     * Get the next scan range. has_next should be true to fetch the next scan range.
39
     * @param has_next whether a next scan range exists
40
     * @param range the obtained next scan range
41
     */
42
    virtual Status get_next(bool* has_next, TFileRangeDesc* range) = 0;
43
44
    virtual int num_scan_ranges() = 0;
45
46
    virtual TFileScanRangeParams* get_params() = 0;
47
48
protected:
49
    template <typename T, typename V1 = std::vector<T>, typename V2 = std::vector<T>>
50
        requires(std::is_same_v<std::remove_cvref_t<V1>,
51
                                std::vector<T, typename V1::allocator_type>> &&
52
                 std::is_same_v<std::remove_cvref_t<V2>,
53
                                std::vector<T, typename V2::allocator_type>>)
54
3
    void _merge_ranges(V1& merged_ranges, const V2& scan_ranges) {
55
3
        if (scan_ranges.size() <= _max_scanners) {
56
3
            merged_ranges.assign(scan_ranges.begin(), scan_ranges.end());
57
3
            return;
58
3
        }
59
60
        // There is no need for the number of scanners to exceed the number of threads in thread pool.
61
        // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order.
62
        // In the insert statement, reading data in partition order can reduce the memory usage of BE
63
        // and prevent the generation of smaller tables.
64
0
        merged_ranges.resize(_max_scanners);
65
0
        int num_ranges = static_cast<int>(scan_ranges.size()) / _max_scanners;
66
0
        int num_add_one = static_cast<int>(scan_ranges.size()) - num_ranges * _max_scanners;
67
0
        int scan_index = 0;
68
0
        int range_index = 0;
69
0
        for (int i = 0; i < num_add_one; ++i) {
70
0
            merged_ranges[scan_index] = scan_ranges[range_index++];
71
0
            auto& ranges =
72
0
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
73
0
            for (int j = 0; j < num_ranges; j++) {
74
0
                auto& tmp_merged_ranges =
75
0
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
76
0
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
77
0
            }
78
0
        }
79
0
        for (int i = num_add_one; i < _max_scanners; ++i) {
80
0
            merged_ranges[scan_index] = scan_ranges[range_index++];
81
0
            auto& ranges =
82
0
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
83
0
            for (int j = 0; j < num_ranges - 1; j++) {
84
0
                auto& tmp_merged_ranges =
85
0
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
86
0
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
87
0
            }
88
0
        }
89
0
        LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << merged_ranges.size();
90
0
    }
_ZN5doris20SplitSourceConnector13_merge_rangesINS_16TScanRangeParamsESt6vectorIS2_NS_18CustomStdAllocatorIS2_NS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEES3_IS2_SaIS2_EEQaasr3stdE9is_same_vINSt12remove_cvrefIT0_E4typeES3_IT_NSD_14allocator_typeEEEsr3stdE9is_same_vINSC_IT1_E4typeES3_ISG_NSJ_14allocator_typeEEEEEvRSD_RKSJ_
Line
Count
Source
54
3
    void _merge_ranges(V1& merged_ranges, const V2& scan_ranges) {
55
3
        if (scan_ranges.size() <= _max_scanners) {
56
3
            merged_ranges.assign(scan_ranges.begin(), scan_ranges.end());
57
3
            return;
58
3
        }
59
60
        // There is no need for the number of scanners to exceed the number of threads in thread pool.
61
        // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order.
62
        // In the insert statement, reading data in partition order can reduce the memory usage of BE
63
        // and prevent the generation of smaller tables.
64
0
        merged_ranges.resize(_max_scanners);
65
0
        int num_ranges = static_cast<int>(scan_ranges.size()) / _max_scanners;
66
0
        int num_add_one = static_cast<int>(scan_ranges.size()) - num_ranges * _max_scanners;
67
0
        int scan_index = 0;
68
0
        int range_index = 0;
69
0
        for (int i = 0; i < num_add_one; ++i) {
70
0
            merged_ranges[scan_index] = scan_ranges[range_index++];
71
0
            auto& ranges =
72
0
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
73
0
            for (int j = 0; j < num_ranges; j++) {
74
0
                auto& tmp_merged_ranges =
75
0
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
76
0
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
77
0
            }
78
0
        }
79
0
        for (int i = num_add_one; i < _max_scanners; ++i) {
80
0
            merged_ranges[scan_index] = scan_ranges[range_index++];
81
0
            auto& ranges =
82
0
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
83
0
            for (int j = 0; j < num_ranges - 1; j++) {
84
0
                auto& tmp_merged_ranges =
85
0
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
86
0
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
87
0
            }
88
0
        }
89
        LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << merged_ranges.size();
90
0
    }
Unexecuted instantiation: _ZN5doris20SplitSourceConnector13_merge_rangesINS_19TScanRangeLocationsESt6vectorIS2_NS_18CustomStdAllocatorIS2_NS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEES3_IS2_SaIS2_EEQaasr3stdE9is_same_vINSt12remove_cvrefIT0_E4typeES3_IT_NSD_14allocator_typeEEEsr3stdE9is_same_vINSC_IT1_E4typeES3_ISG_NSJ_14allocator_typeEEEEEvRSD_RKSJ_
91
92
protected:
93
    int _max_scanners;
94
};
95
96
/**
97
 * The file splits are already assigned in `TFileScanRange.ranges`. Scan node has no need to
98
 * fetch the scan ranges from frontend.
99
 *
100
 * In cases where the number of files is small, the splits are directly transmitted to backend.
101
 */
102
class LocalSplitSourceConnector : public SplitSourceConnector {
103
private:
104
    std::mutex _range_lock;
105
    DorisVector<TScanRangeParams> _scan_ranges;
106
    int _scan_index = 0;
107
    int _range_index = 0;
108
109
public:
110
3
    LocalSplitSourceConnector(const std::vector<TScanRangeParams>& scan_ranges, int max_scanners) {
111
3
        _max_scanners = max_scanners;
112
3
        _merge_ranges<TScanRangeParams>(_scan_ranges, scan_ranges);
113
3
    }
114
115
    Status get_next(bool* has_next, TFileRangeDesc* range) override;
116
117
3
    int num_scan_ranges() override { return static_cast<int>(_scan_ranges.size()); }
118
119
0
    TFileScanRangeParams* get_params() override {
120
0
        if (_scan_ranges.size() > 0 &&
121
0
            _scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
122
            // for compatibility.
123
0
            return &_scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params;
124
0
        }
125
0
        throw Exception(
126
0
                Status::FatalError("Unreachable, params is got by file_scan_range_params_map"));
127
0
    }
128
};
129
130
/**
131
 * The file splits are lazily generated in frontend, and saved as a split source in frontend.
132
 * The scan node needs to fetch file splits from the frontend service. Each split source is identified by
133
 * a unique ID, and the ID is stored in `TFileScanRange.split_source.split_source_id`
134
 *
135
 * In the case of a large number of files, backend can scan data while obtaining split information.
136
 */
137
class RemoteSplitSourceConnector : public SplitSourceConnector {
138
private:
139
    std::mutex _range_lock;
140
    RuntimeState* _state;
141
    RuntimeProfile::Counter* _get_split_timer;
142
    int64_t _split_source_id;
143
    int _num_splits;
144
145
    DorisVector<TScanRangeLocations> _scan_ranges;
146
    bool _last_batch = false;
147
    int _scan_index = 0;
148
    int _range_index = 0;
149
150
public:
151
    RemoteSplitSourceConnector(RuntimeState* state, RuntimeProfile::Counter* get_split_timer,
152
                               int64_t split_source_id, int num_splits, int max_scanners)
153
0
            : _state(state),
154
0
              _get_split_timer(get_split_timer),
155
0
              _split_source_id(split_source_id),
156
0
              _num_splits(num_splits) {
157
0
        _max_scanners = max_scanners;
158
0
    }
159
160
    Status get_next(bool* has_next, TFileRangeDesc* range) override;
161
162
    /*
163
     * Remote split source is fetched in batch mode, and the splits are generated while scanning,
164
     * so the number of scan ranges may not be accurate.
165
     */
166
0
    int num_scan_ranges() override { return _num_splits; }
167
168
0
    TFileScanRangeParams* get_params() override {
169
0
        throw Exception(
170
0
                Status::FatalError("Unreachable, params is got by file_scan_range_params_map"));
171
0
    }
172
};
173
174
} // namespace doris