Coverage Report

Created: 2024-11-21 14:31

/root/doris/be/src/exec/tablet_info.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/fast_rand.h>
21
#include <gen_cpp/Descriptors_types.h>
22
#include <gen_cpp/descriptors.pb.h>
23
24
#include <cstdint>
25
#include <functional>
26
#include <iterator>
27
#include <map>
28
#include <memory>
29
#include <string>
30
#include <tuple>
31
#include <unordered_map>
32
#include <utility>
33
#include <vector>
34
35
#include "common/logging.h"
36
#include "common/object_pool.h"
37
#include "common/status.h"
38
#include "runtime/descriptors.h"
39
#include "runtime/raw_value.h"
40
#include "vec/columns/column.h"
41
#include "vec/core/block.h"
42
#include "vec/core/column_with_type_and_name.h"
43
#include "vec/exprs/vexpr.h"
44
#include "vec/exprs/vexpr_fwd.h"
45
46
namespace doris {
47
// Forward declarations for types this header names without needing their full definitions.
class MemTracker;
class SlotDescriptor;
class TExprNode;
class TabletColumn;
class TabletIndex;
class TupleDescriptor;
53
54
struct OlapTableIndexSchema {
55
    int64_t index_id;
56
    std::vector<SlotDescriptor*> slots;
57
    int32_t schema_hash;
58
    std::vector<TabletColumn*> columns;
59
    std::vector<TabletIndex*> indexes;
60
    vectorized::VExprContextSPtr where_clause;
61
62
    void to_protobuf(POlapTableIndexSchema* pindex) const;
63
};
64
65
class OlapTableSchemaParam {
66
public:
67
45
    OlapTableSchemaParam() = default;
68
45
    ~OlapTableSchemaParam() noexcept = default;
69
70
    Status init(const TOlapTableSchemaParam& tschema);
71
    Status init(const POlapTableSchemaParam& pschema);
72
73
29
    int64_t db_id() const { return _db_id; }
74
29
    int64_t table_id() const { return _table_id; }
75
0
    int64_t version() const { return _version; }
76
77
10
    TupleDescriptor* tuple_desc() const { return _tuple_desc; }
78
96
    const std::vector<OlapTableIndexSchema*>& indexes() const { return _indexes; }
79
80
    void to_protobuf(POlapTableSchemaParam* pschema) const;
81
82
    // NOTE: this function is not thread-safe.
83
31
    POlapTableSchemaParam* to_protobuf() const {
84
31
        if (_proto_schema == nullptr) {
85
21
            _proto_schema = _obj_pool.add(new POlapTableSchemaParam());
86
21
            to_protobuf(_proto_schema);
87
21
        }
88
31
        return _proto_schema;
89
31
    }
90
91
53
    bool is_partial_update() const { return _is_partial_update; }
92
24
    std::set<std::string> partial_update_input_columns() const {
93
24
        return _partial_update_input_columns;
94
24
    }
95
24
    std::string auto_increment_coulumn() const { return _auto_increment_column; }
96
5
    int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; }
97
5
    void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; }
98
24
    int64_t timestamp_ms() const { return _timestamp_ms; }
99
5
    void set_nano_seconds(int32_t nano_seconds) { _nano_seconds = nano_seconds; }
100
24
    int32_t nano_seconds() const { return _nano_seconds; }
101
5
    void set_timezone(std::string timezone) { _timezone = timezone; }
102
24
    std::string timezone() const { return _timezone; }
103
28
    bool is_strict_mode() const { return _is_strict_mode; }
104
    std::string debug_string() const;
105
106
private:
107
    int64_t _db_id;
108
    int64_t _table_id;
109
    int64_t _version;
110
111
    TupleDescriptor* _tuple_desc = nullptr;
112
    mutable POlapTableSchemaParam* _proto_schema = nullptr;
113
    std::vector<OlapTableIndexSchema*> _indexes;
114
    mutable ObjectPool _obj_pool;
115
    bool _is_partial_update = false;
116
    std::set<std::string> _partial_update_input_columns;
117
    bool _is_strict_mode = false;
118
    std::string _auto_increment_column;
119
    int32_t _auto_increment_column_unique_id;
120
    int64_t _timestamp_ms = 0;
121
    int32_t _nano_seconds {0};
122
    std::string _timezone;
123
};
124
125
using OlapTableIndexTablets = TOlapTableIndexTablets;
126
// struct TOlapTableIndexTablets {
127
//     1: required i64 index_id
128
//     2: required list<i64> tablets
129
// }
130
131
using BlockRow = std::pair<vectorized::Block*, int32_t>;
132
using BlockRowWithIndicator =
133
        std::tuple<vectorized::Block*, int32_t, bool>; // [block, row, is_transformed]
134
135
struct VOlapTablePartition {
136
    int64_t id = 0;
137
    BlockRow start_key;
138
    BlockRow end_key;
139
    std::vector<BlockRow> in_keys;
140
    int64_t num_buckets = 0;
141
    std::vector<OlapTableIndexTablets> indexes;
142
    bool is_mutable;
143
    // -1 indicates partition with hash distribution
144
    int64_t load_tablet_idx = -1;
145
146
    VOlapTablePartition(vectorized::Block* partition_block)
147
            // the default value of partition bound is -1.
148
5
            : start_key {partition_block, -1}, end_key {partition_block, -1} {}
149
};
150
151
// this is only used by tablet_sink. so we can assume it's inited by its' descriptor.
152
class VOlapTablePartKeyComparator {
153
public:
154
    VOlapTablePartKeyComparator(const std::vector<uint16_t>& slot_locs,
155
                                const std::vector<uint16_t>& params_locs)
156
20
            : _slot_locs(slot_locs), _param_locs(params_locs) {}
157
158
    // return true if lhs < rhs
159
    // 'row' is -1 mean maximal boundary
160
    bool operator()(const BlockRowWithIndicator& lhs, const BlockRowWithIndicator& rhs) const;
161
162
private:
163
    const std::vector<uint16_t>& _slot_locs;
164
    const std::vector<uint16_t>& _param_locs;
165
};
166
167
// store an olap table's tablet information
168
class VOlapTablePartitionParam {
169
public:
170
    VOlapTablePartitionParam(std::shared_ptr<OlapTableSchemaParam>& schema,
171
                             const TOlapTablePartitionParam& param);
172
173
    ~VOlapTablePartitionParam();
174
175
    Status init();
176
177
0
    int64_t db_id() const { return _t_param.db_id; }
178
0
    int64_t table_id() const { return _t_param.table_id; }
179
0
    int64_t version() const { return _t_param.version; }
180
181
    // return true if we found this block_row in partition
182
    ALWAYS_INLINE bool find_partition(vectorized::Block* block, int row,
183
15
                                      VOlapTablePartition*& partition) const {
184
15
        auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, row, true})
185
15
                                   : _partitions_map->upper_bound(std::tuple {block, row, true});
186
15
        VLOG_TRACE << "find row " << row << " of\n"
187
0
                   << block->dump_data() << "in:\n"
188
0
                   << _partition_block.dump_data() << "result line row: " << std::get<1>(it->first);
189
190
        // for list partition it might result in default partition
191
15
        if (_is_in_partition) {
192
0
            partition = (it != _partitions_map->end()) ? it->second : _default_partition;
193
0
            it = _partitions_map->end();
194
0
        }
195
15
        if (it != _partitions_map->end() &&
196
15
            _part_contains(it->second, std::tuple {block, row, true})) {
197
15
            partition = it->second;
198
15
        }
199
15
        return (partition != nullptr);
200
15
    }
201
202
    ALWAYS_INLINE void find_tablets(
203
            vectorized::Block* block, const std::vector<uint32_t>& indexes,
204
            const std::vector<VOlapTablePartition*>& partitions,
205
            std::vector<uint32_t>& tablet_indexes /*result*/,
206
            /*TODO: check if flat hash map will be better*/
207
7
            std::map<VOlapTablePartition*, int64_t>* partition_tablets_buffer = nullptr) const {
208
7
        std::function<uint32_t(vectorized::Block*, uint32_t, const VOlapTablePartition&)>
209
7
                compute_function;
210
7
        if (!_distributed_slot_locs.empty()) {
211
            //TODO: refactor by saving the hash values. then we can calculate in columnwise.
212
7
            compute_function = [this](vectorized::Block* block, uint32_t row,
213
15
                                      const VOlapTablePartition& partition) -> uint32_t {
214
15
                uint32_t hash_val = 0;
215
27
                for (unsigned short _distributed_slot_loc : _distributed_slot_locs) {
216
27
                    auto* slot_desc = _slots[_distributed_slot_loc];
217
27
                    auto& column = block->get_by_position(_distributed_slot_loc).column;
218
27
                    auto val = column->get_data_at(row);
219
27
                    if (val.data != nullptr) {
220
27
                        hash_val = RawValue::zlib_crc32(val.data, val.size, slot_desc->type().type,
221
27
                                                        hash_val);
222
27
                    } else {
223
0
                        hash_val = HashUtil::zlib_crc_hash_null(hash_val);
224
0
                    }
225
27
                }
226
15
                return hash_val % partition.num_buckets;
227
15
            };
228
7
        } else { // random distribution
229
0
            compute_function = [](vectorized::Block* block, uint32_t row,
230
0
                                  const VOlapTablePartition& partition) -> uint32_t {
231
0
                if (partition.load_tablet_idx == -1) {
232
                    // for compatible with old version, just do random
233
0
                    return butil::fast_rand() % partition.num_buckets;
234
0
                }
235
0
                return partition.load_tablet_idx % partition.num_buckets;
236
0
            };
237
0
        }
238
239
7
        if (partition_tablets_buffer == nullptr) {
240
15
            for (auto index : indexes) {
241
15
                tablet_indexes[index] = compute_function(block, index, *partitions[index]);
242
15
            }
243
7
        } else { // use buffer
244
0
            for (auto index : indexes) {
245
0
                auto* partition = partitions[index];
246
0
                if (auto it = partition_tablets_buffer->find(partition);
247
0
                    it != partition_tablets_buffer->end()) {
248
0
                    tablet_indexes[index] = it->second; // tablet
249
0
                } else {
250
                    // compute and save in buffer
251
0
                    (*partition_tablets_buffer)[partition] = tablet_indexes[index] =
252
0
                            compute_function(block, index, *partitions[index]);
253
0
                }
254
0
            }
255
0
        }
256
7
    }
257
258
5
    const std::vector<VOlapTablePartition*>& get_partitions() const { return _partitions; }
259
260
    // it's same with auto now because we only support transformed partition in auto partition. may expand in future
261
7
    bool is_projection_partition() const { return _is_auto_partition; }
262
17
    bool is_auto_partition() const { return _is_auto_partition; }
263
264
7
    bool is_auto_detect_overwrite() const { return _is_auto_detect_overwrite; }
265
0
    int64_t get_overwrite_group_id() const { return _overwrite_group_id; }
266
267
0
    std::vector<uint16_t> get_partition_keys() const { return _partition_slot_locs; }
268
269
    Status add_partitions(const std::vector<TOlapTablePartition>& partitions);
270
    // no need to del/reinsert partition keys, but change the link. reset the _partitions items
271
    Status replace_partitions(std::vector<int64_t>& old_partition_ids,
272
                              const std::vector<TOlapTablePartition>& new_partitions);
273
274
7
    vectorized::VExprContextSPtrs get_part_func_ctx() { return _part_func_ctx; }
275
7
    vectorized::VExprSPtrs get_partition_function() { return _partition_function; }
276
277
    // which will affect _partition_block
278
    Status generate_partition_from(const TOlapTablePartition& t_part,
279
                                   VOlapTablePartition*& part_result);
280
281
0
    void set_transformed_slots(const std::vector<uint16_t>& new_slots) {
282
0
        _transformed_slot_locs = new_slots;
283
0
    }
284
285
private:
286
    Status _create_partition_keys(const std::vector<TExprNode>& t_exprs, BlockRow* part_key);
287
288
    // check if this partition contain this key
289
    bool _part_contains(VOlapTablePartition* part, BlockRowWithIndicator key) const;
290
291
    // this partition only valid in this schema
292
    std::shared_ptr<OlapTableSchemaParam> _schema;
293
    TOlapTablePartitionParam _t_param;
294
295
    const std::vector<SlotDescriptor*>& _slots;
296
    std::vector<uint16_t> _partition_slot_locs;
297
    std::vector<uint16_t> _transformed_slot_locs;
298
    std::vector<uint16_t> _distributed_slot_locs;
299
300
    ObjectPool _obj_pool;
301
    vectorized::Block _partition_block;
302
    std::unique_ptr<MemTracker> _mem_tracker;
303
    std::vector<VOlapTablePartition*> _partitions;
304
    // For all partition value rows saved in this map, indicator is false. whenever we use a value to find in it, the param is true.
305
    // so that we can distinguish which column index to use (origin slots or transformed slots).
306
    // For range partition we ONLY SAVE RIGHT ENDS. when we find a part's RIGHT by a value, check if part's left cover it then.
307
    std::unique_ptr<
308
            std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>>
309
            _partitions_map;
310
311
    bool _is_in_partition = false;
312
    uint32_t _mem_usage = 0;
313
    // only works when using list partition, the resource is owned by _partitions
314
    VOlapTablePartition* _default_partition = nullptr;
315
316
    bool _is_auto_partition = false;
317
    vectorized::VExprContextSPtrs _part_func_ctx = {nullptr};
318
    vectorized::VExprSPtrs _partition_function = {nullptr};
319
    TPartitionType::type _part_type; // support list or range
320
    // "insert overwrite partition(*)", detect which partitions by BE
321
    bool _is_auto_detect_overwrite = false;
322
    int64_t _overwrite_group_id = 0;
323
};
324
325
// indicate where's the tablet and all its replications (node-wise)
326
using TabletLocation = TTabletLocation;
327
// struct TTabletLocation {
328
//     1: required i64 tablet_id
329
//     2: required list<i64> node_ids
330
// }
331
332
class OlapTableLocationParam {
333
public:
334
5
    OlapTableLocationParam(const TOlapTableLocationParam& t_param) : _t_param(t_param) {
335
10
        for (auto& location : _t_param.tablets) {
336
10
            _tablets.emplace(location.tablet_id, &location);
337
10
        }
338
5
    }
339
340
0
    int64_t db_id() const { return _t_param.db_id; }
341
0
    int64_t table_id() const { return _t_param.table_id; }
342
0
    int64_t version() const { return _t_param.version; }
343
344
10
    TabletLocation* find_tablet(int64_t tablet_id) const {
345
10
        auto it = _tablets.find(tablet_id);
346
10
        if (it != std::end(_tablets)) {
347
10
            return it->second;
348
10
        }
349
0
        return nullptr;
350
10
    }
351
352
0
    void add_locations(std::vector<TTabletLocation>& locations) {
353
0
        for (auto& location : locations) {
354
0
            if (_tablets.find(location.tablet_id) == _tablets.end()) {
355
0
                _tablets[location.tablet_id] = &location;
356
0
            }
357
0
        }
358
0
    }
359
360
private:
361
    TOlapTableLocationParam _t_param;
362
    // [tablet_id, tablet]. tablet has id, also.
363
    std::unordered_map<int64_t, TabletLocation*> _tablets;
364
};
365
366
struct NodeInfo {
367
    int64_t id;
368
    int64_t option;
369
    std::string host;
370
    int32_t brpc_port;
371
372
15
    NodeInfo() = default;
373
374
    NodeInfo(const TNodeInfo& tnode)
375
            : id(tnode.id),
376
              option(tnode.option),
377
              host(tnode.host),
378
15
              brpc_port(tnode.async_internal_port) {}
379
};
380
381
class DorisNodesInfo {
382
public:
383
0
    DorisNodesInfo() = default;
384
5
    DorisNodesInfo(const TPaloNodesInfo& t_nodes) {
385
15
        for (const auto& node : t_nodes.nodes) {
386
15
            _nodes.emplace(node.id, node);
387
15
        }
388
5
    }
389
0
    void setNodes(const TPaloNodesInfo& t_nodes) {
390
0
        _nodes.clear();
391
0
        for (const auto& node : t_nodes.nodes) {
392
0
            _nodes.emplace(node.id, node);
393
0
        }
394
0
    }
395
15
    const NodeInfo* find_node(int64_t id) const {
396
15
        auto it = _nodes.find(id);
397
15
        if (it != std::end(_nodes)) {
398
15
            return &it->second;
399
15
        }
400
0
        return nullptr;
401
15
    }
402
403
0
    void add_nodes(const std::vector<TNodeInfo>& t_nodes) {
404
0
        for (const auto& node : t_nodes) {
405
0
            const auto* node_info = find_node(node.id);
406
0
            if (node_info == nullptr) {
407
0
                _nodes.emplace(node.id, node);
408
0
            }
409
0
        }
410
0
    }
411
412
0
    const std::unordered_map<int64_t, NodeInfo>& nodes_info() { return _nodes; }
413
414
private:
415
    std::unordered_map<int64_t, NodeInfo> _nodes;
416
};
417
418
} // namespace doris