Coverage Report

Created: 2026-06-25 00:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/memtable/memtable_flush_executor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/memtable/memtable_flush_executor.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
22
#include <algorithm>
23
#include <cstddef>
24
#include <ostream>
25
26
#include "common/config.h"
27
#include "common/logging.h"
28
#include "common/metrics/doris_metrics.h"
29
#include "common/metrics/metrics.h"
30
#include "common/metrics/system_metrics.h"
31
#include "common/signal_handler.h"
32
#include "load/memtable/memtable.h"
33
#include "runtime/thread_context.h"
34
#include "storage/binlog.h"
35
#include "storage/rowset/group_rowset_writer.h"
36
#include "storage/rowset/rowset_writer.h"
37
#include "storage/storage_engine.h"
38
#include "storage/tablet_info.h"
39
#include "util/debug_points.h"
40
#include "util/pretty_printer.h"
41
#include "util/stopwatch.hpp"
42
#include "util/time.h"
43
44
namespace doris {
45
using namespace ErrorCode;
46
47
// Number of MemtableFlushTask objects currently alive: the task constructor
// adds 1 and the task destructor adds -1.
bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
48
49
class MemtableFlushTask : public Runnable {
50
    ENABLE_FACTORY_CREATOR(MemtableFlushTask);
51
52
public:
53
    MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::shared_ptr<MemTable> memtable,
54
                      int32_t segment_id, int64_t submit_task_time)
55
18
            : _flush_token(flush_token),
56
18
              _memtable(memtable),
57
18
              _segment_id(segment_id),
58
18
              _submit_task_time(submit_task_time) {
59
18
        g_flush_task_num << 1;
60
18
    }
61
62
18
    ~MemtableFlushTask() override { g_flush_task_num << -1; }
63
64
12
    void run() override {
65
12
        auto token = _flush_token.lock();
66
12
        if (token) {
67
12
            token->_flush_memtable(_memtable, _segment_id, _submit_task_time);
68
12
        } else {
69
0
            LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
70
0
        }
71
12
    }
72
73
protected:
74
    std::weak_ptr<FlushToken> _flush_token;
75
    std::shared_ptr<MemTable> _memtable;
76
    int32_t _segment_id;
77
    int64_t _submit_task_time;
78
};
79
80
class PartOfGroupMemtableFlushTask final : public MemtableFlushTask {
81
    ENABLE_FACTORY_CREATOR(PartOfGroupMemtableFlushTask);
82
83
public:
84
    PartOfGroupMemtableFlushTask(std::shared_ptr<FlushToken> flush_token,
85
                                 std::shared_ptr<SharedMemtable> shared_memtable,
86
                                 WriteRequestType write_req_type, int64_t submit_task_time)
87
6
            : MemtableFlushTask(flush_token, nullptr, 0, submit_task_time),
88
6
              _shared_memtable(std::move(shared_memtable)),
89
6
              _write_req_type(write_req_type) {}
90
91
6
    void run() override {
92
6
        auto token = _flush_token.lock();
93
6
        if (token) {
94
6
            token->_flush_group_memtable(_shared_memtable, _write_req_type, _submit_task_time);
95
6
        } else {
96
0
            LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
97
0
        }
98
6
    }
99
100
private:
101
    std::shared_ptr<SharedMemtable> _shared_memtable;
102
    WriteRequestType _write_req_type;
103
};
104
105
0
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
106
0
    os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS
107
0
       << ", flush wait time(ms)=" << stat.flush_wait_time_ns / NANOS_PER_MILLIS
108
0
       << ", flush submit count=" << stat.flush_submit_count
109
0
       << ", running flush count=" << stat.flush_running_count
110
0
       << ", finish flush count=" << stat.flush_finish_count
111
0
       << ", flush bytes: " << stat.flush_size_bytes
112
0
       << ", flush disk bytes: " << stat.flush_disk_size_bytes << ")";
113
0
    return os;
114
0
}
115
116
3
/// Releases the shared block, if one was ever materialised.
///
/// The release happens inside the scopes opened by the memtable's tracker
/// macros — presumably so the freed memory is attributed to those trackers;
/// confirm against the macro definitions.
SharedMemtable::~SharedMemtable() {
    if (block != nullptr) {
        DCHECK(memtable != nullptr);
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker());
        SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
        block.reset();
    }
}
126
127
/// Submits every task in `sub_tasks` to `pool`, in order.
///
/// Before each submission the recorded flush status is checked; if an earlier
/// flush already failed, that status is returned and the remaining tasks are
/// not submitted. If the pool rejects a task, the rejection becomes the
/// recorded flush status (unless one is already set), the token is shut down
/// and the rejection is returned.
///
/// @param pool       thread pool that runs the tasks.
/// @param sub_tasks  tasks to submit; each one is moved into the pool.
/// @return OK when all tasks were accepted, otherwise the first error.
Status FlushToken::_submit_sub_tasks(ThreadPool* pool,
                                     std::vector<std::shared_ptr<Runnable>> sub_tasks) {
    // size_t index: avoids the signed/unsigned comparison against size().
    for (size_t i = 0; i < sub_tasks.size(); ++i) {
        {
            std::shared_lock rdlk(_flush_status_lock);
            DBUG_EXECUTE_IF("FlushToken.submit_sub_task_error", {
                if (i != 0) {
                    // only affect flush binlog task
                    _flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error");
                }
            });
            if (!_flush_status.ok()) {
                return _flush_status;
            }
        }
        Status submit_st = pool->submit(std::move(sub_tasks[i]));
        if (UNLIKELY(!submit_st.ok())) {
            {
                std::lock_guard wrlk(_flush_status_lock);
                // Keep the first recorded failure.
                if (_flush_status.ok()) {
                    _flush_status = submit_st;
                }
            }
            _shutdown_flush_token();
            return submit_st;
        }
        // Counted only after the pool accepted the task.
        _stats.flush_submit_count++;
    }
    return Status::OK();
}
157
158
18
/// Queues `mem_table` for asynchronous flushing.
///
/// Returns the recorded flush status right away if an earlier flush failed,
/// and OK without submitting anything for a null or empty memtable. When the
/// rowset writer is a GroupRowsetWriter, two sub-tasks (DATA and ROW_BINLOG)
/// sharing one SharedMemtable and one segment id are submitted; otherwise a
/// single MemtableFlushTask is submitted.
Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) {
    {
        std::shared_lock rdlk(_flush_status_lock);
        DBUG_EXECUTE_IF("FlushToken.submit_flush_error", {
            _flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error");
        });
        if (!_flush_status.ok()) {
            return _flush_status;
        }
    }

    const bool nothing_to_flush = (mem_table == nullptr) || mem_table->empty();
    if (nothing_to_flush) {
        return Status::OK();
    }

    int64_t submit_task_time = MonotonicNanos();
    auto* group_rowset_writer = typeid_cast<GroupRowsetWriter*>(_rowset_writer.get());
    // Declared at function scope so this reference lives until submit() returns.
    std::shared_ptr<SharedMemtable> shared_memtable;
    std::vector<std::shared_ptr<Runnable>> tasks;
    if (group_rowset_writer == nullptr) {
        // Plain writer: one task, one freshly allocated segment id.
        tasks.emplace_back(MemtableFlushTask::create_shared(shared_from_this(), mem_table,
                                                            _rowset_writer->allocate_segment_id(),
                                                            submit_task_time));
    } else {
        auto data_writer = group_rowset_writer->data_writer();
        auto binlog_writer = group_rowset_writer->row_binlog_writer();
        DCHECK(data_writer != nullptr);
        DCHECK(binlog_writer != nullptr);

        shared_memtable = std::make_shared<SharedMemtable>();
        shared_memtable->memtable = mem_table;
        // Allocate from both writers so their segment id allocators stay in sync.
        auto segment_id = data_writer->allocate_segment_id();
        auto binlog_segment_id = binlog_writer->allocate_segment_id();
        DCHECK_EQ(segment_id, binlog_segment_id);
        shared_memtable->segment_id = segment_id;

        tasks.emplace_back(PartOfGroupMemtableFlushTask::create_shared(
                shared_from_this(), shared_memtable, WriteRequestType::DATA, submit_task_time));
        tasks.emplace_back(PartOfGroupMemtableFlushTask::create_shared(
                shared_from_this(), shared_memtable, WriteRequestType::ROW_BINLOG,
                submit_task_time));
    }

    // NOTE: the WorkloadGroup must not be destroyed while the flush task is being
    // submitted. Currently a WorkloadGroup is only kept alive until all queries in
    // the group have finished, without considering whether the load channel has
    // finished, so a strong reference is held here for the rest of this function.
    std::shared_ptr<WorkloadGroup> wg_sptr = _wg_wptr.lock();
    ThreadPool* wg_thread_pool = nullptr;
    if (wg_sptr) {
        wg_thread_pool = wg_sptr->get_memtable_flush_pool();
    }
    // Prefer the workload group's pool; fall back to the token's own pool.
    ThreadPool* pool = _thread_pool;
    if (wg_thread_pool) {
        pool = wg_thread_pool;
    }

    return _submit_sub_tasks(pool, std::move(tasks));
}
212
213
void FlushToken::_flush_group_memtable(std::shared_ptr<SharedMemtable> shared_memtable,
214
6
                                       WriteRequestType write_req_type, int64_t submit_task_time) {
215
6
    DCHECK(shared_memtable != nullptr);
216
6
    DCHECK(shared_memtable->memtable != nullptr);
217
6
    DCHECK(write_req_type == WriteRequestType::DATA ||
218
6
           write_req_type == WriteRequestType::ROW_BINLOG);
219
220
6
    auto* group_rowset_writer = typeid_cast<GroupRowsetWriter*>(_rowset_writer.get());
221
6
    DCHECK(group_rowset_writer != nullptr);
222
6
    auto flush_writer = write_req_type == WriteRequestType::DATA
223
6
                                ? group_rowset_writer->data_writer()
224
6
                                : group_rowset_writer->row_binlog_writer();
225
6
    DCHECK(flush_writer != nullptr);
226
6
    _flush_memtable_impl(flush_writer.get(), shared_memtable->memtable.get(),
227
6
                         shared_memtable->segment_id, submit_task_time, shared_memtable.get());
228
6
}
229
230
// NOTE: FlushToken's submit/cancel/wait run in one thread,
231
// so we don't need to make them mutually exclusive, std::atomic is enough.
232
18
void FlushToken::_wait_submit_task_finish() {
233
18
    std::unique_lock<std::mutex> lock(_mutex);
234
33
    _submit_task_finish_cond.wait(lock, [&]() { return _stats.flush_submit_count.load() == 0; });
235
18
}
236
237
32
void FlushToken::_wait_running_task_finish() {
238
32
    std::unique_lock<std::mutex> lock(_mutex);
239
32
    _running_task_finish_cond.wait(lock, [&]() { return _stats.flush_running_count.load() == 0; });
240
32
}
241
242
32
void FlushToken::cancel() {
243
32
    _shutdown_flush_token();
244
32
    _wait_running_task_finish();
245
32
}
246
247
18
// Waits until every submitted flush task has been accounted for, then reports
// the recorded flush status (OK when no flush failed).
Status FlushToken::wait() {
    _wait_submit_task_finish();

    std::shared_lock rdlk(_flush_status_lock);
    if (_flush_status.ok()) {
        return Status::OK();
    }
    return _flush_status;
}
257
258
/// Tries to reserve `size` bytes of process memory before a flush.
///
/// Retries once per second, for at most
/// config::memtable_wait_for_memory_sleep_time_s attempts, until one of:
///  - the reservation succeeds (the executor's flushing-task counter is bumped),
///  - the token is shut down or the task is cancelled (returns Cancelled),
///  - check_and_inc_has_any_flushing_task() returns false, in which case OK is
///    returned even though the reservation failed, so that at least one
///    memtable keeps flushing.
///
/// @param resource_context  used to detect cancellation of the owning task.
/// @param size              number of bytes to reserve.
/// @return OK, Cancelled, or the last reservation error once the retry budget
///         is exhausted.
Status FlushToken::_try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context,
                                       int64_t size) {
    auto* thread_context = doris::thread_context();
    auto* memtable_flush_executor =
            ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
    Status st;
    int32_t max_waiting_time = config::memtable_wait_for_memory_sleep_time_s;
    do {
        // only try to reserve process memory
        st = thread_context->thread_mem_tracker_mgr->try_reserve(
                size, ThreadMemTrackerMgr::TryReserveChecker::CHECK_PROCESS);
        if (st.ok()) {
            memtable_flush_executor->inc_flushing_task();
            break;
        }
        if (_is_shutdown() || resource_context->task_controller()->is_cancelled()) {
            st = Status::Cancelled("flush memtable already cancelled");
            break;
        }
        // Make sure at least one memtable is flushing even reserve memory failed.
        if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
            // If there are already any flushing task, wait for some time and retry.
            // The message states the real back-off (one second), matching the sleep below.
            LOG_EVERY_T(INFO, 60) << fmt::format(
                    "Failed to reserve memory {} for flush memtable, retry after 1s",
                    PrettyPrinter::print_bytes(size));
            std::this_thread::sleep_for(std::chrono::seconds(1));
            max_waiting_time -= 1;
        } else {
            st = Status::OK();
            break;
        }
    } while (max_waiting_time > 0);
    return st;
}
292
293
/// Produces the block to flush for `memtable`.
///
/// With a `shared_memtable`, the conversion runs at most once (guarded by
/// block_once); its status and resulting block are cached on the shared object
/// and handed to every caller. Without one, the memtable is converted directly
/// and the caller receives its own block.
///
/// @param flush_block  receives the block on success.
/// @return the conversion status.
Status FlushToken::_memtable2block(MemTable* memtable, SharedMemtable* shared_memtable,
                                   std::shared_ptr<Block>& flush_block) {
    DCHECK(memtable != nullptr);

    if (shared_memtable != nullptr) {
        const auto convert_once = [&]() {
            std::unique_ptr<Block> converted;
            shared_memtable->block_status = memtable->to_block(&converted);
            if (shared_memtable->block_status.ok()) {
                shared_memtable->block.reset(converted.release());
            }
        };
        std::call_once(shared_memtable->block_once, convert_once);

        if (!shared_memtable->block_status.ok()) {
            return shared_memtable->block_status;
        }
        flush_block = shared_memtable->block;
        DCHECK(flush_block != nullptr);
        return Status::OK();
    }

    // Unshared path: convert and transfer ownership to the caller.
    std::unique_ptr<Block> converted;
    RETURN_IF_ERROR(memtable->to_block(&converted));
    flush_block.reset(converted.release());
    return Status::OK();
}
318
319
void FlushToken::_flush_memtable_impl(RowsetWriter* flush_writer, MemTable* memtable,
320
                                      int32_t segment_id, int64_t submit_task_time,
321
18
                                      SharedMemtable* shared_memtable) {
322
18
    DCHECK(flush_writer != nullptr);
323
18
    DCHECK(memtable != nullptr);
324
325
18
    signal::set_signal_task_id(flush_writer->load_id());
326
18
    signal::tablet_id = memtable->tablet_id();
327
    // Count the task as running before registering the deferred cleanup so
328
    // cancel/shutdown paths keep flush_running_count symmetric on every exit.
329
18
    _stats.flush_running_count++;
330
18
    Defer defer {[&]() {
331
18
        std::lock_guard<std::mutex> lock(_mutex);
332
18
        _stats.flush_submit_count--;
333
18
        if (_stats.flush_submit_count == 0) {
334
15
            _submit_task_finish_cond.notify_one();
335
15
        }
336
18
        _stats.flush_running_count--;
337
18
        if (_stats.flush_running_count == 0) {
338
15
            _running_task_finish_cond.notify_one();
339
15
        }
340
18
    }};
341
18
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_before_first_shutdown",
342
18
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
343
18
    if (_is_shutdown()) {
344
0
        return;
345
0
    }
346
18
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_first_shutdown",
347
18
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
348
    // double check if shutdown to avoid wait running task finish count not accurate
349
18
    if (_is_shutdown()) {
350
0
        return;
351
0
    }
352
18
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_second_shutdown",
353
18
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
354
18
    uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
355
18
    _stats.flush_wait_time_ns += flush_wait_time_ns;
356
    // If previous flush has failed, return directly
357
18
    {
358
18
        std::shared_lock rdlk(_flush_status_lock);
359
18
        if (!_flush_status.ok()) {
360
0
            return;
361
0
        }
362
18
    }
363
364
18
    MonotonicStopWatch timer;
365
18
    timer.start();
366
18
    size_t memory_usage = memtable->memory_usage();
367
368
18
    int64_t flush_size = 0;
369
18
    Status s;
370
18
    memtable->update_mem_type(MemType::FLUSH);
371
18
    int64_t duration_ns = 0;
372
18
    {
373
18
        s = [&]() {
374
18
            SCOPED_RAW_TIMER(&duration_ns);
375
18
            SCOPED_ATTACH_TASK(memtable->resource_ctx());
376
18
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
377
18
                    memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker());
378
18
            SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
379
380
            // DEFER_RELEASE_RESERVED();
381
382
            // auto reserve_size = memtable->get_flush_reserve_memory_size();
383
            // if (memtable->resource_ctx()->task_controller()->is_enable_reserve_memory() &&
384
            //     reserve_size > 0) {
385
            //     RETURN_IF_ERROR(_try_reserve_memory(memtable->resource_ctx(), reserve_size));
386
            // }
387
388
            // Defer defer {[&]() {
389
            //     ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
390
            // }};
391
18
            std::shared_ptr<Block> flush_block;
392
18
            RETURN_IF_ERROR(_memtable2block(memtable, shared_memtable, flush_block));
393
18
            if (flush_writer->context().write_binlog_opt().enable) {
394
3
                const auto& memtable_lsns = memtable->row_binlog_lsns();
395
3
                CHECK_EQ(memtable_lsns.size(), flush_block->rows());
396
3
                CHECK(!memtable_lsns.empty());
397
3
                auto lsn_ids = std::make_shared<std::vector<int64_t>>(memtable_lsns.begin(),
398
3
                                                                      memtable_lsns.end());
399
3
                const_cast<RowsetWriterContext&>(flush_writer->context())
400
3
                        .write_binlog_opt()
401
3
                        .write_binlog_config()
402
3
                        .insert_seg_lsn(segment_id, std::move(lsn_ids));
403
3
            }
404
18
            RETURN_IF_ERROR(
405
18
                    flush_writer->flush_memtable(flush_block.get(), segment_id, &flush_size));
406
16
            memtable->set_flush_success();
407
408
16
            return Status::OK();
409
18
        }();
410
411
18
        if (s.ok()) {
412
16
            bool record_memtable_stat = shared_memtable == nullptr;
413
16
            if (shared_memtable != nullptr) {
414
4
                auto finished_sub_task_count = shared_memtable->add_finished_sub_task() + 1;
415
4
                record_memtable_stat =
416
4
                        finished_sub_task_count == shared_memtable->total_sub_task_count.load();
417
4
            }
418
16
            if (record_memtable_stat) {
419
13
                _memtable_stat += memtable->stat();
420
13
            }
421
16
            DorisMetrics::instance()->memtable_flush_total->increment(1);
422
16
            DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
423
16
        }
424
18
    }
425
426
18
    {
427
18
        std::shared_lock rdlk(_flush_status_lock);
428
18
        if (!_flush_status.ok()) {
429
0
            return;
430
0
        }
431
18
    }
432
18
    if (!s.ok()) {
433
2
        std::lock_guard wrlk(_flush_status_lock);
434
2
        if (_flush_status.ok()) {
435
2
            LOG(WARNING) << "Flush memtable failed with res = " << s
436
2
                         << ", load_id: " << print_id(flush_writer->load_id());
437
2
            _flush_status = s;
438
2
        }
439
2
        _shutdown_flush_token();
440
2
        return;
441
2
    }
442
443
16
    VLOG_CRITICAL << "flush memtable wait time: "
444
3
                  << PrettyPrinter::print(flush_wait_time_ns, TUnit::TIME_NS)
445
3
                  << ", flush memtable cost: "
446
3
                  << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS)
447
3
                  << ", submit count: " << _stats.flush_submit_count
448
3
                  << ", running count: " << _stats.flush_running_count
449
3
                  << ", finish count: " << _stats.flush_finish_count
450
3
                  << ", mem size: " << PrettyPrinter::print_bytes(memory_usage)
451
3
                  << ", disk size: " << PrettyPrinter::print_bytes(flush_size);
452
16
    _stats.flush_time_ns += timer.elapsed_time();
453
16
    _stats.flush_finish_count++;
454
16
    _stats.flush_size_bytes += memtable->memory_usage();
455
16
    _stats.flush_disk_size_bytes += flush_size;
456
16
}
457
458
void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
459
12
                                 int64_t submit_task_time) {
460
12
    _flush_memtable_impl(_rowset_writer.get(), memtable_ptr.get(), segment_id, submit_task_time);
461
12
}
462
463
std::pair<int, int> MemTableFlushExecutor::calc_flush_thread_count(int num_cpus, int num_disk,
464
115
                                                                   int thread_num_per_store) {
465
115
    if (config::enable_adaptive_flush_threads && num_cpus > 0) {
466
99
        int min = std::max(1, (int)(num_cpus * config::min_flush_thread_num_per_cpu));
467
99
        int max = std::max(min, num_cpus * config::max_flush_thread_num_per_cpu);
468
99
        return {min, max};
469
99
    }
470
16
    int min = std::max(1, thread_num_per_store);
471
16
    int max = num_cpus == 0
472
16
                      ? num_disk * min
473
16
                      : std::min(num_disk * min, num_cpus * config::max_flush_thread_num_per_cpu);
474
16
    return {min, max};
475
115
}
476
477
44
void MemTableFlushExecutor::init(int num_disk) {
478
44
    _num_disk = std::max(1, num_disk);
479
44
    int num_cpus = std::thread::hardware_concurrency();
480
481
44
    auto [min_threads, max_threads] =
482
44
            calc_flush_thread_count(num_cpus, _num_disk, config::flush_thread_num_per_store);
483
44
    static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool")
484
44
                              .set_min_threads(min_threads)
485
44
                              .set_max_threads(max_threads)
486
44
                              .build(&_flush_pool));
487
488
44
    auto [hi_min, hi_max] = calc_flush_thread_count(
489
44
            num_cpus, _num_disk, config::high_priority_flush_thread_num_per_store);
490
44
    static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
491
44
                              .set_min_threads(hi_min)
492
44
                              .set_max_threads(hi_max)
493
44
                              .build(&_high_prio_flush_pool));
494
44
}
495
496
11
void MemTableFlushExecutor::update_memtable_flush_threads() {
497
11
    int num_cpus = std::thread::hardware_concurrency();
498
499
11
    auto [min_threads, max_threads] =
500
11
            calc_flush_thread_count(num_cpus, _num_disk, config::flush_thread_num_per_store);
501
    // Update max_threads first to avoid constraint violation when increasing min_threads
502
11
    static_cast<void>(_flush_pool->set_max_threads(max_threads));
503
11
    static_cast<void>(_flush_pool->set_min_threads(min_threads));
504
505
11
    auto [hi_min, hi_max] = calc_flush_thread_count(
506
11
            num_cpus, _num_disk, config::high_priority_flush_thread_num_per_store);
507
    // Update max_threads first to avoid constraint violation when increasing min_threads
508
11
    static_cast<void>(_high_prio_flush_pool->set_max_threads(hi_max));
509
11
    static_cast<void>(_high_prio_flush_pool->set_min_threads(hi_min));
510
11
}
511
512
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
513
Status MemTableFlushExecutor::create_flush_token(
514
        std::shared_ptr<FlushToken>& flush_token, std::shared_ptr<RowsetWriter> rowset_writer,
515
        bool is_high_priority, std::shared_ptr<WorkloadGroup> wg_sptr,
516
18
        std::shared_ptr<OlapTableSchemaParam> table_schema_param) {
517
18
    switch (rowset_writer->type()) {
518
0
    case ALPHA_ROWSET:
519
        // alpha rowset do not support flush in CONCURRENT.  and not support alpha rowset now.
520
0
        return Status::InternalError<false>("not support alpha rowset load now.");
521
18
    case BETA_ROWSET: {
522
        // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
523
18
        ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() : _flush_pool.get();
524
18
        flush_token = FlushToken::create_shared(pool, wg_sptr);
525
18
        flush_token->set_rowset_writer(rowset_writer);
526
18
        flush_token->set_table_schema_param(std::move(table_schema_param));
527
18
        return Status::OK();
528
0
    }
529
0
    default:
530
0
        return Status::InternalError<false>("unknown rowset type.");
531
18
    }
532
18
}
533
534
} // namespace doris