Coverage Report

Created: 2026-03-15 20:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/memtable/memtable_flush_executor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/memtable/memtable_flush_executor.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
22
#include <algorithm>
23
#include <cstddef>
24
#include <ostream>
25
26
#include "common/config.h"
27
#include "common/logging.h"
28
#include "common/metrics/doris_metrics.h"
29
#include "common/metrics/metrics.h"
30
#include "common/signal_handler.h"
31
#include "load/memtable/memtable.h"
32
#include "runtime/thread_context.h"
33
#include "storage/rowset/rowset_writer.h"
34
#include "storage/storage_engine.h"
35
#include "util/debug_points.h"
36
#include "util/pretty_printer.h"
37
#include "util/stopwatch.hpp"
38
#include "util/time.h"
39
40
namespace doris {
41
using namespace ErrorCode;
42
43
bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
44
45
class MemtableFlushTask final : public Runnable {
46
    ENABLE_FACTORY_CREATOR(MemtableFlushTask);
47
48
public:
49
    MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::shared_ptr<MemTable> memtable,
50
                      int32_t segment_id, int64_t submit_task_time)
51
12
            : _flush_token(flush_token),
52
12
              _memtable(memtable),
53
12
              _segment_id(segment_id),
54
12
              _submit_task_time(submit_task_time) {
55
12
        g_flush_task_num << 1;
56
12
    }
57
58
12
    ~MemtableFlushTask() override { g_flush_task_num << -1; }
59
60
12
    void run() override {
61
12
        auto token = _flush_token.lock();
62
12
        if (token) {
63
12
            token->_flush_memtable(_memtable, _segment_id, _submit_task_time);
64
12
        } else {
65
0
            LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
66
0
        }
67
12
    }
68
69
private:
70
    std::weak_ptr<FlushToken> _flush_token;
71
    std::shared_ptr<MemTable> _memtable;
72
    int32_t _segment_id;
73
    int64_t _submit_task_time;
74
};
75
76
0
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
77
0
    os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS
78
0
       << ", flush wait time(ms)=" << stat.flush_wait_time_ns / NANOS_PER_MILLIS
79
0
       << ", flush submit count=" << stat.flush_submit_count
80
0
       << ", running flush count=" << stat.flush_running_count
81
0
       << ", finish flush count=" << stat.flush_finish_count
82
0
       << ", flush bytes: " << stat.flush_size_bytes
83
0
       << ", flush disk bytes: " << stat.flush_disk_size_bytes << ")";
84
0
    return os;
85
0
}
86
87
15
// Queue one memtable for asynchronous flush.
// A null or empty memtable is accepted and silently skipped. If an earlier
// flush already failed, that error is returned and nothing is queued.
// On successful submission, flush_submit_count is incremented so wait() can
// later block until the task's defer bookkeeping has run.
Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) {
    {
        std::shared_lock rdlk(_flush_status_lock);
        DBUG_EXECUTE_IF("FlushToken.submit_flush_error", {
            _flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error");
        });
        if (!_flush_status.ok()) {
            return _flush_status;
        }
    }

    // Nothing to flush for an absent or empty memtable.
    if (mem_table == nullptr || mem_table->empty()) {
        return Status::OK();
    }

    const int64_t submit_task_time = MonotonicNanos();
    auto flush_task = MemtableFlushTask::create_shared(
            shared_from_this(), mem_table, _rowset_writer->allocate_segment_id(), submit_task_time);
    // NOTE: we should guarantee WorkloadGroup is not deconstructed when submit memtable flush task.
    // because currently WorkloadGroup's can only be destroyed when all queries in the group is finished,
    // but not consider whether load channel is finish.
    std::shared_ptr<WorkloadGroup> wg = _wg_wptr.lock();
    ThreadPool* wg_pool = wg ? wg->get_memtable_flush_pool() : nullptr;
    // Prefer the workload group's dedicated pool when it exists; fall back to
    // the executor's default pool otherwise.
    Status submit_st = wg_pool != nullptr ? wg_pool->submit(std::move(flush_task))
                                          : _thread_pool->submit(std::move(flush_task));
    if (submit_st.ok()) {
        // _wait_running_task_finish was executed after this function, so no need to notify _cond here
        _stats.flush_submit_count++;
    }
    return submit_st;
}
120
121
// NOTE: FlushToken's submit/cancel/wait run in one thread,
122
// so we don't need to make them mutually exclusive, std::atomic is enough.
123
15
void FlushToken::_wait_submit_task_finish() {
124
15
    std::unique_lock<std::mutex> lock(_mutex);
125
27
    _submit_task_finish_cond.wait(lock, [&]() { return _stats.flush_submit_count.load() == 0; });
126
15
}
127
128
30
void FlushToken::_wait_running_task_finish() {
129
30
    std::unique_lock<std::mutex> lock(_mutex);
130
30
    _running_task_finish_cond.wait(lock, [&]() { return _stats.flush_running_count.load() == 0; });
131
30
}
132
133
30
// Abort outstanding flushes: mark the token shut down so queued tasks bail
// out at their shutdown checks, then wait for tasks that already started
// to drain. Order matters: shutdown must be visible before we wait.
void FlushToken::cancel() {
    _shutdown_flush_token();
    _wait_running_task_finish();
}
137
138
15
// Wait for all submitted flush tasks to finish, then report the first flush
// error recorded by any of them (OK when every flush succeeded).
Status FlushToken::wait() {
    _wait_submit_task_finish();
    std::shared_lock rdlk(_flush_status_lock);
    return _flush_status.ok() ? Status::OK() : _flush_status;
}
148
149
// Try to reserve `size` bytes of process memory before flushing a memtable.
//
// Retries once per second while the reservation fails, for at most
// config::memtable_wait_for_memory_sleep_time_s attempts. To avoid a global
// stall, the reservation is waived (returns OK) when no other flush task is
// currently running, so at least one memtable can always make progress.
// On a successful reservation the executor's flushing-task counter is
// incremented; the caller is responsible for decrementing it afterwards.
// Returns Cancelled if the token is shut down or the owning task is
// cancelled while waiting.
Status FlushToken::_try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context,
                                       int64_t size) {
    auto* thread_context = doris::thread_context();
    auto* memtable_flush_executor =
            ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
    Status st;
    int32_t max_waiting_time = config::memtable_wait_for_memory_sleep_time_s;
    do {
        // only try to reserve process memory
        st = thread_context->thread_mem_tracker_mgr->try_reserve(
                size, ThreadMemTrackerMgr::TryReserveChecker::CHECK_PROCESS);
        if (st.ok()) {
            memtable_flush_executor->inc_flushing_task();
            break;
        }
        // Give up promptly if this token or the owning task was cancelled.
        if (_is_shutdown() || resource_context->task_controller()->is_cancelled()) {
            st = Status::Cancelled("flush memtable already cancelled");
            break;
        }
        // Make sure at least one memtable is flushing even reserve memory failed.
        if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
            // If there are already any flushing task, wait for some time and retry.
            // BUGFIX: the message used to say "100ms" while the code sleeps 1s;
            // keep the message in sync with the actual retry interval below.
            LOG_EVERY_T(INFO, 60) << fmt::format(
                    "Failed to reserve memory {} for flush memtable, retry after 1s",
                    PrettyPrinter::print_bytes(size));
            std::this_thread::sleep_for(std::chrono::seconds(1));
            max_waiting_time -= 1;
        } else {
            // No flush is running anywhere: proceed without the reservation.
            st = Status::OK();
            break;
        }
    } while (max_waiting_time > 0);
    return st;
}
183
184
12
// Flush one memtable into segment `segment_id` via the rowset writer.
// The memtable is first converted to a Block, then written out; `flush_size`
// receives the on-disk size reported by the writer. Flush duration and count
// metrics are recorded on success. Returns the first error from conversion
// or writing (RETURN_IF_ERROR short-circuits inside the timed scope).
Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
    VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
                  << ", memsize: " << PrettyPrinter::print_bytes(memtable->memory_usage())
                  << ", rows: " << memtable->stat().raw_rows;
    // Move the memtable's memory accounting from the write stage to the
    // flush stage before doing any work.
    memtable->update_mem_type(MemType::FLUSH);
    int64_t duration_ns = 0;
    {
        // Scope attaches this thread to the load task's resource context and
        // trackers so the flush's memory is charged to the right owner.
        SCOPED_RAW_TIMER(&duration_ns);
        SCOPED_ATTACH_TASK(memtable->resource_ctx());
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker());
        SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());

        // NOTE(review): the memory-reservation logic below is intentionally
        // disabled; _try_reserve_memory is currently unused from here.
        // DEFER_RELEASE_RESERVED();

        // auto reserve_size = memtable->get_flush_reserve_memory_size();
        // if (memtable->resource_ctx()->task_controller()->is_enable_reserve_memory() &&
        //     reserve_size > 0) {
        //     RETURN_IF_ERROR(_try_reserve_memory(memtable->resource_ctx(), reserve_size));
        // }

        // Defer defer {[&]() {
        //     ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
        // }};
        std::unique_ptr<Block> block;
        RETURN_IF_ERROR(memtable->to_block(&block));
        RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
        memtable->set_flush_success();
    }
    // Aggregate per-memtable stats into the token-level totals.
    _memtable_stat += memtable->stat();
    DorisMetrics::instance()->memtable_flush_total->increment(1);
    DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
    VLOG_CRITICAL << "after flush memtable for tablet: " << memtable->tablet_id()
                  << ", flushsize: " << PrettyPrinter::print_bytes(*flush_size);
    return Status::OK();
}
220
221
void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
222
12
                                 int64_t submit_task_time) {
223
12
    signal::set_signal_task_id(_rowset_writer->load_id());
224
12
    signal::tablet_id = memtable_ptr->tablet_id();
225
12
    Defer defer {[&]() {
226
12
        std::lock_guard<std::mutex> lock(_mutex);
227
12
        _stats.flush_submit_count--;
228
12
        if (_stats.flush_submit_count == 0) {
229
12
            _submit_task_finish_cond.notify_one();
230
12
        }
231
12
        _stats.flush_running_count--;
232
12
        if (_stats.flush_running_count == 0) {
233
12
            _running_task_finish_cond.notify_one();
234
12
        }
235
12
    }};
236
12
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_before_first_shutdown",
237
12
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
238
12
    if (_is_shutdown()) {
239
0
        return;
240
0
    }
241
12
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_first_shutdown",
242
12
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
243
12
    _stats.flush_running_count++;
244
    // double check if shutdown to avoid wait running task finish count not accurate
245
12
    if (_is_shutdown()) {
246
0
        return;
247
0
    }
248
12
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_second_shutdown",
249
12
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
250
12
    uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
251
12
    _stats.flush_wait_time_ns += flush_wait_time_ns;
252
    // If previous flush has failed, return directly
253
12
    {
254
12
        std::shared_lock rdlk(_flush_status_lock);
255
12
        if (!_flush_status.ok()) {
256
0
            return;
257
0
        }
258
12
    }
259
260
12
    MonotonicStopWatch timer;
261
12
    timer.start();
262
12
    size_t memory_usage = memtable_ptr->memory_usage();
263
264
12
    int64_t flush_size;
265
12
    Status s = _do_flush_memtable(memtable_ptr.get(), segment_id, &flush_size);
266
267
12
    {
268
12
        std::shared_lock rdlk(_flush_status_lock);
269
12
        if (!_flush_status.ok()) {
270
0
            return;
271
0
        }
272
12
    }
273
12
    if (!s.ok()) {
274
0
        std::lock_guard wrlk(_flush_status_lock);
275
0
        LOG(WARNING) << "Flush memtable failed with res = " << s
276
0
                     << ", load_id: " << print_id(_rowset_writer->load_id());
277
0
        _flush_status = s;
278
0
        return;
279
0
    }
280
281
12
    VLOG_CRITICAL << "flush memtable wait time: "
282
3
                  << PrettyPrinter::print(flush_wait_time_ns, TUnit::TIME_NS)
283
3
                  << ", flush memtable cost: "
284
3
                  << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS)
285
3
                  << ", submit count: " << _stats.flush_submit_count
286
3
                  << ", running count: " << _stats.flush_running_count
287
3
                  << ", finish count: " << _stats.flush_finish_count
288
3
                  << ", mem size: " << PrettyPrinter::print_bytes(memory_usage)
289
3
                  << ", disk size: " << PrettyPrinter::print_bytes(flush_size);
290
12
    _stats.flush_time_ns += timer.elapsed_time();
291
12
    _stats.flush_finish_count++;
292
12
    _stats.flush_size_bytes += memtable_ptr->memory_usage();
293
12
    _stats.flush_disk_size_bytes += flush_size;
294
12
}
295
296
38
void MemTableFlushExecutor::init(int num_disk) {
297
38
    _num_disk = std::max(1, num_disk);
298
38
    int num_cpus = std::thread::hardware_concurrency();
299
38
    int min_threads = std::max(1, config::flush_thread_num_per_store);
300
38
    int max_threads = num_cpus == 0 ? _num_disk * min_threads
301
38
                                    : std::min(_num_disk * min_threads,
302
38
                                               num_cpus * config::max_flush_thread_num_per_cpu);
303
38
    static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool")
304
38
                              .set_min_threads(min_threads)
305
38
                              .set_max_threads(max_threads)
306
38
                              .build(&_flush_pool));
307
308
38
    min_threads = std::max(1, config::high_priority_flush_thread_num_per_store);
309
38
    max_threads = num_cpus == 0 ? _num_disk * min_threads
310
38
                                : std::min(_num_disk * min_threads,
311
38
                                           num_cpus * config::max_flush_thread_num_per_cpu);
312
38
    static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
313
38
                              .set_min_threads(min_threads)
314
38
                              .set_max_threads(max_threads)
315
38
                              .build(&_high_prio_flush_pool));
316
38
}
317
318
10
void MemTableFlushExecutor::update_memtable_flush_threads() {
319
10
    int num_cpus = std::thread::hardware_concurrency();
320
10
    int min_threads = std::max(1, config::flush_thread_num_per_store);
321
10
    int max_threads = num_cpus == 0 ? _num_disk * min_threads
322
10
                                    : std::min(_num_disk * min_threads,
323
10
                                               num_cpus * config::max_flush_thread_num_per_cpu);
324
    // Update max_threads first to avoid constraint violation when increasing min_threads
325
10
    static_cast<void>(_flush_pool->set_max_threads(max_threads));
326
10
    static_cast<void>(_flush_pool->set_min_threads(min_threads));
327
328
10
    min_threads = std::max(1, config::high_priority_flush_thread_num_per_store);
329
10
    max_threads = num_cpus == 0 ? _num_disk * min_threads
330
10
                                : std::min(_num_disk * min_threads,
331
10
                                           num_cpus * config::max_flush_thread_num_per_cpu);
332
    // Update max_threads first to avoid constraint violation when increasing min_threads
333
10
    static_cast<void>(_high_prio_flush_pool->set_max_threads(max_threads));
334
10
    static_cast<void>(_high_prio_flush_pool->set_min_threads(min_threads));
335
10
}
336
337
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
338
// Create a FlushToken bound to the proper flush pool for this rowset writer.
// Only BETA_ROWSET is supported; each of its memtables gets its own segment
// writer, so concurrent flushing is safe. Other rowset types are rejected.
Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token,
                                                 std::shared_ptr<RowsetWriter> rowset_writer,
                                                 bool is_high_priority,
                                                 std::shared_ptr<WorkloadGroup> wg_sptr) {
    switch (rowset_writer->type()) {
    case BETA_ROWSET: {
        // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
        ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() : _flush_pool.get();
        flush_token = FlushToken::create_shared(pool, wg_sptr);
        flush_token->set_rowset_writer(rowset_writer);
        return Status::OK();
    }
    case ALPHA_ROWSET:
        // alpha rowset do not support flush in CONCURRENT.  and not support alpha rowset now.
        return Status::InternalError<false>("not support alpha rowset load now.");
    default:
        return Status::InternalError<false>("unknown rowset type.");
    }
}
357
358
} // namespace doris