be/src/service/point_query_executor.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "service/point_query_executor.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/Descriptors_types.h> |
22 | | #include <gen_cpp/Exprs_types.h> |
23 | | #include <gen_cpp/PaloInternalService_types.h> |
24 | | #include <gen_cpp/internal_service.pb.h> |
25 | | #include <glog/logging.h> |
26 | | #include <google/protobuf/extension_set.h> |
27 | | #include <stdlib.h> |
28 | | |
29 | | #include <memory> |
30 | | #include <unordered_map> |
31 | | #include <vector> |
32 | | |
33 | | #include "cloud/cloud_tablet.h" |
34 | | #include "cloud/config.h" |
35 | | #include "common/cast_set.h" |
36 | | #include "common/consts.h" |
37 | | #include "common/status.h" |
38 | | #include "core/data_type_serde/data_type_serde.h" |
39 | | #include "exec/sink/writer/vmysql_result_writer.h" |
40 | | #include "exprs/vexpr.h" |
41 | | #include "exprs/vexpr_context.h" |
42 | | #include "exprs/vexpr_fwd.h" |
43 | | #include "exprs/vslot_ref.h" |
44 | | #include "runtime/descriptors.h" |
45 | | #include "runtime/exec_env.h" |
46 | | #include "runtime/result_block_buffer.h" |
47 | | #include "runtime/runtime_profile.h" |
48 | | #include "runtime/runtime_state.h" |
49 | | #include "runtime/thread_context.h" |
50 | | #include "storage/olap_tuple.h" |
51 | | #include "storage/row_cursor.h" |
52 | | #include "storage/rowset/beta_rowset.h" |
53 | | #include "storage/rowset/rowset_fwd.h" |
54 | | #include "storage/segment/column_reader.h" |
55 | | #include "storage/tablet/tablet_schema.h" |
56 | | #include "storage/utils.h" |
57 | | #include "util/jsonb/serialize.h" |
58 | | #include "util/lru_cache.h" |
59 | | #include "util/simd/bits.h" |
60 | | #include "util/thrift_util.h" |
61 | | |
62 | | namespace doris { |
63 | | |
64 | | #include "common/compile_check_begin.h" |
65 | | |
66 | | class PointQueryResultBlockBuffer final : public MySQLResultBlockBuffer { |
67 | | public: |
68 | 0 | PointQueryResultBlockBuffer(RuntimeState* state) : MySQLResultBlockBuffer(state) {} |
69 | 0 | ~PointQueryResultBlockBuffer() override = default; |
70 | 0 | std::shared_ptr<TFetchDataResult> get_block() { |
71 | 0 | std::lock_guard<std::mutex> l(_lock); |
72 | 0 | DCHECK_EQ(_result_batch_queue.size(), 1); |
73 | 0 | auto result = std::move(_result_batch_queue.front()); |
74 | 0 | _result_batch_queue.pop_front(); |
75 | 0 | return result; |
76 | 0 | } |
77 | | }; |
78 | | |
// Destructor is defaulted here rather than in the header.
// NOTE(review): presumably so that the member types only have to be complete
// in this translation unit -- confirm against the class declaration.
Reusable::~Reusable() = default;
80 | | |
81 | | // get missing and include column ids |
82 | | // input include_cids : the output expr slots columns unique ids |
83 | | // missing_cids : the output expr columns that not in row columns cids |
84 | | static void get_missing_and_include_cids(const TabletSchema& schema, |
85 | | const std::vector<SlotDescriptor*>& slots, |
86 | | int target_rs_column_id, |
87 | | std::unordered_set<int>& missing_cids, |
88 | 4.04k | std::unordered_set<int>& include_cids) { |
89 | 4.04k | missing_cids.clear(); |
90 | 4.04k | include_cids.clear(); |
91 | 4.04k | for (auto* slot : slots) { |
92 | 4.04k | missing_cids.insert(slot->col_unique_id()); |
93 | 4.04k | } |
94 | | // insert delete sign column id |
95 | 4.04k | missing_cids.insert(schema.columns()[schema.delete_sign_idx()]->unique_id()); |
96 | 4.04k | if (target_rs_column_id == -1) { |
97 | | // no row store columns |
98 | 4.04k | return; |
99 | 4.04k | } |
100 | 0 | const TabletColumn& target_rs_column = schema.column_by_uid(target_rs_column_id); |
101 | 0 | DCHECK(target_rs_column.is_row_store_column()); |
102 | | // The full column group is considered a full match, thus no missing cids |
103 | 0 | if (schema.row_columns_uids().empty()) { |
104 | 0 | missing_cids.clear(); |
105 | 0 | return; |
106 | 0 | } |
107 | 0 | for (int cid : schema.row_columns_uids()) { |
108 | 0 | missing_cids.erase(cid); |
109 | 0 | include_cids.insert(cid); |
110 | 0 | } |
111 | 0 | } |
112 | | |
// Number of blocks preallocated for a Reusable that may be cached and reused
// (see PointQueryExecutor::init), and the maximum size the block pool is
// trimmed back to in Reusable::return_block.
constexpr static int s_preallocted_blocks_num = 32;
114 | | |
115 | | static void extract_slot_ref(const VExprSPtr& expr, TupleDescriptor* tuple_desc, |
116 | 4.04k | std::vector<SlotDescriptor*>& slots) { |
117 | 4.04k | const auto& children = expr->children(); |
118 | 4.04k | for (const auto& i : children) { |
119 | 0 | extract_slot_ref(i, tuple_desc, slots); |
120 | 0 | } |
121 | | |
122 | 4.04k | auto node_type = expr->node_type(); |
123 | 4.04k | if (node_type == TExprNodeType::SLOT_REF) { |
124 | 4.04k | int column_id = static_cast<const VSlotRef*>(expr.get())->column_id(); |
125 | 4.04k | auto* slot_desc = tuple_desc->slots()[column_id]; |
126 | 4.04k | slots.push_back(slot_desc); |
127 | 4.04k | } |
128 | 4.04k | } |
129 | | |
130 | | Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs, |
131 | | const TQueryOptions& query_options, const TabletSchema& schema, |
132 | 4.04k | size_t block_size) { |
133 | 4.04k | _runtime_state = RuntimeState::create_unique(); |
134 | 4.04k | _runtime_state->set_query_options(query_options); |
135 | 4.04k | RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &_desc_tbl)); |
136 | 4.04k | _runtime_state->set_desc_tbl(_desc_tbl); |
137 | 4.04k | _block_pool.resize(block_size); |
138 | 8.09k | for (auto& i : _block_pool) { |
139 | 8.09k | i = Block::create_unique(tuple_desc()->slots(), 2); |
140 | | // Name is useless but cost space |
141 | 8.09k | i->clear_names(); |
142 | 8.09k | } |
143 | | |
144 | 4.04k | RETURN_IF_ERROR(VExpr::create_expr_trees(output_exprs, _output_exprs_ctxs)); |
145 | 4.04k | RowDescriptor row_desc(tuple_desc()); |
146 | | // Prepare the exprs to run. |
147 | 4.04k | RETURN_IF_ERROR(VExpr::prepare(_output_exprs_ctxs, _runtime_state.get(), row_desc)); |
148 | 4.04k | RETURN_IF_ERROR(VExpr::open(_output_exprs_ctxs, _runtime_state.get())); |
149 | 4.04k | _create_timestamp = butil::gettimeofday_ms(); |
150 | 4.04k | _data_type_serdes = create_data_type_serdes(tuple_desc()->slots()); |
151 | 4.04k | _col_default_values.resize(tuple_desc()->slots().size()); |
152 | 4.04k | bool has_delete_sign = false; |
153 | 93.0k | for (int i = 0; i < tuple_desc()->slots().size(); ++i) { |
154 | 88.9k | auto* slot = tuple_desc()->slots()[i]; |
155 | 88.9k | if (slot->col_name() == DELETE_SIGN) { |
156 | 0 | has_delete_sign = true; |
157 | 0 | } |
158 | 88.9k | _col_uid_to_idx[slot->col_unique_id()] = i; |
159 | 88.9k | _col_default_values[i] = slot->col_default_value(); |
160 | 88.9k | } |
161 | | |
162 | | // Get the output slot descriptors |
163 | 4.04k | std::vector<SlotDescriptor*> output_slot_descs; |
164 | 4.04k | for (const auto& expr : _output_exprs_ctxs) { |
165 | 4.04k | extract_slot_ref(expr->root(), tuple_desc(), output_slot_descs); |
166 | 4.04k | } |
167 | | |
168 | | // get the delete sign idx in block |
169 | 4.04k | if (has_delete_sign) { |
170 | 0 | _delete_sign_idx = _col_uid_to_idx[schema.columns()[schema.delete_sign_idx()]->unique_id()]; |
171 | 0 | } |
172 | | |
173 | 4.04k | if (schema.have_column(BeConsts::ROW_STORE_COL)) { |
174 | 0 | const auto& column = *DORIS_TRY(schema.column(BeConsts::ROW_STORE_COL)); |
175 | 0 | _row_store_column_ids = column.unique_id(); |
176 | 0 | } |
177 | 4.04k | get_missing_and_include_cids(schema, output_slot_descs, _row_store_column_ids, |
178 | 4.04k | _missing_col_uids, _include_col_uids); |
179 | | |
180 | 4.04k | return Status::OK(); |
181 | 4.04k | } |
182 | | |
183 | 0 | std::unique_ptr<Block> Reusable::get_block() { |
184 | 0 | std::lock_guard lock(_block_mutex); |
185 | 0 | if (_block_pool.empty()) { |
186 | 0 | auto block = Block::create_unique(tuple_desc()->slots(), 2); |
187 | | // Name is useless but cost space |
188 | 0 | block->clear_names(); |
189 | 0 | return block; |
190 | 0 | } |
191 | 0 | auto block = std::move(_block_pool.back()); |
192 | 0 | CHECK(block != nullptr); |
193 | 0 | _block_pool.pop_back(); |
194 | 0 | return block; |
195 | 0 | } |
196 | | |
197 | 0 | void Reusable::return_block(std::unique_ptr<Block>& block) { |
198 | 0 | std::lock_guard lock(_block_mutex); |
199 | 0 | if (block == nullptr) { |
200 | 0 | return; |
201 | 0 | } |
202 | 0 | block->clear_column_data(); |
203 | 0 | _block_pool.push_back(std::move(block)); |
204 | 0 | if (_block_pool.size() > s_preallocted_blocks_num) { |
205 | 0 | _block_pool.resize(s_preallocted_blocks_num); |
206 | 0 | } |
207 | 0 | } |
208 | | |
209 | 0 | LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t capacity) { |
210 | 0 | DCHECK(ExecEnv::GetInstance()->get_lookup_connection_cache() == nullptr); |
211 | 0 | auto* res = new LookupConnectionCache(capacity); |
212 | 0 | return res; |
213 | 0 | } |
214 | | |
// Constructs the row cache as a size-based LRU cache policy of type
// POINT_QUERY_ROW_CACHE.
//   capacity   : capacity forwarded to LRUCachePolicy (LRUCacheType::SIZE)
//   num_shards : number of shards forwarded to LRUCachePolicy
// The stale sweep time comes from config::point_query_row_cache_stale_sweep_time_sec;
// element count capacity is 0, pruning and LRU-K are both enabled.
RowCache::RowCache(int64_t capacity, int num_shards)
        : LRUCachePolicy(CachePolicy::CacheType::POINT_QUERY_ROW_CACHE, capacity,
                         LRUCacheType::SIZE, config::point_query_row_cache_stale_sweep_time_sec,
                         num_shards, /*element count capacity */ 0,
                         /*enable prune*/ true, /*is lru-k*/ true) {}
220 | | |
221 | | // Create global instance of this class |
222 | 0 | RowCache* RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) { |
223 | 0 | DCHECK(ExecEnv::GetInstance()->get_row_cache() == nullptr); |
224 | 0 | auto* res = new RowCache(capacity, num_shards); |
225 | 0 | return res; |
226 | 0 | } |
227 | | |
228 | 0 | RowCache* RowCache::instance() { |
229 | 0 | return ExecEnv::GetInstance()->get_row_cache(); |
230 | 0 | } |
231 | | |
232 | 4 | bool RowCache::lookup(const RowCacheKey& key, CacheHandle* handle) { |
233 | 4 | const std::string& encoded_key = key.encode(); |
234 | 4 | auto* lru_handle = LRUCachePolicy::lookup(encoded_key); |
235 | 4 | if (!lru_handle) { |
236 | | // cache miss |
237 | 3 | return false; |
238 | 3 | } |
239 | 1 | *handle = CacheHandle(this, lru_handle); |
240 | 1 | return true; |
241 | 4 | } |
242 | | |
243 | 11 | void RowCache::insert(const RowCacheKey& key, const Slice& value) { |
244 | 11 | char* cache_value = static_cast<char*>(malloc(value.size)); |
245 | 11 | memcpy(cache_value, value.data, value.size); |
246 | 11 | auto* row_cache_value = new RowCacheValue; |
247 | 11 | row_cache_value->cache_value = cache_value; |
248 | 11 | const std::string& encoded_key = key.encode(); |
249 | 11 | auto* handle = LRUCachePolicy::insert(encoded_key, row_cache_value, value.size, value.size, |
250 | 11 | CachePriority::NORMAL); |
251 | | // handle will released |
252 | 11 | auto tmp = CacheHandle {this, handle}; |
253 | 11 | } |
254 | | |
255 | 1 | void RowCache::erase(const RowCacheKey& key) { |
256 | 1 | const std::string& encoded_key = key.encode(); |
257 | 1 | LRUCachePolicy::erase(encoded_key); |
258 | 1 | } |
259 | | |
// Releases the cached item while the thread's memory tracker is switched to
// the point query executor tracker for the duration of this scope.
// NOTE(review): presumably so the memory freed by item.reset() is attributed
// to that tracker -- confirm against the macro's definition.
LookupConnectionCache::CacheValue::~CacheValue() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
            ExecEnv::GetInstance()->point_query_executor_mem_tracker());
    // Must happen inside the scope opened above, hence the explicit reset.
    item.reset();
}
265 | | |
// Explicitly drops the executor's resources while the thread's memory tracker
// is switched to the point query executor tracker, instead of leaving them to
// implicit member destruction after this body has finished.
PointQueryExecutor::~PointQueryExecutor() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
            ExecEnv::GetInstance()->point_query_executor_mem_tracker());
    _tablet.reset();
    _reusable.reset();
    _result_block.reset();
    // Clearing the read contexts also drops any cached row handles and rowset
    // wrappers they hold.
    _row_read_ctxs.clear();
}
274 | | |
275 | | Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request, |
276 | 0 | PTabletKeyLookupResponse* response) { |
277 | 0 | SCOPED_TIMER(&_profile_metrics.init_ns); |
278 | 0 | _response = response; |
279 | | // using cache |
280 | 0 | __int128_t uuid = |
281 | 0 | static_cast<__int128_t>(request->uuid().uuid_high()) << 64 | request->uuid().uuid_low(); |
282 | 0 | SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker()); |
283 | 0 | auto cache_handle = LookupConnectionCache::instance()->get(uuid); |
284 | 0 | _binary_row_format = request->is_binary_row(); |
285 | 0 | _tablet = DORIS_TRY(ExecEnv::get_tablet(request->tablet_id())); |
286 | 0 | if (cache_handle != nullptr) { |
287 | 0 | _reusable = cache_handle; |
288 | 0 | _profile_metrics.hit_lookup_cache = true; |
289 | 0 | } else { |
290 | | // init handle |
291 | 0 | auto reusable_ptr = std::make_shared<Reusable>(); |
292 | 0 | TDescriptorTable t_desc_tbl; |
293 | 0 | TExprList t_output_exprs; |
294 | 0 | auto len = cast_set<uint32_t>(request->desc_tbl().size()); |
295 | 0 | RETURN_IF_ERROR( |
296 | 0 | deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(request->desc_tbl().data()), |
297 | 0 | &len, false, &t_desc_tbl)); |
298 | 0 | len = cast_set<uint32_t>(request->output_expr().size()); |
299 | 0 | RETURN_IF_ERROR(deserialize_thrift_msg( |
300 | 0 | reinterpret_cast<const uint8_t*>(request->output_expr().data()), &len, false, |
301 | 0 | &t_output_exprs)); |
302 | 0 | _reusable = reusable_ptr; |
303 | 0 | TQueryOptions t_query_options; |
304 | 0 | len = cast_set<uint32_t>(request->query_options().size()); |
305 | 0 | if (request->has_query_options()) { |
306 | 0 | RETURN_IF_ERROR(deserialize_thrift_msg( |
307 | 0 | reinterpret_cast<const uint8_t*>(request->query_options().data()), &len, false, |
308 | 0 | &t_query_options)); |
309 | 0 | } |
310 | 0 | if (uuid != 0) { |
311 | | // could be reused by requests after, pre allocte more blocks |
312 | 0 | RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options, |
313 | 0 | *_tablet->tablet_schema(), |
314 | 0 | s_preallocted_blocks_num)); |
315 | 0 | LookupConnectionCache::instance()->add(uuid, reusable_ptr); |
316 | 0 | } else { |
317 | 0 | RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options, |
318 | 0 | *_tablet->tablet_schema(), 1)); |
319 | 0 | } |
320 | 0 | } |
321 | | // Set timezone from request for functions like from_unixtime() |
322 | 0 | if (request->has_time_zone() && !request->time_zone().empty()) { |
323 | 0 | _reusable->runtime_state()->set_timezone(request->time_zone()); |
324 | 0 | } |
325 | 0 | if (request->has_version() && request->version() >= 0) { |
326 | 0 | _version = request->version(); |
327 | 0 | } |
328 | 0 | RETURN_IF_ERROR(_init_keys(request)); |
329 | 0 | _result_block = _reusable->get_block(); |
330 | 0 | CHECK(_result_block != nullptr); |
331 | |
|
332 | 0 | return Status::OK(); |
333 | 0 | } |
334 | | |
335 | 0 | Status PointQueryExecutor::lookup_up() { |
336 | 0 | SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker()); |
337 | 0 | RETURN_IF_ERROR(_lookup_row_key()); |
338 | 0 | RETURN_IF_ERROR(_lookup_row_data()); |
339 | 0 | RETURN_IF_ERROR(_output_data()); |
340 | 0 | return Status::OK(); |
341 | 0 | } |
342 | | |
343 | 0 | void PointQueryExecutor::print_profile() { |
344 | 0 | auto init_us = _profile_metrics.init_ns.value() / 1000; |
345 | 0 | auto init_key_us = _profile_metrics.init_key_ns.value() / 1000; |
346 | 0 | auto lookup_key_us = _profile_metrics.lookup_key_ns.value() / 1000; |
347 | 0 | auto lookup_data_us = _profile_metrics.lookup_data_ns.value() / 1000; |
348 | 0 | auto output_data_us = _profile_metrics.output_data_ns.value() / 1000; |
349 | 0 | auto load_segments_key_us = _profile_metrics.load_segment_key_stage_ns.value() / 1000; |
350 | 0 | auto load_segments_data_us = _profile_metrics.load_segment_data_stage_ns.value() / 1000; |
351 | 0 | auto total_us = init_us + lookup_key_us + lookup_data_us + output_data_us; |
352 | 0 | auto read_stats = _profile_metrics.read_stats; |
353 | 0 | const std::string stats_str = fmt::format( |
354 | 0 | "[lookup profile:{}us] init:{}us, init_key:{}us," |
355 | 0 | " lookup_key:{}us, load_segments_key:{}us, lookup_data:{}us, load_segments_data:{}us," |
356 | 0 | " output_data:{}us, " |
357 | 0 | "hit_lookup_cache:{}" |
358 | 0 | ", is_binary_row:{}, output_columns:{}, total_keys:{}, row_cache_hits:{}" |
359 | 0 | ", hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, " |
360 | 0 | "io_latency:{}ns, " |
361 | 0 | "uncompressed_bytes_read:{}, result_data_bytes:{}, row_hits:{}" |
362 | 0 | ", rs_column_uid:{}, bytes_read_from_local:{}, bytes_read_from_remote:{}, " |
363 | 0 | "local_io_timer:{}, remote_io_timer:{}, local_write_timer:{}", |
364 | 0 | total_us, init_us, init_key_us, lookup_key_us, load_segments_key_us, lookup_data_us, |
365 | 0 | load_segments_data_us, output_data_us, _profile_metrics.hit_lookup_cache, |
366 | 0 | _binary_row_format, _reusable->output_exprs().size(), _row_read_ctxs.size(), |
367 | 0 | _profile_metrics.row_cache_hits, read_stats.cached_pages_num, |
368 | 0 | read_stats.total_pages_num, read_stats.compressed_bytes_read, read_stats.io_ns, |
369 | 0 | read_stats.uncompressed_bytes_read, _profile_metrics.result_data_bytes, _row_hits, |
370 | 0 | _reusable->rs_column_uid(), |
371 | 0 | _profile_metrics.read_stats.file_cache_stats.bytes_read_from_local, |
372 | 0 | _profile_metrics.read_stats.file_cache_stats.bytes_read_from_remote, |
373 | 0 | _profile_metrics.read_stats.file_cache_stats.local_io_timer, |
374 | 0 | _profile_metrics.read_stats.file_cache_stats.remote_io_timer, |
375 | 0 | _profile_metrics.read_stats.file_cache_stats.write_cache_io_timer); |
376 | |
|
377 | 0 | constexpr static int kSlowThreholdUs = 50 * 1000; // 50ms |
378 | 0 | if (total_us > kSlowThreholdUs) { |
379 | 0 | LOG(WARNING) << "slow query, " << stats_str; |
380 | 0 | } else if (VLOG_DEBUG_IS_ON) { |
381 | 0 | VLOG_DEBUG << stats_str; |
382 | 0 | } else { |
383 | 0 | LOG_EVERY_N(INFO, 1000) << stats_str; |
384 | 0 | } |
385 | 0 | } |
386 | | |
387 | 0 | Status PointQueryExecutor::_init_keys(const PTabletKeyLookupRequest* request) { |
388 | 0 | SCOPED_TIMER(&_profile_metrics.init_key_ns); |
389 | | // 1. get primary key from conditions |
390 | 0 | std::vector<OlapTuple> olap_tuples; |
391 | 0 | olap_tuples.resize(request->key_tuples().size()); |
392 | 0 | for (int i = 0; i < request->key_tuples().size(); ++i) { |
393 | 0 | const KeyTuple& key_tuple = request->key_tuples(i); |
394 | 0 | for (const std::string& key_col : key_tuple.key_column_rep()) { |
395 | 0 | olap_tuples[i].add_value(key_col); |
396 | 0 | } |
397 | 0 | } |
398 | 0 | _row_read_ctxs.resize(olap_tuples.size()); |
399 | | // get row cursor and encode keys |
400 | 0 | for (size_t i = 0; i < olap_tuples.size(); ++i) { |
401 | 0 | RowCursor cursor; |
402 | 0 | RETURN_IF_ERROR(cursor.init_scan_key(_tablet->tablet_schema(), olap_tuples[i].values())); |
403 | 0 | RETURN_IF_ERROR(cursor.from_tuple(olap_tuples[i])); |
404 | 0 | cursor.encode_key_with_padding<true>(&_row_read_ctxs[i]._primary_key, |
405 | 0 | _tablet->tablet_schema()->num_key_columns(), true); |
406 | 0 | } |
407 | 0 | return Status::OK(); |
408 | 0 | } |
409 | | |
410 | 0 | Status PointQueryExecutor::_lookup_row_key() { |
411 | 0 | SCOPED_TIMER(&_profile_metrics.lookup_key_ns); |
412 | | // 2. lookup row location |
413 | 0 | Status st; |
414 | 0 | if (_version >= 0) { |
415 | 0 | CHECK(config::is_cloud_mode()) << "Only cloud mode support snapshot read at present"; |
416 | 0 | SyncOptions options; |
417 | 0 | options.query_version = _version; |
418 | 0 | RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(_tablet)->sync_rowsets(options)); |
419 | 0 | } |
420 | 0 | std::vector<RowsetSharedPtr> specified_rowsets; |
421 | 0 | { |
422 | 0 | std::shared_lock rlock(_tablet->get_header_lock()); |
423 | 0 | specified_rowsets = _tablet->get_rowset_by_ids(nullptr); |
424 | 0 | } |
425 | 0 | std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size()); |
426 | 0 | for (size_t i = 0; i < _row_read_ctxs.size(); ++i) { |
427 | 0 | RowLocation location; |
428 | 0 | if (!config::disable_storage_row_cache) { |
429 | 0 | RowCache::CacheHandle cache_handle; |
430 | 0 | auto hit_cache = RowCache::instance()->lookup( |
431 | 0 | {_tablet->tablet_id(), _row_read_ctxs[i]._primary_key}, &cache_handle); |
432 | 0 | if (hit_cache) { |
433 | 0 | _row_read_ctxs[i]._cached_row_data = std::move(cache_handle); |
434 | 0 | ++_profile_metrics.row_cache_hits; |
435 | 0 | continue; |
436 | 0 | } |
437 | 0 | } |
438 | | // Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this ptr |
439 | 0 | auto rowset_ptr = std::make_unique<RowsetSharedPtr>(); |
440 | 0 | st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, nullptr, false, |
441 | 0 | specified_rowsets, &location, INT32_MAX /*rethink?*/, |
442 | 0 | segment_caches, rowset_ptr.get(), false, nullptr, |
443 | 0 | &_profile_metrics.read_stats)); |
444 | 0 | if (st.is<ErrorCode::KEY_NOT_FOUND>()) { |
445 | 0 | continue; |
446 | 0 | } |
447 | 0 | RETURN_IF_ERROR(st); |
448 | 0 | _row_read_ctxs[i]._row_location = location; |
449 | | // acquire and wrap this rowset |
450 | 0 | (*rowset_ptr)->acquire(); |
451 | 0 | VLOG_DEBUG << "aquire rowset " << (*rowset_ptr)->rowset_id(); |
452 | 0 | _row_read_ctxs[i]._rowset_ptr = std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)>( |
453 | 0 | rowset_ptr.release(), &release_rowset); |
454 | 0 | _row_hits++; |
455 | 0 | } |
456 | 0 | return Status::OK(); |
457 | 0 | } |
458 | | |
459 | 0 | Status PointQueryExecutor::_lookup_row_data() { |
460 | | // 3. get values |
461 | 0 | SCOPED_TIMER(&_profile_metrics.lookup_data_ns); |
462 | 0 | for (size_t i = 0; i < _row_read_ctxs.size(); ++i) { |
463 | 0 | if (_row_read_ctxs[i]._cached_row_data.valid()) { |
464 | 0 | RETURN_IF_ERROR(JsonbSerializeUtil::jsonb_to_block( |
465 | 0 | _reusable->get_data_type_serdes(), |
466 | 0 | _row_read_ctxs[i]._cached_row_data.data().data, |
467 | 0 | _row_read_ctxs[i]._cached_row_data.data().size, _reusable->get_col_uid_to_idx(), |
468 | 0 | *_result_block, _reusable->get_col_default_values(), |
469 | 0 | _reusable->include_col_uids())); |
470 | 0 | continue; |
471 | 0 | } |
472 | 0 | if (!_row_read_ctxs[i]._row_location.has_value()) { |
473 | 0 | continue; |
474 | 0 | } |
475 | 0 | std::string value; |
476 | | // fill block by row store |
477 | 0 | if (_reusable->rs_column_uid() != -1) { |
478 | 0 | bool use_row_cache = !config::disable_storage_row_cache; |
479 | 0 | RETURN_IF_ERROR(_tablet->lookup_row_data( |
480 | 0 | _row_read_ctxs[i]._primary_key, _row_read_ctxs[i]._row_location.value(), |
481 | 0 | *(_row_read_ctxs[i]._rowset_ptr), _profile_metrics.read_stats, value, |
482 | 0 | use_row_cache)); |
483 | | // serilize value to block, currently only jsonb row formt |
484 | 0 | RETURN_IF_ERROR(JsonbSerializeUtil::jsonb_to_block( |
485 | 0 | _reusable->get_data_type_serdes(), value.data(), value.size(), |
486 | 0 | _reusable->get_col_uid_to_idx(), *_result_block, |
487 | 0 | _reusable->get_col_default_values(), _reusable->include_col_uids())); |
488 | 0 | } |
489 | 0 | if (!_reusable->missing_col_uids().empty()) { |
490 | 0 | if (!_reusable->runtime_state()->enable_short_circuit_query_access_column_store()) { |
491 | 0 | std::string missing_columns; |
492 | 0 | for (int cid : _reusable->missing_col_uids()) { |
493 | 0 | missing_columns += _tablet->tablet_schema()->column_by_uid(cid).name() + ","; |
494 | 0 | } |
495 | 0 | return Status::InternalError( |
496 | 0 | "Not support column store, set store_row_column=true or row_store_columns " |
497 | 0 | "in table " |
498 | 0 | "properties, missing columns: " + |
499 | 0 | missing_columns + " should be added to row store"); |
500 | 0 | } |
501 | | // fill missing columns by column store |
502 | 0 | RowLocation row_loc = _row_read_ctxs[i]._row_location.value(); |
503 | 0 | BetaRowsetSharedPtr rowset = |
504 | 0 | std::static_pointer_cast<BetaRowset>(_tablet->get_rowset(row_loc.rowset_id)); |
505 | 0 | SegmentCacheHandle segment_cache; |
506 | 0 | { |
507 | 0 | SCOPED_TIMER(&_profile_metrics.load_segment_data_stage_ns); |
508 | 0 | RETURN_IF_ERROR( |
509 | 0 | SegmentLoader::instance()->load_segments(rowset, &segment_cache, true)); |
510 | 0 | } |
511 | | // find segment |
512 | 0 | auto it = std::find_if(segment_cache.get_segments().cbegin(), |
513 | 0 | segment_cache.get_segments().cend(), |
514 | 0 | [&](const segment_v2::SegmentSharedPtr& seg) { |
515 | 0 | return seg->id() == row_loc.segment_id; |
516 | 0 | }); |
517 | 0 | const auto& segment = *it; |
518 | 0 | for (int cid : _reusable->missing_col_uids()) { |
519 | 0 | int pos = _reusable->get_col_uid_to_idx().at(cid); |
520 | 0 | auto row_id = static_cast<segment_v2::rowid_t>(row_loc.row_id); |
521 | 0 | MutableColumnPtr column = |
522 | 0 | _result_block->get_by_position(pos).column->assume_mutable(); |
523 | 0 | std::unique_ptr<ColumnIterator> iter; |
524 | 0 | SlotDescriptor* slot = _reusable->tuple_desc()->slots()[pos]; |
525 | 0 | StorageReadOptions storage_read_options; |
526 | 0 | storage_read_options.stats = &_read_stats; |
527 | 0 | storage_read_options.io_ctx.reader_type = ReaderType::READER_QUERY; |
528 | 0 | RETURN_IF_ERROR(segment->seek_and_read_by_rowid(*_tablet->tablet_schema(), slot, |
529 | 0 | row_id, column, |
530 | 0 | storage_read_options, iter)); |
531 | 0 | if (_tablet->tablet_schema() |
532 | 0 | ->column_by_uid(slot->col_unique_id()) |
533 | 0 | .has_char_type()) { |
534 | 0 | column->shrink_padding_chars(); |
535 | 0 | } |
536 | 0 | } |
537 | 0 | } |
538 | 0 | } |
539 | 0 | if (_result_block->columns() > _reusable->include_col_uids().size()) { |
540 | | // Padding rows for some columns that no need to output to mysql client |
541 | | // eg. SELECT k1,v1,v2 FROM TABLE WHERE k1 = 1, k1 is not in output slots, tuple as bellow |
542 | | // TupleDescriptor{id=1, tbl=table_with_column_group} |
543 | | // SlotDescriptor{id=8, col=v1, colUniqueId=1 ...} |
544 | | // SlotDescriptor{id=9, col=v2, colUniqueId=2 ...} |
545 | | // thus missing in include_col_uids and missing_col_uids |
546 | 0 | for (size_t i = 0; i < _result_block->columns(); ++i) { |
547 | 0 | auto column = _result_block->get_by_position(i).column; |
548 | 0 | int padding_rows = _row_hits - cast_set<int>(column->size()); |
549 | 0 | if (padding_rows > 0) { |
550 | 0 | column->assume_mutable()->insert_many_defaults(padding_rows); |
551 | 0 | } |
552 | 0 | } |
553 | 0 | } |
554 | | // filter rows by delete sign |
555 | 0 | if (_row_hits > 0 && _reusable->delete_sign_idx() != -1) { |
556 | 0 | size_t filtered = 0; |
557 | 0 | size_t total = 0; |
558 | 0 | { |
559 | | // clear_column_data will check reference of ColumnPtr, so we need to release |
560 | | // reference before clear_column_data |
561 | 0 | ColumnPtr delete_filter_columns = |
562 | 0 | _result_block->get_columns()[_reusable->delete_sign_idx()]; |
563 | 0 | const auto& filter = |
564 | 0 | assert_cast<const ColumnInt8*>(delete_filter_columns.get())->get_data(); |
565 | 0 | filtered = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); |
566 | 0 | total = filter.size(); |
567 | 0 | } |
568 | |
|
569 | 0 | if (filtered == total) { |
570 | 0 | _result_block->clear_column_data(); |
571 | 0 | } else if (filtered > 0) { |
572 | 0 | return Status::NotSupported("Not implemented since only single row at present"); |
573 | 0 | } |
574 | 0 | } |
575 | 0 | return Status::OK(); |
576 | 0 | } |
577 | | |
578 | 0 | Status serialize_block(std::shared_ptr<TFetchDataResult> res, PTabletKeyLookupResponse* response) { |
579 | 0 | uint8_t* buf = nullptr; |
580 | 0 | uint32_t len = 0; |
581 | 0 | ThriftSerializer ser(false, 4096); |
582 | 0 | RETURN_IF_ERROR(ser.serialize(&(res->result_batch), &len, &buf)); |
583 | 0 | response->set_row_batch(std::string((const char*)buf, len)); |
584 | 0 | return Status::OK(); |
585 | 0 | } |
586 | | |
587 | 0 | Status PointQueryExecutor::_output_data() { |
588 | | // 4. exprs exec and serialize to mysql row batches |
589 | 0 | SCOPED_TIMER(&_profile_metrics.output_data_ns); |
590 | 0 | if (_result_block->rows()) { |
591 | 0 | RuntimeState state; |
592 | 0 | auto buffer = std::make_shared<PointQueryResultBlockBuffer>(&state); |
593 | | // TODO reuse mysql_writer |
594 | 0 | VMysqlResultWriter mysql_writer(buffer, _reusable->output_exprs(), nullptr, |
595 | 0 | _binary_row_format); |
596 | 0 | RETURN_IF_ERROR(mysql_writer.init(_reusable->runtime_state())); |
597 | 0 | _result_block->clear_names(); |
598 | 0 | RETURN_IF_ERROR(mysql_writer.write(_reusable->runtime_state(), *_result_block)); |
599 | 0 | RETURN_IF_ERROR(serialize_block(buffer->get_block(), _response)); |
600 | 0 | VLOG_DEBUG << "dump block " << _result_block->dump_data(); |
601 | 0 | } else { |
602 | 0 | _response->set_empty_batch(true); |
603 | 0 | } |
604 | 0 | _profile_metrics.result_data_bytes = _result_block->bytes(); |
605 | 0 | _reusable->return_block(_result_block); |
606 | 0 | return Status::OK(); |
607 | 0 | } |
608 | | |
609 | | #include "common/compile_check_end.h" |
610 | | |
611 | | } // namespace doris |