be/src/storage/rowset/beta_rowset.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/rowset/beta_rowset.h" |
19 | | |
20 | | #include <crc32c/crc32c.h> |
21 | | #include <ctype.h> |
22 | | #include <errno.h> |
23 | | #include <fmt/format.h> |
24 | | |
25 | | #include <algorithm> |
26 | | #include <filesystem> |
27 | | #include <memory> |
28 | | #include <ostream> |
29 | | #include <utility> |
30 | | |
31 | | #include "cloud/config.h" |
32 | | #include "common/config.h" |
33 | | #include "common/logging.h" |
34 | | #include "common/metrics/doris_metrics.h" |
35 | | #include "common/status.h" |
36 | | #include "cpp/sync_point.h" |
37 | | #include "io/fs/file_reader.h" |
38 | | #include "io/fs/file_system.h" |
39 | | #include "io/fs/local_file_system.h" |
40 | | #include "io/fs/path.h" |
41 | | #include "io/fs/remote_file_system.h" |
42 | | #include "storage/index/index_file_reader.h" |
43 | | #include "storage/index/inverted/inverted_index_cache.h" |
44 | | #include "storage/index/inverted/inverted_index_desc.h" |
45 | | #include "storage/olap_common.h" |
46 | | #include "storage/olap_define.h" |
47 | | #include "storage/rowset/beta_rowset.h" |
48 | | #include "storage/rowset/beta_rowset_reader.h" |
49 | | #include "storage/rowset/rowset.h" |
50 | | #include "storage/segment/segment_loader.h" |
51 | | #include "storage/tablet/tablet_schema.h" |
52 | | #include "storage/utils.h" |
53 | | #include "util/debug_points.h" |
54 | | |
55 | | namespace doris { |
56 | | using namespace ErrorCode; |
57 | | |
58 | | std::string BetaRowset::local_segment_path_segcompacted(const std::string& tablet_path, |
59 | | const RowsetId& rowset_id, int64_t begin, |
60 | 34 | int64_t end) { |
61 | | // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{begin_seg}-{end_seg}.dat |
62 | 34 | return fmt::format("{}/{}_{}-{}.dat", tablet_path, rowset_id.to_string(), begin, end); |
63 | 34 | } |
64 | | |
// Constructs a BetaRowset by forwarding the schema, the rowset meta and the
// tablet path to the Rowset base class. `tablet_path` is taken by value and
// moved into the base, so callers may pass a temporary without an extra copy.
BetaRowset::BetaRowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& rowset_meta,
                       std::string tablet_path)
        : Rowset(schema, rowset_meta, std::move(tablet_path)) {}
68 | | |
// No explicit teardown is performed here; the compiler-generated destructor is used.
BetaRowset::~BetaRowset() = default;
70 | | |
// Initialization hook; a beta rowset has nothing to set up, so this always
// returns Status::OK().
Status BetaRowset::init() {
    return Status::OK(); // no op
}
74 | | |
75 | | namespace { |
76 | | Status load_segment_rows_from_footer(BetaRowsetSharedPtr rowset, |
77 | | std::vector<uint32_t>* segment_rows, bool enable_segment_cache, |
78 | 39.3k | OlapReaderStatistics* read_stats) { |
79 | 39.3k | SegmentCacheHandle segment_cache_handle; |
80 | 39.3k | RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( |
81 | 39.3k | rowset, &segment_cache_handle, enable_segment_cache, false, read_stats)); |
82 | 39.3k | for (const auto& segment : segment_cache_handle.get_segments()) { |
83 | 39.3k | segment_rows->emplace_back(segment->num_rows()); |
84 | 39.3k | } |
85 | 39.3k | return Status::OK(); |
86 | 39.3k | } |
87 | | |
88 | | Status check_segment_rows_consistency(const std::vector<uint32_t>& rows_from_meta, |
89 | | const std::vector<uint32_t>& rows_from_footer, |
90 | 39.3k | int64_t tablet_id, const std::string& rowset_id) { |
91 | 39.3k | DCHECK_EQ(rows_from_footer.size(), rows_from_meta.size()); |
92 | 78.7k | for (size_t i = 0; i < rows_from_footer.size(); i++) { |
93 | 39.3k | if (rows_from_footer[i] != rows_from_meta[i]) { |
94 | 0 | auto msg = fmt::format( |
95 | 0 | "segment rows mismatch between rowset meta and segment footer. " |
96 | 0 | "segment index: {}, meta rows: {}, footer rows: {}, tablet={}, rowset={}", |
97 | 0 | i, rows_from_meta[i], rows_from_footer[i], tablet_id, rowset_id); |
98 | 0 | if (config::enable_segment_rows_check_core) { |
99 | 0 | CHECK(false) << msg; |
100 | 0 | } |
101 | 0 | return Status::InternalError(msg); |
102 | 0 | } |
103 | 39.3k | } |
104 | 39.3k | return Status::OK(); |
105 | 39.3k | } |
106 | | } // namespace |
107 | | |
// Fills `segment_rows` with the number of rows of every segment in this rowset.
//
// The row counts are computed inside a callback handed to
// `_load_segment_rows_once` and stored in `_segments_rows`; the cached vector is
// then copied into `segment_rows`. (Presumably the callback runs at most once
// per rowset - confirm against the type of `_load_segment_rows_once`.)
//
// @param segment_rows          output; overwritten with one entry per segment
// @param enable_segment_cache  forwarded to the segment loader when footers are read
// @param read_stats            forwarded to the segment loader when footers are read
// @return OK on success, otherwise the error produced by the callback.
Status BetaRowset::get_segment_num_rows(std::vector<uint32_t>* segment_rows,
                                        bool enable_segment_cache,
                                        OlapReaderStatistics* read_stats) {
#ifndef BE_TEST
    // `ROWSET_UNLOADING` is state for closed() called but owned by some readers.
    // So here `ROWSET_UNLOADING` is allowed.
    DCHECK_NE(_rowset_state_machine.rowset_state(), ROWSET_UNLOADED);
#endif
    RETURN_IF_ERROR(_load_segment_rows_once.call([this, enable_segment_cache, read_stats] {
        auto segment_count = num_segments();
        // Nothing to load for an empty rowset; `_segments_rows` stays as it is.
        if (segment_count == 0) {
            return Status::OK();
        }

        if (!_rowset_meta->get_num_segment_rows().empty()) {
            if (_rowset_meta->get_num_segment_rows().size() == segment_count) {
                // use segment rows in rowset meta if eligible
                TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:use_segment_rows_from_meta");
                _segments_rows.assign(_rowset_meta->get_num_segment_rows().cbegin(),
                                      _rowset_meta->get_num_segment_rows().cend());
                if (config::enable_segment_rows_consistency_check) {
                    // verify segment rows from meta match segment footer
                    std::vector<uint32_t> rows_from_footer;
                    auto self = std::dynamic_pointer_cast<BetaRowset>(shared_from_this());
                    auto load_status = load_segment_rows_from_footer(
                            self, &rows_from_footer, enable_segment_cache, read_stats);
                    // A failed footer load is not reported: the check is skipped and
                    // the values taken from the meta are kept.
                    if (load_status.ok()) {
                        return check_segment_rows_consistency(
                                _segments_rows, rows_from_footer, _rowset_meta->tablet_id(),
                                _rowset_meta->rowset_id().to_string());
                    }
                }
                return Status::OK();
            } else {
                // The meta carries row counts, but not one per segment. Warn (or abort
                // when the core-on-mismatch config is set) and fall through to the
                // footer path below.
                auto msg = fmt::format(
                        "[verbose] corrupted segment rows info in rowset meta. "
                        "segment count: {}, segment rows size: {}, tablet={}, rowset={}",
                        segment_count, _rowset_meta->get_num_segment_rows().size(),
                        _rowset_meta->tablet_id(), _rowset_meta->rowset_id().to_string());
                if (config::enable_segment_rows_check_core) {
                    CHECK(false) << msg;
                }
                LOG_EVERY_SECOND(WARNING) << msg;
            }
        }
        // Reached when the meta has no usable row counts; optionally treated as fatal.
        if (config::fail_when_segment_rows_not_in_rowset_meta) {
            CHECK(false) << "[verbose] segment rows info not found in rowset meta. tablet="
                         << _rowset_meta->tablet_id()
                         << ", rowset=" << _rowset_meta->rowset_id().to_string()
                         << ", version=" << _rowset_meta->version()
                         << ", debug_string=" << _rowset_meta->debug_string()
                         << ", stack=" << Status::InternalError("error");
        }
        // otherwise, read it from segment footer
        TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:load_from_segment_footer");
        auto self = std::dynamic_pointer_cast<BetaRowset>(shared_from_this());
        return load_segment_rows_from_footer(self, &_segments_rows, enable_segment_cache,
                                             read_stats);
    }));
    // Hand the caller a copy of the cached row counts.
    segment_rows->assign(_segments_rows.cbegin(), _segments_rows.cend());
    return Status::OK();
}
170 | | |
171 | 21 | Status BetaRowset::get_inverted_index_size(int64_t* index_size) { |
172 | 21 | const auto& fs = _rowset_meta->fs(); |
173 | 21 | if (!fs) { |
174 | 0 | return Status::Error<INIT_FAILED>("get fs failed, resource_id={}", |
175 | 0 | _rowset_meta->resource_id()); |
176 | 0 | } |
177 | | |
178 | 21 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
179 | 7 | for (const auto& index : _schema->inverted_indexes()) { |
180 | 15 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
181 | 9 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
182 | 9 | int64_t file_size = 0; |
183 | | |
184 | 9 | std::string inverted_index_file_path = |
185 | 9 | InvertedIndexDescriptor::get_index_file_path_v1( |
186 | 9 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
187 | 9 | index->index_id(), index->get_index_suffix()); |
188 | 9 | RETURN_IF_ERROR(fs->file_size(inverted_index_file_path, &file_size)); |
189 | 9 | *index_size += file_size; |
190 | 9 | } |
191 | 6 | } |
192 | 14 | } else { |
193 | 20 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
194 | 14 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
195 | 14 | int64_t file_size = 0; |
196 | | |
197 | 14 | std::string inverted_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2( |
198 | 14 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); |
199 | 14 | RETURN_IF_ERROR(fs->file_size(inverted_index_file_path, &file_size)); |
200 | 6 | *index_size += file_size; |
201 | 6 | } |
202 | 14 | } |
203 | 13 | return Status::OK(); |
204 | 21 | } |
205 | | |
206 | 149k | void BetaRowset::clear_inverted_index_cache() { |
207 | 197k | for (int i = 0; i < num_segments(); ++i) { |
208 | 47.3k | auto seg_path = segment_path(i); |
209 | 47.3k | if (!seg_path) { |
210 | 0 | continue; |
211 | 0 | } |
212 | | |
213 | 47.3k | auto index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(*seg_path); |
214 | 607k | for (const auto& column : tablet_schema()->columns()) { |
215 | 607k | auto index_metas = tablet_schema()->inverted_indexs(*column); |
216 | 607k | for (const auto& index_meta : index_metas) { |
217 | 16.4k | auto inverted_index_file_cache_key = |
218 | 16.4k | InvertedIndexDescriptor::get_index_file_cache_key( |
219 | 16.4k | index_path_prefix, index_meta->index_id(), |
220 | 16.4k | index_meta->get_index_suffix()); |
221 | 16.4k | (void)segment_v2::InvertedIndexSearcherCache::instance()->erase( |
222 | 16.4k | inverted_index_file_cache_key); |
223 | 16.4k | } |
224 | 607k | } |
225 | 47.3k | } |
226 | 149k | } |
227 | | |
228 | 41 | Status BetaRowset::get_segments_size(std::vector<size_t>* segments_size) { |
229 | 41 | auto fs = _rowset_meta->fs(); |
230 | 41 | if (!fs) { |
231 | 0 | return Status::Error<INIT_FAILED>("get fs failed, resource_id={}", |
232 | 0 | _rowset_meta->resource_id()); |
233 | 0 | } |
234 | | |
235 | 89 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
236 | 49 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
237 | 49 | int64_t file_size; |
238 | 49 | RETURN_IF_ERROR(fs->file_size(seg_path, &file_size)); |
239 | 48 | segments_size->push_back(file_size); |
240 | 48 | } |
241 | 40 | return Status::OK(); |
242 | 41 | } |
243 | | |
// Loads all segments of this rowset, i.e. the id range [0, num_segments()),
// and appends them to `segments`. Delegates to the range overload.
Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments) {
    return load_segments(0, num_segments(), segments);
}
247 | | |
248 | | Status BetaRowset::load_segments(int64_t seg_id_begin, int64_t seg_id_end, |
249 | 121k | std::vector<segment_v2::SegmentSharedPtr>* segments) { |
250 | 121k | int64_t seg_id = seg_id_begin; |
251 | 214k | while (seg_id < seg_id_end) { |
252 | 92.3k | std::shared_ptr<segment_v2::Segment> segment; |
253 | 92.3k | RETURN_IF_ERROR(load_segment(seg_id, nullptr, &segment)); |
254 | 92.3k | segments->push_back(std::move(segment)); |
255 | 92.3k | seg_id++; |
256 | 92.3k | } |
257 | 121k | return Status::OK(); |
258 | 121k | } |
259 | | |
260 | | Status BetaRowset::load_segment(int64_t seg_id, OlapReaderStatistics* stats, |
261 | 1.76M | segment_v2::SegmentSharedPtr* segment) { |
262 | 1.76M | auto fs = _rowset_meta->fs(); |
263 | 1.76M | if (!fs) { |
264 | 1 | return Status::Error<INIT_FAILED>("get fs failed"); |
265 | 1 | } |
266 | | |
267 | 1.76M | DCHECK(seg_id >= 0); |
268 | 1.76M | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
269 | 1.76M | io::FileReaderOptions reader_options { |
270 | 1.76M | .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
271 | 1.76M | : io::FileCachePolicy::NO_CACHE, |
272 | 1.76M | .is_doris_table = true, |
273 | 1.76M | .cache_base_path = "", |
274 | 1.76M | .file_size = _rowset_meta->segment_file_size(static_cast<int>(seg_id)), |
275 | 1.76M | .tablet_id = _rowset_meta->tablet_id(), |
276 | 1.76M | }; |
277 | | |
278 | 1.76M | auto s = segment_v2::Segment::open( |
279 | 1.76M | fs, seg_path, _rowset_meta->tablet_id(), static_cast<uint32_t>(seg_id), rowset_id(), |
280 | 1.76M | _schema, reader_options, segment, |
281 | 1.76M | _rowset_meta->inverted_index_file_info(static_cast<int>(seg_id)), stats); |
282 | 1.76M | if (!s.ok()) { |
283 | 5 | LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset " << rowset_id() |
284 | 5 | << " : " << s.to_string(); |
285 | 5 | return s; |
286 | 5 | } |
287 | 1.76M | return Status::OK(); |
288 | 1.76M | } |
289 | | |
290 | 4.39M | Status BetaRowset::create_reader(RowsetReaderSharedPtr* result) { |
291 | | // NOTE: We use std::static_pointer_cast for performance |
292 | 4.39M | result->reset(new BetaRowsetReader(std::static_pointer_cast<BetaRowset>(shared_from_this()))); |
293 | 4.39M | return Status::OK(); |
294 | 4.39M | } |
295 | | |
296 | 7.96k | Status BetaRowset::remove() { |
297 | 7.96k | if (!is_local()) { |
298 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
299 | 0 | return Status::OK(); |
300 | 0 | } |
301 | | |
302 | | // TODO should we close and remove all segment reader first? |
303 | 7.96k | VLOG_NOTICE << "begin to remove files in rowset " << rowset_id() |
304 | 5 | << ", version:" << start_version() << "-" << end_version() |
305 | 5 | << ", tabletid:" << _rowset_meta->tablet_id(); |
306 | | // If the rowset was removed, it need to remove the fds in segment cache directly |
307 | 7.96k | clear_cache(); |
308 | | |
309 | 7.96k | bool success = true; |
310 | 7.96k | Status st; |
311 | 7.96k | const auto& fs = io::global_local_filesystem(); |
312 | 9.67k | for (int i = 0; i < num_segments(); ++i) { |
313 | 1.71k | auto seg_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
314 | 1.71k | LOG(INFO) << "deleting " << seg_path; |
315 | 1.71k | st = fs->delete_file(seg_path); |
316 | 1.71k | if (!st.ok()) { |
317 | 0 | LOG(WARNING) << st.to_string(); |
318 | 0 | success = false; |
319 | 0 | } |
320 | | |
321 | 1.71k | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
322 | 0 | for (const auto& column : _schema->columns()) { |
323 | 0 | auto index_metas = _schema->inverted_indexs(*column); |
324 | 0 | for (const auto& index_meta : index_metas) { |
325 | 0 | std::string inverted_index_file = |
326 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
327 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
328 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
329 | 0 | st = fs->delete_file(inverted_index_file); |
330 | 0 | if (!st.ok()) { |
331 | 0 | LOG(WARNING) << st.to_string(); |
332 | 0 | success = false; |
333 | 0 | } |
334 | 0 | } |
335 | 0 | } |
336 | 1.71k | } else { |
337 | 1.71k | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
338 | 366 | std::string inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v2( |
339 | 366 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); |
340 | 366 | st = fs->delete_file(inverted_index_file); |
341 | 366 | if (!st.ok()) { |
342 | 0 | LOG(WARNING) << st.to_string(); |
343 | 0 | success = false; |
344 | 0 | } |
345 | 366 | } |
346 | 1.71k | } |
347 | 1.71k | } |
348 | 7.96k | if (!success) { |
349 | 0 | return Status::Error<ROWSET_DELETE_FILE_FAILED>("failed to remove files in rowset {}", |
350 | 0 | rowset_id().to_string()); |
351 | 0 | } |
352 | 7.96k | return Status::OK(); |
353 | 7.96k | } |
354 | | |
// Close hook; a beta rowset performs no work when it is closed.
void BetaRowset::do_close() {
    // do nothing.
}
358 | | |
359 | | Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id, |
360 | | size_t new_rowset_start_seg_id, |
361 | 59 | std::set<int64_t>* without_index_uids) { |
362 | 59 | if (!is_local()) { |
363 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
364 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
365 | 0 | _rowset_meta->tablet_id(), rowset_id().to_string()); |
366 | 0 | } |
367 | | |
368 | 59 | const auto& local_fs = io::global_local_filesystem(); |
369 | 59 | Status status; |
370 | 59 | std::vector<std::string> linked_success_files; |
371 | 59 | Defer remove_linked_files {[&]() { // clear linked files if errors happen |
372 | 59 | if (!status.ok()) { |
373 | 0 | LOG(WARNING) << "will delete linked success files due to error " << status; |
374 | 0 | std::vector<io::Path> paths; |
375 | 0 | for (auto& file : linked_success_files) { |
376 | 0 | paths.emplace_back(file); |
377 | 0 | LOG(WARNING) << "will delete linked success file " << file << " due to error"; |
378 | 0 | } |
379 | 0 | static_cast<void>(local_fs->batch_delete(paths)); |
380 | 0 | LOG(WARNING) << "done delete linked success files due to error " << status; |
381 | 0 | } |
382 | 59 | }}; |
383 | | |
384 | 120 | for (int i = 0; i < num_segments(); ++i) { |
385 | 61 | auto dst_path = |
386 | 61 | local_segment_path(dir, new_rowset_id.to_string(), i + new_rowset_start_seg_id); |
387 | 61 | bool dst_path_exist = false; |
388 | 61 | if (!local_fs->exists(dst_path, &dst_path_exist).ok() || dst_path_exist) { |
389 | 0 | status = Status::Error<FILE_ALREADY_EXIST>( |
390 | 0 | "failed to create hard link, file already exist: {}", dst_path); |
391 | 0 | return status; |
392 | 0 | } |
393 | 61 | auto src_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
394 | | // TODO(lingbin): how external storage support link? |
395 | | // use copy? or keep refcount to avoid being delete? |
396 | 61 | if (!local_fs->link_file(src_path, dst_path).ok()) { |
397 | 0 | status = Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}", |
398 | 0 | src_path, dst_path, Errno::no()); |
399 | 0 | return status; |
400 | 0 | } |
401 | 61 | linked_success_files.push_back(dst_path); |
402 | 61 | DBUG_EXECUTE_IF("fault_inject::BetaRowset::link_files_to::_link_inverted_index_file", { |
403 | 61 | status = Status::Error<OS_ERROR>("fault_inject link_file error"); |
404 | 61 | return status; |
405 | 61 | }); |
406 | 61 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
407 | 40 | for (const auto& index : _schema->inverted_indexes()) { |
408 | 9 | auto index_id = index->index_id(); |
409 | 9 | if (without_index_uids != nullptr && without_index_uids->count(index_id)) { |
410 | 1 | continue; |
411 | 1 | } |
412 | 8 | std::string inverted_index_src_file_path = |
413 | 8 | InvertedIndexDescriptor::get_index_file_path_v1( |
414 | 8 | InvertedIndexDescriptor::get_index_file_path_prefix(src_path), |
415 | 8 | index_id, index->get_index_suffix()); |
416 | 8 | std::string inverted_index_dst_file_path = |
417 | 8 | InvertedIndexDescriptor::get_index_file_path_v1( |
418 | 8 | InvertedIndexDescriptor::get_index_file_path_prefix(dst_path), |
419 | 8 | index_id, index->get_index_suffix()); |
420 | 8 | bool index_file_exists = true; |
421 | 8 | RETURN_IF_ERROR(local_fs->exists(inverted_index_src_file_path, &index_file_exists)); |
422 | 8 | if (index_file_exists) { |
423 | 8 | DBUG_EXECUTE_IF( |
424 | 8 | "fault_inject::BetaRowset::link_files_to::_link_inverted_index_file", { |
425 | 8 | status = Status::Error<OS_ERROR>( |
426 | 8 | "fault_inject link_file error from={}, to={}", |
427 | 8 | inverted_index_src_file_path, inverted_index_dst_file_path); |
428 | 8 | return status; |
429 | 8 | }); |
430 | 8 | if (!local_fs->link_file(inverted_index_src_file_path, |
431 | 8 | inverted_index_dst_file_path) |
432 | 8 | .ok()) { |
433 | 0 | status = Status::Error<OS_ERROR>( |
434 | 0 | "fail to create hard link. from={}, to={}, errno={}", |
435 | 0 | inverted_index_src_file_path, inverted_index_dst_file_path, |
436 | 0 | Errno::no()); |
437 | 0 | return status; |
438 | 0 | } |
439 | 8 | linked_success_files.push_back(inverted_index_dst_file_path); |
440 | 8 | LOG(INFO) << "success to create hard link. from=" |
441 | 8 | << inverted_index_src_file_path << ", " |
442 | 8 | << "to=" << inverted_index_dst_file_path; |
443 | 8 | } else { |
444 | 0 | LOG(WARNING) << "skip create hard link to not existed index file=" |
445 | 0 | << inverted_index_src_file_path; |
446 | 0 | } |
447 | 8 | } |
448 | 40 | } else { |
449 | 21 | if ((_schema->has_inverted_index() || _schema->has_ann_index()) && |
450 | 21 | (without_index_uids == nullptr || without_index_uids->empty())) { |
451 | 0 | std::string inverted_index_file_src = |
452 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
453 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(src_path)); |
454 | 0 | std::string inverted_index_file_dst = |
455 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
456 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(dst_path)); |
457 | 0 | bool index_dst_path_exist = false; |
458 | |
|
459 | 0 | if (!local_fs->exists(inverted_index_file_dst, &index_dst_path_exist).ok() || |
460 | 0 | index_dst_path_exist) { |
461 | 0 | status = Status::Error<FILE_ALREADY_EXIST>( |
462 | 0 | "failed to create hard link, file already exist: {}", |
463 | 0 | inverted_index_file_dst); |
464 | 0 | return status; |
465 | 0 | } |
466 | 0 | if (!local_fs->link_file(inverted_index_file_src, inverted_index_file_dst).ok()) { |
467 | 0 | status = Status::Error<OS_ERROR>( |
468 | 0 | "fail to create hard link. from={}, to={}, errno={}", |
469 | 0 | inverted_index_file_src, inverted_index_file_dst, Errno::no()); |
470 | 0 | return status; |
471 | 0 | } |
472 | 0 | linked_success_files.push_back(inverted_index_file_dst); |
473 | 0 | } |
474 | 21 | } |
475 | 61 | } |
476 | 59 | return Status::OK(); |
477 | 59 | } |
478 | | |
479 | 4 | Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) { |
480 | 4 | if (!is_local()) { |
481 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
482 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
483 | 0 | _rowset_meta->tablet_id(), rowset_id().to_string()); |
484 | 0 | } |
485 | | |
486 | 4 | bool exists = false; |
487 | 4 | for (int i = 0; i < num_segments(); ++i) { |
488 | 0 | auto dst_path = local_segment_path(dir, new_rowset_id.to_string(), i); |
489 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(dst_path, &exists)); |
490 | 0 | if (exists) { |
491 | 0 | return Status::Error<FILE_ALREADY_EXIST>("file already exist: {}", dst_path); |
492 | 0 | } |
493 | 0 | auto src_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
494 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->copy_path(src_path, dst_path)); |
495 | 0 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
496 | 0 | for (const auto& column : _schema->columns()) { |
497 | | // if (column.has_inverted_index()) { |
498 | 0 | auto index_metas = _schema->inverted_indexs(*column); |
499 | 0 | for (const auto& index_meta : index_metas) { |
500 | 0 | std::string inverted_index_src_file_path = |
501 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
502 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(src_path), |
503 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
504 | 0 | std::string inverted_index_dst_file_path = |
505 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
506 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(dst_path), |
507 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
508 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->copy_path( |
509 | 0 | inverted_index_src_file_path, inverted_index_dst_file_path)); |
510 | 0 | LOG(INFO) << "success to copy file. from=" << inverted_index_src_file_path |
511 | 0 | << ", " |
512 | 0 | << "to=" << inverted_index_dst_file_path; |
513 | 0 | } |
514 | 0 | } |
515 | 0 | } else { |
516 | 0 | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
517 | 0 | std::string inverted_index_src_file = |
518 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
519 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(src_path)); |
520 | 0 | std::string inverted_index_dst_file = |
521 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
522 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(dst_path)); |
523 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->copy_path(inverted_index_src_file, |
524 | 0 | inverted_index_dst_file)); |
525 | 0 | LOG(INFO) << "success to copy file. from=" << inverted_index_src_file << ", " |
526 | 0 | << "to=" << inverted_index_dst_file; |
527 | 0 | } |
528 | 0 | } |
529 | 0 | } |
530 | 4 | return Status::OK(); |
531 | 4 | } |
532 | | |
533 | 5 | Status BetaRowset::upload_to(const StorageResource& dest_fs, const RowsetId& new_rowset_id) { |
534 | 5 | if (!is_local()) { |
535 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
536 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
537 | 0 | _rowset_meta->tablet_id(), rowset_id().to_string()); |
538 | 0 | } |
539 | | |
540 | 5 | if (num_segments() < 1) { |
541 | 3 | return Status::OK(); |
542 | 3 | } |
543 | 2 | std::vector<io::Path> local_paths; |
544 | 2 | local_paths.reserve(num_segments()); |
545 | 2 | std::vector<io::Path> dest_paths; |
546 | 2 | dest_paths.reserve(num_segments()); |
547 | 4 | for (int i = 0; i < num_segments(); ++i) { |
548 | | // Note: Here we use relative path for remote. |
549 | 2 | auto remote_seg_path = dest_fs.remote_segment_path(_rowset_meta->tablet_id(), |
550 | 2 | new_rowset_id.to_string(), i); |
551 | 2 | auto local_seg_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
552 | 2 | dest_paths.emplace_back(remote_seg_path); |
553 | 2 | local_paths.emplace_back(local_seg_path); |
554 | 2 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
555 | 0 | for (const auto& column : _schema->columns()) { |
556 | | // if (column.has_inverted_index()) { |
557 | 0 | auto index_metas = _schema->inverted_indexs(*column); |
558 | 0 | for (const auto& index_meta : index_metas) { |
559 | 0 | std::string remote_inverted_index_file = |
560 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
561 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix( |
562 | 0 | remote_seg_path), |
563 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
564 | 0 | std::string local_inverted_index_file = |
565 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
566 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix( |
567 | 0 | local_seg_path), |
568 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
569 | 0 | dest_paths.emplace_back(remote_inverted_index_file); |
570 | 0 | local_paths.emplace_back(local_inverted_index_file); |
571 | 0 | } |
572 | 0 | } |
573 | 2 | } else { |
574 | 2 | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
575 | 0 | std::string remote_inverted_index_file = |
576 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
577 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix( |
578 | 0 | remote_seg_path)); |
579 | 0 | std::string local_inverted_index_file = |
580 | 0 | InvertedIndexDescriptor::get_index_file_path_v2( |
581 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix( |
582 | 0 | local_seg_path)); |
583 | 0 | dest_paths.emplace_back(remote_inverted_index_file); |
584 | 0 | local_paths.emplace_back(local_inverted_index_file); |
585 | 0 | } |
586 | 2 | } |
587 | 2 | } |
588 | 2 | auto st = dest_fs.fs->batch_upload(local_paths, dest_paths); |
589 | 2 | if (st.ok()) { |
590 | 2 | DorisMetrics::instance()->upload_rowset_count->increment(1); |
591 | 2 | DorisMetrics::instance()->upload_total_byte->increment(total_disk_size()); |
592 | 2 | } else { |
593 | 0 | DorisMetrics::instance()->upload_fail_count->increment(1); |
594 | 0 | } |
595 | 2 | return st; |
596 | 5 | } |
597 | | |
598 | 0 | Status BetaRowset::check_file_exist() { |
599 | 0 | const auto& fs = _rowset_meta->fs(); |
600 | 0 | if (!fs) { |
601 | 0 | return Status::InternalError("fs is not initialized, resource_id={}", |
602 | 0 | _rowset_meta->resource_id()); |
603 | 0 | } |
604 | | |
605 | 0 | for (int i = 0; i < num_segments(); ++i) { |
606 | 0 | auto seg_path = DORIS_TRY(segment_path(i)); |
607 | 0 | bool seg_file_exist = false; |
608 | 0 | RETURN_IF_ERROR(fs->exists(seg_path, &seg_file_exist)); |
609 | 0 | if (!seg_file_exist) { |
610 | 0 | return Status::InternalError("data file not existed: {}, rowset_id={}", seg_path, |
611 | 0 | rowset_id().to_string()); |
612 | 0 | } |
613 | 0 | } |
614 | | |
615 | 0 | return Status::OK(); |
616 | 0 | } |
617 | | |
618 | 0 | Status BetaRowset::check_current_rowset_segment() { |
619 | 0 | const auto& fs = _rowset_meta->fs(); |
620 | 0 | if (!fs) { |
621 | 0 | return Status::InternalError("fs is not initialized, resource_id={}", |
622 | 0 | _rowset_meta->resource_id()); |
623 | 0 | } |
624 | | |
625 | 0 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
626 | 0 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
627 | |
|
628 | 0 | std::shared_ptr<segment_v2::Segment> segment; |
629 | 0 | io::FileReaderOptions reader_options { |
630 | 0 | .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
631 | 0 | : io::FileCachePolicy::NO_CACHE, |
632 | 0 | .is_doris_table = true, |
633 | 0 | .cache_base_path {}, |
634 | 0 | .file_size = _rowset_meta->segment_file_size(seg_id), |
635 | 0 | .tablet_id = _rowset_meta->tablet_id(), |
636 | 0 | }; |
637 | |
|
638 | 0 | auto s = segment_v2::Segment::open(fs, seg_path, _rowset_meta->tablet_id(), seg_id, |
639 | 0 | rowset_id(), _schema, reader_options, &segment, |
640 | 0 | _rowset_meta->inverted_index_file_info(seg_id)); |
641 | 0 | if (!s.ok()) { |
642 | 0 | LOG(WARNING) << "segment can not be opened. file=" << seg_path; |
643 | 0 | return s; |
644 | 0 | } |
645 | 0 | } |
646 | | |
647 | 0 | return Status::OK(); |
648 | 0 | } |
649 | | |
650 | 4 | Status BetaRowset::add_to_binlog() { |
651 | | // FIXME(Drogon): not only local file system |
652 | 4 | if (!is_local()) { |
653 | 0 | DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); |
654 | 0 | return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", |
655 | 0 | _rowset_meta->tablet_id(), rowset_id().to_string()); |
656 | 0 | } |
657 | | |
658 | 4 | const auto& fs = io::global_local_filesystem(); |
659 | 4 | auto segments_num = num_segments(); |
660 | 4 | VLOG_DEBUG << fmt::format("add rowset to binlog. rowset_id={}, segments_num={}", |
661 | 0 | rowset_id().to_string(), segments_num); |
662 | | |
663 | 4 | Status status; |
664 | 4 | std::vector<std::string> linked_success_files; |
665 | 4 | Defer remove_linked_files {[&]() { // clear linked files if errors happen |
666 | 4 | if (!status.ok()) { |
667 | 0 | LOG(WARNING) << "will delete linked success files due to error " |
668 | 0 | << status.to_string_no_stack(); |
669 | 0 | std::vector<io::Path> paths; |
670 | 0 | for (auto& file : linked_success_files) { |
671 | 0 | paths.emplace_back(file); |
672 | 0 | LOG(WARNING) << "will delete linked success file " << file << " due to error"; |
673 | 0 | } |
674 | 0 | static_cast<void>(fs->batch_delete(paths)); |
675 | 0 | LOG(WARNING) << "done delete linked success files due to error " |
676 | 0 | << status.to_string_no_stack(); |
677 | 0 | } |
678 | 4 | }}; |
679 | | |
680 | | // The publish_txn might fail even if the add_to_binlog success, so we need to check |
681 | | // whether a file already exists before linking. |
682 | 4 | auto errno_is_file_exists = []() { return Errno::no() == EEXIST; }; |
683 | | |
684 | | // all segments are in the same directory, so cache binlog_dir without multi times check |
685 | 4 | std::string binlog_dir; |
686 | 6 | for (int i = 0; i < segments_num; ++i) { |
687 | 2 | auto seg_file = local_segment_path(_tablet_path, rowset_id().to_string(), i); |
688 | | |
689 | 2 | if (binlog_dir.empty()) { |
690 | 2 | binlog_dir = std::filesystem::path(seg_file).parent_path().append("_binlog").string(); |
691 | | |
692 | 2 | bool exists = true; |
693 | 2 | RETURN_IF_ERROR(fs->exists(binlog_dir, &exists)); |
694 | 2 | if (!exists) { |
695 | 2 | RETURN_IF_ERROR(fs->create_directory(binlog_dir)); |
696 | 2 | } |
697 | 2 | } |
698 | | |
699 | 2 | auto binlog_file = |
700 | 2 | (std::filesystem::path(binlog_dir) / std::filesystem::path(seg_file).filename()) |
701 | 2 | .string(); |
702 | 2 | VLOG_DEBUG << "link " << seg_file << " to " << binlog_file; |
703 | 2 | if (!fs->link_file(seg_file, binlog_file).ok() && !errno_is_file_exists()) { |
704 | 0 | status = Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}", |
705 | 0 | seg_file, binlog_file, Errno::no()); |
706 | 0 | return status; |
707 | 0 | } |
708 | 2 | linked_success_files.push_back(binlog_file); |
709 | | |
710 | 2 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
711 | 2 | for (const auto& index : _schema->inverted_indexes()) { |
712 | 2 | auto index_id = index->index_id(); |
713 | 2 | auto index_file = InvertedIndexDescriptor::get_index_file_path_v1( |
714 | 2 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_file), index_id, |
715 | 2 | index->get_index_suffix()); |
716 | 2 | auto binlog_index_file = (std::filesystem::path(binlog_dir) / |
717 | 2 | std::filesystem::path(index_file).filename()) |
718 | 2 | .string(); |
719 | 2 | VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file; |
720 | 2 | if (!fs->link_file(index_file, binlog_index_file).ok() && !errno_is_file_exists()) { |
721 | 0 | status = Status::Error<OS_ERROR>( |
722 | 0 | "fail to create hard link. from={}, to={}, errno={}", index_file, |
723 | 0 | binlog_index_file, Errno::no()); |
724 | 0 | return status; |
725 | 0 | } |
726 | 2 | linked_success_files.push_back(binlog_index_file); |
727 | 2 | } |
728 | 1 | } else { |
729 | 1 | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
730 | 1 | auto index_file = InvertedIndexDescriptor::get_index_file_path_v2( |
731 | 1 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_file)); |
732 | 1 | auto binlog_index_file = (std::filesystem::path(binlog_dir) / |
733 | 1 | std::filesystem::path(index_file).filename()) |
734 | 1 | .string(); |
735 | 1 | VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file; |
736 | 1 | if (!fs->link_file(index_file, binlog_index_file).ok() && !errno_is_file_exists()) { |
737 | 0 | status = Status::Error<OS_ERROR>( |
738 | 0 | "fail to create hard link. from={}, to={}, errno={}", index_file, |
739 | 0 | binlog_index_file, Errno::no()); |
740 | 0 | return status; |
741 | 0 | } |
742 | 1 | linked_success_files.push_back(binlog_index_file); |
743 | 1 | } |
744 | 1 | } |
745 | 2 | } |
746 | | |
747 | 4 | return Status::OK(); |
748 | 4 | } |
749 | | |
750 | 10 | Status BetaRowset::calc_file_crc(uint32_t* crc_value, int64_t* file_count) { |
751 | 10 | const auto& fs = _rowset_meta->fs(); |
752 | 10 | DBUG_EXECUTE_IF("fault_inject::BetaRowset::calc_file_crc", |
753 | 10 | { return Status::Error<OS_ERROR>("fault_inject calc_file_crc error"); }); |
754 | 10 | if (num_segments() < 1) { |
755 | 2 | *crc_value = 0x92a8fc17; // magic code from crc32c table |
756 | 2 | return Status::OK(); |
757 | 2 | } |
758 | | |
759 | | // 1. pick up all the files including dat file and idx file |
760 | 8 | std::vector<io::Path> file_paths; |
761 | 16 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
762 | 8 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
763 | 8 | file_paths.emplace_back(seg_path); |
764 | 8 | if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { |
765 | 0 | for (const auto& column : _schema->columns()) { |
766 | 0 | auto index_metas = _schema->inverted_indexs(*column); |
767 | 0 | for (const auto& index_meta : index_metas) { |
768 | 0 | std::string inverted_index_file = |
769 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
770 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), |
771 | 0 | index_meta->index_id(), index_meta->get_index_suffix()); |
772 | 0 | file_paths.emplace_back(std::move(inverted_index_file)); |
773 | 0 | } |
774 | 0 | } |
775 | 8 | } else { |
776 | 8 | if (_schema->has_inverted_index() || _schema->has_ann_index()) { |
777 | 8 | std::string inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v2( |
778 | 8 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); |
779 | 8 | file_paths.emplace_back(std::move(inverted_index_file)); |
780 | 8 | } |
781 | 8 | } |
782 | 8 | } |
783 | 8 | *crc_value = 0; |
784 | 8 | *file_count = file_paths.size(); |
785 | 8 | if (!is_local()) { |
786 | 8 | return Status::OK(); |
787 | 8 | } |
788 | | |
789 | | // 2. calculate the md5sum of each file |
790 | 0 | const auto& local_fs = io::global_local_filesystem(); |
791 | 0 | DCHECK(!file_paths.empty()); |
792 | 0 | std::vector<std::string> all_file_md5; |
793 | 0 | all_file_md5.reserve(file_paths.size()); |
794 | 0 | for (const auto& file_path : file_paths) { |
795 | 0 | std::string file_md5sum; |
796 | 0 | auto status = local_fs->md5sum(file_path, &file_md5sum); |
797 | 0 | if (!status.ok()) { |
798 | 0 | return status; |
799 | 0 | } |
800 | 0 | VLOG_CRITICAL << fmt::format("calc file_md5sum finished. file_path={}, md5sum={}", |
801 | 0 | file_path.string(), file_md5sum); |
802 | 0 | all_file_md5.emplace_back(std::move(file_md5sum)); |
803 | 0 | } |
804 | 0 | std::sort(all_file_md5.begin(), all_file_md5.end()); |
805 | | |
806 | | // 3. calculate the crc_value based on all_file_md5 |
807 | 0 | DCHECK(file_paths.size() == all_file_md5.size()); |
808 | 0 | for (auto& i : all_file_md5) { |
809 | 0 | *crc_value = crc32c::Extend(*crc_value, (const uint8_t*)i.data(), i.size()); |
810 | 0 | } |
811 | |
|
812 | 0 | return Status::OK(); |
813 | 0 | } |
814 | | |
815 | | Status BetaRowset::show_nested_index_file(rapidjson::Value* rowset_value, |
816 | 147 | rapidjson::Document::AllocatorType& allocator) { |
817 | 147 | const auto& fs = _rowset_meta->fs(); |
818 | 147 | auto storage_format = _schema->get_inverted_index_storage_format(); |
819 | 147 | std::string format_str; |
820 | 147 | switch (storage_format) { |
821 | 7 | case InvertedIndexStorageFormatPB::V1: |
822 | 7 | format_str = "V1"; |
823 | 7 | break; |
824 | 71 | case InvertedIndexStorageFormatPB::V2: |
825 | 71 | format_str = "V2"; |
826 | 71 | break; |
827 | 69 | case InvertedIndexStorageFormatPB::V3: |
828 | 69 | format_str = "V3"; |
829 | 69 | break; |
830 | 0 | default: |
831 | 0 | return Status::InternalError("inverted index storage format error"); |
832 | 0 | break; |
833 | 147 | } |
834 | 147 | auto rs_id = rowset_id().to_string(); |
835 | 147 | rowset_value->AddMember("rowset_id", rapidjson::Value(rs_id.c_str(), allocator), allocator); |
836 | 147 | rowset_value->AddMember("index_storage_format", rapidjson::Value(format_str.c_str(), allocator), |
837 | 147 | allocator); |
838 | 147 | rapidjson::Value segments(rapidjson::kArrayType); |
839 | 221 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
840 | 75 | rapidjson::Value segment(rapidjson::kObjectType); |
841 | 75 | segment.AddMember("segment_id", rapidjson::Value(seg_id).Move(), allocator); |
842 | | |
843 | 75 | auto seg_path = DORIS_TRY(segment_path(seg_id)); |
844 | 75 | auto index_file_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(seg_path); |
845 | 75 | auto index_file_reader = std::make_unique<IndexFileReader>( |
846 | 75 | fs, std::string(index_file_path_prefix), storage_format, InvertedIndexFileInfo(), |
847 | 75 | _rowset_meta->tablet_id()); |
848 | 75 | RETURN_IF_ERROR(index_file_reader->init()); |
849 | 74 | auto dirs = index_file_reader->get_all_directories(); |
850 | | |
851 | 74 | auto add_file_info_to_json = [&](const std::string& path, |
852 | 80 | rapidjson::Value& json_value) -> Status { |
853 | 80 | json_value.AddMember("idx_file_path", rapidjson::Value(path.c_str(), allocator), |
854 | 80 | allocator); |
855 | 80 | int64_t idx_file_size = 0; |
856 | 80 | auto st = fs->file_size(path, &idx_file_size); |
857 | 80 | if (st != Status::OK()) { |
858 | 0 | LOG(WARNING) << "show nested index file get file size error, file: " << path |
859 | 0 | << ", error: " << st.msg(); |
860 | 0 | return st; |
861 | 0 | } |
862 | 80 | json_value.AddMember("idx_file_size", rapidjson::Value(idx_file_size).Move(), |
863 | 80 | allocator); |
864 | 80 | return Status::OK(); |
865 | 80 | }; |
866 | | |
867 | 74 | auto process_files = [&allocator, &index_file_reader](auto& index_meta, |
868 | 74 | rapidjson::Value& indices, |
869 | 140 | rapidjson::Value& index) -> Status { |
870 | 140 | rapidjson::Value files_value(rapidjson::kArrayType); |
871 | 140 | std::vector<std::string> files; |
872 | 140 | auto ret = index_file_reader->open(&index_meta); |
873 | 140 | if (!ret.has_value()) { |
874 | 0 | LOG(INFO) << "IndexFileReader open error:" << ret.error(); |
875 | 0 | return Status::InternalError("IndexFileReader open error"); |
876 | 0 | } |
877 | 140 | using T = std::decay_t<decltype(ret)>; |
878 | 140 | auto reader = std::forward<T>(ret).value(); |
879 | 140 | reader->list(&files); |
880 | 860 | for (auto& file : files) { |
881 | 860 | rapidjson::Value file_value(rapidjson::kObjectType); |
882 | 860 | auto size = reader->fileLength(file.c_str()); |
883 | 860 | file_value.AddMember("name", rapidjson::Value(file.c_str(), allocator), allocator); |
884 | 860 | file_value.AddMember("size", rapidjson::Value(size).Move(), allocator); |
885 | 860 | files_value.PushBack(file_value, allocator); |
886 | 860 | } |
887 | 140 | index.AddMember("files", files_value, allocator); |
888 | 140 | indices.PushBack(index, allocator); |
889 | 140 | return Status::OK(); |
890 | 140 | }; beta_rowset.cpp:_ZZN5doris10BetaRowset22show_nested_index_fileEPN9rapidjson12GenericValueINS1_4UTF8IcEENS1_19MemoryPoolAllocatorINS1_12CrtAllocatorEEEEERS7_ENK3$_0clINS_11TabletIndexEEENS_6StatusERT_RS8_SH_ Line | Count | Source | 869 | 128 | rapidjson::Value& index) -> Status { | 870 | 128 | rapidjson::Value files_value(rapidjson::kArrayType); | 871 | 128 | std::vector<std::string> files; | 872 | 128 | auto ret = index_file_reader->open(&index_meta); | 873 | 128 | if (!ret.has_value()) { | 874 | 0 | LOG(INFO) << "IndexFileReader open error:" << ret.error(); | 875 | 0 | return Status::InternalError("IndexFileReader open error"); | 876 | 0 | } | 877 | 128 | using T = std::decay_t<decltype(ret)>; | 878 | 128 | auto reader = std::forward<T>(ret).value(); | 879 | 128 | reader->list(&files); | 880 | 788 | for (auto& file : files) { | 881 | 788 | rapidjson::Value file_value(rapidjson::kObjectType); | 882 | 788 | auto size = reader->fileLength(file.c_str()); | 883 | 788 | file_value.AddMember("name", rapidjson::Value(file.c_str(), allocator), allocator); | 884 | 788 | file_value.AddMember("size", rapidjson::Value(size).Move(), allocator); | 885 | 788 | files_value.PushBack(file_value, allocator); | 886 | 788 | } | 887 | 128 | index.AddMember("files", files_value, allocator); | 888 | 128 | indices.PushBack(index, allocator); | 889 | 128 | return Status::OK(); | 890 | 128 | }; |
beta_rowset.cpp:_ZZN5doris10BetaRowset22show_nested_index_fileEPN9rapidjson12GenericValueINS1_4UTF8IcEENS1_19MemoryPoolAllocatorINS1_12CrtAllocatorEEEEERS7_ENK3$_0clIKNS_11TabletIndexEEENS_6StatusERT_RS8_SI_ Line | Count | Source | 869 | 12 | rapidjson::Value& index) -> Status { | 870 | 12 | rapidjson::Value files_value(rapidjson::kArrayType); | 871 | 12 | std::vector<std::string> files; | 872 | 12 | auto ret = index_file_reader->open(&index_meta); | 873 | 12 | if (!ret.has_value()) { | 874 | 0 | LOG(INFO) << "IndexFileReader open error:" << ret.error(); | 875 | 0 | return Status::InternalError("IndexFileReader open error"); | 876 | 0 | } | 877 | 12 | using T = std::decay_t<decltype(ret)>; | 878 | 12 | auto reader = std::forward<T>(ret).value(); | 879 | 12 | reader->list(&files); | 880 | 72 | for (auto& file : files) { | 881 | 72 | rapidjson::Value file_value(rapidjson::kObjectType); | 882 | 72 | auto size = reader->fileLength(file.c_str()); | 883 | 72 | file_value.AddMember("name", rapidjson::Value(file.c_str(), allocator), allocator); | 884 | 72 | file_value.AddMember("size", rapidjson::Value(size).Move(), allocator); | 885 | 72 | files_value.PushBack(file_value, allocator); | 886 | 72 | } | 887 | 12 | index.AddMember("files", files_value, allocator); | 888 | 12 | indices.PushBack(index, allocator); | 889 | 12 | return Status::OK(); | 890 | 12 | }; |
|
891 | | |
892 | 74 | if (storage_format != InvertedIndexStorageFormatPB::V1) { |
893 | 68 | auto path = InvertedIndexDescriptor::get_index_file_path_v2(index_file_path_prefix); |
894 | 68 | auto st = add_file_info_to_json(path, segment); |
895 | 68 | if (!st.ok()) { |
896 | 0 | return st; |
897 | 0 | } |
898 | 68 | rapidjson::Value indices(rapidjson::kArrayType); |
899 | 128 | for (auto& dir : *dirs) { |
900 | 128 | rapidjson::Value index(rapidjson::kObjectType); |
901 | 128 | auto index_id = dir.first.first; |
902 | 128 | auto index_suffix = dir.first.second; |
903 | 128 | index.AddMember("index_id", rapidjson::Value(index_id).Move(), allocator); |
904 | 128 | index.AddMember("index_suffix", rapidjson::Value(index_suffix.c_str(), allocator), |
905 | 128 | allocator); |
906 | | |
907 | 128 | rapidjson::Value files_value(rapidjson::kArrayType); |
908 | 128 | std::vector<std::string> files; |
909 | 128 | doris::TabletIndexPB index_pb; |
910 | 128 | index_pb.set_index_id(index_id); |
911 | 128 | index_pb.set_index_suffix_name(index_suffix); |
912 | 128 | TabletIndex index_meta; |
913 | 128 | index_meta.init_from_pb(index_pb); |
914 | | |
915 | 128 | auto status = process_files(index_meta, indices, index); |
916 | 128 | if (!status.ok()) { |
917 | 0 | return status; |
918 | 0 | } |
919 | 128 | } |
920 | 68 | segment.AddMember("indices", indices, allocator); |
921 | 68 | segments.PushBack(segment, allocator); |
922 | 68 | } else { |
923 | 6 | rapidjson::Value indices(rapidjson::kArrayType); |
924 | 18 | for (auto column : _rowset_meta->tablet_schema()->columns()) { |
925 | 18 | auto index_metas = _rowset_meta->tablet_schema()->inverted_indexs(*column); |
926 | 18 | for (const auto& index_meta : index_metas) { |
927 | 12 | rapidjson::Value index(rapidjson::kObjectType); |
928 | 12 | auto index_id = index_meta->index_id(); |
929 | 12 | auto index_suffix = index_meta->get_index_suffix(); |
930 | 12 | index.AddMember("index_id", rapidjson::Value(index_id).Move(), allocator); |
931 | 12 | index.AddMember("index_suffix", |
932 | 12 | rapidjson::Value(index_suffix.c_str(), allocator), allocator); |
933 | 12 | auto path = InvertedIndexDescriptor::get_index_file_path_v1( |
934 | 12 | index_file_path_prefix, index_id, index_suffix); |
935 | 12 | RETURN_IF_ERROR(add_file_info_to_json(path, index)); |
936 | 12 | RETURN_IF_ERROR(process_files(*index_meta, indices, index)); |
937 | 12 | } |
938 | 18 | } |
939 | 6 | segment.AddMember("indices", indices, allocator); |
940 | 6 | segments.PushBack(segment, allocator); |
941 | 6 | } |
942 | 74 | } |
943 | 146 | rowset_value->AddMember("segments", segments, allocator); |
944 | 146 | return Status::OK(); |
945 | 147 | } |
946 | | } // namespace doris |