Coverage Report

Created: 2025-12-28 17:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/runtime/exec_env.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/exec_env.h"
19
20
#include <gen_cpp/HeartbeatService_types.h>
21
#include <glog/logging.h>
22
23
#include <mutex>
24
#include <utility>
25
26
#include "common/config.h"
27
#include "common/logging.h"
28
#include "olap/olap_define.h"
29
#include "olap/storage_engine.h"
30
#include "olap/tablet_manager.h"
31
#include "runtime/fragment_mgr.h"
32
#include "runtime/frontend_info.h"
33
#include "runtime/load_stream_mgr.h"
34
#include "util/debug_util.h"
35
#include "util/time.h"
36
#include "vec/runtime/vdata_stream_mgr.h"
37
#include "vec/sink/delta_writer_v2_pool.h"
38
#include "vec/sink/load_stream_map_pool.h"
39
40
namespace doris {
41
42
#ifdef BE_TEST
43
void ExecEnv::set_inverted_index_searcher_cache(
44
145
        segment_v2::InvertedIndexSearcherCache* inverted_index_searcher_cache) {
45
145
    _inverted_index_searcher_cache = inverted_index_searcher_cache;
46
145
}
47
432
void ExecEnv::set_storage_engine(std::unique_ptr<BaseStorageEngine>&& engine) {
48
432
    _storage_engine = std::move(engine);
49
432
}
50
1
void ExecEnv::set_write_cooldown_meta_executors() {
51
1
    _write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>();
52
1
}
53
#endif // BE_TEST
54
55
// Look up a tablet through the storage engine owned by the process-wide ExecEnv instance.
// Returns an InternalError result when no storage engine has been installed; otherwise the
// engine's own result is forwarded unchanged.
Result<BaseTabletSPtr> ExecEnv::get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats,
                                           bool force_use_only_cached, bool cache_on_miss) {
    auto* engine = GetInstance()->_storage_engine.get();
    if (engine == nullptr) {
        return ResultError(Status::InternalError("failed to get tablet {}", tablet_id));
    }
    return engine->get_tablet(tablet_id, sync_stats, force_use_only_cached, cache_on_miss);
}
63
64
// Fetch a tablet's metadata through the storage engine owned by the process-wide ExecEnv
// instance. Writes the result into `*tablet_meta` via the engine; returns InternalError if no
// storage engine has been installed yet.
Status ExecEnv::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
                                bool force_use_only_cached) {
    auto* engine = GetInstance()->_storage_engine.get();
    if (engine != nullptr) {
        return engine->get_tablet_meta(tablet_id, tablet_meta, force_use_only_cached);
    }
    return Status::InternalError("storage engine is not initialized");
}
72
73
4
const std::string& ExecEnv::token() const {
74
4
    return _cluster_info->token;
75
4
}
76
77
0
void ExecEnv::clear_stream_mgr() {
78
0
    if (_vstream_mgr) {
79
0
        SAFE_DELETE(_vstream_mgr);
80
0
    }
81
0
}
82
83
0
std::vector<TFrontendInfo> ExecEnv::get_frontends() {
84
0
    std::lock_guard<std::mutex> lg(_frontends_lock);
85
0
    std::vector<TFrontendInfo> infos;
86
0
    for (const auto& cur_fe : _frontends) {
87
0
        infos.push_back(cur_fe.second.info);
88
0
    }
89
0
    return infos;
90
0
}
91
92
0
void ExecEnv::update_frontends(const std::vector<TFrontendInfo>& new_fe_infos) {
93
0
    std::lock_guard<std::mutex> lg(_frontends_lock);
94
95
0
    std::set<TNetworkAddress> dropped_fes;
96
97
0
    for (const auto& cur_fe : _frontends) {
98
0
        dropped_fes.insert(cur_fe.first);
99
0
    }
100
101
0
    for (const auto& coming_fe_info : new_fe_infos) {
102
0
        auto itr = _frontends.find(coming_fe_info.coordinator_address);
103
104
0
        if (itr == _frontends.end()) {
105
0
            LOG(INFO) << "A completely new frontend, " << PrintFrontendInfo(coming_fe_info);
106
107
0
            _frontends.insert(std::pair<TNetworkAddress, FrontendInfo>(
108
0
                    coming_fe_info.coordinator_address,
109
0
                    FrontendInfo {coming_fe_info, GetCurrentTimeMicros() / 1000, /*first time*/
110
0
                                  GetCurrentTimeMicros() / 1000 /*last time*/}));
111
112
0
            continue;
113
0
        }
114
115
0
        dropped_fes.erase(coming_fe_info.coordinator_address);
116
117
0
        if (coming_fe_info.process_uuid == 0) {
118
0
            LOG(WARNING) << "Frontend " << PrintFrontendInfo(coming_fe_info)
119
0
                         << " is in an unknown state.";
120
0
        }
121
122
0
        if (coming_fe_info.process_uuid == itr->second.info.process_uuid) {
123
0
            itr->second.last_reveiving_time_ms = GetCurrentTimeMicros() / 1000;
124
0
            continue;
125
0
        }
126
127
        // If we get here, means this frontend has already restarted.
128
0
        itr->second.info.process_uuid = coming_fe_info.process_uuid;
129
0
        itr->second.first_receiving_time_ms = GetCurrentTimeMicros() / 1000;
130
0
        itr->second.last_reveiving_time_ms = GetCurrentTimeMicros() / 1000;
131
0
        LOG(INFO) << "Update frontend " << PrintFrontendInfo(coming_fe_info);
132
0
    }
133
134
0
    for (const auto& dropped_fe : dropped_fes) {
135
0
        LOG(INFO) << "Frontend " << PrintThriftNetworkAddress(dropped_fe)
136
0
                  << " has already been dropped, remove it";
137
0
        _frontends.erase(dropped_fe);
138
0
    }
139
0
}
140
141
13
std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_running_frontends() {
142
13
    std::lock_guard<std::mutex> lg(_frontends_lock);
143
13
    std::map<TNetworkAddress, FrontendInfo> res;
144
13
    const int expired_duration = config::fe_expire_duration_seconds * 1000;
145
13
    const auto now = GetCurrentTimeMicros() / 1000;
146
147
13
    for (const auto& pair : _frontends) {
148
0
        auto& brpc_addr = pair.first;
149
0
        auto& fe_info = pair.second;
150
151
0
        if (fe_info.info.process_uuid == 0) {
152
            // FE is in an unknown state, regart it as alive. conservative
153
0
            res[brpc_addr] = fe_info;
154
0
        } else {
155
0
            if (now - fe_info.last_reveiving_time_ms < expired_duration) {
156
                // If fe info has just been update in last expired_duration, regard it as running.
157
0
                res[brpc_addr] = fe_info;
158
0
            } else {
159
                // Fe info has not been udpate for more than expired_duration, regard it as an abnormal.
160
                // Abnormal means this fe can not connect to master, and it is not dropped from cluster.
161
                // or fe do not have master yet.
162
0
                LOG_EVERY_N(WARNING, 50) << fmt::format(
163
0
                        "Frontend {}:{} has not update its hb for more than {} secs, regard it as "
164
0
                        "abnormal",
165
0
                        brpc_addr.hostname, brpc_addr.port, config::fe_expire_duration_seconds);
166
0
            }
167
0
        }
168
0
    }
169
170
13
    return res;
171
13
}
172
173
0
void ExecEnv::wait_for_all_tasks_done() {
174
    // For graceful shutdown, need to wait for all running queries to stop
175
0
    int32_t wait_seconds_passed = 0;
176
0
    while (true) {
177
0
        int num_queries = _fragment_mgr->running_query_num();
178
0
        if (num_queries < 1) {
179
0
            break;
180
0
        }
181
0
        if (wait_seconds_passed > doris::config::grace_shutdown_wait_seconds) {
182
0
            LOG(INFO) << "There are still " << num_queries << " queries running, but "
183
0
                      << wait_seconds_passed << " seconds passed, has to exist now";
184
0
            break;
185
0
        }
186
0
        LOG(INFO) << "There are still " << num_queries << " queries running, waiting...";
187
0
        sleep(1);
188
0
        ++wait_seconds_passed;
189
0
    }
190
    // This is a conservative strategy.
191
    // Because a query might still have fragments running on other BE nodes.
192
    // In other words, the query hasn't truly terminated.
193
    // If the current BE is shut down at this point,
194
    // the FE will detect the downtime of a related BE and cancel the entire query,
195
    // defeating the purpose of a graceful stop.
196
0
    sleep(config::grace_shutdown_post_delay_seconds);
197
0
}
198
199
0
bool ExecEnv::check_auth_token(const std::string& auth_token) {
200
0
    return _cluster_info->curr_auth_token == auth_token ||
201
0
           _cluster_info->last_auth_token == auth_token;
202
0
}
203
204
} // namespace doris