Coverage Report

Created: 2026-05-14 23:40

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/rowset_meta.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/rowset_meta.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <glog/logging.h>
22
23
#include <memory>
24
#include <random>
25
26
#include "cloud/cloud_storage_engine.h"
27
#include "common/logging.h"
28
#include "common/status.h"
29
#include "cpp/sync_point.h"
30
#include "exec/common/variant_util.h"
31
#include "google/protobuf/util/message_differencer.h"
32
#include "io/fs/encrypted_fs_factory.h"
33
#include "io/fs/file_system.h"
34
#include "io/fs/file_writer.h"
35
#include "io/fs/local_file_system.h"
36
#include "io/fs/packed_file_manager.h"
37
#include "io/fs/packed_file_system.h"
38
#include "json2pb/json_to_pb.h"
39
#include "json2pb/pb_to_json.h"
40
#include "runtime/exec_env.h"
41
#include "storage/olap_common.h"
42
#include "storage/storage_policy.h"
43
#include "storage/tablet/base_tablet.h"
44
#include "storage/tablet/tablet_fwd.h"
45
#include "storage/tablet/tablet_schema.h"
46
#include "storage/tablet/tablet_schema_cache.h"
47
#include "util/lru_cache.h"
48
49
namespace doris {
50
51
28.1k
// Gives back the reference this object holds on its TabletSchemaCache entry, if it acquired one.
RowsetMeta::~RowsetMeta() {
    if (!_handle) {
        return;
    }
    TabletSchemaCache::instance()->release(_handle);
}
56
57
9
bool RowsetMeta::init(std::string_view pb_rowset_meta) {
58
9
    bool ret = _deserialize_from_pb(pb_rowset_meta);
59
9
    if (!ret) {
60
1
        return false;
61
1
    }
62
8
    _init();
63
8
    return true;
64
9
}
65
66
68
bool RowsetMeta::init(const RowsetMeta* rowset_meta) {
67
68
    RowsetMetaPB rowset_meta_pb;
68
68
    rowset_meta->to_rowset_pb(&rowset_meta_pb);
69
68
    return init_from_pb(rowset_meta_pb);
70
68
}
71
72
14.4k
bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
73
14.4k
    if (rowset_meta_pb.has_tablet_schema()) {
74
556
        set_tablet_schema(rowset_meta_pb.tablet_schema());
75
556
    }
76
    // Temporarily detach TabletSchemaPB from the input proto so `_rowset_meta_pb` can copy the
77
    // rowset metadata without keeping another large schema payload. `schema` is owned by
78
    // `rowset_meta_pb` before release_tablet_schema(), and set_allocated_tablet_schema(schema)
79
    // immediately returns that ownership to the same proto before this method returns.
80
14.4k
    auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
81
14.4k
    auto* schema = mut_rowset_meta_pb.release_tablet_schema();
82
14.4k
    _rowset_meta_pb = mut_rowset_meta_pb;
83
14.4k
    mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
84
14.4k
    _init();
85
14.4k
    return true;
86
14.4k
}
87
88
61
bool RowsetMeta::init_from_json(const std::string& json_rowset_meta) {
89
61
    bool ret = json2pb::JsonToProtoMessage(json_rowset_meta, &_rowset_meta_pb);
90
61
    if (!ret) {
91
1
        return false;
92
1
    }
93
60
    _init();
94
60
    return true;
95
61
}
96
97
0
bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
98
0
    json2pb::Pb2JsonOptions json_options;
99
0
    json_options.pretty_json = true;
100
0
    bool ret = json2pb::ProtoMessageToJson(_rowset_meta_pb, json_rowset_meta, json_options);
101
0
    return ret;
102
0
}
103
104
5.56k
// Returns the underlying (unwrapped) filesystem for this rowset:
// the global local filesystem for local rowsets, otherwise the filesystem of the resolved
// remote storage resource. Returns nullptr (after logging) if the resource cannot be resolved.
io::FileSystemSPtr RowsetMeta::physical_fs() {
    if (is_local()) {
        return io::global_local_filesystem();
    }

    auto resource = remote_storage_resource();
    if (!resource) {
        LOG(WARNING) << resource.error();
        return nullptr;
    }
    return resource.value()->fs;
}
117
118
5.56k
// Returns the filesystem to use for this rowset's files.
//
// Starts from physical_fs(). Outside BE_TEST builds it then:
//   1. resolves (once) the tablet's encryption algorithm, returning nullptr if the tablet
//      cannot be fetched;
//   2. wraps the filesystem in a PackedFileSystem when the meta records packed slice locations;
//   3. applies io::make_file_system() with the encryption algorithm on top.
// Returns nullptr when physical_fs() could not provide a filesystem.
io::FileSystemSPtr RowsetMeta::fs() {
    auto fs = physical_fs();
    if (!fs) {
        // physical_fs() already logged the reason. Wrapping a null filesystem would dereference
        // it below (wrapped->id()) or hand a null inner filesystem to make_file_system().
        return nullptr;
    }

#ifndef BE_TEST
    auto algorithm = _determine_encryption_once.call([this]() -> Result<EncryptionAlgorithmPB> {
        auto maybe_tablet = ExecEnv::get_tablet(tablet_id());
        if (!maybe_tablet) {
            LOG(WARNING) << "get tablet failed: " << maybe_tablet.error();
            return ResultError(maybe_tablet.error());
        }
        auto tablet = maybe_tablet.value();
        return tablet->tablet_meta()->encryption_algorithm();
    });
    if (!algorithm.has_value()) {
        // TODO: return a Result<FileSystemSPtr> in this method?
        return nullptr;
    }

    // Apply packed file system first if enabled and index_map is not empty
    io::FileSystemSPtr wrapped = fs;
    if (_rowset_meta_pb.packed_slice_locations_size() > 0) {
        // Values shared by every slice location; compute them once instead of per entry.
        const std::string rowset_id = _rowset_id.to_string();
        const auto resource_id = wrapped->id();

        std::unordered_map<std::string, io::PackedSliceLocation> index_map;
        for (const auto& [path, index_pb] : _rowset_meta_pb.packed_slice_locations()) {
            io::PackedSliceLocation index;
            index.packed_file_path = index_pb.packed_file_path();
            index.offset = index_pb.offset();
            index.size = index_pb.size();
            // -1 marks an unknown packed file size.
            index.packed_file_size =
                    index_pb.has_packed_file_size() ? index_pb.packed_file_size() : -1;
            index.tablet_id = tablet_id();
            index.rowset_id = rowset_id;
            index.resource_id = resource_id;
            index_map[path] = std::move(index);
        }
        if (!index_map.empty()) {
            io::PackedAppendContext append_info;
            append_info.tablet_id = tablet_id();
            append_info.rowset_id = rowset_id;
            append_info.txn_id = txn_id();
            wrapped = std::make_shared<io::PackedFileSystem>(wrapped, index_map, append_info);
        }
    }

    // Then apply encryption on top
    wrapped = io::make_file_system(wrapped, algorithm.value());
    return wrapped;
#else
    return fs;
#endif
}
168
169
13
// Returns the storage resource backing a remote rowset, resolving and caching it in
// `_storage_resource` on first use.
// Errors:
//   - the rowset is local;
//   - the resource id cannot be resolved. In cloud mode the storage vaults are synced once and
//     the lookup retried before giving up.
// The returned pointer refers to this object's `_storage_resource` member.
Result<const StorageResource*> RowsetMeta::remote_storage_resource() {
    if (is_local()) {
        return ResultError(Status::InternalError(
                "local rowset has no storage resource. tablet_id={} rowset_id={}", tablet_id(),
                _rowset_id.to_string()));
    }

    // Already resolved earlier, or injected through set_remote_storage_resource().
    if (_storage_resource.fs) {
        return &_storage_resource;
    }

    if (auto resolved = get_storage_resource(resource_id())) {
        _storage_resource = std::move(resolved->first);
        return &_storage_resource;
    }

    if (config::is_cloud_mode()) {
        // When creating a new cluster or creating a storage resource, BE may not sync storage
        // resource, at the moment a query is coming, the BetaRowsetReader call loadSegment and use
        // this method to get the storage resource, so we need to sync storage resource here.
        ExecEnv::GetInstance()->storage_engine().to_cloud().sync_storage_vault();
        if (auto resolved = get_storage_resource(resource_id())) {
            _storage_resource = std::move(resolved->first);
            return &_storage_resource;
        }
    }

    return ResultError(Status::InternalError("cannot find storage resource. resource_id={}",
                                             resource_id()));
}
196
197
13
// Caches `resource` as this rowset's storage resource and records its filesystem id as the
// rowset's resource_id. `resource.fs` must be non-null.
void RowsetMeta::set_remote_storage_resource(StorageResource resource) {
    _storage_resource = std::move(resource);
    const auto& storage_fs = _storage_resource.fs;
    _rowset_meta_pb.set_resource_id(storage_fs->id());
}
201
202
21.9k
bool RowsetMeta::has_variant_type_in_schema() const {
203
21.9k
    return _schema && _schema->num_variant_columns() > 0;
204
21.9k
}
205
206
21.9k
// Writes a full RowsetMetaPB into `*rs_meta_pb`, overwriting its previous contents.
// If a tablet schema is attached, its schema_version is recorded. Unless `skip_schema` is set,
// the schema itself is also embedded. has_variant_type_in_schema is always refreshed.
void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema) const {
    RowsetMetaPB& out = *rs_meta_pb;
    out = _rowset_meta_pb;

    if (_schema) [[likely]] {
        out.set_schema_version(_schema->schema_version());
        // For cloud, separate tablet schema from rowset meta to reduce persistent size.
        if (!skip_schema) {
            _schema->to_schema_pb(out.mutable_tablet_schema());
        }
    }

    out.set_has_variant_type_in_schema(has_variant_type_in_schema());
}
217
218
172
// Value-returning convenience wrapper around to_rowset_pb().
RowsetMetaPB RowsetMeta::get_rowset_pb(bool skip_schema) const {
    RowsetMetaPB result;
    to_rowset_pb(&result, skip_schema);
    return result;
}
223
224
5.30k
void RowsetMeta::set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
225
5.30k
    if (_handle) {
226
969
        TabletSchemaCache::instance()->release(_handle);
227
969
    }
228
5.30k
    auto pair = TabletSchemaCache::instance()->insert(tablet_schema->to_key());
229
5.30k
    _handle = pair.first;
230
5.30k
    _schema = pair.second;
231
5.30k
}
232
233
559
void RowsetMeta::set_tablet_schema(const TabletSchemaPB& tablet_schema) {
234
559
    if (_handle) {
235
0
        TabletSchemaCache::instance()->release(_handle);
236
0
    }
237
559
    auto pair = TabletSchemaCache::instance()->insert(
238
559
            TabletSchema::deterministic_string_serialize(tablet_schema));
239
559
    _handle = pair.first;
240
559
    _schema = pair.second;
241
559
}
242
243
9
bool RowsetMeta::_deserialize_from_pb(std::string_view value) {
244
9
    if (!_rowset_meta_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) {
245
1
        _rowset_meta_pb.Clear();
246
1
        return false;
247
1
    }
248
8
    if (_rowset_meta_pb.has_tablet_schema()) {
249
0
        set_tablet_schema(_rowset_meta_pb.tablet_schema());
250
        // The schema has been materialized into TabletSchemaCache by set_tablet_schema(). Drop the
251
        // protobuf-owned copy from `_rowset_meta_pb` to avoid holding the large schema twice; passing
252
        // nullptr intentionally deletes the current protobuf submessage.
253
0
        _rowset_meta_pb.set_allocated_tablet_schema(nullptr);
254
0
    }
255
8
    return true;
256
9
}
257
258
0
bool RowsetMeta::_serialize_to_pb(std::string* value) {
259
0
    if (value == nullptr) {
260
0
        return false;
261
0
    }
262
0
    RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
263
0
    if (_schema) {
264
0
        _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
265
0
    }
266
0
    return rowset_meta_pb.SerializeToString(value);
267
0
}
268
269
14.5k
void RowsetMeta::_init() {
270
14.5k
    if (_rowset_meta_pb.rowset_id() > 0) {
271
3.57k
        _rowset_id.init(_rowset_meta_pb.rowset_id());
272
10.9k
    } else {
273
10.9k
        _rowset_id.init(_rowset_meta_pb.rowset_id_v2());
274
10.9k
    }
275
14.5k
    update_metadata_size();
276
14.5k
}
277
278
0
void RowsetMeta::add_segments_file_size(const std::vector<size_t>& seg_file_size) {
279
0
    _rowset_meta_pb.set_enable_segments_file_size(true);
280
0
    for (auto fsize : seg_file_size) {
281
0
        _rowset_meta_pb.add_segments_file_size(fsize);
282
0
    }
283
0
}
284
285
3.21k
int64_t RowsetMeta::segment_file_size(int seg_id) const {
286
3.21k
    DCHECK(_rowset_meta_pb.segments_file_size().empty() ||
287
0
           _rowset_meta_pb.segments_file_size_size() > seg_id)
288
0
            << _rowset_meta_pb.segments_file_size_size() << ' ' << seg_id;
289
3.21k
    return _rowset_meta_pb.enable_segments_file_size()
290
3.21k
                   ? (_rowset_meta_pb.segments_file_size_size() > seg_id
291
0
                              ? _rowset_meta_pb.segments_file_size(seg_id)
292
0
                              : -1)
293
3.21k
                   : -1;
294
3.21k
}
295
296
void RowsetMeta::set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds,
297
1.09k
                                         bool aggregate_into_single) {
298
1.09k
    _rowset_meta_pb.clear_segments_key_bounds();
299
1.09k
    bool did_aggregate = aggregate_into_single && !segments_key_bounds.empty();
300
1.09k
    if (did_aggregate) {
301
609
        const std::string* overall_min = &segments_key_bounds.front().min_key();
302
609
        const std::string* overall_max = &segments_key_bounds.front().max_key();
303
2.59k
        for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
304
2.59k
            if (key_bounds.min_key() < *overall_min) {
305
4
                overall_min = &key_bounds.min_key();
306
4
            }
307
2.59k
            if (key_bounds.max_key() > *overall_max) {
308
1.39k
                overall_max = &key_bounds.max_key();
309
1.39k
            }
310
2.59k
        }
311
609
        KeyBoundsPB* aggregated = _rowset_meta_pb.add_segments_key_bounds();
312
609
        aggregated->set_min_key(*overall_min);
313
609
        aggregated->set_max_key(*overall_max);
314
609
    } else {
315
481
        for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
316
195
            KeyBoundsPB* new_key_bounds = _rowset_meta_pb.add_segments_key_bounds();
317
195
            *new_key_bounds = key_bounds;
318
195
        }
319
481
    }
320
1.09k
    set_segments_key_bounds_aggregated(did_aggregate);
321
322
1.09k
    int32_t truncation_threshold = config::segments_key_bounds_truncation_threshold;
323
1.09k
    if (config::random_segments_key_bounds_truncation) {
324
0
        std::mt19937 generator(std::random_device {}());
325
0
        std::uniform_int_distribution<int> distribution(-10, 40);
326
0
        truncation_threshold = distribution(generator);
327
0
    }
328
1.09k
    bool really_do_truncation {false};
329
1.09k
    if (truncation_threshold > 0) {
330
799
        for (auto& segment_key_bounds : *_rowset_meta_pb.mutable_segments_key_bounds()) {
331
525
            if (segment_key_bounds.min_key().size() > truncation_threshold) {
332
441
                really_do_truncation = true;
333
441
                segment_key_bounds.mutable_min_key()->resize(truncation_threshold);
334
441
            }
335
525
            if (segment_key_bounds.max_key().size() > truncation_threshold) {
336
436
                really_do_truncation = true;
337
436
                segment_key_bounds.mutable_max_key()->resize(truncation_threshold);
338
436
            }
339
525
        }
340
799
    }
341
1.09k
    set_segments_key_bounds_truncated(really_do_truncation || is_segments_key_bounds_truncated());
342
1.09k
}
343
344
3
void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
345
3
    set_num_segments(num_segments() + other.num_segments());
346
3
    set_num_rows(num_rows() + other.num_rows());
347
3
    set_data_disk_size(data_disk_size() + other.data_disk_size());
348
3
    set_total_disk_size(total_disk_size() + other.total_disk_size());
349
3
    set_index_disk_size(index_disk_size() + other.index_disk_size());
350
3
    set_total_disk_size(data_disk_size() + index_disk_size());
351
3
    set_segments_key_bounds_truncated(is_segments_key_bounds_truncated() ||
352
3
                                      other.is_segments_key_bounds_truncated());
353
    // merge_rowset_meta is used in the MOW partial-update publish path, which relies
354
    // on per-segment bounds. Aggregation should never be enabled for MOW rowsets,
355
    // so we do not expect either side to be aggregated here.
356
3
    DCHECK(!is_segments_key_bounds_aggregated() && !other.is_segments_key_bounds_aggregated())
357
0
            << "merge_rowset_meta encountered aggregated key bounds";
358
3
    if (_rowset_meta_pb.num_segment_rows_size() > 0) {
359
2
        if (other.num_segments() > 0) {
360
2
            if (other._rowset_meta_pb.num_segment_rows_size() > 0) {
361
3
                for (auto row_count : other._rowset_meta_pb.num_segment_rows()) {
362
3
                    _rowset_meta_pb.add_num_segment_rows(row_count);
363
3
                }
364
1
            } else {
365
                // This may happen when a partial update load commits in high version doirs_be
366
                // and publishes with new segments in low version doris_be. In this case, just clear
367
                // all num_segment_rows.
368
1
                _rowset_meta_pb.clear_num_segment_rows();
369
1
            }
370
2
        }
371
2
    }
372
3
    for (auto&& key_bound : other.get_segments_key_bounds()) {
373
0
        add_segment_key_bounds(key_bound);
374
0
    }
375
3
    if (_rowset_meta_pb.enable_segments_file_size() &&
376
3
        other._rowset_meta_pb.enable_segments_file_size()) {
377
0
        for (auto fsize : other.segments_file_size()) {
378
0
            _rowset_meta_pb.add_segments_file_size(fsize);
379
0
        }
380
0
    }
381
3
    if (_rowset_meta_pb.enable_inverted_index_file_info() &&
382
3
        other._rowset_meta_pb.enable_inverted_index_file_info()) {
383
0
        for (auto finfo : other.inverted_index_file_info()) {
384
0
            InvertedIndexFileInfo* new_file_info = _rowset_meta_pb.add_inverted_index_file_info();
385
0
            *new_file_info = finfo;
386
0
        }
387
0
    }
388
    // In partial update the rowset schema maybe updated when table contains variant type, so we need the newest schema to be updated
389
    // Otherwise the schema is stale and lead to wrong data read
390
3
    TEST_SYNC_POINT_RETURN_WITH_VOID("RowsetMeta::merge_rowset_meta:skip_schema_merge");
391
0
    if (tablet_schema()->num_variant_columns() > 0) {
392
        // merge extracted columns
393
0
        TabletSchemaSPtr merged_schema;
394
0
        static_cast<void>(variant_util::get_least_common_schema(
395
0
                {tablet_schema(), other.tablet_schema()}, nullptr, merged_schema));
396
0
        if (*_schema != *merged_schema) {
397
0
            set_tablet_schema(merged_schema);
398
0
        }
399
0
    }
400
0
    if (rowset_state() == RowsetStatePB::BEGIN_PARTIAL_UPDATE) {
401
0
        set_rowset_state(RowsetStatePB::COMMITTED);
402
0
    }
403
404
0
    update_metadata_size();
405
0
}
406
407
14.5k
int64_t RowsetMeta::get_metadata_size() const {
408
14.5k
    return sizeof(RowsetMeta) + _rowset_meta_pb.ByteSizeLong();
409
14.5k
}
410
411
4.76k
// Returns a copy of the inverted index file info recorded for segment `seg_id`.
// Returns a default-constructed InvertedIndexFileInfo when any of the following holds:
//   - the feature is disabled;
//   - `seg_id` is negative;
//   - `seg_id` has no recorded entry.
InvertedIndexFileInfo RowsetMeta::inverted_index_file_info(int seg_id) {
    // Reject negative ids explicitly: `size > seg_id` alone would let them index out of range.
    if (!_rowset_meta_pb.enable_inverted_index_file_info() || seg_id < 0 ||
        seg_id >= _rowset_meta_pb.inverted_index_file_info_size()) {
        return InvertedIndexFileInfo();
    }
    return _rowset_meta_pb.inverted_index_file_info(seg_id);
}
418
419
void RowsetMeta::add_inverted_index_files_info(
420
106
        const std::vector<const InvertedIndexFileInfo*>& idx_file_info) {
421
106
    _rowset_meta_pb.set_enable_inverted_index_file_info(true);
422
215
    for (auto finfo : idx_file_info) {
423
215
        auto* new_file_info = _rowset_meta_pb.add_inverted_index_file_info();
424
215
        *new_file_info = *finfo;
425
215
    }
426
106
}
427
428
0
// Two RowsetMetas are equal when their rowset ids match, their removed-from-rowset-meta flags
// match, and their stored protobufs compare equal field by field. The attached cached tablet
// schema (`_schema`) is not part of the comparison.
bool operator==(const RowsetMeta& a, const RowsetMeta& b) {
    if (a._rowset_id != b._rowset_id ||
        a._is_removed_from_rowset_meta != b._is_removed_from_rowset_meta) {
        return false;
    }
    return google::protobuf::util::MessageDifferencer::Equals(a._rowset_meta_pb,
                                                              b._rowset_meta_pb);
}
435
436
} // namespace doris