be/src/exec/common/memory.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <fmt/format.h>
#include <gen_cpp/Types_types.h>

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <mutex>
#include <string>

#include "common/cast_set.h"
#include "common/factory_creator.h"
#include "common/logging.h"
#include "util/uid_util.h"
30 | | |
31 | | namespace doris { |
// Default scanner memory size in bytes (64MB).
// Declared `inline constexpr` rather than `static constexpr` so that every translation unit
// including this header refers to one shared definition instead of getting its own
// internal-linkage copy.
inline constexpr int64_t DEFAULT_SCANNER_MEM_BYTES = 64 * 1024 * 1024;
33 | | |
34 | | // Query-level memory arbitrator that distributes memory fairly across all scan contexts |
35 | | struct MemShareArbitrator { |
36 | | ENABLE_FACTORY_CREATOR(MemShareArbitrator) |
37 | | TUniqueId query_id; |
38 | | int64_t query_mem_limit = 0; |
39 | | int64_t mem_limit = 0; |
40 | | std::atomic<int64_t> total_mem_bytes = 0; |
41 | | |
42 | | MemShareArbitrator(const TUniqueId& qid, int64_t query_mem_limit, double max_scan_ratio); |
43 | | |
44 | | // Update memory allocation when scanner memory usage changes |
45 | | // Returns new scan memory limit for this context |
46 | | int64_t update_mem_bytes(int64_t old_value, int64_t new_value); |
47 | | void register_scan_node(); |
48 | 1 | std::string debug_string() const { |
49 | 1 | return fmt::format("query_id: {}, query_mem_limit: {}, mem_limit: {}", print_id(query_id), |
50 | 1 | query_mem_limit, mem_limit); |
51 | 1 | } |
52 | | }; |
53 | | |
54 | | // Scan-context-level memory limiter that controls scanner concurrency based on memory |
55 | | struct MemLimiter { |
56 | | private: |
57 | | TUniqueId query_id; |
58 | | mutable std::mutex lock; |
59 | | // Parallelism of the scan operator |
60 | | const int64_t parallelism = 0; |
61 | | const bool serial_operator = false; |
62 | | const int64_t operator_mem_limit; |
63 | | std::atomic<int64_t> running_tasks_count = 0; |
64 | | |
65 | | std::atomic<int64_t> estimated_block_mem_bytes = 0; |
66 | | int64_t estimated_block_mem_bytes_update_count = 0; |
67 | | int64_t arb_mem_bytes = 0; |
68 | | std::atomic<int64_t> open_tasks_count = 0; |
69 | | |
70 | | // Memory limit for this scan node (shared by all instances), updated by memory share arbitrator |
71 | | std::atomic<int64_t> mem_limit = 0; |
72 | | |
73 | | public: |
74 | | ENABLE_FACTORY_CREATOR(MemLimiter) |
75 | | MemLimiter(const TUniqueId& qid, int64_t parallelism, bool serial_operator_, int64_t mem_limit) |
76 | 242k | : query_id(qid), |
77 | 242k | parallelism(parallelism), |
78 | 242k | serial_operator(serial_operator_), |
79 | 242k | operator_mem_limit(mem_limit) {} |
80 | 241k | ~MemLimiter() { DCHECK_LE(open_tasks_count, 0); } |
81 | | |
82 | | // Calculate available scanner count based on memory limit |
83 | | int available_scanner_count(int ins_idx) const; |
84 | | |
85 | 2.74M | int64_t update_running_tasks_count(int delta) { return running_tasks_count += delta; } |
86 | | |
87 | | // Re-estimated the average memory usage of a block, and update the estimated_block_mem_bytes accordingly. |
88 | | void reestimated_block_mem_bytes(int64_t value); |
89 | 1.81M | void update_mem_limit(int64_t value) { mem_limit = value; } |
90 | | // Update the memory usage of this context to memory share arbitrator. |
91 | | // NOTE: This could be accessed by parallel tasks without synchronization, but it's fine |
92 | | // since the memory share arbitrator will do proportional sharing based on the ratio of this |
93 | | // context's memory usage to total memory usage, so even if there are some fluctuations in the |
94 | | // memory usage, the overall proportional sharing will still work. |
95 | 2.10M | void update_arb_mem_bytes(int64_t value) { |
96 | 2.10M | value = std::min(value, operator_mem_limit); |
97 | 2.10M | arb_mem_bytes = value; |
98 | 2.10M | } |
99 | 1.81M | int64_t get_arb_scanner_mem_bytes() const { return arb_mem_bytes; } |
100 | | |
101 | 3.77M | int64_t get_estimated_block_mem_bytes() const { return estimated_block_mem_bytes; } |
102 | | |
103 | 582k | int64_t update_open_tasks_count(int delta) { return open_tasks_count.fetch_add(delta); } |
104 | 1 | std::string debug_string() const { |
105 | 1 | return fmt::format( |
106 | 1 | "query_id: {}, parallelism: {}, serial_operator: {}, operator_mem_limit: {}, " |
107 | 1 | "running_tasks_count: {}, estimated_block_mem_bytes: {}, " |
108 | 1 | "estimated_block_mem_bytes_update_count: {}, arb_mem_bytes: {}, " |
109 | 1 | "open_tasks_count: {}, mem_limit: {}", |
110 | 1 | print_id(query_id), parallelism, serial_operator, operator_mem_limit, |
111 | 1 | running_tasks_count.load(), estimated_block_mem_bytes.load(), |
112 | 1 | estimated_block_mem_bytes_update_count, arb_mem_bytes, open_tasks_count, mem_limit); |
113 | 1 | } |
114 | | }; |
115 | | |
116 | | } // namespace doris |