Coverage Report

Created: 2026-03-16 17:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/split_source_connector.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include "common/config.h"
21
#include "core/custom_allocator.h"
22
#include "runtime/runtime_state.h"
23
#include "util/client_cache.h"
24
25
namespace doris {
26
#include "common/compile_check_begin.h"
27
28
/*
29
 * Multiple scanners within a scan node share a split source.
30
 * Each scanner call `get_next` to get the next scan range. A fast scanner will immediately obtain
31
 * the next scan range, so there is no situation of data skewing.
32
 */
33
class SplitSourceConnector {
34
public:
35
3
    SplitSourceConnector() = default;
36
3
    virtual ~SplitSourceConnector() = default;
37
38
    /**
39
     * Get the next scan range. has_next should be to true to fetch the next scan range.
40
     * @param has_next whether exists the next scan range
41
     * @param range the obtained next scan range
42
     */
43
    virtual Status get_next(bool* has_next, TFileRangeDesc* range) = 0;
44
45
    virtual int num_scan_ranges() = 0;
46
47
    virtual TFileScanRangeParams* get_params() = 0;
48
49
protected:
50
    template <typename T, typename V1 = std::vector<T>, typename V2 = std::vector<T>>
51
        requires(std::is_same_v<std::remove_cvref_t<V1>,
52
                                std::vector<T, typename V1::allocator_type>> &&
53
                 std::is_same_v<std::remove_cvref_t<V2>,
54
                                std::vector<T, typename V2::allocator_type>>)
55
2
    void _merge_ranges(V1& merged_ranges, const V2& scan_ranges) {
56
2
        if (scan_ranges.size() <= _max_scanners) {
57
2
            merged_ranges.assign(scan_ranges.begin(), scan_ranges.end());
58
2
            return;
59
2
        }
60
61
        // There is no need for the number of scanners to exceed the number of threads in thread pool.
62
        // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order.
63
        // In the insert statement, reading data in partition order can reduce the memory usage of BE
64
        // and prevent the generation of smaller tables.
65
0
        merged_ranges.resize(_max_scanners);
66
0
        int num_ranges = static_cast<int>(scan_ranges.size()) / _max_scanners;
67
0
        int num_add_one = static_cast<int>(scan_ranges.size()) - num_ranges * _max_scanners;
68
0
        int scan_index = 0;
69
0
        int range_index = 0;
70
0
        for (int i = 0; i < num_add_one; ++i) {
71
0
            merged_ranges[scan_index] = scan_ranges[range_index++];
72
0
            auto& ranges =
73
0
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
74
0
            for (int j = 0; j < num_ranges; j++) {
75
0
                auto& tmp_merged_ranges =
76
0
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
77
0
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
78
0
            }
79
0
        }
80
0
        for (int i = num_add_one; i < _max_scanners; ++i) {
81
0
            merged_ranges[scan_index] = scan_ranges[range_index++];
82
0
            auto& ranges =
83
0
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
84
0
            for (int j = 0; j < num_ranges - 1; j++) {
85
0
                auto& tmp_merged_ranges =
86
0
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
87
0
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
88
0
            }
89
0
        }
90
0
        LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << merged_ranges.size();
91
0
    }
_ZN5doris20SplitSourceConnector13_merge_rangesINS_16TScanRangeParamsESt6vectorIS2_NS_18CustomStdAllocatorIS2_NS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEES3_IS2_SaIS2_EEQaasr3stdE9is_same_vINSt12remove_cvrefIT0_E4typeES3_IT_NSD_14allocator_typeEEEsr3stdE9is_same_vINSC_IT1_E4typeES3_ISG_NSJ_14allocator_typeEEEEEvRSD_RKSJ_
Line
Count
Source
55
2
    void _merge_ranges(V1& merged_ranges, const V2& scan_ranges) {
56
2
        if (scan_ranges.size() <= _max_scanners) {
57
2
            merged_ranges.assign(scan_ranges.begin(), scan_ranges.end());
58
2
            return;
59
2
        }
60
61
        // There is no need for the number of scanners to exceed the number of threads in thread pool.
62
        // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order.
63
        // In the insert statement, reading data in partition order can reduce the memory usage of BE
64
        // and prevent the generation of smaller tables.
65
0
        merged_ranges.resize(_max_scanners);
66
0
        int num_ranges = static_cast<int>(scan_ranges.size()) / _max_scanners;
67
0
        int num_add_one = static_cast<int>(scan_ranges.size()) - num_ranges * _max_scanners;
68
0
        int scan_index = 0;
69
0
        int range_index = 0;
70
0
        for (int i = 0; i < num_add_one; ++i) {
71
0
            merged_ranges[scan_index] = scan_ranges[range_index++];
72
0
            auto& ranges =
73
0
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
74
0
            for (int j = 0; j < num_ranges; j++) {
75
0
                auto& tmp_merged_ranges =
76
0
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
77
0
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
78
0
            }
79
0
        }
80
0
        for (int i = num_add_one; i < _max_scanners; ++i) {
81
0
            merged_ranges[scan_index] = scan_ranges[range_index++];
82
0
            auto& ranges =
83
0
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
84
0
            for (int j = 0; j < num_ranges - 1; j++) {
85
0
                auto& tmp_merged_ranges =
86
0
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
87
0
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
88
0
            }
89
0
        }
90
        LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << merged_ranges.size();
91
0
    }
Unexecuted instantiation: _ZN5doris20SplitSourceConnector13_merge_rangesINS_19TScanRangeLocationsESt6vectorIS2_NS_18CustomStdAllocatorIS2_NS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEES3_IS2_SaIS2_EEQaasr3stdE9is_same_vINSt12remove_cvrefIT0_E4typeES3_IT_NSD_14allocator_typeEEEsr3stdE9is_same_vINSC_IT1_E4typeES3_ISG_NSJ_14allocator_typeEEEEEvRSD_RKSJ_
92
93
protected:
94
    int _max_scanners;
95
};
96
97
/**
98
 * The file splits are already assigned in `TFileScanRange.ranges`. Scan node has need to
99
 * fetch the scan ranges from frontend.
100
 *
101
 * In cases where the number of files is small, the splits are directly transmitted to backend.
102
 */
103
class LocalSplitSourceConnector : public SplitSourceConnector {
104
private:
105
    std::mutex _range_lock;
106
    DorisVector<TScanRangeParams> _scan_ranges;
107
    int _scan_index = 0;
108
    int _range_index = 0;
109
110
public:
111
2
    LocalSplitSourceConnector(const std::vector<TScanRangeParams>& scan_ranges, int max_scanners) {
112
2
        _max_scanners = max_scanners;
113
2
        _merge_ranges<TScanRangeParams>(_scan_ranges, scan_ranges);
114
2
    }
115
116
    Status get_next(bool* has_next, TFileRangeDesc* range) override;
117
118
2
    int num_scan_ranges() override { return static_cast<int>(_scan_ranges.size()); }
119
120
0
    TFileScanRangeParams* get_params() override {
121
0
        if (_scan_ranges.size() > 0 &&
122
0
            _scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
123
            // for compatibility.
124
0
            return &_scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params;
125
0
        }
126
0
        throw Exception(
127
0
                Status::FatalError("Unreachable, params is got by file_scan_range_params_map"));
128
0
    }
129
};
130
131
/**
132
 * The file splits are lazily generated in frontend, and saved as a split source in frontend.
133
 * The scan node needs to fetch file splits from the frontend service. Each split source is identified by
134
 * a unique ID, and the ID is stored in `TFileScanRange.split_source.split_source_id`
135
 *
136
 * In the case of a large number of files, backend can scan data while obtaining splits information.
137
 */
138
class RemoteSplitSourceConnector : public SplitSourceConnector {
139
private:
140
    std::mutex _range_lock;
141
    RuntimeState* _state;
142
    RuntimeProfile::Counter* _get_split_timer;
143
    int64_t _split_source_id;
144
    int _num_splits;
145
146
    DorisVector<TScanRangeLocations> _scan_ranges;
147
    bool _last_batch = false;
148
    int _scan_index = 0;
149
    int _range_index = 0;
150
151
public:
152
    RemoteSplitSourceConnector(RuntimeState* state, RuntimeProfile::Counter* get_split_timer,
153
                               int64_t split_source_id, int num_splits, int max_scanners)
154
0
            : _state(state),
155
0
              _get_split_timer(get_split_timer),
156
0
              _split_source_id(split_source_id),
157
0
              _num_splits(num_splits) {
158
0
        _max_scanners = max_scanners;
159
0
    }
160
161
    Status get_next(bool* has_next, TFileRangeDesc* range) override;
162
163
    /*
164
     * Remote split source is fetched in batch mode, and the splits are generated while scanning,
165
     * so the number of scan ranges may not be accurate.
166
     */
167
0
    int num_scan_ranges() override { return _num_splits; }
168
169
0
    TFileScanRangeParams* get_params() override {
170
0
        throw Exception(
171
0
                Status::FatalError("Unreachable, params is got by file_scan_range_params_map"));
172
0
    }
173
};
174
175
#include "common/compile_check_end.h"
176
} // namespace doris