Coverage Report

Created: 2026-05-21 12:43

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/memtable/memtable_flush_executor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/memtable/memtable_flush_executor.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
22
#include <algorithm>
23
#include <cstddef>
24
#include <ostream>
25
26
#include "common/config.h"
27
#include "common/logging.h"
28
#include "common/metrics/doris_metrics.h"
29
#include "common/metrics/metrics.h"
30
#include "common/metrics/system_metrics.h"
31
#include "common/signal_handler.h"
32
#include "exec/sink/autoinc_buffer.h"
33
#include "load/memtable/memtable.h"
34
#include "runtime/thread_context.h"
35
#include "storage/binlog.h"
36
#include "storage/rowset/group_rowset_writer.h"
37
#include "storage/rowset/rowset_writer.h"
38
#include "storage/storage_engine.h"
39
#include "storage/tablet_info.h"
40
#include "util/debug_points.h"
41
#include "util/pretty_printer.h"
42
#include "util/stopwatch.hpp"
43
#include "util/time.h"
44
45
namespace doris {
46
using namespace ErrorCode;
47
48
bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
49
50
class MemtableFlushTask : public Runnable {
51
    ENABLE_FACTORY_CREATOR(MemtableFlushTask);
52
53
public:
54
    MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::shared_ptr<MemTable> memtable,
55
                      int32_t segment_id, int64_t submit_task_time)
56
18
            : _flush_token(flush_token),
57
18
              _memtable(memtable),
58
18
              _segment_id(segment_id),
59
18
              _submit_task_time(submit_task_time) {
60
18
        g_flush_task_num << 1;
61
18
    }
62
63
18
    ~MemtableFlushTask() override { g_flush_task_num << -1; }
64
65
12
    void run() override {
66
12
        auto token = _flush_token.lock();
67
12
        if (token) {
68
12
            token->_flush_memtable(_memtable, _segment_id, _submit_task_time);
69
12
        } else {
70
0
            LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
71
0
        }
72
12
    }
73
74
protected:
75
    std::weak_ptr<FlushToken> _flush_token;
76
    std::shared_ptr<MemTable> _memtable;
77
    int32_t _segment_id;
78
    int64_t _submit_task_time;
79
};
80
81
class PartOfGroupMemtableFlushTask final : public MemtableFlushTask {
82
    ENABLE_FACTORY_CREATOR(PartOfGroupMemtableFlushTask);
83
84
public:
85
    PartOfGroupMemtableFlushTask(std::shared_ptr<FlushToken> flush_token,
86
                                 std::shared_ptr<SharedMemtable> shared_memtable,
87
                                 WriteRequestType write_req_type, int64_t submit_task_time)
88
6
            : MemtableFlushTask(flush_token, nullptr, 0, submit_task_time),
89
6
              _shared_memtable(std::move(shared_memtable)),
90
6
              _write_req_type(write_req_type) {}
91
92
6
    void run() override {
93
6
        auto token = _flush_token.lock();
94
6
        if (token) {
95
6
            token->_flush_group_memtable(_shared_memtable, _write_req_type, _submit_task_time);
96
6
        } else {
97
0
            LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
98
0
        }
99
6
    }
100
101
private:
102
    std::shared_ptr<SharedMemtable> _shared_memtable;
103
    WriteRequestType _write_req_type;
104
};
105
106
0
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
107
0
    os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS
108
0
       << ", flush wait time(ms)=" << stat.flush_wait_time_ns / NANOS_PER_MILLIS
109
0
       << ", flush submit count=" << stat.flush_submit_count
110
0
       << ", running flush count=" << stat.flush_running_count
111
0
       << ", finish flush count=" << stat.flush_finish_count
112
0
       << ", flush bytes: " << stat.flush_size_bytes
113
0
       << ", flush disk bytes: " << stat.flush_disk_size_bytes << ")";
114
0
    return os;
115
0
}
116
117
3
SharedMemtable::~SharedMemtable() {
118
3
    if (block == nullptr) {
119
0
        return;
120
0
    }
121
3
    DCHECK(memtable != nullptr);
122
3
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
123
3
            memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker());
124
3
    SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
125
3
    block.reset();
126
3
}
127
128
Status FlushToken::_submit_sub_tasks(ThreadPool* pool,
129
15
                                     std::vector<std::shared_ptr<Runnable>> sub_tasks) {
130
33
    for (int i = 0; i < sub_tasks.size(); ++i) {
131
18
        {
132
18
            std::shared_lock rdlk(_flush_status_lock);
133
18
            DBUG_EXECUTE_IF("FlushToken.submit_sub_task_error", {
134
18
                if (i != 0) {
135
                    // only affect flush binlog task
136
18
                    _flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error");
137
18
                }
138
18
            });
139
18
            if (!_flush_status.ok()) {
140
0
                return _flush_status;
141
0
            }
142
18
        }
143
18
        Status submit_st = pool->submit(std::move(sub_tasks[i]));
144
18
        if (UNLIKELY(!submit_st.ok())) {
145
0
            {
146
0
                std::lock_guard wrlk(_flush_status_lock);
147
0
                if (_flush_status.ok()) {
148
0
                    _flush_status = submit_st;
149
0
                }
150
0
            }
151
0
            _shutdown_flush_token();
152
0
            return submit_st;
153
0
        }
154
18
        _stats.flush_submit_count++;
155
18
    }
156
15
    return Status::OK();
157
15
}
158
159
18
Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) {
160
18
    {
161
18
        std::shared_lock rdlk(_flush_status_lock);
162
18
        DBUG_EXECUTE_IF("FlushToken.submit_flush_error", {
163
18
            _flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error");
164
18
        });
165
18
        if (!_flush_status.ok()) {
166
0
            return _flush_status;
167
0
        }
168
18
    }
169
170
18
    if (mem_table == nullptr || mem_table->empty()) {
171
3
        return Status::OK();
172
3
    }
173
15
    int64_t submit_task_time = MonotonicNanos();
174
15
    auto* group_rowset_writer = typeid_cast<GroupRowsetWriter*>(_rowset_writer.get());
175
15
    std::shared_ptr<SharedMemtable> shared_memtable;
176
15
    std::vector<std::shared_ptr<Runnable>> tasks;
177
15
    if (group_rowset_writer != nullptr) {
178
3
        auto data_writer = group_rowset_writer->data_writer();
179
3
        auto binlog_writer = group_rowset_writer->row_binlog_writer();
180
3
        DCHECK(data_writer != nullptr);
181
3
        DCHECK(binlog_writer != nullptr);
182
183
3
        shared_memtable = std::make_shared<SharedMemtable>();
184
3
        shared_memtable->memtable = mem_table;
185
        // Keep data/binlog segment_id allocators in sync.
186
3
        auto segment_id = data_writer->allocate_segment_id();
187
3
        auto binlog_segment_id = binlog_writer->allocate_segment_id();
188
3
        DCHECK_EQ(segment_id, binlog_segment_id);
189
3
        shared_memtable->segment_id = segment_id;
190
191
3
        if (binlog_writer->context().write_binlog_opt().enable) {
192
3
            if (_row_binlog_lsn_buffer == nullptr) {
193
0
                std::unique_lock<std::mutex> lock(_mutex);
194
0
                if (_row_binlog_lsn_buffer == nullptr) {
195
0
                    if (_table_schema_param == nullptr) {
196
0
                        return Status::InternalError<false>(
197
0
                                "need binlog but table_schema_param is null");
198
0
                    }
199
0
                    _row_binlog_lsn_buffer =
200
0
                            GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer(
201
0
                                    _table_schema_param->db_id(), _table_schema_param->table_id(),
202
0
                                    kBinlogLsnAutoIncId);
203
0
                }
204
0
            }
205
3
            std::shared_ptr<std::vector<int64_t>> lsn;
206
3
            RETURN_IF_ERROR(
207
3
                    allocate_binlog_lsn(_row_binlog_lsn_buffer, mem_table->raw_rows(), &lsn));
208
3
            DCHECK(lsn != nullptr && !lsn->empty());
209
3
            const_cast<RowsetWriterContext&>(binlog_writer->context())
210
3
                    .write_binlog_opt()
211
3
                    .write_binlog_config()
212
3
                    .insert_seg_lsn(shared_memtable->segment_id, lsn);
213
3
        }
214
215
3
        tasks.emplace_back(PartOfGroupMemtableFlushTask::create_shared(
216
3
                shared_from_this(), shared_memtable, WriteRequestType::DATA, submit_task_time));
217
3
        tasks.emplace_back(PartOfGroupMemtableFlushTask::create_shared(
218
3
                shared_from_this(), shared_memtable, WriteRequestType::ROW_BINLOG,
219
3
                submit_task_time));
220
12
    } else {
221
12
        tasks.emplace_back(MemtableFlushTask::create_shared(shared_from_this(), mem_table,
222
12
                                                            _rowset_writer->allocate_segment_id(),
223
12
                                                            submit_task_time));
224
12
    }
225
    // NOTE: we should guarantee WorkloadGroup is not deconstructed when submit memtable flush task.
226
    // because currently WorkloadGroup's can only be destroyed when all queries in the group is finished,
227
    // but not consider whether load channel is finish.
228
15
    std::shared_ptr<WorkloadGroup> wg_sptr = _wg_wptr.lock();
229
15
    ThreadPool* wg_thread_pool = nullptr;
230
15
    if (wg_sptr) {
231
0
        wg_thread_pool = wg_sptr->get_memtable_flush_pool();
232
0
    }
233
15
    ThreadPool* pool = wg_thread_pool ? wg_thread_pool : _thread_pool;
234
235
15
    return _submit_sub_tasks(pool, std::move(tasks));
236
15
}
237
238
void FlushToken::_flush_group_memtable(std::shared_ptr<SharedMemtable> shared_memtable,
239
6
                                       WriteRequestType write_req_type, int64_t submit_task_time) {
240
6
    DCHECK(shared_memtable != nullptr);
241
6
    DCHECK(shared_memtable->memtable != nullptr);
242
6
    DCHECK(write_req_type == WriteRequestType::DATA ||
243
6
           write_req_type == WriteRequestType::ROW_BINLOG);
244
245
6
    auto* group_rowset_writer = typeid_cast<GroupRowsetWriter*>(_rowset_writer.get());
246
6
    DCHECK(group_rowset_writer != nullptr);
247
6
    auto flush_writer = write_req_type == WriteRequestType::DATA
248
6
                                ? group_rowset_writer->data_writer()
249
6
                                : group_rowset_writer->row_binlog_writer();
250
6
    DCHECK(flush_writer != nullptr);
251
6
    _flush_memtable_impl(flush_writer.get(), shared_memtable->memtable.get(),
252
6
                         shared_memtable->segment_id, submit_task_time, shared_memtable.get());
253
6
}
254
255
// NOTE: FlushToken's submit/cancel/wait run in one thread,
256
// so we don't need to make them mutually exclusive, std::atomic is enough.
257
18
void FlushToken::_wait_submit_task_finish() {
258
18
    std::unique_lock<std::mutex> lock(_mutex);
259
33
    _submit_task_finish_cond.wait(lock, [&]() { return _stats.flush_submit_count.load() == 0; });
260
18
}
261
262
30
void FlushToken::_wait_running_task_finish() {
263
30
    std::unique_lock<std::mutex> lock(_mutex);
264
30
    _running_task_finish_cond.wait(lock, [&]() { return _stats.flush_running_count.load() == 0; });
265
30
}
266
267
30
void FlushToken::cancel() {
268
30
    _shutdown_flush_token();
269
30
    _wait_running_task_finish();
270
30
}
271
272
18
Status FlushToken::wait() {
273
18
    _wait_submit_task_finish();
274
18
    {
275
18
        std::shared_lock rdlk(_flush_status_lock);
276
18
        if (!_flush_status.ok()) {
277
2
            return _flush_status;
278
2
        }
279
18
    }
280
16
    return Status::OK();
281
18
}
282
283
Status FlushToken::_try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context,
284
0
                                       int64_t size) {
285
0
    auto* thread_context = doris::thread_context();
286
0
    auto* memtable_flush_executor =
287
0
            ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
288
0
    Status st;
289
0
    int32_t max_waiting_time = config::memtable_wait_for_memory_sleep_time_s;
290
0
    do {
291
        // only try to reserve process memory
292
0
        st = thread_context->thread_mem_tracker_mgr->try_reserve(
293
0
                size, ThreadMemTrackerMgr::TryReserveChecker::CHECK_PROCESS);
294
0
        if (st.ok()) {
295
0
            memtable_flush_executor->inc_flushing_task();
296
0
            break;
297
0
        }
298
0
        if (_is_shutdown() || resource_context->task_controller()->is_cancelled()) {
299
0
            st = Status::Cancelled("flush memtable already cancelled");
300
0
            break;
301
0
        }
302
        // Make sure at least one memtable is flushing even reserve memory failed.
303
0
        if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
304
            // If there are already any flushing task, Wait for some time and retry.
305
0
            LOG_EVERY_T(INFO, 60) << fmt::format(
306
0
                    "Failed to reserve memory {} for flush memtable, retry after 100ms",
307
0
                    PrettyPrinter::print_bytes(size));
308
0
            std::this_thread::sleep_for(std::chrono::seconds(1));
309
0
            max_waiting_time -= 1;
310
0
        } else {
311
0
            st = Status::OK();
312
0
            break;
313
0
        }
314
0
    } while (max_waiting_time > 0);
315
0
    return st;
316
0
}
317
318
Status FlushToken::_memtable2block(MemTable* memtable, SharedMemtable* shared_memtable,
319
18
                                   std::shared_ptr<Block>& flush_block) {
320
18
    DCHECK(memtable != nullptr);
321
322
18
    if (shared_memtable == nullptr) {
323
12
        std::unique_ptr<Block> block;
324
12
        RETURN_IF_ERROR(memtable->to_block(&block));
325
12
        flush_block.reset(block.release());
326
12
        return Status::OK();
327
12
    }
328
329
6
    std::call_once(shared_memtable->block_once, [&]() {
330
3
        std::unique_ptr<Block> block;
331
3
        shared_memtable->block_status = memtable->to_block(&block);
332
3
        if (shared_memtable->block_status.ok()) {
333
3
            shared_memtable->block.reset(block.release());
334
3
        }
335
3
    });
336
6
    if (!shared_memtable->block_status.ok()) {
337
0
        return shared_memtable->block_status;
338
0
    }
339
6
    flush_block = shared_memtable->block;
340
6
    DCHECK(flush_block != nullptr);
341
6
    return Status::OK();
342
6
}
343
344
void FlushToken::_flush_memtable_impl(RowsetWriter* flush_writer, MemTable* memtable,
345
                                      int32_t segment_id, int64_t submit_task_time,
346
18
                                      SharedMemtable* shared_memtable) {
347
18
    DCHECK(flush_writer != nullptr);
348
18
    DCHECK(memtable != nullptr);
349
350
18
    signal::set_signal_task_id(flush_writer->load_id());
351
18
    signal::tablet_id = memtable->tablet_id();
352
    // Count the task as running before registering the deferred cleanup so
353
    // cancel/shutdown paths keep flush_running_count symmetric on every exit.
354
18
    _stats.flush_running_count++;
355
18
    Defer defer {[&]() {
356
18
        std::lock_guard<std::mutex> lock(_mutex);
357
18
        _stats.flush_submit_count--;
358
18
        if (_stats.flush_submit_count == 0) {
359
15
            _submit_task_finish_cond.notify_one();
360
15
        }
361
18
        _stats.flush_running_count--;
362
18
        if (_stats.flush_running_count == 0) {
363
15
            _running_task_finish_cond.notify_one();
364
15
        }
365
18
    }};
366
18
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_before_first_shutdown",
367
18
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
368
18
    if (_is_shutdown()) {
369
0
        return;
370
0
    }
371
18
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_first_shutdown",
372
18
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
373
    // double check if shutdown to avoid wait running task finish count not accurate
374
18
    if (_is_shutdown()) {
375
0
        return;
376
0
    }
377
18
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_second_shutdown",
378
18
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
379
18
    uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
380
18
    _stats.flush_wait_time_ns += flush_wait_time_ns;
381
    // If previous flush has failed, return directly
382
18
    {
383
18
        std::shared_lock rdlk(_flush_status_lock);
384
18
        if (!_flush_status.ok()) {
385
0
            return;
386
0
        }
387
18
    }
388
389
18
    MonotonicStopWatch timer;
390
18
    timer.start();
391
18
    size_t memory_usage = memtable->memory_usage();
392
393
18
    int64_t flush_size = 0;
394
18
    Status s;
395
18
    memtable->update_mem_type(MemType::FLUSH);
396
18
    int64_t duration_ns = 0;
397
18
    {
398
18
        s = [&]() {
399
18
            SCOPED_RAW_TIMER(&duration_ns);
400
18
            SCOPED_ATTACH_TASK(memtable->resource_ctx());
401
18
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
402
18
                    memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker());
403
18
            SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
404
405
            // DEFER_RELEASE_RESERVED();
406
407
            // auto reserve_size = memtable->get_flush_reserve_memory_size();
408
            // if (memtable->resource_ctx()->task_controller()->is_enable_reserve_memory() &&
409
            //     reserve_size > 0) {
410
            //     RETURN_IF_ERROR(_try_reserve_memory(memtable->resource_ctx(), reserve_size));
411
            // }
412
413
            // Defer defer {[&]() {
414
            //     ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
415
            // }};
416
18
            std::shared_ptr<Block> flush_block;
417
18
            RETURN_IF_ERROR(_memtable2block(memtable, shared_memtable, flush_block));
418
18
            RETURN_IF_ERROR(
419
18
                    flush_writer->flush_memtable(flush_block.get(), segment_id, &flush_size));
420
16
            memtable->set_flush_success();
421
422
16
            return Status::OK();
423
18
        }();
424
425
18
        if (s.ok()) {
426
16
            bool record_memtable_stat = shared_memtable == nullptr;
427
16
            if (shared_memtable != nullptr) {
428
4
                auto finished_sub_task_count = shared_memtable->add_finished_sub_task() + 1;
429
4
                record_memtable_stat =
430
4
                        finished_sub_task_count == shared_memtable->total_sub_task_count.load();
431
4
            }
432
16
            if (record_memtable_stat) {
433
13
                _memtable_stat += memtable->stat();
434
13
            }
435
16
            DorisMetrics::instance()->memtable_flush_total->increment(1);
436
16
            DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
437
16
        }
438
18
    }
439
440
18
    {
441
18
        std::shared_lock rdlk(_flush_status_lock);
442
18
        if (!_flush_status.ok()) {
443
0
            return;
444
0
        }
445
18
    }
446
18
    if (!s.ok()) {
447
2
        std::lock_guard wrlk(_flush_status_lock);
448
2
        if (_flush_status.ok()) {
449
2
            LOG(WARNING) << "Flush memtable failed with res = " << s
450
2
                         << ", load_id: " << print_id(flush_writer->load_id());
451
2
            _flush_status = s;
452
2
        }
453
2
        _shutdown_flush_token();
454
2
        return;
455
2
    }
456
457
16
    VLOG_CRITICAL << "flush memtable wait time: "
458
3
                  << PrettyPrinter::print(flush_wait_time_ns, TUnit::TIME_NS)
459
3
                  << ", flush memtable cost: "
460
3
                  << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS)
461
3
                  << ", submit count: " << _stats.flush_submit_count
462
3
                  << ", running count: " << _stats.flush_running_count
463
3
                  << ", finish count: " << _stats.flush_finish_count
464
3
                  << ", mem size: " << PrettyPrinter::print_bytes(memory_usage)
465
3
                  << ", disk size: " << PrettyPrinter::print_bytes(flush_size);
466
16
    _stats.flush_time_ns += timer.elapsed_time();
467
16
    _stats.flush_finish_count++;
468
16
    _stats.flush_size_bytes += memtable->memory_usage();
469
16
    _stats.flush_disk_size_bytes += flush_size;
470
16
}
471
472
void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
473
12
                                 int64_t submit_task_time) {
474
12
    _flush_memtable_impl(_rowset_writer.get(), memtable_ptr.get(), segment_id, submit_task_time);
475
12
}
476
477
std::pair<int, int> MemTableFlushExecutor::calc_flush_thread_count(int num_cpus, int num_disk,
478
113
                                                                   int thread_num_per_store) {
479
113
    if (config::enable_adaptive_flush_threads && num_cpus > 0) {
480
97
        int min = std::max(1, (int)(num_cpus * config::min_flush_thread_num_per_cpu));
481
97
        int max = std::max(min, num_cpus * config::max_flush_thread_num_per_cpu);
482
97
        return {min, max};
483
97
    }
484
16
    int min = std::max(1, thread_num_per_store);
485
16
    int max = num_cpus == 0
486
16
                      ? num_disk * min
487
16
                      : std::min(num_disk * min, num_cpus * config::max_flush_thread_num_per_cpu);
488
16
    return {min, max};
489
113
}
490
491
43
void MemTableFlushExecutor::init(int num_disk) {
492
43
    _num_disk = std::max(1, num_disk);
493
43
    int num_cpus = std::thread::hardware_concurrency();
494
495
43
    auto [min_threads, max_threads] =
496
43
            calc_flush_thread_count(num_cpus, _num_disk, config::flush_thread_num_per_store);
497
43
    static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool")
498
43
                              .set_min_threads(min_threads)
499
43
                              .set_max_threads(max_threads)
500
43
                              .build(&_flush_pool));
501
502
43
    auto [hi_min, hi_max] = calc_flush_thread_count(
503
43
            num_cpus, _num_disk, config::high_priority_flush_thread_num_per_store);
504
43
    static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
505
43
                              .set_min_threads(hi_min)
506
43
                              .set_max_threads(hi_max)
507
43
                              .build(&_high_prio_flush_pool));
508
43
}
509
510
11
void MemTableFlushExecutor::update_memtable_flush_threads() {
511
11
    int num_cpus = std::thread::hardware_concurrency();
512
513
11
    auto [min_threads, max_threads] =
514
11
            calc_flush_thread_count(num_cpus, _num_disk, config::flush_thread_num_per_store);
515
    // Update max_threads first to avoid constraint violation when increasing min_threads
516
11
    static_cast<void>(_flush_pool->set_max_threads(max_threads));
517
11
    static_cast<void>(_flush_pool->set_min_threads(min_threads));
518
519
11
    auto [hi_min, hi_max] = calc_flush_thread_count(
520
11
            num_cpus, _num_disk, config::high_priority_flush_thread_num_per_store);
521
    // Update max_threads first to avoid constraint violation when increasing min_threads
522
11
    static_cast<void>(_high_prio_flush_pool->set_max_threads(hi_max));
523
11
    static_cast<void>(_high_prio_flush_pool->set_min_threads(hi_min));
524
11
}
525
526
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
527
Status MemTableFlushExecutor::create_flush_token(
528
        std::shared_ptr<FlushToken>& flush_token, std::shared_ptr<RowsetWriter> rowset_writer,
529
        bool is_high_priority, std::shared_ptr<WorkloadGroup> wg_sptr,
530
17
        std::shared_ptr<OlapTableSchemaParam> table_schema_param) {
531
17
    switch (rowset_writer->type()) {
532
0
    case ALPHA_ROWSET:
533
        // alpha rowset do not support flush in CONCURRENT.  and not support alpha rowset now.
534
0
        return Status::InternalError<false>("not support alpha rowset load now.");
535
17
    case BETA_ROWSET: {
536
        // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
537
17
        ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() : _flush_pool.get();
538
17
        flush_token = FlushToken::create_shared(pool, wg_sptr);
539
17
        flush_token->set_rowset_writer(rowset_writer);
540
17
        flush_token->set_table_schema_param(std::move(table_schema_param));
541
17
        return Status::OK();
542
0
    }
543
0
    default:
544
0
        return Status::InternalError<false>("unknown rowset type.");
545
17
    }
546
17
}
547
548
} // namespace doris