Coverage Report

Created: 2026-06-29 14:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vtablet_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/writer/vtablet_writer.h"
19
20
#include <brpc/http_method.h>
21
#include <bthread/bthread.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/DataSinks_types.h>
24
#include <gen_cpp/Descriptors_types.h>
25
#include <gen_cpp/Exprs_types.h>
26
#include <gen_cpp/FrontendService.h>
27
#include <gen_cpp/FrontendService_types.h>
28
#include <gen_cpp/HeartbeatService_types.h>
29
#include <gen_cpp/Metrics_types.h>
30
#include <gen_cpp/Types_types.h>
31
#include <gen_cpp/data.pb.h>
32
#include <gen_cpp/internal_service.pb.h>
33
#include <glog/logging.h>
34
#include <google/protobuf/stubs/common.h>
35
#include <sys/param.h>
36
37
#include <algorithm>
38
#include <initializer_list>
39
#include <memory>
40
#include <mutex>
41
#include <sstream>
42
#include <string>
43
#include <unordered_map>
44
#include <unordered_set>
45
#include <utility>
46
#include <vector>
47
48
#include "cloud/config.h"
49
#include "common/compiler_util.h" // IWYU pragma: keep
50
#include "common/config.h"
51
#include "common/logging.h"
52
#include "common/metrics/doris_metrics.h"
53
#include "common/object_pool.h"
54
#include "common/signal_handler.h"
55
#include "common/status.h"
56
#include "core/block/block.h"
57
#include "core/column/column.h"
58
#include "core/column/column_const.h"
59
#include "core/data_type/data_type.h"
60
#include "core/data_type/data_type_nullable.h"
61
#include "cpp/sync_point.h"
62
#include "exec/sink/vrow_distribution.h"
63
#include "exec/sink/vtablet_block_convertor.h"
64
#include "exec/sink/vtablet_finder.h"
65
#include "exprs/vexpr.h"
66
#include "exprs/vexpr_fwd.h"
67
#include "runtime/descriptors.h"
68
#include "runtime/exec_env.h"
69
#include "runtime/memory/memory_reclamation.h"
70
#include "runtime/query_context.h"
71
#include "runtime/runtime_profile.h"
72
#include "runtime/runtime_state.h"
73
#include "runtime/thread_context.h"
74
#include "service/backend_options.h"
75
#include "storage/tablet_info.h"
76
#include "util/brpc_closure.h"
77
#include "util/debug_points.h"
78
#include "util/defer_op.h"
79
#include "util/mem_info.h"
80
#include "util/network_util.h"
81
#include "util/proto_util.h"
82
#include "util/threadpool.h"
83
#include "util/thrift_rpc_helper.h"
84
#include "util/thrift_util.h"
85
#include "util/time.h"
86
#include "util/uid_util.h"
87
88
namespace doris {
89
class TExpr;
90
91
// Sink throughput counters. The two unnamed adders below are wrapped by PerSecond
// variables registered as "sink_throughput_byte" and "sink_throughput_row".
// NOTE(review): the trailing 60 is presumably the sampling window in seconds — confirm
// against the bvar::PerSecond constructor.
bvar::Adder<int64_t> g_sink_write_bytes;
92
bvar::PerSecond<bvar::Adder<int64_t>> g_sink_write_bytes_per_second("sink_throughput_byte",
93
                                                                    &g_sink_write_bytes, 60);
94
bvar::Adder<int64_t> g_sink_write_rows;
95
bvar::PerSecond<bvar::Adder<int64_t>> g_sink_write_rows_per_second("sink_throughput_row",
96
                                                                   &g_sink_write_rows, 60);
97
// Adder registered under the name "load_back_pressure_version_time_ms".
bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms(
98
        "load_back_pressure_version_time_ms");
99
100
// Looks up the entry of `partition.indexes` whose index_id equals `index_id`.
// Returns a pointer to the first matching entry, or nullptr when none matches.
static const OlapTableIndexTablets* find_partition_index(const VOlapTablePartition& partition,
                                                         int64_t index_id) {
    const auto& candidates = partition.indexes;
    const auto match =
            std::find_if(candidates.begin(), candidates.end(),
                         [index_id](const auto& candidate) { return candidate.index_id == index_id; });
    if (match == candidates.end()) {
        return nullptr;
    }
    return &*match;
}
109
110
static int64_t adaptive_bucket_be_id(const VOlapTablePartition& partition,
111
0
                                     const OlapTableIndexTablets* index) {
112
0
    if (index != nullptr && index->__isset.bucket_be_id) {
113
0
        return index->bucket_be_id > 0 ? index->bucket_be_id : BackendOptions::get_backend_id();
114
0
    }
115
0
    return partition.bucket_be_id > 0 ? partition.bucket_be_id : BackendOptions::get_backend_id();
116
0
}
117
118
static const std::vector<int32_t>& adaptive_local_bucket_seqs(const VOlapTablePartition& partition,
119
0
                                                              const OlapTableIndexTablets* index) {
120
0
    if (index != nullptr && index->__isset.local_bucket_seqs) {
121
0
        return index->local_bucket_seqs;
122
0
    }
123
0
    return partition.local_bucket_seqs;
124
0
}
125
126
// Upper bound, in milliseconds, on a single wait for a close event in
// IndexChannel::close_wait; after each wait the loop re-checks the node channels.
static constexpr int64_t CLOSE_WAIT_EVENT_FALLBACK_MS = 1000;
127
128
// Registers `tablets` with this index channel.
//
// For every replica node of every tablet, the node channel is looked up in _node_channels
// (and created when absent), the tablet is added to it, and the tablet<->channel maps are
// updated. Afterwards every node channel is initialised and, if present, the where clause
// is prepared and opened.
//
// @param state        runtime state forwarded to VNodeChannel::init and the where clause.
// @param tablets      tablets (with partition ids) to register.
// @param incremental  passed to newly created node channels; also sets _has_inc_node.
// @return InternalError when a tablet has no known location, otherwise the first error from
//         node channel init / where clause prepare / open, or OK.
Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets,
                          bool incremental) {
    SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get());
    for (const auto& tablet : tablets) {
        // Resolve which BEs host this tablet.
        auto* locations = _parent->_location->find_tablet(tablet.tablet_id);
        if (locations == nullptr) {
            return Status::InternalError("unknown tablet, tablet_id={}", tablet.tablet_id);
        }
        std::vector<std::shared_ptr<VNodeChannel>> replica_channels;
        // Walk every replica of the tablet (one per node).
        for (auto& replica_node_id : locations->node_ids) {
            std::shared_ptr<VNodeChannel> channel;
            if (auto existing = _node_channels.find(replica_node_id);
                existing != _node_channels.end()) {
                channel = existing->second;
            } else {
                // Needed both when preparing the sink and on incremental tablet open.
                // NodeChannel is deliberately not added to _parent->_pool: its destruction
                // may wait a long time for RPCs to finish, while ObjectPool holds a spin
                // lock when deleting objects.
                channel =
                        std::make_shared<VNodeChannel>(_parent, this, replica_node_id, incremental);
                _node_channels.emplace(replica_node_id, channel);
                // A node opened incrementally requires the two-stage close later.
                if (incremental) {
                    _has_inc_node = true;
                }
                VLOG_CRITICAL << "init new node for instance " << _parent->_sender_id
                              << ", node id:" << replica_node_id << ", incremantal:" << incremental;
            }
            channel->add_tablet(tablet);

            if (_parent->_tablet_finder->is_adaptive_random_bucket() && config::is_cloud_mode()) {
                // Remember the channel of the first partition with this id whose adaptive
                // bucket BE is this replica node.
                for (const auto* part : _parent->_vpartition->get_partitions()) {
                    if (part->id == tablet.partition_id &&
                        adaptive_bucket_be_id(*part, find_partition_index(*part, _index_id)) ==
                                replica_node_id) {
                        _channels_by_partition.emplace(tablet.partition_id, channel);
                        break;
                    }
                }
            }

            if (_parent->_write_single_replica) {
                if (auto* slave_location = _parent->_slave_location->find_tablet(tablet.tablet_id);
                    slave_location != nullptr) {
                    channel->add_slave_tablet_nodes(tablet.tablet_id, slave_location->node_ids);
                }
            }
            replica_channels.push_back(channel);
            _tablets_by_channel[replica_node_id].insert(tablet.tablet_id);
        }
        _channels_by_tablet.emplace(tablet.tablet_id, std::move(replica_channels));
    }

    for (auto& node_entry : _node_channels) {
        RETURN_IF_ERROR(node_entry.second->init(state));
    }

    if (_where_clause != nullptr) {
        RETURN_IF_ERROR(_where_clause->prepare(state, *_parent->_output_row_desc));
        RETURN_IF_ERROR(_where_clause->open(state));
    }

    return Status::OK();
}
195
196
void IndexChannel::mark_as_failed(const VNodeChannel* node_channel, const std::string& err,
197
0
                                  int64_t tablet_id) {
198
0
    DCHECK(node_channel != nullptr);
199
0
    LOG(INFO) << "mark node_id:" << node_channel->channel_info() << " tablet_id: " << tablet_id
200
0
              << " as failed, err: " << err;
201
0
    auto node_id = node_channel->node_id();
202
0
    const auto& it = _tablets_by_channel.find(node_id);
203
0
    if (it == _tablets_by_channel.end()) {
204
0
        return;
205
0
    }
206
207
0
    {
208
0
        std::lock_guard<std::mutex> l(_fail_lock);
209
0
        if (tablet_id == -1) {
210
0
            for (const auto the_tablet_id : it->second) {
211
0
                _failed_channels[the_tablet_id].insert(node_id);
212
0
                _failed_channels_msgs.emplace(the_tablet_id,
213
0
                                              err + ", host: " + node_channel->host());
214
0
                if (_failed_channels[the_tablet_id].size() > _max_failed_replicas(the_tablet_id)) {
215
0
                    _intolerable_failure_status = Status::Error<ErrorCode::INTERNAL_ERROR, false>(
216
0
                            _failed_channels_msgs[the_tablet_id]);
217
0
                }
218
0
            }
219
0
        } else {
220
0
            _failed_channels[tablet_id].insert(node_id);
221
0
            _failed_channels_msgs.emplace(tablet_id, err + ", host: " + node_channel->host());
222
0
            if (_failed_channels[tablet_id].size() > _max_failed_replicas(tablet_id)) {
223
0
                _intolerable_failure_status = Status::Error<ErrorCode::INTERNAL_ERROR, false>(
224
0
                        _failed_channels_msgs[tablet_id]);
225
0
            }
226
0
        }
227
0
    }
228
0
}
229
230
0
int IndexChannel::_max_failed_replicas(int64_t tablet_id) {
231
0
    auto [total_replicas_num, load_required_replicas_num] =
232
0
            _parent->_tablet_replica_info[tablet_id];
233
0
    int max_failed_replicas = total_replicas_num == 0
234
0
                                      ? (_parent->_num_replicas - 1) / 2
235
0
                                      : total_replicas_num - load_required_replicas_num;
236
0
    return max_failed_replicas;
237
0
}
238
239
0
int IndexChannel::_load_required_replicas_num(int64_t tablet_id) {
240
0
    auto [total_replicas_num, load_required_replicas_num] =
241
0
            _parent->_tablet_replica_info[tablet_id];
242
0
    if (total_replicas_num == 0) {
243
0
        return (_parent->_num_replicas + 1) / 2;
244
0
    }
245
0
    return load_required_replicas_num;
246
0
}
247
248
0
// Returns a copy of _intolerable_failure_status, read while holding _fail_lock.
Status IndexChannel::check_intolerable_failure() {
    std::lock_guard<std::mutex> guard(_fail_lock);
    Status snapshot = _intolerable_failure_status;
    return snapshot;
}
252
253
0
// Copies every (tablet id, message) pair from _failed_channels_msgs into a list of
// TErrorTabletInfo and passes it to `state`. The map is read under _fail_lock; the call
// into `state` is made after the lock is released.
void IndexChannel::set_error_tablet_in_state(RuntimeState* state) {
    std::vector<TErrorTabletInfo> collected;

    {
        std::lock_guard<std::mutex> guard(_fail_lock);
        for (const auto& [failed_tablet_id, message] : _failed_channels_msgs) {
            TErrorTabletInfo& info = collected.emplace_back();
            info.__set_tabletId(failed_tablet_id);
            info.__set_msg(message);
        }
    }
    state->add_error_tablet_infos(collected);
}
267
268
void IndexChannel::set_tablets_received_rows(
269
0
        const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id) {
270
0
    for (const auto& [tablet_id, rows_num] : tablets_received_rows) {
271
0
        _tablets_received_rows[tablet_id].emplace_back(node_id, rows_num);
272
0
    }
273
0
}
274
275
void IndexChannel::set_tablets_filtered_rows(
276
0
        const std::vector<std::pair<int64_t, int64_t>>& tablets_filtered_rows, int64_t node_id) {
277
0
    for (const auto& [tablet_id, rows_num] : tablets_filtered_rows) {
278
0
        _tablets_filtered_rows[tablet_id].emplace_back(node_id, rows_num);
279
0
    }
280
0
}
281
282
0
// Verifies that every node reported the same received row count for each tablet.
// Each record is logged at VLOG_NOTICE, then compared with the tablet's first record.
// Returns InternalError on the first mismatch, otherwise OK.
Status IndexChannel::check_tablet_received_rows_consistency() {
    for (const auto& [tablet_id, replica_rows] : _tablets_received_rows) {
        for (size_t idx = 0; idx < replica_rows.size(); ++idx) {
            const auto& current = replica_rows[idx];
            VLOG_NOTICE << "check_tablet_received_rows_consistency, load_id: " << _parent->_load_id
                        << ", txn_id: " << std::to_string(_parent->_txn_id)
                        << ", tablet_id: " << tablet_id << ", node_id: " << current.first
                        << ", rows_num: " << current.second;
            // The first record is the baseline for the rest.
            const auto& baseline = replica_rows.front();
            if (idx != 0 && current.second != baseline.second) {
                return Status::InternalError(
                        "rows num written by multi replicas doest't match, load_id={}, txn_id={}, "
                        "tablt_id={}, node_id={}, rows_num={}, node_id={}, rows_num={}",
                        print_id(_parent->_load_id), _parent->_txn_id, tablet_id, current.first,
                        current.second, baseline.first, baseline.second);
            }
        }
    }
    return Status::OK();
}
305
306
0
// Verifies that every node reported the same filtered row count for each tablet.
// Each record is logged at VLOG_NOTICE, then compared with the tablet's first record.
// Returns InternalError on the first mismatch, otherwise OK.
Status IndexChannel::check_tablet_filtered_rows_consistency() {
    for (const auto& [tablet_id, replica_rows] : _tablets_filtered_rows) {
        for (size_t idx = 0; idx < replica_rows.size(); ++idx) {
            const auto& current = replica_rows[idx];
            VLOG_NOTICE << "check_tablet_filtered_rows_consistency, load_id: " << _parent->_load_id
                        << ", txn_id: " << std::to_string(_parent->_txn_id)
                        << ", tablet_id: " << tablet_id << ", node_id: " << current.first
                        << ", rows_num: " << current.second;
            // The first record is the baseline for the rest.
            const auto& baseline = replica_rows.front();
            if (idx != 0 && current.second != baseline.second) {
                return Status::InternalError(
                        "rows num filtered by multi replicas doest't match, load_id={}, txn_id={}, "
                        "tablt_id={}, node_id={}, rows_num={}, node_id={}, rows_num={}",
                        print_id(_parent->_load_id), _parent->_txn_id, tablet_id, current.first,
                        current.second, baseline.first, baseline.second);
            }
        }
    }
    return Status::OK();
}
329
330
// Handles a node channel whose close failed: logs it, marks all of its tablets as failed in
// `ich`, and cancels `nch` on a best-effort basis.
//
// Returns the first non-OK result of, in order: the index channel's intolerable-failure
// status, the received-rows consistency check, the filtered-rows consistency check.
// When all three are OK, the incoming `status` is returned unchanged.
static Status cancel_channel_and_check_intolerable_failure(Status status,
                                                           const std::string& err_msg,
                                                           IndexChannel& ich, VNodeChannel& nch) {
    LOG(WARNING) << nch.channel_info() << ", close channel failed, err: " << err_msg;
    ich.mark_as_failed(&nch, err_msg, -1);
    // Best-effort cancel of the node channel.
    nch.cancel(err_msg);

    // The checks are ordered; later ones run only when the earlier ones pass.
    if (Status index_st = ich.check_intolerable_failure(); !index_st.ok()) {
        return index_st;
    }
    if (Status receive_st = ich.check_tablet_received_rows_consistency(); !receive_st.ok()) {
        return receive_st;
    }
    if (Status filter_st = ich.check_tablet_filtered_rows_consistency(); !filter_st.ok()) {
        return filter_st;
    }
    return status;
}
348
349
0
void IndexChannel::wait_for_close_event(int64_t observed_version, int64_t timeout_ms) {
350
0
    std::unique_lock<bthread::Mutex> lock(_close_wait_mutex);
351
0
    if (observed_version != close_wait_version()) {
352
0
        return;
353
0
    }
354
0
    static_cast<void>(_close_wait_cv.wait_for(lock, timeout_ms * 1000));
355
0
}
356
357
0
void IndexChannel::notify_close_wait() {
358
0
    _close_wait_version.fetch_add(1, std::memory_order_acq_rel);
359
0
    std::lock_guard<bthread::Mutex> lock(_close_wait_mutex);
360
0
    _close_wait_cv.notify_all();
361
0
}
362
363
// Waits for the node channels of this index to finish closing.
//
// Phase 1 loops until every id in `unfinished_node_channel_ids` has closed or
// _quorum_success() holds for the tablets of the partitions that were written.
// Phase 2 runs only when `need_wait_after_quorum_success` is set and channels are still
// unfinished. It keeps waiting for the stragglers, bounded by _calc_max_wait_time_ms() and
// by the remaining load channel timeout, then cancels whatever is still unfinished.
//
// @param state                          not used in this function (node channels are
//                                       driven with _parent->_state).
// @param writer_stats                   forwarded to check_each_node_channel_close.
// @param node_add_batch_counter_map     forwarded to check_each_node_channel_close.
// @param unfinished_node_channel_ids    ids still to wait for; taken by value and shrunk
//                                       as channels close.
// @param need_wait_after_quorum_success whether to run phase 2.
// @return the first error propagated by check_each_node_channel_close, otherwise OK.
Status IndexChannel::close_wait(
        RuntimeState* state, WriterStats* writer_stats,
        std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map,
        std::unordered_set<int64_t> unfinished_node_channel_ids,
        bool need_wait_after_quorum_success) {
    DBUG_EXECUTE_IF("IndexChannel.close_wait.timeout",
                    { return Status::TimedOut("injected timeout"); });
    Status status = Status::OK();
    // 1. wait quorum success
    std::unordered_set<int64_t> need_finish_tablets;
    auto partition_ids = _parent->_tablet_finder->partition_ids();
    for (const auto& part : _parent->_vpartition->get_partitions()) {
        if (partition_ids.contains(part->id)) {
            for (const auto& index : part->indexes) {
                for (const auto& tablet_id : index.tablets) {
                    need_finish_tablets.insert(tablet_id);
                }
            }
        }
    }
    while (true) {
        // Snapshot the version before polling so a notification that arrives during the
        // poll makes the following wait return immediately.
        int64_t close_wait_version = this->close_wait_version();
        RETURN_IF_ERROR(check_each_node_channel_close(
                &unfinished_node_channel_ids, node_add_batch_counter_map, writer_stats, status));
        bool quorum_success = _quorum_success(unfinished_node_channel_ids, need_finish_tablets);
        if (unfinished_node_channel_ids.empty() || quorum_success) {
            LOG(INFO) << "quorum_success: " << quorum_success
                      << ", is all finished: " << unfinished_node_channel_ids.empty()
                      << ", txn_id: " << _parent->_txn_id
                      << ", load_id: " << print_id(_parent->_load_id);
            break;
        }
        wait_for_close_event(close_wait_version, CLOSE_WAIT_EVENT_FALLBACK_MS);
    }

    // 2. wait for all node channel to complete as much as possible
    if (!unfinished_node_channel_ids.empty() && need_wait_after_quorum_success) {
        int64_t arrival_quorum_success_time = UnixMillis();
        int64_t max_wait_time_ms = _calc_max_wait_time_ms(unfinished_node_channel_ids);
        while (true) {
            int64_t close_wait_version = this->close_wait_version();
            RETURN_IF_ERROR(check_each_node_channel_close(&unfinished_node_channel_ids,
                                                          node_add_batch_counter_map, writer_stats,
                                                          status));
            if (unfinished_node_channel_ids.empty()) {
                break;
            }
            int64_t elapsed_ms = UnixMillis() - arrival_quorum_success_time;
            if (elapsed_ms > max_wait_time_ms ||
                _parent->_load_channel_timeout_s - elapsed_ms / 1000 <
                        config::quorum_success_remaining_timeout_seconds) {
                // cancel unfinished node channel
                std::stringstream unfinished_node_channel_host_str;
                for (const auto& node_id : unfinished_node_channel_ids) {
                    // Look up once with find(): operator[] would insert a null channel for
                    // an unknown id and then dereference it.
                    auto channel_it = _node_channels.find(node_id);
                    if (channel_it == _node_channels.end() || channel_it->second == nullptr) {
                        continue;
                    }
                    unfinished_node_channel_host_str << channel_it->second->host() << ",";
                    channel_it->second->cancel("timeout");
                }
                LOG(WARNING) << "reach max wait time, max_wait_time_ms: " << max_wait_time_ms
                             << ", cancel unfinished node channel and finish close"
                             << ", load id: " << print_id(_parent->_load_id)
                             << ", txn_id: " << _parent->_txn_id << ", unfinished node channel: "
                             << unfinished_node_channel_host_str.str();
                break;
            }
            // Never wait beyond the remaining budget.
            wait_for_close_event(close_wait_version, std::min(CLOSE_WAIT_EVENT_FALLBACK_MS,
                                                              max_wait_time_ms - elapsed_ms));
        }
    }
    return status;
}
433
434
// Polls every node channel still listed in `unfinished_node_channel_ids` once.
//
// A channel that reports closed gets after_close_handle() run on it and is removed from
// the set; the result of after_close_handle() then replaces the close_wait() result.
// Any non-OK result goes through cancel_channel_and_check_intolerable_failure(), which
// marks and cancels the channel and may turn the accumulated status into an error.
//
// @param unfinished_node_channel_ids  in/out set of channel ids still being waited for.
// @param node_add_batch_counter_map   forwarded to after_close_handle.
// @param writer_stats                 forwarded to after_close_handle.
// @param status                       not read by this function; kept for interface
//                                     compatibility.
// @return OK unless a channel failure turned out to be intolerable for the index.
Status IndexChannel::check_each_node_channel_close(
        std::unordered_set<int64_t>* unfinished_node_channel_ids,
        std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map,
        WriterStats* writer_stats, Status status) {
    Status final_status = Status::OK();
    // Bind by reference: the previous per-iteration shared_ptr copy was never used.
    for (auto& [node_id, node_channel] : _node_channels) {
        // A channel that is no longer in the unfinished set has already been closed.
        if (!unfinished_node_channel_ids->contains(node_id)) {
            continue;
        }
        bool node_channel_closed = false;
        auto close_status = node_channel->close_wait(_parent->_state, &node_channel_closed);
        if (node_channel_closed) {
            close_status = node_channel->after_close_handle(_parent->_state, writer_stats,
                                                            node_add_batch_counter_map);
            unfinished_node_channel_ids->erase(node_id);
        }
        DBUG_EXECUTE_IF("IndexChannel.check_each_node_channel_close.close_status_not_ok",
                        { close_status = Status::InternalError("injected close status not ok"); });
        if (!close_status.ok()) {
            final_status = cancel_channel_and_check_intolerable_failure(
                    std::move(final_status), close_status.to_string(), *this, *node_channel);
        }
    }

    return final_status;
}
463
464
bool IndexChannel::_quorum_success(const std::unordered_set<int64_t>& unfinished_node_channel_ids,
465
0
                                   const std::unordered_set<int64_t>& need_finish_tablets) {
466
0
    if (!config::enable_quorum_success_write) {
467
0
        return false;
468
0
    }
469
0
    if (need_finish_tablets.empty()) [[unlikely]] {
470
0
        return false;
471
0
    }
472
473
    // 1. collect all write tablets and finished tablets
474
0
    std::unordered_map<int64_t, int64_t> finished_tablets_replica;
475
0
    for (const auto& [node_id, node_channel] : _node_channels) {
476
0
        if (unfinished_node_channel_ids.contains(node_id) || !node_channel->check_status().ok()) {
477
0
            continue;
478
0
        }
479
0
        for (const auto& tablet_id : _tablets_by_channel[node_id]) {
480
            // Only count non-gap backends for quorum success.
481
            // Gap backends' success doesn't count toward majority write.
482
0
            auto gap_it = _parent->_tablet_version_gap_backends.find(tablet_id);
483
0
            if (gap_it == _parent->_tablet_version_gap_backends.end() ||
484
0
                gap_it->second.find(node_id) == gap_it->second.end()) {
485
0
                finished_tablets_replica[tablet_id]++;
486
0
            }
487
0
        }
488
0
    }
489
490
    // 2. check if quorum success
491
0
    for (const auto& tablet_id : need_finish_tablets) {
492
0
        if (finished_tablets_replica[tablet_id] < _load_required_replicas_num(tablet_id)) {
493
0
            return false;
494
0
        }
495
0
    }
496
497
0
    return true;
498
0
}
499
500
int64_t IndexChannel::_calc_max_wait_time_ms(
501
0
        const std::unordered_set<int64_t>& unfinished_node_channel_ids) {
502
    // 1. calculate avg speed of all unfinished node channel
503
0
    int64_t elapsed_ms = UnixMillis() - _start_time;
504
0
    int64_t total_bytes = 0;
505
0
    int finished_count = 0;
506
0
    for (const auto& [node_id, node_channel] : _node_channels) {
507
0
        if (unfinished_node_channel_ids.contains(node_id)) {
508
0
            continue;
509
0
        }
510
0
        total_bytes += node_channel->write_bytes();
511
0
        finished_count++;
512
0
    }
513
    // no data loaded in index channel, return 0
514
0
    if (total_bytes == 0 || finished_count == 0) {
515
0
        return 0;
516
0
    }
517
    // if elapsed_ms is equal to 0, explain the loaded data is too small
518
0
    if (elapsed_ms <= 0) {
519
0
        return config::quorum_success_min_wait_seconds * 1000;
520
0
    }
521
0
    double avg_speed =
522
0
            static_cast<double>(total_bytes) / (static_cast<double>(elapsed_ms) * finished_count);
523
524
    // 2. calculate max wait time of each unfinished node channel and return the max value
525
0
    int64_t max_wait_time_ms = 0;
526
0
    for (int64_t id : unfinished_node_channel_ids) {
527
0
        int64_t bytes = _node_channels[id]->write_bytes();
528
0
        int64_t wait =
529
0
                avg_speed > 0 ? static_cast<int64_t>(static_cast<double>(bytes) / avg_speed) : 0;
530
0
        max_wait_time_ms = std::max(max_wait_time_ms, wait);
531
0
    }
532
533
    // 3. calculate max wait time
534
    // introduce quorum_success_min_wait_seconds to avoid jitter of small load
535
0
    max_wait_time_ms -= UnixMillis() - _start_time;
536
0
    max_wait_time_ms =
537
0
            std::max(static_cast<int64_t>(static_cast<double>(max_wait_time_ms) *
538
0
                                          (1.0 + config::quorum_success_max_wait_multiplier)),
539
0
                     config::quorum_success_min_wait_seconds * 1000);
540
541
0
    return max_wait_time_ms;
542
0
}
543
544
0
// Returns OK when none of `vars` is true. Otherwise returns Status::Uninitialized whose
// message lists every flag as 1 or 0 joined by '/', e.g. "0/1/0".
static Status none_of(std::initializer_list<bool> vars) {
    if (std::find(vars.begin(), vars.end(), true) == vars.end()) {
        return Status::OK();
    }

    std::string flags;
    for (const bool flag : vars) {
        if (!flags.empty()) {
            flags += '/';
        }
        flags += flag ? '1' : '0';
    }
    return Status::Uninitialized(flags);
}
559
560
// Stores the owning writer, index channel, target node id and the incremental flag, then
// creates the add-block request object and the channel's memory tracker, and derives
// _load_mem_limit from the process memory limit and
// config::load_process_max_memory_limit_percent.
VNodeChannel::VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, int64_t node_id,
                           bool is_incremental)
        : _parent(parent),
          _index_channel(index_channel),
          _node_id(node_id),
          _is_incremental(is_incremental) {
    // The tracker label carries the index id and the id of the constructing thread.
    std::string tracker_label =
            fmt::format("NodeChannel:indexID={}:threadId={}",
                        std::to_string(_index_channel->_index_id), ThreadContext::get_thread_id());
    _cur_add_block_request = std::make_shared<PTabletWriterAddBlockRequest>();
    _node_channel_tracker = std::make_shared<MemTracker>(std::move(tracker_label));
    _load_mem_limit = MemInfo::mem_limit() * config::load_process_max_memory_limit_percent / 100;
}
572
573
0
// Defaulted destructor: no explicit teardown here; members are released by their own
// destructors.
VNodeChannel::~VNodeChannel() = default;
574
575
0
void VNodeChannel::clear_all_blocks() {
576
0
    std::lock_guard<std::mutex> lg(_pending_batches_lock);
577
0
    std::queue<AddBlockReq> empty;
578
0
    std::swap(_pending_blocks, empty);
579
0
    _cur_mutable_block.reset();
580
0
}
581
582
// we don't need to send tablet_writer_cancel rpc request when
583
// init failed, so set _is_closed to true.
584
// if "_cancelled" is set to true,
585
// no need to set _cancel_msg because the error will be
586
// returned directly via "TabletSink::prepare()" method.
587
0
Status VNodeChannel::init(RuntimeState* state) {
588
0
    if (_inited) {
589
0
        return Status::OK();
590
0
    }
591
592
0
    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
593
0
    _task_exec_ctx = state->get_task_execution_context();
594
0
    _tuple_desc = _parent->_output_tuple_desc;
595
0
    _state = state;
596
    // get corresponding BE node.
597
0
    const auto* node = _parent->_nodes_info->find_node(_node_id);
598
0
    if (node == nullptr) {
599
0
        _cancelled = true;
600
0
        _is_closed = true;
601
0
        return Status::InternalError("unknown node id, id={}", _node_id);
602
0
    }
603
0
    _node_info = *node;
604
605
0
    _load_info = "load_id=" + print_id(_parent->_load_id) +
606
0
                 ", txn_id=" + std::to_string(_parent->_txn_id);
607
608
0
    _row_desc = std::make_unique<RowDescriptor>(_tuple_desc);
609
0
    _batch_size = state->batch_size();
610
611
0
    _stub = state->exec_env()->brpc_internal_client_cache()->get_client(_node_info.host,
612
0
                                                                        _node_info.brpc_port);
613
0
    if (_stub == nullptr) {
614
0
        _cancelled = true;
615
0
        _is_closed = true;
616
0
        return Status::InternalError("Get rpc stub failed, host={}, port={}, info={}",
617
0
                                     _node_info.host, _node_info.brpc_port, channel_info());
618
0
    }
619
620
0
    _rpc_timeout_ms = state->execution_timeout() * 1000;
621
0
    _timeout_watch.start();
622
623
    // Initialize _cur_add_block_request
624
0
    if (!_cur_add_block_request->has_id()) {
625
0
        *(_cur_add_block_request->mutable_id()) = _parent->_load_id;
626
0
    }
627
0
    _cur_add_block_request->set_index_id(_index_channel->_index_id);
628
0
    _cur_add_block_request->set_sender_id(_parent->_sender_id);
629
0
    _cur_add_block_request->set_backend_id(_node_id);
630
0
    _cur_add_block_request->set_eos(false);
631
    // Adaptive random bucket add-block RPCs carry partition ids because the receiver
632
    // chooses the current tablet from its local adaptive state.
633
0
    _cur_add_block_request->set_is_adaptive_random_bucket(
634
0
            _parent->_tablet_finder->is_adaptive_random_bucket());
635
636
    // add block closure
637
    // Has to using value to capture _task_exec_ctx because tablet writer may destroyed during callback.
638
0
    _send_block_callback = WriteBlockCallback<PTabletWriterAddBlockResult>::create_shared();
639
0
    _send_block_callback->addFailedHandler(
640
0
            [&, task_exec_ctx = _task_exec_ctx](const WriteBlockCallbackContext& ctx) {
641
0
                std::shared_ptr<TaskExecutionContext> ctx_lock = task_exec_ctx.lock();
642
0
                if (ctx_lock == nullptr) {
643
0
                    return;
644
0
                }
645
0
                _add_block_failed_callback(ctx);
646
0
            });
647
648
0
    _send_block_callback->addSuccessHandler(
649
0
            [&, task_exec_ctx = _task_exec_ctx](const PTabletWriterAddBlockResult& result,
650
0
                                                const WriteBlockCallbackContext& ctx) {
651
0
                std::shared_ptr<TaskExecutionContext> ctx_lock = task_exec_ctx.lock();
652
0
                if (ctx_lock == nullptr) {
653
0
                    return;
654
0
                }
655
0
                _add_block_success_callback(result, ctx);
656
0
            });
657
658
0
    _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, _node_id);
659
    // The node channel will send _batch_size rows of data each rpc. When the
660
    // number of tablets is large, the number of data rows received by each
661
    // tablet is small, TabletsChannel need to traverse each tablet for import.
662
    // so the import performance is poor. Therefore, we set _batch_size to
663
    // a relatively large value to improve the import performance.
664
0
    _batch_size = std::max(_batch_size, 8192);
665
666
0
    if (_state) {
667
0
        QueryContext* query_ctx = _state->get_query_ctx();
668
0
        if (query_ctx) {
669
0
            auto wg_ptr = query_ctx->workload_group();
670
0
            if (wg_ptr) {
671
0
                _wg_id = wg_ptr->id();
672
0
            }
673
0
        }
674
0
    }
675
676
0
    _inited = true;
677
0
    return Status::OK();
678
0
}
679
680
0
// Fills the adaptive-random-bucket section of an open request.
//
// For every partition that has tablets in `_all_tablets`, one `random_bucket_partitions`
// entry is appended to `request`, carrying the ordered tablet ids selected for this node.
// The first selected tablet of each partition is also remembered in
// `_adaptive_partition_compat_tablets`. Partitions that cannot be resolved, whose index
// is unknown, or that end up with no selected tablet are skipped (with a log line).
void VNodeChannel::_set_adaptive_random_bucket_open_request(PTabletWriterOpenRequest* request) {
    // Per partition: tablets of this channel in insertion order, plus a membership set.
    std::unordered_map<int64_t, std::vector<int64_t>> tablets_in_order;
    std::unordered_map<int64_t, std::unordered_set<int64_t>> tablets_on_node;
    for (const auto& t : _all_tablets) {
        tablets_in_order[t.partition_id].push_back(t.tablet_id);
        tablets_on_node[t.partition_id].insert(t.tablet_id);
    }

    // Partition id -> partition descriptor, for resolving the index info below.
    std::unordered_map<int64_t, const VOlapTablePartition*> partitions_by_id;
    for (const auto* p : _parent->_vpartition->get_partitions()) {
        partitions_by_id.emplace(p->id, p);
    }

    for (const auto& [partition_id, ordered_tablets] : tablets_in_order) {
        const auto found = partitions_by_id.find(partition_id);
        if (found == partitions_by_id.end()) {
            LOG(WARNING) << "unknown partition for adaptive random bucket, load_id="
                         << _parent->_load_id << ", partition_id=" << partition_id;
            continue;
        }
        const auto& partition = *found->second;

        const auto* index_info = find_partition_index(partition, _index_channel->_index_id);
        if (index_info == nullptr) {
            LOG(WARNING) << "unknown index for adaptive random bucket, load_id="
                         << _parent->_load_id << ", partition_id=" << partition_id
                         << ", index_id=" << _index_channel->_index_id;
            continue;
        }

        const auto& local_bucket_seqs = adaptive_local_bucket_seqs(partition, index_info);
        std::vector<int64_t> selected;
        if (local_bucket_seqs.empty()) {
            // No local bucket sequence available: fall back to every tablet of this channel.
            selected = ordered_tablets;
        } else {
            const auto& full_ordered_tablets = index_info->tablets;
            // The key is always present: both maps were filled from the same tablets.
            const auto& node_tablets = tablets_on_node[partition_id];
            for (auto bucket_seq : local_bucket_seqs) {
                const bool in_range =
                        bucket_seq >= 0 &&
                        bucket_seq < cast_set<int32_t>(full_ordered_tablets.size());
                if (!in_range) {
                    LOG(WARNING) << "invalid local bucket seq, load_id=" << _parent->_load_id
                                 << ", partition_id=" << partition_id
                                 << ", bucket_seq=" << bucket_seq
                                 << ", full_ordered_tablets_size=" << full_ordered_tablets.size();
                    continue;
                }
                const auto tablet_id = full_ordered_tablets[bucket_seq];
                if (!node_tablets.contains(tablet_id)) {
                    LOG(WARNING) << "skip non-local tablet selected by local bucket seq, load_id="
                                 << _parent->_load_id << ", partition_id=" << partition_id
                                 << ", bucket_seq=" << bucket_seq << ", tablet_id=" << tablet_id
                                 << ", node_id=" << _node_id;
                    continue;
                }
                selected.push_back(tablet_id);
            }
        }

        if (selected.empty()) {
            VLOG_DEBUG << "skip adaptive random bucket partition without selected local "
                          "tablet, load_id="
                       << _parent->_load_id << ", partition_id=" << partition_id
                       << ", node_id=" << _node_id;
            continue;
        }

        // add_block() later looks up this tablet id by partition id.
        _adaptive_partition_compat_tablets[partition_id] = selected.front();

        auto* pb_partition = request->add_random_bucket_partitions();
        pb_partition->set_partition_id(partition_id);
        for (auto tablet_id : selected) {
            pb_partition->add_ordered_tablet_ids(tablet_id);
        }
    }
}
748
749
0
void VNodeChannel::_open_internal(bool is_incremental) {
750
0
    if (_tablets_wait_open.empty()) {
751
0
        return;
752
0
    }
753
0
    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
754
0
    auto request = std::make_shared<PTabletWriterOpenRequest>();
755
0
    request->mutable_id()->CopyFrom(_parent->_load_id);
756
0
    request->set_index_id(_index_channel->_index_id);
757
0
    request->set_txn_id(_parent->_txn_id);
758
0
    request->set_sender_id(_parent->_sender_id);
759
0
    request->mutable_schema()->CopyFrom(*_parent->_schema->to_protobuf());
760
0
    if (_parent->_t_sink.olap_table_sink.__isset.storage_vault_id) {
761
0
        request->set_storage_vault_id(_parent->_t_sink.olap_table_sink.storage_vault_id);
762
0
    }
763
    // Adaptive random bucket open RPCs initialize receiver-side selected-tablet state.
764
0
    request->set_is_adaptive_random_bucket(_parent->_tablet_finder->is_adaptive_random_bucket());
765
0
    std::set<int64_t> deduper;
766
0
    for (auto& tablet : _tablets_wait_open) {
767
0
        if (deduper.contains(tablet.tablet_id)) {
768
0
            continue;
769
0
        }
770
0
        auto* ptablet = request->add_tablets();
771
0
        ptablet->set_partition_id(tablet.partition_id);
772
0
        ptablet->set_tablet_id(tablet.tablet_id);
773
0
        deduper.insert(tablet.tablet_id);
774
0
        _all_tablets.push_back(std::move(tablet));
775
0
    }
776
0
    _tablets_wait_open.clear();
777
778
0
    request->set_num_senders(_parent->_num_senders);
779
0
    request->set_need_gen_rollup(false); // Useless but it is a required field in pb
780
0
    request->set_load_channel_timeout_s(_parent->_load_channel_timeout_s);
781
0
    request->set_is_high_priority(_parent->_is_high_priority);
782
0
    request->set_sender_ip(BackendOptions::get_localhost());
783
0
    request->set_is_vectorized(true);
784
0
    request->set_backend_id(_node_id);
785
0
    request->set_enable_profile(_state->enable_profile());
786
0
    request->set_is_incremental(is_incremental);
787
0
    request->set_txn_expiration(_parent->_txn_expiration);
788
0
    request->set_write_file_cache(_parent->_write_file_cache);
789
790
0
    if (_parent->_tablet_finder->is_adaptive_random_bucket()) {
791
0
        _set_adaptive_random_bucket_open_request(request.get());
792
0
    }
793
794
0
    if (_wg_id > 0) {
795
0
        request->set_workload_group_id(_wg_id);
796
0
    }
797
798
0
    auto open_callback = DummyBrpcCallback<PTabletWriterOpenResult>::create_shared();
799
0
    auto open_closure = AutoReleaseClosure<
800
0
            PTabletWriterOpenRequest,
801
0
            DummyBrpcCallback<PTabletWriterOpenResult>>::create_unique(request, open_callback);
802
0
    open_callback->cntl_->set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000);
803
0
    if (config::tablet_writer_ignore_eovercrowded) {
804
0
        open_callback->cntl_->ignore_eovercrowded();
805
0
    }
806
0
    VLOG_DEBUG << fmt::format("txn {}: open NodeChannel to {}, incremental: {}, senders: {}",
807
0
                              _parent->_txn_id, _node_id, is_incremental, _parent->_num_senders);
808
    // the real transmission here. the corresponding BE's load mgr will open load channel for it.
809
0
    _stub->tablet_writer_open(open_closure->cntl_.get(), open_closure->request_.get(),
810
0
                              open_closure->response_.get(), open_closure.get());
811
0
    open_closure.release();
812
0
    _open_callbacks.push_back(open_callback);
813
0
}
814
815
0
void VNodeChannel::open() {
816
0
    _open_internal(false);
817
0
}
818
819
0
void VNodeChannel::incremental_open() {
820
0
    VLOG_DEBUG << "incremental opening node channel" << _node_id;
821
0
    _open_internal(true);
822
0
}
823
824
0
// Waits for every open RPC recorded in `_open_callbacks` and reports the first failure.
//
// @return the status of the last response checked when all succeed (a default-constructed
//         Status if there was nothing to check); otherwise the first error found, either an
//         RPC-level failure (INTERNAL_ERROR) or the status carried by the response. On any
//         failure `_cancelled` is set.
Status VNodeChannel::open_wait() {
    Status status;
    for (auto& open_callback : _open_callbacks) {
        // because of incremental open, we will wait multi times. so skip the closures which
        // have been checked and set to nullptr in previous rounds
        // NOTE(review): nothing in this function resets an entry to nullptr - confirm where
        // that is expected to happen.
        if (open_callback == nullptr) {
            continue;
        }

        open_callback->join();
        SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());

        auto& cntl = open_callback->cntl_;
        if (!cntl->Failed()) {
            // The RPC itself went through; now inspect the status carried by the response.
            status = Status::create(open_callback->response_->status());
            if (status.ok()) {
                continue;
            }
            _cancelled = true;
            return status;
        }

        // RPC-level failure: drop the cached client if it is no longer available.
        auto* client_cache = ExecEnv::GetInstance()->brpc_internal_client_cache();
        if (!client_cache->available(_stub, _node_info.host, _node_info.brpc_port)) {
            ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(cntl->remote_side());
        }

        _cancelled = true;
        auto error_code = cntl->ErrorCode();
        auto error_text = cntl->ErrorText();
        if (error_text.find("Reached timeout") != std::string::npos) {
            LOG(WARNING) << "failed to open tablet writer may caused by timeout. increase BE "
                            "config `tablet_writer_open_rpc_timeout_sec` if you are sure that "
                            "your table building and data are reasonable.";
        }
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
                "failed to open tablet writer, error={}, error_text={}, info={}",
                berror(error_code), error_text, channel_info());
    }

    return status;
}
862
863
0
// Appends the rows selected by `payload` from `block` to the batch being accumulated for
// this node, and queues the batch for sending once it is large enough.
//
// Steps:
//   1. Return OK immediately if the payload selects no rows.
//   2. Fail if the channel is already cancelled or EOS has been produced.
//   3. Apply back pressure: sleep in 10ms steps while batches are pending and a memory
//      limit is exceeded (unless the channel or the query is cancelled).
//   4. Append the selected rows to `_cur_mutable_block` and record, per row, the partition
//      id and tablet id in `_cur_add_block_request`. In adaptive-random-bucket mode the
//      tablet id is the per-partition compat tablet from `_adaptive_partition_compat_tablets`.
//   5. If the batch reached `_batch_size` rows or `config::doris_scanner_row_bytes` bytes,
//      move it (with a copy of the request) into `_pending_blocks` and start a new batch.
//
// @param block   source rows; also used as the schema template for a new batch.
// @param payload row selector (`row_ids`) plus routing info (`route_idxs`,
//                `row_part_tablet_ids`).
// @return OK on success; an error if the channel is stopped, the append fails, or (adaptive
//         mode) a partition has no compat tablet. The latter two also cancel the channel.
Status VNodeChannel::add_block(Block* block, const Payload* payload) {
    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
    if (payload->row_part_tablet_ids == nullptr || payload->row_ids == nullptr ||
        payload->row_ids->empty()) {
        return Status::OK();
    }
    DCHECK_EQ(payload->row_ids->size(), payload->route_idxs.size());
    // If add_block() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
    auto st = none_of({_cancelled, _eos_is_produced});
    if (!st.ok()) {
        if (_cancelled) {
            std::lock_guard<std::mutex> l(_cancel_msg_lock);
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("add row failed. {}",
                                                                   _cancel_msg);
        } else {
            return std::move(st.prepend("already stopped, can't add row. cancelled/eos: "));
        }
    }

    // We use OlapTableSink mem_tracker which has the same ancestor of _plan node,
    // so in the ideal case, mem limit is a matter for _plan node.
    // But there is still some unfinished things, we do mem limit here temporarily.
    // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
    // It's fine to do a fake add_block() and return OK, because we will check _cancelled in next add_block() or mark_close().
    constexpr int64_t kBackPressureSleepMs = 10;
    auto* memtable_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
    while (true) {
        bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
        int64_t memtable_mem =
                (memtable_limiter != nullptr && memtable_limiter->mem_tracker() != nullptr)
                        ? memtable_limiter->mem_tracker()->consumption()
                        : 0;
        // Note: Memtable memory is not included in load memory statistics (MemoryProfile::load_current_usage())
        // for performance and memory control complexity reasons. Therefore, we explicitly add memtable memory
        // consumption here to ensure accurate back pressure decisions and prevent OOM during heavy loads.
        auto current_load_mem_value = MemoryProfile::load_current_usage() + memtable_mem;
        bool mem_limit_exceeded = is_exceed_soft_mem_limit ||
                                  current_load_mem_value > _load_mem_limit ||
                                  _pending_batches_bytes > _max_pending_batches_bytes;
        // Only wait when there is something pending whose sending can free memory.
        bool need_back_pressure = !_cancelled && !_state->is_cancelled() &&
                                  _pending_batches_num > 0 && mem_limit_exceeded;
        if (!need_back_pressure) {
            break;
        }
        SCOPED_RAW_TIMER(&_stat.mem_exceeded_block_ns);
        std::this_thread::sleep_for(std::chrono::milliseconds(kBackPressureSleepMs));
    }

    if (UNLIKELY(!_cur_mutable_block)) {
        _cur_mutable_block = MutableBlock::create_unique(block->clone_empty());
    }

    SCOPED_RAW_TIMER(&_stat.append_node_channel_ns);
    st = block->append_to_block_by_selector(_cur_mutable_block.get(), *payload->row_ids);
    if (!st.ok()) {
        _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string()));
        return st;
    }
    auto* row_part_tablet_ids = payload->row_part_tablet_ids;
    for (uint32_t route_idx : payload->route_idxs) {
        auto partition_id = row_part_tablet_ids->partition_ids[route_idx];
        _cur_add_block_request->add_partition_ids(partition_id);
        if (_parent->_tablet_finder->is_adaptive_random_bucket()) {
            auto tablet_it = _adaptive_partition_compat_tablets.find(partition_id);
            if (tablet_it == _adaptive_partition_compat_tablets.end()) {
                // The rows are already in _cur_mutable_block and the request is only
                // partially filled, so the two are out of sync from here on. Cancel the
                // channel (as the append-failure path above does) so this batch is never
                // sent and the real cause is what gets recorded as the cancel message.
                auto err_msg = fmt::format(
                        "{}, err: missing adaptive random bucket compatible tablet, "
                        "partition_id={}",
                        channel_info(), partition_id);
                _cancel_with_msg(err_msg);
                return Status::InternalError("{}", err_msg);
            }
            _cur_add_block_request->add_tablet_ids(tablet_it->second);
        } else {
            _cur_add_block_request->add_tablet_ids(row_part_tablet_ids->tablet_ids[route_idx]);
        }
    }
    // NOTE(review): this adds the whole current batch size on every call, not only the
    // bytes appended by this call - confirm that is the intended accounting.
    _write_bytes.fetch_add(_cur_mutable_block->bytes());

    if (_cur_mutable_block->rows() >= _batch_size ||
        _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) {
        {
            SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
            std::lock_guard<std::mutex> l(_pending_batches_lock);
            // To simplify the add_row logic, postpone adding block into req until the time of sending req
            _pending_batches_bytes += _cur_mutable_block->allocated_bytes();
            _cur_add_block_request->set_eos(
                    false); // for multi-add, only when marking close we set it eos.
            // Copy the request to tmp request to add to pend block queue
            auto tmp_add_block_request = std::make_shared<PTabletWriterAddBlockRequest>();
            *tmp_add_block_request = *_cur_add_block_request;
            _pending_blocks.emplace(std::move(_cur_mutable_block), tmp_add_block_request);
            _pending_batches_num++;
            VLOG_DEBUG << "VTabletWriter:" << _parent << " VNodeChannel:" << this
                       << " pending_batches_bytes:" << _pending_batches_bytes
                       << " jobid:" << std::to_string(_state->load_job_id())
                       << " loadinfo:" << _load_info;
        }
        // Start a fresh batch; the per-row ids of the queued batch live in the copied request.
        _cur_mutable_block = MutableBlock::create_unique(block->clone_empty());
        _cur_add_block_request->clear_tablet_ids();
        _cur_add_block_request->clear_partition_ids();
    }

    return Status::OK();
}
966
967
0
static void injection_full_gc_fn() {
968
0
    MemoryReclamation::revoke_process_memory("injection_full_gc_fn");
969
0
}
970
971
int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
972
0
                                            std::unique_ptr<ThreadPoolToken>& thread_pool_token) {
973
0
    DBUG_EXECUTE_IF("VNodeChannel.try_send_and_fetch_status_full_gc", {
974
0
        std::thread t(injection_full_gc_fn);
975
0
        t.join();
976
0
    });
977
978
0
    if (_cancelled || _send_finished) { // not run
979
0
        return 0;
980
0
    }
981
982
0
    auto load_back_pressure_version_wait_time_ms = _load_back_pressure_version_wait_time_ms.load();
983
0
    if (UNLIKELY(load_back_pressure_version_wait_time_ms > 0)) {
984
0
        std::this_thread::sleep_for(
985
0
                std::chrono::milliseconds(load_back_pressure_version_wait_time_ms));
986
0
        _load_back_pressure_version_block_ms.fetch_add(
987
0
                load_back_pressure_version_wait_time_ms); // already in milliseconds
988
0
        _load_back_pressure_version_wait_time_ms = 0;
989
0
    }
990
991
    // set closure for sending block.
992
0
    if (!_send_block_callback->try_set_in_flight()) {
993
        // There is packet in flight, skip.
994
0
        return _send_finished ? 0 : 1;
995
0
    }
996
997
    // We are sure that try_send_batch is not running
998
0
    if (_pending_batches_num > 0) {
999
0
        auto s = thread_pool_token->submit_func([this, state] { try_send_pending_block(state); });
1000
0
        if (!s.ok()) {
1001
0
            _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed");
1002
            // sending finished. clear in flight
1003
0
            _send_block_callback->clear_in_flight();
1004
0
        }
1005
        // in_flight is cleared in closure::Run
1006
0
    } else {
1007
        // sending finished. clear in flight
1008
0
        _send_block_callback->clear_in_flight();
1009
0
    }
1010
0
    return _send_finished ? 0 : 1;
1011
0
}
1012
1013
0
void VNodeChannel::_cancel_with_msg(const std::string& msg) {
1014
0
    LOG(WARNING) << "cancel node channel " << channel_info() << ", error message: " << msg;
1015
0
    {
1016
0
        std::lock_guard<std::mutex> l(_cancel_msg_lock);
1017
0
        if (_cancel_msg.empty()) {
1018
0
            _cancel_msg = msg;
1019
0
        }
1020
0
    }
1021
0
    _cancelled = true;
1022
0
    _index_channel->notify_close_wait();
1023
0
}
1024
1025
void VNodeChannel::_refresh_back_pressure_version_wait_time(
1026
        const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
1027
0
                tablet_load_infos) {
1028
0
    int64_t max_rowset_num_gap = 0;
1029
    // if any one tablet is under high load pressure, we would make the whole procedure
1030
    // sleep to prevent the corresponding BE return -235
1031
0
    std::for_each(
1032
0
            tablet_load_infos.begin(), tablet_load_infos.end(),
1033
0
            [&max_rowset_num_gap](auto& load_info) {
1034
0
                int64_t cur_rowset_num = load_info.current_rowset_nums();
1035
0
                int64_t high_load_point = load_info.max_config_rowset_nums() *
1036
0
                                          (config::load_back_pressure_version_threshold / 100);
1037
0
                DCHECK(cur_rowset_num > high_load_point);
1038
0
                max_rowset_num_gap = std::max(max_rowset_num_gap, cur_rowset_num - high_load_point);
1039
0
            });
1040
    // to slow down the high load pressure
1041
    // we would use the rowset num gap to calculate one sleep time
1042
    // for example:
1043
    // if the max tablet version is 2000, there are 3 BE
1044
    // A: ====================  1800
1045
    // B: ===================   1700
1046
    // C: ==================    1600
1047
    //    ==================    1600
1048
    //                      ^
1049
    //                      the high load point
1050
    // then then max gap is 1800 - (max tablet version * config::load_back_pressure_version_threshold / 100) = 200,
1051
    // we would make the whole send procesure sleep
1052
    // 1200ms for compaction to be done toe reduce the high pressure
1053
0
    auto max_time = config::max_load_back_pressure_version_wait_time_ms;
1054
0
    if (UNLIKELY(max_rowset_num_gap > 0)) {
1055
0
        _load_back_pressure_version_wait_time_ms.store(
1056
0
                std::min(max_rowset_num_gap + 1000, max_time));
1057
0
        LOG(INFO) << "try to back pressure version, wait time(ms): "
1058
0
                  << _load_back_pressure_version_wait_time_ms
1059
0
                  << ", load id: " << print_id(_parent->_load_id)
1060
0
                  << ", max_rowset_num_gap: " << max_rowset_num_gap;
1061
0
    }
1062
0
}
1063
1064
0
// Pops the oldest pending batch and sends it to the receiver as one add-block RPC.
//
// Flow:
//   1. Take the front of `_pending_blocks` (under `_pending_batches_lock`) and update the
//      pending counters.
//   2. Check that the block's row count matches the per-row ids in the request, then
//      serialize the block into the request.
//   3. Compute the RPC timeout from the remaining load time (floored at
//      config::min_load_rpc_timeout_ms).
//   4. For an EOS request, attach partition ids and, when writing a single replica, the
//      slave node info; mark the callback as the last RPC and set `_send_finished`.
//   5. Send through the HTTP stub when large-data transfer is enabled and the request
//      exceeds MIN_HTTP_BRPC_SIZE, otherwise through the regular stub.
//
// Every early exit cancels the channel and clears the callback's in-flight flag. On the
// normal path the closure pointer is handed to the RPC as `done` and `_next_packet_seq` is
// incremented.
//
// @param state runtime state of the load; used for task attachment and serialization
//              options.
void VNodeChannel::try_send_pending_block(RuntimeState* state) {
    SCOPED_ATTACH_TASK(state);
    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker);
    SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
    signal::set_signal_task_id(_parent->_load_id);

    // Shared tail of all failure exits: cancel the channel, then release the in-flight flag.
    auto abort_send = [this](const std::string& reason) {
        cancel(reason);
        _send_block_callback->clear_in_flight();
    };

    AddBlockReq head;
    {
        std::lock_guard<std::mutex> guard(_pending_batches_lock);
        DCHECK(!_pending_blocks.empty());
        head = std::move(_pending_blocks.front());
        _pending_blocks.pop();
        _pending_batches_num--;
        _pending_batches_bytes -= head.first->allocated_bytes();
    }

    auto mutable_block = std::move(head.first);
    auto request = std::move(head.second); // doesn't need to be saved in heap

    // tablet_ids has already set when add row
    request->set_packet_seq(_next_packet_seq);
    auto block = mutable_block->to_block();

    // Which repeated field carries one entry per row depends on the request kind.
    int request_rows = 0;
    if (request->is_adaptive_random_bucket() && !request->eos()) {
        request_rows = request->partition_ids_size();
    } else {
        request_rows = request->tablet_ids_size();
    }
    if (block.rows() != request_rows) {
        abort_send(
                fmt::format("{}, err: invalid add block request row count, block rows: {}, "
                            "request rows: {}, adaptive_random_bucket: {}, eos: {}",
                            channel_info(), block.rows(), request_rows,
                            request->is_adaptive_random_bucket(), request->eos()));
        return;
    }

    if (block.rows() > 0) {
        SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
        size_t uncompressed_bytes = 0;
        size_t compressed_bytes = 0;
        int64_t compressed_time = 0;
        Status st = block.serialize(state->be_exec_version(), request->mutable_block(),
                                    &uncompressed_bytes, &compressed_bytes, &compressed_time,
                                    state->fragement_transmission_compression_type(),
                                    _parent->_transfer_large_data_by_brpc);
        TEST_INJECTION_POINT_CALLBACK("VNodeChannel::try_send_block", &st);
        if (!st.ok()) {
            abort_send(fmt::format("{}, err: {}", channel_info(), st.to_string()));
            return;
        }
        // Warn when the payload gets within 5% of the brpc body size limit.
        if (static_cast<double>(compressed_bytes) >=
            static_cast<double>(config::brpc_max_body_size) * 0.95F) {
            LOG(WARNING) << "send block too large, this rpc may failed. send size: "
                         << compressed_bytes << ", threshold: " << config::brpc_max_body_size
                         << ", " << channel_info();
        }
    }

    auto remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
    if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
        // Out of time for a data packet: give up. An EOS packet is still attempted.
        if (remain_ms <= 0 && !request->eos()) {
            abort_send(fmt::format("{}, err: load timeout after {} ms", channel_info(),
                                   _rpc_timeout_ms));
            return;
        }
        remain_ms = config::min_load_rpc_timeout_ms;
    }

    _send_block_callback->reset();
    _send_block_callback->cntl_->set_timeout_ms(remain_ms);
    if (config::tablet_writer_ignore_eovercrowded) {
        _send_block_callback->cntl_->ignore_eovercrowded();
    }

    if (request->eos()) {
        if (!request->is_adaptive_random_bucket() || !request->has_block()) {
            for (auto pid : _parent->_tablet_finder->partition_ids()) {
                request->add_partition_ids(pid);
            }
        }

        request->set_write_single_replica(_parent->_write_single_replica);
        if (_parent->_write_single_replica) {
            for (auto& [tablet_id, slave_node_ids] : _slave_tablet_nodes) {
                PSlaveTabletNodes slave_tablet_nodes;
                for (auto node_id : slave_node_ids) {
                    const auto* node = _parent->_nodes_info->find_node(node_id);
                    DBUG_EXECUTE_IF("VNodeChannel.try_send_pending_block.slave_node_not_found", {
                        LOG(WARNING) << "trigger "
                                        "VNodeChannel.try_send_pending_block.slave_node_not_found "
                                        "debug point will set node to nullptr";
                        node = nullptr;
                    });
                    if (node == nullptr) {
                        LOG(WARNING) << "slave node not found, node_id=" << node_id;
                        abort_send(fmt::format("slave node not found, node_id={}", node_id));
                        return;
                    }
                    PNodeInfo* pnode = slave_tablet_nodes.add_slave_nodes();
                    pnode->set_id(node->id);
                    pnode->set_option(node->option);
                    pnode->set_host(node->host);
                    pnode->set_async_internal_port(node->brpc_port);
                }
                request->mutable_slave_tablet_nodes()->insert({tablet_id, slave_tablet_nodes});
            }
        }

        // eos request must be the last request-> it's a signal makeing callback function to set _add_batch_finished true.
        // end_mark makes is_last_rpc true when rpc finished and call callbacks.
        _send_block_callback->end_mark();
        _send_finished = true;
        CHECK(_pending_batches_num == 0) << _pending_batches_num;
    }

    using SendClosure = AutoReleaseClosure<PTabletWriterAddBlockRequest,
                                           WriteBlockCallback<PTabletWriterAddBlockResult>>;
    auto send_block_closure = SendClosure::create_unique(request, _send_block_callback);

    const bool send_by_http = _parent->_transfer_large_data_by_brpc && request->has_block() &&
                              request->block().has_column_values() &&
                              request->ByteSizeLong() > MIN_HTTP_BRPC_SIZE;
    if (send_by_http) {
        Status st = request_embed_attachment_contain_blockv2(send_block_closure->request_.get(),
                                                             send_block_closure);
        if (!st.ok()) {
            abort_send(fmt::format("{}, err: {}", channel_info(), st.to_string()));
            return;
        }

        // Resolve the host through the DNS cache unless it already is an IP.
        std::string host = _node_info.host;
        auto dns_cache = ExecEnv::GetInstance()->dns_cache();
        if (dns_cache == nullptr) {
            LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
        } else if (!is_valid_ip(_node_info.host)) {
            Status status = dns_cache->get(_node_info.host, &host);
            if (!status.ok()) {
                LOG(WARNING) << "failed to get ip from host " << _node_info.host << ": "
                             << status.to_string();
                abort_send(fmt::format("failed to get ip from host {}", _node_info.host));
                return;
            }
        }
        //format an ipv6 address
        std::string brpc_url = get_brpc_http_url(host, _node_info.brpc_port);
        std::shared_ptr<PBackendService_Stub> http_stub =
                _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
                                                                                          "http");
        if (http_stub == nullptr) {
            abort_send(fmt::format("{}, failed to open brpc http client to {}", channel_info(),
                                   brpc_url));
            return;
        }

        auto& http_request = _send_block_callback->cntl_->http_request();
        http_request.uri() = brpc_url + "/PInternalServiceImpl/tablet_writer_add_block_by_http";
        http_request.set_method(brpc::HTTP_METHOD_POST);
        http_request.set_content_type("application/json");

        http_stub->tablet_writer_add_block_by_http(
                send_block_closure->cntl_.get(), nullptr, send_block_closure->response_.get(),
                send_block_closure.get());
        // The closure pointer was handed to the RPC as `done`; give up unique ownership.
        send_block_closure.release();
    } else {
        _send_block_callback->cntl_->http_request().Clear();
        _stub->tablet_writer_add_block(
                send_block_closure->cntl_.get(), send_block_closure->request_.get(),
                send_block_closure->response_.get(), send_block_closure.get());
        // The closure pointer was handed to the RPC as `done`; give up unique ownership.
        send_block_closure.release();
    }

    _next_packet_seq++;
}
1241
1242
void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult& result,
1243
0
                                               const WriteBlockCallbackContext& ctx) {
1244
0
    std::lock_guard<std::mutex> l(this->_closed_lock);
1245
0
    if (this->_is_closed) {
1246
        // if the node channel is closed, no need to call the following logic,
1247
        // and notice that _index_channel may already be destroyed.
1248
0
        return;
1249
0
    }
1250
0
    SCOPED_ATTACH_TASK(_state);
1251
0
    Status status(Status::create(result.status()));
1252
0
    if (status.ok()) {
1253
0
        _refresh_back_pressure_version_wait_time(result.tablet_load_rowset_num_infos());
1254
        // if has error tablet, handle them first
1255
0
        for (const auto& error : result.tablet_errors()) {
1256
0
            _index_channel->mark_as_failed(this, "tablet error: " + error.msg(), error.tablet_id());
1257
0
        }
1258
1259
0
        Status st = _index_channel->check_intolerable_failure();
1260
0
        if (!st.ok()) {
1261
0
            _cancel_with_msg(st.to_string());
1262
0
        } else if (ctx._is_last_rpc) {
1263
0
            bool skip_tablet_info = false;
1264
0
            DBUG_EXECUTE_IF("VNodeChannel.add_block_success_callback.incomplete_commit_info",
1265
0
                            { skip_tablet_info = true; });
1266
0
            for (const auto& tablet : result.tablet_vec()) {
1267
0
                DBUG_EXECUTE_IF("VNodeChannel.add_block_success_callback.incomplete_commit_info", {
1268
0
                    if (skip_tablet_info) {
1269
0
                        LOG(INFO) << "skip tablet info: " << tablet.tablet_id();
1270
0
                        skip_tablet_info = false;
1271
0
                        continue;
1272
0
                    }
1273
0
                });
1274
0
                TTabletCommitInfo commit_info;
1275
0
                commit_info.tabletId = tablet.tablet_id();
1276
0
                commit_info.backendId = _node_id;
1277
0
                _tablet_commit_infos.emplace_back(std::move(commit_info));
1278
0
                if (tablet.has_received_rows()) {
1279
0
                    _tablets_received_rows.emplace_back(tablet.tablet_id(), tablet.received_rows());
1280
0
                }
1281
0
                if (tablet.has_num_rows_filtered()) {
1282
0
                    _tablets_filtered_rows.emplace_back(tablet.tablet_id(),
1283
0
                                                        tablet.num_rows_filtered());
1284
0
                }
1285
0
                VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id()
1286
0
                              << ", backendId=" << _node_id
1287
0
                              << ", master node id: " << this->node_id()
1288
0
                              << ", host: " << this->host() << ", txn_id=" << _parent->_txn_id;
1289
0
            }
1290
0
            if (_parent->_write_single_replica) {
1291
0
                for (const auto& tablet_slave_node_ids : result.success_slave_tablet_node_ids()) {
1292
0
                    for (auto slave_node_id : tablet_slave_node_ids.second.slave_node_ids()) {
1293
0
                        TTabletCommitInfo commit_info;
1294
0
                        commit_info.tabletId = tablet_slave_node_ids.first;
1295
0
                        commit_info.backendId = slave_node_id;
1296
0
                        _tablet_commit_infos.emplace_back(std::move(commit_info));
1297
0
                        VLOG_CRITICAL
1298
0
                                << "slave replica commit info: tabletId="
1299
0
                                << tablet_slave_node_ids.first << ", backendId=" << slave_node_id
1300
0
                                << ", master node id: " << this->node_id()
1301
0
                                << ", host: " << this->host() << ", txn_id=" << _parent->_txn_id;
1302
0
                    }
1303
0
                }
1304
0
            }
1305
0
            _add_batches_finished = true;
1306
0
            _index_channel->notify_close_wait();
1307
0
        }
1308
0
    } else {
1309
0
        _cancel_with_msg(fmt::format("{}, add batch req success but status isn't ok, err: {}",
1310
0
                                     channel_info(), status.to_string()));
1311
0
    }
1312
1313
0
    if (result.has_execution_time_us()) {
1314
0
        _add_batch_counter.add_batch_execution_time_us += result.execution_time_us();
1315
0
        _add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us();
1316
0
        _add_batch_counter.add_batch_num++;
1317
0
    }
1318
0
    if (result.has_load_channel_profile()) {
1319
0
        TRuntimeProfileTree tprofile;
1320
0
        const auto* buf = (const uint8_t*)result.load_channel_profile().data();
1321
0
        auto len = cast_set<uint32_t>(result.load_channel_profile().size());
1322
0
        auto st = deserialize_thrift_msg(buf, &len, false, &tprofile);
1323
0
        if (st.ok()) {
1324
0
            _state->load_channel_profile()->update(tprofile);
1325
0
        } else {
1326
0
            LOG(WARNING) << "load channel TRuntimeProfileTree deserialize failed, errmsg=" << st;
1327
0
        }
1328
0
    }
1329
0
}
1330
1331
0
void VNodeChannel::_add_block_failed_callback(const WriteBlockCallbackContext& ctx) {
1332
0
    std::lock_guard<std::mutex> l(this->_closed_lock);
1333
0
    if (this->_is_closed) {
1334
        // if the node channel is closed, no need to call `mark_as_failed`,
1335
        // and notice that _index_channel may already be destroyed.
1336
0
        return;
1337
0
    }
1338
0
    SCOPED_ATTACH_TASK(_state);
1339
    // If rpc failed, mark all tablets on this node channel as failed
1340
0
    _index_channel->mark_as_failed(this,
1341
0
                                   fmt::format("rpc failed, error code:{}, error text:{}",
1342
0
                                               _send_block_callback->cntl_->ErrorCode(),
1343
0
                                               _send_block_callback->cntl_->ErrorText()),
1344
0
                                   -1);
1345
0
    if (_send_block_callback->cntl_->ErrorText().find("Reached timeout") != std::string::npos) {
1346
0
        LOG(WARNING) << "rpc failed may caused by timeout. increase BE config "
1347
0
                        "`min_load_rpc_timeout_ms` of to avoid this if you are sure that your "
1348
0
                        "table building and data are reasonable.";
1349
0
    }
1350
0
    Status st = _index_channel->check_intolerable_failure();
1351
0
    if (!st.ok()) {
1352
0
        _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string()));
1353
0
    } else if (ctx._is_last_rpc) {
1354
        // if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait
1355
        // will be blocked.
1356
0
        _add_batches_finished = true;
1357
0
        _index_channel->notify_close_wait();
1358
0
    }
1359
0
}
1360
1361
// When _cancelled is true, we still need to send a tablet_writer_cancel
1362
// rpc request to truly release the load channel
1363
0
void VNodeChannel::cancel(const std::string& cancel_msg) {
1364
0
    if (_is_closed) {
1365
        // skip the channels that have been canceled or close_wait.
1366
0
        return;
1367
0
    }
1368
0
    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
1369
    // set _is_closed to true finally
1370
0
    Defer set_closed {[&]() {
1371
0
        std::lock_guard<std::mutex> l(_closed_lock);
1372
0
        _is_closed = true;
1373
0
    }};
1374
    // we don't need to wait last rpc finished, cause closure's release/reset will join.
1375
    // But do we need brpc::StartCancel(call_id)?
1376
0
    _cancel_with_msg(cancel_msg);
1377
    // if not inited, _stub will be nullptr, skip sending cancel rpc
1378
0
    if (!_inited) {
1379
0
        return;
1380
0
    }
1381
1382
0
    auto request = std::make_shared<PTabletWriterCancelRequest>();
1383
0
    request->mutable_id()->CopyFrom(_parent->_load_id);
1384
0
    request->set_index_id(_index_channel->_index_id);
1385
0
    request->set_sender_id(_parent->_sender_id);
1386
0
    request->set_cancel_reason(cancel_msg);
1387
1388
    // cancel is already in post-processing, so error status could be ignored. so not keeping cancel_callback is acceptable.
1389
0
    auto cancel_callback = DummyBrpcCallback<PTabletWriterCancelResult>::create_shared();
1390
0
    auto closure = AutoReleaseClosure<
1391
0
            PTabletWriterCancelRequest,
1392
0
            DummyBrpcCallback<PTabletWriterCancelResult>>::create_unique(request, cancel_callback);
1393
1394
0
    auto remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
1395
0
    if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
1396
0
        remain_ms = config::min_load_rpc_timeout_ms;
1397
0
    }
1398
0
    cancel_callback->cntl_->set_timeout_ms(remain_ms);
1399
0
    if (config::tablet_writer_ignore_eovercrowded) {
1400
0
        closure->cntl_->ignore_eovercrowded();
1401
0
    }
1402
0
    _stub->tablet_writer_cancel(closure->cntl_.get(), closure->request_.get(),
1403
0
                                closure->response_.get(), closure.get());
1404
0
    closure.release();
1405
0
}
1406
1407
0
// Non-blocking check of whether this node channel has finished closing.
//
// `*is_closed` is set to false (with an OK status) while the last add-block rpc
// has not completed and nothing was cancelled; the caller is expected to poll
// again. It is set to true in every other case. An error is returned when the
// channel was cancelled or eos has not been produced yet.
Status VNodeChannel::close_wait(RuntimeState* state, bool* is_closed) {
    DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", {
        std::thread t(injection_full_gc_fn);
        t.join();
    });
    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());

    *is_closed = true;

    auto precondition = none_of({_cancelled, !_eos_is_produced});
    if (!precondition.ok()) {
        if (!_cancelled) {
            return std::move(precondition.prepend(
                    "already stopped, skip waiting for close. cancelled/!eos: "));
        }
        std::lock_guard<std::mutex> msg_guard(_cancel_msg_lock);
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>("wait close failed. {}",
                                                               _cancel_msg);
    }

    DBUG_EXECUTE_IF("VNodeChannel.close_wait.cancelled", {
        _cancelled = true;
        _cancel_msg = "injected cancel";
    });

    if (state->is_cancelled()) {
        _cancel_with_msg(state->cancel_reason().to_string());
    }

    // _add_batches_finished is flipped by the rpc's finished callback, which may
    // take a long time, so no timeout is applied here. With the pipeline engine,
    // close is driven from the async writer's process-block method and therefore
    // does not block a pipeline thread.
    const bool done = _add_batches_finished || _cancelled || state->is_cancelled();
    if (done) {
        VLOG_CRITICAL << _parent->_sender_id << " close wait finished";
    } else {
        *is_closed = false;
    }
    return Status::OK();
}
1448
1449
// Finalizes the channel after close_wait reported completion.
//
// When all add-block rpcs finished, the collected commit infos and per-tablet
// row counts are published, the channel is marked closed and OK is returned.
// Otherwise an INTERNAL_ERROR built from the cancel message is returned. Timing
// statistics are reported into `node_add_batch_counter_map` / `writer_stats`
// in both cases.
Status VNodeChannel::after_close_handle(
        RuntimeState* state, WriterStats* writer_stats,
        std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map) {
    // Pessimistic default; replaced by OK below on a normal close.
    Status result = Status::Error<ErrorCode::INTERNAL_ERROR, false>(get_cancel_msg());

    // _close_time_ms held the timestamp taken in mark_close; turn it into a duration.
    _close_time_ms = UnixMillis() - _close_time_ms;

    if (_add_batches_finished) {
        _close_check();
        _state->add_tablet_commit_infos(_tablet_commit_infos);

        _index_channel->set_error_tablet_in_state(state);
        _index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id);
        _index_channel->set_tablets_filtered_rows(_tablets_filtered_rows, _node_id);

        {
            // _is_closed is set here only for a normal close; the cancel path
            // sets it inside cancel().
            std::lock_guard<std::mutex> closed_guard(_closed_lock);
            _is_closed = true;
        }
        result = Status::OK();
    }

    time_report(node_add_batch_counter_map, writer_stats);
    return result;
}
1473
1474
0
// Returns the result of none_of() over "cancelled" and "eos not yet produced".
Status VNodeChannel::check_status() {
    Status channel_state = none_of({_cancelled, !_eos_is_produced});
    return channel_state;
}
1477
1478
0
void VNodeChannel::_close_check() {
1479
0
    std::lock_guard<std::mutex> lg(_pending_batches_lock);
1480
0
    CHECK(_pending_blocks.empty()) << name();
1481
0
    CHECK(_cur_mutable_block == nullptr) << name();
1482
0
}
1483
1484
0
void VNodeChannel::mark_close(bool hang_wait) {
1485
0
    auto st = none_of({_cancelled, _eos_is_produced});
1486
0
    if (!st.ok()) {
1487
0
        return;
1488
0
    }
1489
1490
0
    bool need_adaptive_random_bucket_eos = _cur_add_block_request->is_adaptive_random_bucket();
1491
0
    {
1492
0
        std::lock_guard<std::mutex> l(_pending_batches_lock);
1493
0
        if (!_cur_mutable_block) [[unlikely]] {
1494
            // never had a block arrived. add a dummy block
1495
0
            _cur_mutable_block = MutableBlock::create_unique();
1496
0
        }
1497
0
        if (need_adaptive_random_bucket_eos && _cur_mutable_block->rows() > 0) {
1498
0
            _cur_add_block_request->set_eos(false);
1499
0
            auto tmp_add_block_request =
1500
0
                    std::make_shared<PTabletWriterAddBlockRequest>(*_cur_add_block_request);
1501
0
            _pending_blocks.emplace(std::move(_cur_mutable_block), tmp_add_block_request);
1502
0
            _pending_batches_num++;
1503
0
            _cur_add_block_request->clear_tablet_ids();
1504
0
            _cur_add_block_request->clear_partition_ids();
1505
0
            _cur_mutable_block = MutableBlock::create_unique();
1506
0
        }
1507
0
        _cur_add_block_request->set_eos(true);
1508
0
        _cur_add_block_request->set_hang_wait(hang_wait);
1509
0
        auto tmp_add_block_request =
1510
0
                std::make_shared<PTabletWriterAddBlockRequest>(*_cur_add_block_request);
1511
        // when prepare to close, add block to queue so that try_send_pending_block thread will send it.
1512
0
        _pending_blocks.emplace(std::move(_cur_mutable_block), tmp_add_block_request);
1513
0
        _pending_batches_num++;
1514
0
        DCHECK(_pending_blocks.back().second->eos());
1515
0
        _close_time_ms = UnixMillis();
1516
0
        LOG(INFO) << channel_info()
1517
0
                  << " mark closed, left pending batch size: " << _pending_blocks.size()
1518
0
                  << " hang_wait: " << hang_wait;
1519
0
    }
1520
1521
0
    _eos_is_produced = true;
1522
0
}
1523
1524
// Builds a tablet writer for the sink described by `t_sink`.
//
// `output_exprs`, `dep` and `fin_dep` are forwarded to AsyncResultWriter. The
// two dependencies are received by value, so they are moved onward instead of
// being copied a second time. The brpc large-data transfer switch is read from
// the BE config at construction time.
VTabletWriter::VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
                             std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep)
        : AsyncResultWriter(output_exprs, std::move(dep), std::move(fin_dep)), _t_sink(t_sink) {
    _transfer_large_data_by_brpc = config::transfer_large_data_by_brpc;
}
1529
1530
0
void VTabletWriter::_send_batch_process() {
1531
0
    SCOPED_TIMER(_non_blocking_send_timer);
1532
0
    SCOPED_ATTACH_TASK(_state);
1533
0
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
1534
1535
0
    int sleep_time = int(config::olap_table_sink_send_interval_microseconds *
1536
0
                         (_vpartition->is_auto_partition()
1537
0
                                  ? config::olap_table_sink_send_interval_auto_partition_factor
1538
0
                                  : 1));
1539
1540
0
    while (true) {
1541
        // incremental open will temporarily make channels into abnormal state. stop checking when this.
1542
0
        std::unique_lock<bthread::Mutex> l(_stop_check_channel);
1543
1544
0
        int running_channels_num = 0;
1545
0
        int opened_nodes = 0;
1546
0
        for (const auto& index_channel : _channels) {
1547
0
            index_channel->for_each_node_channel([&running_channels_num,
1548
0
                                                  this](const std::shared_ptr<VNodeChannel>& ch) {
1549
                // if this channel all completed(cancelled), got 0. else 1.
1550
0
                running_channels_num +=
1551
0
                        ch->try_send_and_fetch_status(_state, this->_send_batch_thread_pool_token);
1552
0
            });
1553
0
            opened_nodes += index_channel->num_node_channels();
1554
0
        }
1555
1556
        // auto partition table may have no node channel temporarily. wait to open.
1557
0
        if (opened_nodes != 0 && running_channels_num == 0) {
1558
0
            LOG(INFO) << "All node channels are stopped(maybe finished/offending/cancelled), "
1559
0
                         "sender thread exit. "
1560
0
                      << print_id(_load_id);
1561
0
            return;
1562
0
        }
1563
1564
        // for auto partition tables, there's a situation: we haven't open any node channel but decide to cancel the task.
1565
        // then the judge in front will never be true because opened_nodes won't increase. so we have to specially check wether we called close.
1566
        // we must RECHECK opened_nodes below, after got closed signal, because it may changed. Think of this:
1567
        //      checked opened_nodes = 0 ---> new block arrived ---> task finished, close() was called ---> we got _try_close here
1568
        // if we don't check again, we may lose the last package.
1569
0
        if (_try_close.load(std::memory_order_acquire)) {
1570
0
            opened_nodes = 0;
1571
0
            std::ranges::for_each(_channels,
1572
0
                                  [&opened_nodes](const std::shared_ptr<IndexChannel>& ich) {
1573
0
                                      opened_nodes += ich->num_node_channels();
1574
0
                                  });
1575
0
            if (opened_nodes == 0) {
1576
0
                LOG(INFO) << "No node channel have ever opened but now we have to close. sender "
1577
0
                             "thread exit. "
1578
0
                          << print_id(_load_id);
1579
0
                return;
1580
0
            }
1581
0
        }
1582
0
        bthread_usleep(sleep_time);
1583
0
    }
1584
0
}
1585
1586
0
static void* periodic_send_batch(void* writer) {
1587
0
    auto* tablet_writer = (VTabletWriter*)(writer);
1588
0
    tablet_writer->_send_batch_process();
1589
0
    return nullptr;
1590
0
}
1591
1592
0
// Initializes the writer, opens every node channel and starts the background
// sender thread.
//
// Steps: run _init, issue open() on every node channel, wait for each open to
// finish (a failed open marks all tablets of that node as failed), verify each
// index channel can still tolerate its failures, create the send-batch thread
// pool token and finally start the sender bthread.
// Returns the first error encountered, or OK.
Status VTabletWriter::open(doris::RuntimeState* state, doris::RuntimeProfile* profile) {
    RETURN_IF_ERROR(_init(state, profile));
    signal::set_signal_task_id(_load_id);
    SCOPED_TIMER(profile->total_time_counter());
    SCOPED_TIMER(_open_timer);
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

    // Collect the index ids for the debug log; entries are comma separated so
    // that consecutive ids do not run into each other.
    fmt::memory_buffer buf;
    const char* separator = "";
    for (const auto& index_channel : _channels) {
        fmt::format_to(buf, "{}index id:{}", separator, index_channel->_index_id);
        separator = ", ";
        index_channel->for_each_node_channel(
                [](const std::shared_ptr<VNodeChannel>& ch) { ch->open(); });
    }
    VLOG_DEBUG << "list of open index id = " << fmt::to_string(buf);

    for (const auto& index_channel : _channels) {
        index_channel->set_start_time(UnixMillis());
        index_channel->for_each_node_channel([&index_channel](
                                                     const std::shared_ptr<VNodeChannel>& ch) {
            auto st = ch->open_wait();
            if (!st.ok()) {
                // The open() phase is mainly to generate DeltaWriter instances on the nodes corresponding to each node channel.
                // This phase will not fail due to a single tablet.
                // Therefore, if the open() phase fails, all tablets corresponding to the node need to be marked as failed.
                index_channel->mark_as_failed(
                        ch.get(),
                        fmt::format("{}, open failed, err: {}", ch->channel_info(), st.to_string()),
                        -1);
            }
        });

        RETURN_IF_ERROR(index_channel->check_intolerable_failure());
    }
    _send_batch_thread_pool_token = state->exec_env()->send_batch_thread_pool()->new_token(
            ThreadPool::ExecutionMode::CONCURRENT, _send_batch_parallelism);

    // start to send batch continually. this must be called after _init
    if (bthread_start_background(&_sender_thread, nullptr, periodic_send_batch, (void*)this) != 0) {
        return Status::Error<ErrorCode::INTERNAL_ERROR>("bthread_start_background failed");
    }
    return Status::OK();
}
1634
1635
0
// Integrates the partitions described by `result` into this writer: registers
// the new tablet locations (and slave locations for single-replica writes),
// adds the new nodes and incrementally opens the node channels for the new
// partitions. Returns the status of the incremental open.
Status VTabletWriter::on_partitions_created(TCreatePartitionResult* result) {
    // Locations are later used by address, so the copies are owned by the pool.
    auto* added_locations = _pool->add(new std::vector<TTabletLocation>(result->tablets));
    _location->add_locations(*added_locations);

    if (_write_single_replica) {
        auto* added_slave_locations =
                _pool->add(new std::vector<TTabletLocation>(result->slave_tablets));
        _slave_location->add_locations(*added_slave_locations);
    }

    // Make the new backends known.
    _nodes_info->add_nodes(result->nodes);

    // Open node channels for the new partitions.
    return _incremental_open_node_channel(result->partitions);
}
1652
1653
0
// C-style trampoline: `writer` is treated as a VTabletWriter and the call is
// forwarded to its on_partitions_created member.
static Status on_partitions_created(void* writer, TCreatePartitionResult* result) {
    auto* tablet_writer = static_cast<VTabletWriter*>(writer);
    return tablet_writer->on_partitions_created(result);
}
1656
1657
0
// Wires the row distribution helper to this writer's state, then opens it with
// the output row descriptor. `caller` / `create_partition_callback` let the
// helper call back into on_partitions_created of this writer.
Status VTabletWriter::_init_row_distribution() {
    _row_distribution.init({
            .state = _state,
            .block_convertor = _block_convertor.get(),
            .tablet_finder = _tablet_finder.get(),
            .vpartition = _vpartition,
            .add_partition_request_timer = _add_partition_request_timer,
            .txn_id = _txn_id,
            .pool = _pool,
            .location = _location,
            .vec_output_expr_ctxs = &_vec_output_expr_ctxs,
            .schema = _schema,
            .caller = this,
            .write_single_replica = _write_single_replica,
            .create_partition_callback = &::doris::on_partitions_created,
    });

    Status open_status = _row_distribution.open(_output_row_desc);
    return open_status;
}
1674
1675
0
// One-time initialization of the writer from the thrift sink description.
//
// Copies ids and options out of the olap table sink, builds schema, location,
// node and partition parameters, chooses the tablet-finding mode, validates the
// output tuple descriptor against the output expressions, registers all profile
// counters, creates one IndexChannel per index of the schema and finally
// initializes the row distribution. Returns the first error encountered.
Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
    DCHECK(_t_sink.__isset.olap_table_sink);
    _pool = state->obj_pool();
    auto& table_sink = _t_sink.olap_table_sink;

    // Plain values copied from the sink.
    _load_id.set_hi(table_sink.load_id.hi);
    _load_id.set_lo(table_sink.load_id.lo);
    _txn_id = table_sink.txn_id;
    _num_replicas = table_sink.num_replicas;
    _tuple_desc_id = table_sink.tuple_id;
    _write_file_cache = table_sink.write_file_cache;

    // Schema, stamped with the time/zone settings of the runtime state.
    _schema.reset(new OlapTableSchemaParam());
    RETURN_IF_ERROR(_schema->init(table_sink.schema));
    _schema->set_timestamp_ms(state->timestamp_ms());
    _schema->set_nano_seconds(state->nano_seconds());
    _schema->set_timezone(state->timezone());

    _location = _pool->add(new OlapTableLocationParam(table_sink.location));
    _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));

    const bool single_replica_requested =
            table_sink.__isset.write_single_replica && table_sink.write_single_replica;
    if (single_replica_requested) {
        _write_single_replica = true;
        _slave_location = _pool->add(new OlapTableLocationParam(table_sink.slave_location));
        if (!config::enable_single_replica_load) {
            return Status::InternalError("single replica load is disabled on BE.");
        }
    }

    // In cloud mode a positive transaction timeout is mandatory.
    if (config::is_cloud_mode() &&
        (!table_sink.__isset.txn_timeout_s || table_sink.txn_timeout_s <= 0)) {
        return Status::InternalError("The txn_timeout_s of TDataSink is invalid");
    }
    _txn_expiration = ::time(nullptr) + table_sink.txn_timeout_s;

    _load_channel_timeout_s = table_sink.__isset.load_channel_timeout_s
                                      ? table_sink.load_channel_timeout_s
                                      : config::streaming_load_rpc_max_alive_time_sec;
    if (table_sink.__isset.send_batch_parallelism && table_sink.send_batch_parallelism > 1) {
        _send_batch_parallelism = table_sink.send_batch_parallelism;
    }

    // Tablet-finding mode. With distribution columns the tablet is looked up
    // per row. An empty distribution column list means random distribution,
    // where the mode is picked by priority (highest first):
    //   1. FIND_TABLET_EVERY_SINK: load_to_single_tablet=true (legacy single-tablet mode).
    //   2. FIND_TABLET_RANDOM_BUCKET: the sink carries enable_adaptive_random_bucket
    //      and the BE runs in cloud mode. A sink-level flag (like
    //      load_to_single_tablet) keeps the mode correct even when the initial
    //      partition list is empty, e.g. auto-partition tables on first load.
    //   3. FIND_TABLET_EVERY_BATCH: default round-robin per batch.
    using FindMode = OlapTabletFinder::FindTabletMode;
    auto find_tablet_mode = FindMode::FIND_TABLET_EVERY_ROW;
    if (table_sink.partition.distributed_columns.empty()) {
        if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) {
            find_tablet_mode = FindMode::FIND_TABLET_EVERY_SINK;
        } else if (table_sink.__isset.enable_adaptive_random_bucket &&
                   table_sink.enable_adaptive_random_bucket && config::is_cloud_mode()) {
            find_tablet_mode = FindMode::FIND_TABLET_RANDOM_BUCKET;
        } else {
            find_tablet_mode = FindMode::FIND_TABLET_EVERY_BATCH;
        }
    }
    _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition));
    _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode);
    RETURN_IF_ERROR(_vpartition->init());

    _state = state;
    _operator_profile = profile;

    _sender_id = state->per_fragment_instance_idx();
    _num_senders = state->num_per_fragment_instances();
    _is_high_priority =
            (state->execution_timeout() <= config::load_task_high_priority_threshold_second);
    DBUG_EXECUTE_IF("VTabletWriter._init.is_high_priority", { _is_high_priority = true; });

    _mem_tracker =
            std::make_shared<MemTracker>("OlapTableSink:" + std::to_string(state->load_job_id()));
    SCOPED_TIMER(profile->total_time_counter());
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

    // Resolve the destination table's tuple descriptor.
    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
    if (_output_tuple_desc == nullptr) {
        LOG(WARNING) << "unknown destination tuple descriptor, id=" << _tuple_desc_id;
        return Status::InternalError("unknown destination tuple descriptor");
    }

    // When output expressions are given, there must be exactly one per slot.
    if (!_vec_output_expr_ctxs.empty() &&
        _output_tuple_desc->slots().size() != _vec_output_expr_ctxs.size()) {
        LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, "
                     << "output_tuple_slot_num " << _output_tuple_desc->slots().size()
                     << " output_expr_num " << _vec_output_expr_ctxs.size();
        return Status::InvalidArgument(
                "output_tuple_slot_num {} should be equal to output_expr_num {}",
                _output_tuple_desc->slots().size(), _vec_output_expr_ctxs.size());
    }

    _block_convertor = std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
    // For partition type OLAP_TABLE_SINK_HASH_PARTITIONED the auto_increment
    // column is processed on the exchange node rather than in the tablet writer.
    _block_convertor->init_autoinc_info(
            _schema->db_id(), _schema->table_id(), _state->batch_size(),
            _schema->is_fixed_partial_update() && !_schema->auto_increment_coulumn().empty(),
            _schema->auto_increment_column_unique_id());
    _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc));

    // Profile counters and timers.
    _input_rows_counter = ADD_COUNTER(profile, "RowsRead", TUnit::UNIT);
    _output_rows_counter = ADD_COUNTER(profile, "RowsProduced", TUnit::UNIT);
    _filtered_rows_counter = ADD_COUNTER(profile, "RowsFiltered", TUnit::UNIT);
    _send_data_timer = ADD_TIMER(profile, "SendDataTime");
    _wait_mem_limit_timer = ADD_CHILD_TIMER(profile, "WaitMemLimitTime", "SendDataTime");
    _row_distribution_timer = ADD_CHILD_TIMER(profile, "RowDistributionTime", "SendDataTime");
    _filter_timer = ADD_CHILD_TIMER(profile, "FilterTime", "SendDataTime");
    _where_clause_timer = ADD_CHILD_TIMER(profile, "WhereClauseTime", "SendDataTime");
    _append_node_channel_timer = ADD_CHILD_TIMER(profile, "AppendNodeChannelTime", "SendDataTime");
    _add_partition_request_timer =
            ADD_CHILD_TIMER(profile, "AddPartitionRequestTime", "SendDataTime");
    _validate_data_timer = ADD_TIMER(profile, "ValidateDataTime");
    _open_timer = ADD_TIMER(profile, "OpenTime");
    _close_timer = ADD_TIMER(profile, "CloseWaitTime");
    _non_blocking_send_timer = ADD_TIMER(profile, "NonBlockingSendTime");
    _non_blocking_send_work_timer =
            ADD_CHILD_TIMER(profile, "NonBlockingSendWorkTime", "NonBlockingSendTime");
    _serialize_batch_timer =
            ADD_CHILD_TIMER(profile, "SerializeBatchTime", "NonBlockingSendWorkTime");
    _total_add_batch_exec_timer = ADD_TIMER(profile, "TotalAddBatchExecTime");
    _max_add_batch_exec_timer = ADD_TIMER(profile, "MaxAddBatchExecTime");
    _total_wait_exec_timer = ADD_TIMER(profile, "TotalWaitExecTime");
    _max_wait_exec_timer = ADD_TIMER(profile, "MaxWaitExecTime");
    _add_batch_number = ADD_COUNTER(profile, "NumberBatchAdded", TUnit::UNIT);
    _num_node_channels = ADD_COUNTER(profile, "NumberNodeChannels", TUnit::UNIT);
    _load_back_pressure_version_time_ms = ADD_TIMER(profile, "LoadBackPressureVersionTimeMs");

#ifdef DEBUG
    // Debug-only invariant: a tablet id must not appear twice across all
    // indexes and partitions.
    {
        std::unordered_set<int64_t> tablet_ids;
        const auto& partitions = _vpartition->get_partitions();
        for (int i = 0; i < _schema->indexes().size(); ++i) {
            for (const auto& partition : partitions) {
                for (const auto& tablet : partition->indexes[i].tablets) {
                    CHECK(tablet_ids.count(tablet) == 0) << "found duplicate tablet id: " << tablet;
                    tablet_ids.insert(tablet);
                }
            }
        }
    }
#endif

    // Create and initialize one IndexChannel per index (rollup).
    const auto& partitions = _vpartition->get_partitions();
    for (size_t idx = 0; idx < _schema->indexes().size(); ++idx) {
        auto* index = _schema->indexes()[idx];

        // Gather every tablet of this index across all known partitions.
        std::vector<TTabletWithPartition> tablets;
        for (const auto& part : partitions) {
            for (const auto& tablet : part->indexes[idx].tablets) {
                TTabletWithPartition entry;
                entry.partition_id = part->id;
                entry.tablet_id = tablet;
                tablets.emplace_back(std::move(entry));
                _build_tablet_replica_info(tablet, part);
            }
        }
        // An empty tablet list is only worth a warning when the table is not
        // an auto-partition table.
        if (tablets.empty() && !_vpartition->is_auto_partition()) {
            LOG(WARNING) << "load job:" << state->load_job_id() << " index: " << index->index_id
                         << " would open 0 tablet";
        }

        _channels.emplace_back(new IndexChannel(this, index->index_id, index->where_clause));
        _index_id_to_channel[index->index_id] = _channels.back();
        RETURN_IF_ERROR(_channels.back()->init(state, tablets));
    }

    RETURN_IF_ERROR(_init_row_distribution());

    _inited = true;
    return Status::OK();
}
1850
1851
// Opens node channels for partitions that were created after the writer was initialized.
//
// For every index of the schema, the tablets of the given partitions are collected, their
// replica info is recorded via _build_tablet_replica_info(), and the IndexChannel already
// registered for that index is re-initialized in incremental mode so that it also covers the new
// tablets. Afterwards every node channel of every index channel is asked to incrementally open,
// the open results are awaited, and node channels whose open failed are marked as failed.
//
// _stop_check_channel is held for the whole call.
//
// @param partitions thrift descriptions of the newly created partitions.
// @return the first error reported by partition generation, IndexChannel::init() or
//         check_intolerable_failure(); InternalError when an index has no registered channel;
//         OK otherwise.
Status VTabletWriter::_incremental_open_node_channel(
        const std::vector<TOlapTablePartition>& partitions) {
    // Do what we did in prepare() for the partitions. Indexes do not change when a new partition
    // is created, so they are orthogonal to partitions.
    std::unique_lock<bthread::Mutex> _l(_stop_check_channel);
    for (int i = 0; i < _schema->indexes().size(); ++i) {
        const OlapTableIndexSchema* index = _schema->indexes()[i];
        std::vector<TTabletWithPartition> tablets;
        for (const auto& t_part : partitions) {
            VOlapTablePartition* part = nullptr;
            RETURN_IF_ERROR(_vpartition->generate_partition_from(t_part, part));
            for (const auto& tablet : part->indexes[i].tablets) {
                TTabletWithPartition tablet_with_partition;
                tablet_with_partition.partition_id = part->id;
                tablet_with_partition.tablet_id = tablet;
                tablets.emplace_back(std::move(tablet_with_partition));
                _build_tablet_replica_info(tablet, part);
            }
            DCHECK(!tablets.empty()) << "incremental open got nothing!";
        }

        // Update and re-init the existing channel of this index. Use find() instead of
        // operator[] so that a missing index id neither inserts a null entry nor gets
        // dereferenced in builds where DCHECK is compiled out.
        auto channel_it = _index_id_to_channel.find(index->index_id);
        const bool channel_found =
                channel_it != _index_id_to_channel.end() && channel_it->second != nullptr;
        DCHECK(channel_found);
        if (!channel_found) {
            return Status::InternalError("unknown index channel, load_id={}, index_id={}",
                                         print_id(_load_id), index->index_id);
        }
        RETURN_IF_ERROR(channel_it->second->init(_state, tablets, true)); // add tablets into it
    }

    fmt::memory_buffer buf;
    for (auto& channel : _channels) {
        // incremental open new partition's tablet on storage side
        channel->for_each_node_channel(
                [](const std::shared_ptr<VNodeChannel>& ch) { ch->incremental_open(); });
        fmt::format_to(buf, "index id:{}", channel->_index_id);
        VLOG_DEBUG << "list of open index id = " << fmt::to_string(buf);

        channel->for_each_node_channel([&channel](const std::shared_ptr<VNodeChannel>& ch) {
            auto st = ch->open_wait();
            if (!st.ok()) {
                // The open() phase is mainly to generate DeltaWriter instances on the nodes
                // corresponding to each node channel. This phase will not fail due to a single
                // tablet. Therefore, if the open() phase fails, all tablets corresponding to the
                // node need to be marked as failed.
                channel->mark_as_failed(
                        ch.get(),
                        fmt::format("{}, open failed, err: {}", ch->channel_info(), st.to_string()),
                        -1);
            }
        });

        RETURN_IF_ERROR(channel->check_intolerable_failure());
    }

    return Status::OK();
}
1902
1903
void VTabletWriter::_build_tablet_replica_info(const int64_t tablet_id,
1904
0
                                               VOlapTablePartition* partition) {
1905
0
    if (partition != nullptr) {
1906
0
        int total_replicas_num =
1907
0
                partition->total_replica_num == 0 ? _num_replicas : partition->total_replica_num;
1908
0
        int load_required_replicas_num = partition->load_required_replica_num == 0
1909
0
                                                 ? (_num_replicas + 1) / 2
1910
0
                                                 : partition->load_required_replica_num;
1911
0
        _tablet_replica_info.emplace(
1912
0
                tablet_id, std::make_pair(total_replicas_num, load_required_replicas_num));
1913
        // Copy version gap backends info for this tablet
1914
0
        if (auto it = partition->tablet_version_gap_backends.find(tablet_id);
1915
0
            it != partition->tablet_version_gap_backends.end()) {
1916
0
            _tablet_version_gap_backends[tablet_id] = it->second;
1917
0
        }
1918
0
    } else {
1919
0
        _tablet_replica_info.emplace(tablet_id,
1920
0
                                     std::make_pair(_num_replicas, (_num_replicas + 1) / 2));
1921
0
    }
1922
0
}
1923
1924
0
// Cancels every node channel of every index channel, passing the textual form of `status` as
// the cancel reason, then logs the cancellation together with the load id and txn id.
//
// @param status the error that triggered the cancellation.
void VTabletWriter::_cancel_all_channel(Status status) {
    const auto cancel_node = [&status](const std::shared_ptr<VNodeChannel>& node_channel) {
        node_channel->cancel(status.to_string());
    };
    for (const auto& index_channel : _channels) {
        index_channel->for_each_node_channel(cancel_node);
    }

    LOG(INFO) << fmt::format(
            "close olap table sink. load_id={}, txn_id={}, canceled all node channels due to "
            "error: {}",
            print_id(_load_id), _txn_id, status);
}
1935
1936
0
// Flushes the rows that the row distribution has batched while waiting for new partitions.
//
// When batching needs to be dealt with, the missing partitions are created, the batched rows
// are written through write(), and the columns are handed back to the batching block for reuse.
//
// @return the error of partition creation or of the nested write(); OK otherwise, including
//         the case where nothing had to be done.
Status VTabletWriter::_send_new_partition_batch() {
    // maybe try_close more than 1 time
    if (!_row_distribution.need_deal_batching()) {
        return Status::OK();
    }

    RETURN_IF_ERROR(_row_distribution.automatic_create_partition());

    // Borrow out, for lval ref
    Block batched_block = _row_distribution._batching_block->to_block();

    // The order of the following steps matters:
    //  1. clear batching stats (and the flag goes true) so that we won't start a new batching
    //     process while dealing with the batched block.
    //  2. deal with the batched block.
    //  3. reuse the columns of the borrowed block: write() does not really adjust it, it
    //     generates a new block from it.
    _row_distribution.clear_batching_stats();
    Defer give_columns_back([this, &batched_block]() {
        _row_distribution._batching_block->set_mutable_columns(
                std::move(batched_block).mutate_columns());
        _row_distribution._batching_block->clear_column_data();
    });
    RETURN_IF_ERROR(this->write(_state, batched_block));
    _row_distribution._deal_batched = false;
    return Status::OK();
}
1957
1958
0
// First phase of closing the writer: flushes the rows still batched for new partitions, raises
// _try_close, and marks the node channels as closed.
//
// If any step fails, all node channels are cancelled and the error is stored in _close_status.
//
// @param state        not read by this function; the member _state is used instead.
// @param exec_status  status of the execution so far; when it is not OK no data is flushed and
//                     all channels are cancelled with it.
void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status) {
    SCOPED_TIMER(_close_timer);
    Status status = exec_status;

    // must before set _try_close
    if (status.ok()) {
        SCOPED_TIMER(_operator_profile->total_time_counter());
        _row_distribution._deal_batched = true;
        status = _send_new_partition_batch();
    }

    _try_close.store(true, std::memory_order_release); // will stop periodic thread
    if (status.ok()) {
        // Only if status is ok can we use _operator_profile->total_time_counter() here.
        // If status is not ok, this sink may not be prepared, so the profile may be null.
        SCOPED_TIMER(_operator_profile->total_time_counter());
        for (const auto& index_channel : _channels) {
            // Shared tail of every mark-close callback below: a node channel that turned out to
            // be cancelled is handed to cancel_channel_and_check_intolerable_failure(), whose
            // result becomes the new overall status.
            const auto absorb_cancellation =
                    [&index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) {
                        if (ch->is_cancelled()) {
                            status = cancel_channel_and_check_intolerable_failure(
                                    std::move(status), ch->get_cancel_msg(), *index_channel, *ch);
                        }
                    };

            // Two-step mark close. First we send close_origin to receivers to close all
            // originally existing TabletsChannels. When they are all closed, we are sure every
            // Writer instance has called _do_try_close, which means no new channel will be
            // opened. The refcount of receivers will be monotonically decreasing, so it is then
            // safe to close all our channels.
            if (index_channel->has_incremental_node_channel()) {
                if (!status.ok()) {
                    break;
                }
                VLOG_TRACE << _sender_id << " first stage close start " << _txn_id;
                index_channel->for_init_node_channel(
                        [&index_channel, &status, &absorb_cancellation,
                         this](const std::shared_ptr<VNodeChannel>& ch) {
                            if (!status.ok() || ch->is_closed()) {
                                return;
                            }
                            VLOG_DEBUG << index_channel->_parent->_sender_id << "'s " << ch->host()
                                       << "mark close1 for inits " << _txn_id;
                            ch->mark_close(true);
                            absorb_cancellation(ch);
                        });
                if (!status.ok()) {
                    break;
                }
                // Do not need to wait after quorum success,
                // for first-stage close_wait only ensure incremental node channels load has been
                // completed, unified waiting in the second-stage close_wait.
                status = index_channel->close_wait(_state, nullptr, nullptr,
                                                   index_channel->init_node_channel_ids(), false);
                if (!status.ok()) {
                    break;
                }
                VLOG_DEBUG << _sender_id << " first stage finished. closeing inc nodes " << _txn_id;
                index_channel->for_inc_node_channel(
                        [&index_channel, &status, &absorb_cancellation,
                         this](const std::shared_ptr<VNodeChannel>& ch) {
                            if (!status.ok() || ch->is_closed()) {
                                return;
                            }
                            // only first try close, all node channels will mark_close()
                            VLOG_DEBUG << index_channel->_parent->_sender_id << "'s " << ch->host()
                                       << "mark close2 for inc " << _txn_id;
                            ch->mark_close();
                            absorb_cancellation(ch);
                        });
            } else { // not has_incremental_node_channel
                VLOG_TRACE << _sender_id << " has no incremental channels " << _txn_id;
                index_channel->for_each_node_channel(
                        [&status, &absorb_cancellation](const std::shared_ptr<VNodeChannel>& ch) {
                            if (!status.ok() || ch->is_closed()) {
                                return;
                            }
                            // only first try close, all node channels will mark_close()
                            ch->mark_close();
                            absorb_cancellation(ch);
                        });
            }
        } // end for index channels
    }

    if (!status.ok()) {
        _cancel_all_channel(status);
        _close_status = status;
    }
}
2052
2053
0
// Closes the writer.
//
// When the writer was never initialized, all channels are cancelled with `exec_status` and that
// status is returned. Otherwise the try-close phase is run, every index channel is waited for,
// the profile counters and the load statistics of the runtime state are filled in on success
// (all channels are cancelled on failure), the sender thread is joined and the blocks held by
// the node channels are released.
//
// @param exec_status status of the execution that fed this writer.
// @return the final _close_status.
Status VTabletWriter::close(Status exec_status) {
    if (!_inited) {
        DCHECK(!exec_status.ok());
        _cancel_all_channel(exec_status);
        _close_status = exec_status;
        return _close_status;
    }

    SCOPED_TIMER(_close_timer);
    SCOPED_TIMER(_operator_profile->total_time_counter());

    // This sends the last batch of requests; close_wait below waits until they are finished.
    _do_try_close(_state, exec_status);
    TEST_INJECTION_POINT("VOlapTableSink::close");

    DBUG_EXECUTE_IF("VTabletWriter.close.sleep", {
        auto sleep_sec = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
                "VTabletWriter.close.sleep", "sleep_sec", 1);
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
    });
    DBUG_EXECUTE_IF("VTabletWriter.close.close_status_not_ok",
                    { _close_status = Status::InternalError("injected close status not ok"); });

    // If _close_status is not ok, all nodes have been canceled in try_close.
    if (_close_status.ok()) {
        auto wait_status = Status::OK();
        // BE id -> add_batch method counter
        std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map;
        WriterStats writer_stats;

        for (const auto& index_channel : _channels) {
            if (!wait_status.ok()) {
                break;
            }
            // NOTE(review): these two locals are never assigned after initialization, so the
            // std::max calls below only compare the accumulated values against 0 - confirm
            // whether they can be dropped.
            int64_t add_batch_exec_time = 0;
            int64_t wait_exec_time = 0;
            wait_status =
                    index_channel->close_wait(_state, &writer_stats, &node_add_batch_counter_map,
                                              index_channel->each_node_channel_ids(), true);

            // Due to the non-determinism of compaction, the rowsets of each replica may differ
            // from each other on different BE nodes. The number of rows filtered in
            // SegmentWriter depends on the historical rowsets located on the corresponding BE
            // node. So we check the number of rows filtered on each successful BE to ensure the
            // consistency of the current load.
            const bool need_filtered_rows_check = wait_status.ok() && !_write_single_replica &&
                                                  _schema->is_strict_mode() &&
                                                  _schema->is_partial_update();
            if (need_filtered_rows_check) {
                Status consistency = index_channel->check_tablet_filtered_rows_consistency();
                if (consistency.ok()) {
                    _state->set_num_rows_filtered_in_strict_mode_partial_update(
                            index_channel->num_rows_filtered());
                } else {
                    wait_status = consistency;
                }
            }

            writer_stats.num_node_channels += index_channel->num_node_channels();
            writer_stats.max_add_batch_exec_time_ns =
                    std::max(add_batch_exec_time, writer_stats.max_add_batch_exec_time_ns);
            writer_stats.max_wait_exec_time_ns =
                    std::max(wait_exec_time, writer_stats.max_wait_exec_time_ns);
        } // end for index channels

        if (!wait_status.ok()) {
            _cancel_all_channel(wait_status);
        } else {
            // TODO need to be improved
            LOG(INFO) << "total mem_exceeded_block_ns="
                      << writer_stats.channel_stat.mem_exceeded_block_ns
                      << ", total queue_push_lock_ns=" << writer_stats.queue_push_lock_ns
                      << ", total actual_consume_ns=" << writer_stats.actual_consume_ns
                      << ", load id=" << print_id(_load_id) << ", txn_id=" << _txn_id;

            // Publish the collected numbers into the profile counters.
            COUNTER_SET(_input_rows_counter, _number_input_rows);
            COUNTER_SET(_output_rows_counter, _number_output_rows);
            COUNTER_SET(_filtered_rows_counter,
                        _block_convertor->num_filtered_rows() +
                                _tablet_finder->num_filtered_rows() +
                                _state->num_rows_filtered_in_strict_mode_partial_update());
            COUNTER_SET(_send_data_timer, _send_data_ns);
            COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
            COUNTER_SET(_filter_timer, _filter_ns);
            COUNTER_SET(_append_node_channel_timer,
                        writer_stats.channel_stat.append_node_channel_ns);
            COUNTER_SET(_where_clause_timer, writer_stats.channel_stat.where_clause_ns);
            COUNTER_SET(_wait_mem_limit_timer, writer_stats.channel_stat.mem_exceeded_block_ns);
            COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
            COUNTER_SET(_serialize_batch_timer, writer_stats.serialize_batch_ns);
            COUNTER_SET(_non_blocking_send_work_timer, writer_stats.actual_consume_ns);
            COUNTER_SET(_total_add_batch_exec_timer, writer_stats.total_add_batch_exec_time_ns);
            COUNTER_SET(_max_add_batch_exec_timer, writer_stats.max_add_batch_exec_time_ns);
            COUNTER_SET(_total_wait_exec_timer, writer_stats.total_wait_exec_time_ns);
            COUNTER_SET(_max_wait_exec_timer, writer_stats.max_wait_exec_time_ns);
            COUNTER_SET(_add_batch_number, writer_stats.total_add_batch_num);
            COUNTER_SET(_num_node_channels, writer_stats.num_node_channels);
            COUNTER_SET(_load_back_pressure_version_time_ms,
                        writer_stats.load_back_pressure_version_time_ms);
            g_sink_load_back_pressure_version_time_ms
                    << writer_stats.load_back_pressure_version_time_ms;

            // _number_input_rows doesn't contain num_rows_load_filtered and
            // num_rows_load_unselected of the scan node
            int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() +
                                          _state->num_rows_load_unselected();
            _state->set_num_rows_load_total(num_rows_load_total);
            _state->update_num_rows_load_filtered(
                    _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() +
                    _state->num_rows_filtered_in_strict_mode_partial_update());
            _state->update_num_rows_load_unselected(
                    _tablet_finder->num_immutable_partition_filtered_rows());

            if (_state->enable_profile() && _state->profile_level() >= 2) {
                // Output detailed profiling info for auto-partition requests
                _row_distribution.output_profile_info(_operator_profile);
            }

            // Log the add batch time of all nodes, to make tracing load performance easy.
            std::stringstream summary;
            summary << "finished to close olap table sink. load_id=" << print_id(_load_id)
                    << ", txn_id=" << _txn_id
                    << ", node add batch time(ms)/wait execution time(ms)/close time(ms)/num: ";
            for (auto const& [node_id, counter] : node_add_batch_counter_map) {
                summary << "{" << node_id << ":(" << (counter.add_batch_execution_time_us / 1000)
                        << ")(" << (counter.add_batch_wait_execution_time_us / 1000) << ")("
                        << counter.close_wait_time_ms << ")(" << counter.add_batch_num << ")} ";
            }
            LOG(INFO) << summary.str();
        }
        _close_status = wait_status;
    }

    // Sender join() must be put after node channels mark_close/cancel.
    // But there is no specific sequence required between sender join() & close_wait().
    if (_sender_thread) {
        bthread_join(_sender_thread, nullptr);
        // We have to wait until all tasks in _send_batch_thread_pool_token are finished,
        // because it is difficult to handle concurrency problems if we just shut it down.
        _send_batch_thread_pool_token->wait();
    }

    // We clear NodeChannels' batches here, because the destruction of NodeChannels' batches uses
    // OlapTableSink::_mem_tracker and its parents, but their destruction happens after
    // OlapTableSink's.
    const auto drop_blocks = [](const std::shared_ptr<VNodeChannel>& ch) {
        ch->clear_all_blocks();
    };
    for (const auto& index_channel : _channels) {
        index_channel->for_each_node_channel(drop_blocks);
    }
    return _close_status;
}
2198
2199
// Distributes the rows of one index to the node channels that have to receive them.
//
// For every row, the target node channels are looked up - by partition when the tablet finder
// uses adaptive random bucketing in cloud mode, otherwise by tablet, in which case a row goes
// to every node channel listed for the tablet. For each target, the row id and the row's
// position in `row_part_tablet_id` are appended to that channel's payload in `channel_payload`;
// the payload is created on first use.
//
// @param row_part_tablet_id  row ids with their partition ids and tablet ids for this index.
// @param index_idx           position of the index in _channels.
// @param channel_payload     output map from node channel to its payload.
// @return InternalError when a partition or a tablet has no known channel, OK otherwise.
Status VTabletWriter::_generate_one_index_channel_payload(
        RowPartTabletIds& row_part_tablet_id, int32_t index_idx,
        ChannelDistributionPayload& channel_payload) {
    auto& row_ids = row_part_tablet_id.row_ids;
    auto& partition_ids = row_part_tablet_id.partition_ids;
    auto& tablet_ids = row_part_tablet_id.tablet_ids;

    const size_t row_cnt = row_ids.size();
    const auto& index_channel = _channels[index_idx];

    // Appends the row at position `row_pos` to the payload of `node`; a missing payload is
    // created first, with room reserved for all rows.
    const auto route_row_to = [&](auto* node, size_t row_pos) {
        auto payload_it = channel_payload.find(node); // <VNodeChannel*, Payload>
        if (payload_it == channel_payload.end()) {
            payload_it = channel_payload
                                 .emplace(node, Payload {std::make_unique<IColumn::Selector>(),
                                                         &row_part_tablet_id,
                                                         std::vector<uint32_t>()})
                                 .first;
            payload_it->second.row_ids->reserve(row_cnt);
            payload_it->second.route_idxs.reserve(row_cnt);
        }
        payload_it->second.row_ids->push_back(row_ids[row_pos]);
        payload_it->second.route_idxs.push_back(cast_set<uint32_t>(row_pos));
    };

    for (size_t row_pos = 0; row_pos < row_cnt; ++row_pos) {
        if (_tablet_finder->is_adaptive_random_bucket() && config::is_cloud_mode()) {
            // The row is routed by the channel registered for its partition.
            auto partition_it = index_channel->_channels_by_partition.find(partition_ids[row_pos]);
            if (partition_it == index_channel->_channels_by_partition.end()) {
                return Status::InternalError(
                        "unknown partition channel, load_id={}, index_id={}, partition_id={}",
                        print_id(_load_id), index_channel->_index_id, partition_ids[row_pos]);
            }
            route_row_to(partition_it->second.get(), row_pos);
            continue;
        }

        // (tablet_id, VNodeChannel) where this tablet is located
        auto tablet_it = index_channel->_channels_by_tablet.find(tablet_ids[row_pos]);
        if (tablet_it == index_channel->_channels_by_tablet.end()) {
            return Status::InternalError("unknown tablet, load_id={}, index_id={}, tablet_id={}",
                                         print_id(_load_id), index_channel->_index_id,
                                         tablet_ids[row_pos]);
        }

        for (const auto& locate_node : tablet_it->second) {
            route_row_to(locate_node.get(), row_pos);
        }
    }
    return Status::OK();
}
2257
2258
// Builds the channel payloads of every index of the schema by calling
// _generate_one_index_channel_payload() with the matching entries of both vectors.
//
// @param row_part_tablet_ids  per index: row ids with their partition and tablet ids.
// @param payload              per index: output map from node channel to payload.
// @return the first error reported for an index, OK otherwise.
Status VTabletWriter::_generate_index_channels_payloads(
        std::vector<RowPartTabletIds>& row_part_tablet_ids,
        ChannelDistributionPayloadVec& payload) {
    int index_idx = 0;
    while (index_idx < _schema->indexes().size()) {
        RETURN_IF_ERROR(_generate_one_index_channel_payload(row_part_tablet_ids[index_idx],
                                                            index_idx, payload[index_idx]));
        ++index_idx;
    }
    return Status::OK();
}
2266
2267
0
// Writes one input block: computes to which partition/tablet every row belongs, groups the
// rows per node channel and appends them to those channels.
//
// Nothing is done for dry-run queries or empty blocks. Rows batched for new partitions are
// flushed first through _send_new_partition_batch(). A node channel whose add_block() fails is
// marked as failed; the call only fails when an index channel reports an intolerable failure.
//
// @param state        not read by this function; the member _state is used instead.
// @param input_block  block of rows to write.
// @return the first error from flushing batched rows, row distribution, payload generation or
//         check_intolerable_failure(); OK otherwise.
Status VTabletWriter::write(RuntimeState* state, doris::Block& input_block) {
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
    Status status = Status::OK();

    DCHECK(_state);
    DCHECK(_state->query_options().__isset.dry_run_query);
    if (_state->query_options().dry_run_query) {
        return status;
    }

    // check out of limit
    RETURN_IF_ERROR(_send_new_partition_batch());

    // Read after the flush above, so this reflects the flag as seen by this call.
    const bool is_replaying_batched_block = _row_distribution._deal_batched;
    auto rows = input_block.rows();
    auto bytes = input_block.bytes();
    if (UNLIKELY(rows == 0)) {
        return status;
    }
    SCOPED_TIMER(_operator_profile->total_time_counter());
    SCOPED_RAW_TIMER(&_send_data_ns);

    std::shared_ptr<Block> block;
    _number_input_rows += rows;
    // update incrementally so that FE can get the progress.
    // the real 'num_rows_load_total' will be set when sink being closed.
    _state->update_num_rows_load_total(rows);
    _state->update_num_bytes_load_total(bytes);
    if (!is_replaying_batched_block) {
        DorisMetrics::instance()->load_rows->increment(rows);
        DorisMetrics::instance()->load_bytes->increment(bytes);
    }

    _row_distribution_watch.start();
    Status distribution_status = _row_distribution.generate_rows_distribution(
            input_block, block, _row_part_tablet_ids, _number_input_rows);
    if (!distribution_status.ok()) {
        // Stop the watch on the error path as well, like the payload step below does, so that
        // it is never left running after this call returns.
        _row_distribution_watch.stop();
        return distribution_status;
    }

    ChannelDistributionPayloadVec channel_to_payload;

    channel_to_payload.resize(_channels.size());
    Status generate_payload_status =
            _generate_index_channels_payloads(_row_part_tablet_ids, channel_to_payload);
    _row_distribution_watch.stop();
    RETURN_IF_ERROR(generate_payload_status);

    // Add block to node channel
    for (size_t i = 0; i < _channels.size(); i++) {
        for (const auto& entry : channel_to_payload[i]) {
            // if this node channel is already failed, this add_row will be skipped
            // entry.second is a [row -> tablet] mapping
            auto st = entry.first->add_block(block.get(), &entry.second);
            if (!st.ok()) {
                _channels[i]->mark_as_failed(entry.first, st.to_string());
            }
        }
    }

    // check intolerable failure
    for (const auto& index_channel : _channels) {
        RETURN_IF_ERROR(index_channel->check_intolerable_failure());
    }

    g_sink_write_bytes << bytes;
    g_sink_write_rows << rows;
    return Status::OK();
}
2333
2334
} // namespace doris