Coverage Report

Created: 2026-03-14 13:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/rowset.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/rowset.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
22
#include "common/cast_set.h"
23
#include "common/config.h"
24
#include "io/cache/block_file_cache_factory.h"
25
#include "storage/index/inverted/inverted_index_desc.h"
26
#include "storage/olap_define.h"
27
#include "storage/segment/segment_loader.h"
28
#include "storage/tablet/tablet_schema.h"
29
#include "util/time.h"
30
#include "util/trace.h"
31
32
namespace doris {
33
34
#include "common/compile_check_begin.h"
35
36
// Builds a rowset view over `rowset_meta`.
//   schema      - tablet schema used when the rowset meta carries no schema of its own.
//   rowset_meta - shared meta describing this rowset (moved into the object).
//   tablet_path - tablet directory; required for local rowsets (moved into the object).
Rowset::Rowset(const TabletSchemaSPtr& schema, RowsetMetaSharedPtr rowset_meta,
               std::string tablet_path)
        : _rowset_meta(std::move(rowset_meta)),
          _tablet_path(std::move(tablet_path)),
          _refs_by_reader(0) {
#ifndef BE_TEST
    // A local rowset must be constructed with the path of its tablet directory.
    DCHECK(!is_local() || !_tablet_path.empty());
#endif

    // A rowset leaves the pending state only when its meta has a version, that version
    // starts above 0, and the rowset is not in the COMMITTED state. A version alone is
    // not enough: rowsets created by ingesting binlogs already carry a version while the
    // ingesting txn is still uncommitted, and they must remain pending.
    const bool has_published_version = _rowset_meta->has_version() &&
                                       _rowset_meta->start_version() > 0 &&
                                       _rowset_meta->rowset_state() != COMMITTED;
    _is_pending = !has_published_version;

    // Only a non-pending rowset whose version spans more than one number is cumulative.
    _is_cumulative = false;
    if (!_is_pending) {
        const Version version = _rowset_meta->version();
        _is_cumulative = version.first != version.second;
    }

    // Prefer the schema stored in the rowset meta; otherwise use the caller's tablet schema.
    const auto& meta_schema = _rowset_meta->tablet_schema();
    _schema = meta_schema ? meta_schema : schema;
}
64
65
5.00M
// Moves the rowset from ROWSET_UNLOADED to ROWSET_LOADED via the state machine.
//   use_cache - not referenced by this implementation.
// Returns OK when the rowset is (or already was) loaded, or when the state is
// ROWSET_UNLOADING. Returns the error from on_load() if the transition fails.
Status Rowset::load(bool use_cache) {
    // Fast path, checked without holding _lock: nothing to do once loaded.
    // A ROWSET_UNLOADING state means close() was called on a loaded rowset whose
    // resources are not released yet; that case is resolved under the lock below.
    if (_rowset_state_machine.rowset_state() == ROWSET_LOADED) {
        return Status::OK();
    }

    bool loaded_by_this_call = false;
    {
        // Before taking the lock the state may be ROWSET_UNLOADING (do_close may be running
        // in release()), so re-check under _lock; concurrent load() calls therefore do not
        // both run on_load().
        std::lock_guard load_lock(_lock);
        // Under the lock, only ROWSET_UNLOADED needs work; ROWSET_UNLOADING is fine to return.
        if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) {
            RETURN_IF_ERROR(_rowset_state_machine.on_load());
            loaded_by_this_call = true;
        }
    }

    // Report the UNLOADED -> LOADED transition only when this call performed it, so
    // callers that lost the race (or saw ROWSET_UNLOADING) do not log a transition
    // they did not make.
    if (loaded_by_this_call) {
        VLOG_CRITICAL << "rowset is loaded. " << rowset_id()
                      << ", rowset version:" << rowset_meta()->version()
                      << ", state from ROWSET_UNLOADED to ROWSET_LOADED. tabletid:"
                      << _rowset_meta->tablet_id();
    }
    return Status::OK();
}
86
87
140
// Publishes this rowset at `version`: clears the pending flag, records the version,
// marks the meta VISIBLE and re-stamps its creation time with "now".
void Rowset::make_visible(Version version) {
    _is_pending = false;

    auto& rs_meta = *_rowset_meta;
    rs_meta.set_version(version);
    rs_meta.set_rowset_state(VISIBLE);
    // The creation time is moved to the visible time; compaction uses it to skip
    // recently published versions.
    rs_meta.set_creation_time(UnixSeconds());

    if (rs_meta.has_delete_predicate()) {
        // Tag the delete predicate with the start of the published version range.
        rs_meta.mutable_delete_predicate()->set_version(cast_set<int32_t>(version.first));
    }
}
99
100
53.3k
// Overwrites the version stored in the rowset meta. Unlike make_visible(), this does not
// touch the pending flag, the rowset state or the creation time.
void Rowset::set_version(Version version) {
    auto& rs_meta = *_rowset_meta;
    rs_meta.set_version(version);
}
103
104
0
bool Rowset::check_rowset_segment() {
105
0
    std::lock_guard load_lock(_lock);
106
0
    return check_current_rowset_segment();
107
0
}
108
109
23.0k
std::string Rowset::get_rowset_info_str() {
110
23.0k
    std::string disk_size = PrettyPrinter::print(
111
23.0k
            static_cast<uint64_t>(_rowset_meta->total_disk_size()), TUnit::BYTES);
112
23.0k
    return fmt::format("[{}-{}] {} {} {} {} {}", start_version(), end_version(), num_segments(),
113
23.0k
                       _rowset_meta->has_delete_predicate() ? "DELETE" : "DATA",
114
23.0k
                       SegmentsOverlapPB_Name(_rowset_meta->segments_overlap()),
115
23.0k
                       rowset_id().to_string(), disk_size);
116
23.0k
}
117
118
980k
// Returns the schema for this rowset: the one recorded in the rowset meta when present,
// otherwise the schema captured at construction/merge time. In BE_TEST builds _schema is
// always returned so tests can inject a mock schema.
const TabletSchemaSPtr& Rowset::tablet_schema() const {
#ifdef BE_TEST
    return _schema;
#else
    if (const auto& meta_schema = _rowset_meta->tablet_schema(); meta_schema) {
        return meta_schema;
    }
    return _schema;
#endif
}
125
126
177k
void Rowset::clear_cache() {
127
177k
    {
128
177k
        SCOPED_SIMPLE_TRACE_IF_TIMEOUT(std::chrono::seconds(1));
129
177k
        SegmentLoader::instance()->erase_segments(rowset_id(), num_segments());
130
177k
    }
131
177k
    {
132
177k
        SCOPED_SIMPLE_TRACE_IF_TIMEOUT(std::chrono::seconds(1));
133
177k
        clear_inverted_index_cache();
134
177k
    }
135
177k
    if (config::enable_file_cache) {
136
197k
        for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
137
49.9k
            auto file_key = segment_v2::Segment::file_cache_key(rowset_id().to_string(), seg_id);
138
49.9k
            auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
139
49.9k
            file_cache->remove_if_cached_async(file_key);
140
49.9k
        }
141
142
        // inverted index
143
147k
        auto file_names = get_index_file_names();
144
147k
        for (const auto& file_name : file_names) {
145
48.3k
            auto file_key = io::BlockFileCache::hash(file_name);
146
48.3k
            auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
147
48.3k
            file_cache->remove_if_cached_async(file_key);
148
48.3k
        }
149
147k
    }
150
177k
}
151
152
1.83M
// Returns the path of segment `seg_id`. For a remote rowset it is built by the meta's
// remote storage resource; an error from remote_storage_resource() passes through
// transform() unchanged. A local rowset is resolved under _tablet_path.
Result<std::string> Rowset::segment_path(int64_t seg_id) {
    if (!is_local()) {
        return _rowset_meta->remote_storage_resource().transform(
                [this, seg_id](auto&& storage_resource) {
                    return storage_resource->remote_segment_path(
                            _rowset_meta->tablet_id(), _rowset_meta->rowset_id().to_string(),
                            seg_id);
                });
    }
    return local_segment_path(_tablet_path, _rowset_meta->rowset_id().to_string(), seg_id);
}
162
163
115k
// Verifies that consecutive rowsets form a gap-free, non-overlapping version chain:
// each rowset must start exactly one past the previous rowset's end version.
// Fewer than two rowsets is trivially continuous. Returns InternalError naming the
// first offending pair.
Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets) {
    for (size_t idx = 1; idx < rowsets.size(); ++idx) {
        const auto& before = rowsets[idx - 1];
        const auto& current = rowsets[idx];
        if (before->end_version() + 1 != current->start_version()) {
            return Status::InternalError("versions are not continuity: prev={} cur={}",
                                         before->version().to_string(),
                                         current->version().to_string());
        }
    }
    return Status::OK();
}
178
179
3.33k
void Rowset::merge_rowset_meta(const RowsetMeta& other) {
180
3.33k
    _rowset_meta->merge_rowset_meta(other);
181
    // rowset->meta_meta()->tablet_schema() maybe updated so make sure _schema is
182
    // consistent with rowset meta
183
3.33k
    _schema = _rowset_meta->tablet_schema();
184
3.33k
}
185
186
301k
std::vector<std::string> Rowset::get_index_file_names() {
187
301k
    std::vector<std::string> file_names;
188
301k
    auto idx_version = _schema->get_inverted_index_storage_format();
189
403k
    for (int64_t seg_id = 0; seg_id < num_segments(); ++seg_id) {
190
102k
        if (idx_version == InvertedIndexStorageFormatPB::V1) {
191
3.49k
            for (const auto& index : _schema->inverted_indexes()) {
192
425
                auto file_name = segment_v2::InvertedIndexDescriptor::get_index_file_name_v1(
193
425
                        rowset_id().to_string(), seg_id, index->index_id(),
194
425
                        index->get_index_suffix());
195
425
                file_names.emplace_back(std::move(file_name));
196
425
            }
197
99.0k
        } else {
198
99.0k
            auto file_name = segment_v2::InvertedIndexDescriptor::get_index_file_name_v2(
199
99.0k
                    rowset_id().to_string(), seg_id);
200
99.0k
            file_names.emplace_back(std::move(file_name));
201
99.0k
        }
202
102k
    }
203
301k
    return file_names;
204
301k
}
205
206
9.71k
int64_t Rowset::approximate_cached_data_size() {
207
9.71k
    if (!config::enable_file_cache) {
208
0
        return 0;
209
0
    }
210
211
9.71k
    int64_t total_cache_size = 0;
212
13.4k
    for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
213
3.72k
        auto cache_key = segment_v2::Segment::file_cache_key(rowset_id().to_string(), seg_id);
214
3.72k
        int64_t cache_size =
215
3.72k
                io::FileCacheFactory::instance()->get_cache_file_size_by_path(cache_key);
216
3.72k
        total_cache_size += cache_size;
217
3.72k
    }
218
9.71k
    return total_cache_size;
219
9.71k
}
220
221
9.70k
int64_t Rowset::approximate_cache_index_size() {
222
9.70k
    if (!config::enable_file_cache) {
223
0
        return 0;
224
0
    }
225
226
9.70k
    int64_t total_cache_size = 0;
227
9.70k
    auto file_names = get_index_file_names();
228
9.70k
    for (const auto& file_name : file_names) {
229
3.70k
        auto cache_key = io::BlockFileCache::hash(file_name);
230
3.70k
        int64_t cache_size =
231
3.70k
                io::FileCacheFactory::instance()->get_cache_file_size_by_path(cache_key);
232
3.70k
        total_cache_size += cache_size;
233
3.70k
    }
234
9.70k
    return total_cache_size;
235
9.70k
}
236
237
260
std::chrono::time_point<std::chrono::system_clock> Rowset::visible_timestamp() const {
238
260
    return _rowset_meta->visible_timestamp();
239
260
}
240
241
#include "common/compile_check_end.h"
242
243
} // namespace doris