Coverage Report

Created: 2026-03-12 17:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/external_scan_context_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/external_scan_context_mgr.h"
19
20
#include <glog/logging.h>
21
// IWYU pragma: no_include <bits/chrono.h>
22
#include <chrono> // IWYU pragma: keep
23
#include <ostream>
24
#include <vector>
25
26
#include "common/config.h"
27
#include "common/metrics/doris_metrics.h"
28
#include "common/metrics/metrics.h"
29
#include "runtime/exec_env.h"
30
#include "runtime/fragment_mgr.h"
31
#include "runtime/result_queue_mgr.h"
32
#include "util/thread.h"
33
#include "util/uid_util.h"
34
35
namespace doris {
36
37
// Gauge metric prototype named active_scan_context_count, declared without a
// unit (MetricUnit::NOUNIT). Its value is produced by the hook that the
// ExternalScanContextMgr constructor registers (the size of _active_contexts)
// and the hook is removed again in ExternalScanContextMgr::stop().
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(active_scan_context_count, MetricUnit::NOUNIT);
38
39
// Constructs the manager, starts the background reaper thread and registers
// the active_scan_context_count hook metric.
//
// @param exec_env  stored in _exec_env; other members use it to reach
//                  fragment_mgr() and result_queue_mgr().
//
// The process is aborted (CHECK) if the reaper thread cannot be created.
ExternalScanContextMgr::ExternalScanContextMgr(ExecEnv* exec_env)
        : _exec_env(exec_env), _stop_background_threads_latch(1) {
    // Start the reaper thread that garbage-collects expired contexts. Keep the
    // returned Status so that a failure aborts with the actual reason attached
    // instead of a bare "Check failed".
    const Status st = Thread::create(
            "ExternalScanContextMgr", "gc_expired_context",
            [this]() { this->gc_expired_context(); }, &_keep_alive_reaper);
    CHECK(st.ok()) << "failed to start scan context gc thread: " << st.to_string();

    // The lambda is deliberately passed inline: the macro decides how the
    // callable is stored, so it must not refer to a constructor-local variable.
    REGISTER_HOOK_METRIC(active_scan_context_count, [this]() {
        // NOTE(review): size() is read without holding _lock (the lock_guard
        // here was already commented out). Confirm that this unsynchronized
        // read against concurrent insert/erase is intentional.
        return _active_contexts.size();
    });
}
52
53
3
void ExternalScanContextMgr::stop() {
54
3
    DEREGISTER_HOOK_METRIC(active_scan_context_count);
55
3
    _stop_background_threads_latch.count_down();
56
3
    if (_keep_alive_reaper) {
57
3
        _keep_alive_reaper->join();
58
3
    }
59
3
}
60
61
3
// Creates a new ScanContext keyed by a freshly generated UUID string, registers
// it in _active_contexts and hands it back to the caller.
//
// @param p_context  output; receives the new context. It is dereferenced
//                   unconditionally, so it must not be null.
// @return Status::OK() in all cases.
Status ExternalScanContextMgr::create_scan_context(std::shared_ptr<ScanContext>* p_context) {
    std::string context_id = generate_uuid_string();
    // make_shared: single allocation and no naked `new`.
    auto context = std::make_shared<ScanContext>(context_id);
    // NOTE(review): last_access_time is not assigned here (the assignment was
    // already commented out) -- presumably the caller sets it before the gc
    // thread inspects the context; confirm.
    {
        std::lock_guard<std::mutex> l(_lock);
        // Like insert(), emplace() leaves the map untouched if the key exists.
        _active_contexts.emplace(std::move(context_id), context);
    }
    *p_context = std::move(context);
    return Status::OK();
}
72
73
// Looks up a registered scan context by id.
//
// @param context_id  id to search for in _active_contexts.
// @param p_context   output; assigned the stored context on success and left
//                    untouched when the id is unknown.
// @return Status::OK() when found, otherwise Status::NotFound with the message
//         "context_id: <id> not found".
Status ExternalScanContextMgr::get_scan_context(const std::string& context_id,
                                                std::shared_ptr<ScanContext>* p_context) {
    {
        std::lock_guard<std::mutex> l(_lock);
        auto iter = _active_contexts.find(context_id);
        if (iter != _active_contexts.end()) {
            *p_context = iter->second;
            return Status::OK();
        }
    }
    // Miss: log and build the error message only after _lock is released, so
    // other threads are not blocked behind logging I/O. Plain concatenation
    // replaces std::stringstream, whose header this file never included.
    LOG(WARNING) << "get_scan_context error: context id [ " << context_id << " ] not found";
    return Status::NotFound("context_id: " + context_id + " not found");
}
89
90
1
// Removes the scan context registered under `context_id` (if any) and cancels
// the work associated with it.
//
// @param context_id  id of the context to drop.
// @return Status::OK() in all cases, including an unknown id or a null entry.
Status ExternalScanContextMgr::clear_scan_context(const std::string& context_id) {
    std::shared_ptr<ScanContext> context;
    {
        std::lock_guard<std::mutex> l(_lock);
        auto iter = _active_contexts.find(context_id);
        if (iter != _active_contexts.end()) {
            // Take ownership of the stored pointer and erase through the
            // iterator we already have: one lookup, no extra refcount copy.
            // A null entry is simply erased and falls through to OK below.
            context = std::move(iter->second);
            _active_contexts.erase(iter);
        }
    }
    // Cancellation is done outside the lock.
    if (context != nullptr) {
        // first cancel the query; cancel_query's outcome is not inspected
        _exec_env->fragment_mgr()->cancel_query(context->query_id,
                                                Status::InternalError("cancelled by clear thread"));
        // then cancel the fragment instance's result queue, deliberately
        // ignoring the returned status
        static_cast<void>(_exec_env->result_queue_mgr()->cancel(context->fragment_instance_id));
        LOG(INFO) << "close scan context: context id [ " << context_id << " ]";
    }
    return Status::OK();
}
114
115
6
void ExternalScanContextMgr::gc_expired_context() {
116
6
#ifndef BE_TEST
117
34
    while (!_stop_background_threads_latch.wait_for(
118
34
            std::chrono::seconds(doris::config::scan_context_gc_interval_min * 60))) {
119
28
        time_t current_time = time(nullptr);
120
28
        std::vector<std::shared_ptr<ScanContext>> expired_contexts;
121
28
        {
122
28
            std::lock_guard<std::mutex> l(_lock);
123
28
            for (auto iter = _active_contexts.begin(); iter != _active_contexts.end();) {
124
0
                auto context = iter->second;
125
0
                if (context == nullptr) {
126
0
                    iter = _active_contexts.erase(iter);
127
0
                    continue;
128
0
                }
129
                // being processed or timeout is disabled
130
0
                if (context->last_access_time == -1) {
131
0
                    ++iter; // advance one entry
132
0
                    continue;
133
0
                }
134
                // free context if context is idle for context->keep_alive_min
135
0
                if (current_time - context->last_access_time > context->keep_alive_min * 60) {
136
0
                    LOG(INFO) << "gc expired scan context: context id [ " << context->context_id
137
0
                              << " ]";
138
0
                    expired_contexts.push_back(context);
139
0
                    iter = _active_contexts.erase(iter);
140
0
                } else {
141
0
                    ++iter; // advanced
142
0
                }
143
0
            }
144
28
        }
145
28
        for (auto expired_context : expired_contexts) {
146
            // must cancel the fragment instance, otherwise return thrift transport TTransportException
147
0
            _exec_env->fragment_mgr()->cancel_query(
148
0
                    expired_context->query_id, Status::InternalError("scan context is expired"));
149
0
            static_cast<void>(
150
0
                    _exec_env->result_queue_mgr()->cancel(expired_context->fragment_instance_id));
151
0
        }
152
28
    }
153
6
#endif
154
6
}
155
} // namespace doris