/root/doris/be/src/exec/rowid_fetcher.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/rowid_fetcher.h" |
19 | | |
20 | | #include <brpc/callback.h> |
21 | | #include <butil/endpoint.h> |
22 | | #include <fmt/format.h> |
23 | | #include <gen_cpp/data.pb.h> |
24 | | #include <gen_cpp/internal_service.pb.h> |
25 | | #include <gen_cpp/olap_file.pb.h> |
26 | | #include <gen_cpp/types.pb.h> |
27 | | #include <glog/logging.h> |
28 | | #include <stddef.h> |
29 | | #include <stdint.h> |
30 | | |
31 | | #include <algorithm> |
32 | | #include <cstdint> |
33 | | #include <ostream> |
34 | | #include <string> |
35 | | #include <unordered_map> |
36 | | #include <utility> |
37 | | #include <vector> |
38 | | |
39 | | #include "bthread/countdown_event.h" |
40 | | #include "common/config.h" |
41 | | #include "common/consts.h" |
42 | | #include "common/exception.h" |
43 | | #include "exec/tablet_info.h" // DorisNodesInfo |
44 | | #include "olap/olap_common.h" |
45 | | #include "olap/tablet_schema.h" |
46 | | #include "olap/utils.h" |
47 | | #include "runtime/descriptors.h" |
48 | | #include "runtime/exec_env.h" // ExecEnv |
49 | | #include "runtime/runtime_state.h" // RuntimeState |
50 | | #include "runtime/types.h" |
51 | | #include "util/brpc_client_cache.h" // BrpcClientCache |
52 | | #include "util/defer_op.h" |
53 | | #include "vec/columns/column.h" |
54 | | #include "vec/columns/column_nullable.h" |
55 | | #include "vec/columns/column_string.h" |
56 | | #include "vec/common/assert_cast.h" |
57 | | #include "vec/common/string_ref.h" |
58 | | #include "vec/core/block.h" // Block |
59 | | #include "vec/data_types/data_type_factory.hpp" |
60 | | #include "vec/data_types/serde/data_type_serde.h" |
61 | | #include "vec/jsonb/serialize.h" |
62 | | |
63 | | namespace doris { |
64 | | |
65 | 0 | Status RowIDFetcher::init() { |
66 | 0 | DorisNodesInfo nodes_info; |
67 | 0 | nodes_info.setNodes(_fetch_option.t_fetch_opt.nodes_info); |
68 | 0 | for (auto [node_id, node_info] : nodes_info.nodes_info()) { |
69 | 0 | auto client = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( |
70 | 0 | node_info.host, node_info.brpc_port); |
71 | 0 | if (!client) { |
72 | 0 | LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host |
73 | 0 | << ", port=" << node_info.brpc_port; |
74 | 0 | return Status::InternalError("RowIDFetcher failed to init rpc client, host={}, port={}", |
75 | 0 | node_info.host, node_info.brpc_port); |
76 | 0 | } |
77 | 0 | _stubs.push_back(client); |
78 | 0 | } |
79 | 0 | return Status::OK(); |
80 | 0 | } |
81 | | |
82 | 0 | PMultiGetRequest RowIDFetcher::_init_fetch_request(const vectorized::ColumnString& row_locs) const { |
83 | 0 | PMultiGetRequest mget_req; |
84 | 0 | _fetch_option.desc->to_protobuf(mget_req.mutable_desc()); |
85 | 0 | for (SlotDescriptor* slot : _fetch_option.desc->slots()) { |
86 | | // ignore rowid |
87 | 0 | if (slot->col_name() == BeConsts::ROWID_COL) { |
88 | 0 | continue; |
89 | 0 | } |
90 | 0 | slot->to_protobuf(mget_req.add_slots()); |
91 | 0 | } |
92 | 0 | for (size_t i = 0; i < row_locs.size(); ++i) { |
93 | 0 | PRowLocation row_loc; |
94 | 0 | StringRef row_id_rep = row_locs.get_data_at(i); |
95 | | // TODO: When transferring data between machines with different byte orders (endianness), |
96 | | // not performing proper handling may lead to issues in parsing and exchanging the data. |
97 | 0 | auto location = reinterpret_cast<const GlobalRowLoacation*>(row_id_rep.data); |
98 | 0 | row_loc.set_tablet_id(location->tablet_id); |
99 | 0 | row_loc.set_rowset_id(location->row_location.rowset_id.to_string()); |
100 | 0 | row_loc.set_segment_id(location->row_location.segment_id); |
101 | 0 | row_loc.set_ordinal_id(location->row_location.row_id); |
102 | 0 | *mget_req.add_row_locs() = std::move(row_loc); |
103 | 0 | } |
104 | | // Set column desc |
105 | 0 | for (const TColumn& tcolumn : _fetch_option.t_fetch_opt.column_desc) { |
106 | 0 | TabletColumn column(tcolumn); |
107 | 0 | column.to_schema_pb(mget_req.add_column_desc()); |
108 | 0 | } |
109 | 0 | PUniqueId& query_id = *mget_req.mutable_query_id(); |
110 | 0 | query_id.set_hi(_fetch_option.runtime_state->query_id().hi); |
111 | 0 | query_id.set_lo(_fetch_option.runtime_state->query_id().lo); |
112 | 0 | mget_req.set_be_exec_version(_fetch_option.runtime_state->be_exec_version()); |
113 | 0 | mget_req.set_fetch_row_store(_fetch_option.t_fetch_opt.fetch_row_store); |
114 | 0 | return mget_req; |
115 | 0 | } |
116 | | |
117 | 0 | static void fetch_callback(bthread::CountdownEvent* counter) { |
118 | 0 | Defer __defer([&] { counter->signal(); }); |
119 | 0 | } |
120 | | |
121 | | Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request, |
122 | | const std::vector<PMultiGetResponse>& rsps, |
123 | | const std::vector<brpc::Controller>& cntls, |
124 | | vectorized::Block* output_block, |
125 | 0 | std::vector<PRowLocation>* rows_id) const { |
126 | 0 | output_block->clear(); |
127 | 0 | for (const auto& cntl : cntls) { |
128 | 0 | if (cntl.Failed()) { |
129 | 0 | LOG(WARNING) << "Failed to fetch meet rpc error:" << cntl.ErrorText() |
130 | 0 | << ", host:" << cntl.remote_side(); |
131 | 0 | return Status::InternalError(cntl.ErrorText()); |
132 | 0 | } |
133 | 0 | } |
134 | 0 | vectorized::DataTypeSerDeSPtrs serdes; |
135 | 0 | std::unordered_map<uint32_t, uint32_t> col_uid_to_idx; |
136 | 0 | std::vector<std::string> default_values; |
137 | 0 | default_values.resize(_fetch_option.desc->slots().size()); |
138 | 0 | auto merge_function = [&](const PMultiGetResponse& resp) { |
139 | 0 | Status st(Status::create(resp.status())); |
140 | 0 | if (!st.ok()) { |
141 | 0 | LOG(WARNING) << "Failed to fetch " << st.to_string(); |
142 | 0 | return st; |
143 | 0 | } |
144 | 0 | for (const PRowLocation& row_id : resp.row_locs()) { |
145 | 0 | rows_id->push_back(row_id); |
146 | 0 | } |
147 | | // Merge binary rows |
148 | 0 | if (request.fetch_row_store()) { |
149 | 0 | CHECK(resp.row_locs().size() == resp.binary_row_data_size()); |
150 | 0 | if (output_block->is_empty_column()) { |
151 | 0 | *output_block = vectorized::Block(_fetch_option.desc->slots(), 1); |
152 | 0 | } |
153 | 0 | if (serdes.empty() && col_uid_to_idx.empty()) { |
154 | 0 | serdes = vectorized::create_data_type_serdes(_fetch_option.desc->slots()); |
155 | 0 | for (int i = 0; i < _fetch_option.desc->slots().size(); ++i) { |
156 | 0 | col_uid_to_idx[_fetch_option.desc->slots()[i]->col_unique_id()] = i; |
157 | 0 | default_values[i] = _fetch_option.desc->slots()[i]->col_default_value(); |
158 | 0 | } |
159 | 0 | } |
160 | 0 | for (int i = 0; i < resp.binary_row_data_size(); ++i) { |
161 | 0 | vectorized::JsonbSerializeUtil::jsonb_to_block( |
162 | 0 | serdes, resp.binary_row_data(i).data(), resp.binary_row_data(i).size(), |
163 | 0 | col_uid_to_idx, *output_block, default_values); |
164 | 0 | } |
165 | 0 | return Status::OK(); |
166 | 0 | } |
167 | | // Merge partial blocks |
168 | 0 | vectorized::Block partial_block; |
169 | 0 | RETURN_IF_ERROR(partial_block.deserialize(resp.block())); |
170 | 0 | if (partial_block.is_empty_column()) { |
171 | 0 | return Status::OK(); |
172 | 0 | } |
173 | 0 | CHECK(resp.row_locs().size() == partial_block.rows()); |
174 | 0 | if (output_block->is_empty_column()) { |
175 | 0 | output_block->swap(partial_block); |
176 | 0 | } else if (partial_block.columns() != output_block->columns()) { |
177 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
178 | 0 | "Merge block not match, self:[{}], input:[{}], ", output_block->dump_types(), |
179 | 0 | partial_block.dump_types()); |
180 | 0 | } else { |
181 | 0 | for (int i = 0; i < output_block->columns(); ++i) { |
182 | 0 | output_block->get_by_position(i).column->assume_mutable()->insert_range_from( |
183 | 0 | *partial_block.get_by_position(i) |
184 | 0 | .column->convert_to_full_column_if_const() |
185 | 0 | .get(), |
186 | 0 | 0, partial_block.rows()); |
187 | 0 | } |
188 | 0 | } |
189 | 0 | return Status::OK(); |
190 | 0 | }; |
191 | |
|
192 | 0 | for (const auto& resp : rsps) { |
193 | 0 | RETURN_IF_ERROR(merge_function(resp)); |
194 | 0 | } |
195 | 0 | return Status::OK(); |
196 | 0 | } |
197 | | |
198 | 0 | bool _has_char_type(const TypeDescriptor& desc) { |
199 | 0 | switch (desc.type) { |
200 | 0 | case TYPE_CHAR: |
201 | 0 | return true; |
202 | 0 | case TYPE_ARRAY: |
203 | 0 | case TYPE_MAP: |
204 | 0 | case TYPE_STRUCT: |
205 | 0 | for (int idx = 0; idx < desc.children.size(); ++idx) { |
206 | 0 | if (_has_char_type(desc.children[idx])) { |
207 | 0 | return true; |
208 | 0 | } |
209 | 0 | } |
210 | 0 | return false; |
211 | 0 | default: |
212 | 0 | return false; |
213 | 0 | } |
214 | 0 | } |
215 | | |
216 | | Status RowIDFetcher::fetch(const vectorized::ColumnPtr& column_row_ids, |
217 | 0 | vectorized::Block* res_block) { |
218 | 0 | CHECK(!_stubs.empty()); |
219 | 0 | PMultiGetRequest mget_req = _init_fetch_request(assert_cast<const vectorized::ColumnString&>( |
220 | 0 | *vectorized::remove_nullable(column_row_ids).get())); |
221 | 0 | std::vector<PMultiGetResponse> resps(_stubs.size()); |
222 | 0 | std::vector<brpc::Controller> cntls(_stubs.size()); |
223 | 0 | bthread::CountdownEvent counter(_stubs.size()); |
224 | 0 | for (size_t i = 0; i < _stubs.size(); ++i) { |
225 | 0 | cntls[i].set_timeout_ms(config::fetch_rpc_timeout_seconds * 1000); |
226 | 0 | auto callback = brpc::NewCallback(fetch_callback, &counter); |
227 | 0 | _stubs[i]->multiget_data(&cntls[i], &mget_req, &resps[i], callback); |
228 | 0 | } |
229 | 0 | counter.wait(); |
230 | | |
231 | | // Merge |
232 | 0 | std::vector<PRowLocation> rows_locs; |
233 | 0 | rows_locs.reserve(rows_locs.size()); |
234 | 0 | RETURN_IF_ERROR(_merge_rpc_results(mget_req, resps, cntls, res_block, &rows_locs)); |
235 | 0 | if (rows_locs.size() < column_row_ids->size()) { |
236 | 0 | return Status::InternalError("Miss matched return row loc count {}, expected {}, input {}", |
237 | 0 | rows_locs.size(), res_block->rows(), column_row_ids->size()); |
238 | 0 | } |
239 | | // Final sort by row_ids sequence, since row_ids is already sorted if need |
240 | 0 | std::map<GlobalRowLoacation, size_t> positions; |
241 | 0 | for (size_t i = 0; i < rows_locs.size(); ++i) { |
242 | 0 | RowsetId rowset_id; |
243 | 0 | rowset_id.init(rows_locs[i].rowset_id()); |
244 | 0 | GlobalRowLoacation grl(rows_locs[i].tablet_id(), rowset_id, rows_locs[i].segment_id(), |
245 | 0 | rows_locs[i].ordinal_id()); |
246 | 0 | positions[grl] = i; |
247 | 0 | }; |
248 | | // TODO remove this warning code |
249 | 0 | if (positions.size() < rows_locs.size()) { |
250 | 0 | LOG(WARNING) << "contains duplicated row entry"; |
251 | 0 | } |
252 | 0 | vectorized::IColumn::Permutation permutation; |
253 | 0 | permutation.reserve(column_row_ids->size()); |
254 | 0 | for (size_t i = 0; i < column_row_ids->size(); ++i) { |
255 | 0 | auto location = |
256 | 0 | reinterpret_cast<const GlobalRowLoacation*>(column_row_ids->get_data_at(i).data); |
257 | 0 | permutation.push_back(positions[*location]); |
258 | 0 | } |
259 | 0 | for (size_t i = 0; i < res_block->columns(); ++i) { |
260 | 0 | res_block->get_by_position(i).column = |
261 | 0 | res_block->get_by_position(i).column->permute(permutation, permutation.size()); |
262 | 0 | } |
263 | | // Check row consistency |
264 | 0 | RETURN_IF_CATCH_EXCEPTION(res_block->check_number_of_rows()); |
265 | | // shrink for char type |
266 | 0 | std::vector<size_t> char_type_idx; |
267 | 0 | for (size_t i = 0; i < _fetch_option.desc->slots().size(); i++) { |
268 | 0 | const auto& column_desc = _fetch_option.desc->slots()[i]; |
269 | 0 | const TypeDescriptor& type_desc = column_desc->type(); |
270 | 0 | if (_has_char_type(type_desc)) { |
271 | 0 | char_type_idx.push_back(i); |
272 | 0 | } |
273 | 0 | } |
274 | 0 | res_block->shrink_char_type_column_suffix_zero(char_type_idx); |
275 | 0 | VLOG_DEBUG << "dump block:" << res_block->dump_data(0, 10); |
276 | 0 | return Status::OK(); |
277 | 0 | } |
278 | | |
279 | | } // namespace doris |