Coverage Report

Created: 2024-11-20 12:56

/root/doris/be/src/olap/tablet_meta.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/tablet_meta.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/Types_types.h>
22
#include <gen_cpp/olap_common.pb.h>
23
#include <gen_cpp/olap_file.pb.h>
24
#include <gen_cpp/segment_v2.pb.h>
25
#include <gen_cpp/types.pb.h>
26
#include <json2pb/pb_to_json.h>
27
#include <time.h>
28
29
#include <cstdint>
30
#include <set>
31
#include <utility>
32
33
#include "common/config.h"
34
#include "gutil/integral_types.h"
35
#include "io/fs/file_reader_writer_fwd.h"
36
#include "io/fs/file_writer.h"
37
#include "olap/data_dir.h"
38
#include "olap/file_header.h"
39
#include "olap/olap_common.h"
40
#include "olap/olap_define.h"
41
#include "olap/tablet_meta_manager.h"
42
#include "olap/utils.h"
43
#include "util/debug_points.h"
44
#include "util/mem_info.h"
45
#include "util/parse_util.h"
46
#include "util/string_util.h"
47
#include "util/time.h"
48
#include "util/uid_util.h"
49
50
using std::string;
51
using std::unordered_map;
52
using std::vector;
53
54
namespace doris {
55
using namespace ErrorCode;
56
57
Status TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tablet_uid,
58
                          uint64_t shard_id, uint32_t next_unique_id,
59
                          const unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id,
60
24
                          TabletMetaSharedPtr* tablet_meta) {
61
24
    std::optional<TBinlogConfig> binlog_config;
62
24
    if (request.__isset.binlog_config) {
63
0
        binlog_config = request.binlog_config;
64
0
    }
65
24
    *tablet_meta = std::make_shared<TabletMeta>(
66
24
            request.table_id, request.partition_id, request.tablet_id, request.replica_id,
67
24
            request.tablet_schema.schema_hash, shard_id, request.tablet_schema, next_unique_id,
68
24
            col_ordinal_to_unique_id, tablet_uid,
69
24
            request.__isset.tablet_type ? request.tablet_type : TTabletType::TABLET_TYPE_DISK,
70
24
            request.compression_type, request.storage_policy_id,
71
24
            request.__isset.enable_unique_key_merge_on_write
72
24
                    ? request.enable_unique_key_merge_on_write
73
24
                    : false,
74
24
            std::move(binlog_config), request.compaction_policy,
75
24
            request.time_series_compaction_goal_size_mbytes,
76
24
            request.time_series_compaction_file_count_threshold,
77
24
            request.time_series_compaction_time_threshold_seconds,
78
24
            request.time_series_compaction_empty_rowsets_threshold,
79
24
            request.time_series_compaction_level_threshold);
80
24
    return Status::OK();
81
24
}
82
83
TabletMeta::TabletMeta()
84
        : _tablet_uid(0, 0),
85
          _schema(new TabletSchema),
86
35
          _delete_bitmap(new DeleteBitmap(_tablet_id)) {}
87
88
TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id,
89
                       int64_t replica_id, int32_t schema_hash, uint64_t shard_id,
90
                       const TTabletSchema& tablet_schema, uint32_t next_unique_id,
91
                       const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id,
92
                       TabletUid tablet_uid, TTabletType::type tabletType,
93
                       TCompressionType::type compression_type, int64_t storage_policy_id,
94
                       bool enable_unique_key_merge_on_write,
95
                       std::optional<TBinlogConfig> binlog_config, std::string compaction_policy,
96
                       int64_t time_series_compaction_goal_size_mbytes,
97
                       int64_t time_series_compaction_file_count_threshold,
98
                       int64_t time_series_compaction_time_threshold_seconds,
99
                       int64_t time_series_compaction_empty_rowsets_threshold,
100
                       int64_t time_series_compaction_level_threshold)
101
        : _tablet_uid(0, 0),
102
          _schema(new TabletSchema),
103
156
          _delete_bitmap(new DeleteBitmap(tablet_id)) {
104
156
    TabletMetaPB tablet_meta_pb;
105
156
    tablet_meta_pb.set_table_id(table_id);
106
156
    tablet_meta_pb.set_partition_id(partition_id);
107
156
    tablet_meta_pb.set_tablet_id(tablet_id);
108
156
    tablet_meta_pb.set_replica_id(replica_id);
109
156
    tablet_meta_pb.set_schema_hash(schema_hash);
110
156
    tablet_meta_pb.set_shard_id(shard_id);
111
    // Persist the creation time, but it is not used
112
156
    tablet_meta_pb.set_creation_time(time(nullptr));
113
156
    tablet_meta_pb.set_cumulative_layer_point(-1);
114
156
    tablet_meta_pb.set_tablet_state(PB_RUNNING);
115
156
    *(tablet_meta_pb.mutable_tablet_uid()) = tablet_uid.to_proto();
116
156
    tablet_meta_pb.set_tablet_type(tabletType == TTabletType::TABLET_TYPE_DISK
117
156
                                           ? TabletTypePB::TABLET_TYPE_DISK
118
156
                                           : TabletTypePB::TABLET_TYPE_MEMORY);
119
156
    tablet_meta_pb.set_enable_unique_key_merge_on_write(enable_unique_key_merge_on_write);
120
156
    tablet_meta_pb.set_storage_policy_id(storage_policy_id);
121
156
    tablet_meta_pb.set_compaction_policy(compaction_policy);
122
156
    tablet_meta_pb.set_time_series_compaction_goal_size_mbytes(
123
156
            time_series_compaction_goal_size_mbytes);
124
156
    tablet_meta_pb.set_time_series_compaction_file_count_threshold(
125
156
            time_series_compaction_file_count_threshold);
126
156
    tablet_meta_pb.set_time_series_compaction_time_threshold_seconds(
127
156
            time_series_compaction_time_threshold_seconds);
128
156
    tablet_meta_pb.set_time_series_compaction_empty_rowsets_threshold(
129
156
            time_series_compaction_empty_rowsets_threshold);
130
156
    tablet_meta_pb.set_time_series_compaction_level_threshold(
131
156
            time_series_compaction_level_threshold);
132
156
    TabletSchemaPB* schema = tablet_meta_pb.mutable_schema();
133
156
    schema->set_num_short_key_columns(tablet_schema.short_key_column_count);
134
156
    schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block);
135
156
    schema->set_sequence_col_idx(tablet_schema.sequence_col_idx);
136
156
    switch (tablet_schema.keys_type) {
137
30
    case TKeysType::DUP_KEYS:
138
30
        schema->set_keys_type(KeysType::DUP_KEYS);
139
30
        break;
140
71
    case TKeysType::UNIQUE_KEYS:
141
71
        schema->set_keys_type(KeysType::UNIQUE_KEYS);
142
71
        break;
143
18
    case TKeysType::AGG_KEYS:
144
18
        schema->set_keys_type(KeysType::AGG_KEYS);
145
18
        break;
146
37
    default:
147
37
        LOG(WARNING) << "unknown tablet keys type";
148
37
        break;
149
156
    }
150
    // compress_kind used to compress segment files
151
156
    schema->set_compress_kind(COMPRESS_LZ4);
152
153
    // compression_type used to compress segment page
154
156
    switch (compression_type) {
155
0
    case TCompressionType::NO_COMPRESSION:
156
0
        schema->set_compression_type(segment_v2::NO_COMPRESSION);
157
0
        break;
158
0
    case TCompressionType::SNAPPY:
159
0
        schema->set_compression_type(segment_v2::SNAPPY);
160
0
        break;
161
0
    case TCompressionType::LZ4:
162
0
        schema->set_compression_type(segment_v2::LZ4);
163
0
        break;
164
156
    case TCompressionType::LZ4F:
165
156
        schema->set_compression_type(segment_v2::LZ4F);
166
156
        break;
167
0
    case TCompressionType::ZLIB:
168
0
        schema->set_compression_type(segment_v2::ZLIB);
169
0
        break;
170
0
    case TCompressionType::ZSTD:
171
0
        schema->set_compression_type(segment_v2::ZSTD);
172
0
        break;
173
0
    default:
174
0
        schema->set_compression_type(segment_v2::LZ4F);
175
0
        break;
176
156
    }
177
178
156
    switch (tablet_schema.sort_type) {
179
0
    case TSortType::type::ZORDER:
180
0
        schema->set_sort_type(SortType::ZORDER);
181
0
        break;
182
156
    default:
183
156
        schema->set_sort_type(SortType::LEXICAL);
184
156
    }
185
156
    schema->set_sort_col_num(tablet_schema.sort_col_num);
186
156
    tablet_meta_pb.set_in_restore_mode(false);
187
188
    // set column information
189
156
    uint32_t col_ordinal = 0;
190
156
    bool has_bf_columns = false;
191
483
    for (TColumn tcolumn : tablet_schema.columns) {
192
483
        ColumnPB* column = schema->add_column();
193
483
        uint32_t unique_id = -1;
194
483
        if (tcolumn.col_unique_id >= 0) {
195
0
            unique_id = tcolumn.col_unique_id;
196
483
        } else {
197
483
            unique_id = col_ordinal_to_unique_id.at(col_ordinal);
198
483
        }
199
483
        col_ordinal++;
200
483
        init_column_from_tcolumn(unique_id, tcolumn, column);
201
202
483
        if (column->is_bf_column()) {
203
0
            has_bf_columns = true;
204
0
        }
205
206
483
        if (tablet_schema.__isset.indexes) {
207
0
            for (auto& index : tablet_schema.indexes) {
208
0
                if (index.index_type == TIndexType::type::BITMAP) {
209
0
                    DCHECK_EQ(index.columns.size(), 1);
210
0
                    if (iequal(tcolumn.column_name, index.columns[0])) {
211
0
                        column->set_has_bitmap_index(true);
212
0
                        break;
213
0
                    }
214
0
                } else if (index.index_type == TIndexType::type::BLOOMFILTER ||
215
0
                           index.index_type == TIndexType::type::NGRAM_BF) {
216
0
                    DCHECK_EQ(index.columns.size(), 1);
217
0
                    if (iequal(tcolumn.column_name, index.columns[0])) {
218
0
                        column->set_is_bf_column(true);
219
0
                        break;
220
0
                    }
221
0
                }
222
0
            }
223
0
        }
224
483
    }
225
226
    // copy index meta
227
156
    if (tablet_schema.__isset.indexes) {
228
0
        for (auto& index : tablet_schema.indexes) {
229
0
            TabletIndexPB* index_pb = schema->add_index();
230
0
            index_pb->set_index_id(index.index_id);
231
0
            index_pb->set_index_name(index.index_name);
232
            // init col_unique_id in index at be side, since col_unique_id may be -1 at fe side
233
            // get column unique id by name
234
0
            for (auto column_name : index.columns) {
235
0
                for (auto column : schema->column()) {
236
0
                    if (iequal(column.name(), column_name)) {
237
0
                        index_pb->add_col_unique_id(column.unique_id());
238
0
                    }
239
0
                }
240
0
            }
241
0
            switch (index.index_type) {
242
0
            case TIndexType::BITMAP:
243
0
                index_pb->set_index_type(IndexType::BITMAP);
244
0
                break;
245
0
            case TIndexType::INVERTED:
246
0
                index_pb->set_index_type(IndexType::INVERTED);
247
0
                break;
248
0
            case TIndexType::BLOOMFILTER:
249
0
                index_pb->set_index_type(IndexType::BLOOMFILTER);
250
0
                break;
251
0
            case TIndexType::NGRAM_BF:
252
0
                index_pb->set_index_type(IndexType::NGRAM_BF);
253
0
                break;
254
0
            }
255
256
0
            if (index.__isset.properties) {
257
0
                auto properties = index_pb->mutable_properties();
258
0
                for (auto kv : index.properties) {
259
0
                    (*properties)[kv.first] = kv.second;
260
0
                }
261
0
            }
262
0
        }
263
0
    }
264
265
156
    schema->set_next_column_unique_id(next_unique_id);
266
156
    if (has_bf_columns && tablet_schema.__isset.bloom_filter_fpp) {
267
0
        schema->set_bf_fpp(tablet_schema.bloom_filter_fpp);
268
0
    }
269
270
156
    if (tablet_schema.__isset.is_in_memory) {
271
0
        schema->set_is_in_memory(tablet_schema.is_in_memory);
272
0
    }
273
274
156
    if (tablet_schema.__isset.disable_auto_compaction) {
275
0
        schema->set_disable_auto_compaction(tablet_schema.disable_auto_compaction);
276
0
    }
277
278
156
    if (tablet_schema.__isset.enable_single_replica_compaction) {
279
156
        schema->set_enable_single_replica_compaction(
280
156
                tablet_schema.enable_single_replica_compaction);
281
156
    }
282
283
156
    if (tablet_schema.__isset.is_dynamic_schema) {
284
156
        schema->set_is_dynamic_schema(tablet_schema.is_dynamic_schema);
285
156
    }
286
287
156
    if (tablet_schema.__isset.delete_sign_idx) {
288
156
        schema->set_delete_sign_idx(tablet_schema.delete_sign_idx);
289
156
    }
290
156
    if (tablet_schema.__isset.store_row_column) {
291
156
        schema->set_store_row_column(tablet_schema.store_row_column);
292
156
    }
293
156
    if (tablet_schema.__isset.skip_write_index_on_load) {
294
156
        schema->set_skip_write_index_on_load(tablet_schema.skip_write_index_on_load);
295
156
    }
296
156
    if (binlog_config.has_value()) {
297
0
        BinlogConfig tmp_binlog_config;
298
0
        tmp_binlog_config = binlog_config.value();
299
0
        tmp_binlog_config.to_pb(tablet_meta_pb.mutable_binlog_config());
300
0
    }
301
302
156
    init_from_pb(tablet_meta_pb);
303
156
    LOG(INFO) << "init tablet meta from pb: " << tablet_meta_pb.ShortDebugString();
304
156
}
305
306
TabletMeta::TabletMeta(const TabletMeta& b)
307
        : _table_id(b._table_id),
308
          _partition_id(b._partition_id),
309
          _tablet_id(b._tablet_id),
310
          _replica_id(b._replica_id),
311
          _schema_hash(b._schema_hash),
312
          _shard_id(b._shard_id),
313
          _creation_time(b._creation_time),
314
          _cumulative_layer_point(b._cumulative_layer_point),
315
          _tablet_uid(b._tablet_uid),
316
          _tablet_type(b._tablet_type),
317
          _tablet_state(b._tablet_state),
318
          _schema(b._schema),
319
          _rs_metas(b._rs_metas),
320
          _stale_rs_metas(b._stale_rs_metas),
321
          _in_restore_mode(b._in_restore_mode),
322
          _preferred_rowset_type(b._preferred_rowset_type),
323
          _storage_policy_id(b._storage_policy_id),
324
          _cooldown_meta_id(b._cooldown_meta_id),
325
          _enable_unique_key_merge_on_write(b._enable_unique_key_merge_on_write),
326
          _delete_bitmap(b._delete_bitmap),
327
          _binlog_config(b._binlog_config),
328
          _compaction_policy(b._compaction_policy),
329
          _time_series_compaction_goal_size_mbytes(b._time_series_compaction_goal_size_mbytes),
330
          _time_series_compaction_file_count_threshold(
331
                  b._time_series_compaction_file_count_threshold),
332
          _time_series_compaction_time_threshold_seconds(
333
                  b._time_series_compaction_time_threshold_seconds),
334
          _time_series_compaction_empty_rowsets_threshold(
335
                  b._time_series_compaction_empty_rowsets_threshold),
336
0
          _time_series_compaction_level_threshold(b._time_series_compaction_level_threshold) {};
337
338
void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn,
339
483
                                          ColumnPB* column) {
340
483
    column->set_unique_id(unique_id);
341
483
    column->set_name(tcolumn.column_name);
342
483
    column->set_has_bitmap_index(tcolumn.has_bitmap_index);
343
483
    string data_type;
344
483
    EnumToString(TPrimitiveType, tcolumn.column_type.type, data_type);
345
483
    column->set_type(data_type);
346
347
483
    uint32_t length = TabletColumn::get_field_length_by_type(tcolumn.column_type.type,
348
483
                                                             tcolumn.column_type.len);
349
483
    column->set_length(length);
350
483
    column->set_index_length(length);
351
483
    column->set_precision(tcolumn.column_type.precision);
352
483
    column->set_frac(tcolumn.column_type.scale);
353
354
483
    if (tcolumn.__isset.result_is_nullable) {
355
0
        column->set_result_is_nullable(tcolumn.result_is_nullable);
356
0
    }
357
358
483
    if (tcolumn.column_type.type == TPrimitiveType::VARCHAR ||
359
483
        tcolumn.column_type.type == TPrimitiveType::STRING) {
360
17
        if (!tcolumn.column_type.__isset.index_len) {
361
17
            column->set_index_length(10);
362
17
        } else {
363
0
            column->set_index_length(tcolumn.column_type.index_len);
364
0
        }
365
17
    }
366
483
    if (!tcolumn.is_key) {
367
211
        column->set_is_key(false);
368
211
        if (tcolumn.__isset.aggregation) {
369
0
            column->set_aggregation(tcolumn.aggregation);
370
211
        } else {
371
211
            string aggregation_type;
372
211
            EnumToString(TAggregationType, tcolumn.aggregation_type, aggregation_type);
373
211
            column->set_aggregation(aggregation_type);
374
211
        }
375
272
    } else {
376
272
        column->set_is_key(true);
377
272
        column->set_aggregation("NONE");
378
272
    }
379
483
    column->set_is_nullable(tcolumn.is_allow_null);
380
483
    if (tcolumn.__isset.default_value) {
381
0
        column->set_default_value(tcolumn.default_value);
382
0
    }
383
483
    if (tcolumn.__isset.is_bloom_filter_column) {
384
0
        column->set_is_bf_column(tcolumn.is_bloom_filter_column);
385
0
    }
386
483
    for (size_t i = 0; i < tcolumn.children_column.size(); i++) {
387
0
        ColumnPB* children_column = column->add_children_columns();
388
0
        init_column_from_tcolumn(i, tcolumn.children_column[i], children_column);
389
0
    }
390
483
}
391
392
5
Status TabletMeta::create_from_file(const string& file_path) {
393
5
    FileHeader<TabletMetaPB> file_header(file_path);
394
    // In file_header.deserialize(), it validates file length, signature, checksum of protobuf.
395
5
    RETURN_IF_ERROR(file_header.deserialize());
396
5
    TabletMetaPB tablet_meta_pb;
397
5
    try {
398
5
        tablet_meta_pb.CopyFrom(file_header.message());
399
5
    } catch (...) {
400
0
        return Status::Error<PARSE_PROTOBUF_ERROR>("fail to copy protocol buffer object. file={}",
401
0
                                                   file_path);
402
0
    }
403
404
5
    init_from_pb(tablet_meta_pb);
405
5
    return Status::OK();
406
5
}
407
408
std::string TabletMeta::construct_header_file_path(const string& schema_hash_path,
409
2
                                                   int64_t tablet_id) {
410
2
    std::stringstream header_name_stream;
411
2
    header_name_stream << schema_hash_path << "/" << tablet_id << ".hdr";
412
2
    return header_name_stream.str();
413
2
}
414
415
0
Status TabletMeta::save_as_json(const string& file_path, DataDir* dir) {
416
0
    std::string json_meta;
417
0
    json2pb::Pb2JsonOptions json_options;
418
0
    json_options.pretty_json = true;
419
0
    json_options.bytes_to_base64 = true;
420
0
    to_json(&json_meta, json_options);
421
    // save to file
422
0
    io::FileWriterPtr file_writer;
423
0
    RETURN_IF_ERROR(dir->fs()->create_file(file_path, &file_writer));
424
0
    RETURN_IF_ERROR(file_writer->append(json_meta));
425
0
    RETURN_IF_ERROR(file_writer->close());
426
0
    return Status::OK();
427
0
}
428
429
6
Status TabletMeta::save(const string& file_path) {
430
6
    TabletMetaPB tablet_meta_pb;
431
6
    to_meta_pb(&tablet_meta_pb);
432
6
    return TabletMeta::save(file_path, tablet_meta_pb);
433
6
}
434
435
8
Status TabletMeta::save(const string& file_path, const TabletMetaPB& tablet_meta_pb) {
436
8
    DCHECK(!file_path.empty());
437
8
    FileHeader<TabletMetaPB> file_header(file_path);
438
8
    try {
439
8
        file_header.mutable_message()->CopyFrom(tablet_meta_pb);
440
8
    } catch (...) {
441
0
        LOG(WARNING) << "fail to copy protocol buffer object. file='" << file_path;
442
0
        return Status::Error<ErrorCode::INTERNAL_ERROR>(
443
0
                "fail to copy protocol buffer object. file={}", file_path);
444
0
    }
445
8
    RETURN_IF_ERROR(file_header.prepare());
446
8
    RETURN_IF_ERROR(file_header.serialize());
447
8
    return Status::OK();
448
8
}
449
450
53
Status TabletMeta::save_meta(DataDir* data_dir) {
451
53
    std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
452
53
    return _save_meta(data_dir);
453
53
}
454
455
53
Status TabletMeta::_save_meta(DataDir* data_dir) {
456
    // check if tablet uid is valid
457
53
    if (_tablet_uid.hi == 0 && _tablet_uid.lo == 0) {
458
0
        LOG(FATAL) << "tablet_uid is invalid"
459
0
                   << " tablet=" << full_name() << " _tablet_uid=" << _tablet_uid.to_string();
460
0
    }
461
53
    string meta_binary;
462
463
53
    auto t1 = MonotonicMicros();
464
53
    RETURN_IF_ERROR(serialize(&meta_binary));
465
53
    auto t2 = MonotonicMicros();
466
53
    Status status = TabletMetaManager::save(data_dir, tablet_id(), schema_hash(), meta_binary);
467
53
    if (!status.ok()) {
468
0
        LOG(FATAL) << "fail to save tablet_meta. status=" << status << ", tablet_id=" << tablet_id()
469
0
                   << ", schema_hash=" << schema_hash();
470
0
    }
471
53
    auto t3 = MonotonicMicros();
472
53
    auto cost = t3 - t1;
473
53
    if (cost > 1 * 1000 * 1000) {
474
0
        LOG(INFO) << "save tablet(" << full_name() << ") meta too slow. serialize cost " << t2 - t1
475
0
                  << "(us), serialized binary size: " << meta_binary.length()
476
0
                  << "(bytes), write rocksdb cost " << t3 - t2 << "(us)";
477
0
    }
478
53
    return status;
479
53
}
480
481
56
Status TabletMeta::serialize(string* meta_binary) {
482
56
    TabletMetaPB tablet_meta_pb;
483
56
    to_meta_pb(&tablet_meta_pb);
484
56
    if (tablet_meta_pb.partition_id() <= 0) {
485
2
        LOG(WARNING) << "invalid partition id " << tablet_meta_pb.partition_id() << " tablet "
486
2
                     << tablet_meta_pb.tablet_id();
487
2
    }
488
56
    DBUG_EXECUTE_IF("TabletMeta::serialize::zero_partition_id", {
489
56
        long partition_id = tablet_meta_pb.partition_id();
490
56
        tablet_meta_pb.set_partition_id(0);
491
56
        LOG(WARNING) << "set debug point TabletMeta::serialize::zero_partition_id old="
492
56
                     << partition_id << " new=" << tablet_meta_pb.DebugString();
493
56
    });
494
56
    bool serialize_success = tablet_meta_pb.SerializeToString(meta_binary);
495
56
    if (!_rs_metas.empty() || !_stale_rs_metas.empty()) {
496
56
        _avg_rs_meta_serialize_size =
497
56
                meta_binary->length() / (_rs_metas.size() + _stale_rs_metas.size());
498
56
        if (meta_binary->length() > config::tablet_meta_serialize_size_limit ||
499
56
            !serialize_success) {
500
0
            int64_t origin_meta_size = meta_binary->length();
501
0
            int64_t stale_rowsets_num = tablet_meta_pb.stale_rs_metas().size();
502
0
            tablet_meta_pb.clear_stale_rs_metas();
503
0
            meta_binary->clear();
504
0
            serialize_success = tablet_meta_pb.SerializeToString(meta_binary);
505
0
            LOG(WARNING) << "tablet meta serialization size exceeds limit: "
506
0
                         << config::tablet_meta_serialize_size_limit
507
0
                         << " clean up stale rowsets, tablet id: " << tablet_id()
508
0
                         << " stale rowset num: " << stale_rowsets_num
509
0
                         << " serialization size before clean " << origin_meta_size
510
0
                         << " serialization size after clean " << meta_binary->length();
511
0
        }
512
56
    }
513
514
56
    if (!serialize_success) {
515
0
        LOG(FATAL) << "failed to serialize meta " << full_name();
516
0
    }
517
56
    return Status::OK();
518
56
}
519
520
12
Status TabletMeta::deserialize(const string& meta_binary) {
521
12
    TabletMetaPB tablet_meta_pb;
522
12
    bool parsed = tablet_meta_pb.ParseFromString(meta_binary);
523
12
    if (!parsed) {
524
0
        return Status::Error<INIT_FAILED>("parse tablet meta failed");
525
0
    }
526
12
    init_from_pb(tablet_meta_pb);
527
12
    return Status::OK();
528
12
}
529
530
2
void TabletMeta::init_rs_metas_fs(const io::FileSystemSPtr& fs) {
531
4
    for (auto& rs_meta : _rs_metas) {
532
4
        if (rs_meta->is_local()) {
533
4
            rs_meta->set_fs(fs);
534
4
        }
535
4
    }
536
2
    for (auto& rs_meta : _stale_rs_metas) {
537
0
        if (rs_meta->is_local()) {
538
0
            rs_meta->set_fs(fs);
539
0
        }
540
0
    }
541
2
}
542
543
175
void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
544
175
    _table_id = tablet_meta_pb.table_id();
545
175
    _partition_id = tablet_meta_pb.partition_id();
546
175
    _tablet_id = tablet_meta_pb.tablet_id();
547
175
    _replica_id = tablet_meta_pb.replica_id();
548
175
    _schema_hash = tablet_meta_pb.schema_hash();
549
175
    _shard_id = tablet_meta_pb.shard_id();
550
175
    _creation_time = tablet_meta_pb.creation_time();
551
175
    _cumulative_layer_point = tablet_meta_pb.cumulative_layer_point();
552
175
    _tablet_uid = TabletUid(tablet_meta_pb.tablet_uid());
553
175
    if (tablet_meta_pb.has_tablet_type()) {
554
175
        _tablet_type = tablet_meta_pb.tablet_type();
555
175
    } else {
556
0
        _tablet_type = TabletTypePB::TABLET_TYPE_DISK;
557
0
    }
558
559
    // init _tablet_state
560
175
    switch (tablet_meta_pb.tablet_state()) {
561
3
    case PB_NOTREADY:
562
3
        _tablet_state = TabletState::TABLET_NOTREADY;
563
3
        break;
564
167
    case PB_RUNNING:
565
167
        _tablet_state = TabletState::TABLET_RUNNING;
566
167
        break;
567
0
    case PB_TOMBSTONED:
568
0
        _tablet_state = TabletState::TABLET_TOMBSTONED;
569
0
        break;
570
0
    case PB_STOPPED:
571
0
        _tablet_state = TabletState::TABLET_STOPPED;
572
0
        break;
573
5
    case PB_SHUTDOWN:
574
5
        _tablet_state = TabletState::TABLET_SHUTDOWN;
575
5
        break;
576
0
    default:
577
0
        LOG(WARNING) << "tablet has no state. tablet=" << tablet_id()
578
0
                     << ", schema_hash=" << schema_hash();
579
175
    }
580
581
    // init _schema
582
175
    _schema->init_from_pb(tablet_meta_pb.schema());
583
584
175
    if (tablet_meta_pb.has_enable_unique_key_merge_on_write()) {
585
175
        _enable_unique_key_merge_on_write = tablet_meta_pb.enable_unique_key_merge_on_write();
586
175
    }
587
588
    // init _rs_metas
589
175
    for (auto& it : tablet_meta_pb.rs_metas()) {
590
30
        RowsetMetaSharedPtr rs_meta(new RowsetMeta());
591
30
        rs_meta->init_from_pb(it);
592
30
        _rs_metas.push_back(std::move(rs_meta));
593
30
    }
594
595
    // For mow table, delete bitmap of stale rowsets has not been persisted.
596
    // When be restart, query should not read the stale rowset, otherwise duplicate keys
597
    // will be read out. Therefore, we don't add them to _stale_rs_meta for mow table.
598
175
    if (!_enable_unique_key_merge_on_write) {
599
141
        for (auto& it : tablet_meta_pb.stale_rs_metas()) {
600
0
            RowsetMetaSharedPtr rs_meta(new RowsetMeta());
601
0
            rs_meta->init_from_pb(it);
602
0
            _stale_rs_metas.push_back(std::move(rs_meta));
603
0
        }
604
141
    }
605
606
175
    if (tablet_meta_pb.has_in_restore_mode()) {
607
175
        _in_restore_mode = tablet_meta_pb.in_restore_mode();
608
175
    }
609
610
175
    if (tablet_meta_pb.has_preferred_rowset_type()) {
611
19
        _preferred_rowset_type = tablet_meta_pb.preferred_rowset_type();
612
19
    }
613
614
175
    _storage_policy_id = tablet_meta_pb.storage_policy_id();
615
175
    if (tablet_meta_pb.has_cooldown_meta_id()) {
616
0
        _cooldown_meta_id = tablet_meta_pb.cooldown_meta_id();
617
0
    }
618
619
175
    if (tablet_meta_pb.has_delete_bitmap()) {
620
0
        int rst_ids_size = tablet_meta_pb.delete_bitmap().rowset_ids_size();
621
0
        int seg_ids_size = tablet_meta_pb.delete_bitmap().segment_ids_size();
622
0
        int versions_size = tablet_meta_pb.delete_bitmap().versions_size();
623
0
        int seg_maps_size = tablet_meta_pb.delete_bitmap().segment_delete_bitmaps_size();
624
0
        CHECK(rst_ids_size == seg_ids_size && seg_ids_size == seg_maps_size &&
625
0
              seg_maps_size == versions_size);
626
0
        for (size_t i = 0; i < rst_ids_size; ++i) {
627
0
            RowsetId rst_id;
628
0
            rst_id.init(tablet_meta_pb.delete_bitmap().rowset_ids(i));
629
0
            auto seg_id = tablet_meta_pb.delete_bitmap().segment_ids(i);
630
0
            uint32_t ver = tablet_meta_pb.delete_bitmap().versions(i);
631
0
            auto bitmap = tablet_meta_pb.delete_bitmap().segment_delete_bitmaps(i).data();
632
0
            delete_bitmap().delete_bitmap[{rst_id, seg_id, ver}] = roaring::Roaring::read(bitmap);
633
0
        }
634
0
    }
635
636
175
    if (tablet_meta_pb.has_binlog_config()) {
637
17
        _binlog_config = tablet_meta_pb.binlog_config();
638
17
    }
639
175
    _compaction_policy = tablet_meta_pb.compaction_policy();
640
175
    _time_series_compaction_goal_size_mbytes =
641
175
            tablet_meta_pb.time_series_compaction_goal_size_mbytes();
642
175
    _time_series_compaction_file_count_threshold =
643
175
            tablet_meta_pb.time_series_compaction_file_count_threshold();
644
175
    _time_series_compaction_time_threshold_seconds =
645
175
            tablet_meta_pb.time_series_compaction_time_threshold_seconds();
646
175
    _time_series_compaction_empty_rowsets_threshold =
647
175
            tablet_meta_pb.time_series_compaction_empty_rowsets_threshold();
648
175
    _time_series_compaction_level_threshold =
649
175
            tablet_meta_pb.time_series_compaction_level_threshold();
650
175
}
651
652
68
void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
653
68
    tablet_meta_pb->set_table_id(table_id());
654
68
    tablet_meta_pb->set_partition_id(partition_id());
655
68
    tablet_meta_pb->set_tablet_id(tablet_id());
656
68
    tablet_meta_pb->set_replica_id(replica_id());
657
68
    tablet_meta_pb->set_schema_hash(schema_hash());
658
68
    tablet_meta_pb->set_shard_id(shard_id());
659
68
    tablet_meta_pb->set_creation_time(creation_time());
660
68
    tablet_meta_pb->set_cumulative_layer_point(cumulative_layer_point());
661
68
    *(tablet_meta_pb->mutable_tablet_uid()) = tablet_uid().to_proto();
662
68
    tablet_meta_pb->set_tablet_type(_tablet_type);
663
68
    switch (tablet_state()) {
664
3
    case TABLET_NOTREADY:
665
3
        tablet_meta_pb->set_tablet_state(PB_NOTREADY);
666
3
        break;
667
39
    case TABLET_RUNNING:
668
39
        tablet_meta_pb->set_tablet_state(PB_RUNNING);
669
39
        break;
670
0
    case TABLET_TOMBSTONED:
671
0
        tablet_meta_pb->set_tablet_state(PB_TOMBSTONED);
672
0
        break;
673
0
    case TABLET_STOPPED:
674
0
        tablet_meta_pb->set_tablet_state(PB_STOPPED);
675
0
        break;
676
26
    case TABLET_SHUTDOWN:
677
26
        tablet_meta_pb->set_tablet_state(PB_SHUTDOWN);
678
26
        break;
679
68
    }
680
681
105
    for (auto& rs : _rs_metas) {
682
105
        rs->to_rowset_pb(tablet_meta_pb->add_rs_metas());
683
105
    }
684
68
    for (auto rs : _stale_rs_metas) {
685
0
        rs->to_rowset_pb(tablet_meta_pb->add_stale_rs_metas());
686
0
    }
687
68
    _schema->to_schema_pb(tablet_meta_pb->mutable_schema());
688
689
68
    tablet_meta_pb->set_in_restore_mode(in_restore_mode());
690
691
    // to avoid modify tablet meta to the greatest extend
692
68
    if (_preferred_rowset_type == BETA_ROWSET) {
693
68
        tablet_meta_pb->set_preferred_rowset_type(_preferred_rowset_type);
694
68
    }
695
68
    if (_storage_policy_id > 0) {
696
3
        tablet_meta_pb->set_storage_policy_id(_storage_policy_id);
697
3
    }
698
68
    if (_cooldown_meta_id.initialized()) {
699
3
        tablet_meta_pb->mutable_cooldown_meta_id()->CopyFrom(_cooldown_meta_id.to_proto());
700
3
    }
701
702
68
    tablet_meta_pb->set_enable_unique_key_merge_on_write(_enable_unique_key_merge_on_write);
703
704
68
    if (_enable_unique_key_merge_on_write) {
705
2
        std::set<RowsetId> stale_rs_ids;
706
2
        for (const auto& rowset : _stale_rs_metas) {
707
0
            stale_rs_ids.insert(rowset->rowset_id());
708
0
        }
709
2
        DeleteBitmapPB* delete_bitmap_pb = tablet_meta_pb->mutable_delete_bitmap();
710
2
        for (auto& [id, bitmap] : delete_bitmap().snapshot().delete_bitmap) {
711
2
            auto& [rowset_id, segment_id, ver] = id;
712
2
            if (stale_rs_ids.count(rowset_id) != 0) {
713
0
                continue;
714
0
            }
715
2
            delete_bitmap_pb->add_rowset_ids(rowset_id.to_string());
716
2
            delete_bitmap_pb->add_segment_ids(segment_id);
717
2
            delete_bitmap_pb->add_versions(ver);
718
2
            std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
719
2
            bitmap.write(bitmap_data.data());
720
2
            *(delete_bitmap_pb->add_segment_delete_bitmaps()) = std::move(bitmap_data);
721
2
        }
722
2
    }
723
68
    _binlog_config.to_pb(tablet_meta_pb->mutable_binlog_config());
724
68
    tablet_meta_pb->set_compaction_policy(compaction_policy());
725
68
    tablet_meta_pb->set_time_series_compaction_goal_size_mbytes(
726
68
            time_series_compaction_goal_size_mbytes());
727
68
    tablet_meta_pb->set_time_series_compaction_file_count_threshold(
728
68
            time_series_compaction_file_count_threshold());
729
68
    tablet_meta_pb->set_time_series_compaction_time_threshold_seconds(
730
68
            time_series_compaction_time_threshold_seconds());
731
68
    tablet_meta_pb->set_time_series_compaction_empty_rowsets_threshold(
732
68
            time_series_compaction_empty_rowsets_threshold());
733
68
    tablet_meta_pb->set_time_series_compaction_level_threshold(
734
68
            time_series_compaction_level_threshold());
735
68
}
736
737
49
uint32_t TabletMeta::mem_size() const {
738
49
    auto size = sizeof(TabletMeta);
739
49
    size += _schema->mem_size();
740
49
    return size;
741
49
}
742
743
2
void TabletMeta::to_json(string* json_string, json2pb::Pb2JsonOptions& options) {
744
2
    TabletMetaPB tablet_meta_pb;
745
2
    to_meta_pb(&tablet_meta_pb);
746
2
    json2pb::ProtoMessageToJson(tablet_meta_pb, json_string, options);
747
2
}
748
749
48
Version TabletMeta::max_version() const {
750
48
    Version max_version = {-1, 0};
751
99
    for (auto& rs_meta : _rs_metas) {
752
99
        if (rs_meta->end_version() > max_version.second) {
753
97
            max_version = rs_meta->version();
754
97
        }
755
99
    }
756
48
    return max_version;
757
48
}
758
759
// Find the rowset with specified version and return its schema
760
// Currently, this API is used by delete condition
761
0
const TabletSchemaSPtr TabletMeta::tablet_schema(Version version) const {
762
0
    auto it = _rs_metas.begin();
763
0
    while (it != _rs_metas.end()) {
764
0
        if ((*it)->version() == version) {
765
0
            return (*it)->tablet_schema();
766
0
        }
767
0
        ++it;
768
0
    }
769
0
    return nullptr;
770
0
}
771
772
284
Status TabletMeta::add_rs_meta(const RowsetMetaSharedPtr& rs_meta) {
773
    // check RowsetMeta is valid
774
1.32k
    for (auto& rs : _rs_metas) {
775
1.32k
        if (rs->version() == rs_meta->version()) {
776
0
            if (rs->rowset_id() != rs_meta->rowset_id()) {
777
0
                return Status::Error<PUSH_VERSION_ALREADY_EXIST>(
778
0
                        "version already exist. rowset_id={}, version={}, tablet={}",
779
0
                        rs->rowset_id().to_string(), rs->version().to_string(), full_name());
780
0
            } else {
781
                // rowsetid,version is equal, it is a duplicate req, skip it
782
0
                return Status::OK();
783
0
            }
784
0
        }
785
1.32k
    }
786
284
    _rs_metas.push_back(rs_meta);
787
284
    return Status::OK();
788
284
}
789
790
void TabletMeta::delete_rs_meta_by_version(const Version& version,
791
0
                                           std::vector<RowsetMetaSharedPtr>* deleted_rs_metas) {
792
0
    auto it = _rs_metas.begin();
793
0
    while (it != _rs_metas.end()) {
794
0
        if ((*it)->version() == version) {
795
0
            if (deleted_rs_metas != nullptr) {
796
0
                deleted_rs_metas->push_back(*it);
797
0
            }
798
0
            _rs_metas.erase(it);
799
0
            return;
800
0
        } else {
801
0
            ++it;
802
0
        }
803
0
    }
804
0
}
805
806
void TabletMeta::modify_rs_metas(const std::vector<RowsetMetaSharedPtr>& to_add,
807
                                 const std::vector<RowsetMetaSharedPtr>& to_delete,
808
7
                                 bool same_version) {
809
    // Remove to_delete rowsets from _rs_metas
810
7
    for (auto rs_to_del : to_delete) {
811
4
        auto it = _rs_metas.begin();
812
4
        while (it != _rs_metas.end()) {
813
4
            if (rs_to_del->version() == (*it)->version()) {
814
4
                _rs_metas.erase(it);
815
                // there should be only one rowset match the version
816
4
                break;
817
4
            } else {
818
0
                ++it;
819
0
            }
820
4
        }
821
4
    }
822
7
    if (!same_version) {
823
        // put to_delete rowsets in _stale_rs_metas.
824
4
        _stale_rs_metas.insert(_stale_rs_metas.end(), to_delete.begin(), to_delete.end());
825
4
    }
826
    // put to_add rowsets in _rs_metas.
827
7
    _rs_metas.insert(_rs_metas.end(), to_add.begin(), to_add.end());
828
7
}
829
830
// Use the passing "rs_metas" to replace the rs meta in this tablet meta
831
// Also clear the _stale_rs_metas because this tablet meta maybe copyied from
832
// an existing tablet before. Add after revise, only the passing "rs_metas"
833
// is needed.
834
3
void TabletMeta::revise_rs_metas(std::vector<RowsetMetaSharedPtr>&& rs_metas) {
835
3
    std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
836
3
    _rs_metas = std::move(rs_metas);
837
3
    _stale_rs_metas.clear();
838
3
}
839
840
// This method should call after revise_rs_metas, since new rs_metas might be a subset
841
// of original tablet, we should revise the delete_bitmap according to current rowset.
842
//
843
// Delete bitmap is protected by Tablet::_meta_lock, we don't need to acquire the
844
// TabletMeta's _meta_lock
845
1
void TabletMeta::revise_delete_bitmap_unlocked(const DeleteBitmap& delete_bitmap) {
846
1
    _delete_bitmap = std::make_unique<DeleteBitmap>(tablet_id());
847
2
    for (auto rs : _rs_metas) {
848
2
        DeleteBitmap rs_bm(tablet_id());
849
2
        delete_bitmap.subset({rs->rowset_id(), 0, 0}, {rs->rowset_id(), UINT32_MAX, INT64_MAX},
850
2
                             &rs_bm);
851
2
        _delete_bitmap->merge(rs_bm);
852
2
    }
853
1
    for (auto rs : _stale_rs_metas) {
854
0
        DeleteBitmap rs_bm(tablet_id());
855
0
        delete_bitmap.subset({rs->rowset_id(), 0, 0}, {rs->rowset_id(), UINT32_MAX, INT64_MAX},
856
0
                             &rs_bm);
857
0
        _delete_bitmap->merge(rs_bm);
858
0
    }
859
1
}
860
861
0
void TabletMeta::delete_stale_rs_meta_by_version(const Version& version) {
862
0
    auto it = _stale_rs_metas.begin();
863
0
    while (it != _stale_rs_metas.end()) {
864
0
        if ((*it)->version() == version) {
865
0
            if (_enable_unique_key_merge_on_write) {
866
                // remove rowset delete bitmap
867
0
                delete_bitmap().remove({(*it)->rowset_id(), 0, 0},
868
0
                                       {(*it)->rowset_id(), UINT32_MAX, 0});
869
0
            }
870
0
            it = _stale_rs_metas.erase(it);
871
0
        } else {
872
0
            it++;
873
0
        }
874
0
    }
875
0
}
876
877
0
RowsetMetaSharedPtr TabletMeta::acquire_rs_meta_by_version(const Version& version) const {
878
0
    for (auto it : _rs_metas) {
879
0
        if (it->version() == version) {
880
0
            return it;
881
0
        }
882
0
    }
883
0
    return nullptr;
884
0
}
885
886
8
RowsetMetaSharedPtr TabletMeta::acquire_stale_rs_meta_by_version(const Version& version) const {
887
8
    for (auto it : _stale_rs_metas) {
888
0
        if (it->version() == version) {
889
0
            return it;
890
0
        }
891
0
    }
892
8
    return nullptr;
893
8
}
894
895
0
std::string TabletMeta::full_name() const {
896
0
    std::stringstream ss;
897
0
    ss << _tablet_id << "." << _schema_hash << "." << _tablet_uid.to_string();
898
0
    return ss.str();
899
0
}
900
901
12
Status TabletMeta::set_partition_id(int64_t partition_id) {
902
12
    if ((_partition_id > 0 && _partition_id != partition_id) || partition_id < 1) {
903
0
        LOG(WARNING) << "cur partition id=" << _partition_id << " new partition id=" << partition_id
904
0
                     << " not equal";
905
0
    }
906
12
    _partition_id = partition_id;
907
12
    return Status::OK();
908
12
}
909
910
1
bool operator==(const TabletMeta& a, const TabletMeta& b) {
911
1
    if (a._table_id != b._table_id) return false;
912
1
    if (a._partition_id != b._partition_id) return false;
913
1
    if (a._tablet_id != b._tablet_id) return false;
914
1
    if (a._replica_id != b._replica_id) return false;
915
1
    if (a._schema_hash != b._schema_hash) return false;
916
1
    if (a._shard_id != b._shard_id) return false;
917
1
    if (a._creation_time != b._creation_time) return false;
918
1
    if (a._cumulative_layer_point != b._cumulative_layer_point) return false;
919
1
    if (a._tablet_uid != b._tablet_uid) return false;
920
1
    if (a._tablet_type != b._tablet_type) return false;
921
1
    if (a._tablet_state != b._tablet_state) return false;
922
1
    if (*a._schema != *b._schema) return false;
923
1
    if (a._rs_metas.size() != b._rs_metas.size()) return false;
924
1
    for (int i = 0; i < a._rs_metas.size(); ++i) {
925
0
        if (a._rs_metas[i] != b._rs_metas[i]) return false;
926
0
    }
927
1
    if (a._in_restore_mode != b._in_restore_mode) return false;
928
1
    if (a._preferred_rowset_type != b._preferred_rowset_type) return false;
929
1
    if (a._storage_policy_id != b._storage_policy_id) return false;
930
1
    if (a._compaction_policy != b._compaction_policy) return false;
931
1
    if (a._time_series_compaction_goal_size_mbytes != b._time_series_compaction_goal_size_mbytes)
932
0
        return false;
933
1
    if (a._time_series_compaction_file_count_threshold !=
934
1
        b._time_series_compaction_file_count_threshold)
935
0
        return false;
936
1
    if (a._time_series_compaction_time_threshold_seconds !=
937
1
        b._time_series_compaction_time_threshold_seconds)
938
0
        return false;
939
1
    if (a._time_series_compaction_empty_rowsets_threshold !=
940
1
        b._time_series_compaction_empty_rowsets_threshold)
941
0
        return false;
942
1
    if (a._time_series_compaction_level_threshold != b._time_series_compaction_level_threshold)
943
0
        return false;
944
1
    return true;
945
1
}
946
947
0
bool operator!=(const TabletMeta& a, const TabletMeta& b) {
948
0
    return !(a == b);
949
0
}
950
951
219
DeleteBitmap::DeleteBitmap(int64_t tablet_id) : _tablet_id(tablet_id) {
952
    // The default delete bitmap cache is set to 100MB,
953
    // which can be insufficient and cause performance issues when the amount of user data is large.
954
    // To mitigate the problem of an inadequate cache,
955
    // we will take the larger of 0.5% of the total memory and 100MB as the delete bitmap cache size.
956
219
    bool is_percent = false;
957
219
    int64_t delete_bitmap_agg_cache_cache_limit =
958
219
            ParseUtil::parse_mem_spec(config::delete_bitmap_dynamic_agg_cache_limit,
959
219
                                      MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent);
960
219
    _agg_cache.reset(new AggCache(delete_bitmap_agg_cache_cache_limit >
961
219
                                                  config::delete_bitmap_agg_cache_capacity
962
219
                                          ? delete_bitmap_agg_cache_cache_limit
963
219
                                          : config::delete_bitmap_agg_cache_capacity));
964
219
}
965
966
5
DeleteBitmap::DeleteBitmap(const DeleteBitmap& o) {
967
5
    delete_bitmap = o.delete_bitmap; // just copy data
968
5
    _tablet_id = o._tablet_id;
969
5
}
970
971
0
DeleteBitmap& DeleteBitmap::operator=(const DeleteBitmap& o) {
972
0
    delete_bitmap = o.delete_bitmap; // just copy data
973
0
    _tablet_id = o._tablet_id;
974
0
    return *this;
975
0
}
976
977
0
DeleteBitmap::DeleteBitmap(DeleteBitmap&& o) {
978
0
    delete_bitmap = std::move(o.delete_bitmap);
979
0
    _tablet_id = o._tablet_id;
980
0
}
981
982
0
DeleteBitmap& DeleteBitmap::operator=(DeleteBitmap&& o) {
983
0
    delete_bitmap = std::move(o.delete_bitmap);
984
0
    _tablet_id = o._tablet_id;
985
0
    return *this;
986
0
}
987
988
5
DeleteBitmap DeleteBitmap::snapshot() const {
989
5
    std::shared_lock l(lock);
990
5
    return DeleteBitmap(*this);
991
5
}
992
993
3
DeleteBitmap DeleteBitmap::snapshot(Version version) const {
994
    // Take snapshot first, then remove keys greater than given version.
995
3
    DeleteBitmap snapshot = this->snapshot();
996
3
    auto it = snapshot.delete_bitmap.begin();
997
412
    while (it != snapshot.delete_bitmap.end()) {
998
409
        if (std::get<2>(it->first) > version) {
999
4
            it = snapshot.delete_bitmap.erase(it);
1000
405
        } else {
1001
405
            it++;
1002
405
        }
1003
409
    }
1004
3
    return snapshot;
1005
3
}
1006
1007
459k
void DeleteBitmap::add(const BitmapKey& bmk, uint32_t row_id) {
1008
459k
    std::lock_guard l(lock);
1009
459k
    delete_bitmap[bmk].add(row_id);
1010
459k
}
1011
1012
0
int DeleteBitmap::remove(const BitmapKey& bmk, uint32_t row_id) {
1013
0
    std::lock_guard l(lock);
1014
0
    auto it = delete_bitmap.find(bmk);
1015
0
    if (it == delete_bitmap.end()) return -1;
1016
0
    it->second.remove(row_id);
1017
0
    return 0;
1018
0
}
1019
1020
8
void DeleteBitmap::remove(const BitmapKey& start, const BitmapKey& end) {
1021
8
    std::lock_guard l(lock);
1022
107
    for (auto it = delete_bitmap.lower_bound(start); it != delete_bitmap.end();) {
1023
101
        auto& [k, _] = *it;
1024
101
        if (k >= end) {
1025
2
            break;
1026
2
        }
1027
99
        it = delete_bitmap.erase(it);
1028
99
    }
1029
8
}
1030
1031
6
bool DeleteBitmap::contains(const BitmapKey& bmk, uint32_t row_id) const {
1032
6
    std::shared_lock l(lock);
1033
6
    auto it = delete_bitmap.find(bmk);
1034
6
    return it != delete_bitmap.end() && it->second.contains(row_id);
1035
6
}
1036
1037
2
bool DeleteBitmap::contains_agg(const BitmapKey& bmk, uint32_t row_id) const {
1038
2
    return get_agg(bmk)->contains(row_id);
1039
2
}
1040
1041
0
bool DeleteBitmap::empty() const {
1042
0
    std::shared_lock l(lock);
1043
0
    return delete_bitmap.empty();
1044
0
}
1045
1046
1
bool DeleteBitmap::contains_agg_without_cache(const BitmapKey& bmk, uint32_t row_id) const {
1047
1
    std::shared_lock l(lock);
1048
1
    DeleteBitmap::BitmapKey start {std::get<0>(bmk), std::get<1>(bmk), 0};
1049
1
    for (auto it = delete_bitmap.lower_bound(start); it != delete_bitmap.end(); ++it) {
1050
0
        auto& [k, bm] = *it;
1051
0
        if (std::get<0>(k) != std::get<0>(bmk) || std::get<1>(k) != std::get<1>(bmk) ||
1052
0
            std::get<2>(k) > std::get<2>(bmk)) {
1053
0
            break;
1054
0
        }
1055
0
        if (bm.contains(row_id)) {
1056
0
            return true;
1057
0
        }
1058
0
    }
1059
1
    return false;
1060
1
}
1061
1062
38
int DeleteBitmap::set(const BitmapKey& bmk, const roaring::Roaring& segment_delete_bitmap) {
1063
38
    std::lock_guard l(lock);
1064
38
    auto [_, inserted] = delete_bitmap.insert_or_assign(bmk, segment_delete_bitmap);
1065
38
    return inserted;
1066
38
}
1067
1068
3
int DeleteBitmap::get(const BitmapKey& bmk, roaring::Roaring* segment_delete_bitmap) const {
1069
3
    std::shared_lock l(lock);
1070
3
    auto it = delete_bitmap.find(bmk);
1071
3
    if (it == delete_bitmap.end()) return -1;
1072
3
    *segment_delete_bitmap = it->second; // copy
1073
3
    return 0;
1074
3
}
1075
1076
54
const roaring::Roaring* DeleteBitmap::get(const BitmapKey& bmk) const {
1077
54
    std::shared_lock l(lock);
1078
54
    auto it = delete_bitmap.find(bmk);
1079
54
    if (it == delete_bitmap.end()) return nullptr;
1080
41
    return &(it->second); // get address
1081
54
}
1082
1083
void DeleteBitmap::subset(const BitmapKey& start, const BitmapKey& end,
1084
3
                          DeleteBitmap* subset_rowset_map) const {
1085
3
    roaring::Roaring roaring;
1086
3
    DCHECK(start < end);
1087
3
    std::shared_lock l(lock);
1088
26
    for (auto it = delete_bitmap.lower_bound(start); it != delete_bitmap.end(); ++it) {
1089
25
        auto& [k, bm] = *it;
1090
25
        if (k >= end) {
1091
2
            break;
1092
2
        }
1093
23
        subset_rowset_map->set(k, bm);
1094
23
    }
1095
3
}
1096
1097
2
void DeleteBitmap::merge(const BitmapKey& bmk, const roaring::Roaring& segment_delete_bitmap) {
1098
2
    std::lock_guard l(lock);
1099
2
    auto [iter, succ] = delete_bitmap.emplace(bmk, segment_delete_bitmap);
1100
2
    if (!succ) {
1101
0
        iter->second |= segment_delete_bitmap;
1102
0
    }
1103
2
}
1104
1105
8
void DeleteBitmap::merge(const DeleteBitmap& other) {
1106
8
    std::lock_guard l(lock);
1107
29
    for (auto& i : other.delete_bitmap) {
1108
29
        auto [j, succ] = this->delete_bitmap.insert(i);
1109
29
        if (!succ) j->second |= i.second;
1110
29
    }
1111
8
}
1112
1113
// We cannot just copy the underlying memory to construct a string
1114
// due to equivalent objects may have different padding bytes.
1115
// Reading padding bytes is undefined behavior, neither copy nor
1116
// placement new will help simplify the code.
1117
// Refer to C11 standards §6.2.6.1/6 and §6.7.9/21 for more info.
1118
44
static std::string agg_cache_key(int64_t tablet_id, const DeleteBitmap::BitmapKey& bmk) {
1119
44
    std::string ret(sizeof(tablet_id) + sizeof(bmk), '\0');
1120
44
    *reinterpret_cast<int64_t*>(ret.data()) = tablet_id;
1121
44
    auto t = reinterpret_cast<DeleteBitmap::BitmapKey*>(ret.data() + sizeof(tablet_id));
1122
44
    std::get<RowsetId>(*t).version = std::get<RowsetId>(bmk).version;
1123
44
    std::get<RowsetId>(*t).hi = std::get<RowsetId>(bmk).hi;
1124
44
    std::get<RowsetId>(*t).mi = std::get<RowsetId>(bmk).mi;
1125
44
    std::get<RowsetId>(*t).lo = std::get<RowsetId>(bmk).lo;
1126
44
    std::get<1>(*t) = std::get<1>(bmk);
1127
44
    std::get<2>(*t) = std::get<2>(bmk);
1128
44
    return ret;
1129
44
}
1130
1131
44
std::shared_ptr<roaring::Roaring> DeleteBitmap::get_agg(const BitmapKey& bmk) const {
1132
44
    std::string key_str = agg_cache_key(_tablet_id, bmk); // Cache key container
1133
44
    CacheKey key(key_str);
1134
44
    Cache::Handle* handle = _agg_cache->repr()->lookup(key);
1135
1136
44
    AggCache::Value* val =
1137
44
            handle == nullptr
1138
44
                    ? nullptr
1139
44
                    : reinterpret_cast<AggCache::Value*>(_agg_cache->repr()->value(handle));
1140
    // FIXME: do we need a mutex here to get rid of duplicated initializations
1141
    //        of cache entries in some cases?
1142
44
    if (val == nullptr) { // Renew if needed, put a new Value to cache
1143
38
        val = new AggCache::Value();
1144
38
        {
1145
38
            std::shared_lock l(lock);
1146
38
            DeleteBitmap::BitmapKey start {std::get<0>(bmk), std::get<1>(bmk), 0};
1147
69
            for (auto it = delete_bitmap.lower_bound(start); it != delete_bitmap.end(); ++it) {
1148
66
                auto& [k, bm] = *it;
1149
66
                if (std::get<0>(k) != std::get<0>(bmk) || std::get<1>(k) != std::get<1>(bmk) ||
1150
66
                    std::get<2>(k) > std::get<2>(bmk)) {
1151
35
                    break;
1152
35
                }
1153
31
                val->bitmap |= bm;
1154
31
            }
1155
38
        }
1156
38
        static auto deleter = [](const CacheKey& key, void* value) {
1157
0
            delete (AggCache::Value*)value; // Just delete to reclaim
1158
0
        };
1159
38
        size_t charge = val->bitmap.getSizeInBytes() + sizeof(AggCache::Value);
1160
38
        handle = _agg_cache->repr()->insert(key, val, charge, deleter, CachePriority::NORMAL);
1161
38
    }
1162
1163
    // It is natural for the cache to reclaim the underlying memory
1164
44
    return std::shared_ptr<roaring::Roaring>(
1165
44
            &val->bitmap, [this, handle](...) { _agg_cache->repr()->release(handle); });
1166
44
}
1167
1168
std::atomic<ShardedLRUCache*> DeleteBitmap::AggCache::s_repr {nullptr};
1169
1170
} // namespace doris