Coverage Report

Created: 2024-11-21 14:46

/root/doris/be/src/runtime/query_context.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/query_context.h"
19
20
#include <exception>
21
#include <memory>
22
23
#include "common/logging.h"
24
#include "pipeline/pipeline_fragment_context.h"
25
#include "pipeline/pipeline_x/dependency.h"
26
#include "runtime/runtime_query_statistics_mgr.h"
27
#include "runtime/thread_context.h"
28
#include "runtime/workload_group/workload_group_manager.h"
29
#include "util/mem_info.h"
30
#include "util/uid_util.h"
31
#include "vec/spill/spill_stream_manager.h"
32
33
namespace doris {
34
35
class DelayReleaseToken : public Runnable {
36
    ENABLE_FACTORY_CREATOR(DelayReleaseToken);
37
38
public:
39
0
    DelayReleaseToken(std::unique_ptr<ThreadPoolToken>&& token) { token_ = std::move(token); }
40
0
    ~DelayReleaseToken() override = default;
41
0
    void run() override {}
42
    std::unique_ptr<ThreadPoolToken> token_;
43
};
44
45
0
// Returns the textual name of a QuerySource value.
//
// Every known enumerator maps to a string spelled exactly like the enumerator;
// any other value yields "UNKNOWN".
const std::string toString(QuerySource queryType) {
    switch (queryType) {
    case QuerySource::INTERNAL_FRONTEND:
        return "INTERNAL_FRONTEND";
    case QuerySource::STREAM_LOAD:
        return "STREAM_LOAD";
    case QuerySource::GROUP_COMMIT_LOAD:
        // Previously returned "EXTERNAL_QUERY", a copy-paste slip that mislabeled
        // group-commit loads.
        return "GROUP_COMMIT_LOAD";
    case QuerySource::ROUTINE_LOAD:
        return "ROUTINE_LOAD";
    case QuerySource::EXTERNAL_CONNECTOR:
        return "EXTERNAL_CONNECTOR";
    default:
        return "UNKNOWN";
    }
}
61
62
// Builds the per-query context: creates the query memory tracker, the shared
// controllers, the execution dependency and the runtime filter manager, records
// the coordinator / reporting FE addresses, and registers the query's memory and
// CPU statistics.
QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env,
                           const TQueryOptions& query_options, TNetworkAddress coord_addr,
                           bool is_pipeline, bool is_nereids, TNetworkAddress current_connect_fe,
                           QuerySource query_source)
        : fragment_num(total_fragment_num),
          timeout_second(-1),
          _query_id(query_id),
          _exec_env(exec_env),
          _is_pipeline(is_pipeline),
          _is_nereids(is_nereids),
          _query_options(query_options),
          _query_source(query_source) {
    // The tracker has to exist before the scoped switch below can refer to it.
    _init_query_mem_tracker();
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);

    _start_time = VecDateTimeValue::local_time();
    _shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
    _shared_scanner_controller.reset(new vectorized::SharedScannerController());
    _execution_dependency = pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency");
    _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
            TUniqueId(), RuntimeFilterParamsContext::create(this), query_mem_tracker);

    timeout_second = query_options.execution_timeout;

    // Only SELECT, LOAD and EXTERNAL queries are expected here (debug-build check).
    const auto requested_type = query_options.query_type;
    bool is_query_type_valid = requested_type == TQueryType::SELECT ||
                               requested_type == TQueryType::LOAD ||
                               requested_type == TQueryType::EXTERNAL;
    DCHECK_EQ(is_query_type_valid, true);

    this->coord_addr = coord_addr;
    // current_connect_fe is used for reporting query statistics.
    this->current_connect_fe = current_connect_fe;
    // An external query has no current_connect_fe, so the address is only
    // validated for the other query types.
    if (requested_type != TQueryType::EXTERNAL) {
        bool is_report_fe_addr_valid =
                !(this->current_connect_fe.hostname.empty() || this->current_connect_fe.port == 0);
        DCHECK_EQ(is_report_fe_addr_valid, true);
    }

    clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
    register_memory_statistics();
    register_cpu_statistics();
    DorisMetrics::instance()->query_ctx_cnt->increment(1);
}
105
106
0
void QueryContext::_init_query_mem_tracker() {
107
0
    bool has_query_mem_limit = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0);
108
0
    int64_t _bytes_limit = has_query_mem_limit ? _query_options.mem_limit : -1;
109
0
    if (_bytes_limit > MemInfo::mem_limit()) {
110
0
        VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(_bytes_limit, TUnit::BYTES)
111
0
                    << " exceeds process memory limit of "
112
0
                    << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
113
0
                    << ". Using process memory limit instead";
114
0
        _bytes_limit = MemInfo::mem_limit();
115
0
    }
116
0
    if (_query_options.query_type == TQueryType::SELECT) {
117
0
        query_mem_tracker = MemTrackerLimiter::create_shared(
118
0
                MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}", print_id(_query_id)),
119
0
                _bytes_limit);
120
0
    } else if (_query_options.query_type == TQueryType::LOAD) {
121
0
        query_mem_tracker = MemTrackerLimiter::create_shared(
122
0
                MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(_query_id)),
123
0
                _bytes_limit);
124
0
    } else { // EXTERNAL
125
0
        query_mem_tracker = MemTrackerLimiter::create_shared(
126
0
                MemTrackerLimiter::Type::LOAD, fmt::format("External#Id={}", print_id(_query_id)),
127
0
                _bytes_limit);
128
0
    }
129
0
    if (_query_options.__isset.is_report_success && _query_options.is_report_success) {
130
0
        query_mem_tracker->enable_print_log_usage();
131
0
    }
132
0
}
133
134
0
// Tears down the query context: detaches the memory tracker and the query from
// the workload group, marks the query finished in the statistics manager, hands
// the thread token to the lazy-release pool, ends pipeline tracing if enabled,
// releases the owned helpers, schedules spill cleanup and finally logs the single
// end-of-query message.
QueryContext::~QueryContext() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
    // A peak consumption of 0 means that after this QueryContext was created the
    // query was found to already exist in _query_ctx_map, so this tracker was
    // never used. After real use the current consumption may be non-zero, because
    // memory consumed on the query mem tracker can be released on other trackers.
    std::string mem_tracker_msg;
    if (query_mem_tracker->peak_consumption() != 0) {
        mem_tracker_msg = fmt::format(
                ", deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
                "PeakUsed={}",
                print_id(_query_id), MemTracker::print_bytes(query_mem_tracker->limit()),
                MemTracker::print_bytes(query_mem_tracker->consumption()),
                MemTracker::print_bytes(query_mem_tracker->peak_consumption()));
    }
    uint64_t group_id = 0;
    if (_workload_group) {
        group_id = _workload_group->id(); // read the id before removing the query
        _workload_group->remove_mem_tracker_limiter(query_mem_tracker);
        _workload_group->remove_query(_query_id);
    }

    _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
    // Do not release the thread token directly in the destructor: the query
    // context may be destroyed on a thread that belongs to the token itself, which
    // is very dangerous and may crash. Shutting the token down can also take some
    // time and would stall the releasing thread, which may be a pipeline task
    // scheduler thread. Hand it to the lazy-release pool instead.
    if (_thread_token) {
        Status submit_st = ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
                DelayReleaseToken::create_shared(std::move(_thread_token)));
        if (!submit_st.ok()) {
            LOG(WARNING) << "Failed to release query context thread token, query_id "
                         << print_id(_query_id) << ", error status " << submit_st;
        }
    }

    //TODO: check if pipeline and tracing both enabled
    if (_is_pipeline && ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] {
        try {
            ExecEnv::GetInstance()->pipeline_tracer_context()->end_query(_query_id, group_id);
        } catch (const std::exception& e) {
            // A destructor must not let an exception escape; log and continue.
            LOG(WARNING) << "Dump trace log failed bacause " << e.what();
        }
    }
    _runtime_filter_mgr.reset();
    _execution_dependency.reset();
    _shared_hash_table_controller.reset();
    _shared_scanner_controller.reset();
    _runtime_predicates.clear();
    file_scan_range_params_map.clear();
    obj_pool.clear();

    _exec_env->spill_stream_mgr()->async_cleanup_query(_query_id);
    DorisMetrics::instance()->query_ctx_cnt->increment(-1);
    // This is the only message that marks the query's end; any other information
    // should be appended to it if needed. (An earlier duplicate of this log line
    // was removed so the message really is emitted once.)
    LOG_INFO("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), mem_tracker_msg);
}
192
193
0
void QueryContext::set_ready_to_execute(bool is_cancelled) {
194
0
    set_execution_dependency_ready();
195
0
    {
196
0
        std::lock_guard<std::mutex> l(_start_lock);
197
0
        if (!_is_cancelled) {
198
0
            _is_cancelled = is_cancelled;
199
0
        }
200
0
        _ready_to_execute = true;
201
0
    }
202
0
    if (query_mem_tracker && is_cancelled) {
203
0
        query_mem_tracker->set_is_query_cancelled(is_cancelled);
204
0
    }
205
0
    _start_cond.notify_all();
206
0
}
207
208
0
void QueryContext::set_ready_to_execute_only() {
209
0
    set_execution_dependency_ready();
210
0
    {
211
0
        std::lock_guard<std::mutex> l(_start_lock);
212
0
        _ready_to_execute = true;
213
0
    }
214
0
    _start_cond.notify_all();
215
0
}
216
217
0
void QueryContext::set_execution_dependency_ready() {
218
0
    _execution_dependency->set_ready();
219
0
}
220
221
0
void QueryContext::cancel(std::string msg, Status new_status, int fragment_id) {
222
    // we must get this wrong status once query ctx's `_is_cancelled` = true.
223
0
    set_exec_status(new_status);
224
    // Just for CAS need a left value
225
0
    bool false_cancel = false;
226
0
    if (!_is_cancelled.compare_exchange_strong(false_cancel, true)) {
227
0
        return;
228
0
    }
229
0
    DCHECK(!false_cancel && _is_cancelled);
230
231
0
    set_ready_to_execute(true);
232
0
    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel;
233
0
    {
234
0
        std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
235
0
        for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
236
0
            if (fragment_id == f_id) {
237
0
                continue;
238
0
            }
239
0
            ctx_to_cancel.push_back(f_context);
240
0
        }
241
0
    }
242
    // Must not add lock here. There maybe dead lock because it will call fragment
243
    // ctx cancel and fragment ctx will call query ctx cancel.
244
0
    for (auto& f_context : ctx_to_cancel) {
245
0
        if (auto pipeline_ctx = f_context.lock()) {
246
0
            pipeline_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, msg);
247
0
        }
248
0
    }
249
0
}
250
251
void QueryContext::cancel_all_pipeline_context(const PPlanFragmentCancelReason& reason,
252
0
                                               const std::string& msg) {
253
0
    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel;
254
0
    {
255
0
        std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
256
0
        for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
257
0
            ctx_to_cancel.push_back(f_context);
258
0
        }
259
0
    }
260
0
    for (auto& f_context : ctx_to_cancel) {
261
0
        if (auto pipeline_ctx = f_context.lock()) {
262
0
            pipeline_ctx->cancel(reason, msg);
263
0
        }
264
0
    }
265
0
}
266
267
// Cancels the pipeline fragment context registered under `fragment_id`.
//
// Returns InternalError when no context is registered for that id; otherwise
// returns OK, whether or not the context was still alive to be cancelled.
Status QueryContext::cancel_pipeline_context(const int fragment_id,
                                             const PPlanFragmentCancelReason& reason,
                                             const std::string& msg) {
    std::weak_ptr<pipeline::PipelineFragmentContext> target;
    {
        std::lock_guard<std::mutex> map_guard(_pipeline_map_write_lock);
        auto found = _fragment_id_to_pipeline_ctx.find(fragment_id);
        if (found == _fragment_id_to_pipeline_ctx.end()) {
            return Status::InternalError("fragment_id_to_pipeline_ctx is empty!");
        }
        target = found->second;
    }
    // Cancel outside the lock.
    auto live_ctx = target.lock();
    if (live_ctx) {
        live_ctx->cancel(reason, msg);
    }
    return Status::OK();
}
283
284
0
std::string QueryContext::print_all_pipeline_context() {
285
0
    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_print;
286
0
    fmt::memory_buffer debug_string_buffer;
287
0
    size_t i = 0;
288
0
    {
289
0
        fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts in query {}. \n",
290
0
                       _fragment_id_to_pipeline_ctx.size(), print_id(_query_id));
291
292
0
        {
293
0
            std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
294
0
            for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
295
0
                ctx_to_print.push_back(f_context);
296
0
            }
297
0
        }
298
0
        for (auto& f_context : ctx_to_print) {
299
0
            if (auto pipeline_ctx = f_context.lock()) {
300
0
                auto elapsed = pipeline_ctx->elapsed_time() / 1000000000.0;
301
0
                fmt::format_to(debug_string_buffer,
302
0
                               "No.{} (elapse_second={}s, fragment_id={}) : {}\n", i, elapsed,
303
0
                               pipeline_ctx->get_fragment_id(), pipeline_ctx->debug_string());
304
0
                i++;
305
0
            }
306
0
        }
307
0
    }
308
0
    return fmt::to_string(debug_string_buffer);
309
0
}
310
311
void QueryContext::set_pipeline_context(
312
0
        const int fragment_id, std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx) {
313
0
    std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
314
0
    _fragment_id_to_pipeline_ctx.insert({fragment_id, pip_ctx});
315
0
}
316
317
0
void QueryContext::register_query_statistics(std::shared_ptr<QueryStatistics> qs) {
318
0
    _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
319
0
            print_id(_query_id), qs, current_connect_fe, _query_options.query_type);
320
0
}
321
322
0
std::shared_ptr<QueryStatistics> QueryContext::get_query_statistics() {
323
0
    return _exec_env->runtime_query_statistics_mgr()->get_runtime_query_statistics(
324
0
            print_id(_query_id));
325
0
}
326
327
0
void QueryContext::register_memory_statistics() {
328
0
    if (query_mem_tracker) {
329
0
        std::shared_ptr<QueryStatistics> qs = query_mem_tracker->get_query_statistics();
330
0
        std::string query_id = print_id(_query_id);
331
0
        if (qs) {
332
0
            _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
333
0
                    query_id, qs, current_connect_fe, _query_options.query_type);
334
0
        } else {
335
0
            LOG(INFO) << " query " << query_id << " get memory query statistics failed ";
336
0
        }
337
0
    }
338
0
}
339
340
0
void QueryContext::register_cpu_statistics() {
341
0
    if (!_cpu_statistics) {
342
0
        _cpu_statistics = std::make_shared<QueryStatistics>();
343
0
        _exec_env->runtime_query_statistics_mgr()->register_query_statistics(
344
0
                print_id(_query_id), _cpu_statistics, current_connect_fe,
345
0
                _query_options.query_type);
346
0
    }
347
0
}
348
349
0
// Returns the task scheduler obtained from the workload group when both the
// group and that scheduler are set; otherwise falls back to the ExecEnv-wide
// pipeline task scheduler.
doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
    const bool has_group_scheduler = _workload_group && _task_scheduler;
    if (has_group_scheduler) {
        return _task_scheduler;
    }
    return _exec_env->pipeline_task_scheduler();
}
357
358
0
// Returns the memtable flush pool stored for the workload group, or nullptr
// when the query has no workload group.
ThreadPool* QueryContext::get_memtable_flush_pool() {
    if (!_workload_group) {
        return nullptr;
    }
    return _memtable_flush_pool;
}
365
366
0
// Binds this query to workload group `tg`: stores the group, adds the query's
// memory tracker to it, and fetches the group's schedulers and memtable flush
// pool into this context. Always returns OK.
Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
    _workload_group = tg;
    // The query should be added first so that the workload group will not be
    // deleted; see task_group_manager::delete_workload_group_by_ids.
    auto& group = *_workload_group;
    group.add_mem_tracker_limiter(query_mem_tracker);
    group.get_query_scheduler(&_task_scheduler, &_scan_task_scheduler, &_memtable_flush_pool,
                              &_remote_scan_task_scheduler);
    return Status::OK();
}
375
376
} // namespace doris