be/src/load/channel/adaptive_random_bucket_state.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/channel/adaptive_random_bucket_state.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <utility> |
23 | | |
24 | | namespace doris { |
25 | | |
26 | | Status AdaptiveRandomBucketState::init_partition(int64_t partition_id, |
27 | | const std::vector<int64_t>& tablets, |
28 | | const std::vector<int32_t>& bucket_seqs, |
29 | 8 | int32_t start_tablet_idx) { |
30 | 8 | if (partition_id < 0 || tablets.empty()) { |
31 | 0 | return Status::OK(); |
32 | 0 | } |
33 | 8 | if (tablets.size() != bucket_seqs.size()) { |
34 | 1 | return Status::InternalError( |
35 | 1 | "invalid adaptive random bucket tablet sequence, load_id={}, partition_id={}, " |
36 | 1 | "tablet_count={}, bucket_seq_count={}", |
37 | 1 | print_id(_load_id), partition_id, tablets.size(), bucket_seqs.size()); |
38 | 1 | } |
39 | 7 | if (start_tablet_idx < 0 || static_cast<size_t>(start_tablet_idx) >= tablets.size()) { |
40 | 1 | return Status::InternalError( |
41 | 1 | "invalid adaptive random bucket start tablet index, load_id={}, partition_id={}, " |
42 | 1 | "start_tablet_idx={}, tablet_count={}", |
43 | 1 | print_id(_load_id), partition_id, start_tablet_idx, tablets.size()); |
44 | 1 | } |
45 | 6 | std::lock_guard<std::mutex> lock(_mutex); |
46 | 6 | auto existing = _partition_states.find(partition_id); |
47 | 6 | if (existing != _partition_states.end()) { |
48 | 2 | const auto& state = existing->second; |
49 | 2 | if (state.tablets != tablets || state.bucket_seqs != bucket_seqs || |
50 | 2 | state.initial_tablet_pos != start_tablet_idx) { |
51 | 1 | return Status::InternalError( |
52 | 1 | "inconsistent adaptive random bucket partition init, load_id={}, " |
53 | 1 | "partition_id={}, existing_tablet_count={}, new_tablet_count={}, " |
54 | 1 | "existing_bucket_seq_count={}, new_bucket_seq_count={}, " |
55 | 1 | "existing_start_idx={}, new_start_idx={}", |
56 | 1 | print_id(_load_id), partition_id, state.tablets.size(), tablets.size(), |
57 | 1 | state.bucket_seqs.size(), bucket_seqs.size(), state.initial_tablet_pos, |
58 | 1 | start_tablet_idx); |
59 | 1 | } |
60 | 1 | return Status::OK(); |
61 | 2 | } |
62 | | |
63 | 4 | PartitionState state; |
64 | 4 | state.partition_id = partition_id; |
65 | 4 | state.tablets = tablets; |
66 | 4 | state.bucket_seqs = bucket_seqs; |
67 | 4 | state.initial_tablet_pos = start_tablet_idx; |
68 | 4 | state.tablet_pos = start_tablet_idx; |
69 | 4 | state.current_tablet_id = state.tablets[state.tablet_pos]; |
70 | | |
71 | 4 | _partition_states.emplace(partition_id, std::move(state)); |
72 | 4 | LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id << ", partition=" << partition_id |
73 | 4 | << ", local tablet count=" << tablets.size() |
74 | 4 | << ", start tablet=" << _partition_states.at(partition_id).current_tablet_id; |
75 | 4 | return Status::OK(); |
76 | 6 | } |
77 | | |
78 | 11 | int64_t AdaptiveRandomBucketState::current_tablet(int64_t partition_id) { |
79 | 11 | std::lock_guard<std::mutex> lock(_mutex); |
80 | 11 | auto it = _partition_states.find(partition_id); |
81 | 11 | if (it == _partition_states.end()) { |
82 | 2 | return -1; |
83 | 2 | } |
84 | 9 | return it->second.current_tablet_id; |
85 | 11 | } |
86 | | |
87 | 4 | void AdaptiveRandomBucketState::rotate_by_tablet(int64_t partition_id, int64_t tablet_id) { |
88 | 4 | std::lock_guard<std::mutex> lock(_mutex); |
89 | 4 | auto state_it = _partition_states.find(partition_id); |
90 | 4 | if (state_it == _partition_states.end()) { |
91 | 0 | return; |
92 | 0 | } |
93 | 4 | auto& state = state_it->second; |
94 | 4 | if (state.current_tablet_id != tablet_id) { |
95 | 1 | return; |
96 | 1 | } |
97 | 3 | int32_t next_pos = (state.tablet_pos + 1) % static_cast<int32_t>(state.tablets.size()); |
98 | | LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id |
99 | 3 | << ", partition=" << state.partition_id << " rotate tablet " |
100 | 3 | << state.current_tablet_id << " -> " << state.tablets[next_pos] |
101 | 3 | << " after tablet=" << tablet_id << " memtable flushed"; |
102 | 3 | state.tablet_pos = next_pos; |
103 | 3 | state.current_tablet_id = state.tablets[next_pos]; |
104 | 3 | } |
105 | | |
106 | | } // namespace doris |