Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/workload_management/io_context.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include "common/factory_creator.h"
21
#include "runtime/runtime_profile.h"
22
#include "runtime/workload_management/io_throttle.h"
23
24
namespace doris {
25
#include "common/compile_check_begin.h"
26
27
class ResourceContext;
28
29
class IOContext : public std::enable_shared_from_this<IOContext> {
30
    ENABLE_FACTORY_CREATOR(IOContext);
31
32
public:
33
    /*
34
    * 1. operate them thread-safe.
35
    * 2. all tasks are unified.
36
    * 3. should not be operated frequently, use local variables to update Counter.
37
    */
38
    struct Stats {
39
        RuntimeProfile::Counter* scan_rows_counter_;
40
        RuntimeProfile::Counter* scan_bytes_counter_;
41
        RuntimeProfile::Counter* scan_bytes_from_local_storage_counter_;
42
        RuntimeProfile::Counter* scan_bytes_from_remote_storage_counter_;
43
        RuntimeProfile::Counter* bytes_write_into_cache_counter_;
44
45
        // number rows returned by query.
46
        // only set once by result sink when closing.
47
        RuntimeProfile::Counter* returned_rows_counter_;
48
        RuntimeProfile::Counter* shuffle_send_bytes_counter_;
49
        RuntimeProfile::Counter* shuffle_send_rows_counter_;
50
51
        RuntimeProfile::Counter* spill_write_bytes_to_local_storage_counter_;
52
        RuntimeProfile::Counter* spill_read_bytes_from_local_storage_counter_;
53
54
0
        RuntimeProfile* profile() { return profile_.get(); }
55
122k
        void init_profile() {
56
122k
            profile_ = std::make_unique<RuntimeProfile>("MemoryContext");
57
122k
            scan_rows_counter_ = ADD_COUNTER(profile_, "ScanRows", TUnit::UNIT);
58
122k
            scan_bytes_counter_ = ADD_COUNTER(profile_, "ScanBytes", TUnit::BYTES);
59
122k
            scan_bytes_from_local_storage_counter_ =
60
122k
                    ADD_COUNTER(profile_, "ScanBytesFromLocalStorage", TUnit::BYTES);
61
122k
            scan_bytes_from_remote_storage_counter_ =
62
122k
                    ADD_COUNTER(profile_, "ScanBytesFromRemoteStorage", TUnit::BYTES);
63
122k
            bytes_write_into_cache_counter_ =
64
122k
                    ADD_COUNTER(profile_, "BytesWriteIntoCache", TUnit::BYTES);
65
122k
            returned_rows_counter_ = ADD_COUNTER(profile_, "ReturnedRows", TUnit::UNIT);
66
122k
            shuffle_send_bytes_counter_ = ADD_COUNTER(profile_, "ShuffleSendBytes", TUnit::BYTES);
67
122k
            shuffle_send_rows_counter_ =
68
122k
                    ADD_COUNTER(profile_, "ShuffleSendRowsCounter_", TUnit::UNIT);
69
122k
            spill_write_bytes_to_local_storage_counter_ =
70
122k
                    ADD_COUNTER(profile_, "SpillWriteBytesToLocalStorage", TUnit::BYTES);
71
122k
            spill_read_bytes_from_local_storage_counter_ =
72
122k
                    ADD_COUNTER(profile_, "SpillReadBytesFromLocalStorage", TUnit::BYTES);
73
122k
        }
74
0
        std::string debug_string() { return profile_->pretty_print(); }
75
76
    private:
77
        std::unique_ptr<RuntimeProfile> profile_;
78
    };
79
80
122k
    IOContext() { stats_.init_profile(); }
81
122k
    virtual ~IOContext() = default;
82
83
0
    RuntimeProfile* stats_profile() { return stats_.profile(); }
84
85
12
    int64_t scan_rows() const { return stats_.scan_rows_counter_->value(); }
86
6
    int64_t scan_bytes() const { return stats_.scan_bytes_counter_->value(); }
87
0
    int64_t scan_bytes_from_local_storage() const {
88
0
        return stats_.scan_bytes_from_local_storage_counter_->value();
89
0
    }
90
0
    int64_t scan_bytes_from_remote_storage() const {
91
0
        return stats_.scan_bytes_from_remote_storage_counter_->value();
92
0
    }
93
0
    int64_t bytes_write_into_cache() const {
94
0
        return stats_.bytes_write_into_cache_counter_->value();
95
0
    }
96
0
    int64_t returned_rows() const { return stats_.returned_rows_counter_->value(); }
97
0
    int64_t shuffle_send_bytes() const { return stats_.shuffle_send_bytes_counter_->value(); }
98
0
    int64_t shuffle_send_rows() const { return stats_.shuffle_send_rows_counter_->value(); }
99
100
0
    int64_t spill_write_bytes_to_local_storage() const {
101
0
        return stats_.spill_write_bytes_to_local_storage_counter_->value();
102
0
    }
103
104
0
    int64_t spill_read_bytes_from_local_storage() const {
105
0
        return stats_.spill_read_bytes_from_local_storage_counter_->value();
106
0
    }
107
108
6
    void update_scan_rows(int64_t delta) const { stats_.scan_rows_counter_->update(delta); }
109
4
    void update_scan_bytes(int64_t delta) const { stats_.scan_bytes_counter_->update(delta); }
110
0
    void update_scan_bytes_from_local_storage(int64_t delta) const {
111
0
        stats_.scan_bytes_from_local_storage_counter_->update(delta);
112
0
    }
113
0
    void update_scan_bytes_from_remote_storage(int64_t delta) const {
114
0
        stats_.scan_bytes_from_remote_storage_counter_->update(delta);
115
0
    }
116
0
    void update_bytes_write_into_cache(int64_t delta) const {
117
0
        stats_.bytes_write_into_cache_counter_->update(delta);
118
0
    }
119
0
    void update_returned_rows(int64_t delta) const { stats_.returned_rows_counter_->update(delta); }
120
0
    void update_shuffle_send_bytes(int64_t delta) const {
121
0
        stats_.shuffle_send_bytes_counter_->update(delta);
122
0
    }
123
0
    void update_shuffle_send_rows(int64_t delta) const {
124
0
        stats_.shuffle_send_rows_counter_->update(delta);
125
0
    }
126
127
324
    void update_spill_write_bytes_to_local_storage(int64_t delta) const {
128
324
        stats_.spill_write_bytes_to_local_storage_counter_->update(delta);
129
324
    }
130
131
166
    void update_spill_read_bytes_from_local_storage(int64_t delta) const {
132
166
        stats_.spill_read_bytes_from_local_storage_counter_->update(delta);
133
166
    }
134
135
0
    IOThrottle* io_throttle() {
136
0
        // TODO: get io throttle from workload group
137
0
        return nullptr;
138
0
    }
139
140
protected:
141
    friend class ResourceContext;
142
143
122k
    void set_resource_ctx(ResourceContext* resource_ctx) { resource_ctx_ = resource_ctx; }
144
145
    Stats stats_;
146
    ResourceContext* resource_ctx_ {nullptr};
147
};
148
149
#include "common/compile_check_end.h"
150
} // namespace doris