Coverage Report

Created: 2025-04-12 13:58

/root/doris/be/src/runtime/query_context.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/query_context.h"
19
20
#include <fmt/core.h>
21
#include <gen_cpp/FrontendService_types.h>
22
#include <gen_cpp/RuntimeProfile_types.h>
23
#include <gen_cpp/Types_types.h>
24
#include <glog/logging.h>
25
26
#include <algorithm>
27
#include <exception>
28
#include <memory>
29
#include <mutex>
30
#include <utility>
31
#include <vector>
32
33
#include "common/logging.h"
34
#include "common/status.h"
35
#include "olap/olap_common.h"
36
#include "pipeline/dependency.h"
37
#include "pipeline/pipeline_fragment_context.h"
38
#include "runtime/exec_env.h"
39
#include "runtime/fragment_mgr.h"
40
#include "runtime/memory/heap_profiler.h"
41
#include "runtime/runtime_query_statistics_mgr.h"
42
#include "runtime/runtime_state.h"
43
#include "runtime/thread_context.h"
44
#include "runtime/workload_group/workload_group_manager.h"
45
#include "runtime_filter/runtime_filter_definitions.h"
46
#include "util/mem_info.h"
47
#include "util/uid_util.h"
48
#include "vec/spill/spill_stream_manager.h"
49
50
namespace doris {
51
52
class DelayReleaseToken : public Runnable {
53
    ENABLE_FACTORY_CREATOR(DelayReleaseToken);
54
55
public:
56
0
    DelayReleaseToken(std::unique_ptr<ThreadPoolToken>&& token) { token_ = std::move(token); }
57
0
    ~DelayReleaseToken() override = default;
58
0
    void run() override {}
59
    std::unique_ptr<ThreadPoolToken> token_;
60
};
61
62
0
// Returns a human-readable name for a QuerySource value.
//
// Every enumerator maps to its own spelling; unknown values map to "UNKNOWN".
const std::string toString(QuerySource queryType) {
    switch (queryType) {
    case QuerySource::INTERNAL_FRONTEND:
        return "INTERNAL_FRONTEND";
    case QuerySource::STREAM_LOAD:
        return "STREAM_LOAD";
    case QuerySource::GROUP_COMMIT_LOAD:
        // Must name the group-commit source itself; it is unrelated to external queries.
        return "GROUP_COMMIT_LOAD";
    case QuerySource::ROUTINE_LOAD:
        return "ROUTINE_LOAD";
    case QuerySource::EXTERNAL_CONNECTOR:
        return "EXTERNAL_CONNECTOR";
    default:
        return "UNKNOWN";
    }
}
78
79
59
// Builds a task controller bound to `query_ctx`.
// shared_from_this() requires `query_ctx` to already be owned by a std::shared_ptr.
std::unique_ptr<TaskController> QueryContext::QueryTaskController::create(QueryContext* query_ctx) {
    auto owner = query_ctx->shared_from_this();
    return create_unique(owner);
}
82
83
0
bool QueryContext::QueryTaskController::is_cancelled() const {
84
0
    auto query_ctx = query_ctx_.lock();
85
0
    if (query_ctx == nullptr) {
86
0
        return true;
87
0
    }
88
0
    return query_ctx->is_cancelled();
89
0
}
90
91
0
// Forwards a cancellation to the owning query context.
// Returns InternalError if the query context no longer exists; OK otherwise.
Status QueryContext::QueryTaskController::cancel(const Status& reason, int fragment_id) {
    if (auto owner = query_ctx_.lock()) {
        owner->cancel(reason, fragment_id);
        return Status::OK();
    }
    return Status::InternalError("QueryContext is destroyed");
}
99
100
419
std::unique_ptr<MemoryContext> QueryContext::QueryMemoryContext::create() {
101
419
    return QueryContext::QueryMemoryContext::create_unique();
102
419
}
103
104
std::shared_ptr<QueryContext> QueryContext::create(TUniqueId query_id, ExecEnv* exec_env,
105
                                                   const TQueryOptions& query_options,
106
                                                   TNetworkAddress coord_addr, bool is_nereids,
107
                                                   TNetworkAddress current_connect_fe,
108
59
                                                   QuerySource query_type) {
109
59
    auto ctx = QueryContext::create_shared(query_id, exec_env, query_options, coord_addr,
110
59
                                           is_nereids, current_connect_fe, query_type);
111
59
    ctx->init_query_task_controller();
112
59
    return ctx;
113
59
}
114
115
// Constructs the per-query context.
//
// The resource context (which holds the query memory tracker) is created first. The
// scoped tracker switch below can then attribute the rest of the constructor's
// allocations to this query.
//
// `coord_addr` and `current_connect_fe` are taken by value and moved into the
// corresponding members, so each is copied at most once (at the call site).
QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
                           const TQueryOptions& query_options, TNetworkAddress coord_addr,
                           bool is_nereids, TNetworkAddress current_connect_fe,
                           QuerySource query_source)
        : _timeout_second(-1),
          _query_id(std::move(query_id)),
          _exec_env(exec_env),
          _is_nereids(is_nereids),
          _query_options(query_options),
          _query_source(query_source) {
    _init_resource_context();
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker());
    _query_watcher.start();
    _execution_dependency =
            pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", false);
    _memory_sufficient_dependency =
            pipeline::Dependency::create_unique(-1, -1, "MemorySufficientDependency", true);

    _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(true);

    _timeout_second = query_options.execution_timeout;

    // Only SELECT, LOAD and EXTERNAL queries are expected here.
    bool is_query_type_valid = query_options.query_type == TQueryType::SELECT ||
                               query_options.query_type == TQueryType::LOAD ||
                               query_options.query_type == TQueryType::EXTERNAL;
    DCHECK_EQ(is_query_type_valid, true);

    this->coord_addr = std::move(coord_addr);
    // current_connect_fe is used for report query statistics
    this->current_connect_fe = std::move(current_connect_fe);
    // external query has no current_connect_fe; every other query type must carry a
    // usable host:port to report to.
    if (query_options.query_type != TQueryType::EXTERNAL) {
        bool is_report_fe_addr_valid =
                !this->current_connect_fe.hostname.empty() && this->current_connect_fe.port != 0;
        DCHECK_EQ(is_report_fe_addr_valid, true);
    }
    clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
    DorisMetrics::instance()->query_ctx_cnt->increment(1);
}
154
155
419
void QueryContext::_init_query_mem_tracker() {
156
419
    bool has_query_mem_limit = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0);
157
419
    int64_t bytes_limit = has_query_mem_limit ? _query_options.mem_limit : -1;
158
419
    if (bytes_limit > MemInfo::mem_limit() || bytes_limit == -1) {
159
0
        VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
160
0
                    << " exceeds process memory limit of "
161
0
                    << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
162
0
                    << " OR is -1. Using process memory limit instead.";
163
0
        bytes_limit = MemInfo::mem_limit();
164
0
    }
165
    // If the query is a pure load task(streamload, routine load, group commit), then it should not use
166
    // memlimit per query to limit their memory usage.
167
419
    if (is_pure_load_task()) {
168
295
        bytes_limit = MemInfo::mem_limit();
169
295
    }
170
419
    std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
171
419
    if (_query_options.query_type == TQueryType::SELECT) {
172
124
        query_mem_tracker = MemTrackerLimiter::create_shared(
173
124
                MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}", print_id(_query_id)),
174
124
                bytes_limit);
175
295
    } else if (_query_options.query_type == TQueryType::LOAD) {
176
0
        query_mem_tracker = MemTrackerLimiter::create_shared(
177
0
                MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(_query_id)),
178
0
                bytes_limit);
179
295
    } else if (_query_options.query_type == TQueryType::EXTERNAL) { // spark/flink/etc..
180
295
        query_mem_tracker = MemTrackerLimiter::create_shared(
181
295
                MemTrackerLimiter::Type::QUERY, fmt::format("External#Id={}", print_id(_query_id)),
182
295
                bytes_limit);
183
295
    } else {
184
0
        LOG(FATAL) << "__builtin_unreachable";
185
0
        __builtin_unreachable();
186
0
    }
187
419
    if (_query_options.__isset.is_report_success && _query_options.is_report_success) {
188
0
        query_mem_tracker->enable_print_log_usage();
189
0
    }
190
191
419
    query_mem_tracker->set_enable_reserve_memory(_query_options.__isset.enable_reserve_memory &&
192
419
                                                 _query_options.enable_reserve_memory);
193
419
    _user_set_mem_limit = bytes_limit;
194
419
    _adjusted_mem_limit = bytes_limit;
195
196
419
    _resource_ctx->memory_context()->set_mem_tracker(query_mem_tracker);
197
419
}
198
199
419
void QueryContext::_init_resource_context() {
200
419
    _resource_ctx = ResourceContext::create_shared();
201
419
    _resource_ctx->set_memory_context(QueryContext::QueryMemoryContext::create());
202
419
    _init_query_mem_tracker();
203
419
}
204
205
59
void QueryContext::init_query_task_controller() {
206
59
    _resource_ctx->set_task_controller(QueryContext::QueryTaskController::create(this));
207
59
    _resource_ctx->task_controller()->set_task_id(_query_id);
208
59
    _resource_ctx->task_controller()->set_fe_addr(current_connect_fe);
209
59
    _resource_ctx->task_controller()->set_query_type(_query_options.query_type);
210
#ifndef BE_TEST
211
    _exec_env->runtime_query_statistics_mgr()->register_resource_context(print_id(_query_id),
212
                                                                         _resource_ctx);
213
#endif
214
59
}
215
216
419
// Tears down the query context.
//
// Order matters and is kept deliberately:
//  1. build the memory-tracker summary while the tracker is still attached;
//  2. finish the task controller and report the profile (if profiling is enabled);
//  3. hand the thread token to the lazy-release pool;
//  4. close out tracing, drop owned runtime objects, schedule spill cleanup;
//  5. emit the single "query deconstructed" log line.
QueryContext::~QueryContext() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker());

    // A zero peak means the tracker was never used: for example, the query was already
    // present in _query_ctx_map and this context was discarded. A non-zero *current*
    // consumption here is possible, because memory consumed on this tracker may have been
    // released on other trackers.
    std::string mem_tracker_msg;
    if (query_mem_tracker()->peak_consumption() != 0) {
        mem_tracker_msg = fmt::format(
                "deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
                "PeakUsed={}",
                print_id(_query_id), MemCounter::print_bytes(query_mem_tracker()->limit()),
                MemCounter::print_bytes(query_mem_tracker()->consumption()),
                MemCounter::print_bytes(query_mem_tracker()->peak_consumption()));
    }

    // Read the workload group id up front (it is only consumed by the tracer below).
    [[maybe_unused]] uint64_t group_id = workload_group() ? workload_group()->id() : 0;

    _resource_ctx->task_controller()->finish();

    if (enable_profile()) {
        _report_query_profile();
    }

    // The thread token must not be released here. The query context may be destroyed on a
    // thread belonging to that very token, which is dangerous and can crash. Shutting the
    // token down can also take a while and stall the releasing thread, which may be a
    // pipeline task scheduler thread. The token is therefore handed to the lazy-release
    // pool as a temporary, so no extra reference lingers in this scope.
    if (_thread_token) {
        if (Status submit_st = ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
                    DelayReleaseToken::create_shared(std::move(_thread_token)));
            !submit_st.ok()) {
            LOG(WARNING) << "Failed to release query context thread token, query_id "
                         << print_id(_query_id) << ", error status " << submit_st;
        }
    }

#ifndef BE_TEST
    if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] {
        try {
            ExecEnv::GetInstance()->pipeline_tracer_context()->end_query(_query_id, group_id);
        } catch (const std::exception& e) {
            LOG(WARNING) << "Dump trace log failed bacause " << e.what();
        }
    }
#endif

    _runtime_filter_mgr.reset();
    _execution_dependency.reset();
    _runtime_predicates.clear();
    file_scan_range_params_map.clear();
    obj_pool.clear();
    _merge_controller_handler.reset();

#ifndef BE_TEST
    _exec_env->spill_stream_mgr()->async_cleanup_query(_query_id);
#endif

    DorisMetrics::instance()->query_ctx_cnt->increment(-1);
    // The only message that marks the end of the query; append to it rather than logging
    // separately.
    LOG_INFO("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), mem_tracker_msg);
}
277
278
1
// Unblocks execution and records `reason` as the execution status.
// When `reason` is an error, it also flags the query's memory tracker as cancelled.
void QueryContext::set_ready_to_execute(Status reason) {
    set_execution_dependency_ready();
    _exec_status.update(reason);

    const bool is_failure = !reason.ok();
    if (query_mem_tracker() && is_failure) {
        query_mem_tracker()->set_is_query_cancelled(is_failure);
    }
}
285
286
0
void QueryContext::set_ready_to_execute_only() {
287
0
    set_execution_dependency_ready();
288
0
}
289
290
1
void QueryContext::set_execution_dependency_ready() {
291
1
    _execution_dependency->set_ready();
292
1
}
293
294
18
void QueryContext::set_memory_sufficient(bool sufficient) {
295
18
    if (sufficient) {
296
8
        {
297
8
            _memory_sufficient_dependency->set_ready();
298
8
            std::lock_guard l(_paused_mutex);
299
8
            _paused_reason = Status::OK();
300
8
        }
301
10
    } else {
302
10
        _memory_sufficient_dependency->block();
303
10
        ++_paused_count;
304
10
    }
305
18
}
306
307
1
// Cancels the query with `new_status`.
//
// Returns early when `_exec_status.update()` reports no change. Otherwise:
//  * both dependencies are forced always-ready, so blocked tasks can run and observe the
//    cancellation;
//  * for memory-limit / allocation failures (when the option is enabled), the memory
//    detail and heap profile are dumped to the log;
//  * the status is recorded and every pipeline fragment except `fragment_id` is
//    cancelled.
void QueryContext::cancel(Status new_status, int fragment_id) {
    if (!_exec_status.update(new_status)) {
        return;
    }
    // Tasks should be always runnable.
    _execution_dependency->set_always_ready();
    _memory_sufficient_dependency->set_always_ready();
    if ((new_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() ||
         new_status.is<ErrorCode::MEM_ALLOC_FAILED>()) &&
        _query_options.__isset.dump_heap_profile_when_mem_limit_exceeded &&
        _query_options.dump_heap_profile_when_mem_limit_exceeded) {
        // Dumping at cancellation time captures the memory state closest to the failure,
        // which gives the most accurate picture for problem analysis.
        auto wg = workload_group();
        auto log_str = fmt::format(
                "Query {} canceled because of memory limit exceeded, dumping memory "
                "detail profiles. wg: {}. {}",
                print_id(_query_id), wg ? wg->debug_string() : "null",
                doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str());
        LOG_LONG_STRING(INFO, log_str);
        std::string dot = HeapProfiler::instance()->dump_heap_profile_to_dot();
        if (!dot.empty()) {
            dot += "\n-------------------------------------------------------\n";
            // Each sentence ends with a newline or space so the fragments do not run together.
            dot += "Copy the text after `digraph` in the above output to "
                   "http://www.webgraphviz.com to generate a dot graph.\n"
                   "after start heap profiler, if there is no operation, will print `No nodes "
                   "to print`.\n"
                   "If there are many errors: `addr2line: Dwarf Error`, "
                   "or other FAQ, reference doc: "
                   "https://doris.apache.org/community/developer-guide/debug-tool/#4-qa\n";
            // Distinct name: do not shadow the memory-detail `log_str` above.
            auto dot_log_str =
                    fmt::format("Query {}, dump heap profile to dot: {}", print_id(_query_id), dot);
            LOG_LONG_STRING(INFO, dot_log_str);
        }
    }

    set_ready_to_execute(new_status);
    cancel_all_pipeline_context(new_status, fragment_id);
}
347
348
0
void QueryContext::set_load_error_url(std::string error_url) {
349
0
    std::lock_guard<std::mutex> lock(_error_url_lock);
350
0
    _load_error_url = error_url;
351
0
}
352
353
0
std::string QueryContext::get_load_error_url() {
354
0
    std::lock_guard<std::mutex> lock(_error_url_lock);
355
0
    return _load_error_url;
356
0
}
357
358
1
void QueryContext::cancel_all_pipeline_context(const Status& reason, int fragment_id) {
359
1
    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel;
360
1
    {
361
1
        std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
362
1
        for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
363
0
            if (fragment_id == f_id) {
364
0
                continue;
365
0
            }
366
0
            ctx_to_cancel.push_back(f_context);
367
0
        }
368
1
    }
369
1
    for (auto& f_context : ctx_to_cancel) {
370
0
        if (auto pipeline_ctx = f_context.lock()) {
371
0
            pipeline_ctx->cancel(reason);
372
0
        }
373
0
    }
374
1
}
375
376
0
std::string QueryContext::print_all_pipeline_context() {
377
0
    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_print;
378
0
    fmt::memory_buffer debug_string_buffer;
379
0
    size_t i = 0;
380
0
    {
381
0
        fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts in query {}. \n",
382
0
                       _fragment_id_to_pipeline_ctx.size(), print_id(_query_id));
383
384
0
        {
385
0
            std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
386
0
            for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
387
0
                ctx_to_print.push_back(f_context);
388
0
            }
389
0
        }
390
0
        for (auto& f_context : ctx_to_print) {
391
0
            if (auto pipeline_ctx = f_context.lock()) {
392
0
                auto elapsed = pipeline_ctx->elapsed_time() / 1000000000.0;
393
0
                fmt::format_to(debug_string_buffer,
394
0
                               "No.{} (elapse_second={}s, fragment_id={}) : {}\n", i, elapsed,
395
0
                               pipeline_ctx->get_fragment_id(), pipeline_ctx->debug_string());
396
0
                i++;
397
0
            }
398
0
        }
399
0
    }
400
0
    return fmt::to_string(debug_string_buffer);
401
0
}
402
403
void QueryContext::set_pipeline_context(
404
0
        const int fragment_id, std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx) {
405
0
    std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
406
0
    _fragment_id_to_pipeline_ctx.insert({fragment_id, pip_ctx});
407
0
}
408
409
0
// Picks the task scheduler for this query.
// The workload group's scheduler is used when a group is bound and it provided one;
// otherwise the global pipeline task scheduler is used.
doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
    if (workload_group() && _task_scheduler) {
        return _task_scheduler;
    }
    return _exec_env->pipeline_task_scheduler();
}
417
418
10
// Binds the query to workload group `wg`.
//
// The query's memory tracker is registered with the group first, which keeps the group
// from being deleted underneath the query (see
// task_group_manager::delete_workload_group_by_ids). The group's schedulers are then
// fetched into this context.
void QueryContext::set_workload_group(WorkloadGroupPtr& wg) {
    _resource_ctx->set_workload_group(wg);

    const auto group = workload_group();
    group->add_mem_tracker_limiter(query_mem_tracker());
    group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
                               &_remote_scan_task_scheduler);
}
426
427
void QueryContext::add_fragment_profile(
428
        int fragment_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles,
429
0
        std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
430
0
    if (pipeline_profiles.empty()) {
431
0
        std::string msg = fmt::format("Add pipeline profile failed, query {}, fragment {}",
432
0
                                      print_id(this->_query_id), fragment_id);
433
0
        LOG_ERROR(msg);
434
0
        DCHECK(false) << msg;
435
0
        return;
436
0
    }
437
438
0
#ifndef NDEBUG
439
0
    for (const auto& p : pipeline_profiles) {
440
0
        DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, query {}, fragment {}",
441
0
                                            print_id(this->_query_id), fragment_id);
442
0
    }
443
0
#endif
444
445
0
    std::lock_guard<std::mutex> l(_profile_mutex);
446
0
    VLOG_ROW << fmt::format(
447
0
            "Query add fragment profile, query {}, fragment {}, pipeline profile count {} ",
448
0
            print_id(this->_query_id), fragment_id, pipeline_profiles.size());
449
450
0
    _profile_map.insert(std::make_pair(fragment_id, pipeline_profiles));
451
452
0
    if (load_channel_profile != nullptr) {
453
0
        _load_channel_profile_map.insert(std::make_pair(fragment_id, load_channel_profile));
454
0
    }
455
0
}
456
457
0
void QueryContext::_report_query_profile() {
458
0
    std::lock_guard<std::mutex> lg(_profile_mutex);
459
460
0
    for (auto& [fragment_id, fragment_profile] : _profile_map) {
461
0
        std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
462
463
0
        if (_load_channel_profile_map.contains(fragment_id)) {
464
0
            load_channel_profile = _load_channel_profile_map[fragment_id];
465
0
        }
466
467
0
        ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile(
468
0
                _query_id, this->coord_addr, fragment_id, fragment_profile, load_channel_profile);
469
0
    }
470
471
0
    ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
472
0
}
473
474
void QueryContext::get_revocable_info(size_t* revocable_size, size_t* memory_usage,
475
84
                                      bool* has_running_task) const {
476
84
    *revocable_size = 0;
477
84
    for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
478
0
        auto fragment_ctx = fragment_wptr.lock();
479
0
        if (!fragment_ctx) {
480
0
            continue;
481
0
        }
482
483
0
        *revocable_size += fragment_ctx->get_revocable_size(has_running_task);
484
485
        // Should wait for all tasks are not running before revoking memory.
486
0
        if (*has_running_task) {
487
0
            break;
488
0
        }
489
0
    }
490
491
84
    *memory_usage = query_mem_tracker()->consumption();
492
84
}
493
494
0
size_t QueryContext::get_revocable_size() const {
495
0
    size_t revocable_size = 0;
496
0
    for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
497
0
        auto fragment_ctx = fragment_wptr.lock();
498
0
        if (!fragment_ctx) {
499
0
            continue;
500
0
        }
501
502
0
        bool has_running_task = false;
503
0
        revocable_size += fragment_ctx->get_revocable_size(&has_running_task);
504
505
        // Should wait for all tasks are not running before revoking memory.
506
0
        if (has_running_task) {
507
0
            return 0;
508
0
        }
509
0
    }
510
0
    return revocable_size;
511
0
}
512
513
0
// Spills memory from this query's revocable tasks.
//
// All revocable tasks are collected from the live fragments and sorted by revocable size,
// largest first. Tasks are picked until the picked total reaches 20% of the tracker's
// *current* consumption. For example, with a 1.6G limit but 1G in use, the target is
// about 200MB, not about 300MB. A shared SpillContext is created whose callback marks
// memory sufficient again (if the query is still alive) once all picked tasks finish
// spilling. The first revoke_memory() error from a task is returned.
Status QueryContext::revoke_memory() {
    std::vector<std::pair<size_t, pipeline::PipelineTask*>> tasks;
    // Holds the locked fragment contexts for the duration of this call; presumably this
    // keeps the raw task pointers above valid — TODO confirm ownership.
    std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> fragments;
    for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
        auto live_fragment = fragment_wptr.lock();
        if (live_fragment == nullptr) {
            continue;
        }
        for (auto* task : live_fragment->get_revocable_tasks()) {
            tasks.emplace_back(task->get_revocable_size(), task);
        }
        fragments.emplace_back(std::move(live_fragment));
    }

    std::sort(tasks.begin(), tasks.end(),
              [](const auto& lhs, const auto& rhs) { return lhs.first > rhs.first; });

    const auto target_revoking_size =
            static_cast<int64_t>(query_mem_tracker()->consumption() * 0.2);
    size_t revoked_size = 0;
    size_t total_revokable_size = 0;

    // NOTE(review): this compares size_t against int64_t. A negative target converts to a
    // huge unsigned value, which would select every task. Kept as-is; confirm intent.
    std::vector<pipeline::PipelineTask*> chosen_tasks;
    for (auto&& [revocable_size, task] : tasks) {
        total_revokable_size += revocable_size;
        if (revoked_size < target_revoking_size) {
            chosen_tasks.emplace_back(task);
            revoked_size += revocable_size;
        }
    }

    // Weak capture: the callback must not keep the query context alive.
    std::weak_ptr<QueryContext> this_ctx = shared_from_this();
    auto spill_context = std::make_shared<pipeline::SpillContext>(
            chosen_tasks.size(), _query_id, [this_ctx](pipeline::SpillContext* context) {
                auto query_context = this_ctx.lock();
                if (!query_context) {
                    return;
                }

                LOG(INFO) << query_context->debug_string() << ", context: " << ((void*)context)
                          << " all spill tasks done, resume it.";
                query_context->set_memory_sufficient(true);
            });

    LOG(INFO) << fmt::format(
            "{}, spill context: {}, revokable mem: {}/{}, tasks count: {}/{}", this->debug_string(),
            ((void*)spill_context.get()), PrettyPrinter::print_bytes(revoked_size),
            PrettyPrinter::print_bytes(total_revokable_size), chosen_tasks.size(), tasks.size());

    for (auto* task : chosen_tasks) {
        RETURN_IF_ERROR(task->revoke_memory(spill_context));
    }
    return Status::OK();
}
572
573
14
void QueryContext::decrease_revoking_tasks_count() {
574
14
    _revoking_tasks_count.fetch_sub(1);
575
14
}
576
577
84
std::vector<pipeline::PipelineTask*> QueryContext::get_revocable_tasks() const {
578
84
    std::vector<pipeline::PipelineTask*> tasks;
579
84
    for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
580
0
        auto fragment_ctx = fragment_wptr.lock();
581
0
        if (!fragment_ctx) {
582
0
            continue;
583
0
        }
584
0
        auto tasks_of_fragment = fragment_ctx->get_revocable_tasks();
585
0
        tasks.insert(tasks.end(), tasks_of_fragment.cbegin(), tasks_of_fragment.cend());
586
0
    }
587
84
    return tasks;
588
84
}
589
590
95
std::string QueryContext::debug_string() {
591
95
    std::lock_guard l(_paused_mutex);
592
95
    return fmt::format(
593
95
            "QueryId={}, Memory [Used={}, Limit={}, Peak={}], Spill[RunningSpillTaskCnt={}, "
594
95
            "TotalPausedPeriodSecs={}, LatestPausedReason={}]",
595
95
            print_id(_query_id),
596
95
            PrettyPrinter::print(query_mem_tracker()->consumption(), TUnit::BYTES),
597
95
            PrettyPrinter::print(query_mem_tracker()->limit(), TUnit::BYTES),
598
95
            PrettyPrinter::print(query_mem_tracker()->peak_consumption(), TUnit::BYTES),
599
95
            _revoking_tasks_count,
600
95
            _memory_sufficient_dependency->watcher_elapse_time() / NANOS_PER_SEC,
601
95
            _paused_reason.to_string());
602
95
}
603
604
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
605
0
QueryContext::_collect_realtime_query_profile() const {
606
0
    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> res;
607
608
0
    for (const auto& [fragment_id, fragment_ctx_wptr] : _fragment_id_to_pipeline_ctx) {
609
0
        if (auto fragment_ctx = fragment_ctx_wptr.lock()) {
610
0
            if (fragment_ctx == nullptr) {
611
0
                std::string msg =
612
0
                        fmt::format("PipelineFragmentContext is nullptr, query {} fragment_id: {}",
613
0
                                    print_id(_query_id), fragment_id);
614
0
                LOG_ERROR(msg);
615
0
                DCHECK(false) << msg;
616
0
                continue;
617
0
            }
618
619
0
            auto profile = fragment_ctx->collect_realtime_profile();
620
621
0
            if (profile.empty()) {
622
0
                std::string err_msg = fmt::format(
623
0
                        "Get nothing when collecting profile, query {}, fragment_id: {}",
624
0
                        print_id(_query_id), fragment_id);
625
0
                LOG_ERROR(err_msg);
626
0
                DCHECK(false) << err_msg;
627
0
                continue;
628
0
            }
629
630
0
            res.insert(std::make_pair(fragment_id, profile));
631
0
        }
632
0
    }
633
634
0
    return res;
635
0
}
636
637
0
// Builds a not-done exec-status report from the current realtime fragment profiles and
// every non-null load-channel profile recorded so far.
//
// NOTE(review): add_fragment_profile() writes _load_channel_profile_map under
// _profile_mutex, but this const method reads it without that lock. Confirm that callers
// serialize these, or make the mutex mutable and lock it here.
TReportExecStatusParams QueryContext::get_realtime_exec_status() const {
    auto realtime_query_profile = _collect_realtime_query_profile();

    std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;
    load_channel_profiles.reserve(_load_channel_profile_map.size());
    // Iterate by reference: a by-value loop copied every entry (an atomic refcount bump).
    for (const auto& [fragment_id, load_channel_profile] : _load_channel_profile_map) {
        if (load_channel_profile != nullptr) {
            load_channel_profiles.push_back(load_channel_profile);
        }
    }

    return RuntimeQueryStatisticsMgr::create_report_exec_status_params(
            this->_query_id, std::move(realtime_query_profile), std::move(load_channel_profiles),
            /*is_done=*/false);
}
655
656
} // namespace doris