Coverage Report

Created: 2026-05-14 07:56

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/fragment_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/fragment_mgr.h"
19
20
#include <brpc/controller.h>
21
#include <bvar/latency_recorder.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/DorisExternalService_types.h>
24
#include <gen_cpp/FrontendService.h>
25
#include <gen_cpp/FrontendService_types.h>
26
#include <gen_cpp/HeartbeatService_types.h>
27
#include <gen_cpp/Metrics_types.h>
28
#include <gen_cpp/PaloInternalService_types.h>
29
#include <gen_cpp/PlanNodes_types.h>
30
#include <gen_cpp/Planner_types.h>
31
#include <gen_cpp/QueryPlanExtra_types.h>
32
#include <gen_cpp/RuntimeProfile_types.h>
33
#include <gen_cpp/Types_types.h>
34
#include <gen_cpp/internal_service.pb.h>
35
#include <pthread.h>
36
#include <sys/time.h>
37
#include <thrift/TApplicationException.h>
38
#include <thrift/Thrift.h>
39
#include <thrift/protocol/TDebugProtocol.h>
40
#include <thrift/transport/TTransportException.h>
41
#include <unistd.h>
42
43
#include <algorithm>
44
#include <cstddef>
45
#include <ctime>
46
47
// IWYU pragma: no_include <bits/chrono.h>
48
#include <chrono> // IWYU pragma: keep
49
#include <cstdint>
50
#include <map>
51
#include <memory>
52
#include <mutex>
53
#include <sstream>
54
#include <unordered_map>
55
#include <unordered_set>
56
#include <utility>
57
58
#include "common/config.h"
59
#include "common/exception.h"
60
#include "common/logging.h"
61
#include "common/metrics/doris_metrics.h"
62
#include "common/object_pool.h"
63
#include "common/status.h"
64
#include "common/utils.h"
65
#include "core/data_type/primitive_type.h"
66
#include "exec/pipeline/pipeline_fragment_context.h"
67
#include "exec/runtime_filter/runtime_filter_consumer.h"
68
#include "exec/runtime_filter/runtime_filter_mgr.h"
69
#include "io/fs/stream_load_pipe.h"
70
#include "load/stream_load/new_load_stream_mgr.h"
71
#include "load/stream_load/stream_load_context.h"
72
#include "load/stream_load/stream_load_executor.h"
73
#include "runtime/descriptors.h"
74
#include "runtime/exec_env.h"
75
#include "runtime/frontend_info.h"
76
#include "runtime/query_context.h"
77
#include "runtime/runtime_profile.h"
78
#include "runtime/runtime_query_statistics_mgr.h"
79
#include "runtime/runtime_state.h"
80
#include "runtime/thread_context.h"
81
#include "runtime/workload_group/workload_group.h"
82
#include "runtime/workload_group/workload_group_manager.h"
83
#include "service/backend_options.h"
84
#include "storage/id_manager.h"
85
#include "util/brpc_client_cache.h"
86
#include "util/client_cache.h"
87
#include "util/debug_points.h"
88
#include "util/debug_util.h"
89
#include "util/network_util.h"
90
#include "util/thread.h"
91
#include "util/threadpool.h"
92
#include "util/thrift_util.h"
93
#include "util/uid_util.h"
94
95
namespace doris {
96
97
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT);
98
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
99
100
bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
101
102
bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
103
bvar::Status<uint64_t> g_fragment_last_active_time(
104
        "fragment_last_active_time", duration_cast<std::chrono::milliseconds>(
105
                                             std::chrono::system_clock::now().time_since_epoch())
106
                                             .count());
107
108
1.24k
uint64_t get_fragment_executing_count() {
109
1.24k
    return g_fragment_executing_count.get_value();
110
1.24k
}
111
1.24k
uint64_t get_fragment_last_active_time() {
112
1.24k
    return g_fragment_last_active_time.get_value();
113
1.24k
}
114
115
2.57k
std::string to_load_error_http_path(const std::string& file_name) {
116
2.57k
    if (file_name.empty()) {
117
1.97k
        return "";
118
1.97k
    }
119
600
    if (file_name.compare(0, 4, "http") == 0) {
120
574
        return file_name;
121
574
    }
122
26
    std::stringstream url;
123
26
    url << (config::enable_https ? "https" : "http") << "://"
124
26
        << get_host_port(BackendOptions::get_localhost(), config::webserver_port)
125
26
        << "/api/_load_error_log?"
126
26
        << "file=" << file_name;
127
26
    return url.str();
128
600
}
129
130
static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info,
131
0
                                            std::unordered_set<TUniqueId>& query_set) {
132
0
    TFetchRunningQueriesResult rpc_result;
133
0
    TFetchRunningQueriesRequest rpc_request;
134
135
0
    Status client_status;
136
0
    const int32_t timeout_ms = 3 * 1000;
137
0
    FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(),
138
0
                                         fe_info.info.coordinator_address, timeout_ms,
139
0
                                         &client_status);
140
    // Abort this fe.
141
0
    if (!client_status.ok()) {
142
0
        LOG_WARNING("Failed to get client for {}, reason is {}",
143
0
                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
144
0
                    client_status.to_string());
145
0
        return Status::InternalError("Failed to get client for {}, reason is {}",
146
0
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
147
0
                                     client_status.to_string());
148
0
    }
149
150
    // do rpc
151
0
    try {
152
0
        try {
153
0
            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
154
0
        } catch (const apache::thrift::transport::TTransportException& e) {
155
0
            LOG_WARNING("Transport exception reason: {}, reopening", e.what());
156
0
            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
157
0
            if (!client_status.ok()) {
158
0
                LOG_WARNING("Reopen failed, reason: {}", client_status.to_string_no_stack());
159
0
                return Status::InternalError("Reopen failed, reason: {}",
160
0
                                             client_status.to_string_no_stack());
161
0
            }
162
163
0
            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
164
0
        }
165
0
    } catch (apache::thrift::TException& e) {
166
        // During upgrading cluster or meet any other network error.
167
0
        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
168
0
                    PrintThriftNetworkAddress(fe_info.info.coordinator_address), e.what());
169
0
        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
170
0
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
171
0
                                     e.what());
172
0
    }
173
174
    // Avoid logic error in frontend.
175
0
    if (!rpc_result.__isset.status || rpc_result.status.status_code != TStatusCode::OK) {
176
0
        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
177
0
                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
178
0
                    doris::to_string(rpc_result.status.status_code));
179
0
        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
180
0
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
181
0
                                     doris::to_string(rpc_result.status.status_code));
182
0
    }
183
184
0
    if (!rpc_result.__isset.running_queries) {
185
0
        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
186
0
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
187
0
                                     "running_queries is not set");
188
0
    }
189
190
0
    query_set = std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(),
191
0
                                              rpc_result.running_queries.end());
192
0
    return Status::OK();
193
0
};
194
195
0
static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries_from_fe() {
196
0
    const std::map<TNetworkAddress, FrontendInfo>& running_fes =
197
0
            ExecEnv::GetInstance()->get_running_frontends();
198
199
0
    std::map<int64_t, std::unordered_set<TUniqueId>> result;
200
0
    std::vector<FrontendInfo> qualified_fes;
201
202
0
    for (const auto& fe : running_fes) {
203
        // Only consider normal frontend.
204
0
        if (fe.first.port != 0 && fe.second.info.process_uuid != 0) {
205
0
            qualified_fes.push_back(fe.second);
206
0
        } else {
207
0
            return {};
208
0
        }
209
0
    }
210
211
0
    for (const auto& fe_addr : qualified_fes) {
212
0
        const int64_t process_uuid = fe_addr.info.process_uuid;
213
0
        std::unordered_set<TUniqueId> query_set;
214
0
        Status st = _do_fetch_running_queries_rpc(fe_addr, query_set);
215
0
        if (!st.ok()) {
216
            // Empty result, cancel worker will not do anything
217
0
            return {};
218
0
        }
219
220
        // frontend_info and process_uuid has been checked in rpc threads.
221
0
        result[process_uuid] = query_set;
222
0
    }
223
224
0
    return result;
225
0
}
226
227
1.85M
inline uint32_t get_map_id(const TUniqueId& query_id, size_t capacity) {
228
1.85M
    uint32_t value = HashUtil::hash(&query_id.lo, 8, 0);
229
1.85M
    value = HashUtil::hash(&query_id.hi, 8, value);
230
1.85M
    return value % capacity;
231
1.85M
}
232
233
1.29M
inline uint32_t get_map_id(std::pair<TUniqueId, int> key, size_t capacity) {
234
1.29M
    uint32_t value = HashUtil::hash(&key.first.lo, 8, 0);
235
1.29M
    value = HashUtil::hash(&key.first.hi, 8, value);
236
1.29M
    return value % capacity;
237
1.29M
}
238
239
template <typename Key, typename Value, typename ValueType>
240
51
ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
241
51
    _internal_map.resize(config::num_query_ctx_map_partitions);
242
6.57k
    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
243
6.52k
        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
244
6.52k
                            phmap::flat_hash_map<Key, Value>()};
245
6.52k
    }
246
51
}
_ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_23PipelineFragmentContextEES5_EC2Ev
Line
Count
Source
240
17
ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
241
17
    _internal_map.resize(config::num_query_ctx_map_partitions);
242
2.19k
    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
243
2.17k
        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
244
2.17k
                            phmap::flat_hash_map<Key, Value>()};
245
2.17k
    }
246
17
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_EC2Ev
Line
Count
Source
240
17
ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
241
17
    _internal_map.resize(config::num_query_ctx_map_partitions);
242
2.19k
    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
243
2.17k
        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
244
2.17k
                            phmap::flat_hash_map<Key, Value>()};
245
2.17k
    }
246
17
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_EC2Ev
Line
Count
Source
240
17
ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
241
17
    _internal_map.resize(config::num_query_ctx_map_partitions);
242
2.19k
    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
243
2.17k
        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
244
2.17k
                            phmap::flat_hash_map<Key, Value>()};
245
2.17k
    }
246
17
}
247
248
template <typename Key, typename Value, typename ValueType>
249
1.19M
Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
250
1.19M
    auto id = get_map_id(query_id, _internal_map.size());
251
1.19M
    {
252
1.19M
        std::shared_lock lock(*_internal_map[id].first);
253
1.19M
        auto& map = _internal_map[id].second;
254
1.19M
        auto search = map.find(query_id);
255
1.19M
        if (search != map.end()) {
256
435k
            return search->second;
257
435k
        }
258
761k
        return std::shared_ptr<ValueType>(nullptr);
259
1.19M
    }
260
1.19M
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_E4findERKS1_
Line
Count
Source
249
761k
Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
250
761k
    auto id = get_map_id(query_id, _internal_map.size());
251
761k
    {
252
761k
        std::shared_lock lock(*_internal_map[id].first);
253
761k
        auto& map = _internal_map[id].second;
254
761k
        auto search = map.find(query_id);
255
761k
        if (search != map.end()) {
256
428k
            return search->second;
257
428k
        }
258
333k
        return std::shared_ptr<ValueType>(nullptr);
259
761k
    }
260
761k
}
_ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_23PipelineFragmentContextEES5_E4findERKS3_
Line
Count
Source
249
434k
Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
250
434k
    auto id = get_map_id(query_id, _internal_map.size());
251
434k
    {
252
434k
        std::shared_lock lock(*_internal_map[id].first);
253
434k
        auto& map = _internal_map[id].second;
254
434k
        auto search = map.find(query_id);
255
434k
        if (search != map.end()) {
256
6.76k
            return search->second;
257
6.76k
        }
258
427k
        return std::shared_ptr<ValueType>(nullptr);
259
434k
    }
260
434k
}
261
262
template <typename Key, typename Value, typename ValueType>
263
Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists(
264
286k
        const Key& query_id, std::shared_ptr<ValueType>& query_ctx, ApplyFunction&& function) {
265
286k
    auto id = get_map_id(query_id, _internal_map.size());
266
286k
    {
267
286k
        std::unique_lock lock(*_internal_map[id].first);
268
286k
        auto& map = _internal_map[id].second;
269
286k
        auto search = map.find(query_id);
270
286k
        if (search != map.end()) {
271
0
            query_ctx = search->second.lock();
272
0
        }
273
286k
        if (!query_ctx) {
274
286k
            return function(map);
275
286k
        }
276
172
        return Status::OK();
277
286k
    }
278
286k
}
279
280
template <typename Key, typename Value, typename ValueType>
281
1.24M
bool ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
282
1.24M
    auto id = get_map_id(query_id, _internal_map.size());
283
1.24M
    std::unique_lock lock(*_internal_map[id].first);
284
1.24M
    auto& map = _internal_map[id].second;
285
1.24M
    return map.erase(query_id) != 0;
286
1.24M
}
_ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_23PipelineFragmentContextEES5_E5eraseERKS3_
Line
Count
Source
281
432k
bool ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
282
432k
    auto id = get_map_id(query_id, _internal_map.size());
283
432k
    std::unique_lock lock(*_internal_map[id].first);
284
432k
    auto& map = _internal_map[id].second;
285
432k
    return map.erase(query_id) != 0;
286
432k
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_E5eraseERKS1_
Line
Count
Source
281
404k
bool ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
282
404k
    auto id = get_map_id(query_id, _internal_map.size());
283
404k
    std::unique_lock lock(*_internal_map[id].first);
284
404k
    auto& map = _internal_map[id].second;
285
404k
    return map.erase(query_id) != 0;
286
404k
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_E5eraseERKS1_
Line
Count
Source
281
404k
bool ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
282
404k
    auto id = get_map_id(query_id, _internal_map.size());
283
404k
    std::unique_lock lock(*_internal_map[id].first);
284
404k
    auto& map = _internal_map[id].second;
285
404k
    return map.erase(query_id) != 0;
286
404k
}
287
288
template <typename Key, typename Value, typename ValueType>
289
void ConcurrentContextMap<Key, Value, ValueType>::insert(const Key& query_id,
290
433k
                                                         std::shared_ptr<ValueType> query_ctx) {
291
433k
    auto id = get_map_id(query_id, _internal_map.size());
292
433k
    {
293
433k
        std::unique_lock lock(*_internal_map[id].first);
294
433k
        auto& map = _internal_map[id].second;
295
433k
        map.insert({query_id, query_ctx});
296
433k
    }
297
433k
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_E6insertERKS1_S4_
Line
Count
Source
290
2.66k
                                                         std::shared_ptr<ValueType> query_ctx) {
291
2.66k
    auto id = get_map_id(query_id, _internal_map.size());
292
2.66k
    {
293
2.66k
        std::unique_lock lock(*_internal_map[id].first);
294
2.66k
        auto& map = _internal_map[id].second;
295
2.66k
        map.insert({query_id, query_ctx});
296
2.66k
    }
297
2.66k
}
_ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_23PipelineFragmentContextEES5_E6insertERKS3_S6_
Line
Count
Source
290
431k
                                                         std::shared_ptr<ValueType> query_ctx) {
291
431k
    auto id = get_map_id(query_id, _internal_map.size());
292
431k
    {
293
431k
        std::unique_lock lock(*_internal_map[id].first);
294
431k
        auto& map = _internal_map[id].second;
295
431k
        map.insert({query_id, query_ctx});
296
431k
    }
297
431k
}
298
299
template <typename Key, typename Value, typename ValueType>
300
40
void ConcurrentContextMap<Key, Value, ValueType>::clear() {
301
    // Avoid self-deadlock from releasing the last QueryContext
302
    // in _query_ctx_map_delay_delete:
303
    // FragmentMgr::stop()
304
    //   -> _query_ctx_map_delay_delete.clear()
305
    //     -> unique_lock(query_id_lock)
306
    //     -> map.clear()
307
    //       -> QueryContext::~QueryContext()
308
    //         -> FragmentMgr::remove_query_context(query_id)
309
    //           -> _query_ctx_map_delay_delete.erase(query_id)
310
    //             -> unique_lock(query_id_lock) <- deadlock
311
5.12k
    for (auto& pair : _internal_map) {
312
5.12k
        phmap::flat_hash_map<Key, Value> map;
313
5.12k
        {
314
5.12k
            std::unique_lock lock(*pair.first);
315
5.12k
            map.swap(pair.second);
316
5.12k
        }
317
5.12k
        map.clear();
318
5.12k
    }
319
40
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_E5clearEv
Line
Count
Source
300
13
void ConcurrentContextMap<Key, Value, ValueType>::clear() {
301
    // Avoid self-deadlock from releasing the last QueryContext
302
    // in _query_ctx_map_delay_delete:
303
    // FragmentMgr::stop()
304
    //   -> _query_ctx_map_delay_delete.clear()
305
    //     -> unique_lock(query_id_lock)
306
    //     -> map.clear()
307
    //       -> QueryContext::~QueryContext()
308
    //         -> FragmentMgr::remove_query_context(query_id)
309
    //           -> _query_ctx_map_delay_delete.erase(query_id)
310
    //             -> unique_lock(query_id_lock) <- deadlock
311
1.66k
    for (auto& pair : _internal_map) {
312
1.66k
        phmap::flat_hash_map<Key, Value> map;
313
1.66k
        {
314
1.66k
            std::unique_lock lock(*pair.first);
315
1.66k
            map.swap(pair.second);
316
1.66k
        }
317
1.66k
        map.clear();
318
1.66k
    }
319
13
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_E5clearEv
Line
Count
Source
300
14
void ConcurrentContextMap<Key, Value, ValueType>::clear() {
301
    // Avoid self-deadlock from releasing the last QueryContext
302
    // in _query_ctx_map_delay_delete:
303
    // FragmentMgr::stop()
304
    //   -> _query_ctx_map_delay_delete.clear()
305
    //     -> unique_lock(query_id_lock)
306
    //     -> map.clear()
307
    //       -> QueryContext::~QueryContext()
308
    //         -> FragmentMgr::remove_query_context(query_id)
309
    //           -> _query_ctx_map_delay_delete.erase(query_id)
310
    //             -> unique_lock(query_id_lock) <- deadlock
311
1.79k
    for (auto& pair : _internal_map) {
312
1.79k
        phmap::flat_hash_map<Key, Value> map;
313
1.79k
        {
314
1.79k
            std::unique_lock lock(*pair.first);
315
1.79k
            map.swap(pair.second);
316
1.79k
        }
317
1.79k
        map.clear();
318
1.79k
    }
319
14
}
_ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_23PipelineFragmentContextEES5_E5clearEv
Line
Count
Source
300
13
void ConcurrentContextMap<Key, Value, ValueType>::clear() {
301
    // Avoid self-deadlock from releasing the last QueryContext
302
    // in _query_ctx_map_delay_delete:
303
    // FragmentMgr::stop()
304
    //   -> _query_ctx_map_delay_delete.clear()
305
    //     -> unique_lock(query_id_lock)
306
    //     -> map.clear()
307
    //       -> QueryContext::~QueryContext()
308
    //         -> FragmentMgr::remove_query_context(query_id)
309
    //           -> _query_ctx_map_delay_delete.erase(query_id)
310
    //             -> unique_lock(query_id_lock) <- deadlock
311
1.66k
    for (auto& pair : _internal_map) {
312
1.66k
        phmap::flat_hash_map<Key, Value> map;
313
1.66k
        {
314
1.66k
            std::unique_lock lock(*pair.first);
315
1.66k
            map.swap(pair.second);
316
1.66k
        }
317
1.66k
        map.clear();
318
1.66k
    }
319
13
}
320
321
FragmentMgr::FragmentMgr(ExecEnv* exec_env)
322
17
        : _exec_env(exec_env), _stop_background_threads_latch(1) {
323
17
    _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
324
17
    INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
325
326
17
    auto s = Thread::create(
327
17
            "FragmentMgr", "cancel_timeout_plan_fragment", [this]() { this->cancel_worker(); },
328
17
            &_cancel_thread);
329
17
    CHECK(s.ok()) << s.to_string();
330
331
17
    s = ThreadPoolBuilder("FragmentMgrAsyncWorkThreadPool")
332
17
                .set_min_threads(config::fragment_mgr_async_work_pool_thread_num_min)
333
17
                .set_max_threads(config::fragment_mgr_async_work_pool_thread_num_max)
334
17
                .set_max_queue_size(config::fragment_mgr_async_work_pool_queue_size)
335
17
                .build(&_thread_pool);
336
17
    CHECK(s.ok()) << s.to_string();
337
17
}
338
339
13
FragmentMgr::~FragmentMgr() = default;
340
341
13
void FragmentMgr::stop() {
342
13
    DEREGISTER_HOOK_METRIC(fragment_instance_count);
343
13
    _stop_background_threads_latch.count_down();
344
13
    if (_cancel_thread) {
345
13
        _cancel_thread->join();
346
13
    }
347
348
13
    _thread_pool->shutdown();
349
    // Only me can delete
350
13
    _query_ctx_map.clear();
351
    // in one BE's graceful shutdown, cancel_worker will get related running queries via _get_all_running_queries_from_fe and cancel them.
352
    // so clearing here will not make RF consumer hang. if we dont do this, in ~FragmentMgr() there may be QueryContext in _query_ctx_map_delay_delete
353
    // destructred and remove it from _query_ctx_map_delay_delete which is destructring. it's UB.
354
13
    _query_ctx_map_delay_delete.clear();
355
13
    _pipeline_map.clear();
356
13
    {
357
13
        std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
358
13
        _rerunnable_params_map.clear();
359
13
    }
360
13
}
361
362
1.18M
static void empty_function(RuntimeState*, Status*) {}
363
364
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
365
                                       const QuerySource query_source,
366
427k
                                       const TPipelineFragmentParamsList& parent) {
367
427k
    if (params.txn_conf.need_txn) {
368
48
        std::shared_ptr<StreamLoadContext> stream_load_ctx =
369
48
                std::make_shared<StreamLoadContext>(_exec_env);
370
48
        stream_load_ctx->db = params.txn_conf.db;
371
48
        stream_load_ctx->db_id = params.txn_conf.db_id;
372
48
        stream_load_ctx->table = params.txn_conf.tbl;
373
48
        stream_load_ctx->txn_id = params.txn_conf.txn_id;
374
48
        stream_load_ctx->id = UniqueId(params.query_id);
375
48
        stream_load_ctx->put_result.__set_pipeline_params(params);
376
48
        stream_load_ctx->use_streaming = true;
377
48
        stream_load_ctx->load_type = TLoadType::MANUL_LOAD;
378
48
        stream_load_ctx->load_src_type = TLoadSourceType::RAW;
379
48
        stream_load_ctx->label = params.import_label;
380
48
        stream_load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
381
48
        stream_load_ctx->timeout_second = 3600;
382
48
        stream_load_ctx->auth.token = params.txn_conf.token;
383
48
        stream_load_ctx->need_commit_self = true;
384
48
        stream_load_ctx->need_rollback = true;
385
48
        auto pipe = std::make_shared<io::StreamLoadPipe>(
386
48
                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
387
48
                -1 /* total_length */, true /* use_proto */);
388
48
        stream_load_ctx->body_sink = pipe;
389
48
        stream_load_ctx->pipe = pipe;
390
48
        stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
391
392
48
        RETURN_IF_ERROR(
393
48
                _exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, stream_load_ctx));
394
395
48
        RETURN_IF_ERROR(
396
48
                _exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx, parent));
397
48
        return Status::OK();
398
427k
    } else {
399
427k
        return exec_plan_fragment(params, query_source, empty_function, parent);
400
427k
    }
401
427k
}
402
403
// Stage 2. prepare finished. then get FE instruction to execute
404
117k
Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) {
405
117k
    TUniqueId query_id;
406
117k
    query_id.__set_hi(request->query_id().hi());
407
117k
    query_id.__set_lo(request->query_id().lo());
408
117k
    auto q_ctx = get_query_ctx(query_id);
409
117k
    if (q_ctx) {
410
117k
        q_ctx->set_ready_to_execute(Status::OK());
411
117k
        LOG_INFO("Query {} start execution", print_id(query_id));
412
117k
    } else {
413
0
        return Status::InternalError(
414
0
                "Failed to get query fragments context. Query {} may be "
415
0
                "timeout or be cancelled. host: {}",
416
0
                print_id(query_id), BackendOptions::get_localhost());
417
0
    }
418
117k
    return Status::OK();
419
117k
}
420
421
432k
void FragmentMgr::remove_pipeline_context(std::pair<TUniqueId, int> key) {
422
432k
    if (_pipeline_map.erase(key)) {
423
432k
        int64_t now = duration_cast<std::chrono::milliseconds>(
424
432k
                              std::chrono::system_clock::now().time_since_epoch())
425
432k
                              .count();
426
432k
        g_fragment_executing_count << -1;
427
432k
        g_fragment_last_active_time.set_value(now);
428
432k
    }
429
432k
}
430
431
403k
void FragmentMgr::remove_query_context(const TUniqueId& key) {
432
    // Clean up any saved rerunnable params for this query to avoid memory leaks.
433
    // This covers both cancel and normal destruction paths.
434
403k
    {
435
403k
        std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
436
422k
        for (auto it = _rerunnable_params_map.begin(); it != _rerunnable_params_map.end();) {
437
18.6k
            if (it->first.first == key) {
438
209
                it = _rerunnable_params_map.erase(it);
439
18.4k
            } else {
440
18.4k
                ++it;
441
18.4k
            }
442
18.6k
        }
443
403k
    }
444
403k
    _query_ctx_map_delay_delete.erase(key);
445
403k
#ifndef BE_TEST
446
403k
    _query_ctx_map.erase(key);
447
403k
#endif
448
403k
}
449
450
761k
std::shared_ptr<QueryContext> FragmentMgr::get_query_ctx(const TUniqueId& query_id) {
451
761k
    auto val = _query_ctx_map.find(query_id);
452
761k
    if (auto q_ctx = val.lock()) {
453
424k
        return q_ctx;
454
424k
    }
455
337k
    return nullptr;
456
761k
}
457
458
Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& params,
459
                                             const TPipelineFragmentParamsList& parent,
460
                                             QuerySource query_source,
461
429k
                                             std::shared_ptr<QueryContext>& query_ctx) {
462
429k
    auto query_id = params.query_id;
463
429k
    DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed", {
464
429k
        return Status::InternalError("FragmentMgr._get_query_ctx.failed, query id {}",
465
429k
                                     print_id(query_id));
466
429k
    });
467
468
    // Find _query_ctx_map, in case some other request has already
469
    // create the query fragments context.
470
429k
    query_ctx = get_query_ctx(query_id);
471
429k
    if (params.is_simplified_param) {
472
        // Get common components from _query_ctx_map
473
143k
        if (!query_ctx) {
474
0
            return Status::InternalError(
475
0
                    "Failed to get query fragments context. Query {} may be timeout or be "
476
0
                    "cancelled. host: {}",
477
0
                    print_id(query_id), BackendOptions::get_localhost());
478
0
        }
479
286k
    } else {
480
286k
        if (!query_ctx) {
481
286k
            RETURN_IF_ERROR(_query_ctx_map.apply_if_not_exists(
482
286k
                    query_id, query_ctx,
483
286k
                    [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map)
484
286k
                            -> Status {
485
286k
                        WorkloadGroupPtr workload_group_ptr = nullptr;
486
286k
                        std::vector<uint64_t> wg_id_set;
487
286k
                        if (params.__isset.workload_groups && !params.workload_groups.empty()) {
488
286k
                            for (auto& wg : params.workload_groups) {
489
286k
                                wg_id_set.push_back(wg.id);
490
286k
                            }
491
286k
                        }
492
286k
                        workload_group_ptr = _exec_env->workload_group_mgr()->get_group(wg_id_set);
493
494
                        // First time a fragment of a query arrived. print logs.
495
286k
                        LOG(INFO) << "query_id: " << print_id(query_id)
496
286k
                                  << ", coord_addr: " << params.coord
497
286k
                                  << ", total fragment num on current host: "
498
286k
                                  << params.fragment_num_on_host
499
286k
                                  << ", fe process uuid: " << params.query_options.fe_process_uuid
500
286k
                                  << ", query type: " << params.query_options.query_type
501
286k
                                  << ", report audit fe:" << params.current_connect_fe
502
286k
                                  << ", use wg:" << workload_group_ptr->id() << ","
503
286k
                                  << workload_group_ptr->name();
504
505
                        // This may be a first fragment request of the query.
506
                        // Create the query fragments context.
507
                        // Cross-cluster query: coordinator FE may not belong to local cluster.
508
                        // In that case, cancel_worker() should not cancel it based on local FE liveness.
509
286k
                        QuerySource actual_query_source = query_source;
510
286k
                        if (query_source == QuerySource::INTERNAL_FRONTEND &&
511
286k
                            !_exec_env->get_running_frontends().contains(params.coord)) {
512
286k
                            actual_query_source = QuerySource::EXTERNAL_FRONTEND;
513
286k
                        }
514
286k
                        query_ctx = QueryContext::create(
515
286k
                                query_id, _exec_env, params.query_options, params.coord,
516
286k
                                params.is_nereids, params.current_connect_fe, actual_query_source);
517
286k
                        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker());
518
286k
                        RETURN_IF_ERROR(DescriptorTbl::create(
519
286k
                                &(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl)));
520
                        // set file scan range params
521
286k
                        if (params.__isset.file_scan_params) {
522
286k
                            query_ctx->file_scan_range_params_map = params.file_scan_params;
523
286k
                        }
524
525
286k
                        query_ctx->query_globals = params.query_globals;
526
527
286k
                        if (params.__isset.resource_info) {
528
286k
                            query_ctx->user = params.resource_info.user;
529
286k
                            query_ctx->group = params.resource_info.group;
530
286k
                            query_ctx->set_rsc_info = true;
531
286k
                        }
532
533
286k
                        if (params.__isset.ai_resources) {
534
286k
                            query_ctx->set_ai_resources(params.ai_resources);
535
286k
                        }
536
537
286k
                        RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
538
539
286k
                        if (parent.__isset.runtime_filter_info) {
540
286k
                            auto info = parent.runtime_filter_info;
541
286k
                            if (info.__isset.runtime_filter_params) {
542
286k
                                auto handler =
543
286k
                                        std::make_shared<RuntimeFilterMergeControllerEntity>();
544
286k
                                RETURN_IF_ERROR(
545
286k
                                        handler->init(query_ctx, info.runtime_filter_params));
546
286k
                                query_ctx->set_merge_controller_handler(handler);
547
548
286k
                                query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
549
286k
                                        info.runtime_filter_params);
550
286k
                                if (!handler->empty()) {
551
286k
                                    _query_ctx_map_delay_delete.insert(query_id, query_ctx);
552
286k
                                }
553
286k
                            }
554
286k
                            if (info.__isset.topn_filter_descs) {
555
286k
                                query_ctx->init_runtime_predicates(info.topn_filter_descs);
556
286k
                            }
557
286k
                        }
558
559
                        // There is some logic in query ctx's dctor, we could not check if exists and delete the
560
                        // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
561
286k
                        map.insert({query_id, query_ctx});
562
286k
                        return Status::OK();
563
286k
                    }));
564
286k
        }
565
286k
    }
566
429k
    return Status::OK();
567
429k
}
568
569
151
std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
570
151
    fmt::memory_buffer debug_string_buffer;
571
151
    size_t i = 0;
572
151
    {
573
151
        fmt::format_to(debug_string_buffer,
574
151
                       "{} pipeline fragment contexts are still running! duration_limit={}\n",
575
151
                       _pipeline_map.num_items(), duration);
576
151
        timespec now;
577
151
        clock_gettime(CLOCK_MONOTONIC, &now);
578
579
151
        _pipeline_map.apply([&](phmap::flat_hash_map<std::pair<TUniqueId, int>,
580
151
                                                     std::shared_ptr<PipelineFragmentContext>>& map)
581
19.3k
                                    -> Status {
582
19.3k
            std::set<TUniqueId> query_id_set;
583
19.3k
            for (auto& it : map) {
584
2.00k
                auto elapsed = it.second->elapsed_time() / 1000000000;
585
2.00k
                if (elapsed < duration) {
586
                    // Only display tasks which has been running for more than {duration} seconds.
587
1.75k
                    continue;
588
1.75k
                }
589
248
                if (!query_id_set.contains(it.first.first)) {
590
205
                    query_id_set.insert(it.first.first);
591
205
                    fmt::format_to(
592
205
                            debug_string_buffer, "QueryId: {}, global_runtime_filter_mgr: {}\n",
593
205
                            print_id(it.first.first),
594
205
                            it.second->get_query_ctx()->runtime_filter_mgr()->debug_string());
595
596
205
                    if (it.second->get_query_ctx()->get_merge_controller_handler()) {
597
203
                        fmt::format_to(debug_string_buffer, "{}\n",
598
203
                                       it.second->get_query_ctx()
599
203
                                               ->get_merge_controller_handler()
600
203
                                               ->debug_string());
601
203
                    }
602
205
                }
603
604
248
                auto timeout_second = it.second->timeout_second();
605
248
                fmt::format_to(
606
248
                        debug_string_buffer,
607
248
                        "No.{} (elapse_second={}s, query_timeout_second={}s, is_timeout={}): {}\n",
608
248
                        i, elapsed, timeout_second, it.second->is_timeout(now),
609
248
                        it.second->debug_string());
610
248
                i++;
611
248
            }
612
19.3k
            return Status::OK();
613
19.3k
        });
614
151
    }
615
151
    return fmt::to_string(debug_string_buffer);
616
151
}
617
618
0
std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
619
0
    if (auto q_ctx = get_query_ctx(query_id)) {
620
0
        return q_ctx->print_all_pipeline_context();
621
0
    } else {
622
0
        return fmt::format(
623
0
                "Dump pipeline tasks failed: Query context (query id = {}) not found. \n",
624
0
                print_id(query_id));
625
0
    }
626
0
}
627
628
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
629
                                       QuerySource query_source, const FinishCallback& cb,
630
                                       const TPipelineFragmentParamsList& parent,
631
430k
                                       std::shared_ptr<bool> is_prepare_success) {
632
430k
    VLOG_ROW << "Query: " << print_id(params.query_id) << " exec_plan_fragment params is "
633
150
             << apache::thrift::ThriftDebugString(params).c_str();
634
    // sometimes TPipelineFragmentParams debug string is too long and glog
635
    // will truncate the log line, so print query options seperately for debuggin purpose
636
430k
    VLOG_ROW << "Query: " << print_id(params.query_id) << "query options is "
637
200
             << apache::thrift::ThriftDebugString(params.query_options).c_str();
638
639
430k
    std::shared_ptr<QueryContext> query_ctx;
640
430k
    RETURN_IF_ERROR(_get_or_create_query_ctx(params, parent, query_source, query_ctx));
641
430k
    SCOPED_ATTACH_TASK(query_ctx.get()->resource_ctx());
642
    // Set single_backend_query before prepare() so that pipeline local states
643
    // (e.g. StreamingAggLocalState) can read the correct value in their constructors.
644
430k
    query_ctx->set_single_backend_query(params.__isset.query_options &&
645
430k
                                        params.query_options.__isset.single_backend_query &&
646
430k
                                        params.query_options.single_backend_query);
647
430k
    int64_t duration_ns = 0;
648
430k
    std::shared_ptr<PipelineFragmentContext> context = std::make_shared<PipelineFragmentContext>(
649
430k
            query_ctx->query_id(), params, query_ctx, _exec_env, cb);
650
430k
    {
651
430k
        SCOPED_RAW_TIMER(&duration_ns);
652
430k
        Status prepare_st = Status::OK();
653
430k
        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(_thread_pool.get()),
654
430k
                                         prepare_st);
655
429k
        DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.prepare_failed", {
656
429k
            prepare_st = Status::Aborted("FragmentMgr.exec_plan_fragment.prepare_failed");
657
429k
        });
658
429k
        if (!prepare_st.ok()) {
659
1.20k
            query_ctx->cancel(prepare_st, params.fragment_id);
660
1.20k
            return prepare_st;
661
1.20k
        }
662
429k
    }
663
428k
    g_fragmentmgr_prepare_latency << (duration_ns / 1000);
664
665
428k
    DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
666
428k
                    { return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
667
428k
    {
668
428k
        int64_t now = duration_cast<std::chrono::milliseconds>(
669
428k
                              std::chrono::system_clock::now().time_since_epoch())
670
428k
                              .count();
671
428k
        g_fragment_executing_count << 1;
672
428k
        g_fragment_last_active_time.set_value(now);
673
674
        // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map.
675
428k
        auto res = _pipeline_map.find({params.query_id, params.fragment_id});
676
428k
        if (res != nullptr) {
677
0
            return Status::InternalError(
678
0
                    "exec_plan_fragment query_id({}) input duplicated fragment_id({})",
679
0
                    print_id(params.query_id), params.fragment_id);
680
0
        }
681
428k
        _pipeline_map.insert({params.query_id, params.fragment_id}, context);
682
428k
    }
683
684
    // Save params for recursive CTE child fragments so we can recreate the PFC later.
685
    // For recursive CTE, the child fragment needs to be destroyed and rebuilt between rounds,
686
    // so we save the original params here and use them in rerun_fragment(rebuild).
687
428k
    if (params.__isset.need_notify_close && params.need_notify_close) {
688
209
        std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
689
209
        _rerunnable_params_map[{params.query_id, params.fragment_id}] = {
690
209
                .deregister_runtime_filter_ids = {},
691
209
                .params = params,
692
209
                .parent = parent,
693
209
                .finish_callback = cb,
694
209
                .query_ctx = query_ctx};
695
209
    }
696
697
428k
    if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
698
168k
        query_ctx->set_ready_to_execute_only();
699
168k
    }
700
701
428k
    query_ctx->set_pipeline_context(params.fragment_id, context);
702
703
428k
    RETURN_IF_ERROR(context->submit());
704
428k
    if (is_prepare_success != nullptr) {
705
2.39k
        *is_prepare_success = true;
706
2.39k
    }
707
428k
    return Status::OK();
708
428k
}
709
710
168k
void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
711
168k
    std::shared_ptr<QueryContext> query_ctx = nullptr;
712
168k
    {
713
168k
        if (auto q_ctx = get_query_ctx(query_id)) {
714
116k
            query_ctx = q_ctx;
715
116k
        } else {
716
51.1k
            LOG(WARNING) << "Query " << print_id(query_id)
717
51.1k
                         << " does not exists, failed to cancel it";
718
51.1k
            return;
719
51.1k
        }
720
168k
    }
721
116k
    SCOPED_ATTACH_TASK(query_ctx->resource_ctx());
722
116k
    query_ctx->cancel(reason);
723
116k
    remove_query_context(query_id);
724
    // Clean up id_file_map in IdManager if exists
725
116k
    if (ExecEnv::GetInstance()->get_id_manager()->get_id_file_map(query_id)) {
726
868
        ExecEnv::GetInstance()->get_id_manager()->remove_id_file_map(query_id);
727
868
    }
728
116k
    LOG(INFO) << "Query " << print_id(query_id)
729
116k
              << " is cancelled and removed. Reason: " << reason.to_string();
730
116k
}
731
732
17
void FragmentMgr::cancel_worker() {
733
17
    LOG(INFO) << "FragmentMgr cancel worker start working.";
734
735
17
    timespec check_invalid_query_last_timestamp;
736
17
    clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp);
737
738
12.6k
    do {
739
12.6k
        std::vector<TUniqueId> queries_lost_coordinator;
740
12.6k
        std::vector<TUniqueId> queries_timeout;
741
12.6k
        std::vector<TUniqueId> queries_pipeline_task_leak;
742
        // Fe process uuid -> set<QueryId>
743
12.6k
        std::map<int64_t, std::unordered_set<TUniqueId>> running_queries_on_all_fes;
744
12.6k
        const std::map<TNetworkAddress, FrontendInfo>& running_fes =
745
12.6k
                ExecEnv::GetInstance()->get_running_frontends();
746
747
12.6k
        timespec now;
748
12.6k
        clock_gettime(CLOCK_MONOTONIC, &now);
749
750
12.6k
        if (config::enable_pipeline_task_leakage_detect &&
751
12.6k
            now.tv_sec - check_invalid_query_last_timestamp.tv_sec >
752
0
                    config::pipeline_task_leakage_detect_period_secs) {
753
0
            check_invalid_query_last_timestamp = now;
754
0
            running_queries_on_all_fes = _get_all_running_queries_from_fe();
755
12.6k
        } else {
756
12.6k
            running_queries_on_all_fes.clear();
757
12.6k
        }
758
759
12.6k
        std::vector<std::shared_ptr<PipelineFragmentContext>> ctx;
760
12.6k
        _pipeline_map.apply(
761
12.6k
                [&](phmap::flat_hash_map<std::pair<TUniqueId, int>,
762
1.62M
                                         std::shared_ptr<PipelineFragmentContext>>& map) -> Status {
763
1.62M
                    ctx.reserve(ctx.size() + map.size());
764
1.62M
                    for (auto& pipeline_itr : map) {
765
117k
                        ctx.push_back(pipeline_itr.second);
766
117k
                    }
767
1.62M
                    return Status::OK();
768
1.62M
                });
769
117k
        for (auto& c : ctx) {
770
117k
            c->clear_finished_tasks();
771
117k
        }
772
773
12.6k
        std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem> brpc_stub_with_queries;
774
12.6k
        _collect_timeout_queries_and_brpc_items(queries_timeout, brpc_stub_with_queries, now);
775
776
        // We use a very conservative cancel strategy.
777
        // 0. If there are no running frontends, do not cancel any queries.
778
        // 1. If query's process uuid is zero, do not cancel
779
        // 2. If same process uuid, do not cancel
780
        // 3. If fe has zero process uuid, do not cancel
781
12.6k
        if (running_fes.empty() && _query_ctx_map.num_items() != 0) {
782
0
            LOG_EVERY_N(WARNING, 10)
783
0
                    << "Could not find any running frontends, maybe we are upgrading or "
784
0
                       "starting? "
785
0
                    << "We will not cancel any outdated queries in this situation.";
786
12.6k
        } else {
787
12.6k
            _collect_invalid_queries(queries_lost_coordinator, queries_pipeline_task_leak,
788
12.6k
                                     running_queries_on_all_fes, running_fes,
789
12.6k
                                     check_invalid_query_last_timestamp);
790
12.6k
        }
791
792
12.6k
        if (config::enable_brpc_connection_check) {
793
5.46k
            for (auto it : brpc_stub_with_queries) {
794
3.39k
                if (!it.first) {
795
0
                    LOG(WARNING) << "brpc stub is nullptr, skip it.";
796
0
                    continue;
797
0
                }
798
3.39k
                _check_brpc_available(it.first, it.second);
799
3.39k
            }
800
5.46k
        }
801
802
12.6k
        if (!queries_lost_coordinator.empty()) {
803
5
            LOG(INFO) << "There are " << queries_lost_coordinator.size()
804
5
                      << " queries need to be cancelled, coordinator dead or restarted.";
805
5
        }
806
807
12.6k
        for (const auto& qid : queries_timeout) {
808
1
            cancel_query(qid,
809
1
                         Status::Error<ErrorCode::TIMEOUT>(
810
1
                                 "FragmentMgr cancel worker going to cancel timeout instance "));
811
1
        }
812
813
12.6k
        for (const auto& qid : queries_pipeline_task_leak) {
814
            // Cancel the query, and maybe try to report debug info to fe so that we can
815
            // collect debug info by sql or http api instead of search log.
816
0
            cancel_query(qid, Status::Error<ErrorCode::ILLEGAL_STATE>(
817
0
                                      "Potential pipeline task leakage"));
818
0
        }
819
820
12.6k
        for (const auto& qid : queries_lost_coordinator) {
821
5
            cancel_query(qid, Status::Error<ErrorCode::CANCELLED>(
822
5
                                      "Source frontend is not running or restarted"));
823
5
        }
824
825
12.6k
    } while (!_stop_background_threads_latch.wait_for(
826
12.6k
            std::chrono::seconds(config::fragment_mgr_cancel_worker_interval_seconds)));
827
17
    LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
828
17
}
829
830
void FragmentMgr::_collect_timeout_queries_and_brpc_items(
831
        std::vector<TUniqueId>& queries_timeout,
832
        std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem>& brpc_stub_with_queries,
833
12.6k
        timespec now) {
834
12.6k
    std::vector<std::shared_ptr<QueryContext>> contexts;
835
12.6k
    _query_ctx_map.apply(
836
1.62M
            [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map) -> Status {
837
1.69M
                for (auto it = map.begin(); it != map.end();) {
838
77.4k
                    if (auto q_ctx = it->second.lock()) {
839
77.4k
                        contexts.push_back(q_ctx);
840
77.4k
                        if (q_ctx->is_timeout(now)) {
841
1
                            LOG_WARNING("Query {} is timeout", print_id(it->first));
842
1
                            queries_timeout.push_back(it->first);
843
77.4k
                        } else if (config::enable_brpc_connection_check) {
844
67.4k
                            auto brpc_stubs = q_ctx->get_using_brpc_stubs();
845
67.4k
                            for (auto& item : brpc_stubs) {
846
12.2k
                                if (!brpc_stub_with_queries.contains(item.second)) {
847
3.39k
                                    brpc_stub_with_queries.emplace(item.second,
848
3.39k
                                                                   BrpcItem {item.first, {q_ctx}});
849
8.88k
                                } else {
850
8.88k
                                    brpc_stub_with_queries[item.second].queries.emplace_back(q_ctx);
851
8.88k
                                }
852
12.2k
                            }
853
67.4k
                        }
854
77.4k
                        ++it;
855
77.4k
                    } else {
856
42
                        it = map.erase(it);
857
42
                    }
858
77.4k
                }
859
1.62M
                return Status::OK();
860
1.62M
            });
861
12.6k
}
862
863
void FragmentMgr::_collect_invalid_queries(
864
        std::vector<TUniqueId>& queries_lost_coordinator,
865
        std::vector<TUniqueId>& queries_pipeline_task_leak,
866
        const std::map<int64_t, std::unordered_set<TUniqueId>>& running_queries_on_all_fes,
867
        const std::map<TNetworkAddress, FrontendInfo>& running_fes,
868
12.6k
        timespec check_invalid_query_last_timestamp) {
869
12.6k
    std::vector<std::shared_ptr<QueryContext>> q_contexts;
870
12.6k
    _query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map)
871
1.62M
                                 -> Status {
872
1.62M
        for (const auto& it : map) {
873
77.4k
            if (auto q_ctx = it.second.lock()) {
874
                // Cross-cluster query: coordinator FE is not in local `running_fes`,
875
                // we should not cancel it based on local coordinator liveness.
876
77.4k
                if (q_ctx->get_query_source() == QuerySource::EXTERNAL_FRONTEND) {
877
1
                    continue;
878
1
                }
879
77.4k
                q_contexts.push_back(q_ctx);
880
77.4k
                const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
881
882
77.4k
                if (fe_process_uuid == 0) {
883
                    // zero means this query is from a older version fe or
884
                    // this fe is starting
885
643
                    continue;
886
643
                }
887
888
                // If the query is not running on the any frontends, cancel it.
889
76.7k
                if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
890
76.7k
                    itr != running_queries_on_all_fes.end()) {
891
                    // Query not found on this frontend, and the query arrives before the last check
892
0
                    if (itr->second.find(it.first) == itr->second.end() &&
893
                        // tv_nsec represents the number of nanoseconds that have elapsed since the time point stored in tv_sec.
894
                        // tv_sec is enough, we do not need to check tv_nsec.
895
0
                        q_ctx->get_query_arrival_timestamp().tv_sec <
896
0
                                check_invalid_query_last_timestamp.tv_sec &&
897
0
                        q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
898
0
                        queries_pipeline_task_leak.push_back(q_ctx->query_id());
899
0
                        LOG_INFO(
900
0
                                "Query {}, type {} is not found on any frontends, "
901
0
                                "maybe it "
902
0
                                "is leaked.",
903
0
                                print_id(q_ctx->query_id()), toString(q_ctx->get_query_source()));
904
0
                        continue;
905
0
                    }
906
0
                }
907
908
76.7k
                auto itr = running_fes.find(q_ctx->coord_addr);
909
76.7k
                if (itr != running_fes.end()) {
910
76.7k
                    if (fe_process_uuid == itr->second.info.process_uuid ||
911
76.7k
                        itr->second.info.process_uuid == 0) {
912
76.7k
                        continue;
913
76.7k
                    } else {
914
0
                        LOG_WARNING(
915
0
                                "Coordinator of query {} restarted, going to cancel "
916
0
                                "it.",
917
0
                                print_id(q_ctx->query_id()));
918
0
                    }
919
76.7k
                } else {
920
                    // In some rear cases, the rpc port of follower is not updated in time,
921
                    // then the port of this follower will be zero, but acutally it is still running,
922
                    // and be has already received the query from follower.
923
                    // So we need to check if host is in running_fes.
924
0
                    bool fe_host_is_standing = std::any_of(
925
0
                            running_fes.begin(), running_fes.end(), [&q_ctx](const auto& fe) {
926
0
                                return fe.first.hostname == q_ctx->coord_addr.hostname &&
927
0
                                       fe.first.port == 0;
928
0
                            });
929
0
                    if (fe_host_is_standing) {
930
0
                        LOG_WARNING(
931
0
                                "Coordinator {}:{} is not found, but its host is still "
932
0
                                "running with an unstable brpc port, not going to "
933
0
                                "cancel "
934
0
                                "it.",
935
0
                                q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
936
0
                                print_id(q_ctx->query_id()));
937
0
                        continue;
938
0
                    } else {
939
0
                        LOG_WARNING(
940
0
                                "Could not find target coordinator {}:{} of query {}, "
941
0
                                "going to "
942
0
                                "cancel it.",
943
0
                                q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
944
0
                                print_id(q_ctx->query_id()));
945
0
                    }
946
0
                }
947
76.7k
            }
948
            // Coordinator of this query has already dead or query context has been released.
949
5
            queries_lost_coordinator.push_back(it.first);
950
5
        }
951
1.62M
        return Status::OK();
952
1.62M
    });
953
12.6k
}
954
955
void FragmentMgr::_check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
956
3.39k
                                        const BrpcItem& brpc_item) {
957
3.39k
    const std::string message = "hello doris!";
958
3.39k
    std::string error_message;
959
3.39k
    int32_t failed_count = 0;
960
3.39k
    const int64_t check_timeout_ms =
961
3.39k
            std::max<int64_t>(100, config::brpc_connection_check_timeout_ms);
962
963
3.39k
    while (true) {
964
3.39k
        PHandShakeRequest request;
965
3.39k
        request.set_hello(message);
966
3.39k
        PHandShakeResponse response;
967
3.39k
        brpc::Controller cntl;
968
3.39k
        cntl.set_timeout_ms(check_timeout_ms);
969
3.39k
        cntl.set_max_retry(10);
970
3.39k
        brpc_stub->hand_shake(&cntl, &request, &response, nullptr);
971
972
3.39k
        if (cntl.Failed()) {
973
0
            error_message = cntl.ErrorText();
974
0
            LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
975
0
                         << brpc_item.network_address.port << " check failed: " << error_message;
976
3.39k
        } else if (response.has_status() && response.status().status_code() == 0) {
977
3.39k
            break;
978
3.39k
        } else {
979
0
            error_message = response.DebugString();
980
0
            LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
981
0
                         << brpc_item.network_address.port << " check failed: " << error_message;
982
0
        }
983
0
        failed_count++;
984
0
        if (failed_count == 2) {
985
0
            for (const auto& query_wptr : brpc_item.queries) {
986
0
                auto query = query_wptr.lock();
987
0
                if (query && !query->is_cancelled()) {
988
0
                    query->cancel(Status::InternalError("brpc(dest: {}:{}) check failed: {}",
989
0
                                                        brpc_item.network_address.hostname,
990
0
                                                        brpc_item.network_address.port,
991
0
                                                        error_message));
992
0
                }
993
0
            }
994
995
0
            LOG(WARNING) << "remove brpc stub from cache: " << brpc_item.network_address.hostname
996
0
                         << ":" << brpc_item.network_address.port << ", error: " << error_message;
997
0
            ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
998
0
                    brpc_item.network_address.hostname, brpc_item.network_address.port);
999
0
            break;
1000
0
        }
1001
0
    }
1002
3.39k
}
1003
1004
0
void FragmentMgr::debug(std::stringstream& ss) {}
1005
/*
1006
 * 1. resolve opaqued_query_plan to thrift structure
1007
 * 2. build TPipelineFragmentParams
1008
 */
1009
Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
1010
                                                const TQueryPlanInfo& t_query_plan_info,
1011
                                                const TUniqueId& query_id,
1012
                                                const TUniqueId& fragment_instance_id,
1013
3
                                                std::vector<TScanColumnDesc>* selected_columns) {
1014
    // set up desc tbl
1015
3
    DescriptorTbl* desc_tbl = nullptr;
1016
3
    ObjectPool obj_pool;
1017
3
    Status st = DescriptorTbl::create(&obj_pool, t_query_plan_info.desc_tbl, &desc_tbl);
1018
3
    if (!st.ok()) {
1019
0
        LOG(WARNING) << "open context error: extract DescriptorTbl failure";
1020
0
        std::stringstream msg;
1021
0
        msg << " create DescriptorTbl error, should not be modified after returned Doris FE "
1022
0
               "processed";
1023
0
        return Status::InvalidArgument(msg.str());
1024
0
    }
1025
3
    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
1026
3
    if (tuple_desc == nullptr) {
1027
0
        LOG(WARNING) << "open context error: extract TupleDescriptor failure";
1028
0
        std::stringstream msg;
1029
0
        msg << " get  TupleDescriptor error, should not be modified after returned Doris FE "
1030
0
               "processed";
1031
0
        return Status::InvalidArgument(msg.str());
1032
0
    }
1033
    // process selected columns form slots
1034
50
    for (const SlotDescriptor* slot : tuple_desc->slots()) {
1035
50
        TScanColumnDesc col;
1036
50
        col.__set_name(slot->col_name());
1037
50
        col.__set_type(to_thrift(slot->type()->get_primitive_type()));
1038
50
        selected_columns->emplace_back(std::move(col));
1039
50
    }
1040
1041
3
    VLOG_QUERY << "BackendService execute open()  TQueryPlanInfo: "
1042
0
               << apache::thrift::ThriftDebugString(t_query_plan_info);
1043
    // assign the param used to execute PlanFragment
1044
3
    TPipelineFragmentParams exec_fragment_params;
1045
3
    exec_fragment_params.protocol_version = (PaloInternalServiceVersion::type)0;
1046
3
    exec_fragment_params.__set_is_simplified_param(false);
1047
3
    exec_fragment_params.__set_fragment(t_query_plan_info.plan_fragment);
1048
3
    exec_fragment_params.__set_desc_tbl(t_query_plan_info.desc_tbl);
1049
1050
    // assign the param used for executing of PlanFragment-self
1051
3
    TPipelineInstanceParams fragment_exec_params;
1052
3
    exec_fragment_params.query_id = query_id;
1053
3
    fragment_exec_params.fragment_instance_id = fragment_instance_id;
1054
3
    exec_fragment_params.coord.hostname = "external";
1055
3
    std::map<::doris::TPlanNodeId, std::vector<TScanRangeParams>> per_node_scan_ranges;
1056
3
    std::vector<TScanRangeParams> scan_ranges;
1057
3
    std::vector<int64_t> tablet_ids = params.tablet_ids;
1058
3
    TNetworkAddress address;
1059
3
    address.hostname = BackendOptions::get_localhost();
1060
3
    address.port = doris::config::be_port;
1061
3
    std::map<int64_t, TTabletVersionInfo> tablet_info = t_query_plan_info.tablet_info;
1062
3
    for (auto tablet_id : params.tablet_ids) {
1063
3
        TPaloScanRange scan_range;
1064
3
        scan_range.db_name = params.database;
1065
3
        scan_range.table_name = params.table;
1066
3
        auto iter = tablet_info.find(tablet_id);
1067
3
        if (iter != tablet_info.end()) {
1068
3
            TTabletVersionInfo info = iter->second;
1069
3
            scan_range.tablet_id = tablet_id;
1070
3
            scan_range.version = std::to_string(info.version);
1071
            // Useless but it is required field in TPaloScanRange
1072
3
            scan_range.version_hash = "0";
1073
3
            scan_range.schema_hash = std::to_string(info.schema_hash);
1074
3
            scan_range.hosts.push_back(address);
1075
3
        } else {
1076
0
            std::stringstream msg;
1077
0
            msg << "tablet_id: " << tablet_id << " not found";
1078
0
            LOG(WARNING) << "tablet_id [ " << tablet_id << " ] not found";
1079
0
            return Status::NotFound(msg.str());
1080
0
        }
1081
3
        TScanRange doris_scan_range;
1082
3
        doris_scan_range.__set_palo_scan_range(scan_range);
1083
3
        TScanRangeParams scan_range_params;
1084
3
        scan_range_params.scan_range = doris_scan_range;
1085
3
        scan_ranges.push_back(scan_range_params);
1086
3
    }
1087
3
    per_node_scan_ranges.insert(std::make_pair((::doris::TPlanNodeId)0, scan_ranges));
1088
3
    fragment_exec_params.per_node_scan_ranges = per_node_scan_ranges;
1089
3
    exec_fragment_params.local_params.push_back(fragment_exec_params);
1090
3
    TQueryOptions query_options;
1091
3
    query_options.batch_size = params.batch_size;
1092
3
    query_options.execution_timeout = params.execution_timeout;
1093
3
    query_options.mem_limit = params.mem_limit;
1094
3
    query_options.query_type = TQueryType::EXTERNAL;
1095
3
    query_options.be_exec_version = BeExecVersionManager::get_newest_version();
1096
3
    exec_fragment_params.__set_query_options(query_options);
1097
3
    VLOG_ROW << "external exec_plan_fragment params is "
1098
0
             << apache::thrift::ThriftDebugString(exec_fragment_params).c_str();
1099
1100
3
    TPipelineFragmentParamsList mocked;
1101
3
    return exec_plan_fragment(exec_fragment_params, QuerySource::EXTERNAL_CONNECTOR, mocked);
1102
3
}
1103
1104
Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
1105
1.70k
                                   butil::IOBufAsZeroCopyInputStream* attach_data) {
1106
1.70k
    UniqueId queryid = request->query_id();
1107
1.70k
    TUniqueId query_id;
1108
1.70k
    query_id.__set_hi(queryid.hi);
1109
1.70k
    query_id.__set_lo(queryid.lo);
1110
1.70k
    if (auto q_ctx = get_query_ctx(query_id)) {
1111
1.70k
        SCOPED_ATTACH_TASK(q_ctx.get());
1112
1.70k
        RuntimeFilterMgr* runtime_filter_mgr = q_ctx->runtime_filter_mgr();
1113
1.70k
        DCHECK(runtime_filter_mgr != nullptr);
1114
1115
        // 1. get the target filters
1116
1.70k
        std::vector<std::shared_ptr<RuntimeFilterConsumer>> filters =
1117
1.70k
                runtime_filter_mgr->get_consume_filters(request->filter_id());
1118
1119
        // 2. create the filter wrapper to replace or ignore/disable the target filters
1120
1.70k
        if (!filters.empty()) {
1121
            // Discard stale-stage requests from old recursive CTE rounds.
1122
1.70k
            if (filters[0]->stage() != request->stage()) {
1123
0
                return Status::OK();
1124
0
            }
1125
1.70k
            RETURN_IF_ERROR(filters[0]->assign(*request, attach_data));
1126
4.68k
            std::ranges::for_each(filters, [&](auto& filter) { filter->signal(filters[0].get()); });
1127
1.70k
        }
1128
1.70k
    }
1129
1.70k
    return Status::OK();
1130
1.70k
}
1131
1132
134
Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
1133
134
    UniqueId queryid = request->query_id();
1134
134
    TUniqueId query_id;
1135
134
    query_id.__set_hi(queryid.hi);
1136
134
    query_id.__set_lo(queryid.lo);
1137
1138
134
    if (config::enable_debug_points &&
1139
134
        DebugPoints::instance()->is_enable("FragmentMgr::send_filter_size.return_eof")) {
1140
0
        return Status::EndOfFile("inject FragmentMgr::send_filter_size.return_eof");
1141
0
    }
1142
1143
134
    if (auto q_ctx = get_query_ctx(query_id)) {
1144
134
        return q_ctx->get_merge_controller_handler()->send_filter_size(q_ctx, request);
1145
134
    } else {
1146
0
        return Status::EndOfFile(
1147
0
                "Send filter size failed: Query context (query-id: {}) not found, maybe "
1148
0
                "finished",
1149
0
                queryid.to_string());
1150
0
    }
1151
134
}
1152
1153
134
Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
1154
134
    UniqueId queryid = request->query_id();
1155
134
    TUniqueId query_id;
1156
134
    query_id.__set_hi(queryid.hi);
1157
134
    query_id.__set_lo(queryid.lo);
1158
134
    if (auto q_ctx = get_query_ctx(query_id)) {
1159
134
        try {
1160
134
            return q_ctx->runtime_filter_mgr()->sync_filter_size(request);
1161
134
        } catch (const Exception& e) {
1162
0
            return Status::InternalError(
1163
0
                    "Sync filter size failed: Query context (query-id: {}) error: {}",
1164
0
                    queryid.to_string(), e.what());
1165
0
        }
1166
134
    } else {
1167
0
        return Status::EndOfFile(
1168
0
                "Sync filter size failed: Query context (query-id: {}) already finished",
1169
0
                queryid.to_string());
1170
0
    }
1171
134
}
1172
1173
Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
1174
2.20k
                                 butil::IOBufAsZeroCopyInputStream* attach_data) {
1175
2.20k
    UniqueId queryid = request->query_id();
1176
1177
2.20k
    TUniqueId query_id;
1178
2.20k
    query_id.__set_hi(queryid.hi);
1179
2.20k
    query_id.__set_lo(queryid.lo);
1180
2.20k
    if (auto q_ctx = get_query_ctx(query_id)) {
1181
2.17k
        SCOPED_ATTACH_TASK(q_ctx.get());
1182
2.17k
        if (!q_ctx->get_merge_controller_handler()) {
1183
0
            return Status::InternalError("Merge filter failed: Merge controller handler is null");
1184
0
        }
1185
2.17k
        return q_ctx->get_merge_controller_handler()->merge(q_ctx, request, attach_data);
1186
2.17k
    } else {
1187
28
        return Status::EndOfFile(
1188
28
                "Merge filter size failed: Query context (query-id: {}) already finished",
1189
28
                queryid.to_string());
1190
28
    }
1191
2.20k
}
1192
1193
void FragmentMgr::get_runtime_query_info(
1194
25.1k
        std::vector<std::weak_ptr<ResourceContext>>* _resource_ctx_list) {
1195
25.1k
    std::vector<std::shared_ptr<QueryContext>> contexts;
1196
25.1k
    _query_ctx_map.apply(
1197
3.21M
            [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map) -> Status {
1198
3.37M
                for (auto iter = map.begin(); iter != map.end();) {
1199
158k
                    if (auto q_ctx = iter->second.lock()) {
1200
158k
                        _resource_ctx_list->push_back(q_ctx->resource_ctx());
1201
158k
                        contexts.push_back(q_ctx);
1202
158k
                        iter++;
1203
158k
                    } else {
1204
85
                        iter = map.erase(iter);
1205
85
                    }
1206
158k
                }
1207
3.21M
                return Status::OK();
1208
3.21M
            });
1209
25.1k
}
1210
1211
Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
1212
0
                                             TReportExecStatusParams* exec_status) {
1213
0
    if (exec_status == nullptr) {
1214
0
        return Status::InvalidArgument("exes_status is nullptr");
1215
0
    }
1216
1217
0
    std::shared_ptr<QueryContext> query_context = get_query_ctx(query_id);
1218
0
    if (query_context == nullptr) {
1219
0
        return Status::NotFound("Query {} not found or released", print_id(query_id));
1220
0
    }
1221
1222
0
    *exec_status = query_context->get_realtime_exec_status();
1223
1224
0
    return Status::OK();
1225
0
}
1226
1227
1
Status FragmentMgr::get_query_statistics(const TUniqueId& query_id, TQueryStatistics* query_stats) {
1228
1
    if (query_stats == nullptr) {
1229
0
        return Status::InvalidArgument("query_stats is nullptr");
1230
0
    }
1231
1232
1
    return ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_query_statistics(
1233
1
            print_id(query_id), query_stats);
1234
1
}
1235
1236
Status FragmentMgr::transmit_rec_cte_block(
1237
        const TUniqueId& query_id, const TUniqueId& instance_id, int node_id,
1238
3.76k
        const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) {
1239
3.76k
    if (auto q_ctx = get_query_ctx(query_id)) {
1240
3.76k
        SCOPED_ATTACH_TASK(q_ctx.get());
1241
3.76k
        return q_ctx->send_block_to_cte_scan(instance_id, node_id, pblocks, eos);
1242
3.76k
    } else {
1243
0
        return Status::EndOfFile(
1244
0
                "Transmit rec cte block failed: Query context (query-id: {}) not found, maybe "
1245
0
                "finished",
1246
0
                print_id(query_id));
1247
0
    }
1248
3.76k
}
1249
1250
// Orchestrates the recursive CTE fragment lifecycle through 4 phases:
1251
//
1252
// wait_for_destroy: collect deregister RF IDs, store brpc closure, trigger old PFC close
1253
// rebuild: increment stage, deregister old RFs, create+prepare new PFC from saved params
1254
// submit: submit the new PFC's pipeline tasks for execution
1255
// final_close: async wait for close, send final report, clean up (last round only)
1256
//
1257
// The brpc ClosureGuard is stored in the PFC so the RPC response is deferred until
1258
// the PFC is fully destroyed. This gives the caller (RecCTESourceOperatorX) a
1259
// synchronization point to know when the old PFC has finished all its tasks.
1260
Status FragmentMgr::rerun_fragment(const std::shared_ptr<brpc::ClosureGuard>& guard,
1261
                                   const TUniqueId& query_id, int fragment_id,
1262
10.0k
                                   PRerunFragmentParams_Opcode stage) {
1263
10.0k
    if (stage == PRerunFragmentParams::WAIT_FOR_DESTROY ||
1264
10.0k
        stage == PRerunFragmentParams::FINAL_CLOSE) {
1265
3.48k
        auto fragment_ctx = _pipeline_map.find({query_id, fragment_id});
1266
3.48k
        if (!fragment_ctx) {
1267
9
            return Status::NotFound("Fragment context (query-id: {}, fragment-id: {}) not found",
1268
9
                                    print_id(query_id), fragment_id);
1269
9
        }
1270
1271
3.47k
        if (stage == PRerunFragmentParams::WAIT_FOR_DESTROY) {
1272
3.28k
            std::unique_lock<std::mutex> lk(_rerunnable_params_lock);
1273
3.28k
            auto it = _rerunnable_params_map.find({query_id, fragment_id});
1274
3.28k
            if (it == _rerunnable_params_map.end()) {
1275
0
                lk.unlock();
1276
0
                auto st = fragment_ctx->listen_wait_close(guard, true);
1277
0
                if (!st.ok()) {
1278
0
                    LOG(WARNING) << fmt::format(
1279
0
                            "wait_for_destroy fragment context (query-id: {}, fragment-id: "
1280
0
                            "{}) failed: {}",
1281
0
                            print_id(query_id), fragment_id, st.to_string());
1282
0
                }
1283
0
                return Status::NotFound(
1284
0
                        "Rerunnable params (query-id: {}, fragment-id: {}) not found",
1285
0
                        print_id(query_id), fragment_id);
1286
0
            }
1287
1288
3.28k
            it->second.deregister_runtime_filter_ids.merge(
1289
3.28k
                    fragment_ctx->get_deregister_runtime_filter());
1290
3.28k
        }
1291
1292
3.47k
        auto* query_ctx = fragment_ctx->get_query_ctx();
1293
3.47k
        SCOPED_ATTACH_TASK(query_ctx);
1294
3.47k
        RETURN_IF_ERROR(
1295
3.47k
                fragment_ctx->listen_wait_close(guard, stage == PRerunFragmentParams::FINAL_CLOSE));
1296
3.47k
        fragment_ctx->notify_close();
1297
3.47k
        return Status::OK();
1298
6.57k
    } else if (stage == PRerunFragmentParams::REBUILD) {
1299
3.28k
        auto q_ctx = get_query_ctx(query_id);
1300
3.28k
        if (!q_ctx) {
1301
0
            return Status::NotFound(
1302
0
                    "rerun_fragment: Query context (query-id: {}) not found, maybe finished",
1303
0
                    print_id(query_id));
1304
0
        }
1305
3.28k
        SCOPED_ATTACH_TASK(q_ctx.get());
1306
3.28k
        RerunableFragmentInfo info;
1307
3.28k
        {
1308
3.28k
            std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
1309
3.28k
            auto it = _rerunnable_params_map.find({query_id, fragment_id});
1310
3.28k
            if (it == _rerunnable_params_map.end()) {
1311
0
                return Status::NotFound("rebuild (query-id: {}, fragment-id: {}) not found",
1312
0
                                        print_id(query_id), fragment_id);
1313
0
            }
1314
3.28k
            it->second.stage++;
1315
            // Deregister old runtime filters so new ones can be registered in the new PFC.
1316
3.28k
            for (int32_t filter_id : it->second.deregister_runtime_filter_ids) {
1317
1.06k
                q_ctx->runtime_filter_mgr()->remove_filter(filter_id);
1318
1.06k
            }
1319
3.28k
            info = it->second;
1320
3.28k
        }
1321
1322
0
        auto context = std::make_shared<PipelineFragmentContext>(
1323
3.28k
                q_ctx->query_id(), info.params, q_ctx, _exec_env, info.finish_callback);
1324
        // Propagate the recursion stage so that runtime filters created by this PFC
1325
        // carry the correct stage number.
1326
3.28k
        context->set_rec_cte_stage(info.stage);
1327
1328
3.28k
        Status prepare_st = Status::OK();
1329
3.28k
        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(_thread_pool.get()),
1330
3.28k
                                         prepare_st);
1331
3.28k
        if (!prepare_st.ok()) {
1332
0
            q_ctx->cancel(prepare_st, info.params.fragment_id);
1333
0
            return prepare_st;
1334
0
        }
1335
1336
        // Insert new PFC into _pipeline_map (old one was removed)
1337
3.28k
        _pipeline_map.insert({info.params.query_id, info.params.fragment_id}, context);
1338
1339
        // Update QueryContext mapping (must support overwrite)
1340
3.28k
        q_ctx->set_pipeline_context(info.params.fragment_id, context);
1341
3.28k
        return Status::OK();
1342
1343
3.28k
    } else if (stage == PRerunFragmentParams::SUBMIT) {
1344
3.28k
        auto fragment_ctx = _pipeline_map.find({query_id, fragment_id});
1345
3.28k
        if (!fragment_ctx) {
1346
0
            return Status::NotFound("Fragment context (query-id: {}, fragment-id: {}) not found",
1347
0
                                    print_id(query_id), fragment_id);
1348
0
        }
1349
3.28k
        return fragment_ctx->submit();
1350
3.28k
    } else {
1351
0
        return Status::InvalidArgument("Unknown rerun fragment opcode: {}", stage);
1352
0
    }
1353
10.0k
}
1354
1355
Status FragmentMgr::reset_global_rf(const TUniqueId& query_id,
1356
1.82k
                                    const google::protobuf::RepeatedField<int32_t>& filter_ids) {
1357
1.82k
    if (auto q_ctx = get_query_ctx(query_id)) {
1358
1.82k
        SCOPED_ATTACH_TASK(q_ctx.get());
1359
1.82k
        return q_ctx->reset_global_rf(filter_ids);
1360
1.82k
    } else {
1361
0
        return Status::NotFound(
1362
0
                "reset_fragment: Query context (query-id: {}) not found, maybe finished",
1363
0
                print_id(query_id));
1364
0
    }
1365
1.82k
}
1366
1367
} // namespace doris