be/src/storage/rowset/rowset.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/rowset/rowset.h" |
19 | | |
20 | | #include <gen_cpp/olap_file.pb.h> |
21 | | |
22 | | #include "common/cast_set.h" |
23 | | #include "common/config.h" |
24 | | #include "io/cache/block_file_cache_factory.h" |
25 | | #include "storage/index/inverted/inverted_index_desc.h" |
26 | | #include "storage/olap_define.h" |
27 | | #include "storage/segment/segment_loader.h" |
28 | | #include "storage/tablet/tablet_schema.h" |
29 | | #include "util/time.h" |
30 | | #include "util/trace.h" |
31 | | |
32 | | namespace doris { |
33 | | |
34 | | #include "common/compile_check_begin.h" |
35 | | |
36 | | Rowset::Rowset(const TabletSchemaSPtr& schema, RowsetMetaSharedPtr rowset_meta, |
37 | | std::string tablet_path) |
38 | 1.04M | : _rowset_meta(std::move(rowset_meta)), |
39 | 1.04M | _tablet_path(std::move(tablet_path)), |
40 | 1.04M | _refs_by_reader(0) { |
41 | 1.04M | #ifndef BE_TEST |
42 | 1.04M | DCHECK(!is_local() || !_tablet_path.empty()); // local rowset MUST has tablet path |
43 | 1.04M | #endif |
44 | | |
45 | 1.04M | _is_pending = true; |
46 | | |
47 | | // Generally speaking, as long as a rowset has a version, it can be considered not to be in a pending state. |
48 | | // However, if the rowset was created through ingesting binlogs, it will have a version but should still be |
49 | | // considered in a pending state because the ingesting txn has not yet been committed. |
50 | 1.04M | if (_rowset_meta->has_version() && _rowset_meta->start_version() > 0 && |
51 | 1.04M | _rowset_meta->rowset_state() != COMMITTED) { |
52 | 377k | _is_pending = false; |
53 | 377k | } |
54 | | |
55 | 1.04M | if (_is_pending) { |
56 | 672k | _is_cumulative = false; |
57 | 672k | } else { |
58 | 377k | Version version = _rowset_meta->version(); |
59 | 377k | _is_cumulative = version.first != version.second; |
60 | 377k | } |
61 | | // build schema from RowsetMeta.tablet_schema or Tablet.tablet_schema |
62 | 1.04M | _schema = _rowset_meta->tablet_schema() ? _rowset_meta->tablet_schema() : schema; |
63 | 1.04M | } |
64 | | |
65 | 5.36M | Status Rowset::load(bool use_cache) { |
66 | | // if the state is ROWSET_UNLOADING it means close() is called |
67 | | // and the rowset is already loaded, and the resource is not closed yet. |
68 | 5.36M | if (_rowset_state_machine.rowset_state() == ROWSET_LOADED) { |
69 | 5.08M | return Status::OK(); |
70 | 5.08M | } |
71 | 278k | { |
72 | | // before lock, if rowset state is ROWSET_UNLOADING, maybe it is doing do_close in release |
73 | 278k | std::lock_guard load_lock(_lock); |
74 | | // after lock, if rowset state is ROWSET_UNLOADING, it is ok to return |
75 | 283k | if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) { |
76 | 283k | RETURN_IF_ERROR(_rowset_state_machine.on_load()); |
77 | 283k | } |
78 | 278k | } |
79 | | // load is done |
80 | 18.4E | VLOG_CRITICAL << "rowset is loaded. " << rowset_id() |
81 | 18.4E | << ", rowset version:" << rowset_meta()->version() |
82 | 18.4E | << ", state from ROWSET_UNLOADED to ROWSET_LOADED. tabletid:" |
83 | 18.4E | << _rowset_meta->tablet_id(); |
84 | 278k | return Status::OK(); |
85 | 278k | } |
86 | | |
87 | 105 | void Rowset::make_visible(Version version, int64_t commit_tso) { |
88 | 105 | _is_pending = false; |
89 | 105 | _rowset_meta->set_version(version); |
90 | 105 | _rowset_meta->set_rowset_state(VISIBLE); |
91 | | // update create time to the visible time, |
92 | | // it's used to skip recently published version during compaction |
93 | 105 | _rowset_meta->set_creation_time(UnixSeconds()); |
94 | | |
95 | 105 | if (_rowset_meta->has_delete_predicate()) { |
96 | 0 | _rowset_meta->mutable_delete_predicate()->set_version(cast_set<int32_t>(version.first)); |
97 | 0 | } |
98 | 105 | _rowset_meta->set_commit_tso(commit_tso); |
99 | 105 | } |
100 | | |
101 | 20.7k | void Rowset::set_version(Version version) { |
102 | 20.7k | _rowset_meta->set_version(version); |
103 | 20.7k | } |
104 | | |
105 | 0 | bool Rowset::check_rowset_segment() { |
106 | 0 | std::lock_guard load_lock(_lock); |
107 | 0 | return check_current_rowset_segment(); |
108 | 0 | } |
109 | | |
110 | 25.1k | std::string Rowset::get_rowset_info_str() { |
111 | 25.1k | std::string disk_size = PrettyPrinter::print( |
112 | 25.1k | static_cast<uint64_t>(_rowset_meta->total_disk_size()), TUnit::BYTES); |
113 | 25.1k | return fmt::format("[{}-{}] {} {} {} {} {}", start_version(), end_version(), num_segments(), |
114 | 25.1k | _rowset_meta->has_delete_predicate() ? "DELETE" : "DATA", |
115 | 25.1k | SegmentsOverlapPB_Name(_rowset_meta->segments_overlap()), |
116 | 25.1k | rowset_id().to_string(), disk_size); |
117 | 25.1k | } |
118 | | |
119 | 1.04M | const TabletSchemaSPtr& Rowset::tablet_schema() const { |
120 | | #ifdef BE_TEST |
121 | | // for mocking tablet schema |
122 | | return _schema; |
123 | | #endif |
124 | 1.04M | return _rowset_meta->tablet_schema() ? _rowset_meta->tablet_schema() : _schema; |
125 | 1.04M | } |
126 | | |
127 | 271k | void Rowset::clear_cache() { |
128 | 271k | { |
129 | 271k | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(std::chrono::seconds(1)); |
130 | 271k | SegmentLoader::instance()->erase_segments(rowset_id(), num_segments()); |
131 | 271k | } |
132 | 271k | { |
133 | 271k | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(std::chrono::seconds(1)); |
134 | 271k | clear_inverted_index_cache(); |
135 | 271k | } |
136 | 271k | if (config::enable_file_cache) { |
137 | 150k | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
138 | 42.4k | auto file_key = segment_v2::Segment::file_cache_key(rowset_id().to_string(), seg_id); |
139 | 42.4k | auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); |
140 | 42.4k | file_cache->remove_if_cached_async(file_key); |
141 | 42.4k | } |
142 | | |
143 | | // inverted index |
144 | 107k | auto file_names = get_index_file_names(); |
145 | 107k | for (const auto& file_name : file_names) { |
146 | 41.0k | auto file_key = io::BlockFileCache::hash(file_name); |
147 | 41.0k | auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); |
148 | 41.0k | file_cache->remove_if_cached_async(file_key); |
149 | 41.0k | } |
150 | 107k | } |
151 | 271k | } |
152 | | |
153 | 225k | Result<std::string> Rowset::segment_path(int64_t seg_id) { |
154 | 225k | if (is_local()) { |
155 | 32.0k | return local_segment_path(_tablet_path, _rowset_meta->rowset_id().to_string(), seg_id); |
156 | 32.0k | } |
157 | | |
158 | 193k | return _rowset_meta->remote_storage_resource().transform([=, this](auto&& storage_resource) { |
159 | 192k | return storage_resource->remote_segment_path(_rowset_meta->tablet_id(), |
160 | 192k | _rowset_meta->rowset_id().to_string(), seg_id); |
161 | 192k | }); |
162 | 225k | } |
163 | | |
164 | 133k | Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets) { |
165 | 133k | if (rowsets.size() < 2) { |
166 | 30.2k | return Status::OK(); |
167 | 30.2k | } |
168 | 103k | auto prev = rowsets.begin(); |
169 | 410k | for (auto it = rowsets.begin() + 1; it != rowsets.end(); ++it) { |
170 | 307k | if ((*prev)->end_version() + 1 != (*it)->start_version()) { |
171 | 0 | return Status::InternalError("versions are not continuity: prev={} cur={}", |
172 | 0 | (*prev)->version().to_string(), |
173 | 0 | (*it)->version().to_string()); |
174 | 0 | } |
175 | 307k | prev = it; |
176 | 307k | } |
177 | 103k | return Status::OK(); |
178 | 103k | } |
179 | | |
180 | 1.46k | void Rowset::merge_rowset_meta(const RowsetMeta& other) { |
181 | 1.46k | _rowset_meta->merge_rowset_meta(other); |
182 | | // rowset->meta_meta()->tablet_schema() maybe updated so make sure _schema is |
183 | | // consistent with rowset meta |
184 | 1.46k | _schema = _rowset_meta->tablet_schema(); |
185 | 1.46k | } |
186 | | |
187 | 221k | std::vector<std::string> Rowset::get_index_file_names() { |
188 | 221k | std::vector<std::string> file_names; |
189 | 221k | auto idx_version = _schema->get_inverted_index_storage_format(); |
190 | 309k | for (int64_t seg_id = 0; seg_id < num_segments(); ++seg_id) { |
191 | 87.7k | if (idx_version == InvertedIndexStorageFormatPB::V1) { |
192 | 3.34k | for (const auto& index : _schema->inverted_indexes()) { |
193 | 471 | auto file_name = segment_v2::InvertedIndexDescriptor::get_index_file_name_v1( |
194 | 471 | rowset_id().to_string(), seg_id, index->index_id(), |
195 | 471 | index->get_index_suffix()); |
196 | 471 | file_names.emplace_back(std::move(file_name)); |
197 | 471 | } |
198 | 84.3k | } else { |
199 | 84.3k | auto file_name = segment_v2::InvertedIndexDescriptor::get_index_file_name_v2( |
200 | 84.3k | rowset_id().to_string(), seg_id); |
201 | 84.3k | file_names.emplace_back(std::move(file_name)); |
202 | 84.3k | } |
203 | 87.7k | } |
204 | 221k | return file_names; |
205 | 221k | } |
206 | | |
207 | 10.4k | int64_t Rowset::approximate_cached_data_size() { |
208 | 10.4k | if (!config::enable_file_cache) { |
209 | 0 | return 0; |
210 | 0 | } |
211 | | |
212 | 10.4k | int64_t total_cache_size = 0; |
213 | 14.2k | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { |
214 | 3.77k | auto cache_key = segment_v2::Segment::file_cache_key(rowset_id().to_string(), seg_id); |
215 | 3.77k | int64_t cache_size = |
216 | 3.77k | io::FileCacheFactory::instance()->get_cache_file_size_by_path(cache_key); |
217 | 3.77k | total_cache_size += cache_size; |
218 | 3.77k | } |
219 | 10.4k | return total_cache_size; |
220 | 10.4k | } |
221 | | |
222 | 10.4k | int64_t Rowset::approximate_cache_index_size() { |
223 | 10.4k | if (!config::enable_file_cache) { |
224 | 0 | return 0; |
225 | 0 | } |
226 | | |
227 | 10.4k | int64_t total_cache_size = 0; |
228 | 10.4k | auto file_names = get_index_file_names(); |
229 | 10.4k | for (const auto& file_name : file_names) { |
230 | 3.76k | auto cache_key = io::BlockFileCache::hash(file_name); |
231 | 3.76k | int64_t cache_size = |
232 | 3.76k | io::FileCacheFactory::instance()->get_cache_file_size_by_path(cache_key); |
233 | 3.76k | total_cache_size += cache_size; |
234 | 3.76k | } |
235 | 10.4k | return total_cache_size; |
236 | 10.4k | } |
237 | | |
238 | 356 | std::chrono::time_point<std::chrono::system_clock> Rowset::visible_timestamp() const { |
239 | 356 | return _rowset_meta->visible_timestamp(); |
240 | 356 | } |
241 | | |
242 | | #include "common/compile_check_end.h" |
243 | | |
244 | | } // namespace doris |