Coverage Report

Created: 2026-03-14 04:23

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache_downloader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCacheFactory.h
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache_downloader.h"
22
23
#include <bthread/bthread.h>
24
#include <bthread/countdown_event.h>
25
#include <bthread/unstable.h>
26
#include <bvar/bvar.h>
27
#include <fmt/core.h>
28
#include <gen_cpp/internal_service.pb.h>
29
30
#include <chrono>
#include <memory>
#include <mutex>
#include <optional>
#include <unordered_set>
#include <variant>
34
35
#include "cloud/cloud_tablet_mgr.h"
36
#include "cloud/cloud_warm_up_manager.h"
37
#include "common/config.h"
38
#include "common/logging.h"
39
#include "cpp/sync_point.h"
40
#include "io/fs/file_reader.h"
41
#include "io/io_common.h"
42
#include "storage/rowset/beta_rowset.h"
43
#include "util/bvar_helper.h"
44
45
namespace doris::io {
46
47
// Cumulative bytes requested by submitted download tasks (only positive sizes are counted).
bvar::Adder<uint64_t> g_file_cache_download_submitted_size {"file_cache_download_submitted_size"};
// Cumulative size of chunks successfully read while downloading segment files.
bvar::Adder<uint64_t> g_file_cache_download_finished_size {"file_cache_download_finished_size"};
// Number of tasks accepted by submit_download_task().
bvar::Adder<uint64_t> g_file_cache_download_submitted_num {"file_cache_download_submitted_num"};
// Number of segment-file downloads that completed successfully.
bvar::Adder<uint64_t> g_file_cache_download_finished_num {"file_cache_download_finished_num"};
// Number of segment-file download tasks that failed or were discarded.
bvar::Adder<uint64_t> g_file_cache_download_failed_num {"file_cache_download_failed_num"};
// Number of tasks currently waiting in the downloader queue: +1 on push, -1 on pop/eviction.
bvar::Adder<uint64_t> block_file_cache_downloader_task_total {"file_cache_downloader_queue_total"};
53
54
5
FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) : _engine(engine) {
    // Build the worker pool BEFORE starting the poller thread: the poller dereferences
    // `_workers` when it dispatches a task, so the pool pointer must already be published
    // (and the thread start provides the happens-before edge) when the poller begins running.
    auto st = ThreadPoolBuilder("FileCacheBlockDownloader")
                      .set_min_threads(config::file_cache_downloader_thread_num_min)
                      .set_max_threads(config::file_cache_downloader_thread_num_max)
                      .build(&_workers);
    CHECK(st.ok()) << "failed to create FileCacheBlockDownloader, st=" << st;

    // Single background thread that moves queued tasks into the worker pool.
    _poller = std::thread(&FileCacheBlockDownloader::polling_download_task, this);
}
62
63
5
FileCacheBlockDownloader::~FileCacheBlockDownloader() {
    // Step 1: publish the shutdown flag under the queue mutex, so the poller's wait
    // predicate observes it consistently, then wake every waiter.
    {
        std::unique_lock guard(_mtx);
        _closed = true;
    }
    _empty.notify_all();

    // Step 2: wait until the poller has left its loop; nothing dispatches to the pool after that.
    if (_poller.joinable()) {
        _poller.join();
    }

    // Step 3: stop the worker pool. This happens in the destructor body, i.e. before any
    // member (including `_inflight_tablets`) is destroyed.
    if (_workers != nullptr) {
        _workers->shutdown();
    }
}
78
79
4
void FileCacheBlockDownloader::submit_download_task(DownloadTask task) {
    // Enqueues `task` for the poller thread.
    // - For block-meta tasks (variant index 0), takes one `_inflight_tablets` reference per meta.
    //   The reference is released when the meta is processed, or here if the task is evicted.
    // - If the queue is full, the oldest task is evicted and finalized, so that no callback is
    //   lost and no inflight reference leaks.
    if (!config::enable_file_cache) [[unlikely]] {
        LOG(INFO) << "Skip submit download file task because file cache is not enabled";
        return;
    }

    if (task.task_message.index() == 0) { // download file cache block task
        std::lock_guard lock(_inflight_mtx);
        for (auto& meta : std::get<0>(task.task_message)) {
            auto& inflight_cnt = _inflight_tablets[meta.tablet_id()];
            ++inflight_cnt;
            if (meta.size() > 0) {
                g_file_cache_download_submitted_size << meta.size();
            }
            LOG(INFO) << "submit_download_task: inflight_tablets[" << meta.tablet_id()
                      << "] = " << inflight_cnt;
        }
    } else {
        int64_t download_size = std::get<1>(task.task_message).download_size;
        if (download_size > 0) {
            g_file_cache_download_submitted_size << download_size;
        }
    }

    // The task evicted because the queue was full. It is finalized only after `_mtx` is
    // released, so a user-supplied `download_done` callback never runs under the queue lock.
    std::optional<DownloadTask> evicted;
    {
        std::lock_guard lock(_mtx);
        if (!_task_queue.empty() && _task_queue.size() >= _max_size) {
            LOG(INFO) << "submit_download_task: task queue full, pop front";
            evicted.emplace(std::move(_task_queue.front()));
            _task_queue.pop_front(); // Eliminate the earliest task in the queue
            block_file_cache_downloader_task_total << -1;
        }
        VLOG_DEBUG << "submit_download_task: push task, queue size before push: "
                   << _task_queue.size();
        _task_queue.push_back(std::move(task));
        block_file_cache_downloader_task_total << 1;
        _empty.notify_all();
    }
    g_file_cache_download_submitted_num << 1;

    if (!evicted.has_value()) {
        return;
    }
    if (evicted->task_message.index() == 1) { // download segment file task
        auto& download_file_meta = std::get<1>(evicted->task_message);
        if (download_file_meta.download_done) {
            download_file_meta.download_done(
                    Status::InternalError("The downloader queue is full"));
        }
        g_file_cache_download_failed_num << 1;
        return;
    }

    // Evicted block-meta task: release the references taken at submission. Otherwise
    // check_download_task() would report these tablets as in-flight forever.
    std::lock_guard lock(_inflight_mtx);
    for (auto& meta : std::get<0>(evicted->task_message)) {
        auto it = _inflight_tablets.find(meta.tablet_id());
        if (it == _inflight_tablets.end()) {
            LOG(WARNING) << "inflight ref cnt not exist, tablet id " << meta.tablet_id();
            continue;
        }
        if (--it->second <= 0) {
            _inflight_tablets.erase(it);
        }
    }
}
125
126
5
void FileCacheBlockDownloader::polling_download_task() {
127
5
    constexpr int64_t hot_interval = 2 * 60 * 60; // 2 hours
128
9
    while (!_closed) {
129
9
        DownloadTask task;
130
9
        {
131
9
            std::unique_lock lock(_mtx);
132
18
            _empty.wait(lock, [this]() { return !_task_queue.empty() || _closed; });
133
9
            if (_closed) {
134
5
                LOG(INFO) << "polling_download_task: downloader closed, exit polling";
135
5
                break;
136
5
            }
137
138
4
            task = std::move(_task_queue.front());
139
4
            _task_queue.pop_front();
140
4
            block_file_cache_downloader_task_total << -1;
141
4
            VLOG_DEBUG << "polling_download_task: pop task, queue size after pop: "
142
0
                       << _task_queue.size();
143
4
        }
144
145
4
        if (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() -
146
4
                                                             task.atime)
147
4
                    .count() < hot_interval) {
148
4
            VLOG_DEBUG << "polling_download_task: submit download_blocks to thread pool";
149
4
            auto st = _workers->submit_func(
150
4
                    [this, task_ = std::move(task)]() mutable { download_blocks(task_); });
151
4
            if (!st.ok()) {
152
0
                LOG(WARNING) << "submit download blocks failed: " << st;
153
0
            }
154
4
        }
155
4
    }
156
5
}
157
158
void FileCacheBlockDownloader::check_download_task(const std::vector<int64_t>& tablets,
159
10
                                                   std::map<int64_t, bool>* done) {
160
10
    std::lock_guard lock(_inflight_mtx);
161
18
    for (int64_t tablet_id : tablets) {
162
18
        done->insert({tablet_id, !_inflight_tablets.contains(tablet_id)});
163
18
    }
164
10
}
165
166
0
std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTablet* tablet) {
    // Returns a map from rowset-id string to rowset meta for the tablet's visible
    // (non-stale) rowsets, taken via BaseTablet::traverse_rowsets().
    std::unordered_map<std::string, RowsetMetaSharedPtr> rowset_metas_by_id;
    tablet->traverse_rowsets(
            [&rowset_metas_by_id](const RowsetSharedPtr& rowset) {
                const auto& rs_meta = rowset->rowset_meta();
                rowset_metas_by_id.emplace(rs_meta->rowset_id().to_string(), rs_meta);
            },
            /*include_stale=*/false);
    return rowset_metas_by_id;
}
175
176
void FileCacheBlockDownloader::download_file_cache_block(
177
4
        const DownloadTask::FileCacheBlockMetaVec& metas) {
178
4
    std::unordered_set<int64_t> synced_tablets;
179
10
    std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
180
10
        VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" << meta.tablet_id()
181
0
                   << ", rowset_id=" << meta.rowset_id() << ", segment_id=" << meta.segment_id()
182
0
                   << ", offset=" << meta.offset() << ", size=" << meta.size()
183
0
                   << ", type=" << meta.cache_type();
184
185
        // Helper to decrease inflight count on early return.
186
        // NOTE: This lambda captures 'this' pointer. It's safe because:
187
        // 1. download_segment_file() calls download_done synchronously
188
        // 2. ~FileCacheBlockDownloader() waits for all workers to finish via _workers->shutdown()
189
        // If this assumption changes (e.g., async callback), consider using shared_from_this pattern.
190
10
        auto decrease_inflight_count = [this, tablet_id = meta.tablet_id()]() {
191
10
            std::lock_guard lock(_inflight_mtx);
192
10
            auto it = _inflight_tablets.find(tablet_id);
193
10
            if (it == _inflight_tablets.end()) {
194
0
                LOG(WARNING) << "inflight ref cnt not exist, tablet id " << tablet_id;
195
10
            } else {
196
10
                it->second--;
197
10
                VLOG_DEBUG << "download_file_cache_block: inflight_tablets[" << tablet_id
198
0
                           << "] = " << it->second;
199
10
                if (it->second <= 0) {
200
8
                    DCHECK_EQ(it->second, 0) << it->first;
201
8
                    _inflight_tablets.erase(it);
202
8
                    VLOG_DEBUG << "download_file_cache_block: erase inflight_tablets[" << tablet_id
203
0
                               << "]";
204
8
                }
205
10
            }
206
10
        };
207
208
10
        CloudTabletSPtr tablet;
209
10
        if (auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(), false); !res.has_value()) {
210
10
            LOG(INFO) << "failed to find tablet " << meta.tablet_id() << " : " << res.error();
211
10
            decrease_inflight_count();
212
10
            return;
213
10
        } else {
214
0
            tablet = std::move(res).value();
215
0
        }
216
0
        if (!synced_tablets.contains(meta.tablet_id())) {
217
0
            auto st = tablet->sync_rowsets();
218
0
            if (!st) {
219
                // just log failed, try it best
220
0
                LOG(WARNING) << "failed to sync rowsets: " << meta.tablet_id()
221
0
                             << " err msg: " << st.to_string();
222
0
            }
223
0
            synced_tablets.insert(meta.tablet_id());
224
0
        }
225
0
        auto id_to_rowset_meta_map = snapshot_rs_metas(tablet.get());
226
0
        auto find_it = id_to_rowset_meta_map.find(meta.rowset_id());
227
0
        if (find_it == id_to_rowset_meta_map.end()) {
228
0
            LOG(WARNING) << "download_file_cache_block: tablet_id=" << meta.tablet_id()
229
0
                         << " rowset_id not found, rowset_id=" << meta.rowset_id();
230
0
            decrease_inflight_count();
231
0
            return;
232
0
        }
233
234
0
        auto storage_resource = find_it->second->remote_storage_resource();
235
0
        if (!storage_resource) {
236
0
            LOG(WARNING) << storage_resource.error();
237
0
            decrease_inflight_count();
238
0
            return;
239
0
        }
240
        // Use RowsetMeta::fs() instead of storage_resource->fs to support packed file.
241
        // RowsetMeta::fs() wraps the underlying FileSystem with PackedFileSystem when
242
        // packed_slice_locations is not empty, which correctly maps segment file paths
243
        // to their actual locations within packed files.
244
0
        auto file_system = find_it->second->fs();
245
0
        if (!file_system) {
246
0
            LOG(WARNING) << "download_file_cache_block: failed to get file system for tablet_id="
247
0
                         << meta.tablet_id() << ", rowset_id=" << meta.rowset_id();
248
0
            decrease_inflight_count();
249
0
            return;
250
0
        }
251
252
        // Capture decrease_inflight_count by value to ensure lifetime safety
253
        // even if download_done is called asynchronously in the future
254
0
        auto download_done = [decrease_inflight_count, tablet_id = meta.tablet_id()](Status st) {
255
0
            TEST_SYNC_POINT_CALLBACK("FileCacheBlockDownloader::download_file_cache_block");
256
0
            decrease_inflight_count();
257
0
            LOG(INFO) << "download_file_cache_block: download_done, tablet_Id=" << tablet_id
258
0
                      << " status=" << st.to_string();
259
0
        };
260
261
0
        std::string path;
262
0
        doris::FileType file_type =
263
0
                meta.has_file_type() ? meta.file_type() : doris::FileType::SEGMENT_FILE;
264
0
        bool is_index = (file_type == doris::FileType::INVERTED_INDEX_FILE);
265
0
        if (is_index) {
266
0
            path = storage_resource.value()->remote_idx_v2_path(*find_it->second,
267
0
                                                                meta.segment_id());
268
0
        } else {
269
            // default .dat
270
0
            path = storage_resource.value()->remote_segment_path(*find_it->second,
271
0
                                                                 meta.segment_id());
272
0
        }
273
274
0
        DownloadFileMeta download_meta {
275
0
                .path = path,
276
0
                .file_size = meta.has_file_size() ? meta.file_size()
277
0
                                                  : -1, // To avoid trigger get file size IO
278
0
                .offset = meta.offset(),
279
0
                .download_size = meta.size(),
280
0
                .file_system = file_system,
281
0
                .ctx =
282
0
                        {
283
0
                                .is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX,
284
0
                                .expiration_time = meta.expiration_time(),
285
0
                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
286
0
                                .is_warmup = true,
287
0
                        },
288
0
                .download_done = std::move(download_done),
289
0
        };
290
0
        download_segment_file(download_meta);
291
0
    });
292
4
}
293
294
0
void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& meta) {
295
0
    LOG(INFO) << "download_segment_file: start, path=" << meta.path << ", offset=" << meta.offset
296
0
              << ", download_size=" << meta.download_size << ", file_size=" << meta.file_size;
297
0
    FileReaderSPtr file_reader;
298
0
    FileReaderOptions opts {
299
0
            .cache_type = FileCachePolicy::FILE_BLOCK_CACHE,
300
0
            .is_doris_table = true,
301
0
            .cache_base_path {},
302
0
            .file_size = meta.file_size,
303
0
    };
304
0
    auto st = meta.file_system->open_file(meta.path, &file_reader, &opts);
305
0
    if (!st.ok()) {
306
0
        LOG(WARNING) << "failed to download file path=" << meta.path << ", st=" << st;
307
0
        if (meta.download_done) {
308
0
            meta.download_done(std::move(st));
309
0
        }
310
0
        g_file_cache_download_failed_num << 1;
311
0
        return;
312
0
    }
313
314
0
    size_t one_single_task_size = config::s3_write_buffer_size;
315
316
0
    int64_t download_size = meta.download_size > 0 ? meta.download_size : file_reader->size();
317
0
    size_t task_num = (download_size + one_single_task_size - 1) / one_single_task_size;
318
319
0
    std::unique_ptr<char[]> buffer(new char[one_single_task_size]);
320
321
0
    DBUG_EXECUTE_IF("FileCacheBlockDownloader::download_segment_file_sleep", {
322
0
        auto sleep_time = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
323
0
                "FileCacheBlockDownloader::download_segment_file_sleep", "sleep_time", 3);
324
0
        LOG(INFO) << "FileCacheBlockDownloader::download_segment_file_sleep: sleep_time="
325
0
                  << sleep_time;
326
0
        sleep(sleep_time);
327
0
    });
328
329
0
    size_t task_offset = 0;
330
0
    for (size_t i = 0; i < task_num; i++) {
331
0
        size_t offset = meta.offset + task_offset;
332
333
0
        size_t size = std::min(one_single_task_size,
334
0
                               static_cast<size_t>(meta.download_size - task_offset));
335
0
        size_t bytes_read;
336
0
        VLOG_DEBUG << "download_segment_file, path=" << meta.path << ", read_at offset=" << offset
337
0
                   << ", size=" << size;
338
        // TODO(plat1ko):
339
        //  1. Directly append buffer data to file cache
340
        //  2. Provide `FileReader::async_read()` interface
341
0
        DCHECK(meta.ctx.is_dryrun == config::enable_reader_dryrun_when_download_file_cache);
342
0
        st = file_reader->read_at(offset, {buffer.get(), size}, &bytes_read, &meta.ctx);
343
0
        if (!st.ok()) {
344
0
            LOG(WARNING) << "failed to download file path=" << meta.path << ", st=" << st;
345
0
            if (meta.download_done) {
346
0
                meta.download_done(std::move(st));
347
0
            }
348
0
            g_file_cache_download_failed_num << 1;
349
0
            return;
350
0
        }
351
0
        task_offset += size;
352
0
        g_file_cache_download_finished_size << size;
353
0
    }
354
355
0
    if (meta.download_done) {
356
0
        LOG(INFO) << "download_segment_file: download finished, path=" << meta.path;
357
0
        meta.download_done(Status::OK());
358
0
    }
359
0
    g_file_cache_download_finished_num << 1;
360
0
}
361
362
4
void FileCacheBlockDownloader::download_blocks(DownloadTask& task) {
    // Runs on a worker thread and dispatches on the task variant:
    //   index 0 -> list of file-cache block metas
    //   index 1 -> single segment-file download
    // Any other alternative is ignored.
    auto& message = task.task_message;
    if (message.index() == 0) {
        // Debug point: lets tests suppress block-meta downloads entirely.
        bool should_balance_task = true;
        DBUG_EXECUTE_IF("FileCacheBlockDownloader.download_blocks.balance_task",
                        { should_balance_task = false; });
        if (should_balance_task) {
            download_file_cache_block(std::get<0>(message));
        }
    } else if (message.index() == 1) {
        download_segment_file(std::get<1>(message));
    }
}
379
380
} // namespace doris::io