Coverage Report

Created: 2026-05-09 12:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/rowset_meta.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/rowset_meta.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <glog/logging.h>
22
23
#include <memory>
24
#include <random>
25
26
#include "cloud/cloud_storage_engine.h"
27
#include "common/logging.h"
28
#include "common/status.h"
29
#include "cpp/sync_point.h"
30
#include "exec/common/variant_util.h"
31
#include "google/protobuf/util/message_differencer.h"
32
#include "io/fs/encrypted_fs_factory.h"
33
#include "io/fs/file_system.h"
34
#include "io/fs/file_writer.h"
35
#include "io/fs/local_file_system.h"
36
#include "io/fs/packed_file_manager.h"
37
#include "io/fs/packed_file_system.h"
38
#include "json2pb/json_to_pb.h"
39
#include "json2pb/pb_to_json.h"
40
#include "runtime/exec_env.h"
41
#include "storage/olap_common.h"
42
#include "storage/storage_policy.h"
43
#include "storage/tablet/base_tablet.h"
44
#include "storage/tablet/tablet_fwd.h"
45
#include "storage/tablet/tablet_schema.h"
46
#include "storage/tablet/tablet_schema_cache.h"
47
#include "util/lru_cache.h"
48
49
namespace doris {
50
51
421k
RowsetMeta::~RowsetMeta() {
    // Give back the schema-cache handle acquired by set_tablet_schema(), if one is held.
    if (!_handle) {
        return;
    }
    TabletSchemaCache::instance()->release(_handle);
}
56
57
15.5k
bool RowsetMeta::init(std::string_view pb_rowset_meta) {
58
15.5k
    bool ret = _deserialize_from_pb(pb_rowset_meta);
59
15.5k
    if (!ret) {
60
1
        return false;
61
1
    }
62
15.5k
    _init();
63
15.5k
    return true;
64
15.5k
}
65
66
20.4k
bool RowsetMeta::init(const RowsetMeta* rowset_meta) {
67
20.4k
    RowsetMetaPB rowset_meta_pb;
68
20.4k
    rowset_meta->to_rowset_pb(&rowset_meta_pb);
69
20.4k
    return init_from_pb(rowset_meta_pb);
70
20.4k
}
71
72
644k
bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
73
644k
    if (rowset_meta_pb.has_tablet_schema()) {
74
629k
        set_tablet_schema(rowset_meta_pb.tablet_schema());
75
629k
    }
76
    // Release ownership of TabletSchemaPB from `rowset_meta_pb` and then set it back to `rowset_meta_pb`,
77
    // this won't break const semantics of `rowset_meta_pb`, because `rowset_meta_pb` is not changed
78
    // before and after call this method.
79
644k
    auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
80
644k
    auto* schema = mut_rowset_meta_pb.release_tablet_schema();
81
644k
    _rowset_meta_pb = mut_rowset_meta_pb;
82
644k
    mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
83
644k
    _init();
84
644k
    return true;
85
644k
}
86
87
61
bool RowsetMeta::init_from_json(const std::string& json_rowset_meta) {
88
61
    bool ret = json2pb::JsonToProtoMessage(json_rowset_meta, &_rowset_meta_pb);
89
61
    if (!ret) {
90
1
        return false;
91
1
    }
92
60
    _init();
93
60
    return true;
94
61
}
95
96
0
bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
97
0
    json2pb::Pb2JsonOptions json_options;
98
0
    json_options.pretty_json = true;
99
0
    bool ret = json2pb::ProtoMessageToJson(_rowset_meta_pb, json_rowset_meta, json_options);
100
0
    return ret;
101
0
}
102
103
248k
io::FileSystemSPtr RowsetMeta::physical_fs() {
    // Local rowsets are always served by the process-wide local filesystem.
    if (is_local()) {
        return io::global_local_filesystem();
    }

    // Remote rowsets go through their storage resource; on lookup failure the error is
    // logged and nullptr is returned.
    auto resource = remote_storage_resource();
    if (!resource) {
        LOG(WARNING) << resource.error();
        return nullptr;
    }
    return resource.value()->fs;
}
116
117
242k
io::FileSystemSPtr RowsetMeta::fs() {
    // Returns the filesystem used to read/write this rowset's files: the physical filesystem,
    // optionally wrapped by a PackedFileSystem (when packed slice locations are recorded) and
    // then by the tablet's encryption layer. Returns nullptr on any resolution failure.
    auto fs = physical_fs();
    // physical_fs() yields nullptr when the remote storage resource cannot be found; bail out
    // here instead of dereferencing it below (e.g. `wrapped->id()`).
    if (!fs) {
        return nullptr;
    }

#ifndef BE_TEST
    auto algorithm = _determine_encryption_once.call([this]() -> Result<EncryptionAlgorithmPB> {
        auto maybe_tablet = ExecEnv::get_tablet(tablet_id());
        if (!maybe_tablet) {
            LOG(WARNING) << "get tablet failed: " << maybe_tablet.error();
            return ResultError(maybe_tablet.error());
        }
        auto tablet = maybe_tablet.value();
        return tablet->tablet_meta()->encryption_algorithm();
    });
    if (!algorithm.has_value()) {
        // TODO: return a Result<FileSystemSPtr> in this method?
        return nullptr;
    }

    // Apply packed file system first if packed slice locations are recorded.
    io::FileSystemSPtr wrapped = fs;
    if (_rowset_meta_pb.packed_slice_locations_size() > 0) {
        // These values are the same for every slice of this rowset; compute them once.
        const auto rowset_tablet_id = tablet_id();
        const std::string rowset_id_str = _rowset_id.to_string();
        const auto& resource_id = wrapped->id();

        std::unordered_map<std::string, io::PackedSliceLocation> index_map;
        index_map.reserve(static_cast<size_t>(_rowset_meta_pb.packed_slice_locations_size()));
        for (const auto& [path, index_pb] : _rowset_meta_pb.packed_slice_locations()) {
            io::PackedSliceLocation index;
            index.packed_file_path = index_pb.packed_file_path();
            index.offset = index_pb.offset();
            index.size = index_pb.size();
            // -1 marks "packed file size unknown" for metas written without the field.
            index.packed_file_size =
                    index_pb.has_packed_file_size() ? index_pb.packed_file_size() : -1;
            index.tablet_id = rowset_tablet_id;
            index.rowset_id = rowset_id_str;
            index.resource_id = resource_id;
            index_map[path] = std::move(index);
        }

        // The source map is non-empty (checked above), so index_map is non-empty here.
        io::PackedAppendContext append_info;
        append_info.tablet_id = rowset_tablet_id;
        append_info.rowset_id = rowset_id_str;
        append_info.txn_id = txn_id();
        wrapped = std::make_shared<io::PackedFileSystem>(wrapped, index_map, append_info);
    }

    // Then apply encryption on top
    wrapped = io::make_file_system(wrapped, algorithm.value());
    return wrapped;
#else
    return fs;
#endif
}
167
168
552k
Result<const StorageResource*> RowsetMeta::remote_storage_resource() {
    // Only remote rowsets have a storage resource.
    if (is_local()) {
        return ResultError(Status::InternalError(
                "local rowset has no storage resource. tablet_id={} rowset_id={}", tablet_id(),
                _rowset_id.to_string()));
    }

    // Already resolved and cached on this meta.
    if (_storage_resource.fs) {
        return &_storage_resource;
    }

    if (auto found = get_storage_resource(resource_id())) {
        _storage_resource = std::move(found->first);
        return &_storage_resource;
    }

    if (config::is_cloud_mode()) {
        // When creating a new cluster or creating a storage resource, BE may not sync storage resource,
        // at the moment a query is coming, the BetaRowsetReader call loadSegment and use this method
        // to get the storage resource, so we need to sync storage resource here.
        ExecEnv::GetInstance()->storage_engine().to_cloud().sync_storage_vault();
        if (auto refreshed = get_storage_resource(resource_id())) {
            _storage_resource = std::move(refreshed->first);
            return &_storage_resource;
        }
    }

    return ResultError(Status::InternalError("cannot find storage resource. resource_id={}",
                                             resource_id()));
}
195
196
192k
void RowsetMeta::set_remote_storage_resource(StorageResource resource) {
    // Cache the resource, then record its filesystem id as this rowset's resource_id.
    // The resource is expected to carry a non-null fs.
    _storage_resource = std::move(resource);
    const auto& fs_id = _storage_resource.fs->id();
    _rowset_meta_pb.set_resource_id(fs_id);
}
200
201
586k
bool RowsetMeta::has_variant_type_in_schema() const {
202
586k
    return _schema && _schema->num_variant_columns() > 0;
203
586k
}
204
205
588k
void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema) const {
    // Start from the stored protobuf; the tablet schema normally lives in `_schema` instead and
    // is re-attached below unless the caller asks to skip it.
    *rs_meta_pb = _rowset_meta_pb;

    const auto& schema = _schema;
    const bool attach_schema = !skip_schema;
    if (schema) [[likely]] {
        rs_meta_pb->set_schema_version(schema->schema_version());
        // For cloud, separate tablet schema from rowset meta to reduce persistent size.
        if (attach_schema) {
            schema->to_schema_pb(rs_meta_pb->mutable_tablet_schema());
        }
    }

    const bool has_variant = has_variant_type_in_schema();
    rs_meta_pb->set_has_variant_type_in_schema(has_variant);
}
216
217
464k
RowsetMetaPB RowsetMeta::get_rowset_pb(bool skip_schema) const {
    // Value-returning convenience wrapper around to_rowset_pb().
    RowsetMetaPB result;
    to_rowset_pb(&result, skip_schema);
    return result;
}
222
223
420k
void RowsetMeta::set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
224
420k
    if (_handle) {
225
209k
        TabletSchemaCache::instance()->release(_handle);
226
209k
    }
227
420k
    auto pair = TabletSchemaCache::instance()->insert(tablet_schema->to_key());
228
420k
    _handle = pair.first;
229
420k
    _schema = pair.second;
230
420k
}
231
232
644k
void RowsetMeta::set_tablet_schema(const TabletSchemaPB& tablet_schema) {
233
644k
    if (_handle) {
234
0
        TabletSchemaCache::instance()->release(_handle);
235
0
    }
236
644k
    auto pair = TabletSchemaCache::instance()->insert(
237
644k
            TabletSchema::deterministic_string_serialize(tablet_schema));
238
644k
    _handle = pair.first;
239
644k
    _schema = pair.second;
240
644k
}
241
242
15.5k
bool RowsetMeta::_deserialize_from_pb(std::string_view value) {
243
15.5k
    if (!_rowset_meta_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) {
244
1
        _rowset_meta_pb.Clear();
245
1
        return false;
246
1
    }
247
15.5k
    if (_rowset_meta_pb.has_tablet_schema()) {
248
15.4k
        set_tablet_schema(_rowset_meta_pb.tablet_schema());
249
15.4k
        _rowset_meta_pb.set_allocated_tablet_schema(nullptr);
250
15.4k
    }
251
15.5k
    return true;
252
15.5k
}
253
254
32
bool RowsetMeta::_serialize_to_pb(std::string* value) {
255
32
    if (value == nullptr) {
256
0
        return false;
257
0
    }
258
32
    RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
259
32
    if (_schema) {
260
32
        _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
261
32
    }
262
32
    return rowset_meta_pb.SerializeToString(value);
263
32
}
264
265
660k
void RowsetMeta::_init() {
266
660k
    if (_rowset_meta_pb.rowset_id() > 0) {
267
3.57k
        _rowset_id.init(_rowset_meta_pb.rowset_id());
268
656k
    } else {
269
656k
        _rowset_id.init(_rowset_meta_pb.rowset_id_v2());
270
656k
    }
271
660k
    update_metadata_size();
272
660k
}
273
274
195k
void RowsetMeta::add_segments_file_size(const std::vector<size_t>& seg_file_size) {
275
195k
    _rowset_meta_pb.set_enable_segments_file_size(true);
276
195k
    for (auto fsize : seg_file_size) {
277
61.5k
        _rowset_meta_pb.add_segments_file_size(fsize);
278
61.5k
    }
279
195k
}
280
281
375k
int64_t RowsetMeta::segment_file_size(int seg_id) const {
282
18.4E
    DCHECK(_rowset_meta_pb.segments_file_size().empty() ||
283
18.4E
           _rowset_meta_pb.segments_file_size_size() > seg_id)
284
18.4E
            << _rowset_meta_pb.segments_file_size_size() << ' ' << seg_id;
285
375k
    return _rowset_meta_pb.enable_segments_file_size()
286
375k
                   ? (_rowset_meta_pb.segments_file_size_size() > seg_id
287
343k
                              ? _rowset_meta_pb.segments_file_size(seg_id)
288
18.4E
                              : -1)
289
375k
                   : -1;
290
375k
}
291
292
void RowsetMeta::set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds,
293
230k
                                         bool aggregate_into_single) {
294
230k
    _rowset_meta_pb.clear_segments_key_bounds();
295
230k
    bool did_aggregate = aggregate_into_single && !segments_key_bounds.empty();
296
230k
    if (did_aggregate) {
297
40.2k
        const std::string* overall_min = &segments_key_bounds.front().min_key();
298
40.2k
        const std::string* overall_max = &segments_key_bounds.front().max_key();
299
42.2k
        for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
300
42.2k
            if (key_bounds.min_key() < *overall_min) {
301
4
                overall_min = &key_bounds.min_key();
302
4
            }
303
42.2k
            if (key_bounds.max_key() > *overall_max) {
304
1.39k
                overall_max = &key_bounds.max_key();
305
1.39k
            }
306
42.2k
        }
307
40.2k
        KeyBoundsPB* aggregated = _rowset_meta_pb.add_segments_key_bounds();
308
40.2k
        aggregated->set_min_key(*overall_min);
309
40.2k
        aggregated->set_max_key(*overall_max);
310
190k
    } else {
311
190k
        for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
312
45.5k
            KeyBoundsPB* new_key_bounds = _rowset_meta_pb.add_segments_key_bounds();
313
45.5k
            *new_key_bounds = key_bounds;
314
45.5k
        }
315
190k
    }
316
230k
    set_segments_key_bounds_aggregated(did_aggregate);
317
318
230k
    int32_t truncation_threshold = config::segments_key_bounds_truncation_threshold;
319
230k
    if (config::random_segments_key_bounds_truncation) {
320
0
        std::mt19937 generator(std::random_device {}());
321
0
        std::uniform_int_distribution<int> distribution(-10, 40);
322
0
        truncation_threshold = distribution(generator);
323
0
    }
324
230k
    bool really_do_truncation {false};
325
230k
    if (truncation_threshold > 0) {
326
229k
        for (auto& segment_key_bounds : *_rowset_meta_pb.mutable_segments_key_bounds()) {
327
85.3k
            if (segment_key_bounds.min_key().size() > truncation_threshold) {
328
82.3k
                really_do_truncation = true;
329
82.3k
                segment_key_bounds.mutable_min_key()->resize(truncation_threshold);
330
82.3k
            }
331
85.3k
            if (segment_key_bounds.max_key().size() > truncation_threshold) {
332
83.1k
                really_do_truncation = true;
333
83.1k
                segment_key_bounds.mutable_max_key()->resize(truncation_threshold);
334
83.1k
            }
335
85.3k
        }
336
229k
    }
337
230k
    set_segments_key_bounds_truncated(really_do_truncation || is_segments_key_bounds_truncated());
338
230k
}
339
340
3.46k
void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
341
3.46k
    set_num_segments(num_segments() + other.num_segments());
342
3.46k
    set_num_rows(num_rows() + other.num_rows());
343
3.46k
    set_data_disk_size(data_disk_size() + other.data_disk_size());
344
3.46k
    set_total_disk_size(total_disk_size() + other.total_disk_size());
345
3.46k
    set_index_disk_size(index_disk_size() + other.index_disk_size());
346
3.46k
    set_total_disk_size(data_disk_size() + index_disk_size());
347
3.46k
    set_segments_key_bounds_truncated(is_segments_key_bounds_truncated() ||
348
3.46k
                                      other.is_segments_key_bounds_truncated());
349
    // merge_rowset_meta is used in the MOW partial-update publish path, which relies
350
    // on per-segment bounds. Aggregation should never be enabled for MOW rowsets,
351
    // so we do not expect either side to be aggregated here.
352
18.4E
    DCHECK(!is_segments_key_bounds_aggregated() && !other.is_segments_key_bounds_aggregated())
353
18.4E
            << "merge_rowset_meta encountered aggregated key bounds";
354
3.46k
    if (_rowset_meta_pb.num_segment_rows_size() > 0) {
355
1.52k
        if (other.num_segments() > 0) {
356
19
            if (other._rowset_meta_pb.num_segment_rows_size() > 0) {
357
19
                for (auto row_count : other._rowset_meta_pb.num_segment_rows()) {
358
19
                    _rowset_meta_pb.add_num_segment_rows(row_count);
359
19
                }
360
19
            } else {
361
                // This may happen when a partial update load commits in high version doirs_be
362
                // and publishes with new segments in low version doris_be. In this case, just clear
363
                // all num_segment_rows.
364
0
                _rowset_meta_pb.clear_num_segment_rows();
365
0
            }
366
19
        }
367
1.52k
    }
368
3.46k
    for (auto&& key_bound : other.get_segments_key_bounds()) {
369
19
        add_segment_key_bounds(key_bound);
370
19
    }
371
3.46k
    if (_rowset_meta_pb.enable_segments_file_size() &&
372
3.46k
        other._rowset_meta_pb.enable_segments_file_size()) {
373
3.37k
        for (auto fsize : other.segments_file_size()) {
374
19
            _rowset_meta_pb.add_segments_file_size(fsize);
375
19
        }
376
3.37k
    }
377
3.46k
    if (_rowset_meta_pb.enable_inverted_index_file_info() &&
378
3.46k
        other._rowset_meta_pb.enable_inverted_index_file_info()) {
379
192
        for (auto finfo : other.inverted_index_file_info()) {
380
0
            InvertedIndexFileInfo* new_file_info = _rowset_meta_pb.add_inverted_index_file_info();
381
0
            *new_file_info = finfo;
382
0
        }
383
192
    }
384
    // In partial update the rowset schema maybe updated when table contains variant type, so we need the newest schema to be updated
385
    // Otherwise the schema is stale and lead to wrong data read
386
3.46k
    TEST_SYNC_POINT_RETURN_WITH_VOID("RowsetMeta::merge_rowset_meta:skip_schema_merge");
387
3.46k
    if (tablet_schema()->num_variant_columns() > 0) {
388
        // merge extracted columns
389
109
        TabletSchemaSPtr merged_schema;
390
109
        static_cast<void>(variant_util::get_least_common_schema(
391
109
                {tablet_schema(), other.tablet_schema()}, nullptr, merged_schema));
392
109
        if (*_schema != *merged_schema) {
393
0
            set_tablet_schema(merged_schema);
394
0
        }
395
109
    }
396
3.46k
    if (rowset_state() == RowsetStatePB::BEGIN_PARTIAL_UPDATE) {
397
3.30k
        set_rowset_state(RowsetStatePB::COMMITTED);
398
3.30k
    }
399
400
3.46k
    update_metadata_size();
401
3.46k
}
402
403
663k
int64_t RowsetMeta::get_metadata_size() const {
404
663k
    return sizeof(RowsetMeta) + _rowset_meta_pb.ByteSizeLong();
405
663k
}
406
407
187k
InvertedIndexFileInfo RowsetMeta::inverted_index_file_info(int seg_id) {
    // Returns the inverted-index file info recorded for segment `seg_id`, or a
    // default-constructed InvertedIndexFileInfo when the feature is disabled or `seg_id` is
    // outside [0, inverted_index_file_info_size()).
    // A negative id would otherwise pass the `size > seg_id` test and index out of range.
    if (!_rowset_meta_pb.enable_inverted_index_file_info() || seg_id < 0 ||
        seg_id >= _rowset_meta_pb.inverted_index_file_info_size()) {
        return InvertedIndexFileInfo();
    }
    return _rowset_meta_pb.inverted_index_file_info(seg_id);
}
414
415
void RowsetMeta::add_inverted_index_files_info(
416
21.7k
        const std::vector<const InvertedIndexFileInfo*>& idx_file_info) {
417
21.7k
    _rowset_meta_pb.set_enable_inverted_index_file_info(true);
418
21.7k
    for (auto finfo : idx_file_info) {
419
6.10k
        auto* new_file_info = _rowset_meta_pb.add_inverted_index_file_info();
420
6.10k
        *new_file_info = *finfo;
421
6.10k
    }
422
21.7k
}
423
424
0
bool operator==(const RowsetMeta& a, const RowsetMeta& b) {
    // Equal when the rowset ids, the removed-from-meta flags and the full protobuf contents
    // all match (the cached schema object itself is not compared).
    return a._rowset_id == b._rowset_id &&
           a._is_removed_from_rowset_meta == b._is_removed_from_rowset_meta &&
           google::protobuf::util::MessageDifferencer::Equals(a._rowset_meta_pb,
                                                              b._rowset_meta_pb);
}
431
432
} // namespace doris