Coverage Report

Created: 2026-03-13 19:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/group_commit/group_commit_mgr.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/PaloInternalService_types.h>
21
22
#include <atomic>
23
#include <condition_variable>
24
#include <cstdint>
25
#include <future>
26
#include <memory>
27
#include <mutex>
28
#include <shared_mutex>
29
#include <unordered_map>
30
#include <utility>
31
32
#include "common/status.h"
33
#include "core/block/block.h"
34
#include "exec/sink/writer/vwal_writer.h"
35
#include "load/group_commit/wal/wal_manager.h"
36
#include "runtime/exec_env.h"
37
#include "util/threadpool.h"
38
39
namespace doris {
40
class ExecEnv;
41
class TUniqueId;
42
class RuntimeState;
43
44
class Dependency;
45
46
// Pairs a shared Block with the byte size that Block::bytes() reported when the
// BlockData was constructed.
struct BlockData {
47
0
    // Stores `block` and caches block->bytes(). `block` is dereferenced here, so a null
    // pointer must not be passed.
    // NOTE(review): single-argument constructor is not `explicit`; whether callers rely on
    // the implicit conversion is not visible from this file.
    BlockData(const std::shared_ptr<Block>& block) : block(block), block_bytes(block->bytes()) {};
48
    // The wrapped block (shared ownership).
    std::shared_ptr<Block> block;
49
    // Result of block->bytes() captured at construction; not refreshed afterwards by any
    // code visible here.
    size_t block_bytes;
50
};
51
52
// Holds the queued blocks, the registered load ids, the WAL writer and the commit
// bookkeeping for one internal group commit load (identified by load_instance_id, label
// and txn_id). Most member functions are only declared here; their definitions are not
// visible in this file.
class LoadBlockQueue {
53
public:
54
    // Copies each argument into the matching member and stamps _start_time with the
    // current steady_clock time; _last_print_time starts out equal to _start_time.
    // NOTE(review): `label` is taken by non-const reference but is only copied here;
    // presumably `const std::string&` would be sufficient — confirm against callers.
    LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id,
55
                   int64_t schema_version, int64_t index_size,
56
                   std::shared_ptr<std::atomic_size_t> all_block_queues_bytes,
57
                   bool wait_internal_group_commit_finish, int64_t group_commit_interval_ms,
58
                   int64_t group_commit_data_bytes)
59
0
            : load_instance_id(load_instance_id),
60
0
              label(label),
61
0
              txn_id(txn_id),
62
0
              schema_version(schema_version),
63
0
              index_size(index_size),
64
0
              wait_internal_group_commit_finish(wait_internal_group_commit_finish),
65
0
              _group_commit_interval_ms(group_commit_interval_ms),
66
0
              _start_time(std::chrono::steady_clock::now()),
67
0
              // Reading _start_time here is safe: it is declared before _last_print_time,
              // so it has already been initialized at this point.
              _last_print_time(_start_time),
68
0
              _group_commit_data_bytes(group_commit_data_bytes),
69
0
              _all_block_queues_bytes(all_block_queues_bytes) {};
70
71
    // Adds `block` to this queue on behalf of `load_id`. Defined out of line.
    // `write_wal` presumably selects whether the block is also written to the WAL —
    // TODO confirm in the definition.
    Status add_block(RuntimeState* runtime_state, std::shared_ptr<Block> block, bool write_wal,
72
                     UniqueId& load_id);
73
    // Fetches data from the queue into `block`; `find_block` and `eos` are out-parameters
    // passed by pointer. Defined out of line — presumably `find_block` reports whether a
    // block was returned and `eos` reports end of stream; TODO confirm.
    Status get_block(RuntimeState* runtime_state, Block* block, bool* find_block, bool* eos,
74
                     std::shared_ptr<Dependency> get_block_dep);
75
    // Load-id bookkeeping (query / register / unregister). Defined out of line;
    // presumably these operate on _load_ids_to_write_dep — TODO confirm.
    bool contain_load_id(const UniqueId& load_id);
76
    Status add_load_id(const UniqueId& load_id, const std::shared_ptr<Dependency> put_block_dep);
77
    Status remove_load_id(const UniqueId& load_id);
78
    // Cancels this queue with status `st`. Defined out of line.
    void cancel(const Status& st);
79
0
    // Returns the current value of _need_commit.
    // NOTE(review): does not modify any member, so it could be const-qualified.
    bool need_commit() { return _need_commit; }
80
81
    // WAL management. All three are defined out of line; judging by the names they
    // create the WAL writer, close it, and check free disk space for an estimated
    // number of WAL bytes — TODO confirm.
    Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label,
82
                      WalManager* wal_manager, std::vector<TSlotDescriptor>& slot_desc,
83
                      int be_exe_version);
84
    Status close_wal();
85
    bool has_enough_wal_disk_space(size_t estimated_wal_bytes);
86
    // Register a dependency with this queue. Defined out of line; presumably they append
    // to `dependencies` and `_read_deps` respectively — TODO confirm.
    void append_dependency(std::shared_ptr<Dependency> finish_dep);
87
    void append_read_dependency(std::shared_ptr<Dependency> read_dep);
88
0
    // Returns the commit interval (milliseconds) stored at construction.
    // NOTE(review): does not modify any member, so it could be const-qualified.
    int64_t get_group_commit_interval_ms() { return _group_commit_interval_ms; };
89
90
0
    // Formats the identifying fields and commit flags of this queue into one
    // comma-separated "key=value" string using fmt.
    // NOTE(review): no fmt header is included directly by this file; it is presumably
    // pulled in transitively — confirm.
    std::string debug_string() const {
91
0
        fmt::memory_buffer debug_string_buffer;
92
0
        fmt::format_to(
93
0
                debug_string_buffer,
94
0
                "load_instance_id={}, label={}, txn_id={}, "
95
0
                "wait_internal_group_commit_finish={}, data_size_condition={}, "
96
0
                "group_commit_load_count={}, process_finish={}, _need_commit={}, schema_version={}",
97
0
                load_instance_id.to_string(), label, txn_id, wait_internal_group_commit_finish,
98
0
                data_size_condition, group_commit_load_count, process_finish.load(), _need_commit,
99
0
                schema_version);
100
0
        return fmt::to_string(debug_string_buffer);
101
0
    }
102
103
    // Identity of this queue; all of these are set once by the constructor.
    UniqueId load_instance_id;
104
    std::string label;
105
    int64_t txn_id;
106
    int64_t schema_version;
107
    int64_t index_size;
108
    // Set from the constructor argument of the same name.
    bool wait_internal_group_commit_finish = false;
109
    // Starts false; not written by any code in this header. Presumably set when the
    // data-size commit condition is met — TODO confirm.
    bool data_size_condition = false;
110
111
    // counts of load in one group commit
112
    std::atomic_size_t group_commit_load_count = 0;
113
114
    // the execute status of this internal group commit
115
    // NOTE(review): which members `mutex` guards is not visible in this header.
    std::mutex mutex;
116
    // Atomic flag, initially false; read by debug_string() via load().
    std::atomic<bool> process_finish = false;
117
    // Starts as Status::OK().
    Status status = Status::OK();
118
    // Public list of dependencies; presumably filled by append_dependency() — TODO confirm.
    std::vector<std::shared_ptr<Dependency>> dependencies;
119
120
private:
121
    // Defined out of line. The name suggests the caller must already hold the lock —
    // TODO confirm.
    void _cancel_without_lock(const Status& st);
122
    // Defined out of line; presumably renders the registered load ids as a string.
    std::string _get_load_ids();
123
124
    // the set of load ids of all blocks in this queue
125
    std::map<UniqueId, std::shared_ptr<Dependency>> _load_ids_to_write_dep;
126
    // Dependencies kept for readers; presumably filled by append_read_dependency().
    std::vector<std::shared_ptr<Dependency>> _read_deps;
127
    // Queued blocks together with their cached byte sizes.
    std::list<BlockData> _block_queue;
128
129
    // wal
130
    std::string _wal_base_path;
131
    std::shared_ptr<VWalWriter> _v_wal_writer;
132
133
    // commit
134
    // Value returned by need_commit(); starts false.
    bool _need_commit = false;
135
    // commit by time interval, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");'
136
    int64_t _group_commit_interval_ms;
137
    // steady_clock timestamp taken in the constructor.
    std::chrono::steady_clock::time_point _start_time;
138
    // Initialized to _start_time in the constructor.
    std::chrono::steady_clock::time_point _last_print_time;
139
    // commit by data size
140
    int64_t _group_commit_data_bytes;
141
    // Starts at 0; presumably the number of bytes accumulated in this queue — TODO confirm.
    int64_t _data_bytes = 0;
142
143
    // memory back pressure, memory consumption of all tables' load block queues
144
    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
145
    // Condition variable; judging by the name it is waited on by get_block() — TODO confirm.
    std::condition_variable _get_cond;
146
};
147
148
// Group commit state for a single table (_db_id, _table_id): the LoadBlockQueue objects
// created for it plus the bookkeeping used while a plan fragment is being created
// (_create_plan_deps, _is_creating_plan_fragment, _create_plan_failed_reason).
// All non-constructor member functions are only declared here.
class GroupCommitTable {
149
public:
150
    // Stores the given pointers and ids. `exec_env` and `thread_pool` are kept as raw
    // pointers; `all_block_queue_bytes` is a shared counter copied into
    // _all_block_queues_bytes.
    GroupCommitTable(ExecEnv* exec_env, doris::ThreadPool* thread_pool, int64_t db_id,
151
                     int64_t table_id, std::shared_ptr<std::atomic_size_t> all_block_queue_bytes)
152
0
            : _exec_env(exec_env),
153
0
              _thread_pool(thread_pool),
154
0
              _all_block_queues_bytes(all_block_queue_bytes),
155
0
              _db_id(db_id),
156
0
              _table_id(table_id) {};
157
    // Returns, through `load_block_queue`, a queue that the load `load_id` can write to.
    // Defined out of line — presumably it reuses an existing queue with a matching schema
    // version or triggers creation of a new group commit load; TODO confirm.
    Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version,
158
                                      int64_t index_size, const UniqueId& load_id,
159
                                      std::shared_ptr<LoadBlockQueue>& load_block_queue,
160
                                      int be_exe_version,
161
                                      std::shared_ptr<MemTrackerLimiter> mem_tracker,
162
                                      std::shared_ptr<Dependency> create_plan_dep,
163
                                      std::shared_ptr<Dependency> put_block_dep);
164
    // Looks up the queue for `instance_id` and returns it through `load_block_queue`.
    // Defined out of line.
    Status get_load_block_queue(const TUniqueId& instance_id,
165
                                std::shared_ptr<LoadBlockQueue>& load_block_queue,
166
                                std::shared_ptr<Dependency> get_block_dep);
167
    // Unregisters `load_id` from this table's state. Defined out of line.
    void remove_load_id(const UniqueId& load_id);
168
169
private:
170
    // Internal steps of a group commit load (create, execute the plan fragment, finish).
    // All defined out of line.
    Status _create_group_commit_load(int be_exe_version,
171
                                     std::shared_ptr<MemTrackerLimiter> mem_tracker);
172
    Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label,
173
                               int64_t txn_id, const TPipelineFragmentParams& pipeline_params);
174
    Status _finish_group_commit_load(int64_t db_id, int64_t table_id, const std::string& label,
175
                                     int64_t txn_id, const TUniqueId& instance_id, Status& status,
176
                                     RuntimeState* state);
177
178
    // Raw pointers supplied by the constructor.
    ExecEnv* _exec_env = nullptr;
179
    ThreadPool* _thread_pool = nullptr;
180
    // memory consumption of all tables' load block queues, used for memory back pressure.
181
    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
182
183
    // Ids of the database and table this object belongs to; set by the constructor.
    int64_t _db_id;
184
    int64_t _table_id;
185
186
    // NOTE(review): presumably guards the members declared below — confirm in the
    // definitions.
    std::mutex _lock;
187
    // fragment_instance_id to load_block_queue
188
    std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues;
189
    // Starts false; presumably true while a plan fragment is being created.
    bool _is_creating_plan_fragment = false;
190
    // user_load_id -> <create_plan_dep, put_block_dep, base_schema_version, index_size>
191
    std::unordered_map<UniqueId, std::tuple<std::shared_ptr<Dependency>,
192
                                            std::shared_ptr<Dependency>, int64_t, int64_t>>
193
            _create_plan_deps;
194
    // Presumably the reason recorded when creating the plan failed — TODO confirm.
    std::string _create_plan_failed_reason;
195
};
196
197
// Entry point for group commit: owns the thread pool and the shared byte counter, and
// maps an int64_t key (presumably the table id) to its GroupCommitTable.
// All member functions are only declared here.
class GroupCommitMgr {
198
public:
199
    // NOTE(review): single-argument constructor is not `explicit`.
    GroupCommitMgr(ExecEnv* exec_env);
200
    virtual ~GroupCommitMgr();
201
202
    // Defined out of line; presumably shuts down the thread pool — TODO confirm.
    void stop();
203
204
    // used when init group_commit_scan_node
205
    Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
206
                                std::shared_ptr<LoadBlockQueue>& load_block_queue,
207
                                std::shared_ptr<Dependency> get_block_dep);
208
    // Same parameters as GroupCommitTable::get_first_block_load_queue plus `db_id`;
    // presumably forwards to the GroupCommitTable for `table_id` — TODO confirm.
    Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version,
209
                                      int64_t index_size, const UniqueId& load_id,
210
                                      std::shared_ptr<LoadBlockQueue>& load_block_queue,
211
                                      int be_exe_version,
212
                                      std::shared_ptr<MemTrackerLimiter> mem_tracker,
213
                                      std::shared_ptr<Dependency> create_plan_dep,
214
                                      std::shared_ptr<Dependency> put_block_dep);
215
    // Unregisters `load_id` for the given table. Defined out of line.
    void remove_load_id(int64_t table_id, const UniqueId& load_id);
216
    // Public promise/future pair; judging by the names they are debugging hooks.
    // debug_future is initialized from debug_promise, so debug_promise must stay declared
    // first (members are initialized in declaration order).
    std::promise<Status> debug_promise;
217
    std::future<Status> debug_future = debug_promise.get_future();
218
219
private:
220
    ExecEnv* _exec_env = nullptr;
221
    // Thread pool owned by this manager.
    std::unique_ptr<doris::ThreadPool> _thread_pool;
222
    // memory consumption of all tables' load block queues, used for memory back pressure.
223
    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
224
225
    // NOTE(review): presumably guards _table_map — confirm in the definitions.
    std::mutex _lock;
226
    // TODO remove table when unused
227
    std::unordered_map<int64_t, std::shared_ptr<GroupCommitTable>> _table_map;
228
};
229
230
} // namespace doris