Coverage Report

Created: 2026-03-16 21:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/task/engine_checksum_task.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/task/engine_checksum_task.h"
19
20
#include <glog/logging.h>
21
22
#include <ostream>
23
#include <string>
24
#include <vector>
25
26
#include "core/block/block.h"
27
#include "exec/common/sip_hash.h"
28
#include "io/io_common.h"
29
#include "runtime/memory/mem_tracker_limiter.h"
30
#include "runtime/thread_context.h"
31
#include "storage/iterator/block_reader.h"
32
#include "storage/olap_common.h"
33
#include "storage/olap_define.h"
34
#include "storage/rowset/rowset.h"
35
#include "storage/storage_engine.h"
36
#include "storage/tablet/tablet.h"
37
#include "storage/tablet/tablet_manager.h"
38
#include "storage/tablet/tablet_reader.h"
39
#include "storage/utils.h"
40
41
namespace doris {
42
43
// Builds a checksum task for a single tablet.
//
// engine:      storage engine; used later to look up the tablet by id.
// tablet_id:   id of the tablet whose data will be checksummed.
// schema_hash: schema hash supplied by the caller; stored on the task.
// version:     end version of the range [0, version] that gets checksummed.
// checksum:    output location for the result. It is only stored here; the
//              null check happens when the checksum is computed.
EngineChecksumTask::EngineChecksumTask(StorageEngine& engine, TTabletId tablet_id,
                                       TSchemaHash schema_hash, TVersion version,
                                       uint32_t* checksum)
        : _engine(engine),
          _tablet_id(tablet_id),
          _schema_hash(schema_hash),
          _version(version),
          _checksum(checksum) {
    // The tracker label embeds the tablet id so each task gets its own named tracker.
    const std::string tracker_label = "EngineChecksumTask#tabletId=" + std::to_string(tablet_id);
    _mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, tracker_label);
}
55
56
0
// Defaulted out of line: the compiler-generated destructor is used; no custom cleanup is written here.
EngineChecksumTask::~EngineChecksumTask() = default;
57
58
0
// Task entry point. Delegates all work to _compute_checksum() and returns
// its Status unchanged.
Status EngineChecksumTask::execute() {
    Status result = _compute_checksum();
    return result;
}
61
62
0
// Computes a 32-bit checksum of the tablet's data for version range [0, _version]
// and writes it to *_checksum.
//
// Steps:
//   1. Reject a null output pointer and look up the tablet by _tablet_id.
//   2. While holding the tablet header lock in shared mode, capture the rowsets
//      for the version range and initialise the reader params and block.
//   3. Read blocks through BlockReader until eof, feeding each block into a SipHash.
//   4. Fold the 64-bit hash into 32 bits by XOR-ing its upper and lower halves.
//
// Returns:
//   - InvalidArgument if _checksum is nullptr.
//   - InternalError if the tablet cannot be found.
//   - The error from rowset capture, reader-param setup, reader init, or block
//     reading if any of those steps fails.
//   - OK on success, with *_checksum set.
Status EngineChecksumTask::_compute_checksum() {
    LOG(INFO) << "begin to process compute checksum. "
              << "tablet_id=" << _tablet_id << ", schema_hash=" << _schema_hash
              << ", version=" << _version;
    OlapStopWatch watch;

    if (_checksum == nullptr) {
        return Status::InvalidArgument("invalid checksum which is nullptr");
    }

    TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(_tablet_id);
    if (nullptr == tablet) {
        return Status::InternalError("could not find tablet {}", _tablet_id);
    }

    std::vector<RowsetSharedPtr> input_rowsets;
    Version version(0, _version);
    // Declaration order of reader / reader_params / block is kept as-is so their
    // destruction order does not change.
    BlockReader reader;
    TabletReader::ReaderParams reader_params;
    Block block;
    {
        // Rowset capture and reader-param setup both run under the shared header lock;
        // the lock is released when this scope ends, including on early return.
        std::shared_lock rdlock(tablet->get_header_lock());
        auto ret = tablet->capture_consistent_rowsets_unlocked(version, CaptureRowsetOps {});
        if (ret) {
            input_rowsets = std::move(ret->rowsets);
        } else {
            LOG(WARNING) << "fail to capture consistent rowsets. tablet=" << tablet->tablet_id()
                         << ", res=" << ret.error();
            return std::move(ret.error());
        }

        RETURN_IF_ERROR(TabletReader::init_reader_params_and_create_block(
                tablet, ReaderType::READER_CHECKSUM, input_rowsets, &reader_params, &block));
    }

    // Total on-disk size of the captured rowsets; used only for the final log line.
    size_t input_size = 0;
    for (const auto& rowset : input_rowsets) {
        input_size += rowset->total_disk_size();
    }

    auto res = reader.init(reader_params);
    if (!res.ok()) {
        LOG(WARNING) << "initiate reader fail. res = " << res;
        return res;
    }

    bool eof = false;
    SipHash block_hash;
    uint64_t rows = 0;
    while (!eof) {
        RETURN_IF_ERROR(reader.next_block_with_aggregation(&block, &eof));
        rows += block.rows();

        block.update_hash(block_hash);
        // Drop the column data before the next read so the same block object is reused.
        block.clear_column_data();
    }

    // Fold 64 bits into 32: upper half XOR lower half. The cast makes the narrowing explicit.
    const uint64_t checksum64 = block_hash.get64();
    *_checksum = static_cast<uint32_t>((checksum64 >> 32) ^ (checksum64 & 0xffffffff));

    LOG(INFO) << "success to finish compute checksum. tablet_id = " << _tablet_id
              << ", rows = " << rows << ", checksum=" << *_checksum
              << ", total_size = " << input_size << ", cost(us): " << watch.get_elapse_time_us();
    return Status::OK();
}
125
126
} // namespace doris