/root/doris/be/src/runtime/load_channel.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <atomic> |
21 | | #include <cstdint> |
22 | | #include <memory> |
23 | | #include <mutex> |
24 | | #include <ostream> |
25 | | #include <string> |
26 | | #include <unordered_map> |
27 | | #include <unordered_set> |
28 | | #include <utility> |
29 | | |
30 | | #include "common/status.h" |
31 | | #include "runtime/thread_context.h" |
32 | | #include "util/runtime_profile.h" |
33 | | #include "util/spinlock.h" |
34 | | #include "util/uid_util.h" |
35 | | |
36 | | namespace doris { |
37 | | |
38 | | class PTabletWriterOpenRequest; |
39 | | class PTabletWriterAddBlockRequest; |
40 | | class PTabletWriterAddBlockResult; |
41 | | class OpenPartitionRequest; |
42 | | class BaseTabletsChannel; |
43 | | |
44 | | // A LoadChannel manages the tablets channels (keyed by index id) of all indexes |
45 | | // that belong to one load job, which is identified by its load id. |
46 | | class LoadChannel { |
47 | | public: |
48 | | LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority, |
49 | | std::string sender_ip, int64_t backend_id, bool enable_profile, int64_t wg_id); |
50 | | ~LoadChannel(); |
51 | | |
52 | | // open a new load channel if it does not exist yet |
53 | | Status open(const PTabletWriterOpenRequest& request); |
54 | | |
55 | | // this batch must belong to an index in one transaction |
56 | | Status add_batch(const PTabletWriterAddBlockRequest& request, |
57 | | PTabletWriterAddBlockResult* response); |
58 | | |
59 | | // returns true if this load channel has been opened and all of its tablets channels have been closed since. |
60 | | bool is_finished(); |
61 | | |
62 | | Status cancel(); |
63 | | |
64 | 0 | time_t last_updated_time() const { return _last_updated_time.load(); } |
65 | | |
66 | 0 | const UniqueId& load_id() const { return _load_id; } |
67 | | |
68 | 0 | int64_t timeout() const { return _timeout_s; } |
69 | | |
70 | 0 | bool is_high_priority() const { return _is_high_priority; } |
71 | | |
72 | 0 | RuntimeProfile::Counter* get_mgr_add_batch_timer() { return _mgr_add_batch_timer; } |
73 | 0 | RuntimeProfile::Counter* get_handle_mem_limit_timer() { return _handle_mem_limit_timer; } |
74 | | |
75 | | protected: |
76 | | Status _get_tablets_channel(std::shared_ptr<BaseTabletsChannel>& channel, bool& is_finished, |
77 | | int64_t index_id); |
78 | | |
79 | | Status _handle_eos(BaseTabletsChannel* channel, const PTabletWriterAddBlockRequest& request, |
80 | | PTabletWriterAddBlockResult* response); |
81 | | |
82 | | void _init_profile(); |
83 | | // meant to be thread safe; NOTE(review): presumably serialized by _profile_serialize_lock - confirm in the .cpp |
84 | | void _report_profile(PTabletWriterAddBlockResult* response); |
85 | | |
86 | | private: |
87 | | UniqueId _load_id; |
88 | | int64_t _txn_id = 0; |
89 | | |
90 | | SpinLock _profile_serialize_lock; |
91 | | std::unique_ptr<RuntimeProfile> _profile; |
92 | | RuntimeProfile* _self_profile = nullptr; |
93 | | RuntimeProfile::Counter* _add_batch_number_counter = nullptr; |
94 | | RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; |
95 | | RuntimeProfile::Counter* _add_batch_timer = nullptr; |
96 | | RuntimeProfile::Counter* _add_batch_times = nullptr; |
97 | | RuntimeProfile::Counter* _mgr_add_batch_timer = nullptr; |
98 | | RuntimeProfile::Counter* _handle_mem_limit_timer = nullptr; |
99 | | RuntimeProfile::Counter* _handle_eos_timer = nullptr; |
100 | | |
101 | | // lock protecting the tablets channel map |
102 | | std::mutex _lock; |
103 | | // index id -> tablets channel |
104 | | std::unordered_map<int64_t, std::shared_ptr<BaseTabletsChannel>> _tablets_channels; |
105 | | // index id -> (received rows, filtered rows); NOTE(review): presumably guarded by _tablets_channels_lock below - confirm |
106 | | std::unordered_map<int64_t, std::pair<size_t, size_t>> _tablets_channels_rows; |
107 | | std::mutex _tablets_channels_lock; |
108 | | // Ids of the channels that have finished, kept so that a retried request for one of them can be handled. |
109 | | std::unordered_set<int64_t> _finished_channel_ids; |
110 | | // set to true once at least one tablets channel has been opened |
111 | | bool _opened = false; |
112 | | |
113 | | QueryThreadContext _query_thread_context; |
114 | | |
115 | | std::atomic<time_t> _last_updated_time; |
116 | | |
117 | | // the timeout of this load job (in seconds, judging by the _s suffix). |
118 | | // Timed-out channels are periodically deleted by LoadChannelMgr. |
119 | | int64_t _timeout_s; |
120 | | |
121 | | // true if this is a high-priority load task |
122 | | bool _is_high_priority = false; |
123 | | |
124 | | // the ip of the node where the tablet sink is located |
125 | | std::string _sender_ip; |
126 | | |
127 | | int64_t _backend_id; |
128 | | |
129 | | bool _enable_profile; |
130 | | }; |
131 | | // Streams a one-line summary of load_channel to os and returns os; takes a const reference because only const accessors are called. |
132 | 0 | inline std::ostream& operator<<(std::ostream& os, const LoadChannel& load_channel) { |
133 | 0 | os << "LoadChannel(id=" << load_channel.load_id() |
134 | 0 | << ", last_update_time=" << static_cast<uint64_t>(load_channel.last_updated_time()) |
135 | 0 | << ", is high priority: " << load_channel.is_high_priority() << ")"; |
136 | 0 | return os; |
137 | 0 | } |
138 | | |
139 | | } // namespace doris |