Coverage Report

Created: 2026-04-02 14:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/memtable/memtable_flush_executor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/memtable/memtable_flush_executor.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
22
#include <algorithm>
23
#include <cstddef>
24
#include <ostream>
25
26
#include "common/config.h"
27
#include "common/logging.h"
28
#include "common/metrics/doris_metrics.h"
29
#include "common/metrics/metrics.h"
30
#include "common/metrics/system_metrics.h"
31
#include "common/signal_handler.h"
32
#include "load/memtable/memtable.h"
33
#include "runtime/thread_context.h"
34
#include "storage/rowset/rowset_writer.h"
35
#include "storage/storage_engine.h"
36
#include "util/debug_points.h"
37
#include "util/pretty_printer.h"
38
#include "util/stopwatch.hpp"
39
#include "util/time.h"
40
41
namespace doris {
42
using namespace ErrorCode;
43
44
bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
45
46
class MemtableFlushTask final : public Runnable {
47
    ENABLE_FACTORY_CREATOR(MemtableFlushTask);
48
49
public:
50
    MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::shared_ptr<MemTable> memtable,
51
                      int32_t segment_id, int64_t submit_task_time)
52
12
            : _flush_token(flush_token),
53
12
              _memtable(memtable),
54
12
              _segment_id(segment_id),
55
12
              _submit_task_time(submit_task_time) {
56
12
        g_flush_task_num << 1;
57
12
    }
58
59
12
    ~MemtableFlushTask() override { g_flush_task_num << -1; }
60
61
12
    void run() override {
62
12
        auto token = _flush_token.lock();
63
12
        if (token) {
64
12
            token->_flush_memtable(_memtable, _segment_id, _submit_task_time);
65
12
        } else {
66
0
            LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
67
0
        }
68
12
    }
69
70
private:
71
    std::weak_ptr<FlushToken> _flush_token;
72
    std::shared_ptr<MemTable> _memtable;
73
    int32_t _segment_id;
74
    int64_t _submit_task_time;
75
};
76
77
0
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
78
0
    os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS
79
0
       << ", flush wait time(ms)=" << stat.flush_wait_time_ns / NANOS_PER_MILLIS
80
0
       << ", flush submit count=" << stat.flush_submit_count
81
0
       << ", running flush count=" << stat.flush_running_count
82
0
       << ", finish flush count=" << stat.flush_finish_count
83
0
       << ", flush bytes: " << stat.flush_size_bytes
84
0
       << ", flush disk bytes: " << stat.flush_disk_size_bytes << ")";
85
0
    return os;
86
0
}
87
88
15
Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) {
89
15
    {
90
15
        std::shared_lock rdlk(_flush_status_lock);
91
15
        DBUG_EXECUTE_IF("FlushToken.submit_flush_error", {
92
15
            _flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error");
93
15
        });
94
15
        if (!_flush_status.ok()) {
95
0
            return _flush_status;
96
0
        }
97
15
    }
98
99
15
    if (mem_table == nullptr || mem_table->empty()) {
100
3
        return Status::OK();
101
3
    }
102
12
    int64_t submit_task_time = MonotonicNanos();
103
12
    auto task = MemtableFlushTask::create_shared(
104
12
            shared_from_this(), mem_table, _rowset_writer->allocate_segment_id(), submit_task_time);
105
    // NOTE: we should guarantee the WorkloadGroup is not destructed when submitting a memtable flush task,
106
    // because currently a WorkloadGroup can only be destroyed when all queries in the group are finished,
107
    // but this does not consider whether the load channel is finished.
108
12
    std::shared_ptr<WorkloadGroup> wg_sptr = _wg_wptr.lock();
109
12
    ThreadPool* wg_thread_pool = nullptr;
110
12
    if (wg_sptr) {
111
0
        wg_thread_pool = wg_sptr->get_memtable_flush_pool();
112
0
    }
113
12
    Status ret = wg_thread_pool ? wg_thread_pool->submit(std::move(task))
114
12
                                : _thread_pool->submit(std::move(task));
115
12
    if (ret.ok()) {
116
        // _wait_running_task_finish was executed after this function, so no need to notify _cond here
117
12
        _stats.flush_submit_count++;
118
12
    }
119
12
    return ret;
120
15
}
121
122
// NOTE: FlushToken's submit/cancel/wait run in one thread,
123
// so we don't need to make them mutually exclusive, std::atomic is enough.
124
15
void FlushToken::_wait_submit_task_finish() {
125
15
    std::unique_lock<std::mutex> lock(_mutex);
126
27
    _submit_task_finish_cond.wait(lock, [&]() { return _stats.flush_submit_count.load() == 0; });
127
15
}
128
129
30
void FlushToken::_wait_running_task_finish() {
130
30
    std::unique_lock<std::mutex> lock(_mutex);
131
30
    _running_task_finish_cond.wait(lock, [&]() { return _stats.flush_running_count.load() == 0; });
132
30
}
133
134
30
void FlushToken::cancel() {
135
30
    _shutdown_flush_token();
136
30
    _wait_running_task_finish();
137
30
}
138
139
15
Status FlushToken::wait() {
140
15
    _wait_submit_task_finish();
141
15
    {
142
15
        std::shared_lock rdlk(_flush_status_lock);
143
15
        if (!_flush_status.ok()) {
144
0
            return _flush_status;
145
0
        }
146
15
    }
147
15
    return Status::OK();
148
15
}
149
150
Status FlushToken::_try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context,
151
0
                                       int64_t size) {
152
0
    auto* thread_context = doris::thread_context();
153
0
    auto* memtable_flush_executor =
154
0
            ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
155
0
    Status st;
156
0
    int32_t max_waiting_time = config::memtable_wait_for_memory_sleep_time_s;
157
0
    do {
158
        // only try to reserve process memory
159
0
        st = thread_context->thread_mem_tracker_mgr->try_reserve(
160
0
                size, ThreadMemTrackerMgr::TryReserveChecker::CHECK_PROCESS);
161
0
        if (st.ok()) {
162
0
            memtable_flush_executor->inc_flushing_task();
163
0
            break;
164
0
        }
165
0
        if (_is_shutdown() || resource_context->task_controller()->is_cancelled()) {
166
0
            st = Status::Cancelled("flush memtable already cancelled");
167
0
            break;
168
0
        }
169
        // Make sure at least one memtable is flushing even if reserving memory failed.
170
0
        if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
171
            // If there is already any flushing task, wait for some time and retry.
172
0
            LOG_EVERY_T(INFO, 60) << fmt::format(
173
0
                    "Failed to reserve memory {} for flush memtable, retry after 100ms",
174
0
                    PrettyPrinter::print_bytes(size));
175
0
            std::this_thread::sleep_for(std::chrono::seconds(1));
176
0
            max_waiting_time -= 1;
177
0
        } else {
178
0
            st = Status::OK();
179
0
            break;
180
0
        }
181
0
    } while (max_waiting_time > 0);
182
0
    return st;
183
0
}
184
185
12
Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
186
12
    VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
187
3
                  << ", memsize: " << PrettyPrinter::print_bytes(memtable->memory_usage())
188
3
                  << ", rows: " << memtable->stat().raw_rows;
189
12
    memtable->update_mem_type(MemType::FLUSH);
190
12
    int64_t duration_ns = 0;
191
12
    {
192
12
        SCOPED_RAW_TIMER(&duration_ns);
193
12
        SCOPED_ATTACH_TASK(memtable->resource_ctx());
194
12
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
195
12
                memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker());
196
12
        SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
197
198
        // DEFER_RELEASE_RESERVED();
199
200
        // auto reserve_size = memtable->get_flush_reserve_memory_size();
201
        // if (memtable->resource_ctx()->task_controller()->is_enable_reserve_memory() &&
202
        //     reserve_size > 0) {
203
        //     RETURN_IF_ERROR(_try_reserve_memory(memtable->resource_ctx(), reserve_size));
204
        // }
205
206
        // Defer defer {[&]() {
207
        //     ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
208
        // }};
209
12
        std::unique_ptr<Block> block;
210
12
        RETURN_IF_ERROR(memtable->to_block(&block));
211
12
        RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
212
12
        memtable->set_flush_success();
213
12
    }
214
0
    _memtable_stat += memtable->stat();
215
12
    DorisMetrics::instance()->memtable_flush_total->increment(1);
216
12
    DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
217
12
    VLOG_CRITICAL << "after flush memtable for tablet: " << memtable->tablet_id()
218
3
                  << ", flushsize: " << PrettyPrinter::print_bytes(*flush_size);
219
12
    return Status::OK();
220
12
}
221
222
void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
223
12
                                 int64_t submit_task_time) {
224
12
    signal::set_signal_task_id(_rowset_writer->load_id());
225
12
    signal::tablet_id = memtable_ptr->tablet_id();
226
    // Count the task as running before registering the deferred cleanup so
227
    // cancel/shutdown paths keep flush_running_count symmetric on every exit.
228
12
    _stats.flush_running_count++;
229
12
    Defer defer {[&]() {
230
12
        std::lock_guard<std::mutex> lock(_mutex);
231
12
        _stats.flush_submit_count--;
232
12
        if (_stats.flush_submit_count == 0) {
233
12
            _submit_task_finish_cond.notify_one();
234
12
        }
235
12
        _stats.flush_running_count--;
236
12
        if (_stats.flush_running_count == 0) {
237
12
            _running_task_finish_cond.notify_one();
238
12
        }
239
12
    }};
240
12
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_before_first_shutdown",
241
12
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
242
12
    if (_is_shutdown()) {
243
0
        return;
244
0
    }
245
12
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_first_shutdown",
246
12
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
247
    // Double-check the shutdown status so the count of running tasks waited on stays accurate.
248
12
    if (_is_shutdown()) {
249
0
        return;
250
0
    }
251
12
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_second_shutdown",
252
12
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
253
12
    uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
254
12
    _stats.flush_wait_time_ns += flush_wait_time_ns;
255
    // If previous flush has failed, return directly
256
12
    {
257
12
        std::shared_lock rdlk(_flush_status_lock);
258
12
        if (!_flush_status.ok()) {
259
0
            return;
260
0
        }
261
12
    }
262
263
12
    MonotonicStopWatch timer;
264
12
    timer.start();
265
12
    size_t memory_usage = memtable_ptr->memory_usage();
266
267
12
    int64_t flush_size;
268
12
    Status s = _do_flush_memtable(memtable_ptr.get(), segment_id, &flush_size);
269
270
12
    {
271
12
        std::shared_lock rdlk(_flush_status_lock);
272
12
        if (!_flush_status.ok()) {
273
0
            return;
274
0
        }
275
12
    }
276
12
    if (!s.ok()) {
277
0
        std::lock_guard wrlk(_flush_status_lock);
278
0
        LOG(WARNING) << "Flush memtable failed with res = " << s
279
0
                     << ", load_id: " << print_id(_rowset_writer->load_id());
280
0
        _flush_status = s;
281
0
        return;
282
0
    }
283
284
12
    VLOG_CRITICAL << "flush memtable wait time: "
285
3
                  << PrettyPrinter::print(flush_wait_time_ns, TUnit::TIME_NS)
286
3
                  << ", flush memtable cost: "
287
3
                  << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS)
288
3
                  << ", submit count: " << _stats.flush_submit_count
289
3
                  << ", running count: " << _stats.flush_running_count
290
3
                  << ", finish count: " << _stats.flush_finish_count
291
3
                  << ", mem size: " << PrettyPrinter::print_bytes(memory_usage)
292
3
                  << ", disk size: " << PrettyPrinter::print_bytes(flush_size);
293
12
    _stats.flush_time_ns += timer.elapsed_time();
294
12
    _stats.flush_finish_count++;
295
12
    _stats.flush_size_bytes += memtable_ptr->memory_usage();
296
12
    _stats.flush_disk_size_bytes += flush_size;
297
12
}
298
299
std::pair<int, int> MemTableFlushExecutor::calc_flush_thread_count(int num_cpus, int num_disk,
300
103
                                                                   int thread_num_per_store) {
301
103
    if (config::enable_adaptive_flush_threads && num_cpus > 0) {
302
87
        int min = std::max(1, (int)(num_cpus * config::min_flush_thread_num_per_cpu));
303
87
        int max = std::max(min, num_cpus * config::max_flush_thread_num_per_cpu);
304
87
        return {min, max};
305
87
    }
306
16
    int min = std::max(1, thread_num_per_store);
307
16
    int max = num_cpus == 0
308
16
                      ? num_disk * min
309
16
                      : std::min(num_disk * min, num_cpus * config::max_flush_thread_num_per_cpu);
310
16
    return {min, max};
311
103
}
312
313
38
void MemTableFlushExecutor::init(int num_disk) {
314
38
    _num_disk = std::max(1, num_disk);
315
38
    int num_cpus = std::thread::hardware_concurrency();
316
317
38
    auto [min_threads, max_threads] =
318
38
            calc_flush_thread_count(num_cpus, _num_disk, config::flush_thread_num_per_store);
319
38
    static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool")
320
38
                              .set_min_threads(min_threads)
321
38
                              .set_max_threads(max_threads)
322
38
                              .build(&_flush_pool));
323
324
38
    auto [hi_min, hi_max] = calc_flush_thread_count(
325
38
            num_cpus, _num_disk, config::high_priority_flush_thread_num_per_store);
326
38
    static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
327
38
                              .set_min_threads(hi_min)
328
38
                              .set_max_threads(hi_max)
329
38
                              .build(&_high_prio_flush_pool));
330
38
}
331
332
11
void MemTableFlushExecutor::update_memtable_flush_threads() {
333
11
    int num_cpus = std::thread::hardware_concurrency();
334
335
11
    auto [min_threads, max_threads] =
336
11
            calc_flush_thread_count(num_cpus, _num_disk, config::flush_thread_num_per_store);
337
    // Update max_threads first to avoid constraint violation when increasing min_threads
338
11
    static_cast<void>(_flush_pool->set_max_threads(max_threads));
339
11
    static_cast<void>(_flush_pool->set_min_threads(min_threads));
340
341
11
    auto [hi_min, hi_max] = calc_flush_thread_count(
342
11
            num_cpus, _num_disk, config::high_priority_flush_thread_num_per_store);
343
    // Update max_threads first to avoid constraint violation when increasing min_threads
344
11
    static_cast<void>(_high_prio_flush_pool->set_max_threads(hi_max));
345
11
    static_cast<void>(_high_prio_flush_pool->set_min_threads(hi_min));
346
11
}
347
348
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
349
Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token,
350
                                                 std::shared_ptr<RowsetWriter> rowset_writer,
351
                                                 bool is_high_priority,
352
15
                                                 std::shared_ptr<WorkloadGroup> wg_sptr) {
353
15
    switch (rowset_writer->type()) {
354
0
    case ALPHA_ROWSET:
355
        // Alpha rowsets do not support flush in CONCURRENT mode, and alpha rowsets are no longer supported.
356
0
        return Status::InternalError<false>("not support alpha rowset load now.");
357
15
    case BETA_ROWSET: {
358
        // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
359
15
        ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() : _flush_pool.get();
360
15
        flush_token = FlushToken::create_shared(pool, wg_sptr);
361
15
        flush_token->set_rowset_writer(rowset_writer);
362
15
        return Status::OK();
363
0
    }
364
0
    default:
365
0
        return Status::InternalError<false>("unknown rowset type.");
366
15
    }
367
15
}
368
369
} // namespace doris