Coverage Report

Created: 2026-03-16 05:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vtablet_writer_v2.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/writer/vtablet_writer_v2.h"
19
20
#include <brpc/uri.h>
21
#include <gen_cpp/DataSinks_types.h>
22
#include <gen_cpp/Descriptors_types.h>
23
#include <gen_cpp/Metrics_types.h>
24
#include <gen_cpp/Types_types.h>
25
#include <gen_cpp/internal_service.pb.h>
26
27
#include <cstdint>
28
#include <mutex>
29
#include <ranges>
30
#include <string>
31
#include <unordered_map>
32
33
#include "common/compiler_util.h" // IWYU pragma: keep
34
#include "common/logging.h"
35
#include "common/metrics/doris_metrics.h"
36
#include "common/object_pool.h"
37
#include "common/signal_handler.h"
38
#include "common/status.h"
39
#include "core/block/block.h"
40
#include "exec/sink/delta_writer_v2_pool.h"
41
#include "load/delta_writer/delta_writer_v2.h"
42
#include "runtime/descriptors.h"
43
#include "runtime/exec_env.h"
44
#include "runtime/runtime_profile.h"
45
#include "runtime/runtime_state.h"
46
#include "runtime/thread_context.h"
47
#include "storage/tablet_info.h"
48
#include "util/debug_points.h"
49
#include "util/defer_op.h"
50
#include "util/uid_util.h"
51
// NOLINTNEXTLINE(unused-includes)
52
#include "exec/sink/load_stream_map_pool.h"
53
#include "exec/sink/load_stream_stub.h" // IWYU pragma: keep
54
#include "exec/sink/vtablet_block_convertor.h"
55
#include "exec/sink/vtablet_finder.h"
56
57
namespace doris {
58
#include "common/compile_check_begin.h"
59
60
extern bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms;
61
62
VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
63
                                 std::shared_ptr<Dependency> dep,
64
                                 std::shared_ptr<Dependency> fin_dep)
65
13
        : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
66
13
    DCHECK(t_sink.__isset.olap_table_sink);
67
13
}
68
69
13
VTabletWriterV2::~VTabletWriterV2() = default;
70
71
0
Status VTabletWriterV2::on_partitions_created(TCreatePartitionResult* result) {
72
    // add new tablet locations. it will use by address. so add to pool
73
0
    auto* new_locations = _pool->add(new std::vector<TTabletLocation>(result->tablets));
74
0
    _location->add_locations(*new_locations);
75
76
    // update new node info
77
0
    _nodes_info->add_nodes(result->nodes);
78
79
    // incremental open stream
80
0
    RETURN_IF_ERROR(_incremental_open_streams(result->partitions));
81
82
0
    return Status::OK();
83
0
}
84
85
0
static Status on_partitions_created(void* writer, TCreatePartitionResult* result) {
86
0
    return static_cast<VTabletWriterV2*>(writer)->on_partitions_created(result);
87
0
}
88
89
Status VTabletWriterV2::_incremental_open_streams(
90
0
        const std::vector<TOlapTablePartition>& partitions) {
91
    // do what we did in prepare() for partitions. indexes which don't change when we create new partition is orthogonal to partitions.
92
0
    std::unordered_set<int64_t> known_indexes;
93
0
    std::unordered_set<int64_t> new_backends;
94
0
    for (const auto& t_partition : partitions) {
95
0
        VOlapTablePartition* partition = nullptr;
96
0
        RETURN_IF_ERROR(_vpartition->generate_partition_from(t_partition, partition));
97
0
        for (const auto& index : partition->indexes) {
98
0
            for (const auto& tablet_id : index.tablets) {
99
0
                auto nodes = _location->find_tablet(tablet_id)->node_ids;
100
0
                for (auto& node : nodes) {
101
0
                    PTabletID tablet;
102
0
                    tablet.set_partition_id(partition->id);
103
0
                    tablet.set_index_id(index.index_id);
104
0
                    tablet.set_tablet_id(tablet_id);
105
0
                    new_backends.insert(node);
106
0
                    _tablets_for_node[node].emplace(tablet_id, tablet);
107
0
                    _tablets_by_node[node].emplace(tablet_id);
108
0
                    if (known_indexes.contains(index.index_id)) [[likely]] {
109
0
                        continue;
110
0
                    }
111
0
                    _indexes_from_node[node].emplace_back(tablet);
112
0
                    known_indexes.insert(index.index_id);
113
0
                    VLOG_DEBUG << "incremental open stream (" << partition->id << ", " << tablet_id
114
0
                               << ")";
115
0
                }
116
0
                _build_tablet_replica_info(tablet_id, partition);
117
0
            }
118
0
        }
119
0
    }
120
0
    for (int64_t dst_id : new_backends) {
121
0
        auto streams = _load_stream_map->get_or_create(dst_id, true);
122
0
        RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
123
0
    }
124
0
    return Status::OK();
125
0
}
126
127
0
Status VTabletWriterV2::_init_row_distribution() {
128
0
    _row_distribution.init({.state = _state,
129
0
                            .block_convertor = _block_convertor.get(),
130
0
                            .tablet_finder = _tablet_finder.get(),
131
0
                            .vpartition = _vpartition,
132
0
                            .add_partition_request_timer = _add_partition_request_timer,
133
0
                            .txn_id = _txn_id,
134
0
                            .pool = _pool,
135
0
                            .location = _location,
136
0
                            .vec_output_expr_ctxs = &_vec_output_expr_ctxs,
137
0
                            .schema = _schema,
138
0
                            .caller = (void*)this,
139
0
                            .create_partition_callback = &::doris::on_partitions_created});
140
141
0
    return _row_distribution.open(_output_row_desc);
142
0
}
143
144
0
Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
145
0
    _pool = state->obj_pool();
146
0
    auto& table_sink = _t_sink.olap_table_sink;
147
0
    _load_id.set_hi(table_sink.load_id.hi);
148
0
    _load_id.set_lo(table_sink.load_id.lo);
149
0
    signal::set_signal_task_id(_load_id);
150
0
    _txn_id = table_sink.txn_id;
151
0
    _num_replicas = table_sink.num_replicas;
152
0
    _tuple_desc_id = table_sink.tuple_id;
153
0
    _write_file_cache = table_sink.write_file_cache;
154
0
    _schema.reset(new OlapTableSchemaParam());
155
0
    RETURN_IF_ERROR(_schema->init(table_sink.schema));
156
0
    _schema->set_timestamp_ms(state->timestamp_ms());
157
0
    _schema->set_nano_seconds(state->nano_seconds());
158
0
    _schema->set_timezone(state->timezone());
159
0
    _location = _pool->add(new OlapTableLocationParam(table_sink.location));
160
0
    _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));
161
162
    // if distributed column list is empty, we can ensure that tablet is with random distribution info
163
    // and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition
164
    // for the whole olap table sink
165
0
    auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
166
0
    if (table_sink.partition.distributed_columns.empty()) {
167
0
        if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) {
168
0
            find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK;
169
0
        } else {
170
0
            find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH;
171
0
        }
172
0
    }
173
0
    _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition));
174
0
    _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode);
175
0
    RETURN_IF_ERROR(_vpartition->init());
176
177
0
    _state = state;
178
0
    _operator_profile = profile;
179
180
0
    _sender_id = state->per_fragment_instance_idx();
181
0
    _num_senders = state->num_per_fragment_instances();
182
0
    _backend_id = state->backend_id();
183
0
    _stream_per_node = state->load_stream_per_node();
184
0
    _total_streams = state->total_load_streams();
185
0
    _num_local_sink = state->num_local_sink();
186
0
    LOG(INFO) << "init olap tablet sink, load_id: " << print_id(_load_id)
187
0
              << ", num senders: " << _num_senders << ", stream per node: " << _stream_per_node
188
0
              << ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink;
189
0
    DCHECK(_stream_per_node > 0) << "load stream per node should be greator than 0";
190
0
    DCHECK(_total_streams > 0) << "total load streams should be greator than 0";
191
0
    DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0";
192
0
    _is_high_priority =
193
0
            (state->execution_timeout() <= config::load_task_high_priority_threshold_second);
194
0
    DBUG_EXECUTE_IF("VTabletWriterV2._init.is_high_priority", { _is_high_priority = true; });
195
0
    _mem_tracker =
196
0
            std::make_shared<MemTracker>("VTabletWriterV2:" + std::to_string(state->load_job_id()));
197
0
    SCOPED_TIMER(_operator_profile->total_time_counter());
198
0
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
199
200
    // get table's tuple descriptor
201
0
    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
202
0
    DBUG_EXECUTE_IF("VTabletWriterV2._init._output_tuple_desc_null",
203
0
                    { _output_tuple_desc = nullptr; });
204
0
    if (_output_tuple_desc == nullptr) {
205
0
        return Status::InternalError("unknown destination tuple descriptor, id = {}",
206
0
                                     _tuple_desc_id);
207
0
    }
208
0
    auto output_tuple_desc_slots_size = _output_tuple_desc->slots().size();
209
0
    DBUG_EXECUTE_IF("VTabletWriterV2._init._vec_output_expr_ctxs_not_equal_output_tuple_slot",
210
0
                    { output_tuple_desc_slots_size++; });
211
0
    if (!_vec_output_expr_ctxs.empty() &&
212
0
        _vec_output_expr_ctxs.size() != output_tuple_desc_slots_size) {
213
0
        LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, "
214
0
                     << "output_tuple_slot_num " << _output_tuple_desc->slots().size()
215
0
                     << " output_expr_num " << _vec_output_expr_ctxs.size();
216
0
        return Status::InvalidArgument(
217
0
                "output_tuple_slot_num {} should be equal to output_expr_num {}",
218
0
                _output_tuple_desc->slots().size(), _vec_output_expr_ctxs.size());
219
0
    }
220
221
0
    _block_convertor = std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
222
    // if partition_type is OLAP_TABLE_SINK_HASH_PARTITIONED, we handle the processing of auto_increment column
223
    // on exchange node rather than on TabletWriter
224
0
    _block_convertor->init_autoinc_info(
225
0
            _schema->db_id(), _schema->table_id(), _state->batch_size(),
226
0
            _schema->is_fixed_partial_update() && !_schema->auto_increment_coulumn().empty(),
227
0
            _schema->auto_increment_column_unique_id());
228
0
    _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc));
229
230
    // add all counter
231
0
    _input_rows_counter = ADD_COUNTER(_operator_profile, "RowsRead", TUnit::UNIT);
232
0
    _output_rows_counter = ADD_COUNTER(_operator_profile, "RowsProduced", TUnit::UNIT);
233
0
    _filtered_rows_counter = ADD_COUNTER(_operator_profile, "RowsFiltered", TUnit::UNIT);
234
0
    _send_data_timer = ADD_TIMER_WITH_LEVEL(_operator_profile, "SendDataTime", 1);
235
0
    _wait_mem_limit_timer =
236
0
            ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "WaitMemLimitTime", "SendDataTime", 1);
237
0
    _row_distribution_timer =
238
0
            ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "RowDistributionTime", "SendDataTime", 1);
239
0
    _write_memtable_timer =
240
0
            ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "WriteMemTableTime", "SendDataTime", 1);
241
0
    _add_partition_request_timer = ADD_CHILD_TIMER_WITH_LEVEL(
242
0
            _operator_profile, "AddPartitionRequestTime", "SendDataTime", 1);
243
0
    _validate_data_timer = ADD_TIMER_WITH_LEVEL(_operator_profile, "ValidateDataTime", 1);
244
0
    _open_timer = ADD_TIMER(_operator_profile, "OpenTime");
245
0
    _close_timer = ADD_TIMER(_operator_profile, "CloseWaitTime");
246
0
    _close_writer_timer = ADD_CHILD_TIMER(_operator_profile, "CloseWriterTime", "CloseWaitTime");
247
0
    _close_load_timer = ADD_CHILD_TIMER(_operator_profile, "CloseLoadTime", "CloseWaitTime");
248
0
    _load_back_pressure_version_time_ms =
249
0
            ADD_TIMER(_operator_profile, "LoadBackPressureVersionTimeMs");
250
251
0
    if (config::share_delta_writers) {
252
0
        _delta_writer_for_tablet = ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
253
0
                _load_id, _num_local_sink);
254
0
    } else {
255
0
        _delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id);
256
0
    }
257
0
    _load_stream_map = ExecEnv::GetInstance()->load_stream_map_pool()->get_or_create(
258
0
            _load_id, _backend_id, _stream_per_node, _num_local_sink);
259
0
    return Status::OK();
260
0
}
261
262
0
Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
263
0
    RETURN_IF_ERROR(_init(state, profile));
264
0
    LOG(INFO) << "opening olap table sink, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
265
0
              << ", sink_id=" << _sender_id;
266
0
    _timeout_watch.start();
267
0
    SCOPED_TIMER(_operator_profile->total_time_counter());
268
0
    SCOPED_TIMER(_open_timer);
269
0
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
270
271
0
    RETURN_IF_ERROR(_build_tablet_node_mapping());
272
0
    RETURN_IF_ERROR(_open_streams());
273
0
    RETURN_IF_ERROR(_init_row_distribution());
274
275
0
    return Status::OK();
276
0
}
277
278
0
Status VTabletWriterV2::_open_streams() {
279
0
    int fault_injection_skip_be = 0;
280
0
    bool any_backend = false;
281
0
    bool any_success = false;
282
0
    for (auto& [dst_id, _] : _tablets_for_node) {
283
0
        auto streams = _load_stream_map->get_or_create(dst_id);
284
0
        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", {
285
0
            if (fault_injection_skip_be < 1) {
286
0
                fault_injection_skip_be++;
287
0
                continue;
288
0
            }
289
0
        });
290
0
        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
291
0
            if (fault_injection_skip_be < 2) {
292
0
                fault_injection_skip_be++;
293
0
                continue;
294
0
            }
295
0
        });
296
0
        auto st = _open_streams_to_backend(dst_id, *streams);
297
0
        any_backend = true;
298
0
        any_success = any_success || st.ok();
299
0
    }
300
0
    if (any_backend && !any_success) {
301
0
        return Status::InternalError("failed to open streams to any BE");
302
0
    }
303
0
    return Status::OK();
304
0
}
305
306
0
Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreamStubs& streams) {
307
0
    const auto* node_info = _nodes_info->find_node(dst_id);
308
0
    DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null",
309
0
                    { node_info = nullptr; });
310
0
    if (node_info == nullptr) {
311
0
        return Status::InternalError("Unknown node {} in tablet location", dst_id);
312
0
    }
313
0
    auto idle_timeout_ms = _state->execution_timeout() * 1000;
314
0
    std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
315
0
    DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.no_schema_when_open_streams",
316
0
                    { tablets_for_schema.clear(); });
317
0
    auto st = streams.open(_state->exec_env()->brpc_streaming_client_cache(), *node_info, _txn_id,
318
0
                           *_schema, tablets_for_schema, _total_streams, idle_timeout_ms,
319
0
                           _state->enable_profile());
320
0
    if (!st.ok()) {
321
0
        LOG(WARNING) << "failed to open stream to backend " << dst_id
322
0
                     << ", load_id=" << print_id(_load_id) << ", err=" << st;
323
0
    }
324
0
    return st;
325
0
}
326
327
0
Status VTabletWriterV2::_build_tablet_node_mapping() {
328
0
    std::unordered_set<int64_t> known_indexes;
329
0
    for (const auto& partition : _vpartition->get_partitions()) {
330
0
        for (const auto& index : partition->indexes) {
331
0
            for (const auto& tablet_id : index.tablets) {
332
0
                auto* tablet_location = _location->find_tablet(tablet_id);
333
0
                DBUG_EXECUTE_IF("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null",
334
0
                                { tablet_location = nullptr; });
335
0
                if (tablet_location == nullptr) {
336
0
                    return Status::InternalError("unknown tablet location, tablet id = {}",
337
0
                                                 tablet_id);
338
0
                }
339
0
                for (auto& node : tablet_location->node_ids) {
340
0
                    PTabletID tablet;
341
0
                    tablet.set_partition_id(partition->id);
342
0
                    tablet.set_index_id(index.index_id);
343
0
                    tablet.set_tablet_id(tablet_id);
344
0
                    _tablets_for_node[node].emplace(tablet_id, tablet);
345
0
                    constexpr int64_t DUMMY_TABLET_ID = 0;
346
0
                    if (tablet_id == DUMMY_TABLET_ID) [[unlikely]] {
347
                        // ignore fake tablet for auto partition
348
0
                        continue;
349
0
                    }
350
0
                    _tablets_by_node[node].emplace(tablet_id);
351
0
                    if (known_indexes.contains(index.index_id)) [[likely]] {
352
0
                        continue;
353
0
                    }
354
0
                    _indexes_from_node[node].emplace_back(tablet);
355
0
                    known_indexes.insert(index.index_id);
356
0
                }
357
0
                _build_tablet_replica_info(tablet_id, partition);
358
0
            }
359
0
        }
360
0
    }
361
0
    return Status::OK();
362
0
}
363
364
void VTabletWriterV2::_build_tablet_replica_info(const int64_t tablet_id,
365
0
                                                 VOlapTablePartition* partition) {
366
0
    if (partition != nullptr) {
367
0
        int total_replicas_num =
368
0
                partition->total_replica_num == 0 ? _num_replicas : partition->total_replica_num;
369
0
        int load_required_replicas_num = partition->load_required_replica_num == 0
370
0
                                                 ? (_num_replicas + 1) / 2
371
0
                                                 : partition->load_required_replica_num;
372
0
        _tablet_replica_info[tablet_id] =
373
0
                std::make_pair(total_replicas_num, load_required_replicas_num);
374
0
    } else {
375
0
        _tablet_replica_info[tablet_id] = std::make_pair(_num_replicas, (_num_replicas + 1) / 2);
376
0
    }
377
0
}
378
379
void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& row_part_tablet_ids,
380
0
                                                RowsForTablet& rows_for_tablet) {
381
0
    for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); index_idx++) {
382
0
        auto& row_ids = row_part_tablet_ids[index_idx].row_ids;
383
0
        auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids;
384
0
        auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids;
385
386
0
        for (size_t i = 0; i < row_ids.size(); i++) {
387
0
            auto& tablet_id = tablet_ids[i];
388
0
            auto it = rows_for_tablet.find(tablet_id);
389
0
            if (it == rows_for_tablet.end()) {
390
0
                Rows rows;
391
0
                rows.partition_id = partition_ids[i];
392
0
                rows.index_id = _schema->indexes()[index_idx]->index_id;
393
0
                rows.row_idxes.reserve(row_ids.size());
394
0
                auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows});
395
0
                it = tmp_it;
396
0
            }
397
0
            it->second.row_idxes.push_back(row_ids[i]);
398
0
            _number_output_rows++;
399
0
        }
400
0
    }
401
0
}
402
403
Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id,
404
0
                                        std::vector<std::shared_ptr<LoadStreamStub>>& streams) {
405
0
    std::vector<int64_t> failed_node_ids;
406
0
    const auto* location = _location->find_tablet(tablet_id);
407
0
    DBUG_EXECUTE_IF("VTabletWriterV2._select_streams.location_null", { location = nullptr; });
408
0
    if (location == nullptr) {
409
0
        return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id);
410
0
    }
411
0
    for (const auto& node_id : location->node_ids) {
412
0
        PTabletID tablet;
413
0
        tablet.set_partition_id(partition_id);
414
0
        tablet.set_index_id(index_id);
415
0
        tablet.set_tablet_id(tablet_id);
416
0
        VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id);
417
0
        _tablets_for_node[node_id].emplace(tablet_id, tablet);
418
0
        auto stream = _load_stream_map->at(node_id)->select_one_stream();
419
0
        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
420
0
            LOG(INFO) << "[skip_two_backends](detail) tablet_id=" << tablet_id
421
0
                      << ", node_id=" << node_id
422
0
                      << ", stream_ok=" << (stream == nullptr ? "no" : "yes");
423
0
        });
424
0
        if (stream == nullptr) {
425
0
            LOG(WARNING) << "skip writing tablet " << tablet_id << " to backend " << node_id
426
0
                         << ": stream is not open";
427
0
            failed_node_ids.push_back(node_id);
428
0
            continue;
429
0
        }
430
0
        streams.emplace_back(std::move(stream));
431
0
    }
432
0
    DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
433
0
        LOG(INFO) << "[skip_two_backends](summary) tablet_id=" << tablet_id
434
0
                  << ", num_streams=" << streams.size()
435
0
                  << ", num_nodes=" << location->node_ids.size();
436
0
    });
437
0
    if (streams.size() <= location->node_ids.size() / 2) {
438
0
        std::ostringstream success_msg;
439
0
        std::ostringstream failed_msg;
440
0
        for (auto& s : streams) {
441
0
            success_msg << ", " << s->dst_id();
442
0
        }
443
0
        for (auto id : failed_node_ids) {
444
0
            failed_msg << ", " << id;
445
0
        }
446
0
        LOG(INFO) << "failed to write enough replicas " << streams.size() << "/"
447
0
                  << location->node_ids.size() << " for tablet " << tablet_id
448
0
                  << " due to connection errors; success nodes" << success_msg.str()
449
0
                  << "; failed nodes" << failed_msg.str() << ".";
450
0
        return Status::InternalError(
451
0
                "failed to write enough replicas {}/{} for tablet {} due to connection errors",
452
0
                streams.size(), location->node_ids.size(), tablet_id);
453
0
    }
454
0
    Status st;
455
0
    for (auto& stream : streams) {
456
0
        st = stream->wait_for_schema(partition_id, index_id, tablet_id);
457
0
        if (st.ok()) {
458
0
            break;
459
0
        } else {
460
0
            LOG(WARNING) << "failed to get schema from stream " << stream << ", err=" << st;
461
0
        }
462
0
    }
463
0
    return st;
464
0
}
465
466
0
Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) {
467
0
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
468
0
    Status status = Status::OK();
469
470
0
    if (_state->query_options().dry_run_query) {
471
0
        return status;
472
0
    }
473
474
0
    int64_t total_wait_time_ms = 0;
475
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
476
0
    for (const auto& [dst_id, streams] : streams_for_node) {
477
0
        for (const auto& stream : streams->streams()) {
478
0
            auto wait_time_ms = stream->get_and_reset_load_back_pressure_version_wait_time_ms();
479
0
            if (wait_time_ms > 0) {
480
0
                total_wait_time_ms = std::max(total_wait_time_ms, wait_time_ms);
481
0
            }
482
0
        }
483
0
    }
484
0
    if (UNLIKELY(total_wait_time_ms > 0)) {
485
0
        std::this_thread::sleep_for(std::chrono::milliseconds(total_wait_time_ms));
486
0
        _load_back_pressure_version_block_ms.fetch_add(total_wait_time_ms);
487
0
    }
488
489
    // check out of limit
490
0
    RETURN_IF_ERROR(_send_new_partition_batch());
491
492
0
    auto input_rows = input_block.rows();
493
0
    auto input_bytes = input_block.bytes();
494
0
    if (UNLIKELY(input_rows == 0)) {
495
0
        return status;
496
0
    }
497
0
    SCOPED_TIMER(_operator_profile->total_time_counter());
498
0
    _number_input_rows += input_rows;
499
    // update incrementally so that FE can get the progress.
500
    // the real 'num_rows_load_total' will be set when sink being closed.
501
0
    _state->update_num_rows_load_total(input_rows);
502
0
    _state->update_num_bytes_load_total(input_bytes);
503
0
    DorisMetrics::instance()->load_rows->increment(input_rows);
504
0
    DorisMetrics::instance()->load_bytes->increment(input_bytes);
505
506
0
    SCOPED_RAW_TIMER(&_send_data_ns);
507
    // This is just for passing compilation.
508
0
    _row_distribution_watch.start();
509
510
0
    std::shared_ptr<Block> block;
511
0
    RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
512
0
            input_block, block, _row_part_tablet_ids, _number_input_rows));
513
0
    RowsForTablet rows_for_tablet;
514
0
    _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet);
515
516
0
    _row_distribution_watch.stop();
517
518
    // For each tablet, send its input_rows from block to delta writer
519
0
    for (const auto& [tablet_id, rows] : rows_for_tablet) {
520
0
        RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows));
521
0
    }
522
523
0
    COUNTER_SET(_input_rows_counter, _number_input_rows);
524
0
    COUNTER_SET(_output_rows_counter, _number_output_rows);
525
0
    COUNTER_SET(_filtered_rows_counter,
526
0
                _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows());
527
0
    COUNTER_SET(_send_data_timer, _send_data_ns);
528
0
    COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
529
0
    COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
530
531
0
    return Status::OK();
532
0
}
533
534
Status VTabletWriterV2::_write_memtable(std::shared_ptr<Block> block, int64_t tablet_id,
535
0
                                        const Rows& rows) {
536
0
    auto st = Status::OK();
537
0
    auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
538
0
        std::vector<std::shared_ptr<LoadStreamStub>> streams;
539
0
        st = _select_streams(tablet_id, rows.partition_id, rows.index_id, streams);
540
0
        if (!st.ok()) [[unlikely]] {
541
0
            LOG(WARNING) << "select stream failed, " << st << ", load_id=" << print_id(_load_id);
542
0
            return std::unique_ptr<DeltaWriterV2>(nullptr);
543
0
        }
544
0
        WriteRequest req {
545
0
                .tablet_id = tablet_id,
546
0
                .txn_id = _txn_id,
547
0
                .index_id = rows.index_id,
548
0
                .partition_id = rows.partition_id,
549
0
                .load_id = _load_id,
550
0
                .tuple_desc = _schema->tuple_desc(),
551
0
                .table_schema_param = _schema,
552
0
                .is_high_priority = _is_high_priority,
553
0
                .write_file_cache = _write_file_cache,
554
0
                .storage_vault_id {},
555
0
        };
556
0
        bool index_not_found = true;
557
0
        for (const auto& index : _schema->indexes()) {
558
0
            if (index->index_id == rows.index_id) {
559
0
                req.slots = &index->slots;
560
0
                req.schema_hash = index->schema_hash;
561
0
                index_not_found = false;
562
0
                break;
563
0
            }
564
0
        }
565
0
        DBUG_EXECUTE_IF("VTabletWriterV2._write_memtable.index_not_found",
566
0
                        { index_not_found = true; });
567
0
        if (index_not_found) [[unlikely]] {
568
0
            st = Status::InternalError("no index {} in schema", rows.index_id);
569
0
            LOG(WARNING) << "index " << rows.index_id
570
0
                         << " not found in schema, load_id=" << print_id(_load_id);
571
0
            return std::unique_ptr<DeltaWriterV2>(nullptr);
572
0
        }
573
0
        return DeltaWriterV2::create_unique(&req, streams, _state);
574
0
    });
575
0
    if (delta_writer == nullptr) {
576
0
        LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id
577
0
                     << ", load_id=" << print_id(_load_id) << ", err: " << st;
578
0
        return Status::InternalError("failed to open DeltaWriter {}: {}", tablet_id, st.msg());
579
0
    }
580
0
    {
581
0
        SCOPED_TIMER(_wait_mem_limit_timer);
582
0
        ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
583
0
                [state = _state]() { return state->is_cancelled(); });
584
0
        if (_state->is_cancelled()) {
585
0
            return _state->cancel_reason();
586
0
        }
587
0
    }
588
0
    SCOPED_TIMER(_write_memtable_timer);
589
0
    st = delta_writer->write(block.get(), rows.row_idxes);
590
0
    return st;
591
0
}
592
593
0
void VTabletWriterV2::_cancel(Status status) {
594
0
    LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
595
0
              << ", txn_id=" << _txn_id << ", sink_id=" << _sender_id
596
0
              << ", due to error: " << status;
597
0
    if (_delta_writer_for_tablet) {
598
0
        _delta_writer_for_tablet->cancel(status);
599
0
        _delta_writer_for_tablet.reset();
600
0
    }
601
0
    if (_load_stream_map) {
602
0
        _load_stream_map->for_each(
603
0
                [status](int64_t dst_id, LoadStreamStubs& streams) { streams.cancel(status); });
604
0
        _load_stream_map->release();
605
0
    }
606
0
}
607
608
0
Status VTabletWriterV2::_send_new_partition_batch() {
609
0
    if (_row_distribution.need_deal_batching()) { // maybe try_close more than 1 time
610
0
        RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
611
612
0
        Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
613
614
        // these order is unique.
615
        //  1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
616
        //  2. deal batched block
617
        //  3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
618
0
        _row_distribution.clear_batching_stats();
619
0
        RETURN_IF_ERROR(this->write(_state, tmp_block));
620
0
        _row_distribution._batching_block->set_mutable_columns(
621
0
                tmp_block.mutate_columns()); // Recovery back
622
0
        _row_distribution._batching_block->clear_column_data();
623
0
        _row_distribution._deal_batched = false;
624
0
    }
625
0
    return Status::OK();
626
0
}
627
628
0
Status VTabletWriterV2::close(Status exec_status) {
629
0
    std::lock_guard<std::mutex> close_lock(_close_mutex);
630
0
    if (_is_closed) {
631
0
        return _close_status;
632
0
    }
633
0
    LOG(INFO) << "closing olap table sink, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
634
0
              << ", sink_id=" << _sender_id << ", status=" << exec_status.to_string();
635
0
    SCOPED_TIMER(_close_timer);
636
0
    Status status = exec_status;
637
638
0
    if (status.ok()) {
639
0
        SCOPED_TIMER(_operator_profile->total_time_counter());
640
0
        _row_distribution._deal_batched = true;
641
0
        status = _send_new_partition_batch();
642
0
    }
643
644
0
    DBUG_EXECUTE_IF("VTabletWriterV2.close.sleep", {
645
0
        auto sleep_sec = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
646
0
                "VTabletWriterV2.close.sleep", "sleep_sec", 1);
647
0
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
648
0
    });
649
0
    DBUG_EXECUTE_IF("VTabletWriterV2.close.cancel",
650
0
                    { status = Status::InternalError("load cancel"); });
651
0
    if (status.ok()) {
652
        // only if status is ok can we call this _profile->total_time_counter().
653
        // if status is not ok, this sink may not be prepared, so that _profile is null
654
0
        SCOPED_TIMER(_operator_profile->total_time_counter());
655
656
0
        COUNTER_SET(_input_rows_counter, _number_input_rows);
657
0
        COUNTER_SET(_output_rows_counter, _number_output_rows);
658
0
        COUNTER_SET(_filtered_rows_counter,
659
0
                    _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows());
660
0
        COUNTER_SET(_send_data_timer, _send_data_ns);
661
0
        COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
662
0
        COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
663
0
        auto back_pressure_time_ms = _load_back_pressure_version_block_ms.load();
664
0
        COUNTER_SET(_load_back_pressure_version_time_ms, back_pressure_time_ms);
665
0
        g_sink_load_back_pressure_version_time_ms << back_pressure_time_ms;
666
667
        // close DeltaWriters
668
0
        {
669
0
            std::unordered_map<int64_t, int32_t> segments_for_tablet;
670
0
            SCOPED_TIMER(_close_writer_timer);
671
            // close all delta writers if this is the last user
672
0
            auto st = _delta_writer_for_tablet->close(segments_for_tablet, _operator_profile);
673
0
            _delta_writer_for_tablet.reset();
674
0
            if (!st.ok()) {
675
0
                _cancel(st);
676
0
                return st;
677
0
            }
678
            // only the last sink closing delta writers will have segment num
679
0
            if (!segments_for_tablet.empty()) {
680
0
                _load_stream_map->save_segments_for_tablet(segments_for_tablet);
681
0
            }
682
0
        }
683
684
0
        _calc_tablets_to_commit();
685
0
        const bool is_last_sink = _load_stream_map->release();
686
0
        LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink
687
0
                  << ", load_id=" << print_id(_load_id);
688
689
        // send CLOSE_LOAD on all non-incremental streams if this is the last sink
690
0
        if (is_last_sink) {
691
0
            _load_stream_map->close_load(false);
692
0
        }
693
694
        // close_wait on all non-incremental streams, even if this is not the last sink.
695
        // because some per-instance data structures are now shared among all sinks
696
        // due to sharing delta writers and load stream stubs.
697
        // Do not need to wait after quorum success,
698
        // for first-stage close_wait only ensure incremental streams load has been completed,
699
        // unified waiting in the second-stage close_wait.
700
0
        RETURN_IF_ERROR(_close_wait(_non_incremental_streams(), false));
701
702
        // send CLOSE_LOAD on all incremental streams if this is the last sink.
703
        // this must happen after all non-incremental streams are closed,
704
        // so we can ensure all sinks are in close phase before closing incremental streams.
705
0
        if (is_last_sink) {
706
0
            _load_stream_map->close_load(true);
707
0
        }
708
709
        // close_wait on all incremental streams, even if this is not the last sink.
710
0
        RETURN_IF_ERROR(_close_wait(_all_streams(), true));
711
712
        // calculate and submit commit info
713
0
        if (is_last_sink) {
714
0
            DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", {
715
0
                auto streams = _load_stream_map->at(_tablets_for_node.begin()->first);
716
0
                int64_t tablet_id = -1;
717
0
                for (auto tablet : streams->success_tablets()) {
718
0
                    tablet_id = tablet;
719
0
                    break;
720
0
                }
721
0
                if (tablet_id != -1) {
722
0
                    LOG(INFO) << "fault injection: adding failed tablet_id: " << tablet_id;
723
0
                    streams->select_one_stream()->add_failed_tablet(
724
0
                            tablet_id, Status::InternalError("fault injection"));
725
0
                } else {
726
0
                    LOG(INFO) << "fault injection: failed to inject failed tablet_id";
727
0
                }
728
0
            });
729
730
0
            std::vector<TTabletCommitInfo> tablet_commit_infos;
731
0
            RETURN_IF_ERROR(_create_commit_info(tablet_commit_infos, _load_stream_map));
732
0
            _state->add_tablet_commit_infos(tablet_commit_infos);
733
0
        }
734
735
        // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
736
0
        int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() +
737
0
                                      _state->num_rows_load_unselected();
738
0
        _state->set_num_rows_load_total(num_rows_load_total);
739
0
        _state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
740
0
                                              _tablet_finder->num_filtered_rows());
741
0
        _state->update_num_rows_load_unselected(
742
0
                _tablet_finder->num_immutable_partition_filtered_rows());
743
744
0
        if (_state->enable_profile() && _state->profile_level() >= 2) {
745
            // Output detailed profiling info for auto-partition requests
746
0
            _row_distribution.output_profile_info(_operator_profile);
747
0
        }
748
749
0
        LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id)
750
0
                  << ", txn_id=" << _txn_id;
751
0
    } else {
752
0
        _cancel(status);
753
0
    }
754
755
0
    _is_closed = true;
756
0
    _close_status = status;
757
0
    return status;
758
0
}
759
760
0
std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_all_streams() {
761
0
    std::unordered_set<std::shared_ptr<LoadStreamStub>> all_streams;
762
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
763
0
    for (const auto& [dst_id, streams] : streams_for_node) {
764
0
        for (const auto& stream : streams->streams()) {
765
0
            all_streams.insert(stream);
766
0
        }
767
0
    }
768
0
    return all_streams;
769
0
}
770
771
0
std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_non_incremental_streams() {
772
0
    std::unordered_set<std::shared_ptr<LoadStreamStub>> non_incremental_streams;
773
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
774
0
    for (const auto& [dst_id, streams] : streams_for_node) {
775
0
        for (const auto& stream : streams->streams()) {
776
0
            if (!stream->is_incremental()) {
777
0
                non_incremental_streams.insert(stream);
778
0
            }
779
0
        }
780
0
    }
781
0
    return non_incremental_streams;
782
0
}
783
784
Status VTabletWriterV2::_close_wait(
785
        std::unordered_set<std::shared_ptr<LoadStreamStub>> unfinished_streams,
786
0
        bool need_wait_after_quorum_success) {
787
0
    SCOPED_TIMER(_close_load_timer);
788
0
    Status status;
789
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
790
    // 1. first wait for quorum success
791
0
    std::unordered_set<int64_t> need_finish_tablets;
792
0
    auto partition_ids = _tablet_finder->partition_ids();
793
0
    for (const auto& part : _vpartition->get_partitions()) {
794
0
        if (partition_ids.contains(part->id)) {
795
0
            for (const auto& index : part->indexes) {
796
0
                for (const auto& tablet_id : index.tablets) {
797
0
                    need_finish_tablets.insert(tablet_id);
798
0
                }
799
0
            }
800
0
        }
801
0
    }
802
0
    while (true) {
803
0
        RETURN_IF_ERROR(_check_timeout());
804
0
        RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));
805
0
        bool quorum_success = _quorum_success(unfinished_streams, need_finish_tablets);
806
0
        if (quorum_success || unfinished_streams.empty()) {
807
0
            LOG(INFO) << "quorum_success: " << quorum_success
808
0
                      << ", is all finished: " << unfinished_streams.empty()
809
0
                      << ", txn_id: " << _txn_id << ", load_id: " << print_id(_load_id);
810
0
            break;
811
0
        }
812
0
        bthread_usleep(1000 * 10);
813
0
    }
814
815
    // 2. then wait for remaining streams as much as possible
816
0
    if (!unfinished_streams.empty() && need_wait_after_quorum_success) {
817
0
        int64_t arrival_quorum_success_time = UnixMillis();
818
0
        int64_t max_wait_time_ms = _calc_max_wait_time_ms(streams_for_node, unfinished_streams);
819
0
        while (true) {
820
0
            RETURN_IF_ERROR(_check_timeout());
821
0
            RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));
822
0
            if (unfinished_streams.empty()) {
823
0
                break;
824
0
            }
825
0
            int64_t elapsed_ms = UnixMillis() - arrival_quorum_success_time;
826
0
            if (elapsed_ms > max_wait_time_ms ||
827
0
                _state->execution_timeout() - elapsed_ms / 1000 <
828
0
                        config::quorum_success_remaining_timeout_seconds) {
829
0
                std::stringstream unfinished_streams_str;
830
0
                for (const auto& stream : unfinished_streams) {
831
0
                    unfinished_streams_str << stream->stream_id() << ",";
832
0
                }
833
0
                LOG(WARNING) << "reach max wait time, max_wait_time_ms: " << max_wait_time_ms
834
0
                             << ", load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
835
0
                             << ", unfinished streams: " << unfinished_streams_str.str();
836
0
                break;
837
0
            }
838
0
            bthread_usleep(1000 * 10);
839
0
        }
840
0
    }
841
842
0
    if (!status.ok()) {
843
0
        LOG(WARNING) << "close_wait failed: " << status << ", load_id=" << print_id(_load_id);
844
0
    }
845
0
    return status;
846
0
}
847
848
bool VTabletWriterV2::_quorum_success(
849
        const std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams,
850
0
        const std::unordered_set<int64_t>& need_finish_tablets) {
851
0
    if (!config::enable_quorum_success_write) {
852
0
        return false;
853
0
    }
854
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
855
0
    if (need_finish_tablets.empty()) [[unlikely]] {
856
0
        return false;
857
0
    }
858
859
    // 1. calculate finished tablets replica num
860
0
    std::unordered_set<int64_t> finished_dst_ids;
861
0
    std::unordered_map<int64_t, int64_t> finished_tablets_replica;
862
0
    for (const auto& [dst_id, streams] : streams_for_node) {
863
0
        bool finished = true;
864
0
        for (const auto& stream : streams->streams()) {
865
0
            if (unfinished_streams.contains(stream) || !stream->check_cancel().ok()) {
866
0
                finished = false;
867
0
                break;
868
0
            }
869
0
        }
870
0
        if (finished) {
871
0
            finished_dst_ids.insert(dst_id);
872
0
        }
873
0
    }
874
0
    for (const auto& [dst_id, _] : streams_for_node) {
875
0
        if (!finished_dst_ids.contains(dst_id)) {
876
0
            continue;
877
0
        }
878
0
        for (const auto& tablet_id : _tablets_by_node[dst_id]) {
879
0
            finished_tablets_replica[tablet_id]++;
880
0
        }
881
0
    }
882
883
    // 2. check if quorum success
884
0
    for (const auto& tablet_id : need_finish_tablets) {
885
0
        if (finished_tablets_replica[tablet_id] < _load_required_replicas_num(tablet_id)) {
886
0
            return false;
887
0
        }
888
0
    }
889
0
    return true;
890
0
}
891
892
0
int VTabletWriterV2::_load_required_replicas_num(int64_t tablet_id) {
893
0
    auto [total_replicas_num, load_required_replicas_num] = _tablet_replica_info[tablet_id];
894
0
    if (total_replicas_num == 0) {
895
0
        return (_num_replicas + 1) / 2;
896
0
    }
897
0
    return load_required_replicas_num;
898
0
}
899
900
int64_t VTabletWriterV2::_calc_max_wait_time_ms(
901
        const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node,
902
0
        const std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams) {
903
    // 1. calculate avg speed of all unfinished streams
904
0
    int64_t elapsed_ms = _timeout_watch.elapsed_time() / 1000 / 1000;
905
0
    int64_t total_bytes = 0;
906
0
    int finished_count = 0;
907
0
    for (const auto& [dst_id, streams] : streams_for_node) {
908
0
        for (const auto& stream : streams->streams()) {
909
0
            if (unfinished_streams.contains(stream) || !stream->check_cancel().ok()) {
910
0
                continue;
911
0
            }
912
0
            total_bytes += stream->bytes_written();
913
0
            finished_count++;
914
0
        }
915
0
    }
916
    // no data loaded in index channel, return 0
917
0
    if (total_bytes == 0 || finished_count == 0) {
918
0
        return 0;
919
0
    }
920
    // if elapsed_ms is equal to 0, explain the loaded data is too small
921
0
    if (elapsed_ms <= 0) {
922
0
        return config::quorum_success_min_wait_seconds * 1000;
923
0
    }
924
0
    double avg_speed =
925
0
            static_cast<double>(total_bytes) / (static_cast<double>(elapsed_ms) * finished_count);
926
927
    // 2. calculate max wait time of each unfinished stream and return the max value
928
0
    int64_t max_wait_time_ms = 0;
929
0
    for (const auto& [dst_id, streams] : streams_for_node) {
930
0
        for (const auto& stream : streams->streams()) {
931
0
            if (unfinished_streams.contains(stream)) {
932
0
                int64_t bytes = stream->bytes_written();
933
0
                int64_t wait =
934
0
                        avg_speed > 0 ? static_cast<int64_t>(static_cast<double>(bytes) / avg_speed)
935
0
                                      : 0;
936
0
                max_wait_time_ms = std::max(max_wait_time_ms, wait);
937
0
            }
938
0
        }
939
0
    }
940
941
    // 3. calculate max wait time
942
    // introduce quorum_success_min_wait_time_ms to avoid jitter of small load
943
0
    max_wait_time_ms -= UnixMillis() - _timeout_watch.elapsed_time() / 1000 / 1000;
944
0
    max_wait_time_ms =
945
0
            std::max(static_cast<int64_t>(static_cast<double>(max_wait_time_ms) *
946
0
                                          (1.0 + config::quorum_success_max_wait_multiplier)),
947
0
                     config::quorum_success_min_wait_seconds * 1000);
948
949
0
    return max_wait_time_ms;
950
0
}
951
952
0
Status VTabletWriterV2::_check_timeout() {
953
0
    int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
954
0
                        _timeout_watch.elapsed_time() / 1000 / 1000;
955
0
    DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
956
0
    if (remain_ms <= 0) {
957
0
        LOG(WARNING) << "load timed out before close waiting, load_id=" << print_id(_load_id);
958
0
        return Status::TimedOut("load timed out before close waiting");
959
0
    }
960
0
    return Status::OK();
961
0
}
962
963
Status VTabletWriterV2::_check_streams_finish(
964
        std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams, Status& status,
965
0
        const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node) {
966
0
    for (const auto& [dst_id, streams] : streams_for_node) {
967
0
        for (const auto& stream : streams->streams()) {
968
0
            if (!unfinished_streams.contains(stream)) {
969
0
                continue;
970
0
            }
971
0
            bool is_closed = false;
972
0
            auto stream_st = stream->close_finish_check(_state, &is_closed);
973
0
            DBUG_EXECUTE_IF("VTabletWriterV2._check_streams_finish.close_stream_failed",
974
0
                            { stream_st = Status::InternalError("close stream failed"); });
975
0
            if (!stream_st.ok()) {
976
0
                status = stream_st;
977
0
                unfinished_streams.erase(stream);
978
0
                LOG(WARNING) << "close_wait failed: " << stream_st
979
0
                             << ", load_id=" << print_id(_load_id);
980
0
            }
981
0
            if (is_closed) {
982
0
                unfinished_streams.erase(stream);
983
0
            }
984
0
        }
985
0
    }
986
0
    return status;
987
0
}
988
989
0
void VTabletWriterV2::_calc_tablets_to_commit() {
990
0
    LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
991
0
              << ", sink_id=" << _sender_id;
992
0
    for (const auto& [dst_id, tablets] : _tablets_for_node) {
993
0
        std::vector<PTabletID> tablets_to_commit;
994
0
        std::vector<int64_t> partition_ids;
995
0
        for (const auto& [tablet_id, tablet] : tablets) {
996
0
            if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
997
0
                if (VLOG_DEBUG_IS_ON) {
998
0
                    partition_ids.push_back(tablet.partition_id());
999
0
                }
1000
0
                PTabletID t(tablet);
1001
0
                tablets_to_commit.push_back(t);
1002
0
            }
1003
0
        }
1004
0
        if (VLOG_DEBUG_IS_ON) {
1005
0
            std::string msg("close load partitions: ");
1006
0
            msg.reserve(partition_ids.size() * 7);
1007
0
            for (auto v : partition_ids) {
1008
0
                msg.append(std::to_string(v) + ", ");
1009
0
            }
1010
0
            LOG(WARNING) << msg;
1011
0
        }
1012
0
        _load_stream_map->save_tablets_to_commit(dst_id, tablets_to_commit);
1013
0
    }
1014
0
}
1015
1016
Status VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tablet_commit_infos,
1017
13
                                            std::shared_ptr<LoadStreamMap> load_stream_map) {
1018
13
    std::unordered_map<int64_t, int> failed_tablets;
1019
13
    std::unordered_map<int64_t, Status> failed_reason;
1020
33
    load_stream_map->for_each([&](int64_t dst_id, LoadStreamStubs& streams) {
1021
33
        size_t num_success_tablets = 0;
1022
33
        size_t num_failed_tablets = 0;
1023
33
        for (auto [tablet_id, reason] : streams.failed_tablets()) {
1024
12
            failed_tablets[tablet_id]++;
1025
12
            failed_reason[tablet_id] = reason;
1026
12
            num_failed_tablets++;
1027
12
        }
1028
50
        for (auto tablet_id : streams.success_tablets()) {
1029
50
            TTabletCommitInfo commit_info;
1030
50
            commit_info.tabletId = tablet_id;
1031
50
            commit_info.backendId = dst_id;
1032
50
            tablet_commit_infos.emplace_back(std::move(commit_info));
1033
50
            num_success_tablets++;
1034
50
        }
1035
33
        LOG(INFO) << "streams to dst_id: " << dst_id << ", success tablets: " << num_success_tablets
1036
33
                  << ", failed tablets: " << num_failed_tablets;
1037
33
    });
1038
1039
13
    for (auto [tablet_id, replicas] : failed_tablets) {
1040
10
        auto [total_replicas_num, load_required_replicas_num] = _tablet_replica_info[tablet_id];
1041
10
        int max_failed_replicas = total_replicas_num == 0
1042
10
                                          ? (_num_replicas - 1) / 2
1043
10
                                          : total_replicas_num - load_required_replicas_num;
1044
10
        if (replicas > max_failed_replicas) {
1045
4
            LOG(INFO) << "tablet " << tablet_id
1046
4
                      << " failed on majority backends: " << failed_reason[tablet_id];
1047
4
            return Status::InternalError("tablet {} failed on majority backends: {}", tablet_id,
1048
4
                                         failed_reason[tablet_id]);
1049
4
        }
1050
10
    }
1051
9
    return Status::OK();
1052
13
}
1053
1054
} // namespace doris