Coverage Report

Created: 2025-08-22 14:32

/root/doris/be/src/olap/olap_server.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <gen_cpp/Types_types.h>
19
#include <gen_cpp/olap_file.pb.h>
20
#include <glog/logging.h>
21
#include <rapidjson/prettywriter.h>
22
#include <rapidjson/stringbuffer.h>
23
#include <stdint.h>
24
#include <sys/types.h>
25
26
#include <algorithm>
27
#include <atomic>
28
// IWYU pragma: no_include <bits/chrono.h>
29
#include <chrono> // IWYU pragma: keep
30
#include <cmath>
31
#include <condition_variable>
32
#include <cstdint>
33
#include <ctime>
34
#include <functional>
35
#include <map>
36
#include <memory>
37
#include <mutex>
38
#include <ostream>
39
#include <random>
40
#include <shared_mutex>
41
#include <string>
42
#include <thread>
43
#include <unordered_set>
44
#include <utility>
45
#include <vector>
46
47
#include "agent/utils.h"
48
#include "common/config.h"
49
#include "common/logging.h"
50
#include "common/status.h"
51
#include "cpp/sync_point.h"
52
#include "gen_cpp/FrontendService.h"
53
#include "gen_cpp/internal_service.pb.h"
54
#include "gutil/ref_counted.h"
55
#include "io/fs/file_writer.h" // IWYU pragma: keep
56
#include "io/fs/path.h"
57
#include "olap/base_tablet.h"
58
#include "olap/cold_data_compaction.h"
59
#include "olap/compaction_permit_limiter.h"
60
#include "olap/cumulative_compaction.h"
61
#include "olap/cumulative_compaction_policy.h"
62
#include "olap/cumulative_compaction_time_series_policy.h"
63
#include "olap/data_dir.h"
64
#include "olap/olap_common.h"
65
#include "olap/olap_define.h"
66
#include "olap/rowset/segcompaction.h"
67
#include "olap/schema_change.h"
68
#include "olap/single_replica_compaction.h"
69
#include "olap/storage_engine.h"
70
#include "olap/storage_policy.h"
71
#include "olap/tablet.h"
72
#include "olap/tablet_manager.h"
73
#include "olap/tablet_meta.h"
74
#include "olap/tablet_meta_manager.h"
75
#include "olap/tablet_schema.h"
76
#include "olap/task/engine_publish_version_task.h"
77
#include "olap/task/index_builder.h"
78
#include "runtime/client_cache.h"
79
#include "runtime/memory/cache_manager.h"
80
#include "runtime/memory/global_memory_arbitrator.h"
81
#include "util/countdown_latch.h"
82
#include "util/debug_points.h"
83
#include "util/doris_metrics.h"
84
#include "util/mem_info.h"
85
#include "util/metrics.h"
86
#include "util/thread.h"
87
#include "util/threadpool.h"
88
#include "util/thrift_rpc_helper.h"
89
#include "util/time.h"
90
#include "util/uid_util.h"
91
#include "util/work_thread_pool.hpp"
92
93
using std::string;
94
95
namespace doris {
96
97
using io::Path;
98
99
// number of running SCHEMA-CHANGE threads
// NOTE(review): `volatile` alone does not make read-modify-write operations
// atomic in C++; confirm how writers of this counter are synchronized.
100
volatile uint32_t g_schema_change_active_threads = 0;
101
// bvar status values exposed under the names given in the constructor calls,
// both starting at 0. Presumably they report how many cumulative / base
// compaction tasks were produced in one scheduling round -- TODO confirm
// against the code that updates them (not visible in this part of the file).
bvar::Status<int64_t> g_cumu_compaction_task_num_per_round("cumu_compaction_task_num_per_round", 0);
102
bvar::Status<int64_t> g_base_compaction_task_num_per_round("base_compaction_task_num_per_round", 0);
103
104
// File-local numeric constants. They are not referenced in the portion of the
// file visible here; the names suggest a seed and a prime modulus for some
// hashing / pseudo-random computation -- TODO confirm at the point of use.
static const uint64_t DEFAULT_SEED = 104729;
105
static const uint64_t MOD_PRIME = 7652413;
106
107
0
// Move constructor.
//
// The three per-DataDir registries of this (default-initialized) object are
// exchanged with those of `r`, so this object ends up holding what `r` held.
// No mutex is taken or transferred here.
CompactionSubmitRegistry::CompactionSubmitRegistry(CompactionSubmitRegistry&& r) {
    using std::swap;
    // The three exchanges are independent of one another.
    swap(_tablet_submitted_full_compaction, r._tablet_submitted_full_compaction);
    swap(_tablet_submitted_base_compaction, r._tablet_submitted_base_compaction);
    swap(_tablet_submitted_cumu_compaction, r._tablet_submitted_cumu_compaction);
}
112
113
0
// Returns a copy of the cumulative and base compaction registries, taken while
// holding `_tablet_submitted_compaction_mutex`.
//
// The full-compaction registry is intentionally not copied; the returned
// object leaves it default-initialized.
CompactionSubmitRegistry CompactionSubmitRegistry::create_snapshot() {
    std::lock_guard<std::mutex> guard(_tablet_submitted_compaction_mutex);
    CompactionSubmitRegistry snapshot;
    snapshot._tablet_submitted_cumu_compaction = _tablet_submitted_cumu_compaction;
    snapshot._tablet_submitted_base_compaction = _tablet_submitted_base_compaction;
    return snapshot;
}
121
122
7
// Gives every store in `stores` an empty cumulative and an empty base
// compaction tablet set, replacing whatever was registered before.
//
// The full-compaction registry is not touched. No lock is taken here.
void CompactionSubmitRegistry::reset(const std::vector<DataDir*>& stores) {
    for (DataDir* const store : stores) {
        _tablet_submitted_base_compaction[store] = TabletSet();
        _tablet_submitted_cumu_compaction[store] = TabletSet();
    }
}
129
130
uint32_t CompactionSubmitRegistry::count_executing_compaction(DataDir* dir,
131
2
                                                              CompactionType compaction_type) {
132
    // non-lock, used in snapshot
133
2
    const auto& compaction_tasks = _get_tablet_set(dir, compaction_type);
134
10
    return std::count_if(compaction_tasks.begin(), compaction_tasks.end(), [](const auto& task) {
135
10
        return task->compaction_stage == CompactionStage::EXECUTING;
136
10
    });
137
2
}
138
139
1
// Returns the number of executing base compactions plus the number of
// executing cumulative compactions on `dir`.
//
// Takes no lock; intended to be called on a snapshot of the registry.
uint32_t CompactionSubmitRegistry::count_executing_cumu_and_base(DataDir* dir) {
    const uint32_t base_running = count_executing_compaction(dir, CompactionType::BASE_COMPACTION);
    const uint32_t cumu_running =
            count_executing_compaction(dir, CompactionType::CUMULATIVE_COMPACTION);
    return base_running + cumu_running;
}
144
145
0
// Returns true when at least one tablet is registered for `compaction_type`
// on `dir`.
//
// Takes no lock; intended to be called on a snapshot of the registry.
bool CompactionSubmitRegistry::has_compaction_task(DataDir* dir, CompactionType compaction_type) {
    const TabletSet& submitted = _get_tablet_set(dir, compaction_type);
    return !submitted.empty();
}
149
150
std::vector<TabletSharedPtr> CompactionSubmitRegistry::pick_topn_tablets_for_compaction(
151
        TabletManager* tablet_mgr, DataDir* data_dir, CompactionType compaction_type,
152
0
        const CumuCompactionPolicyTable& cumu_compaction_policies, uint32_t* disk_max_score) {
153
    // non-lock, used in snapshot
154
0
    return tablet_mgr->find_best_tablets_to_compaction(compaction_type, data_dir,
155
0
                                                       _get_tablet_set(data_dir, compaction_type),
156
0
                                                       disk_max_score, cumu_compaction_policies);
157
0
}
158
159
21
// Registers `tablet` for `compaction_type` in the set belonging to the
// tablet's data dir, under `_tablet_submitted_compaction_mutex`.
//
// Returns true if the tablet was ALREADY registered (nothing was inserted),
// false if it was newly inserted.
bool CompactionSubmitRegistry::insert(TabletSharedPtr tablet, CompactionType compaction_type) {
    std::lock_guard<std::mutex> guard(_tablet_submitted_compaction_mutex);
    TabletSet& submitted = _get_tablet_set(tablet->data_dir(), compaction_type);
    const bool newly_inserted = submitted.insert(tablet).second;
    return !newly_inserted;
}
165
166
void CompactionSubmitRegistry::remove(TabletSharedPtr tablet, CompactionType compaction_type,
167
7
                                      std::function<void()> wakeup_cb) {
168
7
    std::unique_lock<std::mutex> l(_tablet_submitted_compaction_mutex);
169
7
    auto& tablet_set = _get_tablet_set(tablet->data_dir(), compaction_type);
170
7
    size_t removed = tablet_set.erase(tablet);
171
7
    if (removed == 1) {
  Branch (171:9): [True: 7, False: 0]
172
7
        wakeup_cb();
173
7
    }
174
7
}
175
176
// Returns a reference to the tablet set kept for `dir` in the registry that
// matches `compaction_type` (base, cumulative or full).
//
// The lookup uses operator[] on the registry.
// NOTE(review): if the registry is a std::map-like container, this creates an
// empty entry for a `dir` that is not present yet -- confirm callers that run
// without the mutex are fine with that.
//
// Any other compaction type is a programming error and trips CHECK(false).
CompactionSubmitRegistry::TabletSet& CompactionSubmitRegistry::_get_tablet_set(
        DataDir* dir, CompactionType compaction_type) {
    if (compaction_type == CompactionType::BASE_COMPACTION) {
        return _tablet_submitted_base_compaction[dir];
    }
    if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
        return _tablet_submitted_cumu_compaction[dir];
    }
    if (compaction_type == CompactionType::FULL_COMPACTION) {
        return _tablet_submitted_full_compaction[dir];
    }
    CHECK(false) << "invalid compaction type";
}
189
190
0
static int32_t get_cumu_compaction_threads_num(size_t data_dirs_num) {
191
0
    int32_t threads_num = config::max_cumu_compaction_threads;
192
0
    if (threads_num == -1) {
  Branch (192:9): [True: 0, False: 0]
193
0
        int num_cores = doris::CpuInfo::num_cores();
194
0
        threads_num = std::max<size_t>(data_dirs_num, num_cores / 6);
195
0
    }
196
0
    threads_num = threads_num <= 0 ? 1 : threads_num;
  Branch (196:19): [True: 0, False: 0]
197
0
    return threads_num;
198
0
}
199
200
0
static int32_t get_base_compaction_threads_num(size_t data_dirs_num) {
201
0
    int32_t threads_num = config::max_base_compaction_threads;
202
0
    if (threads_num == -1) {
  Branch (202:9): [True: 0, False: 0]
203
0
        threads_num = data_dirs_num;
204
0
    }
205
0
    threads_num = threads_num <= 0 ? 1 : threads_num;
  Branch (205:19): [True: 0, False: 0]
206
0
    return threads_num;
207
0
}
208
209
0
static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) {
210
0
    int32_t threads_num = config::max_single_replica_compaction_threads;
211
0
    if (threads_num == -1) {
  Branch (211:9): [True: 0, False: 0]
212
0
        threads_num = data_dirs_num;
213
0
    }
214
0
    threads_num = threads_num <= 0 ? 1 : threads_num;
  Branch (214:19): [True: 0, False: 0]
215
0
    return threads_num;
216
0
}
217
218
0
// Creates the storage engine's background threads and thread pools, in a fixed
// order. The first failing creation aborts the sequence and its Status is
// returned (via RETURN_IF_ERROR); threads and pools created before the failure
// are left as they are. Returns Status::OK() once everything has been started.
Status StorageEngine::start_bg_threads() {
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "unused_rowset_monitor_thread",
            [this] { _unused_rowset_monitor_thread_callback(); }, &_unused_rowset_monitor_thread));
    LOG(INFO) << "unused rowset monitor thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "evict_querying_rowset_thread",
            [this] { _evict_quring_rowset_thread_callback(); }, &_evict_quering_rowset_thread));
    LOG(INFO) << "evict quering thread started";

    // Thread that monitors the snapshot and trash folders.
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "garbage_sweeper_thread",
            [this] { _garbage_sweeper_thread_callback(); }, &_garbage_sweeper_thread));
    LOG(INFO) << "garbage sweeper thread started";

    // Thread that monitors tablets with io errors.
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "disk_stat_monitor_thread",
            [this] { _disk_stat_monitor_thread_callback(); }, &_disk_stat_monitor_thread));
    LOG(INFO) << "disk stat monitor thread started";

    // Flatten the store map into a vector; its size drives several pool sizes.
    std::vector<DataDir*> data_dirs = get_stores();
    const size_t dir_count = data_dirs.size();

    const int32_t base_pool_size = get_base_compaction_threads_num(dir_count);
    const int32_t cumu_pool_size = get_cumu_compaction_threads_num(dir_count);
    const int32_t single_replica_pool_size = get_single_replica_compaction_threads_num(dir_count);

    // Compaction pools are fixed-size: min == max.
    RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
                            .set_min_threads(base_pool_size)
                            .set_max_threads(base_pool_size)
                            .build(&_base_compaction_thread_pool));
    RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
                            .set_min_threads(cumu_pool_size)
                            .set_max_threads(cumu_pool_size)
                            .build(&_cumu_compaction_thread_pool));
    RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
                            .set_min_threads(single_replica_pool_size)
                            .set_max_threads(single_replica_pool_size)
                            .build(&_single_replica_compaction_thread_pool));

    // The segcompaction pool only exists when the feature is enabled.
    if (config::enable_segcompaction) {
        RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
                                .set_min_threads(config::segcompaction_num_threads)
                                .set_max_threads(config::segcompaction_num_threads)
                                .build(&_seg_compaction_thread_pool));
    }
    RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
                            .set_min_threads(config::cold_data_compaction_thread_num)
                            .set_max_threads(config::cold_data_compaction_thread_num)
                            .build(&_cold_data_compaction_thread_pool));

    // Producer thread for compaction tasks.
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "compaction_tasks_producer_thread",
            [this] { _compaction_tasks_producer_callback(); }, &_compaction_tasks_producer_thread));
    LOG(INFO) << "compaction tasks producer thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "_update_replica_infos_thread",
            [this] { _update_replica_infos_callback(); }, &_update_replica_infos_thread));
    LOG(INFO) << "tablet replicas info update thread started";

    // A negative config value means "one checkpoint thread per data dir".
    int32_t checkpoint_pool_max = config::max_meta_checkpoint_threads;
    if (checkpoint_pool_max < 0) {
        checkpoint_pool_max = static_cast<int32_t>(dir_count);
    }
    RETURN_IF_ERROR(ThreadPoolBuilder("TabletMetaCheckpointTaskThreadPool")
                            .set_max_threads(checkpoint_pool_max)
                            .build(&_tablet_meta_checkpoint_thread_pool));

    RETURN_IF_ERROR(ThreadPoolBuilder("MultiGetTaskThreadPool")
                            .set_min_threads(config::multi_get_max_threads)
                            .set_max_threads(config::multi_get_max_threads)
                            .build(&_bg_multi_get_thread_pool));
    // The callback gets its own copy of the data dir list.
    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "tablet_checkpoint_tasks_producer_thread",
            [this, data_dirs] { _tablet_checkpoint_callback(data_dirs); },
            &_tablet_checkpoint_tasks_producer_thread));
    LOG(INFO) << "tablet checkpoint tasks producer thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "tablet_path_check_thread",
            [this] { _tablet_path_check_callback(); }, &_tablet_path_check_thread));
    LOG(INFO) << "tablet path check thread started";

    // One path scan / gc thread per store, only when path gc is enabled.
    if (config::path_gc_check) {
        for (auto* store : get_stores()) {
            scoped_refptr<Thread> gc_thread;
            RETURN_IF_ERROR(Thread::create(
                    "StorageEngine", "path_gc_thread",
                    [this, store] { _path_gc_thread_callback(store); }, &gc_thread));
            _path_gc_threads.emplace_back(gc_thread);
        }
        LOG(INFO) << "path gc threads started. number:" << get_stores().size();
    }

    RETURN_IF_ERROR(ThreadPoolBuilder("CooldownTaskThreadPool")
                            .set_min_threads(config::cooldown_thread_num)
                            .set_max_threads(config::cooldown_thread_num)
                            .build(&_cooldown_thread_pool));
    LOG(INFO) << "cooldown thread pool started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "cooldown_tasks_producer_thread",
            [this] { _cooldown_tasks_producer_callback(); }, &_cooldown_tasks_producer_thread));
    LOG(INFO) << "cooldown tasks producer thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "remove_unused_remote_files_thread",
            [this] { _remove_unused_remote_files_callback(); },
            &_remove_unused_remote_files_thread));
    LOG(INFO) << "remove unused remote files thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "cold_data_compaction_producer_thread",
            [this] { _cold_data_compaction_producer_callback(); },
            &_cold_data_compaction_producer_thread));
    LOG(INFO) << "cold data compaction producer thread started";

    // Thread pool used for publishing tablet versions.
    RETURN_IF_ERROR(ThreadPoolBuilder("TabletPublishTxnThreadPool")
                            .set_min_threads(config::tablet_publish_txn_max_thread)
                            .set_max_threads(config::tablet_publish_txn_max_thread)
                            .build(&_tablet_publish_txn_thread_pool));

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "async_publish_version_thread",
            [this] { _async_publish_callback(); }, &_async_publish_thread));
    LOG(INFO) << "async publish thread started";

    RETURN_IF_ERROR(Thread::create(
            "StorageEngine", "check_tablet_delete_bitmap_score_thread",
            [this] { _check_tablet_delete_bitmap_score_callback(); },
            &_check_delete_bitmap_score_thread));
    LOG(INFO) << "check tablet delete bitmap score thread started";

    LOG(INFO) << "all storage engine's background threads are started.";
    return Status::OK();
}
367
368
0
void StorageEngine::_garbage_sweeper_thread_callback() {
369
0
    uint32_t max_interval = config::max_garbage_sweep_interval;
370
0
    uint32_t min_interval = config::min_garbage_sweep_interval;
371
372
0
    if (max_interval < min_interval || min_interval <= 0) {
  Branch (372:9): [True: 0, False: 0]
  Branch (372:40): [True: 0, False: 0]
373
0
        LOG(WARNING) << "garbage sweep interval config is illegal: [max=" << max_interval
374
0
                     << " min=" << min_interval << "].";
375
0
        min_interval = 1;
376
0
        max_interval = max_interval >= min_interval ? max_interval : min_interval;
  Branch (376:24): [True: 0, False: 0]
377
0
        LOG(INFO) << "force reset garbage sweep interval. "
378
0
                  << "max_interval=" << max_interval << ", min_interval=" << min_interval;
379
0
    }
380
381
0
    const double pi = M_PI;
382
0
    double usage = 1.0;
383
    // After the program starts, the first round of cleaning starts after min_interval.
384
0
    uint32_t curr_interval = min_interval;
385
0
    do {
386
        // Function properties:
387
        // when usage < 0.6,          ratio close to 1.(interval close to max_interval)
388
        // when usage at [0.6, 0.75], ratio is rapidly decreasing from 0.87 to 0.27.
389
        // when usage > 0.75,         ratio is slowly decreasing.
390
        // when usage > 0.8,          ratio close to min_interval.
391
        // when usage = 0.88,         ratio is approximately 0.0057.
392
0
        double ratio = (1.1 * (pi / 2 - std::atan(usage * 100 / 5 - 14)) - 0.28) / pi;
393
0
        ratio = ratio > 0 ? ratio : 0;
  Branch (393:17): [True: 0, False: 0]
394
0
        auto curr_interval = uint32_t(max_interval * ratio);
395
0
        curr_interval = std::max(curr_interval, min_interval);
396
0
        curr_interval = std::min(curr_interval, max_interval);
397
398
        // start clean trash and update usage.
399
0
        Status res = start_trash_sweep(&usage);
400
0
        if (res.ok() && _need_clean_trash.exchange(false, std::memory_order_relaxed)) {
  Branch (400:13): [True: 0, False: 0]
  Branch (400:25): [True: 0, False: 0]
401
0
            res = start_trash_sweep(&usage, true);
402
0
        }
403
404
0
        if (!res.ok()) {
  Branch (404:13): [True: 0, False: 0]
405
0
            LOG(WARNING) << "one or more errors occur when sweep trash."
406
0
                         << "see previous message for detail. err code=" << res;
407
            // do nothing. continue next loop.
408
0
        }
409
0
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(curr_interval)));
  Branch (409:14): [True: 0, False: 0]
410
0
}
411
412
0
void StorageEngine::_disk_stat_monitor_thread_callback() {
413
0
    int32_t interval = config::disk_stat_monitor_interval;
414
0
    do {
415
0
        _start_disk_stat_monitor();
416
417
0
        interval = config::disk_stat_monitor_interval;
418
0
        if (interval <= 0) {
  Branch (418:13): [True: 0, False: 0]
419
0
            LOG(WARNING) << "disk_stat_monitor_interval config is illegal: " << interval
420
0
                         << ", force set to 1";
421
0
            interval = 1;
422
0
        }
423
0
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
  Branch (423:14): [True: 0, False: 0]
424
0
}
425
426
0
void StorageEngine::_unused_rowset_monitor_thread_callback() {
427
0
    int32_t interval = config::unused_rowset_monitor_interval;
428
0
    do {
429
0
        start_delete_unused_rowset();
430
431
0
        interval = config::unused_rowset_monitor_interval;
432
0
        if (interval <= 0) {
  Branch (432:13): [True: 0, False: 0]
433
0
            LOG(WARNING) << "unused_rowset_monitor_interval config is illegal: " << interval
434
0
                         << ", force set to 1";
435
0
            interval = 1;
436
0
        }
437
0
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
  Branch (437:14): [True: 0, False: 0]
438
0
}
439
440
0
// Derives the path gc interval (in seconds) for `data_dir` from how much of the disk is
// still free: the fuller the disk, the smaller the fraction of
// config::path_gc_check_interval_second that is returned.
//
// With config::path_gc_check_interval_second == 24h the result is:
//   free > 90%  -> 24h
//   free > 70%  -> 12h
//   free > 50%  ->  6h
//   free > 30%  ->  4h
//   otherwise   ->  3h
int32_t StorageEngine::_auto_get_interval_by_disk_capacity(DataDir* data_dir) {
    const double used_ratio = data_dir->get_usage(0);
    const double free_ratio = 1 - used_ratio;
    DCHECK(free_ratio >= 0 && free_ratio <= 1);
    DCHECK(config::path_gc_check_interval_second >= 0);

    if (free_ratio > 0.9) {
        return config::path_gc_check_interval_second;
    }
    if (free_ratio > 0.7) {
        return config::path_gc_check_interval_second / 2;
    }
    if (free_ratio > 0.5) {
        return config::path_gc_check_interval_second / 4;
    }
    if (free_ratio > 0.3) {
        return config::path_gc_check_interval_second / 6;
    }
    return config::path_gc_check_interval_second / 8;
}
464
465
0
// Background loop that runs path gc for one data dir.
//
// Every 5 seconds the loop recomputes the desired gc interval from the disk usage of
// `data_dir` (see _auto_get_interval_by_disk_capacity) and performs a gc pass when at least
// that many seconds have elapsed since the previous pass. The loop ends when the stop latch
// is released.
//
// Timestamps are kept as 64-bit values so that the result of time() is not narrowed into
// 32 bits.
void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) {
    LOG(INFO) << "try to start path gc thread!";
    // Seconds since the epoch of the last completed gc pass; 0 means "never ran", so the
    // first round always triggers a pass.
    int64_t last_exec_time = 0;
    do {
        const int64_t current_time = time(nullptr);

        int32_t interval = _auto_get_interval_by_disk_capacity(data_dir);
        DBUG_EXECUTE_IF("_path_gc_thread_callback.interval.eq.1ms", {
            LOG(INFO) << "debug point change interval eq 1ms";
            interval = 1;
            // While the second debug point stays enabled, run gc back to back.
            while (DebugPoints::instance()->is_enable("_path_gc_thread_callback.always.do")) {
                data_dir->perform_path_gc();
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
        });
        if (interval <= 0) {
            LOG(WARNING) << "path gc thread check interval config is illegal:" << interval
                         << " will be forced set to half hour";
            interval = 1800; // 0.5 hour
        }
        if (current_time - last_exec_time >= interval) {
            LOG(INFO) << "try to perform path gc! disk remain [" << 1 - data_dir->get_usage(0)
                      << "] internal [" << interval << "]";
            data_dir->perform_path_gc();
            // Measure the next interval from the end of this pass.
            last_exec_time = time(nullptr);
        }
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(5)));
    LOG(INFO) << "stop path gc thread!";
}
494
495
0
// Background loop that, once per round, submits one tablet meta checkpoint task per data dir
// to _tablet_meta_checkpoint_thread_pool, then waits on the stop latch for
// config::generate_tablet_meta_checkpoint_tasks_interval_secs seconds.
void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs) {
    int64_t interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
    do {
        for (auto dir : data_dirs) {
            LOG(INFO) << "begin to produce tablet meta checkpoint tasks, data_dir="
                      << dir->path();
            auto checkpoint_task = [dir, this]() {
                _tablet_manager->do_tablet_meta_checkpoint(dir);
            };
            auto submit_st = _tablet_meta_checkpoint_thread_pool->submit_func(checkpoint_task);
            // A failed submission is only reported; the remaining dirs are still processed.
            if (!submit_st.ok()) {
                LOG(WARNING) << "submit tablet checkpoint tasks failed.";
            }
        }
        // Read the config again so the next wait uses its current value.
        interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs;
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
}
510
511
0
void StorageEngine::_tablet_path_check_callback() {
512
0
    struct TabletIdComparator {
513
0
        bool operator()(Tablet* a, Tablet* b) { return a->tablet_id() < b->tablet_id(); }
514
0
    };
515
516
0
    using TabletQueue = std::priority_queue<Tablet*, std::vector<Tablet*>, TabletIdComparator>;
517
518
0
    int64_t interval = config::tablet_path_check_interval_seconds;
519
0
    if (interval <= 0) {
  Branch (519:9): [True: 0, False: 0]
520
0
        return;
521
0
    }
522
523
0
    int64_t last_tablet_id = 0;
524
0
    do {
525
0
        int32_t batch_size = config::tablet_path_check_batch_size;
526
0
        if (batch_size <= 0) {
  Branch (526:13): [True: 0, False: 0]
527
0
            if (_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
  Branch (527:17): [True: 0, False: 0]
528
0
                break;
529
0
            }
530
0
            continue;
531
0
        }
532
533
0
        LOG(INFO) << "start to check tablet path";
534
535
0
        auto all_tablets = _tablet_manager->get_all_tablet(
536
0
                [](Tablet* t) { return t->is_used() && t->tablet_state() == TABLET_RUNNING; });
  Branch (536:40): [True: 0, False: 0]
  Branch (536:56): [True: 0, False: 0]
537
538
0
        TabletQueue big_id_tablets;
539
0
        TabletQueue small_id_tablets;
540
0
        for (auto tablet : all_tablets) {
  Branch (540:26): [True: 0, False: 0]
541
0
            auto tablet_id = tablet->tablet_id();
542
0
            TabletQueue* belong_tablets = nullptr;
543
0
            if (tablet_id > last_tablet_id) {
  Branch (543:17): [True: 0, False: 0]
544
0
                if (big_id_tablets.size() < batch_size ||
  Branch (544:21): [True: 0, False: 0]
545
0
                    big_id_tablets.top()->tablet_id() > tablet_id) {
  Branch (545:21): [True: 0, False: 0]
546
0
                    belong_tablets = &big_id_tablets;
547
0
                }
548
0
            } else if (big_id_tablets.size() < batch_size) {
  Branch (548:24): [True: 0, False: 0]
549
0
                if (small_id_tablets.size() < batch_size ||
  Branch (549:21): [True: 0, False: 0]
550
0
                    small_id_tablets.top()->tablet_id() > tablet_id) {
  Branch (550:21): [True: 0, False: 0]
551
0
                    belong_tablets = &small_id_tablets;
552
0
                }
553
0
            }
554
0
            if (belong_tablets != nullptr) {
  Branch (554:17): [True: 0, False: 0]
555
0
                belong_tablets->push(tablet.get());
556
0
                if (belong_tablets->size() > batch_size) {
  Branch (556:21): [True: 0, False: 0]
557
0
                    belong_tablets->pop();
558
0
                }
559
0
            }
560
0
        }
561
562
0
        int32_t need_small_id_tablet_size =
563
0
                batch_size - static_cast<int32_t>(big_id_tablets.size());
564
565
0
        if (!big_id_tablets.empty()) {
  Branch (565:13): [True: 0, False: 0]
566
0
            last_tablet_id = big_id_tablets.top()->tablet_id();
567
0
        }
568
0
        while (!big_id_tablets.empty()) {
  Branch (568:16): [True: 0, False: 0]
569
0
            big_id_tablets.top()->check_tablet_path_exists();
570
0
            big_id_tablets.pop();
571
0
        }
572
573
0
        if (!small_id_tablets.empty() && need_small_id_tablet_size > 0) {
  Branch (573:13): [True: 0, False: 0]
  Branch (573:42): [True: 0, False: 0]
574
0
            while (static_cast<int32_t>(small_id_tablets.size()) > need_small_id_tablet_size) {
  Branch (574:20): [True: 0, False: 0]
575
0
                small_id_tablets.pop();
576
0
            }
577
578
0
            last_tablet_id = small_id_tablets.top()->tablet_id();
579
0
            while (!small_id_tablets.empty()) {
  Branch (579:20): [True: 0, False: 0]
580
0
                small_id_tablets.top()->check_tablet_path_exists();
581
0
                small_id_tablets.pop();
582
0
            }
583
0
        }
584
585
0
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
  Branch (585:14): [True: 0, False: 0]
586
0
}
587
588
6
void StorageEngine::_adjust_compaction_thread_num() {
589
6
    TEST_SYNC_POINT_RETURN_WITH_VOID("StorageEngine::_adjust_compaction_thread_num.return_void");
590
0
    auto base_compaction_threads_num = get_base_compaction_threads_num(_store_map.size());
591
0
    if (_base_compaction_thread_pool->max_threads() != base_compaction_threads_num) {
  Branch (591:9): [True: 0, False: 0]
592
0
        int old_max_threads = _base_compaction_thread_pool->max_threads();
593
0
        Status status = _base_compaction_thread_pool->set_max_threads(base_compaction_threads_num);
594
0
        if (status.ok()) {
  Branch (594:13): [True: 0, False: 0]
595
0
            VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
596
0
                        << " to " << base_compaction_threads_num;
597
0
        }
598
0
    }
599
0
    if (_base_compaction_thread_pool->min_threads() != base_compaction_threads_num) {
  Branch (599:9): [True: 0, False: 0]
600
0
        int old_min_threads = _base_compaction_thread_pool->min_threads();
601
0
        Status status = _base_compaction_thread_pool->set_min_threads(base_compaction_threads_num);
602
0
        if (status.ok()) {
  Branch (602:13): [True: 0, False: 0]
603
0
            VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
604
0
                        << " to " << base_compaction_threads_num;
605
0
        }
606
0
    }
607
608
0
    auto cumu_compaction_threads_num = get_cumu_compaction_threads_num(_store_map.size());
609
0
    if (_cumu_compaction_thread_pool->max_threads() != cumu_compaction_threads_num) {
  Branch (609:9): [True: 0, False: 0]
610
0
        int old_max_threads = _cumu_compaction_thread_pool->max_threads();
611
0
        Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_compaction_threads_num);
612
0
        if (status.ok()) {
  Branch (612:13): [True: 0, False: 0]
613
0
            VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
614
0
                        << " to " << cumu_compaction_threads_num;
615
0
        }
616
0
    }
617
0
    if (_cumu_compaction_thread_pool->min_threads() != cumu_compaction_threads_num) {
  Branch (617:9): [True: 0, False: 0]
618
0
        int old_min_threads = _cumu_compaction_thread_pool->min_threads();
619
0
        Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_compaction_threads_num);
620
0
        if (status.ok()) {
  Branch (620:13): [True: 0, False: 0]
621
0
            VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
622
0
                        << " to " << cumu_compaction_threads_num;
623
0
        }
624
0
    }
625
626
0
    auto single_replica_compaction_threads_num =
627
0
            get_single_replica_compaction_threads_num(_store_map.size());
628
0
    if (_single_replica_compaction_thread_pool->max_threads() !=
  Branch (628:9): [True: 0, False: 0]
629
0
        single_replica_compaction_threads_num) {
630
0
        int old_max_threads = _single_replica_compaction_thread_pool->max_threads();
631
0
        Status status = _single_replica_compaction_thread_pool->set_max_threads(
632
0
                single_replica_compaction_threads_num);
633
0
        if (status.ok()) {
  Branch (633:13): [True: 0, False: 0]
634
0
            VLOG_NOTICE << "update single replica compaction thread pool max_threads from "
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
635
0
                        << old_max_threads << " to " << single_replica_compaction_threads_num;
636
0
        }
637
0
    }
638
0
    if (_single_replica_compaction_thread_pool->min_threads() !=
  Branch (638:9): [True: 0, False: 0]
639
0
        single_replica_compaction_threads_num) {
640
0
        int old_min_threads = _single_replica_compaction_thread_pool->min_threads();
641
0
        Status status = _single_replica_compaction_thread_pool->set_min_threads(
642
0
                single_replica_compaction_threads_num);
643
0
        if (status.ok()) {
  Branch (643:13): [True: 0, False: 0]
644
0
            VLOG_NOTICE << "update single replica compaction thread pool min_threads from "
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
645
0
                        << old_min_threads << " to " << single_replica_compaction_threads_num;
646
0
        }
647
0
    }
648
0
}
649
650
7
void StorageEngine::_compaction_tasks_producer_callback() {
651
7
    LOG(INFO) << "try to start compaction producer process!";
652
653
7
    std::vector<DataDir*> data_dirs = get_stores();
654
7
    _compaction_submit_registry.reset(data_dirs);
655
656
7
    int round = 0;
657
7
    CompactionType compaction_type;
658
659
    // Used to record the time when the score metric was last updated.
660
    // The update of the score metric is accompanied by the logic of selecting the tablet.
661
    // If there is no slot available, the logic of selecting the tablet will be terminated,
662
    // which causes the score metric update to be terminated.
663
    // In order to avoid this situation, we need to update the score regularly.
664
7
    int64_t last_cumulative_score_update_time = 0;
665
7
    int64_t last_base_score_update_time = 0;
666
7
    static const int64_t check_score_interval_ms = 5000; // 5 secs
667
668
7
    int64_t interval = config::generate_compaction_tasks_interval_ms;
669
7
    do {
670
7
        int64_t cur_time = UnixMillis();
671
7
        if (!config::disable_auto_compaction &&
  Branch (671:13): [True: 6, False: 1]
672
7
            (!config::enable_compaction_pause_on_high_memory ||
  Branch (672:14): [True: 0, False: 6]
673
6
             !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) {
  Branch (673:14): [True: 6, False: 0]
674
6
            _adjust_compaction_thread_num();
675
676
6
            bool check_score = false;
677
6
            int64_t cur_time = UnixMillis();
678
6
            if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
  Branch (678:17): [True: 6, False: 0]
679
6
                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
680
6
                round++;
681
6
                if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) {
  Branch (681:21): [True: 6, False: 0]
682
6
                    check_score = true;
683
6
                    last_cumulative_score_update_time = cur_time;
684
6
                }
685
6
            } else {
686
0
                compaction_type = CompactionType::BASE_COMPACTION;
687
0
                round = 0;
688
0
                if (cur_time - last_base_score_update_time >= check_score_interval_ms) {
  Branch (688:21): [True: 0, False: 0]
689
0
                    check_score = true;
690
0
                    last_base_score_update_time = cur_time;
691
0
                }
692
0
            }
693
6
            std::unique_ptr<ThreadPool>& thread_pool =
694
6
                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
  Branch (694:21): [True: 6, False: 0]
695
6
                            ? _cumu_compaction_thread_pool
696
6
                            : _base_compaction_thread_pool;
697
6
            bvar::Status<int64_t>& g_compaction_task_num_per_round =
698
6
                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
  Branch (698:21): [True: 6, False: 0]
699
6
                            ? g_cumu_compaction_task_num_per_round
700
6
                            : g_base_compaction_task_num_per_round;
701
6
            if (config::compaction_num_per_round != -1) {
  Branch (701:17): [True: 0, False: 6]
702
0
                _compaction_num_per_round = config::compaction_num_per_round;
703
6
            } else if (thread_pool->get_queue_size() == 0) {
  Branch (703:24): [True: 2, False: 4]
704
                // If all tasks in the thread pool queue are executed,
705
                // double the number of tasks generated each time,
706
                // with a maximum of config::max_automatic_compaction_num_per_round tasks per generation.
707
2
                if (_compaction_num_per_round < config::max_automatic_compaction_num_per_round) {
  Branch (707:21): [True: 1, False: 1]
708
1
                    _compaction_num_per_round *= 2;
709
1
                    g_compaction_task_num_per_round.set_value(_compaction_num_per_round);
710
1
                }
711
4
            } else if (thread_pool->get_queue_size() > _compaction_num_per_round / 2) {
  Branch (711:24): [True: 3, False: 1]
712
                // If all tasks in the thread pool is greater than
713
                // half of the tasks submitted in the previous round,
714
                // reduce the number of tasks generated each time by half, with a minimum of 1.
715
3
                if (_compaction_num_per_round > 1) {
  Branch (715:21): [True: 1, False: 2]
716
1
                    _compaction_num_per_round /= 2;
717
1
                    g_compaction_task_num_per_round.set_value(_compaction_num_per_round);
718
1
                }
719
3
            }
720
6
            std::vector<TabletSharedPtr> tablets_compaction =
721
6
                    _generate_compaction_tasks(compaction_type, data_dirs, check_score);
722
6
            if (tablets_compaction.size() == 0) {
  Branch (722:17): [True: 6, False: 0]
723
6
                std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
724
6
                _wakeup_producer_flag = 0;
725
                // It is necessary to wake up the thread on timeout to prevent deadlock
726
                // in case of no running compaction task.
727
6
                _compaction_producer_sleep_cv.wait_for(
728
6
                        lock, std::chrono::milliseconds(2000),
729
12
                        [this] { return _wakeup_producer_flag == 1; });
730
6
                continue;
731
6
            }
732
733
0
            for (const auto& tablet : tablets_compaction) {
  Branch (733:37): [True: 0, False: 0]
734
0
                if (compaction_type == CompactionType::BASE_COMPACTION) {
  Branch (734:21): [True: 0, False: 0]
735
0
                    tablet->set_last_base_compaction_schedule_time(UnixMillis());
736
0
                } else if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
  Branch (736:28): [True: 0, False: 0]
737
0
                    tablet->set_last_cumu_compaction_schedule_time(UnixMillis());
738
0
                } else if (compaction_type == CompactionType::FULL_COMPACTION) {
  Branch (738:28): [True: 0, False: 0]
739
0
                    tablet->set_last_full_compaction_schedule_time(UnixMillis());
740
0
                }
741
0
                Status st = _submit_compaction_task(tablet, compaction_type, false);
742
0
                if (!st.ok()) {
  Branch (742:21): [True: 0, False: 0]
743
0
                    LOG(WARNING) << "failed to submit compaction task for tablet: "
744
0
                                 << tablet->tablet_id() << ", err: " << st;
745
0
                }
746
0
            }
747
0
            interval = config::generate_compaction_tasks_interval_ms;
748
1
        } else {
749
1
            interval = 5000; // 5s to check disable_auto_compaction
750
1
        }
751
752
        // wait some seconds for ut test
753
1
        {
754
1
            std ::vector<std ::any> args {};
755
1
            args.emplace_back(1);
756
1
            doris ::SyncPoint ::get_instance()->process(
757
1
                    "StorageEngine::_compaction_tasks_producer_callback", std ::move(args));
758
1
        }
759
1
        int64_t end_time = UnixMillis();
760
1
        DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time -
761
1
                                                                                       cur_time);
762
7
    } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval)));
  Branch (762:14): [True: 0, False: 7]
763
7
}
764
765
0
void StorageEngine::_update_replica_infos_callback() {
766
#ifdef GOOGLE_PROFILER
767
    ProfilerRegisterThread();
768
#endif
769
0
    LOG(INFO) << "start to update replica infos!";
770
771
0
    int64_t interval = config::update_replica_infos_interval_seconds;
772
0
    do {
773
0
        auto all_tablets = _tablet_manager->get_all_tablet([](Tablet* t) {
774
0
            return t->is_used() && t->tablet_state() == TABLET_RUNNING &&
  Branch (774:20): [True: 0, False: 0]
  Branch (774:36): [True: 0, False: 0]
775
0
                   !t->tablet_meta()->tablet_schema()->disable_auto_compaction() &&
  Branch (775:20): [True: 0, False: 0]
776
0
                   t->tablet_meta()->tablet_schema()->enable_single_replica_compaction();
  Branch (776:20): [True: 0, False: 0]
777
0
        });
778
0
        ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info();
779
0
        if (cluster_info == nullptr) {
  Branch (779:13): [True: 0, False: 0]
780
0
            LOG(WARNING) << "Have not get FE Master heartbeat yet";
781
0
            std::this_thread::sleep_for(std::chrono::seconds(2));
782
0
            continue;
783
0
        }
784
0
        TNetworkAddress master_addr = cluster_info->master_fe_addr;
785
0
        if (master_addr.hostname == "" || master_addr.port == 0) {
  Branch (785:13): [True: 0, False: 0]
  Branch (785:43): [True: 0, False: 0]
786
0
            LOG(WARNING) << "Have not get FE Master heartbeat yet";
787
0
            std::this_thread::sleep_for(std::chrono::seconds(2));
788
0
            continue;
789
0
        }
790
791
0
        int start = 0;
792
0
        int tablet_size = all_tablets.size();
793
        // The while loop may take a long time, we should skip it when stop
794
0
        while (start < tablet_size && _stop_background_threads_latch.count() > 0) {
  Branch (794:16): [True: 0, False: 0]
  Branch (794:39): [True: 0, False: 0]
795
0
            int batch_size = std::min(100, tablet_size - start);
796
0
            int end = start + batch_size;
797
0
            TGetTabletReplicaInfosRequest request;
798
0
            TGetTabletReplicaInfosResult result;
799
0
            for (int i = start; i < end; i++) {
  Branch (799:33): [True: 0, False: 0]
800
0
                request.tablet_ids.emplace_back(all_tablets[i]->tablet_id());
801
0
            }
802
0
            Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
803
0
                    master_addr.hostname, master_addr.port,
804
0
                    [&request, &result](FrontendServiceConnection& client) {
805
0
                        client->getTabletReplicaInfos(result, request);
806
0
                    });
807
808
0
            if (!rpc_st.ok()) {
  Branch (808:17): [True: 0, False: 0]
809
0
                LOG(WARNING) << "Failed to get tablet replica infos, encounter rpc failure, "
810
0
                                "tablet start: "
811
0
                             << start << " end: " << end;
812
0
                continue;
813
0
            }
814
815
0
            std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex);
816
0
            for (const auto& it : result.tablet_replica_infos) {
  Branch (816:33): [True: 0, False: 0]
817
0
                auto tablet_id = it.first;
818
0
                auto tablet = _tablet_manager->get_tablet(tablet_id);
819
0
                if (tablet == nullptr) {
  Branch (819:21): [True: 0, False: 0]
820
0
                    VLOG_CRITICAL << "tablet ptr is nullptr";
Line
Count
Source
43
0
#define VLOG_CRITICAL VLOG(1)
821
0
                    continue;
822
0
                }
823
824
0
                VLOG_NOTICE << tablet_id << " tablet has " << it.second.size() << " replicas";
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
825
0
                uint64_t min_modulo = MOD_PRIME;
826
0
                TReplicaInfo peer_replica;
827
0
                for (const auto& replica : it.second) {
  Branch (827:42): [True: 0, False: 0]
828
0
                    int64_t peer_replica_id = replica.replica_id;
829
0
                    uint64_t modulo = HashUtil::hash64(&peer_replica_id, sizeof(peer_replica_id),
830
0
                                                       DEFAULT_SEED) %
831
0
                                      MOD_PRIME;
832
0
                    if (modulo < min_modulo) {
  Branch (832:25): [True: 0, False: 0]
833
0
                        peer_replica = replica;
834
0
                        min_modulo = modulo;
835
0
                    }
836
0
                }
837
0
                VLOG_NOTICE << "tablet " << tablet_id << ", peer replica host is "
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
838
0
                            << peer_replica.host;
839
0
                _peer_replica_infos[tablet_id] = peer_replica;
840
0
            }
841
0
            _token = result.token;
842
0
            VLOG_NOTICE << "get tablet replica infos from fe, size is " << end - start
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
843
0
                        << " token = " << result.token;
844
0
            start = end;
845
0
        }
846
0
        interval = config::update_replica_infos_interval_seconds;
847
0
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
  Branch (847:14): [True: 0, False: 0]
848
0
}
849
850
// Registers `tablet` for single replica compaction and hands the work to
// _single_replica_compaction_thread_pool.
//
// Returns AlreadyExist when the tablet is already registered for cumulative or base
// compaction, the prepare error when prepare_compact() fails for a reason other than
// CUMULATIVE_NO_SUITABLE_VERSION (which is treated as OK), InternalError when the task
// cannot be submitted to the thread pool, and OK otherwise.
Status StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr tablet,
                                                             CompactionType compaction_type) {
    // For single replica compaction, the local version to be merged is determined based on the version fetched from the peer replica.
    // Therefore, it is currently not possible to determine whether it should be a base compaction or cumulative compaction.
    // As a result, the tablet needs to be pushed to both the _tablet_submitted_cumu_compaction and the _tablet_submitted_base_compaction simultaneously.
    if (_compaction_submit_registry.insert(tablet, CompactionType::CUMULATIVE_COMPACTION)) {
        return Status::AlreadyExist<false>(
                "compaction task has already been submitted, tablet_id={}", tablet->tablet_id());
    }
    if (_compaction_submit_registry.insert(tablet, CompactionType::BASE_COMPACTION)) {
        // Undo the cumulative registration made just above.
        _pop_tablet_from_submitted_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION);
        return Status::AlreadyExist<false>(
                "compaction task has already been submitted, tablet_id={}", tablet->tablet_id());
    }

    // Removes both registrations again; used on every exit path from here on and at the
    // end of the submitted task.
    auto unregister_tablet = [tablet, this]() {
        _pop_tablet_from_submitted_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION);
        _pop_tablet_from_submitted_compaction(tablet, CompactionType::BASE_COMPACTION);
    };

    auto compaction = std::make_shared<SingleReplicaCompaction>(*this, tablet, compaction_type);
    DorisMetrics::instance()->single_compaction_request_total->increment(1);
    auto prepare_st = compaction->prepare_compact();
    if (!prepare_st.ok()) {
        unregister_tablet();
        if (prepare_st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) {
            return Status::OK(); // No suitable version, regard as OK
        }
        LOG(WARNING) << "failed to prepare single replica compaction, tablet_id="
                     << tablet->tablet_id() << " : " << prepare_st;
        return prepare_st;
    }

    auto submit_st = _single_replica_compaction_thread_pool->submit_func(
            [tablet, compaction = std::move(compaction), unregister_tablet]() mutable {
                tablet->execute_single_replica_compaction(*compaction);
                unregister_tablet();
            });
    if (submit_st.ok()) {
        return Status::OK();
    }

    unregister_tablet();
    return Status::InternalError(
            "failed to submit single replica compaction task to thread pool, "
            "tablet_id={}",
            tablet->tablet_id());
}
903
904
void StorageEngine::get_tablet_rowset_versions(const PGetTabletVersionsRequest* request,
905
0
                                               PGetTabletVersionsResponse* response) {
906
0
    TabletSharedPtr tablet = _tablet_manager->get_tablet(request->tablet_id());
907
0
    if (tablet == nullptr) {
  Branch (907:9): [True: 0, False: 0]
908
0
        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
909
0
        return;
910
0
    }
911
0
    std::vector<Version> local_versions = tablet->get_all_local_versions();
912
0
    for (const auto& local_version : local_versions) {
  Branch (912:36): [True: 0, False: 0]
913
0
        auto version = response->add_versions();
914
0
        version->set_first(local_version.first);
915
0
        version->set_second(local_version.second);
916
0
    }
917
0
    response->mutable_status()->set_status_code(0);
918
0
}
919
920
bool need_generate_compaction_tasks(int task_cnt_per_disk, int thread_per_disk,
921
0
                                    CompactionType compaction_type, bool all_base) {
922
    // We need to reserve at least one Slot for cumulative compaction.
923
    // So when there is only one Slot, we have to judge whether there is a cumulative compaction
924
    // in the current submitted tasks.
925
    // If so, the last Slot can be assigned to Base compaction,
926
    // otherwise, this Slot needs to be reserved for cumulative compaction.
927
0
    if (task_cnt_per_disk >= thread_per_disk) {
  Branch (927:9): [True: 0, False: 0]
928
        // Return if no available slot
929
0
        return false;
930
0
    } else if (task_cnt_per_disk >= thread_per_disk - 1) {
  Branch (930:16): [True: 0, False: 0]
931
        // Only one slot left, check if it can be assigned to base compaction task.
932
0
        if (compaction_type == CompactionType::BASE_COMPACTION) {
  Branch (932:13): [True: 0, False: 0]
933
0
            if (all_base) {
  Branch (933:17): [True: 0, False: 0]
934
0
                return false;
935
0
            }
936
0
        }
937
0
    }
938
0
    return true;
939
0
}
940
941
0
int get_concurrent_per_disk(int max_score, int thread_per_disk) {
942
0
    if (!config::enable_compaction_priority_scheduling) {
  Branch (942:9): [True: 0, False: 0]
943
0
        return thread_per_disk;
944
0
    }
945
946
0
    double load_average = 0;
947
0
    if (DorisMetrics::instance()->system_metrics() != nullptr) {
  Branch (947:9): [True: 0, False: 0]
948
0
        load_average = DorisMetrics::instance()->system_metrics()->get_load_average_1_min();
949
0
    }
950
0
    int num_cores = doris::CpuInfo::num_cores();
951
0
    bool cpu_usage_high = load_average > num_cores * 0.8;
952
953
0
    auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
954
0
    bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
955
956
0
    if (max_score <= config::low_priority_compaction_score_threshold &&
  Branch (956:9): [True: 0, False: 0]
957
0
        (cpu_usage_high || memory_usage_high)) {
  Branch (957:10): [True: 0, False: 0]
  Branch (957:28): [True: 0, False: 0]
958
0
        return config::low_priority_compaction_task_num_per_disk;
959
0
    }
960
961
0
    return thread_per_disk;
962
0
}
963
964
0
int32_t disk_compaction_slot_num(const DataDir& data_dir) {
965
0
    return data_dir.is_ssd_disk() ? config::compaction_task_num_per_fast_disk
  Branch (965:12): [True: 0, False: 0]
966
0
                                  : config::compaction_task_num_per_disk;
967
0
}
968
969
bool has_free_compaction_slot(CompactionSubmitRegistry* registry, DataDir* dir,
970
0
                              CompactionType compaction_type, uint32_t executing_cnt) {
971
0
    int32_t thread_per_disk = disk_compaction_slot_num(*dir);
972
0
    return need_generate_compaction_tasks(
973
0
            executing_cnt, thread_per_disk, compaction_type,
974
0
            !registry->has_compaction_task(dir, CompactionType::CUMULATIVE_COMPACTION));
975
0
}
976
977
std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
978
6
        CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
979
6
    TEST_SYNC_POINT_RETURN_WITH_VALUE("olap_server::_generate_compaction_tasks.return_empty",
980
0
                                      std::vector<TabletSharedPtr> {});
981
0
    _update_cumulative_compaction_policy();
982
0
    std::vector<TabletSharedPtr> tablets_compaction;
983
0
    uint32_t max_compaction_score = 0;
984
985
0
    std::random_device rd;
986
0
    std::mt19937 g(rd());
987
0
    std::shuffle(data_dirs.begin(), data_dirs.end(), g);
988
989
    // Copy _tablet_submitted_xxx_compaction map so that we don't need to hold _tablet_submitted_compaction_mutex
990
    // when traversing the data dir
991
0
    auto compaction_registry_snapshot = _compaction_submit_registry.create_snapshot();
992
0
    for (auto* data_dir : data_dirs) {
  Branch (992:25): [True: 0, False: 0]
993
0
        bool need_pick_tablet = true;
994
0
        uint32_t executing_task_num =
995
0
                compaction_registry_snapshot.count_executing_cumu_and_base(data_dir);
996
0
        need_pick_tablet = has_free_compaction_slot(&compaction_registry_snapshot, data_dir,
997
0
                                                    compaction_type, executing_task_num);
998
0
        if (!need_pick_tablet && !check_score) {
  Branch (998:13): [True: 0, False: 0]
  Branch (998:34): [True: 0, False: 0]
999
0
            continue;
1000
0
        }
1001
1002
        // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(),
1003
        // So that we can update the max_compaction_score metric.
1004
0
        if (!data_dir->reach_capacity_limit(0)) {
  Branch (1004:13): [True: 0, False: 0]
1005
0
            uint32_t disk_max_score = 0;
1006
0
            auto tablets = compaction_registry_snapshot.pick_topn_tablets_for_compaction(
1007
0
                    _tablet_manager.get(), data_dir, compaction_type,
1008
0
                    _cumulative_compaction_policies, &disk_max_score);
1009
0
            int concurrent_num =
1010
0
                    get_concurrent_per_disk(disk_max_score, disk_compaction_slot_num(*data_dir));
1011
0
            need_pick_tablet = need_generate_compaction_tasks(
1012
0
                    executing_task_num, concurrent_num, compaction_type,
1013
0
                    !compaction_registry_snapshot.has_compaction_task(
1014
0
                            data_dir, CompactionType::CUMULATIVE_COMPACTION));
1015
0
            for (const auto& tablet : tablets) {
  Branch (1015:37): [True: 0, False: 0]
1016
0
                if (tablet != nullptr) {
  Branch (1016:21): [True: 0, False: 0]
1017
0
                    if (need_pick_tablet) {
  Branch (1017:25): [True: 0, False: 0]
1018
0
                        tablets_compaction.emplace_back(tablet);
1019
0
                    }
1020
0
                    max_compaction_score = std::max(max_compaction_score, disk_max_score);
1021
0
                }
1022
0
            }
1023
0
        }
1024
0
    }
1025
1026
0
    if (max_compaction_score > 0) {
  Branch (1026:9): [True: 0, False: 0]
1027
0
        if (compaction_type == CompactionType::BASE_COMPACTION) {
  Branch (1027:13): [True: 0, False: 0]
1028
0
            DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(
1029
0
                    max_compaction_score);
1030
0
        } else {
1031
0
            DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(
1032
0
                    max_compaction_score);
1033
0
        }
1034
0
    }
1035
0
    return tablets_compaction;
1036
6
}
1037
1038
0
void StorageEngine::_update_cumulative_compaction_policy() {
1039
0
    if (_cumulative_compaction_policies.empty()) {
  Branch (1039:9): [True: 0, False: 0]
1040
0
        _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
1041
0
                CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
1042
0
                        CUMULATIVE_SIZE_BASED_POLICY);
1043
0
        _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
1044
0
                CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
1045
0
                        CUMULATIVE_TIME_SERIES_POLICY);
1046
0
    }
1047
0
}
1048
1049
void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet,
1050
7
                                                          CompactionType compaction_type) {
1051
7
    _compaction_submit_registry.remove(tablet, compaction_type, [this]() {
1052
7
        std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
1053
7
        _wakeup_producer_flag = 1;
1054
7
        _compaction_producer_sleep_cv.notify_one();
1055
7
    });
1056
7
}
1057
1058
// Registers `tablet` for a compaction of `compaction_type` and queues the work on the
// matching thread pool (cumulative or base).
//
// tablet:          tablet to compact.
// compaction_type: CUMULATIVE_COMPACTION or BASE_COMPACTION.
// force:           when true, the permit limiter is bypassed (no request / release).
//
// Returns:
//  - OK when the task was queued, when the single replica compaction path was taken (its
//    failure is only logged), or when preparation succeeded but yielded no permits.
//  - AlreadyExist when the tablet is already registered for this compaction type.
//  - InternalError when preparation or the thread pool submission failed.
//
// On every path that does not end up running the queued task, the tablet is removed from the
// submit registry again and its stage is reset to NOT_SCHEDULED.
Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
                                              CompactionType compaction_type, bool force) {
    if (tablet->tablet_meta()->tablet_schema()->enable_single_replica_compaction() &&
        should_fetch_from_peer(tablet->tablet_id())) {
        VLOG_CRITICAL << "start to submit single replica compaction task for tablet: "
                      << tablet->tablet_id();
        Status st = _submit_single_replica_compaction_task(tablet, compaction_type);
        if (!st.ok()) {
            LOG(WARNING) << "failed to submit single replica compaction task for tablet: "
                         << tablet->tablet_id() << ", err: " << st;
        }

        // Best effort: a failed single replica submission is logged, not propagated.
        return Status::OK();
    }
    bool already_exist = _compaction_submit_registry.insert(tablet, compaction_type);
    if (already_exist) {
        return Status::AlreadyExist<false>(
                "compaction task has already been submitted, tablet_id={}, compaction_type={}.",
                tablet->tablet_id(), compaction_type);
    }
    tablet->compaction_stage = CompactionStage::PENDING;
    std::shared_ptr<CompactionMixin> compaction;
    int64_t permits = 0;
    Status st = Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet,
                                                                 compaction, permits);
    if (st.ok() && permits > 0) {
        if (!force) {
            _permit_limiter.request(permits);
        }
        std::unique_ptr<ThreadPool>& thread_pool =
                (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
                        ? _cumu_compaction_thread_pool
                        : _base_compaction_thread_pool;
        VLOG_CRITICAL << "compaction thread pool. type: "
                      << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
                                                                                   : "BASE")
                      << ", num_threads: " << thread_pool->num_threads()
                      << ", num_threads_pending_start: " << thread_pool->num_threads_pending_start()
                      << ", num_active_threads: " << thread_pool->num_active_threads()
                      << ", max_threads: " << thread_pool->max_threads()
                      << ", min_threads: " << thread_pool->min_threads()
                      << ", num_total_queued_tasks: " << thread_pool->get_queue_size();
        // Named `submit_st` so it does not shadow the preparation status `st` above.
        auto submit_st = thread_pool->submit_func([tablet, compaction = std::move(compaction),
                                                   compaction_type, permits, force, this]() {
            if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) [[likely]] {
                DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1);
                DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
                        _cumu_compaction_thread_pool->get_queue_size());
            } else if (compaction_type == CompactionType::BASE_COMPACTION) {
                DorisMetrics::instance()->base_compaction_task_running_total->increment(1);
                DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
                        _base_compaction_thread_pool->get_queue_size());
            }
            // Stays true unless the task is classified as small below; the cleanup uses it
            // to decide whether the small-task counter has to be decremented.
            bool is_large_task = true;
            // Cleanup that runs on every exit from this task: give back the permits,
            // unregister the tablet, reset its stage and undo the counters / metrics.
            Defer defer {[&]() {
                DBUG_EXECUTE_IF("StorageEngine._submit_compaction_task.sleep", { sleep(5); })
                if (!force) {
                    _permit_limiter.release(permits);
                }
                _pop_tablet_from_submitted_compaction(tablet, compaction_type);
                tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
                if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
                    std::lock_guard<std::mutex> lock(_cumu_compaction_delay_mtx);
                    _cumu_compaction_thread_pool_used_threads--;
                    if (!is_large_task) {
                        _cumu_compaction_thread_pool_small_tasks_running--;
                    }
                    DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(
                            -1);
                    DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
                            _cumu_compaction_thread_pool->get_queue_size());
                } else if (compaction_type == CompactionType::BASE_COMPACTION) {
                    DorisMetrics::instance()->base_compaction_task_running_total->increment(-1);
                    DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
                            _base_compaction_thread_pool->get_queue_size());
                }
            }};
            // Single-pass loop, used only so that `break` can leave the classification early.
            do {
                if (compaction->compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
                    std::lock_guard<std::mutex> lock(_cumu_compaction_delay_mtx);
                    _cumu_compaction_thread_pool_used_threads++;
                    if (config::large_cumu_compaction_task_min_thread_num > 1 &&
                        _cumu_compaction_thread_pool->max_threads() >=
                                config::large_cumu_compaction_task_min_thread_num) {
                        // Determine if this is a large task based on configured thresholds
                        is_large_task =
                                (compaction->calc_input_rowsets_total_size() >
                                         config::large_cumu_compaction_task_bytes_threshold ||
                                 compaction->calc_input_rowsets_row_num() >
                                         config::large_cumu_compaction_task_row_num_threshold);

                        // Small task. No delay needed
                        if (!is_large_task) {
                            _cumu_compaction_thread_pool_small_tasks_running++;
                            break;
                        }
                        // Deal with large task
                        if (_should_delay_large_task()) {
                            LOG_WARNING(
                                    "failed to do CumulativeCompaction, cumu thread pool is "
                                    "intensive, delay large task.")
                                    .tag("tablet_id", tablet->tablet_id())
                                    .tag("input_rows", compaction->calc_input_rowsets_row_num())
                                    .tag("input_rowsets_total_size",
                                         compaction->calc_input_rowsets_total_size())
                                    .tag("config::large_cumu_compaction_task_bytes_threshold",
                                         config::large_cumu_compaction_task_bytes_threshold)
                                    .tag("config::large_cumu_compaction_task_row_num_threshold",
                                         config::large_cumu_compaction_task_row_num_threshold)
                                    .tag("remaining threads",
                                         _cumu_compaction_thread_pool_used_threads)
                                    .tag("small_tasks_running",
                                         _cumu_compaction_thread_pool_small_tasks_running);
                            // Delay this task by recording "now" (ms since epoch) as the
                            // tablet's last cumulative compaction failure time.
                            // NOTE(review): the back-off itself is presumably applied by
                            // whoever reads that timestamp; nothing sleeps here.
                            long now = duration_cast<std::chrono::milliseconds>(
                                               std::chrono::system_clock::now().time_since_epoch())
                                               .count();
                            tablet->set_last_cumu_compaction_failure_time(now);
                            return;
                        }
                    }
                }
            } while (false);
            if (!tablet->can_do_compaction(tablet->data_dir()->path_hash(), compaction_type)) {
                LOG(INFO) << "Tablet state has been changed, no need to begin this compaction "
                             "task, tablet_id="
                          << tablet->tablet_id() << ", tablet_state=" << tablet->tablet_state();
                return;
            }
            tablet->compaction_stage = CompactionStage::EXECUTING;
            TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction");
            tablet->execute_compaction(*compaction);
        });
        if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) [[likely]] {
            DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
                    _cumu_compaction_thread_pool->get_queue_size());
        } else if (compaction_type == CompactionType::BASE_COMPACTION) {
            DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
                    _base_compaction_thread_pool->get_queue_size());
        }
        if (!submit_st.ok()) {
            // The queued lambda (and therefore its Defer cleanup) will not run, so the
            // permits and the registry entry have to be released here.
            if (!force) {
                _permit_limiter.release(permits);
            }
            _pop_tablet_from_submitted_compaction(tablet, compaction_type);
            tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
            return Status::InternalError(
                    "failed to submit compaction task to thread pool, "
                    "tablet_id={}, compaction_type={}.",
                    tablet->tablet_id(), compaction_type);
        }
        return Status::OK();
    } else {
        // Preparation failed, or it succeeded without needing any permits: nothing to run.
        _pop_tablet_from_submitted_compaction(tablet, compaction_type);
        tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
        if (!st.ok()) {
            return Status::InternalError(
                    "failed to prepare compaction task and calculate permits, "
                    "tablet_id={}, compaction_type={}, "
                    "permit={}, current_permit={}, status={}",
                    tablet->tablet_id(), compaction_type, permits, _permit_limiter.usage(),
                    st.to_string());
        }
        return st;
    }
}
1224
1225
// Public entry point for submitting a compaction of `compaction_type` for `tablet`.
//
// force: forwarded to _submit_compaction_task (bypasses the permit limiter).
// eager: when false, the task is silently dropped (OK is returned) if no data dir has a free
//        compaction slot for this compaction type.
//
// Before submitting, the tablet's cumulative compaction policy is refreshed so that it
// matches the policy name stored in the tablet meta, and skip-compaction is cleared.
Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
                                             bool force, bool eager) {
    if (!eager) {
        DCHECK(compaction_type == CompactionType::BASE_COMPACTION ||
               compaction_type == CompactionType::CUMULATIVE_COMPACTION);
        auto compaction_registry_snapshot = _compaction_submit_registry.create_snapshot();
        auto stores = get_stores();

        // Busy means: not a single data dir has a free slot for this compaction type.
        bool is_busy = std::none_of(
                stores.begin(), stores.end(),
                [&compaction_registry_snapshot, compaction_type](auto* data_dir) {
                    return has_free_compaction_slot(
                            &compaction_registry_snapshot, data_dir, compaction_type,
                            compaction_registry_snapshot.count_executing_cumu_and_base(data_dir));
                });
        if (is_busy) {
            // Log the tablet id: the message is labelled "tablet=", so printing the table id
            // here would be misleading.
            LOG_EVERY_N(WARNING, 100)
                    << "Too busy to submit a compaction task, tablet=" << tablet->tablet_id();
            return Status::OK();
        }
    }
    _update_cumulative_compaction_policy();
    // alter table tableName set ("compaction_policy"="time_series")
    // If the table's compaction policy was altered, the tablet's compaction policy shared ptr
    // has to be replaced as well.
    if (tablet->get_cumulative_compaction_policy() == nullptr ||
        tablet->get_cumulative_compaction_policy()->name() !=
                tablet->tablet_meta()->compaction_policy()) {
        tablet->set_cumulative_compaction_policy(
                _cumulative_compaction_policies.at(tablet->tablet_meta()->compaction_policy()));
    }
    tablet->set_skip_compaction(false);
    return _submit_compaction_task(tablet, compaction_type, force);
}
1258
1259
// Runs one segment compaction on `worker` for the given `segments`.
// `submission_time` is the microsecond timestamp taken when the task was queued; it is only
// used to log how long the task waited in the thread pool.
// Always returns OK: errors are reported via BetaRowsetWriter::_segcompaction_status.
Status StorageEngine::_handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker,
                                             SegCompactionCandidatesSharedPtr segments,
                                             uint64_t submission_time) {
    // note: be aware that worker->_writer maybe released when the task is cancelled
    uint64_t waited_micros = GetCurrentTimeMicros() - submission_time;
    LOG(INFO) << "segcompaction thread pool queue time(ms): " << waited_micros / 1000;

    worker->compact_segments(segments);

    return Status::OK();
}
1269
1270
// Queues a segment compaction of `segments` on the segment compaction thread pool and
// returns the pool's submission status. The status produced by the task itself is
// deliberately discarded.
Status StorageEngine::submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker,
                                                 SegCompactionCandidatesSharedPtr segments) {
    // Remember when the task was queued so the handler can log the queueing delay.
    uint64_t queued_at = GetCurrentTimeMicros();
    auto job = [this, worker, segments, queued_at] {
        static_cast<void>(_handle_seg_compaction(worker, segments, queued_at));
    };
    return _seg_compaction_thread_pool->submit_func(std::move(job));
}
1277
1278
0
// Handles an inverted index change request: looks up the tablet named in `request`, builds
// an IndexBuilder for it and runs the index change.
// Returns InternalError when the tablet does not exist, otherwise the status of the index
// change itself.
Status StorageEngine::process_index_change_task(const TAlterInvertedIndexReq& request) {
    auto tablet_id = request.tablet_id;
    TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
    // Debug point that simulates a missing tablet.
    DBUG_EXECUTE_IF("StorageEngine::process_index_change_task_tablet_nullptr",
                    { tablet = nullptr; })
    if (!tablet) {
        LOG(WARNING) << "tablet: " << tablet_id << " not exist";
        return Status::InternalError("tablet not exist, tablet_id={}.", tablet_id);
    }

    IndexBuilderSharedPtr builder = std::make_shared<IndexBuilder>(
            *this, tablet, request.columns, request.alter_inverted_indexes, request.is_drop_op);
    return _handle_index_change(builder);
}
1293
1294
0
// Initializes `index_builder` and, if that succeeds, builds the inverted index.
// Returns the first non-OK status encountered, or OK.
Status StorageEngine::_handle_index_change(IndexBuilderSharedPtr index_builder) {
    Status init_status = index_builder->init();
    if (!init_status.ok()) {
        return init_status;
    }
    return index_builder->do_build_inverted_index();
}
1299
1300
0
void StorageEngine::_cooldown_tasks_producer_callback() {
1301
0
    int64_t interval = config::generate_cooldown_task_interval_sec;
1302
    // the cooldown replica may be slow to upload it's meta file, so we should wait
1303
    // until it has done uploaded
1304
0
    int64_t skip_failed_interval = interval * 10;
1305
0
    do {
1306
        // these tables are ordered by priority desc
1307
0
        std::vector<TabletSharedPtr> tablets;
1308
0
        std::vector<RowsetSharedPtr> rowsets;
1309
        // TODO(luwei) : a more efficient way to get cooldown tablets
1310
0
        auto cur_time = time(nullptr);
1311
        // we should skip all the tablets which are not running and those pending to do cooldown
1312
        // also tablets once failed to do follow cooldown
1313
0
        auto skip_tablet = [this, skip_failed_interval,
1314
0
                            cur_time](const TabletSharedPtr& tablet) -> bool {
1315
0
            bool is_skip =
1316
0
                    cur_time - tablet->last_failed_follow_cooldown_time() < skip_failed_interval ||
  Branch (1316:21): [True: 0, False: 0]
1317
0
                    TABLET_RUNNING != tablet->tablet_state();
  Branch (1317:21): [True: 0, False: 0]
1318
0
            if (is_skip) {
  Branch (1318:17): [True: 0, False: 0]
1319
0
                return is_skip;
1320
0
            }
1321
0
            std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
1322
0
            return _running_cooldown_tablets.find(tablet->tablet_id()) !=
1323
0
                   _running_cooldown_tablets.end();
1324
0
        };
1325
0
        _tablet_manager->get_cooldown_tablets(&tablets, &rowsets, std::move(skip_tablet));
1326
0
        LOG(INFO) << "cooldown producer get tablet num: " << tablets.size();
1327
0
        int max_priority = tablets.size();
1328
0
        int index = 0;
1329
0
        for (const auto& tablet : tablets) {
  Branch (1329:33): [True: 0, False: 0]
1330
0
            {
1331
0
                std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
1332
0
                _running_cooldown_tablets.insert(tablet->tablet_id());
1333
0
            }
1334
0
            PriorityThreadPool::Task task;
1335
0
            RowsetSharedPtr rowset = std::move(rowsets[index++]);
1336
0
            task.work_function = [tablet, rowset, task_size = tablets.size(), this]() {
1337
0
                Status st = tablet->cooldown(rowset);
1338
0
                {
1339
0
                    std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
1340
0
                    _running_cooldown_tablets.erase(tablet->tablet_id());
1341
0
                }
1342
0
                if (!st.ok()) {
  Branch (1342:21): [True: 0, False: 0]
1343
0
                    LOG(WARNING) << "failed to cooldown, tablet: " << tablet->tablet_id()
1344
0
                                 << " err: " << st;
1345
0
                } else {
1346
0
                    LOG(INFO) << "succeed to cooldown, tablet: " << tablet->tablet_id()
1347
0
                              << " cooldown progress ("
1348
0
                              << task_size - _cooldown_thread_pool->get_queue_size() << "/"
1349
0
                              << task_size << ")";
1350
0
                }
1351
0
            };
1352
0
            task.priority = max_priority--;
1353
0
            bool submited = _cooldown_thread_pool->offer(std::move(task));
1354
1355
0
            if (!submited) {
  Branch (1355:17): [True: 0, False: 0]
1356
0
                LOG(INFO) << "failed to submit cooldown task";
1357
0
            }
1358
0
        }
1359
0
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
  Branch (1359:14): [True: 0, False: 0]
1360
0
}
1361
1362
0
void StorageEngine::_remove_unused_remote_files_callback() {
1363
0
    while (!_stop_background_threads_latch.wait_for(
  Branch (1363:12): [True: 0, False: 0]
1364
0
            std::chrono::seconds(config::remove_unused_remote_files_interval_sec))) {
1365
0
        LOG(INFO) << "begin to remove unused remote files";
1366
0
        do_remove_unused_remote_files();
1367
0
    }
1368
0
}
1369
1370
0
void StorageEngine::do_remove_unused_remote_files() {
1371
0
    auto tablets = tablet_manager()->get_all_tablet([](Tablet* t) {
1372
0
        return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() &&
  Branch (1372:16): [True: 0, False: 0]
  Branch (1372:70): [True: 0, False: 0]
1373
0
               t->tablet_state() == TABLET_RUNNING &&
  Branch (1373:16): [True: 0, False: 0]
1374
0
               t->cooldown_conf_unlocked().cooldown_replica_id == t->replica_id();
  Branch (1374:16): [True: 0, False: 0]
1375
0
    });
1376
0
    TConfirmUnusedRemoteFilesRequest req;
1377
0
    req.__isset.confirm_list = true;
1378
    // tablet_id -> [storage_resource, unused_remote_files]
1379
0
    using unused_remote_files_buffer_t =
1380
0
            std::unordered_map<int64_t, std::pair<StorageResource, std::vector<io::FileInfo>>>;
1381
0
    unused_remote_files_buffer_t buffer;
1382
0
    int64_t num_files_in_buffer = 0;
1383
    // assume a filename is 0.1KB, buffer size should not larger than 100MB
1384
0
    constexpr int64_t max_files_in_buffer = 1000000;
1385
1386
0
    auto calc_unused_remote_files = [&req, &buffer, &num_files_in_buffer, this](Tablet* t) {
1387
0
        auto storage_resource = get_resource_by_storage_policy_id(t->storage_policy_id());
1388
0
        if (!storage_resource) {
  Branch (1388:13): [True: 0, False: 0]
1389
0
            LOG(WARNING) << "encounter error when remove unused remote files, tablet_id="
1390
0
                         << t->tablet_id() << " : " << storage_resource.error();
1391
0
            return;
1392
0
        }
1393
1394
        // TODO(plat1ko): Support path v1
1395
0
        if (storage_resource->path_version > 0) {
  Branch (1395:13): [True: 0, False: 0]
1396
0
            return;
1397
0
        }
1398
1399
0
        std::vector<io::FileInfo> files;
1400
        // FIXME(plat1ko): What if user reset resource in storage policy to another resource?
1401
        //  Maybe we should also list files in previously uploaded resources.
1402
0
        bool exists = true;
1403
0
        auto st = storage_resource->fs->list(storage_resource->remote_tablet_path(t->tablet_id()),
1404
0
                                             true, &files, &exists);
1405
0
        if (!st.ok()) {
  Branch (1405:13): [True: 0, False: 0]
1406
0
            LOG(WARNING) << "encounter error when remove unused remote files, tablet_id="
1407
0
                         << t->tablet_id() << " : " << st;
1408
0
            return;
1409
0
        }
1410
0
        if (!exists || files.empty()) {
  Branch (1410:13): [True: 0, False: 0]
  Branch (1410:24): [True: 0, False: 0]
1411
0
            return;
1412
0
        }
1413
        // get all cooldowned rowsets
1414
0
        RowsetIdUnorderedSet cooldowned_rowsets;
1415
0
        UniqueId cooldown_meta_id;
1416
0
        {
1417
0
            std::shared_lock rlock(t->get_header_lock());
1418
0
            for (auto&& rs_meta : t->tablet_meta()->all_rs_metas()) {
  Branch (1418:33): [True: 0, False: 0]
1419
0
                if (!rs_meta->is_local()) {
  Branch (1419:21): [True: 0, False: 0]
1420
0
                    cooldowned_rowsets.insert(rs_meta->rowset_id());
1421
0
                }
1422
0
            }
1423
0
            if (cooldowned_rowsets.empty()) {
  Branch (1423:17): [True: 0, False: 0]
1424
0
                return;
1425
0
            }
1426
0
            cooldown_meta_id = t->tablet_meta()->cooldown_meta_id();
1427
0
        }
1428
0
        auto [cooldown_term, cooldown_replica_id] = t->cooldown_conf();
1429
0
        if (cooldown_replica_id != t->replica_id()) {
  Branch (1429:13): [True: 0, False: 0]
1430
0
            return;
1431
0
        }
1432
        // {cooldown_replica_id}.{cooldown_term}.meta
1433
0
        std::string remote_meta_path =
1434
0
                cooldown_tablet_meta_filename(cooldown_replica_id, cooldown_term);
1435
        // filter out the paths that should be reserved
1436
0
        auto filter = [&, this](io::FileInfo& info) {
1437
0
            std::string_view filename = info.file_name;
1438
0
            if (filename.ends_with(".meta")) {
  Branch (1438:17): [True: 0, False: 0]
1439
0
                return filename == remote_meta_path;
1440
0
            }
1441
0
            auto rowset_id = extract_rowset_id(filename);
1442
0
            if (rowset_id.hi == 0) {
  Branch (1442:17): [True: 0, False: 0]
1443
0
                return false;
1444
0
            }
1445
0
            return cooldowned_rowsets.contains(rowset_id) ||
  Branch (1445:20): [True: 0, False: 0]
1446
0
                   pending_remote_rowsets().contains(rowset_id);
  Branch (1446:20): [True: 0, False: 0]
1447
0
        };
1448
0
        files.erase(std::remove_if(files.begin(), files.end(), std::move(filter)), files.end());
1449
0
        if (files.empty()) {
  Branch (1449:13): [True: 0, False: 0]
1450
0
            return;
1451
0
        }
1452
0
        files.shrink_to_fit();
1453
0
        num_files_in_buffer += files.size();
1454
0
        buffer.insert({t->tablet_id(), {*storage_resource, std::move(files)}});
1455
0
        auto& info = req.confirm_list.emplace_back();
1456
0
        info.__set_tablet_id(t->tablet_id());
1457
0
        info.__set_cooldown_replica_id(cooldown_replica_id);
1458
0
        info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift());
1459
0
    };
1460
1461
0
    auto confirm_and_remove_files = [&buffer, &req, &num_files_in_buffer]() {
1462
0
        TConfirmUnusedRemoteFilesResult result;
1463
0
        LOG(INFO) << "begin to confirm unused remote files. num_tablets=" << buffer.size()
1464
0
                  << " num_files=" << num_files_in_buffer;
1465
0
        auto st = MasterServerClient::instance()->confirm_unused_remote_files(req, &result);
1466
0
        if (!st.ok()) {
  Branch (1466:13): [True: 0, False: 0]
1467
0
            LOG(WARNING) << st;
1468
0
            return;
1469
0
        }
1470
0
        for (auto id : result.confirmed_tablets) {
  Branch (1470:22): [True: 0, False: 0]
1471
0
            if (auto it = buffer.find(id); LIKELY(it != buffer.end())) {
Line
Count
Source
35
0
#define LIKELY(expr) __builtin_expect(!!(expr), 1)
  Branch (35:22): [True: 0, False: 0]
1472
0
                auto& storage_resource = it->second.first;
1473
0
                auto& files = it->second.second;
1474
0
                std::vector<io::Path> paths;
1475
0
                paths.reserve(files.size());
1476
                // delete unused files
1477
0
                LOG(INFO) << "delete unused files. root_path=" << storage_resource.fs->root_path()
1478
0
                          << " tablet_id=" << id;
1479
0
                io::Path dir = storage_resource.remote_tablet_path(id);
1480
0
                for (auto& file : files) {
  Branch (1480:33): [True: 0, False: 0]
1481
0
                    auto file_path = dir / file.file_name;
1482
0
                    LOG(INFO) << "delete unused file: " << file_path.native();
1483
0
                    paths.push_back(std::move(file_path));
1484
0
                }
1485
0
                st = storage_resource.fs->batch_delete(paths);
1486
0
                if (!st.ok()) {
  Branch (1486:21): [True: 0, False: 0]
1487
0
                    LOG(WARNING) << "failed to delete unused files, tablet_id=" << id << " : "
1488
0
                                 << st;
1489
0
                }
1490
0
                buffer.erase(it);
1491
0
            }
1492
0
        }
1493
0
    };
1494
1495
    // batch confirm to reduce FE's overhead
1496
0
    auto next_confirm_time = std::chrono::steady_clock::now() +
1497
0
                             std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
1498
0
    for (auto& t : tablets) {
  Branch (1498:18): [True: 0, False: 0]
1499
0
        if (t.use_count() <= 1 // this means tablet has been dropped
  Branch (1499:13): [True: 0, False: 0]
  Branch (1499:13): [True: 0, False: 0]
1500
0
            || t->cooldown_conf_unlocked().cooldown_replica_id != t->replica_id() ||
  Branch (1500:16): [True: 0, False: 0]
1501
0
            t->tablet_state() != TABLET_RUNNING) {
  Branch (1501:13): [True: 0, False: 0]
1502
0
            continue;
1503
0
        }
1504
0
        calc_unused_remote_files(t.get());
1505
0
        if (num_files_in_buffer > 0 && (num_files_in_buffer > max_files_in_buffer ||
  Branch (1505:13): [True: 0, False: 0]
  Branch (1505:13): [True: 0, False: 0]
  Branch (1505:41): [True: 0, False: 0]
1506
0
                                        std::chrono::steady_clock::now() > next_confirm_time)) {
  Branch (1506:41): [True: 0, False: 0]
1507
0
            confirm_and_remove_files();
1508
0
            buffer.clear();
1509
0
            req.confirm_list.clear();
1510
0
            num_files_in_buffer = 0;
1511
0
            next_confirm_time =
1512
0
                    std::chrono::steady_clock::now() +
1513
0
                    std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
1514
0
        }
1515
0
    }
1516
0
    if (num_files_in_buffer > 0) {
  Branch (1516:9): [True: 0, False: 0]
1517
0
        confirm_and_remove_files();
1518
0
    }
1519
0
}
1520
1521
0
void StorageEngine::_cold_data_compaction_producer_callback() {
1522
0
    while (!_stop_background_threads_latch.wait_for(
  Branch (1522:12): [True: 0, False: 0]
1523
0
            std::chrono::seconds(config::cold_data_compaction_interval_sec))) {
1524
0
        if (config::disable_auto_compaction ||
  Branch (1524:13): [True: 0, False: 0]
1525
0
            GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
  Branch (1525:13): [True: 0, False: 0]
1526
0
            continue;
1527
0
        }
1528
1529
0
        std::unordered_set<int64_t> copied_tablet_submitted;
1530
0
        {
1531
0
            std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
1532
0
            copied_tablet_submitted = _cold_compaction_tablet_submitted;
1533
0
        }
1534
0
        int n = config::cold_data_compaction_thread_num - copied_tablet_submitted.size();
1535
0
        if (n <= 0) {
  Branch (1535:13): [True: 0, False: 0]
1536
0
            continue;
1537
0
        }
1538
0
        auto tablets = _tablet_manager->get_all_tablet([&copied_tablet_submitted](Tablet* t) {
1539
0
            return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() &&
  Branch (1539:20): [True: 0, False: 0]
  Branch (1539:74): [True: 0, False: 0]
1540
0
                   t->tablet_state() == TABLET_RUNNING &&
  Branch (1540:20): [True: 0, False: 0]
1541
0
                   !copied_tablet_submitted.contains(t->tablet_id()) &&
  Branch (1541:20): [True: 0, False: 0]
1542
0
                   !t->tablet_meta()->tablet_schema()->disable_auto_compaction();
  Branch (1542:20): [True: 0, False: 0]
1543
0
        });
1544
0
        std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_compact;
1545
0
        tablet_to_compact.reserve(n + 1);
1546
0
        std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_follow;
1547
0
        tablet_to_follow.reserve(n + 1);
1548
1549
0
        for (auto& t : tablets) {
  Branch (1549:22): [True: 0, False: 0]
1550
0
            if (t->replica_id() == t->cooldown_conf_unlocked().cooldown_replica_id) {
  Branch (1550:17): [True: 0, False: 0]
1551
0
                auto score = t->calc_cold_data_compaction_score();
1552
0
                if (score < 4) {
  Branch (1552:21): [True: 0, False: 0]
1553
0
                    continue;
1554
0
                }
1555
0
                tablet_to_compact.emplace_back(t, score);
1556
0
                if (tablet_to_compact.size() > n) {
  Branch (1556:21): [True: 0, False: 0]
1557
0
                    std::sort(tablet_to_compact.begin(), tablet_to_compact.end(),
1558
0
                              [](auto& a, auto& b) { return a.second > b.second; });
1559
0
                    tablet_to_compact.pop_back();
1560
0
                }
1561
0
                continue;
1562
0
            }
1563
            // else, need to follow
1564
0
            {
1565
0
                std::lock_guard lock(_running_cooldown_mutex);
1566
0
                if (_running_cooldown_tablets.contains(t->table_id())) {
  Branch (1566:21): [True: 0, False: 0]
1567
                    // already in cooldown queue
1568
0
                    continue;
1569
0
                }
1570
0
            }
1571
            // TODO(plat1ko): some avoidance strategy if failed to follow
1572
0
            auto score = t->calc_cold_data_compaction_score();
1573
0
            tablet_to_follow.emplace_back(t, score);
1574
1575
0
            if (tablet_to_follow.size() > n) {
  Branch (1575:17): [True: 0, False: 0]
1576
0
                std::sort(tablet_to_follow.begin(), tablet_to_follow.end(),
1577
0
                          [](auto& a, auto& b) { return a.second > b.second; });
1578
0
                tablet_to_follow.pop_back();
1579
0
            }
1580
0
        }
1581
1582
0
        for (auto& [tablet, score] : tablet_to_compact) {
  Branch (1582:36): [True: 0, False: 0]
1583
0
            LOG(INFO) << "submit cold data compaction. tablet_id=" << tablet->tablet_id()
1584
0
                      << " score=" << score;
1585
0
            static_cast<void>(_cold_data_compaction_thread_pool->submit_func(
1586
0
                    [&, t = std::move(tablet), this]() {
1587
0
                        auto compaction = std::make_shared<ColdDataCompaction>(*this, t);
1588
0
                        {
1589
0
                            std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
1590
0
                            _cold_compaction_tablet_submitted.insert(t->tablet_id());
1591
0
                        }
1592
0
                        Defer defer {[&] {
1593
0
                            std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
1594
0
                            _cold_compaction_tablet_submitted.erase(t->tablet_id());
1595
0
                        }};
1596
0
                        std::unique_lock cold_compaction_lock(t->get_cold_compaction_lock(),
1597
0
                                                              std::try_to_lock);
1598
0
                        if (!cold_compaction_lock.owns_lock()) {
  Branch (1598:29): [True: 0, False: 0]
1599
0
                            LOG(WARNING) << "try cold_compaction_lock failed, tablet_id="
1600
0
                                         << t->tablet_id();
1601
0
                            return;
1602
0
                        }
1603
0
                        _update_cumulative_compaction_policy();
1604
0
                        if (t->get_cumulative_compaction_policy() == nullptr ||
  Branch (1604:29): [True: 0, False: 0]
  Branch (1604:29): [True: 0, False: 0]
1605
0
                            t->get_cumulative_compaction_policy()->name() !=
  Branch (1605:29): [True: 0, False: 0]
1606
0
                                    t->tablet_meta()->compaction_policy()) {
1607
0
                            t->set_cumulative_compaction_policy(_cumulative_compaction_policies.at(
1608
0
                                    t->tablet_meta()->compaction_policy()));
1609
0
                        }
1610
1611
0
                        auto st = compaction->prepare_compact();
1612
0
                        if (!st.ok()) {
  Branch (1612:29): [True: 0, False: 0]
1613
0
                            LOG(WARNING) << "failed to prepare cold data compaction. tablet_id="
1614
0
                                         << t->tablet_id() << " err=" << st;
1615
0
                            return;
1616
0
                        }
1617
1618
0
                        st = compaction->execute_compact();
1619
0
                        if (!st.ok()) {
  Branch (1619:29): [True: 0, False: 0]
1620
0
                            LOG(WARNING) << "failed to execute cold data compaction. tablet_id="
1621
0
                                         << t->tablet_id() << " err=" << st;
1622
0
                            return;
1623
0
                        }
1624
0
                    }));
1625
0
        }
1626
1627
0
        for (auto& [tablet, score] : tablet_to_follow) {
  Branch (1627:36): [True: 0, False: 0]
1628
0
            LOG(INFO) << "submit to follow cooldown meta. tablet_id=" << tablet->tablet_id()
1629
0
                      << " score=" << score;
1630
0
            static_cast<void>(_cold_data_compaction_thread_pool->submit_func([&,
1631
0
                                                                              t = std::move(
1632
0
                                                                                      tablet)]() {
1633
0
                {
1634
0
                    std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
1635
0
                    _cold_compaction_tablet_submitted.insert(t->tablet_id());
1636
0
                }
1637
0
                auto st = t->cooldown();
1638
0
                {
1639
0
                    std::lock_guard lock(_cold_compaction_tablet_submitted_mtx);
1640
0
                    _cold_compaction_tablet_submitted.erase(t->tablet_id());
1641
0
                }
1642
0
                if (!st.ok()) {
  Branch (1642:21): [True: 0, False: 0]
1643
                    // The cooldown of the replica may be relatively slow
1644
                    // resulting in a short period of time where following cannot be successful
1645
0
                    LOG_EVERY_N(WARNING, 5)
1646
0
                            << "failed to cooldown. tablet_id=" << t->tablet_id() << " err=" << st;
1647
0
                }
1648
0
            }));
1649
0
        }
1650
0
    }
1651
0
}
1652
1653
void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_id,
1654
                                           int64_t publish_version, int64_t transaction_id,
1655
2.05k
                                           bool is_recovery) {
1656
2.05k
    if (!is_recovery) {
  Branch (1656:9): [True: 2.05k, False: 0]
1657
2.05k
        bool exists = false;
1658
2.05k
        {
1659
2.05k
            std::shared_lock<std::shared_mutex> rlock(_async_publish_lock);
1660
2.05k
            if (auto tablet_iter = _async_publish_tasks.find(tablet_id);
1661
2.05k
                tablet_iter != _async_publish_tasks.end()) {
  Branch (1661:17): [True: 2.05k, False: 2]
1662
2.05k
                if (auto iter = tablet_iter->second.find(publish_version);
1663
2.05k
                    iter != tablet_iter->second.end()) {
  Branch (1663:21): [True: 20, False: 2.03k]
1664
20
                    exists = true;
1665
20
                }
1666
2.05k
            }
1667
2.05k
        }
1668
2.05k
        if (exists) {
  Branch (1668:13): [True: 20, False: 2.03k]
1669
20
            return;
1670
20
        }
1671
2.03k
        TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
1672
2.03k
        if (tablet == nullptr) {
  Branch (1672:13): [True: 0, False: 2.03k]
1673
0
            LOG(INFO) << "tablet may be dropped when add async publish task, tablet_id: "
1674
0
                      << tablet_id;
1675
0
            return;
1676
0
        }
1677
2.03k
        PendingPublishInfoPB pending_publish_info_pb;
1678
2.03k
        pending_publish_info_pb.set_partition_id(partition_id);
1679
2.03k
        pending_publish_info_pb.set_transaction_id(transaction_id);
1680
2.03k
        static_cast<void>(TabletMetaManager::save_pending_publish_info(
1681
2.03k
                tablet->data_dir(), tablet->tablet_id(), publish_version,
1682
2.03k
                pending_publish_info_pb.SerializeAsString()));
1683
2.03k
    }
1684
2.03k
    LOG(INFO) << "add pending publish task, tablet_id: " << tablet_id
1685
2.03k
              << " version: " << publish_version << " txn_id:" << transaction_id
1686
2.03k
              << " is_recovery: " << is_recovery;
1687
2.03k
    std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
1688
2.03k
    _async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id};
1689
2.03k
}
1690
1691
3
int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
1692
3
    std::shared_lock<std::shared_mutex> rlock(_async_publish_lock);
1693
3
    auto iter = _async_publish_tasks.find(tablet_id);
1694
3
    if (iter == _async_publish_tasks.end()) {
  Branch (1694:9): [True: 0, False: 3]
1695
0
        return INT64_MAX;
1696
0
    }
1697
3
    if (iter->second.empty()) {
  Branch (1697:9): [True: 0, False: 3]
1698
0
        return INT64_MAX;
1699
0
    }
1700
3
    return iter->second.begin()->first;
1701
3
}
1702
1703
10
void StorageEngine::_process_async_publish() {
1704
    // tablet, publish_version
1705
10
    std::vector<std::pair<TabletSharedPtr, int64_t>> need_removed_tasks;
1706
10
    {
1707
10
        std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
1708
10
        for (auto tablet_iter = _async_publish_tasks.begin();
1709
20
             tablet_iter != _async_publish_tasks.end();) {
  Branch (1709:14): [True: 10, False: 10]
1710
10
            if (tablet_iter->second.empty()) {
  Branch (1710:17): [True: 1, False: 9]
1711
1
                tablet_iter = _async_publish_tasks.erase(tablet_iter);
1712
1
                continue;
1713
1
            }
1714
9
            int64_t tablet_id = tablet_iter->first;
1715
9
            TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
1716
9
            if (!tablet) {
  Branch (1716:17): [True: 1, False: 8]
1717
1
                LOG(WARNING) << "tablet does not exist when async publush, tablet_id: "
1718
1
                             << tablet_id;
1719
1
                tablet_iter = _async_publish_tasks.erase(tablet_iter);
1720
1
                continue;
1721
1
            }
1722
1723
8
            auto task_iter = tablet_iter->second.begin();
1724
8
            int64_t version = task_iter->first;
1725
8
            int64_t transaction_id = task_iter->second.first;
1726
8
            int64_t partition_id = task_iter->second.second;
1727
8
            int64_t max_version = tablet->max_version().second;
1728
1729
8
            if (version <= max_version) {
  Branch (1729:17): [True: 6, False: 2]
1730
6
                need_removed_tasks.emplace_back(tablet, version);
1731
6
                tablet_iter->second.erase(task_iter);
1732
6
                tablet_iter++;
1733
6
                continue;
1734
6
            }
1735
2
            if (version != max_version + 1) {
  Branch (1735:17): [True: 1, False: 1]
1736
1
                int32_t max_version_config = tablet->max_version_config();
1737
                // Keep only the most recent versions
1738
31
                while (tablet_iter->second.size() > max_version_config) {
  Branch (1738:24): [True: 30, False: 1]
1739
30
                    need_removed_tasks.emplace_back(tablet, version);
1740
30
                    task_iter = tablet_iter->second.erase(task_iter);
1741
30
                    version = task_iter->first;
1742
30
                }
1743
1
                tablet_iter++;
1744
1
                continue;
1745
1
            }
1746
1747
1
            auto async_publish_task = std::make_shared<AsyncTabletPublishTask>(
1748
1
                    *this, tablet, partition_id, transaction_id, version);
1749
1
            static_cast<void>(_tablet_publish_txn_thread_pool->submit_func(
1750
1
                    [=]() { async_publish_task->handle(); }));
1751
1
            tablet_iter->second.erase(task_iter);
1752
1
            need_removed_tasks.emplace_back(tablet, version);
1753
1
            tablet_iter++;
1754
1
        }
1755
10
    }
1756
37
    for (auto& [tablet, publish_version] : need_removed_tasks) {
  Branch (1756:42): [True: 37, False: 10]
1757
37
        static_cast<void>(TabletMetaManager::remove_pending_publish_info(
1758
37
                tablet->data_dir(), tablet->tablet_id(), publish_version));
1759
37
    }
1760
10
}
1761
1762
0
void StorageEngine::_async_publish_callback() {
1763
0
    while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) {
  Branch (1763:12): [True: 0, False: 0]
1764
0
        _process_async_publish();
1765
0
    }
1766
0
}
1767
1768
0
void StorageEngine::_check_tablet_delete_bitmap_score_callback() {
1769
0
    LOG(INFO) << "try to start check tablet delete bitmap score!";
1770
0
    while (!_stop_background_threads_latch.wait_for(
  Branch (1770:12): [True: 0, False: 0]
1771
0
            std::chrono::seconds(config::check_tablet_delete_bitmap_interval_seconds))) {
1772
0
        if (!config::enable_check_tablet_delete_bitmap_score) {
  Branch (1772:13): [True: 0, False: 0]
1773
0
            return;
1774
0
        }
1775
0
        uint64_t max_delete_bitmap_score = 0;
1776
0
        uint64_t max_base_rowset_delete_bitmap_score = 0;
1777
0
        _tablet_manager->get_topn_tablet_delete_bitmap_score(&max_delete_bitmap_score,
1778
0
                                                             &max_base_rowset_delete_bitmap_score);
1779
0
        if (max_delete_bitmap_score > 0) {
  Branch (1779:13): [True: 0, False: 0]
1780
0
            _tablet_max_delete_bitmap_score_metrics->set_value(max_delete_bitmap_score);
1781
0
        }
1782
0
        if (max_base_rowset_delete_bitmap_score > 0) {
  Branch (1782:13): [True: 0, False: 0]
1783
0
            _tablet_max_base_rowset_delete_bitmap_score_metrics->set_value(
1784
0
                    max_base_rowset_delete_bitmap_score);
1785
0
        }
1786
0
    }
1787
0
}
1788
1789
} // namespace doris