Coverage Report

Created: 2026-07-03 21:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/peer_file_cache_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#include "io/cache/peer_file_cache_reader.h"
18
19
#include <brpc/controller.h>
20
#include <butil/iobuf.h>
21
#include <bvar/latency_recorder.h>
22
#include <bvar/reducer.h>
23
#include <fmt/format.h>
24
#include <gen_cpp/internal_service.pb.h>
25
#include <glog/logging.h>
26
27
#include <algorithm>
28
#include <utility>
29
30
#include "common/compiler_util.h" // IWYU pragma: keep
31
#include "common/metrics/doris_metrics.h"
32
#include "runtime/exec_env.h"
33
#include "runtime/runtime_profile.h"
34
#include "runtime/thread_context.h"
35
#include "util/brpc_client_cache.h"
36
#include "util/bvar_helper.h"
37
#include "util/debug_points.h"
38
#include "util/defer_op.h"
39
#include "util/network_util.h"
40
41
namespace doris::io {
42
43
namespace {
44
45
struct ExpectedPeerFetch {
46
    std::vector<FileBlock::Range> expected_ranges;
47
    std::vector<FileBlock::Range> pending_ranges;
48
    std::vector<size_t> expected_block_indexes;
49
    size_t expected_bytes = 0;
50
};
51
52
91
size_t clip_requested_range(const FileBlock::Range& range, size_t file_size) {
53
91
    if (range.left >= file_size) {
54
2
        return 0;
55
2
    }
56
89
    return std::min(file_size - range.left, range.size());
57
91
}
58
59
ExpectedPeerFetch build_expected_peer_fetch(const std::vector<FileBlockSPtr>& blocks,
60
40
                                            size_t file_size, PFetchPeerDataRequest* req) {
61
40
    ExpectedPeerFetch expected;
62
131
    for (size_t block_idx = 0; block_idx < blocks.size(); ++block_idx) {
63
91
        const auto& blk = blocks[block_idx];
64
91
        auto* cb = req->add_cache_req();
65
91
        cb->set_block_offset(static_cast<int64_t>(blk->range().left));
66
91
        cb->set_block_size(static_cast<int64_t>(blk->range().size()));
67
91
        const size_t clipped_size = clip_requested_range(blk->range(), file_size);
68
91
        if (clipped_size == 0) {
69
2
            continue;
70
2
        }
71
89
        expected.expected_block_indexes.push_back(block_idx);
72
89
        expected.expected_ranges.emplace_back(blk->range().left,
73
89
                                              blk->range().left + clipped_size - 1);
74
89
        expected.pending_ranges.emplace_back(expected.expected_ranges.back());
75
89
        expected.expected_bytes += clipped_size;
76
89
    }
77
40
    return expected;
78
40
}
79
80
18
Status cut_attachment_payload(butil::IOBuf* attachment, size_t size, butil::IOBuf* out) {
81
18
    const size_t cut_size = attachment->cutn(out, size);
82
18
    if (cut_size != size) {
83
1
        return Status::InternalError<false>(
84
1
                "peer cache read incomplete attachment: need={}, got={}", size, cut_size);
85
1
    }
86
17
    return Status::OK();
87
18
}
88
89
bool subtract_pending_range(std::vector<FileBlock::Range>& pending_ranges,
90
57
                            const FileBlock::Range& response_range) {
91
58
    for (size_t idx = 0; idx < pending_ranges.size(); ++idx) {
92
57
        auto pending_range = pending_ranges[idx];
93
57
        if (response_range.left < pending_range.left ||
94
57
            response_range.right > pending_range.right) {
95
1
            continue;
96
1
        }
97
98
56
        if (response_range.left == pending_range.left &&
99
56
            response_range.right == pending_range.right) {
100
55
            pending_ranges.erase(pending_ranges.begin() + idx);
101
55
        } else if (response_range.left == pending_range.left) {
102
1
            pending_ranges[idx].left = response_range.right + 1;
103
1
        } else if (response_range.right == pending_range.right) {
104
0
            pending_ranges[idx].right = response_range.left - 1;
105
0
        } else {
106
0
            auto right_remain = FileBlock::Range(response_range.right + 1, pending_range.right);
107
0
            pending_ranges[idx].right = response_range.left - 1;
108
0
            pending_ranges.insert(pending_ranges.begin() + idx + 1, right_remain);
109
0
        }
110
56
        return true;
111
57
    }
112
1
    return false;
113
57
}
114
115
int find_expected_range_idx(const std::vector<FileBlock::Range>& expected_ranges,
116
58
                            const FileBlock::Range& response_range) {
117
98
    for (size_t idx = 0; idx < expected_ranges.size(); ++idx) {
118
97
        const auto& expected_range = expected_ranges[idx];
119
97
        if (response_range.left >= expected_range.left &&
120
97
            response_range.right <= expected_range.right) {
121
57
            return static_cast<int>(idx);
122
57
        }
123
97
    }
124
1
    return -1;
125
58
}
126
127
} // namespace
128
129
// read from peer
130
131
bvar::Adder<uint64_t> peer_cache_reader_failed_counter("peer_cache_reader", "failed_counter");
132
bvar::Adder<uint64_t> peer_cache_reader_succ_counter("peer_cache_reader", "succ_counter");
133
bvar::LatencyRecorder peer_bytes_per_read("peer_cache_reader", "bytes_per_read"); // also QPS
134
bvar::Adder<uint64_t> peer_cache_reader_total("peer_cache_reader", "total_num");
135
bvar::Adder<uint64_t> peer_cache_being_read("peer_cache_reader", "file_being_read");
136
bvar::Adder<uint64_t> peer_cache_reader_read_counter("peer_cache_reader", "read_at");
137
bvar::LatencyRecorder peer_cache_reader_latency("peer_cache_reader", "peer_latency");
138
bvar::PerSecond<bvar::Adder<uint64_t>> peer_get_request_qps("peer_cache_reader", "peer_get_request",
139
                                                            &peer_cache_reader_read_counter);
140
bvar::Adder<uint64_t> peer_bytes_read_total("peer_cache_reader", "bytes_read");
141
bvar::PerSecond<bvar::Adder<uint64_t>> peer_read_througthput("peer_cache_reader",
142
                                                             "peer_read_throughput",
143
                                                             &peer_bytes_read_total);
144
145
PeerFileCacheReader::PeerFileCacheReader(const io::Path& file_path, bool is_doris_table,
146
                                         std::string host, int port)
147
42
        : _path(file_path), _is_doris_table(is_doris_table), _host(host), _port(port) {
148
42
    peer_cache_reader_total << 1;
149
42
    peer_cache_being_read << 1;
150
42
}
151
152
42
PeerFileCacheReader::~PeerFileCacheReader() {
153
42
    peer_cache_being_read << -1;
154
42
}
155
156
Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& blocks,
157
                                         PeerFetchResult* result, size_t file_size,
158
                                         const IOContext* ctx, bool request_fill, int64_t tablet_id,
159
42
                                         std::string resource_id) {
160
42
    (void)ctx;
161
42
    if (result == nullptr) {
162
0
        return Status::InvalidArgument("peer cache fetch requires non-null result");
163
0
    }
164
42
    result->clear();
165
42
    VLOG_DEBUG << "enter PeerFileCacheReader::fetch_blocks";
166
42
    if (blocks.empty()) {
167
1
        return Status::OK();
168
1
    }
169
41
    if (!_is_doris_table) {
170
1
        return Status::NotSupported<false>("peer cache fetch only supports doris table segments");
171
1
    }
172
173
40
    PFetchPeerDataRequest req;
174
40
    req.set_type(PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK);
175
40
    req.set_path(_path.native());
176
40
    req.set_file_size(static_cast<int64_t>(file_size));
177
40
    auto* rowset_meta_pb = req.mutable_rowset_meta();
178
40
    rowset_meta_pb->Clear();
179
    // RowsetMetaPB still has deprecated proto2 required rowset_id. Set a dummy value so
180
    // the RPC can be serialized; current peer read/fill paths only read tablet_id/resource_id.
181
40
    rowset_meta_pb->set_rowset_id(0);
182
40
    rowset_meta_pb->set_resource_id(resource_id);
183
40
    rowset_meta_pb->set_tablet_id(tablet_id);
184
40
    if (request_fill) {
185
        // Ask the peer server to pull missing blocks from remote storage before serving them.
186
        // Only set for cross-CG reads targeting the designated fill compute group
187
        // (peer_cache_fill_compute_group_id). Server still gates with enable_peer_server_cache_fill.
188
2
        req.set_request_cache_fill(true);
189
2
    }
190
    // Always advertise attachment support. Older peers can still reply in protobuf mode.
191
40
    req.set_support_attachment(true);
192
40
    auto expected = build_expected_peer_fetch(blocks, file_size, &req);
193
40
    if (expected.expected_bytes == 0) {
194
1
        return Status::OK();
195
1
    }
196
197
39
    std::string realhost = _host;
198
39
    int port = _port;
199
200
39
    auto dns_cache = ExecEnv::GetInstance()->dns_cache();
201
39
    if (dns_cache == nullptr) {
202
39
        LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
203
39
    } else if (!is_valid_ip(realhost)) {
204
0
        Status status = dns_cache->get(_host, &realhost);
205
0
        if (!status.ok()) {
206
0
            peer_cache_reader_failed_counter << 1;
207
0
            LOG(WARNING) << "failed to get ip from host " << _host << ": " << status.to_string();
208
0
            return Status::InternalError<false>("failed to get ip from host {}", _host);
209
0
        }
210
0
    }
211
39
    std::string brpc_addr = get_host_port(realhost, port);
212
39
    Status st = Status::OK();
213
39
    std::shared_ptr<PBackendService_Stub> brpc_stub =
214
39
            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_new_client_no_cache(
215
39
                    brpc_addr);
216
39
    if (!brpc_stub) {
217
0
        peer_cache_reader_failed_counter << 1;
218
0
        LOG(WARNING) << "failed to get brpc stub " << brpc_addr;
219
0
        st = Status::RpcError<false>("Address {} is wrong", brpc_addr);
220
0
        return st;
221
0
    }
222
223
39
    size_t filled = 0;
224
39
    size_t* bytes_read = &filled;
225
39
    LIMIT_REMOTE_SCAN_IO(bytes_read);
226
39
    int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
227
39
                               std::chrono::system_clock::now().time_since_epoch())
228
39
                               .count();
229
39
    Defer defer_latency {[&]() {
230
39
        int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
231
39
                                 std::chrono::system_clock::now().time_since_epoch())
232
39
                                 .count();
233
39
        peer_cache_reader_latency << (end_ts - begin_ts);
234
39
    }};
235
236
39
    brpc::Controller cntl;
237
    // Use a longer timeout when fill is requested: server may spend up to
238
    // peer_server_cache_fill_timeout_ms (default 6000ms) pulling from S3 before responding.
239
39
    cntl.set_timeout_ms(request_fill ? 7000 : 5000);
240
39
    PFetchPeerDataResponse resp;
241
39
    peer_cache_reader_read_counter << 1;
242
39
    brpc_stub->fetch_peer_data(&cntl, &req, &resp, nullptr);
243
39
    if (cntl.Failed()) {
244
1
        return Status::RpcError<false>(cntl.ErrorText());
245
1
    }
246
38
    if (resp.has_status()) {
247
38
        Status st2 = Status::create<false>(resp.status());
248
38
        LOG_EVERY_N(WARNING, 1000) << "peer cache read failed, status=" << st2.msg();
249
38
        if (!st2.ok()) return st2;
250
38
    }
251
252
    // Metadata stays in resp.datas(); payload may come from protobuf bytes or BRPC attachment.
253
28
    const bool use_attachment = resp.has_data_in_attachment() && resp.data_in_attachment();
254
28
    butil::IOBuf remaining_attachment(cntl.response_attachment());
255
28
    result->chunks.reserve(resp.datas_size());
256
60
    for (const auto& data : resp.datas()) {
257
60
        if (data.block_offset() < 0 || data.block_size() < 0) {
258
1
            peer_cache_reader_failed_counter << 1;
259
1
            result->clear();
260
1
            return Status::InternalError<false>(
261
1
                    "peer cache read invalid block metadata: offset={}, size={}",
262
1
                    data.block_offset(), data.block_size());
263
1
        }
264
59
        const size_t block_off = static_cast<size_t>(data.block_offset());
265
59
        const size_t payload_size =
266
59
                use_attachment ? static_cast<size_t>(data.block_size()) : data.data().size();
267
59
        if (payload_size == 0) {
268
1
            continue;
269
1
        }
270
58
        const auto response_range = FileBlock::Range(block_off, block_off + payload_size - 1);
271
58
        const int expected_idx = find_expected_range_idx(expected.expected_ranges, response_range);
272
58
        if (expected_idx < 0) {
273
1
            peer_cache_reader_failed_counter << 1;
274
1
            result->clear();
275
1
            return Status::InternalError<false>(
276
1
                    "peer cache read block out of requested ranges: off={}, size={}", block_off,
277
1
                    payload_size);
278
1
        }
279
        // Attachment payload is a single byte stream. Consume it in resp.datas() order so the
280
        // peer can split or reorder requested ranges without forcing a fallback.
281
57
        if (!subtract_pending_range(expected.pending_ranges, response_range)) {
282
1
            peer_cache_reader_failed_counter << 1;
283
1
            result->clear();
284
1
            return Status::InternalError<false>("peer cache read unexpected block range: [{}, {}]",
285
1
                                                response_range.left, response_range.right);
286
1
        }
287
288
56
        PeerFetchChunk chunk;
289
56
        chunk.block_index = expected.expected_block_indexes[expected_idx];
290
56
        chunk.block_offset = response_range.left;
291
56
        VLOG_DEBUG << "peer cache read data=" << data.block_offset() << " size=" << payload_size
292
0
                   << " block_idx=" << chunk.block_index;
293
56
        if (use_attachment) {
294
18
            auto cut_st =
295
18
                    cut_attachment_payload(&remaining_attachment, payload_size, &chunk.payload);
296
18
            if (!cut_st.ok()) {
297
1
                peer_cache_reader_failed_counter << 1;
298
1
                result->clear();
299
1
                return cut_st;
300
1
            }
301
38
        } else if (chunk.payload.append(data.data().data(), payload_size) != 0) {
302
0
            peer_cache_reader_failed_counter << 1;
303
0
            result->clear();
304
0
            return Status::InternalError<false>(
305
0
                    "failed to append protobuf payload into iobuf: size={}", payload_size);
306
0
        }
307
55
        filled += payload_size;
308
55
        result->chunks.emplace_back(std::move(chunk));
309
55
    }
310
24
    VLOG_DEBUG << "peer cache read filled=" << filled;
311
    // Sparse reads are complete only when all requested block ranges are covered exactly.
312
24
    if (!expected.pending_ranges.empty() || filled != expected.expected_bytes ||
313
24
        (use_attachment && !remaining_attachment.empty())) {
314
2
        peer_cache_reader_failed_counter << 1;
315
2
        result->clear();
316
2
        return Status::InternalError<false>(
317
2
                "peer cache read incomplete: need={}, got={}, attachment_left={}",
318
2
                expected.expected_bytes, filled, remaining_attachment.size());
319
2
    }
320
22
    peer_bytes_read_total << filled;
321
22
    peer_bytes_per_read << filled;
322
22
    peer_cache_reader_succ_counter << 1;
323
22
    result->bytes_read = filled;
324
22
    return Status::OK();
325
24
}
326
327
} // namespace doris::io