Coverage Report

Created: 2026-03-13 19:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache_downloader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCacheFactory.h
19
// and modified by Doris
20
21
#pragma once
22
23
#include <gen_cpp/internal_service.pb.h>
24
#include <glog/logging.h>
25
26
#include <atomic>
27
#include <chrono>
28
#include <condition_variable>
29
#include <deque>
30
#include <memory>
31
#include <thread>
32
#include <variant>
33
34
#include "cloud/cloud_storage_engine.h"
35
#include "io/fs/file_system.h"
36
37
namespace doris::io {
38
39
struct DownloadFileMeta { // Describes one file download request; this is the type accepted by FileCacheBlockDownloader::download_segment_file().
40
    Path path; // path of the file this request refers to
41
    int64_t file_size {-1}; // defaults to -1; NOTE(review): presumably means "size unknown" - confirm in the .cpp
42
    int64_t offset {0}; // defaults to 0; NOTE(review): presumably the start offset of the range to download - confirm
43
    int64_t download_size {-1}; // defaults to -1; NOTE(review): presumably means "length not specified" - confirm
44
    io::FileSystemSPtr file_system; // file system handle; NOTE(review): presumably used to open `path` - confirm
45
    IOContext ctx; // IO context carried with the request; how it is consumed is defined in the .cpp
46
    std::function<void(Status)> download_done; // callback taking a Status; empty unless the caller sets it. NOTE(review): presumably invoked with the download result - confirm
47
};
48
49
struct DownloadTask { // One download request: either a list of file-cache block metas or a single DownloadFileMeta.
50
    std::chrono::steady_clock::time_point atime = std::chrono::steady_clock::now(); // initialized to the steady-clock time at which this object is constructed
51
52
    using FileCacheBlockMetaVec = // protobuf repeated field of FileCacheBlockMeta messages
53
            ::google::protobuf::RepeatedPtrField< ::doris::FileCacheBlockMeta>;
54
55
    std::variant<FileCacheBlockMetaVec, DownloadFileMeta> task_message; // payload; holds exactly one of the two request kinds
56
4
    DownloadTask(FileCacheBlockMetaVec metas) : task_message(std::move(metas)) {} // non-explicit on purpose or not, this allows implicit conversion from a block-meta list; the list is moved into the payload
57
0
    DownloadTask(DownloadFileMeta meta) : task_message(std::move(meta)) {} // non-explicit: allows implicit conversion from a DownloadFileMeta; the meta is moved into the payload
58
9
    DownloadTask() = default; // payload holds a value-initialized FileCacheBlockMetaVec (the first variant alternative)
59
};
60
61
class FileCacheBlockDownloader { // Accepts DownloadTask objects through a queue and downloads them into file-cache blocks.
62
public:
63
    explicit FileCacheBlockDownloader(CloudStorageEngine& engine); // NOTE(review): presumably binds `engine` to _engine and starts _poller/_workers - confirm in the .cpp
64
65
    ~FileCacheBlockDownloader(); // defined in the .cpp; NOTE(review): presumably stops and joins _poller - confirm
66
67
    // Download the content described by `task` into cache blocks.
68
    void download_blocks(DownloadTask& task);
69
70
    // Submit `task` (taken by value) to the download queue.
71
    void submit_download_task(DownloadTask task);
72
    // Poll the queue and take the next task to download.
73
    void polling_download_task();
74
    // Check whether the download tasks of the given tablets have finished; the result is written to `*done` (NOTE(review): presumably keyed by tablet id - confirm).
75
    void check_download_task(const std::vector<int64_t>& tablets, std::map<int64_t, bool>* done);
76
77
private:
78
    void download_file_cache_block(const DownloadTask::FileCacheBlockMetaVec&); // handles the block-meta alternative of DownloadTask::task_message
79
    void download_segment_file(const DownloadFileMeta&); // handles the DownloadFileMeta alternative of DownloadTask::task_message
80
81
    CloudStorageEngine& _engine; // non-owning reference; the engine must outlive this object
82
83
    std::thread _poller; // NOTE(review): presumably runs polling_download_task() - confirm
84
    std::unique_ptr<ThreadPool> _workers; // owned thread pool; NOTE(review): presumably executes the downloads - confirm
85
86
    std::mutex _mtx; // NOTE(review): presumably guards _closed and _task_queue - confirm
87
    bool _closed {false}; // starts false; NOTE(review): presumably set on shutdown to stop the poller - confirm
88
    std::condition_variable _empty; // NOTE(review): presumably signalled when _task_queue gains a task or on close - confirm
89
    std::deque<DownloadTask> _task_queue; // pending tasks
90
91
    std::mutex _inflight_mtx; // NOTE(review): presumably guards _inflight_tablets - confirm
92
    // tablet id -> number of in-flight blocks of that tablet
93
    std::unordered_map<int64_t, int64_t> _inflight_tablets;
94
95
    static inline constexpr size_t _max_size {102400}; // NOTE(review): presumably the maximum length of _task_queue - confirm
96
};
97
98
} // namespace doris::io