Coverage Report

Created: 2026-05-20 18:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/memtable/memtable_flush_executor.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <iosfwd>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <utility>
#include <vector>

#include "common/status.h"
#include "load/delta_writer/delta_writer_context.h"
#include "load/memtable/memtable.h"
#include "util/threadpool.h"
33
namespace doris {
34
35
// Forward declarations of types named in this header (kept in alphabetical order).
class AutoIncIDBuffer;
class Block;
class DataDir;
class GroupRowsetWriter;
class MemTable;
class MemTableMemoryLimiter;
class OlapTableSchemaParam;
class RowsetWriter;
class SystemMetrics;
class WorkloadGroup;
45
46
// Flush counters for one flush token (exposed via FlushToken::get_stats()).
// Each counter is atomic because several flush threads may update it concurrently.
struct FlushStatistic {
    // Time counters (the `_ns` suffix suggests nanoseconds).
    std::atomic<uint64_t> flush_time_ns {0};
    std::atomic<uint64_t> flush_submit_count {0};
    // Signed, unlike its siblings — presumably because it is both incremented and decremented.
    std::atomic<int64_t> flush_running_count {0};
    std::atomic<uint64_t> flush_finish_count {0};
    std::atomic<uint64_t> flush_size_bytes {0};
    std::atomic<uint64_t> flush_disk_size_bytes {0};
    std::atomic<uint64_t> flush_wait_time_ns {0};
};
57
58
struct SharedMemtable {
59
    std::shared_ptr<MemTable> memtable;
60
    int32_t segment_id = 0;
61
62
    ~SharedMemtable();
63
64
    std::once_flag block_once;
65
    Status block_status;
66
    std::shared_ptr<Block> block;
67
68
    std::atomic<int> finished_sub_task_count {0};
69
    // data + binlog
70
    std::atomic<int> total_sub_task_count {2};
71
72
4
    int add_finished_sub_task() { return finished_sub_task_count.fetch_add(1); }
73
74
0
    std::string debug_string() const {
75
0
        return "PartOfGroupMemtableFlushTask{segment_id=" + std::to_string(segment_id) +
76
0
               ", finished_sub_task_count=" + std::to_string(finished_sub_task_count.load()) +
77
0
               ", total_sub_task_count=" + std::to_string(total_sub_task_count.load()) + "}";
78
0
    }
79
};
80
81
// Stream output for FlushStatistic (for logging/debugging); the definition lives outside this header.
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
82
83
// A thin wrapper of ThreadPoolToken to submit task.
84
// For a tablet, there may be multiple memtables, which will be flushed to disk
85
// one by one in the order of generation.
86
// If a memtable flush fails, then:
87
// 1. Immediately disallow submission of any subsequent memtable
88
// 2. For the memtables that have already been submitted, there is no need to flush,
89
//    because the entire job will definitely fail;
90
class FlushToken : public std::enable_shared_from_this<FlushToken> {
91
    ENABLE_FACTORY_CREATOR(FlushToken);
92
93
public:
94
    FlushToken(ThreadPool* thread_pool, std::shared_ptr<WorkloadGroup> wg_sptr)
95
18
            : _flush_status(Status::OK()), _thread_pool(thread_pool), _wg_wptr(wg_sptr) {}
96
97
    Status submit(std::shared_ptr<MemTable> mem_table);
98
99
    // error has happens, so we cancel this token
100
    // And remove all tasks in the queue.
101
    void cancel();
102
103
    // wait all tasks in token to be completed.
104
    Status wait();
105
106
    // get flush operations' statistics
107
41
    const FlushStatistic& get_stats() const { return _stats; }
108
109
18
    void set_rowset_writer(std::shared_ptr<RowsetWriter> rowset_writer) {
110
18
        _rowset_writer = rowset_writer;
111
18
    }
112
113
18
    void set_table_schema_param(std::shared_ptr<OlapTableSchemaParam> table_schema_param) {
114
18
        _table_schema_param = std::move(table_schema_param);
115
18
    }
116
117
#ifdef BE_TEST
118
    void set_row_binlog_lsn_buffer_for_test(
119
3
            std::shared_ptr<AutoIncIDBuffer> row_binlog_lsn_buffer) {
120
3
        _row_binlog_lsn_buffer = std::move(row_binlog_lsn_buffer);
121
3
    }
122
#endif
123
124
30
    const MemTableStat& memtable_stat() { return _memtable_stat; }
125
126
private:
127
32
    void _shutdown_flush_token() { _shutdown.store(true); }
128
36
    bool _is_shutdown() { return _shutdown.load(); }
129
    void _wait_submit_task_finish();
130
    void _wait_running_task_finish();
131
132
private:
133
    friend class MemtableFlushTask;
134
    friend class PartOfGroupMemtableFlushTask;
135
136
    Status _submit_sub_tasks(ThreadPool* pool, std::vector<std::shared_ptr<Runnable>> sub_tasks);
137
138
    void _flush_memtable_impl(RowsetWriter* flush_writer, MemTable* memtable, int32_t segment_id,
139
                              int64_t submit_task_time, SharedMemtable* shared_memtable = nullptr);
140
141
    void _flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
142
                         int64_t submit_task_time);
143
144
    void _flush_group_memtable(std::shared_ptr<SharedMemtable> shared_memtable,
145
                               WriteRequestType write_req_type, int64_t submit_task_time);
146
147
    Status _memtable2block(MemTable* memtable, SharedMemtable* shared_memtable,
148
                           std::shared_ptr<Block>& flush_block);
149
150
    Status _try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context,
151
                               int64_t size);
152
153
    // Records the current flush status of the tablet.
154
    // Note: Once its value is set to Failed, it cannot return to SUCCESS.
155
    std::shared_mutex _flush_status_lock;
156
    Status _flush_status;
157
158
    FlushStatistic _stats;
159
160
    std::shared_ptr<RowsetWriter> _rowset_writer = nullptr;
161
162
    std::shared_ptr<OlapTableSchemaParam> _table_schema_param = nullptr;
163
    std::shared_ptr<AutoIncIDBuffer> _row_binlog_lsn_buffer = nullptr;
164
165
    MemTableStat _memtable_stat;
166
167
    std::atomic<bool> _shutdown = false;
168
    ThreadPool* _thread_pool = nullptr;
169
170
    std::mutex _mutex;
171
    std::condition_variable _submit_task_finish_cond;
172
    std::condition_variable _running_task_finish_cond;
173
174
    std::weak_ptr<WorkloadGroup> _wg_wptr;
175
};
176
177
// MemTableFlushExecutor is responsible for flushing memtables to disk.
178
// It encapsulate a ThreadPool to handle all tasks.
179
// Usage Example:
180
//      ...
181
//      std::shared_ptr<FlushHandler> flush_handler;
182
//      memTableFlushExecutor.create_flush_token(&flush_handler);
183
//      ...
184
//      flush_token->submit(memtable)
185
//      ...
186
class MemTableFlushExecutor {
187
public:
188
43
    MemTableFlushExecutor() = default;
189
43
    ~MemTableFlushExecutor() {
190
43
        _flush_pool->shutdown();
191
43
        _high_prio_flush_pool->shutdown();
192
43
    }
193
194
    // init should be called after storage engine is opened,
195
    // because it needs path hash of each data dir.
196
    void init(int num_disk);
197
198
    Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
199
                              std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority,
200
                              std::shared_ptr<WorkloadGroup> wg_sptr,
201
                              std::shared_ptr<OlapTableSchemaParam> table_schema_param = nullptr);
202
203
    // return true if it already has any flushing task
204
0
    bool check_and_inc_has_any_flushing_task() {
205
        // need to use CAS instead of only `if (0 == _flushing_task_count)` statement,
206
        // to avoid concurrent entries both pass the if statement
207
0
        int expected_count = 0;
208
0
        if (!_flushing_task_count.compare_exchange_strong(expected_count, 1)) {
209
0
            return true;
210
0
        }
211
0
        DCHECK(expected_count == 0 && _flushing_task_count == 1);
212
0
        return false;
213
0
    }
214
215
0
    void inc_flushing_task() { _flushing_task_count++; }
216
217
0
    void dec_flushing_task() { _flushing_task_count--; }
218
219
10
    ThreadPool* flush_pool() { return _flush_pool.get(); }
220
221
0
    ThreadPool* high_prio_flush_pool() { return _high_prio_flush_pool.get(); }
222
223
    void update_memtable_flush_threads();
224
225
    // Returns {min_threads, max_threads} for a flush thread pool.
226
    // thread_num_per_store is used as the baseline when adaptive mode is off.
227
    static std::pair<int, int> calc_flush_thread_count(int num_cpus, int num_disk,
228
                                                       int thread_num_per_store);
229
230
private:
231
    std::unique_ptr<ThreadPool> _flush_pool;
232
    std::unique_ptr<ThreadPool> _high_prio_flush_pool;
233
    std::atomic<int> _flushing_task_count = 0;
234
    int _num_disk = 0;
235
};
236
237
} // namespace doris