Coverage Report

Created: 2026-04-11 13:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/load_stream_stub.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <brpc/controller.h>
20
#include <bthread/condition_variable.h>
21
#include <bthread/mutex.h>
22
#include <bthread/types.h>
23
#include <butil/errno.h>
24
#include <fmt/format.h>
25
#include <gen_cpp/PaloInternalService_types.h>
26
#include <gen_cpp/Types_types.h>
27
#include <gen_cpp/internal_service.pb.h>
28
#include <gen_cpp/types.pb.h>
29
#include <glog/logging.h>
30
#include <google/protobuf/stubs/callback.h>
31
#include <parallel_hashmap/phmap.h>
32
#include <stddef.h>
33
#include <stdint.h>
34
35
#include <atomic>
36
// IWYU pragma: no_include <bits/chrono.h>
37
#include <chrono> // IWYU pragma: keep
38
#include <functional>
39
#include <initializer_list>
40
#include <map>
41
#include <memory>
42
#include <mutex>
43
#include <ostream>
44
#include <queue>
45
#include <set>
46
#include <span>
47
#include <string>
48
#include <unordered_map>
49
#include <unordered_set>
50
#include <utility>
51
#include <vector>
52
53
#include "common/config.h"
54
#include "common/status.h"
55
#include "core/allocator.h"
56
#include "core/block/block.h"
57
#include "core/column/column.h"
58
#include "core/data_type/data_type.h"
59
#include "exprs/vexpr_fwd.h"
60
#include "runtime/exec_env.h"
61
#include "runtime/memory/mem_tracker.h"
62
#include "runtime/runtime_profile.h"
63
#include "runtime/thread_context.h"
64
#include "storage/tablet_info.h"
65
#include "util/countdown_latch.h"
66
#include "util/debug_points.h"
67
#include "util/stopwatch.hpp"
68
69
namespace doris {
70
class TabletSchema;
71
class LoadStreamStub;
72
73
struct SegmentStatistics;
74
75
// Map from index id to the TabletSchema associated with that index.
// A phmap parallel map: the `4` is phmap's log2 sub-map count, and each sub-map is
// guarded by its own std::mutex (per the phmap parallel_hash_map documentation).
using IndexToTabletSchema = phmap::parallel_flat_hash_map<
        /*Key=*/int64_t,
        /*Value=*/std::shared_ptr<TabletSchema>,
        /*Hash=*/std::hash<int64_t>,
        /*Eq=*/std::equal_to<int64_t>,
        /*Alloc=*/std::allocator<phmap::Pair<const int64_t, std::shared_ptr<TabletSchema>>>,
        /*N=*/4,
        /*Mutex=*/std::mutex>;

// Map from index id to a per-index boolean flag; presumably "merge-on-write enabled for a
// unique-key table" given how LoadStreamStub::enable_unique_mow reads it — confirm.
// Same sharding/locking parameters as IndexToTabletSchema.
using IndexToEnableMoW = phmap::parallel_flat_hash_map<
        /*Key=*/int64_t,
        /*Value=*/bool,
        /*Hash=*/std::hash<int64_t>,
        /*Eq=*/std::equal_to<int64_t>,
        /*Alloc=*/std::allocator<phmap::Pair<const int64_t, bool>>,
        /*N=*/4,
        /*Mutex=*/std::mutex>;
83
84
class LoadStreamReplyHandler : public brpc::StreamInputHandler {
85
public:
86
    LoadStreamReplyHandler(PUniqueId load_id, int64_t dst_id, std::weak_ptr<LoadStreamStub> stub)
87
4.06k
            : _load_id(load_id), _dst_id(dst_id), _stub(stub) {}
88
89
    int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
90
                             size_t size) override;
91
92
0
    void on_idle_timeout(brpc::StreamId id) override {}
93
94
    void on_closed(brpc::StreamId id) override;
95
96
    friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler);
97
98
private:
99
    PUniqueId _load_id;   // for logging
100
    int64_t _dst_id = -1; // for logging
101
    std::weak_ptr<LoadStreamStub> _stub;
102
};
103
104
class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
105
    friend class LoadStreamReplyHandler;
106
107
public:
108
    // construct new stub
109
    LoadStreamStub(PUniqueId load_id, int64_t src_id,
110
                   std::shared_ptr<IndexToTabletSchema> schema_map,
111
                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false);
112
113
    LoadStreamStub(UniqueId load_id, int64_t src_id,
114
                   std::shared_ptr<IndexToTabletSchema> schema_map,
115
                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false)
116
4.11k
            : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, incremental) {};
117
118
// for mock this class in UT
119
#ifdef BE_TEST
120
    virtual
121
#endif
122
            ~LoadStreamStub();
123
124
    // open_load_stream
125
    Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
126
                int64_t txn_id, const OlapTableSchemaParam& schema,
127
                const std::vector<PTabletID>& tablets_for_schema, int total_streams,
128
                int64_t idle_timeout_ms, bool enable_profile);
129
130
// for mock this class in UT
131
#ifdef BE_TEST
132
    virtual
133
#endif
134
            // segment_id is limited by max_segment_num_per_rowset (default value of 1000),
135
            // so in practice it will not exceed the range of i16.
136
137
            // APPEND_DATA
138
            Status
139
            append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
140
                        int32_t segment_id, uint64_t offset, std::span<const Slice> data,
141
                        bool segment_eos = false, FileType file_type = FileType::SEGMENT_FILE);
142
143
    // ADD_SEGMENT
144
    Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
145
                       int32_t segment_id, const SegmentStatistics& segment_stat);
146
147
    // CLOSE_LOAD
148
    Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams);
149
150
    // GET_SCHEMA
151
    Status get_schema(const std::vector<PTabletID>& tablets);
152
153
    // wait remote to close stream,
154
    // remote will close stream when it receives CLOSE_LOAD
155
    Status close_finish_check(RuntimeState* state, bool* is_closed);
156
157
    // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
158
    void cancel(Status reason);
159
160
    Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id,
161
                           int64_t timeout_ms = 60000);
162
163
4
    Status wait_for_new_schema(int64_t timeout_ms) {
164
4
        std::unique_lock<bthread::Mutex> lock(_schema_mutex);
165
4
        if (timeout_ms > 0) {
166
4
            int ret = _schema_cv.wait_for(lock, timeout_ms * 1000);
167
4
            return ret == 0 ? Status::OK() : Status::Error<true>(ret, "wait schema update timeout");
168
4
        }
169
0
        _schema_cv.wait(lock);
170
0
        return Status::OK();
171
4
    };
172
173
7.50k
    std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
174
7.50k
        return (*_tablet_schema_for_index)[index_id];
175
7.50k
    }
176
177
7.50k
    bool enable_unique_mow(int64_t index_id) const {
178
7.50k
        return _enable_unique_mow_for_index->at(index_id);
179
7.50k
    }
180
181
4.09k
    std::vector<int64_t> success_tablets() {
182
4.09k
        std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
183
4.09k
        return _success_tablets;
184
4.09k
    }
185
186
4.09k
    std::unordered_map<int64_t, Status> failed_tablets() {
187
4.09k
        std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
188
4.09k
        return _failed_tablets;
189
4.09k
    }
190
191
0
    brpc::StreamId stream_id() const { return _stream_id; }
192
193
0
    int64_t src_id() const { return _src_id; }
194
195
0
    int64_t dst_id() const { return _dst_id; }
196
197
0
    bool is_open() const { return _is_open.load(); }
198
199
10.8k
    bool is_incremental() const { return _is_incremental; }
200
201
    friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub);
202
203
    std::string to_string();
204
205
    // for tests only
206
    void add_success_tablet(int64_t tablet_id) {
207
        std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
208
        _success_tablets.push_back(tablet_id);
209
    }
210
211
13
    void add_failed_tablet(int64_t tablet_id, Status reason) {
212
13
        std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
213
13
        _failed_tablets[tablet_id] = reason;
214
13
    }
215
216
116k
    void add_bytes_written(size_t bytes) {
217
116k
        std::lock_guard<bthread::Mutex> lock(_write_mutex);
218
116k
        _bytes_written += bytes;
219
116k
    }
220
221
0
    int64_t bytes_written() {
222
0
        std::lock_guard<bthread::Mutex> lock(_write_mutex);
223
0
        return _bytes_written;
224
0
    }
225
226
35.7k
    Status check_cancel() {
227
35.7k
        DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled",
228
35.7k
                        { return Status::InternalError("stream cancelled"); });
229
35.7k
        if (!_is_cancelled.load()) {
230
35.7k
            return Status::OK();
231
35.7k
        }
232
18.4E
        std::lock_guard<bthread::Mutex> lock(_cancel_mutex);
233
18.4E
        return Status::Cancelled("load_id={}, reason: {}", print_id(_load_id),
234
18.4E
                                 _cancel_st.to_string_no_stack());
235
35.7k
    }
236
237
4.88k
    int64_t get_and_reset_load_back_pressure_version_wait_time_ms() {
238
4.88k
        return _load_back_pressure_version_wait_time_ms.exchange(0);
239
4.88k
    }
240
241
    void _refresh_back_pressure_version_wait_time(
242
            const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
243
                    tablet_load_infos);
244
245
private:
246
    Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
247
    Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
248
    Status _send_with_retry(butil::IOBuf& buf);
249
    void _handle_failure(butil::IOBuf& buf, Status st);
250
251
protected:
252
    std::atomic<bool> _is_init;
253
    std::atomic<bool> _is_open;
254
    std::atomic<bool> _is_closing;
255
    std::atomic<bool> _is_closed;
256
    std::atomic<bool> _is_cancelled;
257
    std::atomic<bool> _is_eos;
258
259
    PUniqueId _load_id;
260
    brpc::StreamId _stream_id;
261
    int64_t _src_id = -1; // source backend_id
262
    int64_t _dst_id = -1; // destination backend_id
263
    Status _status = Status::InternalError<false>("Stream is not open");
264
    Status _cancel_st;
265
266
    bthread::Mutex _open_mutex;
267
    bthread::Mutex _cancel_mutex;
268
269
    std::mutex _buffer_mutex;
270
    std::mutex _send_mutex;
271
    butil::IOBuf _buffer;
272
273
    bthread::Mutex _schema_mutex;
274
    bthread::ConditionVariable _schema_cv;
275
    std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
276
    std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
277
278
    bthread::Mutex _success_tablets_mutex;
279
    bthread::Mutex _failed_tablets_mutex;
280
    std::vector<int64_t> _success_tablets;
281
    std::unordered_map<int64_t, Status> _failed_tablets;
282
283
    bool _is_incremental = false;
284
285
    bthread::Mutex _write_mutex;
286
    size_t _bytes_written = 0;
287
288
    std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0};
289
};
290
291
// a collection of LoadStreams connect to the same node
292
class LoadStreamStubs {
293
public:
294
    LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id,
295
                    std::shared_ptr<IndexToTabletSchema> schema_map,
296
                    std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false)
297
2.13k
            : _is_incremental(incremental) {
298
2.13k
        _streams.reserve(num_streams);
299
6.25k
        for (size_t i = 0; i < num_streams; i++) {
300
4.11k
            _streams.emplace_back(
301
4.11k
                    new LoadStreamStub(load_id, src_id, schema_map, mow_map, incremental));
302
4.11k
        }
303
2.13k
    }
304
305
    Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
306
                int64_t txn_id, const OlapTableSchemaParam& schema,
307
                const std::vector<PTabletID>& tablets_for_schema, int total_streams,
308
                int64_t idle_timeout_ms, bool enable_profile);
309
310
4.18k
    bool is_incremental() const { return _is_incremental; }
311
312
    size_t size() const { return _streams.size(); }
313
314
    // for UT only
315
    void mark_open() { _open_success.store(true); }
316
317
3.81k
    std::shared_ptr<LoadStreamStub> select_one_stream() {
318
3.81k
        if (!_open_success.load()) {
319
0
            return nullptr;
320
0
        }
321
3.81k
        size_t i = _select_index.fetch_add(1);
322
3.81k
        return _streams[i % _streams.size()];
323
3.81k
    }
324
325
32
    void cancel(Status reason) {
326
64
        for (auto& stream : _streams) {
327
64
            stream->cancel(reason);
328
64
        }
329
32
    }
330
331
    Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams);
332
333
2.12k
    std::unordered_set<int64_t> success_tablets() {
334
2.12k
        std::unordered_set<int64_t> s;
335
4.09k
        for (auto& stream : _streams) {
336
4.09k
            auto v = stream->success_tablets();
337
4.09k
            std::copy(v.begin(), v.end(), std::inserter(s, s.end()));
338
4.09k
        }
339
2.12k
        return s;
340
2.12k
    }
341
342
2.12k
    std::unordered_map<int64_t, Status> failed_tablets() {
343
2.12k
        std::unordered_map<int64_t, Status> m;
344
4.09k
        for (auto& stream : _streams) {
345
4.09k
            auto v = stream->failed_tablets();
346
4.09k
            m.insert(v.begin(), v.end());
347
4.09k
        }
348
2.12k
        return m;
349
2.12k
    }
350
351
64.3k
    std::vector<std::shared_ptr<LoadStreamStub>> streams() { return _streams; }
352
353
private:
354
    std::vector<std::shared_ptr<LoadStreamStub>> _streams;
355
    std::atomic<bool> _open_success = false;
356
    std::atomic<size_t> _select_index = 0;
357
    const bool _is_incremental;
358
};
359
360
} // namespace doris