Coverage Report

Created: 2026-04-03 07:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/common/memory.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/formatIPv6.cpp
19
// and modified by Doris
20
21
#include "exec/common/memory.h"
22
23
namespace doris {
24
25
MemShareArbitrator::MemShareArbitrator(const TUniqueId& qid, int64_t query_mem_limit,
26
                                       double max_scan_ratio)
27
406k
        : query_id(qid),
28
406k
          query_mem_limit(query_mem_limit),
29
406k
          mem_limit(std::max<int64_t>(
30
406k
                  1, static_cast<int64_t>(static_cast<double>(query_mem_limit) * max_scan_ratio))) {
31
406k
}
32
33
241k
void MemShareArbitrator::register_scan_node() {
34
241k
    total_mem_bytes.fetch_add(DEFAULT_SCANNER_MEM_BYTES);
35
241k
}
36
37
1.82M
int64_t MemShareArbitrator::update_mem_bytes(int64_t old_value, int64_t new_value) {
38
1.82M
    int64_t diff = new_value - old_value;
39
1.82M
    int64_t total = total_mem_bytes.fetch_add(diff) + diff;
40
1.82M
    if (new_value == 0) return 0;
41
1.60M
    if (total <= 0) return mem_limit;
42
    // Proportional sharing: allocate based on this context's share of total usage
43
1.60M
    double ratio = static_cast<double>(new_value) / static_cast<double>(std::max(total, new_value));
44
1.60M
    return static_cast<int64_t>(static_cast<double>(mem_limit) * ratio);
45
1.60M
}
46
47
// ==================== MemLimiter ====================
48
1.66M
int MemLimiter::available_scanner_count(int ins_idx) const {
49
1.66M
    int64_t mem_limit_value = mem_limit.load();
50
1.66M
    int64_t running_tasks_count_value = running_tasks_count.load();
51
1.66M
    int64_t estimated_block_mem_bytes_value = get_estimated_block_mem_bytes();
52
1.66M
    DCHECK_GT(estimated_block_mem_bytes_value, 0);
53
54
1.66M
    int64_t max_count = std::max(1L, mem_limit_value / estimated_block_mem_bytes_value);
55
1.66M
    int64_t avail_count = max_count;
56
1.66M
    int64_t per_count = avail_count / parallelism;
57
1.66M
    if (serial_operator) {
58
268k
        per_count += (avail_count - per_count * parallelism);
59
1.39M
    } else if (ins_idx < avail_count - per_count * parallelism) {
60
187k
        per_count += 1;
61
187k
    }
62
63
18.4E
    VLOG_DEBUG << "available_scanner_count. max_count=" << max_count << "("
64
18.4E
               << running_tasks_count_value << "/" << estimated_block_mem_bytes_value
65
18.4E
               << "), operator_mem_limit = " << operator_mem_limit
66
18.4E
               << ", running_tasks_count = " << running_tasks_count_value
67
18.4E
               << ", parallelism = " << parallelism << ", avail_count = " << avail_count
68
18.4E
               << ", ins_id = " << ins_idx << ", per_count = " << per_count
69
18.4E
               << " debug_string: " << debug_string();
70
71
1.66M
    return cast_set<int>(per_count);
72
1.66M
}
73
74
1.66M
void MemLimiter::reestimated_block_mem_bytes(int64_t value) {
75
1.66M
    if (value == 0) return;
76
734k
    value = std::min(value, operator_mem_limit);
77
78
734k
    std::lock_guard<std::mutex> L(lock);
79
734k
    auto old_value = estimated_block_mem_bytes.load();
80
734k
    int64_t total =
81
734k
            get_estimated_block_mem_bytes() * estimated_block_mem_bytes_update_count + value;
82
734k
    estimated_block_mem_bytes_update_count += 1;
83
734k
    estimated_block_mem_bytes = total / estimated_block_mem_bytes_update_count;
84
18.4E
    VLOG_DEBUG << fmt::format(
85
18.4E
            "reestimated_block_mem_bytes. MemLimiter = {}, estimated_block_mem_bytes = {}, "
86
18.4E
            "old_value = {}, value: {}",
87
18.4E
            debug_string(), estimated_block_mem_bytes, old_value, value);
88
734k
}
89
90
} // namespace doris