Coverage Report

Created: 2026-06-02 16:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vtablet_writer_v2.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/writer/vtablet_writer_v2.h"
19
20
#include <brpc/uri.h>
21
#include <gen_cpp/DataSinks_types.h>
22
#include <gen_cpp/Descriptors_types.h>
23
#include <gen_cpp/Metrics_types.h>
24
#include <gen_cpp/Types_types.h>
25
#include <gen_cpp/internal_service.pb.h>
26
27
#include <cstdint>
28
#include <mutex>
29
#include <ranges>
30
#include <string>
31
#include <unordered_map>
32
33
#include "common/compiler_util.h" // IWYU pragma: keep
34
#include "common/logging.h"
35
#include "common/metrics/doris_metrics.h"
36
#include "common/object_pool.h"
37
#include "common/signal_handler.h"
38
#include "common/status.h"
39
#include "core/block/block.h"
40
#include "exec/sink/delta_writer_v2_pool.h"
41
#include "load/delta_writer/delta_writer_v2.h"
42
#include "runtime/descriptors.h"
43
#include "runtime/exec_env.h"
44
#include "runtime/runtime_profile.h"
45
#include "runtime/runtime_state.h"
46
#include "runtime/thread_context.h"
47
#include "storage/tablet_info.h"
48
#include "util/debug_points.h"
49
#include "util/defer_op.h"
50
#include "util/uid_util.h"
51
// NOLINTNEXTLINE(unused-includes)
52
#include "exec/sink/load_stream_map_pool.h"
53
#include "exec/sink/load_stream_stub.h" // IWYU pragma: keep
54
#include "exec/sink/vtablet_block_convertor.h"
55
#include "exec/sink/vtablet_finder.h"
56
57
namespace doris {
58
59
// Global bvar::Adder declared here and defined in another translation unit.
// NOTE(review): not referenced in the visible part of this file; presumably consumed by
// close() or later code in this file — confirm.
extern bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms;
60
61
// Creates a V2 tablet writer for an OLAP table sink. Only the thrift sink descriptor is kept
// here; the real setup happens later in open() / _init().
VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
                                 std::shared_ptr<Dependency> dep,
                                 std::shared_ptr<Dependency> fin_dep)
        : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
    // This writer only handles OLAP table sinks (checked in debug builds only).
    DCHECK(_t_sink.__isset.olap_table_sink);
}

// All members clean themselves up.
VTabletWriterV2::~VTabletWriterV2() = default;
69
70
0
// Handles the result of an automatic partition creation: registers the new tablet
// locations and nodes, then incrementally opens load streams for the new partitions.
Status VTabletWriterV2::on_partitions_created(TCreatePartitionResult* result) {
    // The location param refers to these entries by address, so the vector is owned by the
    // object pool rather than by this stack frame.
    auto* pooled_locations = _pool->add(new std::vector<TTabletLocation>(result->tablets));
    _location->add_locations(*pooled_locations);

    _nodes_info->add_nodes(result->nodes);

    return _incremental_open_streams(result->partitions);
}
83
84
0
// Plain-function trampoline registered as the row distribution's create_partition_callback;
// `writer` is the VTabletWriterV2 registered as the callback's caller.
static Status on_partitions_created(void* writer, TCreatePartitionResult* result) {
    auto* self = static_cast<VTabletWriterV2*>(writer);
    return self->on_partitions_created(result);
}
87
88
// Registers the tablets of partitions created during the load and opens load streams to
// every backend that hosts one of their replicas. For the new partitions only, this repeats
// what _build_tablet_node_mapping() and _open_streams() do at open() time. Indexes do not
// change when a partition is created, so only the partition dimension needs handling.
//
// Returns the first error from materializing a partition, from a tablet whose location is
// unknown, or from opening streams to a backend.
Status VTabletWriterV2::_incremental_open_streams(
        const std::vector<TOlapTablePartition>& partitions) {
    std::unordered_set<int64_t> known_indexes;
    std::unordered_set<int64_t> new_backends;
    for (const auto& t_partition : partitions) {
        VOlapTablePartition* partition = nullptr;
        RETURN_IF_ERROR(_vpartition->generate_partition_from(t_partition, partition));
        for (const auto& index : partition->indexes) {
            for (const auto& tablet_id : index.tablets) {
                // find_tablet() may return null (the other callers in this file check it);
                // report an error instead of dereferencing it. The node ids are iterated by
                // reference rather than copying the vector for every tablet.
                const auto* tablet_location = _location->find_tablet(tablet_id);
                if (tablet_location == nullptr) {
                    return Status::InternalError("unknown tablet location, tablet id = {}",
                                                 tablet_id);
                }
                for (const auto& node : tablet_location->node_ids) {
                    PTabletID tablet;
                    tablet.set_partition_id(partition->id);
                    tablet.set_index_id(index.index_id);
                    tablet.set_tablet_id(tablet_id);
                    new_backends.insert(node);
                    _tablets_for_node[node].emplace(tablet_id, tablet);
                    _tablets_by_node[node].emplace(tablet_id);
                    // Only the first replica seen for each index is added to the tablets
                    // whose schema is sent when the streams are opened.
                    if (known_indexes.contains(index.index_id)) [[likely]] {
                        continue;
                    }
                    _indexes_from_node[node].emplace_back(tablet);
                    known_indexes.insert(index.index_id);
                    VLOG_DEBUG << "incremental open stream (" << partition->id << ", " << tablet_id
                               << ")";
                }
                _build_tablet_replica_info(tablet_id, partition);
            }
        }
    }
    // NOTE(review): the `true` argument presumably requests incremental stream creation —
    // confirm against LoadStreamMap::get_or_create.
    for (int64_t dst_id : new_backends) {
        auto streams = _load_stream_map->get_or_create(dst_id, true);
        RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
    }
    return Status::OK();
}
125
126
0
// Configures the row distribution with this writer's state, convertor, tablet finder and
// partition/location metadata, then opens it with the output row descriptor. `this` is
// passed as the caller so that the free on_partitions_created() trampoline can forward
// auto-created partitions back to this writer.
Status VTabletWriterV2::_init_row_distribution() {
    _row_distribution.init({
            .state = _state,
            .block_convertor = _block_convertor.get(),
            .tablet_finder = _tablet_finder.get(),
            .vpartition = _vpartition,
            .add_partition_request_timer = _add_partition_request_timer,
            .txn_id = _txn_id,
            .pool = _pool,
            .location = _location,
            .vec_output_expr_ctxs = &_vec_output_expr_ctxs,
            .schema = _schema,
            .caller = static_cast<void*>(this),
            .create_partition_callback = &::doris::on_partitions_created,
    });
    return _row_distribution.open(_output_row_desc);
}
142
143
0
// Initializes the writer state from the OLAP table sink descriptor and the runtime state:
// - load/txn identity;
// - schema, partition and tablet-location metadata;
// - tablet finder and block convertor;
// - profile counters;
// - delta writer map and load stream map.
//
// Called from open() before any stream is opened. Returns:
// - InternalError when the destination tuple descriptor is unknown;
// - InvalidArgument when the number of output exprs does not match the destination slots;
// - any error from schema/partition initialization.
Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
    _pool = state->obj_pool();
    auto& table_sink = _t_sink.olap_table_sink;
    _load_id.set_hi(table_sink.load_id.hi);
    _load_id.set_lo(table_sink.load_id.lo);
    signal::set_signal_task_id(_load_id);
    _txn_id = table_sink.txn_id;
    _num_replicas = table_sink.num_replicas;
    _tuple_desc_id = table_sink.tuple_id;
    _write_file_cache = table_sink.write_file_cache;
    _schema.reset(new OlapTableSchemaParam());
    RETURN_IF_ERROR(_schema->init(table_sink.schema));
    _schema->set_timestamp_ms(state->timestamp_ms());
    _schema->set_nano_seconds(state->nano_seconds());
    _schema->set_timezone(state->timezone());
    // Both objects are owned by the runtime state's object pool.
    _location = _pool->add(new OlapTableLocationParam(table_sink.location));
    _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));

    // An empty distributed column list means the table uses random distribution. In that
    // case, pick one tablet per batch, or a single tablet for the whole sink when
    // load_to_single_tablet is set and true.
    auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
    if (table_sink.partition.distributed_columns.empty()) {
        if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) {
            find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK;
        } else {
            find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH;
        }
    }
    _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition));
    _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode);
    RETURN_IF_ERROR(_vpartition->init());

    _state = state;
    _operator_profile = profile;

    _sender_id = state->per_fragment_instance_idx();
    _num_senders = state->num_per_fragment_instances();
    _backend_id = state->backend_id();
    _stream_per_node = state->load_stream_per_node();
    _total_streams = state->total_load_streams();
    _num_local_sink = state->num_local_sink();
    LOG(INFO) << "init olap tablet sink, load_id: " << print_id(_load_id)
              << ", num senders: " << _num_senders << ", stream per node: " << _stream_per_node
              << ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink;
    DCHECK(_stream_per_node > 0) << "load stream per node should be greater than 0";
    DCHECK(_total_streams > 0) << "total load streams should be greater than 0";
    DCHECK(_num_local_sink > 0) << "num local sink should be greater than 0";
    _is_high_priority =
            (state->execution_timeout() <= config::load_task_high_priority_threshold_second);
    DBUG_EXECUTE_IF("VTabletWriterV2._init.is_high_priority", { _is_high_priority = true; });
    _mem_tracker =
            std::make_shared<MemTracker>("VTabletWriterV2:" + std::to_string(state->load_job_id()));
    SCOPED_TIMER(_operator_profile->total_time_counter());
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

    // get table's tuple descriptor
    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
    DBUG_EXECUTE_IF("VTabletWriterV2._init._output_tuple_desc_null",
                    { _output_tuple_desc = nullptr; });
    if (_output_tuple_desc == nullptr) {
        return Status::InternalError("unknown destination tuple descriptor, id = {}",
                                     _tuple_desc_id);
    }
    auto output_tuple_desc_slots_size = _output_tuple_desc->slots().size();
    DBUG_EXECUTE_IF("VTabletWriterV2._init._vec_output_expr_ctxs_not_equal_output_tuple_slot",
                    { output_tuple_desc_slots_size++; });
    if (!_vec_output_expr_ctxs.empty() &&
        _vec_output_expr_ctxs.size() != output_tuple_desc_slots_size) {
        // Report the slot count that the check above compared, so the message stays
        // consistent with the failed condition (also when the debug point adjusts it).
        LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, "
                     << "output_tuple_slot_num " << output_tuple_desc_slots_size
                     << " output_expr_num " << _vec_output_expr_ctxs.size();
        return Status::InvalidArgument(
                "output_tuple_slot_num {} should be equal to output_expr_num {}",
                output_tuple_desc_slots_size, _vec_output_expr_ctxs.size());
    }

    _block_convertor = std::make_unique<OlapTableBlockConvertor>(
            _output_tuple_desc, _schema->is_insert_set(), _schema->is_insert(),
            _schema->is_strict_mode());
    // if partition_type is OLAP_TABLE_SINK_HASH_PARTITIONED, we handle the processing of
    // auto_increment column on exchange node rather than on TabletWriter
    _block_convertor->init_autoinc_info(
            _schema->db_id(), _schema->table_id(), _state->batch_size(),
            _schema->is_fixed_partial_update() && !_schema->auto_increment_coulumn().empty(),
            _schema->auto_increment_column_unique_id());
    _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc));

    // add all counter
    _input_rows_counter = ADD_COUNTER(_operator_profile, "RowsRead", TUnit::UNIT);
    _output_rows_counter = ADD_COUNTER(_operator_profile, "RowsProduced", TUnit::UNIT);
    _filtered_rows_counter = ADD_COUNTER(_operator_profile, "RowsFiltered", TUnit::UNIT);
    _send_data_timer = ADD_TIMER_WITH_LEVEL(_operator_profile, "SendDataTime", 1);
    _wait_mem_limit_timer =
            ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "WaitMemLimitTime", "SendDataTime", 1);
    _row_distribution_timer =
            ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "RowDistributionTime", "SendDataTime", 1);
    _write_memtable_timer =
            ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "WriteMemTableTime", "SendDataTime", 1);
    _add_partition_request_timer = ADD_CHILD_TIMER_WITH_LEVEL(
            _operator_profile, "AddPartitionRequestTime", "SendDataTime", 1);
    _validate_data_timer = ADD_TIMER_WITH_LEVEL(_operator_profile, "ValidateDataTime", 1);
    _open_timer = ADD_TIMER(_operator_profile, "OpenTime");
    _close_timer = ADD_TIMER(_operator_profile, "CloseWaitTime");
    _close_writer_timer = ADD_CHILD_TIMER(_operator_profile, "CloseWriterTime", "CloseWaitTime");
    _close_load_timer = ADD_CHILD_TIMER(_operator_profile, "CloseLoadTime", "CloseWaitTime");
    _load_back_pressure_version_time_ms =
            ADD_TIMER(_operator_profile, "LoadBackPressureVersionTimeMs");

    // With share_delta_writers, the delta writer map comes from the process-level pool
    // (keyed by load id); otherwise this sink gets a private map.
    if (config::share_delta_writers) {
        _delta_writer_for_tablet = ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
                _load_id, _num_local_sink);
    } else {
        _delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id);
    }
    _load_stream_map = ExecEnv::GetInstance()->load_stream_map_pool()->get_or_create(
            _load_id, _backend_id, _stream_per_node, _num_local_sink);
    return Status::OK();
}
262
263
0
// Opens the sink. It first initializes the writer state (_init), then:
// - maps tablets to the backends hosting them;
// - opens load streams to those backends;
// - sets up row distribution.
// The steps stop at the first error.
Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
    RETURN_IF_ERROR(_init(state, profile));
    LOG(INFO) << "opening olap table sink, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
              << ", sink_id=" << _sender_id;
    _timeout_watch.start();

    // Everything below is accounted to the operator's total time, the open timer and the
    // writer's memory tracker.
    SCOPED_TIMER(_operator_profile->total_time_counter());
    SCOPED_TIMER(_open_timer);
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

    RETURN_IF_ERROR(_build_tablet_node_mapping());
    RETURN_IF_ERROR(_open_streams());
    return _init_row_distribution();
}
278
279
0
// Opens load streams to every backend recorded in _tablets_for_node. A failure on an
// individual backend is tolerated (it is logged by _open_streams_to_backend). An error is
// returned only when at least one backend was attempted and none succeeded.
Status VTabletWriterV2::_open_streams() {
    int injected_skips = 0;
    bool attempted_any = false;
    bool opened_any = false;
    for (const auto& node_entry : _tablets_for_node) {
        const auto dst_id = node_entry.first;
        auto streams = _load_stream_map->get_or_create(dst_id);
        // Fault injection: pretend the first one or two backends could not be reached.
        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", {
            if (injected_skips < 1) {
                injected_skips++;
                continue;
            }
        });
        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
            if (injected_skips < 2) {
                injected_skips++;
                continue;
            }
        });
        const bool opened = _open_streams_to_backend(dst_id, *streams).ok();
        attempted_any = true;
        if (opened) {
            opened_any = true;
        }
    }
    if (attempted_any && !opened_any) {
        return Status::InternalError("failed to open streams to any BE");
    }
    return Status::OK();
}
306
307
0
// Opens the load streams in `streams` towards backend `dst_id`. The tablets recorded for
// that node in _indexes_from_node are passed along for the schema. The idle timeout is the
// query execution timeout converted to milliseconds. A failure is logged and returned.
Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreamStubs& streams) {
    const auto* node_info = _nodes_info->find_node(dst_id);
    DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null",
                    { node_info = nullptr; });
    if (node_info == nullptr) {
        return Status::InternalError("Unknown node {} in tablet location", dst_id);
    }

    std::vector<PTabletID>& schema_tablets = _indexes_from_node[node_info->id];
    DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.no_schema_when_open_streams",
                    { schema_tablets.clear(); });

    auto idle_timeout_ms = _state->execution_timeout() * 1000;
    auto st = streams.open(_state->exec_env()->brpc_streaming_client_cache(), *node_info, _txn_id,
                           *_schema, schema_tablets, _total_streams, idle_timeout_ms,
                           _state->enable_profile());
    if (st.ok()) {
        return st;
    }
    LOG(WARNING) << "failed to open stream to backend " << dst_id
                 << ", load_id=" << print_id(_load_id) << ", err=" << st;
    return st;
}
327
328
0
// Walks every partition/index/tablet known at open() time and records, per replica node:
// - the tablet in _tablets_for_node;
// - the tablet in _tablets_by_node, unless it is the dummy tablet;
// - for the first replica of each index, the tablet in _indexes_from_node.
// Replica counts are recorded through _build_tablet_replica_info().
// Returns InternalError if any tablet has no known location.
Status VTabletWriterV2::_build_tablet_node_mapping() {
    // Tablet id 0 is the fake tablet used for auto partition; it is only recorded in
    // _tablets_for_node.
    constexpr int64_t DUMMY_TABLET_ID = 0;
    std::unordered_set<int64_t> indexes_with_schema_tablet;
    for (const auto& partition : _vpartition->get_partitions()) {
        for (const auto& index : partition->indexes) {
            for (const auto& tablet_id : index.tablets) {
                const auto* tablet_location = _location->find_tablet(tablet_id);
                DBUG_EXECUTE_IF("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null",
                                { tablet_location = nullptr; });
                if (tablet_location == nullptr) {
                    return Status::InternalError("unknown tablet location, tablet id = {}",
                                                 tablet_id);
                }
                const bool is_dummy_tablet = (tablet_id == DUMMY_TABLET_ID);
                for (const auto& node : tablet_location->node_ids) {
                    PTabletID tablet;
                    tablet.set_partition_id(partition->id);
                    tablet.set_index_id(index.index_id);
                    tablet.set_tablet_id(tablet_id);
                    _tablets_for_node[node].emplace(tablet_id, tablet);
                    if (is_dummy_tablet) [[unlikely]] {
                        continue;
                    }
                    _tablets_by_node[node].emplace(tablet_id);
                    // insert() reports whether this index was seen for the first time.
                    if (indexes_with_schema_tablet.insert(index.index_id).second) [[unlikely]] {
                        _indexes_from_node[node].emplace_back(tablet);
                    }
                }
                _build_tablet_replica_info(tablet_id, partition);
            }
        }
    }
    return Status::OK();
}
364
365
void VTabletWriterV2::_build_tablet_replica_info(const int64_t tablet_id,
366
0
                                                 VOlapTablePartition* partition) {
367
0
    if (partition != nullptr) {
368
0
        int total_replicas_num =
369
0
                partition->total_replica_num == 0 ? _num_replicas : partition->total_replica_num;
370
0
        int load_required_replicas_num = partition->load_required_replica_num == 0
371
0
                                                 ? (_num_replicas + 1) / 2
372
0
                                                 : partition->load_required_replica_num;
373
0
        _tablet_replica_info[tablet_id] =
374
0
                std::make_pair(total_replicas_num, load_required_replicas_num);
375
        // Copy version gap backends info for this tablet
376
0
        if (auto it = partition->tablet_version_gap_backends.find(tablet_id);
377
0
            it != partition->tablet_version_gap_backends.end()) {
378
0
            _tablet_version_gap_backends[tablet_id] = it->second;
379
0
        }
380
0
    } else {
381
0
        _tablet_replica_info[tablet_id] = std::make_pair(_num_replicas, (_num_replicas + 1) / 2);
382
0
    }
383
0
}
384
385
void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& row_part_tablet_ids,
386
0
                                                RowsForTablet& rows_for_tablet) {
387
0
    for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); index_idx++) {
388
0
        auto& row_ids = row_part_tablet_ids[index_idx].row_ids;
389
0
        auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids;
390
0
        auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids;
391
392
0
        for (size_t i = 0; i < row_ids.size(); i++) {
393
0
            auto& tablet_id = tablet_ids[i];
394
0
            auto it = rows_for_tablet.find(tablet_id);
395
0
            if (it == rows_for_tablet.end()) {
396
0
                Rows rows;
397
0
                rows.partition_id = partition_ids[i];
398
0
                rows.index_id = _schema->indexes()[index_idx]->index_id;
399
0
                rows.row_idxes.reserve(row_ids.size());
400
0
                auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows});
401
0
                it = tmp_it;
402
0
            }
403
0
            it->second.row_idxes.push_back(row_ids[i]);
404
0
            _number_output_rows++;
405
0
        }
406
0
    }
407
0
}
408
409
// Appends to `streams` one open load stream per replica node of `tablet_id`, recording the
// tablet in _tablets_for_node for every replica node. It then waits until one of the
// selected streams has the tablet schema.
//
// Returns InternalError if:
// - the tablet location is unknown; or
// - no more than half of the replica nodes have an open stream.
// Otherwise returns the status of the last wait_for_schema() call, which is OK as soon as
// one stream succeeds.
Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id,
                                        std::vector<std::shared_ptr<LoadStreamStub>>& streams) {
    std::vector<int64_t> failed_node_ids;
    const auto* location = _location->find_tablet(tablet_id);
    DBUG_EXECUTE_IF("VTabletWriterV2._select_streams.location_null", { location = nullptr; });
    if (location == nullptr) {
        return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id);
    }
    for (const auto& node_id : location->node_ids) {
        PTabletID tablet;
        tablet.set_partition_id(partition_id);
        tablet.set_index_id(index_id);
        tablet.set_tablet_id(tablet_id);
        VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id);
        _tablets_for_node[node_id].emplace(tablet_id, tablet);
        auto stream = _load_stream_map->at(node_id)->select_one_stream();
        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
            LOG(INFO) << "[skip_two_backends](detail) tablet_id=" << tablet_id
                      << ", node_id=" << node_id
                      << ", stream_ok=" << (stream == nullptr ? "no" : "yes");
        });
        if (stream == nullptr) {
            LOG(WARNING) << "skip writing tablet " << tablet_id << " to backend " << node_id
                         << ": stream is not open";
            failed_node_ids.push_back(node_id);
            continue;
        }
        streams.emplace_back(std::move(stream));
    }
    DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
        LOG(INFO) << "[skip_two_backends](summary) tablet_id=" << tablet_id
                  << ", num_streams=" << streams.size()
                  << ", num_nodes=" << location->node_ids.size();
    });
    // A majority of the replica nodes must have an open stream.
    if (streams.size() <= location->node_ids.size() / 2) {
        std::ostringstream success_msg;
        std::ostringstream failed_msg;
        for (auto& s : streams) {
            success_msg << ", " << s->dst_id();
        }
        for (auto id : failed_node_ids) {
            failed_msg << ", " << id;
        }
        LOG(INFO) << "failed to write enough replicas " << streams.size() << "/"
                  << location->node_ids.size() << " for tablet " << tablet_id
                  << " due to connection errors; success nodes" << success_msg.str()
                  << "; failed nodes" << failed_msg.str() << ".";
        return Status::InternalError(
                "failed to write enough replicas {}/{} for tablet {} due to connection errors",
                streams.size(), location->node_ids.size(), tablet_id);
    }
    Status st;
    for (auto& stream : streams) {
        st = stream->wait_for_schema(partition_id, index_id, tablet_id);
        if (st.ok()) {
            break;
        }
        // Streaming the shared_ptr alone only prints an address; include the destination
        // backend so the warning is actionable.
        LOG(WARNING) << "failed to get schema from stream " << stream << " to backend "
                     << stream->dst_id() << ", err=" << st;
    }
    return st;
}
471
472
0
// Sends one input block through the sink, in this order:
// - applies load back pressure reported by the streams;
// - flushes any batch pending on auto-created partitions;
// - routes the rows to tablets;
// - writes each tablet's rows into its delta writer.
// Load metrics and profile counters are updated along the way. Dry-run queries and empty
// blocks return OK without writing anything.
Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) {
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

    if (_state->query_options().dry_run_query) {
        return Status::OK();
    }

    // Each stream reports (and resets) a back-pressure wait time; sleep once for the
    // largest reported value.
    int64_t back_pressure_ms = 0;
    auto node_streams_snapshot = _load_stream_map->get_streams_for_node();
    for (const auto& [backend_id, node_stubs] : node_streams_snapshot) {
        for (const auto& stub : node_stubs->streams()) {
            auto reported_ms = stub->get_and_reset_load_back_pressure_version_wait_time_ms();
            if (reported_ms > 0) {
                back_pressure_ms = std::max(back_pressure_ms, reported_ms);
            }
        }
    }
    if (UNLIKELY(back_pressure_ms > 0)) {
        std::this_thread::sleep_for(std::chrono::milliseconds(back_pressure_ms));
        _load_back_pressure_version_block_ms.fetch_add(back_pressure_ms);
    }

    // Rows held back while waiting for auto-created partitions are sent first.
    RETURN_IF_ERROR(_send_new_partition_batch());

    auto input_rows = input_block.rows();
    auto input_bytes = input_block.bytes();
    if (UNLIKELY(input_rows == 0)) {
        return Status::OK();
    }
    SCOPED_TIMER(_operator_profile->total_time_counter());
    _number_input_rows += input_rows;
    // Progress is reported incrementally so FE can observe it; the final
    // 'num_rows_load_total' is set when the sink is closed.
    _state->update_num_rows_load_total(input_rows);
    _state->update_num_bytes_load_total(input_bytes);
    DorisMetrics::instance()->load_rows->increment(input_rows);
    DorisMetrics::instance()->load_bytes->increment(input_bytes);

    SCOPED_RAW_TIMER(&_send_data_ns);
    _row_distribution_watch.start();
    std::shared_ptr<Block> converted_block;
    RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
            input_block, converted_block, _row_part_tablet_ids, _number_input_rows));
    RowsForTablet per_tablet_rows;
    _generate_rows_for_tablet(_row_part_tablet_ids, per_tablet_rows);
    _row_distribution_watch.stop();

    // Hand each tablet's selected rows to its delta writer.
    for (const auto& [tablet_id, rows] : per_tablet_rows) {
        RETURN_IF_ERROR(_write_memtable(converted_block, tablet_id, rows));
    }

    COUNTER_SET(_input_rows_counter, _number_input_rows);
    COUNTER_SET(_output_rows_counter, _number_output_rows);
    COUNTER_SET(_filtered_rows_counter,
                _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows());
    COUNTER_SET(_send_data_timer, _send_data_ns);
    COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
    COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());

    return Status::OK();
}
539
540
// Writes the rows of `block` selected by `rows.row_idxes` into the delta writer of
// `tablet_id`, creating that writer on first use.
//
// Creating a writer involves two steps; if either fails, InternalError is returned:
// - selecting the replica streams (_select_streams);
// - resolving the schema of `rows.index_id`.
// Before writing, the call waits in the memtable memory limiter and returns the cancel
// reason if the query has been cancelled.
Status VTabletWriterV2::_write_memtable(std::shared_ptr<Block> block, int64_t tablet_id,
                                        const Rows& rows) {
    auto create_status = Status::OK();
    auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
        std::vector<std::shared_ptr<LoadStreamStub>> streams;
        create_status = _select_streams(tablet_id, rows.partition_id, rows.index_id, streams);
        if (!create_status.ok()) [[unlikely]] {
            LOG(WARNING) << "select stream failed, " << create_status
                         << ", load_id=" << print_id(_load_id);
            return std::unique_ptr<DeltaWriterV2>(nullptr);
        }
        WriteRequest req {
                .tablet_id = tablet_id,
                .txn_id = _txn_id,
                .index_id = rows.index_id,
                .partition_id = rows.partition_id,
                .load_id = _load_id,
                .tuple_desc = _schema->tuple_desc(),
                .table_schema_param = _schema,
                .is_high_priority = _is_high_priority,
                .write_file_cache = _write_file_cache,
                .storage_vault_id {},
        };
        // Look up the index schema for this tablet's index.
        bool found_index = false;
        for (const auto& index : _schema->indexes()) {
            if (index->index_id != rows.index_id) {
                continue;
            }
            req.slots = &index->slots;
            req.schema_hash = index->schema_hash;
            found_index = true;
            break;
        }
        DBUG_EXECUTE_IF("VTabletWriterV2._write_memtable.index_not_found",
                        { found_index = false; });
        if (!found_index) [[unlikely]] {
            create_status = Status::InternalError("no index {} in schema", rows.index_id);
            LOG(WARNING) << "index " << rows.index_id
                         << " not found in schema, load_id=" << print_id(_load_id);
            return std::unique_ptr<DeltaWriterV2>(nullptr);
        }
        return DeltaWriterV2::create_unique(&req, streams, _state);
    });
    if (delta_writer == nullptr) {
        LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id
                     << ", load_id=" << print_id(_load_id) << ", err: " << create_status;
        return Status::InternalError("failed to open DeltaWriter {}: {}", tablet_id,
                                     create_status.msg());
    }
    {
        // Wait for memtable memory to become available, bailing out on cancellation.
        SCOPED_TIMER(_wait_mem_limit_timer);
        ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
                [state = _state]() { return state->is_cancelled(); });
        if (_state->is_cancelled()) {
            return _state->cancel_reason();
        }
    }
    SCOPED_TIMER(_write_memtable_timer);
    return delta_writer->write(block.get(), rows.row_idxes);
}
598
599
0
// Aborts this sink with `status`.
// Cancels and drops the shared delta writer if this sink still holds one. If a load stream
// map exists, it then cancels every LoadStreamStubs in it with `status` and calls release()
// on the map.
void VTabletWriterV2::_cancel(Status status) {
    LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
              << ", txn_id=" << _txn_id << ", sink_id=" << _sender_id
              << ", due to error: " << status;
    if (_delta_writer_for_tablet != nullptr) {
        _delta_writer_for_tablet->cancel(status);
        _delta_writer_for_tablet.reset();
    }
    if (_load_stream_map == nullptr) {
        return;
    }
    auto cancel_node_streams = [status](int64_t /*dst_id*/, LoadStreamStubs& streams) {
        streams.cancel(status);
    };
    _load_stream_map->for_each(cancel_node_streams);
    _load_stream_map->release();
}
613
614
0
// Flushes rows that were held back while automatic partitions were being created.
// Safe to call repeatedly (close may try more than once): it is a no-op when
// _row_distribution has no pending batch.
Status VTabletWriterV2::_send_new_partition_batch() {
    if (!_row_distribution.need_deal_batching()) {
        return Status::OK();
    }
    RETURN_IF_ERROR(_row_distribution.automatic_create_partition());

    // Borrow the batched rows as an lvalue block.
    Block batched = _row_distribution._batching_block->to_block();

    // The order below matters:
    //  1. clear the batching stats first, so writing the batched block cannot start a new
    //     batching round;
    //  2. write the batched block;
    //  3. hand the columns back to the batching block (write builds a new block from its
    //     input rather than adjusting it) and clear their data.
    _row_distribution.clear_batching_stats();
    Defer restore_batching_block([&]() {
        _row_distribution._batching_block->set_mutable_columns(
                std::move(batched).mutate_columns());
        _row_distribution._batching_block->clear_column_data();
    });
    RETURN_IF_ERROR(this->write(_state, batched));
    _row_distribution._deal_batched = false;
    return Status::OK();
}
635
636
0
// Closes the sink. Serialized by _close_mutex; once closed, later calls return the first
// recorded outcome (_close_status).
//
// With an OK status it:
//  - flushes pending auto-partition batches;
//  - publishes counters and closes the shared delta writer;
//  - saves tablets to commit and releases the load stream map;
//  - sends CLOSE_LOAD and waits on non-incremental, then all, streams;
//  - if this is the last sink, builds the tablet commit infos.
// With a non-OK status (or if flushing fails) it cancels the sink via _cancel().
Status VTabletWriterV2::close(Status exec_status) {
    std::lock_guard<std::mutex> close_lock(_close_mutex);
    if (_is_closed) {
        return _close_status;
    }
    LOG(INFO) << "closing olap table sink, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
              << ", sink_id=" << _sender_id << ", status=" << exec_status.to_string();
    SCOPED_TIMER(_close_timer);
    Status status = exec_status;

    if (status.ok()) {
        SCOPED_TIMER(_operator_profile->total_time_counter());
        _row_distribution._deal_batched = true;
        status = _send_new_partition_batch();
    }

    DBUG_EXECUTE_IF("VTabletWriterV2.close.sleep", {
        auto sleep_sec = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
                "VTabletWriterV2.close.sleep", "sleep_sec", 1);
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
    });
    DBUG_EXECUTE_IF("VTabletWriterV2.close.cancel",
                    { status = Status::InternalError("load cancel"); });
    if (status.ok()) {
        // The success path has several early exits: delta writer close, close_wait and
        // commit info. It runs as an immediately invoked lambda so every outcome lands in
        // `status`, and _is_closed/_close_status are always recorded below.
        // By the time any of those exits is taken, _delta_writer_for_tablet has been reset
        // and the stream map released. Leaving _is_closed false would let a second close()
        // dereference the null delta writer or release the map again.
        // NOTE(review): failures from _close_wait/_create_commit_info do not cancel the
        // streams here; that matches the previous behavior.
        status = [&]() -> Status {
            // only if status is ok can we call this _operator_profile->total_time_counter().
            // if status is not ok, this sink may not be prepared, so that _profile is null
            SCOPED_TIMER(_operator_profile->total_time_counter());

            COUNTER_SET(_input_rows_counter, _number_input_rows);
            COUNTER_SET(_output_rows_counter, _number_output_rows);
            COUNTER_SET(_filtered_rows_counter,
                        _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows());
            COUNTER_SET(_send_data_timer, _send_data_ns);
            COUNTER_SET(_row_distribution_timer,
                        static_cast<int64_t>(_row_distribution_watch.elapsed_time()));
            COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
            auto back_pressure_time_ms = _load_back_pressure_version_block_ms.load();
            COUNTER_SET(_load_back_pressure_version_time_ms, back_pressure_time_ms);
            g_sink_load_back_pressure_version_time_ms << back_pressure_time_ms;

            // close DeltaWriters
            {
                std::unordered_map<int64_t, int32_t> segments_for_tablet;
                SCOPED_TIMER(_close_writer_timer);
                // close all delta writers if this is the last user
                auto st = _delta_writer_for_tablet->close(segments_for_tablet, _operator_profile);
                _delta_writer_for_tablet.reset();
                if (!st.ok()) {
                    _cancel(st);
                    return st;
                }
                // only the last sink closing delta writers will have segment num
                if (!segments_for_tablet.empty()) {
                    _load_stream_map->save_segments_for_tablet(segments_for_tablet);
                }
            }

            _calc_tablets_to_commit();
            const bool is_last_sink = _load_stream_map->release();
            LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink
                      << ", load_id=" << print_id(_load_id);

            // send CLOSE_LOAD on all non-incremental streams if this is the last sink
            if (is_last_sink) {
                _load_stream_map->close_load(false);
            }

            // close_wait on all non-incremental streams, even if this is not the last sink,
            // because some per-instance data structures are now shared among all sinks
            // due to sharing delta writers and load stream stubs.
            // No need to wait after quorum success here: the first-stage close_wait only
            // ensures the incremental streams' load has completed; the remaining wait
            // happens in the second-stage close_wait.
            RETURN_IF_ERROR(_close_wait(_non_incremental_streams(), false));

            // send CLOSE_LOAD on all incremental streams if this is the last sink.
            // this must happen after all non-incremental streams are closed,
            // so we can ensure all sinks are in close phase before closing incremental streams.
            if (is_last_sink) {
                _load_stream_map->close_load(true);
            }

            // close_wait on all incremental streams, even if this is not the last sink.
            RETURN_IF_ERROR(_close_wait(_all_streams(), true));

            // calculate and submit commit info
            if (is_last_sink) {
                DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", {
                    auto streams = _load_stream_map->at(_tablets_for_node.begin()->first);
                    int64_t tablet_id = -1;
                    for (auto tablet : streams->success_tablets()) {
                        tablet_id = tablet;
                        break;
                    }
                    if (tablet_id != -1) {
                        LOG(INFO) << "fault injection: adding failed tablet_id: " << tablet_id;
                        streams->select_one_stream()->add_failed_tablet(
                                tablet_id, Status::InternalError("fault injection"));
                    } else {
                        LOG(INFO) << "fault injection: failed to inject failed tablet_id";
                    }
                });

                std::vector<TTabletCommitInfo> tablet_commit_infos;
                RETURN_IF_ERROR(_create_commit_info(tablet_commit_infos, _load_stream_map));
                _state->add_tablet_commit_infos(tablet_commit_infos);
            }

            // _number_input_rows don't contain num_rows_load_filtered and
            // num_rows_load_unselected in scan node
            int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() +
                                          _state->num_rows_load_unselected();
            _state->set_num_rows_load_total(num_rows_load_total);
            _state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
                                                  _tablet_finder->num_filtered_rows());
            _state->update_num_rows_load_unselected(
                    _tablet_finder->num_immutable_partition_filtered_rows());

            if (_state->enable_profile() && _state->profile_level() >= 2) {
                // Output detailed profiling info for auto-partition requests
                _row_distribution.output_profile_info(_operator_profile);
            }

            LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id)
                      << ", txn_id=" << _txn_id;
            return Status::OK();
        }();
    } else {
        _cancel(status);
    }

    _is_closed = true;
    _close_status = status;
    return status;
}
767
768
0
std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_all_streams() {
769
0
    std::unordered_set<std::shared_ptr<LoadStreamStub>> all_streams;
770
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
771
0
    for (const auto& [dst_id, streams] : streams_for_node) {
772
0
        for (const auto& stream : streams->streams()) {
773
0
            all_streams.insert(stream);
774
0
        }
775
0
    }
776
0
    return all_streams;
777
0
}
778
779
0
std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_non_incremental_streams() {
780
0
    std::unordered_set<std::shared_ptr<LoadStreamStub>> non_incremental_streams;
781
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
782
0
    for (const auto& [dst_id, streams] : streams_for_node) {
783
0
        for (const auto& stream : streams->streams()) {
784
0
            if (!stream->is_incremental()) {
785
0
                non_incremental_streams.insert(stream);
786
0
            }
787
0
        }
788
0
    }
789
0
    return non_incremental_streams;
790
0
}
791
792
// Waits for the streams in `unfinished_streams` to finish closing, in two phases.
//  1. Poll every 10 ms until every stream has finished, or _quorum_success() reports that
//     every tablet of the partitions written by this sink has enough finished replicas.
//  2. Only if need_wait_after_quorum_success is set and some streams are still unfinished:
//     keep polling until they finish, _calc_max_wait_time_ms() ms have passed since phase 1
//     ended, or the remaining execution time drops below
//     config::quorum_success_remaining_timeout_seconds.
// Returns the TimedOut status from _check_timeout() or the error status produced by
// _check_streams_finish().
Status VTabletWriterV2::_close_wait(
        std::unordered_set<std::shared_ptr<LoadStreamStub>> unfinished_streams,
        bool need_wait_after_quorum_success) {
    SCOPED_TIMER(_close_load_timer);
    Status status;
    auto streams_for_node = _load_stream_map->get_streams_for_node();

    // Collect every tablet of every partition this sink has written to.
    std::unordered_set<int64_t> need_finish_tablets;
    auto partition_ids = _tablet_finder->partition_ids();
    for (const auto& part : _vpartition->get_partitions()) {
        if (!partition_ids.contains(part->id)) {
            continue;
        }
        for (const auto& index : part->indexes) {
            need_finish_tablets.insert(index.tablets.begin(), index.tablets.end());
        }
    }

    // Phase 1: wait for quorum success (or for everything to finish).
    for (;;) {
        RETURN_IF_ERROR(_check_timeout());
        RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));
        const bool quorum_success = _quorum_success(unfinished_streams, need_finish_tablets);
        const bool all_finished = unfinished_streams.empty();
        if (quorum_success || all_finished) {
            LOG(INFO) << "quorum_success: " << quorum_success
                      << ", is all finished: " << all_finished << ", txn_id: " << _txn_id
                      << ", load_id: " << print_id(_load_id);
            break;
        }
        bthread_usleep(1000 * 10);
    }

    // Phase 2: give the stragglers a bounded amount of extra time.
    if (need_wait_after_quorum_success && !unfinished_streams.empty()) {
        const int64_t quorum_reached_ms = UnixMillis();
        const int64_t max_wait_time_ms =
                _calc_max_wait_time_ms(streams_for_node, unfinished_streams);
        for (;;) {
            RETURN_IF_ERROR(_check_timeout());
            RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));
            if (unfinished_streams.empty()) {
                break;
            }
            const int64_t waited_ms = UnixMillis() - quorum_reached_ms;
            if (waited_ms > max_wait_time_ms ||
                _state->execution_timeout() - waited_ms / 1000 <
                        config::quorum_success_remaining_timeout_seconds) {
                std::stringstream pending_ids;
                for (const auto& stub : unfinished_streams) {
                    pending_ids << stub->stream_id() << ",";
                }
                LOG(WARNING) << "reach max wait time, max_wait_time_ms: " << max_wait_time_ms
                             << ", load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
                             << ", unfinished streams: " << pending_ids.str();
                break;
            }
            bthread_usleep(1000 * 10);
        }
    }

    if (!status.ok()) {
        LOG(WARNING) << "close_wait failed: " << status << ", load_id=" << print_id(_load_id);
    }
    return status;
}
855
856
bool VTabletWriterV2::_quorum_success(
857
        const std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams,
858
0
        const std::unordered_set<int64_t>& need_finish_tablets) {
859
0
    if (!config::enable_quorum_success_write) {
860
0
        return false;
861
0
    }
862
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
863
0
    if (need_finish_tablets.empty()) [[unlikely]] {
864
0
        return false;
865
0
    }
866
867
    // 1. calculate finished tablets replica num
868
0
    std::unordered_set<int64_t> finished_dst_ids;
869
0
    std::unordered_map<int64_t, int64_t> finished_tablets_replica;
870
0
    for (const auto& [dst_id, streams] : streams_for_node) {
871
0
        bool finished = true;
872
0
        for (const auto& stream : streams->streams()) {
873
0
            if (unfinished_streams.contains(stream) || !stream->check_cancel().ok()) {
874
0
                finished = false;
875
0
                break;
876
0
            }
877
0
        }
878
0
        if (finished) {
879
0
            finished_dst_ids.insert(dst_id);
880
0
        }
881
0
    }
882
0
    for (const auto& [dst_id, _] : streams_for_node) {
883
0
        if (!finished_dst_ids.contains(dst_id)) {
884
0
            continue;
885
0
        }
886
0
        for (const auto& tablet_id : _tablets_by_node[dst_id]) {
887
            // Only count non-gap backends for quorum success.
888
            // Gap backends' success doesn't count toward majority write.
889
0
            auto gap_it = _tablet_version_gap_backends.find(tablet_id);
890
0
            if (gap_it == _tablet_version_gap_backends.end() ||
891
0
                gap_it->second.find(dst_id) == gap_it->second.end()) {
892
0
                finished_tablets_replica[tablet_id]++;
893
0
            }
894
0
        }
895
0
    }
896
897
    // 2. check if quorum success
898
0
    for (const auto& tablet_id : need_finish_tablets) {
899
0
        if (finished_tablets_replica[tablet_id] < _load_required_replicas_num(tablet_id)) {
900
0
            return false;
901
0
        }
902
0
    }
903
0
    return true;
904
0
}
905
906
10
int VTabletWriterV2::_load_required_replicas_num(int64_t tablet_id) {
907
10
    auto [total_replicas_num, load_required_replicas_num] = _tablet_replica_info[tablet_id];
908
10
    if (total_replicas_num == 0) {
909
0
        return (_num_replicas + 1) / 2;
910
0
    }
911
10
    return load_required_replicas_num;
912
10
}
913
914
int64_t VTabletWriterV2::_calc_max_wait_time_ms(
915
        const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node,
916
0
        const std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams) {
917
    // 1. calculate avg speed of all unfinished streams
918
0
    int64_t elapsed_ms = _timeout_watch.elapsed_time() / 1000 / 1000;
919
0
    int64_t total_bytes = 0;
920
0
    int finished_count = 0;
921
0
    for (const auto& [dst_id, streams] : streams_for_node) {
922
0
        for (const auto& stream : streams->streams()) {
923
0
            if (unfinished_streams.contains(stream) || !stream->check_cancel().ok()) {
924
0
                continue;
925
0
            }
926
0
            total_bytes += stream->bytes_written();
927
0
            finished_count++;
928
0
        }
929
0
    }
930
    // no data loaded in index channel, return 0
931
0
    if (total_bytes == 0 || finished_count == 0) {
932
0
        return 0;
933
0
    }
934
    // if elapsed_ms is equal to 0, explain the loaded data is too small
935
0
    if (elapsed_ms <= 0) {
936
0
        return config::quorum_success_min_wait_seconds * 1000;
937
0
    }
938
0
    double avg_speed =
939
0
            static_cast<double>(total_bytes) / (static_cast<double>(elapsed_ms) * finished_count);
940
941
    // 2. calculate max wait time of each unfinished stream and return the max value
942
0
    int64_t max_wait_time_ms = 0;
943
0
    for (const auto& [dst_id, streams] : streams_for_node) {
944
0
        for (const auto& stream : streams->streams()) {
945
0
            if (unfinished_streams.contains(stream)) {
946
0
                int64_t bytes = stream->bytes_written();
947
0
                int64_t wait =
948
0
                        avg_speed > 0 ? static_cast<int64_t>(static_cast<double>(bytes) / avg_speed)
949
0
                                      : 0;
950
0
                max_wait_time_ms = std::max(max_wait_time_ms, wait);
951
0
            }
952
0
        }
953
0
    }
954
955
    // 3. calculate max wait time
956
    // introduce quorum_success_min_wait_time_ms to avoid jitter of small load
957
0
    max_wait_time_ms -= UnixMillis() - _timeout_watch.elapsed_time() / 1000 / 1000;
958
0
    max_wait_time_ms =
959
0
            std::max(static_cast<int64_t>(static_cast<double>(max_wait_time_ms) *
960
0
                                          (1.0 + config::quorum_success_max_wait_multiplier)),
961
0
                     config::quorum_success_min_wait_seconds * 1000);
962
963
0
    return max_wait_time_ms;
964
0
}
965
966
0
// Returns TimedOut once _timeout_watch's elapsed time has reached the query's execution
// timeout (converted to ms); OK otherwise.
Status VTabletWriterV2::_check_timeout() {
    const int64_t timeout_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000;
    const int64_t elapsed_ms = _timeout_watch.elapsed_time() / 1000 / 1000;
    int64_t remain_ms = timeout_ms - elapsed_ms;
    DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
    if (remain_ms > 0) {
        return Status::OK();
    }
    LOG(WARNING) << "load timed out before close waiting, load_id=" << print_id(_load_id);
    return Status::TimedOut("load timed out before close waiting");
}
976
977
// Polls close_finish_check() once for every stream still in `unfinished_streams`.
// A stream that reports it is closed, or that returns an error, is removed from
// `unfinished_streams`; an error is also stored into `status`.
// Returns `status`, which the caller reuses across polls, so an earlier error is returned too.
Status VTabletWriterV2::_check_streams_finish(
        std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams, Status& status,
        const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node) {
    for (const auto& node_entry : streams_for_node) {
        for (const auto& stub : node_entry.second->streams()) {
            if (!unfinished_streams.contains(stub)) {
                continue;
            }
            bool closed = false;
            auto check_st = stub->close_finish_check(_state, &closed);
            DBUG_EXECUTE_IF("VTabletWriterV2._check_streams_finish.close_stream_failed",
                            { check_st = Status::InternalError("close stream failed"); });
            const bool failed = !check_st.ok();
            if (failed) {
                status = check_st;
                LOG(WARNING) << "close_wait failed: " << check_st
                             << ", load_id=" << print_id(_load_id);
            }
            if (failed || closed) {
                unfinished_streams.erase(stub);
            }
        }
    }
    return status;
}
1002
1003
0
void VTabletWriterV2::_calc_tablets_to_commit() {
1004
0
    LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
1005
0
              << ", sink_id=" << _sender_id;
1006
0
    for (const auto& [dst_id, tablets] : _tablets_for_node) {
1007
0
        std::vector<PTabletID> tablets_to_commit;
1008
0
        std::vector<int64_t> partition_ids;
1009
0
        for (const auto& [tablet_id, tablet] : tablets) {
1010
0
            if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
1011
0
                if (VLOG_DEBUG_IS_ON) {
1012
0
                    partition_ids.push_back(tablet.partition_id());
1013
0
                }
1014
0
                PTabletID t(tablet);
1015
0
                tablets_to_commit.push_back(t);
1016
0
            }
1017
0
        }
1018
0
        if (VLOG_DEBUG_IS_ON) {
1019
0
            std::string msg("close load partitions: ");
1020
0
            msg.reserve(partition_ids.size() * 7);
1021
0
            for (auto v : partition_ids) {
1022
0
                msg.append(std::to_string(v) + ", ");
1023
0
            }
1024
0
            LOG(WARNING) << msg;
1025
0
        }
1026
0
        _load_stream_map->save_tablets_to_commit(dst_id, tablets_to_commit);
1027
0
    }
1028
0
}
1029
1030
// Appends one TTabletCommitInfo to `tablet_commit_infos` for every (tablet, backend) pair
// that the streams in `load_stream_map` report as successful.
// Returns InternalError for the first tablet found that both:
//  - failed on at least one backend, and
//  - has fewer than _load_required_replicas_num(tablet) successes on backends not listed in
//    _tablet_version_gap_backends for that tablet.
// Returns OK otherwise.
Status VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tablet_commit_infos,
                                            std::shared_ptr<LoadStreamMap> load_stream_map) {
    // Per-tablet count of successes on non-gap backends.
    std::unordered_map<int64_t, int> success_tablets_replica;
    // Tablets that failed on at least one backend, and the last reported reason for each.
    std::unordered_set<int64_t> failed_tablets;
    std::unordered_map<int64_t, Status> failed_reason;

    auto on_gap_backend = [this](int64_t tablet_id, int64_t backend_id) {
        auto gap_it = _tablet_version_gap_backends.find(tablet_id);
        return gap_it != _tablet_version_gap_backends.end() &&
               gap_it->second.find(backend_id) != gap_it->second.end();
    };

    load_stream_map->for_each([&](int64_t dst_id, LoadStreamStubs& streams) {
        size_t num_failed_tablets = 0;
        for (const auto& [tablet_id, reason] : streams.failed_tablets()) {
            failed_tablets.insert(tablet_id);
            failed_reason[tablet_id] = reason;
            ++num_failed_tablets;
        }
        size_t num_success_tablets = 0;
        for (auto tablet_id : streams.success_tablets()) {
            TTabletCommitInfo commit_info;
            commit_info.tabletId = tablet_id;
            commit_info.backendId = dst_id;
            tablet_commit_infos.emplace_back(std::move(commit_info));
            if (!on_gap_backend(tablet_id, dst_id)) {
                ++success_tablets_replica[tablet_id];
            }
            ++num_success_tablets;
        }
        LOG(INFO) << "streams to dst_id: " << dst_id << ", success tablets: " << num_success_tablets
                  << ", failed tablets: " << num_failed_tablets;
    });

    for (auto tablet_id : failed_tablets) {
        auto succ_it = success_tablets_replica.find(tablet_id);
        const int succ_count = succ_it == success_tablets_replica.end() ? 0 : succ_it->second;
        const int required = _load_required_replicas_num(tablet_id);
        if (succ_count >= required) {
            continue;
        }
        const Status& reason = failed_reason[tablet_id];
        LOG(INFO) << "tablet " << tablet_id << " failed on majority backends (success=" << succ_count
                  << ", required=" << required << "): " << reason;
        return Status::InternalError("tablet {} failed on majority backends: {}", tablet_id,
                                     reason);
    }
    return Status::OK();
}
1074
1075
} // namespace doris