Coverage Report

Created: 2025-03-12 11:23

/root/doris/be/src/olap/memtable_writer.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/memtable_writer.h"
19
20
#include <fmt/format.h>
21
22
#include <filesystem>
23
#include <ostream>
24
#include <string>
25
#include <utility>
26
27
#include "common/compiler_util.h" // IWYU pragma: keep
28
#include "common/config.h"
29
#include "common/logging.h"
30
#include "common/status.h"
31
#include "exec/tablet_info.h"
32
#include "gutil/strings/numbers.h"
33
#include "io/fs/file_writer.h" // IWYU pragma: keep
34
#include "olap/memtable.h"
35
#include "olap/memtable_flush_executor.h"
36
#include "olap/memtable_memory_limiter.h"
37
#include "olap/rowset/beta_rowset_writer.h"
38
#include "olap/rowset/rowset_writer.h"
39
#include "olap/schema_change.h"
40
#include "olap/storage_engine.h"
41
#include "olap/tablet_schema.h"
42
#include "runtime/exec_env.h"
43
#include "runtime/memory/mem_tracker.h"
44
#include "service/backend_options.h"
45
#include "util/mem_info.h"
46
#include "util/stopwatch.hpp"
47
#include "vec/core/block.h"
48
49
namespace doris {
50
using namespace ErrorCode;
51
52
16
// Constructs a writer for the given write request. Only `_req` is initialized
// here; the writer is not usable until init() succeeds and sets `_is_init`.
MemTableWriter::MemTableWriter(const WriteRequest& req) : _req(req) {}
53
54
16
// Tears the writer down. A writer whose init() never completed has nothing to
// release; otherwise pending flush work is cancelled and the active memtable
// is dropped.
MemTableWriter::~MemTableWriter() {
    if (_is_init) {
        if (_flush_token != nullptr) {
            // cancel and wait all memtables in flush queue to be finished
            _flush_token->cancel();
        }
        _mem_table.reset();
    }
}
64
65
// Binds the writer to its rowset writer and schema, creates the first active
// memtable and obtains a flush token from the storage engine's flush executor.
//
// @param rowset_writer        destination for flushed memtables.
// @param tablet_schema        schema used to build every memtable.
// @param partial_update_info  partial-update description handed to memtables.
// @param wg_sptr              workload group forwarded to the flush executor.
// @param unique_key_mow       forwarded to every memtable that is created.
// @return OK on success, or the error returned by create_flush_token(); in the
//         error case `_is_init` stays false.
Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
                            TabletSchemaSPtr tablet_schema,
                            std::shared_ptr<PartialUpdateInfo> partial_update_info,
                            std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow) {
    // The parameters are taken by value, so hand ownership to the members
    // instead of paying for an extra reference-count round trip per pointer.
    _rowset_writer = std::move(rowset_writer);
    _tablet_schema = std::move(tablet_schema);
    _unique_key_mow = unique_key_mow;
    _partial_update_info = std::move(partial_update_info);
    _resource_ctx = thread_context()->resource_ctx();

    // Must run after the members above are set: the memtable is built from them.
    _reset_mem_table();

    // create flush handler
    // by assigning segment_id to memtable before submiting to flush executor,
    // we can make sure same keys sort in the same order in all replicas.
    RETURN_IF_ERROR(
            ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->create_flush_token(
                    _flush_token, _rowset_writer, _req.is_high_priority, wg_sptr));

    _is_init = true;
    return Status::OK();
}
87
88
// Inserts the rows of `block` selected by `row_idxs` into the active memtable
// and, when the memtable reports that it needs flushing, submits it for an
// asynchronous flush and starts a fresh one.
//
// @param block     source block.
// @param row_idxs  indexes of the rows in `block` to insert; an empty list is
//                  a no-op that returns OK without taking the writer lock.
// @return OK on success; the stored cancel status if the writer was cancelled;
//         NOT_INITIALIZED / ALREADY_CLOSED for lifecycle misuse; otherwise the
//         error from the insert or from submitting the flush.
Status MemTableWriter::write(const vectorized::Block* block,
                             const DorisVector<uint32_t>& row_idxs) {
    if (UNLIKELY(row_idxs.empty())) {
        return Status::OK();
    }
    // _lock_watch measures the time spent waiting for the writer lock.
    _lock_watch.start();
    std::lock_guard<std::mutex> l(_lock);
    _lock_watch.stop();
    if (_is_cancelled) {
        return _cancel_status;
    }
    if (!_is_init) {
        return Status::Error<NOT_INITIALIZED>("delta segment writer has not been initialized");
    }
    if (_is_closed) {
        return Status::Error<ALREADY_CLOSED>("write block after closed tablet_id={}, load_id={}-{}",
                                             _req.tablet_id, _req.load_id.hi(), _req.load_id.lo());
    }

    _total_received_rows += row_idxs.size();
    auto st = _mem_table->insert(block, row_idxs);

    // Reset memtable immediately after insert failure to prevent potential flush operations.
    // This is a defensive measure because:
    // 1. When insert fails (e.g., memory allocation failure during add_rows),
    //    the memtable is in an inconsistent state and should not be flushed
    // 2. However, memory pressure might trigger a flush operation on this failed memtable
    // 3. By resetting here, we ensure the failed memtable won't be included in any subsequent flush,
    //    thus preventing potential crashes
    if (!st.ok()) [[unlikely]] {
        // Do NOT take _mem_table_ptr_lock here: _reset_mem_table() acquires it
        // itself, and locking the same SpinLock twice on one thread would
        // never return.
        _reset_mem_table();
        return st;
    }

    if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {
        _mem_table->shrink_memtable_by_agg();
    }
    if (UNLIKELY(_mem_table->need_flush())) {
        auto s = _flush_memtable_async();
        // A new active memtable is installed even if the submit failed.
        _reset_mem_table();
        if (UNLIKELY(!s.ok())) {
            return s;
        }
    }

    return Status::OK();
}
136
137
14
// Detaches the active memtable, marks it WRITE_FINISHED, records it in
// `_freezed_mem_tables` and submits it to the flush token.
//
// Preconditions: `_flush_token` and `_mem_table` are non-null. Every caller in
// this file (write, flush_async, close) invokes it while holding `_lock`.
// On return `_mem_table` is null; callers install a replacement themselves.
//
// @return the status of submitting the memtable to the flush token.
Status MemTableWriter::_flush_memtable_async() {
    DCHECK(_flush_token != nullptr);
    std::shared_ptr<MemTable> memtable;
    {
        // One critical section for detach + freeze: readers of
        // _mem_table_ptr_lock never observe a memtable that is neither the
        // active one nor listed in _freezed_mem_tables.
        std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
        memtable = std::move(_mem_table);
        _mem_table = nullptr;
        DCHECK(memtable != nullptr);
        memtable->update_mem_type(MemType::WRITE_FINISHED);
        _freezed_mem_tables.push_back(memtable);
    }
    // Submit outside the spin lock so the lock is not held during the hand-off.
    return _flush_token->submit(memtable);
}
152
153
0
// Submits the active memtable for flushing and replaces it with a fresh one.
//
// Three calling paths:
// 1. call by local, from `VTabletWriterV2::_write_memtable`.
// 2. call by remote, from `LoadChannelMgr::_get_load_channel`.
// 3. call by daemon thread, from `handle_paused_queries` -> `flush_workload_group_memtables`.
//
// @return OK when the writer is uninitialized or closed (nothing to do), the
//         stored cancel status when cancelled, otherwise the submit status.
Status MemTableWriter::flush_async() {
    std::lock_guard<std::mutex> guard(_lock);
    SCOPED_SWITCH_RESOURCE_CONTEXT(_resource_ctx);

    // An uninitialized or already closed writer is reported as OK rather than
    // NOT_INITIALIZED / ALREADY_CLOSED: this method may be called while trying
    // to reduce memory consumption, and a writer that has not been initialized
    // yet is a normal situation at that point.
    const bool flushable = _is_init && !_is_closed;
    if (!flushable) {
        return Status::OK();
    }

    if (_is_cancelled) {
        return _cancel_status;
    }

    VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
                << PrettyPrinter::print_bytes(_mem_table->memory_usage())
                << ", tablet: " << _req.tablet_id << ", load id: " << print_id(_req.load_id);

    Status submit_status = _flush_memtable_async();
    // The replacement memtable is created regardless of the submit outcome.
    _reset_mem_table();
    return submit_status;
}
179
180
8
// Blocks until the flush token reports that submitted flushes are done.
//
// The writer state is checked under `_lock`, but the lock is released before
// waiting. The time spent waiting is accumulated in `_wait_flush_time_ns`.
//
// @return OK when the writer is uninitialized or closed (same reasoning as in
//         flush_async()), the stored cancel status when cancelled, otherwise
//         the result of waiting on the flush token.
Status MemTableWriter::wait_flush() {
    {
        std::lock_guard<std::mutex> guard(_lock);
        const bool active = _is_init && !_is_closed;
        if (!active) {
            return Status::OK();
        }
        if (_is_cancelled) {
            return _cancel_status;
        }
    }

    SCOPED_RAW_TIMER(&_wait_flush_time_ns);
    RETURN_IF_ERROR(_flush_token->wait());
    return Status::OK();
}
196
197
14
void MemTableWriter::_reset_mem_table() {
198
14
    {
199
14
        std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
200
14
        _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema, _req.slots, _req.tuple_desc,
201
14
                                      _unique_key_mow, _partial_update_info.get()));
202
14
    }
203
204
14
    _segment_num++;
205
14
}
206
207
14
// Submits the last active memtable for flushing and marks the writer closed.
// This does not wait for the flush to finish.
//
// @return the stored cancel status when cancelled; NOT_INITIALIZED when init()
//         has not succeeded; OK (with a warning log) when already closed;
//         otherwise the status of submitting the final memtable.
Status MemTableWriter::close() {
    // _lock_watch measures the time spent waiting for the writer lock.
    _lock_watch.start();
    std::lock_guard<std::mutex> guard(_lock);
    _lock_watch.stop();

    if (_is_cancelled) {
        return _cancel_status;
    }
    if (!_is_init) {
        return Status::Error<NOT_INITIALIZED>("delta segment writer has not been initialized");
    }
    if (_is_closed) {
        LOG(WARNING) << "close after closed tablet_id=" << _req.tablet_id
                     << " load_id=" << _req.load_id;
        return Status::OK();
    }

    Status submit_status = _flush_memtable_async();
    {
        std::lock_guard<SpinLock> ptr_guard(_mem_table_ptr_lock);
        _mem_table.reset();
    }
    // The writer counts as closed even when the submit failed.
    _is_closed = true;
    return submit_status;
}
235
236
14
// Waits for all submitted flushes and verifies that no rows were lost.
//
// Holds `_lock` for the whole call. Total time is accumulated in
// `_close_wait_time_ns`, and the time spent waiting on the flush token in
// `_wait_flush_time_ns`.
//
// @return the stored cancel status when cancelled; the flush error when a
//         previous flush failed; InternalError when rows written plus rows
//         merged differ from the rows received; OK otherwise.
Status MemTableWriter::_do_close_wait() {
    SCOPED_RAW_TIMER(&_close_wait_time_ns);
    std::lock_guard<std::mutex> guard(_lock);
    DCHECK(_is_init)
            << "delta writer is supposed be to initialized before close_wait() being called";

    if (_is_cancelled) {
        return _cancel_status;
    }

    // return error if previous flush failed
    Status flush_status;
    {
        SCOPED_RAW_TIMER(&_wait_flush_time_ns);
        flush_status = _flush_token->wait();
    }
    if (UNLIKELY(!flush_status.ok())) {
        LOG(WARNING) << "previous flush failed tablet " << _req.tablet_id;
        return flush_status;
    }

    // Every received row must be accounted for, either written to the rowset
    // or merged away inside a memtable.
    const bool rows_match = (_rowset_writer->num_rows() +
                             _flush_token->memtable_stat().merged_rows) == _total_received_rows;
    if (rows_match) {
        // A slow-close log (wait longer than 1s) used to be emitted here; it
        // is currently disabled.
        return Status::OK();
    }

    LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: "
                 << _rowset_writer->num_rows()
                 << ", merged_rows: " << _flush_token->memtable_stat().merged_rows
                 << ", total received rows: " << _total_received_rows;
    return Status::InternalError("rows number written by delta writer dosen't match");
}
276
277
14
// Publishes this writer's timing and row statistics into a child profile of
// `profile`, named "MemTableWriter <tablet_id>".
//
// NOTE: MemTableWriter may be accessed when profile is out of scope, in
// MemTableMemoryLimiter. To avoid a dangling pointer the profile is passed in
// on each call instead of being stored as a member.
//
// @param profile  parent profile that receives the child profile.
void MemTableWriter::_update_profile(RuntimeProfile* profile) {
    auto writer_profile =
            profile->create_child(fmt::format("MemTableWriter {}", _req.tablet_id), true, true);

    // Register the timers first, then the plain counters.
    auto t_lock = ADD_TIMER(writer_profile, "LockTime");
    auto t_sort = ADD_TIMER(writer_profile, "MemTableSortTime");
    auto t_agg = ADD_TIMER(writer_profile, "MemTableAggTime");
    auto t_duration = ADD_TIMER(writer_profile, "MemTableDurationTime");
    auto t_segment_writer = ADD_TIMER(writer_profile, "SegmentWriterTime");
    auto t_wait_flush = ADD_TIMER(writer_profile, "MemTableWaitFlushTime");
    auto t_put_into_output = ADD_TIMER(writer_profile, "MemTablePutIntoOutputTime");
    auto t_delete_bitmap = ADD_TIMER(writer_profile, "DeleteBitmapTime");
    auto t_close_wait = ADD_TIMER(writer_profile, "CloseWaitTime");
    auto c_sort_times = ADD_COUNTER(writer_profile, "MemTableSortTimes", TUnit::UNIT);
    auto c_agg_times = ADD_COUNTER(writer_profile, "MemTableAggTimes", TUnit::UNIT);
    auto c_segment_num = ADD_COUNTER(writer_profile, "SegmentNum", TUnit::UNIT);
    auto c_raw_rows = ADD_COUNTER(writer_profile, "RawRowNum", TUnit::UNIT);
    auto c_merged_rows = ADD_COUNTER(writer_profile, "MergedRowNum", TUnit::UNIT);

    // Values owned by the writer itself and by the rowset writer.
    COUNTER_UPDATE(t_lock, _lock_watch.elapsed_time());
    COUNTER_SET(t_delete_bitmap, _rowset_writer->delete_bitmap_ns());
    COUNTER_SET(t_segment_writer, _rowset_writer->segment_writer_ns());
    COUNTER_SET(t_wait_flush, _wait_flush_time_ns);
    COUNTER_SET(t_close_wait, _close_wait_time_ns);
    COUNTER_SET(c_segment_num, _segment_num);

    // Values read from the flush token's memtable statistics.
    const auto& stat = _flush_token->memtable_stat();
    COUNTER_SET(t_sort, stat.sort_ns);
    COUNTER_SET(t_agg, stat.agg_ns);
    COUNTER_SET(t_duration, stat.duration_ns);
    COUNTER_SET(t_put_into_output, stat.put_into_output_ns);
    COUNTER_SET(c_sort_times, stat.sort_times);
    COUNTER_SET(c_agg_times, stat.agg_times);
    COUNTER_SET(c_raw_rows, stat.raw_rows);
    COUNTER_SET(c_merged_rows, stat.merged_rows);
}
313
314
14
// Cancels the writer with a generic Cancelled status.
// @return the result of cancel_with_status().
Status MemTableWriter::cancel() {
    Status reason = Status::Cancelled("already cancelled");
    return cancel_with_status(reason);
}
317
318
14
// Cancels the writer: drops the active memtable, cancels the flush token (if
// any) and remembers `st` so that later calls can report it.
//
// @param st  status stored as the cancel reason.
// @return always OK; a repeated cancel is a no-op and keeps the first reason.
Status MemTableWriter::cancel_with_status(const Status& st) {
    std::lock_guard<std::mutex> guard(_lock);
    if (!_is_cancelled) {
        {
            std::lock_guard<SpinLock> ptr_guard(_mem_table_ptr_lock);
            _mem_table.reset();
        }
        if (_flush_token != nullptr) {
            // cancel and wait all memtables in flush queue to be finished
            _flush_token->cancel();
        }
        _is_cancelled = true;
        _cancel_status = st;
    }
    return Status::OK();
}
335
336
14
// Returns the flush statistics kept by the flush token.
// NOTE(review): `_flush_token` is dereferenced without a null check, so this
// presumably must only be called after a successful init() - confirm callers.
const FlushStatistic& MemTableWriter::get_flush_token_stats() {
    const FlushStatistic& stats = _flush_token->get_stats();
    return stats;
}
339
340
19
uint64_t MemTableWriter::flush_running_count() const {
341
19
    return _flush_token == nullptr ? 0 : _flush_token->get_stats().flush_running_count.load();
342
19
}
343
344
0
// Sums the memory usage of the tracked (frozen) memtables that are still alive
// and whose memory type equals `mem`.
//
// @param mem  memory type to account for.
// @return total bytes; 0 when the writer has not been initialized.
int64_t MemTableWriter::mem_consumption(MemType mem) {
    // This method may be called before this writer is initialized.
    // So _flush_token may be null.
    if (!_is_init) {
        return 0;
    }

    int64_t total = 0;
    {
        std::lock_guard<SpinLock> ptr_guard(_mem_table_ptr_lock);
        for (const auto& tracked : _freezed_mem_tables) {
            // Entries whose memtable has already been destroyed lock to null.
            auto table = tracked.lock();
            if (table == nullptr || table->get_mem_type() != mem) {
                continue;
            }
            total += table->memory_usage();
        }
    }
    return total;
}
362
363
0
int64_t MemTableWriter::active_memtable_mem_consumption() {
364
0
    std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
365
0
    return _mem_table != nullptr ? _mem_table->memory_usage() : 0;
366
0
}
367
368
} // namespace doris