Coverage Report

Created: 2026-03-15 17:00

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/rowset_meta.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/rowset_meta.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <glog/logging.h>
22
23
#include <memory>
24
#include <random>
25
26
#include "cloud/cloud_storage_engine.h"
27
#include "common/logging.h"
28
#include "common/status.h"
29
#include "cpp/sync_point.h"
30
#include "exec/common/variant_util.h"
31
#include "google/protobuf/util/message_differencer.h"
32
#include "io/fs/encrypted_fs_factory.h"
33
#include "io/fs/file_system.h"
34
#include "io/fs/file_writer.h"
35
#include "io/fs/local_file_system.h"
36
#include "io/fs/packed_file_manager.h"
37
#include "io/fs/packed_file_system.h"
38
#include "json2pb/json_to_pb.h"
39
#include "json2pb/pb_to_json.h"
40
#include "runtime/exec_env.h"
41
#include "storage/olap_common.h"
42
#include "storage/storage_policy.h"
43
#include "storage/tablet/base_tablet.h"
44
#include "storage/tablet/tablet_fwd.h"
45
#include "storage/tablet/tablet_schema.h"
46
#include "storage/tablet/tablet_schema_cache.h"
47
#include "util/lru_cache.h"
48
49
namespace doris {
50
51
#include "common/compile_check_begin.h"
52
53
754k
// Releases the TabletSchemaCache reference held in `_handle` (if one was acquired by
// set_tablet_schema()). Nothing else needs explicit cleanup.
RowsetMeta::~RowsetMeta() {
    if (!_handle) {
        return;
    }
    TabletSchemaCache::instance()->release(_handle);
}
58
59
38.9k
bool RowsetMeta::init(std::string_view pb_rowset_meta) {
60
38.9k
    bool ret = _deserialize_from_pb(pb_rowset_meta);
61
38.9k
    if (!ret) {
62
1
        return false;
63
1
    }
64
38.9k
    _init();
65
38.9k
    return true;
66
38.9k
}
67
68
21.3k
bool RowsetMeta::init(const RowsetMeta* rowset_meta) {
69
21.3k
    RowsetMetaPB rowset_meta_pb;
70
21.3k
    rowset_meta->to_rowset_pb(&rowset_meta_pb);
71
21.3k
    return init_from_pb(rowset_meta_pb);
72
21.3k
}
73
74
873k
bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
75
873k
    if (rowset_meta_pb.has_tablet_schema()) {
76
858k
        set_tablet_schema(rowset_meta_pb.tablet_schema());
77
858k
    }
78
    // Release ownership of TabletSchemaPB from `rowset_meta_pb` and then set it back to `rowset_meta_pb`,
79
    // this won't break const semantics of `rowset_meta_pb`, because `rowset_meta_pb` is not changed
80
    // before and after call this method.
81
873k
    auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
82
873k
    auto* schema = mut_rowset_meta_pb.release_tablet_schema();
83
873k
    _rowset_meta_pb = mut_rowset_meta_pb;
84
873k
    mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
85
873k
    _init();
86
873k
    return true;
87
873k
}
88
89
58
bool RowsetMeta::init_from_json(const std::string& json_rowset_meta) {
90
58
    bool ret = json2pb::JsonToProtoMessage(json_rowset_meta, &_rowset_meta_pb);
91
58
    if (!ret) {
92
1
        return false;
93
1
    }
94
57
    _init();
95
57
    return true;
96
58
}
97
98
0
bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
99
0
    json2pb::Pb2JsonOptions json_options;
100
0
    json_options.pretty_json = true;
101
0
    bool ret = json2pb::ProtoMessageToJson(_rowset_meta_pb, json_rowset_meta, json_options);
102
0
    return ret;
103
0
}
104
105
1.85M
// Returns the underlying (unwrapped) file system holding this rowset's files:
// the global local file system for local rowsets, otherwise the remote storage
// resource's fs. Logs a warning and returns nullptr if the resource cannot be resolved.
io::FileSystemSPtr RowsetMeta::physical_fs() {
    if (is_local()) {
        return io::global_local_filesystem();
    }
    auto resource = remote_storage_resource();
    if (!resource) {
        LOG(WARNING) << resource.error();
        return nullptr;
    }
    return resource.value()->fs;
}
118
119
1.85M
// Returns the file system to use for this rowset's files.
//
// Outside of BE_TEST builds the physical fs is wrapped, in order:
//   1. by io::PackedFileSystem when packed slice locations are recorded in the meta;
//   2. by io::make_file_system() with the owning tablet's encryption algorithm.
// Returns nullptr if the physical fs or the encryption algorithm cannot be determined.
io::FileSystemSPtr RowsetMeta::fs() {
    auto fs = physical_fs();
    // physical_fs() logs and returns nullptr when the remote storage resource cannot be
    // resolved. Propagate that rather than wrapping a null file system; the packed-file
    // branch below would otherwise dereference it via `->id()`.
    if (!fs) {
        return nullptr;
    }

#ifndef BE_TEST
    // The algorithm comes from the owning tablet's meta and is computed through
    // `_determine_encryption_once` (presumably at most once per RowsetMeta).
    auto algorithm = _determine_encryption_once.call([this]() -> Result<EncryptionAlgorithmPB> {
        auto maybe_tablet = ExecEnv::get_tablet(tablet_id());
        if (!maybe_tablet) {
            LOG(WARNING) << "get tablet failed: " << maybe_tablet.error();
            return ResultError(maybe_tablet.error());
        }
        auto tablet = maybe_tablet.value();
        return tablet->tablet_meta()->encryption_algorithm();
    });
    if (!algorithm.has_value()) {
        // TODO: return a Result<FileSystemSPtr> in this method?
        return nullptr;
    }

    // Apply the packed file system first if any packed slice locations are recorded.
    io::FileSystemSPtr wrapped = fs;
    if (_rowset_meta_pb.packed_slice_locations_size() > 0) {
        // Values shared by every slice entry; computed once instead of per iteration.
        const auto owner_tablet_id = tablet_id();
        const auto rowset_id_str = _rowset_id.to_string();
        const auto resource_id = wrapped->id();

        std::unordered_map<std::string, io::PackedSliceLocation> index_map;
        index_map.reserve(_rowset_meta_pb.packed_slice_locations().size());
        for (const auto& [path, index_pb] : _rowset_meta_pb.packed_slice_locations()) {
            io::PackedSliceLocation index;
            index.packed_file_path = index_pb.packed_file_path();
            index.offset = index_pb.offset();
            index.size = index_pb.size();
            // -1 marks "packed file size not recorded".
            index.packed_file_size =
                    index_pb.has_packed_file_size() ? index_pb.packed_file_size() : -1;
            index.tablet_id = owner_tablet_id;
            index.rowset_id = rowset_id_str;
            index.resource_id = resource_id;
            index_map[path] = std::move(index);
        }
        if (!index_map.empty()) {
            io::PackedAppendContext append_info;
            append_info.tablet_id = owner_tablet_id;
            append_info.rowset_id = rowset_id_str;
            append_info.txn_id = txn_id();
            wrapped = std::make_shared<io::PackedFileSystem>(wrapped, index_map, append_info);
        }
    }

    // Then apply encryption on top.
    wrapped = io::make_file_system(wrapped, algorithm.value());
    return wrapped;
#else
    return fs;
#endif
}
169
170
3.73M
// Returns a pointer to this rowset's remote StorageResource, resolving and caching it in
// `_storage_resource` on first use.
// Errors: local rowsets have no storage resource; an unknown resource_id yields
// InternalError. In cloud mode, a missing resource triggers one storage-vault sync and
// a retry before giving up.
Result<const StorageResource*> RowsetMeta::remote_storage_resource() {
    if (is_local()) {
        return ResultError(Status::InternalError(
                "local rowset has no storage resource. tablet_id={} rowset_id={}", tablet_id(),
                _rowset_id.to_string()));
    }

    // Already resolved (or set via set_remote_storage_resource()).
    if (_storage_resource.fs) {
        return &_storage_resource;
    }

    if (auto found = get_storage_resource(resource_id())) {
        _storage_resource = std::move(found->first);
        return &_storage_resource;
    }

    if (config::is_cloud_mode()) {
        // When a new cluster or storage resource is created, BE may not have synced the
        // storage resource yet. A query arriving at that moment reaches this method via
        // BetaRowsetReader's segment loading, so sync storage vaults and retry once.
        ExecEnv::GetInstance()->storage_engine().to_cloud().sync_storage_vault();
        if (auto retried = get_storage_resource(resource_id())) {
            _storage_resource = std::move(retried->first);
            return &_storage_resource;
        }
    }

    return ResultError(Status::InternalError("cannot find storage resource. resource_id={}",
                                             resource_id()));
}
197
198
199k
// Installs `resource` as this rowset's remote storage resource and records its file
// system id as the rowset's resource_id. `resource.fs` must be non-null.
void RowsetMeta::set_remote_storage_resource(StorageResource resource) {
    _storage_resource = std::move(resource);
    const auto& installed_fs = _storage_resource.fs;
    _rowset_meta_pb.set_resource_id(installed_fs->id());
}
202
203
349k
bool RowsetMeta::has_variant_type_in_schema() const {
204
349k
    return _schema && _schema->num_variant_columns() > 0;
205
349k
}
206
207
350k
// Fills `*rs_meta_pb` with a copy of this meta.
// When a schema is attached, its version is always written. The full tablet schema is
// also written unless `skip_schema` is set. has_variant_type_in_schema is always
// refreshed.
void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema) const {
    RowsetMetaPB& out = *rs_meta_pb;
    out = _rowset_meta_pb;
    if (_schema) [[likely]] {
        out.set_schema_version(_schema->schema_version());
        // For cloud, the tablet schema is kept apart from the rowset meta to reduce the
        // persisted size, so callers may skip it.
        if (!skip_schema) {
            _schema->to_schema_pb(out.mutable_tablet_schema());
        }
    }
    out.set_has_variant_type_in_schema(has_variant_type_in_schema());
}
218
219
231k
// Value-returning convenience wrapper around to_rowset_pb().
RowsetMetaPB RowsetMeta::get_rowset_pb(bool skip_schema) const {
    RowsetMetaPB result;
    to_rowset_pb(&result, skip_schema);
    return result;
}
224
225
540k
void RowsetMeta::set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
226
540k
    if (_handle) {
227
206k
        TabletSchemaCache::instance()->release(_handle);
228
206k
    }
229
540k
    auto pair = TabletSchemaCache::instance()->insert(tablet_schema->to_key());
230
540k
    _handle = pair.first;
231
540k
    _schema = pair.second;
232
540k
}
233
234
896k
void RowsetMeta::set_tablet_schema(const TabletSchemaPB& tablet_schema) {
235
896k
    if (_handle) {
236
0
        TabletSchemaCache::instance()->release(_handle);
237
0
    }
238
896k
    auto pair = TabletSchemaCache::instance()->insert(
239
896k
            TabletSchema::deterministic_string_serialize(tablet_schema));
240
896k
    _handle = pair.first;
241
896k
    _schema = pair.second;
242
896k
}
243
244
38.9k
bool RowsetMeta::_deserialize_from_pb(std::string_view value) {
245
38.9k
    if (!_rowset_meta_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) {
246
1
        _rowset_meta_pb.Clear();
247
1
        return false;
248
1
    }
249
38.9k
    if (_rowset_meta_pb.has_tablet_schema()) {
250
38.9k
        set_tablet_schema(_rowset_meta_pb.tablet_schema());
251
38.9k
        _rowset_meta_pb.set_allocated_tablet_schema(nullptr);
252
38.9k
    }
253
38.9k
    return true;
254
38.9k
}
255
256
256
bool RowsetMeta::_serialize_to_pb(std::string* value) {
257
256
    if (value == nullptr) {
258
0
        return false;
259
0
    }
260
256
    RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
261
256
    if (_schema) {
262
256
        _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
263
256
    }
264
256
    return rowset_meta_pb.SerializeToString(value);
265
256
}
266
267
912k
void RowsetMeta::_init() {
268
912k
    if (_rowset_meta_pb.rowset_id() > 0) {
269
3.56k
        _rowset_id.init(_rowset_meta_pb.rowset_id());
270
908k
    } else {
271
908k
        _rowset_id.init(_rowset_meta_pb.rowset_id_v2());
272
908k
    }
273
912k
    update_metadata_size();
274
912k
}
275
276
204k
void RowsetMeta::add_segments_file_size(const std::vector<size_t>& seg_file_size) {
277
204k
    _rowset_meta_pb.set_enable_segments_file_size(true);
278
204k
    for (auto fsize : seg_file_size) {
279
64.1k
        _rowset_meta_pb.add_segments_file_size(fsize);
280
64.1k
    }
281
204k
}
282
283
1.88M
int64_t RowsetMeta::segment_file_size(int seg_id) const {
284
18.4E
    DCHECK(_rowset_meta_pb.segments_file_size().empty() ||
285
18.4E
           _rowset_meta_pb.segments_file_size_size() > seg_id)
286
18.4E
            << _rowset_meta_pb.segments_file_size_size() << ' ' << seg_id;
287
1.88M
    return _rowset_meta_pb.enable_segments_file_size()
288
1.88M
                   ? (_rowset_meta_pb.segments_file_size_size() > seg_id
289
1.85M
                              ? _rowset_meta_pb.segments_file_size(seg_id)
290
18.4E
                              : -1)
291
1.88M
                   : -1;
292
1.88M
}
293
294
227k
void RowsetMeta::set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds) {
295
227k
    for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
296
91.5k
        KeyBoundsPB* new_key_bounds = _rowset_meta_pb.add_segments_key_bounds();
297
91.5k
        *new_key_bounds = key_bounds;
298
91.5k
    }
299
300
227k
    int32_t truncation_threshold = config::segments_key_bounds_truncation_threshold;
301
227k
    if (config::random_segments_key_bounds_truncation) {
302
0
        std::mt19937 generator(std::random_device {}());
303
0
        std::uniform_int_distribution<int> distribution(-10, 40);
304
0
        truncation_threshold = distribution(generator);
305
0
    }
306
227k
    bool really_do_truncation {false};
307
227k
    if (truncation_threshold > 0) {
308
226k
        for (auto& segment_key_bounds : *_rowset_meta_pb.mutable_segments_key_bounds()) {
309
86.3k
            if (segment_key_bounds.min_key().size() > truncation_threshold) {
310
84.0k
                really_do_truncation = true;
311
84.0k
                segment_key_bounds.mutable_min_key()->resize(truncation_threshold);
312
84.0k
            }
313
86.3k
            if (segment_key_bounds.max_key().size() > truncation_threshold) {
314
84.8k
                really_do_truncation = true;
315
84.8k
                segment_key_bounds.mutable_max_key()->resize(truncation_threshold);
316
84.8k
            }
317
86.3k
        }
318
226k
    }
319
227k
    set_segments_key_bounds_truncated(really_do_truncation || is_segments_key_bounds_truncated());
320
227k
}
321
322
1.39k
void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
323
1.39k
    set_num_segments(num_segments() + other.num_segments());
324
1.39k
    set_num_rows(num_rows() + other.num_rows());
325
1.39k
    set_data_disk_size(data_disk_size() + other.data_disk_size());
326
1.39k
    set_total_disk_size(total_disk_size() + other.total_disk_size());
327
1.39k
    set_index_disk_size(index_disk_size() + other.index_disk_size());
328
1.39k
    set_total_disk_size(data_disk_size() + index_disk_size());
329
1.39k
    set_segments_key_bounds_truncated(is_segments_key_bounds_truncated() ||
330
1.39k
                                      other.is_segments_key_bounds_truncated());
331
1.40k
    if (_rowset_meta_pb.num_segment_rows_size() > 0) {
332
1.40k
        if (other.num_segments() > 0) {
333
20
            if (other._rowset_meta_pb.num_segment_rows_size() > 0) {
334
20
                for (auto row_count : other._rowset_meta_pb.num_segment_rows()) {
335
20
                    _rowset_meta_pb.add_num_segment_rows(row_count);
336
20
                }
337
20
            } else {
338
                // This may happen when a partial update load commits in high version doirs_be
339
                // and publishes with new segments in low version doris_be. In this case, just clear
340
                // all num_segment_rows.
341
0
                _rowset_meta_pb.clear_num_segment_rows();
342
0
            }
343
20
        }
344
1.40k
    }
345
1.39k
    for (auto&& key_bound : other.get_segments_key_bounds()) {
346
20
        add_segment_key_bounds(key_bound);
347
20
    }
348
1.39k
    if (_rowset_meta_pb.enable_segments_file_size() &&
349
1.39k
        other._rowset_meta_pb.enable_segments_file_size()) {
350
1.39k
        for (auto fsize : other.segments_file_size()) {
351
20
            _rowset_meta_pb.add_segments_file_size(fsize);
352
20
        }
353
1.39k
    }
354
1.39k
    if (_rowset_meta_pb.enable_inverted_index_file_info() &&
355
1.39k
        other._rowset_meta_pb.enable_inverted_index_file_info()) {
356
178
        for (auto finfo : other.inverted_index_file_info()) {
357
0
            InvertedIndexFileInfo* new_file_info = _rowset_meta_pb.add_inverted_index_file_info();
358
0
            *new_file_info = finfo;
359
0
        }
360
178
    }
361
    // In partial update the rowset schema maybe updated when table contains variant type, so we need the newest schema to be updated
362
    // Otherwise the schema is stale and lead to wrong data read
363
1.39k
    TEST_SYNC_POINT_RETURN_WITH_VOID("RowsetMeta::merge_rowset_meta:skip_schema_merge");
364
1.39k
    if (tablet_schema()->num_variant_columns() > 0) {
365
        // merge extracted columns
366
47
        TabletSchemaSPtr merged_schema;
367
47
        static_cast<void>(variant_util::get_least_common_schema(
368
47
                {tablet_schema(), other.tablet_schema()}, nullptr, merged_schema));
369
47
        if (*_schema != *merged_schema) {
370
0
            set_tablet_schema(merged_schema);
371
0
        }
372
47
    }
373
1.39k
    if (rowset_state() == RowsetStatePB::BEGIN_PARTIAL_UPDATE) {
374
1.37k
        set_rowset_state(RowsetStatePB::COMMITTED);
375
1.37k
    }
376
377
1.39k
    update_metadata_size();
378
1.39k
}
379
380
913k
int64_t RowsetMeta::get_metadata_size() const {
381
913k
    return sizeof(RowsetMeta) + _rowset_meta_pb.ByteSizeLong();
382
913k
}
383
384
1.85M
// Returns the inverted-index file info recorded for segment `seg_id`. A default-
// constructed InvertedIndexFileInfo is returned when any of these hold: the info is not
// enabled for this rowset, `seg_id` is negative, or nothing was recorded for `seg_id`.
// The negative check prevents an out-of-bounds repeated-field access.
InvertedIndexFileInfo RowsetMeta::inverted_index_file_info(int seg_id) {
    if (!_rowset_meta_pb.enable_inverted_index_file_info() || seg_id < 0 ||
        seg_id >= _rowset_meta_pb.inverted_index_file_info_size()) {
        return InvertedIndexFileInfo();
    }
    return _rowset_meta_pb.inverted_index_file_info(seg_id);
}
391
392
void RowsetMeta::add_inverted_index_files_info(
393
21.1k
        const std::vector<const InvertedIndexFileInfo*>& idx_file_info) {
394
21.1k
    _rowset_meta_pb.set_enable_inverted_index_file_info(true);
395
21.1k
    for (auto finfo : idx_file_info) {
396
6.05k
        auto* new_file_info = _rowset_meta_pb.add_inverted_index_file_info();
397
6.05k
        *new_file_info = *finfo;
398
6.05k
    }
399
21.1k
}
400
401
0
// Two metas are equal when their rowset ids, "removed from rowset meta" flags and
// protobuf contents (compared field-by-field with MessageDifferencer) all match.
bool operator==(const RowsetMeta& a, const RowsetMeta& b) {
    if (a._rowset_id != b._rowset_id ||
        a._is_removed_from_rowset_meta != b._is_removed_from_rowset_meta) {
        return false;
    }
    return google::protobuf::util::MessageDifferencer::Equals(a._rowset_meta_pb,
                                                              b._rowset_meta_pb);
}
408
409
#include "common/compile_check_end.h"
410
411
} // namespace doris