be/src/storage/rowset/beta_rowset.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/rowset/beta_rowset.h" |
19 | | |
20 | | #include <crc32c/crc32c.h> |
21 | | #include <ctype.h> |
22 | | #include <errno.h> |
23 | | #include <fmt/format.h> |
24 | | |
25 | | #include <algorithm> |
26 | | #include <filesystem> |
27 | | #include <memory> |
28 | | #include <ostream> |
29 | | #include <utility> |
30 | | |
31 | | #include "cloud/config.h" |
32 | | #include "common/config.h" |
33 | | #include "common/logging.h" |
34 | | #include "common/metrics/doris_metrics.h" |
35 | | #include "common/status.h" |
36 | | #include "cpp/sync_point.h" |
37 | | #include "io/fs/file_reader.h" |
38 | | #include "io/fs/file_system.h" |
39 | | #include "io/fs/local_file_system.h" |
40 | | #include "io/fs/path.h" |
41 | | #include "io/fs/remote_file_system.h" |
42 | | #include "storage/index/index_file_reader.h" |
43 | | #include "storage/index/inverted/inverted_index_cache.h" |
44 | | #include "storage/index/inverted/inverted_index_desc.h" |
45 | | #include "storage/olap_common.h" |
46 | | #include "storage/olap_define.h" |
47 | | #include "storage/rowset/beta_rowset.h" |
48 | | #include "storage/rowset/beta_rowset_reader.h" |
49 | | #include "storage/rowset/rowset.h" |
50 | | #include "storage/segment/segment_loader.h" |
51 | | #include "storage/tablet/tablet_schema.h" |
52 | | #include "storage/utils.h" |
53 | | #include "util/debug_points.h" |
54 | | |
55 | | namespace doris { |
56 | | #include "common/compile_check_begin.h" |
57 | | using namespace ErrorCode; |
58 | | |
59 | | std::string BetaRowset::local_segment_path_segcompacted(const std::string& tablet_path, |
60 | | const RowsetId& rowset_id, int64_t begin, |
61 | 34 | int64_t end) { |
62 | | // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{begin_seg}-{end_seg}.dat |
63 | 34 | return fmt::format("{}/{}_{}-{}.dat", tablet_path, rowset_id.to_string(), begin, end); |
64 | 34 | } |
65 | | |
66 | | BetaRowset::BetaRowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& rowset_meta, |
67 | | std::string tablet_path) |
68 | 1.19M | : Rowset(schema, rowset_meta, std::move(tablet_path)) {} |
69 | | |
70 | 682k | BetaRowset::~BetaRowset() = default; |
71 | | |
72 | 1.18M | Status BetaRowset::init() { |
73 | 1.18M | return Status::OK(); // no op |
74 | 1.18M | } |
75 | | |
76 | | namespace { |
77 | | Status load_segment_rows_from_footer(BetaRowsetSharedPtr rowset, |
78 | | std::vector<uint32_t>* segment_rows, bool enable_segment_cache, |
79 | 40.0k | OlapReaderStatistics* read_stats) { |
80 | 40.0k | SegmentCacheHandle segment_cache_handle; |
81 | 40.0k | RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( |
82 | 40.0k | rowset, &segment_cache_handle, enable_segment_cache, false, read_stats)); |
83 | 40.0k | for (const auto& segment : segment_cache_handle.get_segments()) { |
84 | 40.0k | segment_rows->emplace_back(segment->num_rows()); |
85 | 40.0k | } |
86 | 40.0k | return Status::OK(); |
87 | 40.0k | } |
88 | | |
89 | | Status check_segment_rows_consistency(const std::vector<uint32_t>& rows_from_meta, |
90 | | const std::vector<uint32_t>& rows_from_footer, |
91 | 40.0k | int64_t tablet_id, const std::string& rowset_id) { |
92 | 40.0k | DCHECK_EQ(rows_from_footer.size(), rows_from_meta.size()); |
93 | 80.1k | for (size_t i = 0; i < rows_from_footer.size(); i++) { |
94 | 40.0k | if (rows_from_footer[i] != rows_from_meta[i]) { |
95 | 0 | auto msg = fmt::format( |
96 | 0 | "segment rows mismatch between rowset meta and segment footer. " |
97 | 0 | "segment index: {}, meta rows: {}, footer rows: {}, tablet={}, rowset={}", |
98 | 0 | i, rows_from_meta[i], rows_from_footer[i], tablet_id, rowset_id); |
99 | 0 | if (config::enable_segment_rows_check_core) { |
100 | 0 | CHECK(false) << msg; |
101 | 0 | } |
102 | 0 | return Status::InternalError(msg); |
103 | 0 | } |
104 | 40.0k | } |
105 | 40.0k | return Status::OK(); |
106 | 40.0k | } |
107 | | } // namespace |
108 | | |
109 | | Status BetaRowset::get_segment_num_rows(std::vector<uint32_t>* segment_rows, |
110 | | bool enable_segment_cache, |
111 | 1.96M | OlapReaderStatistics* read_stats) { |
112 | 1.96M | #ifndef BE_TEST |
113 | | // `ROWSET_UNLOADING` is state for closed() called but owned by some readers. |
114 | | // So here `ROWSET_UNLOADING` is allowed. |
115 | 1.96M | DCHECK_NE(_rowset_state_machine.rowset_state(), ROWSET_UNLOADED); |
116 | 1.96M | #endif |
117 | 1.96M | RETURN_IF_ERROR(_load_segment_rows_once.call([this, enable_segment_cache, read_stats] { |
118 | 1.96M | auto segment_count = num_segments(); |
119 | 1.96M | if (segment_count == 0) { |
120 | 1.96M | return Status::OK(); |
121 | 1.96M | } |
122 | | |
123 | 1.96M | if (!_rowset_meta->get_num_segment_rows().empty()) { |
124 | 1.96M | if (_rowset_meta->get_num_segment_rows().size() == segment_count) { |
125 | | // use segment rows in rowset meta if eligible |
126 | 1.96M | TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:use_segment_rows_from_meta"); |
127 | 1.96M | _segments_rows.assign(_rowset_meta->get_num_segment_rows().cbegin(), |
128 | 1.96M | _rowset_meta->get_num_segment_rows().cend()); |
129 | 1.96M | if (config::enable_segment_rows_consistency_check) { |
130 | | // verify segment rows from meta match segment footer |
131 | 1.96M | std::vector<uint32_t> rows_from_footer; |
132 | 1.96M | auto self = std::dynamic_pointer_cast<BetaRowset>(shared_from_this()); |
133 | 1.96M | auto load_status = load_segment_rows_from_footer( |
134 | 1.96M | self, &rows_from_footer, enable_segment_cache, read_stats); |
135 | 1.96M | if (load_status.ok()) { |
136 | 1.96M | return check_segment_rows_consistency( |
137 | 1.96M | _segments_rows, rows_from_footer, _rowset_meta->tablet_id(), |
138 | 1.96M | _rowset_meta->rowset_id().to_string()); |
139 | 1.96M | } |
140 | 1.96M | } |
141 | 1.96M | return Status::OK(); |
142 | 1.96M | } else { |
143 | 1.96M | auto msg = fmt::format( |
144 | 1.96M | "[verbose] corrupted segment rows info in rowset meta. " |
145 | 1.96M | "segment count: {}, segment rows size: {}, tablet={}, rowset={}", |
146 | 1.96M | segment_count, _rowset_meta->get_num_segment_rows().size(), |
147 | 1.96M | _rowset_meta->tablet_id(), _rowset_meta->rowset_id().to_string()); |
148 | 1.96M | if (config::enable_segment_rows_check_core) { |
149 | 1.96M | CHECK(false) << msg; |
150 | 1.96M | } |
151 | 1.96M | LOG_EVERY_SECOND(WARNING) << msg; |
152 | 1.96M | } |
153 | 1.96M | } |
154 | 1.96M | if (config::fail_when_segment_rows_not_in_rowset_meta) { |
155 | 1.96M | CHECK(false) << "[verbose] segment rows info not found in rowset meta. tablet=" |
156 | 1.96M | << _rowset_meta->tablet_id() |
157 | 1.96M | << ", rowset=" << _rowset_meta->rowset_id().to_string() |
158 | 1.96M | << ", version=" << _rowset_meta->version() |
159 | 1.96M | << ", debug_string=" << _rowset_meta->debug_string() |
160 | 1.96M | << ", stack=" << Status::InternalError("error"); |
161 | 1.96M | } |
162 | | // otherwise, read it from segment footer |
163 | 1.96M | TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:load_from_segment_footer"); |
164 | 1.96M | auto self = std::dynamic_pointer_cast<BetaRowset>(shared_from_this()); |
165 | 1.96M | return load_segment_rows_from_footer(self, &_segments_rows, enable_segment_cache, |
166 | 1.96M | read_stats); |
167 | 1.96M | })); |
168 | 1.96M | segment_rows->assign(_segments_rows.cbegin(), _segments_rows.cend()); |
169 | 1.96M | return Status::OK(); |
170 | 1.96M | } |
171 | | |
172 | 21 | Status BetaRowset::get_inverted_index_size(int64_t* index_size) { |
173 | 21 | const auto& fs = _rowset_meta->fs(); |
174 | 21 | if (!fs) { |
175 | 0 | return Status::Error<INIT_FAILED>("get fs failed, resource_id={}", |
176 | 0 | _rowset_meta->resource_id()); |
177 | 0 | } |
178 | | |
179 | 21 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
180 | 7 | for (const auto& index : _schema->inverted_indexes()) { |
181 | 15 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
182 | 9 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
183 | 9 | int64_t file_size = 0; |
184 | | |
185 | 9 | std::string inverted_index_file_path = |
186 | 9 | InvertedIndexDescriptor::get_index_file_path_v1( |
187 | 9 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
188 | 9 | index->index_id(), index->get_index_suffix()); |
189 | 9 | RETURN_IF_ERROR(fs->file_size(inverted_index_file_path, &file_size)); |
190 | 9 | *index_size += file_size; |
191 | 9 | } |
192 | 6 | } |
193 | 14 | } else { |
194 | 20 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
195 | 14 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
196 | 14 | int64_t file_size = 0; |
197 | | |
198 | 14 | std::string inverted_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2( |
199 | 14 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); |
200 | 14 | RETURN_IF_ERROR(fs->file_size(inverted_index_file_path, &file_size)); |
201 | 6 | *index_size += file_size; |
202 | 6 | } |
203 | 14 | } |
204 | 13 | return Status::OK(); |
205 | 21 | } |
206 | | |
207 | 232k | void BetaRowset::clear_inverted_index_cache() { |
208 | 295k | for (int i = 0; i < num_segments(); ++i) { |
209 | 63.2k | auto seg_path = segment_path(i); |
210 | 63.2k | if (!seg_path) { |
211 | 0 | continue; |
212 | 0 | } |
213 | | |
214 | 63.2k | auto index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(*seg_path); |
215 | 720k | for (const auto& column : tablet_schema()->columns()) { |
216 | 720k | auto index_metas = tablet_schema()->inverted_indexs(*column); |
217 | 720k | for (const auto& index_meta : index_metas) { |
218 | 14.5k | auto inverted_index_file_cache_key = |
219 | 14.5k | InvertedIndexDescriptor::get_index_file_cache_key( |
220 | 14.5k | index_path_prefix, index_meta->index_id(), |
221 | 14.5k | index_meta->get_index_suffix()); |
222 | 14.5k | (void)segment_v2::InvertedIndexSearcherCache::instance()->erase( |
223 | 14.5k | inverted_index_file_cache_key); |
224 | 14.5k | } |
225 | 720k | } |
226 | 63.2k | } |
227 | 232k | } |
228 | | |
229 | 51 | Status BetaRowset::get_segments_size(std::vector<size_t>* segments_size) { |
230 | 51 | auto fs = _rowset_meta->fs(); |
231 | 51 | if (!fs) { |
232 | 0 | return Status::Error<INIT_FAILED>("get fs failed, resource_id={}", |
233 | 0 | _rowset_meta->resource_id()); |
234 | 0 | } |
235 | | |
236 | 109 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
237 | 59 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
238 | 59 | int64_t file_size; |
239 | 59 | RETURN_IF_ERROR(fs->file_size(seg_path, &file_size)); |
240 | 58 | segments_size->push_back(file_size); |
241 | 58 | } |
242 | 50 | return Status::OK(); |
243 | 51 | } |
244 | | |
245 | 106k | Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments) { |
246 | 106k | return load_segments(0, num_segments(), segments); |
247 | 106k | } |
248 | | |
249 | | Status BetaRowset::load_segments(int64_t seg_id_begin, int64_t seg_id_end, |
250 | 127k | std::vector<segment_v2::SegmentSharedPtr>* segments) { |
251 | 127k | int64_t seg_id = seg_id_begin; |
252 | 223k | while (seg_id < seg_id_end) { |
253 | 96.1k | std::shared_ptr<segment_v2::Segment> segment; |
254 | 96.1k | RETURN_IF_ERROR(load_segment(seg_id, nullptr, &segment)); |
255 | 96.1k | segments->push_back(std::move(segment)); |
256 | 96.1k | seg_id++; |
257 | 96.1k | } |
258 | 127k | return Status::OK(); |
259 | 127k | } |
260 | | |
261 | | Status BetaRowset::load_segment(int64_t seg_id, OlapReaderStatistics* stats, |
262 | 1.86M | segment_v2::SegmentSharedPtr* segment) { |
263 | 1.86M | auto fs = _rowset_meta->fs(); |
264 | 1.86M | if (!fs) { |
265 | 1 | return Status::Error<INIT_FAILED>("get fs failed"); |
266 | 1 | } |
267 | | |
268 | 1.86M | DCHECK(seg_id >= 0); |
269 | 1.86M | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
270 | 1.86M | io::FileReaderOptions reader_options { |
271 | 1.86M | .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
272 | 1.86M | : io::FileCachePolicy::NO_CACHE, |
273 | 1.86M | .is_doris_table = true, |
274 | 1.86M | .cache_base_path = "", |
275 | 1.86M | .file_size = _rowset_meta->segment_file_size(static_cast<int>(seg_id)), |
276 | 1.86M | .tablet_id = _rowset_meta->tablet_id(), |
277 | 1.86M | }; |
278 | | |
279 | 1.86M | auto s = segment_v2::Segment::open( |
280 | 1.86M | fs, seg_path, _rowset_meta->tablet_id(), static_cast<uint32_t>(seg_id), rowset_id(), |
281 | 1.86M | _schema, reader_options, segment, |
282 | 1.86M | _rowset_meta->inverted_index_file_info(static_cast<int>(seg_id)), stats); |
283 | 1.86M | if (!s.ok()) { |
284 | 5 | LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset " << rowset_id() |
285 | 5 | << " : " << s.to_string(); |
286 | 5 | return s; |
287 | 5 | } |
288 | 1.86M | return Status::OK(); |
289 | 1.86M | } |
290 | | |
291 | 4.38M | Status BetaRowset::create_reader(RowsetReaderSharedPtr* result) { |
292 | | // NOTE: We use std::static_pointer_cast for performance |
293 | 4.38M | result->reset(new BetaRowsetReader(std::static_pointer_cast<BetaRowset>(shared_from_this()))); |
294 | 4.38M | return Status::OK(); |
295 | 4.38M | } |
296 | | |
297 | 12.0k | Status BetaRowset::remove() { |
298 | 12.0k | if (!is_local()) { |
299 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
300 | 0 | return Status::OK(); |
301 | 0 | } |
302 | | |
303 | | // TODO should we close and remove all segment reader first? |
304 | 12.0k | VLOG_NOTICE << "begin to remove files in rowset " << rowset_id() |
305 | 5 | << ", version:" << start_version() << "-" << end_version() |
306 | 5 | << ", tabletid:" << _rowset_meta->tablet_id(); |
307 | | // If the rowset was removed, it need to remove the fds in segment cache directly |
308 | 12.0k | clear_cache(); |
309 | | |
310 | 12.0k | bool success = true; |
311 | 12.0k | Status st; |
312 | 12.0k | const auto& fs = io::global_local_filesystem(); |
313 | 15.3k | for (int i = 0; i < num_segments(); ++i) { |
314 | 3.30k | auto seg_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
315 | 3.30k | LOG(INFO) << "deleting " << seg_path; |
316 | 3.30k | st = fs->delete_file(seg_path); |
317 | 3.30k | if (!st.ok()) { |
318 | 0 | LOG(WARNING) << st.to_string(); |
319 | 0 | success = false; |
320 | 0 | } |
321 | | |
322 | 3.30k | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
323 | 0 | for (const auto& column : _schema->columns()) { |
324 | 0 | auto index_metas = _schema->inverted_indexs(*column); |
325 | 0 | for (const auto& index_meta : index_metas) { |
326 | 0 | std::string inverted_index_file = |
327 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
328 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
329 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
330 | 0 | st = fs->delete_file(inverted_index_file); |
331 | 0 | if (!st.ok()) { |
332 | 0 | LOG(WARNING) << st.to_string(); |
333 | 0 | success = false; |
334 | 0 | } |
335 | 0 | } |
336 | 0 | } |
337 | 3.30k | } else { |
338 | 3.30k | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
339 | 86 | std::string inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v2( |
340 | 86 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); |
341 | 86 | st = fs->delete_file(inverted_index_file); |
342 | 86 | if (!st.ok()) { |
343 | 0 | LOG(WARNING) << st.to_string(); |
344 | 0 | success = false; |
345 | 0 | } |
346 | 86 | } |
347 | 3.30k | } |
348 | 3.30k | } |
349 | 12.0k | if (!success) { |
350 | 0 | return Status::Error<ROWSET_DELETE_FILE_FAILED>("failed to remove files in rowset {}", |
351 | 0 | rowset_id().to_string()); |
352 | 0 | } |
353 | 12.0k | return Status::OK(); |
354 | 12.0k | } |
355 | | |
356 | 1 | void BetaRowset::do_close() { |
357 | | // do nothing. |
358 | 1 | } |
359 | | |
360 | | Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id, |
361 | | size_t new_rowset_start_seg_id, |
362 | 59 | std::set<int64_t>* without_index_uids) { |
363 | 59 | if (!is_local()) { |
364 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
365 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
366 | 0 | _rowset_meta->tablet_id(), rowset_id().to_string()); |
367 | 0 | } |
368 | | |
369 | 59 | const auto& local_fs = io::global_local_filesystem(); |
370 | 59 | Status status; |
371 | 59 | std::vector<std::string> linked_success_files; |
372 | 59 | Defer remove_linked_files {[&]() { // clear linked files if errors happen |
373 | 59 | if (!status.ok()) { |
374 | 0 | LOG(WARNING) << "will delete linked success files due to error " << status; |
375 | 0 | std::vector<io::Path> paths; |
376 | 0 | for (auto& file : linked_success_files) { |
377 | 0 | paths.emplace_back(file); |
378 | 0 | LOG(WARNING) << "will delete linked success file " << file << " due to error"; |
379 | 0 | } |
380 | 0 | static_cast<void>(local_fs->batch_delete(paths)); |
381 | 0 | LOG(WARNING) << "done delete linked success files due to error " << status; |
382 | 0 | } |
383 | 59 | }}; |
384 | | |
385 | 120 | for (int i = 0; i < num_segments(); ++i) { |
386 | 61 | auto dst_path = |
387 | 61 | local_segment_path(dir, new_rowset_id.to_string(), i + new_rowset_start_seg_id); |
388 | 61 | bool dst_path_exist = false; |
389 | 61 | if (!local_fs->exists(dst_path, &dst_path_exist).ok() || dst_path_exist) { |
390 | 0 | status = Status::Error<FILE_ALREADY_EXIST>( |
391 | 0 | "failed to create hard link, file already exist: {}", dst_path); |
392 | 0 | return status; |
393 | 0 | } |
394 | 61 | auto src_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
395 | | // TODO(lingbin): how external storage support link? |
396 | | // use copy? or keep refcount to avoid being delete? |
397 | 61 | if (!local_fs->link_file(src_path, dst_path).ok()) { |
398 | 0 | status = Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}", |
399 | 0 | src_path, dst_path, Errno::no()); |
400 | 0 | return status; |
401 | 0 | } |
402 | 61 | linked_success_files.push_back(dst_path); |
403 | 61 | DBUG_EXECUTE_IF("fault_inject::BetaRowset::link_files_to::_link_inverted_index_file", { |
404 | 61 | status = Status::Error<OS_ERROR>("fault_inject link_file error"); |
405 | 61 | return status; |
406 | 61 | }); |
407 | 61 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
408 | 40 | for (const auto& index : _schema->inverted_indexes()) { |
409 | 9 | auto index_id = index->index_id(); |
410 | 9 | if (without_index_uids != nullptr && without_index_uids->count(index_id)) { |
411 | 1 | continue; |
412 | 1 | } |
413 | 8 | std::string inverted_index_src_file_path = |
414 | 8 | InvertedIndexDescriptor::get_index_file_path_v1( |
415 | 8 | InvertedIndexDescriptor::get_index_file_path_prefix(src_path), |
416 | 8 | index_id, index->get_index_suffix()); |
417 | 8 | std::string inverted_index_dst_file_path = |
418 | 8 | InvertedIndexDescriptor::get_index_file_path_v1( |
419 | 8 | InvertedIndexDescriptor::get_index_file_path_prefix(dst_path), |
420 | 8 | index_id, index->get_index_suffix()); |
421 | 8 | bool index_file_exists = true; |
422 | 8 | RETURN_IF_ERROR(local_fs->exists(inverted_index_src_file_path, &index_file_exists)); |
423 | 8 | if (index_file_exists) { |
424 | 8 | DBUG_EXECUTE_IF( |
425 | 8 | "fault_inject::BetaRowset::link_files_to::_link_inverted_index_file", { |
426 | 8 | status = Status::Error<OS_ERROR>( |
427 | 8 | "fault_inject link_file error from={}, to={}", |
428 | 8 | inverted_index_src_file_path, inverted_index_dst_file_path); |
429 | 8 | return status; |
430 | 8 | }); |
431 | 8 | if (!local_fs->link_file(inverted_index_src_file_path, |
432 | 8 | inverted_index_dst_file_path) |
433 | 8 | .ok()) { |
434 | 0 | status = Status::Error<OS_ERROR>( |
435 | 0 | "fail to create hard link. from={}, to={}, errno={}", |
436 | 0 | inverted_index_src_file_path, inverted_index_dst_file_path, |
437 | 0 | Errno::no()); |
438 | 0 | return status; |
439 | 0 | } |
440 | 8 | linked_success_files.push_back(inverted_index_dst_file_path); |
441 | 8 | LOG(INFO) << "success to create hard link. from=" |
442 | 8 | << inverted_index_src_file_path << ", " |
443 | 8 | << "to=" << inverted_index_dst_file_path; |
444 | 8 | } else { |
445 | 0 | LOG(WARNING) << "skip create hard link to not existed index file=" |
446 | 0 | << inverted_index_src_file_path; |
447 | 0 | } |
448 | 8 | } |
449 | 40 | } else { |
450 | 21 | if ((_schema->has_inverted_index() || _schema->has_ann_index()) && |
451 | 21 | (without_index_uids == nullptr || without_index_uids->empty())) { |
452 | 0 | std::string inverted_index_file_src = |
453 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
454 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(src_path)); |
455 | 0 | std::string inverted_index_file_dst = |
456 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
457 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(dst_path)); |
458 | 0 | bool index_dst_path_exist = false; |
459 | |
|
460 | 0 | if (!local_fs->exists(inverted_index_file_dst, &index_dst_path_exist).ok() || |
461 | 0 | index_dst_path_exist) { |
462 | 0 | status = Status::Error<FILE_ALREADY_EXIST>( |
463 | 0 | "failed to create hard link, file already exist: {}", |
464 | 0 | inverted_index_file_dst); |
465 | 0 | return status; |
466 | 0 | } |
467 | 0 | if (!local_fs->link_file(inverted_index_file_src, inverted_index_file_dst).ok()) { |
468 | 0 | status = Status::Error<OS_ERROR>( |
469 | 0 | "fail to create hard link. from={}, to={}, errno={}", |
470 | 0 | inverted_index_file_src, inverted_index_file_dst, Errno::no()); |
471 | 0 | return status; |
472 | 0 | } |
473 | 0 | linked_success_files.push_back(inverted_index_file_dst); |
474 | 0 | } |
475 | 21 | } |
476 | 61 | } |
477 | 59 | return Status::OK(); |
478 | 59 | } |
479 | | |
480 | 4 | Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) { |
481 | 4 | if (!is_local()) { |
482 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
483 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
484 | 0 | _rowset_meta->tablet_id(), rowset_id().to_string()); |
485 | 0 | } |
486 | | |
487 | 4 | bool exists = false; |
488 | 4 | for (int i = 0; i < num_segments(); ++i) { |
489 | 0 | auto dst_path = local_segment_path(dir, new_rowset_id.to_string(), i); |
490 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(dst_path, &exists)); |
491 | 0 | if (exists) { |
492 | 0 | return Status::Error<FILE_ALREADY_EXIST>("file already exist: {}", dst_path); |
493 | 0 | } |
494 | 0 | auto src_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
495 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->copy_path(src_path, dst_path)); |
496 | 0 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
497 | 0 | for (const auto& column : _schema->columns()) { |
498 | | // if (column.has_inverted_index()) { |
499 | 0 | auto index_metas = _schema->inverted_indexs(*column); |
500 | 0 | for (const auto& index_meta : index_metas) { |
501 | 0 | std::string inverted_index_src_file_path = |
502 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
503 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(src_path), |
504 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
505 | 0 | std::string inverted_index_dst_file_path = |
506 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
507 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(dst_path), |
508 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
509 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->copy_path( |
510 | 0 | inverted_index_src_file_path, inverted_index_dst_file_path)); |
511 | 0 | LOG(INFO) << "success to copy file. from=" << inverted_index_src_file_path |
512 | 0 | << ", " |
513 | 0 | << "to=" << inverted_index_dst_file_path; |
514 | 0 | } |
515 | 0 | } |
516 | 0 | } else { |
517 | 0 | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
518 | 0 | std::string inverted_index_src_file = |
519 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
520 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(src_path)); |
521 | 0 | std::string inverted_index_dst_file = |
522 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
523 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(dst_path)); |
524 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->copy_path(inverted_index_src_file, |
525 | 0 | inverted_index_dst_file)); |
526 | 0 | LOG(INFO) << "success to copy file. from=" << inverted_index_src_file << ", " |
527 | 0 | << "to=" << inverted_index_dst_file; |
528 | 0 | } |
529 | 0 | } |
530 | 0 | } |
531 | 4 | return Status::OK(); |
532 | 4 | } |
533 | | |
534 | 5 | Status BetaRowset::upload_to(const StorageResource& dest_fs, const RowsetId& new_rowset_id) { |
535 | 5 | if (!is_local()) { |
536 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
537 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
538 | 0 | _rowset_meta->tablet_id(), rowset_id().to_string()); |
539 | 0 | } |
540 | | |
541 | 5 | if (num_segments() < 1) { |
542 | 3 | return Status::OK(); |
543 | 3 | } |
544 | 2 | std::vector<io::Path> local_paths; |
545 | 2 | local_paths.reserve(num_segments()); |
546 | 2 | std::vector<io::Path> dest_paths; |
547 | 2 | dest_paths.reserve(num_segments()); |
548 | 4 | for (int i = 0; i < num_segments(); ++i) { |
549 | | // Note: Here we use relative path for remote. |
550 | 2 | auto remote_seg_path = dest_fs.remote_segment_path(_rowset_meta->tablet_id(), |
551 | 2 | new_rowset_id.to_string(), i); |
552 | 2 | auto local_seg_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
553 | 2 | dest_paths.emplace_back(remote_seg_path); |
554 | 2 | local_paths.emplace_back(local_seg_path); |
555 | 2 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
556 | 0 | for (const auto& column : _schema->columns()) { |
557 | | // if (column.has_inverted_index()) { |
558 | 0 | auto index_metas = _schema->inverted_indexs(*column); |
559 | 0 | for (const auto& index_meta : index_metas) { |
560 | 0 | std::string remote_inverted_index_file = |
561 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
562 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix( |
563 | 0 | remote_seg_path), |
564 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
565 | 0 | std::string local_inverted_index_file = |
566 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
567 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix( |
568 | 0 | local_seg_path), |
569 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
570 | 0 | dest_paths.emplace_back(remote_inverted_index_file); |
571 | 0 | local_paths.emplace_back(local_inverted_index_file); |
572 | 0 | } |
573 | 0 | } |
574 | 2 | } else { |
575 | 2 | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
576 | 0 | std::string remote_inverted_index_file = |
577 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
578 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix( |
579 | 0 | remote_seg_path)); |
580 | 0 | std::string local_inverted_index_file = |
581 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
582 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix( |
583 | 0 | local_seg_path)); |
584 | 0 | dest_paths.emplace_back(remote_inverted_index_file); |
585 | 0 | local_paths.emplace_back(local_inverted_index_file); |
586 | 0 | } |
587 | 2 | } |
588 | 2 | } |
589 | 2 | auto st = dest_fs.fs->batch_upload(local_paths, dest_paths); |
590 | 2 | if (st.ok()) { |
591 | 2 | DorisMetrics::instance()->upload_rowset_count->increment(1); |
592 | 2 | DorisMetrics::instance()->upload_total_byte->increment(total_disk_size()); |
593 | 2 | } else { |
594 | 0 | DorisMetrics::instance()->upload_fail_count->increment(1); |
595 | 0 | } |
596 | 2 | return st; |
597 | 5 | } |
598 | | |
599 | 0 | Status BetaRowset::check_file_exist() { |
600 | 0 | const auto& fs = _rowset_meta->fs(); |
601 | 0 | if (!fs) { |
602 | 0 | return Status::InternalError("fs is not initialized, resource_id={}", |
603 | 0 | _rowset_meta->resource_id()); |
604 | 0 | } |
605 | | |
606 | 0 | for (int i = 0; i < num_segments(); ++i) { |
607 | 0 | auto seg_path = DORIS_TRY(segment_path(i)); |
608 | 0 | bool seg_file_exist = false; |
609 | 0 | RETURN_IF_ERROR(fs->exists(seg_path, &seg_file_exist)); |
610 | 0 | if (!seg_file_exist) { |
611 | 0 | return Status::InternalError("data file not existed: {}, rowset_id={}", seg_path, |
612 | 0 | rowset_id().to_string()); |
613 | 0 | } |
614 | 0 | } |
615 | | |
616 | 0 | return Status::OK(); |
617 | 0 | } |
618 | | |
619 | 0 | Status BetaRowset::check_current_rowset_segment() { |
620 | 0 | const auto& fs = _rowset_meta->fs(); |
621 | 0 | if (!fs) { |
622 | 0 | return Status::InternalError("fs is not initialized, resource_id={}", |
623 | 0 | _rowset_meta->resource_id()); |
624 | 0 | } |
625 | | |
626 | 0 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
627 | 0 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
628 | |
|
629 | 0 | std::shared_ptr<segment_v2::Segment> segment; |
630 | 0 | io::FileReaderOptions reader_options { |
631 | 0 | .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
632 | 0 | : io::FileCachePolicy::NO_CACHE, |
633 | 0 | .is_doris_table = true, |
634 | 0 | .cache_base_path {}, |
635 | 0 | .file_size = _rowset_meta->segment_file_size(seg_id), |
636 | 0 | }; |
637 | |
|
638 | 0 | auto s = segment_v2::Segment::open(fs, seg_path, _rowset_meta->tablet_id(), seg_id, |
639 | 0 | rowset_id(), _schema, reader_options, &segment, |
640 | 0 | _rowset_meta->inverted_index_file_info(seg_id)); |
641 | 0 | if (!s.ok()) { |
642 | 0 | LOG(WARNING) << "segment can not be opened. file=" << seg_path; |
643 | 0 | return s; |
644 | 0 | } |
645 | 0 | } |
646 | | |
647 | 0 | return Status::OK(); |
648 | 0 | } |
649 | | |
650 | 4 | Status BetaRowset::add_to_binlog() { |
651 | | // FIXME(Drogon): not only local file system |
652 | 4 | if (!is_local()) { |
653 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
654 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
655 | 0 | _rowset_meta->tablet_id(), rowset_id().to_string()); |
656 | 0 | } |
657 | | |
658 | 4 | const auto& fs = io::global_local_filesystem(); |
659 | 4 | auto segments_num = num_segments(); |
660 | 4 | VLOG_DEBUG << fmt::format("add rowset to binlog. rowset_id={}, segments_num={}", |
661 | 0 | rowset_id().to_string(), segments_num); |
662 | | |
663 | 4 | Status status; |
664 | 4 | std::vector<std::string> linked_success_files; |
665 | 4 | Defer remove_linked_files {[&]() { // clear linked files if errors happen |
666 | 4 | if (!status.ok()) { |
667 | 0 | LOG(WARNING) << "will delete linked success files due to error " |
668 | 0 | << status.to_string_no_stack(); |
669 | 0 | std::vector<io::Path> paths; |
670 | 0 | for (auto& file : linked_success_files) { |
671 | 0 | paths.emplace_back(file); |
672 | 0 | LOG(WARNING) << "will delete linked success file " << file << " due to error"; |
673 | 0 | } |
674 | 0 | static_cast<void>(fs->batch_delete(paths)); |
675 | 0 | LOG(WARNING) << "done delete linked success files due to error " |
676 | 0 | << status.to_string_no_stack(); |
677 | 0 | } |
678 | 4 | }}; |
679 | | |
680 | | // The publish_txn might fail even if the add_to_binlog success, so we need to check |
681 | | // whether a file already exists before linking. |
682 | 4 | auto errno_is_file_exists = []() { return Errno::no() == EEXIST; }; |
683 | | |
684 | | // all segments are in the same directory, so cache binlog_dir without multi times check |
685 | 4 | std::string binlog_dir; |
686 | 6 | for (int i = 0; i < segments_num; ++i) { |
687 | 2 | auto seg_file = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
688 | | |
689 | 2 | if (binlog_dir.empty()) { |
690 | 2 | binlog_dir = std::filesystem::path(seg_file).parent_path().append("_binlog").string(); |
691 | | |
692 | 2 | bool exists = true; |
693 | 2 | RETURN_IF_ERROR(fs->exists(binlog_dir, &exists)); |
694 | 2 | if (!exists) { |
695 | 2 | RETURN_IF_ERROR(fs->create_directory(binlog_dir)); |
696 | 2 | } |
697 | 2 | } |
698 | | |
699 | 2 | auto binlog_file = |
700 | 2 | (std::filesystem::path(binlog_dir) / std::filesystem::path(seg_file).filename()) |
701 | 2 | .string(); |
702 | 2 | VLOG_DEBUG << "link " << seg_file << " to " << binlog_file; |
703 | 2 | if (!fs->link_file(seg_file, binlog_file).ok() && !errno_is_file_exists()) { |
704 | 0 | status = Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}", |
705 | 0 | seg_file, binlog_file, Errno::no()); |
706 | 0 | return status; |
707 | 0 | } |
708 | 2 | linked_success_files.push_back(binlog_file); |
709 | | |
710 | 2 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
711 | 2 | for (const auto& index : _schema->inverted_indexes()) { |
712 | 2 | auto index_id = index->index_id(); |
713 | 2 | auto index_file = InvertedIndexDescriptor::get_index_file_path_v1( |
714 | 2 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_file), index_id, |
715 | 2 | index->get_index_suffix()); |
716 | 2 | auto binlog_index_file = (std::filesystem::path(binlog_dir) / |
717 | 2 | std::filesystem::path(index_file).filename()) |
718 | 2 | .string(); |
719 | 2 | VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file; |
720 | 2 | if (!fs->link_file(index_file, binlog_index_file).ok() && !errno_is_file_exists()) { |
721 | 0 | status = Status::Error<OS_ERROR>( |
722 | 0 | "fail to create hard link. from={}, to={}, errno={}", index_file, |
723 | 0 | binlog_index_file, Errno::no()); |
724 | 0 | return status; |
725 | 0 | } |
726 | 2 | linked_success_files.push_back(binlog_index_file); |
727 | 2 | } |
728 | 1 | } else { |
729 | 1 | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
730 | 1 | auto index_file = InvertedIndexDescriptor::get_index_file_path_v2( |
731 | 1 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_file)); |
732 | 1 | auto binlog_index_file = (std::filesystem::path(binlog_dir) / |
733 | 1 | std::filesystem::path(index_file).filename()) |
734 | 1 | .string(); |
735 | 1 | VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file; |
736 | 1 | if (!fs->link_file(index_file, binlog_index_file).ok() && !errno_is_file_exists()) { |
737 | 0 | status = Status::Error<OS_ERROR>( |
738 | 0 | "fail to create hard link. from={}, to={}, errno={}", index_file, |
739 | 0 | binlog_index_file, Errno::no()); |
740 | 0 | return status; |
741 | 0 | } |
742 | 1 | linked_success_files.push_back(binlog_index_file); |
743 | 1 | } |
744 | 1 | } |
745 | 2 | } |
746 | | |
747 | 4 | return Status::OK(); |
748 | 4 | } |
749 | | |
750 | 10 | Status BetaRowset::calc_file_crc(uint32_t* crc_value, int64_t* file_count) { |
751 | 10 | const auto& fs = _rowset_meta->fs(); |
752 | 10 | DBUG_EXECUTE_IF("fault_inject::BetaRowset::calc_file_crc", |
753 | 10 | { return Status::Error<OS_ERROR>("fault_inject calc_file_crc error"); }); |
754 | 10 | if (num_segments() < 1) { |
755 | 2 | *crc_value = 0x92a8fc17; // magic code from crc32c table |
756 | 2 | return Status::OK(); |
757 | 2 | } |
758 | | |
759 | | // 1. pick up all the files including dat file and idx file |
760 | 8 | std::vector<io::Path> file_paths; |
761 | 16 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
762 | 8 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
763 | 8 | file_paths.emplace_back(seg_path); |
764 | 8 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
765 | 0 | for (const auto& column : _schema->columns()) { |
766 | 0 | auto index_metas = _schema->inverted_indexs(*column); |
767 | 0 | for (const auto& index_meta : index_metas) { |
768 | 0 | std::string inverted_index_file = |
769 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
770 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
771 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
772 | 0 | file_paths.emplace_back(std::move(inverted_index_file)); |
773 | 0 | } |
774 | 0 | } |
775 | 8 | } else { |
776 | 8 | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
777 | 8 | std::string inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v2( |
778 | 8 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); |
779 | 8 | file_paths.emplace_back(std::move(inverted_index_file)); |
780 | 8 | } |
781 | 8 | } |
782 | 8 | } |
783 | 8 | *crc_value = 0; |
784 | 8 | *file_count = file_paths.size(); |
785 | 8 | if (!is_local()) { |
786 | 8 | return Status::OK(); |
787 | 8 | } |
788 | | |
789 | | // 2. calculate the md5sum of each file |
790 | 0 | const auto& local_fs = io::global_local_filesystem(); |
791 | 0 | DCHECK(!file_paths.empty()); |
792 | 0 | std::vector<std::string> all_file_md5; |
793 | 0 | all_file_md5.reserve(file_paths.size()); |
794 | 0 | for (const auto& file_path : file_paths) { |
795 | 0 | std::string file_md5sum; |
796 | 0 | auto status = local_fs->md5sum(file_path, &file_md5sum); |
797 | 0 | if (!status.ok()) { |
798 | 0 | return status; |
799 | 0 | } |
800 | 0 | VLOG_CRITICAL << fmt::format("calc file_md5sum finished. file_path={}, md5sum={}", |
801 | 0 | file_path.string(), file_md5sum); |
802 | 0 | all_file_md5.emplace_back(std::move(file_md5sum)); |
803 | 0 | } |
804 | 0 | std::sort(all_file_md5.begin(), all_file_md5.end()); |
805 | | |
806 | | // 3. calculate the crc_value based on all_file_md5 |
807 | 0 | DCHECK(file_paths.size() == all_file_md5.size()); |
808 | 0 | for (auto& i : all_file_md5) { |
809 | 0 | *crc_value = crc32c::Extend(*crc_value, (const uint8_t*)i.data(), i.size()); |
810 | 0 | } |
811 | |
|
812 | 0 | return Status::OK(); |
813 | 0 | } |
814 | | |
815 | | Status BetaRowset::show_nested_index_file(rapidjson::Value* rowset_value, |
816 | 147 | rapidjson::Document::AllocatorType& allocator) { |
817 | 147 | const auto& fs = _rowset_meta->fs(); |
818 | 147 | auto storage_format = _schema->get_inverted_index_storage_format(); |
819 | 147 | std::string format_str; |
820 | 147 | switch (storage_format) { |
821 | 7 | case InvertedIndexStorageFormatPB::V1: |
822 | 7 | format_str = "V1"; |
823 | 7 | break; |
824 | 71 | case InvertedIndexStorageFormatPB::V2: |
825 | 71 | format_str = "V2"; |
826 | 71 | break; |
827 | 69 | case InvertedIndexStorageFormatPB::V3: |
828 | 69 | format_str = "V3"; |
829 | 69 | break; |
830 | 0 | default: |
831 | 0 | return Status::InternalError("inverted index storage format error"); |
832 | 0 | break; |
833 | 147 | } |
834 | 147 | auto rs_id = rowset_id().to_string(); |
835 | 147 | rowset_value->AddMember("rowset_id", rapidjson::Value(rs_id.c_str(), allocator), allocator); |
836 | 147 | rowset_value->AddMember("index_storage_format", rapidjson::Value(format_str.c_str(), allocator), |
837 | 147 | allocator); |
838 | 147 | rapidjson::Value segments(rapidjson::kArrayType); |
839 | 221 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
840 | 75 | rapidjson::Value segment(rapidjson::kObjectType); |
841 | 75 | segment.AddMember("segment_id", rapidjson::Value(seg_id).Move(), allocator); |
842 | | |
843 | 75 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
844 | 75 | auto index_file_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(seg_path); |
845 | 75 | auto index_file_reader = std::make_unique<IndexFileReader>( |
846 | 75 | fs, std::string(index_file_path_prefix), storage_format); |
847 | 75 | RETURN_IF_ERROR(index_file_reader->init()); |
848 | 74 | auto dirs = index_file_reader->get_all_directories(); |
849 | | |
850 | 74 | auto add_file_info_to_json = [&](const std::string& path, |
851 | 80 | rapidjson::Value& json_value) -> Status { |
852 | 80 | json_value.AddMember("idx_file_path", rapidjson::Value(path.c_str(), allocator), |
853 | 80 | allocator); |
854 | 80 | int64_t idx_file_size = 0; |
855 | 80 | auto st = fs->file_size(path, &idx_file_size); |
856 | 80 | if (st != Status::OK()) { |
857 | 0 | LOG(WARNING) << "show nested index file get file size error, file: " << path |
858 | 0 | << ", error: " << st.msg(); |
859 | 0 | return st; |
860 | 0 | } |
861 | 80 | json_value.AddMember("idx_file_size", rapidjson::Value(idx_file_size).Move(), |
862 | 80 | allocator); |
863 | 80 | return Status::OK(); |
864 | 80 | }; |
865 | | |
866 | 74 | auto process_files = [&allocator, &index_file_reader](auto& index_meta, |
867 | 74 | rapidjson::Value& indices, |
868 | 140 | rapidjson::Value& index) -> Status { |
869 | 140 | rapidjson::Value files_value(rapidjson::kArrayType); |
870 | 140 | std::vector<std::string> files; |
871 | 140 | auto ret = index_file_reader->open(&index_meta); |
872 | 140 | if (!ret.has_value()) { |
873 | 0 | LOG(INFO) << "IndexFileReader open error:" << ret.error(); |
874 | 0 | return Status::InternalError("IndexFileReader open error"); |
875 | 0 | } |
876 | 140 | using T = std::decay_t<decltype(ret)>; |
877 | 140 | auto reader = std::forward<T>(ret).value(); |
878 | 140 | reader->list(&files); |
879 | 860 | for (auto& file : files) { |
880 | 860 | rapidjson::Value file_value(rapidjson::kObjectType); |
881 | 860 | auto size = reader->fileLength(file.c_str()); |
882 | 860 | file_value.AddMember("name", rapidjson::Value(file.c_str(), allocator), allocator); |
883 | 860 | file_value.AddMember("size", rapidjson::Value(size).Move(), allocator); |
884 | 860 | files_value.PushBack(file_value, allocator); |
885 | 860 | } |
886 | 140 | index.AddMember("files", files_value, allocator); |
887 | 140 | indices.PushBack(index, allocator); |
888 | 140 | return Status::OK(); |
889 | 140 | }; beta_rowset.cpp:_ZZN5doris10BetaRowset22show_nested_index_fileEPN9rapidjson12GenericValueINS1_4UTF8IcEENS1_19MemoryPoolAllocatorINS1_12CrtAllocatorEEEEERS7_ENK3$_0clINS_11TabletIndexEEENS_6StatusERT_RS8_SH_ Line | Count | Source | 868 | 128 | rapidjson::Value& index) -> Status { | 869 | 128 | rapidjson::Value files_value(rapidjson::kArrayType); | 870 | 128 | std::vector<std::string> files; | 871 | 128 | auto ret = index_file_reader->open(&index_meta); | 872 | 128 | if (!ret.has_value()) { | 873 | 0 | LOG(INFO) << "IndexFileReader open error:" << ret.error(); | 874 | 0 | return Status::InternalError("IndexFileReader open error"); | 875 | 0 | } | 876 | 128 | using T = std::decay_t<decltype(ret)>; | 877 | 128 | auto reader = std::forward<T>(ret).value(); | 878 | 128 | reader->list(&files); | 879 | 788 | for (auto& file : files) { | 880 | 788 | rapidjson::Value file_value(rapidjson::kObjectType); | 881 | 788 | auto size = reader->fileLength(file.c_str()); | 882 | 788 | file_value.AddMember("name", rapidjson::Value(file.c_str(), allocator), allocator); | 883 | 788 | file_value.AddMember("size", rapidjson::Value(size).Move(), allocator); | 884 | 788 | files_value.PushBack(file_value, allocator); | 885 | 788 | } | 886 | 128 | index.AddMember("files", files_value, allocator); | 887 | 128 | indices.PushBack(index, allocator); | 888 | 128 | return Status::OK(); | 889 | 128 | }; |
beta_rowset.cpp:_ZZN5doris10BetaRowset22show_nested_index_fileEPN9rapidjson12GenericValueINS1_4UTF8IcEENS1_19MemoryPoolAllocatorINS1_12CrtAllocatorEEEEERS7_ENK3$_0clIKNS_11TabletIndexEEENS_6StatusERT_RS8_SI_ Line | Count | Source | 868 | 12 | rapidjson::Value& index) -> Status { | 869 | 12 | rapidjson::Value files_value(rapidjson::kArrayType); | 870 | 12 | std::vector<std::string> files; | 871 | 12 | auto ret = index_file_reader->open(&index_meta); | 872 | 12 | if (!ret.has_value()) { | 873 | 0 | LOG(INFO) << "IndexFileReader open error:" << ret.error(); | 874 | 0 | return Status::InternalError("IndexFileReader open error"); | 875 | 0 | } | 876 | 12 | using T = std::decay_t<decltype(ret)>; | 877 | 12 | auto reader = std::forward<T>(ret).value(); | 878 | 12 | reader->list(&files); | 879 | 72 | for (auto& file : files) { | 880 | 72 | rapidjson::Value file_value(rapidjson::kObjectType); | 881 | 72 | auto size = reader->fileLength(file.c_str()); | 882 | 72 | file_value.AddMember("name", rapidjson::Value(file.c_str(), allocator), allocator); | 883 | 72 | file_value.AddMember("size", rapidjson::Value(size).Move(), allocator); | 884 | 72 | files_value.PushBack(file_value, allocator); | 885 | 72 | } | 886 | 12 | index.AddMember("files", files_value, allocator); | 887 | 12 | indices.PushBack(index, allocator); | 888 | 12 | return Status::OK(); | 889 | 12 | }; |
|
890 | | |
891 | 74 | if (storage_format != InvertedIndexStorageFormatPB::V1) { |
892 | 68 | auto path = InvertedIndexDescriptor::get_index_file_path_v2(index_file_path_prefix); |
893 | 68 | auto st = add_file_info_to_json(path, segment); |
894 | 68 | if (!st.ok()) { |
895 | 0 | return st; |
896 | 0 | } |
897 | 68 | rapidjson::Value indices(rapidjson::kArrayType); |
898 | 128 | for (auto& dir : *dirs) { |
899 | 128 | rapidjson::Value index(rapidjson::kObjectType); |
900 | 128 | auto index_id = dir.first.first; |
901 | 128 | auto index_suffix = dir.first.second; |
902 | 128 | index.AddMember("index_id", rapidjson::Value(index_id).Move(), allocator); |
903 | 128 | index.AddMember("index_suffix", rapidjson::Value(index_suffix.c_str(), allocator), |
904 | 128 | allocator); |
905 | | |
906 | 128 | rapidjson::Value files_value(rapidjson::kArrayType); |
907 | 128 | std::vector<std::string> files; |
908 | 128 | doris::TabletIndexPB index_pb; |
909 | 128 | index_pb.set_index_id(index_id); |
910 | 128 | index_pb.set_index_suffix_name(index_suffix); |
911 | 128 | TabletIndex index_meta; |
912 | 128 | index_meta.init_from_pb(index_pb); |
913 | | |
914 | 128 | auto status = process_files(index_meta, indices, index); |
915 | 128 | if (!status.ok()) { |
916 | 0 | return status; |
917 | 0 | } |
918 | 128 | } |
919 | 68 | segment.AddMember("indices", indices, allocator); |
920 | 68 | segments.PushBack(segment, allocator); |
921 | 68 | } else { |
922 | 6 | rapidjson::Value indices(rapidjson::kArrayType); |
923 | 18 | for (auto column : _rowset_meta->tablet_schema()->columns()) { |
924 | 18 | auto index_metas = _rowset_meta->tablet_schema()->inverted_indexs(*column); |
925 | 18 | for (const auto& index_meta : index_metas) { |
926 | 12 | rapidjson::Value index(rapidjson::kObjectType); |
927 | 12 | auto index_id = index_meta->index_id(); |
928 | 12 | auto index_suffix = index_meta->get_index_suffix(); |
929 | 12 | index.AddMember("index_id", rapidjson::Value(index_id).Move(), allocator); |
930 | 12 | index.AddMember("index_suffix", |
931 | 12 | rapidjson::Value(index_suffix.c_str(), allocator), allocator); |
932 | 12 | auto path = InvertedIndexDescriptor::get_index_file_path_v1( |
933 | 12 | index_file_path_prefix, index_id, index_suffix); |
934 | 12 | RETURN_IF_ERROR(add_file_info_to_json(path, index)); |
935 | 12 | RETURN_IF_ERROR(process_files(*index_meta, indices, index)); |
936 | 12 | } |
937 | 18 | } |
938 | 6 | segment.AddMember("indices", indices, allocator); |
939 | 6 | segments.PushBack(segment, allocator); |
940 | 6 | } |
941 | 74 | } |
942 | 146 | rowset_value->AddMember("segments", segments, allocator); |
943 | 146 | return Status::OK(); |
944 | 147 | } |
945 | | #include "common/compile_check_end.h" |
946 | | } // namespace doris |