/root/doris/be/src/olap/rowset/rowset.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "olap/rowset/rowset.h" |
19 | | |
20 | | #include <gen_cpp/olap_file.pb.h> |
21 | | |
22 | | #include "common/config.h" |
23 | | #include "io/cache/block_file_cache_factory.h" |
24 | | #include "olap/olap_define.h" |
25 | | #include "olap/rowset/segment_v2/inverted_index_desc.h" |
26 | | #include "olap/segment_loader.h" |
27 | | #include "olap/tablet_schema.h" |
28 | | #include "util/time.h" |
29 | | #include "util/trace.h" |
30 | | |
31 | | namespace doris { |
32 | | |
33 | | Rowset::Rowset(const TabletSchemaSPtr& schema, RowsetMetaSharedPtr rowset_meta, |
34 | | std::string tablet_path) |
35 | | : _rowset_meta(std::move(rowset_meta)), |
36 | | _tablet_path(std::move(tablet_path)), |
37 | 12.6k | _refs_by_reader(0) { |
38 | | #ifndef BE_TEST |
39 | | DCHECK(!is_local() || !_tablet_path.empty()); // local rowset MUST has tablet path |
40 | | #endif |
41 | | |
42 | 12.6k | _is_pending = true; |
43 | | |
44 | | // Generally speaking, as long as a rowset has a version, it can be considered not to be in a pending state. |
45 | | // However, if the rowset was created through ingesting binlogs, it will have a version but should still be |
46 | | // considered in a pending state because the ingesting txn has not yet been committed. |
47 | 12.6k | if (_rowset_meta->has_version() && _rowset_meta->start_version() > 0 && Branch (47:9): [True: 12.4k, False: 152]
Branch (47:40): [True: 11.8k, False: 588]
|
48 | 12.6k | _rowset_meta->rowset_state() != COMMITTED) { Branch (48:9): [True: 11.8k, False: 0]
|
49 | 11.8k | _is_pending = false; |
50 | 11.8k | } |
51 | | |
52 | 12.6k | if (_is_pending) { Branch (52:9): [True: 740, False: 11.8k]
|
53 | 740 | _is_cumulative = false; |
54 | 11.8k | } else { |
55 | 11.8k | Version version = _rowset_meta->version(); |
56 | 11.8k | _is_cumulative = version.first != version.second; |
57 | 11.8k | } |
58 | | // build schema from RowsetMeta.tablet_schema or Tablet.tablet_schema |
59 | 12.6k | _schema = _rowset_meta->tablet_schema() ? _rowset_meta->tablet_schema() : schema; Branch (59:15): [True: 1.31k, False: 11.3k]
|
60 | 12.6k | } |
61 | | |
62 | 1.22k | Status Rowset::load(bool use_cache) { |
63 | | // if the state is ROWSET_UNLOADING it means close() is called |
64 | | // and the rowset is already loaded, and the resource is not closed yet. |
65 | 1.22k | if (_rowset_state_machine.rowset_state() == ROWSET_LOADED) { Branch (65:9): [True: 741, False: 479]
|
66 | 741 | return Status::OK(); |
67 | 741 | } |
68 | 479 | { |
69 | | // before lock, if rowset state is ROWSET_UNLOADING, maybe it is doing do_close in release |
70 | 479 | std::lock_guard load_lock(_lock); |
71 | | // after lock, if rowset state is ROWSET_UNLOADING, it is ok to return |
72 | 479 | if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) { Branch (72:13): [True: 479, False: 0]
|
73 | | // first do load, then change the state |
74 | 479 | RETURN_IF_ERROR(do_load(use_cache)); |
75 | 479 | RETURN_IF_ERROR(_rowset_state_machine.on_load()); |
76 | 479 | } |
77 | 479 | } |
78 | | // load is done |
79 | 479 | VLOG_CRITICAL << "rowset is loaded. " << rowset_id() Line | Count | Source | 43 | 0 | #define VLOG_CRITICAL VLOG(1) |
|
80 | 0 | << ", rowset version:" << rowset_meta()->version() |
81 | 0 | << ", state from ROWSET_UNLOADED to ROWSET_LOADED. tabletid:" |
82 | 0 | << _rowset_meta->tablet_id(); |
83 | 479 | return Status::OK(); |
84 | 479 | } |
85 | | |
86 | 16 | void Rowset::make_visible(Version version) { |
87 | 16 | _is_pending = false; |
88 | 16 | _rowset_meta->set_version(version); |
89 | 16 | _rowset_meta->set_rowset_state(VISIBLE); |
90 | | // update create time to the visible time, |
91 | | // it's used to skip recently published version during compaction |
92 | 16 | _rowset_meta->set_creation_time(UnixSeconds()); |
93 | | |
94 | 16 | if (_rowset_meta->has_delete_predicate()) { Branch (94:9): [True: 0, False: 16]
|
95 | 0 | _rowset_meta->mutable_delete_predicate()->set_version(version.first); |
96 | 0 | } |
97 | 16 | } |
98 | | |
99 | 0 | void Rowset::set_version(Version version) { |
100 | 0 | _rowset_meta->set_version(version); |
101 | 0 | } |
102 | | |
103 | 0 | bool Rowset::check_rowset_segment() { |
104 | 0 | std::lock_guard load_lock(_lock); |
105 | 0 | return check_current_rowset_segment(); |
106 | 0 | } |
107 | | |
108 | 0 | std::string Rowset::get_rowset_info_str() { |
109 | 0 | std::string disk_size = PrettyPrinter::print( |
110 | 0 | static_cast<uint64_t>(_rowset_meta->total_disk_size()), TUnit::BYTES); |
111 | 0 | return fmt::format("[{}-{}] {} {} {} {} {}", start_version(), end_version(), num_segments(), |
112 | 0 | _rowset_meta->has_delete_predicate() ? "DELETE" : "DATA", Branch (112:24): [True: 0, False: 0]
|
113 | 0 | SegmentsOverlapPB_Name(_rowset_meta->segments_overlap()), |
114 | 0 | rowset_id().to_string(), disk_size); |
115 | 0 | } |
116 | | |
117 | 21.3k | void Rowset::clear_cache() { |
118 | 21.3k | { |
119 | 21.3k | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(std::chrono::seconds(1)); Line | Count | Source | 31 | 21.3k | SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING)) Line | Count | Source | 41 | 21.3k | using namespace std::chrono_literals; \ | 42 | 21.3k | auto VARNAME_LINENUM(scoped_simple_trace) = doris::MonotonicMicros(); \ | 43 | 21.3k | SCOPED_CLEANUP({ \ Line | Count | Source | 34 | 21.3k | auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body }); |
| 44 | 21.3k | auto VARNAME_LINENUM(timeout_us) = \ | 45 | 21.3k | std::chrono::duration_cast<std::chrono::microseconds>(timeout).count(); \ | 46 | 21.3k | auto VARNAME_LINENUM(cost_us) = \ | 47 | 21.3k | doris::MonotonicMicros() - VARNAME_LINENUM(scoped_simple_trace); \ | 48 | 21.3k | if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) { \ | 49 | 21.3k | stream << "Simple trace cost(us): " << VARNAME_LINENUM(cost_us); \ | 50 | 21.3k | } \ | 51 | 21.3k | }) |
|
|
120 | 21.3k | SegmentLoader::instance()->erase_segments(rowset_id(), num_segments()); |
121 | 21.3k | } |
122 | 21.3k | { |
123 | 21.3k | SCOPED_SIMPLE_TRACE_IF_TIMEOUT(std::chrono::seconds(1)); Line | Count | Source | 31 | 21.3k | SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING)) Line | Count | Source | 41 | 21.3k | using namespace std::chrono_literals; \ | 42 | 21.3k | auto VARNAME_LINENUM(scoped_simple_trace) = doris::MonotonicMicros(); \ | 43 | 21.3k | SCOPED_CLEANUP({ \ Line | Count | Source | 34 | 21.3k | auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body }); |
| 44 | 21.3k | auto VARNAME_LINENUM(timeout_us) = \ | 45 | 21.3k | std::chrono::duration_cast<std::chrono::microseconds>(timeout).count(); \ | 46 | 21.3k | auto VARNAME_LINENUM(cost_us) = \ | 47 | 21.3k | doris::MonotonicMicros() - VARNAME_LINENUM(scoped_simple_trace); \ | 48 | 21.3k | if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) { \ | 49 | 21.3k | stream << "Simple trace cost(us): " << VARNAME_LINENUM(cost_us); \ | 50 | 21.3k | } \ | 51 | 21.3k | }) |
|
|
124 | 21.3k | clear_inverted_index_cache(); |
125 | 21.3k | } |
126 | 21.3k | if (config::enable_file_cache) { Branch (126:9): [True: 0, False: 21.3k]
|
127 | 0 | for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { Branch (127:30): [True: 0, False: 0]
|
128 | 0 | auto file_key = segment_v2::Segment::file_cache_key(rowset_id().to_string(), seg_id); |
129 | 0 | auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); |
130 | 0 | file_cache->remove_if_cached_async(file_key); |
131 | 0 | } |
132 | | |
133 | | // inverted index |
134 | 0 | auto file_names = get_index_file_names(); |
135 | 0 | for (const auto& file_name : file_names) { Branch (135:36): [True: 0, False: 0]
|
136 | 0 | auto file_key = io::BlockFileCache::hash(file_name); |
137 | 0 | auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); |
138 | 0 | file_cache->remove_if_cached_async(file_key); |
139 | 0 | } |
140 | 0 | } |
141 | 21.3k | } |
142 | | |
143 | 8.39k | Result<std::string> Rowset::segment_path(int64_t seg_id) { |
144 | 8.39k | if (is_local()) { Branch (144:9): [True: 8.39k, False: 4]
|
145 | 8.39k | return local_segment_path(_tablet_path, _rowset_meta->rowset_id().to_string(), seg_id); |
146 | 8.39k | } |
147 | | |
148 | 4 | return _rowset_meta->remote_storage_resource().transform([=, this](auto&& storage_resource) { |
149 | 4 | return storage_resource->remote_segment_path(_rowset_meta->tablet_id(), |
150 | 4 | _rowset_meta->rowset_id().to_string(), seg_id); |
151 | 4 | }); |
152 | 8.39k | } |
153 | | |
154 | 1 | Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets) { |
155 | 1 | if (rowsets.size() < 2) { Branch (155:9): [True: 0, False: 1]
|
156 | 0 | return Status::OK(); |
157 | 0 | } |
158 | 1 | auto prev = rowsets.begin(); |
159 | 24 | for (auto it = rowsets.begin() + 1; it != rowsets.end(); ++it) { Branch (159:41): [True: 23, False: 1]
|
160 | 23 | if ((*prev)->end_version() + 1 != (*it)->start_version()) { Branch (160:13): [True: 0, False: 23]
|
161 | 0 | return Status::InternalError("versions are not continuity: prev={} cur={}", |
162 | 0 | (*prev)->version().to_string(), |
163 | 0 | (*it)->version().to_string()); |
164 | 0 | } |
165 | 23 | prev = it; |
166 | 23 | } |
167 | 1 | return Status::OK(); |
168 | 1 | } |
169 | | |
170 | 0 | void Rowset::merge_rowset_meta(const RowsetMeta& other) { |
171 | 0 | _rowset_meta->merge_rowset_meta(other); |
172 | | // rowset->meta_meta()->tablet_schema() maybe updated so make sure _schema is |
173 | | // consistent with rowset meta |
174 | 0 | _schema = _rowset_meta->tablet_schema(); |
175 | 0 | } |
176 | | |
177 | 2 | std::vector<std::string> Rowset::get_index_file_names() { |
178 | 2 | std::vector<std::string> file_names; |
179 | 2 | auto idx_version = _schema->get_inverted_index_storage_format(); |
180 | 6 | for (int64_t seg_id = 0; seg_id < num_segments(); ++seg_id) { Branch (180:30): [True: 4, False: 2]
|
181 | 4 | if (idx_version == InvertedIndexStorageFormatPB::V1) { Branch (181:13): [True: 2, False: 2]
|
182 | 4 | for (const auto& index : _schema->inverted_indexes()) { Branch (182:36): [True: 4, False: 2]
|
183 | 4 | auto file_name = segment_v2::InvertedIndexDescriptor::get_index_file_name_v1( |
184 | 4 | rowset_id().to_string(), seg_id, index->index_id(), |
185 | 4 | index->get_index_suffix()); |
186 | 4 | file_names.emplace_back(std::move(file_name)); |
187 | 4 | } |
188 | 2 | } else { |
189 | 2 | auto file_name = segment_v2::InvertedIndexDescriptor::get_index_file_name_v2( |
190 | 2 | rowset_id().to_string(), seg_id); |
191 | 2 | file_names.emplace_back(std::move(file_name)); |
192 | 2 | } |
193 | 4 | } |
194 | 2 | return file_names; |
195 | 2 | } |
196 | | |
197 | | } // namespace doris |