be/src/load/group_commit/group_commit_mgr.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/PaloInternalService_types.h> |
21 | | |
22 | | #include <atomic> |
23 | | #include <condition_variable> |
24 | | #include <cstdint> |
25 | | #include <future> |
26 | | #include <memory> |
27 | | #include <mutex> |
28 | | #include <shared_mutex> |
29 | | #include <unordered_map> |
30 | | #include <utility> |
31 | | |
32 | | #include "common/status.h" |
33 | | #include "core/block/block.h" |
34 | | #include "exec/sink/writer/vwal_writer.h" |
35 | | #include "load/group_commit/wal/wal_manager.h" |
36 | | #include "runtime/exec_env.h" |
37 | | #include "util/threadpool.h" |
38 | | |
39 | | namespace doris { |
40 | | class ExecEnv; |
41 | | class TUniqueId; |
42 | | class RuntimeState; |
43 | | |
44 | | class Dependency; |
45 | | |
46 | | struct BlockData { |
47 | 0 | BlockData(const std::shared_ptr<Block>& block) : block(block), block_bytes(block->bytes()) {}; |
48 | | std::shared_ptr<Block> block; |
49 | | size_t block_bytes; |
50 | | }; |
51 | | |
52 | | class LoadBlockQueue { |
53 | | public: |
54 | | LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id, |
55 | | int64_t schema_version, int64_t index_size, |
56 | | std::shared_ptr<std::atomic_size_t> all_block_queues_bytes, |
57 | | bool wait_internal_group_commit_finish, int64_t group_commit_interval_ms, |
58 | | int64_t group_commit_data_bytes) |
59 | 0 | : load_instance_id(load_instance_id), |
60 | 0 | label(label), |
61 | 0 | txn_id(txn_id), |
62 | 0 | schema_version(schema_version), |
63 | 0 | index_size(index_size), |
64 | 0 | wait_internal_group_commit_finish(wait_internal_group_commit_finish), |
65 | 0 | _group_commit_interval_ms(group_commit_interval_ms), |
66 | 0 | _start_time(std::chrono::steady_clock::now()), |
67 | 0 | _last_print_time(_start_time), |
68 | 0 | _group_commit_data_bytes(group_commit_data_bytes), |
69 | 0 | _all_block_queues_bytes(all_block_queues_bytes) {}; |
70 | | |
71 | | Status add_block(RuntimeState* runtime_state, std::shared_ptr<Block> block, bool write_wal, |
72 | | UniqueId& load_id); |
73 | | Status get_block(RuntimeState* runtime_state, Block* block, bool* find_block, bool* eos, |
74 | | std::shared_ptr<Dependency> get_block_dep); |
75 | | bool contain_load_id(const UniqueId& load_id); |
76 | | Status add_load_id(const UniqueId& load_id, const std::shared_ptr<Dependency> put_block_dep); |
77 | | Status remove_load_id(const UniqueId& load_id); |
78 | | void cancel(const Status& st); |
79 | 0 | bool need_commit() { return _need_commit; } |
80 | | |
81 | | Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, |
82 | | WalManager* wal_manager, std::vector<TSlotDescriptor>& slot_desc, |
83 | | int be_exe_version); |
84 | | Status close_wal(); |
85 | | bool has_enough_wal_disk_space(size_t estimated_wal_bytes); |
86 | | void append_dependency(std::shared_ptr<Dependency> finish_dep); |
87 | | void append_read_dependency(std::shared_ptr<Dependency> read_dep); |
88 | 0 | int64_t get_group_commit_interval_ms() { return _group_commit_interval_ms; }; |
89 | | |
90 | 0 | std::string debug_string() const { |
91 | 0 | fmt::memory_buffer debug_string_buffer; |
92 | 0 | fmt::format_to( |
93 | 0 | debug_string_buffer, |
94 | 0 | "load_instance_id={}, label={}, txn_id={}, " |
95 | 0 | "wait_internal_group_commit_finish={}, data_size_condition={}, " |
96 | 0 | "group_commit_load_count={}, process_finish={}, _need_commit={}, schema_version={}", |
97 | 0 | load_instance_id.to_string(), label, txn_id, wait_internal_group_commit_finish, |
98 | 0 | data_size_condition, group_commit_load_count, process_finish.load(), _need_commit, |
99 | 0 | schema_version); |
100 | 0 | return fmt::to_string(debug_string_buffer); |
101 | 0 | } |
102 | | |
103 | | UniqueId load_instance_id; |
104 | | std::string label; |
105 | | int64_t txn_id; |
106 | | int64_t schema_version; |
107 | | int64_t index_size; |
108 | | bool wait_internal_group_commit_finish = false; |
109 | | bool data_size_condition = false; |
110 | | |
111 | | // counts of load in one group commit |
112 | | std::atomic_size_t group_commit_load_count = 0; |
113 | | |
114 | | // the execute status of this internal group commit |
115 | | std::mutex mutex; |
116 | | std::atomic<bool> process_finish = false; |
117 | | Status status = Status::OK(); |
118 | | std::vector<std::shared_ptr<Dependency>> dependencies; |
119 | | |
120 | | private: |
121 | | void _cancel_without_lock(const Status& st); |
122 | | std::string _get_load_ids(); |
123 | | |
124 | | // the set of load ids of all blocks in this queue |
125 | | std::map<UniqueId, std::shared_ptr<Dependency>> _load_ids_to_write_dep; |
126 | | std::vector<std::shared_ptr<Dependency>> _read_deps; |
127 | | std::list<BlockData> _block_queue; |
128 | | |
129 | | // wal |
130 | | std::string _wal_base_path; |
131 | | std::shared_ptr<VWalWriter> _v_wal_writer; |
132 | | |
133 | | // commit |
134 | | bool _need_commit = false; |
135 | | // commit by time interval, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");' |
136 | | int64_t _group_commit_interval_ms; |
137 | | std::chrono::steady_clock::time_point _start_time; |
138 | | std::chrono::steady_clock::time_point _last_print_time; |
139 | | // commit by data size |
140 | | int64_t _group_commit_data_bytes; |
141 | | int64_t _data_bytes = 0; |
142 | | |
143 | | // memory back pressure, memory consumption of all tables' load block queues |
144 | | std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes; |
145 | | std::condition_variable _get_cond; |
146 | | }; |
147 | | |
148 | | class GroupCommitTable { |
149 | | public: |
150 | | GroupCommitTable(ExecEnv* exec_env, doris::ThreadPool* thread_pool, int64_t db_id, |
151 | | int64_t table_id, std::shared_ptr<std::atomic_size_t> all_block_queue_bytes) |
152 | 0 | : _exec_env(exec_env), |
153 | 0 | _thread_pool(thread_pool), |
154 | 0 | _all_block_queues_bytes(all_block_queue_bytes), |
155 | 0 | _db_id(db_id), |
156 | 0 | _table_id(table_id) {}; |
157 | | Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version, |
158 | | int64_t index_size, const UniqueId& load_id, |
159 | | std::shared_ptr<LoadBlockQueue>& load_block_queue, |
160 | | int be_exe_version, |
161 | | std::shared_ptr<MemTrackerLimiter> mem_tracker, |
162 | | std::shared_ptr<Dependency> create_plan_dep, |
163 | | std::shared_ptr<Dependency> put_block_dep); |
164 | | Status get_load_block_queue(const TUniqueId& instance_id, |
165 | | std::shared_ptr<LoadBlockQueue>& load_block_queue, |
166 | | std::shared_ptr<Dependency> get_block_dep); |
167 | | void remove_load_id(const UniqueId& load_id); |
168 | | |
169 | | private: |
170 | | Status _create_group_commit_load(int be_exe_version, |
171 | | std::shared_ptr<MemTrackerLimiter> mem_tracker); |
172 | | Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label, |
173 | | int64_t txn_id, const TPipelineFragmentParams& pipeline_params); |
174 | | Status _finish_group_commit_load(int64_t db_id, int64_t table_id, const std::string& label, |
175 | | int64_t txn_id, const TUniqueId& instance_id, Status& status, |
176 | | RuntimeState* state); |
177 | | |
178 | | ExecEnv* _exec_env = nullptr; |
179 | | ThreadPool* _thread_pool = nullptr; |
180 | | // memory consumption of all tables' load block queues, used for memory back pressure. |
181 | | std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes; |
182 | | |
183 | | int64_t _db_id; |
184 | | int64_t _table_id; |
185 | | |
186 | | std::mutex _lock; |
187 | | // fragment_instance_id to load_block_queue |
188 | | std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues; |
189 | | bool _is_creating_plan_fragment = false; |
190 | | // user_load_id -> <create_plan_dep, put_block_dep, base_schema_version, index_size> |
191 | | std::unordered_map<UniqueId, std::tuple<std::shared_ptr<Dependency>, |
192 | | std::shared_ptr<Dependency>, int64_t, int64_t>> |
193 | | _create_plan_deps; |
194 | | std::string _create_plan_failed_reason; |
195 | | }; |
196 | | |
197 | | class GroupCommitMgr { |
198 | | public: |
199 | | GroupCommitMgr(ExecEnv* exec_env); |
200 | | virtual ~GroupCommitMgr(); |
201 | | |
202 | | void stop(); |
203 | | |
204 | | // used when init group_commit_scan_node |
205 | | Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id, |
206 | | std::shared_ptr<LoadBlockQueue>& load_block_queue, |
207 | | std::shared_ptr<Dependency> get_block_dep); |
208 | | Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version, |
209 | | int64_t index_size, const UniqueId& load_id, |
210 | | std::shared_ptr<LoadBlockQueue>& load_block_queue, |
211 | | int be_exe_version, |
212 | | std::shared_ptr<MemTrackerLimiter> mem_tracker, |
213 | | std::shared_ptr<Dependency> create_plan_dep, |
214 | | std::shared_ptr<Dependency> put_block_dep); |
215 | | void remove_load_id(int64_t table_id, const UniqueId& load_id); |
216 | | std::promise<Status> debug_promise; |
217 | | std::future<Status> debug_future = debug_promise.get_future(); |
218 | | |
219 | | private: |
220 | | ExecEnv* _exec_env = nullptr; |
221 | | std::unique_ptr<doris::ThreadPool> _thread_pool; |
222 | | // memory consumption of all tables' load block queues, used for memory back pressure. |
223 | | std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes; |
224 | | |
225 | | std::mutex _lock; |
226 | | // TODO remove table when unused |
227 | | std::unordered_map<int64_t, std::shared_ptr<GroupCommitTable>> _table_map; |
228 | | }; |
229 | | |
230 | | } // namespace doris |