Coverage Report

Created: 2026-03-14 20:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/split_source_connector.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include "common/config.h"
21
#include "runtime/runtime_state.h"
22
#include "util/client_cache.h"
23
24
namespace doris {
25
#include "common/compile_check_begin.h"
26
27
/*
28
 * Multiple scanners within a scan node share a split source.
29
 * Each scanner calls `get_next` to get the next scan range. A fast scanner will immediately obtain
30
 * the next scan range, so data skew between scanners is avoided.
31
 */
32
class SplitSourceConnector {
33
public:
34
3
    SplitSourceConnector() = default;
35
3
    virtual ~SplitSourceConnector() = default;
36
37
    /**
38
     * Get the next scan range. has_next should be to true to fetch the next scan range.
39
     * @param has_next whether a next scan range exists
40
     * @param range the obtained next scan range
41
     */
42
    virtual Status get_next(bool* has_next, TFileRangeDesc* range) = 0;
43
44
    virtual int num_scan_ranges() = 0;
45
46
    virtual TFileScanRangeParams* get_params() = 0;
47
48
protected:
49
    template <typename T>
50
2
    void _merge_ranges(std::vector<T>& merged_ranges, const std::vector<T>& scan_ranges) {
51
2
        if (scan_ranges.size() <= _max_scanners) {
52
2
            merged_ranges = scan_ranges;
53
2
            return;
54
2
        }
55
56
        // There is no need for the number of scanners to exceed the number of threads in thread pool.
57
        // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order.
58
        // In the insert statement, reading data in partition order can reduce the memory usage of BE
59
        // and prevent the generation of smaller tables.
60
0
        merged_ranges.resize(_max_scanners);
61
0
        int num_ranges = static_cast<int>(scan_ranges.size()) / _max_scanners;
62
0
        int num_add_one = static_cast<int>(scan_ranges.size()) - num_ranges * _max_scanners;
63
0
        int scan_index = 0;
64
0
        int range_index = 0;
65
0
        for (int i = 0; i < num_add_one; ++i) {
66
0
            merged_ranges[scan_index] = scan_ranges[range_index++];
67
0
            auto& ranges =
68
0
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
69
0
            for (int j = 0; j < num_ranges; j++) {
70
0
                auto& tmp_merged_ranges =
71
0
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
72
0
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
73
0
            }
74
0
        }
75
0
        for (int i = num_add_one; i < _max_scanners; ++i) {
76
0
            merged_ranges[scan_index] = scan_ranges[range_index++];
77
0
            auto& ranges =
78
0
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
79
0
            for (int j = 0; j < num_ranges - 1; j++) {
80
0
                auto& tmp_merged_ranges =
81
0
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
82
0
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
83
0
            }
84
0
        }
85
0
        LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << merged_ranges.size();
86
0
    }
_ZN5doris20SplitSourceConnector13_merge_rangesINS_16TScanRangeParamsEEEvRSt6vectorIT_SaIS4_EERKS6_
Line
Count
Source
50
2
    void _merge_ranges(std::vector<T>& merged_ranges, const std::vector<T>& scan_ranges) {
51
2
        if (scan_ranges.size() <= _max_scanners) {
52
2
            merged_ranges = scan_ranges;
53
2
            return;
54
2
        }
55
56
        // There is no need for the number of scanners to exceed the number of threads in thread pool.
57
        // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order.
58
        // In the insert statement, reading data in partition order can reduce the memory usage of BE
59
        // and prevent the generation of smaller tables.
60
0
        merged_ranges.resize(_max_scanners);
61
0
        int num_ranges = static_cast<int>(scan_ranges.size()) / _max_scanners;
62
0
        int num_add_one = static_cast<int>(scan_ranges.size()) - num_ranges * _max_scanners;
63
0
        int scan_index = 0;
64
0
        int range_index = 0;
65
0
        for (int i = 0; i < num_add_one; ++i) {
66
0
            merged_ranges[scan_index] = scan_ranges[range_index++];
67
0
            auto& ranges =
68
0
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
69
0
            for (int j = 0; j < num_ranges; j++) {
70
0
                auto& tmp_merged_ranges =
71
0
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
72
0
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
73
0
            }
74
0
        }
75
0
        for (int i = num_add_one; i < _max_scanners; ++i) {
76
0
            merged_ranges[scan_index] = scan_ranges[range_index++];
77
0
            auto& ranges =
78
0
                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
79
0
            for (int j = 0; j < num_ranges - 1; j++) {
80
0
                auto& tmp_merged_ranges =
81
0
                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
82
0
                ranges.insert(ranges.end(), tmp_merged_ranges.begin(), tmp_merged_ranges.end());
83
0
            }
84
0
        }
85
        LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << merged_ranges.size();
86
0
    }
Unexecuted instantiation: _ZN5doris20SplitSourceConnector13_merge_rangesINS_19TScanRangeLocationsEEEvRSt6vectorIT_SaIS4_EERKS6_
87
88
protected:
89
    int _max_scanners;
90
};
91
92
/**
93
 * The file splits are already assigned in `TFileScanRange.ranges`. Scan node has no need to
94
 * fetch the scan ranges from frontend.
95
 *
96
 * In cases where the number of files is small, the splits are directly transmitted to backend.
97
 */
98
class LocalSplitSourceConnector : public SplitSourceConnector {
99
private:
100
    std::mutex _range_lock;
101
    std::vector<TScanRangeParams> _scan_ranges;
102
    int _scan_index = 0;
103
    int _range_index = 0;
104
105
public:
106
2
    LocalSplitSourceConnector(const std::vector<TScanRangeParams>& scan_ranges, int max_scanners) {
107
2
        _max_scanners = max_scanners;
108
2
        _merge_ranges<TScanRangeParams>(_scan_ranges, scan_ranges);
109
2
    }
110
111
    Status get_next(bool* has_next, TFileRangeDesc* range) override;
112
113
2
    int num_scan_ranges() override { return static_cast<int>(_scan_ranges.size()); }
114
115
0
    TFileScanRangeParams* get_params() override {
116
0
        if (_scan_ranges.size() > 0 &&
117
0
            _scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
118
            // for compatibility.
119
0
            return &_scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params;
120
0
        }
121
0
        throw Exception(
122
0
                Status::FatalError("Unreachable, params is got by file_scan_range_params_map"));
123
0
    }
124
};
125
126
/**
127
 * The file splits are lazily generated in frontend, and saved as a split source in frontend.
128
 * The scan node needs to fetch file splits from the frontend service. Each split source is identified by
129
 * a unique ID, and the ID is stored in `TFileScanRange.split_source.split_source_id`
130
 *
131
 * In the case of a large number of files, backend can scan data while obtaining splits information.
132
 */
133
class RemoteSplitSourceConnector : public SplitSourceConnector {
134
private:
135
    std::mutex _range_lock;
136
    RuntimeState* _state;
137
    RuntimeProfile::Counter* _get_split_timer;
138
    int64_t _split_source_id;
139
    int _num_splits;
140
141
    std::vector<TScanRangeLocations> _scan_ranges;
142
    bool _last_batch = false;
143
    int _scan_index = 0;
144
    int _range_index = 0;
145
146
public:
147
    RemoteSplitSourceConnector(RuntimeState* state, RuntimeProfile::Counter* get_split_timer,
148
                               int64_t split_source_id, int num_splits, int max_scanners)
149
0
            : _state(state),
150
0
              _get_split_timer(get_split_timer),
151
0
              _split_source_id(split_source_id),
152
0
              _num_splits(num_splits) {
153
0
        _max_scanners = max_scanners;
154
0
    }
155
156
    Status get_next(bool* has_next, TFileRangeDesc* range) override;
157
158
    /*
159
     * Remote split source is fetched in batch mode, and the splits are generated while scanning,
160
     * so the number of scan ranges may not be accurate.
161
     */
162
0
    int num_scan_ranges() override { return _num_splits; }
163
164
0
    TFileScanRangeParams* get_params() override {
165
0
        throw Exception(
166
0
                Status::FatalError("Unreachable, params is got by file_scan_range_params_map"));
167
0
    }
168
};
169
170
#include "common/compile_check_end.h"
171
} // namespace doris