Coverage Report

Created: 2025-03-13 18:46

/root/doris/be/src/olap/memtable_writer.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Types_types.h>
21
#include <gen_cpp/internal_service.pb.h>
22
#include <gen_cpp/types.pb.h>
23
24
#include <atomic>
25
#include <cstdint>
26
#include <memory>
27
#include <mutex>
28
#include <shared_mutex>
29
#include <unordered_set>
30
#include <vector>
31
32
#include "common/status.h"
33
#include "olap/delta_writer_context.h"
34
#include "olap/memtable.h"
35
#include "olap/olap_common.h"
36
#include "olap/partial_update_info.h"
37
#include "olap/rowset/rowset.h"
38
#include "olap/tablet.h"
39
#include "olap/tablet_meta.h"
40
#include "olap/tablet_schema.h"
41
#include "util/spinlock.h"
42
#include "util/uid_util.h"
43
44
namespace doris {
45
46
class FlushToken;
47
class MemTable;
48
class StorageEngine;
49
class TupleDescriptor;
50
class SlotDescriptor;
51
class OlapTableSchemaParam;
52
class RowsetWriter;
53
struct FlushStatistic;
54
class WorkloadGroup;
55
56
namespace vectorized {
57
class Block;
58
} // namespace vectorized
59
60
// Writer for a particular (load, index, tablet).
61
// This class is NOT thread-safe, external synchronization is required.
62
class MemTableWriter {
63
public:
64
    MemTableWriter(const WriteRequest& req);
65
66
    ~MemTableWriter();
67
68
    Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema,
69
                std::shared_ptr<PartialUpdateInfo> partial_update_info,
70
                std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow = false);
71
72
    Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs);
73
74
    // flush the last memtable to flush queue, must call it before close_wait()
75
    Status close();
76
    // wait for all memtables to be flushed, update profiles if provided.
77
    // mem_consumption() should be 0 after this function returns.
78
14
    Status close_wait(RuntimeProfile* profile = nullptr) {
79
14
        RETURN_IF_ERROR(_do_close_wait());
80
14
        if (profile != nullptr) {
81
14
            _update_profile(profile);
82
14
        }
83
14
        return Status::OK();
84
14
    }
85
86
    // abandon current memtable and wait for all pending-flushing memtables to be destructed.
87
    // mem_consumption() should be 0 after this function returns.
88
    Status cancel();
89
    Status cancel_with_status(const Status& st);
90
91
    int64_t mem_consumption(MemType mem);
92
    int64_t active_memtable_mem_consumption();
93
94
    // Submit current memtable to flush queue, and return without waiting.
95
    // This is currently for reducing mem consumption of this memtable writer.
96
    Status flush_async();
97
98
    // Wait all memtable in flush queue to be flushed
99
    Status wait_flush();
100
101
0
    int64_t tablet_id() const { return _req.tablet_id; }
102
103
0
    int64_t total_received_rows() const { return _total_received_rows; }
104
105
    const FlushStatistic& get_flush_token_stats();
106
107
    uint64_t flush_running_count() const;
108
109
0
    uint64_t workload_group_id() const {
110
0
        auto wg = _resource_ctx->workload_group();
111
0
        if (wg != nullptr) {
112
0
            return wg->id();
113
0
        }
114
0
        return 0;
115
0
    }
116
117
private:
118
    // push a full memtable to flush executor
119
    Status _flush_memtable_async();
120
121
    void _reset_mem_table();
122
123
    Status _do_close_wait();
124
    void _update_profile(RuntimeProfile* profile);
125
126
    std::atomic<bool> _is_init = false;
127
    bool _is_cancelled = false;
128
    bool _is_closed = false;
129
    Status _cancel_status;
130
    WriteRequest _req;
131
    std::shared_ptr<RowsetWriter> _rowset_writer;
132
    std::shared_ptr<MemTable> _mem_table;
133
    TabletSchemaSPtr _tablet_schema;
134
    bool _unique_key_mow = false;
135
136
    // This variable is accessed from writer thread and token flush thread
137
    // use a shared ptr to avoid use after free problem.
138
    std::shared_ptr<FlushToken> _flush_token;
139
    // Save the not active memtable that is in flush queue or under flushing.
140
    std::vector<std::weak_ptr<MemTable>> _freezed_mem_tables;
141
    // The lock to protect _memtable and _freezed_mem_tables structure to avoid concurrency modification or read
142
    SpinLock _mem_table_ptr_lock;
143
    std::shared_ptr<ResourceContext> _resource_ctx;
144
145
    std::mutex _lock;
146
147
    // total rows num written by MemTableWriter
148
    std::atomic<int64_t> _total_received_rows = 0;
149
    int64_t _wait_flush_time_ns = 0;
150
    int64_t _close_wait_time_ns = 0;
151
    int64_t _segment_num = 0;
152
153
    MonotonicStopWatch _lock_watch;
154
155
    std::shared_ptr<PartialUpdateInfo> _partial_update_info;
156
};
157
158
} // namespace doris