Coverage Report

Created: 2026-03-13 09:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_delta_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_delta_writer.h"
19
20
#include "cloud/cloud_meta_mgr.h"
21
#include "cloud/cloud_rowset_builder.h"
22
#include "cloud/cloud_storage_engine.h"
23
#include "cloud/config.h"
24
#include "load/delta_writer/delta_writer.h"
25
#include "runtime/thread_context.h"
26
27
namespace doris {
28
29
bvar::Adder<int64_t> g_cloud_commit_rowset_count("cloud_commit_rowset_count");
30
bvar::Adder<int64_t> g_cloud_commit_empty_rowset_count("cloud_commit_empty_rowset_count");
31
32
CloudDeltaWriter::CloudDeltaWriter(CloudStorageEngine& engine, const WriteRequest& req,
33
                                   RuntimeProfile* profile, const UniqueId& load_id)
34
270k
        : BaseDeltaWriter(req, profile, load_id), _engine(engine) {
35
270k
    _rowset_builder = std::make_unique<CloudRowsetBuilder>(engine, req, profile);
36
270k
    _resource_ctx = thread_context()->resource_ctx();
37
270k
}
38
39
270k
CloudDeltaWriter::~CloudDeltaWriter() = default;
40
41
32.1k
Status CloudDeltaWriter::batch_init(std::vector<CloudDeltaWriter*> writers) {
42
32.1k
    if (writers.empty()) {
43
0
        return Status::OK();
44
0
    }
45
46
32.1k
    std::vector<std::function<Status()>> tasks;
47
32.1k
    tasks.reserve(writers.size());
48
112k
    for (auto* writer : writers) {
49
112k
        if (writer->_is_init || writer->_is_cancelled) {
50
58.3k
            continue;
51
58.3k
        }
52
53
54.4k
        tasks.emplace_back([writer] {
54
54.3k
            SCOPED_ATTACH_TASK(writer->resource_context());
55
54.3k
            std::lock_guard<bthread::Mutex> lock(writer->_mtx);
56
54.4k
            if (writer->_is_init || writer->_is_cancelled) {
57
0
                return Status::OK();
58
0
            }
59
54.3k
            Status st = writer->init(); // included in SCOPED_ATTACH_TASK
60
54.3k
            return st;
61
54.3k
        });
62
54.4k
    }
63
64
32.1k
    return cloud::bthread_fork_join(tasks, 10);
65
32.1k
}
66
67
112k
Status CloudDeltaWriter::write(const Block* block, const DorisVector<uint32_t>& row_idxs) {
68
112k
    if (row_idxs.empty()) [[unlikely]] {
69
0
        return Status::OK();
70
0
    }
71
112k
    std::lock_guard lock(_mtx);
72
112k
    CHECK(_is_init || _is_cancelled);
73
112k
    {
74
112k
        SCOPED_TIMER(_wait_flush_limit_timer);
75
112k
        while (_memtable_writer->flush_running_count() >=
76
112k
               config::memtable_flush_running_count_limit) {
77
0
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
78
0
        }
79
112k
    }
80
112k
    return _memtable_writer->write(block, row_idxs);
81
112k
}
82
83
54.4k
Status CloudDeltaWriter::close() {
84
54.4k
    std::lock_guard lock(_mtx);
85
54.4k
    CHECK(_is_init);
86
54.4k
    return _memtable_writer->close();
87
54.4k
}
88
89
91.0k
Status CloudDeltaWriter::cancel_with_status(const Status& st) {
90
91.0k
    std::lock_guard lock(_mtx);
91
91.0k
    return BaseDeltaWriter::cancel_with_status(st);
92
91.0k
}
93
94
54.4k
Status CloudDeltaWriter::build_rowset() {
95
54.4k
    std::lock_guard lock(_mtx);
96
54.4k
    CHECK(_is_init);
97
54.4k
    return BaseDeltaWriter::build_rowset();
98
54.4k
}
99
100
537k
CloudRowsetBuilder* CloudDeltaWriter::rowset_builder() {
101
537k
    return static_cast<CloudRowsetBuilder*>(_rowset_builder.get());
102
537k
}
103
104
179k
void CloudDeltaWriter::update_tablet_stats() {
105
179k
    rowset_builder()->update_tablet_stats();
106
179k
}
107
108
54.3k
const RowsetMetaSharedPtr& CloudDeltaWriter::rowset_meta() {
109
54.3k
    return rowset_builder()->rowset_meta();
110
54.3k
}
111
112
178k
Status CloudDeltaWriter::commit_rowset() {
113
178k
    g_cloud_commit_rowset_count << 1;
114
178k
    std::lock_guard<bthread::Mutex> lock(_mtx);
115
116
    // Handle empty rowset (no data written)
117
178k
    if (!_is_init) {
118
124k
        g_cloud_commit_empty_rowset_count << 1;
119
124k
        return _commit_empty_rowset();
120
124k
    }
121
122
    // Handle normal rowset with data
123
53.6k
    return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
124
178k
}
125
126
124k
Status CloudDeltaWriter::_commit_empty_rowset() {
127
    // If skip writing empty rowset metadata is enabled,
128
    // we do not prepare rowset to meta service.
129
124k
    if (config::skip_writing_empty_rowset_metadata) {
130
124k
        rowset_builder()->set_skip_writing_rowset_metadata(true);
131
124k
    }
132
133
124k
    RETURN_IF_ERROR(_rowset_builder->init());
134
124k
    RETURN_IF_ERROR(_rowset_builder->build_rowset());
135
136
    // If skip writing empty rowset metadata is enabled, we do not commit rowset to meta service.
137
124k
    if (config::skip_writing_empty_rowset_metadata) {
138
124k
        return Status::OK();
139
124k
    }
140
    // write a empty rowset kv to keep version continuous
141
18.4E
    return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
142
124k
}
143
144
179k
Status CloudDeltaWriter::set_txn_related_delete_bitmap() {
145
179k
    return rowset_builder()->set_txn_related_delete_bitmap();
146
179k
}
147
148
} // namespace doris