Coverage Report

Created: 2025-07-27 03:09

/root/doris/be/src/olap/rowset/rowset.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/rowset/rowset.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
22
#include "common/config.h"
23
#include "io/cache/block_file_cache_factory.h"
24
#include "olap/olap_define.h"
25
#include "olap/rowset/segment_v2/inverted_index_desc.h"
26
#include "olap/segment_loader.h"
27
#include "olap/tablet_schema.h"
28
#include "util/time.h"
29
#include "util/trace.h"
30
31
namespace doris {
32
33
// Constructs a Rowset from its metadata and tablet path.
//
// schema:      fallback tablet schema; used only when rowset_meta carries no
//              tablet schema of its own (see the last statement).
// rowset_meta: moved into _rowset_meta.
// tablet_path: moved into _tablet_path.
//
// Besides storing the arguments, the constructor derives _is_pending and
// _is_cumulative from the metadata and resets _refs_by_reader to 0.
//
// NOTE: the bare numbers and "Branch (...)" lines interleaved with the code
// are coverage-report annotations (source line number, hit count, branch
// outcomes); they are not part of the program text.
Rowset::Rowset(const TabletSchemaSPtr& schema, RowsetMetaSharedPtr rowset_meta,
34
               std::string tablet_path)
35
        : _rowset_meta(std::move(rowset_meta)),
36
          _tablet_path(std::move(tablet_path)),
37
12.6k
          _refs_by_reader(0) {
38
#ifndef BE_TEST
39
    DCHECK(!is_local() || !_tablet_path.empty()); // a local rowset MUST have a tablet path (check is compiled out under BE_TEST)
40
#endif
41
42
12.6k
    // Start from "pending" and clear it only when all three conditions below hold.
    _is_pending = true;
43
44
    // Generally speaking, as long as a rowset has a version, it can be considered not to be in a pending state.
45
    // However, if the rowset was created through ingesting binlogs, it will have a version but should still be
46
    // considered in a pending state because the ingesting txn has not yet been committed.
47
12.6k
    if (_rowset_meta->has_version() && _rowset_meta->start_version() > 0 &&
  Branch (47:9): [True: 12.4k, False: 152]
  Branch (47:40): [True: 11.8k, False: 588]
48
12.6k
        _rowset_meta->rowset_state() != COMMITTED) {
  Branch (48:9): [True: 11.8k, False: 0]
49
11.8k
        _is_pending = false;
50
11.8k
    }
51
52
12.6k
    // A pending rowset is never cumulative; otherwise it is cumulative exactly
    // when its version range spans more than one version (first != second).
    if (_is_pending) {
  Branch (52:9): [True: 740, False: 11.8k]
53
740
        _is_cumulative = false;
54
11.8k
    } else {
55
11.8k
        Version version = _rowset_meta->version();
56
11.8k
        _is_cumulative = version.first != version.second;
57
11.8k
    }
58
    // build schema from RowsetMeta.tablet_schema or Tablet.tablet_schema:
    // prefer the schema stored in the rowset meta, fall back to the `schema` argument.
59
12.6k
    _schema = _rowset_meta->tablet_schema() ? _rowset_meta->tablet_schema() : schema;
  Branch (59:15): [True: 1.31k, False: 11.3k]
60
12.6k
}
61
62
1.22k
// Loads the rowset if it is not loaded yet.
//
// use_cache: forwarded unchanged to do_load().
//
// Returns Status::OK() immediately when the state machine already reports
// ROWSET_LOADED (this first check is made without holding _lock). Otherwise
// _lock is taken and, if the state is ROWSET_UNLOADED, do_load() followed by
// _rowset_state_machine.on_load() is executed; both calls are wrapped in
// RETURN_IF_ERROR (presumably an early return of the failing Status — macro
// definition is not visible here). Any other state under the lock falls
// through to the log statement and returns OK without loading.
Status Rowset::load(bool use_cache) {
63
    // if the state is ROWSET_UNLOADING it means close() is called
64
    // and the rowset is already loaded, and the resource is not closed yet.
65
1.22k
    if (_rowset_state_machine.rowset_state() == ROWSET_LOADED) {
  Branch (65:9): [True: 741, False: 479]
66
741
        return Status::OK();
67
741
    }
68
479
    {
69
        // before lock, if rowset state is ROWSET_UNLOADING, maybe it is doing do_close in release
70
479
        std::lock_guard load_lock(_lock);
71
        // after lock, if rowset state is ROWSET_UNLOADING, it is ok to return
72
479
        if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) {
  Branch (72:13): [True: 479, False: 0]
73
            // first do load, then change the state
74
479
            RETURN_IF_ERROR(do_load(use_cache));
75
479
            RETURN_IF_ERROR(_rowset_state_machine.on_load());
76
479
        }
77
479
    }
78
    // load is done
    // NOTE(review): this message is emitted even when the branch above was
    // skipped (state was not ROWSET_UNLOADED), so the "from ROWSET_UNLOADED to
    // ROWSET_LOADED" wording may not describe what happened — confirm intent.
79
479
    VLOG_CRITICAL << "rowset is loaded. " << rowset_id()
Line
Count
Source
43
0
#define VLOG_CRITICAL VLOG(1)
80
0
                  << ", rowset version:" << rowset_meta()->version()
81
0
                  << ", state from ROWSET_UNLOADED to ROWSET_LOADED. tabletid:"
82
0
                  << _rowset_meta->tablet_id();
83
479
    return Status::OK();
84
479
}
85
86
16
// Marks this rowset as visible at `version`.
//
// Clears the pending flag, stores `version` in the rowset meta, sets the meta
// state to VISIBLE, and resets the creation time to the current
// UnixSeconds(). If the meta carries a delete predicate, the predicate's
// version is set to version.first.
void Rowset::make_visible(Version version) {
87
16
    _is_pending = false;
88
16
    _rowset_meta->set_version(version);
89
16
    _rowset_meta->set_rowset_state(VISIBLE);
90
    // update create time to the visible time,
91
    // it's used to skip recently published version during compaction
92
16
    _rowset_meta->set_creation_time(UnixSeconds());
93
94
16
    // Keep the delete predicate (if any) tagged with the start of the new version range.
    if (_rowset_meta->has_delete_predicate()) {
  Branch (94:9): [True: 0, False: 16]
95
0
        _rowset_meta->mutable_delete_predicate()->set_version(version.first);
96
0
    }
97
16
}
98
99
0
// Stores `version` in the rowset meta. Unlike make_visible(), this touches
// neither the pending flag, the rowset state, nor the creation time.
void Rowset::set_version(Version version) {
100
0
    _rowset_meta->set_version(version);
101
0
}
102
103
0
// Runs check_current_rowset_segment() while holding _lock (the same mutex
// load() takes) and returns its result unchanged.
bool Rowset::check_rowset_segment() {
104
0
    std::lock_guard load_lock(_lock);
105
0
    return check_current_rowset_segment();
106
0
}
107
108
0
// Builds a one-line, human-readable summary of this rowset.
//
// The fields, in order, are:
//   [start_version-end_version] num_segments DELETE|DATA overlap rowset_id disk_size
// where DELETE is printed when the meta has a delete predicate (DATA
// otherwise), `overlap` is the protobuf enum name of segments_overlap(), and
// `disk_size` is total_disk_size() rendered by PrettyPrinter with TUnit::BYTES.
std::string Rowset::get_rowset_info_str() {
109
0
    std::string disk_size = PrettyPrinter::print(
110
0
            static_cast<uint64_t>(_rowset_meta->total_disk_size()), TUnit::BYTES);
111
0
    return fmt::format("[{}-{}] {} {} {} {} {}", start_version(), end_version(), num_segments(),
112
0
                       _rowset_meta->has_delete_predicate() ? "DELETE" : "DATA",
  Branch (112:24): [True: 0, False: 0]
113
0
                       SegmentsOverlapPB_Name(_rowset_meta->segments_overlap()),
114
0
                       rowset_id().to_string(), disk_size);
115
0
}
116
117
21.3k
// Drops cached data that belongs to this rowset, in three steps:
//   1. erase this rowset's segments from SegmentLoader (keyed by rowset_id()
//      and num_segments());
//   2. call clear_inverted_index_cache();
//   3. only when config::enable_file_cache is set: request asynchronous
//      removal from the file cache for every segment file key and for every
//      inverted-index file returned by get_index_file_names().
//
// Steps 1 and 2 each run in their own scope guarded by
// SCOPED_SIMPLE_TRACE_IF_TIMEOUT(1s). Per the macro expansion shown inline
// below, that guard writes "Simple trace cost(us): <n>" to LOG(WARNING) when
// the scope took at least the given timeout.
//
// NOTE: the "Line / Count / Source" groups and the bare numbers below are
// coverage-report annotations and macro-expansion excerpts, not program text.
void Rowset::clear_cache() {
118
21.3k
    {
119
21.3k
        SCOPED_SIMPLE_TRACE_IF_TIMEOUT(std::chrono::seconds(1));
Line
Count
Source
31
21.3k
    SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING))
Line
Count
Source
41
21.3k
    using namespace std::chrono_literals;                                               \
42
21.3k
    auto VARNAME_LINENUM(scoped_simple_trace) = doris::MonotonicMicros();               \
43
21.3k
    SCOPED_CLEANUP({                                                                    \
Line
Count
Source
34
21.3k
    auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body });
44
21.3k
        auto VARNAME_LINENUM(timeout_us) =                                              \
45
21.3k
                std::chrono::duration_cast<std::chrono::microseconds>(timeout).count(); \
46
21.3k
        auto VARNAME_LINENUM(cost_us) =                                                 \
47
21.3k
                doris::MonotonicMicros() - VARNAME_LINENUM(scoped_simple_trace);        \
48
21.3k
        if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) {                  \
49
21.3k
            stream << "Simple trace cost(us): " << VARNAME_LINENUM(cost_us);            \
50
21.3k
        }                                                                               \
51
21.3k
    })
120
21.3k
        SegmentLoader::instance()->erase_segments(rowset_id(), num_segments());
121
21.3k
    }
122
21.3k
    {
123
21.3k
        SCOPED_SIMPLE_TRACE_IF_TIMEOUT(std::chrono::seconds(1));
Line
Count
Source
31
21.3k
    SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING))
Line
Count
Source
41
21.3k
    using namespace std::chrono_literals;                                               \
42
21.3k
    auto VARNAME_LINENUM(scoped_simple_trace) = doris::MonotonicMicros();               \
43
21.3k
    SCOPED_CLEANUP({                                                                    \
Line
Count
Source
34
21.3k
    auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body });
44
21.3k
        auto VARNAME_LINENUM(timeout_us) =                                              \
45
21.3k
                std::chrono::duration_cast<std::chrono::microseconds>(timeout).count(); \
46
21.3k
        auto VARNAME_LINENUM(cost_us) =                                                 \
47
21.3k
                doris::MonotonicMicros() - VARNAME_LINENUM(scoped_simple_trace);        \
48
21.3k
        if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) {                  \
49
21.3k
            stream << "Simple trace cost(us): " << VARNAME_LINENUM(cost_us);            \
50
21.3k
        }                                                                               \
51
21.3k
    })
124
21.3k
        clear_inverted_index_cache();
125
21.3k
    }
126
21.3k
    // Step 3: file-cache eviction, skipped entirely when the file cache is disabled.
    if (config::enable_file_cache) {
  Branch (126:9): [True: 0, False: 21.3k]
127
0
        // Segment data files: one cache key per segment id.
        for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
  Branch (127:30): [True: 0, False: 0]
128
0
            auto file_key = segment_v2::Segment::file_cache_key(rowset_id().to_string(), seg_id);
129
0
            // NOTE(review): the pointer returned by get_by_path() is dereferenced
            // without a null check — confirm it can never be nullptr.
            auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
130
0
            file_cache->remove_if_cached_async(file_key);
131
0
        }
132
133
        // inverted index files: the cache key is the hash of each index file name
134
0
        auto file_names = get_index_file_names();
135
0
        for (const auto& file_name : file_names) {
  Branch (135:36): [True: 0, False: 0]
136
0
            auto file_key = io::BlockFileCache::hash(file_name);
137
0
            auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
138
0
            file_cache->remove_if_cached_async(file_key);
139
0
        }
140
0
    }
141
21.3k
}
142
143
8.39k
// Returns the path of segment `seg_id` of this rowset.
//
// For a local rowset (is_local()), the path is built by local_segment_path()
// from _tablet_path, the rowset id string and seg_id. Otherwise the rowset
// meta's remote_storage_resource() result is mapped through transform() to
// the storage resource's remote_segment_path() for (tablet_id, rowset id
// string, seg_id). Presumably a failed remote_storage_resource() lookup is
// passed through as the error of the returned Result — TODO confirm against
// the Result/transform implementation.
Result<std::string> Rowset::segment_path(int64_t seg_id) {
144
8.39k
    if (is_local()) {
  Branch (144:9): [True: 8.39k, False: 4]
145
8.39k
        return local_segment_path(_tablet_path, _rowset_meta->rowset_id().to_string(), seg_id);
146
8.39k
    }
147
148
4
    // The lambda copies seg_id and captures `this`; it is invoked by transform() within this call.
    return _rowset_meta->remote_storage_resource().transform([=, this](auto&& storage_resource) {
149
4
        return storage_resource->remote_segment_path(_rowset_meta->tablet_id(),
150
4
                                                     _rowset_meta->rowset_id().to_string(), seg_id);
151
4
    });
152
8.39k
}
153
154
1
// Verifies that consecutive rowsets cover a gap-free version range.
//
// rowsets: checked in the order given; this function does not sort them.
//
// Returns Status::OK() when there are fewer than two rowsets, or when every
// adjacent pair satisfies prev.end_version() + 1 == cur.start_version().
// Returns Status::InternalError naming the first offending pair otherwise.
Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets) {
155
1
    if (rowsets.size() < 2) {
  Branch (155:9): [True: 0, False: 1]
156
0
        return Status::OK();
157
0
    }
158
1
    // `prev` trails `it` by one element throughout the loop.
    auto prev = rowsets.begin();
159
24
    for (auto it = rowsets.begin() + 1; it != rowsets.end(); ++it) {
  Branch (159:41): [True: 23, False: 1]
160
23
        if ((*prev)->end_version() + 1 != (*it)->start_version()) {
  Branch (160:13): [True: 0, False: 23]
161
0
            return Status::InternalError("versions are not continuity: prev={} cur={}",
162
0
                                         (*prev)->version().to_string(),
163
0
                                         (*it)->version().to_string());
164
0
        }
165
23
        prev = it;
166
23
    }
167
1
    return Status::OK();
168
1
}
169
170
0
// Merges `other` into this rowset's meta and then refreshes the cached
// _schema from the merged meta.
void Rowset::merge_rowset_meta(const RowsetMeta& other) {
171
0
    _rowset_meta->merge_rowset_meta(other);
172
    // The rowset meta's tablet_schema() may have been updated by the merge, so
173
    // re-read it here to keep _schema consistent with the rowset meta.
    // NOTE(review): unlike the constructor there is no fallback schema here —
    // confirm tablet_schema() is always set after a merge.
174
0
    _schema = _rowset_meta->tablet_schema();
175
0
}
176
177
2
// Returns the inverted-index file names for every segment of this rowset.
//
// The naming depends on the schema's inverted index storage format:
//   - V1: one file per (segment, inverted index) pair, built by
//     get_index_file_name_v1() from the rowset id, segment id, index id and
//     index suffix;
//   - any other format: one file per segment, built by
//     get_index_file_name_v2() from the rowset id and segment id.
// Names are appended segment by segment in ascending seg_id order.
std::vector<std::string> Rowset::get_index_file_names() {
178
2
    std::vector<std::string> file_names;
179
2
    auto idx_version = _schema->get_inverted_index_storage_format();
180
6
    for (int64_t seg_id = 0; seg_id < num_segments(); ++seg_id) {
  Branch (180:30): [True: 4, False: 2]
181
4
        if (idx_version == InvertedIndexStorageFormatPB::V1) {
  Branch (181:13): [True: 2, False: 2]
182
4
            // V1: each inverted index of the schema has its own file per segment.
            for (const auto& index : _schema->inverted_indexes()) {
  Branch (182:36): [True: 4, False: 2]
183
4
                auto file_name = segment_v2::InvertedIndexDescriptor::get_index_file_name_v1(
184
4
                        rowset_id().to_string(), seg_id, index->index_id(),
185
4
                        index->get_index_suffix());
186
4
                file_names.emplace_back(std::move(file_name));
187
4
            }
188
2
        } else {
189
2
            // Non-V1: a single index file per segment.
            auto file_name = segment_v2::InvertedIndexDescriptor::get_index_file_name_v2(
190
2
                    rowset_id().to_string(), seg_id);
191
2
            file_names.emplace_back(std::move(file_name));
192
2
        }
193
4
    }
194
2
    return file_names;
195
2
}
196
197
} // namespace doris