Coverage Report

Created: 2026-06-26 08:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/memtable/memtable.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include "common/status.h"
#include "core/arena.h"
#include "core/block/block.h"
#include "core/custom_allocator.h"
#include "exprs/aggregate/aggregate_function.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "storage/partial_update_info.h"
#include "storage/tablet/tablet_schema.h"
38
39
namespace doris {
40
41
// Forward declarations; the full definitions live in other headers.
class Schema;
class SlotDescriptor;
class TabletSchema;
class TupleDescriptor;
struct TabletAddRowsPayload;

// Opaque enum declaration with a fixed underlying type: KeysType is a complete
// type here (usable by value) without pulling in its enumerators.
enum KeysType : int;
47
48
// Lifecycle state of a MemTable:
//   ACTIVE         - the writer is still inserting blocks into this memtable.
//   WRITE_FINISHED - writing has finished; the memtable sits in the queue waiting to be flushed.
//   FLUSH          - the memtable is being flushed, i.e. its segment is written to disk.
enum MemType {
    ACTIVE = 0,
    WRITE_FINISHED = 1,
    FLUSH = 2,
};
52
53
// A single row of MemTable::_input_mutable_block, referenced by its position, together
// with an optional row-binlog LSN and the aggregate-state storage attached to the row.
struct RowInBlock {
    // Position of the row inside _input_mutable_block.
    size_t _row_pos;
    // Row-binlog LSN carried with the row; 0 unless passed to the constructor.
    int64_t _row_binlog_lsn = 0;
    // Base address of this row's aggregate states (set by init_agg_places()).
    char* _agg_mem = nullptr;
    // Table of byte offsets into _agg_mem, indexed by the argument of agg_places().
    size_t* _agg_state_offset = nullptr;
    // True between init_agg_places() and remove_init_agg().
    bool _has_init_agg = false;

    RowInBlock(size_t row) : RowInBlock(row, 0) {}
    RowInBlock(size_t row, int64_t row_binlog_lsn)
            : _row_pos(row), _row_binlog_lsn(row_binlog_lsn) {}

    // Attaches aggregate-state storage to this row and marks it as initialized.
    void init_agg_places(char* agg_mem, size_t* agg_state_offset) {
        _agg_mem = agg_mem;
        _agg_state_offset = agg_state_offset;
        _has_init_agg = true;
    }

    // Address of the aggregate state stored at slot `offset` of the offset table.
    char* agg_places(size_t offset) const {
        const size_t byte_offset = _agg_state_offset[offset];
        return _agg_mem + byte_offset;
    }

    inline bool has_init_agg() const { return _has_init_agg; }

    // Clears only the "initialized" flag; _agg_mem/_agg_state_offset are left as they are.
    inline void remove_init_agg() { _has_init_agg = false; }
};
77
78
// One flag per row in the half-open range [begin, end). A flag of 1 at row i means
// row i is "tied" with row i - 1 (presumably: equal on every column sorted so far).
// The flag of row `begin` itself is never examined by Iter.
// Precondition: begin <= end.
class Tie {
public:
    // Walks the maximal runs of tied rows. After next() returns true,
    // [left(), right()) is one run containing at least two rows.
    class Iter {
    public:
        Iter(Tie& tie) : _tie(tie), _next(tie._begin + 1) {}

        // Only meaningful after next() has returned true; 0 before that.
        size_t left() const { return _left; }
        size_t right() const { return _right; }

        // Advances to the next run; returns false when no more runs remain.
        bool next() {
            if (_next >= _tie._end) {
                return false;
            }
            // First row that ties with its predecessor; the run starts one row earlier.
            _next = _find(1, _next);
            if (_next >= _tie._end) {
                return false;
            }
            _left = _next - 1;
            // The run ends (exclusively) at the first row that no longer ties.
            _next = _find(0, _next);
            _right = _next;
            return true;
        }

    private:
        // First absolute position in [start, end) whose flag equals `value`, or `end`
        // if there is none. Returns `start` unchanged when start >= end.
        size_t _find(uint8_t value, size_t start) const {
            if (start >= _tie._end) {
                return start;
            }
            const size_t offset = start - _tie._begin;
            const size_t size = _tie._end - start;
            const void* p = std::memchr(_tie._bits.data() + offset, value, size);
            if (p == nullptr) {
                return _tie._end;
            }
            const auto pos =
                    static_cast<size_t>(static_cast<const uint8_t*>(p) - _tie._bits.data());
            return pos + _tie._begin;
        }

    private:
        Tie& _tie;
        // Default-initialized so left()/right() never read indeterminate values.
        size_t _left = 0;
        size_t _right = 0;
        size_t _next;
    };

public:
    // All rows start out tied (every flag set to 1).
    Tie(size_t begin, size_t end) : _begin(begin), _end(end), _bits(end - begin, 1) {}

    // Indexing uses absolute row positions in [begin, end).
    uint8_t operator[](size_t i) const { return _bits[i - _begin]; }
    uint8_t& operator[](size_t i) { return _bits[i - _begin]; }
    Iter iter() { return Iter(*this); }

private:
    const size_t _begin;
    const size_t _end;
    std::vector<uint8_t> _bits;
};
135
136
class RowInBlockComparator {
137
public:
138
    RowInBlockComparator(std::shared_ptr<TabletSchema> tablet_schema)
139
204k
            : _tablet_schema(tablet_schema) {}
140
    // call set_block before operator().
141
    // only first time insert block to create _input_mutable_block,
142
    // so can not Comparator of construct to set pblock
143
149k
    void set_block(MutableBlock* pblock) { _pblock = pblock; }
144
    int operator()(const RowInBlock* left, const RowInBlock* right) const;
145
146
private:
147
    std::shared_ptr<TabletSchema> _tablet_schema;
148
    MutableBlock* _pblock = nullptr; //  corresponds to Memtable::_input_mutable_block
149
};
150
151
// Row counts and timings gathered while a memtable is built and flushed.
// raw_rows, merged_rows, sort_times and agg_times are atomics; the *_ns timings are
// plain int64_t fields.
class MemTableStat {
public:
    // Accumulates every counter of `stat` into this object and returns *this.
    MemTableStat& operator+=(const MemTableStat& stat) {
        // fetch_add(load()) is exactly what atomic operator+= does (seq_cst by default).
        raw_rows.fetch_add(stat.raw_rows.load());
        merged_rows.fetch_add(stat.merged_rows.load());
        sort_ns += stat.sort_ns;
        agg_ns += stat.agg_ns;
        put_into_output_ns += stat.put_into_output_ns;
        duration_ns += stat.duration_ns;
        sort_times.fetch_add(stat.sort_times.load());
        agg_times.fetch_add(stat.agg_times.load());
        return *this;
    }

    std::atomic<int64_t> raw_rows {0};
    std::atomic<int64_t> merged_rows {0};
    int64_t sort_ns {0};
    int64_t agg_ns {0};
    int64_t put_into_output_ns {0};
    int64_t duration_ns {0};
    std::atomic<int64_t> sort_times {0};
    std::atomic<int64_t> agg_times {0};
};
175
176
class MemTable {
177
public:
178
    MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema,
179
             const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
180
             bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
181
             const std::shared_ptr<ResourceContext>& resource_ctx,
182
             bool need_row_binlog_lsn = false);
183
    ~MemTable();
184
185
73.5k
    int64_t tablet_id() const { return _tablet_id; }
186
3.49M
    size_t memory_usage() const { return _mem_tracker->consumption(); }
187
    size_t get_flush_reserve_memory_size() const;
188
    // insert tuple from (row_pos) to (row_pos+num_rows)
189
    Status insert(const Block* block, const TabletAddRowsPayload& rows);
190
191
    void shrink_memtable_by_agg();
192
193
    bool need_flush() const;
194
195
    bool need_agg() const;
196
197
    Status to_block(std::unique_ptr<Block>* res);
198
199
3
    const DorisVector<int64_t>& row_binlog_lsns() const { return _output_row_binlog_lsns; }
200
201
204k
    bool empty() const { return _input_mutable_block.rows() == 0; }
202
203
73.4k
    const MemTableStat& stat() { return _stat; }
204
205
147k
    std::shared_ptr<ResourceContext> resource_ctx() { return _resource_ctx; }
206
207
73.5k
    std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
208
209
73.4k
    void set_flush_success() { _is_flush_success = true; }
210
211
1.28M
    MemType get_mem_type() { return _mem_type; }
212
213
278k
    void update_mem_type(MemType memtype) { _mem_type = memtype; }
214
215
134k
    int64_t raw_rows() { return _stat.raw_rows.load(); }
216
217
private:
218
    // for vectorized
219
    template <bool has_skip_bitmap_col>
220
    void _aggregate_two_row_in_block(MutableBlock& mutable_block, RowInBlock* new_row,
221
                                     RowInBlock* row_in_skiplist);
222
223
    // Merge row-binlog LSN sidecar only when MemTable merges two RowInBlock objects.
224
    // Table models that require complex merge semantics, such as AGG tables and unique key
225
    // merge-on-read tables, do not support row-binlog LSN now and are rejected in insert().
226
    void _merge_row_binlog_lsn(RowInBlock* src_row, RowInBlock* dst_row);
227
228
    void _append_output_row_binlog_lsn(RowInBlock* row);
229
230
    void _aggregate_two_row_with_sequence_map(MutableBlock& mutable_block, RowInBlock* new_row,
231
                                              RowInBlock* row_in_skiplist);
232
233
    // Used to wrapped by to_block to do exception handle logic
234
    Status _to_block(std::unique_ptr<Block>* res);
235
236
    int64_t _adaptive_write_buffer_size() const;
237
238
private:
239
    std::atomic<MemType> _mem_type;
240
    int64_t _tablet_id;
241
    bool _enable_unique_key_mow = false;
242
    bool _is_flush_success = false;
243
    UniqueKeyUpdateModePB _partial_update_mode {UniqueKeyUpdateModePB::UPSERT};
244
    const KeysType _keys_type;
245
    std::shared_ptr<TabletSchema> _tablet_schema;
246
247
    std::shared_ptr<RowInBlockComparator> _vec_row_comparator;
248
249
    std::shared_ptr<ResourceContext> _resource_ctx;
250
251
    std::shared_ptr<MemTracker> _mem_tracker;
252
    // Only the rows will be inserted into block can allocate memory from _arena.
253
    // In this way, we can make MemTable::memory_usage() to be more accurate, and eventually
254
    // reduce the number of segment files that are generated by current load
255
    Arena _arena;
256
    int64_t _load_mem_limit = -1;
257
258
    void _init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
259
                                            const TupleDescriptor* tuple_desc);
260
    std::vector<int> _column_offset;
261
262
    // Number of rows inserted to this memtable.
263
    // This is not the rows in this memtable, because rows may be merged
264
    // in unique or aggregate key model.
265
    MemTableStat _stat;
266
267
    //for vectorized
268
    MutableBlock _input_mutable_block;
269
    MutableBlock _output_mutable_block;
270
    DorisVector<int64_t> _output_row_binlog_lsns;
271
    bool _need_row_binlog_lsn = false;
272
    size_t _last_sorted_pos = 0;
273
    size_t _last_agg_pos = 0;
274
275
    //return number of same keys
276
    size_t _sort();
277
    Status _sort_by_cluster_keys();
278
    void _sort_one_column(DorisVector<std::shared_ptr<RowInBlock>>& row_in_blocks, Tie& tie,
279
                          std::function<int(RowInBlock*, RowInBlock*)> cmp);
280
    template <bool is_final>
281
    void _finalize_one_row(RowInBlock* row, MutableBlock& mutable_block, int row_pos);
282
    void _init_row_for_agg(RowInBlock* row, MutableBlock& mutable_block);
283
    void _clear_row_agg(RowInBlock* row);
284
285
    template <bool is_final, bool has_skip_bitmap_col = false>
286
    void _aggregate();
287
288
    template <bool is_final>
289
    void _aggregate_for_flexible_partial_update_without_seq_col(
290
            MutableBlock& mutable_block,
291
            DorisVector<std::shared_ptr<RowInBlock>>& temp_row_in_blocks);
292
293
    template <bool is_final>
294
    void _aggregate_for_flexible_partial_update_with_seq_col(
295
            MutableBlock& mutable_block,
296
            DorisVector<std::shared_ptr<RowInBlock>>& temp_row_in_blocks);
297
298
    Status _put_into_output(Block& in_block);
299
    bool _is_first_insertion;
300
301
    void _init_agg_functions(const Block* block);
302
    std::vector<AggregateFunctionPtr> _agg_functions;
303
    std::vector<size_t> _offsets_of_aggregate_states;
304
    size_t _total_size_of_aggregate_states;
305
    std::unique_ptr<DorisVector<std::shared_ptr<RowInBlock>>> _row_in_blocks;
306
307
    size_t _num_columns;
308
    int32_t _seq_col_idx_in_block {-1};
309
    int32_t _skip_bitmap_col_idx {-1};
310
    int32_t _delete_sign_col_idx {-1};
311
    int32_t _delete_sign_col_unique_id {-1};
312
    int32_t _seq_col_unique_id {-1};
313
314
    bool _is_partial_update_and_auto_inc = false;
315
}; // class MemTable
316
317
} // namespace doris