Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/vtablet_finder.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/Exprs_types.h> |
22 | | #include <gen_cpp/FrontendService_types.h> |
23 | | #include <glog/logging.h> |
24 | | |
25 | | #include <algorithm> |
26 | | #include <string> |
27 | | #include <utility> |
28 | | |
29 | | #include "common/compiler_util.h" // IWYU pragma: keep |
30 | | #include "common/config.h" |
31 | | #include "common/status.h" |
32 | | #include "core/block/block.h" |
33 | | #include "runtime/runtime_state.h" |
34 | | #include "storage/tablet_info.h" |
35 | | |
36 | | namespace doris { |
37 | | |
38 | | void AdaptiveRandomBucketState::init_partition(int32_t sender_id, int64_t partition_id, |
39 | | const std::vector<int64_t>& tablets, |
40 | | const std::vector<int32_t>& bucket_seqs, |
41 | 9.29k | int32_t start_tablet_idx) { |
42 | 9.29k | if (sender_id < 0 || partition_id < 0 || tablets.empty()) { |
43 | 0 | return; |
44 | 0 | } |
45 | 9.29k | std::lock_guard<std::mutex> lock(_mutex); |
46 | 9.29k | auto& partition_states = _sender_partition_states[sender_id]; |
47 | 9.29k | if (partition_states.contains(partition_id)) { |
48 | 1 | return; |
49 | 1 | } |
50 | | |
51 | 9.29k | PartitionState state; |
52 | 9.29k | state.partition_id = partition_id; |
53 | 9.29k | state.tablets = tablets; |
54 | 9.29k | state.bucket_seqs = bucket_seqs; |
55 | 9.29k | if (start_tablet_idx >= 0 && start_tablet_idx < state.tablets.size()) { |
56 | 9.29k | state.tablet_pos = start_tablet_idx; |
57 | 9.29k | } |
58 | 9.29k | state.current_tablet_id = state.tablets[state.tablet_pos]; |
59 | | |
60 | 9.29k | partition_states.emplace(partition_id, std::move(state)); |
61 | 9.29k | LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id << ", sender_id=" << sender_id |
62 | 9.29k | << ", partition=" << partition_id << ", local tablet count=" << tablets.size() |
63 | 9.29k | << ", start tablet=" << partition_states.at(partition_id).current_tablet_id; |
64 | 9.29k | } |
65 | | |
66 | 4.35k | int64_t AdaptiveRandomBucketState::current_tablet(int32_t sender_id, int64_t partition_id) { |
67 | 4.35k | std::lock_guard<std::mutex> lock(_mutex); |
68 | 4.35k | auto sender_it = _sender_partition_states.find(sender_id); |
69 | 4.35k | if (sender_it == _sender_partition_states.end()) { |
70 | 1 | return -1; |
71 | 1 | } |
72 | 4.35k | auto it = sender_it->second.find(partition_id); |
73 | 4.35k | if (it == sender_it->second.end()) { |
74 | 0 | return -1; |
75 | 0 | } |
76 | 4.35k | return it->second.current_tablet_id; |
77 | 4.35k | } |
78 | | |
79 | | void AdaptiveRandomBucketState::rotate_by_tablet(int32_t sender_id, int64_t partition_id, |
80 | 3 | int64_t tablet_id) { |
81 | 3 | if (!config::enable_adaptive_random_bucket_load_bucket_rotation) { |
82 | 0 | return; |
83 | 0 | } |
84 | 3 | std::lock_guard<std::mutex> lock(_mutex); |
85 | 3 | auto sender_it = _sender_partition_states.find(sender_id); |
86 | 3 | if (sender_it == _sender_partition_states.end()) { |
87 | 0 | return; |
88 | 0 | } |
89 | 3 | auto state_it = sender_it->second.find(partition_id); |
90 | 3 | if (state_it == sender_it->second.end()) { |
91 | 0 | return; |
92 | 0 | } |
93 | 3 | auto& state = state_it->second; |
94 | 3 | if (state.current_tablet_id != tablet_id) { |
95 | 1 | return; |
96 | 1 | } |
97 | 2 | int32_t next_pos = (state.tablet_pos + 1) % static_cast<int32_t>(state.tablets.size()); |
98 | 2 | LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: load_id=" << _load_id << ", sender_id=" << sender_id |
99 | 2 | << ", partition=" << state.partition_id << " rotate tablet " |
100 | 2 | << state.current_tablet_id << " -> " << state.tablets[next_pos] |
101 | 2 | << " after tablet=" << tablet_id << " memtable flushed"; |
102 | 2 | state.tablet_pos = next_pos; |
103 | 2 | state.current_tablet_id = state.tablets[next_pos]; |
104 | 2 | } |
105 | | |
106 | | Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int rows, |
107 | | std::vector<VOlapTablePartition*>& partitions, |
108 | | std::vector<uint32_t>& tablet_index, std::vector<bool>& skip, |
109 | 43.4k | std::vector<uint32_t>* miss_rows) { |
110 | 35.6M | for (int index = 0; index < rows; index++) { |
111 | 35.6M | _vpartition->find_partition(block, index, partitions[index]); |
112 | 35.6M | } |
113 | | |
114 | 43.4k | std::vector<uint32_t> qualified_rows; |
115 | 43.4k | qualified_rows.reserve(rows); |
116 | | |
117 | 35.8M | for (int row_index = 0; row_index < rows; row_index++) { |
118 | 35.8M | if (partitions[row_index] == nullptr) [[unlikely]] { |
119 | 38.3k | if (miss_rows != nullptr) { // auto partition table |
120 | 37.2k | miss_rows->push_back(row_index); // already reserve memory outside |
121 | 37.2k | skip[row_index] = true; |
122 | 37.2k | continue; |
123 | 37.2k | } |
124 | 1.11k | _num_filtered_rows++; |
125 | 1.11k | _filter_bitmap.Set(row_index, true); |
126 | 1.11k | skip[row_index] = true; |
127 | 1.11k | RETURN_IF_ERROR(state->append_error_msg_to_file( |
128 | 1.11k | []() -> std::string { return ""; }, |
129 | 1.11k | [&]() -> std::string { |
130 | 1.11k | fmt::memory_buffer buf; |
131 | 1.11k | fmt::format_to(buf, "no partition for this tuple. tuple={}", |
132 | 1.11k | block->dump_data_json(row_index, 1)); |
133 | 1.11k | return fmt::to_string(buf); |
134 | 1.11k | })); |
135 | 1.11k | continue; |
136 | 1.11k | } |
137 | 35.7M | if (!partitions[row_index]->is_mutable) [[unlikely]] { |
138 | 12 | _num_immutable_partition_filtered_rows++; |
139 | 12 | skip[row_index] = true; |
140 | 12 | continue; |
141 | 12 | } |
142 | 35.7M | if (partitions[row_index]->num_buckets <= 0) [[unlikely]] { |
143 | 0 | std::stringstream ss; |
144 | 0 | ss << "num_buckets must be greater than 0, num_buckets=" |
145 | 0 | << partitions[row_index]->num_buckets; |
146 | 0 | return Status::InternalError(ss.str()); |
147 | 0 | } |
148 | | |
149 | 35.7M | _partition_ids.emplace(partitions[row_index]->id); |
150 | | |
151 | 35.7M | qualified_rows.push_back(row_index); |
152 | 35.7M | } |
153 | | |
154 | 43.4k | if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) { |
155 | 37.7k | _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index); |
156 | 37.7k | } else if (_find_tablet_mode == FindTabletMode::FIND_TABLET_RANDOM_BUCKET) { |
157 | | // Receiver-side random bucket mode only needs partition ids on sender side. |
158 | | // The receiver decides the concrete tablet from its local ordered tablet list. |
159 | 4.97k | } else { |
160 | | // FIND_TABLET_EVERY_BATCH / FIND_TABLET_EVERY_SINK |
161 | 651 | _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index, |
162 | 651 | &_partition_to_tablet_map); |
163 | 651 | if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) { |
164 | 645 | for (auto it : _partition_to_tablet_map) { |
165 | | // do round-robin for next batch |
166 | 645 | if (it.first->load_tablet_idx != -1) { |
167 | 645 | it.first->load_tablet_idx++; |
168 | 645 | } |
169 | 645 | } |
170 | 645 | _partition_to_tablet_map.clear(); |
171 | 645 | } |
172 | 651 | } |
173 | | |
174 | 43.4k | return Status::OK(); |
175 | 43.4k | } |
176 | | |
177 | | } // namespace doris |