Coverage Report

Created: 2026-06-09 21:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vtablet_writer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <brpc/controller.h>
20
#include <bthread/types.h>
21
#include <butil/errno.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/Exprs_types.h>
24
#include <gen_cpp/FrontendService.h>
25
#include <gen_cpp/FrontendService_types.h>
26
#include <gen_cpp/PaloInternalService_types.h>
27
#include <gen_cpp/Types_types.h>
28
#include <gen_cpp/internal_service.pb.h>
29
#include <gen_cpp/types.pb.h>
30
#include <glog/logging.h>
31
#include <google/protobuf/stubs/callback.h>
32
33
// IWYU pragma: no_include <bits/chrono.h>
34
#include <bthread/mutex.h>
35
36
#include <atomic>
37
#include <chrono> // IWYU pragma: keep
38
#include <cstddef>
39
#include <cstdint>
40
#include <functional>
41
#include <map>
42
#include <memory>
43
#include <mutex>
44
#include <ostream>
45
#include <queue>
46
#include <sstream>
47
#include <string>
48
#include <thread>
49
#include <unordered_map>
50
#include <unordered_set>
51
#include <utility>
52
#include <vector>
53
54
#include "common/config.h"
55
#include "common/status.h"
56
#include "core/block/block.h"
57
#include "core/column/column.h"
58
#include "core/data_type/data_type.h"
59
#include "exec/sink/vrow_distribution.h"
60
#include "exec/sink/vtablet_block_convertor.h"
61
#include "exec/sink/vtablet_finder.h"
62
#include "exec/sink/writer/async_result_writer.h"
63
#include "exprs/vexpr_fwd.h"
64
#include "runtime/exec_env.h"
65
#include "runtime/memory/mem_tracker.h"
66
#include "runtime/runtime_profile.h"
67
#include "runtime/thread_context.h"
68
#include "storage/tablet_info.h"
69
#include "util/brpc_closure.h"
70
#include "util/stopwatch.hpp"
71
72
namespace doris {
73
class ObjectPool;
74
class RowDescriptor;
75
class RuntimeState;
76
class TDataSink;
77
class TExpr;
78
class Thread;
79
class ThreadPoolToken;
80
class TupleDescriptor;
81
82
// The counter of add_batch rpc of a single node
83
struct AddBatchCounter {
    // total execution time of the add_batch rpcs
    int64_t add_batch_execution_time_us = 0;
    // lock waiting time inside the add_batch rpcs
    int64_t add_batch_wait_execution_time_us = 0;
    // number of add_batch calls
    int64_t add_batch_num = 0;
    // time passed between marked close and finish close
    int64_t close_wait_time_ms = 0;

    // Adds every field of `rhs` onto the matching field of this counter and returns *this.
    AddBatchCounter& operator+=(const AddBatchCounter& rhs) {
        close_wait_time_ms += rhs.close_wait_time_ms;
        add_batch_num += rhs.add_batch_num;
        add_batch_wait_execution_time_us += rhs.add_batch_wait_execution_time_us;
        add_batch_execution_time_us += rhs.add_batch_execution_time_us;
        return *this;
    }

    // Returns the field-wise sum of `lhs` and `rhs`; neither operand is modified.
    friend AddBatchCounter operator+(const AddBatchCounter& lhs, const AddBatchCounter& rhs) {
        AddBatchCounter total(lhs);
        total += rhs;
        return total;
    }
};
106
107
// State that WriteBlockCallback passes (by const reference) to its success and failed handlers.
struct WriteBlockCallbackContext {
108
    // Starts as false; WriteBlockCallback::end_mark() sets it to true.
    std::atomic<bool> _is_last_rpc {false};
109
};
110
111
// It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence.
112
// So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction.
113
// Delete this point is safe, don't worry about RPC callback will run after WriteBlockCallback deleted.
114
// "Ping-Pong" between sender and receiver, `try_set_in_flight` when send, `clear_in_flight` after rpc failure or callback,
115
// then next send will start, and it will wait for the rpc callback to complete when it is destroyed.
116
template <typename T>
117
class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> {
118
    ENABLE_FACTORY_CREATOR(WriteBlockCallback);
119
120
public:
121
0
    WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}
122
0
    ~WriteBlockCallback() override = default;
123
124
0
    void addFailedHandler(const std::function<void(const WriteBlockCallbackContext&)>& fn) {
125
0
        failed_handler = fn;
126
0
    }
127
    void addSuccessHandler(
128
0
            const std::function<void(const T&, const WriteBlockCallbackContext&)>& fn) {
129
0
        success_handler = fn;
130
0
    }
131
132
0
    void join() override {
133
        // We rely on in_flight to assure one rpc is running,
134
        // while cid is not reliable due to memory order.
135
        // in_flight is written before getting callid,
136
        // so we can not use memory fence to synchronize.
137
0
        while (_packet_in_flight) {
138
            // cid here is complicated
139
0
            if (cid != INVALID_BTHREAD_ID) {
140
                // actually cid may be the last rpc call id.
141
0
                brpc::Join(cid);
142
0
            }
143
0
            if (_packet_in_flight) {
144
0
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
145
0
            }
146
0
        }
147
0
    }
148
149
    // plz follow this order: reset() -> set_in_flight() -> send brpc batch
150
0
    void reset() {
151
0
        ::doris::DummyBrpcCallback<T>::cntl_->Reset();
152
0
        cid = ::doris::DummyBrpcCallback<T>::cntl_->call_id();
153
0
    }
154
155
    // if _packet_in_flight == false, set it to true. Return true.
156
    // if _packet_in_flight == true, Return false.
157
0
    bool try_set_in_flight() {
158
0
        bool value = false;
159
0
        return _packet_in_flight.compare_exchange_strong(value, true);
160
0
    }
161
162
0
    void clear_in_flight() { _packet_in_flight = false; }
163
164
    bool is_packet_in_flight() { return _packet_in_flight; }
165
166
0
    void end_mark() {
167
0
        DCHECK(_ctx._is_last_rpc == false);
168
0
        _ctx._is_last_rpc = true;
169
0
    }
170
171
0
    void call() override {
172
0
        DCHECK(_packet_in_flight);
173
0
        if (::doris::DummyBrpcCallback<T>::cntl_->Failed()) {
174
0
            LOG(WARNING) << "failed to send brpc batch, error="
175
0
                         << berror(::doris::DummyBrpcCallback<T>::cntl_->ErrorCode())
176
0
                         << ", error_text=" << ::doris::DummyBrpcCallback<T>::cntl_->ErrorText();
177
0
            failed_handler(_ctx);
178
0
        } else {
179
0
            success_handler(*(::doris::DummyBrpcCallback<T>::response_), _ctx);
180
0
        }
181
0
        clear_in_flight();
182
0
    }
183
184
private:
185
    brpc::CallId cid;
186
    std::atomic<bool> _packet_in_flight {false};
187
    WriteBlockCallbackContext _ctx;
188
    std::function<void(const WriteBlockCallbackContext&)> failed_handler;
189
    std::function<void(const T&, const WriteBlockCallbackContext&)> success_handler;
190
};
191
192
class IndexChannel;
193
class VTabletWriter;
194
195
// Time counters of a node channel that can be summed across channels.
class VNodeChannelStat {
public:
    int64_t mem_exceeded_block_ns = 0;
    int64_t where_clause_ns = 0;
    int64_t append_node_channel_ns = 0;

    // Adds each counter of `stat` onto the matching counter of this object and returns *this.
    VNodeChannelStat& operator+=(const VNodeChannelStat& stat) {
        append_node_channel_ns += stat.append_node_channel_ns;
        where_clause_ns += stat.where_clause_ns;
        mem_exceeded_block_ns += stat.mem_exceeded_block_ns;
        return *this;
    }
};
208
209
// Writer-level statistics. VNodeChannel::time_report() adds one node channel's counters into an
// instance of this struct; all fields start at zero.
struct WriterStats {
210
    // time_report() adds the channel's _serialize_batch_ns.
    int64_t serialize_batch_ns = 0;
211
    // time_report() adds the channel's _queue_push_lock_ns.
    int64_t queue_push_lock_ns = 0;
212
    // time_report() adds the channel's _actual_consume_ns.
    int64_t actual_consume_ns = 0;
213
    // time_report() adds the channel's add_batch_execution_time_us multiplied by 1000.
    int64_t total_add_batch_exec_time_ns = 0;
214
    // Not written by time_report().
    int64_t max_add_batch_exec_time_ns = 0;
215
    // time_report() adds the channel's add_batch_wait_execution_time_us multiplied by 1000.
    int64_t total_wait_exec_time_ns = 0;
216
    // Not written by time_report().
    int64_t max_wait_exec_time_ns = 0;
217
    // time_report() adds the channel's add_batch_num.
    int64_t total_add_batch_num = 0;
218
    // Not written by time_report().
    int64_t num_node_channels = 0;
219
    // time_report() adds the channel's _load_back_pressure_version_block_ms.
    int64_t load_back_pressure_version_time_ms = 0;
220
    // time_report() accumulates the channel's VNodeChannelStat with operator+=.
    VNodeChannelStat channel_stat;
221
};
222
223
// Data passed to VNodeChannel::add_block() along with a block. VTabletWriter maps each
// VNodeChannel* to one Payload (see ChannelDistributionPayload).
struct Payload {
224
    // Owned selector.
    // NOTE(review): presumably the indices of the block rows routed to the channel — confirm
    // against VNodeChannel::add_block().
    std::unique_ptr<IColumn::Selector> row_ids;
225
    // Raw pointer, null by default.
    RowPartTabletIds* row_part_tablet_ids = nullptr;
226
    std::vector<uint32_t> route_idxs;
227
};
228
229
// Every NodeChannel keeps a data transmission channel with one BE. Because it can be opened multiple times, it holds multiple requests and their corresponding closures.
230
class VNodeChannel {
231
public:
232
    VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, int64_t node_id,
233
                 bool is_incremental = false);
234
235
    ~VNodeChannel();
236
237
    // called before open, used to add tablet located in this backend. called by IndexChannel::init
238
0
    void add_tablet(const TTabletWithPartition& tablet) { _tablets_wait_open.emplace_back(tablet); }
239
0
    std::string debug_tablets() const {
240
0
        std::stringstream ss;
241
0
        for (const auto& tab : _all_tablets) {
242
0
            tab.printTo(ss);
243
0
            ss << '\n';
244
0
        }
245
0
        return ss.str();
246
0
    }
247
248
0
    void add_slave_tablet_nodes(int64_t tablet_id, const std::vector<int64_t>& slave_nodes) {
249
0
        _slave_tablet_nodes[tablet_id] = slave_nodes;
250
0
    }
251
252
    // this function is NON_REENTRANT
253
    Status init(RuntimeState* state);
254
    /// these two functions will call open_internal. should keep that clear --- REENTRANT
255
    // build corresponding connect to BE. NON-REENTRANT
256
    void open();
257
    // for auto partition, we use this to open more tablet. KEEP IT REENTRANT
258
    void incremental_open();
259
    // this will block until all request transmission which were opened or incremental opened finished.
260
    // this function will called multi times. NON_REENTRANT
261
    Status open_wait();
262
263
    Status add_block(Block* block, const Payload* payload);
264
265
    // @return: 1 if running, 0 if finished.
266
    // @caller: VOlapTabletSink::_send_batch_process. it's a continual asynchronous process.
267
    int try_send_and_fetch_status(RuntimeState* state,
268
                                  std::unique_ptr<ThreadPoolToken>& thread_pool_token);
269
    // when there's pending block found by try_send_and_fetch_status(), we will awake a thread to send it.
270
    void try_send_pending_block(RuntimeState* state);
271
272
    void clear_all_blocks();
273
274
    // two ways to stop channel:
275
    // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response.
276
    // 2. just cancel()
277
    // hang_wait = true will make the receiver hang until all senders have mark_closed.
278
    void mark_close(bool hang_wait = false);
279
280
0
    bool is_closed() const { return _is_closed; }
281
0
    bool is_cancelled() const { return _cancelled; }
282
0
    std::string get_cancel_msg() {
283
0
        std::lock_guard<std::mutex> l(_cancel_msg_lock);
284
0
        if (!_cancel_msg.empty()) {
285
0
            return _cancel_msg;
286
0
        }
287
0
        return fmt::format("{} is cancelled", channel_info());
288
0
    }
289
290
    // two ways to stop channel:
291
    // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response.
292
    // 2. just cancel()
293
    Status close_wait(RuntimeState* state, bool* is_closed);
294
295
    Status after_close_handle(
296
            RuntimeState* state, WriterStats* writer_stats,
297
            std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map);
298
299
    Status check_status();
300
301
    void cancel(const std::string& cancel_msg);
302
303
    void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map,
304
0
                     WriterStats* writer_stats) const {
305
0
        if (add_batch_counter_map != nullptr) {
306
0
            (*add_batch_counter_map)[_node_id] += _add_batch_counter;
307
0
            (*add_batch_counter_map)[_node_id].close_wait_time_ms = _close_time_ms;
308
0
        }
309
0
        if (writer_stats != nullptr) {
310
0
            writer_stats->serialize_batch_ns += _serialize_batch_ns;
311
0
            writer_stats->channel_stat += _stat;
312
0
            writer_stats->queue_push_lock_ns += _queue_push_lock_ns;
313
0
            writer_stats->actual_consume_ns += _actual_consume_ns;
314
0
            writer_stats->total_add_batch_exec_time_ns +=
315
0
                    (_add_batch_counter.add_batch_execution_time_us * 1000);
316
0
            writer_stats->total_wait_exec_time_ns +=
317
0
                    (_add_batch_counter.add_batch_wait_execution_time_us * 1000);
318
0
            writer_stats->total_add_batch_num += _add_batch_counter.add_batch_num;
319
0
            writer_stats->load_back_pressure_version_time_ms +=
320
0
                    _load_back_pressure_version_block_ms;
321
0
        }
322
0
    }
323
324
0
    int64_t node_id() const { return _node_id; }
325
0
    std::string host() const { return _node_info.host; }
326
0
    std::string name() const { return _name; }
327
328
0
    std::string channel_info() const {
329
0
        return fmt::format("{}, {}, node={}:{}", _name, _load_info, _node_info.host,
330
0
                           _node_info.brpc_port);
331
0
    }
332
333
0
    size_t get_pending_bytes() { return _pending_batches_bytes; }
334
335
0
    bool is_incremental() const { return _is_incremental; }
336
337
0
    int64_t write_bytes() const { return _write_bytes.load(); }
338
339
protected:
340
    // make a real open request for relative BE's load channel.
341
    void _open_internal(bool is_incremental);
342
343
    void _close_check();
344
    void _cancel_with_msg(const std::string& msg);
345
346
    void _add_block_success_callback(const PTabletWriterAddBlockResult& result,
347
                                     const WriteBlockCallbackContext& ctx);
348
    void _add_block_failed_callback(const WriteBlockCallbackContext& ctx);
349
350
    void _refresh_back_pressure_version_wait_time(
351
            const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
352
                    tablet_load_infos);
353
354
    VTabletWriter* _parent = nullptr;
355
    IndexChannel* _index_channel = nullptr;
356
    int64_t _node_id = -1;
357
    std::string _load_info;
358
    std::string _name;
359
360
    std::shared_ptr<MemTracker> _node_channel_tracker;
361
    int64_t _load_mem_limit = -1;
362
363
    TupleDescriptor* _tuple_desc = nullptr;
364
    NodeInfo _node_info;
365
366
    // this should be set in init() using config
367
    int _rpc_timeout_ms = 60000;
368
    int64_t _next_packet_seq = 0;
369
    MonotonicStopWatch _timeout_watch;
370
371
    // the timestamp when this node channel be marked closed and finished closed
372
    uint64_t _close_time_ms = 0;
373
374
    // user cancel or get some errors
375
    std::atomic<bool> _cancelled {false};
376
    std::mutex _cancel_msg_lock;
377
    std::string _cancel_msg;
378
379
    // send finished means the consumer thread which send the rpc can exit
380
    std::atomic<bool> _send_finished {false};
381
382
    // add batches finished means the last rpc has been responded to; used to check whether this channel can be closed
383
    std::atomic<bool> _add_batches_finished {false}; // reuse for vectorized
384
385
    bool _eos_is_produced {false}; // only for restricting producer behaviors
386
387
    std::unique_ptr<RowDescriptor> _row_desc;
388
    int _batch_size = 0;
389
390
    // limit _pending_batches size
391
    std::atomic<size_t> _pending_batches_bytes {0};
392
    size_t _max_pending_batches_bytes {(size_t)config::nodechannel_pending_queue_max_bytes};
393
    std::mutex _pending_batches_lock;          // reuse for vectorized
394
    std::atomic<int> _pending_batches_num {0}; // reuse for vectorized
395
396
    std::shared_ptr<PBackendService_Stub> _stub;
397
    // because we have incremental open, we should keep one relative closure for each request. it's similar for adding blocks.
398
    std::vector<std::shared_ptr<DummyBrpcCallback<PTabletWriterOpenResult>>> _open_callbacks;
399
400
    std::vector<TTabletWithPartition> _all_tablets;
401
    std::vector<TTabletWithPartition> _tablets_wait_open;
402
    // map from tablet_id to node_id where slave replicas locate in
403
    std::unordered_map<int64_t, std::vector<int64_t>> _slave_tablet_nodes;
404
    std::vector<TTabletCommitInfo> _tablet_commit_infos;
405
406
    AddBatchCounter _add_batch_counter;
407
    std::atomic<int64_t> _serialize_batch_ns {0};
408
    std::atomic<int64_t> _queue_push_lock_ns {0};
409
    std::atomic<int64_t> _actual_consume_ns {0};
410
    std::atomic<int64_t> _load_back_pressure_version_block_ms {0};
411
412
    VNodeChannelStat _stat;
413
    // lock to protect _is_closed.
414
    // The methods in the IndexChannel are called back in the RpcClosure in the NodeChannel.
415
    // However, this rpc callback may occur after the whole task is finished (e.g. due to network latency),
416
    // and by that time the IndexChannel may have been destructured, so we should not call the
417
    // IndexChannel methods anymore, otherwise the BE will crash.
418
    // Therefore, we use the _is_closed and _closed_lock to ensure that the RPC callback
419
    // function will not call the IndexChannel method after the NodeChannel is closed.
420
    // The IndexChannel is definitely accessible until the NodeChannel is closed.
421
    std::mutex _closed_lock;
422
    bool _is_closed = false;
423
    bool _inited = false;
424
425
    RuntimeState* _state = nullptr;
426
    // A context lock for callbacks, the callback has to lock the ctx, to avoid
427
    // the object is deleted during callback is running.
428
    std::weak_ptr<TaskExecutionContext> _task_exec_ctx;
429
    // rows number received per tablet, tablet_id -> rows_num
430
    std::vector<std::pair<int64_t, int64_t>> _tablets_received_rows;
431
    // rows number filtered per tablet, tablet_id -> filtered_rows_num
432
    std::vector<std::pair<int64_t, int64_t>> _tablets_filtered_rows;
433
434
    // build a _cur_mutable_block and push into _pending_blocks. when not building, this block is empty.
435
    std::unique_ptr<MutableBlock> _cur_mutable_block;
436
    std::shared_ptr<PTabletWriterAddBlockRequest> _cur_add_block_request;
437
438
    using AddBlockReq =
439
            std::pair<std::unique_ptr<MutableBlock>, std::shared_ptr<PTabletWriterAddBlockRequest>>;
440
    std::queue<AddBlockReq> _pending_blocks;
441
    // send block to slave BE rely on this. dont reconstruct it.
442
    std::shared_ptr<WriteBlockCallback<PTabletWriterAddBlockResult>> _send_block_callback = nullptr;
443
444
    int64_t _wg_id = -1;
445
446
    bool _is_incremental;
447
448
    std::atomic<int64_t> _write_bytes {0};
449
    std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0};
450
};
451
452
// an IndexChannel is related to specific table and its rollup and mv
453
class IndexChannel {
454
public:
455
    IndexChannel(VTabletWriter* parent, int64_t index_id, VExprContextSPtr where_clause)
456
0
            : _parent(parent), _index_id(index_id), _where_clause(std::move(where_clause)) {
457
0
        _index_channel_tracker =
458
0
                std::make_unique<MemTracker>("IndexChannel:indexID=" + std::to_string(_index_id));
459
0
    }
460
0
    ~IndexChannel() = default;
461
462
    // allow to init multi times, for incremental open more tablets for one index(table)
463
    Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets,
464
                bool incremental = false);
465
466
    void for_each_node_channel(
467
0
            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
468
0
        for (auto& it : _node_channels) {
469
0
            func(it.second);
470
0
        }
471
0
    }
472
473
    void for_init_node_channel(
474
0
            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
475
0
        for (auto& it : _node_channels) {
476
0
            if (!it.second->is_incremental()) {
477
0
                func(it.second);
478
0
            }
479
0
        }
480
0
    }
481
482
    void for_inc_node_channel(
483
0
            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
484
0
        for (auto& it : _node_channels) {
485
0
            if (it.second->is_incremental()) {
486
0
                func(it.second);
487
0
            }
488
0
        }
489
0
    }
490
491
0
    std::unordered_set<int64_t> init_node_channel_ids() {
492
0
        std::unordered_set<int64_t> node_channel_ids;
493
0
        for (auto& it : _node_channels) {
494
0
            if (!it.second->is_incremental()) {
495
0
                node_channel_ids.insert(it.first);
496
0
            }
497
0
        }
498
0
        return node_channel_ids;
499
0
    }
500
501
0
    std::unordered_set<int64_t> inc_node_channel_ids() {
502
0
        std::unordered_set<int64_t> node_channel_ids;
503
0
        for (auto& it : _node_channels) {
504
0
            if (it.second->is_incremental()) {
505
0
                node_channel_ids.insert(it.first);
506
0
            }
507
0
        }
508
0
        return node_channel_ids;
509
0
    }
510
511
0
    std::unordered_set<int64_t> each_node_channel_ids() {
512
0
        std::unordered_set<int64_t> node_channel_ids;
513
0
        for (auto& it : _node_channels) {
514
0
            node_channel_ids.insert(it.first);
515
0
        }
516
0
        return node_channel_ids;
517
0
    }
518
519
0
    bool has_incremental_node_channel() const { return _has_inc_node; }
520
521
    void mark_as_failed(const VNodeChannel* node_channel, const std::string& err,
522
                        int64_t tablet_id = -1);
523
    Status check_intolerable_failure();
524
525
    Status close_wait(RuntimeState* state, WriterStats* writer_stats,
526
                      std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map,
527
                      std::unordered_set<int64_t> unfinished_node_channel_ids,
528
                      bool need_wait_after_quorum_success);
529
530
    Status check_each_node_channel_close(
531
            std::unordered_set<int64_t>* unfinished_node_channel_ids,
532
            std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map,
533
            WriterStats* writer_stats, Status status);
534
535
    // set error tablet info in runtime state, so that it can be returned to FE.
536
    void set_error_tablet_in_state(RuntimeState* state);
537
538
0
    size_t num_node_channels() const { return _node_channels.size(); }
539
540
0
    size_t get_pending_bytes() const {
541
0
        size_t mem_consumption = 0;
542
0
        for (const auto& kv : _node_channels) {
543
0
            mem_consumption += kv.second->get_pending_bytes();
544
0
        }
545
0
        return mem_consumption;
546
0
    }
547
548
    void set_tablets_received_rows(
549
            const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id);
550
551
    void set_tablets_filtered_rows(
552
            const std::vector<std::pair<int64_t, int64_t>>& tablets_filtered_rows, int64_t node_id);
553
554
0
    int64_t num_rows_filtered() {
555
        // the Unique table has no roll up or materilized view
556
        // we just add up filtered rows from all partitions
557
0
        return std::accumulate(_tablets_filtered_rows.cbegin(), _tablets_filtered_rows.cend(), 0,
558
0
                               [](int64_t sum, const auto& a) { return sum + a.second[0].second; });
559
0
    }
560
561
    // check whether the rows num written by different replicas is consistent
562
    Status check_tablet_received_rows_consistency();
563
564
    // check whether the rows num filtered by different replicas is consistent
565
    Status check_tablet_filtered_rows_consistency();
566
567
0
    void set_start_time(const int64_t& start_time) { _start_time = start_time; }
568
569
0
    VExprContextSPtr get_where_clause() { return _where_clause; }
570
571
private:
572
    friend class VNodeChannel;
573
    friend class VTabletWriter;
574
    friend class VRowDistribution;
575
576
    int _max_failed_replicas(int64_t tablet_id);
577
578
    int _load_required_replicas_num(int64_t tablet_id);
579
580
    bool _quorum_success(const std::unordered_set<int64_t>& unfinished_node_channel_ids,
581
                         const std::unordered_set<int64_t>& need_finish_tablets);
582
583
    int64_t _calc_max_wait_time_ms(const std::unordered_set<int64_t>& unfinished_node_channel_ids);
584
585
    VTabletWriter* _parent = nullptr;
586
    int64_t _index_id;
587
    VExprContextSPtr _where_clause;
588
589
    // from backend channel to tablet_id
590
    // ATTN: must be placed before `_node_channels` and `_channels_by_tablet`.
591
    // Because the destruct order of objects is opposite to the creation order.
592
    // So NodeChannel will be destructured first.
593
    // And the destructor function of NodeChannel waits for all RPCs to finish.
594
    // This ensures that it is safe to use `_tablets_by_channel` in the callback function for the end of the RPC.
595
    std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablets_by_channel;
596
    // BeId -> channel
597
    std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _node_channels;
598
    // from tablet_id to backend channel
599
    std::unordered_map<int64_t, std::vector<std::shared_ptr<VNodeChannel>>> _channels_by_tablet;
600
    // from partition_id to FE-planned bucket owner channel in cloud receiver-side random bucket mode
601
    std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _channels_by_partition;
602
    bool _has_inc_node = false;
603
604
    // lock to protect _failed_channels and _failed_channels_msgs
605
    mutable std::mutex _fail_lock;
606
    // key is tablet_id, value is a set of failed node id
607
    std::unordered_map<int64_t, std::unordered_set<int64_t>> _failed_channels;
608
    // key is tablet_id, value is error message
609
    std::unordered_map<int64_t, std::string> _failed_channels_msgs;
610
    Status _intolerable_failure_status = Status::OK();
611
612
    std::unique_ptr<MemTracker> _index_channel_tracker;
613
    // rows num received by DeltaWriter per tablet, tablet_id -> <node_Id, rows_num>
614
    // used to verify whether the rows num received by different replicas is consistent
615
    std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_received_rows;
616
617
    // rows num filtered by DeltaWriter per tablet, tablet_id -> <node_Id, filtered_rows_num>
618
    // used to verify whether the rows num filtered by different replicas is consistent
619
    std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_filtered_rows;
620
621
    int64_t _start_time = 0;
622
};
623
} // namespace doris
624
625
namespace doris {
626
//
627
// Writes result blocks to the tablets of the destination OLAP table through IndexChannels and VNodeChannels.
628
class VTabletWriter final : public AsyncResultWriter {
629
public:
630
    VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
631
                  std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep);
632
633
    Status write(RuntimeState* state, Block& block) override;
634
635
    Status close(Status) override;
636
637
    Status open(RuntimeState* state, RuntimeProfile* profile) override;
638
639
    // the consumer func of sending pending batches in every NodeChannel.
640
    // use polling & NodeChannel::try_send_and_fetch_status() to achieve nonblocking sending.
641
    // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the producer
642
    void _send_batch_process();
643
644
    Status on_partitions_created(TCreatePartitionResult* result);
645
646
    Status _send_new_partition_batch();
647
648
private:
649
    friend class VNodeChannel;
650
    friend class IndexChannel;
651
652
    using ChannelDistributionPayload = std::unordered_map<VNodeChannel*, Payload>;
653
    using ChannelDistributionPayloadVec = std::vector<std::unordered_map<VNodeChannel*, Payload>>;
654
655
    Status _init_row_distribution();
656
657
    Status _init(RuntimeState* state, RuntimeProfile* profile);
658
659
    Status _generate_one_index_channel_payload(RowPartTabletIds& row_part_tablet_tuple,
660
                                               int32_t index_idx,
661
                                               ChannelDistributionPayload& channel_payload);
662
663
    Status _generate_index_channels_payloads(std::vector<RowPartTabletIds>& row_part_tablet_ids,
664
                                             ChannelDistributionPayloadVec& payload);
665
666
    void _cancel_all_channel(Status status);
667
668
    Status _incremental_open_node_channel(const std::vector<TOlapTablePartition>& partitions);
669
670
    void _do_try_close(RuntimeState* state, const Status& exec_status);
671
672
    void _build_tablet_replica_info(const int64_t tablet_id, VOlapTablePartition* partition);
673
674
    TDataSink _t_sink;
675
676
    std::shared_ptr<MemTracker> _mem_tracker;
677
678
    ObjectPool* _pool = nullptr;
679
680
    bthread_t _sender_thread = 0;
681
682
    // unique load id
683
    PUniqueId _load_id;
684
    int64_t _txn_id = -1;
685
    int _num_replicas = -1;
686
    int _tuple_desc_id = -1;
687
688
    // this is tuple descriptor of destination OLAP table
689
    TupleDescriptor* _output_tuple_desc = nullptr;
690
    RowDescriptor* _output_row_desc = nullptr;
691
692
    // number of senders used to insert into OlapTable, if we only support single node insert,
693
    // all data from select should be collected and then sent to OlapTable.
694
    // To support multiple senders, we maintain a channel for each sender.
695
    int _sender_id = -1;
696
    int _num_senders = -1;
697
    bool _is_high_priority = false;
698
699
    // TODO(zc): think about cache this data
700
    std::shared_ptr<OlapTableSchemaParam> _schema;
701
    OlapTableLocationParam* _location = nullptr;
702
    bool _write_single_replica = false;
703
    OlapTableLocationParam* _slave_location = nullptr;
704
    DorisNodesInfo* _nodes_info = nullptr;
705
706
    std::unique_ptr<OlapTabletFinder> _tablet_finder;
707
708
    // index_channel
709
    bthread::Mutex _stop_check_channel;
710
    std::vector<std::shared_ptr<IndexChannel>> _channels;
711
    std::unordered_map<int64_t, std::shared_ptr<IndexChannel>> _index_id_to_channel;
712
713
    std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;
714
715
    // support only one partition column now
716
    std::vector<std::vector<TStringLiteral>> _partitions_need_create;
717
718
    std::unique_ptr<OlapTableBlockConvertor> _block_convertor;
719
    // Stats for this
720
    int64_t _send_data_ns = 0;
721
    int64_t _number_input_rows = 0;
722
    int64_t _number_output_rows = 0;
723
    int64_t _filter_ns = 0;
724
725
    MonotonicStopWatch _row_distribution_watch;
726
727
    RuntimeProfile::Counter* _input_rows_counter = nullptr;
728
    RuntimeProfile::Counter* _output_rows_counter = nullptr;
729
    RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
730
    RuntimeProfile::Counter* _send_data_timer = nullptr;
731
    RuntimeProfile::Counter* _row_distribution_timer = nullptr;
732
    RuntimeProfile::Counter* _append_node_channel_timer = nullptr;
733
    RuntimeProfile::Counter* _filter_timer = nullptr;
734
    RuntimeProfile::Counter* _where_clause_timer = nullptr;
735
    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
736
    RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr;
737
    RuntimeProfile::Counter* _validate_data_timer = nullptr;
738
    RuntimeProfile::Counter* _open_timer = nullptr;
739
    RuntimeProfile::Counter* _close_timer = nullptr;
740
    RuntimeProfile::Counter* _non_blocking_send_timer = nullptr;
741
    RuntimeProfile::Counter* _non_blocking_send_work_timer = nullptr;
742
    RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
743
    RuntimeProfile::Counter* _total_add_batch_exec_timer = nullptr;
744
    RuntimeProfile::Counter* _max_add_batch_exec_timer = nullptr;
745
    RuntimeProfile::Counter* _total_wait_exec_timer = nullptr;
746
    RuntimeProfile::Counter* _max_wait_exec_timer = nullptr;
747
    RuntimeProfile::Counter* _add_batch_number = nullptr;
748
    RuntimeProfile::Counter* _num_node_channels = nullptr;
749
    RuntimeProfile::Counter* _load_back_pressure_version_time_ms = nullptr;
750
751
    // the timeout of load channels opened by this tablet sink. in second
752
    int64_t _load_channel_timeout_s = 0;
753
    // the load txn absolute expiration time.
754
    int64_t _txn_expiration = 0;
755
756
    int32_t _send_batch_parallelism = 1;
757
    // Save the status of try_close() and close() method
758
    Status _close_status;
759
    // if we called try_close(), for auto partition the periodic send thread should stop if it's still waiting for node channels first-time open.
760
    // atomic: written by pthread (_do_try_close), read by bthread (_send_batch_process)
761
    std::atomic<bool> _try_close {false};
762
    bool _inited = false;
763
    bool _write_file_cache = false;
764
765
    // User can change this config at runtime, avoid it being modified during query or loading process.
766
    bool _transfer_large_data_by_brpc = false;
767
768
    VOlapTablePartitionParam* _vpartition = nullptr;
769
770
    RuntimeState* _state = nullptr; // not owned, set when open
771
772
    VRowDistribution _row_distribution;
773
    // reuse to avoid frequent memory allocation and release.
774
    std::vector<RowPartTabletIds> _row_part_tablet_ids;
775
776
    // tablet_id -> <total replicas num, load required replicas num>
777
    std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;
778
779
    // tablet_id -> set of backend_ids that have version gaps
780
    // these backends' success should not be counted for majority write
781
    std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablet_version_gap_backends;
782
};
783
} // namespace doris