Coverage Report

Created: 2026-03-18 00:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vtablet_writer_v2.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/writer/vtablet_writer_v2.h"
19
20
#include <brpc/uri.h>
21
#include <gen_cpp/DataSinks_types.h>
22
#include <gen_cpp/Descriptors_types.h>
23
#include <gen_cpp/Metrics_types.h>
24
#include <gen_cpp/Types_types.h>
25
#include <gen_cpp/internal_service.pb.h>
26
27
#include <cstdint>
28
#include <mutex>
29
#include <ranges>
30
#include <string>
31
#include <unordered_map>
32
33
#include "common/compiler_util.h" // IWYU pragma: keep
34
#include "common/logging.h"
35
#include "common/metrics/doris_metrics.h"
36
#include "common/object_pool.h"
37
#include "common/signal_handler.h"
38
#include "common/status.h"
39
#include "core/block/block.h"
40
#include "exec/sink/delta_writer_v2_pool.h"
41
#include "load/delta_writer/delta_writer_v2.h"
42
#include "runtime/descriptors.h"
43
#include "runtime/exec_env.h"
44
#include "runtime/runtime_profile.h"
45
#include "runtime/runtime_state.h"
46
#include "runtime/thread_context.h"
47
#include "storage/tablet_info.h"
48
#include "util/debug_points.h"
49
#include "util/defer_op.h"
50
#include "util/uid_util.h"
51
// NOLINTNEXTLINE(unused-includes)
52
#include "exec/sink/load_stream_map_pool.h"
53
#include "exec/sink/load_stream_stub.h" // IWYU pragma: keep
54
#include "exec/sink/vtablet_block_convertor.h"
55
#include "exec/sink/vtablet_finder.h"
56
57
namespace doris {
58
#include "common/compile_check_begin.h"
59
60
extern bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms;
61
62
// Builds the writer from the thrift sink description. The sink must carry an
// olap_table_sink (checked in debug builds); everything else is set up later by
// open() -> _init().
//
// `dep` / `fin_dep` are taken by value and moved into the base class so that no
// extra shared_ptr copies (atomic refcount updates) are made.
VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
                                 std::shared_ptr<Dependency> dep,
                                 std::shared_ptr<Dependency> fin_dep)
        : AsyncResultWriter(output_exprs, std::move(dep), std::move(fin_dep)), _t_sink(t_sink) {
    DCHECK(t_sink.__isset.olap_table_sink);
}
68
69
13
// Defaulted out-of-line destructor: member cleanup is left entirely to the
// members' own destructors.
VTabletWriterV2::~VTabletWriterV2() = default;
70
71
0
// Invoked (through the static trampoline below) after new partitions have been
// created for this load. Registers the returned tablet locations and backend
// node info, then opens load streams for the new partitions.
Status VTabletWriterV2::on_partitions_created(TCreatePartitionResult* result) {
    // The location param refers to these entries by address, so their storage
    // is owned by the object pool rather than by this call.
    auto* const tablet_locations =
            _pool->add(new std::vector<TTabletLocation>(result->tablets));
    _location->add_locations(*tablet_locations);

    // Make the backends referenced by the new tablets resolvable by id.
    _nodes_info->add_nodes(result->nodes);

    // Only the newly created partitions need streams opened.
    return _incremental_open_streams(result->partitions);
}
84
85
0
// Plain-function adapter registered as `create_partition_callback` in
// _init_row_distribution(); `writer` is the `caller` pointer registered there,
// i.e. the owning VTabletWriterV2.
static Status on_partitions_created(void* writer, TCreatePartitionResult* result) {
    auto* const sink = static_cast<VTabletWriterV2*>(writer);
    return sink->on_partitions_created(result);
}
88
89
// Incremental counterpart of _build_tablet_node_mapping() + _open_streams() for
// partitions created while the load is running. For every replica of every
// tablet in `partitions` it records which backend hosts the tablet, remembers
// one tablet per index (for schema shipping), builds the tablet's replica info,
// and finally opens streams to every backend touched by these partitions.
//
// Returns an error if a partition cannot be generated, if a tablet has no known
// location (mirroring the guard in _build_tablet_node_mapping), or if opening
// streams to a touched backend fails.
Status VTabletWriterV2::_incremental_open_streams(
        const std::vector<TOlapTablePartition>& partitions) {
    // do what we did in prepare() for partitions. indexes which don't change when we create new partition is orthogonal to partitions.
    std::unordered_set<int64_t> known_indexes;
    std::unordered_set<int64_t> new_backends;
    for (const auto& t_partition : partitions) {
        VOlapTablePartition* partition = nullptr;
        RETURN_IF_ERROR(_vpartition->generate_partition_from(t_partition, partition));
        for (const auto& index : partition->indexes) {
            for (const auto& tablet_id : index.tablets) {
                const auto* tablet_location = _location->find_tablet(tablet_id);
                if (tablet_location == nullptr) [[unlikely]] {
                    // Previously dereferenced unconditionally; a missing location is
                    // reported the same way _build_tablet_node_mapping() does.
                    return Status::InternalError("unknown tablet location, tablet id = {}",
                                                 tablet_id);
                }
                // Iterate by reference: no need to copy the replica node list.
                for (const auto& node : tablet_location->node_ids) {
                    PTabletID tablet;
                    tablet.set_partition_id(partition->id);
                    tablet.set_index_id(index.index_id);
                    tablet.set_tablet_id(tablet_id);
                    new_backends.insert(node);
                    _tablets_for_node[node].emplace(tablet_id, tablet);
                    _tablets_by_node[node].emplace(tablet_id);
                    if (known_indexes.contains(index.index_id)) [[likely]] {
                        continue;
                    }
                    // First tablet seen for this index carries its schema.
                    _indexes_from_node[node].emplace_back(tablet);
                    known_indexes.insert(index.index_id);
                    VLOG_DEBUG << "incremental open stream (" << partition->id << ", " << tablet_id
                               << ")";
                }
                _build_tablet_replica_info(tablet_id, partition);
            }
        }
    }
    for (int64_t dst_id : new_backends) {
        auto streams = _load_stream_map->get_or_create(dst_id, true);
        RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
    }
    return Status::OK();
}
126
127
0
// Connects the row distribution helper to this sink's state: partition param,
// tablet finder, block convertor, location, schema, output exprs and the
// auto-partition callback. Then opens it against the output row descriptor.
Status VTabletWriterV2::_init_row_distribution() {
    _row_distribution.init({
            .state = _state,
            .block_convertor = _block_convertor.get(),
            .tablet_finder = _tablet_finder.get(),
            .vpartition = _vpartition,
            .add_partition_request_timer = _add_partition_request_timer,
            .txn_id = _txn_id,
            .pool = _pool,
            .location = _location,
            .vec_output_expr_ctxs = &_vec_output_expr_ctxs,
            .schema = _schema,
            // Passed back to the callback as its opaque `writer` argument.
            .caller = static_cast<void*>(this),
            .create_partition_callback = &::doris::on_partitions_created,
    });
    return _row_distribution.open(_output_row_desc);
}
143
144
0
// Initializes the sink from the thrift olap_table_sink and the runtime state;
// called first thing in open(). It:
//  - copies load id / txn / replica / file-cache settings and builds the table
//    schema, tablet location, node info and partition params;
//  - picks the tablet-finding mode from the distribution settings;
//  - validates the output tuple descriptor against the output exprs;
//  - creates the block convertor, output row descriptor and profile counters;
//  - obtains the DeltaWriterV2Map and LoadStreamMap used by this sink.
// Returns an error if schema/partition init fails, or if the output tuple
// descriptor is missing or its slot count differs from the output expr count.
Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
    _pool = state->obj_pool();
    auto& table_sink = _t_sink.olap_table_sink;
    _load_id.set_hi(table_sink.load_id.hi);
    _load_id.set_lo(table_sink.load_id.lo);
    signal::set_signal_task_id(_load_id);
    _txn_id = table_sink.txn_id;
    _num_replicas = table_sink.num_replicas;
    _tuple_desc_id = table_sink.tuple_id;
    _write_file_cache = table_sink.write_file_cache;
    _schema.reset(new OlapTableSchemaParam());
    RETURN_IF_ERROR(_schema->init(table_sink.schema));
    _schema->set_timestamp_ms(state->timestamp_ms());
    _schema->set_nano_seconds(state->nano_seconds());
    _schema->set_timezone(state->timezone());
    // Location and node info live in the runtime state's object pool.
    _location = _pool->add(new OlapTableLocationParam(table_sink.location));
    _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));

    // if distributed column list is empty, we can ensure that tablet is with random distribution info
    // and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition
    // for the whole olap table sink
    auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
    if (table_sink.partition.distributed_columns.empty()) {
        if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) {
            find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK;
        } else {
            find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH;
        }
    }
    _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition));
    _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode);
    RETURN_IF_ERROR(_vpartition->init());

    _state = state;
    _operator_profile = profile;

    _sender_id = state->per_fragment_instance_idx();
    _num_senders = state->num_per_fragment_instances();
    _backend_id = state->backend_id();
    _stream_per_node = state->load_stream_per_node();
    _total_streams = state->total_load_streams();
    _num_local_sink = state->num_local_sink();
    LOG(INFO) << "init olap tablet sink, load_id: " << print_id(_load_id)
              << ", num senders: " << _num_senders << ", stream per node: " << _stream_per_node
              << ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink;
    DCHECK(_stream_per_node > 0) << "load stream per node should be greater than 0";
    DCHECK(_total_streams > 0) << "total load streams should be greater than 0";
    DCHECK(_num_local_sink > 0) << "num local sink should be greater than 0";
    // Loads whose execution timeout is within the configured threshold are high priority.
    _is_high_priority =
            (state->execution_timeout() <= config::load_task_high_priority_threshold_second);
    DBUG_EXECUTE_IF("VTabletWriterV2._init.is_high_priority", { _is_high_priority = true; });
    _mem_tracker =
            std::make_shared<MemTracker>("VTabletWriterV2:" + std::to_string(state->load_job_id()));
    SCOPED_TIMER(_operator_profile->total_time_counter());
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

    // get table's tuple descriptor
    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
    DBUG_EXECUTE_IF("VTabletWriterV2._init._output_tuple_desc_null",
                    { _output_tuple_desc = nullptr; });
    if (_output_tuple_desc == nullptr) {
        return Status::InternalError("unknown destination tuple descriptor, id = {}",
                                     _tuple_desc_id);
    }
    auto output_tuple_desc_slots_size = _output_tuple_desc->slots().size();
    DBUG_EXECUTE_IF("VTabletWriterV2._init._vec_output_expr_ctxs_not_equal_output_tuple_slot",
                    { output_tuple_desc_slots_size++; });
    if (!_vec_output_expr_ctxs.empty() &&
        _vec_output_expr_ctxs.size() != output_tuple_desc_slots_size) {
        // Report the slot count that was actually compared above.
        LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, "
                     << "output_tuple_slot_num " << output_tuple_desc_slots_size
                     << " output_expr_num " << _vec_output_expr_ctxs.size();
        return Status::InvalidArgument(
                "output_tuple_slot_num {} should be equal to output_expr_num {}",
                output_tuple_desc_slots_size, _vec_output_expr_ctxs.size());
    }

    _block_convertor = std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
    // if partition_type is OLAP_TABLE_SINK_HASH_PARTITIONED, we handle the processing of auto_increment column
    // on exchange node rather than on TabletWriter
    _block_convertor->init_autoinc_info(
            _schema->db_id(), _schema->table_id(), _state->batch_size(),
            _schema->is_fixed_partial_update() && !_schema->auto_increment_coulumn().empty(),
            _schema->auto_increment_column_unique_id());
    _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc));

    // add all counter
    _input_rows_counter = ADD_COUNTER(_operator_profile, "RowsRead", TUnit::UNIT);
    _output_rows_counter = ADD_COUNTER(_operator_profile, "RowsProduced", TUnit::UNIT);
    _filtered_rows_counter = ADD_COUNTER(_operator_profile, "RowsFiltered", TUnit::UNIT);
    _send_data_timer = ADD_TIMER_WITH_LEVEL(_operator_profile, "SendDataTime", 1);
    _wait_mem_limit_timer =
            ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "WaitMemLimitTime", "SendDataTime", 1);
    _row_distribution_timer =
            ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "RowDistributionTime", "SendDataTime", 1);
    _write_memtable_timer =
            ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "WriteMemTableTime", "SendDataTime", 1);
    _add_partition_request_timer = ADD_CHILD_TIMER_WITH_LEVEL(
            _operator_profile, "AddPartitionRequestTime", "SendDataTime", 1);
    _validate_data_timer = ADD_TIMER_WITH_LEVEL(_operator_profile, "ValidateDataTime", 1);
    _open_timer = ADD_TIMER(_operator_profile, "OpenTime");
    _close_timer = ADD_TIMER(_operator_profile, "CloseWaitTime");
    _close_writer_timer = ADD_CHILD_TIMER(_operator_profile, "CloseWriterTime", "CloseWaitTime");
    _close_load_timer = ADD_CHILD_TIMER(_operator_profile, "CloseLoadTime", "CloseWaitTime");
    _load_back_pressure_version_time_ms =
            ADD_TIMER(_operator_profile, "LoadBackPressureVersionTimeMs");

    // Delta writers are either shared through the global pool (keyed by load id)
    // or private to this sink, depending on config::share_delta_writers.
    if (config::share_delta_writers) {
        _delta_writer_for_tablet = ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
                _load_id, _num_local_sink);
    } else {
        _delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id);
    }
    _load_stream_map = ExecEnv::GetInstance()->load_stream_map_pool()->get_or_create(
            _load_id, _backend_id, _stream_per_node, _num_local_sink);
    return Status::OK();
}
261
262
0
// Sink open. Runs _init(), then, timed under the operator's total time and
// OpenTime, builds the tablet -> backend mapping, opens load streams to those
// backends and initializes the row distribution.
Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
    RETURN_IF_ERROR(_init(state, profile));
    LOG(INFO) << "opening olap table sink, load_id=" << print_id(_load_id)
              << ", txn_id=" << _txn_id << ", sink_id=" << _sender_id;
    _timeout_watch.start();
    SCOPED_TIMER(_operator_profile->total_time_counter());
    SCOPED_TIMER(_open_timer);
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

    RETURN_IF_ERROR(_build_tablet_node_mapping());
    RETURN_IF_ERROR(_open_streams());
    return _init_row_distribution();
}
277
278
0
// Opens load streams to every backend that hosts at least one tablet replica
// (the keys of _tablets_for_node). A failure for an individual backend is only
// logged (by _open_streams_to_backend). The call fails only when at least one
// backend was attempted and none of them succeeded.
Status VTabletWriterV2::_open_streams() {
    int fault_injection_skip_be = 0;
    bool attempted = false;
    bool opened_any = false;
    for (auto& [dst_id, _] : _tablets_for_node) {
        auto streams = _load_stream_map->get_or_create(dst_id);
        // Fault injection: behave as if the first one / two backends were skipped.
        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", {
            if (fault_injection_skip_be < 1) {
                fault_injection_skip_be++;
                continue;
            }
        });
        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
            if (fault_injection_skip_be < 2) {
                fault_injection_skip_be++;
                continue;
            }
        });
        const Status open_status = _open_streams_to_backend(dst_id, *streams);
        attempted = true;
        if (open_status.ok()) {
            opened_any = true;
        }
    }
    if (!attempted || opened_any) {
        return Status::OK();
    }
    return Status::InternalError("failed to open streams to any BE");
}
305
306
0
// Opens the stub group `streams` to backend `dst_id`. The tablets recorded in
// _indexes_from_node for that backend are passed as the open call's
// tablets-for-schema argument. The idle timeout is the query execution timeout
// multiplied by 1000 (ms). An unknown node is an error; an open failure is
// logged and returned.
Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreamStubs& streams) {
    const auto* backend = _nodes_info->find_node(dst_id);
    DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null",
                    { backend = nullptr; });
    if (backend == nullptr) {
        return Status::InternalError("Unknown node {} in tablet location", dst_id);
    }
    auto idle_timeout_ms = _state->execution_timeout() * 1000;
    std::vector<PTabletID>& schema_tablets = _indexes_from_node[backend->id];
    DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.no_schema_when_open_streams",
                    { schema_tablets.clear(); });
    auto open_status =
            streams.open(_state->exec_env()->brpc_streaming_client_cache(), *backend, _txn_id,
                         *_schema, schema_tablets, _total_streams, idle_timeout_ms,
                         _state->enable_profile());
    if (open_status.ok()) {
        return open_status;
    }
    LOG(WARNING) << "failed to open stream to backend " << dst_id
                 << ", load_id=" << print_id(_load_id) << ", err=" << open_status;
    return open_status;
}
326
327
0
// Builds per-backend bookkeeping for every tablet of every partition known at
// open time, using the replica locations in _location:
//  - _tablets_for_node: backend -> (tablet id -> PTabletID), including the
//    dummy tablet (id 0) used for auto partition;
//  - _tablets_by_node: backend -> tablet ids, excluding the dummy tablet;
//  - _indexes_from_node: one tablet per index, on the first backend seen for it;
//  - replica requirements per tablet via _build_tablet_replica_info().
// Returns an error if any tablet has no location.
Status VTabletWriterV2::_build_tablet_node_mapping() {
    constexpr int64_t DUMMY_TABLET_ID = 0;
    std::unordered_set<int64_t> known_indexes;
    for (const auto& partition : _vpartition->get_partitions()) {
        for (const auto& index : partition->indexes) {
            for (const auto& tablet_id : index.tablets) {
                const auto* tablet_location = _location->find_tablet(tablet_id);
                DBUG_EXECUTE_IF("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null",
                                { tablet_location = nullptr; });
                if (tablet_location == nullptr) {
                    return Status::InternalError("unknown tablet location, tablet id = {}",
                                                 tablet_id);
                }
                for (const auto& node : tablet_location->node_ids) {
                    PTabletID tablet;
                    tablet.set_partition_id(partition->id);
                    tablet.set_index_id(index.index_id);
                    tablet.set_tablet_id(tablet_id);
                    _tablets_for_node[node].emplace(tablet_id, tablet);
                    if (tablet_id == DUMMY_TABLET_ID) [[unlikely]] {
                        // ignore fake tablet for auto partition
                        continue;
                    }
                    _tablets_by_node[node].emplace(tablet_id);
                    // insert() reports whether this is the first sighting of the index;
                    // only that tablet is recorded to carry the index schema.
                    if (known_indexes.insert(index.index_id).second) [[unlikely]] {
                        _indexes_from_node[node].emplace_back(tablet);
                    }
                }
                _build_tablet_replica_info(tablet_id, partition);
            }
        }
    }
    return Status::OK();
}
363
364
void VTabletWriterV2::_build_tablet_replica_info(const int64_t tablet_id,
365
0
                                                 VOlapTablePartition* partition) {
366
0
    if (partition != nullptr) {
367
0
        int total_replicas_num =
368
0
                partition->total_replica_num == 0 ? _num_replicas : partition->total_replica_num;
369
0
        int load_required_replicas_num = partition->load_required_replica_num == 0
370
0
                                                 ? (_num_replicas + 1) / 2
371
0
                                                 : partition->load_required_replica_num;
372
0
        _tablet_replica_info[tablet_id] =
373
0
                std::make_pair(total_replicas_num, load_required_replicas_num);
374
        // Copy version gap backends info for this tablet
375
0
        if (auto it = partition->tablet_version_gap_backends.find(tablet_id);
376
0
            it != partition->tablet_version_gap_backends.end()) {
377
0
            _tablet_version_gap_backends[tablet_id] = it->second;
378
0
        }
379
0
    } else {
380
0
        _tablet_replica_info[tablet_id] = std::make_pair(_num_replicas, (_num_replicas + 1) / 2);
381
0
    }
382
0
}
383
384
void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& row_part_tablet_ids,
385
0
                                                RowsForTablet& rows_for_tablet) {
386
0
    for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); index_idx++) {
387
0
        auto& row_ids = row_part_tablet_ids[index_idx].row_ids;
388
0
        auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids;
389
0
        auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids;
390
391
0
        for (size_t i = 0; i < row_ids.size(); i++) {
392
0
            auto& tablet_id = tablet_ids[i];
393
0
            auto it = rows_for_tablet.find(tablet_id);
394
0
            if (it == rows_for_tablet.end()) {
395
0
                Rows rows;
396
0
                rows.partition_id = partition_ids[i];
397
0
                rows.index_id = _schema->indexes()[index_idx]->index_id;
398
0
                rows.row_idxes.reserve(row_ids.size());
399
0
                auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows});
400
0
                it = tmp_it;
401
0
            }
402
0
            it->second.row_idxes.push_back(row_ids[i]);
403
0
            _number_output_rows++;
404
0
        }
405
0
    }
406
0
}
407
408
// Chooses one open load stream per replica backend of `tablet_id` and appends
// it to `streams`. The tablet is recorded in _tablets_for_node for every replica
// backend, including backends whose stream is not open.
// Fails if the tablet has no location, or if the number of selected streams is
// at most half the replica count. Otherwise waits for the tablet schema on the
// selected streams, one at a time, and returns the first success or the last
// error.
Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id,
                                        std::vector<std::shared_ptr<LoadStreamStub>>& streams) {
    std::vector<int64_t> failed_node_ids;
    const auto* location = _location->find_tablet(tablet_id);
    DBUG_EXECUTE_IF("VTabletWriterV2._select_streams.location_null", { location = nullptr; });
    if (location == nullptr) {
        return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id);
    }
    for (const auto& node_id : location->node_ids) {
        PTabletID tablet;
        tablet.set_partition_id(partition_id);
        tablet.set_index_id(index_id);
        tablet.set_tablet_id(tablet_id);
        VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id);
        _tablets_for_node[node_id].emplace(tablet_id, tablet);
        auto stream = _load_stream_map->at(node_id)->select_one_stream();
        DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
            LOG(INFO) << "[skip_two_backends](detail) tablet_id=" << tablet_id
                      << ", node_id=" << node_id
                      << ", stream_ok=" << (stream == nullptr ? "no" : "yes");
        });
        if (stream == nullptr) {
            LOG(WARNING) << "skip writing tablet " << tablet_id << " to backend " << node_id
                         << ": stream is not open";
            failed_node_ids.push_back(node_id);
            continue;
        }
        streams.emplace_back(std::move(stream));
    }
    DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", {
        LOG(INFO) << "[skip_two_backends](summary) tablet_id=" << tablet_id
                  << ", num_streams=" << streams.size()
                  << ", num_nodes=" << location->node_ids.size();
    });
    // A majority of replicas must be reachable for the tablet to be writable.
    if (streams.size() <= location->node_ids.size() / 2) {
        std::ostringstream success_msg;
        std::ostringstream failed_msg;
        for (auto& s : streams) {
            success_msg << ", " << s->dst_id();
        }
        for (auto id : failed_node_ids) {
            failed_msg << ", " << id;
        }
        // This path returns an error, so log it at WARNING (was INFO).
        LOG(WARNING) << "failed to write enough replicas " << streams.size() << "/"
                     << location->node_ids.size() << " for tablet " << tablet_id
                     << " due to connection errors; success nodes" << success_msg.str()
                     << "; failed nodes" << failed_msg.str() << ".";
        return Status::InternalError(
                "failed to write enough replicas {}/{} for tablet {} due to connection errors",
                streams.size(), location->node_ids.size(), tablet_id);
    }
    Status st;
    for (auto& stream : streams) {
        st = stream->wait_for_schema(partition_id, index_id, tablet_id);
        if (st.ok()) {
            break;
        }
        // Log which backend failed; streaming the shared_ptr only printed an address.
        LOG(WARNING) << "failed to get schema from stream to backend " << stream->dst_id()
                     << ", tablet_id=" << tablet_id << ", err=" << st;
    }
    return st;
}
470
471
0
// Sink entry point for one input block. Dry-run queries return immediately.
// Otherwise it:
//  1. sleeps for the longest version back-pressure wait reported by any stream;
//  2. writes out rows buffered while their partitions were being created;
//  3. returns early if the block is empty;
//  4. updates load progress and metrics;
//  5. distributes rows to tablets and writes each tablet's rows to its memtable;
//  6. refreshes the profile counters.
Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) {
    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
    if (_state->query_options().dry_run_query) {
        return Status::OK();
    }

    // Each stream reports (and resets) how long it wants the sender to back off;
    // honor the largest request.
    int64_t max_back_pressure_ms = 0;
    auto stubs_by_node = _load_stream_map->get_streams_for_node();
    for (const auto& [dst_id, stubs] : stubs_by_node) {
        for (const auto& stub : stubs->streams()) {
            max_back_pressure_ms = std::max(
                    max_back_pressure_ms,
                    stub->get_and_reset_load_back_pressure_version_wait_time_ms());
        }
    }
    if (UNLIKELY(max_back_pressure_ms > 0)) {
        std::this_thread::sleep_for(std::chrono::milliseconds(max_back_pressure_ms));
        _load_back_pressure_version_block_ms.fetch_add(max_back_pressure_ms);
    }

    // Flush rows that were held back for automatically created partitions.
    RETURN_IF_ERROR(_send_new_partition_batch());

    const auto input_rows = input_block.rows();
    const auto input_bytes = input_block.bytes();
    if (UNLIKELY(input_rows == 0)) {
        return Status::OK();
    }
    SCOPED_TIMER(_operator_profile->total_time_counter());
    _number_input_rows += input_rows;
    // Progress is reported incrementally so the FE can observe it; the final
    // 'num_rows_load_total' is set when the sink is closed.
    _state->update_num_rows_load_total(input_rows);
    _state->update_num_bytes_load_total(input_bytes);
    DorisMetrics::instance()->load_rows->increment(input_rows);
    DorisMetrics::instance()->load_bytes->increment(input_bytes);

    SCOPED_RAW_TIMER(&_send_data_ns);
    _row_distribution_watch.start();
    std::shared_ptr<Block> distributed_block;
    RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
            input_block, distributed_block, _row_part_tablet_ids, _number_input_rows));
    RowsForTablet rows_for_tablet;
    _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet);
    _row_distribution_watch.stop();

    // Hand each tablet's rows to its delta writer.
    for (const auto& [tablet_id, rows] : rows_for_tablet) {
        RETURN_IF_ERROR(_write_memtable(distributed_block, tablet_id, rows));
    }

    COUNTER_SET(_input_rows_counter, _number_input_rows);
    COUNTER_SET(_output_rows_counter, _number_output_rows);
    COUNTER_SET(_filtered_rows_counter,
                _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows());
    COUNTER_SET(_send_data_timer, _send_data_ns);
    COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
    COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
    return Status::OK();
}
538
539
// Writes `rows` of `block` for `tablet_id` into that tablet's DeltaWriterV2,
// creating the writer on first use. Creating a writer selects the replica
// streams (_select_streams) and looks up the index's slots and schema hash in
// the table schema. Before writing, it waits on the memtable memory limiter
// (timed as WaitMemLimitTime). If the query has been cancelled it returns the
// cancel reason instead of writing.
Status VTabletWriterV2::_write_memtable(std::shared_ptr<Block> block, int64_t tablet_id,
                                        const Rows& rows) {
    auto create_status = Status::OK();
    auto make_delta_writer = [&]() {
        std::vector<std::shared_ptr<LoadStreamStub>> streams;
        create_status = _select_streams(tablet_id, rows.partition_id, rows.index_id, streams);
        if (!create_status.ok()) [[unlikely]] {
            LOG(WARNING) << "select stream failed, " << create_status
                         << ", load_id=" << print_id(_load_id);
            return std::unique_ptr<DeltaWriterV2>(nullptr);
        }
        WriteRequest req {
                .tablet_id = tablet_id,
                .txn_id = _txn_id,
                .index_id = rows.index_id,
                .partition_id = rows.partition_id,
                .load_id = _load_id,
                .tuple_desc = _schema->tuple_desc(),
                .table_schema_param = _schema,
                .is_high_priority = _is_high_priority,
                .write_file_cache = _write_file_cache,
                .storage_vault_id {},
        };
        bool index_found = false;
        for (const auto& index : _schema->indexes()) {
            if (index->index_id != rows.index_id) {
                continue;
            }
            req.slots = &index->slots;
            req.schema_hash = index->schema_hash;
            index_found = true;
            break;
        }
        DBUG_EXECUTE_IF("VTabletWriterV2._write_memtable.index_not_found",
                        { index_found = false; });
        if (!index_found) [[unlikely]] {
            create_status = Status::InternalError("no index {} in schema", rows.index_id);
            LOG(WARNING) << "index " << rows.index_id
                         << " not found in schema, load_id=" << print_id(_load_id);
            return std::unique_ptr<DeltaWriterV2>(nullptr);
        }
        return DeltaWriterV2::create_unique(&req, streams, _state);
    };
    auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, make_delta_writer);
    if (delta_writer == nullptr) {
        LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id
                     << ", load_id=" << print_id(_load_id) << ", err: " << create_status;
        return Status::InternalError("failed to open DeltaWriter {}: {}", tablet_id,
                                     create_status.msg());
    }
    {
        SCOPED_TIMER(_wait_mem_limit_timer);
        ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
                [state = _state]() { return state->is_cancelled(); });
        if (_state->is_cancelled()) {
            return _state->cancel_reason();
        }
    }
    SCOPED_TIMER(_write_memtable_timer);
    return delta_writer->write(block.get(), rows.row_idxes);
}
597
598
0
/// Abort this sink after an error.
///
/// Cancels and drops the shared delta writer (when one is held), then cancels every
/// load stream stub of every destination and releases this sink's hold on the
/// load stream map (when one is held).
///
/// @param status the error that triggered the cancellation; forwarded to writers and streams.
void VTabletWriterV2::_cancel(Status status) {
    LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
              << ", txn_id=" << _txn_id << ", sink_id=" << _sender_id
              << ", due to error: " << status;
    if (_delta_writer_for_tablet) {
        _delta_writer_for_tablet->cancel(status);
        _delta_writer_for_tablet.reset();
    }
    if (!_load_stream_map) {
        return;
    }
    // The destination id is not needed: every node's streams are cancelled alike.
    auto cancel_node_streams = [status](int64_t /*dst_id*/, LoadStreamStubs& node_streams) {
        node_streams.cancel(status);
    };
    _load_stream_map->for_each(cancel_node_streams);
    _load_stream_map->release();
}
612
613
0
/// Push rows that were buffered while auto-created partitions were pending.
///
/// This may be reached more than once (e.g. repeated try_close); it is a no-op unless
/// the row distribution reports a pending batch.
///
/// @return the first error from partition creation or from writing the batched rows, else OK.
Status VTabletWriterV2::_send_new_partition_batch() {
    if (!_row_distribution.need_deal_batching()) {
        return Status::OK();
    }
    RETURN_IF_ERROR(_row_distribution.automatic_create_partition());

    // Borrow the batched columns out as an lvalue block so write() can consume it.
    Block pending_rows = _row_distribution._batching_block->to_block();

    // The order below is required:
    //  1. clear batching stats first (per the original note this also flips the batching
    //     flag) so that writing the batched block does not start a new batching round;
    //  2. write the batched block;
    //  3. hand the columns back to the batching block -- write() builds its own block from
    //     the input rather than adjusting it, so the columns can be reused.
    _row_distribution.clear_batching_stats();
    RETURN_IF_ERROR(write(_state, pending_rows));
    _row_distribution._batching_block->set_mutable_columns(pending_rows.mutate_columns());
    _row_distribution._batching_block->clear_column_data();
    _row_distribution._deal_batched = false;
    return Status::OK();
}
632
633
0
// Close this sink and return the outcome; the first call that reaches the end of this
// function memoizes its status in _close_status, and later calls return it directly.
//
// On an OK exec_status this:
//   - flushes rows buffered for auto-created partitions,
//   - publishes row/timer counters to the operator profile,
//   - closes the shared delta writers and records per-tablet segment counts,
//   - releases this sink's reference on the load stream map,
//   - runs the two-stage CLOSE_LOAD / close_wait protocol (non-incremental streams first,
//     then incremental), and
//   - on the last sink only, builds and submits tablet commit infos.
// On a non-OK status the sink is cancelled via _cancel().
//
// NOTE(review): the early `return st;` after closing the delta writers and the
// RETURN_IF_ERROR exits below return WITHOUT setting _is_closed / _close_status. By then
// _delta_writer_for_tablet has already been reset, so a later close() with an OK status
// would dereference a null writer. Confirm callers never call close() again after an error.
Status VTabletWriterV2::close(Status exec_status) {
634
0
    std::lock_guard<std::mutex> close_lock(_close_mutex);
635
0
    if (_is_closed) {
636
0
        return _close_status;
637
0
    }
638
0
    LOG(INFO) << "closing olap table sink, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
639
0
              << ", sink_id=" << _sender_id << ", status=" << exec_status.to_string();
640
0
    SCOPED_TIMER(_close_timer);
641
0
    Status status = exec_status;
642
643
0
    // Flush rows that were held back while auto-created partitions were pending.
    if (status.ok()) {
644
0
        SCOPED_TIMER(_operator_profile->total_time_counter());
645
0
        _row_distribution._deal_batched = true;
646
0
        status = _send_new_partition_batch();
647
0
    }
648
649
0
    DBUG_EXECUTE_IF("VTabletWriterV2.close.sleep", {
650
0
        auto sleep_sec = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
651
0
                "VTabletWriterV2.close.sleep", "sleep_sec", 1);
652
0
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
653
0
    });
654
0
    DBUG_EXECUTE_IF("VTabletWriterV2.close.cancel",
655
0
                    { status = Status::InternalError("load cancel"); });
656
0
    if (status.ok()) {
657
        // only if status is ok can we call this _profile->total_time_counter().
658
        // if status is not ok, this sink may not be prepared, so that _profile is null
659
0
        SCOPED_TIMER(_operator_profile->total_time_counter());
660
661
0
        COUNTER_SET(_input_rows_counter, _number_input_rows);
662
0
        COUNTER_SET(_output_rows_counter, _number_output_rows);
663
0
        COUNTER_SET(_filtered_rows_counter,
664
0
                    _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows());
665
0
        COUNTER_SET(_send_data_timer, _send_data_ns);
666
0
        COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
667
0
        COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
668
0
        auto back_pressure_time_ms = _load_back_pressure_version_block_ms.load();
669
0
        COUNTER_SET(_load_back_pressure_version_time_ms, back_pressure_time_ms);
670
0
        g_sink_load_back_pressure_version_time_ms << back_pressure_time_ms;
671
672
        // close DeltaWriters
673
0
        {
674
0
            std::unordered_map<int64_t, int32_t> segments_for_tablet;
675
0
            SCOPED_TIMER(_close_writer_timer);
676
            // close all delta writers if this is the last user
677
0
            auto st = _delta_writer_for_tablet->close(segments_for_tablet, _operator_profile);
678
0
            _delta_writer_for_tablet.reset();
679
0
            // NOTE(review): this error exit skips `_is_closed = true` at the end of close().
            if (!st.ok()) {
680
0
                _cancel(st);
681
0
                return st;
682
0
            }
683
            // only the last sink closing delta writers will have segment num
684
0
            if (!segments_for_tablet.empty()) {
685
0
                _load_stream_map->save_segments_for_tablet(segments_for_tablet);
686
0
            }
687
0
        }
688
689
0
        _calc_tablets_to_commit();
690
0
        // As used below, release() reports whether this sink was the last one holding the
        // stream map; only that sink sends CLOSE_LOAD and submits commit infos.
        const bool is_last_sink = _load_stream_map->release();
691
0
        LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink
692
0
                  << ", load_id=" << print_id(_load_id);
693
694
        // send CLOSE_LOAD on all non-incremental streams if this is the last sink
695
0
        if (is_last_sink) {
696
0
            _load_stream_map->close_load(false);
697
0
        }
698
699
        // close_wait on all non-incremental streams, even if this is not the last sink.
700
        // because some per-instance data structures are now shared among all sinks
701
        // due to sharing delta writers and load stream stubs.
702
        // Do not need to wait after quorum success,
703
        // for first-stage close_wait only ensure incremental streams load has been completed,
704
        // unified waiting in the second-stage close_wait.
705
0
        RETURN_IF_ERROR(_close_wait(_non_incremental_streams(), false));
706
707
        // send CLOSE_LOAD on all incremental streams if this is the last sink.
708
        // this must happen after all non-incremental streams are closed,
709
        // so we can ensure all sinks are in close phase before closing incremental streams.
710
0
        if (is_last_sink) {
711
0
            _load_stream_map->close_load(true);
712
0
        }
713
714
        // close_wait on all incremental streams, even if this is not the last sink.
715
0
        RETURN_IF_ERROR(_close_wait(_all_streams(), true));
716
717
        // calculate and submit commit info
718
0
        if (is_last_sink) {
719
0
            // Fault injection: mark one already-successful tablet of the first node as failed.
            DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", {
720
0
                auto streams = _load_stream_map->at(_tablets_for_node.begin()->first);
721
0
                int64_t tablet_id = -1;
722
0
                for (auto tablet : streams->success_tablets()) {
723
0
                    tablet_id = tablet;
724
0
                    break;
725
0
                }
726
0
                if (tablet_id != -1) {
727
0
                    LOG(INFO) << "fault injection: adding failed tablet_id: " << tablet_id;
728
0
                    streams->select_one_stream()->add_failed_tablet(
729
0
                            tablet_id, Status::InternalError("fault injection"));
730
0
                } else {
731
0
                    LOG(INFO) << "fault injection: failed to inject failed tablet_id";
732
0
                }
733
0
            });
734
735
0
            std::vector<TTabletCommitInfo> tablet_commit_infos;
736
0
            RETURN_IF_ERROR(_create_commit_info(tablet_commit_infos, _load_stream_map));
737
0
            _state->add_tablet_commit_infos(tablet_commit_infos);
738
0
        }
739
740
        // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
741
0
        int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() +
742
0
                                      _state->num_rows_load_unselected();
743
0
        _state->set_num_rows_load_total(num_rows_load_total);
744
0
        _state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
745
0
                                              _tablet_finder->num_filtered_rows());
746
0
        _state->update_num_rows_load_unselected(
747
0
                _tablet_finder->num_immutable_partition_filtered_rows());
748
749
0
        if (_state->enable_profile() && _state->profile_level() >= 2) {
750
            // Output detailed profiling info for auto-partition requests
751
0
            _row_distribution.output_profile_info(_operator_profile);
752
0
        }
753
754
0
        LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id)
755
0
                  << ", txn_id=" << _txn_id;
756
0
    } else {
757
0
        _cancel(status);
758
0
    }
759
760
0
    // Memoize the outcome; later close() calls return it via the _is_closed check above.
    _is_closed = true;
761
0
    _close_status = status;
762
0
    return status;
763
0
}
764
765
0
std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_all_streams() {
766
0
    std::unordered_set<std::shared_ptr<LoadStreamStub>> all_streams;
767
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
768
0
    for (const auto& [dst_id, streams] : streams_for_node) {
769
0
        for (const auto& stream : streams->streams()) {
770
0
            all_streams.insert(stream);
771
0
        }
772
0
    }
773
0
    return all_streams;
774
0
}
775
776
0
std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_non_incremental_streams() {
777
0
    std::unordered_set<std::shared_ptr<LoadStreamStub>> non_incremental_streams;
778
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
779
0
    for (const auto& [dst_id, streams] : streams_for_node) {
780
0
        for (const auto& stream : streams->streams()) {
781
0
            if (!stream->is_incremental()) {
782
0
                non_incremental_streams.insert(stream);
783
0
            }
784
0
        }
785
0
    }
786
0
    return non_incremental_streams;
787
0
}
788
789
/// Wait for the given load streams to finish closing.
///
/// Phase 1 polls every 10ms until all `unfinished_streams` have closed, or until the
/// tablets of the partitions this sink touched reach quorum (see _quorum_success).
/// Phase 2 runs only when `need_wait_after_quorum_success` is true. It keeps polling the
/// remaining streams until one of these holds:
///   - they all finish;
///   - a speed-based wait budget (_calc_max_wait_time_ms) is exhausted;
///   - the whole load has less than config::quorum_success_remaining_timeout_seconds left.
///
/// @param unfinished_streams streams to wait for; taken by value and drained locally.
/// @param need_wait_after_quorum_success whether to run phase 2 after quorum is reached.
/// @return TimedOut once the load's execution timeout is exceeded, a stream error surfaced
///         by _check_streams_finish, or OK.
Status VTabletWriterV2::_close_wait(
        std::unordered_set<std::shared_ptr<LoadStreamStub>> unfinished_streams,
        bool need_wait_after_quorum_success) {
    SCOPED_TIMER(_close_load_timer);
    Status status;
    auto streams_for_node = _load_stream_map->get_streams_for_node();

    // 1. first wait for quorum success, judged on the tablets of the touched partitions
    std::unordered_set<int64_t> need_finish_tablets;
    auto partition_ids = _tablet_finder->partition_ids();
    for (const auto& part : _vpartition->get_partitions()) {
        if (partition_ids.contains(part->id)) {
            for (const auto& index : part->indexes) {
                for (const auto& tablet_id : index.tablets) {
                    need_finish_tablets.insert(tablet_id);
                }
            }
        }
    }
    while (true) {
        RETURN_IF_ERROR(_check_timeout());
        RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));
        bool quorum_success = _quorum_success(unfinished_streams, need_finish_tablets);
        if (quorum_success || unfinished_streams.empty()) {
            LOG(INFO) << "quorum_success: " << quorum_success
                      << ", is all finished: " << unfinished_streams.empty()
                      << ", txn_id: " << _txn_id << ", load_id: " << print_id(_load_id);
            break;
        }
        bthread_usleep(1000 * 10);
    }

    // 2. then wait for remaining streams as much as possible
    if (!unfinished_streams.empty() && need_wait_after_quorum_success) {
        int64_t arrival_quorum_success_time = UnixMillis();
        int64_t max_wait_time_ms = _calc_max_wait_time_ms(streams_for_node, unfinished_streams);
        const int64_t execution_timeout_ms =
                static_cast<int64_t>(_state->execution_timeout()) * 1000;
        const int64_t min_remaining_ms =
                static_cast<int64_t>(config::quorum_success_remaining_timeout_seconds) * 1000;
        while (true) {
            RETURN_IF_ERROR(_check_timeout());
            RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));
            if (unfinished_streams.empty()) {
                break;
            }
            int64_t elapsed_ms = UnixMillis() - arrival_quorum_success_time;
            // Remaining budget of the WHOLE load, on the same clock as _check_timeout().
            // Measuring only from quorum arrival would ignore the time already spent and let
            // a quorum-successful load run into _check_timeout() and fail with TimedOut.
            int64_t remaining_ms =
                    execution_timeout_ms -
                    static_cast<int64_t>(_timeout_watch.elapsed_time() / 1000 / 1000);
            if (elapsed_ms > max_wait_time_ms || remaining_ms < min_remaining_ms) {
                std::stringstream unfinished_streams_str;
                for (const auto& stream : unfinished_streams) {
                    unfinished_streams_str << stream->stream_id() << ",";
                }
                LOG(WARNING) << "reach max wait time, max_wait_time_ms: " << max_wait_time_ms
                             << ", load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
                             << ", unfinished streams: " << unfinished_streams_str.str();
                break;
            }
            bthread_usleep(1000 * 10);
        }
    }

    // _check_streams_finish() returns any accumulated error, and the RETURN_IF_ERRORs above
    // exit on it, so `status` is expected to be OK here; kept as a defensive log.
    if (!status.ok()) {
        LOG(WARNING) << "close_wait failed: " << status << ", load_id=" << print_id(_load_id);
    }
    return status;
}
852
853
bool VTabletWriterV2::_quorum_success(
854
        const std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams,
855
0
        const std::unordered_set<int64_t>& need_finish_tablets) {
856
0
    if (!config::enable_quorum_success_write) {
857
0
        return false;
858
0
    }
859
0
    auto streams_for_node = _load_stream_map->get_streams_for_node();
860
0
    if (need_finish_tablets.empty()) [[unlikely]] {
861
0
        return false;
862
0
    }
863
864
    // 1. calculate finished tablets replica num
865
0
    std::unordered_set<int64_t> finished_dst_ids;
866
0
    std::unordered_map<int64_t, int64_t> finished_tablets_replica;
867
0
    for (const auto& [dst_id, streams] : streams_for_node) {
868
0
        bool finished = true;
869
0
        for (const auto& stream : streams->streams()) {
870
0
            if (unfinished_streams.contains(stream) || !stream->check_cancel().ok()) {
871
0
                finished = false;
872
0
                break;
873
0
            }
874
0
        }
875
0
        if (finished) {
876
0
            finished_dst_ids.insert(dst_id);
877
0
        }
878
0
    }
879
0
    for (const auto& [dst_id, _] : streams_for_node) {
880
0
        if (!finished_dst_ids.contains(dst_id)) {
881
0
            continue;
882
0
        }
883
0
        for (const auto& tablet_id : _tablets_by_node[dst_id]) {
884
            // Only count non-gap backends for quorum success.
885
            // Gap backends' success doesn't count toward majority write.
886
0
            auto gap_it = _tablet_version_gap_backends.find(tablet_id);
887
0
            if (gap_it == _tablet_version_gap_backends.end() ||
888
0
                gap_it->second.find(dst_id) == gap_it->second.end()) {
889
0
                finished_tablets_replica[tablet_id]++;
890
0
            }
891
0
        }
892
0
    }
893
894
    // 2. check if quorum success
895
0
    for (const auto& tablet_id : need_finish_tablets) {
896
0
        if (finished_tablets_replica[tablet_id] < _load_required_replicas_num(tablet_id)) {
897
0
            return false;
898
0
        }
899
0
    }
900
0
    return true;
901
0
}
902
903
10
int VTabletWriterV2::_load_required_replicas_num(int64_t tablet_id) {
904
10
    auto [total_replicas_num, load_required_replicas_num] = _tablet_replica_info[tablet_id];
905
10
    if (total_replicas_num == 0) {
906
0
        return (_num_replicas + 1) / 2;
907
0
    }
908
10
    return load_required_replicas_num;
909
10
}
910
911
int64_t VTabletWriterV2::_calc_max_wait_time_ms(
912
        const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node,
913
0
        const std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams) {
914
    // 1. calculate avg speed of all unfinished streams
915
0
    int64_t elapsed_ms = _timeout_watch.elapsed_time() / 1000 / 1000;
916
0
    int64_t total_bytes = 0;
917
0
    int finished_count = 0;
918
0
    for (const auto& [dst_id, streams] : streams_for_node) {
919
0
        for (const auto& stream : streams->streams()) {
920
0
            if (unfinished_streams.contains(stream) || !stream->check_cancel().ok()) {
921
0
                continue;
922
0
            }
923
0
            total_bytes += stream->bytes_written();
924
0
            finished_count++;
925
0
        }
926
0
    }
927
    // no data loaded in index channel, return 0
928
0
    if (total_bytes == 0 || finished_count == 0) {
929
0
        return 0;
930
0
    }
931
    // if elapsed_ms is equal to 0, explain the loaded data is too small
932
0
    if (elapsed_ms <= 0) {
933
0
        return config::quorum_success_min_wait_seconds * 1000;
934
0
    }
935
0
    double avg_speed =
936
0
            static_cast<double>(total_bytes) / (static_cast<double>(elapsed_ms) * finished_count);
937
938
    // 2. calculate max wait time of each unfinished stream and return the max value
939
0
    int64_t max_wait_time_ms = 0;
940
0
    for (const auto& [dst_id, streams] : streams_for_node) {
941
0
        for (const auto& stream : streams->streams()) {
942
0
            if (unfinished_streams.contains(stream)) {
943
0
                int64_t bytes = stream->bytes_written();
944
0
                int64_t wait =
945
0
                        avg_speed > 0 ? static_cast<int64_t>(static_cast<double>(bytes) / avg_speed)
946
0
                                      : 0;
947
0
                max_wait_time_ms = std::max(max_wait_time_ms, wait);
948
0
            }
949
0
        }
950
0
    }
951
952
    // 3. calculate max wait time
953
    // introduce quorum_success_min_wait_time_ms to avoid jitter of small load
954
0
    max_wait_time_ms -= UnixMillis() - _timeout_watch.elapsed_time() / 1000 / 1000;
955
0
    max_wait_time_ms =
956
0
            std::max(static_cast<int64_t>(static_cast<double>(max_wait_time_ms) *
957
0
                                          (1.0 + config::quorum_success_max_wait_multiplier)),
958
0
                     config::quorum_success_min_wait_seconds * 1000);
959
960
0
    return max_wait_time_ms;
961
0
}
962
963
0
/// Fail once the load has used up its execution timeout.
///
/// Elapsed time is measured with _timeout_watch (nanoseconds converted to milliseconds).
/// The debug point "VTabletWriterV2._close_wait.load_timeout" forces the timeout path.
///
/// @return TimedOut when no time remains, otherwise OK.
Status VTabletWriterV2::_check_timeout() {
    const int64_t timeout_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000;
    int64_t remain_ms = timeout_ms - _timeout_watch.elapsed_time() / 1000 / 1000;
    DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
    if (remain_ms > 0) {
        return Status::OK();
    }
    LOG(WARNING) << "load timed out before close waiting, load_id=" << print_id(_load_id);
    return Status::TimedOut("load timed out before close waiting");
}
973
974
/// Poll each still-unfinished stream once and drop it from `unfinished_streams` when it
/// has closed or has failed.
///
/// @param unfinished_streams streams still being waited on; updated in place.
/// @param status accumulator; overwritten with a stream's error whenever one fails.
/// @param streams_for_node snapshot of streams per destination backend to iterate.
/// @return the (possibly updated) accumulated `status`.
Status VTabletWriterV2::_check_streams_finish(
        std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams, Status& status,
        const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node) {
    for (const auto& node_entry : streams_for_node) {
        for (const auto& stub : node_entry.second->streams()) {
            if (!unfinished_streams.contains(stub)) {
                continue;
            }
            bool stub_closed = false;
            auto check_st = stub->close_finish_check(_state, &stub_closed);
            DBUG_EXECUTE_IF("VTabletWriterV2._check_streams_finish.close_stream_failed",
                            { check_st = Status::InternalError("close stream failed"); });
            if (!check_st.ok()) {
                // A failed stream is no longer waited on; remember its error.
                status = check_st;
                unfinished_streams.erase(stub);
                LOG(WARNING) << "close_wait failed: " << check_st
                             << ", load_id=" << print_id(_load_id);
            } else if (stub_closed) {
                unfinished_streams.erase(stub);
            }
        }
    }
    return status;
}
999
1000
0
void VTabletWriterV2::_calc_tablets_to_commit() {
1001
0
    LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
1002
0
              << ", sink_id=" << _sender_id;
1003
0
    for (const auto& [dst_id, tablets] : _tablets_for_node) {
1004
0
        std::vector<PTabletID> tablets_to_commit;
1005
0
        std::vector<int64_t> partition_ids;
1006
0
        for (const auto& [tablet_id, tablet] : tablets) {
1007
0
            if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
1008
0
                if (VLOG_DEBUG_IS_ON) {
1009
0
                    partition_ids.push_back(tablet.partition_id());
1010
0
                }
1011
0
                PTabletID t(tablet);
1012
0
                tablets_to_commit.push_back(t);
1013
0
            }
1014
0
        }
1015
0
        if (VLOG_DEBUG_IS_ON) {
1016
0
            std::string msg("close load partitions: ");
1017
0
            msg.reserve(partition_ids.size() * 7);
1018
0
            for (auto v : partition_ids) {
1019
0
                msg.append(std::to_string(v) + ", ");
1020
0
            }
1021
0
            LOG(WARNING) << msg;
1022
0
        }
1023
0
        _load_stream_map->save_tablets_to_commit(dst_id, tablets_to_commit);
1024
0
    }
1025
0
}
1026
1027
/// Build commit infos from the per-stream success and failure reports.
///
/// Every (tablet, backend) pair reported as successful gets a TTabletCommitInfo. A tablet
/// that failed on some backend is accepted only if its success count reaches
/// _load_required_replicas_num(). That count excludes backends listed in
/// _tablet_version_gap_backends for the tablet.
///
/// @param tablet_commit_infos output; one entry appended per successful (tablet, backend).
/// @param load_stream_map streams whose success/failure tablet reports are inspected.
/// @return InternalError for the first under-replicated failed tablet encountered
///         (unordered_set iteration order), otherwise OK.
Status VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tablet_commit_infos,
                                            std::shared_ptr<LoadStreamMap> load_stream_map) {
    // Track per-tablet non-gap success count and failure reasons
    std::unordered_map<int64_t, int> success_tablets_replica;
    std::unordered_set<int64_t> failed_tablets;
    std::unordered_map<int64_t, Status> failed_reason;
    load_stream_map->for_each([&](int64_t dst_id, LoadStreamStubs& streams) {
        size_t num_success_tablets = 0;
        size_t num_failed_tablets = 0;
        // Bind by const reference: `auto [..]` copied every entry, including its Status.
        for (const auto& [tablet_id, reason] : streams.failed_tablets()) {
            failed_tablets.insert(tablet_id);
            failed_reason[tablet_id] = reason;
            num_failed_tablets++;
        }
        for (auto tablet_id : streams.success_tablets()) {
            TTabletCommitInfo commit_info;
            commit_info.tabletId = tablet_id;
            commit_info.backendId = dst_id;
            tablet_commit_infos.emplace_back(std::move(commit_info));
            // Only count non-gap backends toward success
            auto gap_it = _tablet_version_gap_backends.find(tablet_id);
            if (gap_it == _tablet_version_gap_backends.end() ||
                gap_it->second.find(dst_id) == gap_it->second.end()) {
                success_tablets_replica[tablet_id]++;
            }
            num_success_tablets++;
        }
        LOG(INFO) << "streams to dst_id: " << dst_id << ", success tablets: " << num_success_tablets
                  << ", failed tablets: " << num_failed_tablets;
    });

    for (auto tablet_id : failed_tablets) {
        int succ_count = success_tablets_replica[tablet_id];
        int required = _load_required_replicas_num(tablet_id);
        if (succ_count < required) {
            const Status& reason = failed_reason[tablet_id];
            LOG(INFO) << "tablet " << tablet_id
                      << " failed on majority backends (success=" << succ_count
                      << ", required=" << required << "): " << reason;
            return Status::InternalError("tablet {} failed on majority backends: {}", tablet_id,
                                         reason);
        }
    }
    return Status::OK();
}
1071
1072
} // namespace doris