Coverage Report

Created: 2026-05-09 20:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_meta_mgr.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#pragma once
18
19
#include <gen_cpp/olap_file.pb.h>
20
21
#include <future>
22
#include <memory>
23
#include <string>
24
#include <tuple>
25
#include <variant>
26
#include <vector>
27
28
#include "cloud/cloud_tablet.h"
29
#include "common/status.h"
30
#include "storage/rowset/rowset_fwd.h"
31
#include "storage/rowset/rowset_meta.h"
32
#include "util/s3_util.h"
33
34
namespace doris {
35
36
class DeleteBitmap;
37
class StreamLoadContext;
38
class CloudTablet;
39
class TabletMeta;
40
class TabletSchema;
41
class TabletMetaPB;
42
class RowsetMeta;
43
44
namespace cloud {
45
46
class FinishTabletJobResponse;
47
class StartTabletJobResponse;
48
class TabletJobInfoPB;
49
class TabletStatsPB;
50
class TabletIndexPB;
51
class HostLevelMSRpcRateLimiters;
52
class MSBackpressureHandler;
53
54
// List of storage vault descriptions. Each element is a tuple of
// (string, S3 or HDFS vault configuration, path format).
// NOTE(review): the string is presumably the vault identifier — confirm against the producer.
using StorageVaultInfos = std::vector<
55
        std::tuple<std::string, std::variant<S3Conf, HdfsVaultInfo>, StorageVaultPB_PathFormat>>;
56
57
// Runs `tasks` in bthreads with the given `concurrency` and waits until all tasks are done.
58
// Stops running further tasks once any task returns !ok, leaving the remaining tasks untouched.
59
// Returns OK if all tasks complete successfully; otherwise returns the result of the failed task.
60
Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency);
61
62
// Asynchronous wrapper around the `bthread_fork_join` declared above, built on promise/future.
63
// Returns OK if `fut` was created successfully; otherwise returns an error.
// NOTE(review): the combined result of the tasks is presumably delivered through `*fut` — confirm.
64
Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int concurrency,
65
                         std::future<Status>* fut);
66
67
// Declares metadata operations for cloud tablets: tablet meta, rowsets, transactions,
// restore jobs, tablet jobs, delete bitmaps, snapshots and cluster status.
// Only declarations are visible in this header, so the notes below restate signatures and the
// existing comments rather than implementation behavior.
// NOTE(review): std::optional, std::map, std::unordered_map, std::shared_mutex and
// std::unique_lock are used below without their headers being included directly in this file;
// presumably they arrive transitively — confirm.
class CloudMetaMgr {
68
public:
69
169
    CloudMetaMgr() = default;
70
    ~CloudMetaMgr() = default;
71
    // Copy construction and copy assignment are disabled.
    CloudMetaMgr(const CloudMetaMgr&) = delete;
72
    CloudMetaMgr& operator=(const CloudMetaMgr&) = delete;
73
74
    Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>* tablet_meta);
75
76
    Status sync_tablet_rowsets(CloudTablet* tablet, const SyncOptions& options = {},
77
                               SyncRowsetStats* sync_stats = nullptr);
78
    // Same parameters as `sync_tablet_rowsets`, plus a lock passed in by the caller
    // (annotated below as `_sync_meta_lock`).
    Status sync_tablet_rowsets_unlocked(
79
            CloudTablet* tablet, std::unique_lock<bthread::Mutex>& lock /* _sync_meta_lock */,
80
            const SyncOptions& options = {}, SyncRowsetStats* sync_stats = nullptr);
81
82
    Status prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id, int64_t table_id,
83
                          std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);
84
85
    Status commit_rowset(RowsetMeta& rs_meta, const std::string& job_id, int64_t table_id,
86
                         std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);
87
    void cache_committed_rowset(RowsetMetaSharedPtr rs_meta, int64_t expiration_time);
88
89
    Status update_tmp_rowset(const RowsetMeta& rs_meta, int64_t table_id);
90
91
    Status update_packed_file_info(const std::string& packed_file_path,
92
                                   const cloud::PackedFileInfoPB& packed_file_info,
93
                                   int64_t table_id);
94
95
    Status commit_txn(const StreamLoadContext& ctx, bool is_2pc);
96
97
    Status abort_txn(const StreamLoadContext& ctx);
98
99
    Status precommit_txn(const StreamLoadContext& ctx);
100
101
    /**
102
     * Prepares a restore job for a tablet to meta-service
103
     * Change the state to PREPARED
104
     * PREPARED state means the meta of tablet has been uploaded but not finalized.
105
     */
106
    Status prepare_restore_job(const TabletMetaPB& tablet_meta);
107
108
    /**
109
     * Commits a restore job for a tablet to meta-service
110
     * Change the state from PREPARED to COMMITTED
111
     * COMMITTED state means the meta of tablet has been finalized.
112
     */
113
    Status commit_restore_job(const int64_t tablet_id);
114
115
    /**
116
     * Finish a restore job for a tablet from meta-service
117
     * Change the state to final state.
118
     * If is_completed = true, change the state from COMMITTED to COMPLETED
119
     * If is_completed = false, change the state from PREPARED/COMMITTED to DROPPED
120
     * COMPLETED state means the job is finished, the restored data should be visible.
121
     * DROPPED state means the job is aborted.
122
     * COMPLETED/DROPPED are the final states, jobs with final states will be recycled.
123
     */
124
    Status finish_restore_job(const int64_t tablet_id, bool is_completed);
125
126
    /**
127
     * Gets storage vault (storage backends) from meta-service
128
     *
129
     * @param vault_infos output param, all storage backends
130
     * @param is_vault_mode output param, true for pure vault mode, false for legacy mode
131
     * @return status
132
     */
133
    Status get_storage_vault_info(StorageVaultInfos* vault_infos, bool* is_vault_mode);
134
135
    Status prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res);
136
137
    Status commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJobResponse* res);
138
139
    Status abort_tablet_job(const TabletJobInfoPB& job);
140
141
    Status lease_tablet_job(const TabletJobInfoPB& job);
142
143
    Status update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id, int64_t initiator,
144
                                DeleteBitmap* delete_bitmap, DeleteBitmap* delete_bitmap_v2,
145
                                std::string rowset_id,
146
                                std::optional<StorageResource> storage_resource,
147
                                int64_t store_version, int64_t table_id, int64_t txn_id = -1,
148
                                bool is_explicit_txn = false, int64_t next_visible_version = -1);
149
150
    Status cloud_update_delete_bitmap_without_lock(
151
            const CloudTablet& tablet, DeleteBitmap* delete_bitmap,
152
            std::map<std::string, int64_t>& rowset_to_versions, int64_t table_id,
153
            int64_t pre_rowset_agg_start_version = 0, int64_t pre_rowset_agg_end_version = 0);
154
155
    Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
156
                                         int64_t initiator);
157
158
    void remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id, int64_t initiator,
159
                                          int64_t tablet_id);
160
161
    // Fill version holes by creating empty rowsets for missing versions
162
    Status fill_version_holes(CloudTablet* tablet, int64_t max_version,
163
                              std::unique_lock<std::shared_mutex>& wlock);
164
165
    // Create an empty rowset to fill a version hole
166
    Status create_empty_rowset_for_hole(CloudTablet* tablet, int64_t version,
167
                                        RowsetMetaSharedPtr prev_rowset_meta,
168
                                        RowsetSharedPtr* rowset);
169
170
    Status list_snapshot(std::vector<SnapshotInfoPB>& snapshots);
171
    Status get_snapshot_properties(SnapshotSwitchStatus& switch_status,
172
                                   int64_t& max_reserved_snapshots,
173
                                   int64_t& snapshot_interval_seconds);
174
175
    // Get all cluster status for the instance
176
    // Returns cluster_id -> (status, mtime_ms)
177
    // If my_cluster_id is not null, also returns the requesting node's cluster_id
178
    Status get_cluster_status(std::unordered_map<std::string, std::pair<int32_t, int64_t>>* result,
179
                              std::string* my_cluster_id = nullptr);
180
181
0
    // Stores `limiters` as-is in `host_level_ms_rpc_rate_limiters_`; the pointer is not copied
    // or validated here.
    void set_host_level_ms_rpc_rate_limiters(HostLevelMSRpcRateLimiters* limiters) {
182
0
        host_level_ms_rpc_rate_limiters_ = limiters;
183
0
    }
184
185
0
    // Stores `handler` as-is in `ms_backpressure_handler_`; the pointer is not copied or
    // validated here.
    void set_ms_backpressure_handler(MSBackpressureHandler* handler) {
186
0
        ms_backpressure_handler_ = handler;
187
0
    }
188
189
private:
190
    // The `std::ranges::range auto&&` parameters below make these member functions
    // abbreviated function templates (C++20).
    bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, std::ranges::range auto&& rs_metas,
191
                                            DeleteBitmap* delete_bitmap);
192
193
    Status sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
194
                                     std::ranges::range auto&& rs_metas, const TabletStatsPB& stats,
195
                                     const TabletIndexPB& idx, DeleteBitmap* delete_bitmap,
196
                                     bool full_sync, SyncRowsetStats* sync_stats,
197
                                     int32_t read_version, bool full_sync_v2);
198
    Status _read_tablet_delete_bitmap_v2(CloudTablet* tablet, int64_t old_max_version,
199
                                         std::ranges::range auto&& rs_metas,
200
                                         DeleteBitmap* delete_bitmap, GetDeleteBitmapResponse& res,
201
                                         int64_t& remote_delete_bitmap_bytes, bool full_sync_v2);
202
    Status _log_mow_delete_bitmap(CloudTablet* tablet, GetRowsetResponse& resp,
203
                                  DeleteBitmap& delete_bitmap, int64_t old_max_version,
204
                                  bool full_sync, int32_t read_version);
205
    Status _check_delete_bitmap_v2_correctness(CloudTablet* tablet, GetRowsetRequest& req,
206
                                               GetRowsetResponse& resp, int64_t old_max_version);
207
208
    Status _get_delete_bitmap_from_ms(GetDeleteBitmapRequest& req, GetDeleteBitmapResponse& res);
209
    // NOTE(review): `bytes_threadhold` looks like a misspelling of "threshold"; confirm against
    // the definition before renaming.
    Status _get_delete_bitmap_from_ms_by_batch(GetDeleteBitmapRequest& req,
210
                                               GetDeleteBitmapResponse& res,
211
                                               int64_t bytes_threadhold);
212
213
    void check_table_size_correctness(RowsetMeta& rs_meta);
214
    int64_t get_segment_file_size(RowsetMeta& rs_meta);
215
    int64_t get_inverted_index_file_size(RowsetMeta& rs_meta);
216
217
    // Raw pointers that default to nullptr and are assigned by the setters above. The defaulted
    // destructor does not delete them.
    // NOTE(review): the pointees are presumably owned and kept alive elsewhere — confirm.
    HostLevelMSRpcRateLimiters* host_level_ms_rpc_rate_limiters_ {nullptr};
218
    MSBackpressureHandler* ms_backpressure_handler_ {nullptr};
219
};
220
221
} // namespace cloud
222
} // namespace doris