Coverage Report

Created: 2026-04-01 19:30

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/memtable/memtable_flush_executor.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <atomic>
21
#include <condition_variable>
22
#include <cstdint>
23
#include <iosfwd>
24
#include <memory>
25
#include <utility>
26
#include <vector>
27
28
#include "common/status.h"
29
#include "load/memtable/memtable.h"
30
#include "util/threadpool.h"
31
32
namespace doris {
33
34
class DataDir;
35
class MemTable;
36
class MemTableMemoryLimiter;
37
class RowsetWriter;
38
class SystemMetrics;
39
class WorkloadGroup;
40
41
// the statistic of a certain flush handler.
42
// use atomics because they may be updated by multiple threads
43
struct FlushStatistic {
44
    std::atomic_uint64_t flush_time_ns = 0;
45
    std::atomic_uint64_t flush_submit_count = 0;
46
    std::atomic_int64_t flush_running_count = 0;
47
    std::atomic_uint64_t flush_finish_count = 0;
48
    std::atomic_uint64_t flush_size_bytes = 0;
49
    std::atomic_uint64_t flush_disk_size_bytes = 0;
50
    std::atomic_uint64_t flush_wait_time_ns = 0;
51
};
52
53
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
54
55
// A thin wrapper of ThreadPoolToken to submit task.
56
// For a tablet, there may be multiple memtables, which will be flushed to disk
57
// one by one in the order of generation.
58
// If a memtable flush fails, then:
59
// 1. Immediately disallow submission of any subsequent memtable
60
// 2. For the memtables that have already been submitted, there is no need to flush,
61
//    because the entire job will definitely fail;
62
class FlushToken : public std::enable_shared_from_this<FlushToken> {
63
    ENABLE_FACTORY_CREATOR(FlushToken);
64
65
public:
66
    FlushToken(ThreadPool* thread_pool, std::shared_ptr<WorkloadGroup> wg_sptr)
67
15
            : _flush_status(Status::OK()), _thread_pool(thread_pool), _wg_wptr(wg_sptr) {}
68
69
    Status submit(std::shared_ptr<MemTable> mem_table);
70
71
// an error has happened, so we cancel this token
72
    // And remove all tasks in the queue.
73
    void cancel();
74
75
    // wait all tasks in token to be completed.
76
    Status wait();
77
78
    // get flush operations' statistics
79
35
    const FlushStatistic& get_stats() const { return _stats; }
80
81
15
    void set_rowset_writer(std::shared_ptr<RowsetWriter> rowset_writer) {
82
15
        _rowset_writer = rowset_writer;
83
15
    }
84
85
30
    const MemTableStat& memtable_stat() { return _memtable_stat; }
86
87
private:
88
30
    void _shutdown_flush_token() { _shutdown.store(true); }
89
24
    bool _is_shutdown() { return _shutdown.load(); }
90
    void _wait_submit_task_finish();
91
    void _wait_running_task_finish();
92
93
private:
94
    friend class MemtableFlushTask;
95
96
    void _flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
97
                         int64_t submit_task_time);
98
99
    Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size);
100
101
    Status _try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context,
102
                               int64_t size);
103
104
    // Records the current flush status of the tablet.
105
    // Note: Once its value is set to Failed, it cannot return to SUCCESS.
106
    std::shared_mutex _flush_status_lock;
107
    Status _flush_status;
108
109
    FlushStatistic _stats;
110
111
    std::shared_ptr<RowsetWriter> _rowset_writer = nullptr;
112
113
    MemTableStat _memtable_stat;
114
115
    std::atomic<bool> _shutdown = false;
116
    ThreadPool* _thread_pool = nullptr;
117
118
    std::mutex _mutex;
119
    std::condition_variable _submit_task_finish_cond;
120
    std::condition_variable _running_task_finish_cond;
121
122
    std::weak_ptr<WorkloadGroup> _wg_wptr;
123
};
124
125
// MemTableFlushExecutor is responsible for flushing memtables to disk.
126
// It encapsulates a ThreadPool to handle all tasks.
127
// Usage Example:
128
//      ...
129
//      std::shared_ptr<FlushToken> flush_token;
130
//      memTableFlushExecutor.create_flush_token(flush_token, ...);
131
//      ...
132
//      flush_token->submit(memtable)
133
//      ...
134
class MemTableFlushExecutor {
135
public:
136
38
    MemTableFlushExecutor() = default;
137
38
    ~MemTableFlushExecutor() {
138
38
        _flush_pool->shutdown();
139
38
        _high_prio_flush_pool->shutdown();
140
38
    }
141
142
    // init should be called after storage engine is opened,
143
    // because it needs path hash of each data dir.
144
    void init(int num_disk);
145
146
    Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
147
                              std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority,
148
                              std::shared_ptr<WorkloadGroup> wg_sptr);
149
150
    // return true if it already has any flushing task
151
0
    bool check_and_inc_has_any_flushing_task() {
152
        // need to use CAS instead of only `if (0 == _flushing_task_count)` statement,
153
        // to avoid concurrent entries both pass the if statement
154
0
        int expected_count = 0;
155
0
        if (!_flushing_task_count.compare_exchange_strong(expected_count, 1)) {
156
0
            return true;
157
0
        }
158
0
        DCHECK(expected_count == 0 && _flushing_task_count == 1);
159
0
        return false;
160
0
    }
161
162
0
    void inc_flushing_task() { _flushing_task_count++; }
163
164
0
    void dec_flushing_task() { _flushing_task_count--; }
165
166
10
    ThreadPool* flush_pool() { return _flush_pool.get(); }
167
168
0
    ThreadPool* high_prio_flush_pool() { return _high_prio_flush_pool.get(); }
169
170
    void update_memtable_flush_threads();
171
172
    // Returns {min_threads, max_threads} for a flush thread pool.
173
    // thread_num_per_store is used as the baseline when adaptive mode is off.
174
    static std::pair<int, int> calc_flush_thread_count(int num_cpus, int num_disk,
175
                                                       int thread_num_per_store);
176
177
private:
178
    std::unique_ptr<ThreadPool> _flush_pool;
179
    std::unique_ptr<ThreadPool> _high_prio_flush_pool;
180
    std::atomic<int> _flushing_task_count = 0;
181
    int _num_disk = 0;
182
};
183
184
} // namespace doris