Coverage Report

Created: 2025-04-29 12:50

/root/doris/be/src/olap/memtable_writer.cpp
Count: Source ('-' = no execution count)
    -: // Licensed to the Apache Software Foundation (ASF) under one
    -: // or more contributor license agreements.  See the NOTICE file
    -: // distributed with this work for additional information
    -: // regarding copyright ownership.  The ASF licenses this file
    -: // to you under the Apache License, Version 2.0 (the
    -: // "License"); you may not use this file except in compliance
    -: // with the License.  You may obtain a copy of the License at
    -: //
    -: //   http://www.apache.org/licenses/LICENSE-2.0
    -: //
    -: // Unless required by applicable law or agreed to in writing,
    -: // software distributed under the License is distributed on an
    -: // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    -: // KIND, either express or implied.  See the License for the
    -: // specific language governing permissions and limitations
    -: // under the License.
    -:
    -: #include "olap/memtable_writer.h"
    -:
    -: #include <fmt/format.h>
    -:
    -: #include <ostream>
    -: #include <string>
    -:
    -: #include "common/compiler_util.h" // IWYU pragma: keep
    -: #include "common/config.h"
    -: #include "common/logging.h"
    -: #include "common/status.h"
    -: #include "exec/tablet_info.h"
    -: #include "gutil/strings/numbers.h"
    -: #include "io/fs/file_writer.h" // IWYU pragma: keep
    -: #include "olap/memtable.h"
    -: #include "olap/memtable_flush_executor.h"
    -: #include "olap/memtable_memory_limiter.h"
    -: #include "olap/rowset/beta_rowset_writer.h"
    -: #include "olap/rowset/rowset_writer.h"
    -: #include "olap/schema_change.h"
    -: #include "olap/storage_engine.h"
    -: #include "olap/tablet_schema.h"
    -: #include "runtime/exec_env.h"
    -: #include "util/mem_info.h"
    -: #include "util/stopwatch.hpp"
    -: #include "vec/core/block.h"
    -:
    -: namespace doris {
    -: using namespace ErrorCode;
    -:
   17: MemTableWriter::MemTableWriter(const WriteRequest& req) : _req(req) {}
    -:
   17: MemTableWriter::~MemTableWriter() {
   17:     if (!_is_init) {
    2:         return;
    2:     }
   15:     if (_flush_token != nullptr) {
    -:         // cancel and wait for all memtables in the flush queue to finish
   15:         _flush_token->cancel();
   15:     }
   15:     _mem_table.reset();
   15: }
    -:
    -: Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
    -:                             TabletSchemaSPtr tablet_schema,
    -:                             std::shared_ptr<PartialUpdateInfo> partial_update_info,
   15:                             ThreadPool* wg_flush_pool_ptr, bool unique_key_mow) {
   15:     _rowset_writer = rowset_writer;
   15:     _tablet_schema = tablet_schema;
   15:     _unique_key_mow = unique_key_mow;
   15:     _partial_update_info = partial_update_info;
   15:     _query_thread_context.init_unlocked();
    -:
   15:     _reset_mem_table();
    -:
    -:     // create flush handler
    -:     // by assigning segment_id to the memtable before submitting it to the flush executor,
    -:     // we can make sure the same keys sort in the same order in all replicas.
   15:     if (wg_flush_pool_ptr) {
    0:         RETURN_IF_ERROR(
    0:                 ExecEnv::GetInstance()
    0:                         ->storage_engine()
    0:                         .memtable_flush_executor()
    0:                         ->create_flush_token(_flush_token, _rowset_writer, wg_flush_pool_ptr));
   15:     } else {
   15:         RETURN_IF_ERROR(
   15:                 ExecEnv::GetInstance()
   15:                         ->storage_engine()
   15:                         .memtable_flush_executor()
   15:                         ->create_flush_token(_flush_token, _rowset_writer, _req.is_high_priority));
   15:     }
    -:
   15:     _is_init = true;
   15:     return Status::OK();
   15: }
    -:
    -: Status MemTableWriter::write(const vectorized::Block* block,
   20:                              const std::vector<uint32_t>& row_idxs) {
   20:     if (UNLIKELY(row_idxs.empty())) {
    0:         return Status::OK();
    0:     }
   20:     _lock_watch.start();
   20:     std::lock_guard<std::mutex> l(_lock);
   20:     _lock_watch.stop();
   20:     if (_is_cancelled) {
    0:         return _cancel_status;
    0:     }
   20:     if (!_is_init) {
    0:         return Status::Error<NOT_INITIALIZED>("delta segment writer has not been initialized");
    0:     }
   20:     if (_is_closed) {
    0:         return Status::Error<ALREADY_CLOSED>("write block after closed tablet_id={}, load_id={}-{}",
    0:                                              _req.tablet_id, _req.load_id.hi(), _req.load_id.lo());
    0:     }
    -:
   20:     _total_received_rows += row_idxs.size();
   20:     auto st = _mem_table->insert(block, row_idxs);
    -:
    -:     // Reset memtable immediately after insert failure to prevent potential flush operations.
    -:     // This is a defensive measure because:
    -:     // 1. When insert fails (e.g., memory allocation failure during add_rows),
    -:     //    the memtable is in an inconsistent state and should not be flushed
    -:     // 2. However, memory pressure might trigger a flush operation on this failed memtable
    -:     // 3. By resetting here, we ensure the failed memtable won't be included in any subsequent flush,
    -:     //    thus preventing potential crashes
   20:     DBUG_EXECUTE_IF("MemTableWriter.write.random_insert_error", {
   20:         if (rand() % 100 < (100 * dp->param("percent", 0.3))) {
   20:             st = Status::InternalError<false>("write memtable random failed for debug");
   20:         }
   20:     });
   20:     if (!st.ok()) [[unlikely]] {
    0:         _reset_mem_table();
    0:         return st;
    0:     }
    -:
   20:     if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {
    0:         _mem_table->shrink_memtable_by_agg();
    0:     }
   20:     if (UNLIKELY(_mem_table->need_flush())) {
    0:         auto s = _flush_memtable_async();
    0:         _reset_mem_table();
    0:         if (UNLIKELY(!s.ok())) {
    0:             return s;
    0:         }
    0:     }
    -:
   20:     return Status::OK();
   20: }
    -:
   15: Status MemTableWriter::_flush_memtable_async() {
   15:     DCHECK(_flush_token != nullptr);
   15:     std::shared_ptr<MemTable> memtable;
   15:     {
   15:         std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
   15:         memtable = _mem_table;
   15:         _mem_table = nullptr;
   15:     }
   15:     {
   15:         std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
   15:         memtable->update_mem_type(MemType::WRITE_FINISHED);
   15:         _freezed_mem_tables.push_back(memtable);
   15:     }
   15:     return _flush_token->submit(memtable);
   15: }
    -:
    0: Status MemTableWriter::flush_async() {
    0:     std::lock_guard<std::mutex> l(_lock);
    -:     // Use SWITCH here in order to avoid a repeated ATTACH. There are two calling paths:
    -:     // 1. called locally, from `VTabletWriterV2::_write_memtable`, which has already ATTACHed the Load
    -:     // memory tracker to the thread context, so ATTACH cannot be repeated here.
    -:     // 2. called remotely, from `LoadChannelMgr::_get_load_channel`, with no ATTACH because LoadChannelMgr
    -:     // does not know the Load context.
    0:     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
    0:     if (!_is_init || _is_closed) {
    -:         // The writer is uninitialized or already closed before flushing; do nothing.
    -:         // We return OK instead of NOT_INITIALIZED or ALREADY_CLOSED,
    -:         // because this method may be called when trying to reduce mem consumption,
    -:         // and at that time the writer may not be initialized yet, which is a normal case.
    0:         return Status::OK();
    0:     }
    -:
    0:     if (_is_cancelled) {
    0:         return _cancel_status;
    0:     }
    -:
    0:     VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
    0:                 << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id
    0:                 << ", load id: " << print_id(_req.load_id);
    0:     auto s = _flush_memtable_async();
    0:     _reset_mem_table();
    0:     return s;
    0: }
    -:
    9: Status MemTableWriter::wait_flush() {
    9:     {
    9:         std::lock_guard<std::mutex> l(_lock);
    9:         if (!_is_init || _is_closed) {
    -:             // return OK instead of NOT_INITIALIZED or ALREADY_CLOSED for the same reason
    -:             // as described in flush_async()
    9:             return Status::OK();
    9:         }
    0:         if (_is_cancelled) {
    0:             return _cancel_status;
    0:         }
    0:     }
    0:     SCOPED_RAW_TIMER(&_wait_flush_time_ns);
    0:     RETURN_IF_ERROR(_flush_token->wait());
    0:     return Status::OK();
    0: }
    -:
   15: void MemTableWriter::_reset_mem_table() {
   15:     {
   15:         std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
   15:         _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema, _req.slots, _req.tuple_desc,
   15:                                       _unique_key_mow, _partial_update_info.get()));
   15:     }
    -:
   15:     _segment_num++;
   15: }
    -:
   15: Status MemTableWriter::close() {
   15:     _lock_watch.start();
   15:     std::lock_guard<std::mutex> l(_lock);
   15:     _lock_watch.stop();
   15:     if (_is_cancelled) {
    0:         return _cancel_status;
    0:     }
   15:     if (!_is_init) {
    0:         return Status::Error<NOT_INITIALIZED>("delta segment writer has not been initialized");
    0:     }
   15:     if (_is_closed) {
    0:         LOG(WARNING) << "close after closed tablet_id=" << _req.tablet_id
    0:                      << " load_id=" << _req.load_id;
    0:         return Status::OK();
    0:     }
    -:
   15:     auto s = _flush_memtable_async();
   15:     {
   15:         std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
   15:         _mem_table.reset();
   15:     }
   15:     _is_closed = true;
   15:     if (UNLIKELY(!s.ok())) {
    0:         return s;
   15:     } else {
   15:         return Status::OK();
   15:     }
   15: }
    -:
   15: Status MemTableWriter::_do_close_wait() {
   15:     SCOPED_RAW_TIMER(&_close_wait_time_ns);
   15:     std::lock_guard<std::mutex> l(_lock);
   15:     DCHECK(_is_init)
    0:             << "delta writer is supposed to be initialized before close_wait() being called";
    -:
   15:     if (_is_cancelled) {
    0:         return _cancel_status;
    0:     }
    -:
   15:     Status st;
    -:     // return error if the previous flush failed
   15:     {
   15:         SCOPED_RAW_TIMER(&_wait_flush_time_ns);
   15:         st = _flush_token->wait();
   15:     }
   15:     if (UNLIKELY(!st.ok())) {
    0:         LOG(WARNING) << "previous flush failed tablet " << _req.tablet_id;
    0:         return st;
    0:     }
    -:
   15:     if (_rowset_writer->num_rows() + _flush_token->memtable_stat().merged_rows !=
   15:         _total_received_rows) {
    0:         LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: "
    0:                      << _rowset_writer->num_rows()
    0:                      << ", merged_rows: " << _flush_token->memtable_stat().merged_rows
    0:                      << ", total received rows: " << _total_received_rows;
    0:         return Status::InternalError("rows number written by delta writer doesn't match");
    0:     }
    -:
    -:     // const FlushStatistic& stat = _flush_token->get_stats();
    -:     // print slow log if wait more than 1s
    -:     /*if (_wait_flush_timer->elapsed_time() > 1000UL * 1000 * 1000) {
    -:         LOG(INFO) << "close delta writer for tablet: " << req.tablet_id
    -:                   << ", load id: " << print_id(_req.load_id) << ", wait close for "
    -:                   << _wait_flush_timer->elapsed_time() << "(ns), stats: " << stat;
    -:     }*/
    -:
   15:     return Status::OK();
   15: }
    -:
   15: void MemTableWriter::_update_profile(RuntimeProfile* profile) {
    -:     // NOTE: MemTableWriter may be accessed when profile is out of scope, in MemTableMemoryLimiter.
    -:     // To avoid accessing dangling pointers, we cannot make the profile a member of MemTableWriter.
   15:     auto child =
   15:             profile->create_child(fmt::format("MemTableWriter {}", _req.tablet_id), true, true);
   15:     auto lock_timer = ADD_TIMER(child, "LockTime");
   15:     auto sort_timer = ADD_TIMER(child, "MemTableSortTime");
   15:     auto agg_timer = ADD_TIMER(child, "MemTableAggTime");
   15:     auto memtable_duration_timer = ADD_TIMER(child, "MemTableDurationTime");
   15:     auto segment_writer_timer = ADD_TIMER(child, "SegmentWriterTime");
   15:     auto wait_flush_timer = ADD_TIMER(child, "MemTableWaitFlushTime");
   15:     auto put_into_output_timer = ADD_TIMER(child, "MemTablePutIntoOutputTime");
   15:     auto delete_bitmap_timer = ADD_TIMER(child, "DeleteBitmapTime");
   15:     auto close_wait_timer = ADD_TIMER(child, "CloseWaitTime");
   15:     auto sort_times = ADD_COUNTER(child, "MemTableSortTimes", TUnit::UNIT);
   15:     auto agg_times = ADD_COUNTER(child, "MemTableAggTimes", TUnit::UNIT);
   15:     auto segment_num = ADD_COUNTER(child, "SegmentNum", TUnit::UNIT);
   15:     auto raw_rows_num = ADD_COUNTER(child, "RawRowNum", TUnit::UNIT);
   15:     auto merged_rows_num = ADD_COUNTER(child, "MergedRowNum", TUnit::UNIT);
    -:
   15:     COUNTER_UPDATE(lock_timer, _lock_watch.elapsed_time());
   15:     COUNTER_SET(delete_bitmap_timer, _rowset_writer->delete_bitmap_ns());
   15:     COUNTER_SET(segment_writer_timer, _rowset_writer->segment_writer_ns());
   15:     COUNTER_SET(wait_flush_timer, _wait_flush_time_ns);
   15:     COUNTER_SET(close_wait_timer, _close_wait_time_ns);
   15:     COUNTER_SET(segment_num, _segment_num);
   15:     const auto& memtable_stat = _flush_token->memtable_stat();
   15:     COUNTER_SET(sort_timer, memtable_stat.sort_ns);
   15:     COUNTER_SET(agg_timer, memtable_stat.agg_ns);
   15:     COUNTER_SET(memtable_duration_timer, memtable_stat.duration_ns);
   15:     COUNTER_SET(put_into_output_timer, memtable_stat.put_into_output_ns);
   15:     COUNTER_SET(sort_times, memtable_stat.sort_times);
   15:     COUNTER_SET(agg_times, memtable_stat.agg_times);
   15:     COUNTER_SET(raw_rows_num, memtable_stat.raw_rows);
   15:     COUNTER_SET(merged_rows_num, memtable_stat.merged_rows);
   15: }
    -:
   15: Status MemTableWriter::cancel() {
   15:     return cancel_with_status(Status::Cancelled("already cancelled"));
   15: }
    -:
   15: Status MemTableWriter::cancel_with_status(const Status& st) {
   15:     std::lock_guard<std::mutex> l(_lock);
   15:     if (_is_cancelled) {
    0:         return Status::OK();
    0:     }
   15:     {
   15:         std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
   15:         _mem_table.reset();
   15:     }
   15:     if (_flush_token != nullptr) {
    -:         // cancel and wait for all memtables in the flush queue to finish
   15:         _flush_token->cancel();
   15:     }
   15:     _is_cancelled = true;
   15:     _cancel_status = st;
   15:     return Status::OK();
   15: }
    -:
   15: const FlushStatistic& MemTableWriter::get_flush_token_stats() {
   15:     return _flush_token->get_stats();
   15: }
    -:
   20: uint64_t MemTableWriter::flush_running_count() const {
   20:     return _flush_token == nullptr ? 0 : _flush_token->get_stats().flush_running_count.load();
   20: }
    -:
    0: int64_t MemTableWriter::mem_consumption(MemType mem) {
    0:     if (!_is_init) {
    -:         // This method may be called before this writer is initialized.
    -:         // So _flush_token may be null.
    0:         return 0;
    0:     }
    0:     int64_t mem_usage = 0;
    0:     {
    0:         std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
    0:         for (const auto& mem_table : _freezed_mem_tables) {
    0:             auto mem_table_sptr = mem_table.lock();
    0:             if (mem_table_sptr != nullptr && mem_table_sptr->get_mem_type() == mem) {
    0:                 mem_usage += mem_table_sptr->memory_usage();
    0:             }
    0:         }
    0:     }
    0:     return mem_usage;
    0: }
    -:
    0: int64_t MemTableWriter::active_memtable_mem_consumption() {
    0:     std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
    0:     return _mem_table != nullptr ? _mem_table->memory_usage() : 0;
    0: }
    -:
    -: } // namespace doris
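
The hand-off in _flush_memtable_async() and _reset_mem_table() follows a detach-under-lock pattern: the active memtable is detached while holding _mem_table_ptr_lock, recorded in a frozen list for later memory accounting, and handed to the flush executor while a fresh memtable takes its place, so the flush itself runs outside the lock. The standalone sketch below illustrates the same pattern with only the standard library; Buffer, FlushQueue, Writer, and the kFlushBytes threshold are hypothetical stand-ins, not part of memtable_writer.cpp or the Doris API.

#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

struct Buffer {                       // hypothetical stand-in for MemTable
    std::vector<int> rows;
    std::size_t bytes() const { return rows.size() * sizeof(int); }
};

struct FlushQueue {                   // hypothetical stand-in for the flush token
    void submit(std::shared_ptr<Buffer> b) { pending.push_back(std::move(b)); }
    std::vector<std::shared_ptr<Buffer>> pending;
};

class Writer {
public:
    explicit Writer(FlushQueue* q) : _queue(q) { _reset_buffer(); }

    void write(int row) {
        _active->rows.push_back(row);
        if (_active->bytes() >= kFlushBytes) {    // analogous to need_flush()
            _flush_async();
            _reset_buffer();                      // a new buffer takes over for incoming rows
        }
    }

private:
    void _flush_async() {
        std::shared_ptr<Buffer> frozen;
        {
            // Detach the active buffer under the lock so concurrent readers
            // (e.g. memory accounting) never observe a half-flushed buffer.
            std::lock_guard<std::mutex> l(_ptr_lock);
            frozen = std::move(_active);
            _frozen.push_back(frozen);            // keep a weak reference for accounting
        }
        _queue->submit(std::move(frozen));        // the flush itself happens outside the lock
    }

    void _reset_buffer() {
        std::lock_guard<std::mutex> l(_ptr_lock);
        _active = std::make_shared<Buffer>();
    }

    static constexpr std::size_t kFlushBytes = 1024;
    std::mutex _ptr_lock;
    std::shared_ptr<Buffer> _active;
    std::vector<std::weak_ptr<Buffer>> _frozen;
    FlushQueue* _queue;
};

int main() {
    FlushQueue q;
    Writer w(&q);
    for (int i = 0; i < 1000; ++i) w.write(i);
    return 0;
}

Keeping the lock scope to the pointer swap is the point of the pattern: writers and memory accounting only contend for the brief moment the active buffer is exchanged, never for the duration of a flush.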
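
Similarly, mem_consumption() iterates _freezed_mem_tables, which holds weak references, and counts only memtables that are still alive and in the requested MemType. A minimal sketch of that weak_ptr accounting idiom, again standard library only and with hypothetical names (Chunk, State, Accounting), might look like this:

#include <cstdint>
#include <memory>
#include <mutex>
#include <vector>

enum class State { kWriting, kWriteFinished, kFlushed };  // hypothetical states, loosely mirroring MemType

struct Chunk {
    State state;
    int64_t bytes;
};

class Accounting {
public:
    void track(const std::shared_ptr<Chunk>& c) {
        std::lock_guard<std::mutex> l(_lock);
        _tracked.push_back(c);                    // stores a weak_ptr; does not extend the chunk's lifetime
    }

    // Sum the memory of chunks that are still alive and in the requested state.
    int64_t consumption(State wanted) {
        int64_t total = 0;
        std::lock_guard<std::mutex> l(_lock);
        for (const auto& weak : _tracked) {
            auto alive = weak.lock();
            if (alive != nullptr && alive->state == wanted) {
                total += alive->bytes;
            }
        }
        return total;
    }

private:
    std::mutex _lock;
    std::vector<std::weak_ptr<Chunk>> _tracked;
};

int main() {
    Accounting acc;
    auto a = std::make_shared<Chunk>(Chunk{State::kWriteFinished, 4096});
    auto b = std::make_shared<Chunk>(Chunk{State::kWriting, 1024});
    acc.track(a);
    acc.track(b);
    b.reset();                                    // released chunks simply drop out of the sum
    return acc.consumption(State::kWriteFinished) == 4096 ? 0 : 1;
}

Storing weak references means the accounting list never extends a memtable's lifetime; once the flush executor releases the last shared_ptr, the entry simply stops contributing to the total.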