be/src/load/delta_writer/delta_writer_v2.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/delta_writer/delta_writer_v2.h" |
19 | | |
20 | | #include <brpc/controller.h> |
21 | | #include <butil/errno.h> |
22 | | #include <fmt/format.h> |
23 | | #include <gen_cpp/internal_service.pb.h> |
24 | | #include <gen_cpp/olap_file.pb.h> |
25 | | |
26 | | #include <filesystem> |
27 | | #include <ostream> |
28 | | #include <string> |
29 | | #include <utility> |
30 | | |
31 | | #include "common/compiler_util.h" // IWYU pragma: keep |
32 | | #include "common/config.h" |
33 | | #include "common/logging.h" |
34 | | #include "common/status.h" |
35 | | #include "core/block/block.h" |
36 | | #include "exec/sink/load_stream_stub.h" |
37 | | #include "io/fs/file_writer.h" // IWYU pragma: keep |
38 | | #include "runtime/exec_env.h" |
39 | | #include "runtime/query_context.h" |
40 | | #include "service/backend_options.h" |
41 | | #include "storage/data_dir.h" |
42 | | #include "storage/index/inverted/inverted_index_desc.h" |
43 | | #include "storage/olap_define.h" |
44 | | #include "storage/rowset/beta_rowset.h" |
45 | | #include "storage/rowset/beta_rowset_writer_v2.h" |
46 | | #include "storage/rowset/rowset_meta.h" |
47 | | #include "storage/rowset/rowset_writer.h" |
48 | | #include "storage/rowset/rowset_writer_context.h" |
49 | | #include "storage/schema.h" |
50 | | #include "storage/schema_change/schema_change.h" |
51 | | #include "storage/segment/segment.h" |
52 | | #include "storage/storage_engine.h" |
53 | | #include "storage/tablet/tablet_manager.h" |
54 | | #include "storage/tablet/tablet_schema.h" |
55 | | #include "storage/tablet_info.h" |
56 | | #include "util/brpc_client_cache.h" |
57 | | #include "util/brpc_closure.h" |
58 | | #include "util/debug_points.h" |
59 | | #include "util/mem_info.h" |
60 | | #include "util/stopwatch.hpp" |
61 | | #include "util/time.h" |
62 | | |
63 | | namespace doris { |
64 | | #include "common/compile_check_begin.h" |
65 | | using namespace ErrorCode; |
66 | | |
67 | | DeltaWriterV2::DeltaWriterV2(WriteRequest* req, |
68 | | const std::vector<std::shared_ptr<LoadStreamStub>>& streams, |
69 | | RuntimeState* state) |
70 | 2 | : _state(state), |
71 | 2 | _req(*req), |
72 | 2 | _tablet_schema(new TabletSchema), |
73 | 2 | _memtable_writer(new MemTableWriter(*req)), |
74 | 2 | _streams(streams) {} |
75 | | |
76 | 0 | void DeltaWriterV2::_update_profile(RuntimeProfile* profile) { |
77 | 0 | auto child = profile->create_child(fmt::format("DeltaWriterV2 {}", _req.tablet_id), true, true); |
78 | 0 | auto write_memtable_timer = ADD_TIMER(child, "WriteMemTableTime"); |
79 | 0 | auto wait_flush_limit_timer = ADD_TIMER(child, "WaitFlushLimitTime"); |
80 | 0 | auto close_wait_timer = ADD_TIMER(child, "CloseWaitTime"); |
81 | 0 | COUNTER_SET(write_memtable_timer, _write_memtable_time); |
82 | 0 | COUNTER_SET(wait_flush_limit_timer, _wait_flush_limit_time); |
83 | 0 | COUNTER_SET(close_wait_timer, _close_wait_time); |
84 | 0 | } |
85 | | |
86 | 2 | DeltaWriterV2::~DeltaWriterV2() { |
87 | 2 | if (!_is_init) { |
88 | 2 | return; |
89 | 2 | } |
90 | | |
91 | | // cancel and wait all memtables in flush queue to be finished |
92 | 0 | static_cast<void>(_memtable_writer->cancel()); |
93 | 0 | } |
94 | | |
95 | 1 | Status DeltaWriterV2::init() { |
96 | 1 | if (_is_init) { |
97 | 0 | return Status::OK(); |
98 | 0 | } |
99 | | // build tablet schema in request level |
100 | 1 | DBUG_EXECUTE_IF("DeltaWriterV2.init.stream_size", { _streams.clear(); }); |
101 | 1 | if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) { |
102 | 1 | return Status::InternalError("failed to find tablet schema for {}", _req.index_id); |
103 | 1 | } |
104 | 0 | RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), |
105 | 0 | *_streams[0]->tablet_schema(_req.index_id))); |
106 | 0 | RowsetWriterContext context; |
107 | 0 | context.txn_id = _req.txn_id; |
108 | 0 | context.load_id = _req.load_id; |
109 | 0 | context.index_id = _req.index_id; |
110 | 0 | context.partition_id = _req.partition_id; |
111 | 0 | context.rowset_state = PREPARED; |
112 | 0 | context.segments_overlap = OVERLAPPING; |
113 | 0 | context.tablet_schema = _tablet_schema; |
114 | 0 | context.newest_write_timestamp = UnixSeconds(); |
115 | 0 | context.tablet = nullptr; |
116 | 0 | context.write_type = DataWriteType::TYPE_DIRECT; |
117 | 0 | context.tablet_id = _req.tablet_id; |
118 | 0 | context.partition_id = _req.partition_id; |
119 | 0 | context.tablet_schema_hash = _req.schema_hash; |
120 | 0 | context.enable_unique_key_merge_on_write = _streams[0]->enable_unique_mow(_req.index_id); |
121 | 0 | context.rowset_type = RowsetTypePB::BETA_ROWSET; |
122 | 0 | context.rowset_id = ExecEnv::GetInstance()->storage_engine().next_rowset_id(); |
123 | 0 | context.data_dir = nullptr; |
124 | 0 | context.partial_update_info = _partial_update_info; |
125 | 0 | context.memtable_on_sink_support_index_v2 = true; |
126 | 0 | context.encrypt_algorithm = EncryptionAlgorithmPB::PLAINTEXT; |
127 | |
|
128 | 0 | _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams); |
129 | 0 | RETURN_IF_ERROR(_rowset_writer->init(context)); |
130 | 0 | std::shared_ptr<WorkloadGroup> wg_sptr = nullptr; |
131 | 0 | if (_state->get_query_ctx()) { |
132 | 0 | wg_sptr = _state->get_query_ctx()->workload_group(); |
133 | 0 | } |
134 | 0 | RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info, |
135 | 0 | wg_sptr, _streams[0]->enable_unique_mow(_req.index_id))); |
136 | 0 | ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); |
137 | 0 | _is_init = true; |
138 | 0 | _streams.clear(); |
139 | 0 | return Status::OK(); |
140 | 0 | } |
141 | | |
142 | 0 | Status DeltaWriterV2::write(const Block* block, const DorisVector<uint32_t>& row_idxs) { |
143 | 0 | if (UNLIKELY(row_idxs.empty())) { |
144 | 0 | return Status::OK(); |
145 | 0 | } |
146 | 0 | _lock_watch.start(); |
147 | 0 | std::lock_guard<std::mutex> l(_lock); |
148 | 0 | _lock_watch.stop(); |
149 | 0 | if (!_is_init && !_is_cancelled) { |
150 | 0 | RETURN_IF_ERROR(init()); |
151 | 0 | } |
152 | 0 | { |
153 | 0 | SCOPED_RAW_TIMER(&_wait_flush_limit_time); |
154 | 0 | auto memtable_flush_running_count_limit = config::memtable_flush_running_count_limit; |
155 | 0 | DBUG_EXECUTE_IF("DeltaWriterV2.write.back_pressure", |
156 | 0 | { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); }); |
157 | 0 | while (_memtable_writer->flush_running_count() >= memtable_flush_running_count_limit) { |
158 | 0 | if (_state->is_cancelled()) { |
159 | 0 | return _state->cancel_reason(); |
160 | 0 | } |
161 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
162 | 0 | } |
163 | 0 | } |
164 | 0 | SCOPED_RAW_TIMER(&_write_memtable_time); |
165 | 0 | return _memtable_writer->write(block, row_idxs); |
166 | 0 | } |
167 | | |
168 | 1 | Status DeltaWriterV2::close() { |
169 | 1 | _lock_watch.start(); |
170 | 1 | std::lock_guard<std::mutex> l(_lock); |
171 | 1 | _lock_watch.stop(); |
172 | 1 | if (!_is_init && !_is_cancelled) { |
173 | | // if this delta writer is not initialized, but close() is called. |
174 | | // which means this tablet has no data loaded, but at least one tablet |
175 | | // in same partition has data loaded. |
176 | | // so we have to also init this DeltaWriterV2, so that it can create an empty rowset |
177 | | // for this tablet when being closed. |
178 | 1 | RETURN_IF_ERROR(init()); |
179 | 1 | } |
180 | 0 | return _memtable_writer->close(); |
181 | 1 | } |
182 | | |
183 | 0 | Status DeltaWriterV2::close_wait(int32_t& num_segments, RuntimeProfile* profile) { |
184 | 0 | SCOPED_RAW_TIMER(&_close_wait_time); |
185 | 0 | std::lock_guard<std::mutex> l(_lock); |
186 | 0 | DCHECK(_is_init) |
187 | 0 | << "delta writer is supposed be to initialized before close_wait() being called"; |
188 | |
|
189 | 0 | if (profile != nullptr) { |
190 | 0 | _update_profile(profile); |
191 | 0 | } |
192 | 0 | RETURN_IF_ERROR(_memtable_writer->close_wait(profile)); |
193 | 0 | num_segments = _rowset_writer->next_segment_id(); |
194 | |
|
195 | 0 | _delta_written_success = true; |
196 | 0 | return Status::OK(); |
197 | 0 | } |
198 | | |
199 | 0 | Status DeltaWriterV2::cancel() { |
200 | 0 | return cancel_with_status(Status::Cancelled("already cancelled")); |
201 | 0 | } |
202 | | |
203 | 0 | Status DeltaWriterV2::cancel_with_status(const Status& st) { |
204 | 0 | std::lock_guard<std::mutex> l(_lock); |
205 | 0 | if (_is_cancelled) { |
206 | 0 | return Status::OK(); |
207 | 0 | } |
208 | 0 | RETURN_IF_ERROR(_memtable_writer->cancel_with_status(st)); |
209 | 0 | _is_cancelled = true; |
210 | 0 | return Status::OK(); |
211 | 0 | } |
212 | | |
213 | | Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, |
214 | | const OlapTableSchemaParam* table_schema_param, |
215 | 0 | const TabletSchema& ori_tablet_schema) { |
216 | 0 | _tablet_schema->copy_from(ori_tablet_schema); |
217 | | // find the right index id |
218 | 0 | int i = 0; |
219 | 0 | auto indexes = table_schema_param->indexes(); |
220 | 0 | for (; i < indexes.size(); i++) { |
221 | 0 | if (indexes[i]->index_id == index_id) { |
222 | 0 | break; |
223 | 0 | } |
224 | 0 | } |
225 | |
|
226 | 0 | if (!indexes.empty() && !indexes[i]->columns.empty() && |
227 | 0 | indexes[i]->columns[0]->unique_id() >= 0) { |
228 | 0 | _tablet_schema->build_current_tablet_schema( |
229 | 0 | index_id, static_cast<int32_t>(table_schema_param->version()), indexes[i], |
230 | 0 | ori_tablet_schema); |
231 | 0 | } |
232 | |
|
233 | 0 | _tablet_schema->set_table_id(table_schema_param->table_id()); |
234 | 0 | _tablet_schema->set_db_id(table_schema_param->db_id()); |
235 | 0 | if (table_schema_param->is_partial_update()) { |
236 | 0 | _tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn()); |
237 | 0 | } |
238 | | // set partial update columns info |
239 | 0 | _partial_update_info = std::make_shared<PartialUpdateInfo>(); |
240 | 0 | RETURN_IF_ERROR(_partial_update_info->init( |
241 | 0 | _req.tablet_id, _req.txn_id, *_tablet_schema, |
242 | 0 | table_schema_param->unique_key_update_mode(), |
243 | 0 | table_schema_param->partial_update_new_key_policy(), |
244 | 0 | table_schema_param->partial_update_input_columns(), |
245 | 0 | table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), |
246 | 0 | table_schema_param->nano_seconds(), table_schema_param->timezone(), |
247 | 0 | table_schema_param->auto_increment_coulumn())); |
248 | 0 | return Status::OK(); |
249 | 0 | } |
250 | | |
251 | | #include "common/compile_check_end.h" |
252 | | } // namespace doris |