Coverage Report

Created: 2026-04-10 12:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/fragment_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/fragment_mgr.h"
19
20
#include <brpc/controller.h>
21
#include <bvar/latency_recorder.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/DorisExternalService_types.h>
24
#include <gen_cpp/FrontendService.h>
25
#include <gen_cpp/FrontendService_types.h>
26
#include <gen_cpp/HeartbeatService_types.h>
27
#include <gen_cpp/Metrics_types.h>
28
#include <gen_cpp/PaloInternalService_types.h>
29
#include <gen_cpp/PlanNodes_types.h>
30
#include <gen_cpp/Planner_types.h>
31
#include <gen_cpp/QueryPlanExtra_types.h>
32
#include <gen_cpp/RuntimeProfile_types.h>
33
#include <gen_cpp/Types_types.h>
34
#include <gen_cpp/internal_service.pb.h>
35
#include <pthread.h>
36
#include <sys/time.h>
37
#include <thrift/TApplicationException.h>
38
#include <thrift/Thrift.h>
39
#include <thrift/protocol/TDebugProtocol.h>
40
#include <thrift/transport/TTransportException.h>
41
#include <unistd.h>
42
43
#include <algorithm>
44
#include <cstddef>
45
#include <ctime>
46
47
// IWYU pragma: no_include <bits/chrono.h>
48
#include <chrono> // IWYU pragma: keep
49
#include <cstdint>
50
#include <map>
51
#include <memory>
52
#include <mutex>
53
#include <sstream>
54
#include <unordered_map>
55
#include <unordered_set>
56
#include <utility>
57
58
#include "common/config.h"
59
#include "common/exception.h"
60
#include "common/logging.h"
61
#include "common/metrics/doris_metrics.h"
62
#include "common/object_pool.h"
63
#include "common/status.h"
64
#include "common/utils.h"
65
#include "core/data_type/primitive_type.h"
66
#include "exec/pipeline/pipeline_fragment_context.h"
67
#include "exec/runtime_filter/runtime_filter_consumer.h"
68
#include "exec/runtime_filter/runtime_filter_mgr.h"
69
#include "io/fs/stream_load_pipe.h"
70
#include "load/stream_load/new_load_stream_mgr.h"
71
#include "load/stream_load/stream_load_context.h"
72
#include "load/stream_load/stream_load_executor.h"
73
#include "runtime/descriptors.h"
74
#include "runtime/exec_env.h"
75
#include "runtime/frontend_info.h"
76
#include "runtime/query_context.h"
77
#include "runtime/runtime_profile.h"
78
#include "runtime/runtime_query_statistics_mgr.h"
79
#include "runtime/runtime_state.h"
80
#include "runtime/thread_context.h"
81
#include "runtime/workload_group/workload_group.h"
82
#include "runtime/workload_group/workload_group_manager.h"
83
#include "service/backend_options.h"
84
#include "storage/id_manager.h"
85
#include "util/brpc_client_cache.h"
86
#include "util/client_cache.h"
87
#include "util/debug_points.h"
88
#include "util/debug_util.h"
89
#include "util/network_util.h"
90
#include "util/thread.h"
91
#include "util/threadpool.h"
92
#include "util/thrift_util.h"
93
#include "util/uid_util.h"
94
95
namespace doris {
96
97
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT);
98
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
99
100
bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
101
102
bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
103
bvar::Status<uint64_t> g_fragment_last_active_time(
104
        "fragment_last_active_time", duration_cast<std::chrono::milliseconds>(
105
                                             std::chrono::system_clock::now().time_since_epoch())
106
                                             .count());
107
108
1.19k
uint64_t get_fragment_executing_count() {
109
1.19k
    return g_fragment_executing_count.get_value();
110
1.19k
}
111
1.19k
uint64_t get_fragment_last_active_time() {
112
1.19k
    return g_fragment_last_active_time.get_value();
113
1.19k
}
114
115
2.48k
std::string to_load_error_http_path(const std::string& file_name) {
116
2.48k
    if (file_name.empty()) {
117
1.89k
        return "";
118
1.89k
    }
119
590
    if (file_name.compare(0, 4, "http") == 0) {
120
562
        return file_name;
121
562
    }
122
28
    std::stringstream url;
123
28
    url << (config::enable_https ? "https" : "http") << "://"
124
28
        << get_host_port(BackendOptions::get_localhost(), config::webserver_port)
125
28
        << "/api/_load_error_log?"
126
28
        << "file=" << file_name;
127
28
    return url.str();
128
590
}
129
130
using apache::thrift::TException;
131
using apache::thrift::transport::TTransportException;
132
133
static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info,
134
0
                                            std::unordered_set<TUniqueId>& query_set) {
135
0
    TFetchRunningQueriesResult rpc_result;
136
0
    TFetchRunningQueriesRequest rpc_request;
137
138
0
    Status client_status;
139
0
    const int32_t timeout_ms = 3 * 1000;
140
0
    FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(),
141
0
                                         fe_info.info.coordinator_address, timeout_ms,
142
0
                                         &client_status);
143
    // Abort this fe.
144
0
    if (!client_status.ok()) {
145
0
        LOG_WARNING("Failed to get client for {}, reason is {}",
146
0
                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
147
0
                    client_status.to_string());
148
0
        return Status::InternalError("Failed to get client for {}, reason is {}",
149
0
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
150
0
                                     client_status.to_string());
151
0
    }
152
153
    // do rpc
154
0
    try {
155
0
        try {
156
0
            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
157
0
        } catch (const apache::thrift::transport::TTransportException& e) {
158
0
            LOG_WARNING("Transport exception reason: {}, reopening", e.what());
159
0
            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
160
0
            if (!client_status.ok()) {
161
0
                LOG_WARNING("Reopen failed, reason: {}", client_status.to_string_no_stack());
162
0
                return Status::InternalError("Reopen failed, reason: {}",
163
0
                                             client_status.to_string_no_stack());
164
0
            }
165
166
0
            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
167
0
        }
168
0
    } catch (apache::thrift::TException& e) {
169
        // During upgrading cluster or meet any other network error.
170
0
        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
171
0
                    PrintThriftNetworkAddress(fe_info.info.coordinator_address), e.what());
172
0
        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
173
0
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
174
0
                                     e.what());
175
0
    }
176
177
    // Avoid logic error in frontend.
178
0
    if (!rpc_result.__isset.status || rpc_result.status.status_code != TStatusCode::OK) {
179
0
        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
180
0
                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
181
0
                    doris::to_string(rpc_result.status.status_code));
182
0
        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
183
0
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
184
0
                                     doris::to_string(rpc_result.status.status_code));
185
0
    }
186
187
0
    if (!rpc_result.__isset.running_queries) {
188
0
        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
189
0
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
190
0
                                     "running_queries is not set");
191
0
    }
192
193
0
    query_set = std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(),
194
0
                                              rpc_result.running_queries.end());
195
0
    return Status::OK();
196
0
};
197
198
0
static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries_from_fe() {
199
0
    const std::map<TNetworkAddress, FrontendInfo>& running_fes =
200
0
            ExecEnv::GetInstance()->get_running_frontends();
201
202
0
    std::map<int64_t, std::unordered_set<TUniqueId>> result;
203
0
    std::vector<FrontendInfo> qualified_fes;
204
205
0
    for (const auto& fe : running_fes) {
206
        // Only consider normal frontend.
207
0
        if (fe.first.port != 0 && fe.second.info.process_uuid != 0) {
208
0
            qualified_fes.push_back(fe.second);
209
0
        } else {
210
0
            return {};
211
0
        }
212
0
    }
213
214
0
    for (const auto& fe_addr : qualified_fes) {
215
0
        const int64_t process_uuid = fe_addr.info.process_uuid;
216
0
        std::unordered_set<TUniqueId> query_set;
217
0
        Status st = _do_fetch_running_queries_rpc(fe_addr, query_set);
218
0
        if (!st.ok()) {
219
            // Empty result, cancel worker will not do anything
220
0
            return {};
221
0
        }
222
223
        // frontend_info and process_uuid has been checked in rpc threads.
224
0
        result[process_uuid] = query_set;
225
0
    }
226
227
0
    return result;
228
0
}
229
230
1.82M
inline uint32_t get_map_id(const TUniqueId& query_id, size_t capacity) {
231
1.82M
    uint32_t value = HashUtil::hash(&query_id.lo, 8, 0);
232
1.82M
    value = HashUtil::hash(&query_id.hi, 8, value);
233
1.82M
    return value % capacity;
234
1.82M
}
235
236
1.27M
inline uint32_t get_map_id(std::pair<TUniqueId, int> key, size_t capacity) {
237
1.27M
    uint32_t value = HashUtil::hash(&key.first.lo, 8, 0);
238
1.27M
    value = HashUtil::hash(&key.first.hi, 8, value);
239
1.27M
    return value % capacity;
240
1.27M
}
241
242
template <typename Key, typename Value, typename ValueType>
243
42
ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
244
42
    _internal_map.resize(config::num_query_ctx_map_partitions);
245
5.41k
    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
246
5.37k
        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
247
5.37k
                            phmap::flat_hash_map<Key, Value>()};
248
5.37k
    }
249
42
}
_ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_23PipelineFragmentContextEES5_EC2Ev
Line
Count
Source
243
14
ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
244
14
    _internal_map.resize(config::num_query_ctx_map_partitions);
245
1.80k
    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
246
1.79k
        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
247
1.79k
                            phmap::flat_hash_map<Key, Value>()};
248
1.79k
    }
249
14
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_EC2Ev
Line
Count
Source
243
14
ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
244
14
    _internal_map.resize(config::num_query_ctx_map_partitions);
245
1.80k
    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
246
1.79k
        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
247
1.79k
                            phmap::flat_hash_map<Key, Value>()};
248
1.79k
    }
249
14
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_EC2Ev
Line
Count
Source
243
14
ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
244
14
    _internal_map.resize(config::num_query_ctx_map_partitions);
245
1.80k
    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
246
1.79k
        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
247
1.79k
                            phmap::flat_hash_map<Key, Value>()};
248
1.79k
    }
249
14
}
250
251
template <typename Key, typename Value, typename ValueType>
252
1.17M
Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
253
1.17M
    auto id = get_map_id(query_id, _internal_map.size());
254
1.17M
    {
255
1.17M
        std::shared_lock lock(*_internal_map[id].first);
256
1.17M
        auto& map = _internal_map[id].second;
257
1.17M
        auto search = map.find(query_id);
258
1.17M
        if (search != map.end()) {
259
421k
            return search->second;
260
421k
        }
261
757k
        return std::shared_ptr<ValueType>(nullptr);
262
1.17M
    }
263
1.17M
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_E4findERKS1_
Line
Count
Source
252
750k
Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
253
750k
    auto id = get_map_id(query_id, _internal_map.size());
254
750k
    {
255
750k
        std::shared_lock lock(*_internal_map[id].first);
256
750k
        auto& map = _internal_map[id].second;
257
750k
        auto search = map.find(query_id);
258
750k
        if (search != map.end()) {
259
414k
            return search->second;
260
414k
        }
261
336k
        return std::shared_ptr<ValueType>(nullptr);
262
750k
    }
263
750k
}
_ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_23PipelineFragmentContextEES5_E4findERKS3_
Line
Count
Source
252
427k
Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
253
427k
    auto id = get_map_id(query_id, _internal_map.size());
254
427k
    {
255
427k
        std::shared_lock lock(*_internal_map[id].first);
256
427k
        auto& map = _internal_map[id].second;
257
427k
        auto search = map.find(query_id);
258
427k
        if (search != map.end()) {
259
6.76k
            return search->second;
260
6.76k
        }
261
420k
        return std::shared_ptr<ValueType>(nullptr);
262
427k
    }
263
427k
}
264
265
template <typename Key, typename Value, typename ValueType>
266
Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists(
267
284k
        const Key& query_id, std::shared_ptr<ValueType>& query_ctx, ApplyFunction&& function) {
268
284k
    auto id = get_map_id(query_id, _internal_map.size());
269
284k
    {
270
284k
        std::unique_lock lock(*_internal_map[id].first);
271
284k
        auto& map = _internal_map[id].second;
272
284k
        auto search = map.find(query_id);
273
284k
        if (search != map.end()) {
274
2
            query_ctx = search->second.lock();
275
2
        }
276
284k
        if (!query_ctx) {
277
283k
            return function(map);
278
283k
        }
279
76
        return Status::OK();
280
284k
    }
281
284k
}
282
283
template <typename Key, typename Value, typename ValueType>
284
1.21M
bool ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
285
1.21M
    auto id = get_map_id(query_id, _internal_map.size());
286
1.21M
    std::unique_lock lock(*_internal_map[id].first);
287
1.21M
    auto& map = _internal_map[id].second;
288
1.21M
    return map.erase(query_id) != 0;
289
1.21M
}
_ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_23PipelineFragmentContextEES5_E5eraseERKS3_
Line
Count
Source
284
425k
bool ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
285
425k
    auto id = get_map_id(query_id, _internal_map.size());
286
425k
    std::unique_lock lock(*_internal_map[id].first);
287
425k
    auto& map = _internal_map[id].second;
288
425k
    return map.erase(query_id) != 0;
289
425k
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_E5eraseERKS1_
Line
Count
Source
284
393k
bool ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
285
393k
    auto id = get_map_id(query_id, _internal_map.size());
286
393k
    std::unique_lock lock(*_internal_map[id].first);
287
393k
    auto& map = _internal_map[id].second;
288
393k
    return map.erase(query_id) != 0;
289
393k
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_E5eraseERKS1_
Line
Count
Source
284
393k
bool ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
285
393k
    auto id = get_map_id(query_id, _internal_map.size());
286
393k
    std::unique_lock lock(*_internal_map[id].first);
287
393k
    auto& map = _internal_map[id].second;
288
393k
    return map.erase(query_id) != 0;
289
393k
}
290
291
template <typename Key, typename Value, typename ValueType>
292
void ConcurrentContextMap<Key, Value, ValueType>::insert(const Key& query_id,
293
426k
                                                         std::shared_ptr<ValueType> query_ctx) {
294
426k
    auto id = get_map_id(query_id, _internal_map.size());
295
426k
    {
296
426k
        std::unique_lock lock(*_internal_map[id].first);
297
426k
        auto& map = _internal_map[id].second;
298
426k
        map.insert({query_id, query_ctx});
299
426k
    }
300
426k
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_E6insertERKS1_S4_
Line
Count
Source
293
2.60k
                                                         std::shared_ptr<ValueType> query_ctx) {
294
2.60k
    auto id = get_map_id(query_id, _internal_map.size());
295
2.60k
    {
296
2.60k
        std::unique_lock lock(*_internal_map[id].first);
297
2.60k
        auto& map = _internal_map[id].second;
298
2.60k
        map.insert({query_id, query_ctx});
299
2.60k
    }
300
2.60k
}
_ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_23PipelineFragmentContextEES5_E6insertERKS3_S6_
Line
Count
Source
293
424k
                                                         std::shared_ptr<ValueType> query_ctx) {
294
424k
    auto id = get_map_id(query_id, _internal_map.size());
295
424k
    {
296
424k
        std::unique_lock lock(*_internal_map[id].first);
297
424k
        auto& map = _internal_map[id].second;
298
424k
        map.insert({query_id, query_ctx});
299
424k
    }
300
424k
}
301
302
template <typename Key, typename Value, typename ValueType>
303
33
void ConcurrentContextMap<Key, Value, ValueType>::clear() {
304
4.22k
    for (auto& pair : _internal_map) {
305
4.22k
        std::unique_lock lock(*pair.first);
306
4.22k
        auto& map = pair.second;
307
4.22k
        map.clear();
308
4.22k
    }
309
33
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_E5clearEv
Line
Count
Source
303
11
void ConcurrentContextMap<Key, Value, ValueType>::clear() {
304
1.40k
    for (auto& pair : _internal_map) {
305
1.40k
        std::unique_lock lock(*pair.first);
306
1.40k
        auto& map = pair.second;
307
1.40k
        map.clear();
308
1.40k
    }
309
11
}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_E5clearEv
Line
Count
Source
303
11
void ConcurrentContextMap<Key, Value, ValueType>::clear() {
304
1.40k
    for (auto& pair : _internal_map) {
305
1.40k
        std::unique_lock lock(*pair.first);
306
1.40k
        auto& map = pair.second;
307
1.40k
        map.clear();
308
1.40k
    }
309
11
}
_ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_23PipelineFragmentContextEES5_E5clearEv
Line
Count
Source
303
11
void ConcurrentContextMap<Key, Value, ValueType>::clear() {
304
1.40k
    for (auto& pair : _internal_map) {
305
1.40k
        std::unique_lock lock(*pair.first);
306
1.40k
        auto& map = pair.second;
307
1.40k
        map.clear();
308
1.40k
    }
309
11
}
310
311
FragmentMgr::FragmentMgr(ExecEnv* exec_env)
312
14
        : _exec_env(exec_env), _stop_background_threads_latch(1) {
313
14
    _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
314
14
    INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
315
316
14
    auto s = Thread::create(
317
14
            "FragmentMgr", "cancel_timeout_plan_fragment", [this]() { this->cancel_worker(); },
318
14
            &_cancel_thread);
319
14
    CHECK(s.ok()) << s.to_string();
320
321
14
    s = ThreadPoolBuilder("FragmentMgrAsyncWorkThreadPool")
322
14
                .set_min_threads(config::fragment_mgr_async_work_pool_thread_num_min)
323
14
                .set_max_threads(config::fragment_mgr_async_work_pool_thread_num_max)
324
14
                .set_max_queue_size(config::fragment_mgr_async_work_pool_queue_size)
325
14
                .build(&_thread_pool);
326
14
    CHECK(s.ok()) << s.to_string();
327
14
}
328
329
11
FragmentMgr::~FragmentMgr() = default;
330
331
11
void FragmentMgr::stop() {
332
11
    DEREGISTER_HOOK_METRIC(fragment_instance_count);
333
11
    _stop_background_threads_latch.count_down();
334
11
    if (_cancel_thread) {
335
11
        _cancel_thread->join();
336
11
    }
337
338
11
    _thread_pool->shutdown();
339
    // Only me can delete
340
11
    _query_ctx_map.clear();
341
    // in one BE's graceful shutdown, cancel_worker will get related running queries via _get_all_running_queries_from_fe and cancel them.
342
    // so clearing here will not make RF consumer hang. if we dont do this, in ~FragmentMgr() there may be QueryContext in _query_ctx_map_delay_delete
343
    // destructred and remove it from _query_ctx_map_delay_delete which is destructring. it's UB.
344
11
    _query_ctx_map_delay_delete.clear();
345
11
    _pipeline_map.clear();
346
11
    {
347
11
        std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
348
11
        _rerunnable_params_map.clear();
349
11
    }
350
11
}
351
352
0
std::string FragmentMgr::to_http_path(const std::string& file_name) {
353
0
    std::stringstream url;
354
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
355
0
        << "/api/_download_load?"
356
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
357
0
    return url.str();
358
0
}
359
360
Status FragmentMgr::trigger_pipeline_context_report(
361
46.7k
        const ReportStatusRequest req, std::shared_ptr<PipelineFragmentContext>&& ctx) {
362
46.7k
    return _thread_pool->submit_func([this, req, ctx]() {
363
46.7k
        SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
364
46.7k
        coordinator_callback(req);
365
46.7k
        if (!req.done) {
366
4.71k
            ctx->refresh_next_report_time();
367
4.71k
        }
368
46.7k
    });
369
46.7k
}
370
371
// There can only be one of these callbacks in-flight at any moment, because
372
// it is only invoked from the executor's reporting thread.
373
// Also, the reported status will always reflect the most recent execution status,
374
// including the final status when execution finishes.
375
46.7k
void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
376
46.7k
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
377
46.7k
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
378
46.7k
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
379
46.7k
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
380
46.7k
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
381
46.7k
    });
382
383
46.7k
    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
384
46.7k
    if (req.coord_addr.hostname == "external") {
385
        // External query (flink/spark read tablets) not need to report to FE.
386
0
        return;
387
0
    }
388
46.7k
    int callback_retries = 10;
389
46.7k
    const int sleep_ms = 1000;
390
46.7k
    Status exec_status = req.status;
391
46.7k
    Status coord_status;
392
46.7k
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
393
46.7k
    do {
394
46.7k
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
395
46.7k
                                                            req.coord_addr, &coord_status);
396
46.7k
        if (!coord_status.ok()) {
397
0
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
398
0
        }
399
46.7k
    } while (!coord_status.ok() && callback_retries-- > 0);
400
401
46.7k
    if (!coord_status.ok()) {
402
0
        std::stringstream ss;
403
0
        UniqueId uid(req.query_id.hi, req.query_id.lo);
404
0
        static_cast<void>(req.cancel_fn(Status::InternalError(
405
0
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
406
0
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
407
0
        return;
408
0
    }
409
410
46.7k
    TReportExecStatusParams params;
411
46.7k
    params.protocol_version = FrontendServiceVersion::V1;
412
46.7k
    params.__set_query_id(req.query_id);
413
46.7k
    params.__set_backend_num(req.backend_num);
414
46.7k
    params.__set_fragment_instance_id(req.fragment_instance_id);
415
46.7k
    params.__set_fragment_id(req.fragment_id);
416
46.7k
    params.__set_status(exec_status.to_thrift());
417
46.7k
    params.__set_done(req.done);
418
46.7k
    params.__set_query_type(req.runtime_state->query_type());
419
46.7k
    params.__isset.profile = false;
420
421
46.7k
    DCHECK(req.runtime_state != nullptr);
422
423
46.7k
    if (req.runtime_state->query_type() == TQueryType::LOAD) {
424
42.0k
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
425
42.0k
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
426
42.0k
    } else {
427
4.61k
        DCHECK(!req.runtime_states.empty());
428
4.61k
        if (!req.runtime_state->output_files().empty()) {
429
0
            params.__isset.delta_urls = true;
430
0
            for (auto& it : req.runtime_state->output_files()) {
431
0
                params.delta_urls.push_back(to_http_path(it));
432
0
            }
433
0
        }
434
4.61k
        if (!params.delta_urls.empty()) {
435
0
            params.__isset.delta_urls = true;
436
0
        }
437
4.61k
    }
438
439
    // load rows
440
46.7k
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
441
46.7k
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
442
46.7k
    static std::string s_unselected_rows = "unselected.rows";
443
46.7k
    int64_t num_rows_load_success = 0;
444
46.7k
    int64_t num_rows_load_filtered = 0;
445
46.7k
    int64_t num_rows_load_unselected = 0;
446
46.7k
    if (req.runtime_state->num_rows_load_total() > 0 ||
447
46.7k
        req.runtime_state->num_rows_load_filtered() > 0 ||
448
46.7k
        req.runtime_state->num_finished_range() > 0) {
449
0
        params.__isset.load_counters = true;
450
451
0
        num_rows_load_success = req.runtime_state->num_rows_load_success();
452
0
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
453
0
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
454
0
        params.__isset.fragment_instance_reports = true;
455
0
        TFragmentInstanceReport t;
456
0
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
457
0
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
458
0
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
459
0
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
460
0
        params.fragment_instance_reports.push_back(t);
461
46.7k
    } else if (!req.runtime_states.empty()) {
462
173k
        for (auto* rs : req.runtime_states) {
463
173k
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
464
173k
                rs->num_finished_range() > 0) {
465
34.9k
                params.__isset.load_counters = true;
466
34.9k
                num_rows_load_success += rs->num_rows_load_success();
467
34.9k
                num_rows_load_filtered += rs->num_rows_load_filtered();
468
34.9k
                num_rows_load_unselected += rs->num_rows_load_unselected();
469
34.9k
                params.__isset.fragment_instance_reports = true;
470
34.9k
                TFragmentInstanceReport t;
471
34.9k
                t.__set_fragment_instance_id(rs->fragment_instance_id());
472
34.9k
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
473
34.9k
                t.__set_loaded_rows(rs->num_rows_load_total());
474
34.9k
                t.__set_loaded_bytes(rs->num_bytes_load_total());
475
34.9k
                params.fragment_instance_reports.push_back(t);
476
34.9k
            }
477
173k
        }
478
46.7k
    }
479
46.7k
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
480
46.7k
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
481
46.7k
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
482
483
46.7k
    if (!req.load_error_url.empty()) {
484
151
        params.__set_tracking_url(req.load_error_url);
485
151
    }
486
46.7k
    if (!req.first_error_msg.empty()) {
487
151
        params.__set_first_error_msg(req.first_error_msg);
488
151
    }
489
173k
    for (auto* rs : req.runtime_states) {
490
173k
        if (rs->wal_id() > 0) {
491
110
            params.__set_txn_id(rs->wal_id());
492
110
            params.__set_label(rs->import_label());
493
110
        }
494
173k
    }
495
46.7k
    if (!req.runtime_state->export_output_files().empty()) {
496
0
        params.__isset.export_files = true;
497
0
        params.export_files = req.runtime_state->export_output_files();
498
46.7k
    } else if (!req.runtime_states.empty()) {
499
173k
        for (auto* rs : req.runtime_states) {
500
173k
            if (!rs->export_output_files().empty()) {
501
0
                params.__isset.export_files = true;
502
0
                params.export_files.insert(params.export_files.end(),
503
0
                                           rs->export_output_files().begin(),
504
0
                                           rs->export_output_files().end());
505
0
            }
506
173k
        }
507
46.6k
    }
508
46.7k
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
509
0
        params.__isset.commitInfos = true;
510
0
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
511
46.7k
    } else if (!req.runtime_states.empty()) {
512
173k
        for (auto* rs : req.runtime_states) {
513
173k
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
514
25.9k
                params.__isset.commitInfos = true;
515
25.9k
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
516
25.9k
            }
517
173k
        }
518
46.7k
    }
519
46.7k
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
520
0
        params.__isset.errorTabletInfos = true;
521
0
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
522
46.7k
    } else if (!req.runtime_states.empty()) {
523
173k
        for (auto* rs : req.runtime_states) {
524
173k
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
525
0
                params.__isset.errorTabletInfos = true;
526
0
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
527
0
                                               rs_eti.end());
528
0
            }
529
173k
        }
530
46.7k
    }
531
46.7k
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
532
0
        params.__isset.hive_partition_updates = true;
533
0
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
534
0
                                             hpu.end());
535
46.7k
    } else if (!req.runtime_states.empty()) {
536
173k
        for (auto* rs : req.runtime_states) {
537
173k
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
538
2.09k
                params.__isset.hive_partition_updates = true;
539
2.09k
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
540
2.09k
                                                     rs_hpu.begin(), rs_hpu.end());
541
2.09k
            }
542
173k
        }
543
46.7k
    }
544
46.7k
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
545
0
        params.__isset.iceberg_commit_datas = true;
546
0
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
547
0
                                           icd.end());
548
46.7k
    } else if (!req.runtime_states.empty()) {
549
173k
        for (auto* rs : req.runtime_states) {
550
173k
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
551
2.07k
                params.__isset.iceberg_commit_datas = true;
552
2.07k
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
553
2.07k
                                                   rs_icd.begin(), rs_icd.end());
554
2.07k
            }
555
173k
        }
556
46.6k
    }
557
558
46.7k
    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
559
0
        params.__isset.mc_commit_datas = true;
560
0
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
561
46.7k
    } else if (!req.runtime_states.empty()) {
562
173k
        for (auto* rs : req.runtime_states) {
563
173k
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
564
0
                params.__isset.mc_commit_datas = true;
565
0
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
566
0
                                              rs_mcd.end());
567
0
            }
568
173k
        }
569
46.6k
    }
570
571
    // Send new errors to coordinator
572
46.7k
    req.runtime_state->get_unreported_errors(&(params.error_log));
573
46.7k
    params.__isset.error_log = (!params.error_log.empty());
574
575
46.7k
    if (_exec_env->cluster_info()->backend_id != 0) {
576
46.6k
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
577
46.6k
    }
578
579
46.7k
    TReportExecStatusResult res;
580
46.7k
    Status rpc_status;
581
582
46.7k
    VLOG_DEBUG << "reportExecStatus params is "
583
35
               << apache::thrift::ThriftDebugString(params).c_str();
584
46.7k
    if (!exec_status.ok()) {
585
1.63k
        LOG(WARNING) << "report error status: " << exec_status.msg()
586
1.63k
                     << " to coordinator: " << req.coord_addr
587
1.63k
                     << ", query id: " << print_id(req.query_id);
588
1.63k
    }
589
46.7k
    try {
590
46.7k
        try {
591
46.7k
            (*coord)->reportExecStatus(res, params);
592
46.7k
        } catch ([[maybe_unused]] TTransportException& e) {
593
#ifndef ADDRESS_SANITIZER
594
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
595
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
596
                         << req.coord_addr << ", err: " << e.what();
597
#endif
598
0
            rpc_status = coord->reopen();
599
600
0
            if (!rpc_status.ok()) {
601
                // we need to cancel the execution of this fragment
602
0
                req.cancel_fn(rpc_status);
603
0
                return;
604
0
            }
605
0
            (*coord)->reportExecStatus(res, params);
606
0
        }
607
608
46.7k
        rpc_status = Status::create<false>(res.status);
609
46.7k
    } catch (TException& e) {
610
0
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
611
0
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
612
0
    }
613
614
46.6k
    if (!rpc_status.ok()) {
615
0
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
616
0
                 print_id(req.query_id), rpc_status.to_string());
617
        // we need to cancel the execution of this fragment
618
0
        req.cancel_fn(rpc_status);
619
0
    }
620
46.6k
}
621
622
1.11M
static void empty_function(RuntimeState*, Status*) {}
623
624
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
625
                                       const QuerySource query_source,
626
420k
                                       const TPipelineFragmentParamsList& parent) {
627
420k
    if (params.txn_conf.need_txn) {
628
48
        std::shared_ptr<StreamLoadContext> stream_load_ctx =
629
48
                std::make_shared<StreamLoadContext>(_exec_env);
630
48
        stream_load_ctx->db = params.txn_conf.db;
631
48
        stream_load_ctx->db_id = params.txn_conf.db_id;
632
48
        stream_load_ctx->table = params.txn_conf.tbl;
633
48
        stream_load_ctx->txn_id = params.txn_conf.txn_id;
634
48
        stream_load_ctx->id = UniqueId(params.query_id);
635
48
        stream_load_ctx->put_result.__set_pipeline_params(params);
636
48
        stream_load_ctx->use_streaming = true;
637
48
        stream_load_ctx->load_type = TLoadType::MANUL_LOAD;
638
48
        stream_load_ctx->load_src_type = TLoadSourceType::RAW;
639
48
        stream_load_ctx->label = params.import_label;
640
48
        stream_load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
641
48
        stream_load_ctx->timeout_second = 3600;
642
48
        stream_load_ctx->auth.token = params.txn_conf.token;
643
48
        stream_load_ctx->need_commit_self = true;
644
48
        stream_load_ctx->need_rollback = true;
645
48
        auto pipe = std::make_shared<io::StreamLoadPipe>(
646
48
                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
647
48
                -1 /* total_length */, true /* use_proto */);
648
48
        stream_load_ctx->body_sink = pipe;
649
48
        stream_load_ctx->pipe = pipe;
650
48
        stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
651
652
48
        RETURN_IF_ERROR(
653
48
                _exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, stream_load_ctx));
654
655
48
        RETURN_IF_ERROR(
656
48
                _exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx, parent));
657
48
        return Status::OK();
658
420k
    } else {
659
420k
        return exec_plan_fragment(params, query_source, empty_function, parent);
660
420k
    }
661
420k
}
662
663
// Stage 2. prepare finished. then get FE instruction to execute
664
115k
Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) {
665
115k
    TUniqueId query_id;
666
115k
    query_id.__set_hi(request->query_id().hi());
667
115k
    query_id.__set_lo(request->query_id().lo());
668
115k
    auto q_ctx = get_query_ctx(query_id);
669
115k
    if (q_ctx) {
670
115k
        q_ctx->set_ready_to_execute(Status::OK());
671
115k
        LOG_INFO("Query {} start execution", print_id(query_id));
672
115k
    } else {
673
1
        return Status::InternalError(
674
1
                "Failed to get query fragments context. Query {} may be "
675
1
                "timeout or be cancelled. host: {}",
676
1
                print_id(query_id), BackendOptions::get_localhost());
677
1
    }
678
115k
    return Status::OK();
679
115k
}
680
681
425k
void FragmentMgr::remove_pipeline_context(std::pair<TUniqueId, int> key) {
682
425k
    if (_pipeline_map.erase(key)) {
683
425k
        int64_t now = duration_cast<std::chrono::milliseconds>(
684
425k
                              std::chrono::system_clock::now().time_since_epoch())
685
425k
                              .count();
686
425k
        g_fragment_executing_count << -1;
687
425k
        g_fragment_last_active_time.set_value(now);
688
425k
    }
689
425k
}
690
691
392k
void FragmentMgr::remove_query_context(const TUniqueId& key) {
692
    // Clean up any saved rerunnable params for this query to avoid memory leaks.
693
    // This covers both cancel and normal destruction paths.
694
392k
    {
695
392k
        std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
696
410k
        for (auto it = _rerunnable_params_map.begin(); it != _rerunnable_params_map.end();) {
697
17.7k
            if (it->first.first == key) {
698
209
                it = _rerunnable_params_map.erase(it);
699
17.5k
            } else {
700
17.5k
                ++it;
701
17.5k
            }
702
17.7k
        }
703
392k
    }
704
392k
    _query_ctx_map_delay_delete.erase(key);
705
392k
#ifndef BE_TEST
706
392k
    _query_ctx_map.erase(key);
707
392k
#endif
708
392k
}
709
710
750k
std::shared_ptr<QueryContext> FragmentMgr::get_query_ctx(const TUniqueId& query_id) {
711
750k
    auto val = _query_ctx_map.find(query_id);
712
750k
    if (auto q_ctx = val.lock()) {
713
411k
        return q_ctx;
714
411k
    }
715
339k
    return nullptr;
716
750k
}
717
718
Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& params,
719
                                             const TPipelineFragmentParamsList& parent,
720
                                             QuerySource query_source,
721
422k
                                             std::shared_ptr<QueryContext>& query_ctx) {
722
422k
    auto query_id = params.query_id;
723
422k
    DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed", {
724
422k
        return Status::InternalError("FragmentMgr._get_query_ctx.failed, query id {}",
725
422k
                                     print_id(query_id));
726
422k
    });
727
728
    // Find _query_ctx_map, in case some other request has already
729
    // create the query fragments context.
730
422k
    query_ctx = get_query_ctx(query_id);
731
422k
    if (params.is_simplified_param) {
732
        // Get common components from _query_ctx_map
733
138k
        if (!query_ctx) {
734
0
            return Status::InternalError(
735
0
                    "Failed to get query fragments context. Query {} may be timeout or be "
736
0
                    "cancelled. host: {}",
737
0
                    print_id(query_id), BackendOptions::get_localhost());
738
0
        }
739
283k
    } else {
740
284k
        if (!query_ctx) {
741
284k
            RETURN_IF_ERROR(_query_ctx_map.apply_if_not_exists(
742
284k
                    query_id, query_ctx,
743
284k
                    [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map)
744
284k
                            -> Status {
745
284k
                        WorkloadGroupPtr workload_group_ptr = nullptr;
746
284k
                        std::vector<uint64_t> wg_id_set;
747
284k
                        if (params.__isset.workload_groups && !params.workload_groups.empty()) {
748
284k
                            for (auto& wg : params.workload_groups) {
749
284k
                                wg_id_set.push_back(wg.id);
750
284k
                            }
751
284k
                        }
752
284k
                        workload_group_ptr = _exec_env->workload_group_mgr()->get_group(wg_id_set);
753
754
                        // First time a fragment of a query arrived. print logs.
755
284k
                        LOG(INFO) << "query_id: " << print_id(query_id)
756
284k
                                  << ", coord_addr: " << params.coord
757
284k
                                  << ", total fragment num on current host: "
758
284k
                                  << params.fragment_num_on_host
759
284k
                                  << ", fe process uuid: " << params.query_options.fe_process_uuid
760
284k
                                  << ", query type: " << params.query_options.query_type
761
284k
                                  << ", report audit fe:" << params.current_connect_fe
762
284k
                                  << ", use wg:" << workload_group_ptr->id() << ","
763
284k
                                  << workload_group_ptr->name();
764
765
                        // This may be a first fragment request of the query.
766
                        // Create the query fragments context.
767
                        // Cross-cluster query: coordinator FE may not belong to local cluster.
768
                        // In that case, cancel_worker() should not cancel it based on local FE liveness.
769
284k
                        QuerySource actual_query_source = query_source;
770
284k
                        if (query_source == QuerySource::INTERNAL_FRONTEND &&
771
284k
                            !_exec_env->get_running_frontends().contains(params.coord)) {
772
284k
                            actual_query_source = QuerySource::EXTERNAL_FRONTEND;
773
284k
                        }
774
284k
                        query_ctx = QueryContext::create(
775
284k
                                query_id, _exec_env, params.query_options, params.coord,
776
284k
                                params.is_nereids, params.current_connect_fe, actual_query_source);
777
284k
                        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker());
778
284k
                        RETURN_IF_ERROR(DescriptorTbl::create(
779
284k
                                &(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl)));
780
                        // set file scan range params
781
284k
                        if (params.__isset.file_scan_params) {
782
284k
                            query_ctx->file_scan_range_params_map = params.file_scan_params;
783
284k
                        }
784
785
284k
                        query_ctx->query_globals = params.query_globals;
786
787
284k
                        if (params.__isset.resource_info) {
788
284k
                            query_ctx->user = params.resource_info.user;
789
284k
                            query_ctx->group = params.resource_info.group;
790
284k
                            query_ctx->set_rsc_info = true;
791
284k
                        }
792
793
284k
                        if (params.__isset.ai_resources) {
794
284k
                            query_ctx->set_ai_resources(params.ai_resources);
795
284k
                        }
796
797
284k
                        RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
798
799
284k
                        if (parent.__isset.runtime_filter_info) {
800
284k
                            auto info = parent.runtime_filter_info;
801
284k
                            if (info.__isset.runtime_filter_params) {
802
284k
                                auto handler =
803
284k
                                        std::make_shared<RuntimeFilterMergeControllerEntity>();
804
284k
                                RETURN_IF_ERROR(
805
284k
                                        handler->init(query_ctx, info.runtime_filter_params));
806
284k
                                query_ctx->set_merge_controller_handler(handler);
807
808
284k
                                query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
809
284k
                                        info.runtime_filter_params);
810
284k
                                if (!handler->empty()) {
811
284k
                                    _query_ctx_map_delay_delete.insert(query_id, query_ctx);
812
284k
                                }
813
284k
                            }
814
284k
                            if (info.__isset.topn_filter_descs) {
815
284k
                                query_ctx->init_runtime_predicates(info.topn_filter_descs);
816
284k
                            }
817
284k
                        }
818
819
                        // There is some logic in query ctx's dctor, we could not check if exists and delete the
820
                        // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
821
284k
                        map.insert({query_id, query_ctx});
822
284k
                        return Status::OK();
823
284k
                    }));
824
284k
        }
825
283k
    }
826
422k
    return Status::OK();
827
422k
}
828
829
135
std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
830
135
    fmt::memory_buffer debug_string_buffer;
831
135
    size_t i = 0;
832
135
    {
833
135
        fmt::format_to(debug_string_buffer,
834
135
                       "{} pipeline fragment contexts are still running! duration_limit={}\n",
835
135
                       _pipeline_map.num_items(), duration);
836
135
        timespec now;
837
135
        clock_gettime(CLOCK_MONOTONIC, &now);
838
839
135
        _pipeline_map.apply([&](phmap::flat_hash_map<std::pair<TUniqueId, int>,
840
135
                                                     std::shared_ptr<PipelineFragmentContext>>& map)
841
17.2k
                                    -> Status {
842
17.2k
            std::set<TUniqueId> query_id_set;
843
17.2k
            for (auto& it : map) {
844
1.36k
                auto elapsed = it.second->elapsed_time() / 1000000000;
845
1.36k
                if (elapsed < duration) {
846
                    // Only display tasks which has been running for more than {duration} seconds.
847
1.26k
                    continue;
848
1.26k
                }
849
101
                if (!query_id_set.contains(it.first.first)) {
850
75
                    query_id_set.insert(it.first.first);
851
75
                    fmt::format_to(
852
75
                            debug_string_buffer, "QueryId: {}, global_runtime_filter_mgr: {}\n",
853
75
                            print_id(it.first.first),
854
75
                            it.second->get_query_ctx()->runtime_filter_mgr()->debug_string());
855
856
75
                    if (it.second->get_query_ctx()->get_merge_controller_handler()) {
857
72
                        fmt::format_to(debug_string_buffer, "{}\n",
858
72
                                       it.second->get_query_ctx()
859
72
                                               ->get_merge_controller_handler()
860
72
                                               ->debug_string());
861
72
                    }
862
75
                }
863
864
101
                auto timeout_second = it.second->timeout_second();
865
101
                fmt::format_to(
866
101
                        debug_string_buffer,
867
101
                        "No.{} (elapse_second={}s, query_timeout_second={}s, is_timeout={}): {}\n",
868
101
                        i, elapsed, timeout_second, it.second->is_timeout(now),
869
101
                        it.second->debug_string());
870
101
                i++;
871
101
            }
872
17.2k
            return Status::OK();
873
17.2k
        });
874
135
    }
875
135
    return fmt::to_string(debug_string_buffer);
876
135
}
877
878
0
std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
879
0
    if (auto q_ctx = get_query_ctx(query_id)) {
880
0
        return q_ctx->print_all_pipeline_context();
881
0
    } else {
882
0
        return fmt::format(
883
0
                "Dump pipeline tasks failed: Query context (query id = {}) not found. \n",
884
0
                print_id(query_id));
885
0
    }
886
0
}
887
888
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
889
                                       QuerySource query_source, const FinishCallback& cb,
890
                                       const TPipelineFragmentParamsList& parent,
891
423k
                                       std::shared_ptr<bool> is_prepare_success) {
892
423k
    VLOG_ROW << "Query: " << print_id(params.query_id) << " exec_plan_fragment params is "
893
459
             << apache::thrift::ThriftDebugString(params).c_str();
894
    // sometimes TPipelineFragmentParams debug string is too long and glog
895
    // will truncate the log line, so print query options seperately for debuggin purpose
896
423k
    VLOG_ROW << "Query: " << print_id(params.query_id) << "query options is "
897
398
             << apache::thrift::ThriftDebugString(params.query_options).c_str();
898
899
423k
    std::shared_ptr<QueryContext> query_ctx;
900
423k
    RETURN_IF_ERROR(_get_or_create_query_ctx(params, parent, query_source, query_ctx));
901
423k
    SCOPED_ATTACH_TASK(query_ctx.get()->resource_ctx());
902
    // Set single_backend_query before prepare() so that pipeline local states
903
    // (e.g. StreamingAggLocalState) can read the correct value in their constructors.
904
423k
    query_ctx->set_single_backend_query(params.__isset.query_options &&
905
423k
                                        params.query_options.__isset.single_backend_query &&
906
423k
                                        params.query_options.single_backend_query);
907
423k
    int64_t duration_ns = 0;
908
423k
    std::shared_ptr<PipelineFragmentContext> context = std::make_shared<PipelineFragmentContext>(
909
423k
            query_ctx->query_id(), params, query_ctx, _exec_env, cb,
910
423k
            [this](const ReportStatusRequest& req, auto&& ctx) {
911
46.6k
                return this->trigger_pipeline_context_report(req, std::move(ctx));
912
46.6k
            });
913
423k
    {
914
423k
        SCOPED_RAW_TIMER(&duration_ns);
915
423k
        Status prepare_st = Status::OK();
916
423k
        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(_thread_pool.get()),
917
423k
                                         prepare_st);
918
422k
        DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.prepare_failed", {
919
422k
            prepare_st = Status::Aborted("FragmentMgr.exec_plan_fragment.prepare_failed");
920
422k
        });
921
422k
        if (!prepare_st.ok()) {
922
1.20k
            query_ctx->cancel(prepare_st, params.fragment_id);
923
1.20k
            return prepare_st;
924
1.20k
        }
925
422k
    }
926
421k
    g_fragmentmgr_prepare_latency << (duration_ns / 1000);
927
928
421k
    DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
929
421k
                    { return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
930
421k
    {
931
421k
        int64_t now = duration_cast<std::chrono::milliseconds>(
932
421k
                              std::chrono::system_clock::now().time_since_epoch())
933
421k
                              .count();
934
421k
        g_fragment_executing_count << 1;
935
421k
        g_fragment_last_active_time.set_value(now);
936
937
        // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map.
938
421k
        auto res = _pipeline_map.find({params.query_id, params.fragment_id});
939
421k
        if (res != nullptr) {
940
0
            return Status::InternalError(
941
0
                    "exec_plan_fragment query_id({}) input duplicated fragment_id({})",
942
0
                    print_id(params.query_id), params.fragment_id);
943
0
        }
944
421k
        _pipeline_map.insert({params.query_id, params.fragment_id}, context);
945
421k
    }
946
947
    // Save params for recursive CTE child fragments so we can recreate the PFC later.
948
    // For recursive CTE, the child fragment needs to be destroyed and rebuilt between rounds,
949
    // so we save the original params here and use them in rerun_fragment(rebuild).
950
421k
    if (params.__isset.need_notify_close && params.need_notify_close) {
951
209
        std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
952
209
        _rerunnable_params_map[{params.query_id, params.fragment_id}] = {
953
209
                .deregister_runtime_filter_ids = {},
954
209
                .params = params,
955
209
                .parent = parent,
956
209
                .finish_callback = cb,
957
209
                .query_ctx = query_ctx};
958
209
    }
959
960
421k
    if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
961
167k
        query_ctx->set_ready_to_execute_only();
962
167k
    }
963
964
421k
    query_ctx->set_pipeline_context(params.fragment_id, context);
965
966
421k
    RETURN_IF_ERROR(context->submit());
967
421k
    if (is_prepare_success != nullptr) {
968
2.31k
        *is_prepare_success = true;
969
2.31k
    }
970
421k
    return Status::OK();
971
421k
}
972
973
165k
void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
974
165k
    std::shared_ptr<QueryContext> query_ctx = nullptr;
975
165k
    {
976
165k
        if (auto q_ctx = get_query_ctx(query_id)) {
977
108k
            query_ctx = q_ctx;
978
108k
        } else {
979
56.7k
            LOG(WARNING) << "Query " << print_id(query_id)
980
56.7k
                         << " does not exists, failed to cancel it";
981
56.7k
            return;
982
56.7k
        }
983
165k
    }
984
108k
    SCOPED_ATTACH_TASK(query_ctx->resource_ctx());
985
108k
    query_ctx->cancel(reason);
986
108k
    remove_query_context(query_id);
987
    // Clean up id_file_map in IdManager if exists
988
108k
    if (ExecEnv::GetInstance()->get_id_manager()->get_id_file_map(query_id)) {
989
840
        ExecEnv::GetInstance()->get_id_manager()->remove_id_file_map(query_id);
990
840
    }
991
108k
    LOG(INFO) << "Query " << print_id(query_id)
992
108k
              << " is cancelled and removed. Reason: " << reason.to_string();
993
108k
}
994
995
14
void FragmentMgr::cancel_worker() {
996
14
    LOG(INFO) << "FragmentMgr cancel worker start working.";
997
998
14
    timespec check_invalid_query_last_timestamp;
999
14
    clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp);
1000
1001
12.1k
    do {
1002
12.1k
        std::vector<TUniqueId> queries_lost_coordinator;
1003
12.1k
        std::vector<TUniqueId> queries_timeout;
1004
12.1k
        std::vector<TUniqueId> queries_pipeline_task_leak;
1005
        // Fe process uuid -> set<QueryId>
1006
12.1k
        std::map<int64_t, std::unordered_set<TUniqueId>> running_queries_on_all_fes;
1007
12.1k
        const std::map<TNetworkAddress, FrontendInfo>& running_fes =
1008
12.1k
                ExecEnv::GetInstance()->get_running_frontends();
1009
1010
12.1k
        timespec now;
1011
12.1k
        clock_gettime(CLOCK_MONOTONIC, &now);
1012
1013
12.1k
        if (config::enable_pipeline_task_leakage_detect &&
1014
12.1k
            now.tv_sec - check_invalid_query_last_timestamp.tv_sec >
1015
0
                    config::pipeline_task_leakage_detect_period_secs) {
1016
0
            check_invalid_query_last_timestamp = now;
1017
0
            running_queries_on_all_fes = _get_all_running_queries_from_fe();
1018
12.1k
        } else {
1019
12.1k
            running_queries_on_all_fes.clear();
1020
12.1k
        }
1021
1022
12.1k
        std::vector<std::shared_ptr<PipelineFragmentContext>> ctx;
1023
12.1k
        _pipeline_map.apply(
1024
12.1k
                [&](phmap::flat_hash_map<std::pair<TUniqueId, int>,
1025
1.55M
                                         std::shared_ptr<PipelineFragmentContext>>& map) -> Status {
1026
1.55M
                    ctx.reserve(ctx.size() + map.size());
1027
1.55M
                    for (auto& pipeline_itr : map) {
1028
83.8k
                        ctx.push_back(pipeline_itr.second);
1029
83.8k
                    }
1030
1.55M
                    return Status::OK();
1031
1.55M
                });
1032
83.8k
        for (auto& c : ctx) {
1033
83.8k
            c->clear_finished_tasks();
1034
83.8k
        }
1035
1036
12.1k
        std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem> brpc_stub_with_queries;
1037
12.1k
        _collect_timeout_queries_and_brpc_items(queries_timeout, brpc_stub_with_queries, now);
1038
1039
        // We use a very conservative cancel strategy.
1040
        // 0. If there are no running frontends, do not cancel any queries.
1041
        // 1. If query's process uuid is zero, do not cancel
1042
        // 2. If same process uuid, do not cancel
1043
        // 3. If fe has zero process uuid, do not cancel
1044
12.1k
        if (running_fes.empty() && _query_ctx_map.num_items() != 0) {
1045
0
            LOG_EVERY_N(WARNING, 10)
1046
0
                    << "Could not find any running frontends, maybe we are upgrading or "
1047
0
                       "starting? "
1048
0
                    << "We will not cancel any outdated queries in this situation.";
1049
12.1k
        } else {
1050
12.1k
            _collect_invalid_queries(queries_lost_coordinator, queries_pipeline_task_leak,
1051
12.1k
                                     running_queries_on_all_fes, running_fes,
1052
12.1k
                                     check_invalid_query_last_timestamp);
1053
12.1k
        }
1054
1055
12.1k
        if (config::enable_brpc_connection_check) {
1056
4.18k
            for (auto it : brpc_stub_with_queries) {
1057
2.92k
                if (!it.first) {
1058
0
                    LOG(WARNING) << "brpc stub is nullptr, skip it.";
1059
0
                    continue;
1060
0
                }
1061
2.92k
                _check_brpc_available(it.first, it.second);
1062
2.92k
            }
1063
4.18k
        }
1064
1065
12.1k
        if (!queries_lost_coordinator.empty()) {
1066
5
            LOG(INFO) << "There are " << queries_lost_coordinator.size()
1067
5
                      << " queries need to be cancelled, coordinator dead or restarted.";
1068
5
        }
1069
1070
12.1k
        for (const auto& qid : queries_timeout) {
1071
1
            cancel_query(qid,
1072
1
                         Status::Error<ErrorCode::TIMEOUT>(
1073
1
                                 "FragmentMgr cancel worker going to cancel timeout instance "));
1074
1
        }
1075
1076
12.1k
        for (const auto& qid : queries_pipeline_task_leak) {
1077
            // Cancel the query, and maybe try to report debug info to fe so that we can
1078
            // collect debug info by sql or http api instead of search log.
1079
0
            cancel_query(qid, Status::Error<ErrorCode::ILLEGAL_STATE>(
1080
0
                                      "Potential pipeline task leakage"));
1081
0
        }
1082
1083
12.1k
        for (const auto& qid : queries_lost_coordinator) {
1084
5
            cancel_query(qid, Status::Error<ErrorCode::CANCELLED>(
1085
5
                                      "Source frontend is not running or restarted"));
1086
5
        }
1087
1088
12.1k
    } while (!_stop_background_threads_latch.wait_for(
1089
12.1k
            std::chrono::seconds(config::fragment_mgr_cancel_worker_interval_seconds)));
1090
14
    LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
1091
14
}
1092
1093
void FragmentMgr::_collect_timeout_queries_and_brpc_items(
1094
        std::vector<TUniqueId>& queries_timeout,
1095
        std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem>& brpc_stub_with_queries,
1096
12.1k
        timespec now) {
1097
12.1k
    std::vector<std::shared_ptr<QueryContext>> contexts;
1098
12.1k
    _query_ctx_map.apply(
1099
1.55M
            [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map) -> Status {
1100
1.61M
                for (auto it = map.begin(); it != map.end();) {
1101
60.5k
                    if (auto q_ctx = it->second.lock()) {
1102
60.5k
                        contexts.push_back(q_ctx);
1103
60.5k
                        if (q_ctx->is_timeout(now)) {
1104
1
                            LOG_WARNING("Query {} is timeout", print_id(it->first));
1105
1
                            queries_timeout.push_back(it->first);
1106
60.5k
                        } else if (config::enable_brpc_connection_check) {
1107
50.5k
                            auto brpc_stubs = q_ctx->get_using_brpc_stubs();
1108
50.5k
                            for (auto& item : brpc_stubs) {
1109
9.97k
                                if (!brpc_stub_with_queries.contains(item.second)) {
1110
2.92k
                                    brpc_stub_with_queries.emplace(item.second,
1111
2.92k
                                                                   BrpcItem {item.first, {q_ctx}});
1112
7.05k
                                } else {
1113
7.05k
                                    brpc_stub_with_queries[item.second].queries.emplace_back(q_ctx);
1114
7.05k
                                }
1115
9.97k
                            }
1116
50.5k
                        }
1117
60.5k
                        ++it;
1118
60.5k
                    } else {
1119
41
                        it = map.erase(it);
1120
41
                    }
1121
60.5k
                }
1122
1.55M
                return Status::OK();
1123
1.55M
            });
1124
12.1k
}
1125
1126
void FragmentMgr::_collect_invalid_queries(
1127
        std::vector<TUniqueId>& queries_lost_coordinator,
1128
        std::vector<TUniqueId>& queries_pipeline_task_leak,
1129
        const std::map<int64_t, std::unordered_set<TUniqueId>>& running_queries_on_all_fes,
1130
        const std::map<TNetworkAddress, FrontendInfo>& running_fes,
1131
12.1k
        timespec check_invalid_query_last_timestamp) {
1132
12.1k
    std::vector<std::shared_ptr<QueryContext>> q_contexts;
1133
12.1k
    _query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map)
1134
1.55M
                                 -> Status {
1135
1.55M
        for (const auto& it : map) {
1136
60.6k
            if (auto q_ctx = it.second.lock()) {
1137
                // Cross-cluster query: coordinator FE is not in local `running_fes`,
1138
                // we should not cancel it based on local coordinator liveness.
1139
60.6k
                if (q_ctx->get_query_source() == QuerySource::EXTERNAL_FRONTEND) {
1140
1
                    continue;
1141
1
                }
1142
60.6k
                q_contexts.push_back(q_ctx);
1143
60.6k
                const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
1144
1145
60.6k
                if (fe_process_uuid == 0) {
1146
                    // zero means this query is from a older version fe or
1147
                    // this fe is starting
1148
698
                    continue;
1149
698
                }
1150
1151
                // If the query is not running on the any frontends, cancel it.
1152
59.9k
                if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
1153
59.9k
                    itr != running_queries_on_all_fes.end()) {
1154
                    // Query not found on this frontend, and the query arrives before the last check
1155
0
                    if (itr->second.find(it.first) == itr->second.end() &&
1156
                        // tv_nsec represents the number of nanoseconds that have elapsed since the time point stored in tv_sec.
1157
                        // tv_sec is enough, we do not need to check tv_nsec.
1158
0
                        q_ctx->get_query_arrival_timestamp().tv_sec <
1159
0
                                check_invalid_query_last_timestamp.tv_sec &&
1160
0
                        q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
1161
0
                        queries_pipeline_task_leak.push_back(q_ctx->query_id());
1162
0
                        LOG_INFO(
1163
0
                                "Query {}, type {} is not found on any frontends, "
1164
0
                                "maybe it "
1165
0
                                "is leaked.",
1166
0
                                print_id(q_ctx->query_id()), toString(q_ctx->get_query_source()));
1167
0
                        continue;
1168
0
                    }
1169
0
                }
1170
1171
59.9k
                auto itr = running_fes.find(q_ctx->coord_addr);
1172
59.9k
                if (itr != running_fes.end()) {
1173
59.9k
                    if (fe_process_uuid == itr->second.info.process_uuid ||
1174
59.9k
                        itr->second.info.process_uuid == 0) {
1175
59.9k
                        continue;
1176
59.9k
                    } else {
1177
0
                        LOG_WARNING(
1178
0
                                "Coordinator of query {} restarted, going to cancel "
1179
0
                                "it.",
1180
0
                                print_id(q_ctx->query_id()));
1181
0
                    }
1182
59.9k
                } else {
1183
                    // In some rear cases, the rpc port of follower is not updated in time,
1184
                    // then the port of this follower will be zero, but acutally it is still running,
1185
                    // and be has already received the query from follower.
1186
                    // So we need to check if host is in running_fes.
1187
0
                    bool fe_host_is_standing = std::any_of(
1188
0
                            running_fes.begin(), running_fes.end(), [&q_ctx](const auto& fe) {
1189
0
                                return fe.first.hostname == q_ctx->coord_addr.hostname &&
1190
0
                                       fe.first.port == 0;
1191
0
                            });
1192
0
                    if (fe_host_is_standing) {
1193
0
                        LOG_WARNING(
1194
0
                                "Coordinator {}:{} is not found, but its host is still "
1195
0
                                "running with an unstable brpc port, not going to "
1196
0
                                "cancel "
1197
0
                                "it.",
1198
0
                                q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
1199
0
                                print_id(q_ctx->query_id()));
1200
0
                        continue;
1201
0
                    } else {
1202
0
                        LOG_WARNING(
1203
0
                                "Could not find target coordinator {}:{} of query {}, "
1204
0
                                "going to "
1205
0
                                "cancel it.",
1206
0
                                q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
1207
0
                                print_id(q_ctx->query_id()));
1208
0
                    }
1209
0
                }
1210
59.9k
            }
1211
            // Coordinator of this query has already dead or query context has been released.
1212
5
            queries_lost_coordinator.push_back(it.first);
1213
5
        }
1214
1.55M
        return Status::OK();
1215
1.55M
    });
1216
12.1k
}
1217
1218
void FragmentMgr::_check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
1219
2.92k
                                        const BrpcItem& brpc_item) {
1220
2.92k
    const std::string message = "hello doris!";
1221
2.92k
    std::string error_message;
1222
2.92k
    int32_t failed_count = 0;
1223
2.92k
    const int64_t check_timeout_ms =
1224
2.92k
            std::max<int64_t>(100, config::brpc_connection_check_timeout_ms);
1225
1226
2.92k
    while (true) {
1227
2.92k
        PHandShakeRequest request;
1228
2.92k
        request.set_hello(message);
1229
2.92k
        PHandShakeResponse response;
1230
2.92k
        brpc::Controller cntl;
1231
2.92k
        cntl.set_timeout_ms(check_timeout_ms);
1232
2.92k
        cntl.set_max_retry(10);
1233
2.92k
        brpc_stub->hand_shake(&cntl, &request, &response, nullptr);
1234
1235
2.92k
        if (cntl.Failed()) {
1236
0
            error_message = cntl.ErrorText();
1237
0
            LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
1238
0
                         << brpc_item.network_address.port << " check failed: " << error_message;
1239
2.92k
        } else if (response.has_status() && response.status().status_code() == 0) {
1240
2.92k
            break;
1241
2.92k
        } else {
1242
0
            error_message = response.DebugString();
1243
0
            LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
1244
0
                         << brpc_item.network_address.port << " check failed: " << error_message;
1245
0
        }
1246
0
        failed_count++;
1247
0
        if (failed_count == 2) {
1248
0
            for (const auto& query_wptr : brpc_item.queries) {
1249
0
                auto query = query_wptr.lock();
1250
0
                if (query && !query->is_cancelled()) {
1251
0
                    query->cancel(Status::InternalError("brpc(dest: {}:{}) check failed: {}",
1252
0
                                                        brpc_item.network_address.hostname,
1253
0
                                                        brpc_item.network_address.port,
1254
0
                                                        error_message));
1255
0
                }
1256
0
            }
1257
1258
0
            LOG(WARNING) << "remove brpc stub from cache: " << brpc_item.network_address.hostname
1259
0
                         << ":" << brpc_item.network_address.port << ", error: " << error_message;
1260
0
            ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
1261
0
                    brpc_item.network_address.hostname, brpc_item.network_address.port);
1262
0
            break;
1263
0
        }
1264
0
    }
1265
2.92k
}
1266
1267
0
void FragmentMgr::debug(std::stringstream& ss) {}
1268
/*
1269
 * 1. resolve opaqued_query_plan to thrift structure
1270
 * 2. build TPipelineFragmentParams
1271
 */
1272
Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
1273
                                                const TQueryPlanInfo& t_query_plan_info,
1274
                                                const TUniqueId& query_id,
1275
                                                const TUniqueId& fragment_instance_id,
1276
3
                                                std::vector<TScanColumnDesc>* selected_columns) {
1277
    // set up desc tbl
1278
3
    DescriptorTbl* desc_tbl = nullptr;
1279
3
    ObjectPool obj_pool;
1280
3
    Status st = DescriptorTbl::create(&obj_pool, t_query_plan_info.desc_tbl, &desc_tbl);
1281
3
    if (!st.ok()) {
1282
0
        LOG(WARNING) << "open context error: extract DescriptorTbl failure";
1283
0
        std::stringstream msg;
1284
0
        msg << " create DescriptorTbl error, should not be modified after returned Doris FE "
1285
0
               "processed";
1286
0
        return Status::InvalidArgument(msg.str());
1287
0
    }
1288
3
    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
1289
3
    if (tuple_desc == nullptr) {
1290
0
        LOG(WARNING) << "open context error: extract TupleDescriptor failure";
1291
0
        std::stringstream msg;
1292
0
        msg << " get  TupleDescriptor error, should not be modified after returned Doris FE "
1293
0
               "processed";
1294
0
        return Status::InvalidArgument(msg.str());
1295
0
    }
1296
    // process selected columns form slots
1297
50
    for (const SlotDescriptor* slot : tuple_desc->slots()) {
1298
50
        TScanColumnDesc col;
1299
50
        col.__set_name(slot->col_name());
1300
50
        col.__set_type(to_thrift(slot->type()->get_primitive_type()));
1301
50
        selected_columns->emplace_back(std::move(col));
1302
50
    }
1303
1304
3
    VLOG_QUERY << "BackendService execute open()  TQueryPlanInfo: "
1305
0
               << apache::thrift::ThriftDebugString(t_query_plan_info);
1306
    // assign the param used to execute PlanFragment
1307
3
    TPipelineFragmentParams exec_fragment_params;
1308
3
    exec_fragment_params.protocol_version = (PaloInternalServiceVersion::type)0;
1309
3
    exec_fragment_params.__set_is_simplified_param(false);
1310
3
    exec_fragment_params.__set_fragment(t_query_plan_info.plan_fragment);
1311
3
    exec_fragment_params.__set_desc_tbl(t_query_plan_info.desc_tbl);
1312
1313
    // assign the param used for executing of PlanFragment-self
1314
3
    TPipelineInstanceParams fragment_exec_params;
1315
3
    exec_fragment_params.query_id = query_id;
1316
3
    fragment_exec_params.fragment_instance_id = fragment_instance_id;
1317
3
    exec_fragment_params.coord.hostname = "external";
1318
3
    std::map<::doris::TPlanNodeId, std::vector<TScanRangeParams>> per_node_scan_ranges;
1319
3
    std::vector<TScanRangeParams> scan_ranges;
1320
3
    std::vector<int64_t> tablet_ids = params.tablet_ids;
1321
3
    TNetworkAddress address;
1322
3
    address.hostname = BackendOptions::get_localhost();
1323
3
    address.port = doris::config::be_port;
1324
3
    std::map<int64_t, TTabletVersionInfo> tablet_info = t_query_plan_info.tablet_info;
1325
3
    for (auto tablet_id : params.tablet_ids) {
1326
3
        TPaloScanRange scan_range;
1327
3
        scan_range.db_name = params.database;
1328
3
        scan_range.table_name = params.table;
1329
3
        auto iter = tablet_info.find(tablet_id);
1330
3
        if (iter != tablet_info.end()) {
1331
3
            TTabletVersionInfo info = iter->second;
1332
3
            scan_range.tablet_id = tablet_id;
1333
3
            scan_range.version = std::to_string(info.version);
1334
            // Useless but it is required field in TPaloScanRange
1335
3
            scan_range.version_hash = "0";
1336
3
            scan_range.schema_hash = std::to_string(info.schema_hash);
1337
3
            scan_range.hosts.push_back(address);
1338
3
        } else {
1339
0
            std::stringstream msg;
1340
0
            msg << "tablet_id: " << tablet_id << " not found";
1341
0
            LOG(WARNING) << "tablet_id [ " << tablet_id << " ] not found";
1342
0
            return Status::NotFound(msg.str());
1343
0
        }
1344
3
        TScanRange doris_scan_range;
1345
3
        doris_scan_range.__set_palo_scan_range(scan_range);
1346
3
        TScanRangeParams scan_range_params;
1347
3
        scan_range_params.scan_range = doris_scan_range;
1348
3
        scan_ranges.push_back(scan_range_params);
1349
3
    }
1350
3
    per_node_scan_ranges.insert(std::make_pair((::doris::TPlanNodeId)0, scan_ranges));
1351
3
    fragment_exec_params.per_node_scan_ranges = per_node_scan_ranges;
1352
3
    exec_fragment_params.local_params.push_back(fragment_exec_params);
1353
3
    TQueryOptions query_options;
1354
3
    query_options.batch_size = params.batch_size;
1355
3
    query_options.execution_timeout = params.execution_timeout;
1356
3
    query_options.mem_limit = params.mem_limit;
1357
3
    query_options.query_type = TQueryType::EXTERNAL;
1358
3
    query_options.be_exec_version = BeExecVersionManager::get_newest_version();
1359
3
    exec_fragment_params.__set_query_options(query_options);
1360
3
    VLOG_ROW << "external exec_plan_fragment params is "
1361
0
             << apache::thrift::ThriftDebugString(exec_fragment_params).c_str();
1362
1363
3
    TPipelineFragmentParamsList mocked;
1364
3
    return exec_plan_fragment(exec_fragment_params, QuerySource::EXTERNAL_CONNECTOR, mocked);
1365
3
}
1366
1367
Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
1368
1.95k
                                   butil::IOBufAsZeroCopyInputStream* attach_data) {
1369
1.95k
    UniqueId queryid = request->query_id();
1370
1.95k
    TUniqueId query_id;
1371
1.95k
    query_id.__set_hi(queryid.hi);
1372
1.95k
    query_id.__set_lo(queryid.lo);
1373
1.95k
    if (auto q_ctx = get_query_ctx(query_id)) {
1374
1.94k
        SCOPED_ATTACH_TASK(q_ctx.get());
1375
1.94k
        RuntimeFilterMgr* runtime_filter_mgr = q_ctx->runtime_filter_mgr();
1376
1.94k
        DCHECK(runtime_filter_mgr != nullptr);
1377
1378
        // 1. get the target filters
1379
1.94k
        std::vector<std::shared_ptr<RuntimeFilterConsumer>> filters =
1380
1.94k
                runtime_filter_mgr->get_consume_filters(request->filter_id());
1381
1382
        // 2. create the filter wrapper to replace or ignore/disable the target filters
1383
1.94k
        if (!filters.empty()) {
1384
            // Discard stale-stage requests from old recursive CTE rounds.
1385
1.94k
            if (filters[0]->stage() != request->stage()) {
1386
0
                return Status::OK();
1387
0
            }
1388
1.94k
            RETURN_IF_ERROR(filters[0]->assign(*request, attach_data));
1389
4.97k
            std::ranges::for_each(filters, [&](auto& filter) { filter->signal(filters[0].get()); });
1390
1.94k
        }
1391
1.94k
    }
1392
1.95k
    return Status::OK();
1393
1.95k
}
1394
1395
303
Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
1396
303
    UniqueId queryid = request->query_id();
1397
303
    TUniqueId query_id;
1398
303
    query_id.__set_hi(queryid.hi);
1399
303
    query_id.__set_lo(queryid.lo);
1400
1401
303
    if (config::enable_debug_points &&
1402
303
        DebugPoints::instance()->is_enable("FragmentMgr::send_filter_size.return_eof")) {
1403
0
        return Status::EndOfFile("inject FragmentMgr::send_filter_size.return_eof");
1404
0
    }
1405
1406
303
    if (auto q_ctx = get_query_ctx(query_id)) {
1407
303
        return q_ctx->get_merge_controller_handler()->send_filter_size(q_ctx, request);
1408
303
    } else {
1409
0
        return Status::EndOfFile(
1410
0
                "Send filter size failed: Query context (query-id: {}) not found, maybe "
1411
0
                "finished",
1412
0
                queryid.to_string());
1413
0
    }
1414
303
}
1415
1416
303
Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
1417
303
    UniqueId queryid = request->query_id();
1418
303
    TUniqueId query_id;
1419
303
    query_id.__set_hi(queryid.hi);
1420
303
    query_id.__set_lo(queryid.lo);
1421
303
    if (auto q_ctx = get_query_ctx(query_id)) {
1422
303
        try {
1423
303
            return q_ctx->runtime_filter_mgr()->sync_filter_size(request);
1424
303
        } catch (const Exception& e) {
1425
0
            return Status::InternalError(
1426
0
                    "Sync filter size failed: Query context (query-id: {}) error: {}",
1427
0
                    queryid.to_string(), e.what());
1428
0
        }
1429
303
    } else {
1430
0
        return Status::EndOfFile(
1431
0
                "Sync filter size failed: Query context (query-id: {}) already finished",
1432
0
                queryid.to_string());
1433
0
    }
1434
303
}
1435
1436
Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
1437
3.05k
                                 butil::IOBufAsZeroCopyInputStream* attach_data) {
1438
3.05k
    UniqueId queryid = request->query_id();
1439
1440
3.05k
    TUniqueId query_id;
1441
3.05k
    query_id.__set_hi(queryid.hi);
1442
3.05k
    query_id.__set_lo(queryid.lo);
1443
3.05k
    if (auto q_ctx = get_query_ctx(query_id)) {
1444
3.03k
        SCOPED_ATTACH_TASK(q_ctx.get());
1445
3.03k
        if (!q_ctx->get_merge_controller_handler()) {
1446
0
            return Status::InternalError("Merge filter failed: Merge controller handler is null");
1447
0
        }
1448
3.03k
        return q_ctx->get_merge_controller_handler()->merge(q_ctx, request, attach_data);
1449
3.03k
    } else {
1450
18
        return Status::EndOfFile(
1451
18
                "Merge filter size failed: Query context (query-id: {}) already finished",
1452
18
                queryid.to_string());
1453
18
    }
1454
3.05k
}
1455
1456
void FragmentMgr::get_runtime_query_info(
1457
24.0k
        std::vector<std::weak_ptr<ResourceContext>>* _resource_ctx_list) {
1458
24.0k
    std::vector<std::shared_ptr<QueryContext>> contexts;
1459
24.0k
    _query_ctx_map.apply(
1460
3.07M
            [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map) -> Status {
1461
3.19M
                for (auto iter = map.begin(); iter != map.end();) {
1462
122k
                    if (auto q_ctx = iter->second.lock()) {
1463
122k
                        _resource_ctx_list->push_back(q_ctx->resource_ctx());
1464
122k
                        contexts.push_back(q_ctx);
1465
122k
                        iter++;
1466
122k
                    } else {
1467
80
                        iter = map.erase(iter);
1468
80
                    }
1469
122k
                }
1470
3.07M
                return Status::OK();
1471
3.07M
            });
1472
24.0k
}
1473
1474
Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
1475
0
                                             TReportExecStatusParams* exec_status) {
1476
0
    if (exec_status == nullptr) {
1477
0
        return Status::InvalidArgument("exes_status is nullptr");
1478
0
    }
1479
1480
0
    std::shared_ptr<QueryContext> query_context = get_query_ctx(query_id);
1481
0
    if (query_context == nullptr) {
1482
0
        return Status::NotFound("Query {} not found or released", print_id(query_id));
1483
0
    }
1484
1485
0
    *exec_status = query_context->get_realtime_exec_status();
1486
1487
0
    return Status::OK();
1488
0
}
1489
1490
1
Status FragmentMgr::get_query_statistics(const TUniqueId& query_id, TQueryStatistics* query_stats) {
1491
1
    if (query_stats == nullptr) {
1492
0
        return Status::InvalidArgument("query_stats is nullptr");
1493
0
    }
1494
1495
1
    return ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_query_statistics(
1496
1
            print_id(query_id), query_stats);
1497
1
}
1498
1499
Status FragmentMgr::transmit_rec_cte_block(
1500
        const TUniqueId& query_id, const TUniqueId& instance_id, int node_id,
1501
3.76k
        const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) {
1502
3.76k
    if (auto q_ctx = get_query_ctx(query_id)) {
1503
3.76k
        SCOPED_ATTACH_TASK(q_ctx.get());
1504
3.76k
        return q_ctx->send_block_to_cte_scan(instance_id, node_id, pblocks, eos);
1505
3.76k
    } else {
1506
0
        return Status::EndOfFile(
1507
0
                "Transmit rec cte block failed: Query context (query-id: {}) not found, maybe "
1508
0
                "finished",
1509
0
                print_id(query_id));
1510
0
    }
1511
3.76k
}
1512
1513
// Orchestrates the recursive CTE fragment lifecycle through 4 phases:
1514
//
1515
// wait_for_destroy: collect deregister RF IDs, store brpc closure, trigger old PFC close
1516
// rebuild: increment stage, deregister old RFs, create+prepare new PFC from saved params
1517
// submit: submit the new PFC's pipeline tasks for execution
1518
// final_close: async wait for close, send final report, clean up (last round only)
1519
//
1520
// The brpc ClosureGuard is stored in the PFC so the RPC response is deferred until
1521
// the PFC is fully destroyed. This gives the caller (RecCTESourceOperatorX) a
1522
// synchronization point to know when the old PFC has finished all its tasks.
1523
Status FragmentMgr::rerun_fragment(const std::shared_ptr<brpc::ClosureGuard>& guard,
1524
                                   const TUniqueId& query_id, int fragment_id,
1525
10.0k
                                   PRerunFragmentParams_Opcode stage) {
1526
10.0k
    if (stage == PRerunFragmentParams::WAIT_FOR_DESTROY ||
1527
10.0k
        stage == PRerunFragmentParams::FINAL_CLOSE) {
1528
3.48k
        auto fragment_ctx = _pipeline_map.find({query_id, fragment_id});
1529
3.48k
        if (!fragment_ctx) {
1530
9
            return Status::NotFound("Fragment context (query-id: {}, fragment-id: {}) not found",
1531
9
                                    print_id(query_id), fragment_id);
1532
9
        }
1533
1534
3.47k
        if (stage == PRerunFragmentParams::WAIT_FOR_DESTROY) {
1535
3.28k
            std::unique_lock<std::mutex> lk(_rerunnable_params_lock);
1536
3.28k
            auto it = _rerunnable_params_map.find({query_id, fragment_id});
1537
3.28k
            if (it == _rerunnable_params_map.end()) {
1538
0
                lk.unlock();
1539
0
                auto st = fragment_ctx->listen_wait_close(guard, true);
1540
0
                if (!st.ok()) {
1541
0
                    LOG(WARNING) << fmt::format(
1542
0
                            "wait_for_destroy fragment context (query-id: {}, fragment-id: "
1543
0
                            "{}) failed: {}",
1544
0
                            print_id(query_id), fragment_id, st.to_string());
1545
0
                }
1546
0
                return Status::NotFound(
1547
0
                        "Rerunnable params (query-id: {}, fragment-id: {}) not found",
1548
0
                        print_id(query_id), fragment_id);
1549
0
            }
1550
1551
3.28k
            it->second.deregister_runtime_filter_ids.merge(
1552
3.28k
                    fragment_ctx->get_deregister_runtime_filter());
1553
3.28k
        }
1554
1555
3.47k
        auto* query_ctx = fragment_ctx->get_query_ctx();
1556
3.47k
        SCOPED_ATTACH_TASK(query_ctx);
1557
3.47k
        RETURN_IF_ERROR(
1558
3.47k
                fragment_ctx->listen_wait_close(guard, stage == PRerunFragmentParams::FINAL_CLOSE));
1559
3.47k
        fragment_ctx->notify_close();
1560
3.47k
        return Status::OK();
1561
6.57k
    } else if (stage == PRerunFragmentParams::REBUILD) {
1562
3.28k
        auto q_ctx = get_query_ctx(query_id);
1563
3.28k
        if (!q_ctx) {
1564
0
            return Status::NotFound(
1565
0
                    "rerun_fragment: Query context (query-id: {}) not found, maybe finished",
1566
0
                    print_id(query_id));
1567
0
        }
1568
3.28k
        SCOPED_ATTACH_TASK(q_ctx.get());
1569
3.28k
        RerunableFragmentInfo info;
1570
3.28k
        {
1571
3.28k
            std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
1572
3.28k
            auto it = _rerunnable_params_map.find({query_id, fragment_id});
1573
3.28k
            if (it == _rerunnable_params_map.end()) {
1574
0
                return Status::NotFound("rebuild (query-id: {}, fragment-id: {}) not found",
1575
0
                                        print_id(query_id), fragment_id);
1576
0
            }
1577
3.28k
            it->second.stage++;
1578
            // Deregister old runtime filters so new ones can be registered in the new PFC.
1579
3.28k
            for (int32_t filter_id : it->second.deregister_runtime_filter_ids) {
1580
1.59k
                q_ctx->runtime_filter_mgr()->remove_filter(filter_id);
1581
1.59k
            }
1582
3.28k
            info = it->second;
1583
3.28k
        }
1584
1585
0
        auto context = std::make_shared<PipelineFragmentContext>(
1586
3.28k
                q_ctx->query_id(), info.params, q_ctx, _exec_env, info.finish_callback,
1587
3.28k
                [this](const ReportStatusRequest& req, auto&& ctx) {
1588
36
                    return this->trigger_pipeline_context_report(req, std::move(ctx));
1589
36
                });
1590
        // Propagate the recursion stage so that runtime filters created by this PFC
1591
        // carry the correct stage number.
1592
3.28k
        context->set_rec_cte_stage(info.stage);
1593
1594
3.28k
        Status prepare_st = Status::OK();
1595
3.28k
        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(_thread_pool.get()),
1596
3.28k
                                         prepare_st);
1597
3.28k
        if (!prepare_st.ok()) {
1598
0
            q_ctx->cancel(prepare_st, info.params.fragment_id);
1599
0
            return prepare_st;
1600
0
        }
1601
1602
        // Insert new PFC into _pipeline_map (old one was removed)
1603
3.28k
        _pipeline_map.insert({info.params.query_id, info.params.fragment_id}, context);
1604
1605
        // Update QueryContext mapping (must support overwrite)
1606
3.28k
        q_ctx->set_pipeline_context(info.params.fragment_id, context);
1607
3.28k
        return Status::OK();
1608
1609
3.28k
    } else if (stage == PRerunFragmentParams::SUBMIT) {
1610
3.28k
        auto fragment_ctx = _pipeline_map.find({query_id, fragment_id});
1611
3.28k
        if (!fragment_ctx) {
1612
0
            return Status::NotFound("Fragment context (query-id: {}, fragment-id: {}) not found",
1613
0
                                    print_id(query_id), fragment_id);
1614
0
        }
1615
3.28k
        return fragment_ctx->submit();
1616
3.28k
    } else {
1617
0
        return Status::InvalidArgument("Unknown rerun fragment opcode: {}", stage);
1618
0
    }
1619
10.0k
}
1620
1621
Status FragmentMgr::reset_global_rf(const TUniqueId& query_id,
1622
1.82k
                                    const google::protobuf::RepeatedField<int32_t>& filter_ids) {
1623
1.82k
    if (auto q_ctx = get_query_ctx(query_id)) {
1624
1.82k
        SCOPED_ATTACH_TASK(q_ctx.get());
1625
1.82k
        return q_ctx->reset_global_rf(filter_ids);
1626
1.82k
    } else {
1627
0
        return Status::NotFound(
1628
0
                "reset_fragment: Query context (query-id: {}) not found, maybe finished",
1629
0
                print_id(query_id));
1630
0
    }
1631
1.82k
}
1632
1633
} // namespace doris