Coverage Report

Created: 2026-05-11 13:39

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/workload_management/io_context.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include "common/factory_creator.h"
21
#include "runtime/runtime_profile.h"
22
#include "runtime/workload_management/io_throttle.h"
23
24
namespace doris {
25
26
class ResourceContext;
27
28
class IOContext : public std::enable_shared_from_this<IOContext> {
29
    ENABLE_FACTORY_CREATOR(IOContext);
30
31
public:
32
    /*
33
    * 1. operate them thread-safe.
34
    * 2. all tasks are unified.
35
    * 3. should not be operated frequently, use local variables to update Counter.
36
    */
37
    struct Stats {
38
        RuntimeProfile::Counter* scan_rows_counter_;
39
        RuntimeProfile::Counter* scan_bytes_counter_;
40
        RuntimeProfile::Counter* scan_bytes_from_local_storage_counter_;
41
        RuntimeProfile::Counter* scan_bytes_from_remote_storage_counter_;
42
        RuntimeProfile::Counter* bytes_write_into_cache_counter_;
43
44
        // number rows returned by query.
45
        // only set once by result sink when closing.
46
        RuntimeProfile::Counter* returned_rows_counter_;
47
        RuntimeProfile::Counter* process_rows_counter_;
48
        RuntimeProfile::Counter* shuffle_send_bytes_counter_;
49
        RuntimeProfile::Counter* shuffle_send_rows_counter_;
50
51
        RuntimeProfile::Counter* spill_write_bytes_to_local_storage_counter_;
52
        RuntimeProfile::Counter* spill_read_bytes_from_local_storage_counter_;
53
54
0
        RuntimeProfile* profile() { return profile_.get(); }
55
122k
        void init_profile() {
56
122k
            profile_ = std::make_unique<RuntimeProfile>("MemoryContext");
57
122k
            scan_rows_counter_ = ADD_COUNTER(profile_, "ScanRows", TUnit::UNIT);
58
122k
            scan_bytes_counter_ = ADD_COUNTER(profile_, "ScanBytes", TUnit::BYTES);
59
122k
            scan_bytes_from_local_storage_counter_ =
60
122k
                    ADD_COUNTER(profile_, "ScanBytesFromLocalStorage", TUnit::BYTES);
61
122k
            scan_bytes_from_remote_storage_counter_ =
62
122k
                    ADD_COUNTER(profile_, "ScanBytesFromRemoteStorage", TUnit::BYTES);
63
122k
            bytes_write_into_cache_counter_ =
64
122k
                    ADD_COUNTER(profile_, "BytesWriteIntoCache", TUnit::BYTES);
65
122k
            returned_rows_counter_ = ADD_COUNTER(profile_, "ReturnedRows", TUnit::UNIT);
66
122k
            process_rows_counter_ = ADD_COUNTER(profile_, "ProcessRows", TUnit::UNIT);
67
122k
            shuffle_send_bytes_counter_ = ADD_COUNTER(profile_, "ShuffleSendBytes", TUnit::BYTES);
68
122k
            shuffle_send_rows_counter_ =
69
122k
                    ADD_COUNTER(profile_, "ShuffleSendRowsCounter_", TUnit::UNIT);
70
122k
            spill_write_bytes_to_local_storage_counter_ =
71
122k
                    ADD_COUNTER(profile_, "SpillWriteBytesToLocalStorage", TUnit::BYTES);
72
122k
            spill_read_bytes_from_local_storage_counter_ =
73
122k
                    ADD_COUNTER(profile_, "SpillReadBytesFromLocalStorage", TUnit::BYTES);
74
122k
        }
75
0
        std::string debug_string() { return profile_->pretty_print(); }
76
77
    private:
78
        std::unique_ptr<RuntimeProfile> profile_;
79
    };
80
81
122k
    IOContext() { stats_.init_profile(); }
82
122k
    virtual ~IOContext() = default;
83
84
0
    RuntimeProfile* stats_profile() { return stats_.profile(); }
85
86
14
    int64_t scan_rows() const { return stats_.scan_rows_counter_->value(); }
87
8
    int64_t scan_bytes() const { return stats_.scan_bytes_counter_->value(); }
88
2
    int64_t scan_bytes_from_local_storage() const {
89
2
        return stats_.scan_bytes_from_local_storage_counter_->value();
90
2
    }
91
2
    int64_t scan_bytes_from_remote_storage() const {
92
2
        return stats_.scan_bytes_from_remote_storage_counter_->value();
93
2
    }
94
2
    int64_t bytes_write_into_cache() const {
95
2
        return stats_.bytes_write_into_cache_counter_->value();
96
2
    }
97
2
    int64_t returned_rows() const { return stats_.returned_rows_counter_->value(); }
98
2
    int64_t process_rows() const { return stats_.process_rows_counter_->value(); }
99
2
    int64_t shuffle_send_bytes() const { return stats_.shuffle_send_bytes_counter_->value(); }
100
2
    int64_t shuffle_send_rows() const { return stats_.shuffle_send_rows_counter_->value(); }
101
102
2
    int64_t spill_write_bytes_to_local_storage() const {
103
2
        return stats_.spill_write_bytes_to_local_storage_counter_->value();
104
2
    }
105
106
2
    int64_t spill_read_bytes_from_local_storage() const {
107
2
        return stats_.spill_read_bytes_from_local_storage_counter_->value();
108
2
    }
109
110
6
    void update_scan_rows(int64_t delta) const { stats_.scan_rows_counter_->update(delta); }
111
4
    void update_scan_bytes(int64_t delta) const { stats_.scan_bytes_counter_->update(delta); }
112
0
    void update_scan_bytes_from_local_storage(int64_t delta) const {
113
0
        stats_.scan_bytes_from_local_storage_counter_->update(delta);
114
0
    }
115
0
    void update_scan_bytes_from_remote_storage(int64_t delta) const {
116
0
        stats_.scan_bytes_from_remote_storage_counter_->update(delta);
117
0
    }
118
0
    void update_bytes_write_into_cache(int64_t delta) const {
119
0
        stats_.bytes_write_into_cache_counter_->update(delta);
120
0
    }
121
0
    void update_returned_rows(int64_t delta) const { stats_.returned_rows_counter_->update(delta); }
122
1.04k
    void update_process_rows(int64_t delta) const { stats_.process_rows_counter_->update(delta); }
123
0
    void update_shuffle_send_bytes(int64_t delta) const {
124
0
        stats_.shuffle_send_bytes_counter_->update(delta);
125
0
    }
126
0
    void update_shuffle_send_rows(int64_t delta) const {
127
0
        stats_.shuffle_send_rows_counter_->update(delta);
128
0
    }
129
130
699
    void update_spill_write_bytes_to_local_storage(int64_t delta) const {
131
699
        stats_.spill_write_bytes_to_local_storage_counter_->update(delta);
132
699
    }
133
134
329
    void update_spill_read_bytes_from_local_storage(int64_t delta) const {
135
329
        stats_.spill_read_bytes_from_local_storage_counter_->update(delta);
136
329
    }
137
138
0
    IOThrottle* io_throttle() {
139
0
        // TODO: get io throttle from workload group
140
0
        return nullptr;
141
0
    }
142
143
protected:
144
    friend class ResourceContext;
145
146
122k
    void set_resource_ctx(ResourceContext* resource_ctx) { resource_ctx_ = resource_ctx; }
147
148
    Stats stats_;
149
    ResourceContext* resource_ctx_ {nullptr};
150
};
151
152
} // namespace doris