Coverage Report

Created: 2025-06-07 21:54

/root/doris/be/src/olap/memtable_writer.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/memtable_writer.h"
19
20
#include <fmt/format.h>
21
22
#include <filesystem>
23
#include <ostream>
24
#include <string>
25
#include <utility>
26
27
#include "common/compiler_util.h" // IWYU pragma: keep
28
#include "common/config.h"
29
#include "common/logging.h"
30
#include "common/status.h"
31
#include "exec/tablet_info.h"
32
#include "gutil/strings/numbers.h"
33
#include "io/fs/file_writer.h" // IWYU pragma: keep
34
#include "olap/memtable.h"
35
#include "olap/memtable_flush_executor.h"
36
#include "olap/memtable_memory_limiter.h"
37
#include "olap/rowset/beta_rowset_writer.h"
38
#include "olap/rowset/rowset_writer.h"
39
#include "olap/schema_change.h"
40
#include "olap/storage_engine.h"
41
#include "olap/tablet_schema.h"
42
#include "runtime/exec_env.h"
43
#include "runtime/memory/mem_tracker.h"
44
#include "service/backend_options.h"
45
#include "util/mem_info.h"
46
#include "util/stopwatch.hpp"
47
#include "vec/core/block.h"
48
49
namespace doris {
50
using namespace ErrorCode;
51
52
13
// Constructs a writer for the given write request. Only `_req` is initialized
// here; the constructor body is empty and `_is_init` is set later by init().
MemTableWriter::MemTableWriter(const WriteRequest& req)
        : _req(req) {
}
53
54
13
// Destructor. Does nothing when init() never completed. Otherwise cancels the
// flush token (when one exists) and releases the active memtable.
MemTableWriter::~MemTableWriter() {
    if (_is_init) {
        if (_flush_token != nullptr) {
            // cancel and wait all memtables in flush queue to be finished
            _flush_token->cancel();
        }
        _mem_table.reset();
    }
}
64
65
// Prepares the writer for use.
//
// Stores the rowset writer, tablet schema, partial-update info and the
// merge-on-write flag, initializes the query thread context, creates the first
// memtable and asks the storage engine's flush executor for a flush token.
//
// Returns the error from create_flush_token() on failure; in that case
// `_is_init` stays false. Returns OK and sets `_is_init` otherwise.
Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
                            TabletSchemaSPtr tablet_schema,
                            std::shared_ptr<PartialUpdateInfo> partial_update_info,
                            std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow) {
    // The parameters are taken by value, so move them into the members rather
    // than copying them a second time.
    _rowset_writer = std::move(rowset_writer);
    _tablet_schema = std::move(tablet_schema);
    _unique_key_mow = unique_key_mow;
    _partial_update_info = std::move(partial_update_info);
    _query_thread_context.init();

    // Reads the members assigned above, so it must come after them.
    _reset_mem_table();

    // create flush handler
    // by assigning segment_id to memtable before submitting to flush executor,
    // we can make sure same keys sort in the same order in all replicas.
    RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
            _flush_token, _rowset_writer, _req.is_high_priority, wg_sptr));

    _is_init = true;
    return Status::OK();
}
86
87
// Inserts the rows of `block` selected by `row_idxs` into the active memtable.
//
// An empty `row_idxs` is a no-op that returns OK without taking the lock.
// Returns the stored cancel status if the writer was cancelled, NOT_INITIALIZED
// before init(), ALREADY_CLOSED after close(), or any error from the insert or
// from submitting a full memtable for flushing.
Status MemTableWriter::write(const vectorized::Block* block,
                             const std::vector<uint32_t>& row_idxs) {
    if (UNLIKELY(row_idxs.empty())) {
        return Status::OK();
    }

    // _lock_watch accumulates the time spent waiting for _lock.
    _lock_watch.start();
    std::lock_guard<std::mutex> guard(_lock);
    _lock_watch.stop();

    if (_is_cancelled) {
        return _cancel_status;
    }
    if (!_is_init) {
        return Status::Error<NOT_INITIALIZED>("delta segment writer has not been initialized");
    }
    if (_is_closed) {
        return Status::Error<ALREADY_CLOSED>("write block after closed tablet_id={}, load_id={}-{}",
                                             _req.tablet_id, _req.load_id.hi(), _req.load_id.lo());
    }

    _total_received_rows += row_idxs.size();
    RETURN_IF_ERROR(_mem_table->insert(block, row_idxs));

    if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {
        _mem_table->shrink_memtable_by_agg();
    }

    if (UNLIKELY(_mem_table->need_flush())) {
        // Always install a fresh memtable, even when the submission failed;
        // the submission result is what gets reported to the caller.
        Status flush_status = _flush_memtable_async();
        _reset_mem_table();
        return flush_status;
    }

    return Status::OK();
}
122
123
11
// Detaches the active memtable (leaving `_mem_table` empty) and submits it to
// the flush token. Returns the status of the submission.
Status MemTableWriter::_flush_memtable_async() {
    DCHECK(_flush_token != nullptr);

    std::unique_ptr<MemTable> pending;
    {
        // Swapping with an empty pointer transfers ownership out of _mem_table
        // while holding the pointer lock.
        std::lock_guard<SpinLock> guard(_mem_table_ptr_lock);
        pending.swap(_mem_table);
    }
    return _flush_token->submit(std::move(pending));
}
132
133
0
// Submits the active memtable for flushing and installs a fresh one.
//
// Returns OK without doing anything when the writer is uninitialized or
// already closed, the stored cancel status when cancelled, and otherwise the
// status of the flush submission.
Status MemTableWriter::flush_async() {
    std::lock_guard<std::mutex> guard(_lock);
    // In order to avoid repeated ATTACH, use SWITCH here. have two calling paths:
    // 1. call by local, from `VTabletWriterV2::_write_memtable`, has been ATTACH Load memory tracker
    // into thread context, ATTACH cannot be repeated here.
    // 2. call by remote, from `LoadChannelMgr::_get_load_channel`, no ATTACH because LoadChannelMgr
    // not know Load context.
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);

    const bool inactive = !_is_init || _is_closed;
    if (inactive) {
        // This writer is uninitialized or closed before flushing, do nothing.
        // We return OK instead of NOT_INITIALIZED or ALREADY_CLOSED.
        // Because this method maybe called when trying to reduce mem consumption,
        // and at that time, the writer may not be initialized yet and that is a normal case.
        return Status::OK();
    }

    if (_is_cancelled) {
        return _cancel_status;
    }

    VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
                << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id
                << ", load id: " << print_id(_req.load_id);

    Status submit_status = _flush_memtable_async();
    _reset_mem_table();
    return submit_status;
}
160
161
5
// Waits on the flush token and returns its result.
//
// Returns OK immediately when the writer is uninitialized or closed, and the
// stored cancel status when cancelled. `_lock` is held only for those state
// checks and is released before waiting; the wait time is added to
// `_wait_flush_time_ns`.
Status MemTableWriter::wait_flush() {
    {
        std::lock_guard<std::mutex> guard(_lock);
        const bool inactive = !_is_init || _is_closed;
        if (inactive) {
            // return OK instead of NOT_INITIALIZED or ALREADY_CLOSED for same reason
            // as described in flush_async()
            return Status::OK();
        }
        if (_is_cancelled) {
            return _cancel_status;
        }
    }

    SCOPED_RAW_TIMER(&_wait_flush_time_ns);
    Status wait_status = _flush_token->wait();
    return wait_status;
}
177
178
11
void MemTableWriter::_reset_mem_table() {
179
#ifndef BE_TEST
180
    auto mem_table_insert_tracker = std::make_shared<MemTracker>(
181
            fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
182
                        std::to_string(tablet_id()), _mem_table_num,
183
                        UniqueId(_req.load_id).to_string()),
184
            ExecEnv::GetInstance()->memtable_memory_limiter()->memtable_tracker_set());
185
    auto mem_table_flush_tracker = std::make_shared<MemTracker>(
186
            fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
187
                        std::to_string(tablet_id()), _mem_table_num++,
188
                        UniqueId(_req.load_id).to_string()),
189
            ExecEnv::GetInstance()->memtable_memory_limiter()->memtable_tracker_set());
190
#else
191
11
    auto mem_table_insert_tracker = std::make_shared<MemTracker>(fmt::format(
192
11
            "MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
193
11
            std::to_string(tablet_id()), _mem_table_num, UniqueId(_req.load_id).to_string()));
194
11
    auto mem_table_flush_tracker = std::make_shared<MemTracker>(fmt::format(
195
11
            "MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()),
196
11
            _mem_table_num++, UniqueId(_req.load_id).to_string()));
197
11
#endif
198
11
    {
199
11
        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
200
11
        _mem_table_insert_trackers.push_back(mem_table_insert_tracker);
201
11
        _mem_table_flush_trackers.push_back(mem_table_flush_tracker);
202
11
    }
203
11
    {
204
11
        std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
205
11
        _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema, _req.slots, _req.tuple_desc,
206
11
                                      _unique_key_mow, _partial_update_info.get(),
207
11
                                      mem_table_insert_tracker, mem_table_flush_tracker));
208
11
    }
209
210
11
    _segment_num++;
211
11
}
212
213
11
// Closes the writer: submits the active memtable for flushing, clears
// `_mem_table` and marks the writer closed.
//
// Returns the stored cancel status when cancelled and NOT_INITIALIZED before
// init(). Closing an already closed writer logs a warning and returns OK.
// Otherwise returns the status of the flush submission; `_is_closed` is set
// even when that submission failed.
Status MemTableWriter::close() {
    // _lock_watch accumulates the time spent waiting for _lock.
    _lock_watch.start();
    std::lock_guard<std::mutex> guard(_lock);
    _lock_watch.stop();

    if (_is_cancelled) {
        return _cancel_status;
    }
    if (!_is_init) {
        return Status::Error<NOT_INITIALIZED>("delta segment writer has not been initialized");
    }
    if (_is_closed) {
        LOG(WARNING) << "close after closed tablet_id=" << _req.tablet_id
                     << " load_id=" << _req.load_id;
        return Status::OK();
    }

    Status submit_status = _flush_memtable_async();
    {
        std::lock_guard<SpinLock> ptr_guard(_mem_table_ptr_lock);
        _mem_table.reset();
    }
    _is_closed = true;
    return submit_status;
}
241
242
11
// Waits on the flush token and then checks the row accounting.
//
// Returns the stored cancel status when cancelled, the flush error when the
// wait fails, and InternalError when the rowset writer's row count plus the
// merged rows differs from the total number of rows received. The elapsed time
// is added to `_close_wait_time_ns`.
Status MemTableWriter::_do_close_wait() {
    SCOPED_RAW_TIMER(&_close_wait_time_ns);
    std::lock_guard<std::mutex> guard(_lock);
    DCHECK(_is_init)
            << "delta writer is supposed be to initialized before close_wait() being called";

    if (_is_cancelled) {
        return _cancel_status;
    }

    // return error if previous flush failed
    Status flush_status;
    {
        SCOPED_RAW_TIMER(&_wait_flush_time_ns);
        flush_status = _flush_token->wait();
    }
    if (UNLIKELY(!flush_status.ok())) {
        LOG(WARNING) << "previous flush failed tablet " << _req.tablet_id;
        return flush_status;
    }

    const auto accounted_rows =
            _rowset_writer->num_rows() + _flush_token->memtable_stat().merged_rows;
    if (accounted_rows != _total_received_rows) {
        LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: "
                     << _rowset_writer->num_rows()
                     << ", merged_rows: " << _flush_token->memtable_stat().merged_rows
                     << ", total received rows: " << _total_received_rows;
        return Status::InternalError("rows number written by delta writer dosen't match");
    }

    // const FlushStatistic& stat = _flush_token->get_stats();
    // print slow log if wait more than 1s
    /*if (_wait_flush_timer->elapsed_time() > 1000UL * 1000 * 1000) {
        LOG(INFO) << "close delta writer for tablet: " << req.tablet_id
                  << ", load id: " << print_id(_req.load_id) << ", wait close for "
                  << _wait_flush_timer->elapsed_time() << "(ns), stats: " << stat;
    }*/

    return Status::OK();
}
282
283
11
// Publishes this writer's timing and row statistics into a child of `profile`
// named "MemTableWriter <tablet_id>".
void MemTableWriter::_update_profile(RuntimeProfile* profile) {
    // NOTE: MemTableWriter may be accessed when profile is out of scope, in MemTableMemoryLimiter.
    // To avoid accessing dangling pointers, we cannot make profile as a member of MemTableWriter.
    auto* writer_profile =
            profile->create_child(fmt::format("MemTableWriter {}", _req.tablet_id), true, true);

    // Timers, registered in the same order as before.
    auto* lock_time = ADD_TIMER(writer_profile, "LockTime");
    auto* sort_time = ADD_TIMER(writer_profile, "MemTableSortTime");
    auto* agg_time = ADD_TIMER(writer_profile, "MemTableAggTime");
    auto* duration_time = ADD_TIMER(writer_profile, "MemTableDurationTime");
    auto* segment_writer_time = ADD_TIMER(writer_profile, "SegmentWriterTime");
    auto* wait_flush_time = ADD_TIMER(writer_profile, "MemTableWaitFlushTime");
    auto* put_into_output_time = ADD_TIMER(writer_profile, "MemTablePutIntoOutputTime");
    auto* delete_bitmap_time = ADD_TIMER(writer_profile, "DeleteBitmapTime");
    auto* close_wait_time = ADD_TIMER(writer_profile, "CloseWaitTime");

    // Plain counters.
    auto* sort_count = ADD_COUNTER(writer_profile, "MemTableSortTimes", TUnit::UNIT);
    auto* agg_count = ADD_COUNTER(writer_profile, "MemTableAggTimes", TUnit::UNIT);
    auto* segment_count = ADD_COUNTER(writer_profile, "SegmentNum", TUnit::UNIT);
    auto* raw_row_count = ADD_COUNTER(writer_profile, "RawRowNum", TUnit::UNIT);
    auto* merged_row_count = ADD_COUNTER(writer_profile, "MergedRowNum", TUnit::UNIT);

    // Values held by the writer itself and by the rowset writer.
    COUNTER_UPDATE(lock_time, _lock_watch.elapsed_time());
    COUNTER_SET(delete_bitmap_time, _rowset_writer->delete_bitmap_ns());
    COUNTER_SET(segment_writer_time, _rowset_writer->segment_writer_ns());
    COUNTER_SET(wait_flush_time, _wait_flush_time_ns);
    COUNTER_SET(close_wait_time, _close_wait_time_ns);
    COUNTER_SET(segment_count, _segment_num);

    // Values reported by the flush token's memtable statistics.
    const auto& stat = _flush_token->memtable_stat();
    COUNTER_SET(sort_time, stat.sort_ns);
    COUNTER_SET(agg_time, stat.agg_ns);
    COUNTER_SET(duration_time, stat.duration_ns);
    COUNTER_SET(put_into_output_time, stat.put_into_output_ns);
    COUNTER_SET(sort_count, stat.sort_times);
    COUNTER_SET(agg_count, stat.agg_times);
    COUNTER_SET(raw_row_count, stat.raw_rows);
    COUNTER_SET(merged_row_count, stat.merged_rows);
}
319
320
11
// Cancels the writer with a generic Cancelled status; forwards to
// cancel_with_status().
Status MemTableWriter::cancel() {
    const Status reason = Status::Cancelled("already cancelled");
    return cancel_with_status(reason);
}
323
324
11
// Cancels the writer and records `st` as the status that later calls return.
//
// Releases the active memtable, cancels the flush token when one exists, and
// sets `_is_cancelled`. A repeated call is a no-op that keeps the first status.
// Always returns OK.
Status MemTableWriter::cancel_with_status(const Status& st) {
    std::lock_guard<std::mutex> state_guard(_lock);
    if (_is_cancelled) {
        return Status::OK();
    }

    {
        std::lock_guard<SpinLock> ptr_guard(_mem_table_ptr_lock);
        _mem_table.reset();
    }

    if (_flush_token != nullptr) {
        // cancel and wait all memtables in flush queue to be finished
        _flush_token->cancel();
    }

    _is_cancelled = true;
    _cancel_status = st;
    return Status::OK();
}
341
342
11
// Returns the flush statistics of this writer's flush token.
//
// `_flush_token` is only created in init(), so it can still be null here
// (flush_running_count() guards against the same case). A shared,
// default-constructed statistic is returned then instead of dereferencing a
// null pointer.
const FlushStatistic& MemTableWriter::get_flush_token_stats() {
    if (_flush_token == nullptr) {
        static const FlushStatistic empty_stats {};
        return empty_stats;
    }
    return _flush_token->get_stats();
}
345
346
12
uint64_t MemTableWriter::flush_running_count() const {
347
12
    return _flush_token == nullptr ? 0 : _flush_token->get_stats().flush_running_count.load();
348
12
}
349
350
0
// Sums the consumption of this writer's memtable trackers.
//
// `mem` selects which tracker lists are counted: the insert trackers when the
// WRITE bit is set, the flush trackers when the FLUSH bit is set, both when
// both are set. Returns 0 before init().
int64_t MemTableWriter::mem_consumption(MemType mem) {
    if (!_is_init) {
        // This method may be called before this writer is initialized.
        // So _flush_token may be null.
        return 0;
    }

    const bool count_write = (mem & MemType::WRITE) == MemType::WRITE; // 3 & 2 = 2
    const bool count_flush = (mem & MemType::FLUSH) == MemType::FLUSH; // 3 & 1 = 1

    int64_t total = 0;
    std::lock_guard<SpinLock> guard(_mem_table_tracker_lock);
    if (count_write) {
        for (const auto& tracker : _mem_table_insert_trackers) {
            total += tracker->consumption();
        }
    }
    if (count_flush) {
        for (const auto& tracker : _mem_table_flush_trackers) {
            total += tracker->consumption();
        }
    }
    return total;
}
372
373
0
int64_t MemTableWriter::active_memtable_mem_consumption() {
374
0
    std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
375
0
    return _mem_table != nullptr ? _mem_table->memory_usage() : 0;
376
0
}
377
378
} // namespace doris