Coverage Report

Created: 2026-06-01 13:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/rowid_fetcher.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/rowid_fetcher.h"
19
20
#include <brpc/callback.h>
21
#include <butil/endpoint.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/data.pb.h>
24
#include <gen_cpp/internal_service.pb.h>
25
#include <gen_cpp/olap_file.pb.h>
26
#include <gen_cpp/types.pb.h>
27
#include <glog/logging.h>
28
#include <stddef.h>
29
#include <stdint.h>
30
31
#include <algorithm>
32
#include <cstdint>
33
#include <memory>
34
#include <ostream>
35
#include <string>
36
#include <unordered_map>
37
#include <utility>
38
#include <vector>
39
40
#include "bthread/countdown_event.h"
41
#include "common/config.h"
42
#include "common/consts.h"
43
#include "common/exception.h"
44
#include "common/signal_handler.h"
45
#include "core/assert_cast.h"
46
#include "core/block/block.h" // Block
47
#include "core/column/column.h"
48
#include "core/column/column_nullable.h"
49
#include "core/column/column_string.h"
50
#include "core/data_type/data_type_struct.h"
51
#include "core/data_type_serde/data_type_serde.h"
52
#include "core/string_ref.h"
53
#include "exec/scan/file_scanner.h"
54
#include "format/orc/vorc_reader.h"
55
#include "format/parquet/vparquet_reader.h"
56
#include "runtime/descriptors.h"
57
#include "runtime/exec_env.h"      // ExecEnv
58
#include "runtime/fragment_mgr.h"  // FragmentMgr
59
#include "runtime/runtime_state.h" // RuntimeState
60
#include "runtime/workload_group/workload_group_manager.h"
61
#include "semaphore"
62
#include "storage/olap_common.h"
63
#include "storage/rowset/beta_rowset.h"
64
#include "storage/segment/column_reader.h"
65
#include "storage/storage_engine.h"
66
#include "storage/tablet/tablet_fwd.h"
67
#include "storage/tablet/tablet_schema.h"
68
#include "storage/tablet_info.h" // DorisNodesInfo
69
#include "storage/utils.h"
70
#include "util/brpc_client_cache.h" // BrpcClientCache
71
#include "util/defer_op.h"
72
#include "util/jsonb/serialize.h"
73
74
namespace doris {
75
76
0
Status RowIDFetcher::init() {
77
0
    DorisNodesInfo nodes_info;
78
0
    nodes_info.setNodes(_fetch_option.t_fetch_opt.nodes_info);
79
0
    for (auto [node_id, node_info] : nodes_info.nodes_info()) {
80
0
        auto client = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
81
0
                node_info.host, node_info.brpc_port);
82
0
        if (!client) {
83
0
            LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host
84
0
                         << ", port=" << node_info.brpc_port;
85
0
            return Status::InternalError("RowIDFetcher failed to init rpc client, host={}, port={}",
86
0
                                         node_info.host, node_info.brpc_port);
87
0
        }
88
0
        _stubs.push_back(client);
89
0
    }
90
0
    return Status::OK();
91
0
}
92
93
0
PMultiGetRequest RowIDFetcher::_init_fetch_request(const ColumnString& row_locs) const {
94
0
    PMultiGetRequest mget_req;
95
0
    _fetch_option.desc->to_protobuf(mget_req.mutable_desc());
96
0
    for (SlotDescriptor* slot : _fetch_option.desc->slots()) {
97
        // ignore rowid
98
0
        if (slot->col_name() == BeConsts::ROWID_COL) {
99
0
            continue;
100
0
        }
101
0
        slot->to_protobuf(mget_req.add_slots());
102
0
    }
103
0
    for (size_t i = 0; i < row_locs.size(); ++i) {
104
0
        PRowLocation row_loc;
105
0
        StringRef row_id_rep = row_locs.get_data_at(i);
106
        // TODO: When transferring data between machines with different byte orders (endianness),
107
        // not performing proper handling may lead to issues in parsing and exchanging the data.
108
0
        auto location = reinterpret_cast<const GlobalRowLoacation*>(row_id_rep.data);
109
0
        row_loc.set_tablet_id(location->tablet_id);
110
0
        row_loc.set_rowset_id(location->row_location.rowset_id.to_string());
111
0
        row_loc.set_segment_id(location->row_location.segment_id);
112
0
        row_loc.set_ordinal_id(location->row_location.row_id);
113
0
        *mget_req.add_row_locs() = std::move(row_loc);
114
0
    }
115
    // Set column desc
116
0
    for (const TColumn& tcolumn : _fetch_option.t_fetch_opt.column_desc) {
117
0
        TabletColumn column(tcolumn);
118
0
        column.to_schema_pb(mget_req.add_column_desc());
119
0
    }
120
0
    PUniqueId& query_id = *mget_req.mutable_query_id();
121
0
    query_id.set_hi(_fetch_option.runtime_state->query_id().hi);
122
0
    query_id.set_lo(_fetch_option.runtime_state->query_id().lo);
123
0
    mget_req.set_be_exec_version(_fetch_option.runtime_state->be_exec_version());
124
0
    mget_req.set_fetch_row_store(_fetch_option.t_fetch_opt.fetch_row_store);
125
0
    return mget_req;
126
0
}
127
128
Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request,
129
                                        const std::vector<PMultiGetResponse>& rsps,
130
                                        const std::vector<brpc::Controller>& cntls,
131
                                        Block* output_block,
132
0
                                        std::vector<PRowLocation>* rows_id) const {
133
0
    output_block->clear();
134
0
    for (const auto& cntl : cntls) {
135
0
        if (cntl.Failed()) {
136
0
            LOG(WARNING) << "Failed to fetch meet rpc error:" << cntl.ErrorText()
137
0
                         << ", host:" << cntl.remote_side();
138
0
            return Status::InternalError(cntl.ErrorText());
139
0
        }
140
0
    }
141
0
    DataTypeSerDeSPtrs serdes;
142
0
    std::unordered_map<uint32_t, uint32_t> col_uid_to_idx;
143
0
    std::vector<std::string> default_values;
144
0
    default_values.resize(_fetch_option.desc->slots().size());
145
0
    auto merge_function = [&](const PMultiGetResponse& resp) {
146
0
        Status st(Status::create(resp.status()));
147
0
        if (!st.ok()) {
148
0
            LOG(WARNING) << "Failed to fetch " << st.to_string();
149
0
            return st;
150
0
        }
151
0
        for (const PRowLocation& row_id : resp.row_locs()) {
152
0
            rows_id->push_back(row_id);
153
0
        }
154
        // Merge binary rows
155
0
        if (request.fetch_row_store()) {
156
0
            CHECK(resp.row_locs().size() == resp.binary_row_data_size());
157
0
            if (output_block->is_empty_column()) {
158
0
                *output_block = Block(_fetch_option.desc->slots(), 1);
159
0
            }
160
0
            if (serdes.empty() && col_uid_to_idx.empty()) {
161
0
                serdes = create_data_type_serdes(_fetch_option.desc->slots());
162
0
                for (int i = 0; i < _fetch_option.desc->slots().size(); ++i) {
163
0
                    col_uid_to_idx[_fetch_option.desc->slots()[i]->col_unique_id()] = i;
164
0
                    default_values[i] = _fetch_option.desc->slots()[i]->col_default_value();
165
0
                }
166
0
            }
167
0
            auto output_columns_guard = output_block->mutate_columns_scoped();
168
0
            MutableColumns& output_columns = output_columns_guard.mutable_columns();
169
0
            for (int i = 0; i < resp.binary_row_data_size(); ++i) {
170
0
                RETURN_IF_ERROR(JsonbSerializeUtil::jsonb_to_columns(
171
0
                        serdes, resp.binary_row_data(i).data(), resp.binary_row_data(i).size(),
172
0
                        col_uid_to_idx, output_columns, default_values, {}));
173
0
            }
174
0
            return Status::OK();
175
0
        }
176
        // Merge partial blocks
177
0
        Block partial_block;
178
0
        [[maybe_unused]] size_t uncompressed_size = 0;
179
0
        [[maybe_unused]] int64_t uncompressed_time = 0;
180
181
0
        RETURN_IF_ERROR(
182
0
                partial_block.deserialize(resp.block(), &uncompressed_size, &uncompressed_time));
183
0
        if (partial_block.is_empty_column()) {
184
0
            return Status::OK();
185
0
        }
186
0
        CHECK(resp.row_locs().size() == partial_block.rows());
187
0
        if (output_block->is_empty_column()) {
188
0
            output_block->swap(partial_block);
189
0
        } else if (partial_block.columns() != output_block->columns()) {
190
0
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
191
0
                    "Merge block not match, self:[{}], input:[{}], ", output_block->dump_types(),
192
0
                    partial_block.dump_types());
193
0
        } else {
194
0
            for (int i = 0; i < output_block->columns(); ++i) {
195
0
                auto column_guard = output_block->mutate_column_scoped(i);
196
0
                MutableColumnPtr& column = column_guard.mutable_column();
197
0
                column->insert_range_from(
198
0
                        *partial_block.get_by_position(i).column->convert_to_full_column_if_const(),
199
0
                        0, partial_block.rows());
200
0
            }
201
0
        }
202
0
        return Status::OK();
203
0
    };
204
205
0
    for (const auto& resp : rsps) {
206
0
        RETURN_IF_ERROR(merge_function(resp));
207
0
    }
208
0
    return Status::OK();
209
0
}
210
211
0
Status RowIDFetcher::fetch(const ColumnPtr& column_row_ids, Block* res_block) {
212
0
    CHECK(!_stubs.empty());
213
0
    PMultiGetRequest mget_req = _init_fetch_request(
214
0
            assert_cast<const ColumnString&>(*remove_nullable(column_row_ids).get()));
215
0
    std::vector<PMultiGetResponse> resps(_stubs.size());
216
0
    std::vector<brpc::Controller> cntls(_stubs.size());
217
0
    bthread::CountdownEvent counter(cast_set<int>(_stubs.size()));
218
0
    for (size_t i = 0; i < _stubs.size(); ++i) {
219
0
        cntls[i].set_timeout_ms(_fetch_option.runtime_state->execution_timeout() * 1000);
220
0
        auto callback = brpc::NewCallback(fetch_callback, &counter);
221
0
        _stubs[i]->multiget_data(&cntls[i], &mget_req, &resps[i], callback);
222
0
    }
223
0
    counter.wait();
224
225
    // Merge
226
0
    std::vector<PRowLocation> rows_locs;
227
0
    rows_locs.reserve(rows_locs.size());
228
0
    RETURN_IF_ERROR(_merge_rpc_results(mget_req, resps, cntls, res_block, &rows_locs));
229
0
    if (rows_locs.size() < column_row_ids->size()) {
230
0
        return Status::InternalError("Miss matched return row loc count {}, expected {}, input {}",
231
0
                                     rows_locs.size(), res_block->rows(), column_row_ids->size());
232
0
    }
233
    // Final sort by row_ids sequence, since row_ids is already sorted if need
234
0
    std::map<GlobalRowLoacation, size_t> positions;
235
0
    for (size_t i = 0; i < rows_locs.size(); ++i) {
236
0
        RowsetId rowset_id;
237
0
        rowset_id.init(rows_locs[i].rowset_id());
238
0
        GlobalRowLoacation grl(rows_locs[i].tablet_id(), rowset_id,
239
0
                               cast_set<uint32_t>(rows_locs[i].segment_id()),
240
0
                               cast_set<uint32_t>(rows_locs[i].ordinal_id()));
241
0
        positions[grl] = i;
242
0
    };
243
    // TODO remove this warning code
244
0
    if (positions.size() < rows_locs.size()) {
245
0
        LOG(WARNING) << "cwntains duplicated row entry";
246
0
    }
247
0
    IColumn::Permutation permutation;
248
0
    permutation.reserve(column_row_ids->size());
249
0
    for (size_t i = 0; i < column_row_ids->size(); ++i) {
250
0
        auto location =
251
0
                reinterpret_cast<const GlobalRowLoacation*>(column_row_ids->get_data_at(i).data);
252
0
        permutation.push_back(positions[*location]);
253
0
    }
254
0
    for (size_t i = 0; i < res_block->columns(); ++i) {
255
0
        res_block->get_by_position(i).column =
256
0
                res_block->get_by_position(i).column->permute(permutation, permutation.size());
257
0
    }
258
    // Check row consistency
259
0
    RETURN_IF_CATCH_EXCEPTION(res_block->check_number_of_rows());
260
0
    VLOG_DEBUG << "dump block:" << res_block->dump_data(0, 10);
261
0
    return Status::OK();
262
0
}
263
264
struct IteratorKey {
265
    int64_t tablet_id;
266
    RowsetId rowset_id;
267
    uint64_t segment_id;
268
    int slot_id;
269
270
    // unordered map std::equal_to
271
6
    bool operator==(const IteratorKey& rhs) const {
272
6
        return tablet_id == rhs.tablet_id && rowset_id == rhs.rowset_id &&
273
6
               segment_id == rhs.segment_id && slot_id == rhs.slot_id;
274
6
    }
275
};
276
277
struct SegKey {
278
    int64_t tablet_id;
279
    RowsetId rowset_id;
280
    uint64_t segment_id;
281
282
    // unordered map std::equal_to
283
6
    bool operator==(const SegKey& rhs) const {
284
6
        return tablet_id == rhs.tablet_id && rowset_id == rhs.rowset_id &&
285
6
               segment_id == rhs.segment_id;
286
6
    }
287
};
288
289
struct HashOfSegKey {
290
24
    size_t operator()(const SegKey& key) const {
291
24
        size_t seed = 0;
292
24
        seed = HashUtil::hash64(&key.tablet_id, sizeof(key.tablet_id), seed);
293
24
        seed = HashUtil::hash64(&key.rowset_id.hi, sizeof(key.rowset_id.hi), seed);
294
24
        seed = HashUtil::hash64(&key.rowset_id.mi, sizeof(key.rowset_id.mi), seed);
295
24
        seed = HashUtil::hash64(&key.rowset_id.lo, sizeof(key.rowset_id.lo), seed);
296
24
        seed = HashUtil::hash64(&key.segment_id, sizeof(key.segment_id), seed);
297
24
        return seed;
298
24
    }
299
};
300
301
struct HashOfIteratorKey {
302
12
    size_t operator()(const IteratorKey& key) const {
303
12
        size_t seed = 0;
304
12
        seed = HashUtil::hash64(&key.tablet_id, sizeof(key.tablet_id), seed);
305
12
        seed = HashUtil::hash64(&key.rowset_id.hi, sizeof(key.rowset_id.hi), seed);
306
12
        seed = HashUtil::hash64(&key.rowset_id.mi, sizeof(key.rowset_id.mi), seed);
307
12
        seed = HashUtil::hash64(&key.rowset_id.lo, sizeof(key.rowset_id.lo), seed);
308
12
        seed = HashUtil::hash64(&key.segment_id, sizeof(key.segment_id), seed);
309
12
        seed = HashUtil::hash64(&key.slot_id, sizeof(key.slot_id), seed);
310
12
        return seed;
311
12
    }
312
};
313
314
struct IteratorItem {
315
    std::unique_ptr<ColumnIterator> iterator;
316
    SegmentSharedPtr segment;
317
    // for holding the reference of storage read options to avoid use after release
318
    StorageReadOptions storage_read_options;
319
};
320
321
struct SegItem {
322
    BaseTabletSPtr tablet;
323
    BetaRowsetSharedPtr rowset;
324
    // for holding the reference of segment to avoid use after release
325
    SegmentSharedPtr segment;
326
};
327
328
// Groups all row_ids belonging to the same segment for batched reading.
329
// Position index tracks where each row_id originated in the original request,
330
// so results can be scattered back to the correct output positions.
331
struct DorisFormatReadBatch {
332
    std::shared_ptr<FileMapping> file_mapping;
333
    // (row_id, index_in_request) pairs for all rows in this segment.
334
    std::vector<std::pair<segment_v2::rowid_t, size_t>> row_ids_with_positions;
335
};
336
337
static void scatter_scan_blocks_to_result_block(
338
        const std::vector<std::pair<size_t, size_t>>& row_id_block_idx,
339
2.06k
        const std::vector<Block>& scan_blocks, Block& result_block) {
340
11.2k
    for (size_t column_id = 0; column_id < result_block.columns(); ++column_id) {
341
9.15k
        auto dst_col_guard = result_block.mutate_column_scoped(column_id);
342
9.15k
        MutableColumnPtr& dst_col = dst_col_guard.mutable_column();
343
344
9.15k
        std::vector<const IColumn*> scan_src_columns;
345
9.15k
        scan_src_columns.reserve(row_id_block_idx.size());
346
9.15k
        std::vector<size_t> scan_positions;
347
9.15k
        scan_positions.reserve(row_id_block_idx.size());
348
74.5k
        for (const auto& [pos_block, block_idx] : row_id_block_idx) {
349
74.5k
            DCHECK(scan_blocks.size() > pos_block);
350
74.5k
            DCHECK(scan_blocks[pos_block].columns() > column_id);
351
74.5k
            scan_src_columns.emplace_back(
352
74.5k
                    scan_blocks[pos_block].get_by_position(column_id).column.get());
353
74.5k
            scan_positions.emplace_back(block_idx);
354
74.5k
        }
355
9.15k
        dst_col->insert_from_multi_column(scan_src_columns, scan_positions);
356
9.15k
    }
357
2.06k
}
358
359
Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request,
360
0
                                          PMultiGetResponse* response) {
361
    // read from storage engine row id by row id
362
0
    OlapReaderStatistics stats;
363
0
    Block result_block;
364
0
    int64_t acquire_tablet_ms = 0;
365
0
    int64_t acquire_rowsets_ms = 0;
366
0
    int64_t acquire_segments_ms = 0;
367
0
    int64_t lookup_row_data_ms = 0;
368
369
    // init desc
370
0
    std::vector<SlotDescriptor> slots;
371
0
    slots.reserve(request.slots().size());
372
0
    for (const auto& pslot : request.slots()) {
373
0
        slots.push_back(SlotDescriptor(pslot));
374
0
    }
375
376
    // init read schema
377
0
    TabletSchema full_read_schema;
378
0
    for (const ColumnPB& column_pb : request.column_desc()) {
379
0
        full_read_schema.append_column(TabletColumn(column_pb));
380
0
    }
381
382
0
    std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey> iterator_map;
383
    // read row by row
384
0
    for (int i = 0; i < request.row_locs_size(); ++i) {
385
0
        const auto& row_loc = request.row_locs(i);
386
0
        MonotonicStopWatch watch;
387
0
        watch.start();
388
0
        BaseTabletSPtr tablet = scope_timer_run(
389
0
                [&]() {
390
0
                    auto res = ExecEnv::get_tablet(row_loc.tablet_id(), nullptr, true);
391
0
                    return !res.has_value() ? nullptr
392
0
                                            : std::dynamic_pointer_cast<BaseTablet>(res.value());
393
0
                },
394
0
                &acquire_tablet_ms);
395
0
        RowsetId rowset_id;
396
0
        rowset_id.init(row_loc.rowset_id());
397
0
        if (!tablet) {
398
0
            continue;
399
0
        }
400
        // We ensured it's rowset is not released when init Tablet reader param, rowset->update_delayed_expired_timestamp();
401
0
        BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(scope_timer_run(
402
0
                [&]() {
403
0
                    return ExecEnv::GetInstance()->storage_engine().get_quering_rowset(rowset_id);
404
0
                },
405
0
                &acquire_rowsets_ms));
406
0
        if (!rowset) {
407
0
            LOG(INFO) << "no such rowset " << rowset_id;
408
0
            continue;
409
0
        }
410
0
        size_t row_size = 0;
411
0
        Defer _defer([&]() {
412
0
            LOG_EVERY_N(INFO, 100)
413
0
                    << "multiget_data single_row, cost(us):" << watch.elapsed_time() / 1000
414
0
                    << ", row_size:" << row_size;
415
0
            *response->add_row_locs() = row_loc;
416
0
        });
417
        // TODO: supoort session variable enable_page_cache and disable_file_cache if necessary.
418
0
        SegmentCacheHandle segment_cache;
419
0
        RETURN_IF_ERROR(scope_timer_run(
420
0
                [&]() {
421
0
                    return SegmentLoader::instance()->load_segments(rowset, &segment_cache, true);
422
0
                },
423
0
                &acquire_segments_ms));
424
        // find segment
425
0
        auto it = std::find_if(segment_cache.get_segments().cbegin(),
426
0
                               segment_cache.get_segments().cend(),
427
0
                               [&row_loc](const segment_v2::SegmentSharedPtr& seg) {
428
0
                                   return seg->id() == row_loc.segment_id();
429
0
                               });
430
0
        if (it == segment_cache.get_segments().end()) {
431
0
            continue;
432
0
        }
433
0
        segment_v2::SegmentSharedPtr segment = *it;
434
0
        GlobalRowLoacation row_location(row_loc.tablet_id(), rowset->rowset_id(),
435
0
                                        cast_set<uint32_t>(row_loc.segment_id()),
436
0
                                        cast_set<uint32_t>(row_loc.ordinal_id()));
437
        // fetch by row store, more effcient way
438
0
        if (request.fetch_row_store()) {
439
0
            if (!tablet->tablet_schema()->has_row_store_for_all_columns()) {
440
0
                return Status::InternalError("Tablet {} does not have row store for all columns",
441
0
                                             tablet->tablet_id());
442
0
            }
443
0
            RowLocation loc(rowset_id, segment->id(), cast_set<uint32_t>(row_loc.ordinal_id()));
444
0
            std::string* value = response->add_binary_row_data();
445
0
            RETURN_IF_ERROR(scope_timer_run(
446
0
                    [&]() { return tablet->lookup_row_data({}, loc, rowset, stats, *value); },
447
0
                    &lookup_row_data_ms));
448
0
            row_size = value->size();
449
0
            continue;
450
0
        }
451
452
        // fetch by column store
453
0
        if (result_block.is_empty_column()) {
454
0
            result_block = Block(slots, request.row_locs().size());
455
0
        }
456
0
        VLOG_DEBUG << "Read row location "
457
0
                   << fmt::format("{}, {}, {}, {}", row_location.tablet_id,
458
0
                                  row_location.row_location.rowset_id.to_string(),
459
0
                                  row_location.row_location.segment_id,
460
0
                                  row_location.row_location.row_id);
461
0
        for (int x = 0; x < slots.size(); ++x) {
462
0
            std::vector<segment_v2::rowid_t> row_ids {
463
0
                    static_cast<segment_v2::rowid_t>(row_loc.ordinal_id())};
464
0
            MutableColumnPtr column = result_block.get_by_position(x).column->assert_mutable();
465
0
            IteratorKey iterator_key {.tablet_id = tablet->tablet_id(),
466
0
                                      .rowset_id = rowset_id,
467
0
                                      .segment_id = row_loc.segment_id(),
468
0
                                      .slot_id = slots[x].id()};
469
0
            IteratorItem& iterator_item = iterator_map[iterator_key];
470
0
            if (iterator_item.segment == nullptr) {
471
                // hold the reference
472
0
                iterator_map[iterator_key].segment = segment;
473
0
                iterator_item.storage_read_options.stats = &stats;
474
0
                iterator_item.storage_read_options.io_ctx.reader_type = ReaderType::READER_QUERY;
475
0
            }
476
0
            segment = iterator_item.segment;
477
0
            RETURN_IF_ERROR(segment->seek_and_read_by_rowid(
478
0
                    full_read_schema, &slots[x], row_ids, column,
479
0
                    iterator_item.storage_read_options, iterator_item.iterator));
480
0
        }
481
0
    }
482
    // serialize block if not empty
483
0
    if (!result_block.is_empty_column()) {
484
0
        VLOG_DEBUG << "dump block:" << result_block.dump_data(0, 10)
485
0
                   << ", be_exec_version:" << request.be_exec_version();
486
0
        [[maybe_unused]] size_t compressed_size = 0;
487
0
        [[maybe_unused]] size_t uncompressed_size = 0;
488
0
        [[maybe_unused]] int64_t compress_time = 0;
489
0
        int be_exec_version = request.has_be_exec_version() ? request.be_exec_version() : 0;
490
0
        RETURN_IF_ERROR(result_block.serialize(be_exec_version, response->mutable_block(),
491
0
                                               &uncompressed_size, &compressed_size, &compress_time,
492
0
                                               segment_v2::CompressionTypePB::LZ4));
493
0
    }
494
495
0
    LOG(INFO) << "Query stats: "
496
0
              << fmt::format(
497
0
                         "query_id:{}, "
498
0
                         "hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, "
499
0
                         "io_latency:{}ns, "
500
0
                         "uncompressed_bytes_read:{},"
501
0
                         "bytes_read:{},"
502
0
                         "acquire_tablet_ms:{}, acquire_rowsets_ms:{}, acquire_segments_ms:{}, "
503
0
                         "lookup_row_data_ms:{}",
504
0
                         print_id(request.query_id()), stats.cached_pages_num,
505
0
                         stats.total_pages_num, stats.compressed_bytes_read, stats.io_ns,
506
0
                         stats.uncompressed_bytes_read, stats.bytes_read, acquire_tablet_ms,
507
0
                         acquire_rowsets_ms, acquire_segments_ms, lookup_row_data_ms);
508
0
    return Status::OK();
509
0
}
510
511
Status RowIdStorageReader::read_by_rowids(const PMultiGetRequestV2& request,
512
2.13k
                                          PMultiGetResponseV2* response) {
513
2.13k
    if (request.request_block_descs_size()) {
514
2.13k
        auto tquery_id = ((UniqueId)request.query_id()).to_thrift();
515
        // todo: use mutableBlock instead of block
516
2.13k
        std::vector<Block> result_blocks(request.request_block_descs_size());
517
518
2.13k
        OlapReaderStatistics stats;
519
2.13k
        int64_t acquire_tablet_ms = 0;
520
2.13k
        int64_t acquire_rowsets_ms = 0;
521
2.13k
        int64_t acquire_segments_ms = 0;
522
2.13k
        int64_t lookup_row_data_ms = 0;
523
524
2.13k
        int64_t external_init_reader_avg_ms = 0;
525
2.13k
        int64_t external_get_block_avg_ms = 0;
526
2.13k
        size_t external_scan_range_cnt = 0;
527
528
        // Add counters for different file mapping types
529
2.13k
        std::unordered_map<FileMappingType, int64_t> file_type_counts;
530
531
2.13k
        auto id_file_map =
532
2.13k
                ExecEnv::GetInstance()->get_id_manager()->get_id_file_map(request.query_id());
533
        // if id_file_map is null, means the BE not have scan range, just return ok
534
2.13k
        if (!id_file_map) {
535
            // padding empty block to response
536
0
            LOG(INFO) << "id_file_map not found for query_id: " << print_id(request.query_id());
537
0
            for (int i = 0; i < request.request_block_descs_size(); ++i) {
538
0
                response->add_blocks();
539
0
            }
540
0
            return Status::OK();
541
0
        }
542
543
4.33k
        for (int i = 0; i < request.request_block_descs_size(); ++i) {
544
2.20k
            const auto& request_block_desc = request.request_block_descs(i);
545
2.20k
            PMultiGetBlockV2* pblock = response->add_blocks();
546
2.20k
            if (request_block_desc.row_id_size() >= 1) {
547
                // Since this block belongs to the same table, we only need to take the first type for judgment.
548
2.06k
                auto first_file_id = request_block_desc.file_id(0);
549
2.06k
                auto first_file_mapping = id_file_map->get_file_mapping(first_file_id);
550
2.06k
                if (!first_file_mapping) {
551
0
                    return Status::InternalError(
552
0
                            "Backend:{} file_mapping not found, query_id: {}, file_id: {}",
553
0
                            BackendOptions::get_localhost(), print_id(request.query_id()),
554
0
                            first_file_id);
555
0
                }
556
2.06k
                file_type_counts[first_file_mapping->type] += request_block_desc.row_id_size();
557
558
                // prepare slots to build block
559
2.06k
                std::vector<SlotDescriptor> slots;
560
2.06k
                slots.reserve(request_block_desc.slots_size());
561
9.15k
                for (const auto& pslot : request_block_desc.slots()) {
562
9.15k
                    slots.push_back(SlotDescriptor(pslot));
563
9.15k
                }
564
2.06k
                try {
565
2.06k
                    if (first_file_mapping->type == FileMappingType::INTERNAL) {
566
6
                        RETURN_IF_ERROR(read_batch_doris_format_row(
567
6
                                request_block_desc, id_file_map, slots, tquery_id, result_blocks[i],
568
6
                                stats, &acquire_tablet_ms, &acquire_rowsets_ms,
569
6
                                &acquire_segments_ms, &lookup_row_data_ms));
570
2.06k
                    } else {
571
2.06k
                        RETURN_IF_ERROR(read_batch_external_row(
572
2.06k
                                request.wg_id(), request_block_desc, id_file_map, slots,
573
2.06k
                                first_file_mapping, tquery_id, result_blocks[i],
574
2.06k
                                pblock->mutable_profile(), &external_init_reader_avg_ms,
575
2.06k
                                &external_get_block_avg_ms, &external_scan_range_cnt));
576
2.06k
                    }
577
2.06k
                } catch (const Exception& e) {
578
0
                    return Status::Error<false>(e.code(), "Row id fetch failed because {}",
579
0
                                                e.what());
580
0
                }
581
2.06k
            }
582
583
2.20k
            [[maybe_unused]] size_t compressed_size = 0;
584
2.20k
            [[maybe_unused]] size_t uncompressed_size = 0;
585
2.20k
            [[maybe_unused]] int64_t compress_time = 0;
586
2.20k
            int be_exec_version = request.has_be_exec_version() ? request.be_exec_version() : 0;
587
2.20k
            RETURN_IF_ERROR(result_blocks[i].serialize(
588
2.20k
                    be_exec_version, pblock->mutable_block(), &uncompressed_size, &compressed_size,
589
2.20k
                    &compress_time, segment_v2::CompressionTypePB::LZ4));
590
2.20k
        }
591
592
        // Build file type statistics string
593
2.13k
        std::string file_type_stats;
594
2.13k
        for (const auto& [type, count] : file_type_counts) {
595
2.00k
            if (!file_type_stats.empty()) {
596
0
                file_type_stats += ", ";
597
0
            }
598
2.00k
            file_type_stats += fmt::format("{}:{}", type, count);
599
2.00k
        }
600
601
2.13k
        LOG(INFO) << "Query stats: "
602
2.13k
                  << fmt::format(
603
2.13k
                             "query_id:{}, "
604
2.13k
                             "Internal table:"
605
2.13k
                             "hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, "
606
2.13k
                             "io_latency:{}ns, uncompressed_bytes_read:{}, bytes_read:{}, "
607
2.13k
                             "acquire_tablet_ms:{}, acquire_rowsets_ms:{}, acquire_segments_ms:{}, "
608
2.13k
                             "lookup_row_data_ms:{}, file_types:[{}]; "
609
2.13k
                             "External table : init_reader_ms:{}, get_block_ms:{}, "
610
2.13k
                             "external_scan_range_cnt:{}",
611
2.13k
                             print_id(request.query_id()), stats.cached_pages_num,
612
2.13k
                             stats.total_pages_num, stats.compressed_bytes_read, stats.io_ns,
613
2.13k
                             stats.uncompressed_bytes_read, stats.bytes_read, acquire_tablet_ms,
614
2.13k
                             acquire_rowsets_ms, acquire_segments_ms, lookup_row_data_ms,
615
2.13k
                             file_type_stats, external_init_reader_avg_ms,
616
2.13k
                             external_get_block_avg_ms, external_scan_range_cnt);
617
2.13k
    }
618
619
2.13k
    return Status::OK();
620
2.13k
}
621
622
Status RowIdStorageReader::read_batch_doris_format_row(
623
        const PRequestBlockDesc& request_block_desc, std::shared_ptr<IdFileMap> id_file_map,
624
        std::vector<SlotDescriptor>& slots, const TUniqueId& query_id, Block& result_block,
625
        OlapReaderStatistics& stats, int64_t* acquire_tablet_ms, int64_t* acquire_rowsets_ms,
626
6
        int64_t* acquire_segments_ms, int64_t* lookup_row_data_ms) {
627
6
    if (result_block.is_empty_column()) [[likely]] {
628
6
        result_block = Block(slots, request_block_desc.row_id_size());
629
6
    }
630
6
    TabletSchema full_read_schema;
631
6
    for (const ColumnPB& column_pb : request_block_desc.column_descs()) {
632
6
        full_read_schema.append_column(TabletColumn(column_pb));
633
6
    }
634
635
6
    std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey> iterator_map;
636
6
    std::unordered_map<SegKey, SegItem, HashOfSegKey> seg_map;
637
6
    std::string row_store_buffer;
638
6
    RowStoreReadStruct row_store_read_struct(row_store_buffer);
639
6
    if (request_block_desc.fetch_row_store()) {
640
0
        for (int i = 0; i < request_block_desc.slots_size(); ++i) {
641
0
            row_store_read_struct.serdes.emplace_back(slots[i].get_data_type_ptr()->get_serde());
642
0
            row_store_read_struct.col_uid_to_idx[slots[i].col_unique_id()] = i;
643
0
            row_store_read_struct.default_values.emplace_back(slots[i].col_default_value());
644
0
        }
645
0
    }
646
647
    // Phase 1: Group all row_ids by their (tablet_id, rowset_id, segment_id) key.
648
    // Unlike the old code which only batched adjacent rows with the same file_id,
649
    // this merges non-contiguous same-segment requests into a single batch,
650
    // maximizing the number of rows read per seek_and_read_by_rowid call.
651
6
    std::vector<DorisFormatReadBatch> scan_batches;
652
6
    std::unordered_map<SegKey, size_t, HashOfSegKey> batch_idx_by_seg;
653
    // (batch_idx, position_in_batch) for each row in the original request.
654
6
    std::vector<std::pair<size_t, size_t>> row_id_block_idx(request_block_desc.row_id_size());
655
18
    for (int j = 0; j < request_block_desc.row_id_size(); ++j) {
656
12
        auto file_id = request_block_desc.file_id(j);
657
12
        auto file_mapping = id_file_map->get_file_mapping(file_id);
658
12
        if (!file_mapping) {
659
0
            return Status::InternalError(
660
0
                    "Backend:{} file_mapping not found, query_id: {}, file_id: {}",
661
0
                    BackendOptions::get_localhost(), print_id(query_id), file_id);
662
0
        }
663
664
        // Derive segment key and group by it — rows from the same segment are batched together
665
        // even if they are interleaved with rows from other segments in the request.
666
12
        auto [tablet_id, rowset_id, segment_id] = file_mapping->get_doris_format_info();
667
12
        SegKey seg_key {.tablet_id = tablet_id, .rowset_id = rowset_id, .segment_id = segment_id};
668
12
        auto [it, inserted] = batch_idx_by_seg.emplace(seg_key, scan_batches.size());
669
12
        if (inserted) {
670
            // First time seeing this segment, create a new batch for it.
671
6
            scan_batches.emplace_back();
672
6
            scan_batches.back().file_mapping = file_mapping;
673
6
        }
674
        // Record (row_id, original_request_index) for later sorting and scattering.
675
12
        scan_batches[it->second].row_ids_with_positions.emplace_back(request_block_desc.row_id(j),
676
12
                                                                     j);
677
12
    }
678
679
    // Phase 2: For each segment, sort row_ids ascending (required by ColumnIterator),
680
    // deduplicate, then read all rows in a single batch call.
681
6
    std::vector<Block> scan_blocks(scan_batches.size());
682
12
    for (size_t batch_idx = 0; batch_idx < scan_batches.size(); ++batch_idx) {
683
6
        auto& scan_batch = scan_batches[batch_idx];
684
6
        auto& row_ids_with_positions = scan_batch.row_ids_with_positions;
685
6
        std::sort(row_ids_with_positions.begin(), row_ids_with_positions.end(),
686
6
                  [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; });
687
688
        // Column iterators read rowids monotonically. Deduplicate consecutive identical row_ids
689
        // (different file_ids may map to the same row), then scatter rows back to their original
690
        // request positions.
691
6
        std::vector<uint32_t> row_ids;
692
6
        row_ids.reserve(row_ids_with_positions.size());
693
694
        // Also builds the scatter map: row_id_block_idx[original_request_idx] ->
695
        // (batch_idx, deduplicated_position_in_batch).
696
12
        for (const auto& [row_id, result_idx] : row_ids_with_positions) {
697
12
            if (row_ids.empty() || row_ids.back() != row_id) {
698
12
                row_ids.emplace_back(row_id);
699
12
            }
700
12
            row_id_block_idx[result_idx] = std::make_pair(batch_idx, row_ids.size() - 1);
701
12
        }
702
703
6
        scan_blocks[batch_idx] = Block(slots, row_ids.size());
704
6
        RETURN_IF_ERROR(read_doris_format_row(id_file_map, scan_batch.file_mapping, row_ids, slots,
705
6
                                              full_read_schema, row_store_read_struct, stats,
706
6
                                              acquire_tablet_ms, acquire_rowsets_ms,
707
6
                                              acquire_segments_ms, lookup_row_data_ms, seg_map,
708
6
                                              iterator_map, scan_blocks[batch_idx]));
709
6
    }
710
711
6
    scatter_scan_blocks_to_result_block(row_id_block_idx, scan_blocks, result_block);
712
713
6
    return Status::OK();
714
6
}
715
716
const std::string RowIdStorageReader::ScannersRunningTimeProfile = "ScannersRunningTime";
717
const std::string RowIdStorageReader::InitReaderAvgTimeProfile = "InitReaderAvgTime";
718
const std::string RowIdStorageReader::GetBlockAvgTimeProfile = "GetBlockAvgTime";
719
const std::string RowIdStorageReader::FileReadLinesProfile = "FileReadLines";
720
721
Status RowIdStorageReader::read_external_row_from_file_mapping(
722
        size_t idx, const std::multimap<segment_v2::rowid_t, size_t>& row_ids,
723
        const std::shared_ptr<FileMapping>& file_mapping, const std::vector<SlotDescriptor>& slots,
724
        const TUniqueId& query_id, const std::shared_ptr<RuntimeState>& runtime_state,
725
        std::vector<Block>& scan_blocks, std::vector<std::pair<size_t, size_t>>& row_id_block_idx,
726
        std::vector<RowIdStorageReader::ExternalFetchStatistics>& fetch_statistics,
727
        const TFileScanRangeParams& rpc_scan_params,
728
        const std::unordered_map<std::string, int>& colname_to_slot_id,
729
        std::atomic<int>& producer_count, size_t scan_rows_count,
730
        std::counting_semaphore<>& semaphore, std::condition_variable& cv, std::mutex& mtx,
731
3.22k
        TupleDescriptor& tuple_desc) {
732
3.22k
    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->rowid_storage_reader_tracker());
733
3.22k
    signal::set_signal_task_id(query_id);
734
735
3.22k
    std::list<int64_t> read_ids;
736
    //Generate an ordered list with the help of the orderliness of the map.
737
13.3k
    for (const auto& [row_id, result_block_idx] : row_ids) {
738
13.3k
        if (read_ids.empty() || read_ids.back() != row_id) {
739
11.1k
            read_ids.emplace_back(row_id);
740
11.1k
        }
741
13.3k
        row_id_block_idx[result_block_idx] = std::make_pair(idx, read_ids.size() - 1);
742
13.3k
    }
743
744
3.22k
    scan_blocks[idx] = Block(slots, read_ids.size());
745
746
3.22k
    auto& external_info = file_mapping->get_external_file_info();
747
3.22k
    auto& scan_range_desc = external_info.scan_range_desc;
748
749
    // Clear to avoid reading iceberg position delete file...
750
3.22k
    scan_range_desc.table_format_params.iceberg_params = TIcebergFileDesc {};
751
752
    // Clear to avoid reading hive transactional delete delta file...
753
3.22k
    scan_range_desc.table_format_params.transactional_hive_params = TTransactionalHiveDesc {};
754
755
3.22k
    std::unique_ptr<RuntimeProfile> sub_runtime_profile =
756
3.22k
            std::make_unique<RuntimeProfile>("ExternalRowIDFetcher");
757
3.22k
    {
758
3.22k
        std::unique_ptr<FileScanner> vfile_scanner_ptr =
759
3.22k
                FileScanner::create_unique(runtime_state.get(), sub_runtime_profile.get(),
760
3.22k
                                           &rpc_scan_params, &colname_to_slot_id, &tuple_desc);
761
762
3.22k
        RETURN_IF_ERROR(vfile_scanner_ptr->prepare_for_read_lines(scan_range_desc));
763
3.22k
        RETURN_IF_ERROR(vfile_scanner_ptr->read_lines_from_range(
764
3.22k
                scan_range_desc, read_ids, &scan_blocks[idx], external_info,
765
3.22k
                &fetch_statistics[idx].init_reader_ms, &fetch_statistics[idx].get_block_ms));
766
3.22k
    }
767
768
3.22k
    auto file_read_bytes_counter =
769
3.22k
            sub_runtime_profile->get_counter(FileScanner::FileReadBytesProfile);
770
771
3.22k
    if (file_read_bytes_counter != nullptr) {
772
3.22k
        fetch_statistics[idx].file_read_bytes = PrettyPrinter::print(
773
3.22k
                file_read_bytes_counter->value(), file_read_bytes_counter->type());
774
3.22k
    }
775
776
3.22k
    auto file_read_times_counter =
777
3.22k
            sub_runtime_profile->get_counter(FileScanner::FileReadTimeProfile);
778
3.22k
    if (file_read_times_counter != nullptr) {
779
3.22k
        fetch_statistics[idx].file_read_times = PrettyPrinter::print(
780
3.22k
                file_read_times_counter->value(), file_read_times_counter->type());
781
3.22k
    }
782
783
3.22k
    semaphore.release();
784
3.22k
    if (++producer_count == scan_rows_count) {
785
2.06k
        std::lock_guard<std::mutex> lock(mtx);
786
2.06k
        cv.notify_one();
787
2.06k
    }
788
3.22k
    return Status::OK();
789
3.22k
}
790
791
Status RowIdStorageReader::read_batch_external_row(
792
        const uint64_t workload_group_id, const PRequestBlockDesc& request_block_desc,
793
        std::shared_ptr<IdFileMap> id_file_map, std::vector<SlotDescriptor>& slots,
794
        std::shared_ptr<FileMapping> first_file_mapping, const TUniqueId& query_id,
795
        Block& result_block, PRuntimeProfileTree* pprofile, int64_t* init_reader_avg_ms,
796
2.06k
        int64_t* get_block_avg_ms, size_t* scan_range_cnt) {
797
2.06k
    TFileScanRangeParams rpc_scan_params;
798
2.06k
    TupleDescriptor tuple_desc(request_block_desc.desc(), false);
799
2.06k
    std::unordered_map<std::string, int> colname_to_slot_id;
800
2.06k
    std::shared_ptr<RuntimeState> runtime_state = nullptr;
801
802
2.06k
    int max_file_scanners = 0;
803
2.06k
    {
804
2.06k
        if (result_block.is_empty_column()) [[likely]] {
805
2.06k
            result_block = Block(slots, request_block_desc.row_id_size());
806
2.06k
        }
807
808
2.06k
        auto& external_info = first_file_mapping->get_external_file_info();
809
2.06k
        int plan_node_id = external_info.plan_node_id;
810
2.06k
        const auto& first_scan_range_desc = external_info.scan_range_desc;
811
812
2.06k
        DCHECK(id_file_map->get_external_scan_params().contains(plan_node_id));
813
2.06k
        const auto* old_scan_params = &(id_file_map->get_external_scan_params().at(plan_node_id));
814
2.06k
        rpc_scan_params = *old_scan_params;
815
816
2.06k
        rpc_scan_params.required_slots.clear();
817
2.06k
        rpc_scan_params.column_idxs.clear();
818
2.06k
        rpc_scan_params.slot_name_to_schema_pos.clear();
819
820
2.06k
        std::set partition_name_set(first_scan_range_desc.columns_from_path_keys.begin(),
821
2.06k
                                    first_scan_range_desc.columns_from_path_keys.end());
822
11.2k
        for (auto slot_idx = 0; slot_idx < slots.size(); ++slot_idx) {
823
9.14k
            auto& slot = slots[slot_idx];
824
9.14k
            tuple_desc.add_slot(&slot);
825
9.14k
            colname_to_slot_id.emplace(slot.col_name(), slot.id());
826
9.14k
            TFileScanSlotInfo slot_info;
827
9.14k
            slot_info.slot_id = slot.id();
828
9.14k
            auto column_idx = request_block_desc.column_idxs(slot_idx);
829
9.14k
            if (partition_name_set.contains(slot.col_name())) {
830
                //This is partition column.
831
464
                slot_info.is_file_slot = false;
832
8.68k
            } else {
833
8.68k
                rpc_scan_params.column_idxs.emplace_back(column_idx);
834
8.68k
                slot_info.is_file_slot = true;
835
8.68k
            }
836
9.14k
            rpc_scan_params.default_value_of_src_slot.emplace(slot.id(), TExpr {});
837
9.14k
            rpc_scan_params.required_slots.emplace_back(slot_info);
838
9.14k
            rpc_scan_params.slot_name_to_schema_pos.emplace(slot.col_name(), column_idx);
839
9.14k
        }
840
841
2.06k
        const auto& query_options = id_file_map->get_query_options();
842
2.06k
        const auto& query_globals = id_file_map->get_query_globals();
843
        /*
844
         * The scan stage needs the information in query_options to generate different behaviors according to the specific variables:
845
         *  query_options.hive_parquet_use_column_names, query_options.truncate_char_or_varchar_columns,query_globals.time_zone ...
846
         *
847
         * To ensure the same behavior as the scan stage, I get query_options query_globals from id_file_map, then create runtime_state
848
         * and pass it to vfile_scanner so that the runtime_state information is the same as the scan stage and the behavior is also consistent.
849
         */
850
2.06k
        runtime_state = RuntimeState::create_shared(
851
2.06k
                query_id, -1, query_options, query_globals, ExecEnv::GetInstance(),
852
2.06k
                ExecEnv::GetInstance()->rowid_storage_reader_tracker());
853
854
2.06k
        max_file_scanners = id_file_map->get_max_file_scanners();
855
2.06k
    }
856
857
    // Hash(TFileRangeDesc) => { all the rows that need to be read and their positions in the result block. } +  file mapping
858
    // std::multimap<segment_v2::rowid_t, size_t> : The reason for using multimap is: may need the same row of data multiple times.
859
2.06k
    std::map<std::string,
860
2.06k
             std::pair<std::multimap<segment_v2::rowid_t, size_t>, std::shared_ptr<FileMapping>>>
861
2.06k
            scan_rows;
862
863
    // Block corresponding to the order of `scan_rows` map.
864
2.06k
    std::vector<Block> scan_blocks;
865
866
    // row_id (Indexing of vectors) => < In which block, which line in the block >
867
2.06k
    std::vector<std::pair<size_t, size_t>> row_id_block_idx;
868
869
    // Count the time/bytes it takes to read each TFileRangeDesc. (for profile)
870
2.06k
    std::vector<ExternalFetchStatistics> fetch_statistics;
871
872
13.3k
    auto hash_file_range = [](const TFileRangeDesc& file_range_desc) {
873
13.3k
        std::string value;
874
13.3k
        value.resize(file_range_desc.path.size() + sizeof(file_range_desc.start_offset));
875
13.3k
        auto* ptr = value.data();
876
877
13.3k
        memcpy(ptr, &file_range_desc.start_offset, sizeof(file_range_desc.start_offset));
878
13.3k
        ptr += sizeof(file_range_desc.start_offset);
879
13.3k
        memcpy(ptr, file_range_desc.path.data(), file_range_desc.path.size());
880
13.3k
        return value;
881
13.3k
    };
882
883
15.3k
    for (int j = 0; j < request_block_desc.row_id_size(); ++j) {
884
13.3k
        auto file_id = request_block_desc.file_id(j);
885
13.3k
        auto file_mapping = id_file_map->get_file_mapping(file_id);
886
13.3k
        if (!file_mapping) {
887
0
            return Status::InternalError(
888
0
                    "Backend:{} file_mapping not found, query_id: {}, file_id: {}",
889
0
                    BackendOptions::get_localhost(), print_id(query_id), file_id);
890
0
        }
891
892
13.3k
        const auto& external_info = file_mapping->get_external_file_info();
893
13.3k
        const auto& scan_range_desc = external_info.scan_range_desc;
894
895
13.3k
        auto scan_range_hash = hash_file_range(scan_range_desc);
896
13.3k
        if (scan_rows.contains(scan_range_hash)) {
897
10.0k
            scan_rows.at(scan_range_hash).first.emplace(request_block_desc.row_id(j), j);
898
10.0k
        } else {
899
3.22k
            std::multimap<segment_v2::rowid_t, size_t> tmp {{request_block_desc.row_id(j), j}};
900
3.22k
            scan_rows.emplace(scan_range_hash, std::make_pair(tmp, file_mapping));
901
3.22k
        }
902
13.3k
    }
903
904
2.06k
    scan_blocks.resize(scan_rows.size());
905
2.06k
    row_id_block_idx.resize(request_block_desc.row_id_size());
906
2.06k
    fetch_statistics.resize(scan_rows.size());
907
908
    // Get the workload group for subsequent scan task submission.
909
2.06k
    std::vector<uint64_t> workload_group_ids;
910
2.06k
    workload_group_ids.emplace_back(workload_group_id);
911
2.06k
    auto wg = ExecEnv::GetInstance()->workload_group_mgr()->get_group(workload_group_ids);
912
2.06k
    doris::TaskScheduler* exec_sched = nullptr;
913
2.06k
    ScannerScheduler* scan_sched = nullptr;
914
2.06k
    ScannerScheduler* remote_scan_sched = nullptr;
915
2.06k
    wg->get_query_scheduler(&exec_sched, &scan_sched, &remote_scan_sched);
916
2.06k
    DCHECK(remote_scan_sched);
917
918
2.06k
    int64_t scan_running_time = 0;
919
2.06k
    RETURN_IF_ERROR(scope_timer_run(
920
2.06k
            [&]() -> Status {
921
                // Make sure to insert data into result_block only after all scan tasks have been executed.
922
2.06k
                std::atomic<int> producer_count {0};
923
2.06k
                std::condition_variable cv;
924
2.06k
                std::mutex mtx;
925
926
                //semaphore: Limit the number of scan tasks submitted at one time
927
2.06k
                std::counting_semaphore semaphore {max_file_scanners};
928
929
2.06k
                size_t idx = 0;
930
2.06k
                for (const auto& [_, scan_info] : scan_rows) {
931
2.06k
                    semaphore.acquire();
932
2.06k
                    RETURN_IF_ERROR(remote_scan_sched->submit_scan_task(
933
2.06k
                            SimplifiedScanTask(
934
2.06k
                                    [&, idx, scan_info]() -> Status {
935
2.06k
                                        const auto& [row_ids, file_mapping] = scan_info;
936
2.06k
                                        return read_external_row_from_file_mapping(
937
2.06k
                                                idx, row_ids, file_mapping, slots, query_id,
938
2.06k
                                                runtime_state, scan_blocks, row_id_block_idx,
939
2.06k
                                                fetch_statistics, rpc_scan_params,
940
2.06k
                                                colname_to_slot_id, producer_count,
941
2.06k
                                                scan_rows.size(), semaphore, cv, mtx, tuple_desc);
942
2.06k
                                    },
943
2.06k
                                    nullptr, nullptr),
944
2.06k
                            fmt::format("{}-read_batch_external_row-{}", print_id(query_id), idx)));
945
2.06k
                    idx++;
946
2.06k
                }
947
948
2.06k
                {
949
2.06k
                    std::unique_lock<std::mutex> lock(mtx);
950
2.06k
                    cv.wait(lock, [&] { return producer_count == scan_rows.size(); });
951
2.06k
                }
952
2.06k
                return Status::OK();
953
2.06k
            },
954
2.06k
            &scan_running_time));
955
956
2.06k
    scatter_scan_blocks_to_result_block(row_id_block_idx, scan_blocks, result_block);
957
958
    // Statistical runtime profile information.
959
2.06k
    std::unique_ptr<RuntimeProfile> runtime_profile =
960
2.06k
            std::make_unique<RuntimeProfile>("ExternalRowIDFetcher");
961
2.06k
    {
962
2.06k
        runtime_profile->add_info_string(ScannersRunningTimeProfile,
963
2.06k
                                         std::to_string(scan_running_time) + "ms");
964
2.06k
        fmt::memory_buffer file_read_lines_buffer;
965
2.06k
        format_to(file_read_lines_buffer, "[");
966
2.06k
        fmt::memory_buffer file_read_bytes_buffer;
967
2.06k
        format_to(file_read_bytes_buffer, "[");
968
2.06k
        fmt::memory_buffer file_read_times_buffer;
969
2.06k
        format_to(file_read_times_buffer, "[");
970
971
2.06k
        size_t idx = 0;
972
3.22k
        for (const auto& [_, scan_info] : scan_rows) {
973
3.22k
            format_to(file_read_lines_buffer, "{}, ", scan_info.first.size());
974
3.22k
            *init_reader_avg_ms = fetch_statistics[idx].init_reader_ms;
975
3.22k
            *get_block_avg_ms += fetch_statistics[idx].get_block_ms;
976
3.22k
            format_to(file_read_bytes_buffer, "{}, ", fetch_statistics[idx].file_read_bytes);
977
3.22k
            format_to(file_read_times_buffer, "{}, ", fetch_statistics[idx].file_read_times);
978
3.22k
            idx++;
979
3.22k
        }
980
981
2.06k
        format_to(file_read_lines_buffer, "]");
982
2.06k
        format_to(file_read_bytes_buffer, "]");
983
2.06k
        format_to(file_read_times_buffer, "]");
984
985
2.06k
        *init_reader_avg_ms /= fetch_statistics.size();
986
2.06k
        *get_block_avg_ms /= fetch_statistics.size();
987
2.06k
        runtime_profile->add_info_string(InitReaderAvgTimeProfile,
988
2.06k
                                         std::to_string(*init_reader_avg_ms) + "ms");
989
2.06k
        runtime_profile->add_info_string(GetBlockAvgTimeProfile,
990
2.06k
                                         std::to_string(*init_reader_avg_ms) + "ms");
991
2.06k
        runtime_profile->add_info_string(FileReadLinesProfile,
992
2.06k
                                         fmt::to_string(file_read_lines_buffer));
993
2.06k
        runtime_profile->add_info_string(FileScanner::FileReadBytesProfile,
994
2.06k
                                         fmt::to_string(file_read_bytes_buffer));
995
2.06k
        runtime_profile->add_info_string(FileScanner::FileReadTimeProfile,
996
2.06k
                                         fmt::to_string(file_read_times_buffer));
997
2.06k
    }
998
999
2.06k
    runtime_profile->to_proto(pprofile, 2);
1000
1001
2.06k
    *scan_range_cnt = scan_rows.size();
1002
1003
2.06k
    return Status::OK();
1004
2.06k
}
1005
1006
Status RowIdStorageReader::read_doris_format_row(
1007
        const std::shared_ptr<IdFileMap>& id_file_map,
1008
        const std::shared_ptr<FileMapping>& file_mapping, const std::vector<uint32_t>& row_ids,
1009
        std::vector<SlotDescriptor>& slots, const TabletSchema& full_read_schema,
1010
        RowStoreReadStruct& row_store_read_struct, OlapReaderStatistics& stats,
1011
        int64_t* acquire_tablet_ms, int64_t* acquire_rowsets_ms, int64_t* acquire_segments_ms,
1012
        int64_t* lookup_row_data_ms, std::unordered_map<SegKey, SegItem, HashOfSegKey>& seg_map,
1013
        std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey>& iterator_map,
1014
6
        Block& result_block) {
1015
6
    auto [tablet_id, rowset_id, segment_id] = file_mapping->get_doris_format_info();
1016
6
    SegKey seg_key {.tablet_id = tablet_id, .rowset_id = rowset_id, .segment_id = segment_id};
1017
1018
6
    BaseTabletSPtr tablet;
1019
6
    BetaRowsetSharedPtr rowset;
1020
6
    SegmentSharedPtr segment;
1021
6
    if (seg_map.find(seg_key) == seg_map.end()) {
1022
6
        tablet = scope_timer_run(
1023
6
                [&]() {
1024
6
                    auto res = ExecEnv::get_tablet(tablet_id);
1025
6
                    return !res.has_value() ? nullptr
1026
6
                                            : std::dynamic_pointer_cast<BaseTablet>(res.value());
1027
6
                },
1028
6
                acquire_tablet_ms);
1029
6
        if (!tablet) {
1030
0
            return Status::InternalError(
1031
0
                    "Backend:{} tablet not found, tablet_id: {}, rowset_id: {}, segment_id: {}, "
1032
0
                    "row_id: {}",
1033
0
                    BackendOptions::get_localhost(), tablet_id, rowset_id.to_string(), segment_id,
1034
0
                    row_ids[0]);
1035
0
        }
1036
1037
6
        rowset = std::static_pointer_cast<BetaRowset>(scope_timer_run(
1038
6
                [&]() { return id_file_map->get_temp_rowset(tablet_id, rowset_id); },
1039
6
                acquire_rowsets_ms));
1040
6
        if (!rowset) {
1041
0
            return Status::InternalError(
1042
0
                    "Backend:{} rowset_id not found, tablet_id: {}, rowset_id: {}, segment_id: {}, "
1043
0
                    "row_id: {}",
1044
0
                    BackendOptions::get_localhost(), tablet_id, rowset_id.to_string(), segment_id,
1045
0
                    row_ids[0]);
1046
0
        }
1047
1048
6
        SegmentCacheHandle segment_cache;
1049
6
        RETURN_IF_ERROR(scope_timer_run(
1050
6
                [&]() {
1051
6
                    return SegmentLoader::instance()->load_segments(rowset, &segment_cache, true);
1052
6
                },
1053
6
                acquire_segments_ms));
1054
1055
6
        auto it = std::find_if(segment_cache.get_segments().cbegin(),
1056
6
                               segment_cache.get_segments().cend(),
1057
6
                               [segment_id](const segment_v2::SegmentSharedPtr& seg) {
1058
6
                                   return seg->id() == segment_id;
1059
6
                               });
1060
6
        if (it == segment_cache.get_segments().end()) {
1061
0
            return Status::InternalError(
1062
0
                    "Backend:{} segment not found, tablet_id: {}, rowset_id: {}, segment_id: {}, "
1063
0
                    "row_id: {}",
1064
0
                    BackendOptions::get_localhost(), tablet_id, rowset_id.to_string(), segment_id,
1065
0
                    row_ids[0]);
1066
0
        }
1067
6
        segment = *it;
1068
6
        seg_map[seg_key] = SegItem {.tablet = tablet, .rowset = rowset, .segment = segment};
1069
6
    } else {
1070
0
        auto& seg_item = seg_map[seg_key];
1071
0
        tablet = seg_item.tablet;
1072
0
        rowset = seg_item.rowset;
1073
0
        segment = seg_item.segment;
1074
0
    }
1075
1076
    // if row_store_read_struct not empty, means the line we should read from row_store
1077
6
    if (!row_store_read_struct.default_values.empty()) {
1078
0
        if (!tablet->tablet_schema()->has_row_store_for_all_columns()) {
1079
0
            return Status::InternalError("Tablet {} does not have row store for all columns",
1080
0
                                         tablet->tablet_id());
1081
0
        }
1082
0
        auto result_columns_guard = result_block.mutate_columns_scoped();
1083
0
        MutableColumns& result_columns = result_columns_guard.mutable_columns();
1084
0
        for (auto row_id : row_ids) {
1085
0
            RowLocation loc(rowset_id, segment->id(), cast_set<uint32_t>(row_id));
1086
0
            row_store_read_struct.row_store_buffer.clear();
1087
0
            RETURN_IF_ERROR(scope_timer_run(
1088
0
                    [&]() {
1089
0
                        return tablet->lookup_row_data({}, loc, rowset, stats,
1090
0
                                                       row_store_read_struct.row_store_buffer);
1091
0
                    },
1092
0
                    lookup_row_data_ms));
1093
1094
0
            RETURN_IF_ERROR(JsonbSerializeUtil::jsonb_to_columns(
1095
0
                    row_store_read_struct.serdes, row_store_read_struct.row_store_buffer.data(),
1096
0
                    row_store_read_struct.row_store_buffer.size(),
1097
0
                    row_store_read_struct.col_uid_to_idx, result_columns,
1098
0
                    row_store_read_struct.default_values, {}));
1099
0
        }
1100
6
    } else {
1101
12
        for (int x = 0; x < slots.size(); ++x) {
1102
6
            auto column_guard = result_block.mutate_column_scoped(x);
1103
6
            MutableColumnPtr& column = column_guard.mutable_column();
1104
6
            IteratorKey iterator_key {.tablet_id = tablet_id,
1105
6
                                      .rowset_id = rowset_id,
1106
6
                                      .segment_id = segment_id,
1107
6
                                      .slot_id = slots[x].id()};
1108
6
            IteratorItem& iterator_item = iterator_map[iterator_key];
1109
6
            if (iterator_item.segment == nullptr) {
1110
6
                iterator_map[iterator_key].segment = segment;
1111
6
                iterator_item.storage_read_options.stats = &stats;
1112
6
                iterator_item.storage_read_options.io_ctx.reader_type = ReaderType::READER_QUERY;
1113
6
            }
1114
6
            RETURN_IF_ERROR(segment->seek_and_read_by_rowid(
1115
6
                    full_read_schema, &slots[x], row_ids, column,
1116
6
                    iterator_item.storage_read_options, iterator_item.iterator));
1117
6
        }
1118
6
    }
1119
6
    return Status::OK();
1120
6
}
1121
1122
} // namespace doris