Coverage Report

Created: 2026-07-01 22:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache_downloader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCacheFactory.h
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache_downloader.h"
22
23
#include <bvar/bvar.h>
24
#include <fmt/core.h>
25
#include <gen_cpp/internal_service.pb.h>
26
27
#include <memory>
28
#include <mutex>
29
#include <unordered_set>
30
#include <variant>
31
32
#include "cloud/cloud_tablet_mgr.h"
33
#include "common/config.h"
34
#include "common/logging.h"
35
#include "cpp/sync_point.h"
36
#include "io/fs/file_reader.h"
37
#include "io/io_common.h"
38
#include "storage/rowset/beta_rowset.h"
39
#include "util/bvar_helper.h"
40
41
namespace doris::io {
42
43
bvar::Adder<uint64_t> g_file_cache_download_submitted_size("file_cache_download_submitted_size");
44
bvar::Adder<uint64_t> g_file_cache_download_finished_size("file_cache_download_finished_size");
45
bvar::Adder<uint64_t> g_file_cache_download_submitted_num("file_cache_download_submitted_num");
46
bvar::Adder<uint64_t> g_file_cache_download_finished_num("file_cache_download_finished_num");
47
bvar::Adder<uint64_t> g_file_cache_download_failed_num("file_cache_download_failed_num");
48
bvar::Adder<uint64_t> block_file_cache_downloader_task_total("file_cache_downloader_queue_total");
49
50
5
FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) : _engine(engine) {
51
5
    _poller = std::thread(&FileCacheBlockDownloader::polling_download_task, this);
52
5
    auto st = ThreadPoolBuilder("FileCacheBlockDownloader")
53
5
                      .set_min_threads(config::file_cache_downloader_thread_num_min)
54
5
                      .set_max_threads(config::file_cache_downloader_thread_num_max)
55
5
                      .build(&_workers);
56
5
    CHECK(st.ok()) << "failed to create FileCacheBlockDownloader";
57
5
}
58
59
5
FileCacheBlockDownloader::~FileCacheBlockDownloader() {
60
5
    {
61
5
        std::lock_guard lock(_mtx);
62
5
        _closed = true;
63
5
    }
64
5
    _empty.notify_all();
65
66
5
    if (_poller.joinable()) {
67
5
        _poller.join();
68
5
    }
69
70
5
    if (_workers) {
71
5
        _workers->shutdown();
72
5
    }
73
5
}
74
75
4
void FileCacheBlockDownloader::submit_download_task(DownloadTask task) {
76
4
    if (!config::enable_file_cache) [[unlikely]] {
77
0
        LOG(INFO) << "Skip submit download file task because file cache is not enabled";
78
0
        return;
79
0
    }
80
81
4
    if (task.task_message.index() == 0) { // download file cache block task
82
4
        std::lock_guard lock(_inflight_mtx);
83
10
        for (auto& meta : std::get<0>(task.task_message)) {
84
10
            ++_inflight_tablets[meta.tablet_id()];
85
10
            if (meta.size() > 0) {
86
10
                g_file_cache_download_submitted_size << meta.size();
87
10
            }
88
10
            LOG(INFO) << "submit_download_task: inflight_tablets[" << meta.tablet_id()
89
10
                      << "] = " << _inflight_tablets[meta.tablet_id()];
90
10
        }
91
4
    } else {
92
0
        int64_t download_size = std::get<1>(task.task_message).download_size;
93
0
        if (download_size > 0) {
94
0
            g_file_cache_download_submitted_size << download_size;
95
0
        }
96
0
    }
97
98
4
    {
99
4
        std::lock_guard lock(_mtx);
100
4
        if (_task_queue.size() == _max_size) {
101
0
            if (_task_queue.front().task_message.index() == 1) { // download segment file task
102
0
                auto& download_file_meta = std::get<1>(_task_queue.front().task_message);
103
0
                if (download_file_meta.download_done) {
104
0
                    download_file_meta.download_done(
105
0
                            Status::InternalError("The downloader queue is full"));
106
0
                }
107
0
                g_file_cache_download_failed_num << 1;
108
0
            }
109
0
            LOG(INFO) << "submit_download_task: task queue full, pop front";
110
0
            _task_queue.pop_front(); // Eliminate the earliest task in the queue
111
0
            block_file_cache_downloader_task_total << -1;
112
0
        }
113
4
        VLOG_DEBUG << "submit_download_task: push task, queue size before push: "
114
0
                   << _task_queue.size();
115
4
        _task_queue.push_back(std::move(task));
116
4
        block_file_cache_downloader_task_total << 1;
117
4
        _empty.notify_all();
118
4
    }
119
4
    g_file_cache_download_submitted_num << 1;
120
4
}
121
122
5
void FileCacheBlockDownloader::polling_download_task() {
123
5
    constexpr int64_t hot_interval = 2 * 60 * 60; // 2 hours
124
9
    while (!_closed) {
125
9
        DownloadTask task;
126
9
        {
127
9
            std::unique_lock lock(_mtx);
128
18
            _empty.wait(lock, [this]() { return !_task_queue.empty() || _closed; });
129
9
            if (_closed) {
130
5
                LOG(INFO) << "polling_download_task: downloader closed, exit polling";
131
5
                break;
132
5
            }
133
134
4
            task = std::move(_task_queue.front());
135
4
            _task_queue.pop_front();
136
4
            block_file_cache_downloader_task_total << -1;
137
4
            VLOG_DEBUG << "polling_download_task: pop task, queue size after pop: "
138
0
                       << _task_queue.size();
139
4
        }
140
141
4
        if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() -
142
4
                                                             task.atime)
143
4
                    .count() < hot_interval) {
144
4
            VLOG_DEBUG << "polling_download_task: submit download_blocks to thread pool";
145
4
            auto st = _workers->submit_func(
146
4
                    [this, task_ = std::move(task)]() mutable { download_blocks(task_); });
147
4
            if (!st.ok()) {
148
0
                LOG(WARNING) << "submit download blocks failed: " << st;
149
0
            }
150
4
        }
151
4
    }
152
5
}
153
154
void FileCacheBlockDownloader::check_download_task(const std::vector<int64_t>& tablets,
155
10
                                                   std::map<int64_t, bool>* done) {
156
10
    std::lock_guard lock(_inflight_mtx);
157
18
    for (int64_t tablet_id : tablets) {
158
18
        done->insert({tablet_id, !_inflight_tablets.contains(tablet_id)});
159
18
    }
160
10
}
161
162
0
std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTablet* tablet) {
163
0
    std::unordered_map<std::string, RowsetMetaSharedPtr> id_to_rowset_meta_map;
164
0
    auto visitor = [&id_to_rowset_meta_map](const RowsetSharedPtr& r) {
165
0
        id_to_rowset_meta_map.emplace(r->rowset_meta()->rowset_id().to_string(), r->rowset_meta());
166
0
    };
167
0
    constexpr bool include_stale = false;
168
0
    tablet->traverse_rowsets(visitor, include_stale);
169
0
    return id_to_rowset_meta_map;
170
0
}
171
172
void FileCacheBlockDownloader::download_file_cache_block(
173
4
        const DownloadTask::FileCacheBlockMetaVec& metas) {
174
4
    std::unordered_set<int64_t> synced_tablets;
175
10
    std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
176
10
        VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" << meta.tablet_id()
177
0
                   << ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id()
178
0
                   << ", offset=" << meta.offset() << ", size=" << meta.size()
179
0
                   << ", type=" << meta.cache_type();
180
181
        // Helper to decrease inflight count on early return.
182
        // NOTE: This lambda captures 'this' pointer. It's safe because:
183
        // 1. download_segment_file() calls download_done synchronously
184
        // 2. ~FileCacheBlockDownloader() waits for all workers to finish via _workers->shutdown()
185
        // If this assumption changes (e.g., async callback), consider using shared_from_this pattern.
186
10
        auto decrease_inflight_count = [this, tablet_id = meta.tablet_id()]() {
187
10
            std::lock_guard lock(_inflight_mtx);
188
10
            auto it = _inflight_tablets.find(tablet_id);
189
10
            if (it == _inflight_tablets.end()) {
190
0
                LOG(WARNING) << "inflight ref cnt not exist, tablet id " << tablet_id;
191
10
            } else {
192
10
                it->second--;
193
10
                VLOG_DEBUG << "download_file_cache_block: inflight_tablets[" << tablet_id
194
0
                           << "] = " << it->second;
195
10
                if (it->second <= 0) {
196
8
                    DCHECK_EQ(it->second, 0) << it->first;
197
8
                    _inflight_tablets.erase(it);
198
8
                    VLOG_DEBUG << "download_file_cache_block: erase inflight_tablets[" << tablet_id
199
0
                               << "]";
200
8
                }
201
10
            }
202
10
        };
203
204
10
        CloudTabletSPtr tablet;
205
10
        if (auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(), false); !res.has_value()) {
206
10
            LOG(INFO) << "failed to find tablet " << meta.tablet_id() << " : " << res.error();
207
10
            decrease_inflight_count();
208
10
            return;
209
10
        } else {
210
0
            tablet = std::move(res).value();
211
0
        }
212
0
        if (!synced_tablets.contains(meta.tablet_id())) {
213
0
            auto st = tablet->sync_rowsets();
214
0
            if (!st) {
215
                // just log failed, try it best
216
0
                LOG(WARNING) << "failed to sync rowsets: " << meta.tablet_id()
217
0
                             << " err msg: " << st.to_string();
218
0
            }
219
0
            synced_tablets.insert(meta.tablet_id());
220
0
        }
221
0
        auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get());
222
0
        auto find_it = id_to_rowset_meta_map.find(meta.rowset_id());
223
0
        if (find_it == id_to_rowset_meta_map.end()) {
224
0
            LOG(WARNING) << "download_file_cache_block: tablet_id=" << meta.tablet_id()
225
0
                         << " rowset_id not found, rowset_id=" << meta.rowset_id();
226
0
            decrease_inflight_count();
227
0
            return;
228
0
        }
229
230
0
        auto storage_resource = find_it->second->remote_storage_resource();
231
0
        if (!storage_resource) {
232
0
            LOG(WARNING) << storage_resource.error();
233
0
            decrease_inflight_count();
234
0
            return;
235
0
        }
236
        // Use RowsetMeta::fs() instead of storage_resource->fs to support packed file.
237
        // RowsetMeta::fs() wraps the underlying FileSystem with PackedFileSystem when
238
        // packed_slice_locations is not empty, which correctly maps segment file paths
239
        // to their actual locations within packed files.
240
0
        auto file_system = find_it->second->fs();
241
0
        if (!file_system) {
242
0
            LOG(WARNING) << "download_file_cache_block: failed to get file system for tablet_id="
243
0
                         << meta.tablet_id() << ", rowset_id=" << meta.rowset_id();
244
0
            decrease_inflight_count();
245
0
            return;
246
0
        }
247
248
        // Capture decrease_inflight_count by value to ensure lifetime safety
249
        // even if download_done is called asynchronously in the future
250
0
        auto download_done = [decrease_inflight_count, tablet_id = meta.tablet_id()](Status st) {
251
0
            TEST_SYNC_POINT_CALLBACK("FileCacheBlockDownloader::download_file_cache_block");
252
0
            decrease_inflight_count();
253
0
            LOG(INFO) << "download_file_cache_block: download_done, tablet_Id=" << tablet_id
254
0
                      << " status=" << st.to_string();
255
0
        };
256
257
0
        std::string path;
258
0
        doris::FileType file_type =
259
0
                meta.has_file_type() ? meta.file_type() : doris::FileType::SEGMENT_FILE;
260
0
        bool is_index = (file_type == doris::FileType::INVERTED_INDEX_FILE);
261
0
        if (is_index) {
262
0
            path = storage_resource.value()->remote_idx_v2_path(*find_it->second,
263
0
                                                                meta.segment_id());
264
0
        } else {
265
            // default .dat
266
0
            path = storage_resource.value()->remote_segment_path(*find_it->second,
267
0
                                                                 meta.segment_id());
268
0
        }
269
270
0
        DownloadFileMeta download_meta {
271
0
                .path = path,
272
0
                .file_size = meta.has_file_size() ? meta.file_size()
273
0
                                                  : -1, // To avoid trigger get file size IO
274
0
                .offset = meta.offset(),
275
0
                .download_size = meta.size(),
276
0
                .file_system = file_system,
277
0
                .ctx =
278
0
                        {
279
0
                                .is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX,
280
0
                                .expiration_time = meta.expiration_time(),
281
0
                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
282
0
                                .is_warmup = true,
283
0
                        },
284
0
                .download_done = std::move(download_done),
285
0
                .tablet_id = meta.tablet_id(),
286
0
        };
287
0
        download_segment_file(download_meta);
288
0
    });
289
4
}
290
291
0
void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& meta) {
292
0
    VLOG_DEBUG << "download_segment_file: start, path=" << meta.path << ", offset=" << meta.offset
293
0
               << ", download_size=" << meta.download_size << ", file_size=" << meta.file_size;
294
0
    DBUG_EXECUTE_IF("FileCacheBlockDownloader::download_segment_file.skip_warmup", {
295
0
        if (meta.ctx.is_warmup) {
296
0
            LOG(INFO) << "download_segment_file: skip warmup download by debug point, path="
297
0
                      << meta.path << ", offset=" << meta.offset
298
0
                      << ", download_size=" << meta.download_size;
299
0
            if (meta.download_done) {
300
0
                meta.download_done(Status::OK());
301
0
            }
302
0
            return;
303
0
        }
304
0
    });
305
306
0
    FileReaderSPtr file_reader;
307
0
    FileReaderOptions opts {
308
0
            .cache_type = FileCachePolicy::FILE_BLOCK_CACHE,
309
0
            .is_doris_table = true,
310
0
            .cache_base_path {},
311
0
            .file_size = meta.file_size,
312
0
            .tablet_id = meta.tablet_id,
313
0
            .storage_resource_id = meta.file_system->id(),
314
0
    };
315
0
    auto st = meta.file_system->open_file(meta.path, &file_reader, &opts);
316
0
    if (!st.ok()) {
317
0
        LOG(WARNING) << "failed to download file path=" << meta.path << ", st=" << st;
318
0
        if (meta.download_done) {
319
0
            meta.download_done(std::move(st));
320
0
        }
321
0
        g_file_cache_download_failed_num << 1;
322
0
        return;
323
0
    }
324
325
0
    size_t one_single_task_size = config::s3_write_buffer_size;
326
327
0
    int64_t download_size = meta.download_size > 0 ? meta.download_size : file_reader->size();
328
0
    size_t task_num = (download_size + one_single_task_size - 1) / one_single_task_size;
329
330
0
    std::unique_ptr<char[]> buffer(new char[one_single_task_size]);
331
332
0
    DBUG_EXECUTE_IF("FileCacheBlockDownloader::download_segment_file_sleep", {
333
0
        if (meta.ctx.is_warmup) {
334
0
            auto sleep_time = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
335
0
                    "FileCacheBlockDownloader::download_segment_file_sleep", "sleep_time", 3);
336
0
            LOG(INFO) << "FileCacheBlockDownloader::download_segment_file_sleep: sleep_time="
337
0
                      << sleep_time;
338
0
            sleep(sleep_time);
339
0
        }
340
0
    });
341
342
0
    size_t task_offset = 0;
343
0
    for (size_t i = 0; i < task_num; i++) {
344
0
        size_t offset = meta.offset + task_offset;
345
346
0
        size_t size = std::min(one_single_task_size,
347
0
                               static_cast<size_t>(meta.download_size - task_offset));
348
0
        size_t bytes_read;
349
0
        VLOG_DEBUG << "download_segment_file, path=" << meta.path << ", read_at offset=" << offset
350
0
                   << ", size=" << size;
351
        // TODO(plat1ko):
352
        //  1. Directly append buffer data to file cache
353
        //  2. Provide `FileReader::async_read()` interface
354
0
        DCHECK(meta.ctx.is_dryrun == config::enable_reader_dryrun_when_download_file_cache);
355
0
        st = file_reader->read_at(offset, {buffer.get(), size}, &bytes_read, &meta.ctx);
356
0
        if (!st.ok()) {
357
0
            LOG(WARNING) << "failed to download file path=" << meta.path << ", st=" << st;
358
0
            if (meta.download_done) {
359
0
                meta.download_done(std::move(st));
360
0
            }
361
0
            g_file_cache_download_failed_num << 1;
362
0
            return;
363
0
        }
364
0
        task_offset += size;
365
0
        g_file_cache_download_finished_size << size;
366
0
    }
367
368
0
    if (meta.download_done) {
369
0
        LOG(INFO) << "download_segment_file: download finished, path=" << meta.path;
370
0
        meta.download_done(Status::OK());
371
0
    }
372
0
    g_file_cache_download_finished_num << 1;
373
0
}
374
375
4
void FileCacheBlockDownloader::download_blocks(DownloadTask& task) {
376
4
    switch (task.task_message.index()) {
377
4
    case 0: {
378
4
        bool should_balance_task = true;
379
4
        DBUG_EXECUTE_IF("FileCacheBlockDownloader.download_blocks.balance_task",
380
4
                        { should_balance_task = false; });
381
4
        if (should_balance_task) {
382
4
            download_file_cache_block(std::get<0>(task.task_message));
383
4
        }
384
385
4
        break;
386
0
    }
387
0
    case 1:
388
0
        download_segment_file(std::get<1>(task.task_message));
389
0
        break;
390
4
    }
391
4
}
392
393
} // namespace doris::io