Coverage Report

Created: 2024-11-20 12:56

/root/doris/be/src/runtime/load_channel.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/load_channel.h"
19
20
#include <gen_cpp/internal_service.pb.h>
21
#include <glog/logging.h>
22
23
#include "runtime/memory/mem_tracker.h"
24
#include "runtime/tablets_channel.h"
25
26
namespace doris {
27
28
// Creates the channel that receives data for the load identified by `load_id`.
//
// load_id          - id of the load this channel serves; also used in the profile name.
// mem_tracker      - memory tracker for this channel; ownership is taken.
// timeout_s        - load timeout in seconds (only stored here).
// is_high_priority - forwarded to every TabletsChannel created by open().
// sender_ip        - sender host, used in log output and in the profile name.
// backend_id       - backend id, used in the profile name.
// enable_profile   - when false, _report_profile() is a no-op.
LoadChannel::LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker> mem_tracker,
                         int64_t timeout_s, bool is_high_priority, const std::string& sender_ip,
                         int64_t backend_id, bool enable_profile)
        : _load_id(load_id),
          _mem_tracker(std::move(mem_tracker)),
          _timeout_s(timeout_s),
          _is_high_priority(is_high_priority),
          _sender_ip(sender_ip),
          _backend_id(backend_id),
          _enable_profile(enable_profile) {
    // Record a fresh "last updated" time before this channel is published to
    // load_channel_mgr's _load_channels. Without it, the gc thread could see a
    // stale timestamp and erase the channel immediately.
    const auto created_at = time(nullptr);
    _last_updated_time.store(created_at);
    _init_profile();
}
44
45
0
// On destruction, logs the channel's peak memory usage, its tracker details and
// the identifying fields (load id, priority, sender host).
LoadChannel::~LoadChannel() {
    const auto peak_usage = _mem_tracker->peak_consumption();
    const auto tracker_info = _mem_tracker->debug_string();
    LOG(INFO) << "load channel removed. mem peak usage=" << peak_usage
              << ", info=" << tracker_info
              << ", load_id=" << _load_id
              << ", is high priority=" << _is_high_priority
              << ", sender_ip=" << _sender_ip;
}
50
51
0
void LoadChannel::_init_profile() {
52
0
    _profile = std::make_unique<RuntimeProfile>("LoadChannels");
53
0
    _mgr_add_batch_timer = ADD_TIMER(_profile, "LoadChannelMgrAddBatchTime");
54
0
    _handle_mem_limit_timer = ADD_TIMER(_profile, "HandleMemLimitTime");
55
0
    _self_profile =
56
0
            _profile->create_child(fmt::format("LoadChannel load_id={} (host={}, backend_id={})",
57
0
                                               _load_id.to_string(), _sender_ip, _backend_id),
58
0
                                   true, true);
59
0
    _add_batch_number_counter = ADD_COUNTER(_self_profile, "NumberBatchAdded", TUnit::UNIT);
60
0
    _peak_memory_usage_counter = ADD_COUNTER(_self_profile, "PeakMemoryUsage", TUnit::BYTES);
61
0
    _add_batch_timer = ADD_TIMER(_self_profile, "AddBatchTime");
62
0
    _handle_eos_timer = ADD_CHILD_TIMER(_self_profile, "HandleEosTime", "AddBatchTime");
63
0
    _add_batch_times = ADD_COUNTER(_self_profile, "AddBatchTimes", TUnit::UNIT);
64
0
}
65
66
0
// Opens the TabletsChannel for the index in `params`, creating and registering
// it on the first request for that index; later requests reuse it.
//
// Returns the error from TabletsChannel::open if that fails. Otherwise it marks
// this load channel as opened, refreshes the last-updated time and returns OK.
Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
    const int64_t index_id = params.index_id();

    std::shared_ptr<TabletsChannel> channel;
    {
        std::lock_guard<std::mutex> guard(_lock);
        if (auto found = _tablets_channels.find(index_id); found != _tablets_channels.end()) {
            channel = found->second;
        } else {
            // First open for this index: create its tablets channel.
            TabletsChannelKey key(params.id(), index_id);
            channel = std::make_shared<TabletsChannel>(key, _load_id, _is_high_priority,
                                                       _self_profile);
            // The map is also protected by _tablets_channels_lock, which
            // _report_profile takes on its own while iterating it.
            std::lock_guard<SpinLock> map_guard(_tablets_channels_lock);
            _tablets_channels.insert({index_id, channel});
        }
    }

    // TabletsChannel::open is called without holding _lock.
    RETURN_IF_ERROR(channel->open(params));

    _opened = true;
    _last_updated_time.store(time(nullptr));
    return Status::OK();
}
92
93
// Looks up the TabletsChannel registered for `index_id`, under _lock.
//
// channel     - receives the channel when one is registered for index_id.
// is_finished - false when a channel is found; true when index_id is listed in
//               _finished_channel_ids; left untouched on error.
// Returns OK when the channel is found or has already finished, and
// InternalError for an index id that is neither.
Status LoadChannel::_get_tablets_channel(std::shared_ptr<TabletsChannel>& channel,
                                         bool& is_finished, const int64_t index_id) {
    std::lock_guard<std::mutex> guard(_lock);

    auto found = _tablets_channels.find(index_id);
    if (found != _tablets_channels.end()) {
        is_finished = false;
        channel = found->second;
        return Status::OK();
    }

    if (_finished_channel_ids.find(index_id) != _finished_channel_ids.end()) {
        // The channel for this index has already finished; report OK.
        is_finished = true;
        return Status::OK();
    }

    std::stringstream msg;
    msg << "load channel " << _load_id << " add batch with unknown index id: " << index_id;
    return Status::InternalError(msg.str());
}
112
113
// Handles one add-block request.
//
// Routes the block (if any) to the TabletsChannel for request.index_id(),
// handles eos, and attaches the serialized profile to `response` when needed.
// A request for an index whose channel has already finished returns OK without
// doing anything.
Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request,
                              PTabletWriterAddBlockResult* response) {
    SCOPED_TIMER(_add_batch_timer);
    COUNTER_UPDATE(_add_batch_times, 1);

    // Step 1: resolve the tablets channel for this index.
    std::shared_ptr<TabletsChannel> channel;
    bool is_finished = false;
    Status st = _get_tablets_channel(channel, is_finished, request.index_id());
    if (is_finished || !st.ok()) {
        return st;
    }

    // Step 2: forward the block, if the request carries one.
    if (request.has_block()) {
        RETURN_IF_ERROR(channel->add_batch(request, response));
        _add_batch_number_counter->update(1);
    }

    // Step 3: eos always reports the profile. Otherwise the profile is
    // reported whenever the added-batch counter is 1 modulo 100.
    const bool eos = request.has_eos() && request.eos();
    if (!eos) {
        if (_add_batch_number_counter->value() % 100 == 1) {
            _report_profile(response);
        }
    } else {
        st = _handle_eos(channel, request, response);
        // Report the profile even if eos handling failed.
        _report_profile(response);
        if (!st.ok()) {
            return st;
        }
    }

    _last_updated_time.store(time(nullptr));
    return st;
}
145
146
0
// When profiling is enabled, refreshes this channel's profile and attaches a
// thrift-serialized copy of the whole profile tree to `response`.
// If serialization fails, a warning is logged and `response` is left unchanged.
void LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) {
    if (!_enable_profile) {
        return;
    }

    COUNTER_SET(_peak_memory_usage_counter, _mem_tracker->peak_consumption());
    // TabletSink and LoadChannel in BE are M: N relationship,
    // Every once in a while LoadChannel will randomly return its own runtime profile to a TabletSink,
    // so usually all LoadChannel runtime profiles are saved on each TabletSink,
    // and the timeliness of the same LoadChannel profile saved on different TabletSinks is different,
    // and each TabletSink will periodically send fe reports all the LoadChannel profiles saved by itself,
    // and ensures to update the latest LoadChannel profile according to the timestamp.
    _self_profile->set_timestamp(_last_updated_time.load());

    {
        std::lock_guard<SpinLock> l(_tablets_channels_lock);
        for (auto& it : _tablets_channels) {
            it.second->refresh_profile();
        }
    }

    // Only the conversion reads the shared profile tree, so the spin lock
    // covers just that step. `tprofile`, the serializer and its output buffer
    // are all locals, so serializing and copying into the response happen
    // outside the lock and do not keep other reporters spinning.
    TRuntimeProfileTree tprofile;
    {
        std::lock_guard<SpinLock> l(_profile_serialize_lock);
        _profile->to_thrift(&tprofile);
    }

    ThriftSerializer ser(false, 4096);
    uint8_t* buf = nullptr;
    uint32_t len = 0;
    const auto st = ser.serialize(&tprofile, &len, &buf);
    if (!st.ok()) {
        LOG(WARNING) << "load channel TRuntimeProfileTree serialize failed, errmsg=" << st;
        return;
    }
    // Copy the serialized bytes into the response while `ser` is still alive.
    response->set_load_channel_profile(std::string(reinterpret_cast<const char*>(buf), len));
}
180
181
0
bool LoadChannel::is_finished() {
182
0
    if (!_opened) {
183
0
        return false;
184
0
    }
185
0
    std::lock_guard<std::mutex> l(_lock);
186
0
    return _tablets_channels.empty();
187
0
}
188
189
0
// Cancels every tablets channel currently registered, under _lock.
// The results of the individual cancel() calls are not inspected; this
// function always returns OK.
Status LoadChannel::cancel() {
    std::lock_guard<std::mutex> guard(_lock);
    for (const auto& entry : _tablets_channels) {
        entry.second->cancel();
    }
    return Status::OK();
}
196
197
} // namespace doris