Coverage Report

Created: 2026-04-14 13:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/channel/load_stream_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/channel/load_stream_writer.h"
19
20
#include <brpc/controller.h>
21
#include <butil/errno.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/internal_service.pb.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <filesystem>
27
#include <ostream>
28
#include <string>
29
#include <utility>
30
31
#include "bvar/bvar.h"
32
#include "cloud/config.h"
33
#include "common/compiler_util.h" // IWYU pragma: keep
34
#include "common/config.h"
35
#include "common/logging.h"
36
#include "common/status.h"
37
#include "core/block/block.h"
38
#include "io/fs/file_writer.h" // IWYU pragma: keep
39
#include "load/channel/load_channel_mgr.h"
40
#include "load/memtable/memtable.h"
41
#include "load/memtable/memtable_flush_executor.h"
42
#include "runtime/exec_env.h"
43
#include "runtime/memory/mem_tracker.h"
44
#include "service/backend_options.h"
45
#include "storage/data_dir.h"
46
#include "storage/index/inverted/inverted_index_desc.h"
47
#include "storage/olap_define.h"
48
#include "storage/rowset/beta_rowset.h"
49
#include "storage/rowset/beta_rowset_writer.h"
50
#include "storage/rowset/rowset_factory.h"
51
#include "storage/rowset/rowset_meta.h"
52
#include "storage/rowset/rowset_writer.h"
53
#include "storage/rowset/rowset_writer_context.h"
54
#include "storage/rowset_builder.h"
55
#include "storage/schema.h"
56
#include "storage/schema_change/schema_change.h"
57
#include "storage/segment/segment.h"
58
#include "storage/storage_engine.h"
59
#include "storage/tablet/tablet_manager.h"
60
#include "storage/tablet_info.h"
61
#include "storage/txn/txn_manager.h"
62
#include "util/brpc_client_cache.h"
63
#include "util/brpc_closure.h"
64
#include "util/debug_points.h"
65
#include "util/mem_info.h"
66
#include "util/stopwatch.hpp"
67
#include "util/time.h"
68
69
namespace doris {
70
using namespace ErrorCode;
71
72
// Number of live LoadStreamWriter objects: bumped by +1 in the constructor and
// by -1 in the destructor. Exposed under the name "load_stream_writer_count".
bvar::Adder<int64_t> g_load_stream_writer_cnt("load_stream_writer_count");
73
// Number of file writers held by LoadStreamWriter objects: bumped by +1 for each
// writer created in append_data() and reduced in the destructor by the number of
// segment and inverted index writers the object still holds.
// Exposed under the name "load_stream_file_writer_count".
bvar::Adder<int64_t> g_load_stream_file_writer_cnt("load_stream_file_writer_count");
74
75
// Constructs a writer for the given write request.
//
// `context` is dereferenced unconditionally, so it must not be null. It is used
// both to initialise `_req` and to construct the rowset builder. `profile` is
// forwarded to the rowset builder as-is.
LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profile)
        : _req(*context), _rowset_writer(nullptr) {
    // Account for this instance in the live-writer counter.
    g_load_stream_writer_cnt << 1;

    // TODO(plat1ko): CloudStorageEngine
    auto& local_engine = ExecEnv::GetInstance()->storage_engine().to_local();
    _rowset_builder = std::make_unique<RowsetBuilder>(local_engine, *context, profile);

    // Capture the resource context of the constructing thread (from load stream);
    // the other entry points attach to it through SCOPED_ATTACH_TASK.
    _resource_ctx = thread_context()->resource_ctx();
}
83
84
14
// Releases this writer's share of the global counters.
//
// The file-writer counter is reduced by the number of segment and inverted
// index writers still held, and the live-writer counter by one.
LoadStreamWriter::~LoadStreamWriter() {
    // Convert to a signed type BEFORE negating: negating the unsigned result of
    // size() would wrap around and only yield the intended negative delta through
    // a narrowing conversion.
    const auto segment_writer_cnt = static_cast<int64_t>(_segment_file_writers.size());
    const auto inverted_writer_cnt = static_cast<int64_t>(_inverted_file_writers.size());
    g_load_stream_file_writer_cnt << -segment_writer_cnt;
    g_load_stream_file_writer_cnt << -inverted_writer_cnt;
    g_load_stream_writer_cnt << -1;
}
89
90
14
// Initialises the rowset builder and caches its rowset writer.
//
// Returns the rowset builder's error if its init() fails; in that case
// `_rowset_writer` and `_is_init` are left untouched. Returns OK once
// `_is_init` has been set.
Status LoadStreamWriter::init() {
    // Fault-injection hook: fail before touching any state.
    DBUG_EXECUTE_IF("LoadStreamWriter.init.failure",
                    { return Status::InternalError("fault injection"); });

    RETURN_IF_ERROR(_rowset_builder->init());

    // The builder is ready; from here on the writer counts as initialised.
    _rowset_writer = _rowset_builder->rowset_writer();
    _is_init = true;
    return Status::OK();
}
98
99
// Appends one chunk of bytes to the file writer with index `segid`.
//
// Writers for every index up to and including `segid` are created on demand
// while holding `_lock`. The chunk is accepted only when `offset` equals the
// number of bytes the target writer has already appended.
//
// Params:
//   segid     - index of the target writer in the per-type writer list.
//   offset    - byte position at which the caller says this chunk starts.
//   buf       - payload; flattened with to_string() before being appended.
//   file_type - SEGMENT_FILE selects `_segment_file_writers`, anything else
//               selects `_inverted_file_writers`.
//
// Returns OK on success. Returns Corruption when the writer is not initialised,
// the selected file writer is null, or `offset` does not match. Otherwise
// returns the error from creating a file writer (which also marks the writer as
// cancelled) or from the append itself.
Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf,
                                     FileType file_type) {
    SCOPED_ATTACH_TASK(_resource_ctx);
    io::FileWriter* file_writer = nullptr;
    auto& file_writers =
            file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers;
    {
        std::lock_guard lock_guard(_lock);
        DCHECK(_is_init);
        // DCHECK compiles away in release builds, where `_rowset_writer` would
        // still be null below. Fail with an error instead, the same way
        // close_writer() and add_segment() do.
        if (!_is_init) {
            return Status::Corruption("append_data failed, LoadStreamWriter is not inited");
        }
        // Open every writer that is still missing, up to and including `segid`.
        // The loop does nothing when `segid` is already in range.
        for (size_t i = file_writers.size(); i <= segid; i++) {
            Status st;
            io::FileWriterPtr seg_file_writer;
            st = _rowset_writer->create_file_writer(static_cast<uint32_t>(i), seg_file_writer,
                                                    file_type);
            DBUG_EXECUTE_IF("LoadStreamWriter.append_data.create_file_writer_failed",
                            { st = Status::InternalError("fault injection"); });
            if (!st.ok()) {
                _is_canceled = true;
                return st;
            }
            file_writers.push_back(std::move(seg_file_writer));
            g_load_stream_file_writer_cnt << 1;
        }

        // TODO: IOBuf to Slice
        file_writer = file_writers[segid].get();
    }
    DBUG_EXECUTE_IF("LoadStreamWriter.append_data.null_file_writer", { file_writer = nullptr; });
    VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid;
    if (file_writer == nullptr) {
        // NOTE(review): the "destoryed" typo is kept as-is; external tests may
        // match on this message - confirm before correcting it.
        return Status::Corruption("append_data failed, file writer {} is destoryed", segid);
    }
    DBUG_EXECUTE_IF("LoadStreamWriter.append_data.wrong_offset", { offset++; });
    // Chunks must arrive strictly in order: the claimed start offset has to equal
    // what has been appended so far.
    if (file_writer->bytes_appended() != offset) {
        return Status::Corruption(
                "append_data out-of-order in segment={}, expected offset={}, actual={}",
                file_writer->path().native(), offset, file_writer->bytes_appended());
    }
    return file_writer->append(buf.to_string());
}
141
142
21
// Closes the file writer with index `segid` of the given file type.
//
// Params:
//   segid     - index of the writer in the per-type writer list.
//   file_type - SEGMENT_FILE selects `_segment_file_writers`, anything else
//               selects `_inverted_file_writers`.
//
// Returns OK on success. Returns Corruption when the writer is not initialised,
// `segid` was never opened, the file writer is null, or a file other than an
// inverted index file was closed with zero bytes. Otherwise returns the error
// from the file writer's close(), which also marks the writer as cancelled.
Status LoadStreamWriter::close_writer(uint32_t segid, FileType file_type) {
    SCOPED_ATTACH_TASK(_resource_ctx);
    io::FileWriter* file_writer = nullptr;
    auto& file_writers =
            file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers;
    {
        std::lock_guard lock_guard(_lock);
        if (!_is_init) {
            return Status::Corruption("close_writer failed, LoadStreamWriter is not inited");
        }
        DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.bad_segid",
                        { segid = static_cast<uint32_t>(file_writers.size()); });
        if (segid >= file_writers.size()) {
            return Status::Corruption(
                    "close_writer failed, file {} is never opened, file type is {}", segid,
                    file_type);
        }
        file_writer = file_writers[segid].get();
    }

    DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.null_file_writer", { file_writer = nullptr; });
    if (file_writer == nullptr) {
        // NOTE(review): the typos in this message are kept as-is; external tests
        // may match on it - confirm before correcting them.
        return Status::Corruption(
                "close_writer failed, file writer {} is destoryed, fiel type is {}", segid,
                file_type);
    }
    auto st = file_writer->close();
    if (!st.ok()) {
        {
            // `_is_canceled` is written under `_lock` in append_data() and read
            // with `_lock` held via close(); take the lock here as well so the
            // flag is never accessed unsynchronised.
            std::lock_guard lock_guard(_lock);
            _is_canceled = true;
        }
        return st;
    }
    LOG(INFO) << "file " << segid << " path " << file_writer->path().native() << " closed, written "
              << file_writer->bytes_appended() << " bytes"
              << ", file type is " << file_type;
    // Allow the index file to be empty when creating an index on a variant-type column.
    if (file_writer->bytes_appended() == 0 && file_type != FileType::INVERTED_INDEX_FILE) {
        return Status::Corruption("file {} closed with 0 bytes, file type is {}",
                                  file_writer->path().native(), file_type);
    }
    return Status::OK();
}
183
184
0
// Registers segment `segid` with the rowset writer after checking its size.
//
// The size of the segment file (and of the inverted index file, if any inverted
// index writers exist) is read under `_lock`. Only the segment file size is
// compared against `stat.data_size`; the inverted index size is reported in the
// error message but not validated.
//
// Returns Corruption when the writer is not initialised or the sizes disagree,
// any error from _calc_file_size(), or the result of the rowset writer's
// add_segment().
Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat) {
    SCOPED_ATTACH_TASK(_resource_ctx);
    size_t seg_bytes = 0;
    size_t index_bytes = 0;
    {
        std::lock_guard lock_guard(_lock);
        if (!_is_init) {
            return Status::Corruption("add_segment failed, LoadStreamWriter is not inited");
        }
        DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid",
                        { segid = static_cast<uint32_t>(_segment_file_writers.size()); });
        RETURN_IF_ERROR(_calc_file_size(segid, FileType::SEGMENT_FILE, &seg_bytes));
        // The inverted index size is only looked up when index writers exist.
        if (!_inverted_file_writers.empty()) {
            RETURN_IF_ERROR(_calc_file_size(segid, FileType::INVERTED_INDEX_FILE, &index_bytes));
        }
    }

    DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.size_not_match", { seg_bytes++; });
    const bool size_matches = (seg_bytes == stat.data_size);
    if (!size_matches) {
        return Status::Corruption(
                "add_segment failed, segment stat {} does not match, file size={}, inverted file "
                "size={}, stat.data_size={}, tablet id={}",
                segid, seg_bytes, index_bytes, stat.data_size, _req.tablet_id);
    }

    return _rowset_writer->add_segment(segid, stat);
}
212
213
0
// Reports how many bytes were appended to the closed file with index `segid`.
//
// This function takes no lock itself; add_segment() calls it while holding
// `_lock`.
//
// Params:
//   segid     - index of the writer in the per-type writer list.
//   file_type - SEGMENT_FILE selects `_segment_file_writers`, anything else
//               selects `_inverted_file_writers`.
//   file_size - out parameter, written only on success.
//
// Returns Corruption when `segid` was never opened, the file writer is null, or
// the file is not in the CLOSED state; OK otherwise.
Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type, size_t* file_size) {
    const bool is_segment_file = (file_type == FileType::SEGMENT_FILE);
    auto& writers = is_segment_file ? _segment_file_writers : _inverted_file_writers;

    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.unknown_segment",
                    { segid = static_cast<uint32_t>(writers.size()); });
    if (writers.size() <= segid) {
        return Status::Corruption("calc file size failed, file {} is never opened, file type is {}",
                                  segid, file_type);
    }

    io::FileWriter* target = writers[segid].get();
    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.null_file_writer", { target = nullptr; });
    if (target == nullptr) {
        return Status::Corruption(
                "calc file size failed, file writer {} is destoryed, file type is {}", segid,
                file_type);
    }

    // Fault-injection hook: swap in a freshly created writer, which is expected
    // to trip the "not closed" check below.
    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.file_not_closed", {
        io::FileWriterPtr injected;
        static_cast<void>(_rowset_writer->create_file_writer(
                static_cast<uint32_t>(writers.size()), injected, FileType::SEGMENT_FILE));
        writers.push_back(std::move(injected));
        target = writers.back().get();
    });
    if (target->state() != io::FileWriter::State::CLOSED) {
        return Status::Corruption("calc file size failed, file {} is not closed",
                                  target->path().native());
    }

    *file_size = target->bytes_appended();
    return Status::OK();
}
246
247
9
// Validates the writer state, builds the rowset and submits the delete-bitmap
// calculation. Sets `_pre_closed` on success.
//
// This function takes no lock itself; close() calls it while holding `_lock`.
//
// Returns InternalError when the writer was cancelled, Corruption when the
// inverted/segment writer counts disagree or any file writer is not CLOSED,
// and otherwise any error from init(), build_rowset() or
// submit_calc_delete_bitmap_task().
Status LoadStreamWriter::_pre_close() {
    SCOPED_ATTACH_TASK(_resource_ctx);
    if (!_is_init) {
        // Closing a writer that was never initialised means this tablet received
        // no data while at least one other tablet of the same partition did.
        // Initialise it now so that an empty rowset can still be created for
        // this tablet when it is closed.
        RETURN_IF_ERROR(init());
    }

    DCHECK(_is_init)
            << "rowset builder is supposed be to initialized before close_wait() being called";

    DBUG_EXECUTE_IF("LoadStreamWriter.close.cancelled", { _is_canceled = true; });
    if (_is_canceled) {
        return Status::InternalError("flush segment failed");
    }

    // Fault-injection hook: add an extra inverted index writer so the counts differ.
    DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_writers_size_not_match", {
        io::FileWriterPtr injected;
        static_cast<void>(_rowset_writer->create_file_writer(
                static_cast<uint32_t>(_inverted_file_writers.size()), injected,
                FileType::INVERTED_INDEX_FILE));
        _inverted_file_writers.push_back(std::move(injected));
    });
    // When inverted index writers exist at all, there must be exactly one per segment.
    const bool has_inverted_writers = !_inverted_file_writers.empty();
    if (has_inverted_writers && _inverted_file_writers.size() != _segment_file_writers.size()) {
        return Status::Corruption(
                "LoadStreamWriter close failed, inverted file writer size is {},"
                "segment file writer size is {}",
                _inverted_file_writers.size(), _segment_file_writers.size());
    }

    // Fault-injection hook: add a segment writer that has not been closed.
    DBUG_EXECUTE_IF("LoadStreamWriter.close.file_not_closed", {
        io::FileWriterPtr injected;
        static_cast<void>(_rowset_writer->create_file_writer(
                static_cast<uint32_t>(_segment_file_writers.size()), injected,
                FileType::SEGMENT_FILE));
        _segment_file_writers.push_back(std::move(injected));
    });
    // Every segment file must have been closed before the rowset is built.
    for (const auto& segment_writer : _segment_file_writers) {
        const bool is_closed = (segment_writer->state() == io::FileWriter::State::CLOSED);
        if (!is_closed) {
            return Status::Corruption("LoadStreamWriter close failed, segment {} is not closed",
                                      segment_writer->path().native());
        }
    }

    // Fault-injection hook: add an inverted index writer that has not been closed.
    DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_file_not_closed", {
        io::FileWriterPtr injected;
        static_cast<void>(_rowset_writer->create_file_writer(
                static_cast<uint32_t>(_inverted_file_writers.size()), injected,
                FileType::INVERTED_INDEX_FILE));
        _inverted_file_writers.push_back(std::move(injected));
    });
    // The same holds for every inverted index file.
    for (const auto& index_writer : _inverted_file_writers) {
        const bool is_closed = (index_writer->state() == io::FileWriter::State::CLOSED);
        if (!is_closed) {
            return Status::Corruption(
                    "LoadStreamWriter close failed, inverted file {} is not closed",
                    index_writer->path().native());
        }
    }

    RETURN_IF_ERROR(_rowset_builder->build_rowset());
    RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
    _pre_closed = true;
    return Status::OK();
}
313
314
8
// Finishes the load for this writer while holding `_lock`.
//
// Runs _pre_close() first if it has not succeeded yet, then waits for the
// delete-bitmap calculation and commits the transaction. The first error
// encountered is returned; OK otherwise.
Status LoadStreamWriter::close() {
    std::lock_guard<std::mutex> guard(_lock);

    const bool needs_pre_close = !_pre_closed;
    if (needs_pre_close) {
        RETURN_IF_ERROR(_pre_close());
    }

    RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());

    // FIXME(plat1ko): No `commit_txn` operation in cloud mode, need better abstractions
    auto* local_builder = static_cast<RowsetBuilder*>(_rowset_builder.get());
    RETURN_IF_ERROR(local_builder->commit_txn());

    return Status::OK();
}
325
326
} // namespace doris