Coverage Report

Created: 2026-05-28 11:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/result_block_buffer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/result_block_buffer.h"
19
20
#include <gen_cpp/Data_types.h>
21
#include <gen_cpp/PaloInternalService_types.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <glog/logging.h>
24
#include <google/protobuf/stubs/callback.h>
25
// IWYU pragma: no_include <bits/chrono.h>
26
#include <chrono> // IWYU pragma: keep
27
#include <limits>
28
#include <ostream>
29
#include <string>
30
#include <utility>
31
#include <vector>
32
33
#include "arrow/type_fwd.h"
34
#include "common/config.h"
35
#include "core/block/block.h"
36
#include "exec/pipeline/dependency.h"
37
#include "exec/sink/writer/varrow_flight_result_writer.h"
38
#include "exec/sink/writer/vmysql_result_writer.h"
39
#include "runtime/runtime_profile.h"
40
#include "runtime/thread_context.h"
41
#include "util/thrift_util.h"
42
43
namespace doris {
44
45
template <typename ResultCtxType>
46
ResultBlockBuffer<ResultCtxType>::ResultBlockBuffer(TUniqueId id, RuntimeState* state,
47
                                                    int buffer_size)
48
218k
        : _fragment_id(std::move(id)),
49
218k
          _is_close(false),
50
218k
          _batch_size(state->batch_size()),
51
218k
          _timezone(state->timezone()),
52
218k
          _be_exec_version(state->be_exec_version()),
53
218k
          _fragment_transmission_compression_type(state->fragement_transmission_compression_type()),
54
218k
          _buffer_limit(buffer_size) {
55
218k
    _mem_tracker = MemTrackerLimiter::create_shared(
56
218k
            MemTrackerLimiter::Type::QUERY,
57
218k
            fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id)));
58
218k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEEC2ENS_9TUniqueIdEPNS_12RuntimeStateEi
Line
Count
Source
48
60
        : _fragment_id(std::move(id)),
49
60
          _is_close(false),
50
60
          _batch_size(state->batch_size()),
51
60
          _timezone(state->timezone()),
52
60
          _be_exec_version(state->be_exec_version()),
53
60
          _fragment_transmission_compression_type(state->fragement_transmission_compression_type()),
54
60
          _buffer_limit(buffer_size) {
55
60
    _mem_tracker = MemTrackerLimiter::create_shared(
56
60
            MemTrackerLimiter::Type::QUERY,
57
60
            fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id)));
58
60
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEEC2ENS_9TUniqueIdEPNS_12RuntimeStateEi
Line
Count
Source
48
218k
        : _fragment_id(std::move(id)),
49
218k
          _is_close(false),
50
218k
          _batch_size(state->batch_size()),
51
218k
          _timezone(state->timezone()),
52
218k
          _be_exec_version(state->be_exec_version()),
53
218k
          _fragment_transmission_compression_type(state->fragement_transmission_compression_type()),
54
218k
          _buffer_limit(buffer_size) {
55
218k
    _mem_tracker = MemTrackerLimiter::create_shared(
56
218k
            MemTrackerLimiter::Type::QUERY,
57
218k
            fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id)));
58
218k
}
59
60
template <typename ResultCtxType>
61
Status ResultBlockBuffer<ResultCtxType>::close(const TUniqueId& id, Status exec_status,
62
441k
                                               int64_t num_rows, bool& is_fully_closed) {
63
441k
    std::unique_lock<std::mutex> l(_lock);
64
441k
    _returned_rows.fetch_add(num_rows);
65
    // close will be called multiple times and error status needs to be collected.
66
441k
    if (!exec_status.ok()) {
67
1.28k
        _status = exec_status;
68
1.28k
    }
69
70
441k
    auto it = _result_sink_dependencies.find(id);
71
442k
    if (it != _result_sink_dependencies.end()) {
72
442k
        it->second->set_always_ready();
73
442k
        _result_sink_dependencies.erase(it);
74
18.4E
    } else {
75
18.4E
        _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer",
76
18.4E
                                        print_id(id));
77
18.4E
    }
78
441k
    if (!_result_sink_dependencies.empty()) {
79
        // Still waiting for other instances to finish; this is not the final close.
80
222k
        is_fully_closed = false;
81
222k
        return _status;
82
222k
    }
83
84
    // All instances have closed: the buffer is now fully closed.
85
218k
    is_fully_closed = true;
86
218k
    _is_close = true;
87
218k
    _arrow_data_arrival.notify_all();
88
89
218k
    if (!_waiting_rpc.empty()) {
90
126k
        if (_status.ok()) {
91
125k
            for (auto& ctx : _waiting_rpc) {
92
125k
                ctx->on_close(_packet_num, _returned_rows);
93
125k
            }
94
125k
        } else {
95
1.27k
            for (auto& ctx : _waiting_rpc) {
96
1.25k
                ctx->on_failure(_status);
97
1.25k
            }
98
1.27k
        }
99
126k
        _waiting_rpc.clear();
100
126k
    }
101
102
218k
    return _status;
103
441k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusElRb
Line
Count
Source
62
58
                                               int64_t num_rows, bool& is_fully_closed) {
63
58
    std::unique_lock<std::mutex> l(_lock);
64
58
    _returned_rows.fetch_add(num_rows);
65
    // close will be called multiple times and error status needs to be collected.
66
58
    if (!exec_status.ok()) {
67
2
        _status = exec_status;
68
2
    }
69
70
58
    auto it = _result_sink_dependencies.find(id);
71
58
    if (it != _result_sink_dependencies.end()) {
72
57
        it->second->set_always_ready();
73
57
        _result_sink_dependencies.erase(it);
74
57
    } else {
75
1
        _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer",
76
1
                                        print_id(id));
77
1
    }
78
58
    if (!_result_sink_dependencies.empty()) {
79
        // Still waiting for other instances to finish; this is not the final close.
80
1
        is_fully_closed = false;
81
1
        return _status;
82
1
    }
83
84
    // All instances have closed: the buffer is now fully closed.
85
57
    is_fully_closed = true;
86
57
    _is_close = true;
87
57
    _arrow_data_arrival.notify_all();
88
89
57
    if (!_waiting_rpc.empty()) {
90
2
        if (_status.ok()) {
91
1
            for (auto& ctx : _waiting_rpc) {
92
1
                ctx->on_close(_packet_num, _returned_rows);
93
1
            }
94
1
        } else {
95
1
            for (auto& ctx : _waiting_rpc) {
96
1
                ctx->on_failure(_status);
97
1
            }
98
1
        }
99
2
        _waiting_rpc.clear();
100
2
    }
101
102
57
    return _status;
103
58
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusElRb
Line
Count
Source
62
441k
                                               int64_t num_rows, bool& is_fully_closed) {
63
441k
    std::unique_lock<std::mutex> l(_lock);
64
441k
    _returned_rows.fetch_add(num_rows);
65
    // close will be called multiple times and error status needs to be collected.
66
441k
    if (!exec_status.ok()) {
67
1.28k
        _status = exec_status;
68
1.28k
    }
69
70
441k
    auto it = _result_sink_dependencies.find(id);
71
442k
    if (it != _result_sink_dependencies.end()) {
72
442k
        it->second->set_always_ready();
73
442k
        _result_sink_dependencies.erase(it);
74
18.4E
    } else {
75
18.4E
        _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer",
76
18.4E
                                        print_id(id));
77
18.4E
    }
78
441k
    if (!_result_sink_dependencies.empty()) {
79
        // Still waiting for other instances to finish; this is not the final close.
80
222k
        is_fully_closed = false;
81
222k
        return _status;
82
222k
    }
83
84
    // All instances have closed: the buffer is now fully closed.
85
218k
    is_fully_closed = true;
86
218k
    _is_close = true;
87
218k
    _arrow_data_arrival.notify_all();
88
89
218k
    if (!_waiting_rpc.empty()) {
90
126k
        if (_status.ok()) {
91
125k
            for (auto& ctx : _waiting_rpc) {
92
125k
                ctx->on_close(_packet_num, _returned_rows);
93
125k
            }
94
125k
        } else {
95
1.27k
            for (auto& ctx : _waiting_rpc) {
96
1.24k
                ctx->on_failure(_status);
97
1.24k
            }
98
1.27k
        }
99
126k
        _waiting_rpc.clear();
100
126k
    }
101
102
218k
    return _status;
103
441k
}
104
105
template <typename ResultCtxType>
106
176k
void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) {
107
176k
    std::unique_lock<std::mutex> l(_lock);
108
176k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
109
176k
    if (_status.ok()) {
110
175k
        _status = reason;
111
175k
    }
112
176k
    _arrow_data_arrival.notify_all();
113
176k
    for (auto& ctx : _waiting_rpc) {
114
2
        ctx->on_failure(reason);
115
2
    }
116
176k
    _waiting_rpc.clear();
117
176k
    _update_dependency();
118
176k
    _result_batch_queue.clear();
119
176k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE6cancelERKNS_6StatusE
Line
Count
Source
106
56
void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) {
107
56
    std::unique_lock<std::mutex> l(_lock);
108
56
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
109
56
    if (_status.ok()) {
110
56
        _status = reason;
111
56
    }
112
56
    _arrow_data_arrival.notify_all();
113
56
    for (auto& ctx : _waiting_rpc) {
114
1
        ctx->on_failure(reason);
115
1
    }
116
56
    _waiting_rpc.clear();
117
56
    _update_dependency();
118
56
    _result_batch_queue.clear();
119
56
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE6cancelERKNS_6StatusE
Line
Count
Source
106
176k
void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) {
107
176k
    std::unique_lock<std::mutex> l(_lock);
108
176k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
109
176k
    if (_status.ok()) {
110
175k
        _status = reason;
111
175k
    }
112
176k
    _arrow_data_arrival.notify_all();
113
176k
    for (auto& ctx : _waiting_rpc) {
114
1
        ctx->on_failure(reason);
115
1
    }
116
176k
    _waiting_rpc.clear();
117
176k
    _update_dependency();
118
176k
    _result_batch_queue.clear();
119
176k
}
120
121
template <typename ResultCtxType>
122
void ResultBlockBuffer<ResultCtxType>::set_dependency(
123
436k
        const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) {
124
436k
    std::unique_lock<std::mutex> l(_lock);
125
436k
    _result_sink_dependencies[id] = result_sink_dependency;
126
436k
    _update_dependency();
127
436k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE
Line
Count
Source
123
60
        const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) {
124
60
    std::unique_lock<std::mutex> l(_lock);
125
60
    _result_sink_dependencies[id] = result_sink_dependency;
126
60
    _update_dependency();
127
60
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE
Line
Count
Source
123
436k
        const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) {
124
436k
    std::unique_lock<std::mutex> l(_lock);
125
436k
    _result_sink_dependencies[id] = result_sink_dependency;
126
436k
    _update_dependency();
127
436k
}
128
129
template <typename ResultCtxType>
130
1.11M
void ResultBlockBuffer<ResultCtxType>::_update_dependency() {
131
1.11M
    if (!_status.ok()) {
132
176k
        for (auto it : _result_sink_dependencies) {
133
6
            it.second->set_ready();
134
6
        }
135
176k
        return;
136
176k
    }
137
138
2.34M
    for (auto it : _result_sink_dependencies) {
139
2.34M
        if (_instance_rows[it.first] > _batch_size) {
140
13
            it.second->block();
141
2.34M
        } else {
142
2.34M
            it.second->set_ready();
143
2.34M
        }
144
2.34M
    }
145
942k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE18_update_dependencyEv
Line
Count
Source
130
314
void ResultBlockBuffer<ResultCtxType>::_update_dependency() {
131
314
    if (!_status.ok()) {
132
58
        for (auto it : _result_sink_dependencies) {
133
3
            it.second->set_ready();
134
3
        }
135
58
        return;
136
58
    }
137
138
256
    for (auto it : _result_sink_dependencies) {
139
155
        if (_instance_rows[it.first] > _batch_size) {
140
4
            it.second->block();
141
151
        } else {
142
151
            it.second->set_ready();
143
151
        }
144
155
    }
145
256
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE18_update_dependencyEv
Line
Count
Source
130
1.11M
void ResultBlockBuffer<ResultCtxType>::_update_dependency() {
131
1.11M
    if (!_status.ok()) {
132
176k
        for (auto it : _result_sink_dependencies) {
133
3
            it.second->set_ready();
134
3
        }
135
176k
        return;
136
176k
    }
137
138
2.34M
    for (auto it : _result_sink_dependencies) {
139
2.34M
        if (_instance_rows[it.first] > _batch_size) {
140
9
            it.second->block();
141
2.34M
        } else {
142
2.34M
            it.second->set_ready();
143
2.34M
        }
144
2.34M
    }
145
942k
}
146
147
template <typename ResultCtxType>
148
359k
Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) {
149
359k
    std::lock_guard<std::mutex> l(_lock);
150
359k
    SCOPED_ATTACH_TASK(_mem_tracker);
151
359k
    Defer defer {[&]() { _update_dependency(); }};
_ZZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv
Line
Count
Source
151
8
    Defer defer {[&]() { _update_dependency(); }};
_ZZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv
Line
Count
Source
151
359k
    Defer defer {[&]() { _update_dependency(); }};
152
359k
    if (!_status.ok()) {
153
12
        ctx->on_failure(_status);
154
12
        return _status;
155
12
    }
156
359k
    if (!_result_batch_queue.empty()) {
157
6.94k
        auto result = _result_batch_queue.front();
158
6.94k
        _result_batch_queue.pop_front();
159
7.14k
        for (auto it : _instance_rows_in_queue.front()) {
160
7.14k
            _instance_rows[it.first] -= it.second;
161
7.14k
        }
162
6.94k
        _instance_rows_in_queue.pop_front();
163
6.94k
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
164
6.94k
        _packet_num++;
165
6.94k
        return Status::OK();
166
6.94k
    }
167
352k
    if (_is_close) {
168
89.8k
        if (!_status.ok()) {
169
0
            ctx->on_failure(_status);
170
0
            return Status::OK();
171
0
        }
172
89.8k
        ctx->on_close(_packet_num, _returned_rows);
173
89.8k
        LOG(INFO) << fmt::format(
174
89.8k
                "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, "
175
89.8k
                "packet_num={}, peak_memory_usage={}",
176
89.8k
                print_id(_fragment_id), _is_close, !_status.ok(), _packet_num,
177
89.8k
                _mem_tracker->peak_consumption());
178
89.8k
        return Status::OK();
179
89.8k
    }
180
    // no ready data, push ctx to waiting list
181
262k
    _waiting_rpc.push_back(ctx);
182
262k
    return Status::OK();
183
352k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_E
Line
Count
Source
148
8
Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) {
149
8
    std::lock_guard<std::mutex> l(_lock);
150
8
    SCOPED_ATTACH_TASK(_mem_tracker);
151
8
    Defer defer {[&]() { _update_dependency(); }};
152
8
    if (!_status.ok()) {
153
1
        ctx->on_failure(_status);
154
1
        return _status;
155
1
    }
156
7
    if (!_result_batch_queue.empty()) {
157
2
        auto result = _result_batch_queue.front();
158
2
        _result_batch_queue.pop_front();
159
2
        for (auto it : _instance_rows_in_queue.front()) {
160
2
            _instance_rows[it.first] -= it.second;
161
2
        }
162
2
        _instance_rows_in_queue.pop_front();
163
2
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
164
2
        _packet_num++;
165
2
        return Status::OK();
166
2
    }
167
5
    if (_is_close) {
168
1
        if (!_status.ok()) {
169
0
            ctx->on_failure(_status);
170
0
            return Status::OK();
171
0
        }
172
1
        ctx->on_close(_packet_num, _returned_rows);
173
1
        LOG(INFO) << fmt::format(
174
1
                "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, "
175
1
                "packet_num={}, peak_memory_usage={}",
176
1
                print_id(_fragment_id), _is_close, !_status.ok(), _packet_num,
177
1
                _mem_tracker->peak_consumption());
178
1
        return Status::OK();
179
1
    }
180
    // no ready data, push ctx to waiting list
181
4
    _waiting_rpc.push_back(ctx);
182
4
    return Status::OK();
183
5
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_E
Line
Count
Source
148
359k
Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) {
149
359k
    std::lock_guard<std::mutex> l(_lock);
150
359k
    SCOPED_ATTACH_TASK(_mem_tracker);
151
359k
    Defer defer {[&]() { _update_dependency(); }};
152
359k
    if (!_status.ok()) {
153
11
        ctx->on_failure(_status);
154
11
        return _status;
155
11
    }
156
359k
    if (!_result_batch_queue.empty()) {
157
6.94k
        auto result = _result_batch_queue.front();
158
6.94k
        _result_batch_queue.pop_front();
159
7.14k
        for (auto it : _instance_rows_in_queue.front()) {
160
7.14k
            _instance_rows[it.first] -= it.second;
161
7.14k
        }
162
6.94k
        _instance_rows_in_queue.pop_front();
163
6.94k
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
164
6.94k
        _packet_num++;
165
6.94k
        return Status::OK();
166
6.94k
    }
167
352k
    if (_is_close) {
168
89.8k
        if (!_status.ok()) {
169
0
            ctx->on_failure(_status);
170
0
            return Status::OK();
171
0
        }
172
89.8k
        ctx->on_close(_packet_num, _returned_rows);
173
89.8k
        LOG(INFO) << fmt::format(
174
89.8k
                "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, "
175
89.8k
                "packet_num={}, peak_memory_usage={}",
176
89.8k
                print_id(_fragment_id), _is_close, !_status.ok(), _packet_num,
177
89.8k
                _mem_tracker->peak_consumption());
178
89.8k
        return Status::OK();
179
89.8k
    }
180
    // no ready data, push ctx to waiting list
181
262k
    _waiting_rpc.push_back(ctx);
182
262k
    return Status::OK();
183
352k
}
184
185
template <typename ResultCtxType>
186
Status ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state,
187
144k
                                                   std::shared_ptr<InBlockType>& result) {
188
144k
    std::unique_lock<std::mutex> l(_lock);
189
190
144k
    if (!_status.ok()) {
191
2
        return _status;
192
2
    }
193
194
144k
    if (_waiting_rpc.empty()) {
195
8.32k
        auto sz = 0;
196
8.32k
        auto num_rows = 0;
197
8.32k
        size_t batch_size = 0;
198
8.32k
        if constexpr (std::is_same_v<InBlockType, Block>) {
199
72
            num_rows = cast_set<int>(result->rows());
200
72
            batch_size = result->bytes();
201
8.25k
        } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
202
8.25k
            num_rows = cast_set<int>(result->result_batch.rows.size());
203
14.1k
            for (const auto& row : result->result_batch.rows) {
204
14.1k
                batch_size += row.size();
205
14.1k
            }
206
8.25k
        }
207
8.32k
        if (!_result_batch_queue.empty()) {
208
1.11k
            if constexpr (std::is_same_v<InBlockType, Block>) {
209
8
                sz = cast_set<int>(_result_batch_queue.back()->rows());
210
1.10k
            } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
211
1.10k
                sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size());
212
1.10k
            }
213
1.11k
            if (sz + num_rows < _buffer_limit &&
214
1.11k
                (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) {
215
1.11k
                if constexpr (std::is_same_v<InBlockType, Block>) {
216
8
                    auto last_block = _result_batch_queue.back();
217
8
                    auto mutable_columns_guard = last_block->mutate_columns_scoped();
218
8
                    auto& mutable_columns = mutable_columns_guard.mutable_columns();
219
28
                    for (size_t i = 0; i < last_block->columns(); i++) {
220
20
                        mutable_columns[i]->insert_range_from(*result->get_by_position(i).column, 0,
221
20
                                                              num_rows);
222
20
                    }
223
1.10k
                } else {
224
1.10k
                    std::vector<std::string>& back_rows =
225
1.10k
                            _result_batch_queue.back()->result_batch.rows;
226
1.10k
                    std::vector<std::string>& result_rows = result->result_batch.rows;
227
1.10k
                    back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()),
228
1.10k
                                     std::make_move_iterator(result_rows.end()));
229
1.10k
                }
230
1.11k
                _last_batch_bytes += batch_size;
231
1.11k
            } else {
232
0
                _instance_rows_in_queue.emplace_back();
233
0
                _result_batch_queue.push_back(std::move(result));
234
0
                _last_batch_bytes = batch_size;
235
0
                _arrow_data_arrival
236
0
                        .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
237
0
            }
238
7.21k
        } else {
239
7.21k
            _instance_rows_in_queue.emplace_back();
240
7.21k
            _result_batch_queue.push_back(std::move(result));
241
7.21k
            _last_batch_bytes = batch_size;
242
7.21k
            _arrow_data_arrival
243
7.21k
                    .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
244
7.21k
        }
245
8.32k
        _instance_rows[state->fragment_instance_id()] += num_rows;
246
8.32k
        _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
247
136k
    } else {
248
136k
        auto ctx = _waiting_rpc.front();
249
136k
        _waiting_rpc.pop_front();
250
136k
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
251
136k
        _packet_num++;
252
136k
    }
253
254
144k
    _update_dependency();
255
144k
    return Status::OK();
256
144k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9add_batchEPNS_12RuntimeStateERSt10shared_ptrINS_5BlockEE
Line
Count
Source
187
74
                                                   std::shared_ptr<InBlockType>& result) {
188
74
    std::unique_lock<std::mutex> l(_lock);
189
190
74
    if (!_status.ok()) {
191
1
        return _status;
192
1
    }
193
194
73
    if (_waiting_rpc.empty()) {
195
72
        auto sz = 0;
196
72
        auto num_rows = 0;
197
72
        size_t batch_size = 0;
198
72
        if constexpr (std::is_same_v<InBlockType, Block>) {
199
72
            num_rows = cast_set<int>(result->rows());
200
72
            batch_size = result->bytes();
201
        } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
202
            num_rows = cast_set<int>(result->result_batch.rows.size());
203
            for (const auto& row : result->result_batch.rows) {
204
                batch_size += row.size();
205
            }
206
        }
207
72
        if (!_result_batch_queue.empty()) {
208
8
            if constexpr (std::is_same_v<InBlockType, Block>) {
209
8
                sz = cast_set<int>(_result_batch_queue.back()->rows());
210
            } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
211
                sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size());
212
            }
213
8
            if (sz + num_rows < _buffer_limit &&
214
8
                (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) {
215
8
                if constexpr (std::is_same_v<InBlockType, Block>) {
216
8
                    auto last_block = _result_batch_queue.back();
217
8
                    auto mutable_columns_guard = last_block->mutate_columns_scoped();
218
8
                    auto& mutable_columns = mutable_columns_guard.mutable_columns();
219
28
                    for (size_t i = 0; i < last_block->columns(); i++) {
220
20
                        mutable_columns[i]->insert_range_from(*result->get_by_position(i).column, 0,
221
20
                                                              num_rows);
222
20
                    }
223
                } else {
224
                    std::vector<std::string>& back_rows =
225
                            _result_batch_queue.back()->result_batch.rows;
226
                    std::vector<std::string>& result_rows = result->result_batch.rows;
227
                    back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()),
228
                                     std::make_move_iterator(result_rows.end()));
229
                }
230
8
                _last_batch_bytes += batch_size;
231
8
            } else {
232
0
                _instance_rows_in_queue.emplace_back();
233
0
                _result_batch_queue.push_back(std::move(result));
234
0
                _last_batch_bytes = batch_size;
235
0
                _arrow_data_arrival
236
0
                        .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
237
0
            }
238
64
        } else {
239
64
            _instance_rows_in_queue.emplace_back();
240
64
            _result_batch_queue.push_back(std::move(result));
241
64
            _last_batch_bytes = batch_size;
242
64
            _arrow_data_arrival
243
64
                    .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
244
64
        }
245
72
        _instance_rows[state->fragment_instance_id()] += num_rows;
246
72
        _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
247
72
    } else {
248
1
        auto ctx = _waiting_rpc.front();
249
1
        _waiting_rpc.pop_front();
250
1
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
251
1
        _packet_num++;
252
1
    }
253
254
73
    _update_dependency();
255
73
    return Status::OK();
256
73
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9add_batchEPNS_12RuntimeStateERSt10shared_ptrINS_16TFetchDataResultEE
Line
Count
Source
187
144k
                                                   std::shared_ptr<InBlockType>& result) {
188
144k
    std::unique_lock<std::mutex> l(_lock);
189
190
144k
    if (!_status.ok()) {
191
1
        return _status;
192
1
    }
193
194
144k
    if (_waiting_rpc.empty()) {
195
8.25k
        auto sz = 0;
196
8.25k
        auto num_rows = 0;
197
8.25k
        size_t batch_size = 0;
198
        if constexpr (std::is_same_v<InBlockType, Block>) {
199
            num_rows = cast_set<int>(result->rows());
200
            batch_size = result->bytes();
201
8.25k
        } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
202
8.25k
            num_rows = cast_set<int>(result->result_batch.rows.size());
203
14.1k
            for (const auto& row : result->result_batch.rows) {
204
14.1k
                batch_size += row.size();
205
14.1k
            }
206
8.25k
        }
207
8.25k
        if (!_result_batch_queue.empty()) {
208
            if constexpr (std::is_same_v<InBlockType, Block>) {
209
                sz = cast_set<int>(_result_batch_queue.back()->rows());
210
1.10k
            } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
211
1.10k
                sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size());
212
1.10k
            }
213
1.10k
            if (sz + num_rows < _buffer_limit &&
214
1.10k
                (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) {
215
                if constexpr (std::is_same_v<InBlockType, Block>) {
216
                    auto last_block = _result_batch_queue.back();
217
                    auto mutable_columns_guard = last_block->mutate_columns_scoped();
218
                    auto& mutable_columns = mutable_columns_guard.mutable_columns();
219
                    for (size_t i = 0; i < last_block->columns(); i++) {
220
                        mutable_columns[i]->insert_range_from(*result->get_by_position(i).column, 0,
221
                                                              num_rows);
222
                    }
223
1.10k
                } else {
224
1.10k
                    std::vector<std::string>& back_rows =
225
1.10k
                            _result_batch_queue.back()->result_batch.rows;
226
1.10k
                    std::vector<std::string>& result_rows = result->result_batch.rows;
227
1.10k
                    back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()),
228
1.10k
                                     std::make_move_iterator(result_rows.end()));
229
1.10k
                }
230
1.10k
                _last_batch_bytes += batch_size;
231
1.10k
            } else {
232
0
                _instance_rows_in_queue.emplace_back();
233
0
                _result_batch_queue.push_back(std::move(result));
234
0
                _last_batch_bytes = batch_size;
235
0
                _arrow_data_arrival
236
0
                        .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
237
0
            }
238
7.14k
        } else {
239
7.14k
            _instance_rows_in_queue.emplace_back();
240
7.14k
            _result_batch_queue.push_back(std::move(result));
241
7.14k
            _last_batch_bytes = batch_size;
242
7.14k
            _arrow_data_arrival
243
7.14k
                    .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
244
7.14k
        }
245
8.25k
        _instance_rows[state->fragment_instance_id()] += num_rows;
246
8.25k
        _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
247
136k
    } else {
248
136k
        auto ctx = _waiting_rpc.front();
249
136k
        _waiting_rpc.pop_front();
250
136k
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
251
136k
        _packet_num++;
252
136k
    }
253
254
144k
    _update_dependency();
255
144k
    return Status::OK();
256
144k
}
257
258
template class ResultBlockBuffer<GetArrowResultBatchCtx>;
259
template class ResultBlockBuffer<GetResultBatchCtx>;
260
261
} // namespace doris