Coverage Report

Created: 2026-03-12 14:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/load_stream_stub.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <brpc/controller.h>
20
#include <bthread/condition_variable.h>
21
#include <bthread/mutex.h>
22
#include <bthread/types.h>
23
#include <butil/errno.h>
24
#include <fmt/format.h>
25
#include <gen_cpp/PaloInternalService_types.h>
26
#include <gen_cpp/Types_types.h>
27
#include <gen_cpp/internal_service.pb.h>
28
#include <gen_cpp/types.pb.h>
29
#include <glog/logging.h>
30
#include <google/protobuf/stubs/callback.h>
31
#include <parallel_hashmap/phmap.h>
32
#include <stddef.h>
33
#include <stdint.h>
34
35
#include <atomic>
36
// IWYU pragma: no_include <bits/chrono.h>
37
#include <chrono> // IWYU pragma: keep
38
#include <functional>
39
#include <initializer_list>
40
#include <map>
41
#include <memory>
42
#include <mutex>
43
#include <ostream>
44
#include <queue>
45
#include <set>
46
#include <span>
47
#include <string>
48
#include <unordered_map>
49
#include <unordered_set>
50
#include <utility>
51
#include <vector>
52
53
#include "common/config.h"
54
#include "common/status.h"
55
#include "core/allocator.h"
56
#include "core/block/block.h"
57
#include "core/column/column.h"
58
#include "core/data_type/data_type.h"
59
#include "exprs/vexpr_fwd.h"
60
#include "runtime/exec_env.h"
61
#include "runtime/memory/mem_tracker.h"
62
#include "runtime/runtime_profile.h"
63
#include "runtime/thread_context.h"
64
#include "storage/tablet_info.h"
65
#include "util/countdown_latch.h"
66
#include "util/debug_points.h"
67
#include "util/stopwatch.hpp"
68
69
namespace doris {
70
#include "common/compile_check_begin.h"
71
class TabletSchema;
72
class LoadStreamStub;
73
74
struct SegmentStatistics;
75
76
// Map from an int64_t key (used as an index id by LoadStreamStub::tablet_schema())
// to the shared TabletSchema registered for that index.
// NOTE(review): in phmap the trailing `4` is presumably the submap-count exponent
// and `std::mutex` the per-submap lock type - confirm against the phmap docs.
using IndexToTabletSchema = phmap::parallel_flat_hash_map<
77
        int64_t, std::shared_ptr<TabletSchema>, std::hash<int64_t>, std::equal_to<int64_t>,
78
        std::allocator<phmap::Pair<const int64_t, std::shared_ptr<TabletSchema>>>, 4, std::mutex>;
79
80
// Map from an int64_t key (used as an index id by LoadStreamStub::enable_unique_mow())
// to the boolean flag that accessor returns for the index.
// NOTE(review): same phmap template arguments as IndexToTabletSchema (`4`, std::mutex);
// confirm their exact meaning against the phmap docs.
using IndexToEnableMoW =
81
        phmap::parallel_flat_hash_map<int64_t, bool, std::hash<int64_t>, std::equal_to<int64_t>,
82
                                      std::allocator<phmap::Pair<const int64_t, bool>>, 4,
83
                                      std::mutex>;
84
85
class LoadStreamReplyHandler : public brpc::StreamInputHandler {
86
public:
87
    LoadStreamReplyHandler(PUniqueId load_id, int64_t dst_id, std::weak_ptr<LoadStreamStub> stub)
88
24
            : _load_id(load_id), _dst_id(dst_id), _stub(stub) {}
89
90
    int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
91
                             size_t size) override;
92
93
0
    void on_idle_timeout(brpc::StreamId id) override {}
94
95
    void on_closed(brpc::StreamId id) override;
96
97
    friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler);
98
99
private:
100
    PUniqueId _load_id;   // for logging
101
    int64_t _dst_id = -1; // for logging
102
    std::weak_ptr<LoadStreamStub> _stub;
103
};
104
105
class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
106
    friend class LoadStreamReplyHandler;
107
108
public:
109
    // construct new stub
110
    LoadStreamStub(PUniqueId load_id, int64_t src_id,
111
                   std::shared_ptr<IndexToTabletSchema> schema_map,
112
                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false);
113
114
    LoadStreamStub(UniqueId load_id, int64_t src_id,
115
                   std::shared_ptr<IndexToTabletSchema> schema_map,
116
                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false)
117
75
            : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, incremental) {};
118
119
// for mock this class in UT
120
#ifdef BE_TEST
121
    virtual
122
#endif
123
            ~LoadStreamStub();
124
125
    // open_load_stream
126
    Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
127
                int64_t txn_id, const OlapTableSchemaParam& schema,
128
                const std::vector<PTabletID>& tablets_for_schema, int total_streams,
129
                int64_t idle_timeout_ms, bool enable_profile);
130
131
// for mock this class in UT
132
#ifdef BE_TEST
133
    virtual
134
#endif
135
            // segment_id is limited by max_segment_num_per_rowset (default value of 1000),
136
            // so in practice it will not exceed the range of i16.
137
138
            // APPEND_DATA
139
            Status
140
            append_data(int64_t partition_id, int64_t index_id, int64_t tablet_id,
141
                        int32_t segment_id, uint64_t offset, std::span<const Slice> data,
142
                        bool segment_eos = false, FileType file_type = FileType::SEGMENT_FILE);
143
144
    // ADD_SEGMENT
145
    Status add_segment(int64_t partition_id, int64_t index_id, int64_t tablet_id,
146
                       int32_t segment_id, const SegmentStatistics& segment_stat);
147
148
    // CLOSE_LOAD
149
    Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams);
150
151
    // GET_SCHEMA
152
    Status get_schema(const std::vector<PTabletID>& tablets);
153
154
    // wait remote to close stream,
155
    // remote will close stream when it receives CLOSE_LOAD
156
    Status close_finish_check(RuntimeState* state, bool* is_closed);
157
158
    // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
159
    void cancel(Status reason);
160
161
    Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id,
162
                           int64_t timeout_ms = 60000);
163
164
0
    Status wait_for_new_schema(int64_t timeout_ms) {
165
0
        std::unique_lock<bthread::Mutex> lock(_schema_mutex);
166
0
        if (timeout_ms > 0) {
167
0
            int ret = _schema_cv.wait_for(lock, timeout_ms * 1000);
168
0
            return ret == 0 ? Status::OK() : Status::Error<true>(ret, "wait schema update timeout");
169
0
        }
170
0
        _schema_cv.wait(lock);
171
0
        return Status::OK();
172
0
    };
173
174
112
    std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
175
112
        return (*_tablet_schema_for_index)[index_id];
176
112
    }
177
178
112
    bool enable_unique_mow(int64_t index_id) const {
179
112
        return _enable_unique_mow_for_index->at(index_id);
180
112
    }
181
182
57
    std::vector<int64_t> success_tablets() {
183
57
        std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
184
57
        return _success_tablets;
185
57
    }
186
187
57
    std::unordered_map<int64_t, Status> failed_tablets() {
188
57
        std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
189
57
        return _failed_tablets;
190
57
    }
191
192
0
    brpc::StreamId stream_id() const { return _stream_id; }
193
194
0
    int64_t src_id() const { return _src_id; }
195
196
0
    int64_t dst_id() const { return _dst_id; }
197
198
0
    bool is_open() const { return _is_open.load(); }
199
200
24
    bool is_incremental() const { return _is_incremental; }
201
202
    friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub);
203
204
    std::string to_string();
205
206
    // for tests only
207
    void add_success_tablet(int64_t tablet_id) {
208
        std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
209
        _success_tablets.push_back(tablet_id);
210
    }
211
212
13
    void add_failed_tablet(int64_t tablet_id, Status reason) {
213
13
        std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
214
13
        _failed_tablets[tablet_id] = reason;
215
13
    }
216
217
3.71k
    void add_bytes_written(size_t bytes) {
218
3.71k
        std::lock_guard<bthread::Mutex> lock(_write_mutex);
219
3.71k
        _bytes_written += bytes;
220
3.71k
    }
221
222
0
    int64_t bytes_written() {
223
0
        std::lock_guard<bthread::Mutex> lock(_write_mutex);
224
0
        return _bytes_written;
225
0
    }
226
227
124
    Status check_cancel() {
228
124
        DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled",
229
124
                        { return Status::InternalError("stream cancelled"); });
230
124
        if (!_is_cancelled.load()) {
231
124
            return Status::OK();
232
124
        }
233
0
        std::lock_guard<bthread::Mutex> lock(_cancel_mutex);
234
0
        return Status::Cancelled("load_id={}, reason: {}", print_id(_load_id),
235
0
                                 _cancel_st.to_string_no_stack());
236
124
    }
237
238
24
    int64_t get_and_reset_load_back_pressure_version_wait_time_ms() {
239
24
        return _load_back_pressure_version_wait_time_ms.exchange(0);
240
24
    }
241
242
    void _refresh_back_pressure_version_wait_time(
243
            const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
244
                    tablet_load_infos);
245
246
private:
247
    Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
248
    Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
249
    Status _send_with_retry(butil::IOBuf& buf);
250
    void _handle_failure(butil::IOBuf& buf, Status st);
251
252
protected:
253
    std::atomic<bool> _is_init;
254
    std::atomic<bool> _is_open;
255
    std::atomic<bool> _is_closing;
256
    std::atomic<bool> _is_closed;
257
    std::atomic<bool> _is_cancelled;
258
    std::atomic<bool> _is_eos;
259
260
    PUniqueId _load_id;
261
    brpc::StreamId _stream_id;
262
    int64_t _src_id = -1; // source backend_id
263
    int64_t _dst_id = -1; // destination backend_id
264
    Status _status = Status::InternalError<false>("Stream is not open");
265
    Status _cancel_st;
266
267
    bthread::Mutex _open_mutex;
268
    bthread::Mutex _cancel_mutex;
269
270
    std::mutex _buffer_mutex;
271
    std::mutex _send_mutex;
272
    butil::IOBuf _buffer;
273
274
    bthread::Mutex _schema_mutex;
275
    bthread::ConditionVariable _schema_cv;
276
    std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
277
    std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
278
279
    bthread::Mutex _success_tablets_mutex;
280
    bthread::Mutex _failed_tablets_mutex;
281
    std::vector<int64_t> _success_tablets;
282
    std::unordered_map<int64_t, Status> _failed_tablets;
283
284
    bool _is_incremental = false;
285
286
    bthread::Mutex _write_mutex;
287
    size_t _bytes_written = 0;
288
289
    std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0};
290
};
291
292
// a collection of LoadStreams connect to the same node
293
class LoadStreamStubs {
294
public:
295
    LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id,
296
                    std::shared_ptr<IndexToTabletSchema> schema_map,
297
                    std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false)
298
48
            : _is_incremental(incremental) {
299
48
        _streams.reserve(num_streams);
300
123
        for (size_t i = 0; i < num_streams; i++) {
301
75
            _streams.emplace_back(
302
75
                    new LoadStreamStub(load_id, src_id, schema_map, mow_map, incremental));
303
75
        }
304
48
    }
305
306
    Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
307
                int64_t txn_id, const OlapTableSchemaParam& schema,
308
                const std::vector<PTabletID>& tablets_for_schema, int total_streams,
309
                int64_t idle_timeout_ms, bool enable_profile);
310
311
24
    bool is_incremental() const { return _is_incremental; }
312
313
    size_t size() const { return _streams.size(); }
314
315
    // for UT only
316
    void mark_open() { _open_success.store(true); }
317
318
120
    std::shared_ptr<LoadStreamStub> select_one_stream() {
319
120
        if (!_open_success.load()) {
320
0
            return nullptr;
321
0
        }
322
120
        size_t i = _select_index.fetch_add(1);
323
120
        return _streams[i % _streams.size()];
324
120
    }
325
326
0
    void cancel(Status reason) {
327
0
        for (auto& stream : _streams) {
328
0
            stream->cancel(reason);
329
0
        }
330
0
    }
331
332
    Status close_load(const std::vector<PTabletID>& tablets_to_commit, int num_incremental_streams);
333
334
45
    std::unordered_set<int64_t> success_tablets() {
335
45
        std::unordered_set<int64_t> s;
336
57
        for (auto& stream : _streams) {
337
57
            auto v = stream->success_tablets();
338
57
            std::copy(v.begin(), v.end(), std::inserter(s, s.end()));
339
57
        }
340
45
        return s;
341
45
    }
342
343
45
    std::unordered_map<int64_t, Status> failed_tablets() {
344
45
        std::unordered_map<int64_t, Status> m;
345
57
        for (auto& stream : _streams) {
346
57
            auto v = stream->failed_tablets();
347
57
            m.insert(v.begin(), v.end());
348
57
        }
349
45
        return m;
350
45
    }
351
352
152
    std::vector<std::shared_ptr<LoadStreamStub>> streams() { return _streams; }
353
354
private:
355
    std::vector<std::shared_ptr<LoadStreamStub>> _streams;
356
    std::atomic<bool> _open_success = false;
357
    std::atomic<size_t> _select_index = 0;
358
    const bool _is_incremental;
359
};
360
361
} // namespace doris
362
363
#include "common/compile_check_end.h"