Coverage Report

Created: 2026-04-11 14:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/channel/load_stream_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/channel/load_stream_writer.h"
19
20
#include <brpc/controller.h>
21
#include <butil/errno.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/internal_service.pb.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <filesystem>
27
#include <ostream>
28
#include <string>
29
#include <utility>
30
31
#include "bvar/bvar.h"
32
#include "cloud/config.h"
33
#include "common/compiler_util.h" // IWYU pragma: keep
34
#include "common/config.h"
35
#include "common/logging.h"
36
#include "common/status.h"
37
#include "core/block/block.h"
38
#include "io/fs/file_writer.h" // IWYU pragma: keep
39
#include "load/channel/load_channel_mgr.h"
40
#include "load/memtable/memtable.h"
41
#include "load/memtable/memtable_flush_executor.h"
42
#include "runtime/exec_env.h"
43
#include "runtime/memory/mem_tracker.h"
44
#include "service/backend_options.h"
45
#include "storage/data_dir.h"
46
#include "storage/index/inverted/inverted_index_desc.h"
47
#include "storage/olap_define.h"
48
#include "storage/rowset/beta_rowset.h"
49
#include "storage/rowset/beta_rowset_writer.h"
50
#include "storage/rowset/rowset_factory.h"
51
#include "storage/rowset/rowset_meta.h"
52
#include "storage/rowset/rowset_writer.h"
53
#include "storage/rowset/rowset_writer_context.h"
54
#include "storage/rowset_builder.h"
55
#include "storage/schema.h"
56
#include "storage/schema_change/schema_change.h"
57
#include "storage/segment/segment.h"
58
#include "storage/storage_engine.h"
59
#include "storage/tablet/tablet_manager.h"
60
#include "storage/tablet_info.h"
61
#include "storage/txn/txn_manager.h"
62
#include "util/brpc_client_cache.h"
63
#include "util/brpc_closure.h"
64
#include "util/debug_points.h"
65
#include "util/mem_info.h"
66
#include "util/stopwatch.hpp"
67
#include "util/time.h"
68
69
namespace doris {
70
using namespace ErrorCode;
71
72
bvar::Adder<int64_t> g_load_stream_writer_cnt("load_stream_writer_count");
73
bvar::Adder<int64_t> g_load_stream_file_writer_cnt("load_stream_file_writer_count");
74
75
LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profile)
76
13.4k
        : _req(*context), _rowset_writer(nullptr) {
77
13.4k
    g_load_stream_writer_cnt << 1;
78
    // TODO(plat1ko): CloudStorageEngine
79
13.4k
    _rowset_builder = std::make_unique<RowsetBuilder>(
80
13.4k
            ExecEnv::GetInstance()->storage_engine().to_local(), *context, profile);
81
13.4k
    _resource_ctx = thread_context()->resource_ctx(); // from load stream
82
13.4k
}
83
84
13.4k
LoadStreamWriter::~LoadStreamWriter() {
85
13.4k
    g_load_stream_file_writer_cnt << -_segment_file_writers.size();
86
13.4k
    g_load_stream_file_writer_cnt << -_inverted_file_writers.size();
87
13.4k
    g_load_stream_writer_cnt << -1;
88
13.4k
}
89
90
13.4k
Status LoadStreamWriter::init() {
91
13.4k
    DBUG_EXECUTE_IF("LoadStreamWriter.init.failure",
92
13.4k
                    { return Status::InternalError("fault injection"); });
93
13.4k
    RETURN_IF_ERROR(_rowset_builder->init());
94
13.4k
    _rowset_writer = _rowset_builder->rowset_writer();
95
13.4k
    _is_init = true;
96
13.4k
    return Status::OK();
97
13.4k
}
98
99
Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf,
100
106k
                                     FileType file_type) {
101
106k
    SCOPED_ATTACH_TASK(_resource_ctx);
102
106k
    io::FileWriter* file_writer = nullptr;
103
106k
    auto& file_writers =
104
18.4E
            file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers;
105
106k
    {
106
106k
        std::lock_guard lock_guard(_lock);
107
106k
        DCHECK(_is_init);
108
106k
        if (segid >= file_writers.size()) {
109
7.44k
            for (size_t i = file_writers.size(); i <= segid; i++) {
110
3.72k
                Status st;
111
3.72k
                io::FileWriterPtr seg_file_writer;
112
3.72k
                st = _rowset_writer->create_file_writer(static_cast<uint32_t>(i), seg_file_writer,
113
3.72k
                                                        file_type);
114
3.72k
                DBUG_EXECUTE_IF("LoadStreamWriter.append_data.create_file_writer_failed",
115
3.72k
                                { st = Status::InternalError("fault injection"); });
116
3.72k
                if (!st.ok()) {
117
0
                    _is_canceled = true;
118
0
                    return st;
119
0
                }
120
3.72k
                file_writers.push_back(std::move(seg_file_writer));
121
3.72k
                g_load_stream_file_writer_cnt << 1;
122
3.72k
            }
123
3.71k
        }
124
125
        // TODO: IOBuf to Slice
126
106k
        file_writer = file_writers[segid].get();
127
106k
    }
128
106k
    DBUG_EXECUTE_IF("LoadStreamWriter.append_data.null_file_writer", { file_writer = nullptr; });
129
18.4E
    VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid;
130
106k
    if (file_writer == nullptr) {
131
0
        return Status::Corruption("append_data failed, file writer {} is destoryed", segid);
132
0
    }
133
106k
    DBUG_EXECUTE_IF("LoadStreamWriter.append_data.wrong_offset", { offset++; });
134
106k
    if (file_writer->bytes_appended() != offset) {
135
1
        return Status::Corruption(
136
1
                "append_data out-of-order in segment={}, expected offset={}, actual={}",
137
1
                file_writer->path().native(), offset, file_writer->bytes_appended());
138
1
    }
139
106k
    return file_writer->append(buf.to_string());
140
106k
}
141
142
3.72k
Status LoadStreamWriter::close_writer(uint32_t segid, FileType file_type) {
143
3.72k
    SCOPED_ATTACH_TASK(_resource_ctx);
144
3.72k
    io::FileWriter* file_writer = nullptr;
145
3.72k
    auto& file_writers =
146
3.72k
            file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers;
147
3.72k
    {
148
3.72k
        std::lock_guard lock_guard(_lock);
149
3.72k
        if (!_is_init) {
150
0
            return Status::Corruption("close_writer failed, LoadStreamWriter is not inited");
151
0
        }
152
3.72k
        DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.bad_segid",
153
3.72k
                        { segid = static_cast<uint32_t>(file_writers.size()); });
154
3.72k
        if (segid >= file_writers.size()) {
155
0
            return Status::Corruption(
156
0
                    "close_writer failed, file {} is never opened, file type is {}", segid,
157
0
                    file_type);
158
0
        }
159
3.72k
        file_writer = file_writers[segid].get();
160
3.72k
    }
161
162
3.72k
    DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.null_file_writer", { file_writer = nullptr; });
163
3.72k
    if (file_writer == nullptr) {
164
0
        return Status::Corruption(
165
0
                "close_writer failed, file writer {} is destoryed, fiel type is {}", segid,
166
0
                file_type);
167
0
    }
168
3.72k
    auto st = file_writer->close();
169
3.72k
    if (!st.ok()) {
170
0
        _is_canceled = true;
171
0
        return st;
172
0
    }
173
3.72k
    LOG(INFO) << "file " << segid << " path " << file_writer->path().native() << " closed, written "
174
3.72k
              << file_writer->bytes_appended() << " bytes"
175
3.72k
              << ", file type is " << file_type;
176
    // ‌Allow the index file to be empty when creating an index on a variant-type column.
177
3.72k
    if (file_writer->bytes_appended() == 0 && file_type != FileType::INVERTED_INDEX_FILE) {
178
1
        return Status::Corruption("file {} closed with 0 bytes, file type is {}",
179
1
                                  file_writer->path().native(), file_type);
180
1
    }
181
3.72k
    return Status::OK();
182
3.72k
}
183
184
3.70k
Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat) {
185
3.70k
    SCOPED_ATTACH_TASK(_resource_ctx);
186
3.70k
    size_t segment_file_size = 0;
187
3.70k
    size_t inverted_file_size = 0;
188
3.70k
    {
189
3.70k
        std::lock_guard lock_guard(_lock);
190
3.70k
        if (!_is_init) {
191
0
            return Status::Corruption("add_segment failed, LoadStreamWriter is not inited");
192
0
        }
193
3.70k
        DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid",
194
3.70k
                        { segid = static_cast<uint32_t>(_segment_file_writers.size()); });
195
3.70k
        RETURN_IF_ERROR(_calc_file_size(segid, FileType::SEGMENT_FILE, &segment_file_size));
196
3.70k
        if (_inverted_file_writers.size() > 0) {
197
0
            RETURN_IF_ERROR(
198
0
                    _calc_file_size(segid, FileType::INVERTED_INDEX_FILE, &inverted_file_size));
199
0
        }
200
3.70k
    }
201
202
3.70k
    DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.size_not_match", { segment_file_size++; });
203
3.70k
    if (segment_file_size != stat.data_size) {
204
0
        return Status::Corruption(
205
0
                "add_segment failed, segment stat {} does not match, file size={}, inverted file "
206
0
                "size={}, stat.data_size={}, tablet id={}",
207
0
                segid, segment_file_size, inverted_file_size, stat.data_size, _req.tablet_id);
208
0
    }
209
210
3.70k
    return _rowset_writer->add_segment(segid, stat);
211
3.70k
}
212
213
3.70k
Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type, size_t* file_size) {
214
3.70k
    io::FileWriter* file_writer = nullptr;
215
3.70k
    auto& file_writers =
216
3.70k
            (file_type == FileType::SEGMENT_FILE) ? _segment_file_writers : _inverted_file_writers;
217
218
3.70k
    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.unknown_segment",
219
3.70k
                    { segid = static_cast<uint32_t>(file_writers.size()); });
220
3.70k
    if (segid >= file_writers.size()) {
221
0
        return Status::Corruption("calc file size failed, file {} is never opened, file type is {}",
222
0
                                  segid, file_type);
223
0
    }
224
3.70k
    file_writer = file_writers[segid].get();
225
3.70k
    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.null_file_writer",
226
3.70k
                    { file_writer = nullptr; });
227
3.70k
    if (file_writer == nullptr) {
228
0
        return Status::Corruption(
229
0
                "calc file size failed, file writer {} is destoryed, file type is {}", segid,
230
0
                file_type);
231
0
    }
232
3.70k
    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.file_not_closed", {
233
3.70k
        io::FileWriterPtr fwriter;
234
3.70k
        static_cast<void>(_rowset_writer->create_file_writer(
235
3.70k
                static_cast<uint32_t>(file_writers.size()), fwriter, FileType::SEGMENT_FILE));
236
3.70k
        file_writers.push_back(std::move(fwriter));
237
3.70k
        file_writer = file_writers.back().get();
238
3.70k
    });
239
3.70k
    if (file_writer->state() != io::FileWriter::State::CLOSED) {
240
0
        return Status::Corruption("calc file size failed, file {} is not closed",
241
0
                                  file_writer->path().native());
242
0
    }
243
3.70k
    *file_size = file_writer->bytes_appended();
244
3.70k
    return Status::OK();
245
3.70k
}
246
247
13.4k
Status LoadStreamWriter::_pre_close() {
248
13.4k
    SCOPED_ATTACH_TASK(_resource_ctx);
249
13.4k
    if (!_is_init) {
250
        // if this delta writer is not initialized, but close() is called.
251
        // which means this tablet has no data loaded, but at least one tablet
252
        // in same partition has data loaded.
253
        // so we have to also init this LoadStreamWriter, so that it can create an empty rowset
254
        // for this tablet when being closed.
255
0
        RETURN_IF_ERROR(init());
256
0
    }
257
258
13.4k
    DCHECK(_is_init)
259
0
            << "rowset builder is supposed be to initialized before close_wait() being called";
260
261
13.4k
    DBUG_EXECUTE_IF("LoadStreamWriter.close.cancelled", { _is_canceled = true; });
262
13.4k
    if (_is_canceled) {
263
0
        return Status::InternalError("flush segment failed");
264
0
    }
265
13.4k
    DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_writers_size_not_match", {
266
13.4k
        io::FileWriterPtr file_writer;
267
13.4k
        static_cast<void>(_rowset_writer->create_file_writer(
268
13.4k
                static_cast<uint32_t>(_inverted_file_writers.size()), file_writer,
269
13.4k
                FileType::INVERTED_INDEX_FILE));
270
13.4k
        _inverted_file_writers.push_back(std::move(file_writer));
271
13.4k
    });
272
13.4k
    if (_inverted_file_writers.size() > 0 &&
273
13.4k
        _inverted_file_writers.size() != _segment_file_writers.size()) {
274
0
        return Status::Corruption(
275
0
                "LoadStreamWriter close failed, inverted file writer size is {},"
276
0
                "segment file writer size is {}",
277
0
                _inverted_file_writers.size(), _segment_file_writers.size());
278
0
    }
279
13.4k
    DBUG_EXECUTE_IF("LoadStreamWriter.close.file_not_closed", {
280
13.4k
        io::FileWriterPtr file_writer;
281
13.4k
        static_cast<void>(_rowset_writer->create_file_writer(
282
13.4k
                static_cast<uint32_t>(_segment_file_writers.size()), file_writer,
283
13.4k
                FileType::SEGMENT_FILE));
284
13.4k
        _segment_file_writers.push_back(std::move(file_writer));
285
13.4k
    });
286
13.4k
    for (const auto& writer : _segment_file_writers) {
287
3.72k
        if (writer->state() != io::FileWriter::State::CLOSED) {
288
1
            return Status::Corruption("LoadStreamWriter close failed, segment {} is not closed",
289
1
                                      writer->path().native());
290
1
        }
291
3.72k
    }
292
293
13.4k
    DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_file_not_closed", {
294
13.4k
        io::FileWriterPtr file_writer;
295
13.4k
        static_cast<void>(_rowset_writer->create_file_writer(
296
13.4k
                static_cast<uint32_t>(_inverted_file_writers.size()), file_writer,
297
13.4k
                FileType::INVERTED_INDEX_FILE));
298
13.4k
        _inverted_file_writers.push_back(std::move(file_writer));
299
13.4k
    });
300
13.4k
    for (const auto& writer : _inverted_file_writers) {
301
0
        if (writer->state() != io::FileWriter::State::CLOSED) {
302
0
            return Status::Corruption(
303
0
                    "LoadStreamWriter close failed, inverted file {} is not closed",
304
0
                    writer->path().native());
305
0
        }
306
0
    }
307
308
13.4k
    RETURN_IF_ERROR(_rowset_builder->build_rowset());
309
13.4k
    RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
310
13.4k
    _pre_closed = true;
311
13.4k
    return Status::OK();
312
13.4k
}
313
314
13.4k
Status LoadStreamWriter::close() {
315
13.4k
    std::lock_guard<std::mutex> l(_lock);
316
13.4k
    if (!_pre_closed) {
317
0
        RETURN_IF_ERROR(_pre_close());
318
0
    }
319
13.4k
    RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());
320
    // FIXME(plat1ko): No `commit_txn` operation in cloud mode, need better abstractions
321
13.4k
    RETURN_IF_ERROR(static_cast<RowsetBuilder*>(_rowset_builder.get())->commit_txn());
322
323
13.4k
    return Status::OK();
324
13.4k
}
325
326
} // namespace doris