Coverage Report

Created: 2026-07-03 18:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/point_query_executor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/point_query_executor.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Descriptors_types.h>
22
#include <gen_cpp/Exprs_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/internal_service.pb.h>
25
#include <glog/logging.h>
26
#include <google/protobuf/extension_set.h>
27
#include <stdlib.h>
28
29
#include <memory>
30
#include <unordered_map>
31
#include <vector>
32
33
#include "cloud/cloud_tablet.h"
34
#include "cloud/config.h"
35
#include "common/cast_set.h"
36
#include "common/consts.h"
37
#include "common/status.h"
38
#include "core/data_type/data_type_factory.hpp"
39
#include "core/data_type_serde/data_type_serde.h"
40
#include "exec/sink/writer/vmysql_result_writer.h"
41
#include "exprs/vexpr.h"
42
#include "exprs/vexpr_context.h"
43
#include "exprs/vexpr_fwd.h"
44
#include "exprs/vslot_ref.h"
45
#include "runtime/descriptors.h"
46
#include "runtime/exec_env.h"
47
#include "runtime/result_block_buffer.h"
48
#include "runtime/runtime_profile.h"
49
#include "runtime/runtime_state.h"
50
#include "runtime/thread_context.h"
51
#include "storage/row_cursor.h"
52
#include "storage/rowset/beta_rowset.h"
53
#include "storage/rowset/rowset_fwd.h"
54
#include "storage/segment/column_reader.h"
55
#include "storage/tablet/tablet_schema.h"
56
#include "storage/utils.h"
57
#include "util/defer_op.h"
58
#include "util/jsonb/serialize.h"
59
#include "util/lru_cache.h"
60
#include "util/simd/bits.h"
61
#include "util/thrift_util.h"
62
63
namespace doris {
64
65
class PointQueryResultBlockBuffer final : public MySQLResultBlockBuffer {
66
public:
67
0
    PointQueryResultBlockBuffer(RuntimeState* state) : MySQLResultBlockBuffer(state) {}
68
0
    ~PointQueryResultBlockBuffer() override = default;
69
0
    std::shared_ptr<TFetchDataResult> get_block() {
70
0
        std::lock_guard<std::mutex> l(_lock);
71
0
        DCHECK_EQ(_result_batch_queue.size(), 1);
72
0
        auto result = std::move(_result_batch_queue.front());
73
0
        _result_batch_queue.pop_front();
74
0
        return result;
75
0
    }
76
};
77
78
4.09k
Reusable::~Reusable() = default;
79
80
// get missing and include column ids
81
// input include_cids : the output expr slots columns unique ids
82
// missing_cids : the output expr columns that not in row columns cids
83
static void get_missing_and_include_cids(const TabletSchema& schema,
84
                                         const std::vector<SlotDescriptor*>& slots,
85
                                         int target_rs_column_id,
86
                                         std::unordered_set<int>& missing_cids,
87
4.04k
                                         std::unordered_set<int>& include_cids) {
88
4.04k
    missing_cids.clear();
89
4.04k
    include_cids.clear();
90
4.04k
    for (auto* slot : slots) {
91
4.04k
        missing_cids.insert(slot->col_unique_id());
92
4.04k
    }
93
    // insert delete sign column id
94
4.04k
    missing_cids.insert(schema.columns()[schema.delete_sign_idx()]->unique_id());
95
4.04k
    if (target_rs_column_id == -1) {
96
        // no row store columns
97
4.04k
        return;
98
4.04k
    }
99
0
    const TabletColumn& target_rs_column = schema.column_by_uid(target_rs_column_id);
100
0
    DCHECK(target_rs_column.is_row_store_column());
101
    // The full column group is considered a full match, thus no missing cids
102
0
    if (schema.row_columns_uids().empty()) {
103
0
        missing_cids.clear();
104
0
        return;
105
0
    }
106
0
    for (int cid : schema.row_columns_uids()) {
107
0
        missing_cids.erase(cid);
108
0
        include_cids.insert(cid);
109
0
    }
110
0
}
111
112
constexpr static int s_preallocted_blocks_num = 32;
113
114
static void extract_slot_ref(const VExprSPtr& expr, TupleDescriptor* tuple_desc,
115
4.04k
                             std::vector<SlotDescriptor*>& slots) {
116
4.04k
    const auto& children = expr->children();
117
4.04k
    for (const auto& i : children) {
118
0
        extract_slot_ref(i, tuple_desc, slots);
119
0
    }
120
121
4.04k
    auto node_type = expr->node_type();
122
4.04k
    if (node_type == TExprNodeType::SLOT_REF) {
123
4.04k
        int column_id = static_cast<const VSlotRef*>(expr.get())->column_id();
124
4.04k
        auto* slot_desc = tuple_desc->slots()[column_id];
125
4.04k
        slots.push_back(slot_desc);
126
4.04k
    }
127
4.04k
}
128
129
Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs,
130
                      const TQueryOptions& query_options, const TabletSchema& schema,
131
4.04k
                      size_t block_size) {
132
4.04k
    _runtime_state = RuntimeState::create_unique();
133
4.04k
    _runtime_state->set_query_options(query_options);
134
4.04k
    RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &_desc_tbl));
135
4.04k
    _runtime_state->set_desc_tbl(_desc_tbl);
136
89.0k
    for (const auto* slot : tuple_desc()->slots()) {
137
89.0k
        if (!slot->all_access_paths().empty() || !slot->predicate_access_paths().empty()) {
138
0
            return Status::InternalError(
139
0
                    "Short-circuit point query does not support nested column access paths, "
140
0
                    "slot: {}. Please upgrade FE to disable nested column pruning for "
141
0
                    "short-circuit point queries.",
142
0
                    slot->col_name());
143
0
        }
144
89.0k
    }
145
4.04k
    _block_pool.resize(block_size);
146
8.09k
    for (auto& i : _block_pool) {
147
8.09k
        i = Block::create_unique(tuple_desc()->slots(), 2);
148
        // Name is useless but cost space
149
8.09k
        i->clear_names();
150
8.09k
    }
151
152
4.04k
    RETURN_IF_ERROR(VExpr::create_expr_trees(output_exprs, _output_exprs_ctxs));
153
4.04k
    RowDescriptor row_desc(tuple_desc());
154
    // Prepare the exprs to run.
155
4.04k
    RETURN_IF_ERROR(VExpr::prepare(_output_exprs_ctxs, _runtime_state.get(), row_desc));
156
4.04k
    RETURN_IF_ERROR(VExpr::open(_output_exprs_ctxs, _runtime_state.get()));
157
4.04k
    _create_timestamp = butil::gettimeofday_ms();
158
4.04k
    _data_type_serdes = create_data_type_serdes(tuple_desc()->slots());
159
4.04k
    _col_default_values.resize(tuple_desc()->slots().size());
160
4.04k
    bool has_delete_sign = false;
161
93.0k
    for (int i = 0; i < tuple_desc()->slots().size(); ++i) {
162
89.0k
        auto* slot = tuple_desc()->slots()[i];
163
89.0k
        if (slot->col_name() == DELETE_SIGN) {
164
0
            has_delete_sign = true;
165
0
        }
166
89.0k
        _col_uid_to_idx[slot->col_unique_id()] = i;
167
89.0k
        _col_default_values[i] = slot->col_default_value();
168
89.0k
    }
169
170
    // Get the output slot descriptors
171
4.04k
    std::vector<SlotDescriptor*> output_slot_descs;
172
4.04k
    for (const auto& expr : _output_exprs_ctxs) {
173
4.04k
        extract_slot_ref(expr->root(), tuple_desc(), output_slot_descs);
174
4.04k
    }
175
176
    // get the delete sign idx in block
177
4.04k
    if (has_delete_sign) {
178
0
        _delete_sign_idx = _col_uid_to_idx[schema.columns()[schema.delete_sign_idx()]->unique_id()];
179
0
    }
180
181
4.04k
    if (schema.have_column(BeConsts::ROW_STORE_COL)) {
182
0
        const auto& column = *DORIS_TRY(schema.column(BeConsts::ROW_STORE_COL));
183
0
        _row_store_column_ids = column.unique_id();
184
0
    }
185
4.04k
    get_missing_and_include_cids(schema, output_slot_descs, _row_store_column_ids,
186
4.04k
                                 _missing_col_uids, _include_col_uids);
187
188
4.04k
    return Status::OK();
189
4.04k
}
190
191
0
std::unique_ptr<Block> Reusable::get_block() {
192
0
    std::lock_guard lock(_block_mutex);
193
0
    if (_block_pool.empty()) {
194
0
        auto block = Block::create_unique(tuple_desc()->slots(), 2);
195
        // Name is useless but cost space
196
0
        block->clear_names();
197
0
        return block;
198
0
    }
199
0
    auto block = std::move(_block_pool.back());
200
0
    CHECK(block != nullptr);
201
0
    _block_pool.pop_back();
202
0
    return block;
203
0
}
204
205
0
void Reusable::return_block(std::unique_ptr<Block>& block) {
206
0
    std::lock_guard lock(_block_mutex);
207
0
    if (block == nullptr) {
208
0
        return;
209
0
    }
210
0
    block->clear_column_data();
211
0
    _block_pool.push_back(std::move(block));
212
0
    if (_block_pool.size() > s_preallocted_blocks_num) {
213
0
        _block_pool.resize(s_preallocted_blocks_num);
214
0
    }
215
0
}
216
217
0
LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t capacity) {
218
0
    DCHECK(ExecEnv::GetInstance()->get_lookup_connection_cache() == nullptr);
219
0
    auto* res = new LookupConnectionCache(capacity);
220
0
    return res;
221
0
}
222
223
RowCache::RowCache(int64_t capacity, int num_shards)
224
3
        : LRUCachePolicy(CachePolicy::CacheType::POINT_QUERY_ROW_CACHE, capacity,
225
3
                         LRUCacheType::SIZE, config::point_query_row_cache_stale_sweep_time_sec,
226
3
                         num_shards, /*element count capacity */ 0,
227
3
                         /*enable prune*/ true, /*is lru-k*/ true) {}
228
229
// Create global instance of this class
230
0
RowCache* RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) {
231
0
    DCHECK(ExecEnv::GetInstance()->get_row_cache() == nullptr);
232
0
    auto* res = new RowCache(capacity, num_shards);
233
0
    return res;
234
0
}
235
236
0
RowCache* RowCache::instance() {
237
0
    return ExecEnv::GetInstance()->get_row_cache();
238
0
}
239
240
4
bool RowCache::lookup(const RowCacheKey& key, CacheHandle* handle) {
241
4
    const std::string& encoded_key = key.encode();
242
4
    auto* lru_handle = LRUCachePolicy::lookup(encoded_key);
243
4
    if (!lru_handle) {
244
        // cache miss
245
3
        return false;
246
3
    }
247
1
    *handle = CacheHandle(this, lru_handle);
248
1
    return true;
249
4
}
250
251
11
void RowCache::insert(const RowCacheKey& key, const Slice& value) {
252
11
    char* cache_value = static_cast<char*>(malloc(value.size));
253
11
    memcpy(cache_value, value.data, value.size);
254
11
    auto* row_cache_value = new RowCacheValue;
255
11
    row_cache_value->cache_value = cache_value;
256
11
    const std::string& encoded_key = key.encode();
257
11
    auto* handle = LRUCachePolicy::insert(encoded_key, row_cache_value, value.size, value.size,
258
11
                                          CachePriority::NORMAL);
259
    // handle will released
260
11
    auto tmp = CacheHandle {this, handle};
261
11
}
262
263
1
void RowCache::erase(const RowCacheKey& key) {
264
1
    const std::string& encoded_key = key.encode();
265
1
    LRUCachePolicy::erase(encoded_key);
266
1
}
267
268
2.09k
LookupConnectionCache::CacheValue::~CacheValue() {
269
2.09k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
270
2.09k
            ExecEnv::GetInstance()->point_query_executor_mem_tracker());
271
2.09k
    item.reset();
272
2.09k
}
273
274
0
PointQueryExecutor::~PointQueryExecutor() {
275
0
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
276
0
            ExecEnv::GetInstance()->point_query_executor_mem_tracker());
277
0
    _tablet.reset();
278
0
    _reusable.reset();
279
0
    _result_block.reset();
280
0
    _row_read_ctxs.clear();
281
0
}
282
283
Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request,
284
0
                                PTabletKeyLookupResponse* response) {
285
0
    SCOPED_TIMER(&_profile_metrics.init_ns);
286
0
    _response = response;
287
    // using cache
288
0
    __int128_t uuid =
289
0
            static_cast<__int128_t>(request->uuid().uuid_high()) << 64 | request->uuid().uuid_low();
290
0
    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker());
291
0
    auto cache_handle = LookupConnectionCache::instance()->get(uuid);
292
0
    _binary_row_format = request->is_binary_row();
293
0
    _tablet = DORIS_TRY(ExecEnv::get_tablet(request->tablet_id()));
294
0
    if (cache_handle != nullptr) {
295
0
        _reusable = cache_handle;
296
0
        _profile_metrics.hit_lookup_cache = true;
297
0
    } else {
298
        // Lightweight request: FE may omit reusable query context and rely on uuid cache.
299
        // If cache miss and required parameters are absent, ask FE to resend a full request.
300
0
        if (uuid != 0 && (!request->has_desc_tbl() || request->desc_tbl().empty() ||
301
0
                          !request->has_output_expr() || request->output_expr().empty() ||
302
0
                          !request->has_query_options() || request->query_options().empty())) {
303
0
            if (VLOG_DEBUG_IS_ON) {
304
0
                VLOG_DEBUG << "lookup connection cache miss, ask FE to resend query context"
305
0
                           << ", tablet_id=" << request->tablet_id()
306
0
                           << ", uuid_high=" << request->uuid().uuid_high()
307
0
                           << ", uuid_low=" << request->uuid().uuid_low();
308
0
            }
309
0
            response->set_need_resend_query_context(true);
310
0
            return Status::OK();
311
0
        }
312
0
        if (uuid == 0 && (!request->has_desc_tbl() || request->desc_tbl().empty() ||
313
0
                          !request->has_output_expr() || request->output_expr().empty())) {
314
0
            return Status::InvalidArgument(
315
0
                    "tablet_fetch_data requires desc_tbl/output_expr when uuid is not set");
316
0
        }
317
        // init handle
318
0
        auto reusable_ptr = std::make_shared<Reusable>();
319
0
        TDescriptorTable t_desc_tbl;
320
0
        TExprList t_output_exprs;
321
0
        auto len = cast_set<uint32_t>(request->desc_tbl().size());
322
0
        RETURN_IF_ERROR(
323
0
                deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(request->desc_tbl().data()),
324
0
                                       &len, false, &t_desc_tbl));
325
0
        len = cast_set<uint32_t>(request->output_expr().size());
326
0
        RETURN_IF_ERROR(deserialize_thrift_msg(
327
0
                reinterpret_cast<const uint8_t*>(request->output_expr().data()), &len, false,
328
0
                &t_output_exprs));
329
0
        _reusable = reusable_ptr;
330
0
        TQueryOptions t_query_options;
331
0
        len = cast_set<uint32_t>(request->query_options().size());
332
0
        if (request->has_query_options()) {
333
0
            RETURN_IF_ERROR(deserialize_thrift_msg(
334
0
                    reinterpret_cast<const uint8_t*>(request->query_options().data()), &len, false,
335
0
                    &t_query_options));
336
0
        }
337
0
        if (uuid != 0) {
338
            // could be reused by requests after, pre allocte more blocks
339
0
            RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options,
340
0
                                               *_tablet->tablet_schema(),
341
0
                                               s_preallocted_blocks_num));
342
0
            LookupConnectionCache::instance()->add(uuid, reusable_ptr);
343
0
        } else {
344
0
            RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options,
345
0
                                               *_tablet->tablet_schema(), 1));
346
0
        }
347
0
    }
348
    // Set timezone from request for functions like from_unixtime()
349
0
    if (request->has_time_zone() && !request->time_zone().empty()) {
350
0
        _reusable->runtime_state()->set_timezone(request->time_zone());
351
0
    }
352
0
    if (request->has_version() && request->version() >= 0) {
353
0
        _version = request->version();
354
0
    }
355
0
    RETURN_IF_ERROR(_init_keys(request));
356
0
    _result_block = _reusable->get_block();
357
0
    CHECK(_result_block != nullptr);
358
359
0
    return Status::OK();
360
0
}
361
362
0
Status PointQueryExecutor::lookup_up() {
363
0
    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker());
364
0
    RETURN_IF_ERROR(_lookup_row_key());
365
0
    RETURN_IF_ERROR(_lookup_row_data());
366
0
    RETURN_IF_ERROR(_output_data());
367
0
    return Status::OK();
368
0
}
369
370
0
void PointQueryExecutor::print_profile() {
371
0
    auto init_us = _profile_metrics.init_ns.value() / 1000;
372
0
    auto init_key_us = _profile_metrics.init_key_ns.value() / 1000;
373
0
    auto lookup_key_us = _profile_metrics.lookup_key_ns.value() / 1000;
374
0
    auto lookup_data_us = _profile_metrics.lookup_data_ns.value() / 1000;
375
0
    auto output_data_us = _profile_metrics.output_data_ns.value() / 1000;
376
0
    auto load_segments_key_us = _profile_metrics.load_segment_key_stage_ns.value() / 1000;
377
0
    auto load_segments_data_us = _profile_metrics.load_segment_data_stage_ns.value() / 1000;
378
0
    auto total_us = init_us + lookup_key_us + lookup_data_us + output_data_us;
379
0
    auto read_stats = _profile_metrics.read_stats;
380
0
    const std::string stats_str = fmt::format(
381
0
            "[lookup profile:{}us] init:{}us, init_key:{}us,"
382
0
            " lookup_key:{}us, load_segments_key:{}us, lookup_data:{}us, load_segments_data:{}us,"
383
0
            " output_data:{}us, "
384
0
            "hit_lookup_cache:{}"
385
0
            ", is_binary_row:{}, output_columns:{}, total_keys:{}, row_cache_hits:{}"
386
0
            ", hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, "
387
0
            "io_latency:{}ns, "
388
0
            "uncompressed_bytes_read:{}, result_data_bytes:{}, row_hits:{}"
389
0
            ", rs_column_uid:{}, bytes_read_from_local:{}, bytes_read_from_remote:{}, "
390
0
            "local_io_timer:{}, remote_io_timer:{}, local_write_timer:{}",
391
0
            total_us, init_us, init_key_us, lookup_key_us, load_segments_key_us, lookup_data_us,
392
0
            load_segments_data_us, output_data_us, _profile_metrics.hit_lookup_cache,
393
0
            _binary_row_format, _reusable->output_exprs().size(), _row_read_ctxs.size(),
394
0
            _profile_metrics.row_cache_hits, read_stats.cached_pages_num,
395
0
            read_stats.total_pages_num, read_stats.compressed_bytes_read, read_stats.io_ns,
396
0
            read_stats.uncompressed_bytes_read, _profile_metrics.result_data_bytes, _row_hits,
397
0
            _reusable->rs_column_uid(),
398
0
            _profile_metrics.read_stats.file_cache_stats.bytes_read_from_local,
399
0
            _profile_metrics.read_stats.file_cache_stats.bytes_read_from_remote,
400
0
            _profile_metrics.read_stats.file_cache_stats.local_io_timer,
401
0
            _profile_metrics.read_stats.file_cache_stats.remote_io_timer,
402
0
            _profile_metrics.read_stats.file_cache_stats.write_cache_io_timer);
403
404
0
    constexpr static int kSlowThreholdUs = 50 * 1000; // 50ms
405
0
    if (total_us > kSlowThreholdUs) {
406
0
        LOG(WARNING) << "slow query, " << stats_str;
407
0
    } else if (VLOG_DEBUG_IS_ON) {
408
0
        VLOG_DEBUG << stats_str;
409
0
    } else {
410
0
        LOG_EVERY_N(INFO, 1000) << stats_str;
411
0
    }
412
0
}
413
414
0
Status PointQueryExecutor::_init_keys(const PTabletKeyLookupRequest* request) {
415
0
    SCOPED_TIMER(&_profile_metrics.init_key_ns);
416
0
    const auto& schema = _tablet->tablet_schema();
417
    // Point query is only supported on merge-on-write unique key tables.
418
0
    DCHECK(schema->keys_type() == UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write());
419
0
    if (schema->keys_type() != UNIQUE_KEYS || !_tablet->enable_unique_key_merge_on_write()) {
420
0
        return Status::InvalidArgument(
421
0
                "Point query is only supported on merge-on-write unique key tables, "
422
0
                "tablet_id={}",
423
0
                _tablet->tablet_id());
424
0
    }
425
    // 1. get primary key from conditions
426
0
    _row_read_ctxs.resize(request->key_tuples().size());
427
    // get row cursor and encode keys
428
0
    for (int i = 0; i < request->key_tuples().size(); ++i) {
429
0
        const KeyTuple& key_tuple = request->key_tuples(i);
430
0
        if (UNLIKELY(cast_set<size_t>(key_tuple.key_column_literals_size()) !=
431
0
                     schema->num_key_columns())) {
432
0
            return Status::InvalidArgument(
433
0
                    "Key column count mismatch. expected={}, actual={}, tablet_id={}",
434
0
                    schema->num_key_columns(), key_tuple.key_column_literals_size(),
435
0
                    _tablet->tablet_id());
436
0
        }
437
0
        RowCursor cursor;
438
0
        std::vector<Field> key_fields;
439
0
        key_fields.reserve(key_tuple.key_column_literals_size());
440
0
        for (int j = 0; j < key_tuple.key_column_literals_size(); ++j) {
441
0
            const auto& literal_bytes = key_tuple.key_column_literals(j);
442
0
            TExprNode expr_node;
443
0
            auto len = cast_set<uint32_t>(literal_bytes.size());
444
0
            RETURN_IF_ERROR(
445
0
                    deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(literal_bytes.data()),
446
0
                                           &len, false, &expr_node));
447
0
            const auto& col = schema->column(j);
448
0
            auto data_type = DataTypeFactory::instance().create_data_type(
449
0
                    col.type(), col.precision(), col.frac(), col.length());
450
0
            key_fields.push_back(data_type->get_field(expr_node));
451
0
        }
452
0
        RETURN_IF_ERROR(cursor.init_scan_key(_tablet->tablet_schema(), std::move(key_fields)));
453
0
        cursor.encode_key_with_padding<true>(&_row_read_ctxs[i]._primary_key,
454
0
                                             _tablet->tablet_schema()->num_key_columns(), true);
455
0
    }
456
0
    return Status::OK();
457
0
}
458
459
0
Status PointQueryExecutor::_lookup_row_key() {
460
0
    SCOPED_TIMER(&_profile_metrics.lookup_key_ns);
461
    // 2. lookup row location
462
0
    Status st;
463
0
    if (_version >= 0) {
464
0
        CHECK(config::is_cloud_mode()) << "Only cloud mode support snapshot read at present";
465
0
        SyncOptions options;
466
0
        options.query_version = _version;
467
0
        RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(_tablet)->sync_rowsets(options));
468
0
    }
469
0
    std::vector<RowsetSharedPtr> specified_rowsets;
470
0
    {
471
0
        std::shared_lock rlock(_tablet->get_header_lock());
472
0
        specified_rowsets = _tablet->get_rowset_by_ids(nullptr);
473
0
    }
474
0
    std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
475
0
    for (size_t i = 0; i < _row_read_ctxs.size(); ++i) {
476
0
        RowLocation location;
477
0
        if (!config::disable_storage_row_cache) {
478
0
            RowCache::CacheHandle cache_handle;
479
0
            auto hit_cache = RowCache::instance()->lookup(
480
0
                    {_tablet->tablet_id(), _row_read_ctxs[i]._primary_key}, &cache_handle);
481
0
            if (hit_cache) {
482
0
                _row_read_ctxs[i]._cached_row_data = std::move(cache_handle);
483
0
                ++_profile_metrics.row_cache_hits;
484
0
                continue;
485
0
            }
486
0
        }
487
        // Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this ptr
488
0
        auto rowset_ptr = std::make_unique<RowsetSharedPtr>();
489
0
        st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, nullptr, false,
490
0
                                      specified_rowsets, &location, INT32_MAX /*rethink?*/,
491
0
                                      segment_caches, rowset_ptr.get(), false, nullptr,
492
0
                                      &_profile_metrics.read_stats));
493
0
        if (st.is<ErrorCode::KEY_NOT_FOUND>()) {
494
0
            continue;
495
0
        }
496
0
        RETURN_IF_ERROR(st);
497
0
        _row_read_ctxs[i]._row_location = location;
498
        // acquire and wrap this rowset
499
0
        (*rowset_ptr)->acquire();
500
0
        VLOG_DEBUG << "aquire rowset " << (*rowset_ptr)->rowset_id();
501
0
        _row_read_ctxs[i]._rowset_ptr = std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)>(
502
0
                rowset_ptr.release(), &release_rowset);
503
0
        _row_hits++;
504
0
    }
505
0
    return Status::OK();
506
0
}
507
508
0
Status PointQueryExecutor::_lookup_row_data() {
509
    // 3. get values
510
0
    SCOPED_TIMER(&_profile_metrics.lookup_data_ns);
511
0
    {
512
0
        auto result_columns_guard = _result_block->mutate_columns_scoped();
513
0
        MutableColumns& result_columns = result_columns_guard.mutable_columns();
514
0
        for (size_t i = 0; i < _row_read_ctxs.size(); ++i) {
515
0
            if (_row_read_ctxs[i]._cached_row_data.valid()) {
516
0
                RETURN_IF_ERROR(JsonbSerializeUtil::jsonb_to_columns(
517
0
                        _reusable->get_data_type_serdes(),
518
0
                        _row_read_ctxs[i]._cached_row_data.data().data,
519
0
                        _row_read_ctxs[i]._cached_row_data.data().size,
520
0
                        _reusable->get_col_uid_to_idx(), result_columns,
521
0
                        _reusable->get_col_default_values(), _reusable->include_col_uids()));
522
0
                continue;
523
0
            }
524
0
            if (!_row_read_ctxs[i]._row_location.has_value()) {
525
0
                continue;
526
0
            }
527
0
            std::string value;
528
            // fill block by row store
529
0
            if (_reusable->rs_column_uid() != -1) {
530
0
                bool use_row_cache = !config::disable_storage_row_cache;
531
0
                RETURN_IF_ERROR(_tablet->lookup_row_data(
532
0
                        _row_read_ctxs[i]._primary_key, _row_read_ctxs[i]._row_location.value(),
533
0
                        *(_row_read_ctxs[i]._rowset_ptr), _profile_metrics.read_stats, value,
534
0
                        use_row_cache));
535
                // serialize value to block, currently only jsonb row format
536
0
                RETURN_IF_ERROR(JsonbSerializeUtil::jsonb_to_columns(
537
0
                        _reusable->get_data_type_serdes(), value.data(), value.size(),
538
0
                        _reusable->get_col_uid_to_idx(), result_columns,
539
0
                        _reusable->get_col_default_values(), _reusable->include_col_uids()));
540
0
            }
541
0
            if (!_reusable->missing_col_uids().empty()) {
542
0
                if (!_reusable->runtime_state()->enable_short_circuit_query_access_column_store()) {
543
0
                    std::string missing_columns;
544
0
                    for (int cid : _reusable->missing_col_uids()) {
545
0
                        missing_columns +=
546
0
                                _tablet->tablet_schema()->column_by_uid(cid).name() + ",";
547
0
                    }
548
0
                    return Status::InternalError(
549
0
                            "Not support column store, set store_row_column=true or "
550
0
                            "row_store_columns in table properties, missing columns: " +
551
0
                            missing_columns + " should be added to row store");
552
0
                }
553
                // fill missing columns by column store
554
0
                RowLocation row_loc = _row_read_ctxs[i]._row_location.value();
555
0
                BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(
556
0
                        _tablet->get_rowset(row_loc.rowset_id));
557
0
                SegmentCacheHandle segment_cache;
558
0
                {
559
0
                    SCOPED_TIMER(&_profile_metrics.load_segment_data_stage_ns);
560
0
                    RETURN_IF_ERROR(
561
0
                            SegmentLoader::instance()->load_segments(rowset, &segment_cache, true));
562
0
                }
563
                // find segment
564
0
                auto it = std::find_if(segment_cache.get_segments().cbegin(),
565
0
                                       segment_cache.get_segments().cend(),
566
0
                                       [&](const segment_v2::SegmentSharedPtr& seg) {
567
0
                                           return seg->id() == row_loc.segment_id;
568
0
                                       });
569
0
                const auto& segment = *it;
570
0
                for (int cid : _reusable->missing_col_uids()) {
571
0
                    int pos = _reusable->get_col_uid_to_idx().at(cid);
572
0
                    std::vector<segment_v2::rowid_t> row_ids {
573
0
                            static_cast<segment_v2::rowid_t>(row_loc.row_id)};
574
0
                    auto& column = result_columns[pos];
575
0
                    std::unique_ptr<ColumnIterator> iter;
576
0
                    SlotDescriptor* slot = _reusable->tuple_desc()->slots()[pos];
577
0
                    StorageReadOptions storage_read_options;
578
0
                    storage_read_options.stats = &_read_stats;
579
0
                    storage_read_options.io_ctx.reader_type = ReaderType::READER_QUERY;
580
0
                    RETURN_IF_ERROR(segment->seek_and_read_by_rowid(*_tablet->tablet_schema(), slot,
581
0
                                                                    row_ids, column,
582
0
                                                                    storage_read_options, iter));
583
0
                }
584
0
            }
585
0
        }
586
0
        if (result_columns.size() > _reusable->include_col_uids().size()) {
587
            // Padding rows for some columns that no need to output to mysql client
588
            // eg. SELECT k1,v1,v2 FROM TABLE WHERE k1 = 1, k1 is not in output slots, tuple as bellow
589
            // TupleDescriptor{id=1, tbl=table_with_column_group}
590
            // SlotDescriptor{id=8, col=v1, colUniqueId=1 ...}
591
            // SlotDescriptor{id=9, col=v2, colUniqueId=2 ...}
592
            // thus missing in include_col_uids and missing_col_uids
593
0
            for (auto& column : result_columns) {
594
0
                int padding_rows = _row_hits - cast_set<int>(column->size());
595
0
                if (padding_rows > 0) {
596
0
                    column->insert_many_defaults(padding_rows);
597
0
                }
598
0
            }
599
0
        }
600
0
    }
601
    // filter rows by delete sign
602
0
    if (_row_hits > 0 && _reusable->delete_sign_idx() != -1) {
603
0
        size_t filtered = 0;
604
0
        size_t total = 0;
605
0
        {
606
            // clear_column_data will check reference of ColumnPtr, so we need to release
607
            // reference before clear_column_data
608
0
            ColumnPtr delete_filter_columns =
609
0
                    _result_block->get_columns()[_reusable->delete_sign_idx()];
610
0
            const auto& filter =
611
0
                    assert_cast<const ColumnInt8*>(delete_filter_columns.get())->get_data();
612
0
            filtered = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
613
0
            total = filter.size();
614
0
        }
615
616
0
        if (filtered == total) {
617
0
            _result_block->clear_column_data();
618
0
        } else if (filtered > 0) {
619
0
            return Status::NotSupported("Not implemented since only single row at present");
620
0
        }
621
0
    }
622
0
    return Status::OK();
623
0
}
624
625
0
Status serialize_block(std::shared_ptr<TFetchDataResult> res, PTabletKeyLookupResponse* response) {
626
0
    uint8_t* buf = nullptr;
627
0
    uint32_t len = 0;
628
0
    ThriftSerializer ser(false, 4096);
629
0
    RETURN_IF_ERROR(ser.serialize(&(res->result_batch), &len, &buf));
630
0
    response->set_row_batch(std::string((const char*)buf, len));
631
0
    return Status::OK();
632
0
}
633
634
0
Status PointQueryExecutor::_output_data() {
635
    // 4. exprs exec and serialize to mysql row batches
636
0
    SCOPED_TIMER(&_profile_metrics.output_data_ns);
637
0
    if (_result_block->rows()) {
638
0
        RuntimeState state;
639
0
        auto buffer = std::make_shared<PointQueryResultBlockBuffer>(&state);
640
        // TODO reuse mysql_writer
641
0
        VMysqlResultWriter mysql_writer(buffer, _reusable->output_exprs(), nullptr,
642
0
                                        _binary_row_format);
643
0
        RETURN_IF_ERROR(mysql_writer.init(_reusable->runtime_state()));
644
0
        _result_block->clear_names();
645
0
        RETURN_IF_ERROR(mysql_writer.write(_reusable->runtime_state(), *_result_block));
646
0
        RETURN_IF_ERROR(serialize_block(buffer->get_block(), _response));
647
0
        VLOG_DEBUG << "dump block " << _result_block->dump_data();
648
0
    } else {
649
0
        _response->set_empty_batch(true);
650
0
    }
651
0
    _profile_metrics.result_data_bytes = _result_block->bytes();
652
0
    _reusable->return_block(_result_block);
653
0
    return Status::OK();
654
0
}
655
656
} // namespace doris