be/src/cloud/cloud_rowset_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_rowset_writer.h" |
19 | | |
20 | | #include "common/logging.h" |
21 | | #include "common/status.h" |
22 | | #include "io/cache/block_file_cache_factory.h" |
23 | | #include "io/fs/packed_file_manager.h" |
24 | | #include "io/fs/packed_file_writer.h" |
25 | | #include "storage/rowset/rowset_factory.h" |
26 | | |
27 | | namespace doris { |
28 | | |
// Binds this writer to the cloud storage engine. The engine reference must
// outlive the writer: init() uses it to create the delete bitmap calculation
// token for merge-on-write loads.
CloudRowsetWriter::CloudRowsetWriter(CloudStorageEngine& engine) : _engine(engine) {}
30 | | |
31 | 206k | CloudRowsetWriter::~CloudRowsetWriter() { |
32 | | // Must cancel any pending delete bitmap tasks before destruction. |
33 | | // Otherwise, the lambda in _generate_delete_bitmap may execute after the |
34 | | // CloudRowsetWriter destructor runs but before BaseBetaRowsetWriter destructor, |
35 | | // causing virtual function calls to resolve to BaseBetaRowsetWriter::_build_rowset_meta |
36 | | // instead of CloudRowsetWriter::_build_rowset_meta (use-after-free on vtable). |
37 | 206k | if (_calc_delete_bitmap_token != nullptr) { |
38 | 55.5k | _calc_delete_bitmap_token->cancel(); |
39 | 55.5k | } |
40 | 206k | } |
41 | | |
42 | 204k | Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { |
43 | 204k | _context = rowset_writer_context; |
44 | 204k | _rowset_meta = std::make_shared<RowsetMeta>(); |
45 | | |
46 | 204k | if (_context.is_local_rowset()) { |
47 | | // In cloud mode, this branch implies it is an intermediate rowset for external merge sort, |
48 | | // we use `global_local_filesystem` to write data to `tmp_file_dir`(see `local_segment_path`). |
49 | 3.47k | _context.tablet_path = io::FileCacheFactory::instance()->pick_one_cache_path(); |
50 | 200k | } else { |
51 | 200k | _rowset_meta->set_remote_storage_resource(*_context.storage_resource); |
52 | 200k | } |
53 | | |
54 | 204k | _rowset_meta->set_rowset_id(_context.rowset_id); |
55 | 204k | _rowset_meta->set_partition_id(_context.partition_id); |
56 | 204k | _rowset_meta->set_tablet_id(_context.tablet_id); |
57 | 204k | _rowset_meta->set_index_id(_context.index_id); |
58 | 204k | _rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash); |
59 | 204k | _rowset_meta->set_rowset_type(_context.rowset_type); |
60 | 204k | _rowset_meta->set_rowset_state(_context.rowset_state); |
61 | 204k | _rowset_meta->set_segments_overlap(_context.segments_overlap); |
62 | 204k | _rowset_meta->set_txn_id(_context.txn_id); |
63 | 204k | _rowset_meta->set_txn_expiration(_context.txn_expiration); |
64 | 204k | _rowset_meta->set_compaction_level(_context.compaction_level); |
65 | 204k | if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) { |
66 | 184k | _is_pending = true; |
67 | 184k | _rowset_meta->set_load_id(_context.load_id); |
68 | 184k | } else { |
69 | | // Rowset generated by compaction or schema change |
70 | 20.1k | _rowset_meta->set_version(_context.version); |
71 | 20.1k | DCHECK_NE(_context.newest_write_timestamp, -1); |
72 | 20.1k | _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp); |
73 | 20.1k | } |
74 | 204k | _rowset_meta->set_tablet_schema(_context.tablet_schema); |
75 | 204k | _rowset_meta->set_job_id(_context.job_id); |
76 | 204k | _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this); |
77 | 204k | _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this); |
78 | 204k | if (_context.mow_context != nullptr) { |
79 | 55.4k | _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor_for_load()->create_token(); |
80 | 55.4k | } |
81 | 204k | return Status::OK(); |
82 | 204k | } |
83 | | |
84 | 227k | Status CloudRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) { |
85 | 18.4E | VLOG_NOTICE << "start to build rowset meta. tablet_id=" << rowset_meta->tablet_id() |
86 | 18.4E | << ", rowset_id=" << rowset_meta->rowset_id() |
87 | 18.4E | << ", check_segment_num=" << check_segment_num; |
88 | | // Call base class implementation |
89 | 227k | RETURN_IF_ERROR(BaseBetaRowsetWriter::_build_rowset_meta(rowset_meta, check_segment_num)); |
90 | | |
91 | | // Collect packed file segment index information for interim rowsets as well. |
92 | 227k | return _collect_all_packed_slice_locations(rowset_meta); |
93 | 227k | } |
94 | | |
95 | 206k | Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) { |
96 | 206k | if (_calc_delete_bitmap_token != nullptr) { |
97 | 55.4k | RETURN_IF_ERROR(_calc_delete_bitmap_token->wait()); |
98 | 55.4k | } |
99 | 206k | RETURN_IF_ERROR(_close_file_writers()); |
100 | | |
101 | | // TODO(plat1ko): check_segment_footer |
102 | | |
103 | 206k | RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get())); |
104 | | // At this point all writers have been closed, so collecting packed file indices is safe. |
105 | 206k | RETURN_IF_ERROR(_collect_all_packed_slice_locations(_rowset_meta.get())); |
106 | | // If the current load is a partial update, new segments may be appended to the tmp rowset after the tmp rowset |
107 | | // has been committed if conflicts occur due to concurrent partial updates. However, when the recycler do recycling, |
108 | | // it will generate the paths for the segments to be recycled on the object storage based on the number of segments |
109 | | // in the rowset meta. If these newly added segments are written to the object storage and the transaction is aborted |
110 | | // due to a failure before successfully updating the rowset meta of the corresponding tmp rowset, these newly added |
111 | | // segments cannot be recycled by the recycler on the object storage. Therefore, we need a new state `BEGIN_PARTIAL_UPDATE` |
112 | | // to indicate that the recycler should use list+delete to recycle segments. After the tmp rowset's rowset meta being |
113 | | // updated successfully, the `rowset_state` will be set to `COMMITTED` and the recycler can do recycling based on the |
114 | | // number of segments in the rowset meta safely. |
115 | | // |
116 | | // rowset_state's FSM: |
117 | | // |
118 | | // transfer 0 |
119 | | // PREPARED ---------------------------> COMMITTED |
120 | | // | ^ |
121 | | // | transfer 1 | |
122 | | // | | transfer 2 |
123 | | // |--> BEGIN_PARTIAL_UPDATE ------| |
124 | | // |
125 | | // transfer 0 (PREPARED -> COMMITTED): finish writing a rowset and the rowset' meta will not be changed |
126 | | // transfer 1 (PREPARED -> BEGIN_PARTIAL_UPDATE): finish writing a rowset, but may append new segments later and the rowset's meta may be changed |
127 | | // transfer 2 (BEGIN_PARTIAL_UPDATE -> VISIBLE): finish adding new segments and the rowset' meta will not be changed, the rowset is visible to users |
128 | 206k | if (_context.partial_update_info && _context.partial_update_info->is_partial_update()) { |
129 | 4.76k | _rowset_meta->set_rowset_state(BEGIN_PARTIAL_UPDATE); |
130 | 201k | } else { |
131 | 201k | _rowset_meta->set_rowset_state(COMMITTED); |
132 | 201k | } |
133 | | |
134 | 206k | _rowset_meta->set_tablet_schema(_context.tablet_schema); |
135 | | |
136 | 206k | if (_rowset_meta->newest_write_timestamp() == -1) { |
137 | 185k | _rowset_meta->set_newest_write_timestamp(UnixSeconds()); |
138 | 185k | } |
139 | | |
140 | 206k | if (auto seg_file_size = _seg_files.segments_file_size(_segment_start_id); |
141 | 206k | !seg_file_size.has_value()) [[unlikely]] { |
142 | 0 | LOG(ERROR) << "expected segment file sizes, but none presents: " << seg_file_size.error(); |
143 | 206k | } else { |
144 | 206k | _rowset_meta->add_segments_file_size(seg_file_size.value()); |
145 | 206k | } |
146 | 206k | if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) { |
147 | 21.0k | if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id); |
148 | 21.0k | !idx_files_info.has_value()) [[unlikely]] { |
149 | 0 | LOG(ERROR) << "expected inverted index files info, but none presents: " |
150 | 0 | << idx_files_info.error(); |
151 | 21.0k | } else { |
152 | 21.0k | _rowset_meta->add_inverted_index_files_info(idx_files_info.value()); |
153 | 21.0k | } |
154 | 21.0k | } |
155 | | |
156 | 206k | RETURN_NOT_OK_STATUS_WITH_WARN( |
157 | 206k | RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta, |
158 | 206k | &rowset), |
159 | 206k | "rowset init failed when build new rowset"); |
160 | 206k | _already_built = true; |
161 | 206k | return Status::OK(); |
162 | 206k | } |
163 | | |
164 | 431k | Status CloudRowsetWriter::_collect_all_packed_slice_locations(RowsetMeta* rowset_meta) { |
165 | 18.4E | VLOG_NOTICE << "start to collect packed slice locations for rowset meta. tablet_id=" |
166 | 18.4E | << rowset_meta->tablet_id() << ", rowset_id=" << rowset_meta->rowset_id(); |
167 | 431k | if (!_context.packed_file_active) { |
168 | 297k | return Status::OK(); |
169 | 297k | } |
170 | | |
171 | | // Collect segment file packed indices |
172 | 134k | const auto& file_writers = _seg_files.get_file_writers(); |
173 | 134k | for (const auto& [seg_id, writer_ptr] : file_writers) { |
174 | 134k | auto segment_path = _context.segment_path(seg_id); |
175 | 134k | RETURN_IF_ERROR( |
176 | 134k | _collect_packed_slice_location(writer_ptr.get(), segment_path, rowset_meta)); |
177 | 134k | } |
178 | | |
179 | | // Collect inverted index file packed indices |
180 | 134k | const auto& idx_file_writers = _idx_files.get_file_writers(); |
181 | 134k | for (const auto& [seg_id, idx_writer_ptr] : idx_file_writers) { |
182 | 12.8k | if (idx_writer_ptr != nullptr && idx_writer_ptr->get_file_writer() != nullptr) { |
183 | 12.8k | auto segment_path = _context.segment_path(seg_id); |
184 | 12.8k | auto index_prefix_view = |
185 | 12.8k | InvertedIndexDescriptor::get_index_file_path_prefix(segment_path); |
186 | 12.8k | std::string index_path = |
187 | 12.8k | InvertedIndexDescriptor::get_index_file_path_v2(std::string(index_prefix_view)); |
188 | 12.8k | RETURN_IF_ERROR(_collect_packed_slice_location(idx_writer_ptr->get_file_writer(), |
189 | 12.8k | index_path, rowset_meta)); |
190 | 12.8k | } |
191 | 12.8k | } |
192 | | |
193 | 134k | return Status::OK(); |
194 | 134k | } |
195 | | |
196 | | Status CloudRowsetWriter::_collect_packed_slice_location(io::FileWriter* file_writer, |
197 | | const std::string& file_path, |
198 | 147k | RowsetMeta* rowset_meta) { |
199 | 18.4E | VLOG_NOTICE << "collect packed slice location for file: " << file_path; |
200 | | // Check if file writer is closed |
201 | 147k | if (file_writer->state() != io::FileWriter::State::CLOSED) { |
202 | | // Writer is still open; index will be collected after it is closed. |
203 | 2.19k | return Status::OK(); |
204 | 2.19k | } |
205 | | |
206 | | // Check if file is actually in packed file (not direct write for large files) |
207 | 144k | if (!file_writer->is_in_packed_file()) { |
208 | 594 | return Status::OK(); |
209 | 594 | } |
210 | | |
211 | | // Get packed slice location directly from PackedFileManager |
212 | 144k | io::PackedSliceLocation index; |
213 | 144k | RETURN_IF_ERROR( |
214 | 144k | io::PackedFileManager::instance()->get_packed_slice_location(file_path, &index)); |
215 | 144k | if (index.packed_file_path.empty()) { |
216 | 0 | return Status::OK(); // File not in packed file, skip |
217 | 0 | } |
218 | | |
219 | 144k | rowset_meta->add_packed_slice_location(file_path, index.packed_file_path, index.offset, |
220 | 144k | index.size, index.packed_file_size); |
221 | | LOG(INFO) << "collect packed file index: " << file_path << " -> " << index.packed_file_path |
222 | 144k | << ", offset: " << index.offset << ", size: " << index.size; |
223 | 144k | return Status::OK(); |
224 | 144k | } |
225 | | |
226 | | } // namespace doris |