Coverage Report

Created: 2026-06-24 05:56

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/load_stream_stub.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <brpc/controller.h>
20
#include <bthread/condition_variable.h>
21
#include <bthread/mutex.h>
22
#include <bthread/types.h>
23
#include <butil/errno.h>
24
#include <fmt/format.h>
25
#include <gen_cpp/PaloInternalService_types.h>
26
#include <gen_cpp/Types_types.h>
27
#include <gen_cpp/internal_service.pb.h>
28
#include <gen_cpp/types.pb.h>
29
#include <glog/logging.h>
30
#include <google/protobuf/stubs/callback.h>
31
#include <parallel_hashmap/phmap.h>
32
#include <stddef.h>
33
#include <stdint.h>
34
35
#include <atomic>
36
// IWYU pragma: no_include <bits/chrono.h>
37
#include <chrono> // IWYU pragma: keep
38
#include <functional>
39
#include <initializer_list>
40
#include <map>
41
#include <memory>
42
#include <mutex>
43
#include <ostream>
44
#include <queue>
45
#include <set>
46
#include <span>
47
#include <string>
48
#include <unordered_map>
49
#include <unordered_set>
50
#include <utility>
51
#include <vector>
52
53
#include "common/config.h"
54
#include "common/status.h"
55
#include "core/allocator.h"
56
#include "core/block/block.h"
57
#include "core/column/column.h"
58
#include "core/data_type/data_type.h"
59
#include "exprs/vexpr_fwd.h"
60
#include "runtime/exec_env.h"
61
#include "runtime/memory/mem_tracker.h"
62
#include "runtime/runtime_profile.h"
63
#include "runtime/thread_context.h"
64
#include "storage/tablet_info.h"
65
#include "util/countdown_latch.h"
66
#include "util/debug_points.h"
67
#include "util/stopwatch.hpp"
68
69
namespace doris {
70
class TabletSchema;
71
class LoadStreamStub;
72
73
struct SegmentStatistics;
74
75
// Map from index id to the TabletSchema used for that index.
// Both aliases below pass N = 4 and std::mutex to phmap::parallel_flat_hash_map
// (submap-count exponent and per-submap lock type in phmap's template parameter list).
using IndexToTabletSchema = phmap::parallel_flat_hash_map<
        int64_t,                                                                   // key
        std::shared_ptr<TabletSchema>,                                             // mapped value
        std::hash<int64_t>,                                                        // hasher
        std::equal_to<int64_t>,                                                    // key equality
        std::allocator<phmap::Pair<const int64_t, std::shared_ptr<TabletSchema>>>, // allocator
        4,                                                                         // N
        std::mutex>;                                                               // lock type

// Map from index id to a bool flag (read through LoadStreamStub::enable_unique_mow()).
using IndexToEnableMoW = phmap::parallel_flat_hash_map<
        int64_t,                                          // key
        bool,                                             // mapped value
        std::hash<int64_t>,                               // hasher
        std::equal_to<int64_t>,                           // key equality
        std::allocator<phmap::Pair<const int64_t, bool>>, // allocator
        4,                                                // N
        std::mutex>;                                      // lock type
83
84
class CloseWaitNotifier {
85
public:
86
    int64_t close_wait_version() const;
87
88
    void wait_for_close_event(int64_t observed_version, int64_t timeout_ms);
89
90
    void notify_close_wait();
91
92
private:
93
    std::atomic<int64_t> _close_wait_version {0};
94
    bthread::Mutex _close_wait_mutex;
95
    bthread::ConditionVariable _close_wait_cv;
96
};
97
98
class LoadStreamReplyHandler : public brpc::StreamInputHandler {
99
public:
100
    LoadStreamReplyHandler(PUniqueId load_id, int64_t dst_id, std::weak_ptr<LoadStreamStub> stub)
101
5.81k
            : _load_id(load_id), _dst_id(dst_id), _stub(stub) {}
102
103
    int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
104
                             size_t size) override;
105
106
0
    void on_idle_timeout(brpc::StreamId id) override {}
107
108
    void on_closed(brpc::StreamId id) override;
109
110
    friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler);
111
112
private:
113
    PUniqueId _load_id;   // for logging
114
    int64_t _dst_id = -1; // for logging
115
    std::weak_ptr<LoadStreamStub> _stub;
116
};
117
118
class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
119
    friend class LoadStreamReplyHandler;
120
121
public:
122
    // construct new stub
123
    LoadStreamStub(PUniqueId load_id, int64_t src_id,
124
                   std::shared_ptr<IndexToTabletSchema> schema_map,
125
                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false,
126
                   std::shared_ptr<CloseWaitNotifier> close_wait_notifier =
127
                           std::make_shared<CloseWaitNotifier>());
128
129
    LoadStreamStub(UniqueId load_id, int64_t src_id,
130
                   std::shared_ptr<IndexToTabletSchema> schema_map,
131
                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false,
132
                   std::shared_ptr<CloseWaitNotifier> close_wait_notifier =
133
                           std::make_shared<CloseWaitNotifier>())
134
5.87k
            : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, incremental,
135
5.87k
                             std::move(close_wait_notifier)) {};
136
137
// for mock this class in UT
138
#ifdef BE_TEST
139
    virtual
140
#endif
141
            ~LoadStreamStub();
142
143
    // open_load_stream
144
    Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
145
                int64_t txn_id, const OlapTableSchemaParam& schema,
146
                const std::vector<PTabletID>& tablets_for_schema, int total_streams,
147
                int64_t idle_timeout_ms, bool enable_profile);
148
149
// for mock this class in UT
150
#ifdef BE_TEST
151
    virtual
152
#endif
153
            // segment_id is limited by max_segment_num_per_rowset (default value of 1000),
154
            // so in practice it will not exceed the range of i16.
155
156
            // APPEND_DATA
157
            Status
158
            append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
159
                        int32_t segment_id, uint64_t offset, std::span<const Slice> data,
160
                        bool segment_eos = false, FileType file_type = FileType::SEGMENT_FILE);
161
162
    // ADD_SEGMENT
163
    Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
164
                       int32_t segment_id, const SegmentStatistics& segment_stat);
165
166
    // CLOSE_LOAD
167
    Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams);
168
169
    // GET_SCHEMA
170
    Status get_schema(const std::vector<PTabletID>& tablets);
171
172
    // wait remote to close stream,
173
    // remote will close stream when it receives CLOSE_LOAD
174
    Status close_finish_check(RuntimeState* state, bool* is_closed);
175
176
    // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
177
    void cancel(Status reason);
178
179
    Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id,
180
                           int64_t timeout_ms = 60000);
181
182
4
    Status wait_for_new_schema(int64_t timeout_ms) {
183
4
        std::unique_lock<bthread::Mutex> lock(_schema_mutex);
184
4
        if (timeout_ms > 0) {
185
4
            int ret = _schema_cv.wait_for(lock, timeout_ms * 1000);
186
4
            return ret == 0 ? Status::OK() : Status::Error<true>(ret, "wait schema update timeout");
187
4
        }
188
0
        _schema_cv.wait(lock);
189
0
        return Status::OK();
190
4
    };
191
192
13.2k
    std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
193
13.2k
        return (*_tablet_schema_for_index)[index_id];
194
13.2k
    }
195
196
13.2k
    bool enable_unique_mow(int64_t index_id) const {
197
13.2k
        return _enable_unique_mow_for_index->at(index_id);
198
13.2k
    }
199
200
5.82k
    std::vector<int64_t> success_tablets() {
201
5.82k
        std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
202
5.82k
        return _success_tablets;
203
5.82k
    }
204
205
5.82k
    std::unordered_map<int64_t, Status> failed_tablets() {
206
5.82k
        std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
207
5.82k
        return _failed_tablets;
208
5.82k
    }
209
210
0
    brpc::StreamId stream_id() const { return _stream_id; }
211
212
0
    int64_t src_id() const { return _src_id; }
213
214
0
    int64_t dst_id() const { return _dst_id; }
215
216
0
    bool is_open() const { return _is_open.load(); }
217
218
18.4k
    bool is_incremental() const { return _is_incremental; }
219
220
    friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub);
221
222
    std::string to_string();
223
224
    // for tests only
225
    void add_success_tablet(int64_t tablet_id) {
226
        std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
227
        _success_tablets.push_back(tablet_id);
228
    }
229
230
13
    void add_failed_tablet(int64_t tablet_id, Status reason) {
231
13
        std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
232
13
        _failed_tablets[tablet_id] = reason;
233
13
    }
234
235
175k
    void add_bytes_written(size_t bytes) {
236
175k
        std::lock_guard<bthread::Mutex> lock(_write_mutex);
237
175k
        _bytes_written += bytes;
238
175k
    }
239
240
0
    int64_t bytes_written() {
241
0
        std::lock_guard<bthread::Mutex> lock(_write_mutex);
242
0
        return _bytes_written;
243
0
    }
244
245
56.2k
    Status check_cancel() {
246
56.2k
        DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled",
247
56.2k
                        { return Status::InternalError("stream cancelled"); });
248
56.3k
        if (!_is_cancelled.load()) {
249
56.3k
            return Status::OK();
250
56.3k
        }
251
18.4E
        std::lock_guard<bthread::Mutex> lock(_cancel_mutex);
252
18.4E
        return Status::Cancelled("load_id={}, reason: {}", print_id(_load_id),
253
18.4E
                                 _cancel_st.to_string_no_stack());
254
56.2k
    }
255
256
6.76k
    int64_t get_and_reset_load_back_pressure_version_wait_time_ms() {
257
6.76k
        return _load_back_pressure_version_wait_time_ms.exchange(0);
258
6.76k
    }
259
260
    void _refresh_back_pressure_version_wait_time(
261
            const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
262
                    tablet_load_infos);
263
264
private:
265
    void notify_close_wait();
266
267
    Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
268
    Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
269
    Status _send_with_retry(butil::IOBuf& buf);
270
    void _handle_failure(butil::IOBuf& buf, Status st);
271
272
protected:
273
    std::atomic<bool> _is_init;
274
    std::atomic<bool> _is_open;
275
    std::atomic<bool> _is_closing;
276
    std::atomic<bool> _is_closed;
277
    std::atomic<bool> _is_cancelled;
278
    std::atomic<bool> _is_eos;
279
280
    PUniqueId _load_id;
281
    brpc::StreamId _stream_id;
282
    int64_t _src_id = -1; // source backend_id
283
    int64_t _dst_id = -1; // destination backend_id
284
    Status _status = Status::InternalError<false>("Stream is not open");
285
    Status _cancel_st;
286
287
    bthread::Mutex _open_mutex;
288
    bthread::Mutex _cancel_mutex;
289
290
    std::mutex _buffer_mutex;
291
    std::mutex _send_mutex;
292
    butil::IOBuf _buffer;
293
294
    bthread::Mutex _schema_mutex;
295
    bthread::ConditionVariable _schema_cv;
296
    std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
297
    std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
298
299
    bthread::Mutex _success_tablets_mutex;
300
    bthread::Mutex _failed_tablets_mutex;
301
    std::vector<int64_t> _success_tablets;
302
    std::unordered_map<int64_t, Status> _failed_tablets;
303
304
    bool _is_incremental = false;
305
    std::shared_ptr<CloseWaitNotifier> _close_wait_notifier;
306
307
    bthread::Mutex _write_mutex;
308
    size_t _bytes_written = 0;
309
310
    std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0};
311
};
312
313
// A collection of LoadStreamStub instances that connect to the same node.
314
class LoadStreamStubs {
315
public:
316
    LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id,
317
                    std::shared_ptr<IndexToTabletSchema> schema_map,
318
                    std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false,
319
                    std::shared_ptr<CloseWaitNotifier> close_wait_notifier =
320
                            std::make_shared<CloseWaitNotifier>())
321
3.00k
            : _is_incremental(incremental) {
322
3.00k
        _streams.reserve(num_streams);
323
8.87k
        for (size_t i = 0; i < num_streams; i++) {
324
5.86k
            _streams.emplace_back(new LoadStreamStub(load_id, src_id, schema_map, mow_map,
325
5.86k
                                                     incremental, close_wait_notifier));
326
5.86k
        }
327
3.00k
    }
328
329
    Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
330
                int64_t txn_id, const OlapTableSchemaParam& schema,
331
                const std::vector<PTabletID>& tablets_for_schema, int total_streams,
332
                int64_t idle_timeout_ms, bool enable_profile);
333
334
5.92k
    bool is_incremental() const { return _is_incremental; }
335
336
    size_t size() const { return _streams.size(); }
337
338
    // for UT only
339
    void mark_open() { _open_success.store(true); }
340
341
6.68k
    std::shared_ptr<LoadStreamStub> select_one_stream() {
342
6.68k
        if (!_open_success.load()) {
343
0
            return nullptr;
344
0
        }
345
6.68k
        size_t i = _select_index.fetch_add(1);
346
6.68k
        return _streams[i % _streams.size()];
347
6.68k
    }
348
349
81
    void cancel(Status reason) {
350
161
        for (auto& stream : _streams) {
351
161
            stream->cancel(reason);
352
161
        }
353
81
    }
354
355
    Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams);
356
357
2.99k
    std::unordered_set<int64_t> success_tablets() {
358
2.99k
        std::unordered_set<int64_t> s;
359
5.82k
        for (auto& stream : _streams) {
360
5.82k
            auto v = stream->success_tablets();
361
5.82k
            std::copy(v.begin(), v.end(), std::inserter(s, s.end()));
362
5.82k
        }
363
2.99k
        return s;
364
2.99k
    }
365
366
2.99k
    std::unordered_map<int64_t, Status> failed_tablets() {
367
2.99k
        std::unordered_map<int64_t, Status> m;
368
5.82k
        for (auto& stream : _streams) {
369
5.82k
            auto v = stream->failed_tablets();
370
5.82k
            m.insert(v.begin(), v.end());
371
5.82k
        }
372
2.99k
        return m;
373
2.99k
    }
374
375
71.3k
    std::vector<std::shared_ptr<LoadStreamStub>> streams() { return _streams; }
376
377
private:
378
    std::vector<std::shared_ptr<LoadStreamStub>> _streams;
379
    std::atomic<bool> _open_success = false;
380
    std::atomic<size_t> _select_index = 0;
381
    const bool _is_incremental;
382
};
383
384
} // namespace doris