Coverage Report

Created: 2026-04-10 16:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/spill_utils.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Metrics_types.h>
21
#include <gen_cpp/Types_types.h>
22
#include <glog/logging.h>
23
24
#include <atomic>
25
#include <functional>
26
#include <utility>
27
28
#include "exec/partitioner/partitioner.h"
29
#include "runtime/fragment_mgr.h"
30
#include "runtime/memory/mem_tracker_limiter.h"
31
#include "runtime/query_context.h"
32
#include "runtime/runtime_profile.h"
33
#include "runtime/runtime_state.h"
34
#include "runtime/thread_context.h"
35
36
namespace doris {
37
38
// Default spill partitioner for initial partitioning (level-0). Repartition
39
// paths may use different channel-id policies (e.g. raw-hash mode).
40
using SpillPartitionerType = Crc32HashPartitioner<SpillPartitionChannelIds>;
41
42
// Repartition partitioner: keeps raw hash (no final modulo) so SpillRepartitioner
43
// can apply level-aware hash mixing and channel mapping.
44
using SpillRePartitionerType = Crc32HashPartitioner<SpillRePartitionChannelIds>;
45
46
struct SpillContext {
47
    std::atomic_int running_tasks_count;
48
    TUniqueId query_id;
49
    std::function<void(SpillContext*)> all_tasks_finished_callback;
50
51
    SpillContext(int running_tasks_count_, TUniqueId query_id_,
52
                 std::function<void(SpillContext*)> all_tasks_finished_callback_)
53
5
            : running_tasks_count(running_tasks_count_),
54
5
              query_id(std::move(query_id_)),
55
5
              all_tasks_finished_callback(std::move(all_tasks_finished_callback_)) {}
56
57
5
    ~SpillContext() {
58
5
        if (running_tasks_count.load() != 0) {
59
0
            LOG(WARNING) << "Query: " << print_id(query_id)
60
0
                         << " not all spill tasks finished, remaining tasks: "
61
0
                         << running_tasks_count.load();
62
0
        }
63
5
    }
64
65
5
    void on_task_finished() {
66
5
        auto count = running_tasks_count.fetch_sub(1);
67
5
        if (count == 1) {
68
5
            all_tasks_finished_callback(this);
69
5
        }
70
5
    }
71
};
72
73
// helper to execute a spill function synchronously.  The old code used
74
// SpillRunnable/SpillSinkRunnable/SpillRecoverRunnable wrappers to track
75
// counters and optionally notify a SpillContext.  Since spill operations are
76
// now performed synchronously and external code already maintains any
77
// necessary counters, those wrappers are no longer necessary.  We keep a
78
// small utility to run the provided callbacks and forward cancellation.
79
inline Status run_spill_task(RuntimeState* state, std::function<Status()> exec_func,
80
53
                             std::function<Status()> fin_cb = {}) {
81
53
    RETURN_IF_ERROR(exec_func());
82
52
    if (fin_cb) {
83
0
        RETURN_IF_ERROR(fin_cb());
84
0
    }
85
52
    return Status::OK();
86
52
}
87
88
template <bool accumulating>
89
inline void update_profile_from_inner_profile(const std::string& name,
90
                                              RuntimeProfile* runtime_profile,
91
1.76k
                                              RuntimeProfile* inner_profile) {
92
1.76k
    auto* inner_counter = inner_profile->get_counter(name);
93
18.4E
    DCHECK(inner_counter != nullptr) << "inner counter " << name << " not found";
94
1.76k
    if (inner_counter == nullptr) [[unlikely]] {
95
0
        return;
96
0
    }
97
1.76k
    auto* counter = runtime_profile->get_counter(name);
98
1.76k
    if (counter == nullptr) [[unlikely]] {
99
1.11k
        counter = runtime_profile->add_counter(name, inner_counter->type(), "",
100
1.11k
                                               inner_counter->level());
101
1.11k
    }
102
1.76k
    if constexpr (accumulating) {
103
        // Memory usage should not be accumulated.
104
447
        if (counter->type() == TUnit::BYTES) {
105
108
            counter->set(inner_counter->value());
106
339
        } else {
107
339
            counter->update(inner_counter->value());
108
339
        }
109
1.31k
    } else {
110
1.31k
        counter->set(inner_counter->value());
111
1.31k
    }
112
1.76k
}
_ZN5doris33update_profile_from_inner_profileILb0EEEvRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_14RuntimeProfileESA_
Line
Count
Source
91
1.31k
                                              RuntimeProfile* inner_profile) {
92
1.31k
    auto* inner_counter = inner_profile->get_counter(name);
93
18.4E
    DCHECK(inner_counter != nullptr) << "inner counter " << name << " not found";
94
1.31k
    if (inner_counter == nullptr) [[unlikely]] {
95
0
        return;
96
0
    }
97
1.31k
    auto* counter = runtime_profile->get_counter(name);
98
1.31k
    if (counter == nullptr) [[unlikely]] {
99
1.01k
        counter = runtime_profile->add_counter(name, inner_counter->type(), "",
100
1.01k
                                               inner_counter->level());
101
1.01k
    }
102
    if constexpr (accumulating) {
103
        // Memory usage should not be accumulated.
104
        if (counter->type() == TUnit::BYTES) {
105
            counter->set(inner_counter->value());
106
        } else {
107
            counter->update(inner_counter->value());
108
        }
109
1.31k
    } else {
110
1.31k
        counter->set(inner_counter->value());
111
1.31k
    }
112
1.31k
}
_ZN5doris33update_profile_from_inner_profileILb1EEEvRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_14RuntimeProfileESA_
Line
Count
Source
91
447
                                              RuntimeProfile* inner_profile) {
92
447
    auto* inner_counter = inner_profile->get_counter(name);
93
447
    DCHECK(inner_counter != nullptr) << "inner counter " << name << " not found";
94
447
    if (inner_counter == nullptr) [[unlikely]] {
95
0
        return;
96
0
    }
97
447
    auto* counter = runtime_profile->get_counter(name);
98
447
    if (counter == nullptr) [[unlikely]] {
99
99
        counter = runtime_profile->add_counter(name, inner_counter->type(), "",
100
99
                                               inner_counter->level());
101
99
    }
102
447
    if constexpr (accumulating) {
103
        // Memory usage should not be accumulated.
104
447
        if (counter->type() == TUnit::BYTES) {
105
108
            counter->set(inner_counter->value());
106
339
        } else {
107
339
            counter->update(inner_counter->value());
108
339
        }
109
    } else {
110
        counter->set(inner_counter->value());
111
    }
112
447
}
113
114
} // namespace doris