Coverage Report

Created: 2026-05-12 22:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/memtable/memtable_flush_executor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/memtable/memtable_flush_executor.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
22
#include <algorithm>
23
#include <cstddef>
24
#include <ostream>
25
26
#include "common/config.h"
27
#include "common/logging.h"
28
#include "common/metrics/doris_metrics.h"
29
#include "common/metrics/metrics.h"
30
#include "common/metrics/system_metrics.h"
31
#include "common/signal_handler.h"
32
#include "exec/sink/autoinc_buffer.h"
33
#include "load/memtable/memtable.h"
34
#include "runtime/thread_context.h"
35
#include "storage/binlog.h"
36
#include "storage/rowset/group_rowset_writer.h"
37
#include "storage/rowset/rowset_writer.h"
38
#include "storage/storage_engine.h"
39
#include "storage/tablet_info.h"
40
#include "util/debug_points.h"
41
#include "util/pretty_printer.h"
42
#include "util/stopwatch.hpp"
43
#include "util/time.h"
44
45
namespace doris {
46
using namespace ErrorCode;
47
48
bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
49
50
class MemtableFlushTask : public Runnable {
51
    ENABLE_FACTORY_CREATOR(MemtableFlushTask);
52
53
public:
54
    MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::shared_ptr<MemTable> memtable,
55
                      int32_t segment_id, int64_t submit_task_time)
56
18
            : _flush_token(flush_token),
57
18
              _memtable(memtable),
58
18
              _segment_id(segment_id),
59
18
              _submit_task_time(submit_task_time) {
60
18
        g_flush_task_num << 1;
61
18
    }
62
63
18
    ~MemtableFlushTask() override { g_flush_task_num << -1; }
64
65
12
    void run() override {
66
12
        auto token = _flush_token.lock();
67
12
        if (token) {
68
12
            token->_flush_memtable(_memtable, _segment_id, _submit_task_time);
69
12
        } else {
70
0
            LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
71
0
        }
72
12
    }
73
74
protected:
75
    std::weak_ptr<FlushToken> _flush_token;
76
    std::shared_ptr<MemTable> _memtable;
77
    int32_t _segment_id;
78
    int64_t _submit_task_time;
79
};
80
81
class PartOfGroupMemtableFlushTask final : public MemtableFlushTask {
82
    ENABLE_FACTORY_CREATOR(PartOfGroupMemtableFlushTask);
83
84
public:
85
    PartOfGroupMemtableFlushTask(std::shared_ptr<FlushToken> flush_token,
86
                                 std::shared_ptr<SharedMemtable> shared_memtable,
87
                                 WriteRequestType write_req_type, int64_t submit_task_time)
88
6
            : MemtableFlushTask(flush_token, nullptr, 0, submit_task_time),
89
6
              _shared_memtable(std::move(shared_memtable)),
90
6
              _write_req_type(write_req_type) {}
91
92
6
    void run() override {
93
6
        auto token = _flush_token.lock();
94
6
        if (token) {
95
6
            token->_flush_group_memtable(_shared_memtable, _write_req_type, _submit_task_time);
96
6
        } else {
97
0
            LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
98
0
        }
99
6
    }
100
101
private:
102
    std::shared_ptr<SharedMemtable> _shared_memtable;
103
    WriteRequestType _write_req_type;
104
};
105
106
0
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
107
0
    os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS
108
0
       << ", flush wait time(ms)=" << stat.flush_wait_time_ns / NANOS_PER_MILLIS
109
0
       << ", flush submit count=" << stat.flush_submit_count
110
0
       << ", running flush count=" << stat.flush_running_count
111
0
       << ", finish flush count=" << stat.flush_finish_count
112
0
       << ", flush bytes: " << stat.flush_size_bytes
113
0
       << ", flush disk bytes: " << stat.flush_disk_size_bytes << ")";
114
0
    return os;
115
0
}
116
117
3
// Releases the Block shared by the grouped flush sub-tasks, if one was ever built.
//
// The release happens under the memtable's write tracker and mem tracker. These are the
// same trackers that are active in the flush path when the Block is produced from the
// memtable.
SharedMemtable::~SharedMemtable() {
    if (block != nullptr) {
        DCHECK(memtable != nullptr);
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker());
        SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
        block.reset();
    }
}
127
128
// Submits every runnable in `sub_tasks` to `pool`, in order.
//
// Before each submission the token's sticky flush status is checked, so an earlier
// failure stops the remaining sub-tasks from being queued.
//
// flush_submit_count is incremented *before* a task is handed to the pool. A task may
// start and finish (decrementing the counter in its Defer) before pool->submit()
// returns. Counting afterwards would let the counter transiently go negative and miss
// the zero-crossing notification.
//
// When submission fails, the function:
//   - rolls the increment back under _mutex, notifying waiters on zero;
//   - records the failure as the flush status unless one is already set;
//   - shuts the token down and returns the submit error.
Status FlushToken::_submit_sub_tasks(ThreadPool* pool,
                                     std::vector<std::shared_ptr<Runnable>> sub_tasks) {
    for (size_t i = 0; i < sub_tasks.size(); ++i) {
        // Debug-only fault injection. It mutates _flush_status, so it takes the exclusive
        // lock instead of writing under the shared lock used by concurrent readers.
        DBUG_EXECUTE_IF("FlushToken.submit_sub_task_error", {
            if (i != 0) {
                // only affect flush binlog task
                std::lock_guard wrlk(_flush_status_lock);
                _flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error");
            }
        });
        {
            std::shared_lock rdlk(_flush_status_lock);
            if (!_flush_status.ok()) {
                return _flush_status;
            }
        }

        _stats.flush_submit_count++;
        Status submit_st = pool->submit(std::move(sub_tasks[i]));
        if (UNLIKELY(!submit_st.ok())) {
            {
                // The task never reached the pool, so no Defer will undo the increment.
                std::lock_guard<std::mutex> lock(_mutex);
                _stats.flush_submit_count--;
                if (_stats.flush_submit_count == 0) {
                    _submit_task_finish_cond.notify_one();
                }
            }
            {
                std::lock_guard wrlk(_flush_status_lock);
                if (_flush_status.ok()) {
                    _flush_status = submit_st;
                }
            }
            _shutdown_flush_token();
            return submit_st;
        }
    }
    return Status::OK();
}
158
159
18
// Queues `mem_table` for flushing.
//
// Behavior:
//   - Returns the token's sticky flush status if an earlier flush or submit already failed.
//   - Accepts and ignores null or empty memtables.
//   - With a GroupRowsetWriter, wraps the memtable in a SharedMemtable flushed by two
//     sub-tasks (data and row binlog) that share one segment id. When the binlog writer
//     needs a binlog, LSNs for the memtable's rows are allocated here and registered for
//     that segment.
//   - Otherwise creates a single MemtableFlushTask.
//   - Uses the workload group's flush pool while the group is still alive, falling back
//     to this token's pool.
Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) {
    // Debug-only fault injection. It overwrites _flush_status, so it takes the exclusive
    // lock rather than mutating the status under the shared lock used for reading.
    DBUG_EXECUTE_IF("FlushToken.submit_flush_error", {
        std::lock_guard wrlk(_flush_status_lock);
        _flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error");
    });
    {
        std::shared_lock rdlk(_flush_status_lock);
        if (!_flush_status.ok()) {
            return _flush_status;
        }
    }

    if (mem_table == nullptr || mem_table->empty()) {
        return Status::OK();
    }
    int64_t submit_task_time = MonotonicNanos();
    auto* group_rowset_writer = typeid_cast<GroupRowsetWriter*>(_rowset_writer.get());
    std::shared_ptr<SharedMemtable> shared_memtable;
    std::vector<std::shared_ptr<Runnable>> tasks;
    if (group_rowset_writer != nullptr) {
        auto data_writer = group_rowset_writer->data_writer();
        auto binlog_writer = group_rowset_writer->row_binlog_writer();
        DCHECK(data_writer != nullptr);
        DCHECK(binlog_writer != nullptr);

        shared_memtable = std::make_shared<SharedMemtable>();
        shared_memtable->memtable = mem_table;
        // Keep data/binlog segment_id allocators in sync.
        auto segment_id = data_writer->allocate_segment_id();
        auto binlog_segment_id = binlog_writer->allocate_segment_id();
        DCHECK_EQ(segment_id, binlog_segment_id);
        shared_memtable->segment_id = segment_id;

        if (binlog_writer->context().write_binlog_opt().need_build_binlog()) {
            // Lazily resolve the LSN auto-inc buffer. NOTE(review): the unlocked first read
            // relies on submit() not being called concurrently for one token; confirm.
            if (_row_binlog_lsn_buffer == nullptr) {
                std::unique_lock<std::mutex> lock(_mutex);
                if (_row_binlog_lsn_buffer == nullptr) {
                    if (_table_schema_param == nullptr) {
                        return Status::InternalError<false>(
                                "need binlog but table_schema_param is null");
                    }
                    _row_binlog_lsn_buffer =
                            GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer(
                                    _table_schema_param->db_id(), _table_schema_param->table_id(),
                                    kBinlogLsnAutoIncId);
                }
            }
            std::shared_ptr<std::vector<int128_t>> lsn;
            RETURN_IF_ERROR(
                    allocate_binlog_lsn(_row_binlog_lsn_buffer, mem_table->raw_rows(), &lsn));
            DCHECK(lsn != nullptr && !lsn->empty());
            const_cast<RowsetWriterContext&>(binlog_writer->context())
                    .write_binlog_opt()
                    .write_binlog_config()
                    .insert_seg_lsn(shared_memtable->segment_id, lsn);
        }

        tasks.emplace_back(PartOfGroupMemtableFlushTask::create_shared(
                shared_from_this(), shared_memtable, WriteRequestType::DATA_IN_GROUP,
                submit_task_time));
        tasks.emplace_back(PartOfGroupMemtableFlushTask::create_shared(
                shared_from_this(), shared_memtable, WriteRequestType::BINLOG_IN_GROUP,
                submit_task_time));
    } else {
        // mem_table is not used after this point, so hand our reference to the task.
        tasks.emplace_back(MemtableFlushTask::create_shared(shared_from_this(),
                                                            std::move(mem_table),
                                                            _rowset_writer->allocate_segment_id(),
                                                            submit_task_time));
    }
    // NOTE: we must guarantee the WorkloadGroup is not destroyed while memtable flush tasks
    // are submitted. A WorkloadGroup can currently only be destroyed once all queries in the
    // group have finished, which does not account for whether the load channel is finished.
    std::shared_ptr<WorkloadGroup> wg_sptr = _wg_wptr.lock();
    ThreadPool* wg_thread_pool = nullptr;
    if (wg_sptr) {
        wg_thread_pool = wg_sptr->get_memtable_flush_pool();
    }
    ThreadPool* pool = wg_thread_pool ? wg_thread_pool : _thread_pool;

    return _submit_sub_tasks(pool, std::move(tasks));
}
238
239
void FlushToken::_flush_group_memtable(std::shared_ptr<SharedMemtable> shared_memtable,
240
6
                                       WriteRequestType write_req_type, int64_t submit_task_time) {
241
6
    DCHECK(shared_memtable != nullptr);
242
6
    DCHECK(shared_memtable->memtable != nullptr);
243
6
    DCHECK(write_req_type == WriteRequestType::DATA_IN_GROUP ||
244
6
           write_req_type == WriteRequestType::BINLOG_IN_GROUP);
245
246
6
    auto* group_rowset_writer = typeid_cast<GroupRowsetWriter*>(_rowset_writer.get());
247
6
    DCHECK(group_rowset_writer != nullptr);
248
6
    auto flush_writer = write_req_type == WriteRequestType::DATA_IN_GROUP
249
6
                                ? group_rowset_writer->data_writer()
250
6
                                : group_rowset_writer->row_binlog_writer();
251
6
    DCHECK(flush_writer != nullptr);
252
6
    _flush_memtable_impl(flush_writer.get(), shared_memtable->memtable.get(),
253
6
                         shared_memtable->segment_id, submit_task_time, shared_memtable.get());
254
6
}
255
256
// NOTE: FlushToken's submit/cancel/wait run in one thread,
257
// so we don't need to make them mutually exclusive, std::atomic is enough.
258
18
void FlushToken::_wait_submit_task_finish() {
259
18
    std::unique_lock<std::mutex> lock(_mutex);
260
33
    _submit_task_finish_cond.wait(lock, [&]() { return _stats.flush_submit_count.load() == 0; });
261
18
}
262
263
30
void FlushToken::_wait_running_task_finish() {
264
30
    std::unique_lock<std::mutex> lock(_mutex);
265
30
    _running_task_finish_cond.wait(lock, [&]() { return _stats.flush_running_count.load() == 0; });
266
30
}
267
268
30
void FlushToken::cancel() {
269
30
    _shutdown_flush_token();
270
30
    _wait_running_task_finish();
271
30
}
272
273
18
// Waits for every submitted flush task to finish, then reports the token's flush status:
// the recorded failure if any flush/submit failed, otherwise OK.
Status FlushToken::wait() {
    _wait_submit_task_finish();
    std::shared_lock status_guard(_flush_status_lock);
    return _flush_status.ok() ? Status::OK() : _flush_status;
}
283
284
// Tries to reserve `size` bytes of process memory before flushing a memtable.
//
// Outcomes:
//   - Reservation succeeds: calls inc_flushing_task() on the flush executor and returns OK.
//   - Token shut down or owning task cancelled: returns Cancelled.
//   - No flush is in progress anywhere: returns OK without a reservation, so at least one
//     memtable keeps flushing. (check_and_inc_has_any_flushing_task() presumably registers
//     this flush in that case — verify.)
//   - Another flush is in progress: sleeps one second and retries, for up to
//     config::memtable_wait_for_memory_sleep_time_s attempts. After the last attempt it
//     returns the last reservation error.
Status FlushToken::_try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context,
                                       int64_t size) {
    auto* thread_context = doris::thread_context();
    auto* memtable_flush_executor =
            ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
    Status st;
    int32_t max_waiting_time = config::memtable_wait_for_memory_sleep_time_s;
    do {
        // only try to reserve process memory
        st = thread_context->thread_mem_tracker_mgr->try_reserve(
                size, ThreadMemTrackerMgr::TryReserveChecker::CHECK_PROCESS);
        if (st.ok()) {
            memtable_flush_executor->inc_flushing_task();
            break;
        }
        if (_is_shutdown() || resource_context->task_controller()->is_cancelled()) {
            st = Status::Cancelled("flush memtable already cancelled");
            break;
        }
        // Make sure at least one memtable is flushing even if reserving memory failed.
        if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
            // Another flush is running: wait and retry. The log text must match the sleep
            // below (it previously claimed 100ms while sleeping a full second).
            LOG_EVERY_T(INFO, 60) << fmt::format(
                    "Failed to reserve memory {} for flush memtable, retry after 1s",
                    PrettyPrinter::print_bytes(size));
            std::this_thread::sleep_for(std::chrono::seconds(1));
            max_waiting_time -= 1;
        } else {
            st = Status::OK();
            break;
        }
    } while (max_waiting_time > 0);
    return st;
}
318
319
// Produces the Block that a flush task writes for `memtable`.
//
// - Grouped sub-task (`shared_memtable` non-null): the memtable is converted once, guarded
//   by std::call_once on `block_once`. Every sub-task then receives the same Block, or the
//   stored conversion error.
// - Standalone memtable: a fresh Block is built and owned solely by `flush_block`.
Status FlushToken::_memtable2block(MemTable* memtable, SharedMemtable* shared_memtable,
                                   std::shared_ptr<Block>& flush_block) {
    DCHECK(memtable != nullptr);

    if (shared_memtable != nullptr) {
        std::call_once(shared_memtable->block_once, [&]() {
            std::unique_ptr<Block> converted;
            shared_memtable->block_status = memtable->to_block(&converted);
            if (shared_memtable->block_status.ok()) {
                shared_memtable->block = std::move(converted);
            }
        });
        if (!shared_memtable->block_status.ok()) {
            return shared_memtable->block_status;
        }
        flush_block = shared_memtable->block;
        DCHECK(flush_block != nullptr);
        return Status::OK();
    }

    std::unique_ptr<Block> standalone;
    RETURN_IF_ERROR(memtable->to_block(&standalone));
    flush_block = std::move(standalone);
    return Status::OK();
}
344
345
// Flushes `memtable` through `flush_writer` into segment `segment_id`; runs on a flush
// pool thread.
//
// Counter bookkeeping: the task is counted in flush_running_count for its whole
// execution. The Defer below decrements flush_submit_count and flush_running_count on
// every exit path and notifies the _wait_submit_task_finish() /
// _wait_running_task_finish() waiters when a counter reaches zero.
//
// The flush is skipped when the token has been shut down or an earlier flush already
// failed. A failure here is stored as the token's _flush_status (only if no failure was
// recorded before), and the token is then shut down.
//
// `shared_memtable` is non-null for one sub-task of a grouped (data + row binlog) flush.
// In that case the memtable's stat is accumulated into _memtable_stat only by the
// successful sub-task for which add_finished_sub_task() + 1 equals total_sub_task_count
// (presumably add_finished_sub_task() returns the pre-increment count — verify).
void FlushToken::_flush_memtable_impl(RowsetWriter* flush_writer, MemTable* memtable,
                                      int32_t segment_id, int64_t submit_task_time,
                                      SharedMemtable* shared_memtable) {
    DCHECK(flush_writer != nullptr);
    DCHECK(memtable != nullptr);

    // Tag this thread with the load id / tablet id (presumably so the signal handler can
    // identify the load in crash diagnostics — confirm).
    signal::set_signal_task_id(flush_writer->load_id());
    signal::tablet_id = memtable->tablet_id();
    // Count the task as running before registering the deferred cleanup so
    // cancel/shutdown paths keep flush_running_count symmetric on every exit.
    _stats.flush_running_count++;
    // Runs on every return path. The counters are changed under _mutex, the same mutex the
    // condition-variable waiters hold, so a zero-crossing wake-up cannot be lost.
    Defer defer {[&]() {
        std::lock_guard<std::mutex> lock(_mutex);
        _stats.flush_submit_count--;
        if (_stats.flush_submit_count == 0) {
            _submit_task_finish_cond.notify_one();
        }
        _stats.flush_running_count--;
        if (_stats.flush_running_count == 0) {
            _running_task_finish_cond.notify_one();
        }
    }};
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_before_first_shutdown",
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
    if (_is_shutdown()) {
        return;
    }
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_first_shutdown",
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
    // double check if shutdown to avoid wait running task finish count not accurate
    if (_is_shutdown()) {
        return;
    }
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_second_shutdown",
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
    // Time spent queued in the pool between submit() and now.
    uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
    _stats.flush_wait_time_ns += flush_wait_time_ns;
    // If previous flush has failed, return directly
    {
        std::shared_lock rdlk(_flush_status_lock);
        if (!_flush_status.ok()) {
            return;
        }
    }

    MonotonicStopWatch timer;
    timer.start();
    // Snapshot taken before the flush; reported in the VLOG line below.
    size_t memory_usage = memtable->memory_usage();

    int64_t flush_size = 0;
    Status s;
    memtable->update_mem_type(MemType::FLUSH);
    int64_t duration_ns = 0;
    {
        // Convert and write the memtable under its resource context and memory trackers;
        // duration_ns receives the elapsed time of this lambda.
        s = [&]() {
            SCOPED_RAW_TIMER(&duration_ns);
            SCOPED_ATTACH_TASK(memtable->resource_ctx());
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker());
            SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());

            // Memory reservation before flushing (via _try_reserve_memory) is currently
            // disabled; the original code is kept below for reference.
            // DEFER_RELEASE_RESERVED();

            // auto reserve_size = memtable->get_flush_reserve_memory_size();
            // if (memtable->resource_ctx()->task_controller()->is_enable_reserve_memory() &&
            //     reserve_size > 0) {
            //     RETURN_IF_ERROR(_try_reserve_memory(memtable->resource_ctx(), reserve_size));
            // }

            // Defer defer {[&]() {
            //     ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
            // }};
            std::shared_ptr<Block> flush_block;
            RETURN_IF_ERROR(_memtable2block(memtable, shared_memtable, flush_block));
            RETURN_IF_ERROR(
                    flush_writer->flush_memtable(flush_block.get(), segment_id, &flush_size));
            memtable->set_flush_success();

            return Status::OK();
        }();

        if (s.ok()) {
            // For grouped flushes only one successful sub-task records the memtable stat,
            // so it is not accumulated once per sub-task.
            bool record_memtable_stat = shared_memtable == nullptr;
            if (shared_memtable != nullptr) {
                auto finished_sub_task_count = shared_memtable->add_finished_sub_task() + 1;
                record_memtable_stat =
                        finished_sub_task_count == shared_memtable->total_sub_task_count.load();
            }
            if (record_memtable_stat) {
                _memtable_stat += memtable->stat();
            }
            DorisMetrics::instance()->memtable_flush_total->increment(1);
            DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
        }
    }

    // Another task already recorded a failure: leave the status and shutdown to it.
    {
        std::shared_lock rdlk(_flush_status_lock);
        if (!_flush_status.ok()) {
            return;
        }
    }
    if (!s.ok()) {
        std::lock_guard wrlk(_flush_status_lock);
        if (_flush_status.ok()) {
            LOG(WARNING) << "Flush memtable failed with res = " << s
                         << ", load_id: " << print_id(flush_writer->load_id());
            _flush_status = s;
        }
        _shutdown_flush_token();
        return;
    }

    VLOG_CRITICAL << "flush memtable wait time: "
                  << PrettyPrinter::print(flush_wait_time_ns, TUnit::TIME_NS)
                  << ", flush memtable cost: "
                  << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS)
                  << ", submit count: " << _stats.flush_submit_count
                  << ", running count: " << _stats.flush_running_count
                  << ", finish count: " << _stats.flush_finish_count
                  << ", mem size: " << PrettyPrinter::print_bytes(memory_usage)
                  << ", disk size: " << PrettyPrinter::print_bytes(flush_size);
    // For grouped flushes each successful sub-task adds to these counters.
    _stats.flush_time_ns += timer.elapsed_time();
    _stats.flush_finish_count++;
    // NOTE(review): this reads memory_usage() after the flush, while the VLOG above reports
    // the pre-flush `memory_usage` snapshot; confirm which value is intended.
    _stats.flush_size_bytes += memtable->memory_usage();
    _stats.flush_disk_size_bytes += flush_size;
}
472
473
void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
474
12
                                 int64_t submit_task_time) {
475
12
    _flush_memtable_impl(_rowset_writer.get(), memtable_ptr.get(), segment_id, submit_task_time);
476
12
}
477
478
std::pair<int, int> MemTableFlushExecutor::calc_flush_thread_count(int num_cpus, int num_disk,
479
113
                                                                   int thread_num_per_store) {
480
113
    if (config::enable_adaptive_flush_threads && num_cpus > 0) {
481
97
        int min = std::max(1, (int)(num_cpus * config::min_flush_thread_num_per_cpu));
482
97
        int max = std::max(min, num_cpus * config::max_flush_thread_num_per_cpu);
483
97
        return {min, max};
484
97
    }
485
16
    int min = std::max(1, thread_num_per_store);
486
16
    int max = num_cpus == 0
487
16
                      ? num_disk * min
488
16
                      : std::min(num_disk * min, num_cpus * config::max_flush_thread_num_per_cpu);
489
16
    return {min, max};
490
113
}
491
492
43
void MemTableFlushExecutor::init(int num_disk) {
493
43
    _num_disk = std::max(1, num_disk);
494
43
    int num_cpus = std::thread::hardware_concurrency();
495
496
43
    auto [min_threads, max_threads] =
497
43
            calc_flush_thread_count(num_cpus, _num_disk, config::flush_thread_num_per_store);
498
43
    static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool")
499
43
                              .set_min_threads(min_threads)
500
43
                              .set_max_threads(max_threads)
501
43
                              .build(&_flush_pool));
502
503
43
    auto [hi_min, hi_max] = calc_flush_thread_count(
504
43
            num_cpus, _num_disk, config::high_priority_flush_thread_num_per_store);
505
43
    static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
506
43
                              .set_min_threads(hi_min)
507
43
                              .set_max_threads(hi_max)
508
43
                              .build(&_high_prio_flush_pool));
509
43
}
510
511
11
void MemTableFlushExecutor::update_memtable_flush_threads() {
512
11
    int num_cpus = std::thread::hardware_concurrency();
513
514
11
    auto [min_threads, max_threads] =
515
11
            calc_flush_thread_count(num_cpus, _num_disk, config::flush_thread_num_per_store);
516
    // Update max_threads first to avoid constraint violation when increasing min_threads
517
11
    static_cast<void>(_flush_pool->set_max_threads(max_threads));
518
11
    static_cast<void>(_flush_pool->set_min_threads(min_threads));
519
520
11
    auto [hi_min, hi_max] = calc_flush_thread_count(
521
11
            num_cpus, _num_disk, config::high_priority_flush_thread_num_per_store);
522
    // Update max_threads first to avoid constraint violation when increasing min_threads
523
11
    static_cast<void>(_high_prio_flush_pool->set_max_threads(hi_max));
524
11
    static_cast<void>(_high_prio_flush_pool->set_min_threads(hi_min));
525
11
}
526
527
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
528
Status MemTableFlushExecutor::create_flush_token(
529
        std::shared_ptr<FlushToken>& flush_token, std::shared_ptr<RowsetWriter> rowset_writer,
530
        bool is_high_priority, std::shared_ptr<WorkloadGroup> wg_sptr,
531
17
        std::shared_ptr<OlapTableSchemaParam> table_schema_param) {
532
17
    switch (rowset_writer->type()) {
533
0
    case ALPHA_ROWSET:
534
        // alpha rowset do not support flush in CONCURRENT.  and not support alpha rowset now.
535
0
        return Status::InternalError<false>("not support alpha rowset load now.");
536
17
    case BETA_ROWSET: {
537
        // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
538
17
        ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() : _flush_pool.get();
539
17
        flush_token = FlushToken::create_shared(pool, wg_sptr);
540
17
        flush_token->set_rowset_writer(rowset_writer);
541
17
        flush_token->set_table_schema_param(std::move(table_schema_param));
542
17
        return Status::OK();
543
0
    }
544
0
    default:
545
0
        return Status::InternalError<false>("unknown rowset type.");
546
17
    }
547
17
}
548
549
} // namespace doris