Coverage Report

Created: 2026-06-29 23:23

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vtablet_writer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <brpc/controller.h>
20
#include <bthread/types.h>
21
#include <butil/errno.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/Exprs_types.h>
24
#include <gen_cpp/FrontendService.h>
25
#include <gen_cpp/FrontendService_types.h>
26
#include <gen_cpp/PaloInternalService_types.h>
27
#include <gen_cpp/Types_types.h>
28
#include <gen_cpp/internal_service.pb.h>
29
#include <gen_cpp/types.pb.h>
30
#include <glog/logging.h>
31
#include <google/protobuf/stubs/callback.h>
32
33
// IWYU pragma: no_include <bits/chrono.h>
34
#include <bthread/condition_variable.h>
35
#include <bthread/mutex.h>
36
37
#include <atomic>
#include <chrono> // IWYU pragma: keep
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <ostream>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
54
55
#include "common/config.h"
56
#include "common/status.h"
57
#include "core/block/block.h"
58
#include "core/column/column.h"
59
#include "core/data_type/data_type.h"
60
#include "exec/sink/vrow_distribution.h"
61
#include "exec/sink/vtablet_block_convertor.h"
62
#include "exec/sink/vtablet_finder.h"
63
#include "exec/sink/writer/async_result_writer.h"
64
#include "exprs/vexpr_fwd.h"
65
#include "runtime/exec_env.h"
66
#include "runtime/memory/mem_tracker.h"
67
#include "runtime/runtime_profile.h"
68
#include "runtime/thread_context.h"
69
#include "storage/tablet_info.h"
70
#include "util/brpc_closure.h"
71
#include "util/stopwatch.hpp"
72
73
namespace doris {
74
class ObjectPool;
75
class RowDescriptor;
76
class RuntimeState;
77
class TDataSink;
78
class TExpr;
79
class Thread;
80
class ThreadPoolToken;
81
class TupleDescriptor;
82
83
// Statistics of the add_batch RPCs sent to a single node.
// Supports field-wise accumulation via += and +.
struct AddBatchCounter {
    // total execution time of add_batch rpcs, in microseconds
    int64_t add_batch_execution_time_us {0};
    // lock waiting time inside add_batch rpcs, in microseconds
    int64_t add_batch_wait_execution_time_us {0};
    // number of add_batch calls
    int64_t add_batch_num {0};
    // time passed between marked close and finish close, in milliseconds
    int64_t close_wait_time_ms {0};

    // Adds every field of `other` into the matching field of *this and returns *this.
    AddBatchCounter& operator+=(const AddBatchCounter& other) {
        close_wait_time_ms += other.close_wait_time_ms;
        add_batch_num += other.add_batch_num;
        add_batch_wait_execution_time_us += other.add_batch_wait_execution_time_us;
        add_batch_execution_time_us += other.add_batch_execution_time_us;
        return *this;
    }

    // Returns the field-wise sum of two counters; neither operand is modified.
    friend AddBatchCounter operator+(const AddBatchCounter& lhs, const AddBatchCounter& rhs) {
        AddBatchCounter total(lhs);
        total += rhs;
        return total;
    }
};
107
108
// State shared with the success / failure handlers of a WriteBlockCallback.
struct WriteBlockCallbackContext {
    // Becomes true once WriteBlockCallback::end_mark() flags the current RPC as the last one.
    std::atomic_bool _is_last_rpc {false};
};
111
112
// It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence.
113
// So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction.
114
// Delete this point is safe, don't worry about RPC callback will run after WriteBlockCallback deleted.
115
// "Ping-Pong" between sender and receiver, `try_set_in_flight` when send, `clear_in_flight` after rpc failure or callback,
116
// then next send will start, and it will wait for the rpc callback to complete when it is destroyed.
117
template <typename T>
118
class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> {
119
    ENABLE_FACTORY_CREATOR(WriteBlockCallback);
120
121
public:
122
0
    WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}
123
0
    ~WriteBlockCallback() override = default;
124
125
0
    void addFailedHandler(const std::function<void(const WriteBlockCallbackContext&)>& fn) {
126
0
        failed_handler = fn;
127
0
    }
128
    void addSuccessHandler(
129
0
            const std::function<void(const T&, const WriteBlockCallbackContext&)>& fn) {
130
0
        success_handler = fn;
131
0
    }
132
133
0
    void join() override {
134
        // We rely on in_flight to assure one rpc is running,
135
        // while cid is not reliable due to memory order.
136
        // in_flight is written before getting callid,
137
        // so we can not use memory fence to synchronize.
138
0
        while (_packet_in_flight) {
139
            // cid here is complicated
140
0
            if (cid != INVALID_BTHREAD_ID) {
141
                // actually cid may be the last rpc call id.
142
0
                brpc::Join(cid);
143
0
            }
144
0
            if (_packet_in_flight) {
145
0
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
146
0
            }
147
0
        }
148
0
    }
149
150
    // plz follow this order: reset() -> set_in_flight() -> send brpc batch
151
0
    void reset() {
152
0
        ::doris::DummyBrpcCallback<T>::cntl_->Reset();
153
0
        cid = ::doris::DummyBrpcCallback<T>::cntl_->call_id();
154
0
    }
155
156
    // if _packet_in_flight == false, set it to true. Return true.
157
    // if _packet_in_flight == true, Return false.
158
0
    bool try_set_in_flight() {
159
0
        bool value = false;
160
0
        return _packet_in_flight.compare_exchange_strong(value, true);
161
0
    }
162
163
0
    void clear_in_flight() { _packet_in_flight = false; }
164
165
    bool is_packet_in_flight() { return _packet_in_flight; }
166
167
0
    void end_mark() {
168
0
        DCHECK(_ctx._is_last_rpc == false);
169
0
        _ctx._is_last_rpc = true;
170
0
    }
171
172
0
    void call() override {
173
0
        DCHECK(_packet_in_flight);
174
0
        if (::doris::DummyBrpcCallback<T>::cntl_->Failed()) {
175
0
            LOG(WARNING) << "failed to send brpc batch, error="
176
0
                         << berror(::doris::DummyBrpcCallback<T>::cntl_->ErrorCode())
177
0
                         << ", error_text=" << ::doris::DummyBrpcCallback<T>::cntl_->ErrorText();
178
0
            failed_handler(_ctx);
179
0
        } else {
180
0
            success_handler(*(::doris::DummyBrpcCallback<T>::response_), _ctx);
181
0
        }
182
0
        clear_in_flight();
183
0
    }
184
185
private:
186
    brpc::CallId cid;
187
    std::atomic<bool> _packet_in_flight {false};
188
    WriteBlockCallbackContext _ctx;
189
    std::function<void(const WriteBlockCallbackContext&)> failed_handler;
190
    std::function<void(const T&, const WriteBlockCallbackContext&)> success_handler;
191
};
192
193
class IndexChannel;
194
class VTabletWriter;
195
196
// Timing statistics (nanoseconds) collected by one node channel.
// Merged into WriterStats::channel_stat with operator+=.
class VNodeChannelStat {
public:
    // Adds every field of `stat` into the matching field of *this and returns *this.
    VNodeChannelStat& operator+=(const VNodeChannelStat& stat) {
        mem_exceeded_block_ns += stat.mem_exceeded_block_ns;
        where_clause_ns += stat.where_clause_ns;
        append_node_channel_ns += stat.append_node_channel_ns;
        return *this;
    }

    int64_t mem_exceeded_block_ns = 0;
    int64_t where_clause_ns = 0;
    int64_t append_node_channel_ns = 0;
};

// Aggregated statistics of a tablet writer; VNodeChannel::time_report() adds each channel's
// numbers into an instance of this struct.
struct WriterStats {
    int64_t serialize_batch_ns = 0;
    int64_t queue_push_lock_ns = 0;
    int64_t actual_consume_ns = 0;
    int64_t total_add_batch_exec_time_ns = 0;
    int64_t max_add_batch_exec_time_ns = 0;
    int64_t total_wait_exec_time_ns = 0;
    int64_t max_wait_exec_time_ns = 0;
    int64_t total_add_batch_num = 0;
    int64_t num_node_channels = 0;
    int64_t load_back_pressure_version_time_ms = 0;
    VNodeChannelStat channel_stat;
};
223
224
struct Payload {
225
    std::unique_ptr<IColumn::Selector> row_ids;
226
    RowPartTabletIds* row_part_tablet_ids = nullptr;
227
    std::vector<uint32_t> route_idxs;
228
};
229
230
// every NodeChannel keeps a data transmission channel with one BE. for multiple times open, it has a dozen of requests and corresponding closures.
231
class VNodeChannel {
232
public:
233
    VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, int64_t node_id,
234
                 bool is_incremental = false);
235
236
    ~VNodeChannel();
237
238
    // called before open, used to add tablet located in this backend. called by IndexChannel::init
239
0
    void add_tablet(const TTabletWithPartition& tablet) { _tablets_wait_open.emplace_back(tablet); }
240
0
    std::string debug_tablets() const {
241
0
        std::stringstream ss;
242
0
        for (const auto& tab : _all_tablets) {
243
0
            tab.printTo(ss);
244
0
            ss << '\n';
245
0
        }
246
0
        return ss.str();
247
0
    }
248
249
0
    void add_slave_tablet_nodes(int64_t tablet_id, const std::vector<int64_t>& slave_nodes) {
250
0
        _slave_tablet_nodes[tablet_id] = slave_nodes;
251
0
    }
252
253
    // this function is NON_REENTRANT
254
    Status init(RuntimeState* state);
255
    /// these two functions will call open_internal. should keep that clear --- REENTRANT
256
    // build corresponding connect to BE. NON-REENTRANT
257
    void open();
258
    // for auto partition, we use this to open more tablet. KEEP IT REENTRANT
259
    void incremental_open();
260
    // this will block until all request transmission which were opened or incremental opened finished.
261
    // this function will called multi times. NON_REENTRANT
262
    Status open_wait();
263
264
    Status add_block(Block* block, const Payload* payload);
265
266
    // @return: 1 if running, 0 if finished.
267
    // @caller: VOlapTabletSink::_send_batch_process. it's a continual asynchronous process.
268
    int try_send_and_fetch_status(RuntimeState* state,
269
                                  std::unique_ptr<ThreadPoolToken>& thread_pool_token);
270
    // when there's pending block found by try_send_and_fetch_status(), we will awake a thread to send it.
271
    void try_send_pending_block(RuntimeState* state);
272
273
    void clear_all_blocks();
274
275
    // two ways to stop channel:
276
    // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response.
277
    // 2. just cancel()
278
    // hang_wait = true will make reciever hang until all sender mark_closed.
279
    void mark_close(bool hang_wait = false);
280
281
0
    bool is_closed() const { return _is_closed; }
282
0
    bool is_cancelled() const { return _cancelled; }
283
0
    std::string get_cancel_msg() {
284
0
        std::lock_guard<std::mutex> l(_cancel_msg_lock);
285
0
        if (!_cancel_msg.empty()) {
286
0
            return _cancel_msg;
287
0
        }
288
0
        return fmt::format("{} is cancelled", channel_info());
289
0
    }
290
291
    // two ways to stop channel:
292
    // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response.
293
    // 2. just cancel()
294
    Status close_wait(RuntimeState* state, bool* is_closed);
295
296
    Status after_close_handle(
297
            RuntimeState* state, WriterStats* writer_stats,
298
            std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map);
299
300
    Status check_status();
301
302
    void cancel(const std::string& cancel_msg);
303
304
    void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map,
305
0
                     WriterStats* writer_stats) const {
306
0
        if (add_batch_counter_map != nullptr) {
307
0
            (*add_batch_counter_map)[_node_id] += _add_batch_counter;
308
0
            (*add_batch_counter_map)[_node_id].close_wait_time_ms = _close_time_ms;
309
0
        }
310
0
        if (writer_stats != nullptr) {
311
0
            writer_stats->serialize_batch_ns += _serialize_batch_ns;
312
0
            writer_stats->channel_stat += _stat;
313
0
            writer_stats->queue_push_lock_ns += _queue_push_lock_ns;
314
0
            writer_stats->actual_consume_ns += _actual_consume_ns;
315
0
            writer_stats->total_add_batch_exec_time_ns +=
316
0
                    (_add_batch_counter.add_batch_execution_time_us * 1000);
317
0
            writer_stats->total_wait_exec_time_ns +=
318
0
                    (_add_batch_counter.add_batch_wait_execution_time_us * 1000);
319
0
            writer_stats->total_add_batch_num += _add_batch_counter.add_batch_num;
320
0
            writer_stats->load_back_pressure_version_time_ms +=
321
0
                    _load_back_pressure_version_block_ms;
322
0
        }
323
0
    }
324
325
0
    int64_t node_id() const { return _node_id; }
326
0
    std::string host() const { return _node_info.host; }
327
0
    std::string name() const { return _name; }
328
329
0
    std::string channel_info() const {
330
0
        return fmt::format("{}, {}, node={}:{}", _name, _load_info, _node_info.host,
331
0
                           _node_info.brpc_port);
332
0
    }
333
334
0
    size_t get_pending_bytes() { return _pending_batches_bytes; }
335
336
0
    bool is_incremental() const { return _is_incremental; }
337
338
0
    int64_t write_bytes() const { return _write_bytes.load(); }
339
340
protected:
341
    // make a real open request for relative BE's load channel.
342
    void _open_internal(bool is_incremental);
343
    void _set_adaptive_random_bucket_open_request(PTabletWriterOpenRequest* request);
344
345
    void _close_check();
346
    void _cancel_with_msg(const std::string& msg);
347
348
    void _add_block_success_callback(const PTabletWriterAddBlockResult& result,
349
                                     const WriteBlockCallbackContext& ctx);
350
    void _add_block_failed_callback(const WriteBlockCallbackContext& ctx);
351
352
    void _refresh_back_pressure_version_wait_time(
353
            const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
354
                    tablet_load_infos);
355
356
    VTabletWriter* _parent = nullptr;
357
    IndexChannel* _index_channel = nullptr;
358
    int64_t _node_id = -1;
359
    std::string _load_info;
360
    std::string _name;
361
362
    std::shared_ptr<MemTracker> _node_channel_tracker;
363
    int64_t _load_mem_limit = -1;
364
365
    TupleDescriptor* _tuple_desc = nullptr;
366
    NodeInfo _node_info;
367
368
    // this should be set in init() using config
369
    int _rpc_timeout_ms = 60000;
370
    int64_t _next_packet_seq = 0;
371
    MonotonicStopWatch _timeout_watch;
372
373
    // the timestamp when this node channel be marked closed and finished closed
374
    uint64_t _close_time_ms = 0;
375
376
    // user cancel or get some errors
377
    std::atomic<bool> _cancelled {false};
378
    std::mutex _cancel_msg_lock;
379
    std::string _cancel_msg;
380
381
    // send finished means the consumer thread which send the rpc can exit
382
    std::atomic<bool> _send_finished {false};
383
384
    // add batches finished means the last rpc has be response, used to check whether this channel can be closed
385
    std::atomic<bool> _add_batches_finished {false}; // reuse for vectorized
386
387
    bool _eos_is_produced {false}; // only for restricting producer behaviors
388
389
    std::unique_ptr<RowDescriptor> _row_desc;
390
    int _batch_size = 0;
391
392
    // limit _pending_batches size
393
    std::atomic<size_t> _pending_batches_bytes {0};
394
    size_t _max_pending_batches_bytes {(size_t)config::nodechannel_pending_queue_max_bytes};
395
    std::mutex _pending_batches_lock;          // reuse for vectorized
396
    std::atomic<int> _pending_batches_num {0}; // reuse for vectorized
397
398
    std::shared_ptr<PBackendService_Stub> _stub;
399
    // because we have incremantal open, we should keep one relative closure for one request. it's similarly for adding block.
400
    std::vector<std::shared_ptr<DummyBrpcCallback<PTabletWriterOpenResult>>> _open_callbacks;
401
402
    std::vector<TTabletWithPartition> _all_tablets;
403
    std::vector<TTabletWithPartition> _tablets_wait_open;
404
    // For rolling-upgrade compatibility, adaptive random bucket add-block RPCs also carry
405
    // tablet_ids. New receivers ignore them and route by partition id, while old receivers use
406
    // this local tablet id instead of failing on an empty tablet_ids list.
407
    std::unordered_map<int64_t, int64_t> _adaptive_partition_compat_tablets;
408
    // map from tablet_id to node_id where slave replicas locate in
409
    std::unordered_map<int64_t, std::vector<int64_t>> _slave_tablet_nodes;
410
    std::vector<TTabletCommitInfo> _tablet_commit_infos;
411
412
    AddBatchCounter _add_batch_counter;
413
    std::atomic<int64_t> _serialize_batch_ns {0};
414
    std::atomic<int64_t> _queue_push_lock_ns {0};
415
    std::atomic<int64_t> _actual_consume_ns {0};
416
    std::atomic<int64_t> _load_back_pressure_version_block_ms {0};
417
418
    VNodeChannelStat _stat;
419
    // lock to protect _is_closed.
420
    // The methods in the IndexChannel are called back in the RpcClosure in the NodeChannel.
421
    // However, this rpc callback may occur after the whole task is finished (e.g. due to network latency),
422
    // and by that time the IndexChannel may have been destructured, so we should not call the
423
    // IndexChannel methods anymore, otherwise the BE will crash.
424
    // Therefore, we use the _is_closed and _closed_lock to ensure that the RPC callback
425
    // function will not call the IndexChannel method after the NodeChannel is closed.
426
    // The IndexChannel is definitely accessible until the NodeChannel is closed.
427
    std::mutex _closed_lock;
428
    bool _is_closed = false;
429
    bool _inited = false;
430
431
    RuntimeState* _state = nullptr;
432
    // A context lock for callbacks, the callback has to lock the ctx, to avoid
433
    // the object is deleted during callback is running.
434
    std::weak_ptr<TaskExecutionContext> _task_exec_ctx;
435
    // rows number received per tablet, tablet_id -> rows_num
436
    std::vector<std::pair<int64_t, int64_t>> _tablets_received_rows;
437
    // rows number filtered per tablet, tablet_id -> filtered_rows_num
438
    std::vector<std::pair<int64_t, int64_t>> _tablets_filtered_rows;
439
440
    // build a _cur_mutable_block and push into _pending_blocks. when not building, this block is empty.
441
    std::unique_ptr<MutableBlock> _cur_mutable_block;
442
    std::shared_ptr<PTabletWriterAddBlockRequest> _cur_add_block_request;
443
444
    using AddBlockReq =
445
            std::pair<std::unique_ptr<MutableBlock>, std::shared_ptr<PTabletWriterAddBlockRequest>>;
446
    std::queue<AddBlockReq> _pending_blocks;
447
    // send block to slave BE rely on this. dont reconstruct it.
448
    std::shared_ptr<WriteBlockCallback<PTabletWriterAddBlockResult>> _send_block_callback = nullptr;
449
450
    int64_t _wg_id = -1;
451
452
    bool _is_incremental;
453
454
    std::atomic<int64_t> _write_bytes {0};
455
    std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0};
456
};
457
458
// an IndexChannel is related to specific table and its rollup and mv
459
class IndexChannel {
460
public:
461
    IndexChannel(VTabletWriter* parent, int64_t index_id, VExprContextSPtr where_clause)
462
0
            : _parent(parent), _index_id(index_id), _where_clause(std::move(where_clause)) {
463
0
        _index_channel_tracker =
464
0
                std::make_unique<MemTracker>("IndexChannel:indexID=" + std::to_string(_index_id));
465
0
    }
466
0
    ~IndexChannel() = default;
467
468
    // allow to init multi times, for incremental open more tablets for one index(table)
469
    Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets,
470
                bool incremental = false);
471
472
    void for_each_node_channel(
473
0
            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
474
0
        for (auto& it : _node_channels) {
475
0
            func(it.second);
476
0
        }
477
0
    }
478
479
    void for_init_node_channel(
480
0
            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
481
0
        for (auto& it : _node_channels) {
482
0
            if (!it.second->is_incremental()) {
483
0
                func(it.second);
484
0
            }
485
0
        }
486
0
    }
487
488
    void for_inc_node_channel(
489
0
            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
490
0
        for (auto& it : _node_channels) {
491
0
            if (it.second->is_incremental()) {
492
0
                func(it.second);
493
0
            }
494
0
        }
495
0
    }
496
497
0
    std::unordered_set<int64_t> init_node_channel_ids() {
498
0
        std::unordered_set<int64_t> node_channel_ids;
499
0
        for (auto& it : _node_channels) {
500
0
            if (!it.second->is_incremental()) {
501
0
                node_channel_ids.insert(it.first);
502
0
            }
503
0
        }
504
0
        return node_channel_ids;
505
0
    }
506
507
0
    std::unordered_set<int64_t> inc_node_channel_ids() {
508
0
        std::unordered_set<int64_t> node_channel_ids;
509
0
        for (auto& it : _node_channels) {
510
0
            if (it.second->is_incremental()) {
511
0
                node_channel_ids.insert(it.first);
512
0
            }
513
0
        }
514
0
        return node_channel_ids;
515
0
    }
516
517
0
    std::unordered_set<int64_t> each_node_channel_ids() {
518
0
        std::unordered_set<int64_t> node_channel_ids;
519
0
        for (auto& it : _node_channels) {
520
0
            node_channel_ids.insert(it.first);
521
0
        }
522
0
        return node_channel_ids;
523
0
    }
524
525
0
    bool has_incremental_node_channel() const { return _has_inc_node; }
526
527
    void mark_as_failed(const VNodeChannel* node_channel, const std::string& err,
528
                        int64_t tablet_id = -1);
529
    Status check_intolerable_failure();
530
531
    Status close_wait(RuntimeState* state, WriterStats* writer_stats,
532
                      std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map,
533
                      std::unordered_set<int64_t> unfinished_node_channel_ids,
534
                      bool need_wait_after_quorum_success);
535
536
0
    int64_t close_wait_version() const {
537
0
        return _close_wait_version.load(std::memory_order_acquire);
538
0
    }
539
540
    void wait_for_close_event(int64_t observed_version, int64_t timeout_ms);
541
542
    void notify_close_wait();
543
544
    Status check_each_node_channel_close(
545
            std::unordered_set<int64_t>* unfinished_node_channel_ids,
546
            std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map,
547
            WriterStats* writer_stats, Status status);
548
549
    // set error tablet info in runtime state, so that it can be returned to FE.
550
    void set_error_tablet_in_state(RuntimeState* state);
551
552
0
    size_t num_node_channels() const { return _node_channels.size(); }
553
554
0
    size_t get_pending_bytes() const {
555
0
        size_t mem_consumption = 0;
556
0
        for (const auto& kv : _node_channels) {
557
0
            mem_consumption += kv.second->get_pending_bytes();
558
0
        }
559
0
        return mem_consumption;
560
0
    }
561
562
    void set_tablets_received_rows(
563
            const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id);
564
565
    void set_tablets_filtered_rows(
566
            const std::vector<std::pair<int64_t, int64_t>>& tablets_filtered_rows, int64_t node_id);
567
568
0
    int64_t num_rows_filtered() {
569
        // the Unique table has no roll up or materilized view
570
        // we just add up filtered rows from all partitions
571
0
        return std::accumulate(_tablets_filtered_rows.cbegin(), _tablets_filtered_rows.cend(), 0,
572
0
                               [](int64_t sum, const auto& a) { return sum + a.second[0].second; });
573
0
    }
574
575
    // check whether the rows num written by different replicas is consistent
576
    Status check_tablet_received_rows_consistency();
577
578
    // check whether the rows num filtered by different replicas is consistent
579
    Status check_tablet_filtered_rows_consistency();
580
581
0
    void set_start_time(const int64_t& start_time) { _start_time = start_time; }
582
583
0
    VExprContextSPtr get_where_clause() { return _where_clause; }
584
585
private:
586
    friend class VNodeChannel;
587
    friend class VTabletWriter;
588
    friend class VRowDistribution;
589
590
    int _max_failed_replicas(int64_t tablet_id);
591
592
    int _load_required_replicas_num(int64_t tablet_id);
593
594
    bool _quorum_success(const std::unordered_set<int64_t>& unfinished_node_channel_ids,
595
                         const std::unordered_set<int64_t>& need_finish_tablets);
596
597
    int64_t _calc_max_wait_time_ms(const std::unordered_set<int64_t>& unfinished_node_channel_ids);
598
599
    VTabletWriter* _parent = nullptr;
600
    int64_t _index_id;
601
    VExprContextSPtr _where_clause;
602
603
    // from backend channel to tablet_id
604
    // ATTN: must be placed before `_node_channels` and `_channels_by_tablet`.
605
    // Because the destruct order of objects is opposite to the creation order.
606
    // So NodeChannel will be destructured first.
607
    // And the destructor function of NodeChannel waits for all RPCs to finish.
608
    // This ensures that it is safe to use `_tablets_by_channel` in the callback function for the end of the RPC.
609
    std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablets_by_channel;
610
    // BeId -> channel
611
    std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _node_channels;
612
    // from tablet_id to backend channel
613
    std::unordered_map<int64_t, std::vector<std::shared_ptr<VNodeChannel>>> _channels_by_tablet;
614
    // from partition_id to FE-planned bucket owner channel in cloud adaptive random bucket mode
615
    std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _channels_by_partition;
616
    bool _has_inc_node = false;
617
618
    // lock to protect _failed_channels and _failed_channels_msgs
619
    mutable std::mutex _fail_lock;
620
    // key is tablet_id, value is a set of failed node id
621
    std::unordered_map<int64_t, std::unordered_set<int64_t>> _failed_channels;
622
    // key is tablet_id, value is error message
623
    std::unordered_map<int64_t, std::string> _failed_channels_msgs;
624
    Status _intolerable_failure_status = Status::OK();
625
626
    std::unique_ptr<MemTracker> _index_channel_tracker;
627
    // rows num received by DeltaWriter per tablet, tablet_id -> <node_Id, rows_num>
628
    // used to verify whether the rows num received by different replicas is consistent
629
    std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_received_rows;
630
631
    // rows num filtered by DeltaWriter per tablet, tablet_id -> <node_Id, filtered_rows_num>
632
    // used to verify whether the rows num filtered by different replicas is consistent
633
    std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_filtered_rows;
634
635
    int64_t _start_time = 0;
636
637
    std::atomic<int64_t> _close_wait_version {0};
638
    bthread::Mutex _close_wait_mutex;
639
    bthread::ConditionVariable _close_wait_cv;
640
};
641
} // namespace doris
642
643
namespace doris {
644
//
645
// write result to file
646
class VTabletWriter final : public AsyncResultWriter {
647
public:
648
    VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
649
                  std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep);
650
651
    Status write(RuntimeState* state, Block& block) override;
652
653
    Status close(Status) override;
654
655
    Status open(RuntimeState* state, RuntimeProfile* profile) override;
656
657
    // the consumer func of sending pending batches in every NodeChannel.
658
    // use polling & NodeChannel::try_send_and_fetch_status() to achieve nonblocking sending.
659
    // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the producer
660
    void _send_batch_process();
661
662
    Status on_partitions_created(TCreatePartitionResult* result);
663
664
    Status _send_new_partition_batch();
665
666
private:
667
    friend class VNodeChannel;
668
    friend class IndexChannel;
669
670
    using ChannelDistributionPayload = std::unordered_map<VNodeChannel*, Payload>;
671
    using ChannelDistributionPayloadVec = std::vector<std::unordered_map<VNodeChannel*, Payload>>;
672
673
    Status _init_row_distribution();
674
675
    Status _init(RuntimeState* state, RuntimeProfile* profile);
676
677
    Status _generate_one_index_channel_payload(RowPartTabletIds& row_part_tablet_tuple,
678
                                               int32_t index_idx,
679
                                               ChannelDistributionPayload& channel_payload);
680
681
    Status _generate_index_channels_payloads(std::vector<RowPartTabletIds>& row_part_tablet_ids,
682
                                             ChannelDistributionPayloadVec& payload);
683
684
    void _cancel_all_channel(Status status);
685
686
    Status _incremental_open_node_channel(const std::vector<TOlapTablePartition>& partitions);
687
688
    void _do_try_close(RuntimeState* state, const Status& exec_status);
689
690
    void _build_tablet_replica_info(const int64_t tablet_id, VOlapTablePartition* partition);
691
692
    TDataSink _t_sink;
693
694
    std::shared_ptr<MemTracker> _mem_tracker;
695
696
    ObjectPool* _pool = nullptr;
697
698
    bthread_t _sender_thread = 0;
699
700
    // unique load id
701
    PUniqueId _load_id;
702
    int64_t _txn_id = -1;
703
    int _num_replicas = -1;
704
    int _tuple_desc_id = -1;
705
706
    // this is tuple descriptor of destination OLAP table
707
    TupleDescriptor* _output_tuple_desc = nullptr;
708
    RowDescriptor* _output_row_desc = nullptr;
709
710
    // number of senders used to insert into OlapTable, if we only support single node insert,
711
    // all data from select should collectted and then send to OlapTable.
712
    // To support multiple senders, we maintain a channel for each sender.
713
    int _sender_id = -1;
714
    int _num_senders = -1;
715
    bool _is_high_priority = false;
716
717
    // TODO(zc): think about cache this data
718
    std::shared_ptr<OlapTableSchemaParam> _schema;
719
    OlapTableLocationParam* _location = nullptr;
720
    bool _write_single_replica = false;
721
    OlapTableLocationParam* _slave_location = nullptr;
722
    DorisNodesInfo* _nodes_info = nullptr;
723
724
    std::unique_ptr<OlapTabletFinder> _tablet_finder;
725
726
    // index_channel
727
    bthread::Mutex _stop_check_channel;
728
    std::vector<std::shared_ptr<IndexChannel>> _channels;
729
    std::unordered_map<int64_t, std::shared_ptr<IndexChannel>> _index_id_to_channel;
730
731
    std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;
732
733
    // support only one partition column now
734
    std::vector<std::vector<TStringLiteral>> _partitions_need_create;
735
736
    std::unique_ptr<OlapTableBlockConvertor> _block_convertor;
737
    // Stats for this
738
    int64_t _send_data_ns = 0;
739
    int64_t _number_input_rows = 0;
740
    int64_t _number_output_rows = 0;
741
    int64_t _filter_ns = 0;
742
743
    MonotonicStopWatch _row_distribution_watch;
744
745
    RuntimeProfile::Counter* _input_rows_counter = nullptr;
746
    RuntimeProfile::Counter* _output_rows_counter = nullptr;
747
    RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
748
    RuntimeProfile::Counter* _send_data_timer = nullptr;
749
    RuntimeProfile::Counter* _row_distribution_timer = nullptr;
750
    RuntimeProfile::Counter* _append_node_channel_timer = nullptr;
751
    RuntimeProfile::Counter* _filter_timer = nullptr;
752
    RuntimeProfile::Counter* _where_clause_timer = nullptr;
753
    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
754
    RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr;
755
    RuntimeProfile::Counter* _validate_data_timer = nullptr;
756
    RuntimeProfile::Counter* _open_timer = nullptr;
757
    RuntimeProfile::Counter* _close_timer = nullptr;
758
    RuntimeProfile::Counter* _non_blocking_send_timer = nullptr;
759
    RuntimeProfile::Counter* _non_blocking_send_work_timer = nullptr;
760
    RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
761
    RuntimeProfile::Counter* _total_add_batch_exec_timer = nullptr;
762
    RuntimeProfile::Counter* _max_add_batch_exec_timer = nullptr;
763
    RuntimeProfile::Counter* _total_wait_exec_timer = nullptr;
764
    RuntimeProfile::Counter* _max_wait_exec_timer = nullptr;
765
    RuntimeProfile::Counter* _add_batch_number = nullptr;
766
    RuntimeProfile::Counter* _num_node_channels = nullptr;
767
    RuntimeProfile::Counter* _load_back_pressure_version_time_ms = nullptr;
768
769
    // the timeout of load channels opened by this tablet sink. in second
770
    int64_t _load_channel_timeout_s = 0;
771
    // the load txn absolute expiration time.
772
    int64_t _txn_expiration = 0;
773
774
    int32_t _send_batch_parallelism = 1;
775
    // Save the status of try_close() and close() method
776
    Status _close_status;
777
    // if we called try_close(), for auto partition the periodic send thread should stop if it's still waiting for node channels first-time open.
778
    // atomic: written by pthread (_do_try_close), read by bthread (_send_batch_process)
779
    std::atomic<bool> _try_close {false};
780
    bool _inited = false;
781
    bool _write_file_cache = false;
782
783
    // User can change this config at runtime, avoid it being modified during query or loading process.
784
    bool _transfer_large_data_by_brpc = false;
785
786
    VOlapTablePartitionParam* _vpartition = nullptr;
787
788
    RuntimeState* _state = nullptr; // not owned, set when open
789
790
    VRowDistribution _row_distribution;
791
    // reuse to avoid frequent memory allocation and release.
792
    std::vector<RowPartTabletIds> _row_part_tablet_ids;
793
794
    // tablet_id -> <total replicas num, load required replicas num>
795
    std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;
796
797
    // tablet_id -> set of backend_ids that have version gaps
798
    // these backends' success should not be counted for majority write
799
    std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablet_version_gap_backends;
800
};
801
} // namespace doris