Coverage Report

Created: 2025-04-16 14:10

/root/doris/be/src/runtime/query_context.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/query_context.h"
19
20
#include <fmt/core.h>
21
#include <gen_cpp/FrontendService_types.h>
22
#include <gen_cpp/RuntimeProfile_types.h>
23
#include <gen_cpp/Types_types.h>
24
#include <glog/logging.h>
25
26
#include <exception>
27
#include <memory>
28
#include <mutex>
29
#include <utility>
30
31
#include "common/logging.h"
32
#include "pipeline/dependency.h"
33
#include "pipeline/pipeline_fragment_context.h"
34
#include "runtime/exec_env.h"
35
#include "runtime/fragment_mgr.h"
36
#include "runtime/runtime_query_statistics_mgr.h"
37
#include "runtime/runtime_state.h"
38
#include "runtime/thread_context.h"
39
#include "runtime/workload_group/workload_group_manager.h"
40
#include "util/mem_info.h"
41
#include "util/uid_util.h"
42
#include "vec/spill/spill_stream_manager.h"
43
44
namespace doris {
45
46
class DelayReleaseToken : public Runnable {
47
    ENABLE_FACTORY_CREATOR(DelayReleaseToken);
48
49
public:
50
0
    DelayReleaseToken(std::unique_ptr<ThreadPoolToken>&& token) { token_ = std::move(token); }
51
0
    ~DelayReleaseToken() override = default;
52
0
    void run() override {}
53
    std::unique_ptr<ThreadPoolToken> token_;
54
};
55
56
0
// Returns the textual name of a QuerySource value; values without a dedicated case
// map to "UNKNOWN".
const std::string toString(QuerySource queryType) {
    switch (queryType) {
    case QuerySource::INTERNAL_FRONTEND:
        return "INTERNAL_FRONTEND";
    case QuerySource::STREAM_LOAD:
        return "STREAM_LOAD";
    case QuerySource::GROUP_COMMIT_LOAD:
        // Previously returned "EXTERNAL_QUERY", which does not correspond to this enumerator.
        return "GROUP_COMMIT_LOAD";
    case QuerySource::ROUTINE_LOAD:
        return "ROUTINE_LOAD";
    case QuerySource::EXTERNAL_CONNECTOR:
        return "EXTERNAL_CONNECTOR";
    default:
        return "UNKNOWN";
    }
}
72
73
// Builds the per-query context: creates the query memory tracker, the shared hash table
// controller, the execution dependency and the runtime filter manager, records the
// coordinator / report FE addresses and the arrival timestamp, registers the memory and
// CPU statistics and bumps the query_ctx_cnt metric.
QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
                           const TQueryOptions& query_options, TNetworkAddress coord_addr,
                           bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe,
                           QuerySource query_source)
        : _timeout_second(-1),
          _query_id(query_id),
          _exec_env(exec_env),
          _is_pipeline(is_pipeline),
          _is_nereids(is_nereids),
          _query_options(query_options),
          _query_source(query_source) {
    // The tracker must exist before the thread's tracker is switched to it.
    _init_query_mem_tracker();
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
    _query_watcher.start();
    _shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
    _execution_dependency = pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency");
    _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
            TUniqueId(), RuntimeFilterParamsContext::create(this), query_mem_tracker, true);

    _timeout_second = query_options.execution_timeout;

    bool is_query_type_valid = query_options.query_type == TQueryType::SELECT ||
                               query_options.query_type == TQueryType::LOAD ||
                               query_options.query_type == TQueryType::EXTERNAL;
    DCHECK_EQ(is_query_type_valid, true);

    // Both addresses are by-value sink parameters that are not read again below
    // (only the members are), so move them instead of copying.
    this->coord_addr = std::move(coord_addr);
    // current_connect_fe is used for report query statistics
    this->current_connect_fe = std::move(current_connect_fe);
    // external query has no current_connect_fe
    if (query_options.query_type != TQueryType::EXTERNAL) {
        bool is_report_fe_addr_valid =
                !this->current_connect_fe.hostname.empty() && this->current_connect_fe.port != 0;
        DCHECK_EQ(is_report_fe_addr_valid, true);
    }
    clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
    // The statistics registration reads this->current_connect_fe, so it runs after the
    // address has been stored.
    register_memory_statistics();
    register_cpu_statistics();
    DorisMetrics::instance()->query_ctx_cnt->increment(1);
}
113
114
0
void QueryContext::_init_query_mem_tracker() {
115
0
    bool has_query_mem_limit = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0);
116
0
    int64_t _bytes_limit = has_query_mem_limit ? _query_options.mem_limit : -1;
117
0
    if (_bytes_limit > MemInfo::mem_limit()) {
118
0
        VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(_bytes_limit, TUnit::BYTES)
119
0
                    << " exceeds process memory limit of "
120
0
                    << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
121
0
                    << ". Using process memory limit instead";
122
0
        _bytes_limit = MemInfo::mem_limit();
123
0
    }
124
0
    if (_query_options.query_type == TQueryType::SELECT) {
125
0
        query_mem_tracker = MemTrackerLimiter::create_shared(
126
0
                MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}", print_id(_query_id)),
127
0
                _bytes_limit);
128
0
    } else if (_query_options.query_type == TQueryType::LOAD) {
129
0
        query_mem_tracker = MemTrackerLimiter::create_shared(
130
0
                MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(_query_id)),
131
0
                _bytes_limit);
132
0
    } else { // EXTERNAL
133
0
        query_mem_tracker = MemTrackerLimiter::create_shared(
134
0
                MemTrackerLimiter::Type::LOAD, fmt::format("External#Id={}", print_id(_query_id)),
135
0
                _bytes_limit);
136
0
    }
137
0
    if (_query_options.__isset.is_report_success && _query_options.is_report_success) {
138
0
        query_mem_tracker->enable_print_log_usage();
139
0
    }
140
0
}
141
142
0
// Tears the query context down: marks the query finished in the statistics manager,
// reports the profile when profiling is enabled, hands the thread token to the
// lazy-release pool, ends pipeline tracing, drops the owned per-query objects in a fixed
// order, schedules spill cleanup and finally logs the single "query deconstructed" line.
QueryContext::~QueryContext() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
    // query mem tracker consumption is equal to 0, it means that after QueryContext is created,
    // it is found that query already exists in _query_ctx_map, and query mem tracker is not used.
    // query mem tracker consumption is not equal to 0 after use, because there is memory consumed
    // on query mem tracker, released on other trackers.
    std::string mem_tracker_msg;
    if (query_mem_tracker->peak_consumption() != 0) {
        mem_tracker_msg = fmt::format(
                "deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
                "PeakUsed={}",
                print_id(_query_id), MemCounter::print_bytes(query_mem_tracker->limit()),
                MemCounter::print_bytes(query_mem_tracker->consumption()),
                MemCounter::print_bytes(query_mem_tracker->peak_consumption()));
    }
    uint64_t group_id = 0;
    if (_workload_group) {
        group_id = _workload_group->id(); // before remove
    }

    _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));

    if (enable_profile()) {
        _report_query_profile();
    }

    // Do not release the thread token in the query context's destructor, because the query
    // context may be destructed on a thread that belongs to the token itself. That is very
    // dangerous and may core. Shutting the token down may also take some time and could hang
    // the releasing thread, which may be a pipeline task scheduler thread.
    if (_thread_token) {
        Status submit_st = ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
                DelayReleaseToken::create_shared(std::move(_thread_token)));
        if (!submit_st.ok()) {
            LOG(WARNING) << "Failed to release query context thread token, query_id "
                         << print_id(_query_id) << ", error status " << submit_st;
        }
    }

    //TODO: check if pipeline and tracing both enabled
    if (_is_pipeline && ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] {
        try {
            ExecEnv::GetInstance()->pipeline_tracer_context()->end_query(_query_id, group_id);
        } catch (const std::exception& e) {
            // A destructor must not let the exception escape; log it and carry on.
            LOG(WARNING) << "Dump trace log failed bacause " << e.what();
        }
    }
    _runtime_filter_mgr.reset();
    _execution_dependency.reset();
    _shared_hash_table_controller.reset();
    _runtime_predicates.clear();
    file_scan_range_params_map.clear();
    obj_pool.clear();
    _merge_controller_handler.reset();

    _exec_env->spill_stream_mgr()->async_cleanup_query(_query_id);
    DorisMetrics::instance()->query_ctx_cnt->increment(-1);
    // the only one msg shows query's end. any other msg should append to it if need.
    LOG_INFO("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), mem_tracker_msg);
}
202
203
0
// Marks the execution dependency ready and records `reason` in the query's exec status.
// When `reason` is not OK and a memory tracker exists, the tracker is flagged as cancelled.
void QueryContext::set_ready_to_execute(Status reason) {
    set_execution_dependency_ready();
    _exec_status.update(reason);

    const bool failed = !reason.ok();
    if (failed && query_mem_tracker) {
        query_mem_tracker->set_is_query_cancelled(true);
    }
}
210
211
0
void QueryContext::set_ready_to_execute_only() {
212
0
    set_execution_dependency_ready();
213
0
}
214
215
0
void QueryContext::set_execution_dependency_ready() {
216
0
    _execution_dependency->set_ready();
217
0
}
218
219
0
// Cancels the query with `new_status`. Nothing happens when the exec status rejects the
// update; otherwise the query is made ready to execute with that status and every pipeline
// fragment context except `fragment_id` is cancelled.
void QueryContext::cancel(Status new_status, int fragment_id) {
    const bool status_accepted = _exec_status.update(new_status);
    if (status_accepted) {
        set_ready_to_execute(new_status);
        cancel_all_pipeline_context(new_status, fragment_id);
    }
}
227
228
0
void QueryContext::set_load_error_url(std::string error_url) {
229
0
    std::lock_guard<std::mutex> lock(_error_url_lock);
230
0
    _load_error_url = error_url;
231
0
}
232
233
0
std::string QueryContext::get_load_error_url() {
234
0
    std::lock_guard<std::mutex> lock(_error_url_lock);
235
0
    return _load_error_url;
236
0
}
237
238
0
void QueryContext::cancel_all_pipeline_context(const Status& reason, int fragment_id) {
239
0
    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel;
240
0
    {
241
0
        std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
242
0
        for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
243
0
            if (fragment_id == f_id) {
244
0
                continue;
245
0
            }
246
0
            ctx_to_cancel.push_back(f_context);
247
0
        }
248
0
    }
249
0
    for (auto& f_context : ctx_to_cancel) {
250
0
        if (auto pipeline_ctx = f_context.lock()) {
251
0
            pipeline_ctx->cancel(reason);
252
0
        }
253
0
    }
254
0
}
255
256
0
std::string QueryContext::print_all_pipeline_context() {
257
0
    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_print;
258
0
    fmt::memory_buffer debug_string_buffer;
259
0
    size_t i = 0;
260
0
    {
261
0
        fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts in query {}. \n",
262
0
                       _fragment_id_to_pipeline_ctx.size(), print_id(_query_id));
263
264
0
        {
265
0
            std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
266
0
            for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
267
0
                ctx_to_print.push_back(f_context);
268
0
            }
269
0
        }
270
0
        for (auto& f_context : ctx_to_print) {
271
0
            if (auto pipeline_ctx = f_context.lock()) {
272
0
                auto elapsed = pipeline_ctx->elapsed_time() / 1000000000.0;
273
0
                fmt::format_to(debug_string_buffer,
274
0
                               "No.{} (elapse_second={}s, fragment_id={}) : {}\n", i, elapsed,
275
0
                               pipeline_ctx->get_fragment_id(), pipeline_ctx->debug_string());
276
0
                i++;
277
0
            }
278
0
        }
279
0
    }
280
0
    return fmt::to_string(debug_string_buffer);
281
0
}
282
283
void QueryContext::set_pipeline_context(
284
0
        const int fragment_id, std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx) {
285
0
    std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
286
0
    _fragment_id_to_pipeline_ctx.insert({fragment_id, pip_ctx});
287
0
}
288
289
0
void QueryContext::register_query_statistics(std::shared_ptr<QueryStatistics> qs) {
290
0
    _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
291
0
            print_id(_query_id), qs, current_connect_fe, _query_options.query_type);
292
0
}
293
294
0
std::shared_ptr<QueryStatistics> QueryContext::get_query_statistics() {
295
0
    return _exec_env->runtime_query_statistics_mgr()->get_runtime_query_statistics(
296
0
            print_id(_query_id));
297
0
}
298
299
0
void QueryContext::register_memory_statistics() {
300
0
    if (query_mem_tracker) {
301
0
        std::shared_ptr<QueryStatistics> qs = query_mem_tracker->get_query_statistics();
302
0
        std::string query_id = print_id(_query_id);
303
0
        if (qs) {
304
0
            _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
305
0
                    query_id, qs, current_connect_fe, _query_options.query_type);
306
0
        } else {
307
0
            LOG(INFO) << " query " << query_id << " get memory query statistics failed ";
308
0
        }
309
0
    }
310
0
}
311
312
0
void QueryContext::register_cpu_statistics() {
313
0
    if (!_cpu_statistics) {
314
0
        _cpu_statistics = std::make_shared<QueryStatistics>();
315
0
        _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
316
0
                print_id(_query_id), _cpu_statistics, current_connect_fe,
317
0
                _query_options.query_type);
318
0
    }
319
0
}
320
321
0
// Returns the task scheduler stored on this context when a workload group is set and the
// scheduler pointer is non-null; otherwise falls back to the ExecEnv-wide pipeline task
// scheduler.
doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
    const bool has_group_scheduler = _workload_group && _task_scheduler;
    if (has_group_scheduler) {
        return _task_scheduler;
    }
    return _exec_env->pipeline_task_scheduler();
}
329
330
0
// Returns the memtable flush pool stored on this context when a workload group is set,
// nullptr otherwise.
ThreadPool* QueryContext::get_memtable_flush_pool() {
    if (!_workload_group) {
        return nullptr;
    }
    return _memtable_flush_pool;
}
337
338
0
// Binds this query to workload group `tg`: stores the group, adds the query memory tracker
// to it, and fetches the group's schedulers / flush pool into this context.
void QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
    _workload_group = tg;
    auto& group = *_workload_group;
    // Should add query first, then the workload group will not be deleted.
    // see task_group_manager::delete_workload_group_by_ids
    group.add_mem_tracker_limiter(query_mem_tracker);
    group.get_query_scheduler(&_task_scheduler, &_scan_task_scheduler, &_memtable_flush_pool,
                              &_remote_scan_task_scheduler);
}
346
347
void QueryContext::add_fragment_profile(
348
        int fragment_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles,
349
0
        std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
350
0
    if (pipeline_profiles.empty()) {
351
0
        std::string msg = fmt::format("Add pipeline profile failed, query {}, fragment {}",
352
0
                                      print_id(this->_query_id), fragment_id);
353
0
        LOG_ERROR(msg);
354
0
        DCHECK(false) << msg;
355
0
        return;
356
0
    }
357
358
0
#ifndef NDEBUG
359
0
    for (const auto& p : pipeline_profiles) {
360
0
        DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, query {}, fragment {}",
361
0
                                            print_id(this->_query_id), fragment_id);
362
0
    }
363
0
#endif
364
365
0
    std::lock_guard<std::mutex> l(_profile_mutex);
366
0
    VLOG_ROW << fmt::format(
367
0
            "Query add fragment profile, query {}, fragment {}, pipeline profile count {} ",
368
0
            print_id(this->_query_id), fragment_id, pipeline_profiles.size());
369
370
0
    _profile_map.insert(std::make_pair(fragment_id, pipeline_profiles));
371
372
0
    if (load_channel_profile != nullptr) {
373
0
        _load_channel_profile_map.insert(std::make_pair(fragment_id, load_channel_profile));
374
0
    }
375
0
}
376
377
0
void QueryContext::_report_query_profile() {
378
0
    std::lock_guard<std::mutex> lg(_profile_mutex);
379
380
0
    for (auto& [fragment_id, fragment_profile] : _profile_map) {
381
0
        std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
382
383
0
        if (_load_channel_profile_map.contains(fragment_id)) {
384
0
            load_channel_profile = _load_channel_profile_map[fragment_id];
385
0
        }
386
387
0
        ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile(
388
0
                _query_id, this->coord_addr, fragment_id, fragment_profile, load_channel_profile);
389
0
    }
390
391
0
    ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
392
0
}
393
394
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
395
0
QueryContext::_collect_realtime_query_profile() const {
396
0
    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> res;
397
398
0
    for (auto& [fragment_id, fragment_ctx_wptr] : _fragment_id_to_pipeline_ctx) {
399
0
        if (auto fragment_ctx = fragment_ctx_wptr.lock()) {
400
0
            if (fragment_ctx == nullptr) {
401
0
                std::string msg =
402
0
                        fmt::format("PipelineFragmentContext is nullptr, query {} fragment_id: {}",
403
0
                                    print_id(_query_id), fragment_id);
404
0
                LOG_ERROR(msg);
405
0
                DCHECK(false) << msg;
406
0
                continue;
407
0
            }
408
409
0
            auto profile = fragment_ctx->collect_realtime_profile();
410
411
0
            if (profile.empty()) {
412
0
                std::string err_msg = fmt::format(
413
0
                        "Get nothing when collecting profile, query {}, fragment_id: {}",
414
0
                        print_id(_query_id), fragment_id);
415
0
                LOG_ERROR(err_msg);
416
0
                DCHECK(false) << err_msg;
417
0
                continue;
418
0
            }
419
420
0
            res.insert(std::make_pair(fragment_id, profile));
421
0
        }
422
0
    }
423
424
0
    return res;
425
0
}
426
427
0
// Builds the report parameters for a query that is still running (is_done = false) from
// the realtime pipeline profiles and the non-null load channel profiles stored so far.
// NOTE(review): `_load_channel_profile_map` is read here without `_profile_mutex`, while
// add_fragment_profile() writes it under that mutex — confirm this cannot race.
TReportExecStatusParams QueryContext::get_realtime_exec_status() const {
    auto realtime_query_profile = _collect_realtime_query_profile();

    std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;
    load_channel_profiles.reserve(_load_channel_profile_map.size());
    // Iterate by const reference: copying each map entry would copy a shared_ptr
    // (an atomic ref-count update) per element for no benefit.
    for (const auto& entry : _load_channel_profile_map) {
        if (entry.second != nullptr) {
            load_channel_profiles.push_back(entry.second);
        }
    }

    return RuntimeQueryStatisticsMgr::create_report_exec_status_params(
            this->_query_id, std::move(realtime_query_profile), std::move(load_channel_profiles),
            /*is_done=*/false);
}
445
446
} // namespace doris