Coverage Report

Created: 2026-03-16 21:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/peer_file_cache_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#include "io/cache/peer_file_cache_reader.h"
18
19
#include <brpc/controller.h>
20
#include <bvar/latency_recorder.h>
21
#include <bvar/reducer.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/internal_service.pb.h>
24
#include <glog/logging.h>
25
26
#include <algorithm>
27
#include <utility>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/metrics/doris_metrics.h"
31
#include "runtime/exec_env.h"
32
#include "runtime/runtime_profile.h"
33
#include "runtime/thread_context.h"
34
#include "util/brpc_client_cache.h"
35
#include "util/bvar_helper.h"
36
#include "util/debug_points.h"
37
#include "util/defer_op.h"
38
#include "util/network_util.h"
39
40
namespace doris::io {
41
// read from peer
42
43
// ---- bvar metrics, all exposed under the "peer_cache_reader" prefix ----

// Outcome counters, bumped by PeerFileCacheReader::fetch_blocks.
bvar::Adder<uint64_t> peer_cache_reader_failed_counter {"peer_cache_reader", "failed_counter"};
bvar::Adder<uint64_t> peer_cache_reader_succ_counter {"peer_cache_reader", "succ_counter"};

// Bytes copied per fetch_blocks call (the recorder also yields a QPS figure).
bvar::LatencyRecorder peer_bytes_per_read {"peer_cache_reader", "bytes_per_read"};

// Reader lifetime accounting: incremented in the constructor; only
// `peer_cache_being_read` is decremented again in the destructor.
bvar::Adder<uint64_t> peer_cache_reader_total {"peer_cache_reader", "total_num"};
bvar::Adder<uint64_t> peer_cache_being_read {"peer_cache_reader", "file_being_read"};

// Number of fetch_peer_data RPCs issued, plus its per-second view.
// NOTE: `peer_get_request_qps` stores the address of `peer_cache_reader_read_counter`,
// so the counter must stay defined before it.
bvar::Adder<uint64_t> peer_cache_reader_read_counter {"peer_cache_reader", "read_at"};

// Duration of the fetch_peer_data RPC and response handling, in microseconds.
bvar::LatencyRecorder peer_cache_reader_latency {"peer_cache_reader", "peer_latency"};

bvar::PerSecond<bvar::Adder<uint64_t>> peer_get_request_qps {
        "peer_cache_reader", "peer_get_request", &peer_cache_reader_read_counter};

// Total bytes copied out of peer responses, plus its per-second view.
// NOTE(review): the identifier `peer_read_througthput` is misspelled; it is left unchanged
// because it has external linkage and may be referenced from other translation units.
bvar::Adder<uint64_t> peer_bytes_read_total {"peer_cache_reader", "bytes_read"};
bvar::PerSecond<bvar::Adder<uint64_t>> peer_read_througthput {
        "peer_cache_reader", "peer_read_throughput", &peer_bytes_read_total};
56
57
/// Creates a reader that fetches file cache blocks of `file_path` from the peer backend
/// at `host:port`.
///
/// @param file_path      path of the file; fetch_blocks sends only its filename component.
/// @param is_doris_table fetch_blocks returns NotSupported when this is false.
/// @param host           peer host name or IP address (taken by value and moved into place).
/// @param port           peer brpc port.
///
/// Side effect: bumps the `total_num` and `file_being_read` metrics.
PeerFileCacheReader::PeerFileCacheReader(const io::Path& file_path, bool is_doris_table,
                                         std::string host, int port)
        : _path(file_path),
          _is_doris_table(is_doris_table),
          // `host` is a by-value sink parameter: move it instead of copying a second time.
          _host(std::move(host)),
          _port(port) {
    peer_cache_reader_total << 1;
    peer_cache_being_read << 1;
}
63
64
0
/// Undoes the constructor's `file_being_read` increment.
PeerFileCacheReader::~PeerFileCacheReader() {
    // The adder holds uint64_t, so "-1" is added as its unsigned wrap-around value,
    // which decrements the running sum by one.
    peer_cache_being_read << static_cast<uint64_t>(-1);
}
67
68
/// Fetches the given file cache blocks from the peer backend (`_host:_port`) with a single
/// synchronous `fetch_peer_data` brpc call and copies the returned bytes into `s`.
///
/// @param blocks     blocks to request; each contributes `range().left` / `range().size()`
///                   to the request. An empty list is a successful no-op.
/// @param off        file offset that corresponds to `s.data[0]`; a returned block is copied
///                   to `s.data + (block_offset - off)`.
/// @param s          destination buffer. The call succeeds only when the copied byte count
///                   equals `s.size`.
/// @param bytes_read out-param; set to 0 for empty `blocks` and to the copied byte count on
///                   success. It is not written on error paths.
/// @param file_size  forwarded to the peer in the request.
/// @param ctx        not referenced directly by this function.
/// @return OK on success; NotSupported if the reader was not created for a doris table;
///         RpcError if the stub cannot be created or the RPC fails; the peer's own status if
///         it reports an error; InternalError for DNS failure, an empty or out-of-range
///         block, or an incomplete read.
Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& blocks, size_t off,
                                         Slice s, size_t* bytes_read, size_t file_size,
                                         const IOContext* ctx) {
    VLOG_DEBUG << "enter PeerFileCacheReader::fetch_blocks, off=" << off
               << " bytes_read=" << *bytes_read;
    if (blocks.empty()) {
        *bytes_read = 0;
        return Status::OK();
    }
    if (!_is_doris_table) {
        return Status::NotSupported<false>("peer cache fetch only supports doris table segments");
    }

    // Build one request that names every block by (offset, size).
    PFetchPeerDataRequest req;
    req.set_type(PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK);
    req.set_path(_path.filename().native());
    req.set_file_size(static_cast<int64_t>(file_size));
    for (const auto& blk : blocks) {
        auto* cb = req.add_cache_req();
        cb->set_block_offset(static_cast<int64_t>(blk->range().left));
        cb->set_block_size(static_cast<int64_t>(blk->range().size()));
    }

    // Resolve the peer address: host names go through the DNS cache when one is available,
    // otherwise `_host` is used verbatim.
    std::string realhost = _host;
    int port = _port;

    auto dns_cache = ExecEnv::GetInstance()->dns_cache();
    if (dns_cache == nullptr) {
        LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
    } else if (!is_valid_ip(realhost)) {
        Status status = dns_cache->get(_host, &realhost);
        if (!status.ok()) {
            peer_cache_reader_failed_counter << 1;
            LOG(WARNING) << "failed to get ip from host " << _host << ": " << status.to_string();
            return Status::InternalError<false>("failed to get ip from host {}", _host);
        }
    }
    std::string brpc_addr = get_host_port(realhost, port);
    std::shared_ptr<PBackendService_Stub> brpc_stub =
            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_new_client_no_cache(
                    brpc_addr);
    if (!brpc_stub) {
        peer_cache_reader_failed_counter << 1;
        LOG(WARNING) << "failed to get brpc stub " << brpc_addr;
        return Status::RpcError<false>("Address {} is wrong", brpc_addr);
    }
    LIMIT_REMOTE_SCAN_IO(bytes_read);

    // Measure latency with a monotonic clock so wall-clock adjustments cannot produce
    // negative or inflated samples. Recorded on every exit path below.
    const auto begin_ts = std::chrono::steady_clock::now();
    Defer defer_latency {[&]() {
        peer_cache_reader_latency << std::chrono::duration_cast<std::chrono::microseconds>(
                                             std::chrono::steady_clock::now() - begin_ts)
                                             .count();
    }};

    constexpr int kPeerFetchTimeoutMs = 5000;
    brpc::Controller cntl;
    cntl.set_timeout_ms(kPeerFetchTimeoutMs);
    PFetchPeerDataResponse resp;
    peer_cache_reader_read_counter << 1;
    // A null `done` closure makes this a synchronous call.
    brpc_stub->fetch_peer_data(&cntl, &req, &resp, nullptr);
    if (cntl.Failed()) {
        // Count transport failures like every other failure path.
        peer_cache_reader_failed_counter << 1;
        return Status::RpcError<false>(cntl.ErrorText());
    }
    if (resp.has_status()) {
        Status st2 = Status::create<false>(resp.status());
        if (!st2.ok()) {
            // Only warn (rate-limited) when the peer actually reported an error.
            peer_cache_reader_failed_counter << 1;
            LOG_EVERY_N(WARNING, 1000) << "peer cache read failed, status=" << st2.msg();
            return st2;
        }
    }

    // Scatter each returned block into the destination buffer.
    size_t filled = 0;
    for (const auto& data : resp.datas()) {
        const auto& payload = data.data();
        const int64_t block_off = data.block_offset();
        if (payload.empty()) {
            peer_cache_reader_failed_counter << 1;
            LOG(WARNING) << "peer cache read empty data, block_offset=" << block_off;
            return Status::InternalError<false>("peer cache read empty data");
        }
        // NOTE(review): a block that starts before `off` is copied to the start of the
        // buffer without skipping its leading bytes -- presumably callers align `off` to
        // the first block; confirm against the caller.
        const size_t rel = block_off > static_cast<int64_t>(off)
                                   ? static_cast<size_t>(block_off - static_cast<int64_t>(off))
                                   : 0;
        if (rel > s.size) {
            // Without this guard `s.size - rel` would wrap around and memcpy would write
            // past the end of the destination buffer.
            peer_cache_reader_failed_counter << 1;
            LOG(WARNING) << "peer cache read out-of-range block, block_offset=" << block_off
                         << " off=" << off << " size=" << s.size;
            return Status::InternalError<false>(
                    "peer cache read out-of-range block: block_offset={}, off={}, size={}",
                    block_off, off, s.size);
        }
        const size_t can_copy = std::min(s.size - rel, payload.size());
        VLOG_DEBUG << "peer cache read data=" << block_off << " size=" << payload.size()
                   << " off=" << rel << " can_copy=" << can_copy;
        std::memcpy(s.data + rel, payload.data(), can_copy);
        filled += can_copy;
    }
    VLOG_DEBUG << "peer cache read filled=" << filled;
    peer_bytes_read_total << filled;
    peer_bytes_per_read << filled;
    if (filled != s.size) {
        peer_cache_reader_failed_counter << 1;
        return Status::InternalError<false>("peer cache read incomplete: need={}, got={}", s.size,
                                            filled);
    }
    peer_cache_reader_succ_counter << 1;
    *bytes_read = filled;
    return Status::OK();
}
170
171
} // namespace doris::io