/root/doris/be/src/exec/tablet_info.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <butil/fast_rand.h> |
21 | | #include <gen_cpp/Descriptors_types.h> |
22 | | #include <gen_cpp/descriptors.pb.h> |
23 | | #include <gen_cpp/olap_file.pb.h> |
24 | | |
25 | | #include <cstdint> |
26 | | #include <functional> |
27 | | #include <iterator> |
28 | | #include <map> |
29 | | #include <memory> |
30 | | #include <string> |
31 | | #include <tuple> |
32 | | #include <unordered_map> |
33 | | #include <utility> |
34 | | #include <vector> |
35 | | |
36 | | #include "common/cast_set.h" |
37 | | #include "common/logging.h" |
38 | | #include "common/object_pool.h" |
39 | | #include "common/status.h" |
40 | | #include "runtime/descriptors.h" |
41 | | #include "runtime/raw_value.h" |
42 | | #include "vec/columns/column.h" |
43 | | #include "vec/core/block.h" |
44 | | #include "vec/core/column_with_type_and_name.h" |
45 | | #include "vec/exprs/vexpr.h" |
46 | | #include "vec/exprs/vexpr_fwd.h" |
47 | | |
48 | | namespace doris { |
49 | | #include "common/compile_check_begin.h" |
50 | | class MemTracker; |
51 | | class SlotDescriptor; |
52 | | class TExprNode; |
53 | | class TabletColumn; |
54 | | class TabletIndex; |
55 | | class TupleDescriptor; |
56 | | |
57 | | struct OlapTableIndexSchema { |
58 | | int64_t index_id; |
59 | | std::vector<SlotDescriptor*> slots; |
60 | | int32_t schema_hash; |
61 | | std::vector<TabletColumn*> columns; |
62 | | std::vector<TabletIndex*> indexes; |
63 | | vectorized::VExprContextSPtr where_clause; |
64 | | |
65 | | void to_protobuf(POlapTableIndexSchema* pindex) const; |
66 | | }; |
67 | | |
68 | | class OlapTableSchemaParam { |
69 | | public: |
70 | 44 | OlapTableSchemaParam() = default; |
71 | 44 | ~OlapTableSchemaParam() noexcept = default; |
72 | | |
73 | | Status init(const TOlapTableSchemaParam& tschema); |
74 | | Status init(const POlapTableSchemaParam& pschema); |
75 | | |
76 | 28 | int64_t db_id() const { return _db_id; } |
77 | 28 | int64_t table_id() const { return _table_id; } |
78 | 0 | int64_t version() const { return _version; } |
79 | | |
80 | 0 | TupleDescriptor* tuple_desc() const { return _tuple_desc; } |
81 | 28 | const std::vector<OlapTableIndexSchema*>& indexes() const { return _indexes; } |
82 | | |
83 | | void to_protobuf(POlapTableSchemaParam* pschema) const; |
84 | | |
85 | | // NOTE: this function is not thread-safe. |
86 | 16 | POlapTableSchemaParam* to_protobuf() const { |
87 | 16 | if (_proto_schema == nullptr) { |
88 | 16 | _proto_schema = _obj_pool.add(new POlapTableSchemaParam()); |
89 | 16 | to_protobuf(_proto_schema); |
90 | 16 | } |
91 | 16 | return _proto_schema; |
92 | 16 | } |
93 | | |
94 | 28 | UniqueKeyUpdateModePB unique_key_update_mode() const { return _unique_key_update_mode; } |
95 | | |
96 | 28 | bool is_partial_update() const { |
97 | 28 | return _unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT; |
98 | 28 | } |
99 | 0 | bool is_fixed_partial_update() const { |
100 | 0 | return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; |
101 | 0 | } |
102 | 0 | bool is_flexible_partial_update() const { |
103 | 0 | return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS; |
104 | 0 | } |
105 | | |
106 | 28 | std::set<std::string> partial_update_input_columns() const { |
107 | 28 | return _partial_update_input_columns; |
108 | 28 | } |
109 | 28 | PartialUpdateNewRowPolicyPB partial_update_new_key_policy() const { |
110 | 28 | return _partial_update_new_row_policy; |
111 | 28 | } |
112 | 28 | std::string auto_increment_coulumn() const { return _auto_increment_column; } |
113 | 0 | int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; } |
114 | 0 | void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; } |
115 | 28 | int64_t timestamp_ms() const { return _timestamp_ms; } |
116 | 0 | void set_nano_seconds(int32_t nano_seconds) { _nano_seconds = nano_seconds; } |
117 | 28 | int32_t nano_seconds() const { return _nano_seconds; } |
118 | 0 | void set_timezone(std::string timezone) { _timezone = timezone; } |
119 | 28 | std::string timezone() const { return _timezone; } |
120 | 28 | bool is_strict_mode() const { return _is_strict_mode; } |
121 | 28 | int32_t sequence_map_col_uid() const { return _sequence_map_col_uid; } |
122 | | std::string debug_string() const; |
123 | | |
124 | | Status init_unique_key_update_mode(const TOlapTableSchemaParam& tschema); |
125 | | |
126 | | private: |
127 | | int64_t _db_id; |
128 | | int64_t _table_id; |
129 | | int64_t _version; |
130 | | |
131 | | TupleDescriptor* _tuple_desc = nullptr; |
132 | | mutable POlapTableSchemaParam* _proto_schema = nullptr; |
133 | | std::vector<OlapTableIndexSchema*> _indexes; |
134 | | mutable ObjectPool _obj_pool; |
135 | | UniqueKeyUpdateModePB _unique_key_update_mode {UniqueKeyUpdateModePB::UPSERT}; |
136 | | PartialUpdateNewRowPolicyPB _partial_update_new_row_policy { |
137 | | PartialUpdateNewRowPolicyPB::APPEND}; |
138 | | std::set<std::string> _partial_update_input_columns; |
139 | | bool _is_strict_mode = false; |
140 | | std::string _auto_increment_column; |
141 | | int32_t _auto_increment_column_unique_id; |
142 | | int64_t _timestamp_ms = 0; |
143 | | int32_t _nano_seconds {0}; |
144 | | std::string _timezone; |
145 | | int32_t _sequence_map_col_uid {-1}; |
146 | | }; |
147 | | |
148 | | using OlapTableIndexTablets = TOlapTableIndexTablets; |
149 | | // struct TOlapTableIndexTablets { |
150 | | // 1: required i64 index_id |
151 | | // 2: required list<i64> tablets |
152 | | // } |
153 | | |
154 | | using BlockRow = std::pair<vectorized::Block*, int32_t>; |
155 | | using BlockRowWithIndicator = |
156 | | std::tuple<vectorized::Block*, int32_t, bool>; // [block, row, is_transformed] |
157 | | |
158 | | struct VOlapTablePartition { |
159 | | int64_t id = 0; |
160 | | BlockRow start_key; |
161 | | BlockRow end_key; |
162 | | std::vector<BlockRow> in_keys; |
163 | | int64_t num_buckets = 0; |
164 | | std::vector<OlapTableIndexTablets> indexes; |
165 | | bool is_mutable; |
166 | | // -1 indicates partition with hash distribution |
167 | | int64_t load_tablet_idx = -1; |
168 | | int total_replica_num = 0; |
169 | | int load_required_replica_num = 0; |
170 | | |
171 | | VOlapTablePartition(vectorized::Block* partition_block) |
172 | | // the default value of partition bound is -1. |
173 | 0 | : start_key {partition_block, -1}, end_key {partition_block, -1} {} |
174 | | }; |
175 | | |
176 | | // this is only used by tablet_sink. so we can assume it's inited by its' descriptor. |
177 | | class VOlapTablePartKeyComparator { |
178 | | public: |
179 | | VOlapTablePartKeyComparator(const std::vector<uint16_t>& slot_locs, |
180 | | const std::vector<uint16_t>& params_locs) |
181 | 0 | : _slot_locs(slot_locs), _param_locs(params_locs) {} |
182 | | |
183 | | // return true if lhs < rhs |
184 | | // 'row' is -1 mean maximal boundary |
185 | | bool operator()(const BlockRowWithIndicator& lhs, const BlockRowWithIndicator& rhs) const; |
186 | | |
187 | | private: |
188 | | const std::vector<uint16_t>& _slot_locs; |
189 | | const std::vector<uint16_t>& _param_locs; |
190 | | }; |
191 | | |
192 | | // store an olap table's tablet information |
193 | | class VOlapTablePartitionParam { |
194 | | public: |
195 | | VOlapTablePartitionParam(std::shared_ptr<OlapTableSchemaParam>& schema, |
196 | | const TOlapTablePartitionParam& param); |
197 | | |
198 | | ~VOlapTablePartitionParam(); |
199 | | |
200 | | Status init(); |
201 | | |
202 | 0 | int64_t db_id() const { return _t_param.db_id; } |
203 | 0 | int64_t table_id() const { return _t_param.table_id; } |
204 | 0 | int64_t version() const { return _t_param.version; } |
205 | | |
206 | | // return true if we found this block_row in partition |
207 | | ALWAYS_INLINE bool find_partition(vectorized::Block* block, int row, |
208 | 0 | VOlapTablePartition*& partition) const { |
209 | 0 | auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, row, true}) |
210 | 0 | : _partitions_map->upper_bound(std::tuple {block, row, true}); |
211 | 0 | VLOG_TRACE << "find row " << row << " of\n" |
212 | 0 | << block->dump_data() << "in:\n" |
213 | 0 | << _partition_block.dump_data() << "result line row: " << std::get<1>(it->first); |
214 | | |
215 | | // for list partition it might result in default partition |
216 | 0 | if (_is_in_partition) { |
217 | 0 | partition = (it != _partitions_map->end()) ? it->second : _default_partition; |
218 | 0 | it = _partitions_map->end(); |
219 | 0 | } |
220 | 0 | if (it != _partitions_map->end() && |
221 | 0 | _part_contains(it->second, std::tuple {block, row, true})) { |
222 | 0 | partition = it->second; |
223 | 0 | } |
224 | 0 | return (partition != nullptr); |
225 | 0 | } |
226 | | |
227 | | ALWAYS_INLINE void find_tablets( |
228 | | vectorized::Block* block, const std::vector<uint32_t>& indexes, |
229 | | const std::vector<VOlapTablePartition*>& partitions, |
230 | | std::vector<uint32_t>& tablet_indexes /*result*/, |
231 | | /*TODO: check if flat hash map will be better*/ |
232 | 0 | std::map<VOlapTablePartition*, int64_t>* partition_tablets_buffer = nullptr) const { |
233 | 0 | std::function<uint32_t(vectorized::Block*, uint32_t, const VOlapTablePartition&)> |
234 | 0 | compute_function; |
235 | 0 | if (!_distributed_slot_locs.empty()) { |
236 | | //TODO: refactor by saving the hash values. then we can calculate in columnwise. |
237 | 0 | compute_function = [this](vectorized::Block* block, uint32_t row, |
238 | 0 | const VOlapTablePartition& partition) -> uint32_t { |
239 | 0 | uint32_t hash_val = 0; |
240 | 0 | for (unsigned short _distributed_slot_loc : _distributed_slot_locs) { |
241 | 0 | auto* slot_desc = _slots[_distributed_slot_loc]; |
242 | 0 | auto& column = block->get_by_position(_distributed_slot_loc).column; |
243 | 0 | auto val = column->get_data_at(row); |
244 | 0 | if (val.data != nullptr) { |
245 | 0 | hash_val = RawValue::zlib_crc32(val.data, val.size, |
246 | 0 | slot_desc->type()->get_primitive_type(), |
247 | 0 | hash_val); |
248 | 0 | } else { |
249 | 0 | hash_val = HashUtil::zlib_crc_hash_null(hash_val); |
250 | 0 | } |
251 | 0 | } |
252 | 0 | return cast_set<uint32_t>(hash_val % partition.num_buckets); |
253 | 0 | }; |
254 | 0 | } else { // random distribution |
255 | 0 | compute_function = [](vectorized::Block* block, uint32_t row, |
256 | 0 | const VOlapTablePartition& partition) -> uint32_t { |
257 | 0 | if (partition.load_tablet_idx == -1) { |
258 | | // for compatible with old version, just do random |
259 | 0 | return cast_set<uint32_t>(butil::fast_rand() % partition.num_buckets); |
260 | 0 | } |
261 | 0 | return cast_set<uint32_t>(partition.load_tablet_idx % partition.num_buckets); |
262 | 0 | }; |
263 | 0 | } |
264 | |
|
265 | 0 | if (partition_tablets_buffer == nullptr) { |
266 | 0 | for (auto index : indexes) { |
267 | 0 | tablet_indexes[index] = compute_function(block, index, *partitions[index]); |
268 | 0 | } |
269 | 0 | } else { // use buffer |
270 | 0 | for (auto index : indexes) { |
271 | 0 | auto* partition = partitions[index]; |
272 | 0 | if (auto it = partition_tablets_buffer->find(partition); |
273 | 0 | it != partition_tablets_buffer->end()) { |
274 | 0 | tablet_indexes[index] = cast_set<uint32_t>(it->second); // tablet |
275 | 0 | } else { |
276 | | // compute and save in buffer |
277 | 0 | (*partition_tablets_buffer)[partition] = tablet_indexes[index] = |
278 | 0 | compute_function(block, index, *partitions[index]); |
279 | 0 | } |
280 | 0 | } |
281 | 0 | } |
282 | 0 | } |
283 | | |
284 | 0 | const std::vector<VOlapTablePartition*>& get_partitions() const { return _partitions; } |
285 | | |
286 | | // it's same with auto now because we only support transformed partition in auto partition. may expand in future |
287 | 0 | bool is_projection_partition() const { return _is_auto_partition; } |
288 | 0 | bool is_auto_partition() const { return _is_auto_partition; } |
289 | | |
290 | 0 | bool is_auto_detect_overwrite() const { return _is_auto_detect_overwrite; } |
291 | 0 | int64_t get_overwrite_group_id() const { return _overwrite_group_id; } |
292 | | |
293 | 0 | std::vector<uint16_t> get_partition_keys() const { return _partition_slot_locs; } |
294 | | |
295 | | Status add_partitions(const std::vector<TOlapTablePartition>& partitions); |
296 | | // no need to del/reinsert partition keys, but change the link. reset the _partitions items |
297 | | Status replace_partitions(std::vector<int64_t>& old_partition_ids, |
298 | | const std::vector<TOlapTablePartition>& new_partitions); |
299 | | |
300 | 0 | vectorized::VExprContextSPtrs get_part_func_ctx() { return _part_func_ctx; } |
301 | 0 | vectorized::VExprSPtrs get_partition_function() { return _partition_function; } |
302 | | |
303 | | // which will affect _partition_block |
304 | | Status generate_partition_from(const TOlapTablePartition& t_part, |
305 | | VOlapTablePartition*& part_result); |
306 | | |
307 | 0 | void set_transformed_slots(const std::vector<uint16_t>& new_slots) { |
308 | 0 | _transformed_slot_locs = new_slots; |
309 | 0 | } |
310 | | |
311 | | private: |
312 | | Status _create_partition_keys(const std::vector<TExprNode>& t_exprs, BlockRow* part_key); |
313 | | |
314 | | // check if this partition contain this key |
315 | | bool _part_contains(VOlapTablePartition* part, BlockRowWithIndicator key) const; |
316 | | |
317 | | // this partition only valid in this schema |
318 | | std::shared_ptr<OlapTableSchemaParam> _schema; |
319 | | TOlapTablePartitionParam _t_param; |
320 | | |
321 | | const std::vector<SlotDescriptor*>& _slots; |
322 | | std::vector<uint16_t> _partition_slot_locs; |
323 | | std::vector<uint16_t> _transformed_slot_locs; |
324 | | std::vector<uint16_t> _distributed_slot_locs; |
325 | | |
326 | | ObjectPool _obj_pool; |
327 | | vectorized::Block _partition_block; |
328 | | std::unique_ptr<MemTracker> _mem_tracker; |
329 | | std::vector<VOlapTablePartition*> _partitions; |
330 | | // For all partition value rows saved in this map, indicator is false. whenever we use a value to find in it, the param is true. |
331 | | // so that we can distinguish which column index to use (origin slots or transformed slots). |
332 | | // For range partition we ONLY SAVE RIGHT ENDS. when we find a part's RIGHT by a value, check if part's left cover it then. |
333 | | std::unique_ptr< |
334 | | std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>> |
335 | | _partitions_map; |
336 | | |
337 | | bool _is_in_partition = false; |
338 | | size_t _mem_usage = 0; |
339 | | // only works when using list partition, the resource is owned by _partitions |
340 | | VOlapTablePartition* _default_partition = nullptr; |
341 | | |
342 | | bool _is_auto_partition = false; |
343 | | vectorized::VExprContextSPtrs _part_func_ctx = {nullptr}; |
344 | | vectorized::VExprSPtrs _partition_function = {nullptr}; |
345 | | TPartitionType::type _part_type; // support list or range |
346 | | // "insert overwrite partition(*)", detect which partitions by BE |
347 | | bool _is_auto_detect_overwrite = false; |
348 | | int64_t _overwrite_group_id = 0; |
349 | | }; |
350 | | |
351 | | // indicate where's the tablet and all its replications (node-wise) |
352 | | using TabletLocation = TTabletLocation; |
353 | | // struct TTabletLocation { |
354 | | // 1: required i64 tablet_id |
355 | | // 2: required list<i64> node_ids |
356 | | // } |
357 | | |
358 | | class OlapTableLocationParam { |
359 | | public: |
360 | 0 | OlapTableLocationParam(const TOlapTableLocationParam& t_param) : _t_param(t_param) { |
361 | 0 | for (auto& location : _t_param.tablets) { |
362 | 0 | _tablets.emplace(location.tablet_id, &location); |
363 | 0 | } |
364 | 0 | } |
365 | | |
366 | 0 | int64_t db_id() const { return _t_param.db_id; } |
367 | 0 | int64_t table_id() const { return _t_param.table_id; } |
368 | 0 | int64_t version() const { return _t_param.version; } |
369 | | |
370 | 0 | TabletLocation* find_tablet(int64_t tablet_id) const { |
371 | 0 | auto it = _tablets.find(tablet_id); |
372 | 0 | if (it != std::end(_tablets)) { |
373 | 0 | return it->second; |
374 | 0 | } |
375 | 0 | return nullptr; |
376 | 0 | } |
377 | | |
378 | 0 | void add_locations(std::vector<TTabletLocation>& locations) { |
379 | 0 | for (auto& location : locations) { |
380 | 0 | if (_tablets.find(location.tablet_id) == _tablets.end()) { |
381 | 0 | _tablets[location.tablet_id] = &location; |
382 | 0 | } |
383 | 0 | } |
384 | 0 | } |
385 | | |
386 | | private: |
387 | | TOlapTableLocationParam _t_param; |
388 | | // [tablet_id, tablet]. tablet has id, also. |
389 | | std::unordered_map<int64_t, TabletLocation*> _tablets; |
390 | | }; |
391 | | |
392 | | struct NodeInfo { |
393 | | int64_t id; |
394 | | int64_t option; |
395 | | std::string host; |
396 | | int32_t brpc_port; |
397 | | |
398 | 0 | NodeInfo() = default; |
399 | | |
400 | | NodeInfo(const TNodeInfo& tnode) |
401 | 0 | : id(tnode.id), |
402 | 0 | option(tnode.option), |
403 | 0 | host(tnode.host), |
404 | 0 | brpc_port(tnode.async_internal_port) {} |
405 | | }; |
406 | | |
407 | | class DorisNodesInfo { |
408 | | public: |
409 | 0 | DorisNodesInfo() = default; |
410 | 0 | DorisNodesInfo(const TPaloNodesInfo& t_nodes) { |
411 | 0 | for (const auto& node : t_nodes.nodes) { |
412 | 0 | _nodes.emplace(node.id, node); |
413 | 0 | } |
414 | 0 | } |
415 | 0 | void setNodes(const TPaloNodesInfo& t_nodes) { |
416 | 0 | _nodes.clear(); |
417 | 0 | for (const auto& node : t_nodes.nodes) { |
418 | 0 | _nodes.emplace(node.id, node); |
419 | 0 | } |
420 | 0 | } |
421 | 0 | const NodeInfo* find_node(int64_t id) const { |
422 | 0 | auto it = _nodes.find(id); |
423 | 0 | if (it != std::end(_nodes)) { |
424 | 0 | return &it->second; |
425 | 0 | } |
426 | 0 | return nullptr; |
427 | 0 | } |
428 | | |
429 | 0 | void add_nodes(const std::vector<TNodeInfo>& t_nodes) { |
430 | 0 | for (const auto& node : t_nodes) { |
431 | 0 | const auto* node_info = find_node(node.id); |
432 | 0 | if (node_info == nullptr) { |
433 | 0 | _nodes.emplace(node.id, node); |
434 | 0 | } |
435 | 0 | } |
436 | 0 | } |
437 | | |
438 | 0 | const std::unordered_map<int64_t, NodeInfo>& nodes_info() { return _nodes; } |
439 | | |
440 | | private: |
441 | | std::unordered_map<int64_t, NodeInfo> _nodes; |
442 | | }; |
443 | | |
444 | | #include "common/compile_check_end.h" |
445 | | } // namespace doris |