Coverage Report

Created: 2026-03-16 21:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/memtable/memtable_flush_executor.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <atomic>
21
#include <condition_variable>
22
#include <cstdint>
23
#include <iosfwd>
24
#include <memory>
25
#include <utility>
26
#include <vector>
27
28
#include "common/status.h"
29
#include "load/memtable/memtable.h"
30
#include "util/threadpool.h"
31
32
namespace doris {
33
34
class DataDir;
35
class MemTable;
36
class RowsetWriter;
37
class WorkloadGroup;
38
39
// The statistics of a certain flush token.
40
// Atomics are used because the fields may be updated by multiple threads.
41
struct FlushStatistic {
42
    std::atomic_uint64_t flush_time_ns = 0;
43
    std::atomic_uint64_t flush_submit_count = 0;
44
    std::atomic_int64_t flush_running_count = 0;
45
    std::atomic_uint64_t flush_finish_count = 0;
46
    std::atomic_uint64_t flush_size_bytes = 0;
47
    std::atomic_uint64_t flush_disk_size_bytes = 0;
48
    std::atomic_uint64_t flush_wait_time_ns = 0;
49
};
50
51
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
52
53
// A thin wrapper of ThreadPoolToken to submit task.
54
// For a tablet, there may be multiple memtables, which will be flushed to disk
55
// one by one in the order of generation.
56
// If a memtable flush fails, then:
57
// 1. Immediately disallow submission of any subsequent memtable
58
// 2. For the memtables that have already been submitted, there is no need to flush,
59
//    because the entire job will definitely fail;
60
class FlushToken : public std::enable_shared_from_this<FlushToken> {
61
    ENABLE_FACTORY_CREATOR(FlushToken);
62
63
public:
64
    FlushToken(ThreadPool* thread_pool, std::shared_ptr<WorkloadGroup> wg_sptr)
65
15
            : _flush_status(Status::OK()), _thread_pool(thread_pool), _wg_wptr(wg_sptr) {}
66
67
    Status submit(std::shared_ptr<MemTable> mem_table);
68
69
    // An error has happened, so cancel this token
70
    // and remove all tasks in the queue.
71
    void cancel();
72
73
    // Wait for all tasks in the token to complete.
74
    Status wait();
75
76
    // get flush operations' statistics
77
35
    const FlushStatistic& get_stats() const { return _stats; }
78
79
15
    void set_rowset_writer(std::shared_ptr<RowsetWriter> rowset_writer) {
80
15
        _rowset_writer = rowset_writer;
81
15
    }
82
83
30
    const MemTableStat& memtable_stat() { return _memtable_stat; }
84
85
private:
86
30
    void _shutdown_flush_token() { _shutdown.store(true); }
87
24
    bool _is_shutdown() { return _shutdown.load(); }
88
    void _wait_submit_task_finish();
89
    void _wait_running_task_finish();
90
91
private:
92
    friend class MemtableFlushTask;
93
94
    void _flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
95
                         int64_t submit_task_time);
96
97
    Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size);
98
99
    Status _try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context,
100
                               int64_t size);
101
102
    // Records the current flush status of the tablet.
103
    // Note: once it is set to a failure status, it can never return to OK.
104
    std::shared_mutex _flush_status_lock;
105
    Status _flush_status;
106
107
    FlushStatistic _stats;
108
109
    std::shared_ptr<RowsetWriter> _rowset_writer = nullptr;
110
111
    MemTableStat _memtable_stat;
112
113
    std::atomic<bool> _shutdown = false;
114
    ThreadPool* _thread_pool = nullptr;
115
116
    std::mutex _mutex;
117
    std::condition_variable _submit_task_finish_cond;
118
    std::condition_variable _running_task_finish_cond;
119
120
    std::weak_ptr<WorkloadGroup> _wg_wptr;
121
};
122
123
// MemTableFlushExecutor is responsible for flushing memtables to disk.
124
// It encapsulates ThreadPools (a normal one and a high-priority one) to handle all tasks.
125
// Usage Example:
126
//      ...
127
//      std::shared_ptr<FlushToken> flush_token;
128
//      memTableFlushExecutor.create_flush_token(flush_token, rowset_writer, is_high_priority, wg_sptr);
129
//      ...
130
//      flush_token->submit(memtable)
131
//      ...
132
class MemTableFlushExecutor {
133
public:
134
38
    MemTableFlushExecutor() = default;
135
38
    ~MemTableFlushExecutor() {
136
38
        _flush_pool->shutdown();
137
38
        _high_prio_flush_pool->shutdown();
138
38
    }
139
140
    // init should be called after storage engine is opened,
141
    // because it needs path hash of each data dir.
142
    void init(int num_disk);
143
144
    Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
145
                              std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority,
146
                              std::shared_ptr<WorkloadGroup> wg_sptr);
147
148
    // Returns true if there is already a flushing task; otherwise atomically
    // sets the flushing task count from 0 to 1 and returns false.
149
0
    bool check_and_inc_has_any_flushing_task() {
150
        // Use CAS rather than a plain `if (0 == _flushing_task_count)` check,
151
        // so that two concurrent callers cannot both pass the check.
152
0
        int expected_count = 0;
153
0
        if (!_flushing_task_count.compare_exchange_strong(expected_count, 1)) {
154
0
            return true;
155
0
        }
156
0
        DCHECK(expected_count == 0 && _flushing_task_count == 1);
157
0
        return false;
158
0
    }
159
160
0
    void inc_flushing_task() { _flushing_task_count++; }
161
162
0
    void dec_flushing_task() { _flushing_task_count--; }
163
164
10
    ThreadPool* flush_pool() { return _flush_pool.get(); }
165
166
    void update_memtable_flush_threads();
167
168
private:
169
    std::unique_ptr<ThreadPool> _flush_pool;
170
    std::unique_ptr<ThreadPool> _high_prio_flush_pool;
171
    std::atomic<int> _flushing_task_count = 0;
172
    int _num_disk = 0;
173
};
174
175
} // namespace doris