Coverage Report

Created: 2026-06-29 23:23

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vtablet_writer_v2.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/writer/vtablet_writer_v2.h"
19
20
#include <brpc/uri.h>
21
#include <gen_cpp/DataSinks_types.h>
22
#include <gen_cpp/Descriptors_types.h>
23
#include <gen_cpp/Metrics_types.h>
24
#include <gen_cpp/Types_types.h>
25
#include <gen_cpp/internal_service.pb.h>
26
27
#include <algorithm>
28
#include <cstdint>
29
#include <mutex>
30
#include <ranges>
31
#include <string>
32
#include <unordered_map>
33
34
#include "common/compiler_util.h" // IWYU pragma: keep
35
#include "common/logging.h"
36
#include "common/metrics/doris_metrics.h"
37
#include "common/object_pool.h"
38
#include "common/signal_handler.h"
39
#include "common/status.h"
40
#include "core/block/block.h"
41
#include "exec/sink/delta_writer_v2_pool.h"
42
#include "load/delta_writer/delta_writer_v2.h"
43
#include "runtime/descriptors.h"
44
#include "runtime/exec_env.h"
45
#include "runtime/runtime_profile.h"
46
#include "runtime/runtime_state.h"
47
#include "runtime/thread_context.h"
48
#include "storage/tablet_info.h"
49
#include "util/debug_points.h"
50
#include "util/defer_op.h"
51
#include "util/uid_util.h"
52
// NOLINTNEXTLINE(unused-includes)
53
#include "exec/sink/load_stream_map_pool.h"
54
#include "exec/sink/load_stream_stub.h" // IWYU pragma: keep
55
#include "exec/sink/vtablet_block_convertor.h"
56
#include "exec/sink/vtablet_finder.h"
57
58
namespace doris {
59
60
extern bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms;
61
62
static constexpr int64_t CLOSE_WAIT_EVENT_FALLBACK_MS = 1000;
63
64
VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
65
                                 std::shared_ptr<Dependency> dep,
66
                                 std::shared_ptr<Dependency> fin_dep)
67
15
        : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
68
15
    DCHECK(t_sink.__isset.olap_table_sink);
69
15
}
70
71
15
VTabletWriterV2::~VTabletWriterV2() = default;
72
73
0
Status VTabletWriterV2::on_partitions_created(TCreatePartitionResult* result) {
74
    // add new tablet locations. it will use by address. so add to pool
75
0
    auto* new_locations = _pool->add(new std::vector<TTabletLocation>(result->tablets));
76
0
    _location->add_locations(*new_locations);
77
78
    // update new node info
79
0
    _nodes_info->add_nodes(result->nodes);
80
81
    // incremental open stream
82
0
    RETURN_IF_ERROR(_incremental_open_streams(result->partitions));
83
84
0
    return Status::OK();
85
0
}
86
87
0
static Status on_partitions_created(void* writer, TCreatePartitionResult* result) {
88
0
    return static_cast<VTabletWriterV2*>(writer)->on_partitions_created(result);
89
0
}
90
91
Status VTabletWriterV2::_incremental_open_streams(
92
0
        const std::vector<TOlapTablePartition>& partitions) {
93
    // do what we did in prepare() for partitions. indexes which don't change when we create new partition is orthogonal to partitions.
94
0
    std::unordered_set<int64_t> known_indexes;
95
0
    std::unordered_set<int64_t> new_backends;
96
0
    for (const auto& t_partition : partitions) {
97
0
        VOlapTablePartition* partition = nullptr;
98
0
        RETURN_IF_ERROR(_vpartition->generate_partition_from(t_partition, partition));
99
0
        for (const auto& index : partition->indexes) {
100
0
            for (const auto& tablet_id : index.tablets) {
101
0
                auto nodes = _location->find_tablet(tablet_id)->node_ids;
102
0
                for (auto& node : nodes) {
103
0
                    PTabletID tablet;
104
0
                    tablet.set_partition_id(partition->id);
105
0
                    tablet.set_index_id(index.index_id);
106
0
                    tablet.set_tablet_id(tablet_id);
107
0
                    new_backends.insert(node);
108
0
                    _tablets_for_node[node].emplace(tablet_id, tablet);
109
0
                    _tablets_by_node[node].emplace(tablet_id);
110
0
                    if (known_indexes.contains(index.index_id)) [[likely]] {
111
0
                        continue;
112
0
                    }
113
0
                    _indexes_from_node[node].emplace_back(tablet);
114
0
                    known_indexes.insert(index.index_id);
115
0
                    VLOG_DEBUG << "incremental open stream (" << partition->id << ", " << tablet_id
116
0
                               << ")";
117
0
                }
118
0
                _build_tablet_replica_info(tablet_id, partition);
119
0
            }
120
0
        }
121
0
    }
122
0
    for (int64_t dst_id : new_backends) {
123
0
        auto streams = _load_stream_map->get_or_create(dst_id, true);
124
0
        RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
125
0
    }
126
0
    return Status::OK();
127
0
}
128
129
0
Status VTabletWriterV2::_init_row_distribution() {
130
0
    _row_distribution.init({.state = _state,
131
0
                            .block_convertor = _block_convertor.get(),
132
0
                            .tablet_finder = _tablet_finder.get(),
133
0
                            .vpartition = _vpartition,
134
0
                            .add_partition_request_timer = _add_partition_request_timer,
135
0
                            .txn_id = _txn_id,
136
0
                            .pool = _pool,
137
0
                            .location = _location,
138
0
                            .vec_output_expr_ctxs = &_vec_output_expr_ctxs,
139
0
                            .schema = _schema,
140
0
                            .caller = (void*)this,
141
0
                            .create_partition_callback = &::doris::on_partitions_created});
142
143
0
    return _row_distribution.open(_output_row_desc);
144
0
}
145
146
2
Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
147
2
    _pool = state->obj_pool();
148
2
    auto& table_sink = _t_sink.olap_table_sink;
149
2
    _load_id.set_hi(table_sink.load_id.hi);
150
2
    _load_id.set_lo(table_sink.load_id.lo);
151
2
    signal::set_signal_task_id(_load_id);
152
2
    _txn_id = table_sink.txn_id;
153
2
    _num_replicas = table_sink.num_replicas;
154
2
    _tuple_desc_id = table_sink.tuple_id;
155
2
    _write_file_cache = table_sink.write_file_cache;
156
2
    _schema.reset(new OlapTableSchemaParam());
157
2
    RETURN_IF_ERROR(_schema->init(table_sink.schema));
158
2
    _schema->set_timestamp_ms(state->timestamp_ms());
159
2
    _schema->set_nano_seconds(state->nano_seconds());
160
2
    _schema->set_timezone(state->timezone());
161
2
    _location = _pool->add(new OlapTableLocationParam(table_sink.location));
162
2
    _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));
163
164
    // If distributed column list is empty, the table uses random distribution.
165
    // Mode priority (highest to lowest):
166
    //   1. FIND_TABLET_EVERY_SINK: load_to_single_tablet=true (legacy single-tablet mode).
167
    //   2. FIND_TABLET_EVERY_BATCH: default round-robin per batch.
168
2
    auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
169
2
    if (table_sink.partition.distributed_columns.empty()) {
170
0
        if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) {
171
0
            find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK;
172
0
        } else {
173
0
            find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH;
174
0
        }
175
0
    }
176
2
    _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition));
177
2
    _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode);
178
2
    RETURN_IF_ERROR(_vpartition->init());
179
180
2
    _state = state;
181
2
    _operator_profile = profile;
182
183
2
    _sender_id = state->per_fragment_instance_idx();
184
2
    _num_senders = state->num_per_fragment_instances();
185
2
    _backend_id = state->backend_id();
186
2
    _stream_per_node = state->load_stream_per_node();
187
2
    _total_streams = state->total_load_streams();
188
2
    _num_local_sink = state->num_local_sink();
189
2
    LOG(INFO) << "init olap tablet sink, load_id: " << print_id(_load_id)
190
2
              << ", num senders: " << _num_senders << ", stream per node: " << _stream_per_node
191
2
              << ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink;
192
2
    DCHECK(_stream_per_node > 0) << "load stream per node should be greator than 0";
193
2
    DCHECK(_total_streams > 0) << "total load streams should be greator than 0";
194
2
    DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0";
195
2
    _is_high_priority =
196
2
            (state->execution_timeout() <= config::load_task_high_priority_threshold_second);
197
2
    DBUG_EXECUTE_IF("VTabletWriterV2._init.is_high_priority", { _is_high_priority = true; });
198
2
    _mem_tracker =
199
2
            std::make_shared<MemTracker>("VTabletWriterV2:" + std::to_string(state->load_job_id()));
200
2
    SCOPED_TIMER(_operator_profile->total_time_counter());
201
2
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
202
203
    // get table's tuple descriptor
204
2
    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
205
2
    DBUG_EXECUTE_IF("VTabletWriterV2._init._output_tuple_desc_null",
206
2
                    { _output_tuple_desc = nullptr; });
207
2
    if (_output_tuple_desc == nullptr) {
208
0
        return Status::InternalError("unknown destination tuple descriptor, id = {}",
209
0
                                     _tuple_desc_id);
210
0
    }
211
2
    auto output_tuple_desc_slots_size = _output_tuple_desc->slots().size();
212
2
    DBUG_EXECUTE_IF("VTabletWriterV2._init._vec_output_expr_ctxs_not_equal_output_tuple_slot",
213
2
                    { output_tuple_desc_slots_size++; });
214
2
    if (!_vec_output_expr_ctxs.empty() &&
215
2
        _vec_output_expr_ctxs.size() != output_tuple_desc_slots_size) {
216
0
        LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, "
217
0
                     << "output_tuple_slot_num " << _output_tuple_desc->slots().size()
218
0
                     << " output_expr_num " << _vec_output_expr_ctxs.size();
219
0
        return Status::InvalidArgument(
220
0
                "output_tuple_slot_num {} should be equal to output_expr_num {}",
221
0
                _output_tuple_desc->slots().size(), _vec_output_expr_ctxs.size());
222
0
    }
223
224
2
    _block_convertor = std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
225
    // if partition_type is OLAP_TABLE_SINK_HASH_PARTITIONED, we handle the processing of auto_increment column
226
    // on exchange node rather than on TabletWriter
227
2
    _block_convertor->init_autoinc_info(
228
2
            _schema->db_id(), _schema->table_id(), _state->batch_size(),
229
2
            _schema->is_fixed_partial_update() && !_schema->auto_increment_coulumn().empty(),
230
2
            _schema->auto_increment_column_unique_id());
231
2
    _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc));
232
233
    // add all counter
234
2
    _input_rows_counter = ADD_COUNTER(_operator_profile, "RowsRead", TUnit::UNIT);
235
2
    _output_rows_counter = ADD_COUNTER(_operator_profile, "RowsProduced", TUnit::UNIT);
236
2
    _filtered_rows_counter = ADD_COUNTER(_operator_profile, "RowsFiltered", TUnit::UNIT);
237
2
    _send_data_timer = ADD_TIMER_WITH_LEVEL(_operator_profile, "SendDataTime", 1);
238
2
    _wait_mem_limit_timer =
239
2
            ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "WaitMemLimitTime", "SendDataTime", 1);
240
2
    _row_distribution_timer =
241
2
            ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "RowDistributionTime", "SendDataTime", 1);
242
2
    _write_memtable_timer =
243
2
            ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "WriteMemTableTime", "SendDataTime", 1);
244
2
    _add_partition_request_timer = ADD_CHILD_TIMER_WITH_LEVEL(
245
2
            _operator_profile, "AddPartitionRequestTime", "SendDataTime", 1);
246
2
    _validate_data_timer = ADD_TIMER_WITH_LEVEL(_operator_profile, "ValidateDataTime", 1);
247
2
    _open_timer = ADD_TIMER(_operator_profile, "OpenTime");
248
2
    _close_timer = ADD_TIMER(_operator_profile, "CloseWaitTime");
249
2
    _close_writer_timer = ADD_CHILD_TIMER(_operator_profile, "CloseWriterTime", "CloseWaitTime");
250
2
    _close_load_timer = ADD_CHILD_TIMER(_operator_profile, "CloseLoadTime", "CloseWaitTime");
251
2
    _load_back_pressure_version_time_ms =
252
2
            ADD_TIMER(_operator_profile, "LoadBackPressureVersionTimeMs");
253
254
2
    if (config::share_delta_writers) {
255
2
        _delta_writer_for_tablet = ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
256
2
                _load_id, _num_local_sink);
257
2
    } else {
258
0
        _delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id);
259
0
    }
260
2
    _load_stream_map = ExecEnv::GetInstance()->load_stream_map_pool()->get_or_create(
261
2
            _load_id, _backend_id, _stream_per_node, _num_local_sink);
262
2
    return Status::OK();
263
2
}
264
265
0
Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
266
0
    RETURN_IF_ERROR(_init(state, profile));
267
0
    LOG(INFO) << "opening olap table sink, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
268
0
              << ", sink_id=" << _sender_id;
269
0
    _timeout_watch.start();
270
0
    SCOPED_TIMER(_operator_profile->total_time_counter());
271
0
    SCOPED_TIMER(_open_timer);
272
0
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
273
274
0
    RETURN_IF_ERROR(_build_tablet_node_mapping());
275
0
    RETURN_IF_ERROR(_open_streams());
276
0
    RETURN_IF_ERROR(_init_row_distribution());
277
278
0
    return Status::OK();
279
0
}
280
281
0
Status VTabletWriterV2::_open_streams() {
282
0
    int fault_injection_skip_be = 0;
283
0
    bool any_backend = false;
284
0
    bool any_success = false;
285
0
    for (auto& [dst_id, _] : _tablets_for_node) {
286
0
        auto streams = _load_stream_map->get_or_create(dst_id);
287
0
        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", {
288
0
            if (fault_injection_skip_be < 1) {
289
0
                fault_injection_skip_be++;
290
0
                continue;
291
0
            }
292
0
        });
293
0
        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
294
0
            if (fault_injection_skip_be < 2) {
295
0
                fault_injection_skip_be++;
296
0
                continue;
297
0
            }
298
0
        });
299
0
        auto st = _open_streams_to_backend(dst_id, *streams);
300
0
        any_backend = true;
301
0
        any_success = any_success || st.ok();
302
0
    }
303
0
    if (any_backend && !any_success) {
304
0
        return Status::InternalError("failed to open streams to any BE");
305
0
    }
306
0
    return Status::OK();
307
0
}
308
309
0
Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreamStubs& streams) {
310
0
    const auto* node_info = _nodes_info->find_node(dst_id);
311
0
    DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null",
312
0
                    { node_info = nullptr; });
313
0
    if (node_info == nullptr) {
314
0
        return Status::InternalError("Unknown node {} in tablet location", dst_id);
315
0
    }
316
0
    auto idle_timeout_ms = _state->execution_timeout() * 1000;
317
0
    std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
318
0
    DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.no_schema_when_open_streams",
319
0
                    { tablets_for_schema.clear(); });
320
0
    auto st = streams.open(_state->exec_env()->brpc_streaming_client_cache(), *node_info, _txn_id,
321
0
                           *_schema, tablets_for_schema, _total_streams, idle_timeout_ms,
322
0
                           _state->enable_profile());
323
0
    if (!st.ok()) {
324
0
        LOG(WARNING) << "failed to open stream to backend " << dst_id
325
0
                     << ", load_id=" << print_id(_load_id) << ", err=" << st;
326
0
    }
327
0
    return st;
328
0
}
329
330
0
Status VTabletWriterV2::_build_tablet_node_mapping() {
331
0
    std::unordered_set<int64_t> known_indexes;
332
0
    for (const auto& partition : _vpartition->get_partitions()) {
333
0
        for (const auto& index : partition->indexes) {
334
0
            for (const auto& tablet_id : index.tablets) {
335
0
                auto* tablet_location = _location->find_tablet(tablet_id);
336
0
                DBUG_EXECUTE_IF("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null",
337
0
                                { tablet_location = nullptr; });
338
0
                if (tablet_location == nullptr) {
339
0
                    return Status::InternalError("unknown tablet location, tablet id = {}",
340
0
                                                 tablet_id);
341
0
                }
342
0
                for (auto& node : tablet_location->node_ids) {
343
0
                    PTabletID tablet;
344
0
                    tablet.set_partition_id(partition->id);
345
0
                    tablet.set_index_id(index.index_id);
346
0
                    tablet.set_tablet_id(tablet_id);
347
0
                    _tablets_for_node[node].emplace(tablet_id, tablet);
348
0
                    constexpr int64_t DUMMY_TABLET_ID = 0;
349
0
                    if (tablet_id == DUMMY_TABLET_ID) [[unlikely]] {
350
                        // ignore fake tablet for auto partition
351
0
                        continue;
352
0
                    }
353
0
                    _tablets_by_node[node].emplace(tablet_id);
354
0
                    if (known_indexes.contains(index.index_id)) [[likely]] {
355
0
                        continue;
356
0
                    }
357
0
                    _indexes_from_node[node].emplace_back(tablet);
358
0
                    known_indexes.insert(index.index_id);
359
0
                }
360
0
                _build_tablet_replica_info(tablet_id, partition);
361
0
            }
362
0
        }
363
0
    }
364
0
    return Status::OK();
365
0
}
366
367
void VTabletWriterV2::_build_tablet_replica_info(const int64_t tablet_id,
368
0
                                                 VOlapTablePartition* partition) {
369
0
    if (partition != nullptr) {
370
0
        int total_replicas_num =
371
0
                partition->total_replica_num == 0 ? _num_replicas : partition->total_replica_num;
372
0
        int load_required_replicas_num = partition->load_required_replica_num == 0
373
0
                                                 ? (_num_replicas + 1) / 2
374
0
                                                 : partition->load_required_replica_num;
375
0
        _tablet_replica_info[tablet_id] =
376
0
                std::make_pair(total_replicas_num, load_required_replicas_num);
377
        // Copy version gap backends info for this tablet
378
0
        if (auto it = partition->tablet_version_gap_backends.find(tablet_id);
379
0
            it != partition->tablet_version_gap_backends.end()) {
380
0
            _tablet_version_gap_backends[tablet_id] = it->second;
381
0
        }
382
0
    } else {
383
0
        _tablet_replica_info[tablet_id] = std::make_pair(_num_replicas, (_num_replicas + 1) / 2);
384
0
    }
385
0
}
386
387
void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& row_part_tablet_ids,
388
0
                                                RowsForTablet& rows_for_tablet) {
389
0
    for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); index_idx++) {
390
0
        auto& row_ids = row_part_tablet_ids[index_idx].row_ids;
391
0
        auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids;
392
0
        auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids;
393
394
0
        for (size_t i = 0; i < row_ids.size(); i++) {
395
0
            auto& tablet_id = tablet_ids[i];
396
0
            auto it = rows_for_tablet.find(tablet_id);
397
0
            if (it == rows_for_tablet.end()) {
398
0
                Rows rows;
399
0
                rows.partition_id = partition_ids[i];
400
0
                rows.index_id = _schema->indexes()[index_idx]->index_id;
401
0
                rows.row_idxes.reserve(row_ids.size());
402
0
                auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows});
403
0
                it = tmp_it;
404
0
            }
405
0
            it->second.row_idxes.push_back(row_ids[i]);
406
0
            _number_output_rows++;
407
0
        }
408
0
    }
409
0
}
410
411
Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id,
412
1
                                        std::vector<std::shared_ptr<LoadStreamStub>>& streams) {
413
1
    std::vector<int64_t> failed_node_ids;
414
1
    const auto* location = _location->find_tablet(tablet_id);
415
1
    DBUG_EXECUTE_IF("VTabletWriterV2._select_streams.location_null", { location = nullptr; });
416
1
    if (location == nullptr) {
417
0
        return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id);
418
0
    }
419
1
    for (const auto& node_id : location->node_ids) {
420
1
        PTabletID tablet;
421
1
        tablet.set_partition_id(partition_id);
422
1
        tablet.set_index_id(index_id);
423
1
        tablet.set_tablet_id(tablet_id);
424
1
        VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id);
425
1
        _tablets_for_node[node_id].emplace(tablet_id, tablet);
426
1
        auto stream = _load_stream_map->at(node_id)->select_one_stream();
427
1
        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
428
1
            LOG(INFO) << "[skip_two_backends](detail) tablet_id=" << tablet_id
429
1
                      << ", node_id=" << node_id
430
1
                      << ", stream_ok=" << (stream == nullptr ? "no" : "yes");
431
1
        });
432
1
        if (stream == nullptr) {
433
0
            LOG(WARNING) << "skip writing tablet " << tablet_id << " to backend " << node_id
434
0
                         << ": stream is not open";
435
0
            failed_node_ids.push_back(node_id);
436
0
            continue;
437
0
        }
438
1
        streams.emplace_back(std::move(stream));
439
1
    }
440
1
    DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
441
1
        LOG(INFO) << "[skip_two_backends](summary) tablet_id=" << tablet_id
442
1
                  << ", num_streams=" << streams.size()
443
1
                  << ", num_nodes=" << location->node_ids.size();
444
1
    });
445
1
    if (streams.size() <= location->node_ids.size() / 2) {
446
0
        std::ostringstream success_msg;
447
0
        std::ostringstream failed_msg;
448
0
        for (auto& s : streams) {
449
0
            success_msg << ", " << s->dst_id();
450
0
        }
451
0
        for (auto id : failed_node_ids) {
452
0
            failed_msg << ", " << id;
453
0
        }
454
0
        LOG(INFO) << "failed to write enough replicas " << streams.size() << "/"
455
0
                  << location->node_ids.size() << " for tablet " << tablet_id
456
0
                  << " due to connection errors; success nodes" << success_msg.str()
457
0
                  << "; failed nodes" << failed_msg.str() << ".";
458
0
        return Status::InternalError(
459
0
                "failed to write enough replicas {}/{} for tablet {} due to connection errors",
460
0
                streams.size(), location->node_ids.size(), tablet_id);
461
0
    }
462
1
    Status st;
463
1
    for (auto& stream : streams) {
464
1
        st = stream->wait_for_schema(partition_id, index_id, tablet_id);
465
1
        if (st.ok()) {
466
1
            break;
467
1
        } else {
468
0
            LOG(WARNING) << "failed to get schema from stream " << stream << ", err=" << st;
469
0
        }
470
1
    }
471
1
    return st;
472
1
}
473
474
0
Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) {
475
0
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
476
0
    Status status = Status::OK();
477
478
0
    if (_state->query_options().dry_run_query) {
479
0
        return status;
480
0
    }
481
482
0
    int64_t total_wait_time_ms = 0;
483
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
484
0
    for (const auto& [dst_id, streams] : streams_for_node) {
485
0
        for (const auto& stream : streams->streams()) {
486
0
            auto wait_time_ms = stream->get_and_reset_load_back_pressure_version_wait_time_ms();
487
0
            if (wait_time_ms > 0) {
488
0
                total_wait_time_ms = std::max(total_wait_time_ms, wait_time_ms);
489
0
            }
490
0
        }
491
0
    }
492
0
    if (UNLIKELY(total_wait_time_ms > 0)) {
493
0
        std::this_thread::sleep_for(std::chrono::milliseconds(total_wait_time_ms));
494
0
        _load_back_pressure_version_block_ms.fetch_add(total_wait_time_ms);
495
0
    }
496
497
    // check out of limit
498
0
    RETURN_IF_ERROR(_send_new_partition_batch());
499
500
0
    const bool is_replaying_batched_block = _row_distribution._deal_batched;
501
0
    auto input_rows = input_block.rows();
502
0
    auto input_bytes = input_block.bytes();
503
0
    if (UNLIKELY(input_rows == 0)) {
504
0
        return status;
505
0
    }
506
0
    SCOPED_TIMER(_operator_profile->total_time_counter());
507
0
    _number_input_rows += input_rows;
508
    // update incrementally so that FE can get the progress.
509
    // the real 'num_rows_load_total' will be set when sink being closed.
510
0
    _state->update_num_rows_load_total(input_rows);
511
0
    _state->update_num_bytes_load_total(input_bytes);
512
0
    if (!is_replaying_batched_block) {
513
0
        DorisMetrics::instance()->load_rows->increment(input_rows);
514
0
        DorisMetrics::instance()->load_bytes->increment(input_bytes);
515
0
    }
516
517
0
    SCOPED_RAW_TIMER(&_send_data_ns);
518
    // This is just for passing compilation.
519
0
    _row_distribution_watch.start();
520
521
0
    std::shared_ptr<Block> block;
522
0
    RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
523
0
            input_block, block, _row_part_tablet_ids, _number_input_rows));
524
0
    RowsForTablet rows_for_tablet;
525
0
    _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet);
526
527
0
    _row_distribution_watch.stop();
528
529
    // For each tablet, send its input_rows from block to delta writer
530
0
    for (const auto& [tablet_id, rows] : rows_for_tablet) {
531
0
        RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows));
532
0
    }
533
534
0
    COUNTER_SET(_input_rows_counter, _number_input_rows);
535
0
    COUNTER_SET(_output_rows_counter, _number_output_rows);
536
0
    COUNTER_SET(_filtered_rows_counter,
537
0
                _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows());
538
0
    COUNTER_SET(_send_data_timer, _send_data_ns);
539
0
    COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
540
0
    COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
541
542
0
    return Status::OK();
543
0
}
544
545
Status VTabletWriterV2::_write_memtable(std::shared_ptr<Block> block, int64_t tablet_id,
546
1
                                        const Rows& rows) {
547
1
    auto st = Status::OK();
548
1
    auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
549
1
        std::vector<std::shared_ptr<LoadStreamStub>> streams;
550
1
        st = _select_streams(tablet_id, rows.partition_id, rows.index_id, streams);
551
1
        if (!st.ok()) [[unlikely]] {
552
0
            LOG(WARNING) << "select stream failed, " << st << ", load_id=" << print_id(_load_id);
553
0
            return std::unique_ptr<DeltaWriterV2>(nullptr);
554
0
        }
555
1
        WriteRequest req {
556
1
                .tablet_id = tablet_id,
557
1
                .txn_id = _txn_id,
558
1
                .index_id = rows.index_id,
559
1
                .partition_id = rows.partition_id,
560
1
                .load_id = _load_id,
561
1
                .tuple_desc = _schema->tuple_desc(),
562
1
                .table_schema_param = _schema,
563
1
                .is_high_priority = _is_high_priority,
564
1
                .write_file_cache = _write_file_cache,
565
1
                .storage_vault_id {},
566
1
                .enable_table_memtable_backpressure = _tablet_finder->is_adaptive_random_bucket(),
567
1
        };
568
1
        bool index_not_found = true;
569
1
        for (const auto& index : _schema->indexes()) {
570
1
            if (index->index_id == rows.index_id) {
571
1
                req.slots = &index->slots;
572
1
                req.schema_hash = index->schema_hash;
573
1
                index_not_found = false;
574
1
                break;
575
1
            }
576
1
        }
577
1
        DBUG_EXECUTE_IF("VTabletWriterV2._write_memtable.index_not_found",
578
1
                        { index_not_found = true; });
579
1
        if (index_not_found) [[unlikely]] {
580
0
            st = Status::InternalError("no index {} in schema", rows.index_id);
581
0
            LOG(WARNING) << "index " << rows.index_id
582
0
                         << " not found in schema, load_id=" << print_id(_load_id);
583
0
            return std::unique_ptr<DeltaWriterV2>(nullptr);
584
0
        }
585
1
        std::shared_ptr<WorkloadGroup> workload_group = nullptr;
586
1
        if (_state->get_query_ctx()) {
587
1
            workload_group = _state->workload_group();
588
1
        }
589
1
        return DeltaWriterV2::create_unique(&req, streams, workload_group);
590
1
    });
591
1
    if (delta_writer == nullptr) {
592
0
        LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id
593
0
                     << ", load_id=" << print_id(_load_id) << ", err: " << st;
594
0
        return Status::InternalError("failed to open DeltaWriter {}: {}", tablet_id, st.msg());
595
0
    }
596
1
    {
597
1
        SCOPED_TIMER(_wait_mem_limit_timer);
598
1
        ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
599
1
                [state = _state]() { return state->is_cancelled(); },
600
1
                _state->workload_group().get());
601
1
        if (_state->is_cancelled()) {
602
0
            return _state->cancel_reason();
603
0
        }
604
1
    }
605
1
    SCOPED_TIMER(_write_memtable_timer);
606
1
    bool memtable_flushed = false;
607
1
    st = delta_writer->write(
608
1
            block.get(), rows.row_idxes,
609
1
            [state = _state]() {
610
0
                if (state->is_cancelled()) {
611
0
                    return state->cancel_reason();
612
0
                }
613
0
                return Status::OK();
614
0
            },
615
1
            &memtable_flushed);
616
1
    return st;
617
1
}
618
619
1
void VTabletWriterV2::_cancel(Status status) {
620
1
    LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
621
1
              << ", txn_id=" << _txn_id << ", sink_id=" << _sender_id
622
1
              << ", due to error: " << status;
623
1
    if (_delta_writer_for_tablet) {
624
1
        _delta_writer_for_tablet->cancel(status);
625
1
        _delta_writer_for_tablet.reset();
626
1
    }
627
1
    if (_load_stream_map) {
628
1
        _load_stream_map->for_each(
629
1
                [status](int64_t dst_id, LoadStreamStubs& streams) { streams.cancel(status); });
630
1
        _load_stream_map->release();
631
1
    }
632
1
}
633
634
0
Status VTabletWriterV2::_send_new_partition_batch() {
635
0
    if (_row_distribution.need_deal_batching()) { // maybe try_close more than 1 time
636
0
        RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
637
638
0
        Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
639
640
        // these order is unique.
641
        //  1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
642
        //  2. deal batched block
643
        //  3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
644
0
        _row_distribution.clear_batching_stats();
645
0
        Defer recover_batching_block([&]() {
646
0
            _row_distribution._batching_block->set_mutable_columns(
647
0
                    std::move(tmp_block).mutate_columns());
648
0
            _row_distribution._batching_block->clear_column_data();
649
0
        });
650
0
        RETURN_IF_ERROR(this->write(_state, tmp_block));
651
0
        _row_distribution._deal_batched = false;
652
0
    }
653
0
    return Status::OK();
654
0
}
655
656
0
Status VTabletWriterV2::close(Status exec_status) {
657
0
    std::lock_guard<std::mutex> close_lock(_close_mutex);
658
0
    if (_is_closed) {
659
0
        return _close_status;
660
0
    }
661
0
    LOG(INFO) << "closing olap table sink, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
662
0
              << ", sink_id=" << _sender_id << ", status=" << exec_status.to_string();
663
0
    SCOPED_TIMER(_close_timer);
664
0
    Status status = exec_status;
665
666
0
    if (status.ok()) {
667
0
        SCOPED_TIMER(_operator_profile->total_time_counter());
668
0
        _row_distribution._deal_batched = true;
669
0
        status = _send_new_partition_batch();
670
0
    }
671
672
0
    DBUG_EXECUTE_IF("VTabletWriterV2.close.sleep", {
673
0
        auto sleep_sec = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
674
0
                "VTabletWriterV2.close.sleep", "sleep_sec", 1);
675
0
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
676
0
    });
677
0
    DBUG_EXECUTE_IF("VTabletWriterV2.close.cancel",
678
0
                    { status = Status::InternalError("load cancel"); });
679
0
    if (status.ok()) {
680
        // only if status is ok can we call this _profile->total_time_counter().
681
        // if status is not ok, this sink may not be prepared, so that _profile is null
682
0
        SCOPED_TIMER(_operator_profile->total_time_counter());
683
684
0
        COUNTER_SET(_input_rows_counter, _number_input_rows);
685
0
        COUNTER_SET(_output_rows_counter, _number_output_rows);
686
0
        COUNTER_SET(_filtered_rows_counter,
687
0
                    _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows());
688
0
        COUNTER_SET(_send_data_timer, _send_data_ns);
689
0
        COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
690
0
        COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
691
0
        auto back_pressure_time_ms = _load_back_pressure_version_block_ms.load();
692
0
        COUNTER_SET(_load_back_pressure_version_time_ms, back_pressure_time_ms);
693
0
        g_sink_load_back_pressure_version_time_ms << back_pressure_time_ms;
694
695
        // close DeltaWriters
696
0
        {
697
0
            std::unordered_map<int64_t, int32_t> segments_for_tablet;
698
0
            SCOPED_TIMER(_close_writer_timer);
699
            // close all delta writers if this is the last user
700
0
            RuntimeProfile* delta_writer_profile =
701
0
                    (_state->enable_profile() && _state->profile_level() >= 2) ? _operator_profile
702
0
                                                                               : nullptr;
703
0
            auto st = _delta_writer_for_tablet->close(segments_for_tablet, delta_writer_profile);
704
0
            _delta_writer_for_tablet.reset();
705
0
            if (!st.ok()) {
706
0
                _cancel(st);
707
0
                return st;
708
0
            }
709
            // only the last sink closing delta writers will have segment num
710
0
            if (!segments_for_tablet.empty()) {
711
0
                _load_stream_map->save_segments_for_tablet(segments_for_tablet);
712
0
            }
713
0
        }
714
715
0
        _calc_tablets_to_commit();
716
0
        const bool is_last_sink = _load_stream_map->release();
717
0
        LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink
718
0
                  << ", load_id=" << print_id(_load_id);
719
720
        // send CLOSE_LOAD on all non-incremental streams if this is the last sink
721
0
        if (is_last_sink) {
722
0
            _load_stream_map->close_load(false);
723
0
        }
724
725
        // close_wait on all non-incremental streams, even if this is not the last sink.
726
        // because some per-instance data structures are now shared among all sinks
727
        // due to sharing delta writers and load stream stubs.
728
        // Do not need to wait after quorum success,
729
        // for first-stage close_wait only ensure incremental streams load has been completed,
730
        // unified waiting in the second-stage close_wait.
731
0
        RETURN_IF_ERROR(_close_wait(_non_incremental_streams(), false));
732
733
        // send CLOSE_LOAD on all incremental streams if this is the last sink.
734
        // this must happen after all non-incremental streams are closed,
735
        // so we can ensure all sinks are in close phase before closing incremental streams.
736
0
        if (is_last_sink) {
737
0
            _load_stream_map->close_load(true);
738
0
        }
739
740
        // close_wait on all incremental streams, even if this is not the last sink.
741
0
        RETURN_IF_ERROR(_close_wait(_all_streams(), true));
742
743
        // calculate and submit commit info
744
0
        if (is_last_sink) {
745
0
            DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", {
746
0
                auto streams = _load_stream_map->at(_tablets_for_node.begin()->first);
747
0
                int64_t tablet_id = -1;
748
0
                for (auto tablet : streams->success_tablets()) {
749
0
                    tablet_id = tablet;
750
0
                    break;
751
0
                }
752
0
                if (tablet_id != -1) {
753
0
                    LOG(INFO) << "fault injection: adding failed tablet_id: " << tablet_id;
754
0
                    streams->select_one_stream()->add_failed_tablet(
755
0
                            tablet_id, Status::InternalError("fault injection"));
756
0
                } else {
757
0
                    LOG(INFO) << "fault injection: failed to inject failed tablet_id";
758
0
                }
759
0
            });
760
761
0
            std::vector<TTabletCommitInfo> tablet_commit_infos;
762
0
            RETURN_IF_ERROR(_create_commit_info(tablet_commit_infos, _load_stream_map));
763
0
            _state->add_tablet_commit_infos(tablet_commit_infos);
764
0
        }
765
766
        // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
767
0
        int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() +
768
0
                                      _state->num_rows_load_unselected();
769
0
        _state->set_num_rows_load_total(num_rows_load_total);
770
0
        _state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
771
0
                                              _tablet_finder->num_filtered_rows());
772
0
        _state->update_num_rows_load_unselected(
773
0
                _tablet_finder->num_immutable_partition_filtered_rows());
774
775
0
        if (_state->enable_profile() && _state->profile_level() >= 2) {
776
            // Output detailed profiling info for auto-partition requests
777
0
            _row_distribution.output_profile_info(_operator_profile);
778
0
        }
779
780
0
        LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id)
781
0
                  << ", txn_id=" << _txn_id;
782
0
    } else {
783
0
        _cancel(status);
784
0
    }
785
786
0
    _is_closed = true;
787
0
    _close_status = status;
788
0
    return status;
789
0
}
790
791
0
std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_all_streams() {
792
0
    std::unordered_set<std::shared_ptr<LoadStreamStub>> all_streams;
793
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
794
0
    for (const auto& [dst_id, streams] : streams_for_node) {
795
0
        for (const auto& stream : streams->streams()) {
796
0
            all_streams.insert(stream);
797
0
        }
798
0
    }
799
0
    return all_streams;
800
0
}
801
802
0
std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_non_incremental_streams() {
803
0
    std::unordered_set<std::shared_ptr<LoadStreamStub>> non_incremental_streams;
804
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
805
0
    for (const auto& [dst_id, streams] : streams_for_node) {
806
0
        for (const auto& stream : streams->streams()) {
807
0
            if (!stream->is_incremental()) {
808
0
                non_incremental_streams.insert(stream);
809
0
            }
810
0
        }
811
0
    }
812
0
    return non_incremental_streams;
813
0
}
814
815
Status VTabletWriterV2::_close_wait(
816
        std::unordered_set<std::shared_ptr<LoadStreamStub>> unfinished_streams,
817
0
        bool need_wait_after_quorum_success) {
818
0
    SCOPED_TIMER(_close_load_timer);
819
0
    Status status;
820
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
821
    // 1. first wait for quorum success
822
0
    std::unordered_set<int64_t> need_finish_tablets;
823
0
    auto partition_ids = _tablet_finder->partition_ids();
824
0
    for (const auto& part : _vpartition->get_partitions()) {
825
0
        if (partition_ids.contains(part->id)) {
826
0
            for (const auto& index : part->indexes) {
827
0
                for (const auto& tablet_id : index.tablets) {
828
0
                    need_finish_tablets.insert(tablet_id);
829
0
                }
830
0
            }
831
0
        }
832
0
    }
833
0
    while (true) {
834
0
        int64_t close_wait_version = _load_stream_map->close_wait_version();
835
0
        RETURN_IF_ERROR(_check_timeout());
836
0
        RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));
837
0
        bool quorum_success = _quorum_success(unfinished_streams, need_finish_tablets);
838
0
        if (quorum_success || unfinished_streams.empty()) {
839
0
            LOG(INFO) << "quorum_success: " << quorum_success
840
0
                      << ", is all finished: " << unfinished_streams.empty()
841
0
                      << ", txn_id: " << _txn_id << ", load_id: " << print_id(_load_id);
842
0
            break;
843
0
        }
844
0
        _load_stream_map->wait_for_close_event(close_wait_version, CLOSE_WAIT_EVENT_FALLBACK_MS);
845
0
    }
846
847
    // 2. then wait for remaining streams as much as possible
848
0
    if (!unfinished_streams.empty() && need_wait_after_quorum_success) {
849
0
        int64_t arrival_quorum_success_time = UnixMillis();
850
0
        int64_t max_wait_time_ms = _calc_max_wait_time_ms(streams_for_node, unfinished_streams);
851
0
        while (true) {
852
0
            int64_t close_wait_version = _load_stream_map->close_wait_version();
853
0
            RETURN_IF_ERROR(_check_timeout());
854
0
            RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));
855
0
            if (unfinished_streams.empty()) {
856
0
                break;
857
0
            }
858
0
            int64_t elapsed_ms = UnixMillis() - arrival_quorum_success_time;
859
0
            if (elapsed_ms > max_wait_time_ms ||
860
0
                _state->execution_timeout() - elapsed_ms / 1000 <
861
0
                        config::quorum_success_remaining_timeout_seconds) {
862
0
                std::stringstream unfinished_streams_str;
863
0
                for (const auto& stream : unfinished_streams) {
864
0
                    unfinished_streams_str << stream->stream_id() << ",";
865
0
                }
866
0
                LOG(WARNING) << "reach max wait time, max_wait_time_ms: " << max_wait_time_ms
867
0
                             << ", load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
868
0
                             << ", unfinished streams: " << unfinished_streams_str.str();
869
0
                break;
870
0
            }
871
0
            _load_stream_map->wait_for_close_event(
872
0
                    close_wait_version,
873
0
                    std::min(CLOSE_WAIT_EVENT_FALLBACK_MS, max_wait_time_ms - elapsed_ms));
874
0
        }
875
0
    }
876
877
0
    if (!status.ok()) {
878
0
        LOG(WARNING) << "close_wait failed: " << status << ", load_id=" << print_id(_load_id);
879
0
    }
880
0
    return status;
881
0
}
882
883
bool VTabletWriterV2::_quorum_success(
884
        const std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams,
885
0
        const std::unordered_set<int64_t>& need_finish_tablets) {
886
0
    if (!config::enable_quorum_success_write) {
887
0
        return false;
888
0
    }
889
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
890
0
    if (need_finish_tablets.empty()) [[unlikely]] {
891
0
        return false;
892
0
    }
893
894
    // 1. calculate finished tablets replica num
895
0
    std::unordered_set<int64_t> finished_dst_ids;
896
0
    std::unordered_map<int64_t, int64_t> finished_tablets_replica;
897
0
    for (const auto& [dst_id, streams] : streams_for_node) {
898
0
        bool finished = true;
899
0
        for (const auto& stream : streams->streams()) {
900
0
            if (unfinished_streams.contains(stream) || !stream->check_cancel().ok()) {
901
0
                finished = false;
902
0
                break;
903
0
            }
904
0
        }
905
0
        if (finished) {
906
0
            finished_dst_ids.insert(dst_id);
907
0
        }
908
0
    }
909
0
    for (const auto& [dst_id, _] : streams_for_node) {
910
0
        if (!finished_dst_ids.contains(dst_id)) {
911
0
            continue;
912
0
        }
913
0
        for (const auto& tablet_id : _tablets_by_node[dst_id]) {
914
            // Only count non-gap backends for quorum success.
915
            // Gap backends' success doesn't count toward majority write.
916
0
            auto gap_it = _tablet_version_gap_backends.find(tablet_id);
917
0
            if (gap_it == _tablet_version_gap_backends.end() ||
918
0
                gap_it->second.find(dst_id) == gap_it->second.end()) {
919
0
                finished_tablets_replica[tablet_id]++;
920
0
            }
921
0
        }
922
0
    }
923
924
    // 2. check if quorum success
925
0
    for (const auto& tablet_id : need_finish_tablets) {
926
0
        if (finished_tablets_replica[tablet_id] < _load_required_replicas_num(tablet_id)) {
927
0
            return false;
928
0
        }
929
0
    }
930
0
    return true;
931
0
}
932
933
10
int VTabletWriterV2::_load_required_replicas_num(int64_t tablet_id) {
934
10
    auto [total_replicas_num, load_required_replicas_num] = _tablet_replica_info[tablet_id];
935
10
    if (total_replicas_num == 0) {
936
0
        return (_num_replicas + 1) / 2;
937
0
    }
938
10
    return load_required_replicas_num;
939
10
}
940
941
int64_t VTabletWriterV2::_calc_max_wait_time_ms(
942
        const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node,
943
0
        const std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams) {
944
    // 1. calculate avg speed of all unfinished streams
945
0
    int64_t elapsed_ms = _timeout_watch.elapsed_time() / 1000 / 1000;
946
0
    int64_t total_bytes = 0;
947
0
    int finished_count = 0;
948
0
    for (const auto& [dst_id, streams] : streams_for_node) {
949
0
        for (const auto& stream : streams->streams()) {
950
0
            if (unfinished_streams.contains(stream) || !stream->check_cancel().ok()) {
951
0
                continue;
952
0
            }
953
0
            total_bytes += stream->bytes_written();
954
0
            finished_count++;
955
0
        }
956
0
    }
957
    // no data loaded in index channel, return 0
958
0
    if (total_bytes == 0 || finished_count == 0) {
959
0
        return 0;
960
0
    }
961
    // if elapsed_ms is equal to 0, explain the loaded data is too small
962
0
    if (elapsed_ms <= 0) {
963
0
        return config::quorum_success_min_wait_seconds * 1000;
964
0
    }
965
0
    double avg_speed =
966
0
            static_cast<double>(total_bytes) / (static_cast<double>(elapsed_ms) * finished_count);
967
968
    // 2. calculate max wait time of each unfinished stream and return the max value
969
0
    int64_t max_wait_time_ms = 0;
970
0
    for (const auto& [dst_id, streams] : streams_for_node) {
971
0
        for (const auto& stream : streams->streams()) {
972
0
            if (unfinished_streams.contains(stream)) {
973
0
                int64_t bytes = stream->bytes_written();
974
0
                int64_t wait =
975
0
                        avg_speed > 0 ? static_cast<int64_t>(static_cast<double>(bytes) / avg_speed)
976
0
                                      : 0;
977
0
                max_wait_time_ms = std::max(max_wait_time_ms, wait);
978
0
            }
979
0
        }
980
0
    }
981
982
    // 3. calculate max wait time
983
    // introduce quorum_success_min_wait_time_ms to avoid jitter of small load
984
0
    max_wait_time_ms -= UnixMillis() - _timeout_watch.elapsed_time() / 1000 / 1000;
985
0
    max_wait_time_ms =
986
0
            std::max(static_cast<int64_t>(static_cast<double>(max_wait_time_ms) *
987
0
                                          (1.0 + config::quorum_success_max_wait_multiplier)),
988
0
                     config::quorum_success_min_wait_seconds * 1000);
989
990
0
    return max_wait_time_ms;
991
0
}
992
993
0
Status VTabletWriterV2::_check_timeout() {
994
0
    int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
995
0
                        _timeout_watch.elapsed_time() / 1000 / 1000;
996
0
    DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
997
0
    if (remain_ms <= 0) {
998
0
        LOG(WARNING) << "load timed out before close waiting, load_id=" << print_id(_load_id);
999
0
        return Status::TimedOut("load timed out before close waiting");
1000
0
    }
1001
0
    return Status::OK();
1002
0
}
1003
1004
Status VTabletWriterV2::_check_streams_finish(
1005
        std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams, Status& status,
1006
0
        const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node) {
1007
0
    for (const auto& [dst_id, streams] : streams_for_node) {
1008
0
        for (const auto& stream : streams->streams()) {
1009
0
            if (!unfinished_streams.contains(stream)) {
1010
0
                continue;
1011
0
            }
1012
0
            bool is_closed = false;
1013
0
            auto stream_st = stream->close_finish_check(_state, &is_closed);
1014
0
            DBUG_EXECUTE_IF("VTabletWriterV2._check_streams_finish.close_stream_failed",
1015
0
                            { stream_st = Status::InternalError("close stream failed"); });
1016
0
            if (!stream_st.ok()) {
1017
0
                status = stream_st;
1018
0
                unfinished_streams.erase(stream);
1019
0
                LOG(WARNING) << "close_wait failed: " << stream_st
1020
0
                             << ", load_id=" << print_id(_load_id);
1021
0
            }
1022
0
            if (is_closed) {
1023
0
                unfinished_streams.erase(stream);
1024
0
            }
1025
0
        }
1026
0
    }
1027
0
    return status;
1028
0
}
1029
1030
0
void VTabletWriterV2::_calc_tablets_to_commit() {
1031
0
    LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
1032
0
              << ", sink_id=" << _sender_id;
1033
0
    for (const auto& [dst_id, tablets] : _tablets_for_node) {
1034
0
        std::vector<PTabletID> tablets_to_commit;
1035
0
        std::vector<int64_t> partition_ids;
1036
0
        for (const auto& [tablet_id, tablet] : tablets) {
1037
0
            if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
1038
0
                if (VLOG_DEBUG_IS_ON) {
1039
0
                    partition_ids.push_back(tablet.partition_id());
1040
0
                }
1041
0
                PTabletID t(tablet);
1042
0
                tablets_to_commit.push_back(t);
1043
0
            }
1044
0
        }
1045
0
        if (VLOG_DEBUG_IS_ON) {
1046
0
            std::string msg("close load partitions: ");
1047
0
            msg.reserve(partition_ids.size() * 7);
1048
0
            for (auto v : partition_ids) {
1049
0
                msg.append(std::to_string(v) + ", ");
1050
0
            }
1051
0
            LOG(WARNING) << msg;
1052
0
        }
1053
0
        _load_stream_map->save_tablets_to_commit(dst_id, tablets_to_commit);
1054
0
    }
1055
0
}
1056
1057
Status VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tablet_commit_infos,
1058
13
                                            std::shared_ptr<LoadStreamMap> load_stream_map) {
1059
    // Track per-tablet non-gap success count and failure reasons
1060
13
    std::unordered_map<int64_t, int> success_tablets_replica;
1061
13
    std::unordered_set<int64_t> failed_tablets;
1062
13
    std::unordered_map<int64_t, Status> failed_reason;
1063
33
    load_stream_map->for_each([&](int64_t dst_id, LoadStreamStubs& streams) {
1064
33
        size_t num_success_tablets = 0;
1065
33
        size_t num_failed_tablets = 0;
1066
33
        for (auto [tablet_id, reason] : streams.failed_tablets()) {
1067
12
            failed_tablets.insert(tablet_id);
1068
12
            failed_reason[tablet_id] = reason;
1069
12
            num_failed_tablets++;
1070
12
        }
1071
50
        for (auto tablet_id : streams.success_tablets()) {
1072
50
            TTabletCommitInfo commit_info;
1073
50
            commit_info.tabletId = tablet_id;
1074
50
            commit_info.backendId = dst_id;
1075
50
            tablet_commit_infos.emplace_back(std::move(commit_info));
1076
            // Only count non-gap backends toward success
1077
50
            auto gap_it = _tablet_version_gap_backends.find(tablet_id);
1078
50
            if (gap_it == _tablet_version_gap_backends.end() ||
1079
50
                gap_it->second.find(dst_id) == gap_it->second.end()) {
1080
50
                success_tablets_replica[tablet_id]++;
1081
50
            }
1082
50
            num_success_tablets++;
1083
50
        }
1084
33
        LOG(INFO) << "streams to dst_id: " << dst_id << ", success tablets: " << num_success_tablets
1085
33
                  << ", failed tablets: " << num_failed_tablets;
1086
33
    });
1087
1088
13
    for (auto tablet_id : failed_tablets) {
1089
10
        int succ_count = success_tablets_replica[tablet_id];
1090
10
        int required = _load_required_replicas_num(tablet_id);
1091
10
        if (succ_count < required) {
1092
4
            LOG(INFO) << "tablet " << tablet_id
1093
4
                      << " failed on majority backends (success=" << succ_count
1094
4
                      << ", required=" << required << "): " << failed_reason[tablet_id];
1095
4
            return Status::InternalError("tablet {} failed on majority backends: {}", tablet_id,
1096
4
                                         failed_reason[tablet_id]);
1097
4
        }
1098
10
    }
1099
9
    return Status::OK();
1100
13
}
1101
1102
} // namespace doris