be/src/service/point_query_executor.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "service/point_query_executor.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/Descriptors_types.h> |
22 | | #include <gen_cpp/Exprs_types.h> |
23 | | #include <gen_cpp/PaloInternalService_types.h> |
24 | | #include <gen_cpp/internal_service.pb.h> |
25 | | #include <glog/logging.h> |
26 | | #include <google/protobuf/extension_set.h> |
27 | | #include <stdlib.h> |
28 | | |
29 | | #include <memory> |
30 | | #include <unordered_map> |
31 | | #include <vector> |
32 | | |
33 | | #include "cloud/cloud_tablet.h" |
34 | | #include "cloud/config.h" |
35 | | #include "common/cast_set.h" |
36 | | #include "common/consts.h" |
37 | | #include "common/status.h" |
38 | | #include "core/data_type/data_type_factory.hpp" |
39 | | #include "core/data_type_serde/data_type_serde.h" |
40 | | #include "exec/sink/writer/vmysql_result_writer.h" |
41 | | #include "exprs/vexpr.h" |
42 | | #include "exprs/vexpr_context.h" |
43 | | #include "exprs/vexpr_fwd.h" |
44 | | #include "exprs/vslot_ref.h" |
45 | | #include "runtime/descriptors.h" |
46 | | #include "runtime/exec_env.h" |
47 | | #include "runtime/result_block_buffer.h" |
48 | | #include "runtime/runtime_profile.h" |
49 | | #include "runtime/runtime_state.h" |
50 | | #include "runtime/thread_context.h" |
51 | | #include "storage/row_cursor.h" |
52 | | #include "storage/rowset/beta_rowset.h" |
53 | | #include "storage/rowset/rowset_fwd.h" |
54 | | #include "storage/segment/column_reader.h" |
55 | | #include "storage/tablet/tablet_schema.h" |
56 | | #include "storage/utils.h" |
57 | | #include "util/defer_op.h" |
58 | | #include "util/jsonb/serialize.h" |
59 | | #include "util/lru_cache.h" |
60 | | #include "util/simd/bits.h" |
61 | | #include "util/thrift_util.h" |
62 | | |
63 | | namespace doris { |
64 | | |
65 | | class PointQueryResultBlockBuffer final : public MySQLResultBlockBuffer { |
66 | | public: |
67 | 0 | PointQueryResultBlockBuffer(RuntimeState* state) : MySQLResultBlockBuffer(state) {} |
68 | 0 | ~PointQueryResultBlockBuffer() override = default; |
69 | 0 | std::shared_ptr<TFetchDataResult> get_block() { |
70 | 0 | std::lock_guard<std::mutex> l(_lock); |
71 | 0 | DCHECK_EQ(_result_batch_queue.size(), 1); |
72 | 0 | auto result = std::move(_result_batch_queue.front()); |
73 | 0 | _result_batch_queue.pop_front(); |
74 | 0 | return result; |
75 | 0 | } |
76 | | }; |
77 | | |
78 | 4.09k | Reusable::~Reusable() = default; |
79 | | |
80 | | // get missing and include column ids |
81 | | // input include_cids : the output expr slots columns unique ids |
82 | | // missing_cids : the output expr columns that not in row columns cids |
83 | | static void get_missing_and_include_cids(const TabletSchema& schema, |
84 | | const std::vector<SlotDescriptor*>& slots, |
85 | | int target_rs_column_id, |
86 | | std::unordered_set<int>& missing_cids, |
87 | 4.04k | std::unordered_set<int>& include_cids) { |
88 | 4.04k | missing_cids.clear(); |
89 | 4.04k | include_cids.clear(); |
90 | 4.04k | for (auto* slot : slots) { |
91 | 4.04k | missing_cids.insert(slot->col_unique_id()); |
92 | 4.04k | } |
93 | | // insert delete sign column id |
94 | 4.04k | missing_cids.insert(schema.columns()[schema.delete_sign_idx()]->unique_id()); |
95 | 4.04k | if (target_rs_column_id == -1) { |
96 | | // no row store columns |
97 | 4.04k | return; |
98 | 4.04k | } |
99 | 0 | const TabletColumn& target_rs_column = schema.column_by_uid(target_rs_column_id); |
100 | 0 | DCHECK(target_rs_column.is_row_store_column()); |
101 | | // The full column group is considered a full match, thus no missing cids |
102 | 0 | if (schema.row_columns_uids().empty()) { |
103 | 0 | missing_cids.clear(); |
104 | 0 | return; |
105 | 0 | } |
106 | 0 | for (int cid : schema.row_columns_uids()) { |
107 | 0 | missing_cids.erase(cid); |
108 | 0 | include_cids.insert(cid); |
109 | 0 | } |
110 | 0 | } |
111 | | |
// Number of blocks pre-allocated for a Reusable that is stored in the lookup
// connection cache; also the upper bound Reusable::return_block() trims the
// block pool to.
static constexpr int s_preallocted_blocks_num = 32;
113 | | |
114 | | static void extract_slot_ref(const VExprSPtr& expr, TupleDescriptor* tuple_desc, |
115 | 4.04k | std::vector<SlotDescriptor*>& slots) { |
116 | 4.04k | const auto& children = expr->children(); |
117 | 4.04k | for (const auto& i : children) { |
118 | 0 | extract_slot_ref(i, tuple_desc, slots); |
119 | 0 | } |
120 | | |
121 | 4.04k | auto node_type = expr->node_type(); |
122 | 4.04k | if (node_type == TExprNodeType::SLOT_REF) { |
123 | 4.04k | int column_id = static_cast<const VSlotRef*>(expr.get())->column_id(); |
124 | 4.04k | auto* slot_desc = tuple_desc->slots()[column_id]; |
125 | 4.04k | slots.push_back(slot_desc); |
126 | 4.04k | } |
127 | 4.04k | } |
128 | | |
129 | | Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs, |
130 | | const TQueryOptions& query_options, const TabletSchema& schema, |
131 | 4.04k | size_t block_size) { |
132 | 4.04k | _runtime_state = RuntimeState::create_unique(); |
133 | 4.04k | _runtime_state->set_query_options(query_options); |
134 | 4.04k | RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &_desc_tbl)); |
135 | 4.04k | _runtime_state->set_desc_tbl(_desc_tbl); |
136 | 88.9k | for (const auto* slot : tuple_desc()->slots()) { |
137 | 88.9k | if (!slot->all_access_paths().empty() || !slot->predicate_access_paths().empty()) { |
138 | 0 | return Status::InternalError( |
139 | 0 | "Short-circuit point query does not support nested column access paths, " |
140 | 0 | "slot: {}. Please upgrade FE to disable nested column pruning for " |
141 | 0 | "short-circuit point queries.", |
142 | 0 | slot->col_name()); |
143 | 0 | } |
144 | 88.9k | } |
145 | 4.04k | _block_pool.resize(block_size); |
146 | 8.09k | for (auto& i : _block_pool) { |
147 | 8.09k | i = Block::create_unique(tuple_desc()->slots(), 2); |
148 | | // Name is useless but cost space |
149 | 8.09k | i->clear_names(); |
150 | 8.09k | } |
151 | | |
152 | 4.04k | RETURN_IF_ERROR(VExpr::create_expr_trees(output_exprs, _output_exprs_ctxs)); |
153 | 4.04k | RowDescriptor row_desc(tuple_desc()); |
154 | | // Prepare the exprs to run. |
155 | 4.04k | RETURN_IF_ERROR(VExpr::prepare(_output_exprs_ctxs, _runtime_state.get(), row_desc)); |
156 | 4.04k | RETURN_IF_ERROR(VExpr::open(_output_exprs_ctxs, _runtime_state.get())); |
157 | 4.04k | _create_timestamp = butil::gettimeofday_ms(); |
158 | 4.04k | _data_type_serdes = create_data_type_serdes(tuple_desc()->slots()); |
159 | 4.04k | _col_default_values.resize(tuple_desc()->slots().size()); |
160 | 4.04k | bool has_delete_sign = false; |
161 | 93.0k | for (int i = 0; i < tuple_desc()->slots().size(); ++i) { |
162 | 88.9k | auto* slot = tuple_desc()->slots()[i]; |
163 | 88.9k | if (slot->col_name() == DELETE_SIGN) { |
164 | 0 | has_delete_sign = true; |
165 | 0 | } |
166 | 88.9k | _col_uid_to_idx[slot->col_unique_id()] = i; |
167 | 88.9k | _col_default_values[i] = slot->col_default_value(); |
168 | 88.9k | } |
169 | | |
170 | | // Get the output slot descriptors |
171 | 4.04k | std::vector<SlotDescriptor*> output_slot_descs; |
172 | 4.04k | for (const auto& expr : _output_exprs_ctxs) { |
173 | 4.04k | extract_slot_ref(expr->root(), tuple_desc(), output_slot_descs); |
174 | 4.04k | } |
175 | | |
176 | | // get the delete sign idx in block |
177 | 4.04k | if (has_delete_sign) { |
178 | 0 | _delete_sign_idx = _col_uid_to_idx[schema.columns()[schema.delete_sign_idx()]->unique_id()]; |
179 | 0 | } |
180 | | |
181 | 4.04k | if (schema.have_column(BeConsts::ROW_STORE_COL)) { |
182 | 0 | const auto& column = *DORIS_TRY(schema.column(BeConsts::ROW_STORE_COL)); |
183 | 0 | _row_store_column_ids = column.unique_id(); |
184 | 0 | } |
185 | 4.04k | get_missing_and_include_cids(schema, output_slot_descs, _row_store_column_ids, |
186 | 4.04k | _missing_col_uids, _include_col_uids); |
187 | | |
188 | 4.04k | return Status::OK(); |
189 | 4.04k | } |
190 | | |
191 | 0 | std::unique_ptr<Block> Reusable::get_block() { |
192 | 0 | std::lock_guard lock(_block_mutex); |
193 | 0 | if (_block_pool.empty()) { |
194 | 0 | auto block = Block::create_unique(tuple_desc()->slots(), 2); |
195 | | // Name is useless but cost space |
196 | 0 | block->clear_names(); |
197 | 0 | return block; |
198 | 0 | } |
199 | 0 | auto block = std::move(_block_pool.back()); |
200 | 0 | CHECK(block != nullptr); |
201 | 0 | _block_pool.pop_back(); |
202 | 0 | return block; |
203 | 0 | } |
204 | | |
205 | 0 | void Reusable::return_block(std::unique_ptr<Block>& block) { |
206 | 0 | std::lock_guard lock(_block_mutex); |
207 | 0 | if (block == nullptr) { |
208 | 0 | return; |
209 | 0 | } |
210 | 0 | block->clear_column_data(); |
211 | 0 | _block_pool.push_back(std::move(block)); |
212 | 0 | if (_block_pool.size() > s_preallocted_blocks_num) { |
213 | 0 | _block_pool.resize(s_preallocted_blocks_num); |
214 | 0 | } |
215 | 0 | } |
216 | | |
217 | 0 | LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t capacity) { |
218 | 0 | DCHECK(ExecEnv::GetInstance()->get_lookup_connection_cache() == nullptr); |
219 | 0 | auto* res = new LookupConnectionCache(capacity); |
220 | 0 | return res; |
221 | 0 | } |
222 | | |
223 | | RowCache::RowCache(int64_t capacity, int num_shards) |
224 | 3 | : LRUCachePolicy(CachePolicy::CacheType::POINT_QUERY_ROW_CACHE, capacity, |
225 | 3 | LRUCacheType::SIZE, config::point_query_row_cache_stale_sweep_time_sec, |
226 | 3 | num_shards, /*element count capacity */ 0, |
227 | 3 | /*enable prune*/ true, /*is lru-k*/ true) {} |
228 | | |
229 | | // Create global instance of this class |
230 | 0 | RowCache* RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) { |
231 | 0 | DCHECK(ExecEnv::GetInstance()->get_row_cache() == nullptr); |
232 | 0 | auto* res = new RowCache(capacity, num_shards); |
233 | 0 | return res; |
234 | 0 | } |
235 | | |
236 | 0 | RowCache* RowCache::instance() { |
237 | 0 | return ExecEnv::GetInstance()->get_row_cache(); |
238 | 0 | } |
239 | | |
240 | 4 | bool RowCache::lookup(const RowCacheKey& key, CacheHandle* handle) { |
241 | 4 | const std::string& encoded_key = key.encode(); |
242 | 4 | auto* lru_handle = LRUCachePolicy::lookup(encoded_key); |
243 | 4 | if (!lru_handle) { |
244 | | // cache miss |
245 | 3 | return false; |
246 | 3 | } |
247 | 1 | *handle = CacheHandle(this, lru_handle); |
248 | 1 | return true; |
249 | 4 | } |
250 | | |
251 | 11 | void RowCache::insert(const RowCacheKey& key, const Slice& value) { |
252 | 11 | char* cache_value = static_cast<char*>(malloc(value.size)); |
253 | 11 | memcpy(cache_value, value.data, value.size); |
254 | 11 | auto* row_cache_value = new RowCacheValue; |
255 | 11 | row_cache_value->cache_value = cache_value; |
256 | 11 | const std::string& encoded_key = key.encode(); |
257 | 11 | auto* handle = LRUCachePolicy::insert(encoded_key, row_cache_value, value.size, value.size, |
258 | 11 | CachePriority::NORMAL); |
259 | | // handle will released |
260 | 11 | auto tmp = CacheHandle {this, handle}; |
261 | 11 | } |
262 | | |
263 | 1 | void RowCache::erase(const RowCacheKey& key) { |
264 | 1 | const std::string& encoded_key = key.encode(); |
265 | 1 | LRUCachePolicy::erase(encoded_key); |
266 | 1 | } |
267 | | |
268 | 2.09k | LookupConnectionCache::CacheValue::~CacheValue() { |
269 | 2.09k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
270 | 2.09k | ExecEnv::GetInstance()->point_query_executor_mem_tracker()); |
271 | 2.09k | item.reset(); |
272 | 2.09k | } |
273 | | |
274 | 0 | PointQueryExecutor::~PointQueryExecutor() { |
275 | 0 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
276 | 0 | ExecEnv::GetInstance()->point_query_executor_mem_tracker()); |
277 | 0 | _tablet.reset(); |
278 | 0 | _reusable.reset(); |
279 | 0 | _result_block.reset(); |
280 | 0 | _row_read_ctxs.clear(); |
281 | 0 | } |
282 | | |
283 | | Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, |
284 | 0 | PTabletKeyLookupResponse* response) { |
285 | 0 | SCOPED_TIMER(&_profile_metrics.init_ns); |
286 | 0 | _response = response; |
287 | | // using cache |
288 | 0 | __int128_t uuid = |
289 | 0 | static_cast<__int128_t>(request->uuid().uuid_high()) << 64 | request->uuid().uuid_low(); |
290 | 0 | SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker()); |
291 | 0 | auto cache_handle = LookupConnectionCache::instance()->get(uuid); |
292 | 0 | _binary_row_format = request->is_binary_row(); |
293 | 0 | _tablet = DORIS_TRY(ExecEnv::get_tablet(request->tablet_id())); |
294 | 0 | if (cache_handle != nullptr) { |
295 | 0 | _reusable = cache_handle; |
296 | 0 | _profile_metrics.hit_lookup_cache = true; |
297 | 0 | } else { |
298 | | // Lightweight request: FE may omit reusable query context and rely on uuid cache. |
299 | | // If cache miss and required parameters are absent, ask FE to resend a full request. |
300 | 0 | if (uuid != 0 && (!request->has_desc_tbl() || request->desc_tbl().empty() || |
301 | 0 | !request->has_output_expr() || request->output_expr().empty() || |
302 | 0 | !request->has_query_options() || request->query_options().empty())) { |
303 | 0 | if (VLOG_DEBUG_IS_ON) { |
304 | 0 | VLOG_DEBUG << "lookup connection cache miss, ask FE to resend query context" |
305 | 0 | << ", tablet_id=" << request->tablet_id() |
306 | 0 | << ", uuid_high=" << request->uuid().uuid_high() |
307 | 0 | << ", uuid_low=" << request->uuid().uuid_low(); |
308 | 0 | } |
309 | 0 | response->set_need_resend_query_context(true); |
310 | 0 | return Status::OK(); |
311 | 0 | } |
312 | 0 | if (uuid == 0 && (!request->has_desc_tbl() || request->desc_tbl().empty() || |
313 | 0 | !request->has_output_expr() || request->output_expr().empty())) { |
314 | 0 | return Status::InvalidArgument( |
315 | 0 | "tablet_fetch_data requires desc_tbl/output_expr when uuid is not set"); |
316 | 0 | } |
317 | | // init handle |
318 | 0 | auto reusable_ptr = std::make_shared<Reusable>(); |
319 | 0 | TDescriptorTable t_desc_tbl; |
320 | 0 | TExprList t_output_exprs; |
321 | 0 | auto len = cast_set<uint32_t>(request->desc_tbl().size()); |
322 | 0 | RETURN_IF_ERROR( |
323 | 0 | deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(request->desc_tbl().data()), |
324 | 0 | &len, false, &t_desc_tbl)); |
325 | 0 | len = cast_set<uint32_t>(request->output_expr().size()); |
326 | 0 | RETURN_IF_ERROR(deserialize_thrift_msg( |
327 | 0 | reinterpret_cast<const uint8_t*>(request->output_expr().data()), &len, false, |
328 | 0 | &t_output_exprs)); |
329 | 0 | _reusable = reusable_ptr; |
330 | 0 | TQueryOptions t_query_options; |
331 | 0 | len = cast_set<uint32_t>(request->query_options().size()); |
332 | 0 | if (request->has_query_options()) { |
333 | 0 | RETURN_IF_ERROR(deserialize_thrift_msg( |
334 | 0 | reinterpret_cast<const uint8_t*>(request->query_options().data()), &len, false, |
335 | 0 | &t_query_options)); |
336 | 0 | } |
337 | 0 | if (uuid != 0) { |
338 | | // could be reused by requests after, pre allocte more blocks |
339 | 0 | RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options, |
340 | 0 | *_tablet->tablet_schema(), |
341 | 0 | s_preallocted_blocks_num)); |
342 | 0 | LookupConnectionCache::instance()->add(uuid, reusable_ptr); |
343 | 0 | } else { |
344 | 0 | RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options, |
345 | 0 | *_tablet->tablet_schema(), 1)); |
346 | 0 | } |
347 | 0 | } |
348 | | // Set timezone from request for functions like from_unixtime() |
349 | 0 | if (request->has_time_zone() && !request->time_zone().empty()) { |
350 | 0 | _reusable->runtime_state()->set_timezone(request->time_zone()); |
351 | 0 | } |
352 | 0 | if (request->has_version() && request->version() >= 0) { |
353 | 0 | _version = request->version(); |
354 | 0 | } |
355 | 0 | RETURN_IF_ERROR(_init_keys(request)); |
356 | 0 | _result_block = _reusable->get_block(); |
357 | 0 | CHECK(_result_block != nullptr); |
358 | |
|
359 | 0 | return Status::OK(); |
360 | 0 | } |
361 | | |
362 | 0 | Status PointQueryExecutor::lookup_up() { |
363 | 0 | SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker()); |
364 | 0 | RETURN_IF_ERROR(_lookup_row_key()); |
365 | 0 | RETURN_IF_ERROR(_lookup_row_data()); |
366 | 0 | RETURN_IF_ERROR(_output_data()); |
367 | 0 | return Status::OK(); |
368 | 0 | } |
369 | | |
370 | 0 | void PointQueryExecutor::print_profile() { |
371 | 0 | auto init_us = _profile_metrics.init_ns.value() / 1000; |
372 | 0 | auto init_key_us = _profile_metrics.init_key_ns.value() / 1000; |
373 | 0 | auto lookup_key_us = _profile_metrics.lookup_key_ns.value() / 1000; |
374 | 0 | auto lookup_data_us = _profile_metrics.lookup_data_ns.value() / 1000; |
375 | 0 | auto output_data_us = _profile_metrics.output_data_ns.value() / 1000; |
376 | 0 | auto load_segments_key_us = _profile_metrics.load_segment_key_stage_ns.value() / 1000; |
377 | 0 | auto load_segments_data_us = _profile_metrics.load_segment_data_stage_ns.value() / 1000; |
378 | 0 | auto total_us = init_us + lookup_key_us + lookup_data_us + output_data_us; |
379 | 0 | auto read_stats = _profile_metrics.read_stats; |
380 | 0 | const std::string stats_str = fmt::format( |
381 | 0 | "[lookup profile:{}us] init:{}us, init_key:{}us," |
382 | 0 | " lookup_key:{}us, load_segments_key:{}us, lookup_data:{}us, load_segments_data:{}us," |
383 | 0 | " output_data:{}us, " |
384 | 0 | "hit_lookup_cache:{}" |
385 | 0 | ", is_binary_row:{}, output_columns:{}, total_keys:{}, row_cache_hits:{}" |
386 | 0 | ", hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, " |
387 | 0 | "io_latency:{}ns, " |
388 | 0 | "uncompressed_bytes_read:{}, result_data_bytes:{}, row_hits:{}" |
389 | 0 | ", rs_column_uid:{}, bytes_read_from_local:{}, bytes_read_from_remote:{}, " |
390 | 0 | "local_io_timer:{}, remote_io_timer:{}, local_write_timer:{}", |
391 | 0 | total_us, init_us, init_key_us, lookup_key_us, load_segments_key_us, lookup_data_us, |
392 | 0 | load_segments_data_us, output_data_us, _profile_metrics.hit_lookup_cache, |
393 | 0 | _binary_row_format, _reusable->output_exprs().size(), _row_read_ctxs.size(), |
394 | 0 | _profile_metrics.row_cache_hits, read_stats.cached_pages_num, |
395 | 0 | read_stats.total_pages_num, read_stats.compressed_bytes_read, read_stats.io_ns, |
396 | 0 | read_stats.uncompressed_bytes_read, _profile_metrics.result_data_bytes, _row_hits, |
397 | 0 | _reusable->rs_column_uid(), |
398 | 0 | _profile_metrics.read_stats.file_cache_stats.bytes_read_from_local, |
399 | 0 | _profile_metrics.read_stats.file_cache_stats.bytes_read_from_remote, |
400 | 0 | _profile_metrics.read_stats.file_cache_stats.local_io_timer, |
401 | 0 | _profile_metrics.read_stats.file_cache_stats.remote_io_timer, |
402 | 0 | _profile_metrics.read_stats.file_cache_stats.write_cache_io_timer); |
403 | |
|
404 | 0 | constexpr static int kSlowThreholdUs = 50 * 1000; // 50ms |
405 | 0 | if (total_us > kSlowThreholdUs) { |
406 | 0 | LOG(WARNING) << "slow query, " << stats_str; |
407 | 0 | } else if (VLOG_DEBUG_IS_ON) { |
408 | 0 | VLOG_DEBUG << stats_str; |
409 | 0 | } else { |
410 | 0 | LOG_EVERY_N(INFO, 1000) << stats_str; |
411 | 0 | } |
412 | 0 | } |
413 | | |
414 | 0 | Status PointQueryExecutor::_init_keys(const PTabletKeyLookupRequest* request) { |
415 | 0 | SCOPED_TIMER(&_profile_metrics.init_key_ns); |
416 | 0 | const auto& schema = _tablet->tablet_schema(); |
417 | | // Point query is only supported on merge-on-write unique key tables. |
418 | 0 | DCHECK(schema->keys_type() == UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()); |
419 | 0 | if (schema->keys_type() != UNIQUE_KEYS || !_tablet->enable_unique_key_merge_on_write()) { |
420 | 0 | return Status::InvalidArgument( |
421 | 0 | "Point query is only supported on merge-on-write unique key tables, " |
422 | 0 | "tablet_id={}", |
423 | 0 | _tablet->tablet_id()); |
424 | 0 | } |
425 | | // 1. get primary key from conditions |
426 | 0 | _row_read_ctxs.resize(request->key_tuples().size()); |
427 | | // get row cursor and encode keys |
428 | 0 | for (int i = 0; i < request->key_tuples().size(); ++i) { |
429 | 0 | const KeyTuple& key_tuple = request->key_tuples(i); |
430 | 0 | if (UNLIKELY(cast_set<size_t>(key_tuple.key_column_literals_size()) != |
431 | 0 | schema->num_key_columns())) { |
432 | 0 | return Status::InvalidArgument( |
433 | 0 | "Key column count mismatch. expected={}, actual={}, tablet_id={}", |
434 | 0 | schema->num_key_columns(), key_tuple.key_column_literals_size(), |
435 | 0 | _tablet->tablet_id()); |
436 | 0 | } |
437 | 0 | RowCursor cursor; |
438 | 0 | std::vector<Field> key_fields; |
439 | 0 | key_fields.reserve(key_tuple.key_column_literals_size()); |
440 | 0 | for (int j = 0; j < key_tuple.key_column_literals_size(); ++j) { |
441 | 0 | const auto& literal_bytes = key_tuple.key_column_literals(j); |
442 | 0 | TExprNode expr_node; |
443 | 0 | auto len = cast_set<uint32_t>(literal_bytes.size()); |
444 | 0 | RETURN_IF_ERROR( |
445 | 0 | deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(literal_bytes.data()), |
446 | 0 | &len, false, &expr_node)); |
447 | 0 | const auto& col = schema->column(j); |
448 | 0 | auto data_type = DataTypeFactory::instance().create_data_type( |
449 | 0 | col.type(), col.precision(), col.frac(), col.length()); |
450 | 0 | key_fields.push_back(data_type->get_field(expr_node)); |
451 | 0 | } |
452 | 0 | RETURN_IF_ERROR(cursor.init_scan_key(_tablet->tablet_schema(), std::move(key_fields))); |
453 | 0 | cursor.encode_key_with_padding<true>(&_row_read_ctxs[i]._primary_key, |
454 | 0 | _tablet->tablet_schema()->num_key_columns(), true); |
455 | 0 | } |
456 | 0 | return Status::OK(); |
457 | 0 | } |
458 | | |
459 | 0 | Status PointQueryExecutor::_lookup_row_key() { |
460 | 0 | SCOPED_TIMER(&_profile_metrics.lookup_key_ns); |
461 | | // 2. lookup row location |
462 | 0 | Status st; |
463 | 0 | if (_version >= 0) { |
464 | 0 | CHECK(config::is_cloud_mode()) << "Only cloud mode support snapshot read at present"; |
465 | 0 | SyncOptions options; |
466 | 0 | options.query_version = _version; |
467 | 0 | RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(_tablet)->sync_rowsets(options)); |
468 | 0 | } |
469 | 0 | std::vector<RowsetSharedPtr> specified_rowsets; |
470 | 0 | { |
471 | 0 | std::shared_lock rlock(_tablet->get_header_lock()); |
472 | 0 | specified_rowsets = _tablet->get_rowset_by_ids(nullptr); |
473 | 0 | } |
474 | 0 | std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size()); |
475 | 0 | for (size_t i = 0; i < _row_read_ctxs.size(); ++i) { |
476 | 0 | RowLocation location; |
477 | 0 | if (!config::disable_storage_row_cache) { |
478 | 0 | RowCache::CacheHandle cache_handle; |
479 | 0 | auto hit_cache = RowCache::instance()->lookup( |
480 | 0 | {_tablet->tablet_id(), _row_read_ctxs[i]._primary_key}, &cache_handle); |
481 | 0 | if (hit_cache) { |
482 | 0 | _row_read_ctxs[i]._cached_row_data = std::move(cache_handle); |
483 | 0 | ++_profile_metrics.row_cache_hits; |
484 | 0 | continue; |
485 | 0 | } |
486 | 0 | } |
487 | | // Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this ptr |
488 | 0 | auto rowset_ptr = std::make_unique<RowsetSharedPtr>(); |
489 | 0 | st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, nullptr, false, |
490 | 0 | specified_rowsets, &location, INT32_MAX /*rethink?*/, |
491 | 0 | segment_caches, rowset_ptr.get(), false, nullptr, |
492 | 0 | &_profile_metrics.read_stats)); |
493 | 0 | if (st.is<ErrorCode::KEY_NOT_FOUND>()) { |
494 | 0 | continue; |
495 | 0 | } |
496 | 0 | RETURN_IF_ERROR(st); |
497 | 0 | _row_read_ctxs[i]._row_location = location; |
498 | | // acquire and wrap this rowset |
499 | 0 | (*rowset_ptr)->acquire(); |
500 | 0 | VLOG_DEBUG << "aquire rowset " << (*rowset_ptr)->rowset_id(); |
501 | 0 | _row_read_ctxs[i]._rowset_ptr = std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)>( |
502 | 0 | rowset_ptr.release(), &release_rowset); |
503 | 0 | _row_hits++; |
504 | 0 | } |
505 | 0 | return Status::OK(); |
506 | 0 | } |
507 | | |
508 | 0 | Status PointQueryExecutor::_lookup_row_data() { |
509 | | // 3. get values |
510 | 0 | SCOPED_TIMER(&_profile_metrics.lookup_data_ns); |
511 | 0 | { |
512 | 0 | auto result_columns_guard = _result_block->mutate_columns_scoped(); |
513 | 0 | MutableColumns& result_columns = result_columns_guard.mutable_columns(); |
514 | 0 | for (size_t i = 0; i < _row_read_ctxs.size(); ++i) { |
515 | 0 | if (_row_read_ctxs[i]._cached_row_data.valid()) { |
516 | 0 | RETURN_IF_ERROR(JsonbSerializeUtil::jsonb_to_columns( |
517 | 0 | _reusable->get_data_type_serdes(), |
518 | 0 | _row_read_ctxs[i]._cached_row_data.data().data, |
519 | 0 | _row_read_ctxs[i]._cached_row_data.data().size, |
520 | 0 | _reusable->get_col_uid_to_idx(), result_columns, |
521 | 0 | _reusable->get_col_default_values(), _reusable->include_col_uids())); |
522 | 0 | continue; |
523 | 0 | } |
524 | 0 | if (!_row_read_ctxs[i]._row_location.has_value()) { |
525 | 0 | continue; |
526 | 0 | } |
527 | 0 | std::string value; |
528 | | // fill block by row store |
529 | 0 | if (_reusable->rs_column_uid() != -1) { |
530 | 0 | bool use_row_cache = !config::disable_storage_row_cache; |
531 | 0 | RETURN_IF_ERROR(_tablet->lookup_row_data( |
532 | 0 | _row_read_ctxs[i]._primary_key, _row_read_ctxs[i]._row_location.value(), |
533 | 0 | *(_row_read_ctxs[i]._rowset_ptr), _profile_metrics.read_stats, value, |
534 | 0 | use_row_cache)); |
535 | | // serialize value to block, currently only jsonb row format |
536 | 0 | RETURN_IF_ERROR(JsonbSerializeUtil::jsonb_to_columns( |
537 | 0 | _reusable->get_data_type_serdes(), value.data(), value.size(), |
538 | 0 | _reusable->get_col_uid_to_idx(), result_columns, |
539 | 0 | _reusable->get_col_default_values(), _reusable->include_col_uids())); |
540 | 0 | } |
541 | 0 | if (!_reusable->missing_col_uids().empty()) { |
542 | 0 | if (!_reusable->runtime_state()->enable_short_circuit_query_access_column_store()) { |
543 | 0 | std::string missing_columns; |
544 | 0 | for (int cid : _reusable->missing_col_uids()) { |
545 | 0 | missing_columns += |
546 | 0 | _tablet->tablet_schema()->column_by_uid(cid).name() + ","; |
547 | 0 | } |
548 | 0 | return Status::InternalError( |
549 | 0 | "Not support column store, set store_row_column=true or " |
550 | 0 | "row_store_columns in table properties, missing columns: " + |
551 | 0 | missing_columns + " should be added to row store"); |
552 | 0 | } |
553 | | // fill missing columns by column store |
554 | 0 | RowLocation row_loc = _row_read_ctxs[i]._row_location.value(); |
555 | 0 | BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>( |
556 | 0 | _tablet->get_rowset(row_loc.rowset_id)); |
557 | 0 | SegmentCacheHandle segment_cache; |
558 | 0 | { |
559 | 0 | SCOPED_TIMER(&_profile_metrics.load_segment_data_stage_ns); |
560 | 0 | RETURN_IF_ERROR( |
561 | 0 | SegmentLoader::instance()->load_segments(rowset, &segment_cache, true)); |
562 | 0 | } |
563 | | // find segment |
564 | 0 | auto it = std::find_if(segment_cache.get_segments().cbegin(), |
565 | 0 | segment_cache.get_segments().cend(), |
566 | 0 | [&](const segment_v2::SegmentSharedPtr& seg) { |
567 | 0 | return seg->id() == row_loc.segment_id; |
568 | 0 | }); |
569 | 0 | const auto& segment = *it; |
570 | 0 | for (int cid : _reusable->missing_col_uids()) { |
571 | 0 | int pos = _reusable->get_col_uid_to_idx().at(cid); |
572 | 0 | std::vector<segment_v2::rowid_t> row_ids { |
573 | 0 | static_cast<segment_v2::rowid_t>(row_loc.row_id)}; |
574 | 0 | auto& column = result_columns[pos]; |
575 | 0 | std::unique_ptr<ColumnIterator> iter; |
576 | 0 | SlotDescriptor* slot = _reusable->tuple_desc()->slots()[pos]; |
577 | 0 | StorageReadOptions storage_read_options; |
578 | 0 | storage_read_options.stats = &_read_stats; |
579 | 0 | storage_read_options.io_ctx.reader_type = ReaderType::READER_QUERY; |
580 | 0 | RETURN_IF_ERROR(segment->seek_and_read_by_rowid(*_tablet->tablet_schema(), slot, |
581 | 0 | row_ids, column, |
582 | 0 | storage_read_options, iter)); |
583 | 0 | } |
584 | 0 | } |
585 | 0 | } |
586 | 0 | if (result_columns.size() > _reusable->include_col_uids().size()) { |
587 | | // Padding rows for some columns that no need to output to mysql client |
588 | | // eg. SELECT k1,v1,v2 FROM TABLE WHERE k1 = 1, k1 is not in output slots, tuple as bellow |
589 | | // TupleDescriptor{id=1, tbl=table_with_column_group} |
590 | | // SlotDescriptor{id=8, col=v1, colUniqueId=1 ...} |
591 | | // SlotDescriptor{id=9, col=v2, colUniqueId=2 ...} |
592 | | // thus missing in include_col_uids and missing_col_uids |
593 | 0 | for (auto& column : result_columns) { |
594 | 0 | int padding_rows = _row_hits - cast_set<int>(column->size()); |
595 | 0 | if (padding_rows > 0) { |
596 | 0 | column->insert_many_defaults(padding_rows); |
597 | 0 | } |
598 | 0 | } |
599 | 0 | } |
600 | 0 | } |
601 | | // filter rows by delete sign |
602 | 0 | if (_row_hits > 0 && _reusable->delete_sign_idx() != -1) { |
603 | 0 | size_t filtered = 0; |
604 | 0 | size_t total = 0; |
605 | 0 | { |
606 | | // clear_column_data will check reference of ColumnPtr, so we need to release |
607 | | // reference before clear_column_data |
608 | 0 | ColumnPtr delete_filter_columns = |
609 | 0 | _result_block->get_columns()[_reusable->delete_sign_idx()]; |
610 | 0 | const auto& filter = |
611 | 0 | assert_cast<const ColumnInt8*>(delete_filter_columns.get())->get_data(); |
612 | 0 | filtered = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); |
613 | 0 | total = filter.size(); |
614 | 0 | } |
615 | |
|
616 | 0 | if (filtered == total) { |
617 | 0 | _result_block->clear_column_data(); |
618 | 0 | } else if (filtered > 0) { |
619 | 0 | return Status::NotSupported("Not implemented since only single row at present"); |
620 | 0 | } |
621 | 0 | } |
622 | 0 | return Status::OK(); |
623 | 0 | } |
624 | | |
625 | 0 | Status serialize_block(std::shared_ptr<TFetchDataResult> res, PTabletKeyLookupResponse* response) { |
626 | 0 | uint8_t* buf = nullptr; |
627 | 0 | uint32_t len = 0; |
628 | 0 | ThriftSerializer ser(false, 4096); |
629 | 0 | RETURN_IF_ERROR(ser.serialize(&(res->result_batch), &len, &buf)); |
630 | 0 | response->set_row_batch(std::string((const char*)buf, len)); |
631 | 0 | return Status::OK(); |
632 | 0 | } |
633 | | |
634 | 0 | Status PointQueryExecutor::_output_data() { |
635 | | // 4. exprs exec and serialize to mysql row batches |
636 | 0 | SCOPED_TIMER(&_profile_metrics.output_data_ns); |
637 | 0 | if (_result_block->rows()) { |
638 | 0 | RuntimeState state; |
639 | 0 | auto buffer = std::make_shared<PointQueryResultBlockBuffer>(&state); |
640 | | // TODO reuse mysql_writer |
641 | 0 | VMysqlResultWriter mysql_writer(buffer, _reusable->output_exprs(), nullptr, |
642 | 0 | _binary_row_format); |
643 | 0 | RETURN_IF_ERROR(mysql_writer.init(_reusable->runtime_state())); |
644 | 0 | _result_block->clear_names(); |
645 | 0 | RETURN_IF_ERROR(mysql_writer.write(_reusable->runtime_state(), *_result_block)); |
646 | 0 | RETURN_IF_ERROR(serialize_block(buffer->get_block(), _response)); |
647 | 0 | VLOG_DEBUG << "dump block " << _result_block->dump_data(); |
648 | 0 | } else { |
649 | 0 | _response->set_empty_batch(true); |
650 | 0 | } |
651 | 0 | _profile_metrics.result_data_bytes = _result_block->bytes(); |
652 | 0 | _reusable->return_block(_result_block); |
653 | 0 | return Status::OK(); |
654 | 0 | } |
655 | | |
656 | | } // namespace doris |