Coverage Report

Created: 2026-04-15 18:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/memory/memory_reclamation.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/PaloInternalService_types.h>
21
22
#include "runtime/workload_management/memory_context.h"
23
#include "runtime/workload_management/resource_context.h"
24
#include "runtime/workload_management/task_controller.h"
25
26
namespace doris {
27
28
// Memory threshold, in bytes (32 * 1024 * 1024), used by
// MemoryReclamation::FilterFunc::EXCLUDE_IS_SMALL: that filter only accepts
// tasks whose current memory usage is strictly greater than this value.
// Declared `inline` so that every translation unit including this header
// shares a single definition instead of getting its own internal-linkage copy.
inline constexpr size_t SMALL_MEMORY_TASK = 32 * 1024 * 1024; // 32M
29
30
class MemoryReclamation {
31
public:
32
    enum class PriorityCmpFunc { TOP_MEMORY = 0, TOP_OVERCOMMITED_MEMORY = 1 };
33
    enum class FilterFunc {
34
        EXCLUDE_IS_SMALL = 0,
35
        EXCLUDE_IS_OVERCOMMITED = 1,
36
        IS_QUERY = 2,
37
        IS_LOAD = 3,
38
        IS_COMPACTION = 4
39
    };
40
    enum class ActionFunc { CANCEL = 0 };
41
42
    inline static std::unordered_map<PriorityCmpFunc,
43
                                     const std::function<int64_t(ResourceContext*)>>
44
            PriorityCmpFuncImpl = {
45
                    {PriorityCmpFunc::TOP_MEMORY,
46
32
                     [](ResourceContext* resource_ctx) {
47
32
                         return resource_ctx->memory_context()->current_memory_bytes();
48
32
                     }},
49
                    {PriorityCmpFunc::TOP_OVERCOMMITED_MEMORY,
50
44
                     [](ResourceContext* resource_ctx) {
51
44
                         int64_t mem_limit = resource_ctx->memory_context()->mem_limit();
52
44
                         int64_t mem_size = resource_ctx->memory_context()->current_memory_bytes();
53
44
                         if (mem_limit <= 0 || mem_limit > mem_size) {
54
26
                             return static_cast<int64_t>(-1); // skip not overcommited task.
55
26
                         }
56
18
                         return static_cast<int64_t>(
57
18
                                 (static_cast<double>(mem_size) / static_cast<double>(mem_limit)) *
58
18
                                 1000000); // mem_size will not be greater than 9000G, so not overflow int64_t.
59
44
                     }},
60
    };
61
62
82
    static std::string priority_cmp_func_string(PriorityCmpFunc func) {
63
82
        switch (func) {
64
54
        case PriorityCmpFunc::TOP_MEMORY:
65
54
            return "Top Memory";
66
28
        case PriorityCmpFunc::TOP_OVERCOMMITED_MEMORY:
67
28
            return "Top Overcommited Memory";
68
0
        default:
69
0
            return "Error";
70
82
        }
71
82
    }
72
73
    inline static std::unordered_map<FilterFunc, const std::function<bool(ResourceContext*)>>
74
            FilterFuncImpl = {
75
                    {FilterFunc::EXCLUDE_IS_SMALL,
76
38
                     [](ResourceContext* resource_ctx) {
77
38
                         return resource_ctx->memory_context()->current_memory_bytes() >
78
38
                                SMALL_MEMORY_TASK;
79
38
                     }},
80
                    {FilterFunc::EXCLUDE_IS_OVERCOMMITED,
81
8
                     [](ResourceContext* resource_ctx) {
82
8
                         return resource_ctx->memory_context()->current_memory_bytes() <
83
8
                                resource_ctx->memory_context()->mem_limit();
84
8
                     }},
85
                    {FilterFunc::IS_QUERY,
86
70
                     [](ResourceContext* resource_ctx) {
87
70
                         return resource_ctx->task_controller()->query_type() == TQueryType::SELECT;
88
70
                     }},
89
                    {FilterFunc::IS_LOAD,
90
6
                     [](ResourceContext* resource_ctx) {
91
6
                         return resource_ctx->task_controller()->query_type() == TQueryType::LOAD;
92
6
                     }},
93
                    // TODO, FilterFunc::IS_COMPACTION, IS_SCHEMA_CHANGE, etc.
94
    };
95
96
52
    static std::string filter_func_string(std::vector<FilterFunc> func) {
97
52
        std::vector<std::string> func_strs;
98
62
        std::for_each(func.begin(), func.end(), [&](const auto& it) {
99
62
            switch (it) {
100
18
            case FilterFunc::EXCLUDE_IS_SMALL:
101
18
                func_strs.emplace_back("Exclude Is Small");
102
18
                break;
103
4
            case FilterFunc::EXCLUDE_IS_OVERCOMMITED:
104
4
                func_strs.emplace_back("Exclude Is Overcommited");
105
4
                break;
106
34
            case FilterFunc::IS_QUERY:
107
34
                func_strs.emplace_back("Is Query");
108
34
                break;
109
6
            case FilterFunc::IS_LOAD:
110
6
                func_strs.emplace_back("Is Load");
111
6
                break;
112
0
            case FilterFunc::IS_COMPACTION:
113
0
                func_strs.emplace_back("Is Compaction");
114
0
                break;
115
0
            default:
116
0
                func_strs.emplace_back("Error");
117
62
            }
118
62
        });
119
52
        return join(func_strs, ",");
120
52
    }
121
122
    inline static std::unordered_map<ActionFunc,
123
                                     const std::function<bool(ResourceContext*, const Status&)>>
124
            ActionFuncImpl = {
125
                    {ActionFunc::CANCEL,
126
30
                     [](ResourceContext* resource_ctx, const Status& reason) {
127
30
                         return resource_ctx->task_controller()->cancel(reason);
128
30
                     }},
129
                    // TODO, FilterFunc::SPILL, etc.
130
    };
131
132
82
    static std::string action_func_string(ActionFunc func) {
133
82
        switch (func) {
134
82
        case ActionFunc::CANCEL:
135
82
            return "Cancel";
136
0
        default:
137
0
            return "Error";
138
82
        }
139
82
    }
140
141
    static int64_t revoke_tasks_memory(
142
            int64_t need_free_mem,
143
            const std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs,
144
            const std::string& revoke_reason, RuntimeProfile* profile, PriorityCmpFunc priority_cmp,
145
            std::vector<FilterFunc> filters, ActionFunc action);
146
147
    static bool revoke_process_memory(const std::string& revoke_reason);
148
149
    static void je_purge_dirty_pages();
150
151
private:
152
};
153
154
} // namespace doris