Coverage Report

Created: 2025-03-10 19:30

/root/doris/be/src/runtime/fragment_mgr.cpp
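
This appears to be LLVM source-based coverage output (llvm-cov) whose HTML table was
flattened on export; the listing below is rebuilt as "line | count | source". A blank
count means the line carries no counter (comments, blank lines, preprocessor
directives), 0 means instrumented but never executed, and suffixed values such as
1.53k are abbreviated hit counts; "Unexecuted instantiation:" entries name template
instantiations that never ran. As a minimal sketch, a report in this format can be
produced roughly as follows (the flags, file names, and test binary are illustrative
assumptions, not the actual Doris build setup):

    # Build with coverage instrumentation (hypothetical stand-in for the BE build).
    clang++ -fprofile-instr-generate -fcoverage-mapping -O0 -o doris_be_test main.cpp

    # Run the tests; each run writes a raw profile.
    LLVM_PROFILE_FILE="be.profraw" ./doris_be_test

    # Merge raw profiles and render the annotated source listing.
    llvm-profdata merge -sparse be.profraw -o be.profdata
    llvm-cov show ./doris_be_test -instr-profile=be.profdata \
        /root/doris/be/src/runtime/fragment_mgr.cpp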
 Line| Count|Source
    1|      |// Licensed to the Apache Software Foundation (ASF) under one
    2|      |// or more contributor license agreements.  See the NOTICE file
    3|      |// distributed with this work for additional information
    4|      |// regarding copyright ownership.  The ASF licenses this file
    5|      |// to you under the Apache License, Version 2.0 (the
    6|      |// "License"); you may not use this file except in compliance
    7|      |// with the License.  You may obtain a copy of the License at
    8|      |//
    9|      |//   http://www.apache.org/licenses/LICENSE-2.0
   10|      |//
   11|      |// Unless required by applicable law or agreed to in writing,
   12|      |// software distributed under the License is distributed on an
   13|      |// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   14|      |// KIND, either express or implied.  See the License for the
   15|      |// specific language governing permissions and limitations
   16|      |// under the License.
   17|      |
   18|      |#include "runtime/fragment_mgr.h"
   19|      |
   20|      |#include <brpc/controller.h>
   21|      |#include <bvar/latency_recorder.h>
   22|      |#include <exprs/runtime_filter.h>
   23|      |#include <fmt/format.h>
   24|      |#include <gen_cpp/DorisExternalService_types.h>
   25|      |#include <gen_cpp/FrontendService.h>
   26|      |#include <gen_cpp/FrontendService_types.h>
   27|      |#include <gen_cpp/HeartbeatService_types.h>
   28|      |#include <gen_cpp/Metrics_types.h>
   29|      |#include <gen_cpp/PaloInternalService_types.h>
   30|      |#include <gen_cpp/PlanNodes_types.h>
   31|      |#include <gen_cpp/Planner_types.h>
   32|      |#include <gen_cpp/QueryPlanExtra_types.h>
   33|      |#include <gen_cpp/Types_types.h>
   34|      |#include <gen_cpp/internal_service.pb.h>
   35|      |#include <gen_cpp/types.pb.h>
   36|      |#include <pthread.h>
   37|      |#include <stddef.h>
   38|      |#include <sys/time.h>
   39|      |#include <thrift/TApplicationException.h>
   40|      |#include <thrift/Thrift.h>
   41|      |#include <thrift/protocol/TDebugProtocol.h>
   42|      |#include <thrift/transport/TTransportException.h>
   43|      |#include <time.h>
   44|      |
   45|      |#include <algorithm>
   46|      |#include <atomic>
   47|      |
   48|      |#include "common/status.h"
   49|      |#include "pipeline/pipeline_x/pipeline_x_fragment_context.h"
   50|      |// IWYU pragma: no_include <bits/chrono.h>
   51|      |#include <chrono> // IWYU pragma: keep
   52|      |#include <cstdint>
   53|      |#include <map>
   54|      |#include <memory>
   55|      |#include <mutex>
   56|      |#include <sstream>
   57|      |#include <unordered_map>
   58|      |#include <unordered_set>
   59|      |#include <utility>
   60|      |
   61|      |#include "common/config.h"
   62|      |#include "common/logging.h"
   63|      |#include "common/object_pool.h"
   64|      |#include "common/utils.h"
   65|      |#include "gutil/strings/substitute.h"
   66|      |#include "io/fs/stream_load_pipe.h"
   67|      |#include "pipeline/pipeline_fragment_context.h"
   68|      |#include "runtime/client_cache.h"
   69|      |#include "runtime/descriptors.h"
   70|      |#include "runtime/exec_env.h"
   71|      |#include "runtime/frontend_info.h"
   72|      |#include "runtime/memory/mem_tracker_limiter.h"
   73|      |#include "runtime/plan_fragment_executor.h"
   74|      |#include "runtime/primitive_type.h"
   75|      |#include "runtime/query_context.h"
   76|      |#include "runtime/runtime_filter_mgr.h"
   77|      |#include "runtime/runtime_query_statistics_mgr.h"
   78|      |#include "runtime/runtime_state.h"
   79|      |#include "runtime/stream_load/new_load_stream_mgr.h"
   80|      |#include "runtime/stream_load/stream_load_context.h"
   81|      |#include "runtime/stream_load/stream_load_executor.h"
   82|      |#include "runtime/thread_context.h"
   83|      |#include "runtime/types.h"
   84|      |#include "runtime/workload_group/workload_group.h"
   85|      |#include "runtime/workload_group/workload_group_manager.h"
   86|      |#include "runtime/workload_management/workload_query_info.h"
   87|      |#include "service/backend_options.h"
   88|      |#include "util/brpc_client_cache.h"
   89|      |#include "util/debug_points.h"
   90|      |#include "util/debug_util.h"
   91|      |#include "util/doris_metrics.h"
   92|      |#include "util/hash_util.hpp"
   93|      |#include "util/mem_info.h"
   94|      |#include "util/network_util.h"
   95|      |#include "util/pretty_printer.h"
   96|      |#include "util/runtime_profile.h"
   97|      |#include "util/thread.h"
   98|      |#include "util/threadpool.h"
   99|      |#include "util/thrift_util.h"
  100|      |#include "util/uid_util.h"
  101|      |#include "util/url_coding.h"
  102|      |#include "vec/runtime/shared_hash_table_controller.h"
  103|      |#include "vec/runtime/vdatetime_value.h"
  104|      |
  105|      |namespace doris {
  106|      |
  107|      |DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT);
  108|      |DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
  109|      |DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT);
  110|      |bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
  111|      |bvar::Adder<int64_t> g_pipeline_fragment_instances_count("doris_pipeline_fragment_instances_count");
  112|      |
  113|     0|std::string to_load_error_http_path(const std::string& file_name) {
  114|     0|    if (file_name.empty()) {
  115|     0|        return "";
  116|     0|    }
  117|     0|    std::stringstream url;
  118|     0|    url << "http://" << get_host_port(BackendOptions::get_localhost(), config::webserver_port)
  119|     0|        << "/api/_load_error_log?"
  120|     0|        << "file=" << file_name;
  121|     0|    return url.str();
  122|     0|}
  123|      |
  124|      |using apache::thrift::TException;
  125|      |using apache::thrift::transport::TTransportException;
  126|      |
  127|      |static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info,
  128|     0|                                            std::unordered_set<TUniqueId>& query_set) {
  129|     0|    TFetchRunningQueriesResult rpc_result;
  130|     0|    TFetchRunningQueriesRequest rpc_request;
  131|      |
  132|     0|    Status client_status;
  133|     0|    const int32 timeout_ms = 3 * 1000;
  134|     0|    FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(),
  135|     0|                                         fe_info.info.coordinator_address, timeout_ms,
  136|     0|                                         &client_status);
  137|      |    // Abort this fe.
  138|     0|    if (!client_status.ok()) {
  139|     0|        LOG_WARNING("Failed to get client for {}, reason is {}",
  140|     0|                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
  141|     0|                    client_status.to_string());
  142|     0|        return Status::InternalError("Failed to get client for {}, reason is {}",
  143|     0|                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
  144|     0|                                     client_status.to_string());
  145|     0|    }
  146|      |
  147|      |    // do rpc
  148|     0|    try {
  149|     0|        try {
  150|     0|            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
  151|     0|        } catch (const apache::thrift::transport::TTransportException& e) {
  152|     0|            LOG_WARNING("Transport exception reason: {}, reopening", e.what());
  153|     0|            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
  154|     0|            if (!client_status.ok()) {
  155|     0|                LOG_WARNING("Reopen failed, reason: {}", client_status.to_string_no_stack());
  156|     0|                return Status::InternalError("Reopen failed, reason: {}",
  157|     0|                                             client_status.to_string_no_stack());
  158|     0|            }
  159|      |
  160|     0|            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
  161|     0|        }
  162|     0|    } catch (apache::thrift::TException& e) {
  163|      |        // Happens during a cluster upgrade, or on any other network error.
  164|     0|        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
  165|     0|                    PrintThriftNetworkAddress(fe_info.info.coordinator_address), e.what());
  166|     0|        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
  167|     0|                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
  168|     0|                                     e.what());
  169|     0|    }
  170|      |
  171|      |    // Avoid logic error in frontend.
  172|     0|    if (rpc_result.__isset.status == false || rpc_result.status.status_code != TStatusCode::OK) {
  173|     0|        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
  174|     0|                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
  175|     0|                    doris::to_string(rpc_result.status.status_code));
  176|     0|        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
  177|     0|                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
  178|     0|                                     doris::to_string(rpc_result.status.status_code));
  179|     0|    }
  180|      |
  181|     0|    if (rpc_result.__isset.running_queries == false) {
  182|     0|        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
  183|     0|                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
  184|     0|                                     "running_queries is not set");
  185|     0|    }
  186|      |
  187|     0|    query_set = std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(),
  188|     0|                                              rpc_result.running_queries.end());
  189|      |
  190|     0|    return Status::OK();
  191|     0|};
  192|      |
  193|     0|static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries_from_fe() {
  194|     0|    const std::map<TNetworkAddress, FrontendInfo>& running_fes =
  195|     0|            ExecEnv::GetInstance()->get_running_frontends();
  196|      |
  197|     0|    std::map<int64_t, std::unordered_set<TUniqueId>> result;
  198|     0|    std::vector<FrontendInfo> qualified_fes;
  199|      |
  200|     0|    for (const auto& fe : running_fes) {
  201|      |        // Only consider normal frontends.
  202|     0|        if (fe.first.port != 0 && fe.second.info.process_uuid != 0) {
  203|     0|            qualified_fes.push_back(fe.second);
  204|     0|        } else {
  205|     0|            return {};
  206|     0|        }
  207|     0|    }
  208|      |
  209|     0|    for (const auto& fe_addr : qualified_fes) {
  210|     0|        const int64_t process_uuid = fe_addr.info.process_uuid;
  211|     0|        std::unordered_set<TUniqueId> query_set;
  212|     0|        Status st = _do_fetch_running_queries_rpc(fe_addr, query_set);
  213|     0|        if (!st.ok()) {
  214|      |            // Empty result; the cancel worker will not do anything.
  215|     0|            return {};
  216|     0|        }
  217|      |
  218|      |        // frontend_info and process_uuid have been checked in the rpc threads.
  219|     0|        result[process_uuid] = query_set;
  220|     0|    }
  221|      |
  222|     0|    return result;
  223|     0|}
  224|      |
  225|     2|inline uint32_t get_map_id(const TUniqueId& query_id, size_t capacity) {
  226|     2|    uint32_t value = HashUtil::hash(&query_id.lo, 8, 0);
  227|     2|    value = HashUtil::hash(&query_id.hi, 8, value);
  228|     2|    return value % capacity;
  229|     2|}
  230|      |
  231|     0|inline uint32_t get_map_id(std::pair<TUniqueId, int> key, size_t capacity) {
  232|     0|    uint32_t value = HashUtil::hash(&key.first.lo, 8, 0);
  233|     0|    value = HashUtil::hash(&key.first.hi, 8, value);
  234|     0|    return value % capacity;
  235|     0|}
  236|      |
  237|      |template <typename Key, typename Value, typename ValueType>
  238|    12|ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
  239|    12|    _internal_map.resize(config::num_query_ctx_map_partitions);
  240| 1.54k|    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
  241| 1.53k|        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
  242| 1.53k|                            phmap::flat_hash_map<Key, Value>()};
  243| 1.53k|    }
  244|    12|}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_20PlanFragmentExecutorEES3_EC2Ev:
  238|     4|ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
  239|     4|    _internal_map.resize(config::num_query_ctx_map_partitions);
  240|   516|    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
  241|   512|        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
  242|   512|                            phmap::flat_hash_map<Key, Value>()};
  243|   512|    }
  244|     4|}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_8pipeline23PipelineFragmentContextEES4_EC2Ev:
  238|     4|ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
  239|     4|    _internal_map.resize(config::num_query_ctx_map_partitions);
  240|   516|    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
  241|   512|        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
  242|   512|                            phmap::flat_hash_map<Key, Value>()};
  243|   512|    }
  244|     4|}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_EC2Ev:
  238|     4|ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
  239|     4|    _internal_map.resize(config::num_query_ctx_map_partitions);
  240|   516|    for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
  241|   512|        _internal_map[i] = {std::make_unique<std::shared_mutex>(),
  242|   512|                            phmap::flat_hash_map<Key, Value>()};
  243|   512|    }
  244|     4|}
  245|      |
  246|      |template <typename Key, typename Value, typename ValueType>
  247|     2|Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
  248|     2|    auto id = get_map_id(query_id, _internal_map.size());
  249|     2|    {
  250|     2|        std::shared_lock lock(*_internal_map[id].first);
  251|     2|        auto& map = _internal_map[id].second;
  252|     2|        auto search = map.find(query_id);
  253|     2|        if (search != map.end()) {
  254|     0|            return search->second;
  255|     0|        }
  256|     2|        return std::shared_ptr<ValueType>(nullptr);
  257|     2|    }
  258|     2|}
Unexecuted instantiation: _ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_E4findERKS1_
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_20PlanFragmentExecutorEES3_E4findERKS1_:
  247|     1|Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
  248|     1|    auto id = get_map_id(query_id, _internal_map.size());
  249|     1|    {
  250|     1|        std::shared_lock lock(*_internal_map[id].first);
  251|     1|        auto& map = _internal_map[id].second;
  252|     1|        auto search = map.find(query_id);
  253|     1|        if (search != map.end()) {
  254|     0|            return search->second;
  255|     0|        }
  256|     1|        return std::shared_ptr<ValueType>(nullptr);
  257|     1|    }
  258|     1|}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_8pipeline23PipelineFragmentContextEES4_E4findERKS1_:
  247|     1|Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
  248|     1|    auto id = get_map_id(query_id, _internal_map.size());
  249|     1|    {
  250|     1|        std::shared_lock lock(*_internal_map[id].first);
  251|     1|        auto& map = _internal_map[id].second;
  252|     1|        auto search = map.find(query_id);
  253|     1|        if (search != map.end()) {
  254|     0|            return search->second;
  255|     0|        }
  256|     1|        return std::shared_ptr<ValueType>(nullptr);
  257|     1|    }
  258|     1|}
  259|      |
  260|      |template <typename Key, typename Value, typename ValueType>
  261|      |Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists(
  262|     0|        const Key& query_id, std::shared_ptr<ValueType>& query_ctx, ApplyFunction&& function) {
  263|     0|    auto id = get_map_id(query_id, _internal_map.size());
  264|     0|    {
  265|     0|        std::unique_lock lock(*_internal_map[id].first);
  266|     0|        auto& map = _internal_map[id].second;
  267|     0|        auto search = map.find(query_id);
  268|     0|        if (search != map.end()) {
  269|     0|            query_ctx = search->second;
  270|     0|        }
  271|     0|        if (!query_ctx) {
  272|     0|            return function(map);
  273|     0|        }
  274|     0|        return Status::OK();
  275|     0|    }
  276|     0|}
  277|      |
  278|      |template <typename Key, typename Value, typename ValueType>
  279|     0|void ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
  280|     0|    auto id = get_map_id(query_id, _internal_map.size());
  281|     0|    {
  282|     0|        std::unique_lock lock(*_internal_map[id].first);
  283|     0|        auto& map = _internal_map[id].second;
  284|     0|        map.erase(query_id);
  285|     0|    }
  286|     0|}
Unexecuted instantiation: _ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_20PlanFragmentExecutorEES3_E5eraseERKS1_
Unexecuted instantiation: _ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_E5eraseERKS1_
Unexecuted instantiation: _ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_8pipeline23PipelineFragmentContextEES4_E5eraseERKS1_
  287|      |
  288|      |template <typename Key, typename Value, typename ValueType>
  289|      |void ConcurrentContextMap<Key, Value, ValueType>::insert(const Key& query_id,
  290|     0|                                                         std::shared_ptr<ValueType> query_ctx) {
  291|     0|    auto id = get_map_id(query_id, _internal_map.size());
  292|     0|    {
  293|     0|        std::unique_lock lock(*_internal_map[id].first);
  294|     0|        auto& map = _internal_map[id].second;
  295|     0|        map.insert({query_id, query_ctx});
  296|     0|    }
  297|     0|}
Unexecuted instantiation: _ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_20PlanFragmentExecutorEES3_E6insertERKS1_S4_
Unexecuted instantiation: _ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_8pipeline23PipelineFragmentContextEES4_E6insertERKS1_S5_
  298|      |
  299|      |template <typename Key, typename Value, typename ValueType>
  300|    12|void ConcurrentContextMap<Key, Value, ValueType>::clear() {
  301| 1.53k|    for (auto& pair : _internal_map) {
  302| 1.53k|        std::unique_lock lock(*pair.first);
  303| 1.53k|        auto& map = pair.second;
  304| 1.53k|        map.clear();
  305| 1.53k|    }
  306|    12|}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_20PlanFragmentExecutorEES3_E5clearEv:
  300|     4|void ConcurrentContextMap<Key, Value, ValueType>::clear() {
  301|   512|    for (auto& pair : _internal_map) {
  302|   512|        std::unique_lock lock(*pair.first);
  303|   512|        auto& map = pair.second;
  304|   512|        map.clear();
  305|   512|    }
  306|     4|}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_8pipeline23PipelineFragmentContextEES4_E5clearEv:
  300|     4|void ConcurrentContextMap<Key, Value, ValueType>::clear() {
  301|   512|    for (auto& pair : _internal_map) {
  302|   512|        std::unique_lock lock(*pair.first);
  303|   512|        auto& map = pair.second;
  304|   512|        map.clear();
  305|   512|    }
  306|     4|}
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_E5clearEv:
  300|     4|void ConcurrentContextMap<Key, Value, ValueType>::clear() {
  301|   512|    for (auto& pair : _internal_map) {
  302|   512|        std::unique_lock lock(*pair.first);
  303|   512|        auto& map = pair.second;
  304|   512|        map.clear();
  305|   512|    }
  306|     4|}
  307|      |
  308|      |FragmentMgr::FragmentMgr(ExecEnv* exec_env)
  309|     4|        : _exec_env(exec_env), _stop_background_threads_latch(1) {
  310|     4|    _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
  311|     4|    INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
  312|     4|    REGISTER_HOOK_METRIC(fragment_instance_count,
  313|     4|                         [this]() { return _fragment_instance_map.num_items(); });
  314|      |
  315|     4|    auto s = Thread::create(
  316|     4|            "FragmentMgr", "cancel_timeout_plan_fragment", [this]() { this->cancel_worker(); },
  317|     4|            &_cancel_thread);
  318|     4|    CHECK(s.ok()) << s.to_string();
  319|      |
  320|      |    // TODO(zc): we need a better thread pool;
  321|      |    // right now one user can occupy the whole pool and leave no resources for others.
  322|     4|    s = ThreadPoolBuilder("FragmentMgrThreadPool")
  323|     4|                .set_min_threads(config::fragment_pool_thread_num_min)
  324|     4|                .set_max_threads(config::fragment_pool_thread_num_max)
  325|     4|                .set_max_queue_size(config::fragment_pool_queue_size)
  326|     4|                .build(&_thread_pool);
  327|      |
  328|     4|    REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
  329|     4|                         [this]() { return _thread_pool->get_queue_size(); });
  330|     4|    CHECK(s.ok()) << s.to_string();
  331|      |
  332|     4|    s = ThreadPoolBuilder("FragmentInstanceReportThreadPool")
  333|     4|                .set_min_threads(48)
  334|     4|                .set_max_threads(512)
  335|     4|                .set_max_queue_size(102400)
  336|     4|                .build(&_async_report_thread_pool);
  337|     4|    CHECK(s.ok()) << s.to_string();
  338|     4|}
  339|      |
  340|     4|FragmentMgr::~FragmentMgr() = default;
  341|      |
  342|     4|void FragmentMgr::stop() {
  343|     4|    DEREGISTER_HOOK_METRIC(fragment_instance_count);
  344|     4|    DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
  345|     4|    _stop_background_threads_latch.count_down();
  346|     4|    if (_cancel_thread) {
  347|     4|        _cancel_thread->join();
  348|     4|    }
  349|      |    // Stop all the workers; should we wait for a while?
  350|      |    // _thread_pool->wait_for();
  351|     4|    _thread_pool->shutdown();
  352|      |
  353|      |    // Only me can delete
  354|     4|    _fragment_instance_map.clear();
  355|     4|    _pipeline_map.apply(
  356|     4|            [&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>>&
  357|   512|                        map) -> Status {
  358|   512|                for (auto& pipeline : map) {
  359|     0|                    pipeline.second->close_sink();
  360|     0|                }
  361|   512|                return Status::OK();
  362|   512|            });
  363|     4|    _pipeline_map.clear();
  364|     4|    _query_ctx_map.clear();
  365|     4|    _async_report_thread_pool->shutdown();
  366|     4|}
  367|      |
  368|     0|std::string FragmentMgr::to_http_path(const std::string& file_name) {
  369|     0|    std::stringstream url;
  370|     0|    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
  371|     0|        << "/api/_download_load?"
  372|     0|        << "token=" << _exec_env->token() << "&file=" << file_name;
  373|     0|    return url.str();
  374|     0|}
  375|      |
  376|      |Status FragmentMgr::trigger_pipeline_context_report(
  377|     0|        const ReportStatusRequest req, std::shared_ptr<pipeline::PipelineFragmentContext>&& ctx) {
  378|     0|    return _async_report_thread_pool->submit_func([this, req, ctx]() {
  379|     0|        SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker);
  380|     0|        coordinator_callback(req);
  381|     0|        if (!req.done) {
  382|     0|            ctx->refresh_next_report_time();
  383|     0|        }
  384|     0|    });
  385|     0|}
  386|      |
  387|      |// There can only be one of these callbacks in-flight at any moment, because
  388|      |// it is only invoked from the executor's reporting thread.
  389|      |// Also, the reported status will always reflect the most recent execution status,
  390|      |// including the final status when execution finishes.
  391|     0|void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
  392|     0|    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
  393|     0|    Status exec_status = req.update_fn(req.status);
  394|     0|    Status coord_status;
  395|     0|    FrontendServiceConnection coord(_exec_env->frontend_client_cache(), req.coord_addr,
  396|     0|                                    &coord_status);
  397|     0|    if (!coord_status.ok()) {
  398|     0|        std::stringstream ss;
  399|     0|        UniqueId uid(req.query_id.hi, req.query_id.lo);
  400|     0|        static_cast<void>(req.update_fn(Status::InternalError(
  401|     0|                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
  402|     0|                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
  403|     0|        return;
  404|     0|    }
  405|      |
  406|     0|    TReportExecStatusParams params;
  407|     0|    params.protocol_version = FrontendServiceVersion::V1;
  408|     0|    params.__set_query_id(req.query_id);
  409|     0|    params.__set_backend_num(req.backend_num);
  410|     0|    params.__set_fragment_instance_id(req.fragment_instance_id);
  411|     0|    params.__set_fragment_id(req.fragment_id);
  412|     0|    params.__set_status(exec_status.to_thrift());
  413|     0|    params.__set_done(req.done);
  414|     0|    params.__set_query_type(req.runtime_state->query_type());
  415|      |
  416|     0|    DCHECK(req.runtime_state != nullptr);
  417|      |
  418|     0|    if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done && req.status.ok()) {
  419|      |        // this is a load plan, and load is not finished, just make a brief report
  420|     0|        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
  421|     0|        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
  422|     0|    } else {
  423|     0|        if (req.runtime_state->query_type() == TQueryType::LOAD) {
  424|     0|            params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
  425|     0|            params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
  426|     0|        }
  427|     0|        if (req.is_pipeline_x) {
  428|     0|            params.__isset.detailed_report = true;
  429|     0|            DCHECK(!req.runtime_states.empty());
  430|     0|            const bool enable_profile = (*req.runtime_states.begin())->enable_profile();
  431|     0|            if (enable_profile) {
  432|     0|                params.__isset.profile = true;
  433|     0|                params.__isset.loadChannelProfile = false;
  434|     0|                for (auto* rs : req.runtime_states) {
  435|     0|                    DCHECK(req.load_channel_profile);
  436|     0|                    TDetailedReportParams detailed_param;
  437|     0|                    rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile);
  438|      |                    // merge all runtime_states.loadChannelProfile to req.load_channel_profile
  439|     0|                    req.load_channel_profile->update(detailed_param.loadChannelProfile);
  440|     0|                }
  441|     0|                req.load_channel_profile->to_thrift(&params.loadChannelProfile);
  442|     0|            } else {
  443|     0|                params.__isset.profile = false;
  444|     0|            }
  445|      |
  446|     0|            if (enable_profile) {
  447|     0|                DCHECK(req.profile != nullptr);
  448|     0|                TDetailedReportParams detailed_param;
  449|     0|                detailed_param.__isset.fragment_instance_id = false;
  450|     0|                detailed_param.__isset.profile = true;
  451|     0|                detailed_param.__isset.loadChannelProfile = false;
  452|     0|                detailed_param.__set_is_fragment_level(true);
  453|     0|                req.profile->to_thrift(&detailed_param.profile);
  454|     0|                params.detailed_report.push_back(detailed_param);
  455|     0|                for (auto& pipeline_profile : req.runtime_state->pipeline_id_to_profile()) {
  456|     0|                    TDetailedReportParams detailed_param;
  457|     0|                    detailed_param.__isset.fragment_instance_id = false;
  458|     0|                    detailed_param.__isset.profile = true;
  459|     0|                    detailed_param.__isset.loadChannelProfile = false;
  460|     0|                    pipeline_profile->to_thrift(&detailed_param.profile);
  461|     0|                    params.detailed_report.push_back(detailed_param);
  462|     0|                }
  463|     0|            }
  464|     0|        } else {
  465|     0|            if (req.profile != nullptr) {
  466|     0|                req.profile->to_thrift(&params.profile);
  467|     0|                if (req.load_channel_profile) {
  468|     0|                    req.load_channel_profile->to_thrift(&params.loadChannelProfile);
  469|     0|                }
  470|     0|                params.__isset.profile = true;
  471|     0|                params.__isset.loadChannelProfile = true;
  472|     0|            } else {
  473|     0|                params.__isset.profile = false;
  474|     0|            }
  475|     0|        }
  476|      |
  477|     0|        if (!req.runtime_state->output_files().empty()) {
  478|     0|            params.__isset.delta_urls = true;
  479|     0|            for (auto& it : req.runtime_state->output_files()) {
  480|     0|                params.delta_urls.push_back(to_http_path(it));
  481|     0|            }
  482|     0|        } else if (!req.runtime_states.empty()) {
  483|     0|            for (auto* rs : req.runtime_states) {
  484|     0|                for (auto& it : rs->output_files()) {
  485|     0|                    params.delta_urls.push_back(to_http_path(it));
  486|     0|                }
  487|     0|            }
  488|     0|            if (!params.delta_urls.empty()) {
  489|     0|                params.__isset.delta_urls = true;
  490|     0|            }
  491|     0|        }
  492|      |
  493|      |        // load rows
  494|     0|        static std::string s_dpp_normal_all = "dpp.norm.ALL";
  495|     0|        static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
  496|     0|        static std::string s_unselected_rows = "unselected.rows";
  497|     0|        int64_t num_rows_load_success = 0;
  498|     0|        int64_t num_rows_load_filtered = 0;
  499|     0|        int64_t num_rows_load_unselected = 0;
  500|     0|        if (req.runtime_state->num_rows_load_total() > 0 ||
  501|     0|            req.runtime_state->num_rows_load_filtered() > 0 ||
  502|     0|            req.runtime_state->num_finished_range() > 0) {
  503|     0|            params.__isset.load_counters = true;
  504|      |
  505|     0|            num_rows_load_success = req.runtime_state->num_rows_load_success();
  506|     0|            num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
  507|     0|            num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
  508|     0|            params.__isset.fragment_instance_reports = true;
  509|     0|            TFragmentInstanceReport t;
  510|     0|            t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
  511|     0|            t.__set_num_finished_range(req.runtime_state->num_finished_range());
  512|     0|            t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
  513|     0|            t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
  514|     0|            params.fragment_instance_reports.push_back(t);
  515|     0|        } else if (!req.runtime_states.empty()) {
  516|     0|            for (auto* rs : req.runtime_states) {
  517|     0|                if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
  518|     0|                    req.runtime_state->num_finished_range() > 0) {
  519|     0|                    params.__isset.load_counters = true;
  520|     0|                    num_rows_load_success += rs->num_rows_load_success();
  521|     0|                    num_rows_load_filtered += rs->num_rows_load_filtered();
  522|     0|                    num_rows_load_unselected += rs->num_rows_load_unselected();
  523|     0|                    params.__isset.fragment_instance_reports = true;
  524|     0|                    TFragmentInstanceReport t;
  525|     0|                    t.__set_fragment_instance_id(rs->fragment_instance_id());
  526|     0|                    t.__set_num_finished_range(rs->num_finished_range());
  527|     0|                    t.__set_loaded_rows(rs->num_rows_load_total());
  528|     0|                    t.__set_loaded_bytes(rs->num_bytes_load_total());
  529|     0|                    params.fragment_instance_reports.push_back(t);
  530|     0|                }
  531|     0|            }
  532|     0|        }
  533|     0|        params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
  534|     0|        params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
  535|     0|        params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
  536|      |
  537|     0|        if (!req.runtime_state->get_error_log_file_path().empty()) {
  538|     0|            params.__set_tracking_url(
  539|     0|                    to_load_error_http_path(req.runtime_state->get_error_log_file_path()));
  540|     0|        } else if (!req.runtime_states.empty()) {
  541|     0|            for (auto* rs : req.runtime_states) {
  542|     0|                if (!rs->get_error_log_file_path().empty()) {
  543|     0|                    params.__set_tracking_url(
  544|     0|                            to_load_error_http_path(rs->get_error_log_file_path()));
  545|     0|                }
  546|     0|            }
  547|     0|        }
  548|     0|        if (!req.runtime_state->export_output_files().empty()) {
  549|     0|            params.__isset.export_files = true;
  550|     0|            params.export_files = req.runtime_state->export_output_files();
  551|     0|        } else if (!req.runtime_states.empty()) {
  552|     0|            for (auto* rs : req.runtime_states) {
  553|     0|                if (!rs->export_output_files().empty()) {
  554|     0|                    params.__isset.export_files = true;
  555|     0|                    params.export_files.insert(params.export_files.end(),
  556|     0|                                               rs->export_output_files().begin(),
  557|     0|                                               rs->export_output_files().end());
  558|     0|                }
  559|     0|            }
  560|     0|        }
  561|     0|        if (!req.runtime_state->tablet_commit_infos().empty()) {
  562|     0|            params.__isset.commitInfos = true;
  563|     0|            params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size());
  564|     0|            for (auto& info : req.runtime_state->tablet_commit_infos()) {
  565|     0|                params.commitInfos.push_back(info);
  566|     0|            }
  567|     0|        } else if (!req.runtime_states.empty()) {
  568|     0|            for (auto* rs : req.runtime_states) {
  569|     0|                if (!rs->tablet_commit_infos().empty()) {
  570|     0|                    params.__isset.commitInfos = true;
  571|     0|                    params.commitInfos.insert(params.commitInfos.end(),
  572|     0|                                              rs->tablet_commit_infos().begin(),
  573|     0|                                              rs->tablet_commit_infos().end());
  574|     0|                }
  575|     0|            }
  576|     0|        }
  577|     0|        if (!req.runtime_state->error_tablet_infos().empty()) {
  578|     0|            params.__isset.errorTabletInfos = true;
  579|     0|            params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size());
  580|     0|            for (auto& info : req.runtime_state->error_tablet_infos()) {
  581|     0|                params.errorTabletInfos.push_back(info);
  582|     0|            }
  583|     0|        } else if (!req.runtime_states.empty()) {
  584|     0|            for (auto* rs : req.runtime_states) {
  585|     0|                if (!rs->error_tablet_infos().empty()) {
  586|     0|                    params.__isset.errorTabletInfos = true;
  587|     0|                    params.errorTabletInfos.insert(params.errorTabletInfos.end(),
  588|     0|                                                   rs->error_tablet_infos().begin(),
  589|     0|                                                   rs->error_tablet_infos().end());
  590|     0|                }
  591|     0|            }
  592|     0|        }
  593|      |
  594|     0|        if (!req.runtime_state->hive_partition_updates().empty()) {
  595|     0|            params.__isset.hive_partition_updates = true;
  596|     0|            params.hive_partition_updates.reserve(
  597|     0|                    req.runtime_state->hive_partition_updates().size());
  598|     0|            for (auto& hive_partition_update : req.runtime_state->hive_partition_updates()) {
  599|     0|                params.hive_partition_updates.push_back(hive_partition_update);
  600|     0|            }
  601|     0|        } else if (!req.runtime_states.empty()) {
  602|     0|            for (auto* rs : req.runtime_states) {
  603|     0|                if (!rs->hive_partition_updates().empty()) {
  604|     0|                    params.__isset.hive_partition_updates = true;
  605|     0|                    params.hive_partition_updates.insert(params.hive_partition_updates.end(),
  606|     0|                                                         rs->hive_partition_updates().begin(),
  607|     0|                                                         rs->hive_partition_updates().end());
  608|     0|                }
  609|     0|            }
  610|     0|        }
  611|      |
  612|     0|        if (!req.runtime_state->iceberg_commit_datas().empty()) {
  613|     0|            params.__isset.iceberg_commit_datas = true;
  614|     0|            params.iceberg_commit_datas.reserve(req.runtime_state->iceberg_commit_datas().size());
  615|     0|            for (auto& iceberg_commit_data : req.runtime_state->iceberg_commit_datas()) {
  616|     0|                params.iceberg_commit_datas.push_back(iceberg_commit_data);
  617|     0|            }
  618|     0|        } else if (!req.runtime_states.empty()) {
  619|     0|            for (auto* rs : req.runtime_states) {
  620|     0|                if (!rs->iceberg_commit_datas().empty()) {
  621|     0|                    params.__isset.iceberg_commit_datas = true;
  622|     0|                    params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
  623|     0|                                                       rs->iceberg_commit_datas().begin(),
  624|     0|                                                       rs->iceberg_commit_datas().end());
  625|     0|                }
  626|     0|            }
  627|     0|        }
  628|      |
  629|      |        // Send new errors to coordinator
  630|     0|        req.runtime_state->get_unreported_errors(&(params.error_log));
  631|     0|        params.__isset.error_log = (params.error_log.size() > 0);
  632|     0|    }
  633|      |
  634|     0|    if (_exec_env->master_info()->__isset.backend_id) {
  635|     0|        params.__set_backend_id(_exec_env->master_info()->backend_id);
  636|     0|    }
  637|      |
  638|     0|    TReportExecStatusResult res;
  639|     0|    Status rpc_status;
  640|      |
  641|     0|    VLOG_DEBUG << "reportExecStatus params is "
  642|     0|               << apache::thrift::ThriftDebugString(params).c_str();
  643|     0|    if (!exec_status.ok()) {
  644|     0|        LOG(WARNING) << "report error status: " << exec_status.msg()
  645|     0|                     << " to coordinator: " << req.coord_addr
  646|     0|                     << ", query id: " << print_id(req.query_id)
  647|     0|                     << ", instance id: " << print_id(req.fragment_instance_id);
  648|     0|    }
  649|     0|    try {
  650|     0|        try {
  651|     0|            coord->reportExecStatus(res, params);
  652|     0|        } catch ([[maybe_unused]] TTransportException& e) {
  653|      |#ifndef ADDRESS_SANITIZER
  654|      |            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
  655|      |                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
  656|      |                         << req.coord_addr << ", err: " << e.what();
  657|      |#endif
  658|     0|            rpc_status = coord.reopen();
  659|      |
  660|     0|            if (!rpc_status.ok()) {
  661|      |                // we need to cancel the execution of this fragment
  662|     0|                static_cast<void>(req.update_fn(rpc_status));
  663|     0|                req.cancel_fn(PPlanFragmentCancelReason::INTERNAL_ERROR, "report rpc fail");
  664|     0|                return;
  665|     0|            }
  666|     0|            coord->reportExecStatus(res, params);
  667|     0|        }
  668|      |
  669|     0|        rpc_status = Status::create<false>(res.status);
  670|     0|    } catch (TException& e) {
  671|     0|        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
  672|     0|                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
  673|     0|    }
  674|      |
  675|     0|    if (!rpc_status.ok()) {
  676|     0|        LOG_INFO("Going to cancel instance {} since report exec status got rpc failed: {}",
  677|     0|                 print_id(req.fragment_instance_id), rpc_status.to_string());
  678|      |        // we need to cancel the execution of this fragment
  679|     0|        static_cast<void>(req.update_fn(rpc_status));
  680|     0|        req.cancel_fn(PPlanFragmentCancelReason::INTERNAL_ERROR, rpc_status.msg());
  681|     0|    }
  682|     0|}
  683|      |
  684|     0|static void empty_function(RuntimeState*, Status*) {}
  685|      |
  686|      |void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_executor,
  687|     0|                               const FinishCallback& cb) {
  688|     0|    VLOG_DEBUG << fmt::format("Instance {}|{} executing", print_id(fragment_executor->query_id()),
  689|     0|                              print_id(fragment_executor->fragment_instance_id()));
  690|      |
  691|     0|    Status st = fragment_executor->execute();
  692|     0|    if (!st.ok()) {
  693|     0|        fragment_executor->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
  694|     0|                                  "fragment_executor execute failed");
  695|     0|    }
  696|      |
  697|     0|    std::shared_ptr<QueryContext> query_ctx = fragment_executor->get_query_ctx();
  698|     0|    bool all_done = false;
  699|     0|    if (query_ctx != nullptr) {
  700|      |        // decrease the number of unfinished fragments
  701|     0|        all_done = query_ctx->countdown(1);
  702|     0|    }
  703|      |
  704|      |    // remove exec state after this fragment finished
  705|     0|    {
  706|     0|        _fragment_instance_map.erase(fragment_executor->fragment_instance_id());
  707|     0|        VLOG(10) << fmt::format("Instance {} finished, all_done: {}",
  708|     0|                                print_id(fragment_executor->fragment_instance_id()), all_done);
  709|     0|    }
  710|     0|    if (all_done && query_ctx) {
  711|     0|        _query_ctx_map.erase(query_ctx->query_id());
  712|     0|    }
  713|      |
  714|      |    // Callback after remove from this id
  715|     0|    auto status = fragment_executor->status();
  716|     0|    cb(fragment_executor->runtime_state(), &status);
  717|     0|}
  718|      |
  719|      |Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
  720|     0|                                       const QuerySource query_source) {
  721|     0|    if (params.txn_conf.need_txn) {
  722|     0|        std::shared_ptr<StreamLoadContext> stream_load_ctx =
  723|     0|                std::make_shared<StreamLoadContext>(_exec_env);
  724|     0|        stream_load_ctx->db = params.txn_conf.db;
  725|     0|        stream_load_ctx->db_id = params.txn_conf.db_id;
  726|     0|        stream_load_ctx->table = params.txn_conf.tbl;
  727|     0|        stream_load_ctx->txn_id = params.txn_conf.txn_id;
  728|     0|        stream_load_ctx->id = UniqueId(params.params.query_id);
  729|     0|        stream_load_ctx->put_result.params = params;
  730|     0|        stream_load_ctx->put_result.__isset.params = true;
  731|     0|        stream_load_ctx->use_streaming = true;
  732|     0|        stream_load_ctx->load_type = TLoadType::MANUL_LOAD;
  733|     0|        stream_load_ctx->load_src_type = TLoadSourceType::RAW;
  734|     0|        stream_load_ctx->label = params.import_label;
  735|     0|        stream_load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
  736|     0|        stream_load_ctx->timeout_second = 3600;
  737|     0|        stream_load_ctx->auth.token = params.txn_conf.token;
  738|     0|        stream_load_ctx->need_commit_self = true;
  739|     0|        stream_load_ctx->need_rollback = true;
  740|     0|        auto pipe = std::make_shared<io::StreamLoadPipe>(
  741|     0|                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
  742|     0|                -1 /* total_length */, true /* use_proto */);
  743|     0|        stream_load_ctx->body_sink = pipe;
  744|     0|        stream_load_ctx->pipe = pipe;
  745|     0|        stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
  746|      |
  747|     0|        RETURN_IF_ERROR(
  748|     0|                _exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, stream_load_ctx));
  749|      |
  750|     0|        RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
  751|     0|        return Status::OK();
  752|     0|    } else {
  753|     0|        return exec_plan_fragment(params, query_source, empty_function);
  754|     0|    }
  755|     0|}
  756|      |
  757|      |Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
  758|     0|                                       const QuerySource query_source) {
  759|     0|    if (params.txn_conf.need_txn) {
  760|     0|        std::shared_ptr<StreamLoadContext> stream_load_ctx =
  761|     0|                std::make_shared<StreamLoadContext>(_exec_env);
  762|     0|        stream_load_ctx->db = params.txn_conf.db;
  763|     0|        stream_load_ctx->db_id = params.txn_conf.db_id;
  764|     0|        stream_load_ctx->table = params.txn_conf.tbl;
  765|     0|        stream_load_ctx->txn_id = params.txn_conf.txn_id;
  766|     0|        stream_load_ctx->id = UniqueId(params.query_id);
  767|     0|        stream_load_ctx->put_result.pipeline_params = params;
  768|     0|        stream_load_ctx->use_streaming = true;
  769|     0|        stream_load_ctx->load_type = TLoadType::MANUL_LOAD;
  770|     0|        stream_load_ctx->load_src_type = TLoadSourceType::RAW;
  771|     0|        stream_load_ctx->label = params.import_label;
  772|     0|        stream_load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
  773|     0|        stream_load_ctx->timeout_second = 3600;
  774|     0|        stream_load_ctx->auth.token = params.txn_conf.token;
  775|     0|        stream_load_ctx->need_commit_self = true;
  776|     0|        stream_load_ctx->need_rollback = true;
  777|     0|        auto pipe = std::make_shared<io::StreamLoadPipe>(
  778|     0|                io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
  779|     0|                -1 /* total_length */, true /* use_proto */);
  780|     0|        stream_load_ctx->body_sink = pipe;
  781|     0|        stream_load_ctx->pipe = pipe;
  782|     0|        stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
  783|      |
  784|     0|        RETURN_IF_ERROR(
  785|     0|                _exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, stream_load_ctx));
  786|      |
  787|     0|        RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
  788|     0|        return Status::OK();
  789|     0|    } else {
  790|     0|        return exec_plan_fragment(params, query_source, empty_function);
  791|     0|    }
  792|     0|}
  793|      |
  794|     0|Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) {
  795|     0|    TUniqueId query_id;
  796|     0|    query_id.__set_hi(request->query_id().hi());
  797|     0|    query_id.__set_lo(request->query_id().lo());
  798|     0|    std::shared_ptr<QueryContext> q_ctx = nullptr;
  799|     0|    {
  800|     0|        TUniqueId query_id;
  801|     0|        query_id.__set_hi(request->query_id().hi());
  802|     0|        query_id.__set_lo(request->query_id().lo());
  803|     0|        q_ctx = _query_ctx_map.find(query_id);
  804|     0|        if (q_ctx == nullptr) {
  805|     0|            return Status::InternalError(
  806|     0|                    "Failed to get query fragments context. Query may be "
  807|     0|                    "timeout or be cancelled. host: {}",
  808|     0|                    BackendOptions::get_localhost());
  809|     0|        }
  810|     0|    }
  811|     0|    q_ctx->set_ready_to_execute(false);
  812|     0|    LOG(INFO) << fmt::format("Query {} start execution", print_id(query_id));
  813|     0|    return Status::OK();
  814|     0|}
  815|      |
  816|      |void FragmentMgr::remove_pipeline_context(
  817|     0|        std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
  818|     0|    auto* q_context = f_context->get_query_ctx();
  819|     0|    bool all_done = false;
  820|     0|    TUniqueId query_id = f_context->get_query_id();
  821|     0|    {
  822|     0|        std::vector<TUniqueId> ins_ids;
  823|     0|        f_context->instance_ids(ins_ids);
  824|     0|        all_done = q_context->countdown(ins_ids.size());
  825|     0|        for (const auto& ins_id : ins_ids) {
  826|     0|            VLOG(10) << fmt::format("Removing query {} instance {}, all done? {}",
  827|     0|                                    print_id(query_id), print_id(ins_id), all_done);
  828|     0|            _pipeline_map.erase(ins_id);
  829|     0|            g_pipeline_fragment_instances_count << -1;
  830|     0|        }
  831|     0|    }
  832|     0|    if (all_done) {
  833|     0|        _query_ctx_map.erase(query_id);
  834|     0|    }
  835|     0|}
  836|      |
  837|      |template <typename Params>
  838|      |Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
  839|      |                                   QuerySource query_source,
  840|     0|                                   std::shared_ptr<QueryContext>& query_ctx) {
  841|     0|    DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed",
  842|     0|                    { return Status::InternalError("FragmentMgr._get_query_ctx.failed"); });
  843|     0|    if (params.is_simplified_param) {
  844|      |        // Get common components from _query_ctx_map
  845|     0|        query_ctx = _query_ctx_map.find(query_id);
  846|     0|        if (query_ctx == nullptr) {
  847|     0|            return Status::InternalError(
  848|     0|                    "Failed to get query fragments context. Query may be "
  849|     0|                    "timeout or be cancelled. host: {}",
  850|     0|                    BackendOptions::get_localhost());
  851|     0|        }
  852|     0|    } else {
  853|      |        // Check _query_ctx_map first, in case some other request has already
  854|      |        // created the query fragments context.
  855|     0|        RETURN_IF_ERROR(_query_ctx_map.apply_if_not_exists(
  856|     0|                query_id, query_ctx,
  857|     0|                [&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>>& map) -> Status {
  858|     0|                    TNetworkAddress current_connect_fe_addr;
  859|      |                    // For a gray upgrade between 2.1 versions, fe may not set current_connect_fe;
  860|      |                    // in that case use the coord addr instead.
  861|     0|                    if (params.__isset.current_connect_fe) {
  862|     0|                        current_connect_fe_addr = params.current_connect_fe;
  863|     0|                    } else {
  864|     0|                        current_connect_fe_addr = params.coord;
  865|     0|                    }
  866|      |
  867|     0|                    VLOG(10) << "query_id: " << print_id(query_id)
  868|     0|                             << ", coord_addr: " << params.coord
  869|     0|                             << ", total fragment num on current host: "
  870|     0|                             << params.fragment_num_on_host
  871|     0|                             << ", fe process uuid: " << params.query_options.fe_process_uuid
  872|     0|                             << ", query type: " << params.query_options.query_type
  873|     0|                             << ", report audit fe:" << current_connect_fe_addr << ", limit: "
  874|     0|                             << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES);
  875|      |
  876|      |                    // This may be the first fragment request of the query.
  877|      |                    // Create the query fragments context.
  878|     0|                    query_ctx = QueryContext::create_shared(
  879|     0|                            query_id, params.fragment_num_on_host, _exec_env, params.query_options,
  880|     0|                            params.coord, pipeline, params.is_nereids, current_connect_fe_addr,
  881|     0|                            query_source);
  882|     0|                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
  883|     0|                    RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
  884|     0|                                                          &(query_ctx->desc_tbl)));
  885|      |                    // set file scan range params
  886|     0|                    if (params.__isset.file_scan_params) {
  887|     0|                        query_ctx->file_scan_range_params_map = params.file_scan_params;
  888|     0|                    }
  889|      |
  890|     0|                    query_ctx->query_globals = params.query_globals;
  891|      |
  892|     0|                    if (params.__isset.resource_info) {
  893|     0|                        query_ctx->user = params.resource_info.user;
  894|     0|                        query_ctx->group = params.resource_info.group;
  895|     0|                        query_ctx->set_rsc_info = true;
  896|     0|                    }
  897|      |
  898|     0|                    query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(
  899|     0|                            pipeline);
  900|     0|                    _set_scan_concurrency(params, query_ctx.get());
  901|     0|                    const bool is_pipeline = std::is_same_v<TPipelineFragmentParams, Params>;
  902|      |
  903|     0|                    if (params.__isset.workload_groups && !params.workload_groups.empty()) {
  904|     0|                        uint64_t tg_id = params.workload_groups[0].id;
  905|     0|                        WorkloadGroupPtr workload_group_ptr =
  906|     0|                                _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
  907|     0|                        if (workload_group_ptr != nullptr) {
  908|     0|                            RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
  909|     0|                            RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
  910|     0|                            _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
  911|     0|                                    print_id(query_id), tg_id);
  912|      |
  913|     0|                            VLOG(10) << "Query/load id: " << print_id(query_ctx->query_id())
  914|     0|                                     << ", use workload group: "
  915|     0|                                     << workload_group_ptr->debug_string()
  916|     0|                                     << ", is pipeline: " << ((int)is_pipeline);
  917|     0|                        } else {
  918|     0|                            VLOG(10) << "Query/load id: " << print_id(query_ctx->query_id())
  919|     0|                                     << " carried group info but can not find group in be";
  920|     0|                        }
  921|     0|                    }
  922|      |                    // There is some logic in the query ctx's dtor, so we can not check whether the
  923|      |                    // temp query ctx exists and delete it now. For example, the query id may be removed from the workload group's queryset.
  924|     0|                    map.insert({query_id, query_ctx});
  925|     0|                    return Status::OK();
  926|     0|                }));
  927|     0|    }
  928|     0|    return Status::OK();
  929|     0|}
Unexecuted instantiation: _ZN5doris11FragmentMgr14_get_query_ctxINS_23TExecPlanFragmentParamsEEENS_6StatusERKT_NS_9TUniqueIdEbNS_11QuerySourceERSt10shared_ptrINS_12QueryContextEE
Unexecuted instantiation: _ZN5doris11FragmentMgr14_get_query_ctxINS_23TPipelineFragmentParamsEEENS_6StatusERKT_NS_9TUniqueIdEbNS_11QuerySourceERSt10shared_ptrINS_12QueryContextEE
  930|      |
  931|      |Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
  932|     0|                                       QuerySource query_source, const FinishCallback& cb) {
  933|     0|    VLOG_ROW << "exec_plan_fragment params is "
  934|     0|             << apache::thrift::ThriftDebugString(params).c_str();
  935|      |    // Sometimes the TExecPlanFragmentParams debug string is too long and glog
  936|      |    // will truncate the log line, so print the query options separately for debugging purposes.
  937|     0|    VLOG_ROW << "query options is "
  938|     0|             << apache::thrift::ThriftDebugString(params.query_options).c_str();
  939|     0|    const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
  940|     0|    {
  941|     0|        auto iter = _fragment_instance_map.find(fragment_instance_id);
  942|     0|        if (iter != nullptr) {
  943|      |            // Duplicated
  944|     0|            LOG(WARNING) << "duplicate fragment instance id: " << print_id(fragment_instance_id);
  945|     0|            return Status::OK();
  946|     0|        }
  947|     0|    }
  948|      |
  949|     0|    std::shared_ptr<QueryContext> query_ctx;
  950|     0|    bool pipeline_engine_enabled = params.query_options.__isset.enable_pipeline_engine &&
  951|     0|                                   params.query_options.enable_pipeline_engine;
  952|      |
  953|     0|    RETURN_IF_ERROR(_get_query_ctx(params, params.params.query_id, pipeline_engine_enabled,
  954|     0|                                   query_source, query_ctx));
  955|     0|    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
  956|     0|    {
  957|      |        // Need the lock here because this modifies the fragment ids, and std::vector may resize
  958|      |        // and reallocate memory while query_is_cancelled traverses the vector, which would core.
  959|      |        // query_is_cancelled is called in the allocator, so we have to avoid a deadlock.
960
0
        query_ctx->push_instance_ids(fragment_instance_id);
961
0
    }
962
963
0
    auto fragment_executor = std::make_shared<PlanFragmentExecutor>(
964
0
            _exec_env, query_ctx, params.params.fragment_instance_id, -1, params.backend_num,
965
0
            std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
966
0
                            std::placeholders::_1));
967
0
    if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
968
        // set need_wait_execution_trigger means this instance will not actually being executed
969
        // until the execPlanFragmentStart RPC trigger to start it.
970
0
        fragment_executor->set_need_wait_execution_trigger();
971
0
    }
972
973
0
    int64_t duration_ns = 0;
974
0
    DCHECK(!pipeline_engine_enabled);
975
0
    {
976
0
        SCOPED_RAW_TIMER(&duration_ns);
977
0
        RETURN_IF_ERROR(fragment_executor->prepare(params));
978
0
    }
979
0
    g_fragmentmgr_prepare_latency << (duration_ns / 1000);
980
0
    std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
981
    // TODO need check the status, but when I add return_if_error the P0 will not pass
982
0
    static_cast<void>(_runtimefilter_controller.add_entity(
983
0
            params.params, params.params.query_id, params.query_options, &handler,
984
0
            RuntimeFilterParamsContext::create(fragment_executor->runtime_state())));
985
0
    {
986
0
        if (handler) {
987
0
            query_ctx->set_merge_controller_handler(handler);
988
0
        }
989
0
        _fragment_instance_map.insert(params.params.fragment_instance_id, fragment_executor);
990
0
    }
991
992
0
    auto st = _thread_pool->submit_func([this, fragment_executor, cb]() {
993
#ifndef BE_TEST
994
        SCOPED_ATTACH_TASK(fragment_executor->runtime_state());
995
#endif
996
0
        _exec_actual(fragment_executor, cb);
997
0
    });
998
0
    if (!st.ok()) {
999
0
        {
1000
            // Remove the exec state added
1001
0
            _fragment_instance_map.erase(params.params.fragment_instance_id);
1002
0
        }
1003
0
        fragment_executor->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
1004
0
                                  "push plan fragment to thread pool failed");
1005
0
        return Status::InternalError(
1006
0
                strings::Substitute("push plan fragment $0 to thread pool failed. err = $1, BE: $2",
1007
0
                                    print_id(params.params.fragment_instance_id), st.to_string(),
1008
0
                                    BackendOptions::get_localhost()));
1009
0
    }
1010
1011
0
    return Status::OK();
1012
0
}
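
A minimal sketch (not the Doris implementation) of the register/submit/rollback
pattern used above: the executor is registered in a shared map before
submission so a concurrent cancel can find it, and is erased again if the
thread pool rejects the task. DemoExecutor, DemoFragmentMgr, and the submit
callback are hypothetical stand-ins.

#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>

struct DemoExecutor {
    void cancel(const std::string& reason) { (void)reason; /* release resources */ }
};

class DemoFragmentMgr {
public:
    // Returns false if the task could not be queued.
    bool exec(int instance_id, const std::function<bool(std::function<void()>)>& submit) {
        auto executor = std::make_shared<DemoExecutor>();
        {
            std::lock_guard<std::mutex> l(_lock);
            if (!_instances.emplace(instance_id, executor).second) {
                return true; // duplicate instance id: treat as already running
            }
        }
        if (!submit([executor] { /* run the fragment */ })) {
            // Roll back the registration so the map never holds a task that
            // will never run, then cancel to release executor-side state.
            std::lock_guard<std::mutex> l(_lock);
            _instances.erase(instance_id);
            executor->cancel("push plan fragment to thread pool failed");
            return false;
        }
        return true;
    }

private:
    std::mutex _lock;
    std::map<int, std::shared_ptr<DemoExecutor>> _instances;
};
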
1013
1014
0
std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
1015
0
    fmt::memory_buffer debug_string_buffer;
1016
0
    auto t = MonotonicNanos();
1017
0
    size_t i = 0;
1018
0
    {
1019
0
        fmt::format_to(debug_string_buffer,
1020
0
                       "{} pipeline fragment contexts are still running! duration_limit={}\n",
1021
0
                       _pipeline_map.num_items(), duration);
1022
1023
0
        timespec now;
1024
0
        clock_gettime(CLOCK_MONOTONIC, &now);
1025
0
        _pipeline_map.apply(
1026
0
                [&](phmap::flat_hash_map<TUniqueId,
1027
0
                                         std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
1028
0
                        -> Status {
1029
0
                    for (auto& it : map) {
1030
0
                        auto elapsed = (t - it.second->create_time()) / 1000000000.0;
1031
0
                        if (elapsed < duration) {
1032
                            // Only display tasks which have been running for more than {duration} seconds.
1033
0
                            continue;
1034
0
                        }
1035
0
                        auto timeout_second = it.second->timeout_second();
1036
0
                        fmt::format_to(
1037
0
                                debug_string_buffer,
1038
0
                                "No.{} (elapse_second={}s, query_timeout_second={}s, instance_id="
1039
0
                                "{}) : {}\n",
1040
0
                                i, elapsed, timeout_second, print_id(it.first),
1041
0
                                it.second->debug_string());
1042
0
                        i++;
1043
0
                    }
1044
0
                    return Status::OK();
1045
0
                });
1046
0
    }
1047
0
    return fmt::to_string(debug_string_buffer);
1048
0
}
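
A small stand-alone sketch of the "dump only long-running entries" filter used
by dump_pipeline_tasks above, assuming only <fmt/format.h> and the standard
library; DemoContext and the steady_clock source are stand-ins for the real
pipeline fragment contexts.

#include <fmt/format.h>

#include <chrono>
#include <iterator>
#include <map>
#include <string>

struct DemoContext {
    std::chrono::steady_clock::time_point create_time;
    std::string debug_string;
};

std::string dump_running_longer_than(const std::map<int, DemoContext>& ctxs,
                                     double duration_limit_s) {
    fmt::memory_buffer buf;
    fmt::format_to(std::back_inserter(buf), "{} contexts, duration_limit={}s\n",
                   ctxs.size(), duration_limit_s);
    const auto now = std::chrono::steady_clock::now();
    size_t i = 0;
    for (const auto& [id, ctx] : ctxs) {
        double elapsed = std::chrono::duration<double>(now - ctx.create_time).count();
        if (elapsed < duration_limit_s) {
            continue; // only report tasks older than the threshold
        }
        fmt::format_to(std::back_inserter(buf), "No.{} (elapsed={:.1f}s, id={}): {}\n",
                       i++, elapsed, id, ctx.debug_string);
    }
    return fmt::to_string(buf);
}
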
1049
1050
0
std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
1051
0
    if (auto q_ctx = get_query_context(query_id)) {
1052
0
        return q_ctx->print_all_pipeline_context();
1053
0
    } else {
1054
0
        return fmt::format("Query context (query id = {}) not found. \n", print_id(query_id));
1055
0
    }
1056
0
}
1057
1058
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
1059
0
                                       QuerySource query_source, const FinishCallback& cb) {
1060
0
    VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment params is "
1061
0
             << apache::thrift::ThriftDebugString(params).c_str();
1062
    // Sometimes the TPipelineFragmentParams debug string is too long and glog
1063
    // will truncate the log line, so print query options separately for debugging purposes.
1064
0
    VLOG_ROW << "query: " << print_id(params.query_id) << "query options is "
1065
0
             << apache::thrift::ThriftDebugString(params.query_options).c_str();
1066
1067
0
    std::shared_ptr<QueryContext> query_ctx;
1068
0
    RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_source, query_ctx));
1069
0
    SCOPED_ATTACH_TASK(query_ctx.get());
1070
0
    const bool enable_pipeline_x = params.query_options.__isset.enable_pipeline_x_engine &&
1071
0
                                   params.query_options.enable_pipeline_x_engine;
1072
0
    if (enable_pipeline_x) {
1073
0
        _setup_shared_hashtable_for_broadcast_join(params, query_ctx.get());
1074
0
        int64_t duration_ns = 0;
1075
0
        std::shared_ptr<pipeline::PipelineFragmentContext> context =
1076
0
                std::make_shared<pipeline::PipelineXFragmentContext>(
1077
0
                        query_ctx->query_id(), params.fragment_id, query_ctx, _exec_env, cb,
1078
0
                        std::bind<Status>(
1079
0
                                std::mem_fn(&FragmentMgr::trigger_pipeline_context_report), this,
1080
0
                                std::placeholders::_1, std::placeholders::_2));
1081
0
        {
1082
0
            SCOPED_RAW_TIMER(&duration_ns);
1083
0
            auto prepare_st = context->prepare(params, _thread_pool.get());
1084
0
            if (!prepare_st.ok()) {
1085
0
                context->close_if_prepare_failed(prepare_st);
1086
0
                query_ctx->set_execution_dependency_ready();
1087
0
                return prepare_st;
1088
0
            }
1089
0
        }
1090
0
        g_fragmentmgr_prepare_latency << (duration_ns / 1000);
1091
1092
0
        DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
1093
0
                        { return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
1094
1095
0
        std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
1096
0
        RETURN_IF_ERROR(_runtimefilter_controller.add_entity(
1097
0
                params.local_params[0], params.query_id, params.query_options, &handler,
1098
0
                RuntimeFilterParamsContext::create(context->get_runtime_state())));
1099
0
        if (handler) {
1100
0
            query_ctx->set_merge_controller_handler(handler);
1101
0
        }
1102
1103
0
        for (const auto& local_param : params.local_params) {
1104
0
            const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
1105
0
            auto iter = _pipeline_map.find(fragment_instance_id);
1106
0
            if (iter != nullptr) {
1107
0
                return Status::InternalError(
1108
0
                        "exec_plan_fragment input duplicated fragment_instance_id({})",
1109
0
                        UniqueId(fragment_instance_id).to_string());
1110
0
            }
1111
0
            query_ctx->push_instance_ids(fragment_instance_id);
1112
0
        }
1113
1114
0
        if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
1115
0
            query_ctx->set_ready_to_execute_only();
1116
0
        }
1117
1118
0
        {
1119
0
            std::vector<TUniqueId> ins_ids;
1120
0
            reinterpret_cast<pipeline::PipelineXFragmentContext*>(context.get())
1121
0
                    ->instance_ids(ins_ids);
1122
            // TODO: simplify this mapping
1123
0
            for (const auto& ins_id : ins_ids) {
1124
0
                _pipeline_map.insert(ins_id, context);
1125
0
            }
1126
0
        }
1127
0
        query_ctx->set_pipeline_context(params.fragment_id, context);
1128
1129
0
        RETURN_IF_ERROR(context->submit());
1130
0
        return Status::OK();
1131
0
    } else {
1132
0
        auto pre_and_submit = [&](int i) {
1133
0
            const auto& local_params = params.local_params[i];
1134
1135
0
            const TUniqueId& fragment_instance_id = local_params.fragment_instance_id;
1136
0
            {
1137
0
                auto res = _pipeline_map.find(fragment_instance_id);
1138
0
                if (res != nullptr) {
1139
                    // Duplicated
1140
0
                    return Status::OK();
1141
0
                }
1142
0
                query_ctx->push_instance_ids(fragment_instance_id);
1143
0
            }
1144
1145
0
            int64_t duration_ns = 0;
1146
0
            if (!params.__isset.need_wait_execution_trigger ||
1147
0
                !params.need_wait_execution_trigger) {
1148
0
                query_ctx->set_ready_to_execute_only();
1149
0
            }
1150
0
            _setup_shared_hashtable_for_broadcast_join(params, local_params, query_ctx.get());
1151
0
            std::shared_ptr<pipeline::PipelineFragmentContext> context =
1152
0
                    std::make_shared<pipeline::PipelineFragmentContext>(
1153
0
                            query_ctx->query_id(), fragment_instance_id, params.fragment_id,
1154
0
                            local_params.backend_num, query_ctx, _exec_env, cb,
1155
0
                            std::bind<Status>(
1156
0
                                    std::mem_fn(&FragmentMgr::trigger_pipeline_context_report),
1157
0
                                    this, std::placeholders::_1, std::placeholders::_2));
1158
0
            {
1159
0
                SCOPED_RAW_TIMER(&duration_ns);
1160
0
                auto prepare_st = context->prepare(params, i);
1161
0
                if (!prepare_st.ok()) {
1162
0
                    LOG(WARNING) << "Prepare failed: " << prepare_st.to_string();
1163
0
                    context->close_if_prepare_failed(prepare_st);
1164
0
                    static_cast<void>(context->update_status(prepare_st));
1165
0
                    return prepare_st;
1166
0
                }
1167
0
            }
1168
0
            g_fragmentmgr_prepare_latency << (duration_ns / 1000);
1169
1170
0
            DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
1171
0
                            { return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
1172
1173
0
            std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
1174
0
            RETURN_IF_ERROR(_runtimefilter_controller.add_entity(
1175
0
                    local_params, params.query_id, params.query_options, &handler,
1176
0
                    RuntimeFilterParamsContext::create(context->get_runtime_state())));
1177
0
            if (i == 0 && handler) {
1178
0
                query_ctx->set_merge_controller_handler(handler);
1179
0
            }
1180
0
            _pipeline_map.insert(fragment_instance_id, context);
1181
1182
0
            return context->submit();
1183
0
        };
1184
1185
0
        int target_size = params.local_params.size();
1186
0
        g_pipeline_fragment_instances_count << target_size;
1187
1188
0
        const auto& local_params = params.local_params[0];
1189
0
        if (local_params.__isset.runtime_filter_params) {
1190
0
            query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
1191
0
                    local_params.runtime_filter_params);
1192
0
        }
1195
0
        if (local_params.__isset.topn_filter_source_node_ids) {
1196
0
            query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
1197
0
        } else {
1198
0
            query_ctx->init_runtime_predicates({0});
1199
0
        }
1200
1201
0
        if (target_size > 1) {
1202
0
            int prepare_done = {0};
1203
0
            Status prepare_status[target_size];
1204
0
            std::mutex m;
1205
0
            std::condition_variable cv;
1206
1207
0
            for (size_t i = 0; i < target_size; i++) {
1208
0
                RETURN_IF_ERROR(_thread_pool->submit_func([&, i]() {
1209
0
                    SCOPED_ATTACH_TASK(query_ctx.get());
1210
0
                    prepare_status[i] = pre_and_submit(i);
1211
0
                    std::unique_lock<std::mutex> lock(m);
1212
0
                    prepare_done++;
1213
0
                    if (prepare_done == target_size) {
1214
0
                        cv.notify_one();
1215
0
                    }
1216
0
                }));
1217
0
            }
1218
1219
0
            std::unique_lock<std::mutex> lock(m);
1220
0
            // The predicate form tolerates spurious wakeups, and the statuses
            // are always checked even if all prepares finished before we waited.
            cv.wait(lock, [&] { return prepare_done == target_size; });
1221
1222
0
            for (size_t i = 0; i < target_size; i++) {
1223
0
                if (!prepare_status[i].ok()) {
1224
0
                    return prepare_status[i];
1225
0
                }
1226
0
            }
1229
0
            return Status::OK();
1230
0
        } else {
1231
0
            return pre_and_submit(0);
1232
0
        }
1233
0
    }
1234
0
    return Status::OK();
1235
0
}
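
A condensed sketch of the fan-out prepare used in the multi-instance branch
above: each instance is prepared concurrently, a counter is advanced under a
mutex, and the caller waits on a condition variable before checking every
status. It uses std::thread instead of the BE thread pool, and DemoStatus and
prepare_instance are stand-ins.

#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

struct DemoStatus {
    bool ok = true;
    std::string msg;
};

DemoStatus prepare_instance(int i) {
    (void)i;
    return DemoStatus {}; // stand-in for the real per-instance prepare+submit
}

DemoStatus prepare_all(int target_size) {
    std::vector<DemoStatus> status(target_size);
    std::mutex m;
    std::condition_variable cv;
    int done = 0;

    std::vector<std::thread> workers;
    for (int i = 0; i < target_size; ++i) {
        workers.emplace_back([&, i] {
            status[i] = prepare_instance(i);
            std::lock_guard<std::mutex> l(m);
            if (++done == target_size) {
                cv.notify_one();
            }
        });
    }
    {
        std::unique_lock<std::mutex> l(m);
        // Predicate form tolerates spurious wakeups and the case where all
        // prepares finished before the caller started waiting.
        cv.wait(l, [&] { return done == target_size; });
    }
    for (auto& t : workers) {
        t.join();
    }
    for (const auto& s : status) {
        if (!s.ok) {
            return s; // surface the first failed prepare
        }
    }
    return DemoStatus {};
}
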
1236
1237
template <typename Param>
1238
0
void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query_ctx) {
1239
#ifndef BE_TEST
1240
    // If the token is set, the scan task will use limited_scan_pool in scanner scheduler.
1241
    // Otherwise, the scan task will use the local/remote scan pool in the scanner scheduler.
1242
    if (params.query_options.__isset.resource_limit &&
1243
        params.query_options.resource_limit.__isset.cpu_limit) {
1244
        query_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit, false);
1245
    }
1246
#endif
1247
0
}
Unexecuted instantiation: _ZN5doris11FragmentMgr21_set_scan_concurrencyINS_23TExecPlanFragmentParamsEEEvRKT_PNS_12QueryContextE
Unexecuted instantiation: _ZN5doris11FragmentMgr21_set_scan_concurrencyINS_23TPipelineFragmentParamsEEEvRKT_PNS_12QueryContextE
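
_set_scan_concurrency reads a doubly optional thrift field behind two __isset
guards. A minimal sketch of that guard pattern with hand-written stand-ins for
the generated structs (the real types come from gen_cpp):

struct DemoResourceLimit {
    int cpu_limit = 0;
    struct {
        bool cpu_limit = false;
    } __isset;
};

struct DemoQueryOptions {
    DemoResourceLimit resource_limit;
    struct {
        bool resource_limit = false;
    } __isset;
};

// Returns the configured cpu limit, or fallback when the field was not set.
int effective_cpu_limit(const DemoQueryOptions& opts, int fallback) {
    if (opts.__isset.resource_limit && opts.resource_limit.__isset.cpu_limit) {
        return opts.resource_limit.cpu_limit;
    }
    return fallback;
}
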
1248
1249
0
std::shared_ptr<QueryContext> FragmentMgr::get_query_context(const TUniqueId& query_id) {
1250
0
    return _query_ctx_map.find(query_id);
1251
0
}
1252
1253
void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
1254
0
                               const std::string& msg) {
1255
0
    std::shared_ptr<QueryContext> query_ctx;
1256
0
    std::vector<TUniqueId> all_instance_ids;
1257
0
    {
1258
0
        query_ctx = _query_ctx_map.find(query_id);
1259
1260
0
        if (query_ctx == nullptr) {
1261
0
            LOG(WARNING) << "Query " << print_id(query_id)
1262
0
                         << " does not exists, failed to cancel it";
1263
0
            return;
1264
0
        }
1265
        // Copy instance ids to avoid concurrent modification,
1266
        // and to reduce the scope of the lock.
1267
0
        all_instance_ids = query_ctx->fragment_instance_ids;
1268
0
    }
1269
0
    if (query_ctx->enable_pipeline_x_exec()) {
1270
0
        query_ctx->cancel_all_pipeline_context(reason, msg);
1271
0
    } else {
1272
0
        for (auto it : all_instance_ids) {
1273
0
            cancel_instance(it, reason, msg);
1274
0
        }
1275
0
    }
1276
1277
0
    query_ctx->cancel(msg, Status::Cancelled(msg));
1278
0
    _query_ctx_map.erase(query_id);
1279
0
    LOG(INFO) << "Query " << print_id(query_id) << " is cancelled and removed. Reason: " << msg;
1280
0
}
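
cancel_query snapshots the instance id list before acting on it, so the lock
protecting the query context is not held while each instance is cancelled. A
generic sketch of that copy-then-act pattern; DemoQuery and
cancel_instance_by_id are stand-ins.

#include <mutex>
#include <vector>

void cancel_instance_by_id(int id) {
    (void)id; // stand-in for FragmentMgr::cancel_instance
}

struct DemoQuery {
    std::mutex lock;
    std::vector<int> instance_ids;
};

void demo_cancel(DemoQuery& q) {
    std::vector<int> ids;
    {
        std::lock_guard<std::mutex> l(q.lock);
        ids = q.instance_ids; // snapshot under the lock
    }
    // Work outside the lock: cancellation may block or re-enter this module.
    for (int id : ids) {
        cancel_instance_by_id(id);
    }
}
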
1281
1282
void FragmentMgr::cancel_instance(const TUniqueId& instance_id,
1283
1
                                  const PPlanFragmentCancelReason& reason, const std::string& msg) {
1284
1
    std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_ctx;
1285
1
    std::shared_ptr<PlanFragmentExecutor> non_pipeline_ctx;
1286
1
    {
1287
1
        pipeline_ctx = _pipeline_map.find(instance_id);
1288
1
        if (!pipeline_ctx) {
1289
1
            non_pipeline_ctx = _fragment_instance_map.find(instance_id);
1290
1
            if (non_pipeline_ctx == nullptr) {
1291
1
                LOG(WARNING) << "Could not find the fragment instance id:" << print_id(instance_id)
1292
1
                             << " to cancel";
1293
1
                return;
1294
1
            }
1295
1
        }
1296
1
    }
1297
1298
0
    if (pipeline_ctx != nullptr) {
1299
0
        pipeline_ctx->cancel(reason, msg);
1300
0
    } else if (non_pipeline_ctx != nullptr) {
1301
        // calling PlanFragmentExecutor::cancel
1302
0
        non_pipeline_ctx->cancel(reason, msg);
1303
0
    }
1304
0
}
1305
1306
void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id,
1307
0
                                  const PPlanFragmentCancelReason& reason, const std::string& msg) {
1308
0
    auto res = _query_ctx_map.find(query_id);
1309
0
    if (res != nullptr) {
1310
        // Hold the shared_ptr by value so it is not destructed while in use.
1311
0
        WARN_IF_ERROR(res->cancel_pipeline_context(fragment_id, reason, msg),
1312
0
                      "fail to cancel fragment");
1313
0
    } else {
1314
0
        LOG(WARNING) << "Could not find the query id:" << print_id(query_id)
1315
0
                     << " fragment id:" << fragment_id << " to cancel";
1316
0
    }
1317
0
}
1318
1319
4
void FragmentMgr::cancel_worker() {
1320
4
    LOG(INFO) << "FragmentMgr cancel worker start working.";
1321
1322
4
    timespec check_invalid_query_last_timestamp;
1323
4
    clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp);
1324
1325
4
    do {
1326
4
        std::vector<TUniqueId> queries_timeout;
1327
4
        std::vector<TUniqueId> queries_to_cancel;
1328
4
        std::vector<TUniqueId> queries_pipeline_task_leak;
1329
        // Fe process uuid -> set<QueryId>
1330
4
        std::map<int64_t, std::unordered_set<TUniqueId>> running_queries_on_all_fes;
1331
4
        const std::map<TNetworkAddress, FrontendInfo>& running_fes =
1332
4
                ExecEnv::GetInstance()->get_running_frontends();
1333
1334
4
        timespec now_for_check_invalid_query;
1335
4
        clock_gettime(CLOCK_MONOTONIC, &now_for_check_invalid_query);
1336
1337
4
        if (config::enable_pipeline_task_leakage_detect &&
1338
4
            now_for_check_invalid_query.tv_sec - check_invalid_query_last_timestamp.tv_sec >
1339
0
                    config::pipeline_task_leakage_detect_period_secs) {
1340
0
            check_invalid_query_last_timestamp = now_for_check_invalid_query;
1341
0
            running_queries_on_all_fes = _get_all_running_queries_from_fe();
1342
4
        } else {
1343
4
            running_queries_on_all_fes.clear();
1344
4
        }
1345
1346
4
        VecDateTimeValue now = VecDateTimeValue::local_time();
1347
4
        std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem> brpc_stub_with_queries;
1348
4
        {
1349
4
            _fragment_instance_map.apply(
1350
4
                    [&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<PlanFragmentExecutor>>& map)
1351
512
                            -> Status {
1352
512
                        for (auto& fragment_instance_itr : map) {
1353
0
                            if (fragment_instance_itr.second->is_timeout(now)) {
1354
0
                                queries_timeout.push_back(
1355
0
                                        fragment_instance_itr.second->fragment_instance_id());
1356
0
                            }
1357
0
                        }
1358
512
                        return Status::OK();
1359
512
                    });
1360
4
            _pipeline_map.apply(
1361
4
                    [&](phmap::flat_hash_map<
1362
4
                            TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
1363
512
                            -> Status {
1364
512
                        for (auto& pipeline_itr : map) {
1365
0
                            if (pipeline_itr.second->is_timeout(now)) {
1366
0
                                std::vector<TUniqueId> ins_ids;
1367
0
                                reinterpret_cast<pipeline::PipelineXFragmentContext*>(
1368
0
                                        pipeline_itr.second.get())
1369
0
                                        ->instance_ids(ins_ids);
1370
0
                                for (auto& ins_id : ins_ids) {
1371
0
                                    queries_timeout.push_back(ins_id);
1372
0
                                }
1373
0
                            } else {
1374
0
                                pipeline_itr.second->clear_finished_tasks();
1375
0
                            }
1376
0
                        }
1377
512
                        return Status::OK();
1378
512
                    });
1379
4
        }
1380
4
        {
1381
4
            _query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>>&
1382
512
                                             map) -> Status {
1383
512
                for (auto it = map.begin(); it != map.end();) {
1384
0
                    if (it->second->is_timeout(now)) {
1385
0
                        LOG_WARNING("Query {} is timeout", print_id(it->first));
1386
0
                        it = map.erase(it);
1387
0
                    } else {
1388
0
                        if (config::enable_brpc_connection_check) {
1389
0
                            auto brpc_stubs = it->second->get_using_brpc_stubs();
1390
0
                            for (auto& item : brpc_stubs) {
1391
0
                                if (!brpc_stub_with_queries.contains(item.second)) {
1392
0
                                    brpc_stub_with_queries.emplace(
1393
0
                                            item.second, BrpcItem {item.first, {it->second}});
1394
0
                                } else {
1395
0
                                    brpc_stub_with_queries[item.second].queries.emplace_back(
1396
0
                                            it->second);
1397
0
                                }
1398
0
                            }
1399
0
                        }
1400
0
                        ++it;
1401
0
                    }
1402
0
                }
1403
512
                return Status::OK();
1404
512
            });
1405
4
        }
1406
4
        {
1407
            // We use a very conservative cancel strategy.
1408
            // 0. If there are no running frontends, do not cancel any queries.
1409
            // 1. If query's process uuid is zero, do not cancel
1410
            // 2. If same process uuid, do not cancel
1411
            // 3. If fe has zero process uuid, do not cancel
1412
4
            if (running_fes.empty() && _query_ctx_map.num_items() != 0) {
1413
0
                LOG_EVERY_N(WARNING, 10)
1414
0
                        << "Could not find any running frontends, maybe we are upgrading or "
1415
0
                           "starting? "
1416
0
                        << "We will not cancel any outdated queries in this situation.";
1417
4
            } else {
1418
4
                _query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId,
1419
4
                                                              std::shared_ptr<QueryContext>>& map)
1420
512
                                             -> Status {
1421
512
                    for (const auto& q : map) {
1422
0
                        auto q_ctx = q.second;
1423
0
                        const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
1424
1425
0
                        if (fe_process_uuid == 0) {
1426
                            // zero means this query is from an older version fe or
1427
                            // this fe is starting
1428
0
                            continue;
1429
0
                        }
1430
1431
                        // If the query is not running on any frontend, cancel it.
1432
0
                        if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
1433
0
                            itr != running_queries_on_all_fes.end()) {
1434
                            // Query not found on this frontend, and the query arrived before the last check.
1435
0
                            if (itr->second.find(q_ctx->query_id()) == itr->second.end() &&
1436
                                // tv_nsec holds only the sub-second remainder of the timestamp in tv_sec,
1437
                                // so comparing tv_sec is enough; we do not need to check tv_nsec.
1438
0
                                q_ctx->get_query_arrival_timestamp().tv_sec <
1439
0
                                        check_invalid_query_last_timestamp.tv_sec &&
1440
0
                                q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
1441
0
                                if (q_ctx->enable_pipeline_x_exec()) {
1442
0
                                    queries_pipeline_task_leak.push_back(q_ctx->query_id());
1443
0
                                    LOG_INFO(
1444
0
                                            "Query {}, type {} is not found on any frontends, "
1445
0
                                            "maybe it is leaked.",
1446
0
                                            print_id(q_ctx->query_id()),
1447
0
                                            toString(q_ctx->get_query_source()));
1448
0
                                    continue;
1449
0
                                }
1450
0
                            }
1451
0
                        }
1452
1453
0
                        auto query_context = q.second;
1454
1455
0
                        auto itr = running_fes.find(query_context->coord_addr);
1456
0
                        if (itr != running_fes.end()) {
1457
0
                            if (q.second->get_fe_process_uuid() == itr->second.info.process_uuid ||
1458
0
                                itr->second.info.process_uuid == 0) {
1459
0
                                continue;
1460
0
                            } else {
1461
0
                                LOG_WARNING(
1462
0
                                        "Coordinator of query {} restarted, going to cancel it.",
1463
0
                                        print_id(q.second->query_id()));
1464
0
                            }
1465
0
                        } else {
1466
                            // In some rare cases, the rpc port of the follower is not updated in time,
1467
                            // so the port of this follower will be zero, but actually it is still running,
1468
                            // and the BE has already received the query from the follower.
1469
                            // So we need to check whether the host is in running_fes.
1470
0
                            bool fe_host_is_standing = std::any_of(
1471
0
                                    running_fes.begin(), running_fes.end(),
1472
0
                                    [query_context](const auto& fe) {
1473
0
                                        return fe.first.hostname ==
1474
0
                                                       query_context->coord_addr.hostname &&
1475
0
                                               fe.first.port == 0;
1476
0
                                    });
1477
1478
0
                            if (fe_host_is_standing) {
1479
0
                                LOG_WARNING(
1480
0
                                        "Coordinator {}:{} is not found, but its host is still "
1481
0
                                        "running with an unstable rpc port, not going to cancel "
1482
0
                                        "it.",
1483
0
                                        query_context->coord_addr.hostname,
1484
0
                                        query_context->coord_addr.port,
1485
0
                                        print_id(query_context->query_id()));
1486
0
                                continue;
1487
0
                            } else {
1488
0
                                LOG_WARNING(
1489
0
                                        "Could not find target coordinator {}:{} of query {}, "
1490
0
                                        "going to "
1491
0
                                        "cancel it.",
1492
0
                                        query_context->coord_addr.hostname,
1493
0
                                        query_context->coord_addr.port,
1494
0
                                        print_id(query_context->query_id()));
1495
0
                            }
1496
0
                        }
1497
1498
                        // The coordinator of this query is already dead.
1499
0
                        queries_to_cancel.push_back(q.first);
1500
0
                    }
1501
512
                    return Status::OK();
1502
512
                });
1503
4
            }
1504
4
        }
1505
1506
        // TODO(zhiqiang): It seems that timeout_canceled_fragment_count is
1507
        // designed to count canceled fragments of non-pipeline queries.
1508
4
        timeout_canceled_fragment_count->increment(queries_timeout.size());
1509
4
        for (auto& id : queries_timeout) {
1510
0
            cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT, "Query timeout");
1511
0
            LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout instance "
1512
0
                      << print_id(id);
1513
0
        }
1514
1515
4
        for (const auto& qid : queries_pipeline_task_leak) {
1516
            // Cancel the query, and maybe try to report debug info to fe so that we can
1517
            // collect debug info via sql or http api instead of searching the logs.
1518
0
            cancel_query(qid, PPlanFragmentCancelReason::INTERNAL_ERROR,
1519
0
                         std::string("Pipeline task leak."));
1520
0
        }
1521
1522
4
        if (!queries_to_cancel.empty()) {
1523
0
            LOG(INFO) << "There are " << queries_to_cancel.size()
1524
0
                      << " queries need to be cancelled, coordinator dead or restarted.";
1525
0
        }
1526
1527
4
        for (const auto& qid : queries_to_cancel) {
1528
0
            cancel_query(qid, PPlanFragmentCancelReason::INTERNAL_ERROR,
1529
0
                         std::string("Coordinator dead."));
1530
0
        }
1531
1532
4
        if (config::enable_brpc_connection_check) {
1533
0
            for (auto it : brpc_stub_with_queries) {
1534
0
                if (!it.first) {
1535
0
                    continue;
1536
0
                }
1537
0
                _check_brpc_available(it.first, it.second);
1538
0
            }
1539
0
        }
1540
4
    } while (!_stop_background_threads_latch.wait_for(
1541
4
            std::chrono::seconds(config::fragment_mgr_cancel_worker_interval_seconds)));
1542
4
    LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
1543
4
}
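
cancel_worker is a periodic sweep loop whose sleep doubles as the shutdown
signal: wait_for returns early (and true) once the stop latch is counted down.
A sketch with a condition-variable-based latch standing in for the BE's own
latch type:

#include <chrono>
#include <condition_variable>
#include <mutex>

class DemoStopLatch {
public:
    // Returns true once stop() has been called; false on timeout.
    bool wait_for(std::chrono::seconds d) {
        std::unique_lock<std::mutex> l(_m);
        return _cv.wait_for(l, d, [&] { return _stopped; });
    }
    void stop() {
        {
            std::lock_guard<std::mutex> l(_m);
            _stopped = true;
        }
        _cv.notify_all();
    }

private:
    std::mutex _m;
    std::condition_variable _cv;
    bool _stopped = false;
};

void run_cancel_worker(DemoStopLatch& latch) {
    do {
        // One sweep: collect timed-out queries and cancel them.
    } while (!latch.wait_for(std::chrono::seconds(1)));
    // The loop exits promptly when stop() is called, without waiting out the
    // full interval.
}
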
1544
1545
0
void FragmentMgr::debug(std::stringstream& ss) {
1546
    // Keep things simple
1547
0
    ss << "FragmentMgr have " << _fragment_instance_map.num_items() << " jobs.\n";
1548
0
    ss << "job_id\t\tstart_time\t\texecute_time(s)\n";
1549
0
    VecDateTimeValue now = VecDateTimeValue::local_time();
1550
0
    _fragment_instance_map.apply(
1551
0
            [&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<PlanFragmentExecutor>>& map)
1552
0
                    -> Status {
1553
0
                for (auto& it : map) {
1554
0
                    ss << it.first << "\t" << it.second->start_time().debug_string() << "\t"
1555
0
                       << now.second_diff(it.second->start_time()) << "\n";
1556
0
                }
1557
0
                return Status::OK();
1558
0
            });
1559
0
}
1560
1561
void FragmentMgr::_check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
1562
0
                                        const BrpcItem& brpc_item) {
1563
0
    const std::string message = "hello doris!";
1564
0
    std::string error_message;
1565
0
    int32_t failed_count = 0;
1566
0
    const int64_t check_timeout_ms =
1567
0
            std::max<int64_t>(100, config::brpc_connection_check_timeout_ms);
1568
1569
0
    while (true) {
1570
0
        PHandShakeRequest request;
1571
0
        request.set_hello(message);
1572
0
        PHandShakeResponse response;
1573
0
        brpc::Controller cntl;
1574
0
        cntl.set_timeout_ms(check_timeout_ms);
1575
0
        cntl.set_max_retry(10);
1576
0
        brpc_stub->hand_shake(&cntl, &request, &response, nullptr);
1577
1578
0
        if (cntl.Failed()) {
1579
0
            error_message = cntl.ErrorText();
1580
0
            LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
1581
0
                         << brpc_item.network_address.port << " check failed: " << error_message;
1582
0
        } else if (response.has_status() && response.status().status_code() == 0) {
1583
0
            break;
1584
0
        } else {
1585
0
            error_message = response.DebugString();
1586
0
            LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
1587
0
                         << brpc_item.network_address.port << " check failed: " << error_message;
1588
0
        }
1589
0
        failed_count++;
1590
0
        if (failed_count == 2) {
1591
0
            for (const auto& query_wptr : brpc_item.queries) {
1592
0
                auto query = query_wptr.lock();
1593
0
                if (query && !query->is_cancelled()) {
1594
0
                    cancel_query(query->query_id(), PPlanFragmentCancelReason::INTERNAL_ERROR,
1595
0
                                 fmt::format("brpc(dest: {}:{}) check failed: {}",
1596
0
                                             brpc_item.network_address.hostname,
1597
0
                                             brpc_item.network_address.port, error_message));
1598
0
                }
1599
0
            }
1600
1601
0
            LOG(WARNING) << "remove brpc stub from cache: " << brpc_item.network_address.hostname
1602
0
                         << ":" << brpc_item.network_address.port << ", error: " << error_message;
1603
0
            ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
1604
0
                    brpc_item.network_address.hostname, brpc_item.network_address.port);
1605
0
            break;
1606
0
        }
1607
0
    }
1608
0
}
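
_check_brpc_available applies a bounded retry-then-evict policy: keep probing
the endpoint, and after the second consecutive failure cancel dependent
queries and drop the cached stub so the next caller reconnects. A generic
sketch of that policy; the probe and both callbacks are stand-ins.

#include <functional>
#include <string>

struct DemoProbeResult {
    bool ok = false;
    std::string error;
};

void check_endpoint(const std::function<DemoProbeResult()>& probe,
                    const std::function<void(const std::string&)>& cancel_queries,
                    const std::function<void()>& evict_stub) {
    int failed = 0;
    while (true) {
        DemoProbeResult r = probe();
        if (r.ok) {
            break; // endpoint healthy, nothing to do
        }
        if (++failed == 2) {
            // Two strikes: fail the dependent work with the probe error and
            // evict the cached stub so a fresh connection is built next time.
            cancel_queries(r.error);
            evict_stub();
            break;
        }
    }
}
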
1609
1610
/*
1611
 * 1. resolve opaqued_query_plan to thrift structure
1612
 * 2. build TExecPlanFragmentParams
1613
 */
1614
Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
1615
                                                const TUniqueId& fragment_instance_id,
1616
0
                                                std::vector<TScanColumnDesc>* selected_columns) {
1617
0
    const std::string& opaqued_query_plan = params.opaqued_query_plan;
1618
0
    std::string query_plan_info;
1619
    // base64 decode query plan
1620
0
    if (!base64_decode(opaqued_query_plan, &query_plan_info)) {
1621
0
        LOG(WARNING) << "open context error: base64_decode decode opaqued_query_plan failure";
1622
0
        std::stringstream msg;
1623
0
        msg << "query_plan_info: " << query_plan_info
1624
0
            << " validate error, should not be modified after returned Doris FE processed";
1625
0
        return Status::InvalidArgument(msg.str());
1626
0
    }
1627
0
    TQueryPlanInfo t_query_plan_info;
1628
0
    const uint8_t* buf = (const uint8_t*)query_plan_info.data();
1629
0
    uint32_t len = query_plan_info.size();
1630
    // deserialize TQueryPlanInfo
1631
0
    auto st = deserialize_thrift_msg(buf, &len, false, &t_query_plan_info);
1632
0
    if (!st.ok()) {
1633
0
        LOG(WARNING) << "open context error: deserialize TQueryPlanInfo failure";
1634
0
        std::stringstream msg;
1635
0
        msg << "query_plan_info: " << query_plan_info
1636
0
            << " deserialize error, should not be modified after returned Doris FE processed";
1637
0
        return Status::InvalidArgument(msg.str());
1638
0
    }
1639
1640
    // set up desc tbl
1641
0
    DescriptorTbl* desc_tbl = nullptr;
1642
0
    ObjectPool obj_pool;
1643
0
    st = DescriptorTbl::create(&obj_pool, t_query_plan_info.desc_tbl, &desc_tbl);
1644
0
    if (!st.ok()) {
1645
0
        LOG(WARNING) << "open context error: extract DescriptorTbl failure";
1646
0
        std::stringstream msg;
1647
0
        msg << "query_plan_info: " << query_plan_info
1648
0
            << " create DescriptorTbl error, should not be modified after returned Doris FE "
1649
0
               "processed";
1650
0
        return Status::InvalidArgument(msg.str());
1651
0
    }
1652
0
    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
1653
0
    if (tuple_desc == nullptr) {
1654
0
        LOG(WARNING) << "open context error: extract TupleDescriptor failure";
1655
0
        std::stringstream msg;
1656
0
        msg << "query_plan_info: " << query_plan_info
1657
0
            << " get  TupleDescriptor error, should not be modified after returned Doris FE "
1658
0
               "processed";
1659
0
        return Status::InvalidArgument(msg.str());
1660
0
    }
1661
    // process selected columns from slots
1662
0
    for (const SlotDescriptor* slot : tuple_desc->slots()) {
1663
0
        TScanColumnDesc col;
1664
0
        col.__set_name(slot->col_name());
1665
0
        col.__set_type(to_thrift(slot->type().type));
1666
0
        selected_columns->emplace_back(std::move(col));
1667
0
    }
1668
1669
0
    VLOG_QUERY << "BackendService execute open()  TQueryPlanInfo: "
1670
0
               << apache::thrift::ThriftDebugString(t_query_plan_info);
1671
    // assign the param used to execute PlanFragment
1672
0
    TExecPlanFragmentParams exec_fragment_params;
1673
0
    exec_fragment_params.protocol_version = (PaloInternalServiceVersion::type)0;
1674
0
    exec_fragment_params.__set_is_simplified_param(false);
1675
0
    exec_fragment_params.__set_fragment(t_query_plan_info.plan_fragment);
1676
0
    exec_fragment_params.__set_desc_tbl(t_query_plan_info.desc_tbl);
1677
1678
    // assign the params used for executing the PlanFragment itself
1679
0
    TPlanFragmentExecParams fragment_exec_params;
1680
0
    fragment_exec_params.query_id = t_query_plan_info.query_id;
1681
0
    fragment_exec_params.fragment_instance_id = fragment_instance_id;
1682
0
    std::map<::doris::TPlanNodeId, std::vector<TScanRangeParams>> per_node_scan_ranges;
1683
0
    std::vector<TScanRangeParams> scan_ranges;
1684
0
    std::vector<int64_t> tablet_ids = params.tablet_ids;
1685
0
    TNetworkAddress address;
1686
0
    address.hostname = BackendOptions::get_localhost();
1687
0
    address.port = doris::config::be_port;
1688
0
    std::map<int64_t, TTabletVersionInfo> tablet_info = t_query_plan_info.tablet_info;
1689
0
    for (auto tablet_id : params.tablet_ids) {
1690
0
        TPaloScanRange scan_range;
1691
0
        scan_range.db_name = params.database;
1692
0
        scan_range.table_name = params.table;
1693
0
        auto iter = tablet_info.find(tablet_id);
1694
0
        if (iter != tablet_info.end()) {
1695
0
            TTabletVersionInfo info = iter->second;
1696
0
            scan_range.tablet_id = tablet_id;
1697
0
            scan_range.version = std::to_string(info.version);
1698
            // Useless, but it is a required field in TPaloScanRange
1699
0
            scan_range.version_hash = "0";
1700
0
            scan_range.schema_hash = std::to_string(info.schema_hash);
1701
0
            scan_range.hosts.push_back(address);
1702
0
        } else {
1703
0
            std::stringstream msg;
1704
0
            msg << "tablet_id: " << tablet_id << " not found";
1705
0
            LOG(WARNING) << "tablet_id [ " << tablet_id << " ] not found";
1706
0
            return Status::NotFound(msg.str());
1707
0
        }
1708
0
        TScanRange doris_scan_range;
1709
0
        doris_scan_range.__set_palo_scan_range(scan_range);
1710
0
        TScanRangeParams scan_range_params;
1711
0
        scan_range_params.scan_range = doris_scan_range;
1712
0
        scan_ranges.push_back(scan_range_params);
1713
0
    }
1714
0
    per_node_scan_ranges.insert(std::make_pair((::doris::TPlanNodeId)0, scan_ranges));
1715
0
    fragment_exec_params.per_node_scan_ranges = per_node_scan_ranges;
1716
0
    exec_fragment_params.__set_params(fragment_exec_params);
1717
0
    TQueryOptions query_options;
1718
0
    query_options.batch_size = params.batch_size;
1719
0
    query_options.execution_timeout = params.execution_timeout;
1720
0
    query_options.mem_limit = params.mem_limit;
1721
0
    query_options.query_type = TQueryType::EXTERNAL;
1722
0
    exec_fragment_params.__set_query_options(query_options);
1723
0
    VLOG_ROW << "external exec_plan_fragment params is "
1724
0
             << apache::thrift::ThriftDebugString(exec_fragment_params).c_str();
1725
0
    return exec_plan_fragment(exec_fragment_params, QuerySource::EXTERNAL_CONNECTOR);
1726
0
}
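
exec_external_plan_fragment is a chain of decode/validate steps where every
failure returns InvalidArgument mentioning the offending payload. A compressed
sketch of that early-return style; demo_base64_decode and
demo_deserialize_plan are trivial stand-ins for base64_decode and
deserialize_thrift_msg.

#include <optional>
#include <string>

std::optional<std::string> demo_base64_decode(const std::string& in) {
    return in; // stand-in: the real code uses doris base64_decode
}

std::optional<int> demo_deserialize_plan(const std::string& bytes) {
    (void)bytes;
    return 1; // stand-in: the real code deserializes TQueryPlanInfo
}

// Returns an error message on failure, empty string on success.
std::string open_external_scan(const std::string& opaqued_query_plan) {
    auto bytes = demo_base64_decode(opaqued_query_plan);
    if (!bytes) {
        return "base64 decode failed; the plan must not be modified after it "
               "is returned by the frontend";
    }
    auto plan = demo_deserialize_plan(*bytes);
    if (!plan) {
        return "deserialize failed; the plan must not be modified after it "
               "is returned by the frontend";
    }
    return ""; // continue building TExecPlanFragmentParams from the plan
}
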
1727
1728
Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
1729
0
                                 butil::IOBufAsZeroCopyInputStream* attach_data) {
1730
0
    bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
1731
1732
0
    UniqueId fragment_instance_id = request->fragment_instance_id();
1733
0
    TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
1734
1735
0
    std::shared_ptr<PlanFragmentExecutor> fragment_executor;
1736
0
    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
1737
0
    QueryThreadContext query_thread_context;
1738
1739
0
    RuntimeFilterMgr* runtime_filter_mgr = nullptr;
1740
0
    if (is_pipeline) {
1741
0
        pip_context = _pipeline_map.find(tfragment_instance_id);
1742
0
        if (pip_context == nullptr) {
1743
0
            VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
1744
0
            return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
1745
0
        }
1746
1747
0
        DCHECK(pip_context != nullptr);
1748
0
        runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
1749
0
        query_thread_context = {pip_context->get_query_ctx()->query_id(),
1750
0
                                pip_context->get_query_ctx()->query_mem_tracker};
1751
0
    } else {
1752
0
        fragment_executor = _fragment_instance_map.find(tfragment_instance_id);
1753
0
        if (fragment_executor == nullptr) {
1754
0
            VLOG_CRITICAL << "unknown.... fragment instance id:" << print_id(tfragment_instance_id);
1755
0
            return Status::InvalidArgument("fragment-id: {}", print_id(tfragment_instance_id));
1756
0
        }
1757
1758
0
        DCHECK(fragment_executor != nullptr);
1759
0
        runtime_filter_mgr =
1760
0
                fragment_executor->runtime_state()->get_query_ctx()->runtime_filter_mgr();
1761
0
        query_thread_context = {fragment_executor->get_query_ctx()->query_id(),
1762
0
                                fragment_executor->get_query_ctx()->query_mem_tracker};
1763
0
    }
1764
1765
0
    SCOPED_ATTACH_TASK(query_thread_context);
1766
0
    return runtime_filter_mgr->update_filter(request, attach_data);
1767
0
}
1768
1769
Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
1770
0
                                   butil::IOBufAsZeroCopyInputStream* attach_data) {
1771
0
    bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
1772
0
    int64_t start_apply = MonotonicMillis();
1773
1774
0
    std::shared_ptr<PlanFragmentExecutor> fragment_executor;
1775
0
    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
1776
0
    QueryThreadContext query_thread_context;
1777
1778
0
    RuntimeFilterMgr* runtime_filter_mgr = nullptr;
1779
0
    ObjectPool* pool = nullptr;
1780
1781
0
    const auto& fragment_instance_ids = request->fragment_instance_ids();
1782
0
    {
1783
0
        for (UniqueId fragment_instance_id : fragment_instance_ids) {
1784
0
            TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
1785
1786
0
            if (is_pipeline) {
1787
0
                pip_context = _pipeline_map.find(tfragment_instance_id);
1788
0
                if (pip_context == nullptr) {
1789
0
                    continue;
1790
0
                }
1791
1792
0
                DCHECK(pip_context != nullptr);
1793
0
                runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
1794
0
                pool = &pip_context->get_query_ctx()->obj_pool;
1795
0
                query_thread_context = {pip_context->get_query_ctx()->query_id(),
1796
0
                                        pip_context->get_query_ctx()->query_mem_tracker,
1797
0
                                        pip_context->get_query_ctx()->workload_group()};
1798
0
            } else {
1799
0
                fragment_executor = _fragment_instance_map.find(tfragment_instance_id);
1800
0
                if (fragment_executor == nullptr) {
1801
0
                    continue;
1802
0
                }
1803
1804
0
                DCHECK(fragment_executor != nullptr);
1805
0
                runtime_filter_mgr = fragment_executor->get_query_ctx()->runtime_filter_mgr();
1806
0
                pool = &fragment_executor->get_query_ctx()->obj_pool;
1807
0
                query_thread_context = {fragment_executor->get_query_ctx()->query_id(),
1808
0
                                        fragment_executor->get_query_ctx()->query_mem_tracker};
1809
0
            }
1810
0
            break;
1811
0
        }
1812
0
    }
1813
1814
0
    if (runtime_filter_mgr == nullptr) {
1815
        // all instances finished
1816
0
        return Status::OK();
1817
0
    }
1818
1819
0
    SCOPED_ATTACH_TASK(query_thread_context);
1820
    // 1. get the target filters
1821
0
    std::vector<IRuntimeFilter*> filters;
1822
0
    RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), filters));
1823
1824
    // 2. create the filter wrapper to replace or ignore the target filters
1825
0
    if (!filters.empty()) {
1826
0
        UpdateRuntimeFilterParamsV2 params {request, attach_data, pool, filters[0]->column_type()};
1827
0
        RuntimePredicateWrapper* filter_wrapper = nullptr;
1828
0
        RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, &filter_wrapper));
1829
1830
0
        std::ranges::for_each(filters, [&](auto& filter) {
1831
0
            filter->update_filter(filter_wrapper, request->merge_time(), start_apply);
1832
0
        });
1833
0
    }
1834
1835
0
    return Status::OK();
1836
0
}
1837
1838
0
Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
1839
0
    UniqueId queryid = request->query_id();
1840
1841
0
    if (config::enable_debug_points &&
1842
0
        DebugPoints::instance()->is_enable("FragmentMgr::send_filter_size.return_eof")) {
1843
0
        return Status::EndOfFile("inject FragmentMgr::send_filter_size.return_eof");
1844
0
    }
1845
1846
0
    std::shared_ptr<QueryContext> query_ctx;
1847
0
    {
1848
0
        TUniqueId query_id;
1849
0
        query_id.__set_hi(queryid.hi);
1850
0
        query_id.__set_lo(queryid.lo);
1851
0
        query_ctx = _query_ctx_map.find(query_id);
1852
0
        if (query_ctx == nullptr) {
1853
0
            return Status::EndOfFile("Query context (query-id: {}) not found, maybe finished",
1854
0
                                     queryid.to_string());
1855
0
        }
1856
0
    }
1857
1858
0
    std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
1859
0
    RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
1860
0
    auto merge_status = filter_controller->send_filter_size(request);
1861
0
    return merge_status;
1862
0
}
1863
1864
0
Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
1865
0
    UniqueId queryid = request->query_id();
1866
0
    std::shared_ptr<QueryContext> query_ctx;
1867
0
    {
1868
0
        TUniqueId query_id;
1869
0
        query_id.__set_hi(queryid.hi);
1870
0
        query_id.__set_lo(queryid.lo);
1871
0
        query_ctx = _query_ctx_map.find(query_id);
1872
0
        if (query_ctx == nullptr) {
1873
0
            return Status::InvalidArgument("query-id: {}", queryid.to_string());
1874
0
        }
1875
0
    }
1876
0
    return query_ctx->runtime_filter_mgr()->sync_filter_size(request);
1877
0
}
1878
1879
Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
1880
0
                                 butil::IOBufAsZeroCopyInputStream* attach_data) {
1881
0
    UniqueId queryid = request->query_id();
1882
0
    bool opt_remote_rf = request->has_opt_remote_rf() && request->opt_remote_rf();
1883
0
    std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
1884
0
    RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
1885
1886
0
    std::shared_ptr<QueryContext> query_ctx;
1887
0
    {
1888
0
        TUniqueId query_id;
1889
0
        query_id.__set_hi(queryid.hi);
1890
0
        query_id.__set_lo(queryid.lo);
1891
0
        query_ctx = _query_ctx_map.find(query_id);
1892
0
        if (query_ctx == nullptr) {
1893
0
            return Status::InvalidArgument("query-id: {}", queryid.to_string());
1894
0
        }
1895
0
    }
1896
0
    SCOPED_ATTACH_TASK(query_ctx.get());
1897
0
    auto merge_status = filter_controller->merge(request, attach_data, opt_remote_rf);
1898
0
    return merge_status;
1899
0
}
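
send_filter_size, sync_filter_size, and merge_filter each repeat the same
UniqueId-to-TUniqueId conversion before the _query_ctx_map lookup. A sketch of
factoring that into one helper; to_tunique_id is a hypothetical name, and the
includes assume the types used elsewhere in this file (the location of
doris::UniqueId is an assumption).

#include <gen_cpp/Types_types.h> // doris::TUniqueId

#include "util/uid_util.h" // doris::UniqueId (assumed location)

doris::TUniqueId to_tunique_id(const doris::UniqueId& id) {
    doris::TUniqueId tid;
    tid.__set_hi(id.hi);
    tid.__set_lo(id.lo);
    return tid;
}

// Usage inside the three filter RPC handlers would then collapse to:
//   auto query_ctx = _query_ctx_map.find(to_tunique_id(request->query_id()));
//   if (query_ctx == nullptr) {
//       return Status::InvalidArgument("query-id: {}", queryid.to_string());
//   }
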
1900
1901
void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
1902
0
                                                             QueryContext* query_ctx) {
1903
0
    if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
1904
0
        !params.query_options.enable_share_hash_table_for_broadcast_join) {
1905
0
        return;
1906
0
    }
1907
1908
0
    if (!params.__isset.fragment || !params.fragment.__isset.plan ||
1909
0
        params.fragment.plan.nodes.empty()) {
1910
0
        return;
1911
0
    }
1912
0
    for (auto& node : params.fragment.plan.nodes) {
1913
0
        if (node.node_type != TPlanNodeType::HASH_JOIN_NODE ||
1914
0
            !node.hash_join_node.__isset.is_broadcast_join ||
1915
0
            !node.hash_join_node.is_broadcast_join) {
1916
0
            continue;
1917
0
        }
1918
1919
0
        if (params.build_hash_table_for_broadcast_join) {
1920
0
            query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
1921
0
                    params.params.fragment_instance_id, node.node_id);
1922
0
        }
1923
0
    }
1924
0
}
1925
1926
void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(
1927
        const TPipelineFragmentParams& params, const TPipelineInstanceParams& local_params,
1928
0
        QueryContext* query_ctx) {
1929
0
    if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
1930
0
        !params.query_options.enable_share_hash_table_for_broadcast_join) {
1931
0
        return;
1932
0
    }
1933
1934
0
    if (!params.__isset.fragment || !params.fragment.__isset.plan ||
1935
0
        params.fragment.plan.nodes.empty()) {
1936
0
        return;
1937
0
    }
1938
0
    for (auto& node : params.fragment.plan.nodes) {
1939
0
        if (node.node_type != TPlanNodeType::HASH_JOIN_NODE ||
1940
0
            !node.hash_join_node.__isset.is_broadcast_join ||
1941
0
            !node.hash_join_node.is_broadcast_join) {
1942
0
            continue;
1943
0
        }
1944
1945
0
        if (local_params.build_hash_table_for_broadcast_join) {
1946
0
            query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
1947
0
                    local_params.fragment_instance_id, node.node_id);
1948
0
        }
1949
0
    }
1950
0
}
1951
1952
void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params,
1953
0
                                                             QueryContext* query_ctx) {
1954
0
    if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
1955
0
        !params.query_options.enable_share_hash_table_for_broadcast_join) {
1956
0
        return;
1957
0
    }
1958
1959
0
    if (!params.__isset.fragment || !params.fragment.__isset.plan ||
1960
0
        params.fragment.plan.nodes.empty()) {
1961
0
        return;
1962
0
    }
1963
0
    for (auto& node : params.fragment.plan.nodes) {
1964
0
        if (node.node_type != TPlanNodeType::HASH_JOIN_NODE ||
1965
0
            !node.hash_join_node.__isset.is_broadcast_join ||
1966
0
            !node.hash_join_node.is_broadcast_join) {
1967
0
            continue;
1968
0
        }
1969
1970
0
        for (auto& local_param : params.local_params) {
1971
0
            if (local_param.build_hash_table_for_broadcast_join) {
1972
0
                query_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
1973
0
                        local_param.fragment_instance_id, node.node_id);
1974
0
            }
1975
0
        }
1976
0
    }
1977
0
}
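
All three _setup_shared_hashtable_for_broadcast_join overloads share the same
scan: walk the plan nodes, keep only hash joins flagged (via __isset) as
broadcast joins, and register the builder instance for each. A stand-alone
sketch of that filter with hand-written node types as stand-ins:

#include <vector>

enum class DemoNodeType { HASH_JOIN, SCAN, OTHER };

struct DemoPlanNode {
    DemoNodeType type = DemoNodeType::OTHER;
    bool isset_is_broadcast_join = false; // mirrors the thrift __isset guard
    bool is_broadcast_join = false;
    int node_id = -1;
};

// Collects the node ids of broadcast joins that need a shared hash table.
std::vector<int> broadcast_join_nodes(const std::vector<DemoPlanNode>& nodes) {
    std::vector<int> out;
    for (const auto& node : nodes) {
        if (node.type != DemoNodeType::HASH_JOIN || !node.isset_is_broadcast_join ||
            !node.is_broadcast_join) {
            continue; // not a broadcast join: no shared hash table needed
        }
        out.push_back(node.node_id);
    }
    return out;
}
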
1978
1979
0
void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
1980
0
    _query_ctx_map.apply(
1981
0
            [&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>>& map) -> Status {
1982
0
                for (const auto& q : map) {
1983
0
                    WorkloadQueryInfo workload_query_info;
1984
0
                    workload_query_info.query_id = print_id(q.first);
1985
0
                    workload_query_info.tquery_id = q.first;
1986
0
                    workload_query_info.wg_id = q.second->workload_group() == nullptr
1987
0
                                                        ? -1
1988
0
                                                        : q.second->workload_group()->id();
1989
0
                    query_info_list->push_back(workload_query_info);
1990
0
                }
1991
0
                return Status::OK();
1992
0
            });
1993
0
}
1994
1995
} // namespace doris