Coverage Report

Created: 2026-07-02 14:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/memtable/memtable_flush_executor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/memtable/memtable_flush_executor.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
22
#include <algorithm>
23
#include <cstddef>
24
#include <ostream>
25
26
#include "common/config.h"
27
#include "common/logging.h"
28
#include "common/metrics/doris_metrics.h"
29
#include "common/metrics/metrics.h"
30
#include "common/metrics/system_metrics.h"
31
#include "common/signal_handler.h"
32
#include "exec/sink/autoinc_buffer.h"
33
#include "load/memtable/memtable.h"
34
#include "runtime/thread_context.h"
35
#include "storage/binlog.h"
36
#include "storage/rowset/group_rowset_writer.h"
37
#include "storage/rowset/rowset_writer.h"
38
#include "storage/storage_engine.h"
39
#include "storage/tablet_info.h"
40
#include "util/debug_points.h"
41
#include "util/pretty_printer.h"
42
#include "util/stopwatch.hpp"
43
#include "util/time.h"
44
45
namespace doris {
46
using namespace ErrorCode;
47
48
bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
49
50
class MemtableFlushTask : public Runnable {
51
    ENABLE_FACTORY_CREATOR(MemtableFlushTask);
52
53
public:
54
    MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::shared_ptr<MemTable> memtable,
55
                      int32_t segment_id, int64_t submit_task_time)
56
36
            : _flush_token(flush_token),
57
36
              _memtable(memtable),
58
36
              _segment_id(segment_id),
59
36
              _submit_task_time(submit_task_time) {
60
36
        g_flush_task_num << 1;
61
36
    }
62
63
36
    ~MemtableFlushTask() override { g_flush_task_num << -1; }
64
65
24
    void run() override {
66
24
        auto token = _flush_token.lock();
67
24
        if (token) {
68
24
            token->_flush_memtable(_memtable, _segment_id, _submit_task_time);
69
24
        } else {
70
0
            LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
71
0
        }
72
24
    }
73
74
protected:
75
    std::weak_ptr<FlushToken> _flush_token;
76
    std::shared_ptr<MemTable> _memtable;
77
    int32_t _segment_id;
78
    int64_t _submit_task_time;
79
};
80
81
class PartOfGroupMemtableFlushTask final : public MemtableFlushTask {
82
    ENABLE_FACTORY_CREATOR(PartOfGroupMemtableFlushTask);
83
84
public:
85
    PartOfGroupMemtableFlushTask(std::shared_ptr<FlushToken> flush_token,
86
                                 std::shared_ptr<SharedMemtable> shared_memtable,
87
                                 WriteRequestType write_req_type, int64_t submit_task_time)
88
12
            : MemtableFlushTask(flush_token, nullptr, 0, submit_task_time),
89
12
              _shared_memtable(std::move(shared_memtable)),
90
12
              _write_req_type(write_req_type) {}
91
92
12
    void run() override {
93
12
        auto token = _flush_token.lock();
94
12
        if (token) {
95
12
            token->_flush_group_memtable(_shared_memtable, _write_req_type, _submit_task_time);
96
12
        } else {
97
0
            LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
98
0
        }
99
12
    }
100
101
private:
102
    std::shared_ptr<SharedMemtable> _shared_memtable;
103
    WriteRequestType _write_req_type;
104
};
105
106
0
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
107
0
    os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS
108
0
       << ", flush wait time(ms)=" << stat.flush_wait_time_ns / NANOS_PER_MILLIS
109
0
       << ", flush submit count=" << stat.flush_submit_count
110
0
       << ", running flush count=" << stat.flush_running_count
111
0
       << ", finish flush count=" << stat.flush_finish_count
112
0
       << ", flush bytes: " << stat.flush_size_bytes
113
0
       << ", flush disk bytes: " << stat.flush_disk_size_bytes << ")";
114
0
    return os;
115
0
}
116
117
6
// Releases the shared block (if one was ever produced) while the SCOPED_*
// tracker macros for the owning memtable are in effect.
SharedMemtable::~SharedMemtable() {
    if (block != nullptr) {
        // A block can only exist if a memtable produced it.
        DCHECK(memtable != nullptr);
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker());
        SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
        block.reset();
    }
}
127
128
// Submits every task in `sub_tasks` to `pool`, in order.
//
// Before each submission the recorded flush status is checked; if an earlier
// flush already failed, that status is returned and the remaining tasks are
// not submitted. If the pool rejects a task, the error is recorded (unless an
// error is already recorded), the token is shut down and the submit error is
// returned. flush_submit_count is incremented once per accepted task.
Status FlushToken::_submit_sub_tasks(ThreadPool* pool,
                                     std::vector<std::shared_ptr<Runnable>> sub_tasks) {
    // size_t index: avoids the signed/unsigned comparison against size().
    for (size_t i = 0; i < sub_tasks.size(); ++i) {
        {
            std::shared_lock rdlk(_flush_status_lock);
            DBUG_EXECUTE_IF("FlushToken.submit_sub_task_error", {
                if (i != 0) {
                    // only affect flush binlog task
                    _flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error");
                }
            });
            if (!_flush_status.ok()) {
                return _flush_status;
            }
        }
        Status submit_st = pool->submit(std::move(sub_tasks[i]));
        if (UNLIKELY(!submit_st.ok())) {
            {
                std::lock_guard wrlk(_flush_status_lock);
                // Keep the first recorded error; do not overwrite it.
                if (_flush_status.ok()) {
                    _flush_status = submit_st;
                }
            }
            _shutdown_flush_token();
            return submit_st;
        }
        _stats.flush_submit_count++;
    }
    return Status::OK();
}
158
159
36
// Queues `mem_table` for flushing.
//
// Returns the recorded flush error if a previous flush failed, and OK without
// doing anything for a null or empty memtable. For a GroupRowsetWriter two sub
// tasks (DATA, then ROW_BINLOG) sharing one SharedMemtable are created;
// otherwise a single MemtableFlushTask is created. Tasks go to the workload
// group's flush pool when one is available, else to the token's own pool.
Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) {
    {
        std::shared_lock rdlk(_flush_status_lock);
        DBUG_EXECUTE_IF("FlushToken.submit_flush_error", {
            _flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error");
        });
        if (!_flush_status.ok()) {
            return _flush_status;
        }
    }

    if (mem_table == nullptr || mem_table->empty()) {
        return Status::OK();
    }

    int64_t submit_task_time = MonotonicNanos();
    auto* group_writer = typeid_cast<GroupRowsetWriter*>(_rowset_writer.get());
    // Declared at function scope so this frame keeps a reference until the
    // sub tasks have been handed to the pool.
    std::shared_ptr<SharedMemtable> shared_memtable;
    std::vector<std::shared_ptr<Runnable>> tasks;

    if (group_writer == nullptr) {
        // Plain writer: one task, one freshly allocated segment id.
        tasks.emplace_back(MemtableFlushTask::create_shared(shared_from_this(), mem_table,
                                                            _rowset_writer->allocate_segment_id(),
                                                            submit_task_time));
    } else {
        auto data_writer = group_writer->data_writer();
        auto binlog_writer = group_writer->row_binlog_writer();
        DCHECK(data_writer != nullptr);
        DCHECK(binlog_writer != nullptr);

        shared_memtable = std::make_shared<SharedMemtable>();
        shared_memtable->memtable = mem_table;
        // Allocate from both writers so their segment_id allocators stay in sync.
        auto data_segment_id = data_writer->allocate_segment_id();
        auto binlog_segment_id = binlog_writer->allocate_segment_id();
        DCHECK_EQ(data_segment_id, binlog_segment_id);
        shared_memtable->segment_id = data_segment_id;

        if (binlog_writer->context().write_binlog_opt().enable) {
            // Create the LSN buffer on first use; re-checked under _mutex.
            if (_row_binlog_lsn_buffer == nullptr) {
                std::lock_guard<std::mutex> guard(_mutex);
                if (_row_binlog_lsn_buffer == nullptr) {
                    if (_table_schema_param == nullptr) {
                        return Status::InternalError<false>(
                                "need binlog but table_schema_param is null");
                    }
                    _row_binlog_lsn_buffer =
                            GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer(
                                    _table_schema_param->db_id(), _table_schema_param->table_id(),
                                    kBinlogLsnAutoIncId);
                }
            }
            std::shared_ptr<std::vector<int64_t>> lsn;
            RETURN_IF_ERROR(
                    allocate_binlog_lsn(_row_binlog_lsn_buffer, mem_table->raw_rows(), &lsn));
            DCHECK(lsn != nullptr && !lsn->empty());
            // context() is const; the LSNs are registered through a mutable view of it.
            auto& binlog_ctx = const_cast<RowsetWriterContext&>(binlog_writer->context());
            binlog_ctx.write_binlog_opt().write_binlog_config().insert_seg_lsn(
                    shared_memtable->segment_id, lsn);
        }

        // One sub task per request type, DATA first.
        for (WriteRequestType req_type : {WriteRequestType::DATA, WriteRequestType::ROW_BINLOG}) {
            tasks.emplace_back(PartOfGroupMemtableFlushTask::create_shared(
                    shared_from_this(), shared_memtable, req_type, submit_task_time));
        }
    }

    // NOTE: the WorkloadGroup must not be destroyed while the flush task is being
    // submitted. Currently a WorkloadGroup is only destroyed once all queries in
    // the group have finished, which does not consider whether a load channel has
    // finished, so a strong reference is held for the duration of the submission.
    std::shared_ptr<WorkloadGroup> wg_sptr = _wg_wptr.lock();
    ThreadPool* pool = _thread_pool;
    if (wg_sptr) {
        ThreadPool* wg_thread_pool = wg_sptr->get_memtable_flush_pool();
        if (wg_thread_pool != nullptr) {
            pool = wg_thread_pool;
        }
    }

    return _submit_sub_tasks(pool, std::move(tasks));
}
237
238
void FlushToken::_flush_group_memtable(std::shared_ptr<SharedMemtable> shared_memtable,
239
12
                                       WriteRequestType write_req_type, int64_t submit_task_time) {
240
12
    DCHECK(shared_memtable != nullptr);
241
12
    DCHECK(shared_memtable->memtable != nullptr);
242
12
    DCHECK(write_req_type == WriteRequestType::DATA ||
243
12
           write_req_type == WriteRequestType::ROW_BINLOG);
244
245
12
    auto* group_rowset_writer = typeid_cast<GroupRowsetWriter*>(_rowset_writer.get());
246
12
    DCHECK(group_rowset_writer != nullptr);
247
12
    auto flush_writer = write_req_type == WriteRequestType::DATA
248
12
                                ? group_rowset_writer->data_writer()
249
12
                                : group_rowset_writer->row_binlog_writer();
250
12
    DCHECK(flush_writer != nullptr);
251
12
    _flush_memtable_impl(flush_writer.get(), shared_memtable->memtable.get(),
252
12
                         shared_memtable->segment_id, submit_task_time, shared_memtable.get());
253
12
}
254
255
// NOTE: FlushToken's submit/cancel/wait run in one thread,
256
// so we don't need to make them mutually exclusive, std::atomic is enough.
257
36
void FlushToken::_wait_submit_task_finish() {
258
36
    std::unique_lock<std::mutex> lock(_mutex);
259
66
    _submit_task_finish_cond.wait(lock, [&]() { return _stats.flush_submit_count.load() == 0; });
260
36
}
261
262
64
void FlushToken::_wait_running_task_finish() {
263
64
    std::unique_lock<std::mutex> lock(_mutex);
264
64
    _running_task_finish_cond.wait(lock, [&]() { return _stats.flush_running_count.load() == 0; });
265
64
}
266
267
64
void FlushToken::cancel() {
268
64
    _shutdown_flush_token();
269
64
    _wait_running_task_finish();
270
64
}
271
272
36
// Waits until every submitted flush task has finished, then reports the
// outcome: the recorded flush error if there is one, OK otherwise.
Status FlushToken::wait() {
    _wait_submit_task_finish();

    std::shared_lock rdlk(_flush_status_lock);
    if (_flush_status.ok()) {
        return Status::OK();
    }
    return _flush_status;
}
282
283
// Tries to reserve `size` bytes of process memory for a memtable flush,
// retrying once per second for up to config::memtable_wait_for_memory_sleep_time_s
// attempts.
//
// Returns:
//  - OK when the reservation succeeds (the executor's flushing-task counter is
//    incremented), or when the reservation failed but
//    check_and_inc_has_any_flushing_task() returned false, so that at least one
//    memtable keeps flushing;
//  - Cancelled when the token is shut down or the task has been cancelled;
//  - the last reservation error once the retry budget is used up.
Status FlushToken::_try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context,
                                       int64_t size) {
    auto* thread_context = doris::thread_context();
    auto* memtable_flush_executor =
            ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
    Status st;
    int32_t max_waiting_time = config::memtable_wait_for_memory_sleep_time_s;
    do {
        // only try to reserve process memory
        st = thread_context->thread_mem_tracker_mgr->try_reserve(
                size, ThreadMemTrackerMgr::TryReserveChecker::CHECK_PROCESS);
        if (st.ok()) {
            memtable_flush_executor->inc_flushing_task();
            break;
        }
        if (_is_shutdown() || resource_context->task_controller()->is_cancelled()) {
            st = Status::Cancelled("flush memtable already cancelled");
            break;
        }
        // Make sure at least one memtable is flushing even reserve memory failed.
        if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
            // If there are already any flushing task, wait for some time and retry.
            // The message states the real interval: the sleep below is 1 second.
            LOG_EVERY_T(INFO, 60) << fmt::format(
                    "Failed to reserve memory {} for flush memtable, retry after 1s",
                    PrettyPrinter::print_bytes(size));
            std::this_thread::sleep_for(std::chrono::seconds(1));
            max_waiting_time -= 1;
        } else {
            st = Status::OK();
            break;
        }
    } while (max_waiting_time > 0);
    return st;
}
317
318
// Produces the block to flush for `memtable` and stores it in `flush_block`.
//
// Without a SharedMemtable the memtable is converted on every call. With one,
// the conversion runs at most once (guarded by block_once); its status and
// resulting block are cached on the SharedMemtable and handed to every caller.
Status FlushToken::_memtable2block(MemTable* memtable, SharedMemtable* shared_memtable,
                                   std::shared_ptr<Block>& flush_block) {
    DCHECK(memtable != nullptr);

    if (shared_memtable != nullptr) {
        std::call_once(shared_memtable->block_once, [memtable, shared_memtable]() {
            std::unique_ptr<Block> converted;
            shared_memtable->block_status = memtable->to_block(&converted);
            if (shared_memtable->block_status.ok()) {
                shared_memtable->block.reset(converted.release());
            }
        });
        if (!shared_memtable->block_status.ok()) {
            return shared_memtable->block_status;
        }
        flush_block = shared_memtable->block;
        DCHECK(flush_block != nullptr);
        return Status::OK();
    }

    // Unshared memtable: convert and transfer ownership to the caller.
    std::unique_ptr<Block> converted;
    RETURN_IF_ERROR(memtable->to_block(&converted));
    flush_block = std::move(converted);
    return Status::OK();
}
343
344
void FlushToken::_flush_memtable_impl(RowsetWriter* flush_writer, MemTable* memtable,
345
                                      int32_t segment_id, int64_t submit_task_time,
346
36
                                      SharedMemtable* shared_memtable) {
347
36
    DCHECK(flush_writer != nullptr);
348
36
    DCHECK(memtable != nullptr);
349
350
36
    signal::set_signal_task_id(flush_writer->load_id());
351
36
    signal::tablet_id = memtable->tablet_id();
352
    // Count the task as running before registering the deferred cleanup so
353
    // cancel/shutdown paths keep flush_running_count symmetric on every exit.
354
36
    _stats.flush_running_count++;
355
36
    Defer defer {[&]() {
356
36
        std::lock_guard<std::mutex> lock(_mutex);
357
36
        _stats.flush_submit_count--;
358
36
        if (_stats.flush_submit_count == 0) {
359
30
            _submit_task_finish_cond.notify_one();
360
30
        }
361
36
        _stats.flush_running_count--;
362
36
        if (_stats.flush_running_count == 0) {
363
30
            _running_task_finish_cond.notify_one();
364
30
        }
365
36
    }};
366
36
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_before_first_shutdown",
367
36
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
368
36
    if (_is_shutdown()) {
369
0
        return;
370
0
    }
371
36
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_first_shutdown",
372
36
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
373
    // double check if shutdown to avoid wait running task finish count not accurate
374
36
    if (_is_shutdown()) {
375
0
        return;
376
0
    }
377
36
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_second_shutdown",
378
36
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
379
36
    uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
380
36
    _stats.flush_wait_time_ns += flush_wait_time_ns;
381
    // If previous flush has failed, return directly
382
36
    {
383
36
        std::shared_lock rdlk(_flush_status_lock);
384
36
        if (!_flush_status.ok()) {
385
0
            return;
386
0
        }
387
36
    }
388
389
36
    MonotonicStopWatch timer;
390
36
    timer.start();
391
36
    size_t memory_usage = memtable->memory_usage();
392
393
36
    int64_t flush_size = 0;
394
36
    Status s;
395
36
    memtable->update_mem_type(MemType::FLUSH);
396
36
    int64_t duration_ns = 0;
397
36
    {
398
36
        s = [&]() {
399
35
            SCOPED_RAW_TIMER(&duration_ns);
400
35
            SCOPED_ATTACH_TASK(memtable->resource_ctx());
401
35
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
402
35
                    memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker());
403
35
            SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
404
405
            // DEFER_RELEASE_RESERVED();
406
407
            // auto reserve_size = memtable->get_flush_reserve_memory_size();
408
            // if (memtable->resource_ctx()->task_controller()->is_enable_reserve_memory() &&
409
            //     reserve_size > 0) {
410
            //     RETURN_IF_ERROR(_try_reserve_memory(memtable->resource_ctx(), reserve_size));
411
            // }
412
413
            // Defer defer {[&]() {
414
            //     ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
415
            // }};
416
35
            std::shared_ptr<Block> flush_block;
417
35
            RETURN_IF_ERROR(_memtable2block(memtable, shared_memtable, flush_block));
418
35
            RETURN_IF_ERROR(
419
35
                    flush_writer->flush_memtable(flush_block.get(), segment_id, &flush_size));
420
31
            memtable->set_flush_success();
421
422
31
            return Status::OK();
423
35
        }();
424
425
36
        if (s.ok()) {
426
32
            bool record_memtable_stat = shared_memtable == nullptr;
427
32
            if (shared_memtable != nullptr) {
428
8
                auto finished_sub_task_count = shared_memtable->add_finished_sub_task() + 1;
429
8
                record_memtable_stat =
430
8
                        finished_sub_task_count == shared_memtable->total_sub_task_count.load();
431
8
            }
432
32
            if (record_memtable_stat) {
433
26
                _memtable_stat += memtable->stat();
434
26
            }
435
32
            DorisMetrics::instance()->memtable_flush_total->increment(1);
436
32
            DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
437
32
        }
438
36
    }
439
440
36
    {
441
36
        std::shared_lock rdlk(_flush_status_lock);
442
36
        if (!_flush_status.ok()) {
443
0
            return;
444
0
        }
445
36
    }
446
36
    if (!s.ok()) {
447
4
        std::lock_guard wrlk(_flush_status_lock);
448
4
        if (_flush_status.ok()) {
449
4
            LOG(WARNING) << "Flush memtable failed with res = " << s
450
4
                         << ", load_id: " << print_id(flush_writer->load_id());
451
4
            _flush_status = s;
452
4
        }
453
4
        _shutdown_flush_token();
454
4
        return;
455
4
    }
456
457
32
    VLOG_CRITICAL << "flush memtable wait time: "
458
6
                  << PrettyPrinter::print(flush_wait_time_ns, TUnit::TIME_NS)
459
6
                  << ", flush memtable cost: "
460
6
                  << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS)
461
6
                  << ", submit count: " << _stats.flush_submit_count
462
6
                  << ", running count: " << _stats.flush_running_count
463
6
                  << ", finish count: " << _stats.flush_finish_count
464
6
                  << ", mem size: " << PrettyPrinter::print_bytes(memory_usage)
465
6
                  << ", disk size: " << PrettyPrinter::print_bytes(flush_size);
466
32
    _stats.flush_time_ns += timer.elapsed_time();
467
32
    _stats.flush_finish_count++;
468
32
    _stats.flush_size_bytes += memtable->memory_usage();
469
32
    _stats.flush_disk_size_bytes += flush_size;
470
32
}
471
472
void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
473
24
                                 int64_t submit_task_time) {
474
24
    _flush_memtable_impl(_rowset_writer.get(), memtable_ptr.get(), segment_id, submit_task_time);
475
24
}
476
477
std::pair<int, int> MemTableFlushExecutor::calc_flush_thread_count(int num_cpus, int num_disk,
478
230
                                                                   int thread_num_per_store) {
479
230
    if (config::enable_adaptive_flush_threads && num_cpus > 0) {
480
198
        int min = std::max(1, (int)(num_cpus * config::min_flush_thread_num_per_cpu));
481
198
        int max = std::max(min, num_cpus * config::max_flush_thread_num_per_cpu);
482
198
        return {min, max};
483
198
    }
484
32
    int min = std::max(1, thread_num_per_store);
485
32
    int max = num_cpus == 0
486
32
                      ? num_disk * min
487
32
                      : std::min(num_disk * min, num_cpus * config::max_flush_thread_num_per_cpu);
488
32
    return {min, max};
489
230
}
490
491
88
void MemTableFlushExecutor::init(int num_disk) {
492
88
    _num_disk = std::max(1, num_disk);
493
88
    int num_cpus = std::thread::hardware_concurrency();
494
495
88
    auto [min_threads, max_threads] =
496
88
            calc_flush_thread_count(num_cpus, _num_disk, config::flush_thread_num_per_store);
497
88
    static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool")
498
88
                              .set_min_threads(min_threads)
499
88
                              .set_max_threads(max_threads)
500
88
                              .build(&_flush_pool));
501
502
88
    auto [hi_min, hi_max] = calc_flush_thread_count(
503
88
            num_cpus, _num_disk, config::high_priority_flush_thread_num_per_store);
504
88
    static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
505
88
                              .set_min_threads(hi_min)
506
88
                              .set_max_threads(hi_max)
507
88
                              .build(&_high_prio_flush_pool));
508
88
}
509
510
22
void MemTableFlushExecutor::update_memtable_flush_threads() {
511
22
    int num_cpus = std::thread::hardware_concurrency();
512
513
22
    auto [min_threads, max_threads] =
514
22
            calc_flush_thread_count(num_cpus, _num_disk, config::flush_thread_num_per_store);
515
    // Update max_threads first to avoid constraint violation when increasing min_threads
516
22
    static_cast<void>(_flush_pool->set_max_threads(max_threads));
517
22
    static_cast<void>(_flush_pool->set_min_threads(min_threads));
518
519
22
    auto [hi_min, hi_max] = calc_flush_thread_count(
520
22
            num_cpus, _num_disk, config::high_priority_flush_thread_num_per_store);
521
    // Update max_threads first to avoid constraint violation when increasing min_threads
522
22
    static_cast<void>(_high_prio_flush_pool->set_max_threads(hi_max));
523
22
    static_cast<void>(_high_prio_flush_pool->set_min_threads(hi_min));
524
22
}
525
526
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
527
// Creates a FlushToken for `rowset_writer` and stores it in `flush_token`.
//
// Only BETA_ROWSET writers are accepted; ALPHA_ROWSET and any other type yield
// an InternalError and leave `flush_token` untouched. The token is bound to the
// high-priority pool when `is_high_priority` is set, else to the normal pool.
Status MemTableFlushExecutor::create_flush_token(
        std::shared_ptr<FlushToken>& flush_token, std::shared_ptr<RowsetWriter> rowset_writer,
        bool is_high_priority, std::shared_ptr<WorkloadGroup> wg_sptr,
        std::shared_ptr<OlapTableSchemaParam> table_schema_param) {
    const auto rowset_type = rowset_writer->type();
    if (rowset_type == ALPHA_ROWSET) {
        // alpha rowset do not support flush in CONCURRENT, and alpha rowset is
        // not supported now.
        return Status::InternalError<false>("not support alpha rowset load now.");
    }
    if (rowset_type != BETA_ROWSET) {
        return Status::InternalError<false>("unknown rowset type.");
    }

    // beta rowset can be flushed in CONCURRENT, because each memtable uses a
    // new segment writer.
    ThreadPool* pool = _flush_pool.get();
    if (is_high_priority) {
        pool = _high_prio_flush_pool.get();
    }
    flush_token = FlushToken::create_shared(pool, wg_sptr);
    flush_token->set_rowset_writer(rowset_writer);
    flush_token->set_table_schema_param(std::move(table_schema_param));
    return Status::OK();
}
547
548
} // namespace doris