Coverage Report

Created: 2026-03-13 19:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_delta_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_delta_writer.h"
19
20
#include "cloud/cloud_meta_mgr.h"
21
#include "cloud/cloud_rowset_builder.h"
22
#include "cloud/cloud_storage_engine.h"
23
#include "cloud/config.h"
24
#include "load/delta_writer/delta_writer.h"
25
#include "runtime/thread_context.h"
26
27
namespace doris {
28
29
bvar::Adder<int64_t> g_cloud_commit_rowset_count("cloud_commit_rowset_count");
30
bvar::Adder<int64_t> g_cloud_commit_empty_rowset_count("cloud_commit_empty_rowset_count");
31
32
CloudDeltaWriter::CloudDeltaWriter(CloudStorageEngine& engine, const WriteRequest& req,
33
                                   RuntimeProfile* profile, const UniqueId& load_id)
34
0
        : BaseDeltaWriter(req, profile, load_id), _engine(engine) {
35
0
    _rowset_builder = std::make_unique<CloudRowsetBuilder>(engine, req, profile);
36
0
    _resource_ctx = thread_context()->resource_ctx();
37
0
}
38
39
0
CloudDeltaWriter::~CloudDeltaWriter() = default;
40
41
0
Status CloudDeltaWriter::batch_init(std::vector<CloudDeltaWriter*> writers) {
42
0
    if (writers.empty()) {
43
0
        return Status::OK();
44
0
    }
45
46
0
    std::vector<std::function<Status()>> tasks;
47
0
    tasks.reserve(writers.size());
48
0
    for (auto* writer : writers) {
49
0
        if (writer->_is_init || writer->_is_cancelled) {
50
0
            continue;
51
0
        }
52
53
0
        tasks.emplace_back([writer] {
54
0
            SCOPED_ATTACH_TASK(writer->resource_context());
55
0
            std::lock_guard<bthread::Mutex> lock(writer->_mtx);
56
0
            if (writer->_is_init || writer->_is_cancelled) {
57
0
                return Status::OK();
58
0
            }
59
0
            Status st = writer->init(); // included in SCOPED_ATTACH_TASK
60
0
            return st;
61
0
        });
62
0
    }
63
64
0
    return cloud::bthread_fork_join(tasks, 10);
65
0
}
66
67
0
Status CloudDeltaWriter::write(const Block* block, const DorisVector<uint32_t>& row_idxs) {
68
0
    if (row_idxs.empty()) [[unlikely]] {
69
0
        return Status::OK();
70
0
    }
71
0
    std::lock_guard lock(_mtx);
72
0
    CHECK(_is_init || _is_cancelled);
73
0
    {
74
0
        SCOPED_TIMER(_wait_flush_limit_timer);
75
0
        while (_memtable_writer->flush_running_count() >=
76
0
               config::memtable_flush_running_count_limit) {
77
0
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
78
0
        }
79
0
    }
80
0
    return _memtable_writer->write(block, row_idxs);
81
0
}
82
83
0
Status CloudDeltaWriter::close() {
84
0
    std::lock_guard lock(_mtx);
85
0
    CHECK(_is_init);
86
0
    return _memtable_writer->close();
87
0
}
88
89
0
Status CloudDeltaWriter::cancel_with_status(const Status& st) {
90
0
    std::lock_guard lock(_mtx);
91
0
    return BaseDeltaWriter::cancel_with_status(st);
92
0
}
93
94
0
Status CloudDeltaWriter::build_rowset() {
95
0
    std::lock_guard lock(_mtx);
96
0
    CHECK(_is_init);
97
0
    return BaseDeltaWriter::build_rowset();
98
0
}
99
100
0
CloudRowsetBuilder* CloudDeltaWriter::rowset_builder() {
101
0
    return static_cast<CloudRowsetBuilder*>(_rowset_builder.get());
102
0
}
103
104
0
void CloudDeltaWriter::update_tablet_stats() {
105
0
    rowset_builder()->update_tablet_stats();
106
0
}
107
108
0
const RowsetMetaSharedPtr& CloudDeltaWriter::rowset_meta() {
109
0
    return rowset_builder()->rowset_meta();
110
0
}
111
112
0
Status CloudDeltaWriter::commit_rowset() {
113
0
    g_cloud_commit_rowset_count << 1;
114
0
    std::lock_guard<bthread::Mutex> lock(_mtx);
115
116
    // Handle empty rowset (no data written)
117
0
    if (!_is_init) {
118
0
        g_cloud_commit_empty_rowset_count << 1;
119
0
        return _commit_empty_rowset();
120
0
    }
121
122
    // Handle normal rowset with data
123
0
    return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
124
0
}
125
126
0
Status CloudDeltaWriter::_commit_empty_rowset() {
127
    // If skip writing empty rowset metadata is enabled,
128
    // we do not prepare rowset to meta service.
129
0
    if (config::skip_writing_empty_rowset_metadata) {
130
0
        rowset_builder()->set_skip_writing_rowset_metadata(true);
131
0
    }
132
133
0
    RETURN_IF_ERROR(_rowset_builder->init());
134
0
    RETURN_IF_ERROR(_rowset_builder->build_rowset());
135
136
    // If skip writing empty rowset metadata is enabled, we do not commit rowset to meta service.
137
0
    if (config::skip_writing_empty_rowset_metadata) {
138
0
        return Status::OK();
139
0
    }
140
    // write a empty rowset kv to keep version continuous
141
0
    return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
142
0
}
143
144
0
Status CloudDeltaWriter::set_txn_related_delete_bitmap() {
145
0
    return rowset_builder()->set_txn_related_delete_bitmap();
146
0
}
147
148
} // namespace doris