Coverage Report

Created: 2026-03-19 12:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/result_block_buffer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/result_block_buffer.h"
19
20
#include <gen_cpp/Data_types.h>
21
#include <gen_cpp/PaloInternalService_types.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <glog/logging.h>
24
#include <google/protobuf/stubs/callback.h>
25
// IWYU pragma: no_include <bits/chrono.h>
26
#include <chrono> // IWYU pragma: keep
27
#include <limits>
28
#include <ostream>
29
#include <string>
30
#include <utility>
31
#include <vector>
32
33
#include "arrow/type_fwd.h"
34
#include "common/config.h"
35
#include "core/block/block.h"
36
#include "exec/pipeline/dependency.h"
37
#include "exec/sink/writer/varrow_flight_result_writer.h"
38
#include "exec/sink/writer/vmysql_result_writer.h"
39
#include "runtime/runtime_profile.h"
40
#include "runtime/thread_context.h"
41
#include "util/thrift_util.h"
42
43
namespace doris {
44
#include "common/compile_check_begin.h"
45
46
template <typename ResultCtxType>
47
ResultBlockBuffer<ResultCtxType>::ResultBlockBuffer(TUniqueId id, RuntimeState* state,
48
                                                    int buffer_size)
49
190k
        : _fragment_id(std::move(id)),
50
190k
          _is_close(false),
51
190k
          _batch_size(state->batch_size()),
52
190k
          _timezone(state->timezone()),
53
190k
          _be_exec_version(state->be_exec_version()),
54
190k
          _fragment_transmission_compression_type(state->fragement_transmission_compression_type()),
55
190k
          _buffer_limit(buffer_size) {
56
190k
    _mem_tracker = MemTrackerLimiter::create_shared(
57
190k
            MemTrackerLimiter::Type::QUERY,
58
190k
            fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id)));
59
190k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEEC2ENS_9TUniqueIdEPNS_12RuntimeStateEi
Line
Count
Source
49
8
        : _fragment_id(std::move(id)),
50
8
          _is_close(false),
51
8
          _batch_size(state->batch_size()),
52
8
          _timezone(state->timezone()),
53
8
          _be_exec_version(state->be_exec_version()),
54
8
          _fragment_transmission_compression_type(state->fragement_transmission_compression_type()),
55
8
          _buffer_limit(buffer_size) {
56
8
    _mem_tracker = MemTrackerLimiter::create_shared(
57
8
            MemTrackerLimiter::Type::QUERY,
58
8
            fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id)));
59
8
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEEC2ENS_9TUniqueIdEPNS_12RuntimeStateEi
Line
Count
Source
49
190k
        : _fragment_id(std::move(id)),
50
190k
          _is_close(false),
51
190k
          _batch_size(state->batch_size()),
52
190k
          _timezone(state->timezone()),
53
190k
          _be_exec_version(state->be_exec_version()),
54
190k
          _fragment_transmission_compression_type(state->fragement_transmission_compression_type()),
55
190k
          _buffer_limit(buffer_size) {
56
190k
    _mem_tracker = MemTrackerLimiter::create_shared(
57
190k
            MemTrackerLimiter::Type::QUERY,
58
190k
            fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id)));
59
190k
}
60
61
template <typename ResultCtxType>
62
Status ResultBlockBuffer<ResultCtxType>::close(const TUniqueId& id, Status exec_status,
63
392k
                                               int64_t num_rows) {
64
392k
    std::unique_lock<std::mutex> l(_lock);
65
392k
    _returned_rows.fetch_add(num_rows);
66
    // close will be called multiple times and error status needs to be collected.
67
392k
    if (!exec_status.ok()) {
68
1.26k
        _status = exec_status;
69
1.26k
    }
70
71
392k
    auto it = _result_sink_dependencies.find(id);
72
392k
    if (it != _result_sink_dependencies.end()) {
73
392k
        it->second->set_always_ready();
74
392k
        _result_sink_dependencies.erase(it);
75
18.4E
    } else {
76
18.4E
        _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer",
77
18.4E
                                        print_id(id));
78
18.4E
    }
79
392k
    if (!_result_sink_dependencies.empty()) {
80
201k
        return _status;
81
201k
    }
82
83
191k
    _is_close = true;
84
191k
    _arrow_data_arrival.notify_all();
85
86
191k
    if (!_waiting_rpc.empty()) {
87
105k
        if (_status.ok()) {
88
104k
            for (auto& ctx : _waiting_rpc) {
89
104k
                ctx->on_close(_packet_num, _returned_rows);
90
104k
            }
91
104k
        } else {
92
1.21k
            for (auto& ctx : _waiting_rpc) {
93
1.20k
                ctx->on_failure(_status);
94
1.20k
            }
95
1.21k
        }
96
105k
        _waiting_rpc.clear();
97
105k
    }
98
99
191k
    return _status;
100
392k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusEl
Line
Count
Source
63
6
                                               int64_t num_rows) {
64
6
    std::unique_lock<std::mutex> l(_lock);
65
6
    _returned_rows.fetch_add(num_rows);
66
    // close will be called multiple times and error status needs to be collected.
67
6
    if (!exec_status.ok()) {
68
2
        _status = exec_status;
69
2
    }
70
71
6
    auto it = _result_sink_dependencies.find(id);
72
6
    if (it != _result_sink_dependencies.end()) {
73
5
        it->second->set_always_ready();
74
5
        _result_sink_dependencies.erase(it);
75
5
    } else {
76
1
        _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer",
77
1
                                        print_id(id));
78
1
    }
79
6
    if (!_result_sink_dependencies.empty()) {
80
1
        return _status;
81
1
    }
82
83
5
    _is_close = true;
84
5
    _arrow_data_arrival.notify_all();
85
86
5
    if (!_waiting_rpc.empty()) {
87
2
        if (_status.ok()) {
88
1
            for (auto& ctx : _waiting_rpc) {
89
1
                ctx->on_close(_packet_num, _returned_rows);
90
1
            }
91
1
        } else {
92
1
            for (auto& ctx : _waiting_rpc) {
93
1
                ctx->on_failure(_status);
94
1
            }
95
1
        }
96
2
        _waiting_rpc.clear();
97
2
    }
98
99
5
    return _status;
100
6
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusEl
Line
Count
Source
63
392k
                                               int64_t num_rows) {
64
392k
    std::unique_lock<std::mutex> l(_lock);
65
392k
    _returned_rows.fetch_add(num_rows);
66
    // close will be called multiple times and error status needs to be collected.
67
392k
    if (!exec_status.ok()) {
68
1.25k
        _status = exec_status;
69
1.25k
    }
70
71
392k
    auto it = _result_sink_dependencies.find(id);
72
392k
    if (it != _result_sink_dependencies.end()) {
73
392k
        it->second->set_always_ready();
74
392k
        _result_sink_dependencies.erase(it);
75
18.4E
    } else {
76
18.4E
        _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer",
77
18.4E
                                        print_id(id));
78
18.4E
    }
79
392k
    if (!_result_sink_dependencies.empty()) {
80
201k
        return _status;
81
201k
    }
82
83
191k
    _is_close = true;
84
191k
    _arrow_data_arrival.notify_all();
85
86
191k
    if (!_waiting_rpc.empty()) {
87
105k
        if (_status.ok()) {
88
104k
            for (auto& ctx : _waiting_rpc) {
89
104k
                ctx->on_close(_packet_num, _returned_rows);
90
104k
            }
91
104k
        } else {
92
1.21k
            for (auto& ctx : _waiting_rpc) {
93
1.20k
                ctx->on_failure(_status);
94
1.20k
            }
95
1.21k
        }
96
105k
        _waiting_rpc.clear();
97
105k
    }
98
99
191k
    return _status;
100
392k
}
101
102
template <typename ResultCtxType>
103
72.5k
void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) {
104
72.5k
    std::unique_lock<std::mutex> l(_lock);
105
72.5k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
106
72.5k
    if (_status.ok()) {
107
71.5k
        _status = reason;
108
71.5k
    }
109
72.5k
    _arrow_data_arrival.notify_all();
110
72.5k
    for (auto& ctx : _waiting_rpc) {
111
2
        ctx->on_failure(reason);
112
2
    }
113
72.5k
    _waiting_rpc.clear();
114
72.5k
    _update_dependency();
115
72.5k
    _result_batch_queue.clear();
116
72.5k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE6cancelERKNS_6StatusE
Line
Count
Source
103
4
void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) {
104
4
    std::unique_lock<std::mutex> l(_lock);
105
4
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
106
4
    if (_status.ok()) {
107
4
        _status = reason;
108
4
    }
109
4
    _arrow_data_arrival.notify_all();
110
4
    for (auto& ctx : _waiting_rpc) {
111
1
        ctx->on_failure(reason);
112
1
    }
113
4
    _waiting_rpc.clear();
114
4
    _update_dependency();
115
4
    _result_batch_queue.clear();
116
4
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE6cancelERKNS_6StatusE
Line
Count
Source
103
72.5k
void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) {
104
72.5k
    std::unique_lock<std::mutex> l(_lock);
105
72.5k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
106
72.5k
    if (_status.ok()) {
107
71.5k
        _status = reason;
108
71.5k
    }
109
72.5k
    _arrow_data_arrival.notify_all();
110
72.5k
    for (auto& ctx : _waiting_rpc) {
111
1
        ctx->on_failure(reason);
112
1
    }
113
72.5k
    _waiting_rpc.clear();
114
72.5k
    _update_dependency();
115
72.5k
    _result_batch_queue.clear();
116
72.5k
}
117
118
template <typename ResultCtxType>
119
void ResultBlockBuffer<ResultCtxType>::set_dependency(
120
386k
        const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) {
121
386k
    std::unique_lock<std::mutex> l(_lock);
122
386k
    _result_sink_dependencies[id] = result_sink_dependency;
123
386k
    _update_dependency();
124
386k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE
Line
Count
Source
120
8
        const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) {
121
8
    std::unique_lock<std::mutex> l(_lock);
122
8
    _result_sink_dependencies[id] = result_sink_dependency;
123
8
    _update_dependency();
124
8
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE
Line
Count
Source
120
386k
        const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) {
121
386k
    std::unique_lock<std::mutex> l(_lock);
122
386k
    _result_sink_dependencies[id] = result_sink_dependency;
123
386k
    _update_dependency();
124
386k
}
125
126
template <typename ResultCtxType>
127
915k
void ResultBlockBuffer<ResultCtxType>::_update_dependency() {
128
915k
    if (!_status.ok()) {
129
72.5k
        for (auto it : _result_sink_dependencies) {
130
6
            it.second->set_ready();
131
6
        }
132
72.5k
        return;
133
72.5k
    }
134
135
2.46M
    for (auto it : _result_sink_dependencies) {
136
2.46M
        if (_instance_rows[it.first] > _batch_size) {
137
11
            it.second->block();
138
2.46M
        } else {
139
2.46M
            it.second->set_ready();
140
2.46M
        }
141
2.46M
    }
142
842k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE18_update_dependencyEv
Line
Count
Source
127
38
void ResultBlockBuffer<ResultCtxType>::_update_dependency() {
128
38
    if (!_status.ok()) {
129
6
        for (auto it : _result_sink_dependencies) {
130
3
            it.second->set_ready();
131
3
        }
132
6
        return;
133
6
    }
134
135
32
    for (auto it : _result_sink_dependencies) {
136
26
        if (_instance_rows[it.first] > _batch_size) {
137
4
            it.second->block();
138
22
        } else {
139
22
            it.second->set_ready();
140
22
        }
141
26
    }
142
32
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE18_update_dependencyEv
Line
Count
Source
127
915k
void ResultBlockBuffer<ResultCtxType>::_update_dependency() {
128
915k
    if (!_status.ok()) {
129
72.5k
        for (auto it : _result_sink_dependencies) {
130
3
            it.second->set_ready();
131
3
        }
132
72.5k
        return;
133
72.5k
    }
134
135
2.46M
    for (auto it : _result_sink_dependencies) {
136
2.46M
        if (_instance_rows[it.first] > _batch_size) {
137
7
            it.second->block();
138
2.46M
        } else {
139
2.46M
            it.second->set_ready();
140
2.46M
        }
141
2.46M
    }
142
842k
}
143
144
template <typename ResultCtxType>
145
321k
Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) {
146
321k
    std::lock_guard<std::mutex> l(_lock);
147
321k
    SCOPED_ATTACH_TASK(_mem_tracker);
148
322k
    Defer defer {[&]() { _update_dependency(); }};
_ZZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv
Line
Count
Source
148
8
    Defer defer {[&]() { _update_dependency(); }};
_ZZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv
Line
Count
Source
148
322k
    Defer defer {[&]() { _update_dependency(); }};
149
321k
    if (!_status.ok()) {
150
10
        ctx->on_failure(_status);
151
10
        return _status;
152
10
    }
153
321k
    if (!_result_batch_queue.empty()) {
154
6.59k
        auto result = _result_batch_queue.front();
155
6.59k
        _result_batch_queue.pop_front();
156
6.69k
        for (auto it : _instance_rows_in_queue.front()) {
157
6.69k
            _instance_rows[it.first] -= it.second;
158
6.69k
        }
159
6.59k
        _instance_rows_in_queue.pop_front();
160
6.59k
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
161
6.59k
        _packet_num++;
162
6.59k
        return Status::OK();
163
6.59k
    }
164
315k
    if (_is_close) {
165
85.9k
        if (!_status.ok()) {
166
0
            ctx->on_failure(_status);
167
0
            return Status::OK();
168
0
        }
169
85.9k
        ctx->on_close(_packet_num, _returned_rows);
170
85.9k
        LOG(INFO) << fmt::format(
171
85.9k
                "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, "
172
85.9k
                "packet_num={}, peak_memory_usage={}",
173
85.9k
                print_id(_fragment_id), _is_close, !_status.ok(), _packet_num,
174
85.9k
                _mem_tracker->peak_consumption());
175
85.9k
        return Status::OK();
176
85.9k
    }
177
    // no ready data, push ctx to waiting list
178
229k
    _waiting_rpc.push_back(ctx);
179
229k
    return Status::OK();
180
315k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_E
Line
Count
Source
145
8
Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) {
146
8
    std::lock_guard<std::mutex> l(_lock);
147
8
    SCOPED_ATTACH_TASK(_mem_tracker);
148
8
    Defer defer {[&]() { _update_dependency(); }};
149
8
    if (!_status.ok()) {
150
1
        ctx->on_failure(_status);
151
1
        return _status;
152
1
    }
153
7
    if (!_result_batch_queue.empty()) {
154
2
        auto result = _result_batch_queue.front();
155
2
        _result_batch_queue.pop_front();
156
2
        for (auto it : _instance_rows_in_queue.front()) {
157
2
            _instance_rows[it.first] -= it.second;
158
2
        }
159
2
        _instance_rows_in_queue.pop_front();
160
2
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
161
2
        _packet_num++;
162
2
        return Status::OK();
163
2
    }
164
5
    if (_is_close) {
165
1
        if (!_status.ok()) {
166
0
            ctx->on_failure(_status);
167
0
            return Status::OK();
168
0
        }
169
1
        ctx->on_close(_packet_num, _returned_rows);
170
1
        LOG(INFO) << fmt::format(
171
1
                "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, "
172
1
                "packet_num={}, peak_memory_usage={}",
173
1
                print_id(_fragment_id), _is_close, !_status.ok(), _packet_num,
174
1
                _mem_tracker->peak_consumption());
175
1
        return Status::OK();
176
1
    }
177
    // no ready data, push ctx to waiting list
178
4
    _waiting_rpc.push_back(ctx);
179
4
    return Status::OK();
180
5
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_E
Line
Count
Source
145
321k
Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) {
146
321k
    std::lock_guard<std::mutex> l(_lock);
147
321k
    SCOPED_ATTACH_TASK(_mem_tracker);
148
321k
    Defer defer {[&]() { _update_dependency(); }};
149
321k
    if (!_status.ok()) {
150
9
        ctx->on_failure(_status);
151
9
        return _status;
152
9
    }
153
321k
    if (!_result_batch_queue.empty()) {
154
6.59k
        auto result = _result_batch_queue.front();
155
6.59k
        _result_batch_queue.pop_front();
156
6.69k
        for (auto it : _instance_rows_in_queue.front()) {
157
6.69k
            _instance_rows[it.first] -= it.second;
158
6.69k
        }
159
6.59k
        _instance_rows_in_queue.pop_front();
160
6.59k
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
161
6.59k
        _packet_num++;
162
6.59k
        return Status::OK();
163
6.59k
    }
164
315k
    if (_is_close) {
165
85.9k
        if (!_status.ok()) {
166
0
            ctx->on_failure(_status);
167
0
            return Status::OK();
168
0
        }
169
85.9k
        ctx->on_close(_packet_num, _returned_rows);
170
85.9k
        LOG(INFO) << fmt::format(
171
85.9k
                "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, "
172
85.9k
                "packet_num={}, peak_memory_usage={}",
173
85.9k
                print_id(_fragment_id), _is_close, !_status.ok(), _packet_num,
174
85.9k
                _mem_tracker->peak_consumption());
175
85.9k
        return Status::OK();
176
85.9k
    }
177
    // no ready data, push ctx to waiting list
178
229k
    _waiting_rpc.push_back(ctx);
179
229k
    return Status::OK();
180
315k
}
181
182
template <typename ResultCtxType>
183
Status ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state,
184
131k
                                                   std::shared_ptr<InBlockType>& result) {
185
131k
    std::unique_lock<std::mutex> l(_lock);
186
187
131k
    if (!_status.ok()) {
188
2
        return _status;
189
2
    }
190
191
131k
    if (_waiting_rpc.empty()) {
192
7.59k
        auto sz = 0;
193
7.59k
        auto num_rows = 0;
194
7.59k
        size_t batch_size = 0;
195
7.59k
        if constexpr (std::is_same_v<InBlockType, Block>) {
196
9
            num_rows = cast_set<int>(result->rows());
197
9
            batch_size = result->bytes();
198
7.58k
        } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
199
7.58k
            num_rows = cast_set<int>(result->result_batch.rows.size());
200
12.4k
            for (const auto& row : result->result_batch.rows) {
201
12.4k
                batch_size += row.size();
202
12.4k
            }
203
7.58k
        }
204
7.59k
        if (!_result_batch_queue.empty()) {
205
745
            if constexpr (std::is_same_v<InBlockType, Block>) {
206
2
                sz = cast_set<int>(_result_batch_queue.back()->rows());
207
743
            } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
208
743
                sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size());
209
743
            }
210
745
            if (sz + num_rows < _buffer_limit &&
211
745
                (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) {
212
745
                if constexpr (std::is_same_v<InBlockType, Block>) {
213
2
                    auto last_block = _result_batch_queue.back();
214
4
                    for (size_t i = 0; i < last_block->columns(); i++) {
215
2
                        last_block->mutate_columns()[i]->insert_range_from(
216
2
                                *result->get_by_position(i).column, 0, num_rows);
217
2
                    }
218
743
                } else {
219
743
                    std::vector<std::string>& back_rows =
220
743
                            _result_batch_queue.back()->result_batch.rows;
221
743
                    std::vector<std::string>& result_rows = result->result_batch.rows;
222
743
                    back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()),
223
743
                                     std::make_move_iterator(result_rows.end()));
224
743
                }
225
745
                _last_batch_bytes += batch_size;
226
745
            } else {
227
0
                _instance_rows_in_queue.emplace_back();
228
0
                _result_batch_queue.push_back(std::move(result));
229
0
                _last_batch_bytes = batch_size;
230
0
                _arrow_data_arrival
231
0
                        .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
232
0
            }
233
6.85k
        } else {
234
6.85k
            _instance_rows_in_queue.emplace_back();
235
6.85k
            _result_batch_queue.push_back(std::move(result));
236
6.85k
            _last_batch_bytes = batch_size;
237
6.85k
            _arrow_data_arrival
238
6.85k
                    .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
239
6.85k
        }
240
7.59k
        _instance_rows[state->fragment_instance_id()] += num_rows;
241
7.59k
        _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
242
124k
    } else {
243
124k
        auto ctx = _waiting_rpc.front();
244
124k
        _waiting_rpc.pop_front();
245
124k
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
246
124k
        _packet_num++;
247
124k
    }
248
249
131k
    _update_dependency();
250
131k
    return Status::OK();
251
131k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9add_batchEPNS_12RuntimeStateERSt10shared_ptrINS_5BlockEE
Line
Count
Source
184
11
                                                   std::shared_ptr<InBlockType>& result) {
185
11
    std::unique_lock<std::mutex> l(_lock);
186
187
11
    if (!_status.ok()) {
188
1
        return _status;
189
1
    }
190
191
10
    if (_waiting_rpc.empty()) {
192
9
        auto sz = 0;
193
9
        auto num_rows = 0;
194
9
        size_t batch_size = 0;
195
9
        if constexpr (std::is_same_v<InBlockType, Block>) {
196
9
            num_rows = cast_set<int>(result->rows());
197
9
            batch_size = result->bytes();
198
        } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
199
            num_rows = cast_set<int>(result->result_batch.rows.size());
200
            for (const auto& row : result->result_batch.rows) {
201
                batch_size += row.size();
202
            }
203
        }
204
9
        if (!_result_batch_queue.empty()) {
205
2
            if constexpr (std::is_same_v<InBlockType, Block>) {
206
2
                sz = cast_set<int>(_result_batch_queue.back()->rows());
207
            } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
208
                sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size());
209
            }
210
2
            if (sz + num_rows < _buffer_limit &&
211
2
                (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) {
212
2
                if constexpr (std::is_same_v<InBlockType, Block>) {
213
2
                    auto last_block = _result_batch_queue.back();
214
4
                    for (size_t i = 0; i < last_block->columns(); i++) {
215
2
                        last_block->mutate_columns()[i]->insert_range_from(
216
2
                                *result->get_by_position(i).column, 0, num_rows);
217
2
                    }
218
                } else {
219
                    std::vector<std::string>& back_rows =
220
                            _result_batch_queue.back()->result_batch.rows;
221
                    std::vector<std::string>& result_rows = result->result_batch.rows;
222
                    back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()),
223
                                     std::make_move_iterator(result_rows.end()));
224
                }
225
2
                _last_batch_bytes += batch_size;
226
2
            } else {
227
0
                _instance_rows_in_queue.emplace_back();
228
0
                _result_batch_queue.push_back(std::move(result));
229
0
                _last_batch_bytes = batch_size;
230
0
                _arrow_data_arrival
231
0
                        .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
232
0
            }
233
7
        } else {
234
7
            _instance_rows_in_queue.emplace_back();
235
7
            _result_batch_queue.push_back(std::move(result));
236
7
            _last_batch_bytes = batch_size;
237
7
            _arrow_data_arrival
238
7
                    .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
239
7
        }
240
9
        _instance_rows[state->fragment_instance_id()] += num_rows;
241
9
        _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
242
9
    } else {
243
1
        auto ctx = _waiting_rpc.front();
244
1
        _waiting_rpc.pop_front();
245
1
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
246
1
        _packet_num++;
247
1
    }
248
249
10
    _update_dependency();
250
10
    return Status::OK();
251
10
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9add_batchEPNS_12RuntimeStateERSt10shared_ptrINS_16TFetchDataResultEE
Line
Count
Source
184
131k
                                                   std::shared_ptr<InBlockType>& result) {
185
131k
    std::unique_lock<std::mutex> l(_lock);
186
187
131k
    if (!_status.ok()) {
188
1
        return _status;
189
1
    }
190
191
131k
    if (_waiting_rpc.empty()) {
192
7.58k
        auto sz = 0;
193
7.58k
        auto num_rows = 0;
194
7.58k
        size_t batch_size = 0;
195
        if constexpr (std::is_same_v<InBlockType, Block>) {
196
            num_rows = cast_set<int>(result->rows());
197
            batch_size = result->bytes();
198
7.58k
        } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
199
7.58k
            num_rows = cast_set<int>(result->result_batch.rows.size());
200
12.4k
            for (const auto& row : result->result_batch.rows) {
201
12.4k
                batch_size += row.size();
202
12.4k
            }
203
7.58k
        }
204
7.58k
        if (!_result_batch_queue.empty()) {
205
            if constexpr (std::is_same_v<InBlockType, Block>) {
206
                sz = cast_set<int>(_result_batch_queue.back()->rows());
207
743
            } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
208
743
                sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size());
209
743
            }
210
743
            if (sz + num_rows < _buffer_limit &&
211
743
                (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) {
212
                if constexpr (std::is_same_v<InBlockType, Block>) {
213
                    auto last_block = _result_batch_queue.back();
214
                    for (size_t i = 0; i < last_block->columns(); i++) {
215
                        last_block->mutate_columns()[i]->insert_range_from(
216
                                *result->get_by_position(i).column, 0, num_rows);
217
                    }
218
743
                } else {
219
743
                    std::vector<std::string>& back_rows =
220
743
                            _result_batch_queue.back()->result_batch.rows;
221
743
                    std::vector<std::string>& result_rows = result->result_batch.rows;
222
743
                    back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()),
223
743
                                     std::make_move_iterator(result_rows.end()));
224
743
                }
225
743
                _last_batch_bytes += batch_size;
226
743
            } else {
227
0
                _instance_rows_in_queue.emplace_back();
228
0
                _result_batch_queue.push_back(std::move(result));
229
0
                _last_batch_bytes = batch_size;
230
0
                _arrow_data_arrival
231
0
                        .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
232
0
            }
233
6.84k
        } else {
234
6.84k
            _instance_rows_in_queue.emplace_back();
235
6.84k
            _result_batch_queue.push_back(std::move(result));
236
6.84k
            _last_batch_bytes = batch_size;
237
6.84k
            _arrow_data_arrival
238
6.84k
                    .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
239
6.84k
        }
240
7.58k
        _instance_rows[state->fragment_instance_id()] += num_rows;
241
7.58k
        _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
242
124k
    } else {
243
124k
        auto ctx = _waiting_rpc.front();
244
124k
        _waiting_rpc.pop_front();
245
124k
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
246
124k
        _packet_num++;
247
124k
    }
248
249
131k
    _update_dependency();
250
131k
    return Status::OK();
251
131k
}
252
253
template class ResultBlockBuffer<GetArrowResultBatchCtx>;
254
template class ResultBlockBuffer<GetResultBatchCtx>;
255
256
#include "common/compile_check_end.h"
257
} // namespace doris