Coverage Report

Created: 2026-06-23 14:23

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/memtable/memtable_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/memtable/memtable_writer.h"
19
20
#include <fmt/format.h>
21
22
#include <filesystem>
23
#include <ostream>
24
#include <string>
25
#include <utility>
26
27
#include "common/compiler_util.h" // IWYU pragma: keep
28
#include "common/config.h"
29
#include "common/logging.h"
30
#include "common/status.h"
31
#include "core/block/block.h"
32
#include "io/fs/file_writer.h" // IWYU pragma: keep
33
#include "load/memtable/memtable.h"
34
#include "load/memtable/memtable_flush_executor.h"
35
#include "load/memtable/memtable_memory_limiter.h"
36
#include "runtime/exec_env.h"
37
#include "runtime/memory/mem_tracker.h"
38
#include "service/backend_options.h"
39
#include "storage/rowset/beta_rowset_writer.h"
40
#include "storage/rowset/group_rowset_writer.h"
41
#include "storage/rowset/rowset_writer.h"
42
#include "storage/schema_change/schema_change.h"
43
#include "storage/storage_engine.h"
44
#include "storage/tablet/tablet_schema.h"
45
#include "storage/tablet_info.h"
46
#include "util/mem_info.h"
47
#include "util/stopwatch.hpp"
48
49
namespace doris {
50
bvar::Adder<uint64_t> g_flush_cuz_rowscnt_oveflow("flush_cuz_rowscnt_oveflow");
51
52
using namespace ErrorCode;
53
54
266k
MemTableWriter::MemTableWriter(const WriteRequest& req) : _req(req) {}
55
56
266k
MemTableWriter::~MemTableWriter() {
57
266k
    if (!_is_init) {
58
212k
        return;
59
212k
    }
60
53.9k
    if (_flush_token != nullptr) {
61
        // cancel and wait all memtables in flush queue to be finished
62
53.9k
        _flush_token->cancel();
63
53.9k
    }
64
53.9k
    _mem_table.reset();
65
53.9k
}
66
67
Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
68
                            TabletSchemaSPtr tablet_schema,
69
                            std::shared_ptr<PartialUpdateInfo> partial_update_info,
70
53.8k
                            std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow) {
71
53.8k
    _rowset_writer = rowset_writer;
72
53.8k
    _tablet_schema = tablet_schema;
73
53.8k
    _unique_key_mow = unique_key_mow;
74
53.8k
    _partial_update_info = partial_update_info;
75
53.8k
    _resource_ctx = thread_context()->resource_ctx();
76
53.8k
    _need_row_binlog_lsn = false;
77
53.9k
    if (_req.table_schema_param != nullptr) {
78
66.4k
        for (const auto* index_schema : _req.table_schema_param->indexes()) {
79
66.4k
            if (index_schema->index_id == _req.index_id) {
80
53.8k
                _need_row_binlog_lsn = index_schema->row_binlog_id > 0;
81
53.8k
                break;
82
53.8k
            }
83
66.4k
        }
84
53.9k
    }
85
53.8k
    if (_need_row_binlog_lsn) {
86
0
        const auto keys_type = _tablet_schema->keys_type();
87
0
        if (keys_type == KeysType::AGG_KEYS ||
88
0
            (keys_type == KeysType::UNIQUE_KEYS && !_unique_key_mow)) {
89
            // Row-binlog LSN sidecar does not support MemTable aggregation now. For AGG tables
90
            // and unique key merge-on-read tables, multiple input rows can be merged into one
91
            // output row in MemTable, so their output LSN semantics should be implemented
92
            // explicitly before enabling row-binlog LSNs on these table types.
93
0
            return Status::NotSupported(
94
0
                    "row binlog lsn does not support AGG table or unique key merge-on-read table");
95
0
        }
96
0
    }
97
98
53.8k
    _reset_mem_table();
99
100
    // create flush handler
101
    // by assigning segment_id to memtable before submiting to flush executor,
102
    // we can make sure same keys sort in the same order in all replicas.
103
53.8k
    RETURN_IF_ERROR(
104
53.8k
            ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->create_flush_token(
105
53.8k
                    _flush_token, _rowset_writer, _req.is_high_priority, wg_sptr,
106
53.8k
                    _req.table_schema_param));
107
108
53.8k
    _is_init = true;
109
53.8k
    return Status::OK();
110
53.8k
}
111
112
113k
Status MemTableWriter::write(const Block* block, const TabletAddRowsPayload& rows) {
113
113k
    if (UNLIKELY(rows.row_idxs.empty())) {
114
0
        DCHECK(rows.row_binlog_lsns.empty());
115
0
        return Status::OK();
116
0
    }
117
113k
    _lock_watch.start();
118
113k
    std::lock_guard<std::mutex> l(_lock);
119
113k
    _lock_watch.stop();
120
113k
    if (_is_cancelled) {
121
1
        return _cancel_status;
122
1
    }
123
113k
    if (!_is_init) {
124
0
        return Status::Error<NOT_INITIALIZED>("delta segment writer has not been initialized");
125
0
    }
126
113k
    if (_is_closed) {
127
0
        return Status::Error<ALREADY_CLOSED>("write block after closed tablet_id={}, load_id={}-{}",
128
0
                                             _req.tablet_id, _req.load_id.hi(), _req.load_id.lo());
129
0
    }
130
131
113k
    if (_need_row_binlog_lsn) {
132
0
        if (rows.row_binlog_lsns.empty()) {
133
0
            return Status::InternalError(
134
0
                    "row binlog lsn is missing for tablet_id={}, index_id={}, load_id={}-{}",
135
0
                    _req.tablet_id, _req.index_id, _req.load_id.hi(), _req.load_id.lo());
136
0
        }
137
0
        DCHECK(rows.row_binlog_lsns.size() == rows.row_idxs.size());
138
0
    }
139
140
    // Flush and reset memtable if it is raw rows great than int32_t.
141
113k
    int64_t raw_rows = _mem_table->raw_rows();
142
113k
    DBUG_EXECUTE_IF("MemTableWriter.too_many_raws",
143
113k
                    { raw_rows = std::numeric_limits<int32_t>::max(); });
144
113k
    if (raw_rows + rows.row_idxs.size() > std::numeric_limits<int32_t>::max()) {
145
0
        g_flush_cuz_rowscnt_oveflow << 1;
146
0
        RETURN_IF_ERROR(_flush_memtable());
147
0
    }
148
149
113k
    _total_received_rows += rows.row_idxs.size();
150
113k
    auto st = _mem_table->insert(block, rows);
151
152
    // Reset memtable immediately after insert failure to prevent potential flush operations.
153
    // This is a defensive measure because:
154
    // 1. When insert fails (e.g., memory allocation failure during add_rows),
155
    //    the memtable is in an inconsistent state and should not be flushed
156
    // 2. However, memory pressure might trigger a flush operation on this failed memtable
157
    // 3. By resetting here, we ensure the failed memtable won't be included in any subsequent flush,
158
    //    thus preventing potential crashes
159
113k
    DBUG_EXECUTE_IF("MemTableWriter.write.random_insert_error", {
160
113k
        if (rand() % 100 < (100 * dp->param("percent", 0.3))) {
161
113k
            st = Status::InternalError<false>("write memtable random failed for debug");
162
113k
        }
163
113k
    });
164
113k
    if (!st.ok()) [[unlikely]] {
165
0
        _reset_mem_table();
166
0
        return st;
167
0
    }
168
169
113k
    if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {
170
0
        _mem_table->shrink_memtable_by_agg();
171
0
    }
172
113k
    if (UNLIKELY(_mem_table->need_flush())) {
173
0
        RETURN_IF_ERROR(_flush_memtable());
174
0
    }
175
176
113k
    return Status::OK();
177
113k
}
178
179
0
Status MemTableWriter::_flush_memtable() {
180
0
    auto s = _flush_memtable_async();
181
0
    _reset_mem_table();
182
0
    if (UNLIKELY(!s.ok())) {
183
0
        return s;
184
0
    }
185
0
    return Status::OK();
186
0
}
187
188
53.9k
Status MemTableWriter::_flush_memtable_async() {
189
53.9k
    DCHECK(_flush_token != nullptr);
190
53.9k
    std::shared_ptr<MemTable> memtable;
191
53.9k
    {
192
53.9k
        std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
193
53.9k
        memtable = _mem_table;
194
53.9k
        _mem_table = nullptr;
195
53.9k
        memtable->update_mem_type(MemType::WRITE_FINISHED);
196
53.9k
        _freezed_mem_tables.push_back(memtable);
197
53.9k
    }
198
53.9k
    return _flush_token->submit(memtable);
199
53.9k
}
200
201
0
Status MemTableWriter::flush_async() {
202
0
    std::lock_guard<std::mutex> l(_lock);
203
    // Three calling paths:
204
    // 1. call by local, from `VTabletWriterV2::_write_memtable`.
205
    // 2. call by remote, from `LoadChannelMgr::_get_load_channel`.
206
    // 3. call by daemon thread, from `handle_paused_queries` -> `flush_workload_group_memtables`.
207
0
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
208
0
    if (!_is_init || _is_closed) {
209
        // This writer is uninitialized or closed before flushing, do nothing.
210
        // We return OK instead of NOT_INITIALIZED or ALREADY_CLOSED.
211
        // Because this method maybe called when trying to reduce mem consumption,
212
        // and at that time, the writer may not be initialized yet and that is a normal case.
213
0
        return Status::OK();
214
0
    }
215
216
0
    if (_is_cancelled) {
217
0
        return _cancel_status;
218
0
    }
219
220
0
    VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
221
0
                << PrettyPrinter::print_bytes(_mem_table->memory_usage())
222
0
                << ", tablet: " << _req.tablet_id << ", load id: " << print_id(_req.load_id);
223
0
    auto s = _flush_memtable_async();
224
0
    _reset_mem_table();
225
0
    return s;
226
0
}
227
228
91
Status MemTableWriter::wait_flush() {
229
91
    {
230
91
        std::lock_guard<std::mutex> l(_lock);
231
91
        if (!_is_init || _is_closed) {
232
            // return OK instead of NOT_INITIALIZED or ALREADY_CLOSED for same reason
233
            // as described in flush_async()
234
91
            return Status::OK();
235
91
        }
236
0
        if (_is_cancelled) {
237
0
            return _cancel_status;
238
0
        }
239
0
    }
240
0
    SCOPED_RAW_TIMER(&_wait_flush_time_ns);
241
0
    RETURN_IF_ERROR(_flush_token->wait());
242
0
    return Status::OK();
243
0
}
244
245
53.7k
void MemTableWriter::_reset_mem_table() {
246
53.7k
    {
247
53.7k
        std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
248
53.7k
        _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema, _req.slots, _req.tuple_desc,
249
53.7k
                                      _unique_key_mow, _partial_update_info.get(), _resource_ctx,
250
53.7k
                                      _need_row_binlog_lsn));
251
53.7k
    }
252
253
53.7k
    _segment_num++;
254
53.7k
}
255
256
53.9k
Status MemTableWriter::close() {
257
53.9k
    _lock_watch.start();
258
53.9k
    std::lock_guard<std::mutex> l(_lock);
259
53.9k
    _lock_watch.stop();
260
53.9k
    if (_is_cancelled) {
261
0
        return _cancel_status;
262
0
    }
263
53.9k
    if (!_is_init) {
264
0
        return Status::Error<NOT_INITIALIZED>("delta segment writer has not been initialized");
265
0
    }
266
53.9k
    if (_is_closed) {
267
0
        LOG(WARNING) << "close after closed tablet_id=" << _req.tablet_id
268
0
                     << " load_id=" << _req.load_id;
269
0
        return Status::OK();
270
0
    }
271
272
53.9k
    auto s = _flush_memtable_async();
273
53.9k
    {
274
53.9k
        std::lock_guard<std::mutex> lm(_mem_table_ptr_lock);
275
53.9k
        _mem_table.reset();
276
53.9k
    }
277
53.9k
    _is_closed = true;
278
53.9k
    if (UNLIKELY(!s.ok())) {
279
0
        return s;
280
53.9k
    } else {
281
53.9k
        return Status::OK();
282
53.9k
    }
283
53.9k
}
284
285
53.9k
Status MemTableWriter::_do_close_wait() {
286
53.9k
    SCOPED_RAW_TIMER(&_close_wait_time_ns);
287
53.9k
    std::lock_guard<std::mutex> l(_lock);
288
53.9k
    DCHECK(_is_init)
289
2
            << "delta writer is supposed be to initialized before close_wait() being called";
290
291
53.9k
    if (_is_cancelled) {
292
0
        return _cancel_status;
293
0
    }
294
295
53.9k
    Status st;
296
    // return error if previous flush failed
297
53.9k
    {
298
53.9k
        SCOPED_RAW_TIMER(&_wait_flush_time_ns);
299
53.9k
        st = _flush_token->wait();
300
53.9k
    }
301
53.9k
    if (UNLIKELY(!st.ok())) {
302
46
        LOG(WARNING) << "previous flush failed tablet " << _req.tablet_id;
303
46
        return st;
304
46
    }
305
306
53.9k
    if (_rowset_writer->num_rows() + _flush_token->memtable_stat().merged_rows !=
307
53.9k
        _total_received_rows) {
308
0
        LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: "
309
0
                     << _rowset_writer->num_rows()
310
0
                     << ", merged_rows: " << _flush_token->memtable_stat().merged_rows
311
0
                     << ", total received rows: " << _total_received_rows;
312
0
        return Status::InternalError("rows number written by delta writer dosen't match");
313
0
    }
314
315
    // print slow log if wait more than 1s
316
53.9k
    if (_wait_flush_time_ns > 1000UL * 1000 * 1000) {
317
632
        LOG(INFO) << "close delta writer for tablet: " << _req.tablet_id
318
632
                  << ", load id: " << print_id(_req.load_id) << ", wait close for "
319
632
                  << _wait_flush_time_ns << "(ns), stats: " << _flush_token->get_stats();
320
632
    }
321
322
53.9k
    return Status::OK();
323
53.9k
}
324
325
37
void MemTableWriter::_update_profile(RuntimeProfile* profile) {
326
37
    if (!profile) {
327
0
        return;
328
0
    }
329
    // NOTE: MemTableWriter may be accessed when profile is out of scope, in MemTableMemoryLimiter.
330
    // To avoid accessing dangling pointers, we cannot make profile as a member of MemTableWriter.
331
37
    auto child =
332
37
            profile->create_child(fmt::format("MemTableWriter {}", _req.tablet_id), true, true);
333
37
    auto lock_timer = ADD_TIMER(child, "LockTime");
334
37
    auto sort_timer = ADD_TIMER(child, "MemTableSortTime");
335
37
    auto agg_timer = ADD_TIMER(child, "MemTableAggTime");
336
37
    auto memtable_duration_timer = ADD_TIMER(child, "MemTableDurationTime");
337
37
    auto segment_writer_timer = ADD_TIMER(child, "SegmentWriterTime");
338
37
    auto wait_flush_timer = ADD_TIMER(child, "MemTableWaitFlushTime");
339
37
    auto put_into_output_timer = ADD_TIMER(child, "MemTablePutIntoOutputTime");
340
37
    auto delete_bitmap_timer = ADD_TIMER(child, "DeleteBitmapTime");
341
37
    auto close_wait_timer = ADD_TIMER(child, "CloseWaitTime");
342
37
    auto sort_times = ADD_COUNTER(child, "MemTableSortTimes", TUnit::UNIT);
343
37
    auto agg_times = ADD_COUNTER(child, "MemTableAggTimes", TUnit::UNIT);
344
37
    auto segment_num = ADD_COUNTER(child, "SegmentNum", TUnit::UNIT);
345
37
    auto raw_rows_num = ADD_COUNTER(child, "RawRowNum", TUnit::UNIT);
346
37
    auto merged_rows_num = ADD_COUNTER(child, "MergedRowNum", TUnit::UNIT);
347
348
37
    COUNTER_UPDATE(lock_timer, _lock_watch.elapsed_time());
349
37
    COUNTER_SET(delete_bitmap_timer, _rowset_writer->delete_bitmap_ns());
350
37
    COUNTER_SET(segment_writer_timer, _rowset_writer->segment_writer_ns());
351
37
    COUNTER_SET(wait_flush_timer, _wait_flush_time_ns);
352
37
    COUNTER_SET(close_wait_timer, _close_wait_time_ns);
353
37
    COUNTER_SET(segment_num, _segment_num);
354
37
    const auto& memtable_stat = _flush_token->memtable_stat();
355
37
    COUNTER_SET(sort_timer, memtable_stat.sort_ns);
356
37
    COUNTER_SET(agg_timer, memtable_stat.agg_ns);
357
37
    COUNTER_SET(memtable_duration_timer, memtable_stat.duration_ns);
358
37
    COUNTER_SET(put_into_output_timer, memtable_stat.put_into_output_ns);
359
37
    COUNTER_SET(sort_times, memtable_stat.sort_times);
360
37
    COUNTER_SET(agg_times, memtable_stat.agg_times);
361
37
    COUNTER_SET(raw_rows_num, memtable_stat.raw_rows);
362
37
    COUNTER_SET(merged_rows_num, memtable_stat.merged_rows);
363
37
}
364
365
54.0k
Status MemTableWriter::cancel() {
366
54.0k
    return cancel_with_status(Status::Cancelled("already cancelled"));
367
54.0k
}
368
369
146k
Status MemTableWriter::cancel_with_status(const Status& st) {
370
146k
    std::lock_guard<std::mutex> l(_lock);
371
146k
    if (_is_cancelled) {
372
12
        return Status::OK();
373
12
    }
374
146k
    {
375
146k
        std::lock_guard<std::mutex> lm(_mem_table_ptr_lock);
376
146k
        _mem_table.reset();
377
146k
    }
378
146k
    if (_flush_token != nullptr) {
379
        // cancel and wait all memtables in flush queue to be finished
380
54.0k
        _flush_token->cancel();
381
54.0k
    }
382
146k
    _is_cancelled = true;
383
146k
    _cancel_status = st;
384
146k
    return Status::OK();
385
146k
}
386
387
53.8k
const FlushStatistic& MemTableWriter::get_flush_token_stats() {
388
53.8k
    return _flush_token->get_stats();
389
53.8k
}
390
391
113k
uint64_t MemTableWriter::flush_running_count() const {
392
113k
    return _flush_token == nullptr ? 0 : _flush_token->get_stats().flush_running_count.load();
393
113k
}
394
395
7.17M
int64_t MemTableWriter::mem_consumption(MemType mem) {
396
7.17M
    if (!_is_init) {
397
        // This method may be called before this writer is initialized.
398
        // So _flush_token may be null.
399
82
        return 0;
400
82
    }
401
7.17M
    int64_t mem_usage = 0;
402
7.17M
    {
403
7.17M
        std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
404
7.17M
        for (const auto& mem_table : _freezed_mem_tables) {
405
2.60M
            auto mem_table_sptr = mem_table.lock();
406
2.60M
            if (mem_table_sptr != nullptr && mem_table_sptr->get_mem_type() == mem) {
407
860k
                mem_usage += mem_table_sptr->memory_usage();
408
860k
            }
409
2.60M
        }
410
7.17M
    }
411
7.17M
    return mem_usage;
412
7.17M
}
413
414
3.58M
int64_t MemTableWriter::active_memtable_mem_consumption() {
415
3.58M
    std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
416
3.58M
    return _mem_table != nullptr ? _mem_table->memory_usage() : 0;
417
3.58M
}
418
419
} // namespace doris