Coverage Report

Created: 2026-03-30 11:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/result_block_buffer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/result_block_buffer.h"
19
20
#include <gen_cpp/Data_types.h>
21
#include <gen_cpp/PaloInternalService_types.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <glog/logging.h>
24
#include <google/protobuf/stubs/callback.h>
25
// IWYU pragma: no_include <bits/chrono.h>
26
#include <chrono> // IWYU pragma: keep
27
#include <limits>
28
#include <ostream>
29
#include <string>
30
#include <utility>
31
#include <vector>
32
33
#include "arrow/type_fwd.h"
34
#include "common/config.h"
35
#include "core/block/block.h"
36
#include "exec/pipeline/dependency.h"
37
#include "exec/sink/writer/varrow_flight_result_writer.h"
38
#include "exec/sink/writer/vmysql_result_writer.h"
39
#include "runtime/runtime_profile.h"
40
#include "runtime/thread_context.h"
41
#include "util/thrift_util.h"
42
43
namespace doris {
44
#include "common/compile_check_begin.h"
45
46
template <typename ResultCtxType>
47
ResultBlockBuffer<ResultCtxType>::ResultBlockBuffer(TUniqueId id, RuntimeState* state,
48
                                                    int buffer_size)
49
246k
        : _fragment_id(std::move(id)),
50
246k
          _is_close(false),
51
246k
          _batch_size(state->batch_size()),
52
246k
          _timezone(state->timezone()),
53
246k
          _be_exec_version(state->be_exec_version()),
54
246k
          _fragment_transmission_compression_type(state->fragement_transmission_compression_type()),
55
246k
          _buffer_limit(buffer_size) {
56
246k
    _mem_tracker = MemTrackerLimiter::create_shared(
57
246k
            MemTrackerLimiter::Type::QUERY,
58
246k
            fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id)));
59
246k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEEC2ENS_9TUniqueIdEPNS_12RuntimeStateEi
Line
Count
Source
49
106
        : _fragment_id(std::move(id)),
50
106
          _is_close(false),
51
106
          _batch_size(state->batch_size()),
52
106
          _timezone(state->timezone()),
53
106
          _be_exec_version(state->be_exec_version()),
54
106
          _fragment_transmission_compression_type(state->fragement_transmission_compression_type()),
55
106
          _buffer_limit(buffer_size) {
56
106
    _mem_tracker = MemTrackerLimiter::create_shared(
57
106
            MemTrackerLimiter::Type::QUERY,
58
106
            fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id)));
59
106
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEEC2ENS_9TUniqueIdEPNS_12RuntimeStateEi
Line
Count
Source
49
246k
        : _fragment_id(std::move(id)),
50
246k
          _is_close(false),
51
246k
          _batch_size(state->batch_size()),
52
246k
          _timezone(state->timezone()),
53
246k
          _be_exec_version(state->be_exec_version()),
54
246k
          _fragment_transmission_compression_type(state->fragement_transmission_compression_type()),
55
246k
          _buffer_limit(buffer_size) {
56
246k
    _mem_tracker = MemTrackerLimiter::create_shared(
57
246k
            MemTrackerLimiter::Type::QUERY,
58
246k
            fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id)));
59
246k
}
60
61
template <typename ResultCtxType>
62
Status ResultBlockBuffer<ResultCtxType>::close(const TUniqueId& id, Status exec_status,
63
382k
                                               int64_t num_rows, bool& is_fully_closed) {
64
382k
    std::unique_lock<std::mutex> l(_lock);
65
382k
    _returned_rows.fetch_add(num_rows);
66
    // close will be called multiple times and error status needs to be collected.
67
382k
    if (!exec_status.ok()) {
68
1.30k
        _status = exec_status;
69
1.30k
    }
70
71
382k
    auto it = _result_sink_dependencies.find(id);
72
383k
    if (it != _result_sink_dependencies.end()) {
73
383k
        it->second->set_always_ready();
74
383k
        _result_sink_dependencies.erase(it);
75
18.4E
    } else {
76
18.4E
        _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer",
77
18.4E
                                        print_id(id));
78
18.4E
    }
79
382k
    if (!_result_sink_dependencies.empty()) {
80
        // Still waiting for other instances to finish; this is not the final close.
81
135k
        is_fully_closed = false;
82
135k
        return _status;
83
135k
    }
84
85
    // All instances have closed: the buffer is now fully closed.
86
247k
    is_fully_closed = true;
87
247k
    _is_close = true;
88
247k
    _arrow_data_arrival.notify_all();
89
90
247k
    if (!_waiting_rpc.empty()) {
91
137k
        if (_status.ok()) {
92
136k
            for (auto& ctx : _waiting_rpc) {
93
136k
                ctx->on_close(_packet_num, _returned_rows);
94
136k
            }
95
136k
        } else {
96
1.27k
            for (auto& ctx : _waiting_rpc) {
97
1.26k
                ctx->on_failure(_status);
98
1.26k
            }
99
1.27k
        }
100
137k
        _waiting_rpc.clear();
101
137k
    }
102
103
247k
    return _status;
104
382k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusElRb
Line
Count
Source
63
104
                                               int64_t num_rows, bool& is_fully_closed) {
64
104
    std::unique_lock<std::mutex> l(_lock);
65
104
    _returned_rows.fetch_add(num_rows);
66
    // close will be called multiple times and error status needs to be collected.
67
104
    if (!exec_status.ok()) {
68
2
        _status = exec_status;
69
2
    }
70
71
104
    auto it = _result_sink_dependencies.find(id);
72
104
    if (it != _result_sink_dependencies.end()) {
73
103
        it->second->set_always_ready();
74
103
        _result_sink_dependencies.erase(it);
75
103
    } else {
76
1
        _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer",
77
1
                                        print_id(id));
78
1
    }
79
104
    if (!_result_sink_dependencies.empty()) {
80
        // Still waiting for other instances to finish; this is not the final close.
81
1
        is_fully_closed = false;
82
1
        return _status;
83
1
    }
84
85
    // All instances have closed: the buffer is now fully closed.
86
103
    is_fully_closed = true;
87
103
    _is_close = true;
88
103
    _arrow_data_arrival.notify_all();
89
90
103
    if (!_waiting_rpc.empty()) {
91
2
        if (_status.ok()) {
92
1
            for (auto& ctx : _waiting_rpc) {
93
1
                ctx->on_close(_packet_num, _returned_rows);
94
1
            }
95
1
        } else {
96
1
            for (auto& ctx : _waiting_rpc) {
97
1
                ctx->on_failure(_status);
98
1
            }
99
1
        }
100
2
        _waiting_rpc.clear();
101
2
    }
102
103
103
    return _status;
104
104
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusElRb
Line
Count
Source
63
382k
                                               int64_t num_rows, bool& is_fully_closed) {
64
382k
    std::unique_lock<std::mutex> l(_lock);
65
382k
    _returned_rows.fetch_add(num_rows);
66
    // close will be called multiple times and error status needs to be collected.
67
382k
    if (!exec_status.ok()) {
68
1.30k
        _status = exec_status;
69
1.30k
    }
70
71
382k
    auto it = _result_sink_dependencies.find(id);
72
383k
    if (it != _result_sink_dependencies.end()) {
73
383k
        it->second->set_always_ready();
74
383k
        _result_sink_dependencies.erase(it);
75
18.4E
    } else {
76
18.4E
        _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer",
77
18.4E
                                        print_id(id));
78
18.4E
    }
79
382k
    if (!_result_sink_dependencies.empty()) {
80
        // Still waiting for other instances to finish; this is not the final close.
81
135k
        is_fully_closed = false;
82
135k
        return _status;
83
135k
    }
84
85
    // All instances have closed: the buffer is now fully closed.
86
247k
    is_fully_closed = true;
87
247k
    _is_close = true;
88
247k
    _arrow_data_arrival.notify_all();
89
90
247k
    if (!_waiting_rpc.empty()) {
91
137k
        if (_status.ok()) {
92
136k
            for (auto& ctx : _waiting_rpc) {
93
136k
                ctx->on_close(_packet_num, _returned_rows);
94
136k
            }
95
136k
        } else {
96
1.27k
            for (auto& ctx : _waiting_rpc) {
97
1.26k
                ctx->on_failure(_status);
98
1.26k
            }
99
1.27k
        }
100
137k
        _waiting_rpc.clear();
101
137k
    }
102
103
247k
    return _status;
104
382k
}
105
106
template <typename ResultCtxType>
107
197k
void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) {
108
197k
    std::unique_lock<std::mutex> l(_lock);
109
197k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
110
197k
    if (_status.ok()) {
111
196k
        _status = reason;
112
196k
    }
113
197k
    _arrow_data_arrival.notify_all();
114
197k
    for (auto& ctx : _waiting_rpc) {
115
2
        ctx->on_failure(reason);
116
2
    }
117
197k
    _waiting_rpc.clear();
118
197k
    _update_dependency();
119
197k
    _result_batch_queue.clear();
120
197k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE6cancelERKNS_6StatusE
Line
Count
Source
107
102
void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) {
108
102
    std::unique_lock<std::mutex> l(_lock);
109
102
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
110
102
    if (_status.ok()) {
111
102
        _status = reason;
112
102
    }
113
102
    _arrow_data_arrival.notify_all();
114
102
    for (auto& ctx : _waiting_rpc) {
115
1
        ctx->on_failure(reason);
116
1
    }
117
102
    _waiting_rpc.clear();
118
102
    _update_dependency();
119
102
    _result_batch_queue.clear();
120
102
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE6cancelERKNS_6StatusE
Line
Count
Source
107
197k
void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) {
108
197k
    std::unique_lock<std::mutex> l(_lock);
109
197k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
110
197k
    if (_status.ok()) {
111
196k
        _status = reason;
112
196k
    }
113
197k
    _arrow_data_arrival.notify_all();
114
197k
    for (auto& ctx : _waiting_rpc) {
115
1
        ctx->on_failure(reason);
116
1
    }
117
197k
    _waiting_rpc.clear();
118
197k
    _update_dependency();
119
197k
    _result_batch_queue.clear();
120
197k
}
121
122
template <typename ResultCtxType>
123
void ResultBlockBuffer<ResultCtxType>::set_dependency(
124
380k
        const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) {
125
380k
    std::unique_lock<std::mutex> l(_lock);
126
380k
    _result_sink_dependencies[id] = result_sink_dependency;
127
380k
    _update_dependency();
128
380k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE
Line
Count
Source
124
106
        const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) {
125
106
    std::unique_lock<std::mutex> l(_lock);
126
106
    _result_sink_dependencies[id] = result_sink_dependency;
127
106
    _update_dependency();
128
106
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE
Line
Count
Source
124
380k
        const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) {
125
380k
    std::unique_lock<std::mutex> l(_lock);
126
380k
    _result_sink_dependencies[id] = result_sink_dependency;
127
380k
    _update_dependency();
128
380k
}
129
130
template <typename ResultCtxType>
131
1.15M
void ResultBlockBuffer<ResultCtxType>::_update_dependency() {
132
1.15M
    if (!_status.ok()) {
133
197k
        for (auto it : _result_sink_dependencies) {
134
6
            it.second->set_ready();
135
6
        }
136
197k
        return;
137
197k
    }
138
139
1.72M
    for (auto it : _result_sink_dependencies) {
140
1.72M
        if (_instance_rows[it.first] > _batch_size) {
141
7
            it.second->block();
142
1.72M
        } else {
143
1.72M
            it.second->set_ready();
144
1.72M
        }
145
1.72M
    }
146
959k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE18_update_dependencyEv
Line
Count
Source
131
542
void ResultBlockBuffer<ResultCtxType>::_update_dependency() {
132
542
    if (!_status.ok()) {
133
104
        for (auto it : _result_sink_dependencies) {
134
3
            it.second->set_ready();
135
3
        }
136
104
        return;
137
104
    }
138
139
438
    for (auto it : _result_sink_dependencies) {
140
247
        if (_instance_rows[it.first] > _batch_size) {
141
4
            it.second->block();
142
243
        } else {
143
243
            it.second->set_ready();
144
243
        }
145
247
    }
146
438
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE18_update_dependencyEv
Line
Count
Source
131
1.15M
void ResultBlockBuffer<ResultCtxType>::_update_dependency() {
132
1.15M
    if (!_status.ok()) {
133
197k
        for (auto it : _result_sink_dependencies) {
134
3
            it.second->set_ready();
135
3
        }
136
197k
        return;
137
197k
    }
138
139
1.72M
    for (auto it : _result_sink_dependencies) {
140
1.72M
        if (_instance_rows[it.first] > _batch_size) {
141
3
            it.second->block();
142
1.72M
        } else {
143
1.72M
            it.second->set_ready();
144
1.72M
        }
145
1.72M
    }
146
959k
}
147
148
template <typename ResultCtxType>
149
410k
Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) {
150
410k
    std::lock_guard<std::mutex> l(_lock);
151
410k
    SCOPED_ATTACH_TASK(_mem_tracker);
152
411k
    Defer defer {[&]() { _update_dependency(); }};
_ZZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv
Line
Count
Source
152
8
    Defer defer {[&]() { _update_dependency(); }};
_ZZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv
Line
Count
Source
152
411k
    Defer defer {[&]() { _update_dependency(); }};
153
410k
    if (!_status.ok()) {
154
15
        ctx->on_failure(_status);
155
15
        return _status;
156
15
    }
157
410k
    if (!_result_batch_queue.empty()) {
158
11.6k
        auto result = _result_batch_queue.front();
159
11.6k
        _result_batch_queue.pop_front();
160
11.8k
        for (auto it : _instance_rows_in_queue.front()) {
161
11.8k
            _instance_rows[it.first] -= it.second;
162
11.8k
        }
163
11.6k
        _instance_rows_in_queue.pop_front();
164
11.6k
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
165
11.6k
        _packet_num++;
166
11.6k
        return Status::OK();
167
11.6k
    }
168
399k
    if (_is_close) {
169
106k
        if (!_status.ok()) {
170
0
            ctx->on_failure(_status);
171
0
            return Status::OK();
172
0
        }
173
106k
        ctx->on_close(_packet_num, _returned_rows);
174
106k
        LOG(INFO) << fmt::format(
175
106k
                "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, "
176
106k
                "packet_num={}, peak_memory_usage={}",
177
106k
                print_id(_fragment_id), _is_close, !_status.ok(), _packet_num,
178
106k
                _mem_tracker->peak_consumption());
179
106k
        return Status::OK();
180
106k
    }
181
    // no ready data, push ctx to waiting list
182
293k
    _waiting_rpc.push_back(ctx);
183
293k
    return Status::OK();
184
399k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_E
Line
Count
Source
149
8
Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) {
150
8
    std::lock_guard<std::mutex> l(_lock);
151
8
    SCOPED_ATTACH_TASK(_mem_tracker);
152
8
    Defer defer {[&]() { _update_dependency(); }};
153
8
    if (!_status.ok()) {
154
1
        ctx->on_failure(_status);
155
1
        return _status;
156
1
    }
157
7
    if (!_result_batch_queue.empty()) {
158
2
        auto result = _result_batch_queue.front();
159
2
        _result_batch_queue.pop_front();
160
2
        for (auto it : _instance_rows_in_queue.front()) {
161
2
            _instance_rows[it.first] -= it.second;
162
2
        }
163
2
        _instance_rows_in_queue.pop_front();
164
2
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
165
2
        _packet_num++;
166
2
        return Status::OK();
167
2
    }
168
5
    if (_is_close) {
169
1
        if (!_status.ok()) {
170
0
            ctx->on_failure(_status);
171
0
            return Status::OK();
172
0
        }
173
1
        ctx->on_close(_packet_num, _returned_rows);
174
1
        LOG(INFO) << fmt::format(
175
1
                "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, "
176
1
                "packet_num={}, peak_memory_usage={}",
177
1
                print_id(_fragment_id), _is_close, !_status.ok(), _packet_num,
178
1
                _mem_tracker->peak_consumption());
179
1
        return Status::OK();
180
1
    }
181
    // no ready data, push ctx to waiting list
182
4
    _waiting_rpc.push_back(ctx);
183
4
    return Status::OK();
184
5
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_E
Line
Count
Source
149
410k
Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) {
150
410k
    std::lock_guard<std::mutex> l(_lock);
151
410k
    SCOPED_ATTACH_TASK(_mem_tracker);
152
410k
    Defer defer {[&]() { _update_dependency(); }};
153
410k
    if (!_status.ok()) {
154
14
        ctx->on_failure(_status);
155
14
        return _status;
156
14
    }
157
410k
    if (!_result_batch_queue.empty()) {
158
11.6k
        auto result = _result_batch_queue.front();
159
11.6k
        _result_batch_queue.pop_front();
160
11.7k
        for (auto it : _instance_rows_in_queue.front()) {
161
11.7k
            _instance_rows[it.first] -= it.second;
162
11.7k
        }
163
11.6k
        _instance_rows_in_queue.pop_front();
164
11.6k
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
165
11.6k
        _packet_num++;
166
11.6k
        return Status::OK();
167
11.6k
    }
168
399k
    if (_is_close) {
169
106k
        if (!_status.ok()) {
170
0
            ctx->on_failure(_status);
171
0
            return Status::OK();
172
0
        }
173
106k
        ctx->on_close(_packet_num, _returned_rows);
174
106k
        LOG(INFO) << fmt::format(
175
106k
                "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, "
176
106k
                "packet_num={}, peak_memory_usage={}",
177
106k
                print_id(_fragment_id), _is_close, !_status.ok(), _packet_num,
178
106k
                _mem_tracker->peak_consumption());
179
106k
        return Status::OK();
180
106k
    }
181
    // no ready data, push ctx to waiting list
182
293k
    _waiting_rpc.push_back(ctx);
183
293k
    return Status::OK();
184
399k
}
185
186
template <typename ResultCtxType>
187
Status ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state,
188
169k
                                                   std::shared_ptr<InBlockType>& result) {
189
169k
    std::unique_lock<std::mutex> l(_lock);
190
191
169k
    if (!_status.ok()) {
192
2
        return _status;
193
2
    }
194
195
169k
    if (_waiting_rpc.empty()) {
196
14.0k
        auto sz = 0;
197
14.0k
        auto num_rows = 0;
198
14.0k
        size_t batch_size = 0;
199
14.0k
        if constexpr (std::is_same_v<InBlockType, Block>) {
200
119
            num_rows = cast_set<int>(result->rows());
201
119
            batch_size = result->bytes();
202
13.9k
        } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
203
13.9k
            num_rows = cast_set<int>(result->result_batch.rows.size());
204
25.7k
            for (const auto& row : result->result_batch.rows) {
205
25.7k
                batch_size += row.size();
206
25.7k
            }
207
13.9k
        }
208
14.0k
        if (!_result_batch_queue.empty()) {
209
2.07k
            if constexpr (std::is_same_v<InBlockType, Block>) {
210
12
                sz = cast_set<int>(_result_batch_queue.back()->rows());
211
2.05k
            } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
212
2.05k
                sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size());
213
2.05k
            }
214
2.07k
            if (sz + num_rows < _buffer_limit &&
215
2.07k
                (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) {
216
2.07k
                if constexpr (std::is_same_v<InBlockType, Block>) {
217
12
                    auto last_block = _result_batch_queue.back();
218
56
                    for (size_t i = 0; i < last_block->columns(); i++) {
219
44
                        last_block->mutate_columns()[i]->insert_range_from(
220
44
                                *result->get_by_position(i).column, 0, num_rows);
221
44
                    }
222
2.05k
                } else {
223
2.05k
                    std::vector<std::string>& back_rows =
224
2.05k
                            _result_batch_queue.back()->result_batch.rows;
225
2.05k
                    std::vector<std::string>& result_rows = result->result_batch.rows;
226
2.05k
                    back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()),
227
2.05k
                                     std::make_move_iterator(result_rows.end()));
228
2.05k
                }
229
2.07k
                _last_batch_bytes += batch_size;
230
2.07k
            } else {
231
0
                _instance_rows_in_queue.emplace_back();
232
0
                _result_batch_queue.push_back(std::move(result));
233
0
                _last_batch_bytes = batch_size;
234
0
                _arrow_data_arrival
235
0
                        .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
236
0
            }
237
12.0k
        } else {
238
12.0k
            _instance_rows_in_queue.emplace_back();
239
12.0k
            _result_batch_queue.push_back(std::move(result));
240
12.0k
            _last_batch_bytes = batch_size;
241
12.0k
            _arrow_data_arrival
242
12.0k
                    .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
243
12.0k
        }
244
14.0k
        _instance_rows[state->fragment_instance_id()] += num_rows;
245
14.0k
        _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
246
155k
    } else {
247
155k
        auto ctx = _waiting_rpc.front();
248
155k
        _waiting_rpc.pop_front();
249
155k
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
250
155k
        _packet_num++;
251
155k
    }
252
253
169k
    _update_dependency();
254
169k
    return Status::OK();
255
169k
}
_ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9add_batchEPNS_12RuntimeStateERSt10shared_ptrINS_5BlockEE
Line
Count
Source
188
121
                                                   std::shared_ptr<InBlockType>& result) {
189
121
    std::unique_lock<std::mutex> l(_lock);
190
191
121
    if (!_status.ok()) {
192
1
        return _status;
193
1
    }
194
195
120
    if (_waiting_rpc.empty()) {
196
119
        auto sz = 0;
197
119
        auto num_rows = 0;
198
119
        size_t batch_size = 0;
199
119
        if constexpr (std::is_same_v<InBlockType, Block>) {
200
119
            num_rows = cast_set<int>(result->rows());
201
119
            batch_size = result->bytes();
202
        } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
203
            num_rows = cast_set<int>(result->result_batch.rows.size());
204
            for (const auto& row : result->result_batch.rows) {
205
                batch_size += row.size();
206
            }
207
        }
208
119
        if (!_result_batch_queue.empty()) {
209
12
            if constexpr (std::is_same_v<InBlockType, Block>) {
210
12
                sz = cast_set<int>(_result_batch_queue.back()->rows());
211
            } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
212
                sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size());
213
            }
214
12
            if (sz + num_rows < _buffer_limit &&
215
12
                (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) {
216
12
                if constexpr (std::is_same_v<InBlockType, Block>) {
217
12
                    auto last_block = _result_batch_queue.back();
218
56
                    for (size_t i = 0; i < last_block->columns(); i++) {
219
44
                        last_block->mutate_columns()[i]->insert_range_from(
220
44
                                *result->get_by_position(i).column, 0, num_rows);
221
44
                    }
222
                } else {
223
                    std::vector<std::string>& back_rows =
224
                            _result_batch_queue.back()->result_batch.rows;
225
                    std::vector<std::string>& result_rows = result->result_batch.rows;
226
                    back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()),
227
                                     std::make_move_iterator(result_rows.end()));
228
                }
229
12
                _last_batch_bytes += batch_size;
230
12
            } else {
231
0
                _instance_rows_in_queue.emplace_back();
232
0
                _result_batch_queue.push_back(std::move(result));
233
0
                _last_batch_bytes = batch_size;
234
0
                _arrow_data_arrival
235
0
                        .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
236
0
            }
237
107
        } else {
238
107
            _instance_rows_in_queue.emplace_back();
239
107
            _result_batch_queue.push_back(std::move(result));
240
107
            _last_batch_bytes = batch_size;
241
107
            _arrow_data_arrival
242
107
                    .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
243
107
        }
244
119
        _instance_rows[state->fragment_instance_id()] += num_rows;
245
119
        _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
246
119
    } else {
247
1
        auto ctx = _waiting_rpc.front();
248
1
        _waiting_rpc.pop_front();
249
1
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
250
1
        _packet_num++;
251
1
    }
252
253
120
    _update_dependency();
254
120
    return Status::OK();
255
120
}
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9add_batchEPNS_12RuntimeStateERSt10shared_ptrINS_16TFetchDataResultEE
Line
Count
Source
188
169k
                                                   std::shared_ptr<InBlockType>& result) {
189
169k
    std::unique_lock<std::mutex> l(_lock);
190
191
169k
    if (!_status.ok()) {
192
1
        return _status;
193
1
    }
194
195
169k
    if (_waiting_rpc.empty()) {
196
13.9k
        auto sz = 0;
197
13.9k
        auto num_rows = 0;
198
13.9k
        size_t batch_size = 0;
199
        if constexpr (std::is_same_v<InBlockType, Block>) {
200
            num_rows = cast_set<int>(result->rows());
201
            batch_size = result->bytes();
202
13.9k
        } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
203
13.9k
            num_rows = cast_set<int>(result->result_batch.rows.size());
204
25.7k
            for (const auto& row : result->result_batch.rows) {
205
25.7k
                batch_size += row.size();
206
25.7k
            }
207
13.9k
        }
208
13.9k
        if (!_result_batch_queue.empty()) {
209
            if constexpr (std::is_same_v<InBlockType, Block>) {
210
                sz = cast_set<int>(_result_batch_queue.back()->rows());
211
2.05k
            } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
212
2.05k
                sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size());
213
2.05k
            }
214
2.05k
            if (sz + num_rows < _buffer_limit &&
215
2.05k
                (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) {
216
                if constexpr (std::is_same_v<InBlockType, Block>) {
217
                    auto last_block = _result_batch_queue.back();
218
                    for (size_t i = 0; i < last_block->columns(); i++) {
219
                        last_block->mutate_columns()[i]->insert_range_from(
220
                                *result->get_by_position(i).column, 0, num_rows);
221
                    }
222
2.05k
                } else {
223
2.05k
                    std::vector<std::string>& back_rows =
224
2.05k
                            _result_batch_queue.back()->result_batch.rows;
225
2.05k
                    std::vector<std::string>& result_rows = result->result_batch.rows;
226
2.05k
                    back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()),
227
2.05k
                                     std::make_move_iterator(result_rows.end()));
228
2.05k
                }
229
2.05k
                _last_batch_bytes += batch_size;
230
2.05k
            } else {
231
0
                _instance_rows_in_queue.emplace_back();
232
0
                _result_batch_queue.push_back(std::move(result));
233
0
                _last_batch_bytes = batch_size;
234
0
                _arrow_data_arrival
235
0
                        .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
236
0
            }
237
11.9k
        } else {
238
11.9k
            _instance_rows_in_queue.emplace_back();
239
11.9k
            _result_batch_queue.push_back(std::move(result));
240
11.9k
            _last_batch_bytes = batch_size;
241
11.9k
            _arrow_data_arrival
242
11.9k
                    .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
243
11.9k
        }
244
13.9k
        _instance_rows[state->fragment_instance_id()] += num_rows;
245
13.9k
        _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
246
155k
    } else {
247
155k
        auto ctx = _waiting_rpc.front();
248
155k
        _waiting_rpc.pop_front();
249
155k
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
250
155k
        _packet_num++;
251
155k
    }
252
253
169k
    _update_dependency();
254
169k
    return Status::OK();
255
169k
}
256
257
template class ResultBlockBuffer<GetArrowResultBatchCtx>;
258
template class ResultBlockBuffer<GetResultBatchCtx>;
259
260
#include "common/compile_check_end.h"
261
} // namespace doris