Coverage Report

Created: 2026-03-15 14:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/point_query_executor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/point_query_executor.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Descriptors_types.h>
22
#include <gen_cpp/Exprs_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/internal_service.pb.h>
25
#include <glog/logging.h>
26
#include <google/protobuf/extension_set.h>
27
#include <stdlib.h>
28
29
#include <memory>
30
#include <unordered_map>
31
#include <vector>
32
33
#include "cloud/cloud_tablet.h"
34
#include "cloud/config.h"
35
#include "common/cast_set.h"
36
#include "common/consts.h"
37
#include "common/status.h"
38
#include "core/data_type_serde/data_type_serde.h"
39
#include "exec/sink/writer/vmysql_result_writer.h"
40
#include "exprs/vexpr.h"
41
#include "exprs/vexpr_context.h"
42
#include "exprs/vexpr_fwd.h"
43
#include "exprs/vslot_ref.h"
44
#include "runtime/descriptors.h"
45
#include "runtime/exec_env.h"
46
#include "runtime/result_block_buffer.h"
47
#include "runtime/runtime_profile.h"
48
#include "runtime/runtime_state.h"
49
#include "runtime/thread_context.h"
50
#include "storage/olap_tuple.h"
51
#include "storage/row_cursor.h"
52
#include "storage/rowset/beta_rowset.h"
53
#include "storage/rowset/rowset_fwd.h"
54
#include "storage/segment/column_reader.h"
55
#include "storage/tablet/tablet_schema.h"
56
#include "storage/utils.h"
57
#include "util/jsonb/serialize.h"
58
#include "util/lru_cache.h"
59
#include "util/simd/bits.h"
60
#include "util/thrift_util.h"
61
62
namespace doris {
63
64
#include "common/compile_check_begin.h"
65
66
class PointQueryResultBlockBuffer final : public MySQLResultBlockBuffer {
67
public:
68
249
    PointQueryResultBlockBuffer(RuntimeState* state) : MySQLResultBlockBuffer(state) {}
69
    ~PointQueryResultBlockBuffer() override = default;
70
249
    std::shared_ptr<TFetchDataResult> get_block() {
71
249
        std::lock_guard<std::mutex> l(_lock);
72
249
        DCHECK_EQ(_result_batch_queue.size(), 1);
73
249
        auto result = std::move(_result_batch_queue.front());
74
249
        _result_batch_queue.pop_front();
75
249
        return result;
76
249
    }
77
};
78
79
4.16k
// Defaulted destructor, defined out of line in this translation unit.
Reusable::~Reusable() = default;
80
81
// get missing and include column ids
82
// input include_cids : the output expr slots columns unique ids
83
// missing_cids : the output expr columns that not in row columns cids
84
static void get_missing_and_include_cids(const TabletSchema& schema,
85
                                         const std::vector<SlotDescriptor*>& slots,
86
                                         int target_rs_column_id,
87
                                         std::unordered_set<int>& missing_cids,
88
4.20k
                                         std::unordered_set<int>& include_cids) {
89
4.20k
    missing_cids.clear();
90
4.20k
    include_cids.clear();
91
5.20k
    for (auto* slot : slots) {
92
5.20k
        missing_cids.insert(slot->col_unique_id());
93
5.20k
    }
94
    // insert delete sign column id
95
4.20k
    missing_cids.insert(schema.columns()[schema.delete_sign_idx()]->unique_id());
96
4.20k
    if (target_rs_column_id == -1) {
97
        // no row store columns
98
4.04k
        return;
99
4.04k
    }
100
154
    const TabletColumn& target_rs_column = schema.column_by_uid(target_rs_column_id);
101
154
    DCHECK(target_rs_column.is_row_store_column());
102
    // The full column group is considered a full match, thus no missing cids
103
154
    if (schema.row_columns_uids().empty()) {
104
144
        missing_cids.clear();
105
144
        return;
106
144
    }
107
18
    for (int cid : schema.row_columns_uids()) {
108
18
        missing_cids.erase(cid);
109
18
        include_cids.insert(cid);
110
18
    }
111
10
}
112
113
// Number of blocks pre-allocated for a Reusable that is stored in the lookup
// connection cache; Reusable::return_block also uses it as the pool size cap.
constexpr static int s_preallocted_blocks_num = 32;
114
115
static void extract_slot_ref(const VExprSPtr& expr, TupleDescriptor* tuple_desc,
116
5.41k
                             std::vector<SlotDescriptor*>& slots) {
117
5.41k
    const auto& children = expr->children();
118
5.41k
    for (const auto& i : children) {
119
210
        extract_slot_ref(i, tuple_desc, slots);
120
210
    }
121
122
5.41k
    auto node_type = expr->node_type();
123
5.41k
    if (node_type == TExprNodeType::SLOT_REF) {
124
5.20k
        int column_id = static_cast<const VSlotRef*>(expr.get())->column_id();
125
5.20k
        auto* slot_desc = tuple_desc->slots()[column_id];
126
5.20k
        slots.push_back(slot_desc);
127
5.20k
    }
128
5.41k
}
129
130
// Prepares everything that can be shared between point queries with the same
// plan: runtime state, descriptor table, a pool of `block_size` result blocks,
// prepared/opened output expressions, per-slot serdes and default values, and
// the missing/include column uid sets derived from `schema`.
//
// Returns the first error reported by descriptor table creation, expression
// creation/prepare/open, or the row store column lookup; Status::OK() otherwise.
Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs,
                      const TQueryOptions& query_options, const TabletSchema& schema,
                      size_t block_size) {
    // Runtime state and descriptor table.
    _runtime_state = RuntimeState::create_unique();
    _runtime_state->set_query_options(query_options);
    RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &_desc_tbl));
    _runtime_state->set_desc_tbl(_desc_tbl);

    // Pre-allocate the result block pool.
    _block_pool.resize(block_size);
    for (auto& pooled_block : _block_pool) {
        pooled_block = Block::create_unique(tuple_desc()->slots(), 2);
        // Column names are not needed and only cost space.
        pooled_block->clear_names();
    }

    // Build, prepare and open the output expressions.
    RETURN_IF_ERROR(VExpr::create_expr_trees(output_exprs, _output_exprs_ctxs));
    RowDescriptor row_desc(tuple_desc());
    RETURN_IF_ERROR(VExpr::prepare(_output_exprs_ctxs, _runtime_state.get(), row_desc));
    RETURN_IF_ERROR(VExpr::open(_output_exprs_ctxs, _runtime_state.get()));
    _create_timestamp = butil::gettimeofday_ms();

    // Per-slot metadata: serdes, uid -> block position, default values.
    const auto& slot_descs = tuple_desc()->slots();
    _data_type_serdes = create_data_type_serdes(slot_descs);
    _col_default_values.resize(slot_descs.size());
    bool saw_delete_sign = false;
    for (int pos = 0; pos < slot_descs.size(); ++pos) {
        auto* slot_desc = slot_descs[pos];
        if (slot_desc->col_name() == DELETE_SIGN) {
            saw_delete_sign = true;
        }
        _col_uid_to_idx[slot_desc->col_unique_id()] = pos;
        _col_default_values[pos] = slot_desc->col_default_value();
    }

    // Collect the slots referenced by the output expressions.
    std::vector<SlotDescriptor*> referenced_slots;
    for (const auto& expr_ctx : _output_exprs_ctxs) {
        extract_slot_ref(expr_ctx->root(), tuple_desc(), referenced_slots);
    }

    // Position of the delete sign column inside the result block, if the
    // tuple contains it.
    if (saw_delete_sign) {
        _delete_sign_idx = _col_uid_to_idx[schema.columns()[schema.delete_sign_idx()]->unique_id()];
    }

    // Remember the row store column uid when the schema has one.
    if (schema.have_column(BeConsts::ROW_STORE_COL)) {
        const auto& row_store_col = *DORIS_TRY(schema.column(BeConsts::ROW_STORE_COL));
        _row_store_column_ids = row_store_col.unique_id();
    }

    get_missing_and_include_cids(schema, referenced_slots, _row_store_column_ids,
                                 _missing_col_uids, _include_col_uids);
    return Status::OK();
}
182
183
263
std::unique_ptr<Block> Reusable::get_block() {
184
263
    std::lock_guard lock(_block_mutex);
185
263
    if (_block_pool.empty()) {
186
0
        auto block = Block::create_unique(tuple_desc()->slots(), 2);
187
        // Name is useless but cost space
188
0
        block->clear_names();
189
0
        return block;
190
0
    }
191
263
    auto block = std::move(_block_pool.back());
192
263
    CHECK(block != nullptr);
193
263
    _block_pool.pop_back();
194
263
    return block;
195
263
}
196
197
260
void Reusable::return_block(std::unique_ptr<Block>& block) {
198
260
    std::lock_guard lock(_block_mutex);
199
260
    if (block == nullptr) {
200
0
        return;
201
0
    }
202
260
    block->clear_column_data();
203
260
    _block_pool.push_back(std::move(block));
204
260
    if (_block_pool.size() > s_preallocted_blocks_num) {
205
0
        _block_pool.resize(s_preallocted_blocks_num);
206
0
    }
207
260
}
208
209
7
// Allocates a LookupConnectionCache with the given capacity and returns the
// raw pointer to the caller. Debug builds assert that ExecEnv does not already
// hold a lookup connection cache.
LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t capacity) {
    DCHECK(ExecEnv::GetInstance()->get_lookup_connection_cache() == nullptr);
    return new LookupConnectionCache(capacity);
}
214
215
// Builds the row cache on top of LRUCachePolicy as a size-based LRU cache of
// type POINT_QUERY_ROW_CACHE, using the configured stale sweep time.
RowCache::RowCache(int64_t capacity, int num_shards)
        : LRUCachePolicy(CachePolicy::CacheType::POINT_QUERY_ROW_CACHE, capacity,
                         LRUCacheType::SIZE, config::point_query_row_cache_stale_sweep_time_sec,
                         num_shards,
                         /*element count capacity*/ 0,
                         /*enable prune*/ true,
                         /*is lru-k*/ true) {}
220
221
// Create global instance of this class
222
7
RowCache* RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) {
223
7
    DCHECK(ExecEnv::GetInstance()->get_row_cache() == nullptr);
224
7
    auto* res = new RowCache(capacity, num_shards);
225
7
    return res;
226
7
}
227
228
299
// Returns the row cache registered in the global ExecEnv.
RowCache* RowCache::instance() {
    auto* exec_env = ExecEnv::GetInstance();
    return exec_env->get_row_cache();
}
231
232
146
bool RowCache::lookup(const RowCacheKey& key, CacheHandle* handle) {
233
146
    const std::string& encoded_key = key.encode();
234
146
    auto* lru_handle = LRUCachePolicy::lookup(encoded_key);
235
146
    if (!lru_handle) {
236
        // cache miss
237
74
        return false;
238
74
    }
239
72
    *handle = CacheHandle(this, lru_handle);
240
72
    return true;
241
146
}
242
243
76
void RowCache::insert(const RowCacheKey& key, const Slice& value) {
244
76
    char* cache_value = static_cast<char*>(malloc(value.size));
245
76
    memcpy(cache_value, value.data, value.size);
246
76
    auto* row_cache_value = new RowCacheValue;
247
76
    row_cache_value->cache_value = cache_value;
248
76
    const std::string& encoded_key = key.encode();
249
76
    auto* handle = LRUCachePolicy::insert(encoded_key, row_cache_value, value.size, value.size,
250
76
                                          CachePriority::NORMAL);
251
    // handle will released
252
76
    auto tmp = CacheHandle {this, handle};
253
76
}
254
255
93
void RowCache::erase(const RowCacheKey& key) {
256
93
    const std::string& encoded_key = key.encode();
257
93
    LRUCachePolicy::erase(encoded_key);
258
93
}
259
260
2.09k
// Releases the cached item while the thread is switched to the point query
// executor memory tracker.
LookupConnectionCache::CacheValue::~CacheValue() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
            ExecEnv::GetInstance()->point_query_executor_mem_tracker());
    // Reset explicitly so the release happens inside the tracker scope above.
    item.reset();
}
265
266
263
// Drops the executor's tablet, reusable context, result block and per-row read
// contexts while the thread is switched to the point query executor memory
// tracker.
PointQueryExecutor::~PointQueryExecutor() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
            ExecEnv::GetInstance()->point_query_executor_mem_tracker());
    // Release members explicitly, in this order, inside the tracker scope.
    _tablet.reset();
    _reusable.reset();
    _result_block.reset();
    _row_read_ctxs.clear();
}
274
275
// Prepares the executor for one key lookup request.
//
// Resolves the tablet, obtains a Reusable context (from the lookup connection
// cache when the request uuid hits, otherwise built from the thrift-encoded
// descriptor table, output expressions and query options carried by the
// request), applies the request's time zone and version, encodes the primary
// keys and takes a result block from the Reusable.
//
// request  : the lookup request; read only
// response : stored in _response for later use when the result is written
//
// Returns the first error from tablet lookup, thrift deserialization,
// Reusable::init or key initialization; Status::OK() otherwise.
Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request,
                                PTabletKeyLookupResponse* response) {
    SCOPED_TIMER(&_profile_metrics.init_ns);
    _response = response;

    // Build the 128-bit cache key from the two uuid halves. Each half is first
    // converted to uint64_t so that, if the protobuf fields are signed, a
    // negative uuid_low cannot sign-extend over the bits of uuid_high, and the
    // shift is performed on an unsigned value.
    const auto uuid_high_bits =
            static_cast<__uint128_t>(static_cast<uint64_t>(request->uuid().uuid_high()));
    const auto uuid_low_bits =
            static_cast<__uint128_t>(static_cast<uint64_t>(request->uuid().uuid_low()));
    const auto uuid = static_cast<__int128_t>((uuid_high_bits << 64) | uuid_low_bits);

    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker());
    auto cache_handle = LookupConnectionCache::instance()->get(uuid);
    _binary_row_format = request->is_binary_row();
    _tablet = DORIS_TRY(ExecEnv::get_tablet(request->tablet_id()));
    if (cache_handle != nullptr) {
        // Cache hit: reuse the already initialized context.
        _reusable = cache_handle;
        _profile_metrics.hit_lookup_cache = true;
    } else {
        // Cache miss: decode the plan pieces from the request and build a new
        // context.
        auto reusable_ptr = std::make_shared<Reusable>();
        TDescriptorTable t_desc_tbl;
        TExprList t_output_exprs;
        auto len = cast_set<uint32_t>(request->desc_tbl().size());
        RETURN_IF_ERROR(
                deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(request->desc_tbl().data()),
                                       &len, false, &t_desc_tbl));
        len = cast_set<uint32_t>(request->output_expr().size());
        RETURN_IF_ERROR(deserialize_thrift_msg(
                reinterpret_cast<const uint8_t*>(request->output_expr().data()), &len, false,
                &t_output_exprs));
        _reusable = reusable_ptr;
        TQueryOptions t_query_options;
        if (request->has_query_options()) {
            len = cast_set<uint32_t>(request->query_options().size());
            RETURN_IF_ERROR(deserialize_thrift_msg(
                    reinterpret_cast<const uint8_t*>(request->query_options().data()), &len, false,
                    &t_query_options));
        }

        // A non-zero uuid means the context may be reused by later requests,
        // so pre-allocate more blocks and publish it in the cache; otherwise a
        // single block is enough.
        const bool cacheable = (uuid != 0);
        const size_t block_count =
                cacheable ? static_cast<size_t>(s_preallocted_blocks_num) : size_t {1};
        RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options,
                                           *_tablet->tablet_schema(), block_count));
        if (cacheable) {
            LookupConnectionCache::instance()->add(uuid, reusable_ptr);
        }
    }

    // Set timezone from request for functions like from_unixtime().
    // NOTE(review): _reusable may come from the shared lookup cache; confirm
    // that updating its runtime state here is safe for concurrent requests.
    if (request->has_time_zone() && !request->time_zone().empty()) {
        _reusable->runtime_state()->set_timezone(request->time_zone());
    }
    if (request->has_version() && request->version() >= 0) {
        _version = request->version();
    }
    RETURN_IF_ERROR(_init_keys(request));
    _result_block = _reusable->get_block();
    CHECK(_result_block != nullptr);

    return Status::OK();
}
334
335
263
// Runs the three lookup stages in order - locate the rows, read their data,
// produce the output - and stops at the first stage that reports an error.
Status PointQueryExecutor::lookup_up() {
    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->point_query_executor_mem_tracker());

    // Stage 1: resolve each primary key to a cached row or a row location.
    RETURN_IF_ERROR(_lookup_row_key());
    // Stage 2: materialize the row values into the result block.
    RETURN_IF_ERROR(_lookup_row_data());
    // Stage 3: evaluate output expressions and fill the response.
    RETURN_IF_ERROR(_output_data());

    return Status::OK();
}
342
343
260
void PointQueryExecutor::print_profile() {
344
260
    auto init_us = _profile_metrics.init_ns.value() / 1000;
345
260
    auto init_key_us = _profile_metrics.init_key_ns.value() / 1000;
346
260
    auto lookup_key_us = _profile_metrics.lookup_key_ns.value() / 1000;
347
260
    auto lookup_data_us = _profile_metrics.lookup_data_ns.value() / 1000;
348
260
    auto output_data_us = _profile_metrics.output_data_ns.value() / 1000;
349
260
    auto load_segments_key_us = _profile_metrics.load_segment_key_stage_ns.value() / 1000;
350
260
    auto load_segments_data_us = _profile_metrics.load_segment_data_stage_ns.value() / 1000;
351
260
    auto total_us = init_us + lookup_key_us + lookup_data_us + output_data_us;
352
260
    auto read_stats = _profile_metrics.read_stats;
353
260
    const std::string stats_str = fmt::format(
354
260
            "[lookup profile:{}us] init:{}us, init_key:{}us,"
355
260
            " lookup_key:{}us, load_segments_key:{}us, lookup_data:{}us, load_segments_data:{}us,"
356
260
            " output_data:{}us, "
357
260
            "hit_lookup_cache:{}"
358
260
            ", is_binary_row:{}, output_columns:{}, total_keys:{}, row_cache_hits:{}"
359
260
            ", hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, "
360
260
            "io_latency:{}ns, "
361
260
            "uncompressed_bytes_read:{}, result_data_bytes:{}, row_hits:{}"
362
260
            ", rs_column_uid:{}, bytes_read_from_local:{}, bytes_read_from_remote:{}, "
363
260
            "local_io_timer:{}, remote_io_timer:{}, local_write_timer:{}",
364
260
            total_us, init_us, init_key_us, lookup_key_us, load_segments_key_us, lookup_data_us,
365
260
            load_segments_data_us, output_data_us, _profile_metrics.hit_lookup_cache,
366
260
            _binary_row_format, _reusable->output_exprs().size(), _row_read_ctxs.size(),
367
260
            _profile_metrics.row_cache_hits, read_stats.cached_pages_num,
368
260
            read_stats.total_pages_num, read_stats.compressed_bytes_read, read_stats.io_ns,
369
260
            read_stats.uncompressed_bytes_read, _profile_metrics.result_data_bytes, _row_hits,
370
260
            _reusable->rs_column_uid(),
371
260
            _profile_metrics.read_stats.file_cache_stats.bytes_read_from_local,
372
260
            _profile_metrics.read_stats.file_cache_stats.bytes_read_from_remote,
373
260
            _profile_metrics.read_stats.file_cache_stats.local_io_timer,
374
260
            _profile_metrics.read_stats.file_cache_stats.remote_io_timer,
375
260
            _profile_metrics.read_stats.file_cache_stats.write_cache_io_timer);
376
377
260
    constexpr static int kSlowThreholdUs = 50 * 1000; // 50ms
378
260
    if (total_us > kSlowThreholdUs) {
379
2
        LOG(WARNING) << "slow query, " << stats_str;
380
258
    } else if (VLOG_DEBUG_IS_ON) {
381
0
        VLOG_DEBUG << stats_str;
382
258
    } else {
383
258
        LOG_EVERY_N(INFO, 1000) << stats_str;
384
258
    }
385
260
}
386
387
263
// Converts the key tuples of `request` into encoded primary keys, one per
// entry of _row_read_ctxs (which is resized to the number of key tuples).
// Returns the first error reported while initializing a row cursor.
Status PointQueryExecutor::_init_keys(const PTabletKeyLookupRequest* request) {
    SCOPED_TIMER(&_profile_metrics.init_key_ns);

    // 1. Copy the textual key columns of every request tuple into an OlapTuple.
    std::vector<OlapTuple> key_tuples;
    key_tuples.reserve(request->key_tuples().size());
    for (const KeyTuple& pb_tuple : request->key_tuples()) {
        OlapTuple& tuple = key_tuples.emplace_back();
        for (const std::string& key_col : pb_tuple.key_column_rep()) {
            tuple.add_value(key_col);
        }
    }

    // 2. Turn each tuple into a row cursor and encode it as a padded primary key.
    _row_read_ctxs.resize(key_tuples.size());
    for (size_t idx = 0; idx < key_tuples.size(); ++idx) {
        const OlapTuple& tuple = key_tuples[idx];
        RowCursor cursor;
        RETURN_IF_ERROR(cursor.init_scan_key(_tablet->tablet_schema(), tuple.values()));
        RETURN_IF_ERROR(cursor.from_tuple(tuple));
        cursor.encode_key_with_padding<true>(&_row_read_ctxs[idx]._primary_key,
                                             _tablet->tablet_schema()->num_key_columns(), true);
    }
    return Status::OK();
}
409
410
263
// Stage 2 of a point query: resolves every encoded primary key either to a row
// cache entry or to a row location inside a rowset.
//
// For each read context: on a row cache hit the cache handle is stored and the
// key is done; otherwise the tablet is asked for the row location. Keys that
// are not found are skipped; for found keys the location and an acquired
// rowset are stored in the context and _row_hits is incremented.
Status PointQueryExecutor::_lookup_row_key() {
    SCOPED_TIMER(&_profile_metrics.lookup_key_ns);

    // Snapshot read: sync the rowsets up to the requested version first.
    if (_version >= 0) {
        CHECK(config::is_cloud_mode()) << "Only cloud mode support snapshot read at present";
        SyncOptions sync_opts;
        sync_opts.query_version = _version;
        RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(_tablet)->sync_rowsets(sync_opts));
    }

    // Capture the rowsets to search while holding the header lock (shared).
    std::vector<RowsetSharedPtr> candidate_rowsets;
    {
        std::shared_lock header_guard(_tablet->get_header_lock());
        candidate_rowsets = _tablet->get_rowset_by_ids(nullptr);
    }
    std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(candidate_rowsets.size());

    for (auto& ctx : _row_read_ctxs) {
        // Try the row cache first unless it is disabled by config.
        if (!config::disable_storage_row_cache) {
            RowCache::CacheHandle cached_row;
            const bool row_cached = RowCache::instance()->lookup(
                    {_tablet->tablet_id(), ctx._primary_key}, &cached_row);
            if (row_cached) {
                ctx._cached_row_data = std::move(cached_row);
                ++_profile_metrics.row_cache_hits;
                continue;
            }
        }

        // Get the row location and rowset; ctx._rowset_ptr takes over the
        // holder below once the rowset has been acquired.
        RowLocation location;
        auto rowset_holder = std::make_unique<RowsetSharedPtr>();
        Status st = _tablet->lookup_row_key(ctx._primary_key, nullptr, false, candidate_rowsets,
                                            &location, INT32_MAX /*rethink?*/, segment_caches,
                                            rowset_holder.get(), false, nullptr,
                                            &_profile_metrics.read_stats);
        if (st.is<ErrorCode::KEY_NOT_FOUND>()) {
            continue;
        }
        RETURN_IF_ERROR(st);
        ctx._row_location = location;

        // Acquire the rowset and wrap it so release_rowset runs when the
        // context drops it.
        (*rowset_holder)->acquire();
        VLOG_DEBUG << "aquire rowset " << (*rowset_holder)->rowset_id();
        ctx._rowset_ptr = std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)>(
                rowset_holder.release(), &release_rowset);
        _row_hits++;
    }
    return Status::OK();
}
458
459
263
// Stage 3 of a point query: fills _result_block with the values of every row
// located by _lookup_row_key().
//
// Per read context:
//   * row cache hit  -> decode the cached jsonb row into the block;
//   * no location    -> key was not found, nothing to do;
//   * otherwise      -> read the row store value (when the table has a row
//                       store column) and decode it, then read any columns
//                       missing from the row store directly from the segment.
// Afterwards columns that received no value are padded with defaults, and if
// the block has a delete sign column, rows are filtered by it.
//
// Returns an error when a read/decode step fails, when missing columns exist
// but column store access is not enabled, when the rowset or segment of a
// located row cannot be found, or when only part of the rows are deleted
// (not implemented).
Status PointQueryExecutor::_lookup_row_data() {
    // 3. get values
    SCOPED_TIMER(&_profile_metrics.lookup_data_ns);
    for (size_t i = 0; i < _row_read_ctxs.size(); ++i) {
        auto& ctx = _row_read_ctxs[i];
        if (ctx._cached_row_data.valid()) {
            RETURN_IF_ERROR(JsonbSerializeUtil::jsonb_to_block(
                    _reusable->get_data_type_serdes(), ctx._cached_row_data.data().data,
                    ctx._cached_row_data.data().size, _reusable->get_col_uid_to_idx(),
                    *_result_block, _reusable->get_col_default_values(),
                    _reusable->include_col_uids()));
            continue;
        }
        if (!ctx._row_location.has_value()) {
            // Key not found in _lookup_row_key().
            continue;
        }
        std::string value;
        // fill block by row store
        if (_reusable->rs_column_uid() != -1) {
            bool use_row_cache = !config::disable_storage_row_cache;
            RETURN_IF_ERROR(_tablet->lookup_row_data(ctx._primary_key, ctx._row_location.value(),
                                                     *(ctx._rowset_ptr),
                                                     _profile_metrics.read_stats, value,
                                                     use_row_cache));
            // serialize value to block, currently only jsonb row format
            RETURN_IF_ERROR(JsonbSerializeUtil::jsonb_to_block(
                    _reusable->get_data_type_serdes(), value.data(), value.size(),
                    _reusable->get_col_uid_to_idx(), *_result_block,
                    _reusable->get_col_default_values(), _reusable->include_col_uids()));
        }
        if (!_reusable->missing_col_uids().empty()) {
            if (!_reusable->runtime_state()->enable_short_circuit_query_access_column_store()) {
                std::string missing_columns;
                for (int cid : _reusable->missing_col_uids()) {
                    missing_columns += _tablet->tablet_schema()->column_by_uid(cid).name() + ",";
                }
                return Status::InternalError(
                        "Not support column store, set store_row_column=true or row_store_columns "
                        "in table "
                        "properties, missing columns: " +
                        missing_columns + " should be added to row store");
            }
            // fill missing columns by column store
            RowLocation row_loc = ctx._row_location.value();
            BetaRowsetSharedPtr rowset =
                    std::static_pointer_cast<BetaRowset>(_tablet->get_rowset(row_loc.rowset_id));
            if (rowset == nullptr) {
                // Do not pass a null rowset on to the segment loader.
                return Status::InternalError(fmt::format(
                        "rowset of the located row is not found in tablet {}",
                        _tablet->tablet_id()));
            }
            SegmentCacheHandle segment_cache;
            {
                SCOPED_TIMER(&_profile_metrics.load_segment_data_stage_ns);
                RETURN_IF_ERROR(
                        SegmentLoader::instance()->load_segments(rowset, &segment_cache, true));
            }
            // find segment
            const auto& segments = segment_cache.get_segments();
            auto it = std::find_if(segments.cbegin(), segments.cend(),
                                   [&](const segment_v2::SegmentSharedPtr& seg) {
                                       return seg->id() == row_loc.segment_id;
                                   });
            if (it == segments.cend()) {
                // Dereferencing the end iterator would be undefined behaviour.
                return Status::InternalError(
                        fmt::format("segment {} of the located row is not found in tablet {}",
                                    row_loc.segment_id, _tablet->tablet_id()));
            }
            const auto& segment = *it;
            for (int cid : _reusable->missing_col_uids()) {
                int pos = _reusable->get_col_uid_to_idx().at(cid);
                auto row_id = static_cast<segment_v2::rowid_t>(row_loc.row_id);
                MutableColumnPtr column =
                        _result_block->get_by_position(pos).column->assume_mutable();
                std::unique_ptr<ColumnIterator> iter;
                SlotDescriptor* slot = _reusable->tuple_desc()->slots()[pos];
                StorageReadOptions storage_read_options;
                // NOTE(review): other reads in this function report into
                // _profile_metrics.read_stats; confirm _read_stats is intended here.
                storage_read_options.stats = &_read_stats;
                storage_read_options.io_ctx.reader_type = ReaderType::READER_QUERY;
                RETURN_IF_ERROR(segment->seek_and_read_by_rowid(*_tablet->tablet_schema(), slot,
                                                                row_id, column,
                                                                storage_read_options, iter));
                if (_tablet->tablet_schema()
                            ->column_by_uid(slot->col_unique_id())
                            .has_char_type()) {
                    column->shrink_padding_chars();
                }
            }
        }
    }
    if (_result_block->columns() > _reusable->include_col_uids().size()) {
        // Padding rows for some columns that no need to output to mysql client
        // eg. SELECT k1,v1,v2 FROM TABLE WHERE k1 = 1, k1 is not in output slots, tuple as below
        // TupleDescriptor{id=1, tbl=table_with_column_group}
        // SlotDescriptor{id=8, col=v1, colUniqueId=1 ...}
        // SlotDescriptor{id=9, col=v2, colUniqueId=2 ...}
        // thus missing in include_col_uids and missing_col_uids
        for (size_t i = 0; i < _result_block->columns(); ++i) {
            auto column = _result_block->get_by_position(i).column;
            int padding_rows = _row_hits - cast_set<int>(column->size());
            if (padding_rows > 0) {
                column->assume_mutable()->insert_many_defaults(padding_rows);
            }
        }
    }
    // filter rows by delete sign
    if (_row_hits > 0 && _reusable->delete_sign_idx() != -1) {
        size_t filtered = 0;
        size_t total = 0;
        {
            // clear_column_data will check reference of ColumnPtr, so we need to release
            // reference before clear_column_data
            ColumnPtr delete_filter_columns =
                    _result_block->get_columns()[_reusable->delete_sign_idx()];
            const auto& filter =
                    assert_cast<const ColumnInt8*>(delete_filter_columns.get())->get_data();
            // Rows with a non-zero delete sign are the deleted ones.
            filtered = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
            total = filter.size();
        }

        if (filtered == total) {
            // Every row is deleted: return an empty block.
            _result_block->clear_column_data();
        } else if (filtered > 0) {
            return Status::NotSupported("Not implemented since only single row at present");
        }
    }
    return Status::OK();
}
577
578
249
// Thrift-serializes the result batch held by `res` and stores the bytes in the
// row_batch field of `response`. Returns the serializer's error on failure.
Status serialize_block(std::shared_ptr<TFetchDataResult> res, PTabletKeyLookupResponse* response) {
    ThriftSerializer serializer(false, 4096);
    uint8_t* bytes = nullptr;
    uint32_t byte_count = 0;
    RETURN_IF_ERROR(serializer.serialize(&(res->result_batch), &byte_count, &bytes));

    // Copy the serialized bytes into the response as a string.
    std::string row_batch(reinterpret_cast<const char*>(bytes), byte_count);
    response->set_row_batch(row_batch);
    return Status::OK();
}
586
587
260
// Stage 4 of a point query: evaluates the output expressions over the result
// block, serializes the rows in MySQL format into the response (or marks the
// response as an empty batch when the block has no rows), records the result
// size and gives the block back to the Reusable.
Status PointQueryExecutor::_output_data() {
    // 4. exprs exec and serialize to mysql row batches
    SCOPED_TIMER(&_profile_metrics.output_data_ns);

    const bool block_is_empty = (_result_block->rows() == 0);
    if (block_is_empty) {
        _response->set_empty_batch(true);
    } else {
        RuntimeState buffer_state;
        auto result_buffer = std::make_shared<PointQueryResultBlockBuffer>(&buffer_state);
        // TODO reuse mysql_writer
        VMysqlResultWriter writer(result_buffer, _reusable->output_exprs(), nullptr,
                                  _binary_row_format);
        RETURN_IF_ERROR(writer.init(_reusable->runtime_state()));
        _result_block->clear_names();
        RETURN_IF_ERROR(writer.write(_reusable->runtime_state(), *_result_block));
        // Take the single written batch out of the buffer and put it in the response.
        RETURN_IF_ERROR(serialize_block(result_buffer->get_block(), _response));
        VLOG_DEBUG << "dump block " << _result_block->dump_data();
    }

    _profile_metrics.result_data_bytes = _result_block->bytes();
    _reusable->return_block(_result_block);
    return Status::OK();
}
608
609
#include "common/compile_check_end.h"
610
611
} // namespace doris