Coverage Report

Created: 2026-05-16 12:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_rowset_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_rowset_writer.h"
19
20
#include "common/logging.h"
21
#include "common/status.h"
22
#include "io/cache/block_file_cache_factory.h"
23
#include "io/fs/packed_file_manager.h"
24
#include "io/fs/packed_file_writer.h"
25
#include "storage/rowset/rowset_factory.h"
26
27
namespace doris {
28
29
194k
CloudRowsetWriter::CloudRowsetWriter(CloudStorageEngine& engine) : _engine(engine) {}
30
31
194k
CloudRowsetWriter::~CloudRowsetWriter() {
32
    // Must cancel any pending delete bitmap tasks before destruction.
33
    // Otherwise, the lambda in _generate_delete_bitmap may execute after the
34
    // CloudRowsetWriter destructor runs but before BaseBetaRowsetWriter destructor,
35
    // causing virtual function calls to resolve to BaseBetaRowsetWriter::_build_rowset_meta
36
    // instead of CloudRowsetWriter::_build_rowset_meta (use-after-free on vtable).
37
194k
    if (_calc_delete_bitmap_token != nullptr) {
38
51.8k
        _calc_delete_bitmap_token->cancel();
39
51.8k
    }
40
194k
}
41
42
193k
Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
43
193k
    _context = rowset_writer_context;
44
193k
    _rowset_meta = std::make_shared<RowsetMeta>();
45
46
193k
    if (_context.is_local_rowset()) {
47
        // In cloud mode, this branch implies it is an intermediate rowset for external merge sort,
48
        // we use `global_local_filesystem` to write data to `tmp_file_dir`(see `local_segment_path`).
49
2.43k
        _context.tablet_path = io::FileCacheFactory::instance()->pick_one_cache_path();
50
190k
    } else {
51
190k
        _rowset_meta->set_remote_storage_resource(*_context.storage_resource);
52
190k
    }
53
54
193k
    _rowset_meta->set_rowset_id(_context.rowset_id);
55
193k
    _rowset_meta->set_partition_id(_context.partition_id);
56
193k
    _rowset_meta->set_tablet_id(_context.tablet_id);
57
193k
    _rowset_meta->set_db_id(_context.db_id);
58
193k
    _rowset_meta->set_table_id(_context.table_id);
59
193k
    _rowset_meta->set_index_id(_context.index_id);
60
193k
    _rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash);
61
193k
    _rowset_meta->set_rowset_type(_context.rowset_type);
62
193k
    _rowset_meta->set_rowset_state(_context.rowset_state);
63
193k
    _rowset_meta->set_segments_overlap(_context.segments_overlap);
64
193k
    _rowset_meta->set_txn_id(_context.txn_id);
65
193k
    _rowset_meta->set_txn_expiration(_context.txn_expiration);
66
193k
    _rowset_meta->set_compaction_level(_context.compaction_level);
67
193k
    if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) {
68
174k
        _is_pending = true;
69
174k
        _rowset_meta->set_load_id(_context.load_id);
70
174k
    } else {
71
        // Rowset generated by compaction or schema change
72
18.3k
        _rowset_meta->set_version(_context.version);
73
18.3k
        DCHECK_NE(_context.newest_write_timestamp, -1);
74
18.3k
        _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp);
75
18.3k
    }
76
193k
    _rowset_meta->set_tablet_schema(_context.tablet_schema);
77
193k
    _rowset_meta->set_job_id(_context.job_id);
78
193k
    _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
79
193k
    _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
80
193k
    if (_context.mow_context != nullptr) {
81
51.7k
        _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor_for_load()->create_token();
82
51.7k
    }
83
193k
    return Status::OK();
84
193k
}
85
86
213k
Status CloudRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) {
87
18.4E
    VLOG_NOTICE << "start to build rowset meta. tablet_id=" << rowset_meta->tablet_id()
88
18.4E
                << ", rowset_id=" << rowset_meta->rowset_id()
89
18.4E
                << ", check_segment_num=" << check_segment_num;
90
    // Call base class implementation
91
213k
    RETURN_IF_ERROR(BaseBetaRowsetWriter::_build_rowset_meta(rowset_meta, check_segment_num));
92
93
    // Collect packed file segment index information for interim rowsets as well.
94
213k
    return _collect_all_packed_slice_locations(rowset_meta);
95
213k
}
96
97
194k
Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
98
194k
    if (_calc_delete_bitmap_token != nullptr) {
99
51.8k
        RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
100
51.8k
    }
101
194k
    RETURN_IF_ERROR(_close_file_writers());
102
103
    // TODO(plat1ko): check_segment_footer
104
105
194k
    RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get()));
106
    // At this point all writers have been closed, so collecting packed file indices is safe.
107
194k
    RETURN_IF_ERROR(_collect_all_packed_slice_locations(_rowset_meta.get()));
108
    // If the current load is a partial update, new segments may be appended to the tmp rowset after the tmp rowset
109
    // has been committed if conflicts occur due to concurrent partial updates. However, when the recycler do recycling,
110
    // it will generate the paths for the segments to be recycled on the object storage based on the number of segments
111
    // in the rowset meta. If these newly added segments are written to the object storage and the transaction is aborted
112
    // due to a failure before successfully updating the rowset meta of the corresponding tmp rowset, these newly added
113
    // segments cannot be recycled by the recycler on the object storage. Therefore, we need a new state `BEGIN_PARTIAL_UPDATE`
114
    // to indicate that the recycler should use list+delete to recycle segments. After the tmp rowset's rowset meta being
115
    // updated successfully, the `rowset_state` will be set to `COMMITTED` and the recycler can do recycling based on the
116
    // number of segments in the rowset meta safely.
117
    //
118
    // rowset_state's FSM:
119
    //
120
    //                      transfer 0
121
    //       PREPARED ---------------------------> COMMITTED
122
    //                 |                               ^
123
    //                 | transfer 1                    |
124
    //                 |                               | transfer 2
125
    //                 |--> BEGIN_PARTIAL_UPDATE ------|
126
    //
127
    // transfer 0 (PREPARED -> COMMITTED): finish writing a rowset and the rowset's meta will not be changed
128
    // transfer 1 (PREPARED -> BEGIN_PARTIAL_UPDATE): finish writing a rowset, but may append new segments later and the rowset's meta may be changed
129
    // transfer 2 (BEGIN_PARTIAL_UPDATE -> COMMITTED): finish adding new segments and the rowset's meta will not be changed, the rowset is visible to users
130
194k
    if (_context.partial_update_info && _context.partial_update_info->is_partial_update()) {
131
4.80k
        _rowset_meta->set_rowset_state(BEGIN_PARTIAL_UPDATE);
132
189k
    } else {
133
189k
        _rowset_meta->set_rowset_state(COMMITTED);
134
189k
    }
135
136
194k
    _rowset_meta->set_tablet_schema(_context.tablet_schema);
137
138
194k
    if (_rowset_meta->newest_write_timestamp() == -1) {
139
176k
        _rowset_meta->set_newest_write_timestamp(UnixSeconds());
140
176k
    }
141
142
194k
    if (auto seg_file_size = _seg_files.segments_file_size(_segment_start_id);
143
194k
        !seg_file_size.has_value()) [[unlikely]] {
144
0
        LOG(ERROR) << "expected segment file sizes, but none presents: " << seg_file_size.error();
145
194k
    } else {
146
194k
        _rowset_meta->add_segments_file_size(seg_file_size.value());
147
194k
    }
148
194k
    if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) {
149
21.6k
        if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id);
150
21.6k
            !idx_files_info.has_value()) [[unlikely]] {
151
0
            LOG(ERROR) << "expected inverted index files info, but none presents: "
152
0
                       << idx_files_info.error();
153
21.6k
        } else {
154
21.6k
            _rowset_meta->add_inverted_index_files_info(idx_files_info.value());
155
21.6k
        }
156
21.6k
    }
157
158
194k
    RETURN_NOT_OK_STATUS_WITH_WARN(
159
194k
            RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta,
160
194k
                                         &rowset),
161
194k
            "rowset init failed when build new rowset");
162
194k
    _already_built = true;
163
194k
    return Status::OK();
164
194k
}
165
166
407k
Status CloudRowsetWriter::_collect_all_packed_slice_locations(RowsetMeta* rowset_meta) {
167
18.4E
    VLOG_NOTICE << "start to collect packed slice locations for rowset meta. tablet_id="
168
18.4E
                << rowset_meta->tablet_id() << ", rowset_id=" << rowset_meta->rowset_id();
169
407k
    if (!_context.packed_file_active) {
170
407k
        return Status::OK();
171
407k
    }
172
173
    // Collect segment file packed indices
174
18.4E
    const auto& file_writers = _seg_files.get_file_writers();
175
18.4E
    for (const auto& [seg_id, writer_ptr] : file_writers) {
176
0
        auto segment_path = _context.segment_path(seg_id);
177
0
        RETURN_IF_ERROR(
178
0
                _collect_packed_slice_location(writer_ptr.get(), segment_path, rowset_meta));
179
0
    }
180
181
    // Collect inverted index file packed indices
182
18.4E
    const auto& idx_file_writers = _idx_files.get_file_writers();
183
18.4E
    for (const auto& [seg_id, idx_writer_ptr] : idx_file_writers) {
184
0
        if (idx_writer_ptr != nullptr && idx_writer_ptr->get_file_writer() != nullptr) {
185
0
            auto segment_path = _context.segment_path(seg_id);
186
0
            auto index_prefix_view =
187
0
                    InvertedIndexDescriptor::get_index_file_path_prefix(segment_path);
188
0
            std::string index_path =
189
0
                    InvertedIndexDescriptor::get_index_file_path_v2(std::string(index_prefix_view));
190
0
            RETURN_IF_ERROR(_collect_packed_slice_location(idx_writer_ptr->get_file_writer(),
191
0
                                                           index_path, rowset_meta));
192
0
        }
193
0
    }
194
195
18.4E
    return Status::OK();
196
18.4E
}
197
198
Status CloudRowsetWriter::_collect_packed_slice_location(io::FileWriter* file_writer,
199
                                                         const std::string& file_path,
200
0
                                                         RowsetMeta* rowset_meta) {
201
0
    VLOG_NOTICE << "collect packed slice location for file: " << file_path;
202
    // Check if file writer is closed
203
0
    if (file_writer->state() != io::FileWriter::State::CLOSED) {
204
        // Writer is still open; index will be collected after it is closed.
205
0
        return Status::OK();
206
0
    }
207
208
    // Check if file is actually in packed file (not direct write for large files)
209
0
    if (!file_writer->is_in_packed_file()) {
210
0
        return Status::OK();
211
0
    }
212
213
    // Get packed slice location directly from PackedFileManager
214
0
    io::PackedSliceLocation index;
215
0
    RETURN_IF_ERROR(
216
0
            io::PackedFileManager::instance()->get_packed_slice_location(file_path, &index));
217
0
    if (index.packed_file_path.empty()) {
218
0
        return Status::OK(); // File not in packed file, skip
219
0
    }
220
221
0
    rowset_meta->add_packed_slice_location(file_path, index.packed_file_path, index.offset,
222
0
                                           index.size, index.packed_file_size);
223
    LOG(INFO) << "collect packed file index: " << file_path << " -> " << index.packed_file_path
224
0
              << ", offset: " << index.offset << ", size: " << index.size;
225
0
    return Status::OK();
226
0
}
227
228
} // namespace doris