be/src/exec/sink/writer/vtablet_writer.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | #include <brpc/controller.h> |
20 | | #include <bthread/types.h> |
21 | | #include <butil/errno.h> |
22 | | #include <fmt/format.h> |
23 | | #include <gen_cpp/Exprs_types.h> |
24 | | #include <gen_cpp/FrontendService.h> |
25 | | #include <gen_cpp/FrontendService_types.h> |
26 | | #include <gen_cpp/PaloInternalService_types.h> |
27 | | #include <gen_cpp/Types_types.h> |
28 | | #include <gen_cpp/internal_service.pb.h> |
29 | | #include <gen_cpp/types.pb.h> |
30 | | #include <glog/logging.h> |
31 | | #include <google/protobuf/stubs/callback.h> |
32 | | |
33 | | // IWYU pragma: no_include <bits/chrono.h> |
34 | | #include <bthread/mutex.h> |
35 | | |
#include <atomic>
#include <chrono> // IWYU pragma: keep
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <ostream>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
53 | | |
54 | | #include "common/config.h" |
55 | | #include "common/status.h" |
56 | | #include "core/block/block.h" |
57 | | #include "core/column/column.h" |
58 | | #include "core/data_type/data_type.h" |
59 | | #include "exec/sink/vrow_distribution.h" |
60 | | #include "exec/sink/vtablet_block_convertor.h" |
61 | | #include "exec/sink/vtablet_finder.h" |
62 | | #include "exec/sink/writer/async_result_writer.h" |
63 | | #include "exprs/vexpr_fwd.h" |
64 | | #include "runtime/exec_env.h" |
65 | | #include "runtime/memory/mem_tracker.h" |
66 | | #include "runtime/runtime_profile.h" |
67 | | #include "runtime/thread_context.h" |
68 | | #include "storage/tablet_info.h" |
69 | | #include "util/brpc_closure.h" |
70 | | #include "util/stopwatch.hpp" |
71 | | |
72 | | namespace doris { |
73 | | class ObjectPool; |
74 | | class RowDescriptor; |
75 | | class RuntimeState; |
76 | | class TDataSink; |
77 | | class TExpr; |
78 | | class Thread; |
79 | | class ThreadPoolToken; |
80 | | class TupleDescriptor; |
81 | | |
// Aggregated statistics of all add_batch rpcs issued to a single backend node.
struct AddBatchCounter {
    // total execution time of a add_batch rpc
    int64_t add_batch_execution_time_us = 0;
    // lock waiting time in a add_batch rpc
    int64_t add_batch_wait_execution_time_us = 0;
    // number of add_batch call
    int64_t add_batch_num = 0;
    // time passed between marked close and finish close
    int64_t close_wait_time_ms = 0;

    // Accumulate another counter into this one, field by field.
    AddBatchCounter& operator+=(const AddBatchCounter& other) {
        add_batch_execution_time_us += other.add_batch_execution_time_us;
        add_batch_wait_execution_time_us += other.add_batch_wait_execution_time_us;
        add_batch_num += other.add_batch_num;
        close_wait_time_ms += other.close_wait_time_ms;
        return *this;
    }
    // Field-wise sum of two counters; implemented on top of operator+=.
    friend AddBatchCounter operator+(const AddBatchCounter& lhs, const AddBatchCounter& rhs) {
        AddBatchCounter result {lhs};
        result += rhs;
        return result;
    }
};
106 | | |
// Per-callback state handed to the success/failed handlers of WriteBlockCallback.
struct WriteBlockCallbackContext {
    // set true by WriteBlockCallback::end_mark() to flag the channel's final rpc
    std::atomic<bool> _is_last_rpc {false};
};
110 | | |
// It is very error-prone to guarantee the destruction order of the handlers'
// captured variables and this closure. So obtaining the closure pointer via
// create() is recommended: the closure can then be deleted before the captured
// variables are destroyed. Deleting this pointer is safe — the RPC callback
// will not run after the WriteBlockCallback has been deleted.
// Sender and receiver "ping-pong": `try_set_in_flight` before a send,
// `clear_in_flight` after an rpc failure or inside the callback; only then may
// the next send start. Destruction waits for the in-flight rpc callback to finish.
template <typename T>
class WriteBlockCallback final : public ::doris::DummyBrpcCallback<T> {
    ENABLE_FACTORY_CREATOR(WriteBlockCallback);

public:
    WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}
    ~WriteBlockCallback() override = default;

    // Register the handler invoked from call() when the brpc controller reports failure.
    void addFailedHandler(const std::function<void(const WriteBlockCallbackContext&)>& fn) {
        failed_handler = fn;
    }
    // Register the handler invoked from call() with the response on rpc success.
    void addSuccessHandler(
            const std::function<void(const T&, const WriteBlockCallbackContext&)>& fn) {
        success_handler = fn;
    }

    // Block until no packet is in flight (i.e. the pending rpc has completed).
    void join() override {
        // We rely on in_flight to assure one rpc is running,
        // while cid is not reliable due to memory order.
        // in_flight is written before getting callid,
        // so we can not use memory fence to synchronize.
        while (_packet_in_flight) {
            // cid here is complicated
            if (cid != INVALID_BTHREAD_ID) {
                // actually cid may be the last rpc call id.
                brpc::Join(cid);
            }
            if (_packet_in_flight) {
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
        }
    }

    // plz follow this order: reset() -> set_in_flight() -> send brpc batch
    void reset() {
        ::doris::DummyBrpcCallback<T>::cntl_->Reset();
        cid = ::doris::DummyBrpcCallback<T>::cntl_->call_id();
    }

    // if _packet_in_flight == false, set it to true. Return true.
    // if _packet_in_flight == true, Return false.
    bool try_set_in_flight() {
        bool value = false;
        return _packet_in_flight.compare_exchange_strong(value, true);
    }

    void clear_in_flight() { _packet_in_flight = false; }

    bool is_packet_in_flight() { return _packet_in_flight; }

    // Flag the next rpc as the channel's last one; must be called at most once.
    void end_mark() {
        DCHECK(_ctx._is_last_rpc == false);
        _ctx._is_last_rpc = true;
    }

    // brpc completion callback: dispatch to failed_handler / success_handler,
    // then clear the in-flight flag so the next send (or join()) can proceed.
    void call() override {
        DCHECK(_packet_in_flight);
        if (::doris::DummyBrpcCallback<T>::cntl_->Failed()) {
            LOG(WARNING) << "failed to send brpc batch, error="
                         << berror(::doris::DummyBrpcCallback<T>::cntl_->ErrorCode())
                         << ", error_text=" << ::doris::DummyBrpcCallback<T>::cntl_->ErrorText();
            failed_handler(_ctx);
        } else {
            success_handler(*(::doris::DummyBrpcCallback<T>::response_), _ctx);
        }
        clear_in_flight();
    }

private:
    brpc::CallId cid;
    // true while one rpc is outstanding; drives the ping-pong protocol above
    std::atomic<bool> _packet_in_flight {false};
    WriteBlockCallbackContext _ctx;
    std::function<void(const WriteBlockCallbackContext&)> failed_handler;
    std::function<void(const T&, const WriteBlockCallbackContext&)> success_handler;
};
191 | | |
192 | | class IndexChannel; |
193 | | class VTabletWriter; |
194 | | |
// Per-node-channel timing statistics, merged into WriterStats::channel_stat.
class VNodeChannelStat {
public:
    // Accumulate another channel's stats into this one.
    // NOTE: stray semicolon after the function body removed (-Wextra-semi).
    VNodeChannelStat& operator+=(const VNodeChannelStat& stat) {
        mem_exceeded_block_ns += stat.mem_exceeded_block_ns;
        where_clause_ns += stat.where_clause_ns;
        append_node_channel_ns += stat.append_node_channel_ns;
        return *this;
    }

    // time blocked waiting because memory limit was exceeded
    int64_t mem_exceeded_block_ns = 0;
    // time spent evaluating the where clause
    int64_t where_clause_ns = 0;
    // time spent appending rows into the node channel
    int64_t append_node_channel_ns = 0;
};
208 | | |
// Aggregated load statistics collected across all node channels of a writer;
// filled by VNodeChannel::time_report() and published to the runtime profile.
struct WriterStats {
    // time spent serializing blocks into rpc requests
    int64_t serialize_batch_ns = 0;
    // time spent acquiring the pending-queue lock
    int64_t queue_push_lock_ns = 0;
    // time actually spent consuming/sending pending batches
    int64_t actual_consume_ns = 0;
    // sum of add_batch rpc execution time over all channels
    int64_t total_add_batch_exec_time_ns = 0;
    // slowest single channel's add_batch execution time
    int64_t max_add_batch_exec_time_ns = 0;
    // sum of add_batch rpc lock-wait time over all channels
    int64_t total_wait_exec_time_ns = 0;
    // slowest single channel's add_batch lock-wait time
    int64_t max_wait_exec_time_ns = 0;
    // total number of add_batch calls
    int64_t total_add_batch_num = 0;
    // number of node channels that contributed to these stats
    int64_t num_node_channels = 0;
    // time blocked by load back-pressure versioning
    int64_t load_back_pressure_version_time_ms = 0;
    // merged per-channel timing breakdown (see VNodeChannelStat)
    VNodeChannelStat channel_stat;
};
222 | | |
223 | | // pair<row_id,tablet_id> |
224 | | using Payload = std::pair<std::unique_ptr<IColumn::Selector>, std::vector<int64_t>>; |
225 | | |
// every NodeChannel keeps a data transmission channel with one BE. for multiple times open, it has a dozen of requests and corresponding closures.
class VNodeChannel {
public:
    VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, int64_t node_id,
                 bool is_incremental = false);

    ~VNodeChannel();

    // called before open, used to add tablet located in this backend. called by IndexChannel::init
    void add_tablet(const TTabletWithPartition& tablet) { _tablets_wait_open.emplace_back(tablet); }
    // Render all tablets already opened on this channel, one per line (debug aid).
    std::string debug_tablets() const {
        std::stringstream ss;
        for (const auto& tab : _all_tablets) {
            tab.printTo(ss);
            ss << '\n';
        }
        return ss.str();
    }

    // Record the node ids hosting slave replicas of the given tablet.
    void add_slave_tablet_nodes(int64_t tablet_id, const std::vector<int64_t>& slave_nodes) {
        _slave_tablet_nodes[tablet_id] = slave_nodes;
    }

    // this function is NON_REENTRANT
    Status init(RuntimeState* state);
    /// these two functions will call open_internal. should keep that clear --- REENTRANT
    // build corresponding connect to BE. NON-REENTRANT
    void open();
    // for auto partition, we use this to open more tablet. KEEP IT REENTRANT
    void incremental_open();
    // this will block until all request transmission which were opened or incremental opened finished.
    // this function will called multi times. NON_REENTRANT
    Status open_wait();

    Status add_block(Block* block, const Payload* payload);

    // @return: 1 if running, 0 if finished.
    // @caller: VOlapTabletSink::_send_batch_process. it's a continual asynchronous process.
    int try_send_and_fetch_status(RuntimeState* state,
                                  std::unique_ptr<ThreadPoolToken>& thread_pool_token);
    // when there's pending block found by try_send_and_fetch_status(), we will awake a thread to send it.
    void try_send_pending_block(RuntimeState* state);

    void clear_all_blocks();

    // two ways to stop channel:
    // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response.
    // 2. just cancel()
    // hang_wait = true will make reciever hang until all sender mark_closed.
    void mark_close(bool hang_wait = false);

    bool is_closed() const { return _is_closed; }
    bool is_cancelled() const { return _cancelled; }
    // Return the recorded cancel message, or a generic "<channel> is cancelled"
    // string when no explicit message was set.
    std::string get_cancel_msg() {
        std::lock_guard<std::mutex> l(_cancel_msg_lock);
        if (!_cancel_msg.empty()) {
            return _cancel_msg;
        }
        return fmt::format("{} is cancelled", channel_info());
    }

    // two ways to stop channel:
    // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response.
    // 2. just cancel()
    Status close_wait(RuntimeState* state, bool* is_closed);

    Status after_close_handle(
            RuntimeState* state, WriterStats* writer_stats,
            std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map);

    Status check_status();

    void cancel(const std::string& cancel_msg);

    // Merge this channel's accumulated counters into the caller-provided maps.
    // Either pointer may be null to skip that part of the report.
    void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map,
                     WriterStats* writer_stats) const {
        if (add_batch_counter_map != nullptr) {
            (*add_batch_counter_map)[_node_id] += _add_batch_counter;
            (*add_batch_counter_map)[_node_id].close_wait_time_ms = _close_time_ms;
        }
        if (writer_stats != nullptr) {
            writer_stats->serialize_batch_ns += _serialize_batch_ns;
            writer_stats->channel_stat += _stat;
            writer_stats->queue_push_lock_ns += _queue_push_lock_ns;
            writer_stats->actual_consume_ns += _actual_consume_ns;
            // counters are kept in microseconds; WriterStats wants nanoseconds
            writer_stats->total_add_batch_exec_time_ns +=
                    (_add_batch_counter.add_batch_execution_time_us * 1000);
            writer_stats->total_wait_exec_time_ns +=
                    (_add_batch_counter.add_batch_wait_execution_time_us * 1000);
            writer_stats->total_add_batch_num += _add_batch_counter.add_batch_num;
            writer_stats->load_back_pressure_version_time_ms +=
                    _load_back_pressure_version_block_ms;
        }
    }

    int64_t node_id() const { return _node_id; }
    std::string host() const { return _node_info.host; }
    std::string name() const { return _name; }

    // Human-readable identity of this channel for logs and error messages.
    std::string channel_info() const {
        return fmt::format("{}, {}, node={}:{}", _name, _load_info, _node_info.host,
                           _node_info.brpc_port);
    }

    size_t get_pending_bytes() { return _pending_batches_bytes; }

    bool is_incremental() const { return _is_incremental; }

    int64_t write_bytes() const { return _write_bytes.load(); }

protected:
    // make a real open request for relative BE's load channel.
    void _open_internal(bool is_incremental);

    void _close_check();
    void _cancel_with_msg(const std::string& msg);

    void _add_block_success_callback(const PTabletWriterAddBlockResult& result,
                                     const WriteBlockCallbackContext& ctx);
    void _add_block_failed_callback(const WriteBlockCallbackContext& ctx);

    void _refresh_back_pressure_version_wait_time(
            const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
                    tablet_load_infos);

    VTabletWriter* _parent = nullptr;
    IndexChannel* _index_channel = nullptr;
    int64_t _node_id = -1;
    std::string _load_info;
    std::string _name;

    std::shared_ptr<MemTracker> _node_channel_tracker;
    int64_t _load_mem_limit = -1;

    TupleDescriptor* _tuple_desc = nullptr;
    NodeInfo _node_info;

    // this should be set in init() using config
    int _rpc_timeout_ms = 60000;
    int64_t _next_packet_seq = 0;
    MonotonicStopWatch _timeout_watch;

    // the timestamp when this node channel be marked closed and finished closed
    uint64_t _close_time_ms = 0;

    // user cancel or get some errors
    std::atomic<bool> _cancelled {false};
    std::mutex _cancel_msg_lock;
    std::string _cancel_msg;

    // send finished means the consumer thread which send the rpc can exit
    std::atomic<bool> _send_finished {false};

    // add batches finished means the last rpc has be response, used to check whether this channel can be closed
    std::atomic<bool> _add_batches_finished {false}; // reuse for vectorized

    bool _eos_is_produced {false}; // only for restricting producer behaviors

    std::unique_ptr<RowDescriptor> _row_desc;
    int _batch_size = 0;

    // limit _pending_batches size
    std::atomic<size_t> _pending_batches_bytes {0};
    size_t _max_pending_batches_bytes {(size_t)config::nodechannel_pending_queue_max_bytes};
    std::mutex _pending_batches_lock;          // reuse for vectorized
    std::atomic<int> _pending_batches_num {0}; // reuse for vectorized

    std::shared_ptr<PBackendService_Stub> _stub;
    // because we have incremantal open, we should keep one relative closure for one request. it's similarly for adding block.
    std::vector<std::shared_ptr<DummyBrpcCallback<PTabletWriterOpenResult>>> _open_callbacks;

    std::vector<TTabletWithPartition> _all_tablets;
    std::vector<TTabletWithPartition> _tablets_wait_open;
    // map from tablet_id to node_id where slave replicas locate in
    std::unordered_map<int64_t, std::vector<int64_t>> _slave_tablet_nodes;
    std::vector<TTabletCommitInfo> _tablet_commit_infos;

    AddBatchCounter _add_batch_counter;
    std::atomic<int64_t> _serialize_batch_ns {0};
    std::atomic<int64_t> _queue_push_lock_ns {0};
    std::atomic<int64_t> _actual_consume_ns {0};
    std::atomic<int64_t> _load_back_pressure_version_block_ms {0};

    VNodeChannelStat _stat;
    // lock to protect _is_closed.
    // The methods in the IndexChannel are called back in the RpcClosure in the NodeChannel.
    // However, this rpc callback may occur after the whole task is finished (e.g. due to network latency),
    // and by that time the IndexChannel may have been destructured, so we should not call the
    // IndexChannel methods anymore, otherwise the BE will crash.
    // Therefore, we use the _is_closed and _closed_lock to ensure that the RPC callback
    // function will not call the IndexChannel method after the NodeChannel is closed.
    // The IndexChannel is definitely accessible until the NodeChannel is closed.
    std::mutex _closed_lock;
    bool _is_closed = false;
    bool _inited = false;

    RuntimeState* _state = nullptr;
    // A context lock for callbacks, the callback has to lock the ctx, to avoid
    // the object is deleted during callback is running.
    std::weak_ptr<TaskExecutionContext> _task_exec_ctx;
    // rows number received per tablet, tablet_id -> rows_num
    std::vector<std::pair<int64_t, int64_t>> _tablets_received_rows;
    // rows number filtered per tablet, tablet_id -> filtered_rows_num
    std::vector<std::pair<int64_t, int64_t>> _tablets_filtered_rows;

    // build a _cur_mutable_block and push into _pending_blocks. when not building, this block is empty.
    std::unique_ptr<MutableBlock> _cur_mutable_block;
    std::shared_ptr<PTabletWriterAddBlockRequest> _cur_add_block_request;

    using AddBlockReq =
            std::pair<std::unique_ptr<MutableBlock>, std::shared_ptr<PTabletWriterAddBlockRequest>>;
    std::queue<AddBlockReq> _pending_blocks;
    // send block to slave BE rely on this. dont reconstruct it.
    std::shared_ptr<WriteBlockCallback<PTabletWriterAddBlockResult>> _send_block_callback = nullptr;

    // NOTE(review): presumably a workload-group id — confirm against callers.
    int64_t _wg_id = -1;

    bool _is_incremental;

    std::atomic<int64_t> _write_bytes {0};
    std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0};
};
448 | | |
449 | | // an IndexChannel is related to specific table and its rollup and mv |
450 | | class IndexChannel { |
451 | | public: |
452 | | IndexChannel(VTabletWriter* parent, int64_t index_id, VExprContextSPtr where_clause) |
453 | 550 | : _parent(parent), _index_id(index_id), _where_clause(std::move(where_clause)) { |
454 | 550 | _index_channel_tracker = |
455 | 550 | std::make_unique<MemTracker>("IndexChannel:indexID=" + std::to_string(_index_id)); |
456 | 550 | } |
457 | 556 | ~IndexChannel() = default; |
458 | | |
459 | | // allow to init multi times, for incremental open more tablets for one index(table) |
460 | | Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets, |
461 | | bool incremental = false); |
462 | | |
463 | | void for_each_node_channel( |
464 | 89.3k | const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) { |
465 | 89.3k | for (auto& it : _node_channels) { |
466 | 89.3k | func(it.second); |
467 | 89.3k | } |
468 | 89.3k | } |
469 | | |
470 | | void for_init_node_channel( |
471 | 0 | const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) { |
472 | 0 | for (auto& it : _node_channels) { |
473 | 0 | if (!it.second->is_incremental()) { |
474 | 0 | func(it.second); |
475 | 0 | } |
476 | 0 | } |
477 | 0 | } |
478 | | |
479 | | void for_inc_node_channel( |
480 | 0 | const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) { |
481 | 0 | for (auto& it : _node_channels) { |
482 | 0 | if (it.second->is_incremental()) { |
483 | 0 | func(it.second); |
484 | 0 | } |
485 | 0 | } |
486 | 0 | } |
487 | | |
488 | 0 | std::unordered_set<int64_t> init_node_channel_ids() { |
489 | 0 | std::unordered_set<int64_t> node_channel_ids; |
490 | 0 | for (auto& it : _node_channels) { |
491 | 0 | if (!it.second->is_incremental()) { |
492 | 0 | node_channel_ids.insert(it.first); |
493 | 0 | } |
494 | 0 | } |
495 | 0 | return node_channel_ids; |
496 | 0 | } |
497 | | |
498 | 0 | std::unordered_set<int64_t> inc_node_channel_ids() { |
499 | 0 | std::unordered_set<int64_t> node_channel_ids; |
500 | 0 | for (auto& it : _node_channels) { |
501 | 0 | if (it.second->is_incremental()) { |
502 | 0 | node_channel_ids.insert(it.first); |
503 | 0 | } |
504 | 0 | } |
505 | 0 | return node_channel_ids; |
506 | 0 | } |
507 | | |
508 | 556 | std::unordered_set<int64_t> each_node_channel_ids() { |
509 | 556 | std::unordered_set<int64_t> node_channel_ids; |
510 | 556 | for (auto& it : _node_channels) { |
511 | 556 | node_channel_ids.insert(it.first); |
512 | 556 | } |
513 | 556 | return node_channel_ids; |
514 | 556 | } |
515 | | |
516 | 556 | bool has_incremental_node_channel() const { return _has_inc_node; } |
517 | | |
518 | | void mark_as_failed(const VNodeChannel* node_channel, const std::string& err, |
519 | | int64_t tablet_id = -1); |
520 | | Status check_intolerable_failure(); |
521 | | |
522 | | Status close_wait(RuntimeState* state, WriterStats* writer_stats, |
523 | | std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map, |
524 | | std::unordered_set<int64_t> unfinished_node_channel_ids, |
525 | | bool need_wait_after_quorum_success); |
526 | | |
527 | | Status check_each_node_channel_close( |
528 | | std::unordered_set<int64_t>* unfinished_node_channel_ids, |
529 | | std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map, |
530 | | WriterStats* writer_stats, Status status); |
531 | | |
532 | | // set error tablet info in runtime state, so that it can be returned to FE. |
533 | | void set_error_tablet_in_state(RuntimeState* state); |
534 | | |
535 | 88.6k | size_t num_node_channels() const { return _node_channels.size(); } |
536 | | |
537 | 0 | size_t get_pending_bytes() const { |
538 | 0 | size_t mem_consumption = 0; |
539 | 0 | for (const auto& kv : _node_channels) { |
540 | 0 | mem_consumption += kv.second->get_pending_bytes(); |
541 | 0 | } |
542 | 0 | return mem_consumption; |
543 | 0 | } |
544 | | |
545 | | void set_tablets_received_rows( |
546 | | const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id); |
547 | | |
548 | | void set_tablets_filtered_rows( |
549 | | const std::vector<std::pair<int64_t, int64_t>>& tablets_filtered_rows, int64_t node_id); |
550 | | |
551 | 0 | int64_t num_rows_filtered() { |
552 | | // the Unique table has no roll up or materilized view |
553 | | // we just add up filtered rows from all partitions |
554 | 0 | return std::accumulate(_tablets_filtered_rows.cbegin(), _tablets_filtered_rows.cend(), 0, |
555 | 0 | [](int64_t sum, const auto& a) { return sum + a.second[0].second; }); |
556 | 0 | } |
557 | | |
558 | | // check whether the rows num written by different replicas is consistent |
559 | | Status check_tablet_received_rows_consistency(); |
560 | | |
561 | | // check whether the rows num filtered by different replicas is consistent |
562 | | Status check_tablet_filtered_rows_consistency(); |
563 | | |
564 | 556 | void set_start_time(const int64_t& start_time) { _start_time = start_time; } |
565 | | |
566 | 0 | VExprContextSPtr get_where_clause() { return _where_clause; } |
567 | | |
568 | | private: |
569 | | friend class VNodeChannel; |
570 | | friend class VTabletWriter; |
571 | | friend class VRowDistribution; |
572 | | |
573 | | int _max_failed_replicas(int64_t tablet_id); |
574 | | |
575 | | int _load_required_replicas_num(int64_t tablet_id); |
576 | | |
577 | | bool _quorum_success(const std::unordered_set<int64_t>& unfinished_node_channel_ids, |
578 | | const std::unordered_set<int64_t>& need_finish_tablets); |
579 | | |
580 | | int64_t _calc_max_wait_time_ms(const std::unordered_set<int64_t>& unfinished_node_channel_ids); |
581 | | |
582 | | VTabletWriter* _parent = nullptr; |
583 | | int64_t _index_id; |
584 | | VExprContextSPtr _where_clause; |
585 | | |
586 | | // from backend channel to tablet_id |
587 | | // ATTN: must be placed before `_node_channels` and `_channels_by_tablet`. |
588 | | // Because the destruct order of objects is opposite to the creation order. |
589 | | // So NodeChannel will be destructured first. |
590 | | // And the destructor function of NodeChannel waits for all RPCs to finish. |
591 | | // This ensures that it is safe to use `_tablets_by_channel` in the callback function for the end of the RPC. |
592 | | std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablets_by_channel; |
593 | | // BeId -> channel |
594 | | std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _node_channels; |
595 | | // from tablet_id to backend channel |
596 | | std::unordered_map<int64_t, std::vector<std::shared_ptr<VNodeChannel>>> _channels_by_tablet; |
597 | | bool _has_inc_node = false; |
598 | | |
599 | | // lock to protect _failed_channels and _failed_channels_msgs |
600 | | mutable std::mutex _fail_lock; |
601 | | // key is tablet_id, value is a set of failed node id |
602 | | std::unordered_map<int64_t, std::unordered_set<int64_t>> _failed_channels; |
603 | | // key is tablet_id, value is error message |
604 | | std::unordered_map<int64_t, std::string> _failed_channels_msgs; |
605 | | Status _intolerable_failure_status = Status::OK(); |
606 | | |
607 | | std::unique_ptr<MemTracker> _index_channel_tracker; |
608 | | // rows num received by DeltaWriter per tablet, tablet_id -> <node_Id, rows_num> |
609 | | // used to verify whether the rows num received by different replicas is consistent |
610 | | std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_received_rows; |
611 | | |
612 | | // rows num filtered by DeltaWriter per tablet, tablet_id -> <node_Id, filtered_rows_num> |
613 | | // used to verify whether the rows num filtered by different replicas is consistent |
614 | | std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_filtered_rows; |
615 | | |
616 | | int64_t _start_time = 0; |
617 | | }; |
618 | | } // namespace doris |
619 | | |
620 | | namespace doris { |
// VTabletWriter: an AsyncResultWriter that distributes incoming Blocks to the
// destination OLAP table's tablets via per-index IndexChannels and per-backend
// VNodeChannels.
class VTabletWriter final : public AsyncResultWriter {
public:
    VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
                  std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep);

    Status write(RuntimeState* state, Block& block) override;

    Status close(Status) override;

    Status open(RuntimeState* state, RuntimeProfile* profile) override;

    // the consumer func of sending pending batches in every NodeChannel.
    // use polling & NodeChannel::try_send_and_fetch_status() to achieve nonblocking sending.
    // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the producer
    void _send_batch_process();

    Status on_partitions_created(TCreatePartitionResult* result);

    Status _send_new_partition_batch();

private:
    friend class VNodeChannel;
    friend class IndexChannel;

    using ChannelDistributionPayload = std::unordered_map<VNodeChannel*, Payload>;
    using ChannelDistributionPayloadVec = std::vector<std::unordered_map<VNodeChannel*, Payload>>;

    Status _init_row_distribution();

    Status _init(RuntimeState* state, RuntimeProfile* profile);

    // Build one index's channel->payload mapping from the distributed row/tablet ids.
    void _generate_one_index_channel_payload(RowPartTabletIds& row_part_tablet_tuple,
                                             int32_t index_idx,
                                             ChannelDistributionPayload& channel_payload);

    void _generate_index_channels_payloads(std::vector<RowPartTabletIds>& row_part_tablet_ids,
                                           ChannelDistributionPayloadVec& payload);

    void _cancel_all_channel(Status status);

    Status _incremental_open_node_channel(const std::vector<TOlapTablePartition>& partitions);

    void _do_try_close(RuntimeState* state, const Status& exec_status);

    void _build_tablet_replica_info(const int64_t tablet_id, VOlapTablePartition* partition);

    TDataSink _t_sink;

    std::shared_ptr<MemTracker> _mem_tracker;

    ObjectPool* _pool = nullptr;

    bthread_t _sender_thread = 0;

    // unique load id
    PUniqueId _load_id;
    int64_t _txn_id = -1;
    int _num_replicas = -1;
    int _tuple_desc_id = -1;

    // this is tuple descriptor of destination OLAP table
    TupleDescriptor* _output_tuple_desc = nullptr;
    RowDescriptor* _output_row_desc = nullptr;

    // number of senders used to insert into OlapTable, if we only support single node insert,
    // all data from select should collectted and then send to OlapTable.
    // To support multiple senders, we maintain a channel for each sender.
    int _sender_id = -1;
    int _num_senders = -1;
    bool _is_high_priority = false;

    // TODO(zc): think about cache this data
    std::shared_ptr<OlapTableSchemaParam> _schema;
    OlapTableLocationParam* _location = nullptr;
    bool _write_single_replica = false;
    OlapTableLocationParam* _slave_location = nullptr;
    DorisNodesInfo* _nodes_info = nullptr;

    std::unique_ptr<OlapTabletFinder> _tablet_finder;

    // index_channel
    bthread::Mutex _stop_check_channel;
    std::vector<std::shared_ptr<IndexChannel>> _channels;
    std::unordered_map<int64_t, std::shared_ptr<IndexChannel>> _index_id_to_channel;

    std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;

    // support only one partition column now
    std::vector<std::vector<TStringLiteral>> _partitions_need_create;

    std::unique_ptr<OlapTableBlockConvertor> _block_convertor;
    // Stats for this writer (nanosecond timers and row counters).
    int64_t _send_data_ns = 0;
    int64_t _number_input_rows = 0;
    int64_t _number_output_rows = 0;
    int64_t _filter_ns = 0;

    MonotonicStopWatch _row_distribution_watch;

    // Runtime profile counters; registered during open/_init (not owned).
    RuntimeProfile::Counter* _input_rows_counter = nullptr;
    RuntimeProfile::Counter* _output_rows_counter = nullptr;
    RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
    RuntimeProfile::Counter* _send_data_timer = nullptr;
    RuntimeProfile::Counter* _row_distribution_timer = nullptr;
    RuntimeProfile::Counter* _append_node_channel_timer = nullptr;
    RuntimeProfile::Counter* _filter_timer = nullptr;
    RuntimeProfile::Counter* _where_clause_timer = nullptr;
    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
    RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr;
    RuntimeProfile::Counter* _validate_data_timer = nullptr;
    RuntimeProfile::Counter* _open_timer = nullptr;
    RuntimeProfile::Counter* _close_timer = nullptr;
    RuntimeProfile::Counter* _non_blocking_send_timer = nullptr;
    RuntimeProfile::Counter* _non_blocking_send_work_timer = nullptr;
    RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
    RuntimeProfile::Counter* _total_add_batch_exec_timer = nullptr;
    RuntimeProfile::Counter* _max_add_batch_exec_timer = nullptr;
    RuntimeProfile::Counter* _total_wait_exec_timer = nullptr;
    RuntimeProfile::Counter* _max_wait_exec_timer = nullptr;
    RuntimeProfile::Counter* _add_batch_number = nullptr;
    RuntimeProfile::Counter* _num_node_channels = nullptr;
    RuntimeProfile::Counter* _load_back_pressure_version_time_ms = nullptr;

    // the timeout of load channels opened by this tablet sink. in second
    int64_t _load_channel_timeout_s = 0;
    // the load txn absolute expiration time.
    int64_t _txn_expiration = 0;

    int32_t _send_batch_parallelism = 1;
    // Save the status of try_close() and close() method
    Status _close_status;
    // if we called try_close(), for auto partition the periodic send thread should stop if it's still waiting for node channels first-time open.
    bool _try_close = false;
    // for non-pipeline, if close() did something, close_wait() should wait it.
    bool _close_wait = false;
    bool _inited = false;
    bool _write_file_cache = false;

    // User can change this config at runtime, avoid it being modified during query or loading process.
    bool _transfer_large_data_by_brpc = false;

    VOlapTablePartitionParam* _vpartition = nullptr;

    RuntimeState* _state = nullptr; // not owned, set when open

    VRowDistribution _row_distribution;
    // reuse to avoid frequent memory allocation and release.
    std::vector<RowPartTabletIds> _row_part_tablet_ids;

    // tablet_id -> <total replicas num, load required replicas num>
    std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;
};
775 | | } // namespace doris |