Coverage Report

Created: 2026-03-15 08:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/channel/load_stream_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/channel/load_stream_writer.h"
19
20
#include <brpc/controller.h>
21
#include <butil/errno.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/internal_service.pb.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <filesystem>
27
#include <ostream>
28
#include <string>
29
#include <utility>
30
31
#include "bvar/bvar.h"
32
#include "cloud/config.h"
33
#include "common/compiler_util.h" // IWYU pragma: keep
34
#include "common/config.h"
35
#include "common/logging.h"
36
#include "common/status.h"
37
#include "core/block/block.h"
38
#include "io/fs/file_writer.h" // IWYU pragma: keep
39
#include "load/channel/load_channel_mgr.h"
40
#include "load/memtable/memtable.h"
41
#include "load/memtable/memtable_flush_executor.h"
42
#include "runtime/exec_env.h"
43
#include "runtime/memory/mem_tracker.h"
44
#include "service/backend_options.h"
45
#include "storage/data_dir.h"
46
#include "storage/index/inverted/inverted_index_desc.h"
47
#include "storage/olap_define.h"
48
#include "storage/rowset/beta_rowset.h"
49
#include "storage/rowset/beta_rowset_writer.h"
50
#include "storage/rowset/rowset_factory.h"
51
#include "storage/rowset/rowset_meta.h"
52
#include "storage/rowset/rowset_writer.h"
53
#include "storage/rowset/rowset_writer_context.h"
54
#include "storage/rowset_builder.h"
55
#include "storage/schema.h"
56
#include "storage/schema_change/schema_change.h"
57
#include "storage/segment/segment.h"
58
#include "storage/storage_engine.h"
59
#include "storage/tablet/tablet_manager.h"
60
#include "storage/tablet_info.h"
61
#include "storage/txn/txn_manager.h"
62
#include "util/brpc_client_cache.h"
63
#include "util/brpc_closure.h"
64
#include "util/debug_points.h"
65
#include "util/mem_info.h"
66
#include "util/stopwatch.hpp"
67
#include "util/time.h"
68
69
namespace doris {
70
#include "common/compile_check_begin.h"
71
using namespace ErrorCode;
72
73
bvar::Adder<int64_t> g_load_stream_writer_cnt("load_stream_writer_count");
74
bvar::Adder<int64_t> g_load_stream_file_writer_cnt("load_stream_file_writer_count");
75
76
LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profile)
77
14
        : _req(*context), _rowset_writer(nullptr) {
78
14
    g_load_stream_writer_cnt << 1;
79
    // TODO(plat1ko): CloudStorageEngine
80
14
    _rowset_builder = std::make_unique<RowsetBuilder>(
81
14
            ExecEnv::GetInstance()->storage_engine().to_local(), *context, profile);
82
14
    _resource_ctx = thread_context()->resource_ctx(); // from load stream
83
14
}
84
85
14
LoadStreamWriter::~LoadStreamWriter() {
86
14
    g_load_stream_file_writer_cnt << -_segment_file_writers.size();
87
14
    g_load_stream_file_writer_cnt << -_inverted_file_writers.size();
88
14
    g_load_stream_writer_cnt << -1;
89
14
}
90
91
14
Status LoadStreamWriter::init() {
92
14
    DBUG_EXECUTE_IF("LoadStreamWriter.init.failure",
93
14
                    { return Status::InternalError("fault injection"); });
94
14
    RETURN_IF_ERROR(_rowset_builder->init());
95
13
    _rowset_writer = _rowset_builder->rowset_writer();
96
13
    _is_init = true;
97
13
    return Status::OK();
98
14
}
99
100
Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf,
101
26
                                     FileType file_type) {
102
26
    SCOPED_ATTACH_TASK(_resource_ctx);
103
26
    io::FileWriter* file_writer = nullptr;
104
26
    auto& file_writers =
105
26
            file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers;
106
26
    {
107
26
        std::lock_guard lock_guard(_lock);
108
26
        DCHECK(_is_init);
109
26
        if (segid >= file_writers.size()) {
110
39
            for (size_t i = file_writers.size(); i <= segid; i++) {
111
24
                Status st;
112
24
                io::FileWriterPtr seg_file_writer;
113
24
                st = _rowset_writer->create_file_writer(static_cast<uint32_t>(i), seg_file_writer,
114
24
                                                        file_type);
115
24
                DBUG_EXECUTE_IF("LoadStreamWriter.append_data.create_file_writer_failed",
116
24
                                { st = Status::InternalError("fault injection"); });
117
24
                if (!st.ok()) {
118
0
                    _is_canceled = true;
119
0
                    return st;
120
0
                }
121
24
                file_writers.push_back(std::move(seg_file_writer));
122
24
                g_load_stream_file_writer_cnt << 1;
123
24
            }
124
15
        }
125
126
        // TODO: IOBuf to Slice
127
26
        file_writer = file_writers[segid].get();
128
26
    }
129
26
    DBUG_EXECUTE_IF("LoadStreamWriter.append_data.null_file_writer", { file_writer = nullptr; });
130
26
    VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid;
131
26
    if (file_writer == nullptr) {
132
0
        return Status::Corruption("append_data failed, file writer {} is destoryed", segid);
133
0
    }
134
26
    DBUG_EXECUTE_IF("LoadStreamWriter.append_data.wrong_offset", { offset++; });
135
26
    if (file_writer->bytes_appended() != offset) {
136
1
        return Status::Corruption(
137
1
                "append_data out-of-order in segment={}, expected offset={}, actual={}",
138
1
                file_writer->path().native(), offset, file_writer->bytes_appended());
139
1
    }
140
25
    return file_writer->append(buf.to_string());
141
26
}
142
143
21
Status LoadStreamWriter::close_writer(uint32_t segid, FileType file_type) {
144
21
    SCOPED_ATTACH_TASK(_resource_ctx);
145
21
    io::FileWriter* file_writer = nullptr;
146
21
    auto& file_writers =
147
21
            file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers;
148
21
    {
149
21
        std::lock_guard lock_guard(_lock);
150
21
        if (!_is_init) {
151
0
            return Status::Corruption("close_writer failed, LoadStreamWriter is not inited");
152
0
        }
153
21
        DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.bad_segid",
154
21
                        { segid = static_cast<uint32_t>(file_writers.size()); });
155
21
        if (segid >= file_writers.size()) {
156
0
            return Status::Corruption(
157
0
                    "close_writer failed, file {} is never opened, file type is {}", segid,
158
0
                    file_type);
159
0
        }
160
21
        file_writer = file_writers[segid].get();
161
21
    }
162
163
21
    DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.null_file_writer", { file_writer = nullptr; });
164
21
    if (file_writer == nullptr) {
165
0
        return Status::Corruption(
166
0
                "close_writer failed, file writer {} is destoryed, fiel type is {}", segid,
167
0
                file_type);
168
0
    }
169
21
    auto st = file_writer->close();
170
21
    if (!st.ok()) {
171
0
        _is_canceled = true;
172
0
        return st;
173
0
    }
174
21
    LOG(INFO) << "file " << segid << " path " << file_writer->path().native() << " closed, written "
175
21
              << file_writer->bytes_appended() << " bytes"
176
21
              << ", file type is " << file_type;
177
    // ‌Allow the index file to be empty when creating an index on a variant-type column.
178
21
    if (file_writer->bytes_appended() == 0 && file_type != FileType::INVERTED_INDEX_FILE) {
179
1
        return Status::Corruption("file {} closed with 0 bytes, file type is {}",
180
1
                                  file_writer->path().native(), file_type);
181
1
    }
182
20
    return Status::OK();
183
21
}
184
185
0
Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat) {
186
0
    SCOPED_ATTACH_TASK(_resource_ctx);
187
0
    size_t segment_file_size = 0;
188
0
    size_t inverted_file_size = 0;
189
0
    {
190
0
        std::lock_guard lock_guard(_lock);
191
0
        if (!_is_init) {
192
0
            return Status::Corruption("add_segment failed, LoadStreamWriter is not inited");
193
0
        }
194
0
        DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid",
195
0
                        { segid = static_cast<uint32_t>(_segment_file_writers.size()); });
196
0
        RETURN_IF_ERROR(_calc_file_size(segid, FileType::SEGMENT_FILE, &segment_file_size));
197
0
        if (_inverted_file_writers.size() > 0) {
198
0
            RETURN_IF_ERROR(
199
0
                    _calc_file_size(segid, FileType::INVERTED_INDEX_FILE, &inverted_file_size));
200
0
        }
201
0
    }
202
203
0
    DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.size_not_match", { segment_file_size++; });
204
0
    if (segment_file_size != stat.data_size) {
205
0
        return Status::Corruption(
206
0
                "add_segment failed, segment stat {} does not match, file size={}, inverted file "
207
0
                "size={}, stat.data_size={}, tablet id={}",
208
0
                segid, segment_file_size, inverted_file_size, stat.data_size, _req.tablet_id);
209
0
    }
210
211
0
    return _rowset_writer->add_segment(segid, stat);
212
0
}
213
214
0
Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type, size_t* file_size) {
215
0
    io::FileWriter* file_writer = nullptr;
216
0
    auto& file_writers =
217
0
            (file_type == FileType::SEGMENT_FILE) ? _segment_file_writers : _inverted_file_writers;
218
219
0
    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.unknown_segment",
220
0
                    { segid = static_cast<uint32_t>(file_writers.size()); });
221
0
    if (segid >= file_writers.size()) {
222
0
        return Status::Corruption("calc file size failed, file {} is never opened, file type is {}",
223
0
                                  segid, file_type);
224
0
    }
225
0
    file_writer = file_writers[segid].get();
226
0
    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.null_file_writer",
227
0
                    { file_writer = nullptr; });
228
0
    if (file_writer == nullptr) {
229
0
        return Status::Corruption(
230
0
                "calc file size failed, file writer {} is destoryed, file type is {}", segid,
231
0
                file_type);
232
0
    }
233
0
    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.file_not_closed", {
234
0
        io::FileWriterPtr fwriter;
235
0
        static_cast<void>(_rowset_writer->create_file_writer(
236
0
                static_cast<uint32_t>(file_writers.size()), fwriter, FileType::SEGMENT_FILE));
237
0
        file_writers.push_back(std::move(fwriter));
238
0
        file_writer = file_writers.back().get();
239
0
    });
240
0
    if (file_writer->state() != io::FileWriter::State::CLOSED) {
241
0
        return Status::Corruption("calc file size failed, file {} is not closed",
242
0
                                  file_writer->path().native());
243
0
    }
244
0
    *file_size = file_writer->bytes_appended();
245
0
    return Status::OK();
246
0
}
247
248
9
Status LoadStreamWriter::_pre_close() {
249
9
    SCOPED_ATTACH_TASK(_resource_ctx);
250
9
    if (!_is_init) {
251
        // if this delta writer is not initialized, but close() is called.
252
        // which means this tablet has no data loaded, but at least one tablet
253
        // in same partition has data loaded.
254
        // so we have to also init this LoadStreamWriter, so that it can create an empty rowset
255
        // for this tablet when being closed.
256
0
        RETURN_IF_ERROR(init());
257
0
    }
258
259
9
    DCHECK(_is_init)
260
0
            << "rowset builder is supposed be to initialized before close_wait() being called";
261
262
9
    DBUG_EXECUTE_IF("LoadStreamWriter.close.cancelled", { _is_canceled = true; });
263
9
    if (_is_canceled) {
264
0
        return Status::InternalError("flush segment failed");
265
0
    }
266
9
    DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_writers_size_not_match", {
267
9
        io::FileWriterPtr file_writer;
268
9
        static_cast<void>(_rowset_writer->create_file_writer(
269
9
                static_cast<uint32_t>(_inverted_file_writers.size()), file_writer,
270
9
                FileType::INVERTED_INDEX_FILE));
271
9
        _inverted_file_writers.push_back(std::move(file_writer));
272
9
    });
273
9
    if (_inverted_file_writers.size() > 0 &&
274
9
        _inverted_file_writers.size() != _segment_file_writers.size()) {
275
0
        return Status::Corruption(
276
0
                "LoadStreamWriter close failed, inverted file writer size is {},"
277
0
                "segment file writer size is {}",
278
0
                _inverted_file_writers.size(), _segment_file_writers.size());
279
0
    }
280
9
    DBUG_EXECUTE_IF("LoadStreamWriter.close.file_not_closed", {
281
9
        io::FileWriterPtr file_writer;
282
9
        static_cast<void>(_rowset_writer->create_file_writer(
283
9
                static_cast<uint32_t>(_segment_file_writers.size()), file_writer,
284
9
                FileType::SEGMENT_FILE));
285
9
        _segment_file_writers.push_back(std::move(file_writer));
286
9
    });
287
20
    for (const auto& writer : _segment_file_writers) {
288
20
        if (writer->state() != io::FileWriter::State::CLOSED) {
289
1
            return Status::Corruption("LoadStreamWriter close failed, segment {} is not closed",
290
1
                                      writer->path().native());
291
1
        }
292
20
    }
293
294
8
    DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_file_not_closed", {
295
8
        io::FileWriterPtr file_writer;
296
8
        static_cast<void>(_rowset_writer->create_file_writer(
297
8
                static_cast<uint32_t>(_inverted_file_writers.size()), file_writer,
298
8
                FileType::INVERTED_INDEX_FILE));
299
8
        _inverted_file_writers.push_back(std::move(file_writer));
300
8
    });
301
8
    for (const auto& writer : _inverted_file_writers) {
302
0
        if (writer->state() != io::FileWriter::State::CLOSED) {
303
0
            return Status::Corruption(
304
0
                    "LoadStreamWriter close failed, inverted file {} is not closed",
305
0
                    writer->path().native());
306
0
        }
307
0
    }
308
309
8
    RETURN_IF_ERROR(_rowset_builder->build_rowset());
310
8
    RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
311
8
    _pre_closed = true;
312
8
    return Status::OK();
313
8
}
314
315
8
Status LoadStreamWriter::close() {
316
8
    std::lock_guard<std::mutex> l(_lock);
317
8
    if (!_pre_closed) {
318
0
        RETURN_IF_ERROR(_pre_close());
319
0
    }
320
8
    RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());
321
    // FIXME(plat1ko): No `commit_txn` operation in cloud mode, need better abstractions
322
8
    RETURN_IF_ERROR(static_cast<RowsetBuilder*>(_rowset_builder.get())->commit_txn());
323
324
8
    return Status::OK();
325
8
}
326
327
#include "common/compile_check_end.h"
328
} // namespace doris