Coverage Report

Created: 2025-04-11 23:49

/root/doris/be/src/olap/memtable.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <cstring>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include "common/status.h"
#include "olap/partial_update_info.h"
#include "olap/tablet_schema.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/arena.h"
#include "vec/common/custom_allocator.h"
#include "vec/core/block.h"
37
38
namespace doris {
39
40
class Schema;
41
class SlotDescriptor;
42
class TabletSchema;
43
class TupleDescriptor;
44
enum KeysType : int;
45
46
// Lifecycle state of a memtable:
//   ACTIVE         - the writer is still inserting blocks into the memtable.
//   WRITE_FINISHED - the writer has finished inserting blocks; the memtable is
//                    waiting in the queue to be flushed.
//   FLUSH          - the memtable is being flushed, i.e. written to disk as a segment.
enum MemType {
    ACTIVE = 0,
    WRITE_FINISHED = 1,
    FLUSH = 2,
};
50
51
// A single row of _input_mutable_block, referenced by its position, plus the
// (optional) location of that row's aggregate states.
struct RowInBlock {
    // Position of the row inside _input_mutable_block.
    size_t _row_pos;
    // Base address of the aggregate-state memory; set by init_agg_places().
    char* _agg_mem = nullptr;
    // Table of byte offsets into _agg_mem, one per aggregate; set by init_agg_places().
    size_t* _agg_state_offset = nullptr;
    // True between init_agg_places() and remove_init_agg().
    bool _has_init_agg = false;

    RowInBlock(size_t row) : _row_pos(row) {}

    // Remembers where this row's aggregate states live and marks them as
    // initialized. Both pointers are stored raw; this struct never frees them.
    void init_agg_places(char* agg_mem, size_t* agg_state_offset) {
        _agg_mem = agg_mem;
        _agg_state_offset = agg_state_offset;
        _has_init_agg = true;
    }

    // Address of aggregate state number `offset`: _agg_mem advanced by
    // _agg_state_offset[offset] bytes.
    char* agg_places(size_t offset) const {
        const size_t byte_offset = _agg_state_offset[offset];
        return _agg_mem + byte_offset;
    }

    bool has_init_agg() const { return _has_init_agg; }

    // Clears only the "initialized" flag; the stored pointers are kept.
    void remove_init_agg() { _has_init_agg = false; }
};
72
73
// Per-row link flags for the rows [begin, end).
// The flag of row i is 1 when row i is grouped with row i - 1, and 0 when
// row i starts a new group. All flags start at 1, so initially the whole range
// forms one group. Iter enumerates the groups that contain at least two rows.
// (Presumably a group is a run of rows that still compare equal on the columns
// sorted so far; see MemTable::_sort_one_column.)
class Tie {
public:
    // Enumerates the multi-row groups as half-open ranges [left(), right()),
    // in ascending row order.
    class Iter {
    public:
        Iter(Tie& tie) : _tie(tie), _next(tie._begin + 1) {}

        // Bounds of the current range. Only meaningful after next() has
        // returned true; both are 0 before the first successful next().
        size_t left() const { return _left; }
        size_t right() const { return _right; }

        // Advances to the next range. Returns false when no more ranges remain.
        // Every range produced spans at least two rows.
        bool next() {
            if (_next >= _tie._end) {
                return false;
            }
            // The next row linked to its predecessor; that predecessor opens the range.
            _next = _find(1, _next);
            if (_next >= _tie._end) {
                return false;
            }
            _left = _next - 1;
            // The first following row that is not linked (or _end) closes the range.
            _next = _find(0, _next);
            _right = _next;
            return true;
        }

    private:
        // Returns the first row index in [start, _end) whose flag equals `value`,
        // or _end if there is none. Returns `start` unchanged when start >= _end.
        size_t _find(uint8_t value, size_t start) const {
            if (start >= _tie._end) {
                return start;
            }
            const uint8_t* base = _tie._bits.data();
            const size_t offset = start - _tie._begin;
            const size_t count = _tie._end - start;
            const void* hit = std::memchr(base + offset, value, count);
            if (hit == nullptr) {
                return _tie._end;
            }
            return static_cast<size_t>(static_cast<const uint8_t*>(hit) - base) + _tie._begin;
        }

        Tie& _tie;
        // Initialized so that left()/right() never read indeterminate values
        // when queried before the first successful next().
        size_t _left = 0;
        size_t _right = 0;
        size_t _next;
    };

public:
    // Creates one flag per row of [begin, end), all set to 1.
    // end must be >= begin, since end - begin is the number of flags.
    Tie(size_t begin, size_t end) : _begin(begin), _end(end), _bits(end - begin, 1) {}

    // Flag of the absolute row index i; i must lie in [begin, end).
    uint8_t operator[](int i) const { return _bits[i - _begin]; }
    uint8_t& operator[](int i) { return _bits[i - _begin]; }

    Iter iter() { return Iter(*this); }

private:
    const size_t _begin;
    const size_t _end;
    std::vector<uint8_t> _bits;
};
130
131
class RowInBlockComparator {
132
public:
133
    RowInBlockComparator(std::shared_ptr<TabletSchema> tablet_schema)
134
15
            : _tablet_schema(tablet_schema) {}
135
    // call set_block before operator().
136
    // only first time insert block to create _input_mutable_block,
137
    // so can not Comparator of construct to set pblock
138
27
    void set_block(vectorized::MutableBlock* pblock) { _pblock = pblock; }
139
    int operator()(const RowInBlock* left, const RowInBlock* right) const;
140
141
private:
142
    std::shared_ptr<TabletSchema> _tablet_schema;
143
    vectorized::MutableBlock* _pblock = nullptr; //  corresponds to Memtable::_input_mutable_block
144
};
145
146
// Accumulated memtable statistics. The row counters and *_times counters are
// std::atomic; the *_ns fields are plain int64_t (the suffix suggests
// nanoseconds).
class MemTableStat {
public:
    // Adds every field of `other` into this object and returns *this.
    // Each atomic counter is updated with its own atomic read-modify-write.
    // The operator as a whole is not atomic, and the *_ns fields are not
    // synchronized.
    MemTableStat& operator+=(const MemTableStat& other) {
        raw_rows.fetch_add(other.raw_rows.load());
        merged_rows.fetch_add(other.merged_rows.load());
        sort_ns = sort_ns + other.sort_ns;
        agg_ns = agg_ns + other.agg_ns;
        put_into_output_ns = put_into_output_ns + other.put_into_output_ns;
        duration_ns = duration_ns + other.duration_ns;
        sort_times.fetch_add(other.sort_times.load());
        agg_times.fetch_add(other.agg_times.load());
        return *this;
    }

    // Row counters.
    std::atomic<int64_t> raw_rows {0};
    std::atomic<int64_t> merged_rows {0};
    // Per-phase elapsed-time totals.
    int64_t sort_ns {0};
    int64_t agg_ns {0};
    int64_t put_into_output_ns {0};
    int64_t duration_ns {0};
    // Invocation counters.
    std::atomic<int64_t> sort_times {0};
    std::atomic<int64_t> agg_times {0};
};
170
171
class MemTable {
172
public:
173
    MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema,
174
             const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
175
             bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info);
176
    ~MemTable();
177
178
12
    int64_t tablet_id() const { return _tablet_id; }
179
49
    size_t memory_usage() const { return _mem_tracker->consumption(); }
180
    size_t get_flush_reserve_memory_size() const;
181
    // insert tuple from (row_pos) to (row_pos+num_rows)
182
    Status insert(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs);
183
184
    void shrink_memtable_by_agg();
185
186
    bool need_flush() const;
187
188
    bool need_agg() const;
189
190
    Status to_block(std::unique_ptr<vectorized::Block>* res);
191
192
15
    bool empty() const { return _input_mutable_block.rows() == 0; }
193
194
12
    const MemTableStat& stat() { return _stat; }
195
196
0
    std::shared_ptr<ResourceContext> resource_ctx() { return _resource_ctx; }
197
198
0
    std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
199
200
12
    void set_flush_success() { _is_flush_success = true; }
201
202
0
    MemType get_mem_type() { return _mem_type; }
203
204
27
    void update_mem_type(MemType memtype) { _mem_type = memtype; }
205
206
private:
207
    // for vectorized
208
    template <bool has_skip_bitmap_col>
209
    void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
210
                                     RowInBlock* row_in_skiplist);
211
212
    // Used to wrapped by to_block to do exception handle logic
213
    Status _to_block(std::unique_ptr<vectorized::Block>* res);
214
215
private:
216
    std::atomic<MemType> _mem_type;
217
    int64_t _tablet_id;
218
    bool _enable_unique_key_mow = false;
219
    bool _is_flush_success = false;
220
    UniqueKeyUpdateModePB _partial_update_mode {UniqueKeyUpdateModePB::UPSERT};
221
    const KeysType _keys_type;
222
    std::shared_ptr<TabletSchema> _tablet_schema;
223
224
    std::shared_ptr<RowInBlockComparator> _vec_row_comparator;
225
226
    std::shared_ptr<ResourceContext> _resource_ctx;
227
228
    std::shared_ptr<MemTracker> _mem_tracker;
229
    // Only the rows will be inserted into block can allocate memory from _arena.
230
    // In this way, we can make MemTable::memory_usage() to be more accurate, and eventually
231
    // reduce the number of segment files that are generated by current load
232
    std::unique_ptr<vectorized::Arena> _arena;
233
234
    void _init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
235
                                            const TupleDescriptor* tuple_desc);
236
    std::vector<int> _column_offset;
237
238
    // Number of rows inserted to this memtable.
239
    // This is not the rows in this memtable, because rows may be merged
240
    // in unique or aggregate key model.
241
    MemTableStat _stat;
242
243
    //for vectorized
244
    vectorized::MutableBlock _input_mutable_block;
245
    vectorized::MutableBlock _output_mutable_block;
246
    size_t _last_sorted_pos = 0;
247
248
    //return number of same keys
249
    size_t _sort();
250
    Status _sort_by_cluster_keys();
251
    void _sort_one_column(DorisVector<RowInBlock*>& row_in_blocks, Tie& tie,
252
                          std::function<int(const RowInBlock*, const RowInBlock*)> cmp);
253
    template <bool is_final>
254
    void _finalize_one_row(RowInBlock* row, const vectorized::ColumnsWithTypeAndName& block_data,
255
                           int row_pos);
256
    template <bool is_final, bool has_skip_bitmap_col = false>
257
    void _aggregate();
258
    Status _put_into_output(vectorized::Block& in_block);
259
    bool _is_first_insertion;
260
261
    void _init_agg_functions(const vectorized::Block* block);
262
    std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
263
    std::vector<size_t> _offsets_of_aggregate_states;
264
    size_t _total_size_of_aggregate_states;
265
    std::unique_ptr<DorisVector<RowInBlock*>> _row_in_blocks;
266
267
    size_t _num_columns;
268
    int32_t _seq_col_idx_in_block = -1;
269
    int32_t _skip_bitmap_col_idx {-1};
270
    int32_t _seq_col_unique_id {-1};
271
272
    bool _is_partial_update_and_auto_inc = false;
273
}; // class MemTable
274
275
} // namespace doris