Coverage Report

Created: 2026-03-14 18:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/channel/load_stream_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/channel/load_stream_writer.h"
19
20
#include <brpc/controller.h>
21
#include <butil/errno.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/internal_service.pb.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <filesystem>
27
#include <ostream>
28
#include <string>
29
#include <utility>
30
31
#include "bvar/bvar.h"
32
#include "cloud/config.h"
33
#include "common/compiler_util.h" // IWYU pragma: keep
34
#include "common/config.h"
35
#include "common/logging.h"
36
#include "common/status.h"
37
#include "core/block/block.h"
38
#include "io/fs/file_writer.h" // IWYU pragma: keep
39
#include "load/channel/load_channel_mgr.h"
40
#include "load/memtable/memtable.h"
41
#include "load/memtable/memtable_flush_executor.h"
42
#include "runtime/exec_env.h"
43
#include "runtime/memory/mem_tracker.h"
44
#include "service/backend_options.h"
45
#include "storage/data_dir.h"
46
#include "storage/index/inverted/inverted_index_desc.h"
47
#include "storage/olap_define.h"
48
#include "storage/rowset/beta_rowset.h"
49
#include "storage/rowset/beta_rowset_writer.h"
50
#include "storage/rowset/rowset_factory.h"
51
#include "storage/rowset/rowset_meta.h"
52
#include "storage/rowset/rowset_writer.h"
53
#include "storage/rowset/rowset_writer_context.h"
54
#include "storage/rowset_builder.h"
55
#include "storage/schema.h"
56
#include "storage/schema_change/schema_change.h"
57
#include "storage/segment/segment.h"
58
#include "storage/storage_engine.h"
59
#include "storage/tablet/tablet_manager.h"
60
#include "storage/tablet_info.h"
61
#include "storage/txn/txn_manager.h"
62
#include "util/brpc_client_cache.h"
63
#include "util/brpc_closure.h"
64
#include "util/debug_points.h"
65
#include "util/mem_info.h"
66
#include "util/stopwatch.hpp"
67
#include "util/time.h"
68
69
namespace doris {
70
#include "common/compile_check_begin.h"
71
using namespace ErrorCode;
72
73
bvar::Adder<int64_t> g_load_stream_writer_cnt("load_stream_writer_count");
74
bvar::Adder<int64_t> g_load_stream_file_writer_cnt("load_stream_file_writer_count");
75
76
LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profile)
        : _req(*context), _rowset_writer(nullptr) {
    // Account for this writer in the live-writer bvar.
    g_load_stream_writer_cnt << 1;

    // TODO(plat1ko): CloudStorageEngine
    // For now the rowset builder is always bound to the local storage engine.
    auto& local_engine = ExecEnv::GetInstance()->storage_engine().to_local();
    _rowset_builder = std::make_unique<RowsetBuilder>(local_engine, *context, profile);

    // Capture the resource context of the creating load stream; every public entry point
    // re-attaches to it through SCOPED_ATTACH_TASK.
    _resource_ctx = thread_context()->resource_ctx(); // from load stream
}
84
85
168
LoadStreamWriter::~LoadStreamWriter() {
    // Give back every file writer counted in append_data(). Convert to a signed type
    // *before* negating: applying unary minus to the unsigned size_t only yields the
    // intended negative value through modular wrap-around plus an unsigned->signed
    // conversion.
    const auto file_writer_cnt =
            static_cast<int64_t>(_segment_file_writers.size() + _inverted_file_writers.size());
    g_load_stream_file_writer_cnt << -file_writer_cnt;
    g_load_stream_writer_cnt << -1;
}
90
91
168
Status LoadStreamWriter::init() {
    // Fault-injection hook: pretend initialization failed.
    DBUG_EXECUTE_IF("LoadStreamWriter.init.failure",
                    { return Status::InternalError("fault injection"); });

    // The writer only becomes usable once its rowset builder has been set up.
    if (Status builder_status = _rowset_builder->init(); !builder_status.ok()) {
        return builder_status;
    }

    // Cache the builder's rowset writer; append/close/add_segment operate on it.
    _rowset_writer = _rowset_builder->rowset_writer();
    _is_init = true;
    return Status::OK();
}
99
100
/// Append a chunk of bytes to file `segid` of the given type.
///
/// File writers are created lazily: if `segid` has not been seen yet, writers for every
/// missing id up to and including `segid` are created first. `offset` must equal the
/// number of bytes already appended to that file, i.e. chunks must arrive in order.
///
/// @param segid     index of the target file
/// @param offset    expected current size of the file before this append
/// @param buf       payload to append
/// @param file_type SEGMENT_FILE selects the segment writers; any other value selects the
///                  inverted-index writers
/// @return Corruption if the writer is not initialized, the file writer is missing, or the
///         offset does not match; otherwise the status of creating / appending to the file.
Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf,
                                     FileType file_type) {
    SCOPED_ATTACH_TASK(_resource_ctx);
    io::FileWriter* file_writer = nullptr;
    auto& file_writers =
            file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers;
    {
        std::lock_guard lock_guard(_lock);
        // `_rowset_writer` is only assigned by a successful init(). A debug-only DCHECK
        // would let release builds dereference a null writer below, so fail explicitly,
        // the same way close_writer() and add_segment() do.
        if (!_is_init) {
            return Status::Corruption("append_data failed, LoadStreamWriter is not inited");
        }
        // Lazily open every file id not seen yet, up to and including `segid`.
        // When `segid` already has a writer the loop body does not run.
        for (size_t i = file_writers.size(); i <= segid; i++) {
            Status st;
            io::FileWriterPtr seg_file_writer;
            st = _rowset_writer->create_file_writer(static_cast<uint32_t>(i), seg_file_writer,
                                                    file_type);
            DBUG_EXECUTE_IF("LoadStreamWriter.append_data.create_file_writer_failed",
                            { st = Status::InternalError("fault injection"); });
            if (!st.ok()) {
                // Mark the writer canceled so that _pre_close() fails as well.
                _is_canceled = true;
                return st;
            }
            file_writers.push_back(std::move(seg_file_writer));
            g_load_stream_file_writer_cnt << 1;
        }

        // TODO: IOBuf to Slice
        file_writer = file_writers[segid].get();
    }
    DBUG_EXECUTE_IF("LoadStreamWriter.append_data.null_file_writer", { file_writer = nullptr; });
    VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid;
    if (file_writer == nullptr) {
        return Status::Corruption("append_data failed, file writer {} is destoryed", segid);
    }
    DBUG_EXECUTE_IF("LoadStreamWriter.append_data.wrong_offset", { offset++; });
    // Chunks must arrive in order: the offset has to match what has been written so far.
    if (file_writer->bytes_appended() != offset) {
        return Status::Corruption(
                "append_data out-of-order in segment={}, expected offset={}, actual={}",
                file_writer->path().native(), offset, file_writer->bytes_appended());
    }
    return file_writer->append(buf.to_string());
}
142
143
93
/// Close file `segid` of the given type once all of its data has been appended.
///
/// Returns Corruption if the writer is not initialized, the file was never opened, the
/// file writer is missing, or a file of any type other than INVERTED_INDEX_FILE was closed
/// with zero bytes. If closing the underlying file fails, the writer is marked canceled
/// and that error is returned.
Status LoadStreamWriter::close_writer(uint32_t segid, FileType file_type) {
    SCOPED_ATTACH_TASK(_resource_ctx);
    const bool is_segment = file_type == FileType::SEGMENT_FILE;
    auto& writers = is_segment ? _segment_file_writers : _inverted_file_writers;

    io::FileWriter* target = nullptr;
    {
        std::lock_guard guard(_lock);
        if (!_is_init) {
            return Status::Corruption("close_writer failed, LoadStreamWriter is not inited");
        }
        DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.bad_segid",
                        { segid = static_cast<uint32_t>(writers.size()); });
        if (segid < writers.size()) {
            target = writers[segid].get();
        } else {
            return Status::Corruption(
                    "close_writer failed, file {} is never opened, file type is {}", segid,
                    file_type);
        }
    }

    DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.null_file_writer", { target = nullptr; });
    if (target == nullptr) {
        return Status::Corruption(
                "close_writer failed, file writer {} is destoryed, fiel type is {}", segid,
                file_type);
    }

    if (auto close_status = target->close(); !close_status.ok()) {
        // Remember the failure so that _pre_close() rejects the whole load.
        _is_canceled = true;
        return close_status;
    }
    LOG(INFO) << "file " << segid << " path " << target->path().native() << " closed, written "
              << target->bytes_appended() << " bytes"
              << ", file type is " << file_type;

    // Allow the index file to be empty when creating an index on a variant-type column;
    // any other file type must contain data.
    const bool empty_non_index_file =
            target->bytes_appended() == 0 && file_type != FileType::INVERTED_INDEX_FILE;
    if (empty_non_index_file) {
        return Status::Corruption("file {} closed with 0 bytes, file type is {}",
                                  target->path().native(), file_type);
    }
    return Status::OK();
}
184
185
72
/// Register a fully written and closed segment `segid` with the rowset writer.
///
/// The size of the closed segment file must equal `stat.data_size`; otherwise Corruption
/// is returned. The inverted-index file size is only gathered when index writers were
/// opened and is only used in that error message.
Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat) {
    SCOPED_ATTACH_TASK(_resource_ctx);
    size_t seg_bytes = 0;
    size_t idx_bytes = 0;
    {
        std::lock_guard guard(_lock);
        if (!_is_init) {
            return Status::Corruption("add_segment failed, LoadStreamWriter is not inited");
        }
        DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid",
                        { segid = static_cast<uint32_t>(_segment_file_writers.size()); });
        RETURN_IF_ERROR(_calc_file_size(segid, FileType::SEGMENT_FILE, &seg_bytes));
        if (!_inverted_file_writers.empty()) {
            RETURN_IF_ERROR(_calc_file_size(segid, FileType::INVERTED_INDEX_FILE, &idx_bytes));
        }
    }

    DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.size_not_match", { seg_bytes++; });
    if (seg_bytes == stat.data_size) {
        return _rowset_writer->add_segment(segid, stat);
    }
    return Status::Corruption(
            "add_segment failed, segment stat {} does not match, file size={}, inverted file "
            "size={}, stat.data_size={}, tablet id={}",
            segid, seg_bytes, idx_bytes, stat.data_size, _req.tablet_id);
}
213
214
72
/// Store in `*file_size` the number of bytes written to closed file `segid` of the given
/// type.
///
/// Returns Corruption if the file was never opened, its writer is missing, or it is not
/// closed yet. This function takes no lock; add_segment() calls it while holding `_lock`.
Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type, size_t* file_size) {
    auto& writers =
            (file_type == FileType::SEGMENT_FILE) ? _segment_file_writers : _inverted_file_writers;

    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.unknown_segment",
                    { segid = static_cast<uint32_t>(writers.size()); });
    if (segid >= writers.size()) {
        return Status::Corruption("calc file size failed, file {} is never opened, file type is {}",
                                  segid, file_type);
    }

    io::FileWriter* writer = writers[segid].get();
    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.null_file_writer", { writer = nullptr; });
    if (writer == nullptr) {
        return Status::Corruption(
                "calc file size failed, file writer {} is destoryed, file type is {}", segid,
                file_type);
    }

    // Fault-injection hook: swap in a freshly created (hence still open) writer.
    DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.file_not_closed", {
        io::FileWriterPtr fwriter;
        static_cast<void>(_rowset_writer->create_file_writer(
                static_cast<uint32_t>(writers.size()), fwriter, FileType::SEGMENT_FILE));
        writers.push_back(std::move(fwriter));
        writer = writers.back().get();
    });
    // The size is only final once the file has been closed.
    if (writer->state() != io::FileWriter::State::CLOSED) {
        return Status::Corruption("calc file size failed, file {} is not closed",
                                  writer->path().native());
    }

    *file_size = writer->bytes_appended();
    return Status::OK();
}
247
248
163
/// First phase of closing the writer.
///
/// Steps:
/// - Initializes the writer if it was never initialized.
/// - Rejects a load that was marked canceled.
/// - Verifies that, when inverted-index files exist, there is one per segment file.
/// - Verifies that every segment and inverted-index file writer has been closed.
/// - Builds the rowset and submits the delete-bitmap calculation task.
///
/// On success `_pre_closed` is set, so close() does not repeat this work. This function
/// takes no lock itself; close() calls it while holding `_lock`.
Status LoadStreamWriter::_pre_close() {
    SCOPED_ATTACH_TASK(_resource_ctx);
    if (!_is_init) {
        // If this LoadStreamWriter is not initialized but is being closed, this tablet had
        // no data loaded while at least one tablet in the same partition did. Initialize it
        // now so that an empty rowset can be created for this tablet when it is closed.
        RETURN_IF_ERROR(init());
    }

    DCHECK(_is_init)
            << "rowset builder is supposed to be initialized before close_wait() being called";

    DBUG_EXECUTE_IF("LoadStreamWriter.close.cancelled", { _is_canceled = true; });
    // Set when creating or closing a file writer failed earlier.
    if (_is_canceled) {
        return Status::InternalError("flush segment failed");
    }
    DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_writers_size_not_match", {
        io::FileWriterPtr file_writer;
        static_cast<void>(_rowset_writer->create_file_writer(
                static_cast<uint32_t>(_inverted_file_writers.size()), file_writer,
                FileType::INVERTED_INDEX_FILE));
        _inverted_file_writers.push_back(std::move(file_writer));
    });
    // When inverted-index files exist there must be exactly one per segment file.
    if (!_inverted_file_writers.empty() &&
        _inverted_file_writers.size() != _segment_file_writers.size()) {
        // Note the trailing space: the two literals are concatenated into one message.
        return Status::Corruption(
                "LoadStreamWriter close failed, inverted file writer size is {}, "
                "segment file writer size is {}",
                _inverted_file_writers.size(), _segment_file_writers.size());
    }
    DBUG_EXECUTE_IF("LoadStreamWriter.close.file_not_closed", {
        io::FileWriterPtr file_writer;
        static_cast<void>(_rowset_writer->create_file_writer(
                static_cast<uint32_t>(_segment_file_writers.size()), file_writer,
                FileType::SEGMENT_FILE));
        _segment_file_writers.push_back(std::move(file_writer));
    });
    // Every segment file must have been closed via close_writer() before building the rowset.
    for (const auto& writer : _segment_file_writers) {
        if (writer->state() != io::FileWriter::State::CLOSED) {
            return Status::Corruption("LoadStreamWriter close failed, segment {} is not closed",
                                      writer->path().native());
        }
    }

    DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_file_not_closed", {
        io::FileWriterPtr file_writer;
        static_cast<void>(_rowset_writer->create_file_writer(
                static_cast<uint32_t>(_inverted_file_writers.size()), file_writer,
                FileType::INVERTED_INDEX_FILE));
        _inverted_file_writers.push_back(std::move(file_writer));
    });
    for (const auto& writer : _inverted_file_writers) {
        if (writer->state() != io::FileWriter::State::CLOSED) {
            return Status::Corruption(
                    "LoadStreamWriter close failed, inverted file {} is not closed",
                    writer->path().native());
        }
    }

    RETURN_IF_ERROR(_rowset_builder->build_rowset());
    RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
    _pre_closed = true;
    return Status::OK();
}
314
315
162
/// Final close of the writer, performed entirely under `_lock`.
///
/// Runs _pre_close() unless it already succeeded, waits for the delete-bitmap calculation,
/// and then calls commit_txn() on the rowset builder.
Status LoadStreamWriter::close() {
    std::lock_guard<std::mutex> guard(_lock);
    if (!_pre_closed) {
        RETURN_IF_ERROR(_pre_close());
    }
    RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());

    // FIXME(plat1ko): No `commit_txn` operation in cloud mode, need better abstractions
    auto* local_builder = static_cast<RowsetBuilder*>(_rowset_builder.get());
    RETURN_IF_ERROR(local_builder->commit_txn());
    return Status::OK();
}
326
327
#include "common/compile_check_end.h"
328
} // namespace doris