Coverage Report

Created: 2024-11-21 13:02

/root/doris/be/src/olap/memtable.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <cstring>
#include <functional>
#include <memory>
#include <optional>
#include <ostream>
#include <utility>
#include <vector>

#include "common/object_pool.h"
#include "common/status.h"
#include "gutil/integral_types.h"
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/tablet_schema.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/arena.h"
#include "vec/core/block.h"
41
42
namespace doris {
43
44
// Forward declarations. KeysType is declared with its fixed underlying type,
// which makes it a complete type usable by value before its full definition.
enum KeysType : int;

class Schema;
class SlotDescriptor;
class TabletSchema;
class TupleDescriptor;
49
50
// Handle to a single row of MemTable::_input_mutable_block (identified by its
// row position), optionally linked to the memory holding its aggregate states.
struct RowInBlock {
    // Position of the referenced row inside the input block.
    size_t _row_pos;
    // Base address of this row's aggregate states; assigned by init_agg_places().
    char* _agg_mem = nullptr;
    // Table of byte offsets into _agg_mem, indexed by the argument of agg_places();
    // assigned by init_agg_places().
    size_t* _agg_state_offset = nullptr;
    // Set by init_agg_places(), cleared by remove_init_agg().
    bool _has_init_agg = false;

    RowInBlock(size_t row) : _row_pos(row) {}

    // Records where this row's aggregate states live and marks them as initialized.
    void init_agg_places(char* agg_mem, size_t* agg_state_offset) {
        _agg_mem = agg_mem;
        _agg_state_offset = agg_state_offset;
        _has_init_agg = true;
    }

    // Address of aggregate state `offset`: _agg_mem advanced by _agg_state_offset[offset] bytes.
    char* agg_places(size_t offset) const {
        const size_t byte_offset = _agg_state_offset[offset];
        return _agg_mem + byte_offset;
    }

    bool has_init_agg() const { return _has_init_agg; }

    // Only clears the flag; _agg_mem and _agg_state_offset keep their values.
    void remove_init_agg() { _has_init_agg = false; }
};
71
72
// Per-row flags over the row range [begin, end), all initialized to 1.
// Iter interprets a flag of 1 at row i as "row i is in the same group as row i - 1"
// and yields every maximal group [left, right) that contains at least two rows.
// The flag of row `begin` itself is never inspected by Iter.
class Tie {
public:
    class Iter {
    public:
        Iter(Tie& tie) : _tie(tie), _next(tie._begin + 1) {}

        // Bounds of the current group; both are 0 until next() has returned true.
        size_t left() const { return _left; }
        size_t right() const { return _right; }

        // Advances to the next group; returns false when there are no more ranges.
        bool next() {
            if (_next >= _tie._end) {
                return false;
            }
            // First row that joins its predecessor starts a group at the row before it.
            _next = _find(1, _next);
            if (_next >= _tie._end) {
                return false;
            }
            _left = _next - 1;
            // The group ends at the first row that does not join its predecessor.
            _next = _find(0, _next);
            _right = _next;
            return true;
        }

    private:
        // Returns the first row index in [start, end) whose flag equals `value`,
        // or _tie._end if there is none (start itself if start >= end).
        size_t _find(uint8_t value, size_t start) const {
            if (start >= _tie._end) {
                return start;
            }
            const size_t offset = start - _tie._begin;
            const size_t size = _tie._end - start;
            const void* p = std::memchr(_tie._bits.data() + offset, value, size);
            if (p == nullptr) {
                return _tie._end;
            }
            return static_cast<const uint8_t*>(p) - _tie._bits.data() + _tie._begin;
        }

        Tie& _tie;
        // Value-initialized so left()/right() never read indeterminate memory.
        size_t _left = 0;
        size_t _right = 0;
        size_t _next;
    };

public:
    Tie(size_t begin, size_t end) : _begin(begin), _end(end), _bits(end - begin, 1) {}

    // Flag of absolute row index i (i must lie in [begin, end)).
    uint8_t operator[](int i) const { return _bits[i - _begin]; }
    uint8_t& operator[](int i) { return _bits[i - _begin]; }

    Iter iter() { return Iter(*this); }

private:
    const size_t _begin;
    const size_t _end;
    std::vector<uint8_t> _bits;
};
129
130
class RowInBlockComparator {
131
public:
132
    RowInBlockComparator(std::shared_ptr<TabletSchema> tablet_schema)
133
11
            : _tablet_schema(tablet_schema) {}
134
    // call set_block before operator().
135
    // only first time insert block to create _input_mutable_block,
136
    // so can not Comparator of construct to set pblock
137
18
    void set_block(vectorized::MutableBlock* pblock) { _pblock = pblock; }
138
    int operator()(const RowInBlock* left, const RowInBlock* right) const;
139
140
private:
141
    std::shared_ptr<TabletSchema> _tablet_schema;
142
    vectorized::MutableBlock* _pblock = nullptr; //  corresponds to Memtable::_input_mutable_block
143
};
144
145
// Counters describing the work done by a memtable. Instances can be accumulated
// with operator+=; some counters are atomics, the timing fields are plain int64_t.
class MemTableStat {
public:
    // Adds each counter of `stat` to the matching counter of *this and returns *this.
    MemTableStat& operator+=(const MemTableStat& stat) {
        raw_rows.fetch_add(stat.raw_rows.load());
        merged_rows.fetch_add(stat.merged_rows.load());
        sort_ns = sort_ns + stat.sort_ns;
        agg_ns = agg_ns + stat.agg_ns;
        put_into_output_ns = put_into_output_ns + stat.put_into_output_ns;
        duration_ns = duration_ns + stat.duration_ns;
        sort_times.fetch_add(stat.sort_times.load());
        agg_times.fetch_add(stat.agg_times.load());
        return *this;
    }

    std::atomic<int64_t> raw_rows {0};
    std::atomic<int64_t> merged_rows {0};
    int64_t sort_ns {0};
    int64_t agg_ns {0};
    int64_t put_into_output_ns {0};
    int64_t duration_ns {0};
    std::atomic<int64_t> sort_times {0};
    std::atomic<int64_t> agg_times {0};
};
169
170
class MemTable {
171
public:
172
    MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema,
173
             const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
174
             bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
175
             const std::shared_ptr<MemTracker>& insert_mem_tracker,
176
             const std::shared_ptr<MemTracker>& flush_mem_tracker);
177
    ~MemTable();
178
179
8
    int64_t tablet_id() const { return _tablet_id; }
180
30
    size_t memory_usage() const {
181
30
        return _insert_mem_tracker->consumption() + _arena->used_size() +
182
30
               _flush_mem_tracker->consumption();
183
30
    }
184
    // insert tuple from (row_pos) to (row_pos+num_rows)
185
    Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);
186
187
    void shrink_memtable_by_agg();
188
189
    bool need_flush() const;
190
191
    bool need_agg() const;
192
193
    Status to_block(std::unique_ptr<vectorized::Block>* res);
194
195
11
    bool empty() const { return _input_mutable_block.rows() == 0; }
196
197
8
    const MemTableStat& stat() { return _stat; }
198
199
8
    std::shared_ptr<MemTracker> flush_mem_tracker() { return _flush_mem_tracker; }
200
201
0
    QueryThreadContext query_thread_context() { return _query_thread_context; }
202
203
private:
204
    // for vectorized
205
    void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
206
                                     RowInBlock* row_in_skiplist);
207
208
    // Used to wrapped by to_block to do exception handle logic
209
    Status _to_block(std::unique_ptr<vectorized::Block>* res);
210
211
private:
212
    int64_t _tablet_id;
213
    bool _enable_unique_key_mow = false;
214
    bool _is_partial_update = false;
215
    const KeysType _keys_type;
216
    std::shared_ptr<TabletSchema> _tablet_schema;
217
218
    std::shared_ptr<RowInBlockComparator> _vec_row_comparator;
219
220
    QueryThreadContext _query_thread_context;
221
222
    // `_insert_manual_mem_tracker` manually records the memory value of memtable insert()
223
    // `_flush_hook_mem_tracker` automatically records the memory value of memtable flush() through mem hook.
224
    // Is used to flush when _insert_manual_mem_tracker larger than write_buffer_size and run flush memtable
225
    // when the sum of all memtable (_insert_manual_mem_tracker + _flush_hook_mem_tracker) exceeds the limit.
226
    std::shared_ptr<MemTracker> _insert_mem_tracker;
227
    std::shared_ptr<MemTracker> _flush_mem_tracker;
228
    // Only the rows will be inserted into block can allocate memory from _arena.
229
    // In this way, we can make MemTable::memory_usage() to be more accurate, and eventually
230
    // reduce the number of segment files that are generated by current load
231
    std::unique_ptr<vectorized::Arena> _arena;
232
    // The object buffer pool for convert tuple to row
233
    ObjectPool _agg_buffer_pool;
234
235
    void _init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
236
                                            const TupleDescriptor* tuple_desc);
237
    std::vector<int> _column_offset;
238
239
    // Number of rows inserted to this memtable.
240
    // This is not the rows in this memtable, because rows may be merged
241
    // in unique or aggregate key model.
242
    MemTableStat _stat;
243
244
    //for vectorized
245
    vectorized::MutableBlock _input_mutable_block;
246
    vectorized::MutableBlock _output_mutable_block;
247
    size_t _last_sorted_pos = 0;
248
249
    //return number of same keys
250
    size_t _sort();
251
    Status _sort_by_cluster_keys();
252
    void _sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
253
                          std::function<int(const RowInBlock*, const RowInBlock*)> cmp);
254
    template <bool is_final>
255
    void _finalize_one_row(RowInBlock* row, const vectorized::ColumnsWithTypeAndName& block_data,
256
                           int row_pos);
257
    template <bool is_final>
258
    void _aggregate();
259
    Status _put_into_output(vectorized::Block& in_block);
260
    bool _is_first_insertion;
261
262
    void _init_agg_functions(const vectorized::Block* block);
263
    std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
264
    std::vector<size_t> _offsets_of_aggregate_states;
265
    size_t _total_size_of_aggregate_states;
266
    std::vector<RowInBlock*> _row_in_blocks;
267
    // Memory usage without _arena.
268
    size_t _mem_usage;
269
270
    size_t _num_columns;
271
    int32_t _seq_col_idx_in_block = -1;
272
273
    bool _is_partial_update_and_auto_inc = false;
274
}; // class MemTable
275
276
} // namespace doris