Coverage Report

Created: 2026-04-01 10:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache_downloader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCacheFactory.h
19
// and modified by Doris
20
21
#pragma once
22
23
#include <gen_cpp/internal_service.pb.h>
24
#include <glog/logging.h>
25
26
#include <atomic>
27
#include <chrono>
28
#include <condition_variable>
29
#include <deque>
30
#include <memory>
31
#include <thread>
32
#include <variant>
33
34
#include "cloud/cloud_storage_engine.h"
35
#include "io/fs/file_system.h"
36
37
namespace doris::io {
38
39
// Describes a request to download (part of) one file. It is the second
// alternative of DownloadTask::task_message and the argument type of
// FileCacheBlockDownloader::download_segment_file().
// NOTE(review): the field meanings below are inferred from names and default
// values only; the code that consumes them is not in this header. Confirm
// against the .cpp before relying on them.
struct DownloadFileMeta {
40
    // Presumably the path of the file to download.
    Path path;
41
    // Defaults to -1; presumably "size unknown" -- TODO confirm.
    int64_t file_size {-1};
42
    // Defaults to 0; presumably the starting byte offset of the download.
    int64_t offset {0};
43
    // Defaults to -1; presumably "no explicit length" -- TODO confirm.
    int64_t download_size {-1};
44
    // File system handle; presumably the one `path` is resolved against.
    io::FileSystemSPtr file_system;
45
    // IO context carried along with the request.
    IOContext ctx;
46
    // Optional callback taking a Status; empty (null) by default. Presumably
    // invoked with the download result -- verify against the implementation.
    std::function<void(Status)> download_done;
47
    // Defaults to -1; presumably "no associated tablet" -- TODO confirm.
    int64_t tablet_id {-1};
48
};
49
50
// One unit of work for FileCacheBlockDownloader. The payload is either a list
// of FileCacheBlockMeta protobuf messages or a single DownloadFileMeta.
struct DownloadTask {
51
    // Captured from steady_clock::now() when the task object is constructed
    // (none of the constructors below override this default initializer).
    std::chrono::steady_clock::time_point atime = std::chrono::steady_clock::now();
52
53
    // Protobuf repeated field of FileCacheBlockMeta messages.
    using FileCacheBlockMetaVec =
54
            ::google::protobuf::RepeatedPtrField< ::doris::FileCacheBlockMeta>;
55
56
    // Task payload. A default-constructed task holds the first alternative,
    // i.e. an empty FileCacheBlockMetaVec.
    std::variant<FileCacheBlockMetaVec, DownloadFileMeta> task_message;
57
4
    // Builds a task from a list of block metas. Takes the argument by value
    // and moves it into the variant. Not `explicit`, so a
    // FileCacheBlockMetaVec converts implicitly to a DownloadTask.
    DownloadTask(FileCacheBlockMetaVec metas) : task_message(std::move(metas)) {}
58
0
    // Builds a task from a single file-download description. Also not
    // `explicit`, so a DownloadFileMeta converts implicitly to a DownloadTask.
    DownloadTask(DownloadFileMeta meta) : task_message(std::move(meta)) {}
59
9
    // Default task: `atime` is set to now and the payload is an empty
    // FileCacheBlockMetaVec.
    DownloadTask() = default;
60
};
61
62
// Accepts DownloadTask objects, queues them, and downloads their contents.
// Only the declaration is visible in this header; notes marked "presumably"
// are inferred from member names and types and should be confirmed against
// the .cpp.
class FileCacheBlockDownloader {
63
public:
64
    // Binds the downloader to `engine`. The engine is stored by reference, so
    // it must outlive this object.
    explicit FileCacheBlockDownloader(CloudStorageEngine& engine);
65
66
    ~FileCacheBlockDownloader();
67
68
    // download into cache block
    void download_blocks(DownloadTask& task);
69
70
71
    // submit the task to download queue
    void submit_download_task(DownloadTask task);
72
    // polling the queue, get the task to download
    void polling_download_task();
73
    // check whether the tasks about tablets finish or not
    // `tablets`: tablet ids to query. `done`: output map keyed by int64_t
    // (presumably tablet id -> finished flag -- TODO confirm).
    void check_download_task(const std::vector<int64_t>& tablets, std::map<int64_t, bool>* done);
74
75
76
private:
77
    // Handlers for the two alternatives of DownloadTask::task_message.
    void download_file_cache_block(const DownloadTask::FileCacheBlockMetaVec&);
78
    void download_segment_file(const DownloadFileMeta&);
79
80
81
    // Non-owning reference supplied at construction.
    CloudStorageEngine& _engine;
82
83
84
    // Presumably the thread that runs polling_download_task() -- confirm.
    std::thread _poller;
85
    // Presumably the pool that executes the downloads -- confirm.
    std::unique_ptr<ThreadPool> _workers;
86
87
88
    // Presumably guards _closed and _task_queue, and is paired with _empty.
    std::mutex _mtx;
89
    bool _closed {false};
90
    std::condition_variable _empty;
91
    // Pending tasks.
    std::deque<DownloadTask> _task_queue;
92
93
94
    // Presumably guards _inflight_tablets -- confirm.
    std::mutex _inflight_mtx;
95
    // tablet id -> inflight block num of tablet
    std::unordered_map<int64_t, int64_t> _inflight_tablets;
96
97
98
    // NOTE(review): presumably the upper bound on _task_queue length; the use
    // site is not visible in this header.
    static inline constexpr size_t _max_size {102400};
99
};
98
99
} // namespace doris::io