Coverage Report

Created: 2024-11-20 21:05

/root/doris/be/src/runtime/load_channel.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <atomic>
21
#include <cstdint>
22
#include <memory>
23
#include <mutex>
24
#include <ostream>
25
#include <string>
26
#include <unordered_map>
27
#include <unordered_set>
28
#include <utility>
29
30
#include "common/status.h"
31
#include "runtime/thread_context.h"
32
#include "util/runtime_profile.h"
33
#include "util/spinlock.h"
34
#include "util/uid_util.h"
35
36
namespace doris {
37
38
class PTabletWriterOpenRequest;
39
class PTabletWriterAddBlockRequest;
40
class PTabletWriterAddBlockResult;
41
class OpenPartitionRequest;
42
class BaseTabletsChannel;
43
44
// A LoadChannel manages the tablets channels of all indexes
45
// that correspond to one particular load job.
46
class LoadChannel {
47
public:
48
    LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority,
49
                std::string sender_ip, int64_t backend_id, bool enable_profile, int64_t wg_id);
50
    ~LoadChannel(); // declared here only; like the constructor, it is defined out of line
51
52
    // Open a new load channel if it does not exist yet.
53
    Status open(const PTabletWriterOpenRequest& request);
54
55
    // The batch in `request` must belong to an index within one transaction.
56
    Status add_batch(const PTabletWriterAddBlockRequest& request,
57
                     PTabletWriterAddBlockResult* response);
58
59
    // Returns true if this load channel has been opened and all of its tablets channels have been closed since.
60
    bool is_finished();
61
62
    Status cancel();
63
64
0
    time_t last_updated_time() const { return _last_updated_time.load(); } // atomic read
65
66
0
    const UniqueId& load_id() const { return _load_id; } // reference is valid only while this object lives
67
68
0
    int64_t timeout() const { return _timeout_s; } // presumably in seconds, judging by the `_s` suffix
69
70
0
    bool is_high_priority() const { return _is_high_priority; }
71
72
0
    RuntimeProfile::Counter* get_mgr_add_batch_timer() { return _mgr_add_batch_timer; } // non-owning; member defaults to nullptr
73
0
    RuntimeProfile::Counter* get_handle_mem_limit_timer() { return _handle_mem_limit_timer; } // non-owning; member defaults to nullptr
74
75
protected:
76
    // NOTE(review): `channel` and `is_finished` are non-const references, presumably
    // out-parameters filled in for the given `index_id` -- confirm in the implementation.
    Status _get_tablets_channel(std::shared_ptr<BaseTabletsChannel>& channel, bool& is_finished,
77
                                int64_t index_id);
78
79
    Status _handle_eos(BaseTabletsChannel* channel, const PTabletWriterAddBlockRequest& request,
80
                       PTabletWriterAddBlockResult* response);
81
82
    void _init_profile();
83
    // Intended to be thread-safe (presumably serialized by _profile_serialize_lock -- confirm in the implementation).
84
    void _report_profile(PTabletWriterAddBlockResult* response);
85
86
private:
87
    UniqueId _load_id;
88
    int64_t _txn_id = 0;
89
90
    SpinLock _profile_serialize_lock;
91
    std::unique_ptr<RuntimeProfile> _profile; // owned by this LoadChannel
92
    RuntimeProfile* _self_profile = nullptr; // non-owning; NOTE(review): presumably points into _profile -- confirm
93
    RuntimeProfile::Counter* _add_batch_number_counter = nullptr;
94
    RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
95
    RuntimeProfile::Counter* _add_batch_timer = nullptr;
96
    RuntimeProfile::Counter* _add_batch_times = nullptr;
97
    RuntimeProfile::Counter* _mgr_add_batch_timer = nullptr;
98
    RuntimeProfile::Counter* _handle_mem_limit_timer = nullptr;
99
    RuntimeProfile::Counter* _handle_eos_timer = nullptr;
100
101
    // Lock that protects the tablets channel map.
102
    std::mutex _lock;
103
    // index id -> tablets channel
104
    std::unordered_map<int64_t, std::shared_ptr<BaseTabletsChannel>> _tablets_channels;
105
    // index id -> (received rows, filtered rows)
106
    std::unordered_map<int64_t, std::pair<size_t, size_t>> _tablets_channels_rows;
107
    std::mutex _tablets_channels_lock; // NOTE(review): which data this guards (as opposed to _lock) is not visible here -- confirm
108
    // Ids of the channels that have finished, kept so that retried requests can be handled.
109
    std::unordered_set<int64_t> _finished_channel_ids;
110
    // Set to true once at least one tablets channel has been opened.
111
    bool _opened = false;
112
113
    QueryThreadContext _query_thread_context;
114
115
    std::atomic<time_t> _last_updated_time; // no in-class initializer; NOTE(review): confirm the constructor sets it
116
117
    // The timeout of this load job.
118
    // Timed out channels will be periodically deleted by LoadChannelMgr.
119
    int64_t _timeout_s;
120
121
    // True if this is a high priority load task.
122
    bool _is_high_priority = false;
123
124
    // The IP address of the host where the tablet sink is located.
125
    std::string _sender_ip;
126
127
    int64_t _backend_id; // no in-class initializer; NOTE(review): confirm the constructor sets it
128
129
    bool _enable_profile; // no in-class initializer; NOTE(review): confirm the constructor sets it
130
};
131
132
0
inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
133
0
    os << "LoadChannel(id=" << load_channel.load_id()
134
0
       << ", last_update_time=" << static_cast<uint64_t>(load_channel.last_updated_time())
135
0
       << ", is high priority: " << load_channel.is_high_priority() << ")";
136
0
    return os;
137
0
}
138
139
} // namespace doris