Coverage Report

Created: 2024-11-21 21:13

/root/doris/be/src/runtime/fragment_mgr.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/fragment_mgr.h"
19
20
#include <brpc/controller.h>
21
#include <bvar/latency_recorder.h>
22
#include <exprs/runtime_filter.h>
23
#include <fmt/format.h>
24
#include <gen_cpp/DorisExternalService_types.h>
25
#include <gen_cpp/FrontendService.h>
26
#include <gen_cpp/FrontendService_types.h>
27
#include <gen_cpp/HeartbeatService_types.h>
28
#include <gen_cpp/Metrics_types.h>
29
#include <gen_cpp/PaloInternalService_types.h>
30
#include <gen_cpp/PlanNodes_types.h>
31
#include <gen_cpp/Planner_types.h>
32
#include <gen_cpp/QueryPlanExtra_types.h>
33
#include <gen_cpp/RuntimeProfile_types.h>
34
#include <gen_cpp/Types_types.h>
35
#include <gen_cpp/internal_service.pb.h>
36
#include <pthread.h>
37
#include <stddef.h>
38
#include <sys/time.h>
39
#include <thrift/TApplicationException.h>
40
#include <thrift/Thrift.h>
41
#include <thrift/protocol/TDebugProtocol.h>
42
#include <thrift/transport/TTransportException.h>
43
#include <time.h>
44
#include <unistd.h>
45
46
#include <algorithm>
47
#include <atomic>
48
49
#include "common/status.h"
50
// IWYU pragma: no_include <bits/chrono.h>
51
#include <chrono> // IWYU pragma: keep
52
#include <cstdint>
53
#include <map>
54
#include <memory>
55
#include <mutex>
56
#include <sstream>
57
#include <unordered_map>
58
#include <unordered_set>
59
#include <utility>
60
61
#include "cloud/config.h"
62
#include "common/config.h"
63
#include "common/logging.h"
64
#include "common/object_pool.h"
65
#include "common/utils.h"
66
#include "gutil/strings/substitute.h"
67
#include "io/fs/stream_load_pipe.h"
68
#include "pipeline/pipeline_fragment_context.h"
69
#include "runtime/client_cache.h"
70
#include "runtime/descriptors.h"
71
#include "runtime/exec_env.h"
72
#include "runtime/frontend_info.h"
73
#include "runtime/memory/mem_tracker_limiter.h"
74
#include "runtime/primitive_type.h"
75
#include "runtime/query_context.h"
76
#include "runtime/runtime_filter_mgr.h"
77
#include "runtime/runtime_query_statistics_mgr.h"
78
#include "runtime/runtime_state.h"
79
#include "runtime/stream_load/new_load_stream_mgr.h"
80
#include "runtime/stream_load/stream_load_context.h"
81
#include "runtime/stream_load/stream_load_executor.h"
82
#include "runtime/thread_context.h"
83
#include "runtime/types.h"
84
#include "runtime/workload_group/workload_group.h"
85
#include "runtime/workload_group/workload_group_manager.h"
86
#include "runtime/workload_management/workload_query_info.h"
87
#include "service/backend_options.h"
88
#include "util/brpc_client_cache.h"
89
#include "util/debug_points.h"
90
#include "util/debug_util.h"
91
#include "util/doris_metrics.h"
92
#include "util/hash_util.hpp"
93
#include "util/mem_info.h"
94
#include "util/network_util.h"
95
#include "util/pretty_printer.h"
96
#include "util/runtime_profile.h"
97
#include "util/thread.h"
98
#include "util/threadpool.h"
99
#include "util/thrift_util.h"
100
#include "util/uid_util.h"
101
#include "util/url_coding.h"
102
#include "vec/runtime/shared_hash_table_controller.h"
103
#include "vec/runtime/vdatetime_value.h"
104
105
namespace doris {
106
107
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT);
108
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
109
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT);
110
bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
111
112
bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
113
bvar::Status<uint64_t> g_fragment_last_active_time(
114
        "fragment_last_active_time", duration_cast<std::chrono::milliseconds>(
115
                                             std::chrono::system_clock::now().time_since_epoch())
116
                                             .count());
117
118
0
uint64_t get_fragment_executing_count() {
119
0
    return g_fragment_executing_count.get_value();
120
0
}
121
0
uint64_t get_fragment_last_active_time() {
122
0
    return g_fragment_last_active_time.get_value();
123
0
}
124
125
0
std::string to_load_error_http_path(const std::string& file_name) {
126
0
    if (file_name.empty()) {
127
0
        return "";
128
0
    }
129
0
    if (file_name.compare(0, 4, "http") == 0) {
130
0
        return file_name;
131
0
    }
132
0
    std::stringstream url;
133
0
    url << "http://" << get_host_port(BackendOptions::get_localhost(), config::webserver_port)
134
0
        << "/api/_load_error_log?"
135
0
        << "file=" << file_name;
136
0
    return url.str();
137
0
}
138
139
using apache::thrift::TException;
140
using apache::thrift::transport::TTransportException;
141
142
// Fetches the ids of the queries that frontend `fe_info` reports as still running.
//
// On success `query_set` is overwritten with the frontend's running queries and OK is
// returned. Every failure is logged and returned as InternalError; in that case
// `query_set` is left untouched. Failures are:
//   - no client could be obtained;
//   - a thrift exception occurred (after one reopen-and-retry on a transport error);
//   - the FE returned a missing or non-OK status;
//   - `running_queries` is not set in the result.
static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info,
                                            std::unordered_set<TUniqueId>& query_set) {
    TFetchRunningQueriesResult rpc_result;
    TFetchRunningQueriesRequest rpc_request;

    Status client_status;
    // Deliberately short RPC timeout. The reconnect below reuses it, so a retry cannot
    // block the caller longer than the first attempt.
    const int32 timeout_ms = 3 * 1000;
    FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(),
                                         fe_info.info.coordinator_address, timeout_ms,
                                         &client_status);
    // Abort this fe.
    if (!client_status.ok()) {
        LOG_WARNING("Failed to get client for {}, reason is {}",
                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
                    client_status.to_string());
        return Status::InternalError("Failed to get client for {}, reason is {}",
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
                                     client_status.to_string());
    }

    // do rpc
    try {
        try {
            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
        } catch (const apache::thrift::transport::TTransportException& e) {
            // Transport-level failure: reopen the connection once (with the same timeout as
            // the initial connection) and retry.
            LOG_WARNING("Transport exception reason: {}, reopening", e.what());
            client_status = rpc_client.reopen(timeout_ms);
            if (!client_status.ok()) {
                LOG_WARNING("Reopen failed, reason: {}", client_status.to_string_no_stack());
                return Status::InternalError("Reopen failed, reason: {}",
                                             client_status.to_string_no_stack());
            }

            rpc_client->fetchRunningQueries(rpc_result, rpc_request);
        }
    } catch (apache::thrift::TException& e) {
        // During upgrading cluster or meet any other network error.
        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
                    PrintThriftNetworkAddress(fe_info.info.coordinator_address), e.what());
        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
                                     e.what());
    }

    // Avoid logic error in frontend.
    if (rpc_result.__isset.status == false || rpc_result.status.status_code != TStatusCode::OK) {
        LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
                    PrintThriftNetworkAddress(fe_info.info.coordinator_address),
                    doris::to_string(rpc_result.status.status_code));
        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
                                     doris::to_string(rpc_result.status.status_code));
    }

    if (rpc_result.__isset.running_queries == false) {
        return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
                                     PrintThriftNetworkAddress(fe_info.info.coordinator_address),
                                     "running_queries is not set");
    }

    query_set = std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(),
                                              rpc_result.running_queries.end());
    return Status::OK();
}
206
207
0
static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries_from_fe() {
208
0
    const std::map<TNetworkAddress, FrontendInfo>& running_fes =
209
0
            ExecEnv::GetInstance()->get_running_frontends();
210
211
0
    std::map<int64_t, std::unordered_set<TUniqueId>> result;
212
0
    std::vector<FrontendInfo> qualified_fes;
213
214
0
    for (const auto& fe : running_fes) {
215
        // Only consider normal frontend.
216
0
        if (fe.first.port != 0 && fe.second.info.process_uuid != 0) {
217
0
            qualified_fes.push_back(fe.second);
218
0
        } else {
219
0
            return {};
220
0
        }
221
0
    }
222
223
0
    for (const auto& fe_addr : qualified_fes) {
224
0
        const int64_t process_uuid = fe_addr.info.process_uuid;
225
0
        std::unordered_set<TUniqueId> query_set;
226
0
        Status st = _do_fetch_running_queries_rpc(fe_addr, query_set);
227
0
        if (!st.ok()) {
228
            // Empty result, cancel worker will not do anything
229
0
            return {};
230
0
        }
231
232
        // frontend_info and process_uuid has been checked in rpc threads.
233
0
        result[process_uuid] = query_set;
234
0
    }
235
236
0
    return result;
237
0
}
238
239
// Sets up the FragmentMgr:
//   - registers its metric entity;
//   - starts the "cancel_timeout_plan_fragment" thread that runs cancel_worker();
//   - builds the async work thread pool, sized from config.
// Any startup failure aborts via CHECK.
FragmentMgr::FragmentMgr(ExecEnv* exec_env)
        : _exec_env(exec_env), _stop_background_threads_latch(1) {
    _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
    INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);

    // Background thread running cancel_worker(); it is joined in stop().
    auto s = Thread::create(
            "FragmentMgr", "cancel_timeout_plan_fragment", [this]() { this->cancel_worker(); },
            &_cancel_thread);
    CHECK(s.ok()) << s.to_string();

    s = ThreadPoolBuilder("FragmentMgrAsyncWorkThreadPool")
                .set_min_threads(config::fragment_mgr_asynic_work_pool_thread_num_min)
                .set_max_threads(config::fragment_mgr_asynic_work_pool_thread_num_max)
                .set_max_queue_size(config::fragment_mgr_asynic_work_pool_queue_size)
                .build(&_thread_pool);
    // Validate the pool before registering a hook that dereferences _thread_pool.
    CHECK(s.ok()) << s.to_string();

    REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
                         [this]() { return _thread_pool->get_queue_size(); });
}
259
260
4
// Members are released by their own destructors.
// NOTE(review): stop() is what joins the cancel thread and shuts down the thread pool;
// presumably it is called before destruction — confirm with the owner (ExecEnv).
FragmentMgr::~FragmentMgr() = default;
261
262
4
void FragmentMgr::stop() {
263
4
    DEREGISTER_HOOK_METRIC(fragment_instance_count);
264
4
    DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
265
4
    _stop_background_threads_latch.count_down();
266
4
    if (_cancel_thread) {
267
4
        _cancel_thread->join();
268
4
    }
269
270
    // Only me can delete
271
4
    {
272
4
        std::lock_guard<std::mutex> lock(_lock);
273
4
        _query_ctx_map.clear();
274
4
        _pipeline_map.clear();
275
4
    }
276
4
    _thread_pool->shutdown();
277
4
}
278
279
0
std::string FragmentMgr::to_http_path(const std::string& file_name) {
280
0
    std::stringstream url;
281
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
282
0
        << "/api/_download_load?"
283
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
284
0
    return url.str();
285
0
}
286
287
// Asynchronously reports `req` to the coordinator on _thread_pool.
//
// The task holds its own copies of `req` and `ctx`, runs under the query's memory
// tracker, and refreshes the context's next report time unless this was the final
// (done) report. Returns the thread pool's submit status.
Status FragmentMgr::trigger_pipeline_context_report(
        const ReportStatusRequest req, std::shared_ptr<pipeline::PipelineFragmentContext>&& ctx) {
    auto report_task = [this, req, ctx]() {
        SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker);
        coordinator_callback(req);
        if (req.done) {
            return;
        }
        // Not the final report: let the context schedule its next one.
        ctx->refresh_next_report_time();
    };
    return _thread_pool->submit_func(std::move(report_task));
}
297
298
// There can only be one of these callbacks in-flight at any moment, because
299
// it is only invoked from the executor's reporting thread.
300
// Also, the reported status will always reflect the most recent execution status,
301
// including the final status when execution finishes.
302
0
void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
303
0
    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
304
0
    if (req.coord_addr.hostname == "external") {
305
        // External query (flink/spark read tablets) not need to report to FE.
306
0
        return;
307
0
    }
308
0
    Status exec_status = req.status;
309
0
    Status coord_status;
310
0
    FrontendServiceConnection coord(_exec_env->frontend_client_cache(), req.coord_addr,
311
0
                                    &coord_status);
312
0
    if (!coord_status.ok()) {
313
0
        std::stringstream ss;
314
0
        UniqueId uid(req.query_id.hi, req.query_id.lo);
315
0
        static_cast<void>(req.cancel_fn(Status::InternalError(
316
0
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
317
0
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
318
0
        return;
319
0
    }
320
321
0
    TReportExecStatusParams params;
322
0
    params.protocol_version = FrontendServiceVersion::V1;
323
0
    params.__set_query_id(req.query_id);
324
0
    params.__set_backend_num(req.backend_num);
325
0
    params.__set_fragment_instance_id(req.fragment_instance_id);
326
0
    params.__set_fragment_id(req.fragment_id);
327
0
    params.__set_status(exec_status.to_thrift());
328
0
    params.__set_done(req.done);
329
0
    params.__set_query_type(req.runtime_state->query_type());
330
0
    params.__isset.profile = false;
331
332
0
    DCHECK(req.runtime_state != nullptr);
333
334
0
    if (req.runtime_state->query_type() == TQueryType::LOAD) {
335
0
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
336
0
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
337
0
    } else {
338
0
        DCHECK(!req.runtime_states.empty());
339
0
        if (!req.runtime_state->output_files().empty()) {
340
0
            params.__isset.delta_urls = true;
341
0
            for (auto& it : req.runtime_state->output_files()) {
342
0
                params.delta_urls.push_back(to_http_path(it));
343
0
            }
344
0
        }
345
0
        if (!params.delta_urls.empty()) {
346
0
            params.__isset.delta_urls = true;
347
0
        }
348
0
    }
349
350
    // load rows
351
0
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
352
0
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
353
0
    static std::string s_unselected_rows = "unselected.rows";
354
0
    int64_t num_rows_load_success = 0;
355
0
    int64_t num_rows_load_filtered = 0;
356
0
    int64_t num_rows_load_unselected = 0;
357
0
    if (req.runtime_state->num_rows_load_total() > 0 ||
358
0
        req.runtime_state->num_rows_load_filtered() > 0 ||
359
0
        req.runtime_state->num_finished_range() > 0) {
360
0
        params.__isset.load_counters = true;
361
362
0
        num_rows_load_success = req.runtime_state->num_rows_load_success();
363
0
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
364
0
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
365
0
        params.__isset.fragment_instance_reports = true;
366
0
        TFragmentInstanceReport t;
367
0
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
368
0
        t.__set_num_finished_range(req.runtime_state->num_finished_range());
369
0
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
370
0
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
371
0
        params.fragment_instance_reports.push_back(t);
372
0
    } else if (!req.runtime_states.empty()) {
373
0
        for (auto* rs : req.runtime_states) {
374
0
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
375
0
                req.runtime_state->num_finished_range() > 0) {
376
0
                params.__isset.load_counters = true;
377
0
                num_rows_load_success += rs->num_rows_load_success();
378
0
                num_rows_load_filtered += rs->num_rows_load_filtered();
379
0
                num_rows_load_unselected += rs->num_rows_load_unselected();
380
0
                params.__isset.fragment_instance_reports = true;
381
0
                TFragmentInstanceReport t;
382
0
                t.__set_fragment_instance_id(rs->fragment_instance_id());
383
0
                t.__set_num_finished_range(rs->num_finished_range());
384
0
                t.__set_loaded_rows(rs->num_rows_load_total());
385
0
                t.__set_loaded_bytes(rs->num_bytes_load_total());
386
0
                params.fragment_instance_reports.push_back(t);
387
0
            }
388
0
        }
389
0
    }
390
0
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
391
0
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
392
0
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
393
394
0
    if (!req.runtime_state->get_error_log_file_path().empty()) {
395
0
        params.__set_tracking_url(
396
0
                to_load_error_http_path(req.runtime_state->get_error_log_file_path()));
397
0
    } else if (!req.runtime_states.empty()) {
398
0
        for (auto* rs : req.runtime_states) {
399
0
            if (!rs->get_error_log_file_path().empty()) {
400
0
                params.__set_tracking_url(to_load_error_http_path(rs->get_error_log_file_path()));
401
0
            }
402
0
            if (rs->wal_id() > 0) {
403
0
                params.__set_txn_id(rs->wal_id());
404
0
                params.__set_label(rs->import_label());
405
0
            }
406
0
        }
407
0
    }
408
0
    if (!req.runtime_state->export_output_files().empty()) {
409
0
        params.__isset.export_files = true;
410
0
        params.export_files = req.runtime_state->export_output_files();
411
0
    } else if (!req.runtime_states.empty()) {
412
0
        for (auto* rs : req.runtime_states) {
413
0
            if (!rs->export_output_files().empty()) {
414
0
                params.__isset.export_files = true;
415
0
                params.export_files.insert(params.export_files.end(),
416
0
                                           rs->export_output_files().begin(),
417
0
                                           rs->export_output_files().end());
418
0
            }
419
0
        }
420
0
    }
421
0
    if (!req.runtime_state->tablet_commit_infos().empty()) {
422
0
        params.__isset.commitInfos = true;
423
0
        params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size());
424
0
        for (auto& info : req.runtime_state->tablet_commit_infos()) {
425
0
            params.commitInfos.push_back(info);
426
0
        }
427
0
    } else if (!req.runtime_states.empty()) {
428
0
        for (auto* rs : req.runtime_states) {
429
0
            if (!rs->tablet_commit_infos().empty()) {
430
0
                params.__isset.commitInfos = true;
431
0
                params.commitInfos.insert(params.commitInfos.end(),
432
0
                                          rs->tablet_commit_infos().begin(),
433
0
                                          rs->tablet_commit_infos().end());
434
0
            }
435
0
        }
436
0
    }
437
0
    if (!req.runtime_state->error_tablet_infos().empty()) {
438
0
        params.__isset.errorTabletInfos = true;
439
0
        params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size());
440
0
        for (auto& info : req.runtime_state->error_tablet_infos()) {
441
0
            params.errorTabletInfos.push_back(info);
442
0
        }
443
0
    } else if (!req.runtime_states.empty()) {
444
0
        for (auto* rs : req.runtime_states) {
445
0
            if (!rs->error_tablet_infos().empty()) {
446
0
                params.__isset.errorTabletInfos = true;
447
0
                params.errorTabletInfos.insert(params.errorTabletInfos.end(),
448
0
                                               rs->error_tablet_infos().begin(),
449
0
                                               rs->error_tablet_infos().end());
450
0
            }
451
0
        }
452
0
    }
453
454
0
    if (!req.runtime_state->hive_partition_updates().empty()) {
455
0
        params.__isset.hive_partition_updates = true;
456
0
        params.hive_partition_updates.reserve(req.runtime_state->hive_partition_updates().size());
457
0
        for (auto& hive_partition_update : req.runtime_state->hive_partition_updates()) {
458
0
            params.hive_partition_updates.push_back(hive_partition_update);
459
0
        }
460
0
    } else if (!req.runtime_states.empty()) {
461
0
        for (auto* rs : req.runtime_states) {
462
0
            if (!rs->hive_partition_updates().empty()) {
463
0
                params.__isset.hive_partition_updates = true;
464
0
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
465
0
                                                     rs->hive_partition_updates().begin(),
466
0
                                                     rs->hive_partition_updates().end());
467
0
            }
468
0
        }
469
0
    }
470
471
0
    if (!req.runtime_state->iceberg_commit_datas().empty()) {
472
0
        params.__isset.iceberg_commit_datas = true;
473
0
        params.iceberg_commit_datas.reserve(req.runtime_state->iceberg_commit_datas().size());
474
0
        for (auto& iceberg_commit_data : req.runtime_state->iceberg_commit_datas()) {
475
0
            params.iceberg_commit_datas.push_back(iceberg_commit_data);
476
0
        }
477
0
    } else if (!req.runtime_states.empty()) {
478
0
        for (auto* rs : req.runtime_states) {
479
0
            if (!rs->iceberg_commit_datas().empty()) {
480
0
                params.__isset.iceberg_commit_datas = true;
481
0
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
482
0
                                                   rs->iceberg_commit_datas().begin(),
483
0
                                                   rs->iceberg_commit_datas().end());
484
0
            }
485
0
        }
486
0
    }
487
488
    // Send new errors to coordinator
489
0
    req.runtime_state->get_unreported_errors(&(params.error_log));
490
0
    params.__isset.error_log = (!params.error_log.empty());
491
492
0
    if (_exec_env->cluster_info()->backend_id != 0) {
493
0
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
494
0
    }
495
496
0
    TReportExecStatusResult res;
497
0
    Status rpc_status;
498
499
0
    VLOG_DEBUG << "reportExecStatus params is "
500
0
               << apache::thrift::ThriftDebugString(params).c_str();
501
0
    if (!exec_status.ok()) {
502
0
        LOG(WARNING) << "report error status: " << exec_status.msg()
503
0
                     << " to coordinator: " << req.coord_addr
504
0
                     << ", query id: " << print_id(req.query_id);
505
0
    }
506
0
    try {
507
0
        try {
508
0
            coord->reportExecStatus(res, params);
509
0
        } catch (TTransportException& e) {
510
0
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
511
0
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
512
0
                         << req.coord_addr << ", err: " << e.what();
513
0
            rpc_status = coord.reopen();
514
515
0
            if (!rpc_status.ok()) {
516
                // we need to cancel the execution of this fragment
517
0
                req.cancel_fn(rpc_status);
518
0
                return;
519
0
            }
520
0
            coord->reportExecStatus(res, params);
521
0
        }
522
523
0
        rpc_status = Status::create<false>(res.status);
524
0
    } catch (TException& e) {
525
0
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
526
0
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
527
0
    }
528
529
0
    if (!rpc_status.ok()) {
530
0
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
531
0
                 print_id(req.query_id), rpc_status.to_string());
532
        // we need to cancel the execution of this fragment
533
0
        req.cancel_fn(rpc_status);
534
0
    }
535
0
}
536
537
0
// No-op completion callback passed to exec_plan_fragment() when the caller needs no hook.
static void empty_function(RuntimeState* /*state*/, Status* /*status*/) {}
538
539
// Entry point for non-pipeline (TExecPlanFragmentParams) fragments. The non-pipeline
// engine is disabled, so every request is rejected with InternalError.
Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
                                       const QuerySource query_source) {
    Status rejected = Status::InternalError("Non-pipeline is disabled!");
    return rejected;
}
543
544
// Executes a pipeline fragment.
//
// Without a transaction (`txn_conf.need_txn` is false), this forwards to the callback
// overload with the no-op empty_function.
//
// With a transaction, the fragment runs as a streaming load:
//   - a StreamLoadContext fed by a StreamLoadPipe is built from `params`;
//   - it is registered with the new-load-stream manager under the query id;
//   - it is handed to the stream load executor.
// Errors from registration or execution are propagated.
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
                                       const QuerySource query_source) {
    if (!params.txn_conf.need_txn) {
        return exec_plan_fragment(params, query_source, empty_function);
    }

    const auto& txn_conf = params.txn_conf;
    auto load_ctx = std::make_shared<StreamLoadContext>(_exec_env);
    load_ctx->db = txn_conf.db;
    load_ctx->db_id = txn_conf.db_id;
    load_ctx->table = txn_conf.tbl;
    load_ctx->txn_id = txn_conf.txn_id;
    load_ctx->id = UniqueId(params.query_id);
    load_ctx->put_result.__set_pipeline_params(params);
    load_ctx->use_streaming = true;
    load_ctx->load_type = TLoadType::MANUL_LOAD;
    load_ctx->load_src_type = TLoadSourceType::RAW;
    load_ctx->label = params.import_label;
    load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
    load_ctx->timeout_second = 3600;
    load_ctx->auth.token = txn_conf.token;
    load_ctx->need_commit_self = true;
    load_ctx->need_rollback = true;

    // One pipe object serves as both the context's body_sink and its pipe.
    auto pipe = std::make_shared<io::StreamLoadPipe>(
            io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
            -1 /* total_length */, true /* use_proto */);
    load_ctx->body_sink = pipe;
    load_ctx->pipe = pipe;
    load_ctx->max_filter_ratio = txn_conf.max_filter_ratio;

    RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(load_ctx->id, load_ctx));
    RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(load_ctx));
    return Status::OK();
}
580
581
// Stage 2. prepare finished. then get FE instruction to execute
582
0
Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) {
583
0
    TUniqueId query_id;
584
0
    query_id.__set_hi(request->query_id().hi());
585
0
    query_id.__set_lo(request->query_id().lo());
586
0
    std::shared_ptr<QueryContext> q_ctx = nullptr;
587
0
    {
588
0
        std::lock_guard<std::mutex> lock(_lock);
589
0
        q_ctx = _get_or_erase_query_ctx(query_id);
590
0
    }
591
0
    if (q_ctx) {
592
0
        q_ctx->set_ready_to_execute(Status::OK());
593
0
        LOG_INFO("Query {} start execution", print_id(query_id));
594
0
    } else {
595
0
        return Status::InternalError(
596
0
                "Failed to get query fragments context. Query may be "
597
0
                "timeout or be cancelled. host: {}",
598
0
                BackendOptions::get_localhost());
599
0
    }
600
0
    return Status::OK();
601
0
}
602
603
void FragmentMgr::remove_pipeline_context(
604
0
        std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
605
0
    {
606
0
        std::lock_guard<std::mutex> lock(_lock);
607
0
        auto query_id = f_context->get_query_id();
608
0
        int64 now = duration_cast<std::chrono::milliseconds>(
609
0
                            std::chrono::system_clock::now().time_since_epoch())
610
0
                            .count();
611
0
        g_fragment_executing_count << -1;
612
0
        g_fragment_last_active_time.set_value(now);
613
0
        _pipeline_map.erase({query_id, f_context->get_fragment_id()});
614
0
    }
615
0
}
616
617
1
std::shared_ptr<QueryContext> FragmentMgr::_get_or_erase_query_ctx(const TUniqueId& query_id) {
618
1
    auto search = _query_ctx_map.find(query_id);
619
1
    if (search != _query_ctx_map.end()) {
620
0
        if (auto q_ctx = search->second.lock()) {
621
0
            return q_ctx;
622
0
        } else {
623
0
            LOG(WARNING) << "Query context (query id = " << print_id(query_id)
624
0
                         << ") has been released.";
625
0
            _query_ctx_map.erase(search);
626
0
            return nullptr;
627
0
        }
628
0
    }
629
1
    return nullptr;
630
1
}
631
632
std::shared_ptr<QueryContext> FragmentMgr::get_or_erase_query_ctx_with_lock(
633
0
        const TUniqueId& query_id) {
634
0
    std::unique_lock<std::mutex> lock(_lock);
635
0
    return _get_or_erase_query_ctx(query_id);
636
0
}
637
638
template <typename Params>
639
Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
640
                                   QuerySource query_source,
641
0
                                   std::shared_ptr<QueryContext>& query_ctx) {
642
0
    DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed", {
643
0
        return Status::InternalError("FragmentMgr._get_query_ctx.failed, query id {}",
644
0
                                     print_id(query_id));
645
0
    });
646
0
    if (params.is_simplified_param) {
647
        // Get common components from _query_ctx_map
648
0
        std::lock_guard<std::mutex> lock(_lock);
649
0
        if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
650
0
            query_ctx = q_ctx;
651
0
        } else {
652
0
            return Status::InternalError(
653
0
                    "Failed to get query fragments context. Query {} may be timeout or be "
654
0
                    "cancelled. host: {}",
655
0
                    print_id(query_id), BackendOptions::get_localhost());
656
0
        }
657
0
    } else {
658
        // Find _query_ctx_map, in case some other request has already
659
        // create the query fragments context.
660
0
        std::lock_guard<std::mutex> lock(_lock);
661
0
        if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
662
0
            query_ctx = q_ctx;
663
0
            return Status::OK();
664
0
        }
665
666
        // First time a fragment of a query arrived. print logs.
667
0
        LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
668
0
                  << ", total fragment num on current host: " << params.fragment_num_on_host
669
0
                  << ", fe process uuid: " << params.query_options.fe_process_uuid
670
0
                  << ", query type: " << params.query_options.query_type
671
0
                  << ", report audit fe:" << params.current_connect_fe;
672
673
        // This may be a first fragment request of the query.
674
        // Create the query fragments context.
675
0
        query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options,
676
0
                                                params.coord, params.is_nereids,
677
0
                                                params.current_connect_fe, query_source);
678
0
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
679
0
        RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
680
0
                                              &(query_ctx->desc_tbl)));
681
        // set file scan range params
682
0
        if (params.__isset.file_scan_params) {
683
0
            query_ctx->file_scan_range_params_map = params.file_scan_params;
684
0
        }
685
686
0
        query_ctx->query_globals = params.query_globals;
687
688
0
        if (params.__isset.resource_info) {
689
0
            query_ctx->user = params.resource_info.user;
690
0
            query_ctx->group = params.resource_info.group;
691
0
            query_ctx->set_rsc_info = true;
692
0
        }
693
694
0
        _set_scan_concurrency(params, query_ctx.get());
695
696
0
        if (params.__isset.workload_groups && !params.workload_groups.empty()) {
697
0
            uint64_t tg_id = params.workload_groups[0].id;
698
0
            WorkloadGroupPtr workload_group_ptr =
699
0
                    _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
700
0
            if (workload_group_ptr != nullptr) {
701
0
                RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
702
0
                RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
703
0
                _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id),
704
0
                                                                                 tg_id);
705
0
            } else {
706
0
                LOG(WARNING) << "Query/load id: " << print_id(query_ctx->query_id())
707
0
                             << "can't find its workload group " << tg_id;
708
0
            }
709
0
        }
710
        // There is some logic in query ctx's dctor, we could not check if exists and delete the
711
        // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
712
0
        _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
713
0
    }
714
0
    return Status::OK();
715
0
}
716
717
// Entry point for the legacy (non-pipeline) TExecPlanFragmentParams request.
// That execution path is disabled, so every call is rejected with an
// InternalError; the arguments are intentionally ignored.
Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
                                       QuerySource query_source, const FinishCallback& cb) {
    Status rejected = Status::InternalError("Non-pipeline is disabled!");
    return rejected;
}
721
722
0
std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
723
0
    fmt::memory_buffer debug_string_buffer;
724
0
    size_t i = 0;
725
0
    {
726
0
        std::lock_guard<std::mutex> lock(_lock);
727
0
        fmt::format_to(debug_string_buffer,
728
0
                       "{} pipeline fragment contexts are still running! duration_limit={}\n",
729
0
                       _pipeline_map.size(), duration);
730
731
0
        timespec now;
732
0
        clock_gettime(CLOCK_MONOTONIC, &now);
733
0
        for (auto& it : _pipeline_map) {
734
0
            auto elapsed = it.second->elapsed_time() / 1000000000.0;
735
0
            if (elapsed < duration) {
736
                // Only display tasks which has been running for more than {duration} seconds.
737
0
                continue;
738
0
            }
739
0
            auto timeout_second = it.second->timeout_second();
740
0
            fmt::format_to(
741
0
                    debug_string_buffer,
742
0
                    "No.{} (elapse_second={}s, query_timeout_second={}s, is_timeout={}) : {}\n", i,
743
0
                    elapsed, timeout_second, it.second->is_timeout(now), it.second->debug_string());
744
0
            i++;
745
0
        }
746
0
    }
747
0
    return fmt::to_string(debug_string_buffer);
748
0
}
749
750
0
// Returns the pipeline-context dump of the query identified by `query_id`,
// or a "not found" message when no live query context is registered.
//
// The lookup goes through get_or_erase_query_ctx_with_lock: the unlocked
// helper reads and may erase from _query_ctx_map, which cancel_worker and
// other paths mutate under _lock, so calling it without the lock was a data
// race.
std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
    if (auto q_ctx = get_or_erase_query_ctx_with_lock(query_id)) {
        return q_ctx->print_all_pipeline_context();
    }
    return fmt::format("Query context (query id = {}) not found. \n", print_id(query_id));
}
757
758
// Prepares and submits one pipeline fragment (all of its local instances) on
// this BE.
//
// Steps: resolve/create the QueryContext, build and prepare a
// PipelineFragmentContext, register a runtime-filter merge controller (keyed
// by the first local instance), record the fragment in _pipeline_map under
// _lock (rejecting a duplicate (query_id, fragment_id)), optionally mark the
// query ready to execute, and submit the context.
//
// Returns an error if `params.local_params` is empty, if preparation fails
// (the query is then cancelled), or if the fragment is already registered.
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
                                       QuerySource query_source, const FinishCallback& cb) {
    VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment params is "
             << apache::thrift::ThriftDebugString(params).c_str();
    // sometimes TExecPlanFragmentParams debug string is too long and glog
    // will truncate the log line, so print query options separately for debugging purpose
    VLOG_ROW << "query: " << print_id(params.query_id) << " query options is "
             << apache::thrift::ThriftDebugString(params.query_options).c_str();

    // local_params[0] is dereferenced below; an empty list would be undefined behavior.
    if (params.local_params.empty()) {
        return Status::InvalidArgument(
                "exec_plan_fragment query_id({}) fragment_id({}) has no local params",
                print_id(params.query_id), params.fragment_id);
    }

    std::shared_ptr<QueryContext> query_ctx;
    RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_source, query_ctx));
    SCOPED_ATTACH_TASK(query_ctx.get());
    int64_t duration_ns = 0;
    std::shared_ptr<pipeline::PipelineFragmentContext> context =
            std::make_shared<pipeline::PipelineFragmentContext>(
                    query_ctx->query_id(), params.fragment_id, query_ctx, _exec_env, cb,
                    std::bind<Status>(std::mem_fn(&FragmentMgr::trigger_pipeline_context_report),
                                      this, std::placeholders::_1, std::placeholders::_2));
    {
        SCOPED_RAW_TIMER(&duration_ns);
        Status prepare_st = Status::OK();
        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params, _thread_pool.get()),
                                         prepare_st);
        if (!prepare_st.ok()) {
            query_ctx->cancel(prepare_st, params.fragment_id);
            query_ctx->set_execution_dependency_ready();
            return prepare_st;
        }
    }
    g_fragmentmgr_prepare_latency << (duration_ns / 1000);

    DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
                    { return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });

    std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
    RETURN_IF_ERROR(_runtimefilter_controller.add_entity(
            params.local_params[0], params.query_id, params.query_options, &handler,
            RuntimeFilterParamsContext::create(context->get_runtime_state())));
    if (handler) {
        query_ctx->set_merge_controller_handler(handler);
    }

    {
        // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map.
        std::lock_guard<std::mutex> lock(_lock);
        // The duplicate check does not depend on the instance, so do it once up front.
        if (_pipeline_map.find({params.query_id, params.fragment_id}) != _pipeline_map.end()) {
            return Status::InternalError(
                    "exec_plan_fragment query_id({}) input duplicated fragment_id({})",
                    print_id(params.query_id), params.fragment_id);
        }
        for (const auto& local_param : params.local_params) {
            query_ctx->fragment_instance_ids.push_back(local_param.fragment_instance_id);
        }

        int64 now = duration_cast<std::chrono::milliseconds>(
                            std::chrono::system_clock::now().time_since_epoch())
                            .count();
        g_fragment_executing_count << 1;
        g_fragment_last_active_time.set_value(now);
        // TODO: simplify this mapping
        _pipeline_map.insert({{params.query_id, params.fragment_id}, context});
    }

    if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
        query_ctx->set_ready_to_execute_only();
    }

    query_ctx->set_pipeline_context(params.fragment_id, context);

    RETURN_IF_ERROR(context->submit());
    return Status::OK();
}
832
833
template <typename Param>
834
0
void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query_ctx) {
835
#ifndef BE_TEST
836
    // If the token is set, the scan task will use limited_scan_pool in scanner scheduler.
837
    // Otherwise, the scan task will use local/remote scan pool in scanner scheduler
838
    if (params.query_options.__isset.resource_limit &&
839
        params.query_options.resource_limit.__isset.cpu_limit) {
840
        query_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit, false);
841
    }
842
#endif
843
0
}
844
845
1
void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
846
1
    std::shared_ptr<QueryContext> query_ctx = nullptr;
847
1
    std::vector<TUniqueId> all_instance_ids;
848
1
    {
849
1
        std::lock_guard<std::mutex> state_lock(_lock);
850
1
        if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
851
0
            query_ctx = q_ctx;
852
            // Copy instanceids to avoid concurrent modification.
853
            // And to reduce the scope of lock.
854
0
            all_instance_ids = query_ctx->fragment_instance_ids;
855
1
        } else {
856
1
            LOG(WARNING) << "Query " << print_id(query_id)
857
1
                         << " does not exists, failed to cancel it";
858
1
            return;
859
1
        }
860
1
    }
861
0
    query_ctx->cancel(reason);
862
0
    {
863
0
        std::lock_guard<std::mutex> state_lock(_lock);
864
0
        _query_ctx_map.erase(query_id);
865
0
    }
866
0
    LOG(INFO) << "Query " << print_id(query_id)
867
0
              << " is cancelled and removed. Reason: " << reason.to_string();
868
0
}
869
870
4
void FragmentMgr::cancel_worker() {
871
4
    LOG(INFO) << "FragmentMgr cancel worker start working.";
872
873
4
    timespec check_invalid_query_last_timestamp;
874
4
    clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp);
875
876
4
    do {
877
4
        std::vector<TUniqueId> queries_lost_coordinator;
878
4
        std::vector<TUniqueId> queries_timeout;
879
4
        std::vector<TUniqueId> queries_pipeline_task_leak;
880
        // Fe process uuid -> set<QueryId>
881
4
        std::map<int64_t, std::unordered_set<TUniqueId>> running_queries_on_all_fes;
882
4
        const std::map<TNetworkAddress, FrontendInfo>& running_fes =
883
4
                ExecEnv::GetInstance()->get_running_frontends();
884
885
4
        timespec now;
886
4
        clock_gettime(CLOCK_MONOTONIC, &now);
887
888
4
        if (config::enable_pipeline_task_leakage_detect &&
889
4
            now.tv_sec - check_invalid_query_last_timestamp.tv_sec >
890
0
                    config::pipeline_task_leakage_detect_period_secs) {
891
0
            check_invalid_query_last_timestamp = now;
892
0
            running_queries_on_all_fes = _get_all_running_queries_from_fe();
893
4
        } else {
894
4
            running_queries_on_all_fes.clear();
895
4
        }
896
897
4
        std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> ctx;
898
4
        {
899
4
            std::lock_guard<std::mutex> lock(_lock);
900
4
            ctx.reserve(_pipeline_map.size());
901
4
            for (auto& pipeline_itr : _pipeline_map) {
902
0
                ctx.push_back(pipeline_itr.second);
903
0
            }
904
4
        }
905
4
        for (auto& c : ctx) {
906
0
            c->clear_finished_tasks();
907
0
        }
908
909
4
        std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem> brpc_stub_with_queries;
910
4
        {
911
4
            std::lock_guard<std::mutex> lock(_lock);
912
4
            for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
913
0
                if (auto q_ctx = it->second.lock()) {
914
0
                    if (q_ctx->is_timeout(now)) {
915
0
                        LOG_WARNING("Query {} is timeout", print_id(it->first));
916
0
                        queries_timeout.push_back(it->first);
917
0
                    } else if (config::enable_brpc_connection_check) {
918
0
                        auto brpc_stubs = q_ctx->get_using_brpc_stubs();
919
0
                        for (auto& item : brpc_stubs) {
920
0
                            if (!brpc_stub_with_queries.contains(item.second)) {
921
0
                                brpc_stub_with_queries.emplace(item.second,
922
0
                                                               BrpcItem {item.first, {q_ctx}});
923
0
                            } else {
924
0
                                brpc_stub_with_queries[item.second].queries.emplace_back(q_ctx);
925
0
                            }
926
0
                        }
927
0
                    }
928
0
                    ++it;
929
0
                } else {
930
0
                    it = _query_ctx_map.erase(it);
931
0
                }
932
0
            }
933
934
            // We use a very conservative cancel strategy.
935
            // 0. If there are no running frontends, do not cancel any queries.
936
            // 1. If query's process uuid is zero, do not cancel
937
            // 2. If same process uuid, do not cancel
938
            // 3. If fe has zero process uuid, do not cancel
939
4
            if (running_fes.empty() && !_query_ctx_map.empty()) {
940
0
                LOG_EVERY_N(WARNING, 10)
941
0
                        << "Could not find any running frontends, maybe we are upgrading or "
942
0
                           "starting? "
943
0
                        << "We will not cancel any outdated queries in this situation.";
944
4
            } else {
945
4
                for (const auto& it : _query_ctx_map) {
946
0
                    if (auto q_ctx = it.second.lock()) {
947
0
                        const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
948
949
0
                        if (fe_process_uuid == 0) {
950
                            // zero means this query is from a older version fe or
951
                            // this fe is starting
952
0
                            continue;
953
0
                        }
954
955
                        // If the query is not running on the any frontends, cancel it.
956
0
                        if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
957
0
                            itr != running_queries_on_all_fes.end()) {
958
                            // Query not found on this frontend, and the query arrives before the last check
959
0
                            if (itr->second.find(it.first) == itr->second.end() &&
960
                                // tv_nsec represents the number of nanoseconds that have elapsed since the time point stored in tv_sec.
961
                                // tv_sec is enough, we do not need to check tv_nsec.
962
0
                                q_ctx->get_query_arrival_timestamp().tv_sec <
963
0
                                        check_invalid_query_last_timestamp.tv_sec &&
964
0
                                q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
965
0
                                queries_pipeline_task_leak.push_back(q_ctx->query_id());
966
0
                                LOG_INFO(
967
0
                                        "Query {}, type {} is not found on any frontends, maybe it "
968
0
                                        "is leaked.",
969
0
                                        print_id(q_ctx->query_id()),
970
0
                                        toString(q_ctx->get_query_source()));
971
0
                                continue;
972
0
                            }
973
0
                        }
974
975
0
                        auto itr = running_fes.find(q_ctx->coord_addr);
976
0
                        if (itr != running_fes.end()) {
977
0
                            if (fe_process_uuid == itr->second.info.process_uuid ||
978
0
                                itr->second.info.process_uuid == 0) {
979
0
                                continue;
980
0
                            } else {
981
0
                                LOG_WARNING(
982
0
                                        "Coordinator of query {} restarted, going to cancel it.",
983
0
                                        print_id(q_ctx->query_id()));
984
0
                            }
985
0
                        } else {
986
                            // In some rear cases, the rpc port of follower is not updated in time,
987
                            // then the port of this follower will be zero, but acutally it is still running,
988
                            // and be has already received the query from follower.
989
                            // So we need to check if host is in running_fes.
990
0
                            bool fe_host_is_standing = std::any_of(
991
0
                                    running_fes.begin(), running_fes.end(),
992
0
                                    [&q_ctx](const auto& fe) {
993
0
                                        return fe.first.hostname == q_ctx->coord_addr.hostname &&
994
0
                                               fe.first.port == 0;
995
0
                                    });
996
0
                            if (fe_host_is_standing) {
997
0
                                LOG_WARNING(
998
0
                                        "Coordinator {}:{} is not found, but its host is still "
999
0
                                        "running with an unstable brpc port, not going to cancel "
1000
0
                                        "it.",
1001
0
                                        q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
1002
0
                                        print_id(q_ctx->query_id()));
1003
0
                                continue;
1004
0
                            } else {
1005
0
                                LOG_WARNING(
1006
0
                                        "Could not find target coordinator {}:{} of query {}, "
1007
0
                                        "going to "
1008
0
                                        "cancel it.",
1009
0
                                        q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
1010
0
                                        print_id(q_ctx->query_id()));
1011
0
                            }
1012
0
                        }
1013
0
                    }
1014
                    // Coordinator of this query has already dead or query context has been released.
1015
0
                    queries_lost_coordinator.push_back(it.first);
1016
0
                }
1017
4
            }
1018
4
        }
1019
1020
4
        if (config::enable_brpc_connection_check) {
1021
0
            for (auto it : brpc_stub_with_queries) {
1022
0
                if (!it.first) {
1023
0
                    LOG(WARNING) << "brpc stub is nullptr, skip it.";
1024
0
                    continue;
1025
0
                }
1026
0
                _check_brpc_available(it.first, it.second);
1027
0
            }
1028
0
        }
1029
1030
4
        if (!queries_lost_coordinator.empty()) {
1031
0
            LOG(INFO) << "There are " << queries_lost_coordinator.size()
1032
0
                      << " queries need to be cancelled, coordinator dead or restarted.";
1033
0
        }
1034
1035
4
        for (const auto& qid : queries_timeout) {
1036
0
            cancel_query(qid,
1037
0
                         Status::Error<ErrorCode::TIMEOUT>(
1038
0
                                 "FragmentMgr cancel worker going to cancel timeout instance "));
1039
0
        }
1040
1041
4
        for (const auto& qid : queries_pipeline_task_leak) {
1042
            // Cancel the query, and maybe try to report debug info to fe so that we can
1043
            // collect debug info by sql or http api instead of search log.
1044
0
            cancel_query(qid, Status::Error<ErrorCode::ILLEGAL_STATE>(
1045
0
                                      "Potential pipeline task leakage"));
1046
0
        }
1047
1048
4
        for (const auto& qid : queries_lost_coordinator) {
1049
0
            cancel_query(qid, Status::Error<ErrorCode::CANCELLED>(
1050
0
                                      "Source frontend is not running or restarted"));
1051
0
        }
1052
1053
4
    } while (!_stop_background_threads_latch.wait_for(
1054
4
            std::chrono::seconds(config::fragment_mgr_cancel_worker_interval_seconds)));
1055
4
    LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
1056
4
}
1057
1058
void FragmentMgr::_check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
1059
0
                                        const BrpcItem& brpc_item) {
1060
0
    const std::string message = "hello doris!";
1061
0
    std::string error_message;
1062
0
    int32_t failed_count = 0;
1063
0
    while (true) {
1064
0
        PHandShakeRequest request;
1065
0
        request.set_hello(message);
1066
0
        PHandShakeResponse response;
1067
0
        brpc::Controller cntl;
1068
0
        cntl.set_timeout_ms(500 * (failed_count + 1));
1069
0
        cntl.set_max_retry(10);
1070
0
        brpc_stub->hand_shake(&cntl, &request, &response, nullptr);
1071
1072
0
        if (cntl.Failed()) {
1073
0
            error_message = cntl.ErrorText();
1074
0
            LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
1075
0
                         << brpc_item.network_address.port << " check failed: " << error_message;
1076
0
        } else if (response.has_status() && response.status().status_code() == 0) {
1077
0
            break;
1078
0
        } else {
1079
0
            error_message = response.DebugString();
1080
0
            LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
1081
0
                         << brpc_item.network_address.port << " check failed: " << error_message;
1082
0
        }
1083
0
        failed_count++;
1084
0
        if (failed_count == 2) {
1085
0
            for (const auto& query_wptr : brpc_item.queries) {
1086
0
                auto query = query_wptr.lock();
1087
0
                if (query && !query->is_cancelled()) {
1088
0
                    query->cancel(Status::InternalError("brpc(dest: {}:{}) check failed: {}",
1089
0
                                                        brpc_item.network_address.hostname,
1090
0
                                                        brpc_item.network_address.port,
1091
0
                                                        error_message));
1092
0
                }
1093
0
            }
1094
1095
0
            LOG(WARNING) << "remove brpc stub from cache: " << brpc_item.network_address.hostname
1096
0
                         << ":" << brpc_item.network_address.port << ", error: " << error_message;
1097
0
            ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
1098
0
                    brpc_item.network_address.hostname, brpc_item.network_address.port);
1099
0
            break;
1100
0
        }
1101
0
    }
1102
0
}
1103
1104
0
void FragmentMgr::debug(std::stringstream& ss) {
    // Intentionally a no-op: FragmentMgr writes nothing into `ss`.
}
1105
/*
1106
 * 1. resolve opaqued_query_plan to thrift structure
1107
 * 2. build TExecPlanFragmentParams
1108
 */
1109
Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
1110
                                                const TQueryPlanInfo& t_query_plan_info,
1111
                                                const TUniqueId& query_id,
1112
                                                const TUniqueId& fragment_instance_id,
1113
0
                                                std::vector<TScanColumnDesc>* selected_columns) {
1114
    // set up desc tbl
1115
0
    DescriptorTbl* desc_tbl = nullptr;
1116
0
    ObjectPool obj_pool;
1117
0
    Status st = DescriptorTbl::create(&obj_pool, t_query_plan_info.desc_tbl, &desc_tbl);
1118
0
    if (!st.ok()) {
1119
0
        LOG(WARNING) << "open context error: extract DescriptorTbl failure";
1120
0
        std::stringstream msg;
1121
0
        msg << " create DescriptorTbl error, should not be modified after returned Doris FE "
1122
0
               "processed";
1123
0
        return Status::InvalidArgument(msg.str());
1124
0
    }
1125
0
    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
1126
0
    if (tuple_desc == nullptr) {
1127
0
        LOG(WARNING) << "open context error: extract TupleDescriptor failure";
1128
0
        std::stringstream msg;
1129
0
        msg << " get  TupleDescriptor error, should not be modified after returned Doris FE "
1130
0
               "processed";
1131
0
        return Status::InvalidArgument(msg.str());
1132
0
    }
1133
    // process selected columns form slots
1134
0
    for (const SlotDescriptor* slot : tuple_desc->slots()) {
1135
0
        TScanColumnDesc col;
1136
0
        col.__set_name(slot->col_name());
1137
0
        col.__set_type(to_thrift(slot->type().type));
1138
0
        selected_columns->emplace_back(std::move(col));
1139
0
    }
1140
1141
0
    VLOG_QUERY << "BackendService execute open()  TQueryPlanInfo: "
1142
0
               << apache::thrift::ThriftDebugString(t_query_plan_info);
1143
    // assign the param used to execute PlanFragment
1144
0
    TPipelineFragmentParams exec_fragment_params;
1145
0
    exec_fragment_params.protocol_version = (PaloInternalServiceVersion::type)0;
1146
0
    exec_fragment_params.__set_is_simplified_param(false);
1147
0
    exec_fragment_params.__set_fragment(t_query_plan_info.plan_fragment);
1148
0
    exec_fragment_params.__set_desc_tbl(t_query_plan_info.desc_tbl);
1149
1150
    // assign the param used for executing of PlanFragment-self
1151
0
    TPipelineInstanceParams fragment_exec_params;
1152
0
    exec_fragment_params.query_id = query_id;
1153
0
    fragment_exec_params.fragment_instance_id = fragment_instance_id;
1154
0
    exec_fragment_params.coord.hostname = "external";
1155
0
    std::map<::doris::TPlanNodeId, std::vector<TScanRangeParams>> per_node_scan_ranges;
1156
0
    std::vector<TScanRangeParams> scan_ranges;
1157
0
    std::vector<int64_t> tablet_ids = params.tablet_ids;
1158
0
    TNetworkAddress address;
1159
0
    address.hostname = BackendOptions::get_localhost();
1160
0
    address.port = doris::config::be_port;
1161
0
    std::map<int64_t, TTabletVersionInfo> tablet_info = t_query_plan_info.tablet_info;
1162
0
    for (auto tablet_id : params.tablet_ids) {
1163
0
        TPaloScanRange scan_range;
1164
0
        scan_range.db_name = params.database;
1165
0
        scan_range.table_name = params.table;
1166
0
        auto iter = tablet_info.find(tablet_id);
1167
0
        if (iter != tablet_info.end()) {
1168
0
            TTabletVersionInfo info = iter->second;
1169
0
            scan_range.tablet_id = tablet_id;
1170
0
            scan_range.version = std::to_string(info.version);
1171
            // Useless but it is required field in TPaloScanRange
1172
0
            scan_range.version_hash = "0";
1173
0
            scan_range.schema_hash = std::to_string(info.schema_hash);
1174
0
            scan_range.hosts.push_back(address);
1175
0
        } else {
1176
0
            std::stringstream msg;
1177
0
            msg << "tablet_id: " << tablet_id << " not found";
1178
0
            LOG(WARNING) << "tablet_id [ " << tablet_id << " ] not found";
1179
0
            return Status::NotFound(msg.str());
1180
0
        }
1181
0
        TScanRange doris_scan_range;
1182
0
        doris_scan_range.__set_palo_scan_range(scan_range);
1183
0
        TScanRangeParams scan_range_params;
1184
0
        scan_range_params.scan_range = doris_scan_range;
1185
0
        scan_ranges.push_back(scan_range_params);
1186
0
    }
1187
0
    per_node_scan_ranges.insert(std::make_pair((::doris::TPlanNodeId)0, scan_ranges));
1188
0
    fragment_exec_params.per_node_scan_ranges = per_node_scan_ranges;
1189
0
    exec_fragment_params.local_params.push_back(fragment_exec_params);
1190
0
    TQueryOptions query_options;
1191
0
    query_options.batch_size = params.batch_size;
1192
0
    query_options.execution_timeout = params.execution_timeout;
1193
0
    query_options.mem_limit = params.mem_limit;
1194
0
    query_options.query_type = TQueryType::EXTERNAL;
1195
0
    query_options.be_exec_version = BeExecVersionManager::get_newest_version();
1196
0
    exec_fragment_params.__set_query_options(query_options);
1197
0
    VLOG_ROW << "external exec_plan_fragment params is "
1198
0
             << apache::thrift::ThriftDebugString(exec_fragment_params).c_str();
1199
0
    return exec_plan_fragment(exec_fragment_params, QuerySource::EXTERNAL_CONNECTOR);
1200
0
}
1201
1202
// Applies a published (merged) runtime filter to the consumer filters of the
// target query on this BE.
//
// The first registered pipeline context among `request->fragment_ids()` is
// used to reach the query's RuntimeFilterMgr. If none is registered (all
// instances finished), this is a no-op returning OK. Otherwise one filter
// wrapper is built from the request/attachment and handed to every consumer
// of `request->filter_id()`.
Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
                                   butil::IOBufAsZeroCopyInputStream* attach_data) {
    int64_t start_apply = MonotonicMillis();

    // Held for the whole call; presumably this keeps the query context that owns
    // runtime_filter_mgr alive — NOTE(review): confirm ownership.
    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
    QueryThreadContext query_thread_context;

    RuntimeFilterMgr* runtime_filter_mgr = nullptr;

    const auto& fragment_ids = request->fragment_ids();
    // The query id is identical for every candidate fragment; convert it once instead of
    // once per iteration while holding _lock.
    const auto query_id = UniqueId(request->query_id()).to_thrift();
    {
        std::lock_guard<std::mutex> lock(_lock);
        for (auto fragment_id : fragment_ids) {
            auto iter = _pipeline_map.find({query_id, fragment_id});
            if (iter == _pipeline_map.end()) {
                continue;
            }
            pip_context = iter->second;

            DCHECK(pip_context != nullptr);
            runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
            query_thread_context = {pip_context->get_query_ctx()->query_id(),
                                    pip_context->get_query_ctx()->query_mem_tracker,
                                    pip_context->get_query_ctx()->workload_group()};
            break;
        }
    }

    if (runtime_filter_mgr == nullptr) {
        // all instance finished
        return Status::OK();
    }

    SCOPED_ATTACH_TASK(query_thread_context);
    // 1. get the target filters
    std::vector<std::shared_ptr<IRuntimeFilter>> filters;
    RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), filters));

    // 2. create the filter wrapper to replace or ignore the target filters
    if (!filters.empty()) {
        UpdateRuntimeFilterParamsV2 params {request, attach_data, filters[0]->column_type()};
        std::shared_ptr<RuntimePredicateWrapper> filter_wrapper;
        RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, &filter_wrapper));

        std::ranges::for_each(filters, [&](auto& filter) {
            filter->update_filter(
                    filter_wrapper, request->merge_time(), start_apply,
                    request->has_local_merge_time() ? request->local_merge_time() : 0);
        });
    }

    return Status::OK();
}
1256
1257
0
// Route a runtime-filter size report to the merge controller of its query.
//
// @param request  size report whose query id selects both the QueryContext and the
//                 RuntimeFilterMergeControllerEntity.
// @return EndOfFile when no live QueryContext exists for the query id. Otherwise, the
//         error from acquiring the merge controller, or the controller's own
//         send_filter_size() status.
Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
    UniqueId uid = request->query_id();
    const TUniqueId tquery_id = uid.to_thrift();

    std::shared_ptr<QueryContext> query_ctx;
    {
        std::lock_guard<std::mutex> guard(_lock);
        query_ctx = _get_or_erase_query_ctx(tquery_id);
    }
    if (!query_ctx) {
        return Status::EndOfFile("Query context (query-id: {}) not found, maybe finished",
                                 uid.to_string());
    }

    std::shared_ptr<RuntimeFilterMergeControllerEntity> controller;
    RETURN_IF_ERROR(_runtimefilter_controller.acquire(uid, &controller));
    return controller->send_filter_size(query_ctx, request);
}
1279
1280
0
// Hand a synchronized filter-size message to the runtime filter manager of its query.
//
// @param request  sync message whose query id selects the QueryContext.
// @return InvalidArgument when no live QueryContext exists for the query id. Otherwise,
//         the status returned by RuntimeFilterMgr::sync_filter_size().
Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
    UniqueId uid = request->query_id();
    const TUniqueId tquery_id = uid.to_thrift();

    std::shared_ptr<QueryContext> query_ctx;
    {
        std::lock_guard<std::mutex> guard(_lock);
        query_ctx = _get_or_erase_query_ctx(tquery_id);
    }
    if (!query_ctx) {
        return Status::InvalidArgument("Query context (query-id: {}) not found",
                                       uid.to_string());
    }

    RuntimeFilterMgr* filter_mgr = query_ctx->runtime_filter_mgr();
    return filter_mgr->sync_filter_size(request);
}
1297
1298
// Merge a producer's runtime filter into the query-level merge controller.
//
// The merge runs attached to the query's task context (SCOPED_ATTACH_TASK). Presumably
// this is so that allocations are accounted to that query; verify against the macro.
//
// @param request      merge request; its query id selects the QueryContext and controller.
// @param attach_data  attachment stream forwarded to the controller's merge().
// @return InvalidArgument when no live QueryContext exists for the query id. Otherwise,
//         the error from acquiring the controller, or the controller's merge() status.
Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
                                 butil::IOBufAsZeroCopyInputStream* attach_data) {
    UniqueId uid = request->query_id();
    const TUniqueId tquery_id = uid.to_thrift();

    std::shared_ptr<QueryContext> query_ctx;
    {
        std::lock_guard<std::mutex> guard(_lock);
        query_ctx = _get_or_erase_query_ctx(tquery_id);
    }
    if (!query_ctx) {
        return Status::InvalidArgument("Query context (query-id: {}) not found",
                                       uid.to_string());
    }

    SCOPED_ATTACH_TASK(query_ctx.get());
    std::shared_ptr<RuntimeFilterMergeControllerEntity> controller;
    RETURN_IF_ERROR(_runtimefilter_controller.acquire(uid, &controller));
    return controller->merge(query_ctx, request, attach_data);
}
1321
1322
0
void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
1323
0
    {
1324
0
        std::lock_guard<std::mutex> lock(_lock);
1325
0
        for (auto iter = _query_ctx_map.begin(); iter != _query_ctx_map.end();) {
1326
0
            if (auto q_ctx = iter->second.lock()) {
1327
0
                WorkloadQueryInfo workload_query_info;
1328
0
                workload_query_info.query_id = print_id(iter->first);
1329
0
                workload_query_info.tquery_id = iter->first;
1330
0
                workload_query_info.wg_id =
1331
0
                        q_ctx->workload_group() == nullptr ? -1 : q_ctx->workload_group()->id();
1332
0
                query_info_list->push_back(workload_query_info);
1333
0
                iter++;
1334
0
            } else {
1335
0
                iter = _query_ctx_map.erase(iter);
1336
0
            }
1337
0
        }
1338
0
    }
1339
0
}
1340
1341
// Snapshot the realtime execution status of a running query.
//
// @param query_id     id of the query to report on.
// @param exec_status  out-parameter. It is overwritten with
//                     QueryContext::get_realtime_exec_status().
// @return InvalidArgument if `exec_status` is null, NotFound if no live QueryContext
//         exists for `query_id`, OK otherwise.
Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
                                             TReportExecStatusParams* exec_status) {
    if (exec_status == nullptr) {
        // Name the actual parameter (the message previously read "exes_status").
        return Status::InvalidArgument("exec_status is nullptr");
    }

    std::shared_ptr<QueryContext> query_context;
    {
        std::lock_guard<std::mutex> lock(_lock);
        query_context = _get_or_erase_query_ctx(query_id);
    }

    // Single null check: previously a second, unreachable `query_context == nullptr`
    // branch followed the lookup, because the failed lookup had already returned.
    if (query_context == nullptr) {
        return Status::NotFound("Query {} has been released", print_id(query_id));
    }

    *exec_status = query_context->get_realtime_exec_status();

    return Status::OK();
}
1366
1367
} // namespace doris