Coverage Report

Created: 2026-03-19 16:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/spill_utils.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Metrics_types.h>
21
#include <gen_cpp/Types_types.h>
22
#include <glog/logging.h>
23
24
#include <atomic>
25
#include <functional>
26
#include <utility>
27
28
#include "exec/partitioner/partitioner.h"
29
#include "runtime/fragment_mgr.h"
30
#include "runtime/memory/mem_tracker_limiter.h"
31
#include "runtime/query_context.h"
32
#include "runtime/runtime_profile.h"
33
#include "runtime/runtime_state.h"
34
#include "runtime/thread_context.h"
35
36
namespace doris {
37
#include "common/compile_check_begin.h"
38
39
// Default spill partitioner for initial partitioning (level-0). Repartition
40
// paths may use different channel-id policies (e.g. raw-hash mode).
41
using SpillPartitionerType = Crc32HashPartitioner<SpillPartitionChannelIds>;
42
43
// Repartition partitioner: keeps raw hash (no final modulo) so SpillRepartitioner
44
// can apply level-aware hash mixing and channel mapping.
45
using SpillRePartitionerType = Crc32HashPartitioner<SpillRePartitionChannelIds>;
46
47
struct SpillContext {
48
    std::atomic_int running_tasks_count;
49
    TUniqueId query_id;
50
    std::function<void(SpillContext*)> all_tasks_finished_callback;
51
52
    SpillContext(int running_tasks_count_, TUniqueId query_id_,
53
                 std::function<void(SpillContext*)> all_tasks_finished_callback_)
54
5
            : running_tasks_count(running_tasks_count_),
55
5
              query_id(std::move(query_id_)),
56
5
              all_tasks_finished_callback(std::move(all_tasks_finished_callback_)) {}
57
58
5
    ~SpillContext() {
59
5
        if (running_tasks_count.load() != 0) {
60
0
            LOG(WARNING) << "Query: " << print_id(query_id)
61
0
                         << " not all spill tasks finished, remaining tasks: "
62
0
                         << running_tasks_count.load();
63
0
        }
64
5
    }
65
66
5
    void on_task_finished() {
67
5
        auto count = running_tasks_count.fetch_sub(1);
68
5
        if (count == 1) {
69
5
            all_tasks_finished_callback(this);
70
5
        }
71
5
    }
72
};
73
74
// helper to execute a spill function synchronously.  The old code used
75
// SpillRunnable/SpillSinkRunnable/SpillRecoverRunnable wrappers to track
76
// counters and optionally notify a SpillContext.  Since spill operations are
77
// now performed synchronously and external code already maintains any
78
// necessary counters, those wrappers are no longer necessary.  We keep a
79
// small utility to run the provided callbacks and forward cancellation.
80
inline Status run_spill_task(RuntimeState* state, std::function<Status()> exec_func,
81
53
                             std::function<Status()> fin_cb = {}) {
82
53
    RETURN_IF_ERROR(exec_func());
83
52
    if (fin_cb) {
84
0
        RETURN_IF_ERROR(fin_cb());
85
0
    }
86
52
    return Status::OK();
87
52
}
88
89
template <bool accumulating>
90
inline void update_profile_from_inner_profile(const std::string& name,
91
                                              RuntimeProfile* runtime_profile,
92
978
                                              RuntimeProfile* inner_profile) {
93
978
    auto* inner_counter = inner_profile->get_counter(name);
94
978
    DCHECK(inner_counter != nullptr) << "inner counter " << name << " not found";
95
978
    if (inner_counter == nullptr) [[unlikely]] {
96
0
        return;
97
0
    }
98
978
    auto* counter = runtime_profile->get_counter(name);
99
978
    if (counter == nullptr) [[unlikely]] {
100
346
        counter = runtime_profile->add_counter(name, inner_counter->type(), "",
101
346
                                               inner_counter->level());
102
346
    }
103
978
    if constexpr (accumulating) {
104
        // Memory usage should not be accumulated.
105
447
        if (counter->type() == TUnit::BYTES) {
106
108
            counter->set(inner_counter->value());
107
339
        } else {
108
339
            counter->update(inner_counter->value());
109
339
        }
110
531
    } else {
111
531
        counter->set(inner_counter->value());
112
531
    }
113
978
}
_ZN5doris33update_profile_from_inner_profileILb0EEEvRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_14RuntimeProfileESA_
Line
Count
Source
92
531
                                              RuntimeProfile* inner_profile) {
93
531
    auto* inner_counter = inner_profile->get_counter(name);
94
531
    DCHECK(inner_counter != nullptr) << "inner counter " << name << " not found";
95
531
    if (inner_counter == nullptr) [[unlikely]] {
96
0
        return;
97
0
    }
98
531
    auto* counter = runtime_profile->get_counter(name);
99
531
    if (counter == nullptr) [[unlikely]] {
100
247
        counter = runtime_profile->add_counter(name, inner_counter->type(), "",
101
247
                                               inner_counter->level());
102
247
    }
103
    if constexpr (accumulating) {
104
        // Memory usage should not be accumulated.
105
        if (counter->type() == TUnit::BYTES) {
106
            counter->set(inner_counter->value());
107
        } else {
108
            counter->update(inner_counter->value());
109
        }
110
531
    } else {
111
531
        counter->set(inner_counter->value());
112
531
    }
113
531
}
_ZN5doris33update_profile_from_inner_profileILb1EEEvRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_14RuntimeProfileESA_
Line
Count
Source
92
447
                                              RuntimeProfile* inner_profile) {
93
447
    auto* inner_counter = inner_profile->get_counter(name);
94
447
    DCHECK(inner_counter != nullptr) << "inner counter " << name << " not found";
95
447
    if (inner_counter == nullptr) [[unlikely]] {
96
0
        return;
97
0
    }
98
447
    auto* counter = runtime_profile->get_counter(name);
99
447
    if (counter == nullptr) [[unlikely]] {
100
99
        counter = runtime_profile->add_counter(name, inner_counter->type(), "",
101
99
                                               inner_counter->level());
102
99
    }
103
447
    if constexpr (accumulating) {
104
        // Memory usage should not be accumulated.
105
447
        if (counter->type() == TUnit::BYTES) {
106
108
            counter->set(inner_counter->value());
107
339
        } else {
108
339
            counter->update(inner_counter->value());
109
339
        }
110
    } else {
111
        counter->set(inner_counter->value());
112
    }
113
447
}
114
115
#include "common/compile_check_end.h"
116
117
} // namespace doris