be/src/exec/sink/writer/vtablet_writer_v2.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/writer/vtablet_writer_v2.h" |
19 | | |
20 | | #include <brpc/uri.h> |
21 | | #include <gen_cpp/DataSinks_types.h> |
22 | | #include <gen_cpp/Descriptors_types.h> |
23 | | #include <gen_cpp/Metrics_types.h> |
24 | | #include <gen_cpp/Types_types.h> |
25 | | #include <gen_cpp/internal_service.pb.h> |
26 | | |
27 | | #include <cstdint> |
28 | | #include <mutex> |
29 | | #include <ranges> |
30 | | #include <string> |
31 | | #include <unordered_map> |
32 | | |
33 | | #include "common/compiler_util.h" // IWYU pragma: keep |
34 | | #include "common/logging.h" |
35 | | #include "common/metrics/doris_metrics.h" |
36 | | #include "common/object_pool.h" |
37 | | #include "common/signal_handler.h" |
38 | | #include "common/status.h" |
39 | | #include "core/block/block.h" |
40 | | #include "exec/sink/delta_writer_v2_pool.h" |
41 | | #include "load/delta_writer/delta_writer_v2.h" |
42 | | #include "runtime/descriptors.h" |
43 | | #include "runtime/exec_env.h" |
44 | | #include "runtime/runtime_profile.h" |
45 | | #include "runtime/runtime_state.h" |
46 | | #include "runtime/thread_context.h" |
47 | | #include "storage/tablet_info.h" |
48 | | #include "util/debug_points.h" |
49 | | #include "util/defer_op.h" |
50 | | #include "util/uid_util.h" |
51 | | // NOLINTNEXTLINE(unused-includes) |
52 | | #include "exec/sink/load_stream_map_pool.h" |
53 | | #include "exec/sink/load_stream_stub.h" // IWYU pragma: keep |
54 | | #include "exec/sink/vtablet_block_convertor.h" |
55 | | #include "exec/sink/vtablet_finder.h" |
56 | | |
57 | | namespace doris { |
58 | | #include "common/compile_check_begin.h" |
59 | | |
60 | | extern bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms; |
61 | | |
62 | | VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, |
63 | | std::shared_ptr<Dependency> dep, |
64 | | std::shared_ptr<Dependency> fin_dep) |
65 | 13 | : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) { |
66 | 13 | DCHECK(t_sink.__isset.olap_table_sink); |
67 | 13 | } |
68 | | |
69 | 13 | VTabletWriterV2::~VTabletWriterV2() = default; |
70 | | |
71 | 0 | Status VTabletWriterV2::on_partitions_created(TCreatePartitionResult* result) { |
72 | | // add new tablet locations. it will use by address. so add to pool |
73 | 0 | auto* new_locations = _pool->add(new std::vector<TTabletLocation>(result->tablets)); |
74 | 0 | _location->add_locations(*new_locations); |
75 | | |
76 | | // update new node info |
77 | 0 | _nodes_info->add_nodes(result->nodes); |
78 | | |
79 | | // incremental open stream |
80 | 0 | RETURN_IF_ERROR(_incremental_open_streams(result->partitions)); |
81 | | |
82 | 0 | return Status::OK(); |
83 | 0 | } |
84 | | |
85 | 0 | static Status on_partitions_created(void* writer, TCreatePartitionResult* result) { |
86 | 0 | return static_cast<VTabletWriterV2*>(writer)->on_partitions_created(result); |
87 | 0 | } |
88 | | |
89 | | Status VTabletWriterV2::_incremental_open_streams( |
90 | 0 | const std::vector<TOlapTablePartition>& partitions) { |
91 | | // do what we did in prepare() for partitions. indexes which don't change when we create new partition is orthogonal to partitions. |
92 | 0 | std::unordered_set<int64_t> known_indexes; |
93 | 0 | std::unordered_set<int64_t> new_backends; |
94 | 0 | for (const auto& t_partition : partitions) { |
95 | 0 | VOlapTablePartition* partition = nullptr; |
96 | 0 | RETURN_IF_ERROR(_vpartition->generate_partition_from(t_partition, partition)); |
97 | 0 | for (const auto& index : partition->indexes) { |
98 | 0 | for (const auto& tablet_id : index.tablets) { |
99 | 0 | auto nodes = _location->find_tablet(tablet_id)->node_ids; |
100 | 0 | for (auto& node : nodes) { |
101 | 0 | PTabletID tablet; |
102 | 0 | tablet.set_partition_id(partition->id); |
103 | 0 | tablet.set_index_id(index.index_id); |
104 | 0 | tablet.set_tablet_id(tablet_id); |
105 | 0 | new_backends.insert(node); |
106 | 0 | _tablets_for_node[node].emplace(tablet_id, tablet); |
107 | 0 | _tablets_by_node[node].emplace(tablet_id); |
108 | 0 | if (known_indexes.contains(index.index_id)) [[likely]] { |
109 | 0 | continue; |
110 | 0 | } |
111 | 0 | _indexes_from_node[node].emplace_back(tablet); |
112 | 0 | known_indexes.insert(index.index_id); |
113 | 0 | VLOG_DEBUG << "incremental open stream (" << partition->id << ", " << tablet_id |
114 | 0 | << ")"; |
115 | 0 | } |
116 | 0 | _build_tablet_replica_info(tablet_id, partition); |
117 | 0 | } |
118 | 0 | } |
119 | 0 | } |
120 | 0 | for (int64_t dst_id : new_backends) { |
121 | 0 | auto streams = _load_stream_map->get_or_create(dst_id, true); |
122 | 0 | RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); |
123 | 0 | } |
124 | 0 | return Status::OK(); |
125 | 0 | } |
126 | | |
127 | 0 | Status VTabletWriterV2::_init_row_distribution() { |
128 | 0 | _row_distribution.init({.state = _state, |
129 | 0 | .block_convertor = _block_convertor.get(), |
130 | 0 | .tablet_finder = _tablet_finder.get(), |
131 | 0 | .vpartition = _vpartition, |
132 | 0 | .add_partition_request_timer = _add_partition_request_timer, |
133 | 0 | .txn_id = _txn_id, |
134 | 0 | .pool = _pool, |
135 | 0 | .location = _location, |
136 | 0 | .vec_output_expr_ctxs = &_vec_output_expr_ctxs, |
137 | 0 | .schema = _schema, |
138 | 0 | .caller = (void*)this, |
139 | 0 | .create_partition_callback = &::doris::on_partitions_created}); |
140 | |
|
141 | 0 | return _row_distribution.open(_output_row_desc); |
142 | 0 | } |
143 | | |
144 | 0 | Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { |
145 | 0 | _pool = state->obj_pool(); |
146 | 0 | auto& table_sink = _t_sink.olap_table_sink; |
147 | 0 | _load_id.set_hi(table_sink.load_id.hi); |
148 | 0 | _load_id.set_lo(table_sink.load_id.lo); |
149 | 0 | signal::set_signal_task_id(_load_id); |
150 | 0 | _txn_id = table_sink.txn_id; |
151 | 0 | _num_replicas = table_sink.num_replicas; |
152 | 0 | _tuple_desc_id = table_sink.tuple_id; |
153 | 0 | _write_file_cache = table_sink.write_file_cache; |
154 | 0 | _schema.reset(new OlapTableSchemaParam()); |
155 | 0 | RETURN_IF_ERROR(_schema->init(table_sink.schema)); |
156 | 0 | _schema->set_timestamp_ms(state->timestamp_ms()); |
157 | 0 | _schema->set_nano_seconds(state->nano_seconds()); |
158 | 0 | _schema->set_timezone(state->timezone()); |
159 | 0 | _location = _pool->add(new OlapTableLocationParam(table_sink.location)); |
160 | 0 | _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info)); |
161 | | |
162 | | // if distributed column list is empty, we can ensure that tablet is with random distribution info |
163 | | // and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition |
164 | | // for the whole olap table sink |
165 | 0 | auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; |
166 | 0 | if (table_sink.partition.distributed_columns.empty()) { |
167 | 0 | if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) { |
168 | 0 | find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK; |
169 | 0 | } else { |
170 | 0 | find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH; |
171 | 0 | } |
172 | 0 | } |
173 | 0 | _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition)); |
174 | 0 | _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, find_tablet_mode); |
175 | 0 | RETURN_IF_ERROR(_vpartition->init()); |
176 | | |
177 | 0 | _state = state; |
178 | 0 | _operator_profile = profile; |
179 | |
|
180 | 0 | _sender_id = state->per_fragment_instance_idx(); |
181 | 0 | _num_senders = state->num_per_fragment_instances(); |
182 | 0 | _backend_id = state->backend_id(); |
183 | 0 | _stream_per_node = state->load_stream_per_node(); |
184 | 0 | _total_streams = state->total_load_streams(); |
185 | 0 | _num_local_sink = state->num_local_sink(); |
186 | 0 | LOG(INFO) << "init olap tablet sink, load_id: " << print_id(_load_id) |
187 | 0 | << ", num senders: " << _num_senders << ", stream per node: " << _stream_per_node |
188 | 0 | << ", total_streams " << _total_streams << ", num_local_sink: " << _num_local_sink; |
189 | 0 | DCHECK(_stream_per_node > 0) << "load stream per node should be greator than 0"; |
190 | 0 | DCHECK(_total_streams > 0) << "total load streams should be greator than 0"; |
191 | 0 | DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0"; |
192 | 0 | _is_high_priority = |
193 | 0 | (state->execution_timeout() <= config::load_task_high_priority_threshold_second); |
194 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2._init.is_high_priority", { _is_high_priority = true; }); |
195 | 0 | _mem_tracker = |
196 | 0 | std::make_shared<MemTracker>("VTabletWriterV2:" + std::to_string(state->load_job_id())); |
197 | 0 | SCOPED_TIMER(_operator_profile->total_time_counter()); |
198 | 0 | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); |
199 | | |
200 | | // get table's tuple descriptor |
201 | 0 | _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id); |
202 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2._init._output_tuple_desc_null", |
203 | 0 | { _output_tuple_desc = nullptr; }); |
204 | 0 | if (_output_tuple_desc == nullptr) { |
205 | 0 | return Status::InternalError("unknown destination tuple descriptor, id = {}", |
206 | 0 | _tuple_desc_id); |
207 | 0 | } |
208 | 0 | auto output_tuple_desc_slots_size = _output_tuple_desc->slots().size(); |
209 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2._init._vec_output_expr_ctxs_not_equal_output_tuple_slot", |
210 | 0 | { output_tuple_desc_slots_size++; }); |
211 | 0 | if (!_vec_output_expr_ctxs.empty() && |
212 | 0 | _vec_output_expr_ctxs.size() != output_tuple_desc_slots_size) { |
213 | 0 | LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, " |
214 | 0 | << "output_tuple_slot_num " << _output_tuple_desc->slots().size() |
215 | 0 | << " output_expr_num " << _vec_output_expr_ctxs.size(); |
216 | 0 | return Status::InvalidArgument( |
217 | 0 | "output_tuple_slot_num {} should be equal to output_expr_num {}", |
218 | 0 | _output_tuple_desc->slots().size(), _vec_output_expr_ctxs.size()); |
219 | 0 | } |
220 | | |
221 | 0 | _block_convertor = std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc); |
222 | | // if partition_type is OLAP_TABLE_SINK_HASH_PARTITIONED, we handle the processing of auto_increment column |
223 | | // on exchange node rather than on TabletWriter |
224 | 0 | _block_convertor->init_autoinc_info( |
225 | 0 | _schema->db_id(), _schema->table_id(), _state->batch_size(), |
226 | 0 | _schema->is_fixed_partial_update() && !_schema->auto_increment_coulumn().empty(), |
227 | 0 | _schema->auto_increment_column_unique_id()); |
228 | 0 | _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc)); |
229 | | |
230 | | // add all counter |
231 | 0 | _input_rows_counter = ADD_COUNTER(_operator_profile, "RowsRead", TUnit::UNIT); |
232 | 0 | _output_rows_counter = ADD_COUNTER(_operator_profile, "RowsProduced", TUnit::UNIT); |
233 | 0 | _filtered_rows_counter = ADD_COUNTER(_operator_profile, "RowsFiltered", TUnit::UNIT); |
234 | 0 | _send_data_timer = ADD_TIMER_WITH_LEVEL(_operator_profile, "SendDataTime", 1); |
235 | 0 | _wait_mem_limit_timer = |
236 | 0 | ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "WaitMemLimitTime", "SendDataTime", 1); |
237 | 0 | _row_distribution_timer = |
238 | 0 | ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "RowDistributionTime", "SendDataTime", 1); |
239 | 0 | _write_memtable_timer = |
240 | 0 | ADD_CHILD_TIMER_WITH_LEVEL(_operator_profile, "WriteMemTableTime", "SendDataTime", 1); |
241 | 0 | _add_partition_request_timer = ADD_CHILD_TIMER_WITH_LEVEL( |
242 | 0 | _operator_profile, "AddPartitionRequestTime", "SendDataTime", 1); |
243 | 0 | _validate_data_timer = ADD_TIMER_WITH_LEVEL(_operator_profile, "ValidateDataTime", 1); |
244 | 0 | _open_timer = ADD_TIMER(_operator_profile, "OpenTime"); |
245 | 0 | _close_timer = ADD_TIMER(_operator_profile, "CloseWaitTime"); |
246 | 0 | _close_writer_timer = ADD_CHILD_TIMER(_operator_profile, "CloseWriterTime", "CloseWaitTime"); |
247 | 0 | _close_load_timer = ADD_CHILD_TIMER(_operator_profile, "CloseLoadTime", "CloseWaitTime"); |
248 | 0 | _load_back_pressure_version_time_ms = |
249 | 0 | ADD_TIMER(_operator_profile, "LoadBackPressureVersionTimeMs"); |
250 | |
|
251 | 0 | if (config::share_delta_writers) { |
252 | 0 | _delta_writer_for_tablet = ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create( |
253 | 0 | _load_id, _num_local_sink); |
254 | 0 | } else { |
255 | 0 | _delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id); |
256 | 0 | } |
257 | 0 | _load_stream_map = ExecEnv::GetInstance()->load_stream_map_pool()->get_or_create( |
258 | 0 | _load_id, _backend_id, _stream_per_node, _num_local_sink); |
259 | 0 | return Status::OK(); |
260 | 0 | } |
261 | | |
262 | 0 | Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) { |
263 | 0 | RETURN_IF_ERROR(_init(state, profile)); |
264 | 0 | LOG(INFO) << "opening olap table sink, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id |
265 | 0 | << ", sink_id=" << _sender_id; |
266 | 0 | _timeout_watch.start(); |
267 | 0 | SCOPED_TIMER(_operator_profile->total_time_counter()); |
268 | 0 | SCOPED_TIMER(_open_timer); |
269 | 0 | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); |
270 | |
|
271 | 0 | RETURN_IF_ERROR(_build_tablet_node_mapping()); |
272 | 0 | RETURN_IF_ERROR(_open_streams()); |
273 | 0 | RETURN_IF_ERROR(_init_row_distribution()); |
274 | | |
275 | 0 | return Status::OK(); |
276 | 0 | } |
277 | | |
278 | 0 | Status VTabletWriterV2::_open_streams() { |
279 | 0 | int fault_injection_skip_be = 0; |
280 | 0 | bool any_backend = false; |
281 | 0 | bool any_success = false; |
282 | 0 | for (auto& [dst_id, _] : _tablets_for_node) { |
283 | 0 | auto streams = _load_stream_map->get_or_create(dst_id); |
284 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", { |
285 | 0 | if (fault_injection_skip_be < 1) { |
286 | 0 | fault_injection_skip_be++; |
287 | 0 | continue; |
288 | 0 | } |
289 | 0 | }); |
290 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", { |
291 | 0 | if (fault_injection_skip_be < 2) { |
292 | 0 | fault_injection_skip_be++; |
293 | 0 | continue; |
294 | 0 | } |
295 | 0 | }); |
296 | 0 | auto st = _open_streams_to_backend(dst_id, *streams); |
297 | 0 | any_backend = true; |
298 | 0 | any_success = any_success || st.ok(); |
299 | 0 | } |
300 | 0 | if (any_backend && !any_success) { |
301 | 0 | return Status::InternalError("failed to open streams to any BE"); |
302 | 0 | } |
303 | 0 | return Status::OK(); |
304 | 0 | } |
305 | | |
306 | 0 | Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreamStubs& streams) { |
307 | 0 | const auto* node_info = _nodes_info->find_node(dst_id); |
308 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null", |
309 | 0 | { node_info = nullptr; }); |
310 | 0 | if (node_info == nullptr) { |
311 | 0 | return Status::InternalError("Unknown node {} in tablet location", dst_id); |
312 | 0 | } |
313 | 0 | auto idle_timeout_ms = _state->execution_timeout() * 1000; |
314 | 0 | std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id]; |
315 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.no_schema_when_open_streams", |
316 | 0 | { tablets_for_schema.clear(); }); |
317 | 0 | auto st = streams.open(_state->exec_env()->brpc_streaming_client_cache(), *node_info, _txn_id, |
318 | 0 | *_schema, tablets_for_schema, _total_streams, idle_timeout_ms, |
319 | 0 | _state->enable_profile()); |
320 | 0 | if (!st.ok()) { |
321 | 0 | LOG(WARNING) << "failed to open stream to backend " << dst_id |
322 | 0 | << ", load_id=" << print_id(_load_id) << ", err=" << st; |
323 | 0 | } |
324 | 0 | return st; |
325 | 0 | } |
326 | | |
327 | 0 | Status VTabletWriterV2::_build_tablet_node_mapping() { |
328 | 0 | std::unordered_set<int64_t> known_indexes; |
329 | 0 | for (const auto& partition : _vpartition->get_partitions()) { |
330 | 0 | for (const auto& index : partition->indexes) { |
331 | 0 | for (const auto& tablet_id : index.tablets) { |
332 | 0 | auto* tablet_location = _location->find_tablet(tablet_id); |
333 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null", |
334 | 0 | { tablet_location = nullptr; }); |
335 | 0 | if (tablet_location == nullptr) { |
336 | 0 | return Status::InternalError("unknown tablet location, tablet id = {}", |
337 | 0 | tablet_id); |
338 | 0 | } |
339 | 0 | for (auto& node : tablet_location->node_ids) { |
340 | 0 | PTabletID tablet; |
341 | 0 | tablet.set_partition_id(partition->id); |
342 | 0 | tablet.set_index_id(index.index_id); |
343 | 0 | tablet.set_tablet_id(tablet_id); |
344 | 0 | _tablets_for_node[node].emplace(tablet_id, tablet); |
345 | 0 | constexpr int64_t DUMMY_TABLET_ID = 0; |
346 | 0 | if (tablet_id == DUMMY_TABLET_ID) [[unlikely]] { |
347 | | // ignore fake tablet for auto partition |
348 | 0 | continue; |
349 | 0 | } |
350 | 0 | _tablets_by_node[node].emplace(tablet_id); |
351 | 0 | if (known_indexes.contains(index.index_id)) [[likely]] { |
352 | 0 | continue; |
353 | 0 | } |
354 | 0 | _indexes_from_node[node].emplace_back(tablet); |
355 | 0 | known_indexes.insert(index.index_id); |
356 | 0 | } |
357 | 0 | _build_tablet_replica_info(tablet_id, partition); |
358 | 0 | } |
359 | 0 | } |
360 | 0 | } |
361 | 0 | return Status::OK(); |
362 | 0 | } |
363 | | |
364 | | void VTabletWriterV2::_build_tablet_replica_info(const int64_t tablet_id, |
365 | 0 | VOlapTablePartition* partition) { |
366 | 0 | if (partition != nullptr) { |
367 | 0 | int total_replicas_num = |
368 | 0 | partition->total_replica_num == 0 ? _num_replicas : partition->total_replica_num; |
369 | 0 | int load_required_replicas_num = partition->load_required_replica_num == 0 |
370 | 0 | ? (_num_replicas + 1) / 2 |
371 | 0 | : partition->load_required_replica_num; |
372 | 0 | _tablet_replica_info[tablet_id] = |
373 | 0 | std::make_pair(total_replicas_num, load_required_replicas_num); |
374 | 0 | } else { |
375 | 0 | _tablet_replica_info[tablet_id] = std::make_pair(_num_replicas, (_num_replicas + 1) / 2); |
376 | 0 | } |
377 | 0 | } |
378 | | |
379 | | void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& row_part_tablet_ids, |
380 | 0 | RowsForTablet& rows_for_tablet) { |
381 | 0 | for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); index_idx++) { |
382 | 0 | auto& row_ids = row_part_tablet_ids[index_idx].row_ids; |
383 | 0 | auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids; |
384 | 0 | auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids; |
385 | |
|
386 | 0 | for (size_t i = 0; i < row_ids.size(); i++) { |
387 | 0 | auto& tablet_id = tablet_ids[i]; |
388 | 0 | auto it = rows_for_tablet.find(tablet_id); |
389 | 0 | if (it == rows_for_tablet.end()) { |
390 | 0 | Rows rows; |
391 | 0 | rows.partition_id = partition_ids[i]; |
392 | 0 | rows.index_id = _schema->indexes()[index_idx]->index_id; |
393 | 0 | rows.row_idxes.reserve(row_ids.size()); |
394 | 0 | auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows}); |
395 | 0 | it = tmp_it; |
396 | 0 | } |
397 | 0 | it->second.row_idxes.push_back(row_ids[i]); |
398 | 0 | _number_output_rows++; |
399 | 0 | } |
400 | 0 | } |
401 | 0 | } |
402 | | |
403 | | Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, |
404 | 0 | std::vector<std::shared_ptr<LoadStreamStub>>& streams) { |
405 | 0 | std::vector<int64_t> failed_node_ids; |
406 | 0 | const auto* location = _location->find_tablet(tablet_id); |
407 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2._select_streams.location_null", { location = nullptr; }); |
408 | 0 | if (location == nullptr) { |
409 | 0 | return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id); |
410 | 0 | } |
411 | 0 | for (const auto& node_id : location->node_ids) { |
412 | 0 | PTabletID tablet; |
413 | 0 | tablet.set_partition_id(partition_id); |
414 | 0 | tablet.set_index_id(index_id); |
415 | 0 | tablet.set_tablet_id(tablet_id); |
416 | 0 | VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id); |
417 | 0 | _tablets_for_node[node_id].emplace(tablet_id, tablet); |
418 | 0 | auto stream = _load_stream_map->at(node_id)->select_one_stream(); |
419 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", { |
420 | 0 | LOG(INFO) << "[skip_two_backends](detail) tablet_id=" << tablet_id |
421 | 0 | << ", node_id=" << node_id |
422 | 0 | << ", stream_ok=" << (stream == nullptr ? "no" : "yes"); |
423 | 0 | }); |
424 | 0 | if (stream == nullptr) { |
425 | 0 | LOG(WARNING) << "skip writing tablet " << tablet_id << " to backend " << node_id |
426 | 0 | << ": stream is not open"; |
427 | 0 | failed_node_ids.push_back(node_id); |
428 | 0 | continue; |
429 | 0 | } |
430 | 0 | streams.emplace_back(std::move(stream)); |
431 | 0 | } |
432 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_two_backends", { |
433 | 0 | LOG(INFO) << "[skip_two_backends](summary) tablet_id=" << tablet_id |
434 | 0 | << ", num_streams=" << streams.size() |
435 | 0 | << ", num_nodes=" << location->node_ids.size(); |
436 | 0 | }); |
437 | 0 | if (streams.size() <= location->node_ids.size() / 2) { |
438 | 0 | std::ostringstream success_msg; |
439 | 0 | std::ostringstream failed_msg; |
440 | 0 | for (auto& s : streams) { |
441 | 0 | success_msg << ", " << s->dst_id(); |
442 | 0 | } |
443 | 0 | for (auto id : failed_node_ids) { |
444 | 0 | failed_msg << ", " << id; |
445 | 0 | } |
446 | 0 | LOG(INFO) << "failed to write enough replicas " << streams.size() << "/" |
447 | 0 | << location->node_ids.size() << " for tablet " << tablet_id |
448 | 0 | << " due to connection errors; success nodes" << success_msg.str() |
449 | 0 | << "; failed nodes" << failed_msg.str() << "."; |
450 | 0 | return Status::InternalError( |
451 | 0 | "failed to write enough replicas {}/{} for tablet {} due to connection errors", |
452 | 0 | streams.size(), location->node_ids.size(), tablet_id); |
453 | 0 | } |
454 | 0 | Status st; |
455 | 0 | for (auto& stream : streams) { |
456 | 0 | st = stream->wait_for_schema(partition_id, index_id, tablet_id); |
457 | 0 | if (st.ok()) { |
458 | 0 | break; |
459 | 0 | } else { |
460 | 0 | LOG(WARNING) << "failed to get schema from stream " << stream << ", err=" << st; |
461 | 0 | } |
462 | 0 | } |
463 | 0 | return st; |
464 | 0 | } |
465 | | |
466 | 0 | Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) { |
467 | 0 | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); |
468 | 0 | Status status = Status::OK(); |
469 | |
|
470 | 0 | if (_state->query_options().dry_run_query) { |
471 | 0 | return status; |
472 | 0 | } |
473 | | |
474 | 0 | int64_t total_wait_time_ms = 0; |
475 | 0 | auto streams_for_node = _load_stream_map->get_streams_for_node(); |
476 | 0 | for (const auto& [dst_id, streams] : streams_for_node) { |
477 | 0 | for (const auto& stream : streams->streams()) { |
478 | 0 | auto wait_time_ms = stream->get_and_reset_load_back_pressure_version_wait_time_ms(); |
479 | 0 | if (wait_time_ms > 0) { |
480 | 0 | total_wait_time_ms = std::max(total_wait_time_ms, wait_time_ms); |
481 | 0 | } |
482 | 0 | } |
483 | 0 | } |
484 | 0 | if (UNLIKELY(total_wait_time_ms > 0)) { |
485 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(total_wait_time_ms)); |
486 | 0 | _load_back_pressure_version_block_ms.fetch_add(total_wait_time_ms); |
487 | 0 | } |
488 | | |
489 | | // check out of limit |
490 | 0 | RETURN_IF_ERROR(_send_new_partition_batch()); |
491 | | |
492 | 0 | auto input_rows = input_block.rows(); |
493 | 0 | auto input_bytes = input_block.bytes(); |
494 | 0 | if (UNLIKELY(input_rows == 0)) { |
495 | 0 | return status; |
496 | 0 | } |
497 | 0 | SCOPED_TIMER(_operator_profile->total_time_counter()); |
498 | 0 | _number_input_rows += input_rows; |
499 | | // update incrementally so that FE can get the progress. |
500 | | // the real 'num_rows_load_total' will be set when sink being closed. |
501 | 0 | _state->update_num_rows_load_total(input_rows); |
502 | 0 | _state->update_num_bytes_load_total(input_bytes); |
503 | 0 | DorisMetrics::instance()->load_rows->increment(input_rows); |
504 | 0 | DorisMetrics::instance()->load_bytes->increment(input_bytes); |
505 | |
|
506 | 0 | SCOPED_RAW_TIMER(&_send_data_ns); |
507 | | // This is just for passing compilation. |
508 | 0 | _row_distribution_watch.start(); |
509 | |
|
510 | 0 | std::shared_ptr<Block> block; |
511 | 0 | RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( |
512 | 0 | input_block, block, _row_part_tablet_ids, _number_input_rows)); |
513 | 0 | RowsForTablet rows_for_tablet; |
514 | 0 | _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet); |
515 | |
|
516 | 0 | _row_distribution_watch.stop(); |
517 | | |
518 | | // For each tablet, send its input_rows from block to delta writer |
519 | 0 | for (const auto& [tablet_id, rows] : rows_for_tablet) { |
520 | 0 | RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows)); |
521 | 0 | } |
522 | | |
523 | 0 | COUNTER_SET(_input_rows_counter, _number_input_rows); |
524 | 0 | COUNTER_SET(_output_rows_counter, _number_output_rows); |
525 | 0 | COUNTER_SET(_filtered_rows_counter, |
526 | 0 | _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows()); |
527 | 0 | COUNTER_SET(_send_data_timer, _send_data_ns); |
528 | 0 | COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time()); |
529 | 0 | COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns()); |
530 | |
|
531 | 0 | return Status::OK(); |
532 | 0 | } |
533 | | |
534 | | Status VTabletWriterV2::_write_memtable(std::shared_ptr<Block> block, int64_t tablet_id, |
535 | 0 | const Rows& rows) { |
536 | 0 | auto st = Status::OK(); |
537 | 0 | auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() { |
538 | 0 | std::vector<std::shared_ptr<LoadStreamStub>> streams; |
539 | 0 | st = _select_streams(tablet_id, rows.partition_id, rows.index_id, streams); |
540 | 0 | if (!st.ok()) [[unlikely]] { |
541 | 0 | LOG(WARNING) << "select stream failed, " << st << ", load_id=" << print_id(_load_id); |
542 | 0 | return std::unique_ptr<DeltaWriterV2>(nullptr); |
543 | 0 | } |
544 | 0 | WriteRequest req { |
545 | 0 | .tablet_id = tablet_id, |
546 | 0 | .txn_id = _txn_id, |
547 | 0 | .index_id = rows.index_id, |
548 | 0 | .partition_id = rows.partition_id, |
549 | 0 | .load_id = _load_id, |
550 | 0 | .tuple_desc = _schema->tuple_desc(), |
551 | 0 | .table_schema_param = _schema, |
552 | 0 | .is_high_priority = _is_high_priority, |
553 | 0 | .write_file_cache = _write_file_cache, |
554 | 0 | .storage_vault_id {}, |
555 | 0 | }; |
556 | 0 | bool index_not_found = true; |
557 | 0 | for (const auto& index : _schema->indexes()) { |
558 | 0 | if (index->index_id == rows.index_id) { |
559 | 0 | req.slots = &index->slots; |
560 | 0 | req.schema_hash = index->schema_hash; |
561 | 0 | index_not_found = false; |
562 | 0 | break; |
563 | 0 | } |
564 | 0 | } |
565 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2._write_memtable.index_not_found", |
566 | 0 | { index_not_found = true; }); |
567 | 0 | if (index_not_found) [[unlikely]] { |
568 | 0 | st = Status::InternalError("no index {} in schema", rows.index_id); |
569 | 0 | LOG(WARNING) << "index " << rows.index_id |
570 | 0 | << " not found in schema, load_id=" << print_id(_load_id); |
571 | 0 | return std::unique_ptr<DeltaWriterV2>(nullptr); |
572 | 0 | } |
573 | 0 | return DeltaWriterV2::create_unique(&req, streams, _state); |
574 | 0 | }); |
575 | 0 | if (delta_writer == nullptr) { |
576 | 0 | LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id |
577 | 0 | << ", load_id=" << print_id(_load_id) << ", err: " << st; |
578 | 0 | return Status::InternalError("failed to open DeltaWriter {}: {}", tablet_id, st.msg()); |
579 | 0 | } |
580 | 0 | { |
581 | 0 | SCOPED_TIMER(_wait_mem_limit_timer); |
582 | 0 | ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush( |
583 | 0 | [state = _state]() { return state->is_cancelled(); }); |
584 | 0 | if (_state->is_cancelled()) { |
585 | 0 | return _state->cancel_reason(); |
586 | 0 | } |
587 | 0 | } |
588 | 0 | SCOPED_TIMER(_write_memtable_timer); |
589 | 0 | st = delta_writer->write(block.get(), rows.row_idxes); |
590 | 0 | return st; |
591 | 0 | } |
592 | | |
593 | 0 | void VTabletWriterV2::_cancel(Status status) { |
594 | 0 | LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id) |
595 | 0 | << ", txn_id=" << _txn_id << ", sink_id=" << _sender_id |
596 | 0 | << ", due to error: " << status; |
597 | 0 | if (_delta_writer_for_tablet) { |
598 | 0 | _delta_writer_for_tablet->cancel(status); |
599 | 0 | _delta_writer_for_tablet.reset(); |
600 | 0 | } |
601 | 0 | if (_load_stream_map) { |
602 | 0 | _load_stream_map->for_each( |
603 | 0 | [status](int64_t dst_id, LoadStreamStubs& streams) { streams.cancel(status); }); |
604 | 0 | _load_stream_map->release(); |
605 | 0 | } |
606 | 0 | } |
607 | | |
608 | 0 | Status VTabletWriterV2::_send_new_partition_batch() { |
609 | 0 | if (_row_distribution.need_deal_batching()) { // maybe try_close more than 1 time |
610 | 0 | RETURN_IF_ERROR(_row_distribution.automatic_create_partition()); |
611 | | |
612 | 0 | Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref |
613 | | |
614 | | // these order is unique. |
615 | | // 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block. |
616 | | // 2. deal batched block |
617 | | // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that. |
618 | 0 | _row_distribution.clear_batching_stats(); |
619 | 0 | RETURN_IF_ERROR(this->write(_state, tmp_block)); |
620 | 0 | _row_distribution._batching_block->set_mutable_columns( |
621 | 0 | tmp_block.mutate_columns()); // Recovery back |
622 | 0 | _row_distribution._batching_block->clear_column_data(); |
623 | 0 | _row_distribution._deal_batched = false; |
624 | 0 | } |
625 | 0 | return Status::OK(); |
626 | 0 | } |
627 | | |
628 | 0 | Status VTabletWriterV2::close(Status exec_status) { |
629 | 0 | std::lock_guard<std::mutex> close_lock(_close_mutex); |
630 | 0 | if (_is_closed) { |
631 | 0 | return _close_status; |
632 | 0 | } |
633 | 0 | LOG(INFO) << "closing olap table sink, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id |
634 | 0 | << ", sink_id=" << _sender_id << ", status=" << exec_status.to_string(); |
635 | 0 | SCOPED_TIMER(_close_timer); |
636 | 0 | Status status = exec_status; |
637 | |
|
638 | 0 | if (status.ok()) { |
639 | 0 | SCOPED_TIMER(_operator_profile->total_time_counter()); |
640 | 0 | _row_distribution._deal_batched = true; |
641 | 0 | status = _send_new_partition_batch(); |
642 | 0 | } |
643 | |
|
644 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2.close.sleep", { |
645 | 0 | auto sleep_sec = DebugPoints::instance()->get_debug_param_or_default<int32_t>( |
646 | 0 | "VTabletWriterV2.close.sleep", "sleep_sec", 1); |
647 | 0 | std::this_thread::sleep_for(std::chrono::seconds(sleep_sec)); |
648 | 0 | }); |
649 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2.close.cancel", |
650 | 0 | { status = Status::InternalError("load cancel"); }); |
651 | 0 | if (status.ok()) { |
652 | | // only if status is ok can we call this _profile->total_time_counter(). |
653 | | // if status is not ok, this sink may not be prepared, so that _profile is null |
654 | 0 | SCOPED_TIMER(_operator_profile->total_time_counter()); |
655 | |
|
656 | 0 | COUNTER_SET(_input_rows_counter, _number_input_rows); |
657 | 0 | COUNTER_SET(_output_rows_counter, _number_output_rows); |
658 | 0 | COUNTER_SET(_filtered_rows_counter, |
659 | 0 | _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows()); |
660 | 0 | COUNTER_SET(_send_data_timer, _send_data_ns); |
661 | 0 | COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time()); |
662 | 0 | COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns()); |
663 | 0 | auto back_pressure_time_ms = _load_back_pressure_version_block_ms.load(); |
664 | 0 | COUNTER_SET(_load_back_pressure_version_time_ms, back_pressure_time_ms); |
665 | 0 | g_sink_load_back_pressure_version_time_ms << back_pressure_time_ms; |
666 | | |
667 | | // close DeltaWriters |
668 | 0 | { |
669 | 0 | std::unordered_map<int64_t, int32_t> segments_for_tablet; |
670 | 0 | SCOPED_TIMER(_close_writer_timer); |
671 | | // close all delta writers if this is the last user |
672 | 0 | auto st = _delta_writer_for_tablet->close(segments_for_tablet, _operator_profile); |
673 | 0 | _delta_writer_for_tablet.reset(); |
674 | 0 | if (!st.ok()) { |
675 | 0 | _cancel(st); |
676 | 0 | return st; |
677 | 0 | } |
678 | | // only the last sink closing delta writers will have segment num |
679 | 0 | if (!segments_for_tablet.empty()) { |
680 | 0 | _load_stream_map->save_segments_for_tablet(segments_for_tablet); |
681 | 0 | } |
682 | 0 | } |
683 | | |
684 | 0 | _calc_tablets_to_commit(); |
685 | 0 | const bool is_last_sink = _load_stream_map->release(); |
686 | 0 | LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink |
687 | 0 | << ", load_id=" << print_id(_load_id); |
688 | | |
689 | | // send CLOSE_LOAD on all non-incremental streams if this is the last sink |
690 | 0 | if (is_last_sink) { |
691 | 0 | _load_stream_map->close_load(false); |
692 | 0 | } |
693 | | |
694 | | // close_wait on all non-incremental streams, even if this is not the last sink. |
695 | | // because some per-instance data structures are now shared among all sinks |
696 | | // due to sharing delta writers and load stream stubs. |
697 | | // Do not need to wait after quorum success, |
698 | | // for first-stage close_wait only ensure incremental streams load has been completed, |
699 | | // unified waiting in the second-stage close_wait. |
700 | 0 | RETURN_IF_ERROR(_close_wait(_non_incremental_streams(), false)); |
701 | | |
702 | | // send CLOSE_LOAD on all incremental streams if this is the last sink. |
703 | | // this must happen after all non-incremental streams are closed, |
704 | | // so we can ensure all sinks are in close phase before closing incremental streams. |
705 | 0 | if (is_last_sink) { |
706 | 0 | _load_stream_map->close_load(true); |
707 | 0 | } |
708 | | |
709 | | // close_wait on all incremental streams, even if this is not the last sink. |
710 | 0 | RETURN_IF_ERROR(_close_wait(_all_streams(), true)); |
711 | | |
712 | | // calculate and submit commit info |
713 | 0 | if (is_last_sink) { |
714 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", { |
715 | 0 | auto streams = _load_stream_map->at(_tablets_for_node.begin()->first); |
716 | 0 | int64_t tablet_id = -1; |
717 | 0 | for (auto tablet : streams->success_tablets()) { |
718 | 0 | tablet_id = tablet; |
719 | 0 | break; |
720 | 0 | } |
721 | 0 | if (tablet_id != -1) { |
722 | 0 | LOG(INFO) << "fault injection: adding failed tablet_id: " << tablet_id; |
723 | 0 | streams->select_one_stream()->add_failed_tablet( |
724 | 0 | tablet_id, Status::InternalError("fault injection")); |
725 | 0 | } else { |
726 | 0 | LOG(INFO) << "fault injection: failed to inject failed tablet_id"; |
727 | 0 | } |
728 | 0 | }); |
729 | |
|
730 | 0 | std::vector<TTabletCommitInfo> tablet_commit_infos; |
731 | 0 | RETURN_IF_ERROR(_create_commit_info(tablet_commit_infos, _load_stream_map)); |
732 | 0 | _state->add_tablet_commit_infos(tablet_commit_infos); |
733 | 0 | } |
734 | | |
735 | | // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node |
736 | 0 | int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() + |
737 | 0 | _state->num_rows_load_unselected(); |
738 | 0 | _state->set_num_rows_load_total(num_rows_load_total); |
739 | 0 | _state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + |
740 | 0 | _tablet_finder->num_filtered_rows()); |
741 | 0 | _state->update_num_rows_load_unselected( |
742 | 0 | _tablet_finder->num_immutable_partition_filtered_rows()); |
743 | |
|
744 | 0 | if (_state->enable_profile() && _state->profile_level() >= 2) { |
745 | | // Output detailed profiling info for auto-partition requests |
746 | 0 | _row_distribution.output_profile_info(_operator_profile); |
747 | 0 | } |
748 | |
|
749 | 0 | LOG(INFO) << "finished to close olap table sink. load_id=" << print_id(_load_id) |
750 | 0 | << ", txn_id=" << _txn_id; |
751 | 0 | } else { |
752 | 0 | _cancel(status); |
753 | 0 | } |
754 | | |
755 | 0 | _is_closed = true; |
756 | 0 | _close_status = status; |
757 | 0 | return status; |
758 | 0 | } |
759 | | |
760 | 0 | std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_all_streams() { |
761 | 0 | std::unordered_set<std::shared_ptr<LoadStreamStub>> all_streams; |
762 | 0 | auto streams_for_node = _load_stream_map->get_streams_for_node(); |
763 | 0 | for (const auto& [dst_id, streams] : streams_for_node) { |
764 | 0 | for (const auto& stream : streams->streams()) { |
765 | 0 | all_streams.insert(stream); |
766 | 0 | } |
767 | 0 | } |
768 | 0 | return all_streams; |
769 | 0 | } |
770 | | |
771 | 0 | std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_non_incremental_streams() { |
772 | 0 | std::unordered_set<std::shared_ptr<LoadStreamStub>> non_incremental_streams; |
773 | 0 | auto streams_for_node = _load_stream_map->get_streams_for_node(); |
774 | 0 | for (const auto& [dst_id, streams] : streams_for_node) { |
775 | 0 | for (const auto& stream : streams->streams()) { |
776 | 0 | if (!stream->is_incremental()) { |
777 | 0 | non_incremental_streams.insert(stream); |
778 | 0 | } |
779 | 0 | } |
780 | 0 | } |
781 | 0 | return non_incremental_streams; |
782 | 0 | } |
783 | | |
784 | | Status VTabletWriterV2::_close_wait( |
785 | | std::unordered_set<std::shared_ptr<LoadStreamStub>> unfinished_streams, |
786 | 0 | bool need_wait_after_quorum_success) { |
787 | 0 | SCOPED_TIMER(_close_load_timer); |
788 | 0 | Status status; |
789 | 0 | auto streams_for_node = _load_stream_map->get_streams_for_node(); |
790 | | // 1. first wait for quorum success |
791 | 0 | std::unordered_set<int64_t> need_finish_tablets; |
792 | 0 | auto partition_ids = _tablet_finder->partition_ids(); |
793 | 0 | for (const auto& part : _vpartition->get_partitions()) { |
794 | 0 | if (partition_ids.contains(part->id)) { |
795 | 0 | for (const auto& index : part->indexes) { |
796 | 0 | for (const auto& tablet_id : index.tablets) { |
797 | 0 | need_finish_tablets.insert(tablet_id); |
798 | 0 | } |
799 | 0 | } |
800 | 0 | } |
801 | 0 | } |
802 | 0 | while (true) { |
803 | 0 | RETURN_IF_ERROR(_check_timeout()); |
804 | 0 | RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node)); |
805 | 0 | bool quorum_success = _quorum_success(unfinished_streams, need_finish_tablets); |
806 | 0 | if (quorum_success || unfinished_streams.empty()) { |
807 | 0 | LOG(INFO) << "quorum_success: " << quorum_success |
808 | 0 | << ", is all finished: " << unfinished_streams.empty() |
809 | 0 | << ", txn_id: " << _txn_id << ", load_id: " << print_id(_load_id); |
810 | 0 | break; |
811 | 0 | } |
812 | 0 | bthread_usleep(1000 * 10); |
813 | 0 | } |
814 | | |
815 | | // 2. then wait for remaining streams as much as possible |
816 | 0 | if (!unfinished_streams.empty() && need_wait_after_quorum_success) { |
817 | 0 | int64_t arrival_quorum_success_time = UnixMillis(); |
818 | 0 | int64_t max_wait_time_ms = _calc_max_wait_time_ms(streams_for_node, unfinished_streams); |
819 | 0 | while (true) { |
820 | 0 | RETURN_IF_ERROR(_check_timeout()); |
821 | 0 | RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node)); |
822 | 0 | if (unfinished_streams.empty()) { |
823 | 0 | break; |
824 | 0 | } |
825 | 0 | int64_t elapsed_ms = UnixMillis() - arrival_quorum_success_time; |
826 | 0 | if (elapsed_ms > max_wait_time_ms || |
827 | 0 | _state->execution_timeout() - elapsed_ms / 1000 < |
828 | 0 | config::quorum_success_remaining_timeout_seconds) { |
829 | 0 | std::stringstream unfinished_streams_str; |
830 | 0 | for (const auto& stream : unfinished_streams) { |
831 | 0 | unfinished_streams_str << stream->stream_id() << ","; |
832 | 0 | } |
833 | 0 | LOG(WARNING) << "reach max wait time, max_wait_time_ms: " << max_wait_time_ms |
834 | 0 | << ", load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id |
835 | 0 | << ", unfinished streams: " << unfinished_streams_str.str(); |
836 | 0 | break; |
837 | 0 | } |
838 | 0 | bthread_usleep(1000 * 10); |
839 | 0 | } |
840 | 0 | } |
841 | | |
842 | 0 | if (!status.ok()) { |
843 | 0 | LOG(WARNING) << "close_wait failed: " << status << ", load_id=" << print_id(_load_id); |
844 | 0 | } |
845 | 0 | return status; |
846 | 0 | } |
847 | | |
848 | | bool VTabletWriterV2::_quorum_success( |
849 | | const std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams, |
850 | 0 | const std::unordered_set<int64_t>& need_finish_tablets) { |
851 | 0 | if (!config::enable_quorum_success_write) { |
852 | 0 | return false; |
853 | 0 | } |
854 | 0 | auto streams_for_node = _load_stream_map->get_streams_for_node(); |
855 | 0 | if (need_finish_tablets.empty()) [[unlikely]] { |
856 | 0 | return false; |
857 | 0 | } |
858 | | |
859 | | // 1. calculate finished tablets replica num |
860 | 0 | std::unordered_set<int64_t> finished_dst_ids; |
861 | 0 | std::unordered_map<int64_t, int64_t> finished_tablets_replica; |
862 | 0 | for (const auto& [dst_id, streams] : streams_for_node) { |
863 | 0 | bool finished = true; |
864 | 0 | for (const auto& stream : streams->streams()) { |
865 | 0 | if (unfinished_streams.contains(stream) || !stream->check_cancel().ok()) { |
866 | 0 | finished = false; |
867 | 0 | break; |
868 | 0 | } |
869 | 0 | } |
870 | 0 | if (finished) { |
871 | 0 | finished_dst_ids.insert(dst_id); |
872 | 0 | } |
873 | 0 | } |
874 | 0 | for (const auto& [dst_id, _] : streams_for_node) { |
875 | 0 | if (!finished_dst_ids.contains(dst_id)) { |
876 | 0 | continue; |
877 | 0 | } |
878 | 0 | for (const auto& tablet_id : _tablets_by_node[dst_id]) { |
879 | 0 | finished_tablets_replica[tablet_id]++; |
880 | 0 | } |
881 | 0 | } |
882 | | |
883 | | // 2. check if quorum success |
884 | 0 | for (const auto& tablet_id : need_finish_tablets) { |
885 | 0 | if (finished_tablets_replica[tablet_id] < _load_required_replicas_num(tablet_id)) { |
886 | 0 | return false; |
887 | 0 | } |
888 | 0 | } |
889 | 0 | return true; |
890 | 0 | } |
891 | | |
892 | 0 | int VTabletWriterV2::_load_required_replicas_num(int64_t tablet_id) { |
893 | 0 | auto [total_replicas_num, load_required_replicas_num] = _tablet_replica_info[tablet_id]; |
894 | 0 | if (total_replicas_num == 0) { |
895 | 0 | return (_num_replicas + 1) / 2; |
896 | 0 | } |
897 | 0 | return load_required_replicas_num; |
898 | 0 | } |
899 | | |
900 | | int64_t VTabletWriterV2::_calc_max_wait_time_ms( |
901 | | const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node, |
902 | 0 | const std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams) { |
903 | | // 1. calculate avg speed of all unfinished streams |
904 | 0 | int64_t elapsed_ms = _timeout_watch.elapsed_time() / 1000 / 1000; |
905 | 0 | int64_t total_bytes = 0; |
906 | 0 | int finished_count = 0; |
907 | 0 | for (const auto& [dst_id, streams] : streams_for_node) { |
908 | 0 | for (const auto& stream : streams->streams()) { |
909 | 0 | if (unfinished_streams.contains(stream) || !stream->check_cancel().ok()) { |
910 | 0 | continue; |
911 | 0 | } |
912 | 0 | total_bytes += stream->bytes_written(); |
913 | 0 | finished_count++; |
914 | 0 | } |
915 | 0 | } |
916 | | // no data loaded in index channel, return 0 |
917 | 0 | if (total_bytes == 0 || finished_count == 0) { |
918 | 0 | return 0; |
919 | 0 | } |
920 | | // if elapsed_ms is equal to 0, explain the loaded data is too small |
921 | 0 | if (elapsed_ms <= 0) { |
922 | 0 | return config::quorum_success_min_wait_seconds * 1000; |
923 | 0 | } |
924 | 0 | double avg_speed = |
925 | 0 | static_cast<double>(total_bytes) / (static_cast<double>(elapsed_ms) * finished_count); |
926 | | |
927 | | // 2. calculate max wait time of each unfinished stream and return the max value |
928 | 0 | int64_t max_wait_time_ms = 0; |
929 | 0 | for (const auto& [dst_id, streams] : streams_for_node) { |
930 | 0 | for (const auto& stream : streams->streams()) { |
931 | 0 | if (unfinished_streams.contains(stream)) { |
932 | 0 | int64_t bytes = stream->bytes_written(); |
933 | 0 | int64_t wait = |
934 | 0 | avg_speed > 0 ? static_cast<int64_t>(static_cast<double>(bytes) / avg_speed) |
935 | 0 | : 0; |
936 | 0 | max_wait_time_ms = std::max(max_wait_time_ms, wait); |
937 | 0 | } |
938 | 0 | } |
939 | 0 | } |
940 | | |
941 | | // 3. calculate max wait time |
942 | | // introduce quorum_success_min_wait_time_ms to avoid jitter of small load |
943 | 0 | max_wait_time_ms -= UnixMillis() - _timeout_watch.elapsed_time() / 1000 / 1000; |
944 | 0 | max_wait_time_ms = |
945 | 0 | std::max(static_cast<int64_t>(static_cast<double>(max_wait_time_ms) * |
946 | 0 | (1.0 + config::quorum_success_max_wait_multiplier)), |
947 | 0 | config::quorum_success_min_wait_seconds * 1000); |
948 | |
|
949 | 0 | return max_wait_time_ms; |
950 | 0 | } |
951 | | |
952 | 0 | Status VTabletWriterV2::_check_timeout() { |
953 | 0 | int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 - |
954 | 0 | _timeout_watch.elapsed_time() / 1000 / 1000; |
955 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; }); |
956 | 0 | if (remain_ms <= 0) { |
957 | 0 | LOG(WARNING) << "load timed out before close waiting, load_id=" << print_id(_load_id); |
958 | 0 | return Status::TimedOut("load timed out before close waiting"); |
959 | 0 | } |
960 | 0 | return Status::OK(); |
961 | 0 | } |
962 | | |
963 | | Status VTabletWriterV2::_check_streams_finish( |
964 | | std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams, Status& status, |
965 | 0 | const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node) { |
966 | 0 | for (const auto& [dst_id, streams] : streams_for_node) { |
967 | 0 | for (const auto& stream : streams->streams()) { |
968 | 0 | if (!unfinished_streams.contains(stream)) { |
969 | 0 | continue; |
970 | 0 | } |
971 | 0 | bool is_closed = false; |
972 | 0 | auto stream_st = stream->close_finish_check(_state, &is_closed); |
973 | 0 | DBUG_EXECUTE_IF("VTabletWriterV2._check_streams_finish.close_stream_failed", |
974 | 0 | { stream_st = Status::InternalError("close stream failed"); }); |
975 | 0 | if (!stream_st.ok()) { |
976 | 0 | status = stream_st; |
977 | 0 | unfinished_streams.erase(stream); |
978 | 0 | LOG(WARNING) << "close_wait failed: " << stream_st |
979 | 0 | << ", load_id=" << print_id(_load_id); |
980 | 0 | } |
981 | 0 | if (is_closed) { |
982 | 0 | unfinished_streams.erase(stream); |
983 | 0 | } |
984 | 0 | } |
985 | 0 | } |
986 | 0 | return status; |
987 | 0 | } |
988 | | |
989 | 0 | void VTabletWriterV2::_calc_tablets_to_commit() { |
990 | 0 | LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id |
991 | 0 | << ", sink_id=" << _sender_id; |
992 | 0 | for (const auto& [dst_id, tablets] : _tablets_for_node) { |
993 | 0 | std::vector<PTabletID> tablets_to_commit; |
994 | 0 | std::vector<int64_t> partition_ids; |
995 | 0 | for (const auto& [tablet_id, tablet] : tablets) { |
996 | 0 | if (_tablet_finder->partition_ids().contains(tablet.partition_id())) { |
997 | 0 | if (VLOG_DEBUG_IS_ON) { |
998 | 0 | partition_ids.push_back(tablet.partition_id()); |
999 | 0 | } |
1000 | 0 | PTabletID t(tablet); |
1001 | 0 | tablets_to_commit.push_back(t); |
1002 | 0 | } |
1003 | 0 | } |
1004 | 0 | if (VLOG_DEBUG_IS_ON) { |
1005 | 0 | std::string msg("close load partitions: "); |
1006 | 0 | msg.reserve(partition_ids.size() * 7); |
1007 | 0 | for (auto v : partition_ids) { |
1008 | 0 | msg.append(std::to_string(v) + ", "); |
1009 | 0 | } |
1010 | 0 | LOG(WARNING) << msg; |
1011 | 0 | } |
1012 | 0 | _load_stream_map->save_tablets_to_commit(dst_id, tablets_to_commit); |
1013 | 0 | } |
1014 | 0 | } |
1015 | | |
1016 | | Status VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tablet_commit_infos, |
1017 | 13 | std::shared_ptr<LoadStreamMap> load_stream_map) { |
1018 | 13 | std::unordered_map<int64_t, int> failed_tablets; |
1019 | 13 | std::unordered_map<int64_t, Status> failed_reason; |
1020 | 33 | load_stream_map->for_each([&](int64_t dst_id, LoadStreamStubs& streams) { |
1021 | 33 | size_t num_success_tablets = 0; |
1022 | 33 | size_t num_failed_tablets = 0; |
1023 | 33 | for (auto [tablet_id, reason] : streams.failed_tablets()) { |
1024 | 12 | failed_tablets[tablet_id]++; |
1025 | 12 | failed_reason[tablet_id] = reason; |
1026 | 12 | num_failed_tablets++; |
1027 | 12 | } |
1028 | 50 | for (auto tablet_id : streams.success_tablets()) { |
1029 | 50 | TTabletCommitInfo commit_info; |
1030 | 50 | commit_info.tabletId = tablet_id; |
1031 | 50 | commit_info.backendId = dst_id; |
1032 | 50 | tablet_commit_infos.emplace_back(std::move(commit_info)); |
1033 | 50 | num_success_tablets++; |
1034 | 50 | } |
1035 | 33 | LOG(INFO) << "streams to dst_id: " << dst_id << ", success tablets: " << num_success_tablets |
1036 | 33 | << ", failed tablets: " << num_failed_tablets; |
1037 | 33 | }); |
1038 | | |
1039 | 13 | for (auto [tablet_id, replicas] : failed_tablets) { |
1040 | 10 | auto [total_replicas_num, load_required_replicas_num] = _tablet_replica_info[tablet_id]; |
1041 | 10 | int max_failed_replicas = total_replicas_num == 0 |
1042 | 10 | ? (_num_replicas - 1) / 2 |
1043 | 10 | : total_replicas_num - load_required_replicas_num; |
1044 | 10 | if (replicas > max_failed_replicas) { |
1045 | 4 | LOG(INFO) << "tablet " << tablet_id |
1046 | 4 | << " failed on majority backends: " << failed_reason[tablet_id]; |
1047 | 4 | return Status::InternalError("tablet {} failed on majority backends: {}", tablet_id, |
1048 | 4 | failed_reason[tablet_id]); |
1049 | 4 | } |
1050 | 10 | } |
1051 | 9 | return Status::OK(); |
1052 | 13 | } |
1053 | | |
1054 | | } // namespace doris |