Coverage Report

Created: 2026-04-15 19:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/workload_management/io_context.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include "common/factory_creator.h"
21
#include "runtime/runtime_profile.h"
22
#include "runtime/workload_management/io_throttle.h"
23
24
namespace doris {
25
26
class ResourceContext;
27
28
class IOContext : public std::enable_shared_from_this<IOContext> {
29
    ENABLE_FACTORY_CREATOR(IOContext);
30
31
public:
32
    /*
33
    * 1. operate them thread-safe.
34
    * 2. all tasks are unified.
35
    * 3. should not be operated frequently, use local variables to update Counter.
36
    */
37
    struct Stats {
38
        RuntimeProfile::Counter* scan_rows_counter_;
39
        RuntimeProfile::Counter* scan_bytes_counter_;
40
        RuntimeProfile::Counter* scan_bytes_from_local_storage_counter_;
41
        RuntimeProfile::Counter* scan_bytes_from_remote_storage_counter_;
42
        RuntimeProfile::Counter* bytes_write_into_cache_counter_;
43
44
        // number rows returned by query.
45
        // only set once by result sink when closing.
46
        RuntimeProfile::Counter* returned_rows_counter_;
47
        RuntimeProfile::Counter* shuffle_send_bytes_counter_;
48
        RuntimeProfile::Counter* shuffle_send_rows_counter_;
49
50
        RuntimeProfile::Counter* spill_write_bytes_to_local_storage_counter_;
51
        RuntimeProfile::Counter* spill_read_bytes_from_local_storage_counter_;
52
53
0
        RuntimeProfile* profile() { return profile_.get(); }
54
122k
        void init_profile() {
55
122k
            profile_ = std::make_unique<RuntimeProfile>("MemoryContext");
56
122k
            scan_rows_counter_ = ADD_COUNTER(profile_, "ScanRows", TUnit::UNIT);
57
122k
            scan_bytes_counter_ = ADD_COUNTER(profile_, "ScanBytes", TUnit::BYTES);
58
122k
            scan_bytes_from_local_storage_counter_ =
59
122k
                    ADD_COUNTER(profile_, "ScanBytesFromLocalStorage", TUnit::BYTES);
60
122k
            scan_bytes_from_remote_storage_counter_ =
61
122k
                    ADD_COUNTER(profile_, "ScanBytesFromRemoteStorage", TUnit::BYTES);
62
122k
            bytes_write_into_cache_counter_ =
63
122k
                    ADD_COUNTER(profile_, "BytesWriteIntoCache", TUnit::BYTES);
64
122k
            returned_rows_counter_ = ADD_COUNTER(profile_, "ReturnedRows", TUnit::UNIT);
65
122k
            shuffle_send_bytes_counter_ = ADD_COUNTER(profile_, "ShuffleSendBytes", TUnit::BYTES);
66
122k
            shuffle_send_rows_counter_ =
67
122k
                    ADD_COUNTER(profile_, "ShuffleSendRowsCounter_", TUnit::UNIT);
68
122k
            spill_write_bytes_to_local_storage_counter_ =
69
122k
                    ADD_COUNTER(profile_, "SpillWriteBytesToLocalStorage", TUnit::BYTES);
70
122k
            spill_read_bytes_from_local_storage_counter_ =
71
122k
                    ADD_COUNTER(profile_, "SpillReadBytesFromLocalStorage", TUnit::BYTES);
72
122k
        }
73
0
        std::string debug_string() { return profile_->pretty_print(); }
74
75
    private:
76
        std::unique_ptr<RuntimeProfile> profile_;
77
    };
78
79
122k
    IOContext() { stats_.init_profile(); }
80
122k
    virtual ~IOContext() = default;
81
82
0
    RuntimeProfile* stats_profile() { return stats_.profile(); }
83
84
12
    int64_t scan_rows() const { return stats_.scan_rows_counter_->value(); }
85
6
    int64_t scan_bytes() const { return stats_.scan_bytes_counter_->value(); }
86
0
    int64_t scan_bytes_from_local_storage() const {
87
0
        return stats_.scan_bytes_from_local_storage_counter_->value();
88
0
    }
89
0
    int64_t scan_bytes_from_remote_storage() const {
90
0
        return stats_.scan_bytes_from_remote_storage_counter_->value();
91
0
    }
92
0
    int64_t bytes_write_into_cache() const {
93
0
        return stats_.bytes_write_into_cache_counter_->value();
94
0
    }
95
0
    int64_t returned_rows() const { return stats_.returned_rows_counter_->value(); }
96
0
    int64_t shuffle_send_bytes() const { return stats_.shuffle_send_bytes_counter_->value(); }
97
0
    int64_t shuffle_send_rows() const { return stats_.shuffle_send_rows_counter_->value(); }
98
99
0
    int64_t spill_write_bytes_to_local_storage() const {
100
0
        return stats_.spill_write_bytes_to_local_storage_counter_->value();
101
0
    }
102
103
0
    int64_t spill_read_bytes_from_local_storage() const {
104
0
        return stats_.spill_read_bytes_from_local_storage_counter_->value();
105
0
    }
106
107
6
    void update_scan_rows(int64_t delta) const { stats_.scan_rows_counter_->update(delta); }
108
4
    void update_scan_bytes(int64_t delta) const { stats_.scan_bytes_counter_->update(delta); }
109
0
    void update_scan_bytes_from_local_storage(int64_t delta) const {
110
0
        stats_.scan_bytes_from_local_storage_counter_->update(delta);
111
0
    }
112
0
    void update_scan_bytes_from_remote_storage(int64_t delta) const {
113
0
        stats_.scan_bytes_from_remote_storage_counter_->update(delta);
114
0
    }
115
0
    void update_bytes_write_into_cache(int64_t delta) const {
116
0
        stats_.bytes_write_into_cache_counter_->update(delta);
117
0
    }
118
0
    void update_returned_rows(int64_t delta) const { stats_.returned_rows_counter_->update(delta); }
119
0
    void update_shuffle_send_bytes(int64_t delta) const {
120
0
        stats_.shuffle_send_bytes_counter_->update(delta);
121
0
    }
122
0
    void update_shuffle_send_rows(int64_t delta) const {
123
0
        stats_.shuffle_send_rows_counter_->update(delta);
124
0
    }
125
126
700
    void update_spill_write_bytes_to_local_storage(int64_t delta) const {
127
700
        stats_.spill_write_bytes_to_local_storage_counter_->update(delta);
128
700
    }
129
130
331
    void update_spill_read_bytes_from_local_storage(int64_t delta) const {
131
331
        stats_.spill_read_bytes_from_local_storage_counter_->update(delta);
132
331
    }
133
134
0
    IOThrottle* io_throttle() {
135
0
        // TODO: get io throttle from workload group
136
0
        return nullptr;
137
0
    }
138
139
protected:
140
    friend class ResourceContext;
141
142
122k
    void set_resource_ctx(ResourceContext* resource_ctx) { resource_ctx_ = resource_ctx; }
143
144
    Stats stats_;
145
    ResourceContext* resource_ctx_ {nullptr};
146
};
147
148
} // namespace doris