Coverage Report

Created: 2026-03-13 03:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/memtable/memtable_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/memtable/memtable_writer.h"
19
20
#include <fmt/format.h>
21
22
#include <filesystem>
23
#include <ostream>
24
#include <string>
25
#include <utility>
26
27
#include "common/compiler_util.h" // IWYU pragma: keep
28
#include "common/config.h"
29
#include "common/logging.h"
30
#include "common/status.h"
31
#include "core/block/block.h"
32
#include "io/fs/file_writer.h" // IWYU pragma: keep
33
#include "load/memtable/memtable.h"
34
#include "load/memtable/memtable_flush_executor.h"
35
#include "load/memtable/memtable_memory_limiter.h"
36
#include "runtime/exec_env.h"
37
#include "runtime/memory/mem_tracker.h"
38
#include "service/backend_options.h"
39
#include "storage/rowset/beta_rowset_writer.h"
40
#include "storage/rowset/rowset_writer.h"
41
#include "storage/schema_change/schema_change.h"
42
#include "storage/storage_engine.h"
43
#include "storage/tablet/tablet_schema.h"
44
#include "storage/tablet_info.h"
45
#include "util/mem_info.h"
46
#include "util/stopwatch.hpp"
47
48
namespace doris {
49
bvar::Adder<uint64_t> g_flush_cuz_rowscnt_oveflow("flush_cuz_rowscnt_oveflow");
50
51
using namespace ErrorCode;
52
53
275k
MemTableWriter::MemTableWriter(const WriteRequest& req) : _req(req) {}
54
55
275k
MemTableWriter::~MemTableWriter() {
56
275k
    if (!_is_init) {
57
94.5k
        return;
58
94.5k
    }
59
181k
    if (_flush_token != nullptr) {
60
        // cancel and wait for all memtables in the flush queue to finish
61
181k
        _flush_token->cancel();
62
181k
    }
63
181k
    _mem_table.reset();
64
181k
}
65
66
Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
67
                            TabletSchemaSPtr tablet_schema,
68
                            std::shared_ptr<PartialUpdateInfo> partial_update_info,
69
180k
                            std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow) {
70
180k
    _rowset_writer = rowset_writer;
71
180k
    _tablet_schema = tablet_schema;
72
180k
    _unique_key_mow = unique_key_mow;
73
180k
    _partial_update_info = partial_update_info;
74
180k
    _resource_ctx = thread_context()->resource_ctx();
75
76
180k
    _reset_mem_table();
77
78
    // create flush handler
79
    // by assigning segment_id to memtable before submitting to flush executor,
80
    // we can make sure same keys sort in the same order in all replicas.
81
180k
    RETURN_IF_ERROR(
82
180k
            ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->create_flush_token(
83
180k
                    _flush_token, _rowset_writer, _req.is_high_priority, wg_sptr));
84
85
180k
    _is_init = true;
86
180k
    return Status::OK();
87
180k
}
88
89
114k
Status MemTableWriter::write(const Block* block, const DorisVector<uint32_t>& row_idxs) {
90
114k
    if (UNLIKELY(row_idxs.empty())) {
91
0
        return Status::OK();
92
0
    }
93
114k
    _lock_watch.start();
94
114k
    std::lock_guard<std::mutex> l(_lock);
95
114k
    _lock_watch.stop();
96
114k
    if (_is_cancelled) {
97
0
        return _cancel_status;
98
0
    }
99
114k
    if (!_is_init) {
100
0
        return Status::Error<NOT_INITIALIZED>("delta segment writer has not been initialized");
101
0
    }
102
114k
    if (_is_closed) {
103
0
        return Status::Error<ALREADY_CLOSED>("write block after closed tablet_id={}, load_id={}-{}",
104
0
                                             _req.tablet_id, _req.load_id.hi(), _req.load_id.lo());
105
0
    }
106
107
    // Flush and reset the memtable if adding these rows would push its raw row count past the int32_t maximum.
108
114k
    int64_t raw_rows = _mem_table->raw_rows();
109
114k
    DBUG_EXECUTE_IF("MemTableWriter.too_many_raws",
110
114k
                    { raw_rows = std::numeric_limits<int32_t>::max(); });
111
114k
    if (raw_rows + row_idxs.size() > std::numeric_limits<int32_t>::max()) {
112
0
        g_flush_cuz_rowscnt_oveflow << 1;
113
0
        RETURN_IF_ERROR(_flush_memtable());
114
0
    }
115
116
114k
    _total_received_rows += row_idxs.size();
117
114k
    auto st = _mem_table->insert(block, row_idxs);
118
119
    // Reset memtable immediately after insert failure to prevent potential flush operations.
120
    // This is a defensive measure because:
121
    // 1. When insert fails (e.g., memory allocation failure during add_rows),
122
    //    the memtable is in an inconsistent state and should not be flushed
123
    // 2. However, memory pressure might trigger a flush operation on this failed memtable
124
    // 3. By resetting here, we ensure the failed memtable won't be included in any subsequent flush,
125
    //    thus preventing potential crashes
126
114k
    DBUG_EXECUTE_IF("MemTableWriter.write.random_insert_error", {
127
114k
        if (rand() % 100 < (100 * dp->param("percent", 0.3))) {
128
114k
            st = Status::InternalError<false>("write memtable random failed for debug");
129
114k
        }
130
114k
    });
131
114k
    if (!st.ok()) [[unlikely]] {
132
0
        _reset_mem_table();
133
0
        return st;
134
0
    }
135
136
114k
    if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {
137
0
        _mem_table->shrink_memtable_by_agg();
138
0
    }
139
114k
    if (UNLIKELY(_mem_table->need_flush())) {
140
0
        RETURN_IF_ERROR(_flush_memtable());
141
0
    }
142
143
114k
    return Status::OK();
144
114k
}
145
146
0
Status MemTableWriter::_flush_memtable() {
147
0
    auto s = _flush_memtable_async();
148
0
    _reset_mem_table();
149
0
    if (UNLIKELY(!s.ok())) {
150
0
        return s;
151
0
    }
152
0
    return Status::OK();
153
0
}
154
155
181k
Status MemTableWriter::_flush_memtable_async() {
156
181k
    DCHECK(_flush_token != nullptr);
157
181k
    std::shared_ptr<MemTable> memtable;
158
181k
    {
159
181k
        std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
160
181k
        memtable = _mem_table;
161
181k
        _mem_table = nullptr;
162
181k
        memtable->update_mem_type(MemType::WRITE_FINISHED);
163
181k
        _freezed_mem_tables.push_back(memtable);
164
181k
    }
165
181k
    return _flush_token->submit(memtable);
166
181k
}
167
168
0
Status MemTableWriter::flush_async() {
169
0
    std::lock_guard<std::mutex> l(_lock);
170
    // Three calling paths:
171
    // 1. call by local, from `VTabletWriterV2::_write_memtable`.
172
    // 2. call by remote, from `LoadChannelMgr::_get_load_channel`.
173
    // 3. call by daemon thread, from `handle_paused_queries` -> `flush_workload_group_memtables`.
174
0
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
175
0
    if (!_is_init || _is_closed) {
176
        // This writer is uninitialized or closed before flushing, do nothing.
177
        // We return OK instead of NOT_INITIALIZED or ALREADY_CLOSED.
178
        // Because this method may be called when trying to reduce mem consumption,
179
        // and at that time, the writer may not be initialized yet and that is a normal case.
180
0
        return Status::OK();
181
0
    }
182
183
0
    if (_is_cancelled) {
184
0
        return _cancel_status;
185
0
    }
186
187
0
    VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
188
0
                << PrettyPrinter::print_bytes(_mem_table->memory_usage())
189
0
                << ", tablet: " << _req.tablet_id << ", load id: " << print_id(_req.load_id);
190
0
    auto s = _flush_memtable_async();
191
0
    _reset_mem_table();
192
0
    return s;
193
0
}
194
195
63
Status MemTableWriter::wait_flush() {
196
63
    {
197
63
        std::lock_guard<std::mutex> l(_lock);
198
63
        if (!_is_init || _is_closed) {
199
            // return OK instead of NOT_INITIALIZED or ALREADY_CLOSED for same reason
200
            // as described in flush_async()
201
63
            return Status::OK();
202
63
        }
203
0
        if (_is_cancelled) {
204
0
            return _cancel_status;
205
0
        }
206
0
    }
207
0
    SCOPED_RAW_TIMER(&_wait_flush_time_ns);
208
0
    RETURN_IF_ERROR(_flush_token->wait());
209
0
    return Status::OK();
210
0
}
211
212
180k
void MemTableWriter::_reset_mem_table() {
213
180k
    {
214
180k
        std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
215
180k
        _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema, _req.slots, _req.tuple_desc,
216
180k
                                      _unique_key_mow, _partial_update_info.get(), _resource_ctx));
217
180k
    }
218
219
180k
    _segment_num++;
220
180k
}
221
222
181k
Status MemTableWriter::close() {
223
181k
    _lock_watch.start();
224
181k
    std::lock_guard<std::mutex> l(_lock);
225
181k
    _lock_watch.stop();
226
181k
    if (_is_cancelled) {
227
0
        return _cancel_status;
228
0
    }
229
181k
    if (!_is_init) {
230
0
        return Status::Error<NOT_INITIALIZED>("delta segment writer has not been initialized");
231
0
    }
232
181k
    if (_is_closed) {
233
0
        LOG(WARNING) << "close after closed tablet_id=" << _req.tablet_id
234
0
                     << " load_id=" << _req.load_id;
235
0
        return Status::OK();
236
0
    }
237
238
181k
    auto s = _flush_memtable_async();
239
181k
    {
240
181k
        std::lock_guard<std::mutex> lm(_mem_table_ptr_lock);
241
181k
        _mem_table.reset();
242
181k
    }
243
181k
    _is_closed = true;
244
181k
    if (UNLIKELY(!s.ok())) {
245
0
        return s;
246
181k
    } else {
247
181k
        return Status::OK();
248
181k
    }
249
181k
}
250
251
181k
Status MemTableWriter::_do_close_wait() {
252
181k
    SCOPED_RAW_TIMER(&_close_wait_time_ns);
253
181k
    std::lock_guard<std::mutex> l(_lock);
254
18.4E
    DCHECK(_is_init)
255
18.4E
            << "delta writer is supposed be to initialized before close_wait() being called";
256
257
181k
    if (_is_cancelled) {
258
0
        return _cancel_status;
259
0
    }
260
261
181k
    Status st;
262
    // return error if previous flush failed
263
181k
    {
264
181k
        SCOPED_RAW_TIMER(&_wait_flush_time_ns);
265
181k
        st = _flush_token->wait();
266
181k
    }
267
181k
    if (UNLIKELY(!st.ok())) {
268
48
        LOG(WARNING) << "previous flush failed tablet " << _req.tablet_id;
269
48
        return st;
270
48
    }
271
272
181k
    if (_rowset_writer->num_rows() + _flush_token->memtable_stat().merged_rows !=
273
181k
        _total_received_rows) {
274
0
        LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: "
275
0
                     << _rowset_writer->num_rows()
276
0
                     << ", merged_rows: " << _flush_token->memtable_stat().merged_rows
277
0
                     << ", total received rows: " << _total_received_rows;
278
0
        return Status::InternalError("rows number written by delta writer dosen't match");
279
0
    }
280
281
    // print slow log if wait more than 1s
282
181k
    if (_wait_flush_time_ns > 1000UL * 1000 * 1000) {
283
1.21k
        LOG(INFO) << "close delta writer for tablet: " << _req.tablet_id
284
1.21k
                  << ", load id: " << print_id(_req.load_id) << ", wait close for "
285
1.21k
                  << _wait_flush_time_ns << "(ns), stats: " << _flush_token->get_stats();
286
1.21k
    }
287
288
181k
    return Status::OK();
289
181k
}
290
291
181k
void MemTableWriter::_update_profile(RuntimeProfile* profile) {
292
    // NOTE: MemTableWriter may be accessed when profile is out of scope, in MemTableMemoryLimiter.
293
    // To avoid accessing dangling pointers, we cannot make profile a member of MemTableWriter.
294
181k
    auto child =
295
181k
            profile->create_child(fmt::format("MemTableWriter {}", _req.tablet_id), true, true);
296
181k
    auto lock_timer = ADD_TIMER(child, "LockTime");
297
181k
    auto sort_timer = ADD_TIMER(child, "MemTableSortTime");
298
181k
    auto agg_timer = ADD_TIMER(child, "MemTableAggTime");
299
181k
    auto memtable_duration_timer = ADD_TIMER(child, "MemTableDurationTime");
300
181k
    auto segment_writer_timer = ADD_TIMER(child, "SegmentWriterTime");
301
181k
    auto wait_flush_timer = ADD_TIMER(child, "MemTableWaitFlushTime");
302
181k
    auto put_into_output_timer = ADD_TIMER(child, "MemTablePutIntoOutputTime");
303
181k
    auto delete_bitmap_timer = ADD_TIMER(child, "DeleteBitmapTime");
304
181k
    auto close_wait_timer = ADD_TIMER(child, "CloseWaitTime");
305
181k
    auto sort_times = ADD_COUNTER(child, "MemTableSortTimes", TUnit::UNIT);
306
181k
    auto agg_times = ADD_COUNTER(child, "MemTableAggTimes", TUnit::UNIT);
307
181k
    auto segment_num = ADD_COUNTER(child, "SegmentNum", TUnit::UNIT);
308
181k
    auto raw_rows_num = ADD_COUNTER(child, "RawRowNum", TUnit::UNIT);
309
181k
    auto merged_rows_num = ADD_COUNTER(child, "MergedRowNum", TUnit::UNIT);
310
311
181k
    COUNTER_UPDATE(lock_timer, _lock_watch.elapsed_time());
312
181k
    COUNTER_SET(delete_bitmap_timer, _rowset_writer->delete_bitmap_ns());
313
181k
    COUNTER_SET(segment_writer_timer, _rowset_writer->segment_writer_ns());
314
181k
    COUNTER_SET(wait_flush_timer, _wait_flush_time_ns);
315
181k
    COUNTER_SET(close_wait_timer, _close_wait_time_ns);
316
181k
    COUNTER_SET(segment_num, _segment_num);
317
181k
    const auto& memtable_stat = _flush_token->memtable_stat();
318
181k
    COUNTER_SET(sort_timer, memtable_stat.sort_ns);
319
181k
    COUNTER_SET(agg_timer, memtable_stat.agg_ns);
320
181k
    COUNTER_SET(memtable_duration_timer, memtable_stat.duration_ns);
321
181k
    COUNTER_SET(put_into_output_timer, memtable_stat.put_into_output_ns);
322
181k
    COUNTER_SET(sort_times, memtable_stat.sort_times);
323
181k
    COUNTER_SET(agg_times, memtable_stat.agg_times);
324
181k
    COUNTER_SET(raw_rows_num, memtable_stat.raw_rows);
325
181k
    COUNTER_SET(merged_rows_num, memtable_stat.merged_rows);
326
181k
}
327
328
181k
Status MemTableWriter::cancel() {
329
181k
    return cancel_with_status(Status::Cancelled("already cancelled"));
330
181k
}
331
332
275k
Status MemTableWriter::cancel_with_status(const Status& st) {
333
275k
    std::lock_guard<std::mutex> l(_lock);
334
275k
    if (_is_cancelled) {
335
10
        return Status::OK();
336
10
    }
337
275k
    {
338
275k
        std::lock_guard<std::mutex> lm(_mem_table_ptr_lock);
339
275k
        _mem_table.reset();
340
275k
    }
341
275k
    if (_flush_token != nullptr) {
342
        // cancel and wait for all memtables in the flush queue to finish
343
181k
        _flush_token->cancel();
344
181k
    }
345
275k
    _is_cancelled = true;
346
275k
    _cancel_status = st;
347
275k
    return Status::OK();
348
275k
}
349
350
181k
const FlushStatistic& MemTableWriter::get_flush_token_stats() {
351
181k
    return _flush_token->get_stats();
352
181k
}
353
354
114k
uint64_t MemTableWriter::flush_running_count() const {
355
114k
    return _flush_token == nullptr ? 0 : _flush_token->get_stats().flush_running_count.load();
356
114k
}
357
358
12.1M
int64_t MemTableWriter::mem_consumption(MemType mem) {
359
12.1M
    if (!_is_init) {
360
        // This method may be called before this writer is initialized.
361
        // So _flush_token may be null.
362
472
        return 0;
363
472
    }
364
12.1M
    int64_t mem_usage = 0;
365
12.1M
    {
366
12.1M
        std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
367
12.1M
        for (const auto& mem_table : _freezed_mem_tables) {
368
6.00M
            auto mem_table_sptr = mem_table.lock();
369
6.00M
            if (mem_table_sptr != nullptr && mem_table_sptr->get_mem_type() == mem) {
370
1.16M
                mem_usage += mem_table_sptr->memory_usage();
371
1.16M
            }
372
6.00M
        }
373
12.1M
    }
374
12.1M
    return mem_usage;
375
12.1M
}
376
377
6.07M
int64_t MemTableWriter::active_memtable_mem_consumption() {
378
6.07M
    std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
379
6.07M
    return _mem_table != nullptr ? _mem_table->memory_usage() : 0;
380
6.07M
}
381
382
} // namespace doris