Coverage Report

Created: 2026-03-13 09:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/external_scan_context_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/external_scan_context_mgr.h"
19
20
#include <glog/logging.h>
21
// IWYU pragma: no_include <bits/chrono.h>
22
#include <chrono> // IWYU pragma: keep
23
#include <ostream>
24
#include <vector>
25
26
#include "common/config.h"
27
#include "common/metrics/doris_metrics.h"
28
#include "common/metrics/metrics.h"
29
#include "runtime/exec_env.h"
30
#include "runtime/fragment_mgr.h"
31
#include "runtime/result_queue_mgr.h"
32
#include "util/thread.h"
33
#include "util/uid_util.h"
34
35
namespace doris {
36
37
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(active_scan_context_count, MetricUnit::NOUNIT);
38
39
ExternalScanContextMgr::ExternalScanContextMgr(ExecEnv* exec_env)
40
11
        : _exec_env(exec_env), _stop_background_threads_latch(1) {
41
    // start the reaper thread for gc the expired context
42
11
    CHECK(Thread::create(
43
11
                  "ExternalScanContextMgr", "gc_expired_context",
44
11
                  [this]() { this->gc_expired_context(); }, &_keep_alive_reaper)
45
11
                  .ok());
46
47
11
    REGISTER_HOOK_METRIC(active_scan_context_count, [this]() {
48
        // std::lock_guard<std::mutex> l(_lock);
49
11
        return _active_contexts.size();
50
11
    });
51
11
}
52
53
3
void ExternalScanContextMgr::stop() {
54
3
    DEREGISTER_HOOK_METRIC(active_scan_context_count);
55
3
    _stop_background_threads_latch.count_down();
56
3
    if (_keep_alive_reaper) {
57
3
        _keep_alive_reaper->join();
58
3
    }
59
3
}
60
61
6
Status ExternalScanContextMgr::create_scan_context(std::shared_ptr<ScanContext>* p_context) {
62
6
    std::string context_id = generate_uuid_string();
63
6
    std::shared_ptr<ScanContext> context(new ScanContext(context_id));
64
    // context->last_access_time  = time(nullptr);
65
6
    {
66
6
        std::lock_guard<std::mutex> l(_lock);
67
6
        _active_contexts.insert(std::make_pair(context_id, context));
68
6
    }
69
6
    *p_context = context;
70
6
    return Status::OK();
71
6
}
72
73
Status ExternalScanContextMgr::get_scan_context(const std::string& context_id,
74
8
                                                std::shared_ptr<ScanContext>* p_context) {
75
8
    {
76
8
        std::lock_guard<std::mutex> l(_lock);
77
8
        auto iter = _active_contexts.find(context_id);
78
8
        if (iter != _active_contexts.end()) {
79
6
            *p_context = iter->second;
80
6
        } else {
81
2
            LOG(WARNING) << "get_scan_context error: context id [ " << context_id << " ] not found";
82
2
            std::stringstream msg;
83
2
            msg << "context_id: " << context_id << " not found";
84
2
            return Status::NotFound(msg.str());
85
2
        }
86
8
    }
87
6
    return Status::OK();
88
8
}
89
90
3
Status ExternalScanContextMgr::clear_scan_context(const std::string& context_id) {
91
3
    std::shared_ptr<ScanContext> context;
92
3
    {
93
3
        std::lock_guard<std::mutex> l(_lock);
94
3
        auto iter = _active_contexts.find(context_id);
95
3
        if (iter != _active_contexts.end()) {
96
3
            context = iter->second;
97
3
            if (context == nullptr) {
98
0
                _active_contexts.erase(context_id);
99
0
                return Status::OK();
100
0
            }
101
3
            iter = _active_contexts.erase(iter);
102
3
        }
103
3
    }
104
3
    if (context != nullptr) {
105
        // first cancel the fragment instance, just ignore return status
106
3
        _exec_env->fragment_mgr()->cancel_query(context->query_id,
107
3
                                                Status::InternalError("cancelled by clear thread"));
108
        // clear the fragment instance's related result queue
109
3
        static_cast<void>(_exec_env->result_queue_mgr()->cancel(context->fragment_instance_id));
110
3
        LOG(INFO) << "close scan context: context id [ " << context_id << " ]";
111
3
    }
112
3
    return Status::OK();
113
3
}
114
115
7
void ExternalScanContextMgr::gc_expired_context() {
116
7
#ifndef BE_TEST
117
49
    while (!_stop_background_threads_latch.wait_for(
118
49
            std::chrono::seconds(doris::config::scan_context_gc_interval_min * 60))) {
119
42
        time_t current_time = time(nullptr);
120
42
        std::vector<std::shared_ptr<ScanContext>> expired_contexts;
121
42
        {
122
42
            std::lock_guard<std::mutex> l(_lock);
123
44
            for (auto iter = _active_contexts.begin(); iter != _active_contexts.end();) {
124
2
                auto context = iter->second;
125
2
                if (context == nullptr) {
126
0
                    iter = _active_contexts.erase(iter);
127
0
                    continue;
128
0
                }
129
                // being processed or timeout is disabled
130
2
                if (context->last_access_time == -1) {
131
0
                    ++iter; // advance one entry
132
0
                    continue;
133
0
                }
134
                // free context if context is idle for context->keep_alive_min
135
2
                if (current_time - context->last_access_time > context->keep_alive_min * 60) {
136
1
                    LOG(INFO) << "gc expired scan context: context id [ " << context->context_id
137
1
                              << " ]";
138
1
                    expired_contexts.push_back(context);
139
1
                    iter = _active_contexts.erase(iter);
140
1
                } else {
141
1
                    ++iter; // advanced
142
1
                }
143
2
            }
144
42
        }
145
42
        for (auto expired_context : expired_contexts) {
146
            // must cancel the fragment instance, otherwise return thrift transport TTransportException
147
1
            _exec_env->fragment_mgr()->cancel_query(
148
1
                    expired_context->query_id, Status::InternalError("scan context is expired"));
149
1
            static_cast<void>(
150
1
                    _exec_env->result_queue_mgr()->cancel(expired_context->fragment_instance_id));
151
1
        }
152
42
    }
153
7
#endif
154
7
}
155
} // namespace doris