Coverage Report

Created: 2026-03-12 17:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vtablet_writer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <brpc/controller.h>
20
#include <bthread/types.h>
21
#include <butil/errno.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/Exprs_types.h>
24
#include <gen_cpp/FrontendService.h>
25
#include <gen_cpp/FrontendService_types.h>
26
#include <gen_cpp/PaloInternalService_types.h>
27
#include <gen_cpp/Types_types.h>
28
#include <gen_cpp/internal_service.pb.h>
29
#include <gen_cpp/types.pb.h>
30
#include <glog/logging.h>
31
#include <google/protobuf/stubs/callback.h>
32
33
// IWYU pragma: no_include <bits/chrono.h>
34
#include <bthread/mutex.h>
35
36
#include <atomic>
37
#include <chrono> // IWYU pragma: keep
38
#include <cstddef>
39
#include <cstdint>
40
#include <functional>
41
#include <map>
42
#include <memory>
43
#include <mutex>
44
#include <ostream>
45
#include <queue>
46
#include <sstream>
47
#include <string>
48
#include <thread>
49
#include <unordered_map>
50
#include <unordered_set>
51
#include <utility>
52
#include <vector>
53
54
#include "common/config.h"
55
#include "common/status.h"
56
#include "core/block/block.h"
57
#include "core/column/column.h"
58
#include "core/data_type/data_type.h"
59
#include "exec/sink/vrow_distribution.h"
60
#include "exec/sink/vtablet_block_convertor.h"
61
#include "exec/sink/vtablet_finder.h"
62
#include "exec/sink/writer/async_result_writer.h"
63
#include "exprs/vexpr_fwd.h"
64
#include "runtime/exec_env.h"
65
#include "runtime/memory/mem_tracker.h"
66
#include "runtime/runtime_profile.h"
67
#include "runtime/thread_context.h"
68
#include "storage/tablet_info.h"
69
#include "util/brpc_closure.h"
70
#include "util/stopwatch.hpp"
71
72
namespace doris {
73
class ObjectPool;
74
class RowDescriptor;
75
class RuntimeState;
76
class TDataSink;
77
class TExpr;
78
class Thread;
79
class ThreadPoolToken;
80
class TupleDescriptor;
81
82
// The counter of add_batch rpc of a single node
83
// Aggregated add_batch rpc statistics for a single node.
// Counters are plain integers; instances can be summed with += or +.
struct AddBatchCounter {
    // total execution time of add_batch rpcs, in microseconds
    int64_t add_batch_execution_time_us {0};
    // lock waiting time inside add_batch rpcs, in microseconds
    int64_t add_batch_wait_execution_time_us {0};
    // how many add_batch calls were made
    int64_t add_batch_num {0};
    // time passed between marked close and finish close, in milliseconds
    int64_t close_wait_time_ms {0};

    // Adds every counter of `rhs` onto this object and returns *this.
    AddBatchCounter& operator+=(const AddBatchCounter& rhs) {
        add_batch_execution_time_us = add_batch_execution_time_us + rhs.add_batch_execution_time_us;
        add_batch_wait_execution_time_us =
                add_batch_wait_execution_time_us + rhs.add_batch_wait_execution_time_us;
        add_batch_num = add_batch_num + rhs.add_batch_num;
        close_wait_time_ms = close_wait_time_ms + rhs.close_wait_time_ms;
        return *this;
    }

    // Returns the field-wise sum of `lhs` and `rhs`; neither operand is modified.
    friend AddBatchCounter operator+(const AddBatchCounter& lhs, const AddBatchCounter& rhs) {
        AddBatchCounter total(lhs);
        total += rhs;
        return total;
    }
};
106
107
// Context object passed by const reference to the success / failed handlers
// of WriteBlockCallback.
struct WriteBlockCallbackContext {
    // Starts as false; WriteBlockCallback::end_mark() flips it to true.
    std::atomic<bool> _is_last_rpc {false};
};
110
111
// It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence.
112
// So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction.
113
// Deleting this pointer is safe; don't worry about the RPC callback running after WriteBlockCallback is deleted.
114
// "Ping-Pong" between sender and receiver, `try_set_in_flight` when send, `clear_in_flight` after rpc failure or callback,
115
// then next send will start, and it will wait for the rpc callback to complete when it is destroyed.
116
template <typename T>
117
class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> {
118
    ENABLE_FACTORY_CREATOR(WriteBlockCallback);
119
120
public:
121
556
    WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}
122
556
    ~WriteBlockCallback() override = default;
123
124
556
    void addFailedHandler(const std::function<void(const WriteBlockCallbackContext&)>& fn) {
125
556
        failed_handler = fn;
126
556
    }
127
    void addSuccessHandler(
128
556
            const std::function<void(const T&, const WriteBlockCallbackContext&)>& fn) {
129
556
        success_handler = fn;
130
556
    }
131
132
0
    void join() override {
133
        // We rely on in_flight to assure one rpc is running,
134
        // while cid is not reliable due to memory order.
135
        // in_flight is written before getting callid,
136
        // so we can not use memory fence to synchronize.
137
0
        while (_packet_in_flight) {
138
            // cid here is complicated
139
0
            if (cid != INVALID_BTHREAD_ID) {
140
                // actually cid may be the last rpc call id.
141
0
                brpc::Join(cid);
142
0
            }
143
0
            if (_packet_in_flight) {
144
0
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
145
0
            }
146
0
        }
147
0
    }
148
149
    // plz follow this order: reset() -> set_in_flight() -> send brpc batch
150
556
    void reset() {
151
556
        ::doris::DummyBrpcCallback<T>::cntl_->Reset();
152
556
        cid = ::doris::DummyBrpcCallback<T>::cntl_->call_id();
153
556
    }
154
155
    // if _packet_in_flight == false, set it to true. Return true.
156
    // if _packet_in_flight == true, Return false.
157
86.6k
    bool try_set_in_flight() {
158
86.6k
        bool value = false;
159
86.6k
        return _packet_in_flight.compare_exchange_strong(value, true);
160
86.6k
    }
161
162
86.2k
    void clear_in_flight() { _packet_in_flight = false; }
163
164
    bool is_packet_in_flight() { return _packet_in_flight; }
165
166
556
    void end_mark() {
167
556
        DCHECK(_ctx._is_last_rpc == false);
168
556
        _ctx._is_last_rpc = true;
169
556
    }
170
171
554
    void call() override {
172
554
        DCHECK(_packet_in_flight);
173
554
        if (::doris::DummyBrpcCallback<T>::cntl_->Failed()) {
174
0
            LOG(WARNING) << "failed to send brpc batch, error="
175
0
                         << berror(::doris::DummyBrpcCallback<T>::cntl_->ErrorCode())
176
0
                         << ", error_text=" << ::doris::DummyBrpcCallback<T>::cntl_->ErrorText();
177
0
            failed_handler(_ctx);
178
554
        } else {
179
554
            success_handler(*(::doris::DummyBrpcCallback<T>::response_), _ctx);
180
554
        }
181
554
        clear_in_flight();
182
554
    }
183
184
private:
185
    brpc::CallId cid;
186
    std::atomic<bool> _packet_in_flight {false};
187
    WriteBlockCallbackContext _ctx;
188
    std::function<void(const WriteBlockCallbackContext&)> failed_handler;
189
    std::function<void(const T&, const WriteBlockCallbackContext&)> success_handler;
190
};
191
192
class IndexChannel;
193
class VTabletWriter;
194
195
// Nanosecond counters collected per node channel; instances are merged with +=.
class VNodeChannelStat {
public:
    int64_t mem_exceeded_block_ns {0};
    int64_t where_clause_ns {0};
    int64_t append_node_channel_ns {0};

    // Adds each counter of `stat` onto this object and returns *this.
    VNodeChannelStat& operator+=(const VNodeChannelStat& stat) {
        mem_exceeded_block_ns = mem_exceeded_block_ns + stat.mem_exceeded_block_ns;
        where_clause_ns = where_clause_ns + stat.where_clause_ns;
        append_node_channel_ns = append_node_channel_ns + stat.append_node_channel_ns;
        return *this;
    }
};
208
209
struct WriterStats {
210
    int64_t serialize_batch_ns = 0;
211
    int64_t queue_push_lock_ns = 0;
212
    int64_t actual_consume_ns = 0;
213
    int64_t total_add_batch_exec_time_ns = 0;
214
    int64_t max_add_batch_exec_time_ns = 0;
215
    int64_t total_wait_exec_time_ns = 0;
216
    int64_t max_wait_exec_time_ns = 0;
217
    int64_t total_add_batch_num = 0;
218
    int64_t num_node_channels = 0;
219
    int64_t load_back_pressure_version_time_ms = 0;
220
    VNodeChannelStat channel_stat;
221
};
222
223
// pair<row_id,tablet_id>
224
using Payload = std::pair<std::unique_ptr<IColumn::Selector>, std::vector<int64_t>>;
225
226
// Every NodeChannel keeps a data transmission channel with one BE. Because it may be opened multiple times, it holds several requests and their corresponding closures.
227
class VNodeChannel {
228
public:
229
    VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, int64_t node_id,
230
                 bool is_incremental = false);
231
232
    ~VNodeChannel();
233
234
    // called before open, used to add tablet located in this backend. called by IndexChannel::init
235
2.96k
    void add_tablet(const TTabletWithPartition& tablet) { _tablets_wait_open.emplace_back(tablet); }
236
0
    std::string debug_tablets() const {
237
0
        std::stringstream ss;
238
0
        for (const auto& tab : _all_tablets) {
239
0
            tab.printTo(ss);
240
0
            ss << '\n';
241
0
        }
242
0
        return ss.str();
243
0
    }
244
245
0
    void add_slave_tablet_nodes(int64_t tablet_id, const std::vector<int64_t>& slave_nodes) {
246
0
        _slave_tablet_nodes[tablet_id] = slave_nodes;
247
0
    }
248
249
    // this function is NON_REENTRANT
250
    Status init(RuntimeState* state);
251
    /// these two functions will call open_internal. should keep that clear --- REENTRANT
252
    // build corresponding connect to BE. NON-REENTRANT
253
    void open();
254
    // for auto partition, we use this to open more tablet. KEEP IT REENTRANT
255
    void incremental_open();
256
    // this will block until all request transmission which were opened or incremental opened finished.
257
    // this function will be called multiple times. NON_REENTRANT
258
    Status open_wait();
259
260
    Status add_block(Block* block, const Payload* payload);
261
262
    // @return: 1 if running, 0 if finished.
263
    // @caller: VOlapTabletSink::_send_batch_process. it's a continual asynchronous process.
264
    int try_send_and_fetch_status(RuntimeState* state,
265
                                  std::unique_ptr<ThreadPoolToken>& thread_pool_token);
266
    // when there's pending block found by try_send_and_fetch_status(), we will awake a thread to send it.
267
    void try_send_pending_block(RuntimeState* state);
268
269
    void clear_all_blocks();
270
271
    // two ways to stop channel:
272
    // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response.
273
    // 2. just cancel()
274
    // hang_wait = true will make the receiver hang until all senders are mark_closed.
275
    void mark_close(bool hang_wait = false);
276
277
556
    bool is_closed() const { return _is_closed; }
278
556
    bool is_cancelled() const { return _cancelled; }
279
556
    std::string get_cancel_msg() {
280
556
        std::lock_guard<std::mutex> l(_cancel_msg_lock);
281
556
        if (!_cancel_msg.empty()) {
282
0
            return _cancel_msg;
283
0
        }
284
556
        return fmt::format("{} is cancelled", channel_info());
285
556
    }
286
287
    // two ways to stop channel:
288
    // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response.
289
    // 2. just cancel()
290
    Status close_wait(RuntimeState* state, bool* is_closed);
291
292
    Status after_close_handle(
293
            RuntimeState* state, WriterStats* writer_stats,
294
            std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map);
295
296
    Status check_status();
297
298
    void cancel(const std::string& cancel_msg);
299
300
    void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map,
301
556
                     WriterStats* writer_stats) const {
302
556
        if (add_batch_counter_map != nullptr) {
303
554
            (*add_batch_counter_map)[_node_id] += _add_batch_counter;
304
554
            (*add_batch_counter_map)[_node_id].close_wait_time_ms = _close_time_ms;
305
554
        }
306
556
        if (writer_stats != nullptr) {
307
554
            writer_stats->serialize_batch_ns += _serialize_batch_ns;
308
554
            writer_stats->channel_stat += _stat;
309
554
            writer_stats->queue_push_lock_ns += _queue_push_lock_ns;
310
554
            writer_stats->actual_consume_ns += _actual_consume_ns;
311
554
            writer_stats->total_add_batch_exec_time_ns +=
312
554
                    (_add_batch_counter.add_batch_execution_time_us * 1000);
313
554
            writer_stats->total_wait_exec_time_ns +=
314
554
                    (_add_batch_counter.add_batch_wait_execution_time_us * 1000);
315
554
            writer_stats->total_add_batch_num += _add_batch_counter.add_batch_num;
316
554
            writer_stats->load_back_pressure_version_time_ms +=
317
554
                    _load_back_pressure_version_block_ms;
318
554
        }
319
556
    }
320
321
0
    int64_t node_id() const { return _node_id; }
322
0
    std::string host() const { return _node_info.host; }
323
0
    std::string name() const { return _name; }
324
325
1.11k
    std::string channel_info() const {
326
1.11k
        return fmt::format("{}, {}, node={}:{}", _name, _load_info, _node_info.host,
327
1.11k
                           _node_info.brpc_port);
328
1.11k
    }
329
330
0
    size_t get_pending_bytes() { return _pending_batches_bytes; }
331
332
0
    bool is_incremental() const { return _is_incremental; }
333
334
0
    int64_t write_bytes() const { return _write_bytes.load(); }
335
336
protected:
337
    // make a real open request for relative BE's load channel.
338
    void _open_internal(bool is_incremental);
339
340
    void _close_check();
341
    void _cancel_with_msg(const std::string& msg);
342
343
    void _add_block_success_callback(const PTabletWriterAddBlockResult& result,
344
                                     const WriteBlockCallbackContext& ctx);
345
    void _add_block_failed_callback(const WriteBlockCallbackContext& ctx);
346
347
    void _refresh_back_pressure_version_wait_time(
348
            const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
349
                    tablet_load_infos);
350
351
    VTabletWriter* _parent = nullptr;
352
    IndexChannel* _index_channel = nullptr;
353
    int64_t _node_id = -1;
354
    std::string _load_info;
355
    std::string _name;
356
357
    std::shared_ptr<MemTracker> _node_channel_tracker;
358
    int64_t _load_mem_limit = -1;
359
360
    TupleDescriptor* _tuple_desc = nullptr;
361
    NodeInfo _node_info;
362
363
    // this should be set in init() using config
364
    int _rpc_timeout_ms = 60000;
365
    int64_t _next_packet_seq = 0;
366
    MonotonicStopWatch _timeout_watch;
367
368
    // the timestamp when this node channel be marked closed and finished closed
369
    uint64_t _close_time_ms = 0;
370
371
    // user cancel or get some errors
372
    std::atomic<bool> _cancelled {false};
373
    std::mutex _cancel_msg_lock;
374
    std::string _cancel_msg;
375
376
    // send finished means the consumer thread which send the rpc can exit
377
    std::atomic<bool> _send_finished {false};
378
379
    // add batches finished means the last rpc has been responded to; used to check whether this channel can be closed
380
    std::atomic<bool> _add_batches_finished {false}; // reuse for vectorized
381
382
    bool _eos_is_produced {false}; // only for restricting producer behaviors
383
384
    std::unique_ptr<RowDescriptor> _row_desc;
385
    int _batch_size = 0;
386
387
    // limit _pending_batches size
388
    std::atomic<size_t> _pending_batches_bytes {0};
389
    size_t _max_pending_batches_bytes {(size_t)config::nodechannel_pending_queue_max_bytes};
390
    std::mutex _pending_batches_lock;          // reuse for vectorized
391
    std::atomic<int> _pending_batches_num {0}; // reuse for vectorized
392
393
    std::shared_ptr<PBackendService_Stub> _stub;
394
    // because we have incremental open, we should keep one corresponding closure per request. it's similar for adding blocks.
395
    std::vector<std::shared_ptr<DummyBrpcCallback<PTabletWriterOpenResult>>> _open_callbacks;
396
397
    std::vector<TTabletWithPartition> _all_tablets;
398
    std::vector<TTabletWithPartition> _tablets_wait_open;
399
    // map from tablet_id to node_id where slave replicas locate in
400
    std::unordered_map<int64_t, std::vector<int64_t>> _slave_tablet_nodes;
401
    std::vector<TTabletCommitInfo> _tablet_commit_infos;
402
403
    AddBatchCounter _add_batch_counter;
404
    std::atomic<int64_t> _serialize_batch_ns {0};
405
    std::atomic<int64_t> _queue_push_lock_ns {0};
406
    std::atomic<int64_t> _actual_consume_ns {0};
407
    std::atomic<int64_t> _load_back_pressure_version_block_ms {0};
408
409
    VNodeChannelStat _stat;
410
    // lock to protect _is_closed.
411
    // The methods in the IndexChannel are called back in the RpcClosure in the NodeChannel.
412
    // However, this rpc callback may occur after the whole task is finished (e.g. due to network latency),
413
    // and by that time the IndexChannel may have been destructured, so we should not call the
414
    // IndexChannel methods anymore, otherwise the BE will crash.
415
    // Therefore, we use the _is_closed and _closed_lock to ensure that the RPC callback
416
    // function will not call the IndexChannel method after the NodeChannel is closed.
417
    // The IndexChannel is definitely accessible until the NodeChannel is closed.
418
    std::mutex _closed_lock;
419
    bool _is_closed = false;
420
    bool _inited = false;
421
422
    RuntimeState* _state = nullptr;
423
    // A context lock for callbacks, the callback has to lock the ctx, to avoid
424
    // the object is deleted during callback is running.
425
    std::weak_ptr<TaskExecutionContext> _task_exec_ctx;
426
    // rows number received per tablet, tablet_id -> rows_num
427
    std::vector<std::pair<int64_t, int64_t>> _tablets_received_rows;
428
    // rows number filtered per tablet, tablet_id -> filtered_rows_num
429
    std::vector<std::pair<int64_t, int64_t>> _tablets_filtered_rows;
430
431
    // build a _cur_mutable_block and push into _pending_blocks. when not building, this block is empty.
432
    std::unique_ptr<MutableBlock> _cur_mutable_block;
433
    std::shared_ptr<PTabletWriterAddBlockRequest> _cur_add_block_request;
434
435
    using AddBlockReq =
436
            std::pair<std::unique_ptr<MutableBlock>, std::shared_ptr<PTabletWriterAddBlockRequest>>;
437
    std::queue<AddBlockReq> _pending_blocks;
438
    // sending blocks to slave BEs relies on this. don't reconstruct it.
439
    std::shared_ptr<WriteBlockCallback<PTabletWriterAddBlockResult>> _send_block_callback = nullptr;
440
441
    int64_t _wg_id = -1;
442
443
    bool _is_incremental;
444
445
    std::atomic<int64_t> _write_bytes {0};
446
    std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0};
447
};
448
449
// an IndexChannel is related to specific table and its rollup and mv
450
class IndexChannel {
451
public:
452
    IndexChannel(VTabletWriter* parent, int64_t index_id, VExprContextSPtr where_clause)
453
550
            : _parent(parent), _index_id(index_id), _where_clause(std::move(where_clause)) {
454
550
        _index_channel_tracker =
455
550
                std::make_unique<MemTracker>("IndexChannel:indexID=" + std::to_string(_index_id));
456
550
    }
457
556
    ~IndexChannel() = default;
458
459
    // allow to init multi times, for incremental open more tablets for one index(table)
460
    Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets,
461
                bool incremental = false);
462
463
    void for_each_node_channel(
464
89.3k
            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
465
89.3k
        for (auto& it : _node_channels) {
466
89.3k
            func(it.second);
467
89.3k
        }
468
89.3k
    }
469
470
    void for_init_node_channel(
471
0
            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
472
0
        for (auto& it : _node_channels) {
473
0
            if (!it.second->is_incremental()) {
474
0
                func(it.second);
475
0
            }
476
0
        }
477
0
    }
478
479
    void for_inc_node_channel(
480
0
            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
481
0
        for (auto& it : _node_channels) {
482
0
            if (it.second->is_incremental()) {
483
0
                func(it.second);
484
0
            }
485
0
        }
486
0
    }
487
488
0
    std::unordered_set<int64_t> init_node_channel_ids() {
489
0
        std::unordered_set<int64_t> node_channel_ids;
490
0
        for (auto& it : _node_channels) {
491
0
            if (!it.second->is_incremental()) {
492
0
                node_channel_ids.insert(it.first);
493
0
            }
494
0
        }
495
0
        return node_channel_ids;
496
0
    }
497
498
0
    std::unordered_set<int64_t> inc_node_channel_ids() {
499
0
        std::unordered_set<int64_t> node_channel_ids;
500
0
        for (auto& it : _node_channels) {
501
0
            if (it.second->is_incremental()) {
502
0
                node_channel_ids.insert(it.first);
503
0
            }
504
0
        }
505
0
        return node_channel_ids;
506
0
    }
507
508
556
    std::unordered_set<int64_t> each_node_channel_ids() {
509
556
        std::unordered_set<int64_t> node_channel_ids;
510
556
        for (auto& it : _node_channels) {
511
556
            node_channel_ids.insert(it.first);
512
556
        }
513
556
        return node_channel_ids;
514
556
    }
515
516
556
    bool has_incremental_node_channel() const { return _has_inc_node; }
517
518
    void mark_as_failed(const VNodeChannel* node_channel, const std::string& err,
519
                        int64_t tablet_id = -1);
520
    Status check_intolerable_failure();
521
522
    Status close_wait(RuntimeState* state, WriterStats* writer_stats,
523
                      std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map,
524
                      std::unordered_set<int64_t> unfinished_node_channel_ids,
525
                      bool need_wait_after_quorum_success);
526
527
    Status check_each_node_channel_close(
528
            std::unordered_set<int64_t>* unfinished_node_channel_ids,
529
            std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map,
530
            WriterStats* writer_stats, Status status);
531
532
    // set error tablet info in runtime state, so that it can be returned to FE.
533
    void set_error_tablet_in_state(RuntimeState* state);
534
535
88.6k
    size_t num_node_channels() const { return _node_channels.size(); }
536
537
0
    size_t get_pending_bytes() const {
538
0
        size_t mem_consumption = 0;
539
0
        for (const auto& kv : _node_channels) {
540
0
            mem_consumption += kv.second->get_pending_bytes();
541
0
        }
542
0
        return mem_consumption;
543
0
    }
544
545
    void set_tablets_received_rows(
546
            const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id);
547
548
    void set_tablets_filtered_rows(
549
            const std::vector<std::pair<int64_t, int64_t>>& tablets_filtered_rows, int64_t node_id);
550
551
0
    int64_t num_rows_filtered() {
552
        // the Unique table has no roll up or materilized view
553
        // we just add up filtered rows from all partitions
554
0
        return std::accumulate(_tablets_filtered_rows.cbegin(), _tablets_filtered_rows.cend(), 0,
555
0
                               [](int64_t sum, const auto& a) { return sum + a.second[0].second; });
556
0
    }
557
558
    // check whether the rows num written by different replicas is consistent
559
    Status check_tablet_received_rows_consistency();
560
561
    // check whether the rows num filtered by different replicas is consistent
562
    Status check_tablet_filtered_rows_consistency();
563
564
556
    void set_start_time(const int64_t& start_time) { _start_time = start_time; }
565
566
0
    VExprContextSPtr get_where_clause() { return _where_clause; }
567
568
private:
569
    friend class VNodeChannel;
570
    friend class VTabletWriter;
571
    friend class VRowDistribution;
572
573
    int _max_failed_replicas(int64_t tablet_id);
574
575
    int _load_required_replicas_num(int64_t tablet_id);
576
577
    bool _quorum_success(const std::unordered_set<int64_t>& unfinished_node_channel_ids,
578
                         const std::unordered_set<int64_t>& need_finish_tablets);
579
580
    int64_t _calc_max_wait_time_ms(const std::unordered_set<int64_t>& unfinished_node_channel_ids);
581
582
    VTabletWriter* _parent = nullptr;
583
    int64_t _index_id;
584
    VExprContextSPtr _where_clause;
585
586
    // from backend channel to tablet_id
587
    // ATTN: must be placed before `_node_channels` and `_channels_by_tablet`.
588
    // Because the destruct order of objects is opposite to the creation order.
589
    // So NodeChannel will be destructured first.
590
    // And the destructor function of NodeChannel waits for all RPCs to finish.
591
    // This ensures that it is safe to use `_tablets_by_channel` in the callback function for the end of the RPC.
592
    std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablets_by_channel;
593
    // BeId -> channel
594
    std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _node_channels;
595
    // from tablet_id to backend channel
596
    std::unordered_map<int64_t, std::vector<std::shared_ptr<VNodeChannel>>> _channels_by_tablet;
597
    bool _has_inc_node = false;
598
599
    // lock to protect _failed_channels and _failed_channels_msgs
600
    mutable std::mutex _fail_lock;
601
    // key is tablet_id, value is a set of failed node id
602
    std::unordered_map<int64_t, std::unordered_set<int64_t>> _failed_channels;
603
    // key is tablet_id, value is error message
604
    std::unordered_map<int64_t, std::string> _failed_channels_msgs;
605
    Status _intolerable_failure_status = Status::OK();
606
607
    std::unique_ptr<MemTracker> _index_channel_tracker;
608
    // rows num received by DeltaWriter per tablet, tablet_id -> <node_Id, rows_num>
609
    // used to verify whether the rows num received by different replicas is consistent
610
    std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_received_rows;
611
612
    // rows num filtered by DeltaWriter per tablet, tablet_id -> <node_Id, filtered_rows_num>
613
    // used to verify whether the rows num filtered by different replicas is consistent
614
    std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_filtered_rows;
615
616
    int64_t _start_time = 0;
617
};
618
} // namespace doris
619
620
namespace doris {
621
//
622
// Writes blocks to the tablets of the destination OLAP table through index channels and node channels.
623
class VTabletWriter final : public AsyncResultWriter {
624
public:
625
    VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
626
                  std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep);
627
628
    Status write(RuntimeState* state, Block& block) override;
629
630
    Status close(Status) override;
631
632
    Status open(RuntimeState* state, RuntimeProfile* profile) override;
633
634
    // the consumer func of sending pending batches in every NodeChannel.
635
    // use polling & NodeChannel::try_send_and_fetch_status() to achieve nonblocking sending.
636
    // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the producer
637
    void _send_batch_process();
638
639
    Status on_partitions_created(TCreatePartitionResult* result);
640
641
    Status _send_new_partition_batch();
642
643
private:
644
    friend class VNodeChannel;
645
    friend class IndexChannel;
646
647
    using ChannelDistributionPayload = std::unordered_map<VNodeChannel*, Payload>;
648
    using ChannelDistributionPayloadVec = std::vector<std::unordered_map<VNodeChannel*, Payload>>;
649
650
    Status _init_row_distribution();
651
652
    Status _init(RuntimeState* state, RuntimeProfile* profile);
653
654
    void _generate_one_index_channel_payload(RowPartTabletIds& row_part_tablet_tuple,
655
                                             int32_t index_idx,
656
                                             ChannelDistributionPayload& channel_payload);
657
658
    void _generate_index_channels_payloads(std::vector<RowPartTabletIds>& row_part_tablet_ids,
659
                                           ChannelDistributionPayloadVec& payload);
660
661
    void _cancel_all_channel(Status status);
662
663
    Status _incremental_open_node_channel(const std::vector<TOlapTablePartition>& partitions);
664
665
    void _do_try_close(RuntimeState* state, const Status& exec_status);
666
667
    void _build_tablet_replica_info(const int64_t tablet_id, VOlapTablePartition* partition);
668
669
    TDataSink _t_sink;
670
671
    std::shared_ptr<MemTracker> _mem_tracker;
672
673
    ObjectPool* _pool = nullptr;
674
675
    bthread_t _sender_thread = 0;
676
677
    // unique load id
678
    PUniqueId _load_id;
679
    int64_t _txn_id = -1;
680
    int _num_replicas = -1;
681
    int _tuple_desc_id = -1;
682
683
    // this is tuple descriptor of destination OLAP table
684
    TupleDescriptor* _output_tuple_desc = nullptr;
685
    RowDescriptor* _output_row_desc = nullptr;
686
687
    // number of senders used to insert into OlapTable, if we only support single node insert,
688
    // all data from select should be collected and then sent to OlapTable.
689
    // To support multiple senders, we maintain a channel for each sender.
690
    int _sender_id = -1;
691
    int _num_senders = -1;
692
    bool _is_high_priority = false;
693
694
    // TODO(zc): think about cache this data
695
    std::shared_ptr<OlapTableSchemaParam> _schema;
696
    OlapTableLocationParam* _location = nullptr;
697
    bool _write_single_replica = false;
698
    OlapTableLocationParam* _slave_location = nullptr;
699
    DorisNodesInfo* _nodes_info = nullptr;
700
701
    std::unique_ptr<OlapTabletFinder> _tablet_finder;
702
703
    // index_channel
704
    bthread::Mutex _stop_check_channel;
705
    std::vector<std::shared_ptr<IndexChannel>> _channels;
706
    std::unordered_map<int64_t, std::shared_ptr<IndexChannel>> _index_id_to_channel;
707
708
    std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;
709
710
    // support only one partition column now
711
    std::vector<std::vector<TStringLiteral>> _partitions_need_create;
712
713
    std::unique_ptr<OlapTableBlockConvertor> _block_convertor;
714
    // Stats for this
715
    int64_t _send_data_ns = 0;
716
    int64_t _number_input_rows = 0;
717
    int64_t _number_output_rows = 0;
718
    int64_t _filter_ns = 0;
719
720
    MonotonicStopWatch _row_distribution_watch;
721
722
    RuntimeProfile::Counter* _input_rows_counter = nullptr;
723
    RuntimeProfile::Counter* _output_rows_counter = nullptr;
724
    RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
725
    RuntimeProfile::Counter* _send_data_timer = nullptr;
726
    RuntimeProfile::Counter* _row_distribution_timer = nullptr;
727
    RuntimeProfile::Counter* _append_node_channel_timer = nullptr;
728
    RuntimeProfile::Counter* _filter_timer = nullptr;
729
    RuntimeProfile::Counter* _where_clause_timer = nullptr;
730
    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
731
    RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr;
732
    RuntimeProfile::Counter* _validate_data_timer = nullptr;
733
    RuntimeProfile::Counter* _open_timer = nullptr;
734
    RuntimeProfile::Counter* _close_timer = nullptr;
735
    RuntimeProfile::Counter* _non_blocking_send_timer = nullptr;
736
    RuntimeProfile::Counter* _non_blocking_send_work_timer = nullptr;
737
    RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
738
    RuntimeProfile::Counter* _total_add_batch_exec_timer = nullptr;
739
    RuntimeProfile::Counter* _max_add_batch_exec_timer = nullptr;
740
    RuntimeProfile::Counter* _total_wait_exec_timer = nullptr;
741
    RuntimeProfile::Counter* _max_wait_exec_timer = nullptr;
742
    RuntimeProfile::Counter* _add_batch_number = nullptr;
743
    RuntimeProfile::Counter* _num_node_channels = nullptr;
744
    RuntimeProfile::Counter* _load_back_pressure_version_time_ms = nullptr;
745
746
    // the timeout of load channels opened by this tablet sink. in second
747
    int64_t _load_channel_timeout_s = 0;
748
    // the load txn absolute expiration time.
749
    int64_t _txn_expiration = 0;
750
751
    int32_t _send_batch_parallelism = 1;
752
    // Save the status of try_close() and close() method
753
    Status _close_status;
754
    // if we called try_close(), for auto partition the periodic send thread should stop if it's still waiting for node channels first-time open.
755
    bool _try_close = false;
756
    // for non-pipeline, if close() did something, close_wait() should wait it.
757
    bool _close_wait = false;
758
    bool _inited = false;
759
    bool _write_file_cache = false;
760
761
    // User can change this config at runtime, avoid it being modified during query or loading process.
762
    bool _transfer_large_data_by_brpc = false;
763
764
    VOlapTablePartitionParam* _vpartition = nullptr;
765
766
    RuntimeState* _state = nullptr; // not owned, set when open
767
768
    VRowDistribution _row_distribution;
769
    // reuse to avoid frequent memory allocation and release.
770
    std::vector<RowPartTabletIds> _row_part_tablet_ids;
771
772
    // tablet_id -> <total replicas num, load required replicas num>
773
    std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;
774
};
775
} // namespace doris