Coverage Report

Created: 2026-04-10 05:08

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/query_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/query_context.h"
19
20
#include <fmt/core.h>
21
#include <gen_cpp/FrontendService_types.h>
22
#include <gen_cpp/RuntimeProfile_types.h>
23
#include <gen_cpp/Types_types.h>
24
#include <glog/logging.h>
25
26
#include <algorithm>
27
#include <exception>
28
#include <memory>
29
#include <mutex>
30
#include <utility>
31
#include <vector>
32
33
#include "common/logging.h"
34
#include "common/status.h"
35
#include "exec/operator/rec_cte_scan_operator.h"
36
#include "exec/pipeline/dependency.h"
37
#include "exec/pipeline/pipeline_fragment_context.h"
38
#include "exec/runtime_filter/runtime_filter_definitions.h"
39
#include "exec/spill/spill_file_manager.h"
40
#include "runtime/exec_env.h"
41
#include "runtime/fragment_mgr.h"
42
#include "runtime/memory/heap_profiler.h"
43
#include "runtime/runtime_query_statistics_mgr.h"
44
#include "runtime/runtime_state.h"
45
#include "runtime/thread_context.h"
46
#include "runtime/workload_group/workload_group_manager.h"
47
#include "runtime/workload_management/query_task_controller.h"
48
#include "storage/olap_common.h"
49
#include "util/mem_info.h"
50
#include "util/uid_util.h"
51
52
namespace doris {
53
54
class DelayReleaseToken : public Runnable {
55
    ENABLE_FACTORY_CREATOR(DelayReleaseToken);
56
57
public:
58
0
    DelayReleaseToken(std::unique_ptr<ThreadPoolToken>&& token) { token_ = std::move(token); }
59
    ~DelayReleaseToken() override = default;
60
0
    void run() override {}
61
    std::unique_ptr<ThreadPoolToken> token_;
62
};
63
64
0
// Returns the textual name of a QuerySource value.
// Every known enumerator maps to its own identifier; any other value yields "UNKNOWN".
const std::string toString(QuerySource queryType) {
    switch (queryType) {
    case QuerySource::INTERNAL_FRONTEND:
        return "INTERNAL_FRONTEND";
    case QuerySource::STREAM_LOAD:
        return "STREAM_LOAD";
    case QuerySource::GROUP_COMMIT_LOAD:
        // Previously returned "EXTERNAL_QUERY", which does not correspond to this source.
        return "GROUP_COMMIT_LOAD";
    case QuerySource::ROUTINE_LOAD:
        return "ROUTINE_LOAD";
    case QuerySource::EXTERNAL_CONNECTOR:
        return "EXTERNAL_CONNECTOR";
    case QuerySource::EXTERNAL_FRONTEND:
        return "EXTERNAL_FRONTEND";
    default:
        return "UNKNOWN";
    }
}
82
83
std::shared_ptr<QueryContext> QueryContext::create(TUniqueId query_id, ExecEnv* exec_env,
84
                                                   const TQueryOptions& query_options,
85
                                                   TNetworkAddress coord_addr, bool is_nereids,
86
                                                   TNetworkAddress current_connect_fe,
87
209k
                                                   QuerySource query_type) {
88
209k
    auto ctx = QueryContext::create_shared(query_id, exec_env, query_options, coord_addr,
89
209k
                                           is_nereids, current_connect_fe, query_type);
90
209k
    ctx->init_query_task_controller();
91
209k
    return ctx;
92
209k
}
93
94
// Constructs a query context.
//
// Sets up the resource context and query memory tracker, starts the query watcher,
// creates the execution / memory-sufficient dependencies and the runtime filter
// manager, optionally acquires file cache context holders, records the coordinator
// and report FE addresses, and creates the memory share arbitrator.
//
// @param query_id            id of the query; stored in _query_id.
// @param exec_env            execution environment; stored in _exec_env.
// @param query_options       options of the query; copied into _query_options.
// @param coord_addr          coordinator address; stored in this->coord_addr.
// @param is_nereids          stored in _is_nereids.
// @param current_connect_fe  FE address used for reporting query statistics.
// @param query_source        stored in _query_source.
QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
                           const TQueryOptions& query_options, TNetworkAddress coord_addr,
                           bool is_nereids, TNetworkAddress current_connect_fe,
                           QuerySource query_source)
        : _timeout_second(-1),
          _query_id(std::move(query_id)),
          _exec_env(exec_env),
          _is_nereids(is_nereids),
          _query_options(query_options),
          _query_source(query_source) {
    // NOTE: `query_id` has been moved from above; only `_query_id` may be used below.
    _init_resource_context();
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker());
    _query_watcher.start();
    _execution_dependency = Dependency::create_unique(-1, -1, "ExecutionDependency", false);
    _memory_sufficient_dependency =
            Dependency::create_unique(-1, -1, "MemorySufficientDependency", true);

    _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(true);

    _timeout_second = query_options.execution_timeout;

    // For olap tables, data is cached regardless of the 'enable_file_cache_for_olap_table' setting:
    // - When true: data enters the normal queue.
    // - When false: data enters the disposable queue.
    // In both cases, a context_holder must be generated for the query limit functionality.

    // For external tables, file cache is only used when 'enable_file_cache_for_external_table' is enabled:
    // - When true: data enters the normal queue.
    // - When false: data is not cached at all.
    // Therefore, a context_holder is required only when external table caching is enabled.

    // Summary: The only scenario where no context_holder is needed is for external tables
    // when 'enable_file_cache_for_external_table' is disabled.
    bool initialize_context_holder = config::enable_file_cache &&
                                     config::enable_file_cache_query_limit &&
                                     !(query_options.query_type == TQueryType::EXTERNAL &&
                                       !query_options.enable_file_cache_for_external_table) &&
                                     query_options.__isset.file_cache_query_limit_percent &&
                                     query_options.file_cache_query_limit_percent < 100;

    // Initialize file cache context holders
    if (initialize_context_holder) {
        _query_context_holders = io::FileCacheFactory::instance()->get_query_context_holders(
                _query_id, query_options.file_cache_query_limit_percent);
    }

    bool is_query_type_valid = query_options.query_type == TQueryType::SELECT ||
                               query_options.query_type == TQueryType::LOAD ||
                               query_options.query_type == TQueryType::EXTERNAL;
    DCHECK_EQ(is_query_type_valid, true);

    // Both addresses are by-value parameters that are not read again, so move them.
    this->coord_addr = std::move(coord_addr);
    // current_connect_fe is used for report query statistics
    this->current_connect_fe = std::move(current_connect_fe);
    // external query has no current_connect_fe
    if (query_options.query_type != TQueryType::EXTERNAL) {
        bool is_report_fe_addr_valid =
                !this->current_connect_fe.hostname.empty() && this->current_connect_fe.port != 0;
        DCHECK_EQ(is_report_fe_addr_valid, true);
    }
    clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
    DorisMetrics::instance()->query_ctx_cnt->increment(1);
    // Use the member: the `query_id` parameter was moved from in the initializer list.
    _mem_arb = MemShareArbitrator::create_shared(
            _query_id, query_options.mem_limit,
            query_options.__isset.max_scan_mem_ratio ? query_options.max_scan_mem_ratio : 1.0);
}
160
161
330k
void QueryContext::_init_query_mem_tracker() {
162
330k
    bool has_query_mem_limit = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0);
163
18.4E
    int64_t bytes_limit = has_query_mem_limit ? _query_options.mem_limit : -1;
164
330k
    if (bytes_limit > MemInfo::mem_limit() || bytes_limit == -1) {
165
139k
        VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
166
184
                    << " exceeds process memory limit of "
167
184
                    << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
168
184
                    << " OR is -1. Using process memory limit instead.";
169
139k
        bytes_limit = MemInfo::mem_limit();
170
139k
    }
171
    // If the query is a pure load task(streamload, routine load, group commit), then it should not use
172
    // memlimit per query to limit their memory usage.
173
330k
    if (is_pure_load_task()) {
174
124k
        bytes_limit = MemInfo::mem_limit();
175
124k
    }
176
330k
    std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
177
330k
    if (_query_options.query_type == TQueryType::SELECT) {
178
181k
        query_mem_tracker = MemTrackerLimiter::create_shared(
179
181k
                MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}", print_id(_query_id)),
180
181k
                bytes_limit);
181
181k
    } else if (_query_options.query_type == TQueryType::LOAD) {
182
27.9k
        query_mem_tracker = MemTrackerLimiter::create_shared(
183
27.9k
                MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(_query_id)),
184
27.9k
                bytes_limit);
185
121k
    } else if (_query_options.query_type == TQueryType::EXTERNAL) { // spark/flink/etc..
186
121k
        query_mem_tracker = MemTrackerLimiter::create_shared(
187
121k
                MemTrackerLimiter::Type::QUERY, fmt::format("External#Id={}", print_id(_query_id)),
188
121k
                bytes_limit);
189
18.4E
    } else {
190
18.4E
        LOG(FATAL) << "__builtin_unreachable";
191
18.4E
        __builtin_unreachable();
192
18.4E
    }
193
331k
    if (_query_options.__isset.is_report_success && _query_options.is_report_success) {
194
26.6k
        query_mem_tracker->enable_print_log_usage();
195
26.6k
    }
196
197
    // If enable reserve memory, not enable check limit, because reserve memory will check it.
198
    // If reserve enabled, even if the reserved memory size is smaller than the actual requested memory,
199
    // and the query memory consumption is larger than the limit, we do not expect the query to fail
200
    // after `check_limit` returns an error, but to run as long as possible,
201
    // and will enter the paused state and try to spill when the query reserves next time.
202
    // If the workload group or process runs out of memory, it will be forced to cancel.
203
331k
    query_mem_tracker->set_enable_check_limit(!(_query_options.__isset.enable_reserve_memory &&
204
331k
                                                _query_options.enable_reserve_memory));
205
331k
    _resource_ctx->memory_context()->set_mem_tracker(query_mem_tracker);
206
331k
}
207
208
331k
void QueryContext::_init_resource_context() {
209
331k
    _resource_ctx = ResourceContext::create_shared();
210
331k
    _init_query_mem_tracker();
211
331k
}
212
213
329k
void QueryContext::init_query_task_controller() {
214
329k
    _resource_ctx->set_task_controller(QueryTaskController::create(shared_from_this()));
215
329k
    _resource_ctx->task_controller()->set_task_id(_query_id);
216
329k
    _resource_ctx->task_controller()->set_fe_addr(current_connect_fe);
217
329k
    _resource_ctx->task_controller()->set_query_type(_query_options.query_type);
218
329k
#ifndef BE_TEST
219
329k
    _exec_env->runtime_query_statistics_mgr()->register_resource_context(print_id(_query_id),
220
329k
                                                                         _resource_ctx);
221
329k
#endif
222
329k
}
223
224
209k
// Tears the query context down: finishes the task controller, reports the profile
// when profiling is enabled, ends pipeline tracing if it is active, releases the
// members owned by the query, updates the query context metric, removes the context
// from the fragment manager and logs a single end-of-query message.
QueryContext::~QueryContext() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker());
    // query mem tracker consumption is equal to 0, it means that after QueryContext is created,
    // it is found that query already exists in _query_ctx_map, and query mem tracker is not used.
    // query mem tracker consumption is not equal to 0 after use, because there is memory consumed
    // on query mem tracker, released on other trackers.
    std::string mem_tracker_msg;
    if (query_mem_tracker()->peak_consumption() != 0) {
        mem_tracker_msg = fmt::format(
                "deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
                "PeakUsed={}",
                print_id(_query_id), PrettyPrinter::print_bytes(query_mem_tracker()->limit()),
                PrettyPrinter::print_bytes(query_mem_tracker()->consumption()),
                PrettyPrinter::print_bytes(query_mem_tracker()->peak_consumption()));
    }
    // Only read in the non-BE_TEST tracing branch below, hence [[maybe_unused]].
    [[maybe_unused]] uint64_t group_id = 0;
    if (workload_group()) {
        group_id = workload_group()->id(); // before remove
    }

    _resource_ctx->task_controller()->finish();

    if (enable_profile()) {
        _report_query_profile();
    }

#ifndef BE_TEST
    if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] {
        // A destructor must not let an exception escape, so failures are only logged.
        try {
            ExecEnv::GetInstance()->pipeline_tracer_context()->end_query(_query_id, group_id);
        } catch (const std::exception& e) {
            LOG(WARNING) << "Dump trace log failed because " << e.what();
        }
    }
#endif
    _runtime_filter_mgr.reset();
    _execution_dependency.reset();
    _runtime_predicates.clear();
    file_scan_range_params_map.clear();
    obj_pool.clear();
    _merge_controller_handler.reset();

    DorisMetrics::instance()->query_ctx_cnt->increment(-1);
    // fragment_mgr is nullptr in unittest
    if (ExecEnv::GetInstance()->fragment_mgr()) {
        ExecEnv::GetInstance()->fragment_mgr()->remove_query_context(this->_query_id);
    }
    // the only one msg shows query's end. any other msg should append to it if need.
    LOG_INFO("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), mem_tracker_msg);
}
274
275
98.3k
// Marks the execution dependency as ready and then folds `reason` into the
// execution status of the query.
void QueryContext::set_ready_to_execute(Status reason) {
    this->set_execution_dependency_ready();
    this->_exec_status.update(reason);
}
279
280
114k
void QueryContext::set_ready_to_execute_only() {
281
114k
    set_execution_dependency_ready();
282
114k
}
283
284
212k
void QueryContext::set_execution_dependency_ready() {
285
212k
    _execution_dependency->set_ready();
286
212k
}
287
288
18
void QueryContext::set_memory_sufficient(bool sufficient) {
289
18
    if (sufficient) {
290
8
        {
291
8
            _memory_sufficient_dependency->set_ready();
292
8
            _resource_ctx->task_controller()->reset_paused_reason();
293
8
        }
294
10
    } else {
295
10
        _memory_sufficient_dependency->block();
296
10
        _resource_ctx->task_controller()->add_paused_count();
297
10
    }
298
18
}
299
300
103k
// Cancels the query with `new_status`.
//
// Nothing happens when the execution status refuses the update. Otherwise both
// dependencies are made always-ready, memory diagnostics are optionally logged, and
// all pipeline fragment contexts except `fragment_id` are cancelled.
void QueryContext::cancel(Status new_status, int fragment_id) {
    const bool status_accepted = _exec_status.update(new_status);
    if (!status_accepted) {
        return;
    }
    // Tasks should be always runnable.
    _execution_dependency->set_always_ready();
    _memory_sufficient_dependency->set_always_ready();

    const bool is_memory_failure = new_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() ||
                                   new_status.is<ErrorCode::MEM_ALLOC_FAILED>();
    const bool heap_dump_requested =
            _query_options.__isset.dump_heap_profile_when_mem_limit_exceeded &&
            _query_options.dump_heap_profile_when_mem_limit_exceeded;
    if (is_memory_failure && heap_dump_requested) {
        // if query is cancelled because of query mem limit exceeded, dump heap profile
        // at the time of cancellation can get the most accurate memory usage for problem analysis
        auto wg = workload_group();
        auto memory_detail_log = fmt::format(
                "Query {} canceled because of memory limit exceeded, dumping memory "
                "detail profiles. wg: {}. {}",
                print_id(_query_id), wg ? wg->debug_string() : "null",
                doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str());
        LOG_LONG_STRING(INFO, memory_detail_log);

        std::string dot = HeapProfiler::instance()->dump_heap_profile_to_dot();
        if (!dot.empty()) {
            dot += "\n-------------------------------------------------------\n";
            dot += "Copy the text after `digraph` in the above output to "
                   "http://www.webgraphviz.com to generate a dot graph.\n"
                   "after start heap profiler, if there is no operation, will print "
                   "`No nodes to print`."
                   "If there are many errors: `addr2line: Dwarf Error`,"
                   "or other FAQ, reference doc: "
                   "https://doris.apache.org/community/developer-guide/debug-tool/#4-qa\n";
            auto heap_profile_log =
                    fmt::format("Query {}, dump heap profile to dot: {}", print_id(_query_id), dot);
            LOG_LONG_STRING(INFO, heap_profile_log);
        }
    }

    set_ready_to_execute(new_status);
    cancel_all_pipeline_context(new_status, fragment_id);
}
340
341
23
void QueryContext::set_load_error_url(std::string error_url) {
342
23
    std::lock_guard<std::mutex> lock(_error_url_lock);
343
23
    _load_error_url = error_url;
344
23
}
345
346
35.0k
std::string QueryContext::get_load_error_url() {
347
35.0k
    std::lock_guard<std::mutex> lock(_error_url_lock);
348
35.0k
    return _load_error_url;
349
35.0k
}
350
351
23
void QueryContext::set_first_error_msg(std::string error_msg) {
352
23
    std::lock_guard<std::mutex> lock(_error_url_lock);
353
23
    _first_error_msg = error_msg;
354
23
}
355
356
35.0k
std::string QueryContext::get_first_error_msg() {
357
35.0k
    std::lock_guard<std::mutex> lock(_error_url_lock);
358
35.0k
    return _first_error_msg;
359
35.0k
}
360
361
4.69k
void QueryContext::cancel_all_pipeline_context(const Status& reason, int fragment_id) {
362
4.69k
    std::vector<std::weak_ptr<PipelineFragmentContext>> ctx_to_cancel;
363
4.69k
    {
364
4.69k
        std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
365
6.03k
        for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
366
6.03k
            if (fragment_id == f_id) {
367
1.31k
                continue;
368
1.31k
            }
369
4.72k
            ctx_to_cancel.push_back(f_context);
370
4.72k
        }
371
4.69k
    }
372
4.72k
    for (auto& f_context : ctx_to_cancel) {
373
4.72k
        if (auto pipeline_ctx = f_context.lock()) {
374
413
            pipeline_ctx->cancel(reason);
375
413
        }
376
4.72k
    }
377
4.69k
}
378
379
0
std::string QueryContext::print_all_pipeline_context() {
380
0
    std::vector<std::weak_ptr<PipelineFragmentContext>> ctx_to_print;
381
0
    fmt::memory_buffer debug_string_buffer;
382
0
    size_t i = 0;
383
0
    {
384
0
        fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts in query {}. \n",
385
0
                       _fragment_id_to_pipeline_ctx.size(), print_id(_query_id));
386
387
0
        {
388
0
            std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
389
0
            for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
390
0
                ctx_to_print.push_back(f_context);
391
0
            }
392
0
        }
393
0
        for (auto& f_context : ctx_to_print) {
394
0
            if (auto pipeline_ctx = f_context.lock()) {
395
0
                auto elapsed = pipeline_ctx->elapsed_time() / 1000000000.0;
396
0
                fmt::format_to(debug_string_buffer,
397
0
                               "No.{} (elapse_second={}s, fragment_id={}) : {}\n", i, elapsed,
398
0
                               pipeline_ctx->get_fragment_id(), pipeline_ctx->debug_string());
399
0
                i++;
400
0
            }
401
0
        }
402
0
    }
403
0
    return fmt::to_string(debug_string_buffer);
404
0
}
405
406
void QueryContext::set_pipeline_context(const int fragment_id,
407
318k
                                        std::shared_ptr<PipelineFragmentContext> pip_ctx) {
408
318k
    std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
409
    // Use insert_or_assign instead of insert to support overwriting old entries
410
    // when recursive CTE recreates PipelineFragmentContext between rounds.
411
318k
    _fragment_id_to_pipeline_ctx.insert_or_assign(fragment_id, pip_ctx);
412
318k
}
413
414
4.60M
// Returns the task scheduler of this query.
// Throws an Exception wrapping Status::InternalError when no scheduler is set.
doris::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
    if (_task_scheduler) {
        return _task_scheduler;
    }
    throw Exception(Status::InternalError("task_scheduler is null"));
}
420
421
209k
// Binds this query to workload group `wg`.
// The group is stored in the resource context, the resource context is registered
// with the group, and finally the group's schedulers are fetched into
// _task_scheduler, _scan_task_scheduler and _remote_scan_task_scheduler.
// Returns the error of add_resource_ctx() if registration fails, otherwise OK.
Status QueryContext::set_workload_group(WorkloadGroupPtr& wg) {
    _resource_ctx->set_workload_group(wg);

    // Should add query first, the workload group will not be deleted,
    // then visit workload group's resource
    // see task_group_manager::delete_workload_group_by_ids
    RETURN_IF_ERROR(workload_group()->add_resource_ctx(_query_id, _resource_ctx));

    workload_group()->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
                                          &_remote_scan_task_scheduler);

    return Status::OK();
}
432
433
void QueryContext::add_fragment_profile(
434
        int fragment_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles,
435
1.79k
        std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
436
1.79k
    if (pipeline_profiles.empty()) {
437
0
        std::string msg = fmt::format("Add pipeline profile failed, query {}, fragment {}",
438
0
                                      print_id(this->_query_id), fragment_id);
439
0
        LOG_ERROR(msg);
440
0
        DCHECK(false) << msg;
441
0
        return;
442
0
    }
443
444
1.79k
#ifndef NDEBUG
445
5.22k
    for (const auto& p : pipeline_profiles) {
446
5.22k
        DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, query {}, fragment {}",
447
0
                                            print_id(this->_query_id), fragment_id);
448
5.22k
    }
449
1.79k
#endif
450
451
1.79k
    std::lock_guard<std::mutex> l(_profile_mutex);
452
1.79k
    VLOG_ROW << fmt::format(
453
0
            "Query add fragment profile, query {}, fragment {}, pipeline profile count {} ",
454
0
            print_id(this->_query_id), fragment_id, pipeline_profiles.size());
455
456
1.79k
    _profile_map.insert(std::make_pair(fragment_id, pipeline_profiles));
457
458
1.79k
    if (load_channel_profile != nullptr) {
459
1.79k
        _load_channel_profile_map.insert(std::make_pair(fragment_id, load_channel_profile));
460
1.79k
    }
461
1.79k
}
462
463
919
void QueryContext::_report_query_profile() {
464
919
    std::lock_guard<std::mutex> lg(_profile_mutex);
465
466
1.73k
    for (auto& [fragment_id, fragment_profile] : _profile_map) {
467
1.73k
        std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
468
469
1.73k
        if (_load_channel_profile_map.contains(fragment_id)) {
470
1.73k
            load_channel_profile = _load_channel_profile_map[fragment_id];
471
1.73k
        }
472
473
1.73k
        ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile(
474
1.73k
                _query_id, this->coord_addr, fragment_id, fragment_profile, load_channel_profile);
475
1.73k
    }
476
477
919
    ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_profile_reporting();
478
919
}
479
480
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
481
0
QueryContext::_collect_realtime_query_profile() {
482
0
    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> res;
483
0
    std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
484
0
    for (const auto& [fragment_id, fragment_ctx_wptr] : _fragment_id_to_pipeline_ctx) {
485
0
        if (auto fragment_ctx = fragment_ctx_wptr.lock()) {
486
0
            if (fragment_ctx == nullptr) {
487
0
                std::string msg =
488
0
                        fmt::format("PipelineFragmentContext is nullptr, query {} fragment_id: {}",
489
0
                                    print_id(_query_id), fragment_id);
490
0
                LOG_ERROR(msg);
491
0
                DCHECK(false) << msg;
492
0
                continue;
493
0
            }
494
495
0
            auto profile = fragment_ctx->collect_realtime_profile();
496
497
0
            if (profile.empty()) {
498
0
                std::string err_msg = fmt::format(
499
0
                        "Get nothing when collecting profile, query {}, fragment_id: {}",
500
0
                        print_id(_query_id), fragment_id);
501
0
                LOG_ERROR(err_msg);
502
0
                DCHECK(false) << err_msg;
503
0
                continue;
504
0
            }
505
506
0
            res.insert(std::make_pair(fragment_id, profile));
507
0
        }
508
0
    }
509
510
0
    return res;
511
0
}
512
513
0
// Builds a not-yet-done (is_done=false) exec status report from the realtime
// profiles of the live fragments and all non-null load channel profiles recorded
// so far.
TReportExecStatusParams QueryContext::get_realtime_exec_status() {
    TReportExecStatusParams exec_status;

    auto realtime_query_profile = _collect_realtime_query_profile();
    std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;

    {
        // _load_channel_profile_map is written under _profile_mutex by
        // add_fragment_profile(), so it must be read under the same mutex.
        // The lock is limited to copying the pointers.
        std::lock_guard<std::mutex> lock(_profile_mutex);
        for (const auto& load_channel_profile : _load_channel_profile_map) {
            if (load_channel_profile.second != nullptr) {
                load_channel_profiles.push_back(load_channel_profile.second);
            }
        }
    }

    exec_status = RuntimeQueryStatisticsMgr::create_report_exec_status_params(
            this->_query_id, std::move(realtime_query_profile), std::move(load_channel_profiles),
            /*is_done=*/false);

    return exec_status;
}
531
532
// Delivers `pblocks` to the recursive CTE scan registered for (instance_id, node_id)
// and, when `eos` is set, marks that scan as ready.
// Returns InternalError when no such scan is registered, or the first error
// reported by add_block(). Runs under _cte_scan_lock.
Status QueryContext::send_block_to_cte_scan(
        const TUniqueId& instance_id, int node_id,
        const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks, bool eos) {
    std::unique_lock<std::mutex> guard(_cte_scan_lock);
    auto scan_it = _cte_scan.find(std::make_pair(instance_id, node_id));
    if (scan_it == _cte_scan.end()) {
        return Status::InternalError("RecCTEScan not found for instance {}, node {}",
                                     print_id(instance_id), node_id);
    }
    auto& scan = scan_it->second;
    for (const auto& pblock : pblocks) {
        RETURN_IF_ERROR(scan->add_block(pblock));
    }
    if (eos) {
        scan->set_ready();
    }
    return Status::OK();
}
549
550
void QueryContext::registe_cte_scan(const TUniqueId& instance_id, int node_id,
551
1.95k
                                    RecCTEScanLocalState* scan) {
552
1.95k
    std::unique_lock<std::mutex> l(_cte_scan_lock);
553
1.95k
    auto key = std::make_pair(instance_id, node_id);
554
1.95k
    DCHECK(!_cte_scan.contains(key)) << "Duplicate registe cte scan for instance "
555
0
                                     << print_id(instance_id) << ", node " << node_id;
556
1.95k
    _cte_scan.emplace(key, scan);
557
1.95k
}
558
559
1.95k
void QueryContext::deregiste_cte_scan(const TUniqueId& instance_id, int node_id) {
560
1.95k
    std::lock_guard<std::mutex> l(_cte_scan_lock);
561
1.95k
    auto key = std::make_pair(instance_id, node_id);
562
18.4E
    DCHECK(_cte_scan.contains(key)) << "Duplicate deregiste cte scan for instance "
563
18.4E
                                    << print_id(instance_id) << ", node " << node_id;
564
1.95k
    _cte_scan.erase(key);
565
1.95k
}
566
567
1.82k
// Forwards a global runtime filter reset for `filter_ids` to the merge controller
// handler. Returns OK without doing anything when no handler is set.
Status QueryContext::reset_global_rf(const google::protobuf::RepeatedField<int32_t>& filter_ids) {
    if (!_merge_controller_handler) {
        return Status::OK();
    }
    return _merge_controller_handler->reset_global_rf(this, filter_ids);
}
573
574
} // namespace doris