Coverage Report

Created: 2026-03-15 08:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/memory/memory_reclamation.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/PaloInternalService_types.h>
21
22
#include "runtime/workload_management/memory_context.h"
23
#include "runtime/workload_management/resource_context.h"
24
#include "runtime/workload_management/task_controller.h"
25
26
namespace doris {
27
#include "common/compile_check_begin.h"
28
29
constexpr size_t SMALL_MEMORY_TASK = 32 * 1024 * 1024; // 32M
30
31
class MemoryReclamation {
32
public:
33
    enum class PriorityCmpFunc { TOP_MEMORY = 0, TOP_OVERCOMMITED_MEMORY = 1 };
34
    enum class FilterFunc {
35
        EXCLUDE_IS_SMALL = 0,
36
        EXCLUDE_IS_OVERCOMMITED = 1,
37
        IS_QUERY = 2,
38
        IS_LOAD = 3,
39
        IS_COMPACTION = 4
40
    };
41
    enum class ActionFunc { CANCEL = 0 };
42
43
    inline static std::unordered_map<PriorityCmpFunc,
44
                                     const std::function<int64_t(ResourceContext*)>>
45
            PriorityCmpFuncImpl = {
46
                    {PriorityCmpFunc::TOP_MEMORY,
47
16
                     [](ResourceContext* resource_ctx) {
48
16
                         return resource_ctx->memory_context()->current_memory_bytes();
49
16
                     }},
50
                    {PriorityCmpFunc::TOP_OVERCOMMITED_MEMORY,
51
22
                     [](ResourceContext* resource_ctx) {
52
22
                         int64_t mem_limit = resource_ctx->memory_context()->mem_limit();
53
22
                         int64_t mem_size = resource_ctx->memory_context()->current_memory_bytes();
54
22
                         if (mem_limit <= 0 || mem_limit > mem_size) {
55
13
                             return static_cast<int64_t>(-1); // skip not overcommited task.
56
13
                         }
57
9
                         return static_cast<int64_t>(
58
9
                                 (static_cast<double>(mem_size) / static_cast<double>(mem_limit)) *
59
9
                                 1000000); // mem_size will not be greater than 9000G, so not overflow int64_t.
60
22
                     }},
61
    };
62
63
41
    static std::string priority_cmp_func_string(PriorityCmpFunc func) {
64
41
        switch (func) {
65
27
        case PriorityCmpFunc::TOP_MEMORY:
66
27
            return "Top Memory";
67
14
        case PriorityCmpFunc::TOP_OVERCOMMITED_MEMORY:
68
14
            return "Top Overcommited Memory";
69
0
        default:
70
0
            return "Error";
71
41
        }
72
41
    }
73
74
    inline static std::unordered_map<FilterFunc, const std::function<bool(ResourceContext*)>>
75
            FilterFuncImpl = {
76
                    {FilterFunc::EXCLUDE_IS_SMALL,
77
19
                     [](ResourceContext* resource_ctx) {
78
19
                         return resource_ctx->memory_context()->current_memory_bytes() >
79
19
                                SMALL_MEMORY_TASK;
80
19
                     }},
81
                    {FilterFunc::EXCLUDE_IS_OVERCOMMITED,
82
4
                     [](ResourceContext* resource_ctx) {
83
4
                         return resource_ctx->memory_context()->current_memory_bytes() <
84
4
                                resource_ctx->memory_context()->mem_limit();
85
4
                     }},
86
                    {FilterFunc::IS_QUERY,
87
35
                     [](ResourceContext* resource_ctx) {
88
35
                         return resource_ctx->task_controller()->query_type() == TQueryType::SELECT;
89
35
                     }},
90
                    {FilterFunc::IS_LOAD,
91
3
                     [](ResourceContext* resource_ctx) {
92
3
                         return resource_ctx->task_controller()->query_type() == TQueryType::LOAD;
93
3
                     }},
94
                    // TODO, FilterFunc::IS_COMPACTION, IS_SCHEMA_CHANGE, etc.
95
    };
96
97
26
    static std::string filter_func_string(std::vector<FilterFunc> func) {
98
26
        std::vector<std::string> func_strs;
99
31
        std::for_each(func.begin(), func.end(), [&](const auto& it) {
100
31
            switch (it) {
101
9
            case FilterFunc::EXCLUDE_IS_SMALL:
102
9
                func_strs.emplace_back("Exclude Is Small");
103
9
                break;
104
2
            case FilterFunc::EXCLUDE_IS_OVERCOMMITED:
105
2
                func_strs.emplace_back("Exclude Is Overcommited");
106
2
                break;
107
17
            case FilterFunc::IS_QUERY:
108
17
                func_strs.emplace_back("Is Query");
109
17
                break;
110
3
            case FilterFunc::IS_LOAD:
111
3
                func_strs.emplace_back("Is Load");
112
3
                break;
113
0
            case FilterFunc::IS_COMPACTION:
114
0
                func_strs.emplace_back("Is Compaction");
115
0
                break;
116
0
            default:
117
0
                func_strs.emplace_back("Error");
118
31
            }
119
31
        });
120
26
        return join(func_strs, ",");
121
26
    }
122
123
    inline static std::unordered_map<ActionFunc,
124
                                     const std::function<bool(ResourceContext*, const Status&)>>
125
            ActionFuncImpl = {
126
                    {ActionFunc::CANCEL,
127
15
                     [](ResourceContext* resource_ctx, const Status& reason) {
128
15
                         return resource_ctx->task_controller()->cancel(reason);
129
15
                     }},
130
                    // TODO, FilterFunc::SPILL, etc.
131
    };
132
133
41
    static std::string action_func_string(ActionFunc func) {
134
41
        switch (func) {
135
41
        case ActionFunc::CANCEL:
136
41
            return "Cancel";
137
0
        default:
138
0
            return "Error";
139
41
        }
140
41
    }
141
142
    static int64_t revoke_tasks_memory(
143
            int64_t need_free_mem,
144
            const std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs,
145
            const std::string& revoke_reason, RuntimeProfile* profile, PriorityCmpFunc priority_cmp,
146
            std::vector<FilterFunc> filters, ActionFunc action);
147
148
    static bool revoke_process_memory(const std::string& revoke_reason);
149
150
    static void je_purge_dirty_pages();
151
152
private:
153
};
154
155
#include "common/compile_check_end.h"
156
} // namespace doris