be/src/load/stream_load/stream_load_context.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/BackendService_types.h> |
21 | | #include <gen_cpp/FrontendService_types.h> |
22 | | #include <gen_cpp/PlanNodes_types.h> |
23 | | #include <gen_cpp/Types_types.h> |
24 | | #include <stddef.h> |
25 | | #include <stdint.h> |
26 | | |
27 | | #include <condition_variable> |
28 | | #include <future> |
29 | | #include <map> |
30 | | #include <memory> |
31 | | #include <string> |
32 | | #include <utility> |
33 | | #include <vector> |
34 | | |
35 | | #include "common/config.h" |
36 | | #include "common/logging.h" |
37 | | #include "common/status.h" |
38 | | #include "common/utils.h" |
39 | | #include "load/stream_load/stream_load_executor.h" |
40 | | #include "runtime/exec_env.h" |
41 | | #include "runtime/thread_context.h" |
42 | | #include "util/byte_buffer.h" |
43 | | #include "util/time.h" |
44 | | #include "util/uid_util.h" |
45 | | |
46 | | namespace doris { |
47 | | namespace io { |
48 | | class StreamLoadPipe; |
49 | | } // namespace io |
50 | | |
51 | | // kafka related info |
52 | | class KafkaLoadInfo { |
53 | | public: |
54 | | KafkaLoadInfo(const TKafkaLoadInfo& t_info) |
55 | 1 | : brokers(t_info.brokers), |
56 | 1 | topic(t_info.topic), |
57 | 1 | begin_offset(t_info.partition_begin_offset), |
58 | 1 | properties(t_info.properties) { |
59 | | // The offset(begin_offset) sent from FE is the starting offset, |
60 | | // and the offset(cmt_offset) reported by BE to FE is the consumed offset, |
61 | | // so we need to minus 1 here. |
62 | 1 | for (auto& p : t_info.partition_begin_offset) { |
63 | 1 | cmt_offset[p.first] = p.second - 1; |
64 | 1 | } |
65 | 1 | } |
66 | | |
| | // Rewind every partition's committed offset back to begin_offset - 1, |
| | // i.e. the same state the constructor established, so consumption can |
| | // restart from the original begin offsets. |
67 | 0 | void reset_offset() { |
68 | 0 | // reset the commit offset |
69 | 0 | for (auto& p : begin_offset) { |
70 | 0 | cmt_offset[p.first] = p.second - 1; |
71 | 0 | } |
72 | 0 | } |
73 | | |
74 | | public: |
75 | | std::string brokers; |
76 | | std::string topic; |
77 | | |
78 | | // the following members control the max progress of a consuming |
79 | | // process. if any of them reach, the consuming will finish. |
80 | | int64_t max_interval_s = 5; |
81 | | int64_t max_batch_rows = 1024; |
82 | | int64_t max_batch_size = 100 * 1024 * 1024; // 100MB |
83 | | |
84 | | // partition -> begin offset, inclusive. |
85 | | std::map<int32_t, int64_t> begin_offset; |
86 | | // partition -> commit offset, inclusive. |
| | // (last consumed offset; begin_offset - 1 until anything is consumed) |
87 | | std::map<int32_t, int64_t> cmt_offset; |
88 | | //custom kafka property key -> value |
89 | | std::map<std::string, std::string> properties; |
90 | | }; |
91 | | |
92 | | class MessageBodySink; |
93 | | |
94 | | class StreamLoadContext { |
95 | | ENABLE_FACTORY_CREATOR(StreamLoadContext); |
96 | | |
97 | | public: |
| | // Generates a fresh UniqueId for this load and records the start time. |
98 | 5 | StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env) { |
99 | 5 | start_millis = UnixMillis(); |
100 | 5 | } |
101 | | |
| | // If a txn was begun but never committed/aborted (need_rollback still |
| | // true), roll it back on destruction so no txn is leaked. |
102 | 5 | ~StreamLoadContext() { |
103 | 5 | if (need_rollback) { |
104 | 0 | _exec_env->stream_load_executor()->rollback_txn(this); |
105 | 0 | need_rollback = false; |
106 | 0 | } |
107 | 5 | } |
108 | | |
109 | | std::string data_saved_path; |
110 | | |
111 | | std::string to_json() const; |
112 | | |
113 | | std::string prepare_stream_load_record(const std::string& stream_load_record); |
114 | | static void parse_stream_load_record(const std::string& stream_load_record, |
115 | | TStreamLoadRecord& stream_load_item); |
116 | | |
117 | | // the old mini load result format is not same as stream load. |
118 | | // add this function for compatible with old mini load result format. |
119 | | std::string to_json_for_mini_load() const; |
120 | | |
121 | | // return the brief info of this context. |
122 | | // also print the load source info if detail is set to true |
123 | | std::string brief(bool detail = false) const; |
124 | | |
125 | | bool is_mow_table() const; |
126 | | |
| | // Lazily allocate _schema_buffer (config::stream_tvf_buffer_size bytes), |
| | // charging the allocation to the stream-load-pipe memory tracker. |
| | // No-op (returns OK) if the buffer already exists. |
127 | 0 | Status allocate_schema_buffer() { |
128 | 0 | if (_schema_buffer == nullptr) { |
129 | 0 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
130 | 0 | ExecEnv::GetInstance()->stream_load_pipe_tracker()); |
131 | 0 | return ByteBuffer::allocate(config::stream_tvf_buffer_size, &_schema_buffer); |
132 | 0 | } |
133 | 0 | return Status::OK(); |
134 | 0 | } |
135 | | |
| | // May be nullptr until allocate_schema_buffer() has been called. |
136 | 0 | ByteBufferPtr schema_buffer() { return _schema_buffer; } |
137 | | |
138 | | public: |
139 | | static const int default_txn_id = -1; |
140 | | // load type, eg: ROUTINE LOAD/MANUAL LOAD |
141 | | TLoadType::type load_type = TLoadType::type::MANUL_LOAD; |
142 | | // load data source: eg: KAFKA/RAW |
| | // NOTE(review): no default initializer — confirm it is always assigned |
| | // before being read. |
143 | | TLoadSourceType::type load_src_type; |
144 | | |
145 | | // the job this stream load task belongs to, |
146 | | // set to -1 if there is no job |
147 | | int64_t job_id = -1; |
148 | | |
149 | | // id for each load |
150 | | UniqueId id; |
151 | | |
152 | | std::string db; |
153 | | int64_t db_id = -1; |
154 | | int64_t wal_id = -1; |
155 | | std::string table; |
156 | | int64_t table_id = -1; |
157 | | int64_t schema_version = -1; |
158 | | std::string label; |
159 | | std::string sql_str; |
160 | | // optional |
161 | | std::string sub_label; |
162 | | double max_filter_ratio = 0.0; |
163 | | int32_t timeout_second = -1; |
164 | | AuthInfo auth; |
165 | | bool two_phase_commit = false; |
166 | | std::string load_comment; |
167 | | |
168 | | // the following members control the max progress of a consuming |
169 | | // process. if any of them reach, the consuming will finish. |
170 | | // same as values set in fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java |
171 | | int64_t max_interval_s = 60; |
172 | | int64_t max_batch_rows = 20000000; |
173 | | int64_t max_batch_size = 1024 * 1024 * 1024; // 1GB |
174 | | |
175 | | // for parse json-data |
176 | | std::string data_format = ""; |
177 | | std::string jsonpath_file = ""; |
178 | | std::string jsonpath = ""; |
179 | | |
180 | | // only used to check if we receive whole body |
181 | | size_t body_bytes = 0; |
182 | | size_t receive_bytes = 0; |
183 | | bool is_chunked_transfer = false; |
184 | | |
185 | | int64_t txn_id = default_txn_id; |
186 | | |
187 | | // http stream |
188 | | bool is_read_schema = true; |
189 | | |
190 | | std::string txn_operation = ""; |
191 | | |
192 | | bool need_rollback = false; |
193 | | // when use_streaming is true, we use stream_pipe to send source data, |
194 | | // otherwise we save source data to file first, then process it. |
195 | | bool use_streaming = false; |
196 | | TFileFormatType::type format = TFileFormatType::FORMAT_CSV_PLAIN; |
197 | | TFileCompressType::type compress_type = TFileCompressType::UNKNOWN; |
198 | | bool group_commit = false; |
199 | | |
200 | | std::shared_ptr<MessageBodySink> body_sink; |
201 | | std::shared_ptr<io::StreamLoadPipe> pipe; |
202 | | |
203 | | TStreamLoadPutResult put_result; |
204 | | TStreamLoadMultiTablePutResult multi_table_put_result; |
205 | | |
206 | | std::vector<TTabletCommitInfo> commit_infos; |
207 | | |
208 | | std::promise<Status> load_status_promise; |
209 | | std::future<Status> load_status_future = load_status_promise.get_future(); |
210 | | |
211 | | Status status; |
212 | | |
213 | | int64_t number_total_rows = 0; |
214 | | int64_t number_loaded_rows = 0; |
215 | | int64_t number_filtered_rows = 0; |
216 | | int64_t number_unselected_rows = 0; |
217 | | int64_t loaded_bytes = 0; |
218 | | int64_t start_millis = 0; |
219 | | int64_t start_write_data_nanos = 0; |
220 | | int64_t load_cost_millis = 0; |
221 | | int64_t begin_txn_cost_nanos = 0; |
222 | | int64_t stream_load_put_cost_nanos = 0; |
223 | | int64_t commit_and_publish_txn_cost_nanos = 0; |
224 | | int64_t pre_commit_txn_cost_nanos = 0; |
225 | | int64_t read_data_cost_nanos = 0; |
226 | | int64_t write_data_cost_nanos = 0; |
227 | | int64_t receive_and_read_data_cost_nanos = 0; |
228 | | int64_t begin_receive_and_read_data_cost_nanos = 0; |
229 | | |
230 | | std::string error_url = ""; |
231 | | std::string first_error_msg = ""; |
232 | | // if label already be used, set existing job's status here |
233 | | // should be RUNNING or FINISHED |
234 | | std::string existing_job_status = ""; |
235 | | |
236 | | std::unique_ptr<KafkaLoadInfo> kafka_info; |
237 | | |
238 | | // consumer_id is used for data consumer cache key. |
239 | | // to identified a specified data consumer. |
| | // NOTE(review): not default-initialized — confirm the routine load path |
| | // always assigns it before use. |
240 | | int64_t consumer_id; |
241 | | |
242 | | // If this is an transactional insert operation, this will be true |
243 | | bool need_commit_self = false; |
244 | | |
245 | | // csv with header type |
246 | | std::string header_type = ""; |
247 | | |
248 | | // is this load single-stream-multi-table? |
249 | | bool is_multi_table = false; |
250 | | |
251 | | // for single-stream-multi-table, we have table list |
252 | | std::vector<std::string> table_list; |
253 | | |
254 | | bool memtable_on_sink_node = false; |
255 | | |
256 | | // use for cloud cluster mode |
257 | | std::string qualified_user; |
258 | | std::string cloud_cluster; |
259 | | |
260 | | // 1. _can_send_reply: Ensure `send_reply` is invoked only after on_header/handle complete, |
261 | | // avoid client errors (e.g., broken pipe). |
262 | | // 2. _finish_send_reply: Prevent duplicate reply sending; skip reply if HTTP request is canceled |
263 | | // due to long import execution time. |
264 | | std::mutex _send_reply_lock; |
265 | | std::condition_variable _can_send_reply_cv; |
266 | | bool _can_send_reply = false; |
267 | | bool _finish_send_reply = false; |
268 | | |
269 | | public: |
270 | 1 | ExecEnv* exec_env() { return _exec_env; } |
271 | | |
272 | | private: |
273 | | ExecEnv* _exec_env = nullptr; |
| | // Lazily created by allocate_schema_buffer(); nullptr until then. |
274 | | ByteBufferPtr _schema_buffer; |
275 | | }; |
276 | | |
277 | | } // namespace doris |