Coverage Report

Created: 2026-06-01 14:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_warmup_metrics.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_warmup_metrics.h"
19
20
#include <algorithm>
21
22
namespace doris {
23
24
// Namespace-scope tracker instance defined in this translation unit.
// NOTE(review): presumably declared `extern` in cloud_warmup_metrics.h and shared by
// all callers in the process -- confirm against the header.
WarmUpEdDownstreamProgressTracker g_warmup_ed_downstream_progress_tracker;
25
26
void WarmUpEdDownstreamProgressTracker::record_task_submit(const std::string& job_id_str,
27
4
                                                           int64_t upstream_trigger_ts_ms) {
28
4
    if (upstream_trigger_ts_ms <= 0) {
29
0
        return;
30
0
    }
31
4
    std::lock_guard lock(_mtx);
32
4
    auto& progress = _progress_by_job[job_id_str];
33
4
    ++progress.pending_trigger_ts_counts[upstream_trigger_ts_ms];
34
4
}
35
36
void WarmUpEdDownstreamProgressTracker::record_task_done(const std::string& job_id_str,
37
6
                                                         int64_t upstream_trigger_ts_ms) {
38
6
    if (upstream_trigger_ts_ms <= 0) {
39
0
        return;
40
0
    }
41
6
    std::lock_guard lock(_mtx);
42
6
    auto& progress = _progress_by_job[job_id_str];
43
6
    auto pending_it = progress.pending_trigger_ts_counts.find(upstream_trigger_ts_ms);
44
6
    if (pending_it != progress.pending_trigger_ts_counts.end()) {
45
4
        --pending_it->second;
46
4
        if (pending_it->second <= 0) {
47
3
            progress.pending_trigger_ts_counts.erase(pending_it);
48
3
        }
49
4
    }
50
6
    progress.last_finished_trigger_ts =
51
6
            std::max(progress.last_finished_trigger_ts, upstream_trigger_ts_ms);
52
6
}
53
54
7
int64_t WarmUpEdDownstreamProgressTracker::get_progress_ts(const std::string& job_id_str) const {
55
7
    std::lock_guard lock(_mtx);
56
7
    auto progress_it = _progress_by_job.find(job_id_str);
57
7
    if (progress_it == _progress_by_job.end()) {
58
0
        return 0;
59
0
    }
60
7
    const auto& progress = progress_it->second;
61
7
    if (!progress.pending_trigger_ts_counts.empty()) {
62
4
        return progress.pending_trigger_ts_counts.begin()->first;
63
4
    }
64
3
    return progress.last_finished_trigger_ts;
65
7
}
66
67
1
std::vector<std::string> WarmUpEdDownstreamProgressTracker::list_job_ids() const {
68
1
    std::lock_guard lock(_mtx);
69
1
    std::vector<std::string> job_ids;
70
1
    job_ids.reserve(_progress_by_job.size());
71
1
    for (const auto& entry : _progress_by_job) {
72
1
        job_ids.emplace_back(entry.first);
73
1
    }
74
1
    return job_ids;
75
1
}
76
77
4
void WarmUpEdDownstreamProgressTracker::reset_for_test() {
78
4
    std::lock_guard lock(_mtx);
79
4
    _progress_by_job.clear();
80
4
}
81
82
} // namespace doris