Coverage Report

Created: 2026-04-02 14:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/common/memory.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/formatIPv6.cpp
19
// and modified by Doris
20
21
#include "exec/common/memory.h"
22
23
namespace doris {
24
25
namespace {

// Scales `query_limit` by `ratio` (truncating toward zero) and clamps the
// result so that it is never smaller than 1.
int64_t scaled_limit_at_least_one(int64_t query_limit, double ratio) {
    const auto scaled = static_cast<int64_t>(static_cast<double>(query_limit) * ratio);
    return std::max<int64_t>(1, scaled);
}

} // namespace

// Builds an arbitrator for the query `qid`.
//
// `query_mem_limit` is stored as given; `mem_limit` is initialised to
// `query_mem_limit * max_scan_ratio`, truncated to an integer and clamped to
// a minimum of 1.
MemShareArbitrator::MemShareArbitrator(const TUniqueId& qid, int64_t query_mem_limit,
                                       double max_scan_ratio)
        : query_id(qid),
          query_mem_limit(query_mem_limit),
          mem_limit(scaled_limit_at_least_one(query_mem_limit, max_scan_ratio)) {}
32
33
2
void MemShareArbitrator::register_scan_node() {
34
2
    total_mem_bytes.fetch_add(DEFAULT_SCANNER_MEM_BYTES);
35
2
}
36
37
9
int64_t MemShareArbitrator::update_mem_bytes(int64_t old_value, int64_t new_value) {
38
9
    int64_t diff = new_value - old_value;
39
9
    int64_t total = total_mem_bytes.fetch_add(diff) + diff;
40
9
    if (new_value == 0) return 0;
41
4
    if (total <= 0) return mem_limit;
42
    // Proportional sharing: allocate based on this context's share of total usage
43
4
    double ratio = static_cast<double>(new_value) / static_cast<double>(std::max(total, new_value));
44
4
    return static_cast<int64_t>(static_cast<double>(mem_limit) * ratio);
45
4
}
46
47
// ==================== MemLimiter ====================
48
5
int MemLimiter::available_scanner_count(int ins_idx) const {
49
5
    int64_t mem_limit_value = mem_limit.load();
50
5
    int64_t running_tasks_count_value = running_tasks_count.load();
51
5
    int64_t estimated_block_mem_bytes_value = get_estimated_block_mem_bytes();
52
5
    DCHECK_GT(estimated_block_mem_bytes_value, 0);
53
54
5
    int64_t max_count = std::max(1L, mem_limit_value / estimated_block_mem_bytes_value);
55
5
    int64_t avail_count = max_count;
56
5
    int64_t per_count = avail_count / parallelism;
57
5
    if (serial_operator) {
58
1
        per_count += (avail_count - per_count * parallelism);
59
4
    } else if (ins_idx < avail_count - per_count * parallelism) {
60
2
        per_count += 1;
61
2
    }
62
63
5
    VLOG_DEBUG << "available_scanner_count. max_count=" << max_count << "("
64
0
               << running_tasks_count_value << "/" << estimated_block_mem_bytes_value
65
0
               << "), operator_mem_limit = " << operator_mem_limit
66
0
               << ", running_tasks_count = " << running_tasks_count_value
67
0
               << ", parallelism = " << parallelism << ", avail_count = " << avail_count
68
0
               << ", ins_id = " << ins_idx << ", per_count = " << per_count
69
0
               << " debug_string: " << debug_string();
70
71
5
    return cast_set<int>(per_count);
72
5
}
73
74
9
void MemLimiter::reestimated_block_mem_bytes(int64_t value) {
75
9
    if (value == 0) return;
76
8
    value = std::min(value, operator_mem_limit);
77
78
8
    std::lock_guard<std::mutex> L(lock);
79
8
    auto old_value = estimated_block_mem_bytes.load();
80
8
    int64_t total =
81
8
            get_estimated_block_mem_bytes() * estimated_block_mem_bytes_update_count + value;
82
8
    estimated_block_mem_bytes_update_count += 1;
83
8
    estimated_block_mem_bytes = total / estimated_block_mem_bytes_update_count;
84
8
    VLOG_DEBUG << fmt::format(
85
0
            "reestimated_block_mem_bytes. MemLimiter = {}, estimated_block_mem_bytes = {}, "
86
0
            "old_value = {}, value: {}",
87
0
            debug_string(), estimated_block_mem_bytes, old_value, value);
88
8
}
89
90
} // namespace doris