Coverage Report

Created: 2026-03-16 16:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/stream_sink_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/stream_sink_file_writer.h"
19
20
#include <gen_cpp/internal_service.pb.h>
21
22
#include "exec/sink/load_stream_stub.h"
23
#include "storage/olap_common.h"
24
#include "storage/rowset/beta_rowset_writer.h"
25
#include "util/debug_points.h"
26
#include "util/uid_util.h"
27
28
namespace doris::io {
29
30
void StreamSinkFileWriter::init(PUniqueId load_id, int64_t partition_id, int64_t index_id,
31
1
                                int64_t tablet_id, int32_t segment_id, FileType file_type) {
32
1
    VLOG_DEBUG << "init stream writer, load id(" << UniqueId(load_id).to_string()
33
0
               << "), partition id(" << partition_id << "), index id(" << index_id
34
0
               << "), tablet_id(" << tablet_id << "), segment_id(" << segment_id << ")"
35
0
               << ", file_type(" << file_type << ")";
36
1
    _load_id = load_id;
37
1
    _partition_id = partition_id;
38
1
    _index_id = index_id;
39
1
    _tablet_id = tablet_id;
40
1
    _segment_id = segment_id;
41
1
    _file_type = file_type;
42
1
}
43
44
1
Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
45
1
    size_t bytes_req = 0;
46
3
    for (int i = 0; i < data_cnt; i++) {
47
2
        bytes_req += data[i].get_size();
48
2
    }
49
50
1
    VLOG_DEBUG << "writer appendv, load_id: " << print_id(_load_id) << ", index_id: " << _index_id
51
0
               << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id
52
0
               << ", data_length: " << bytes_req << "file_type" << _file_type;
53
54
1
    std::span<const Slice> slices {data, data_cnt};
55
1
    size_t fault_injection_skipped_streams = 0;
56
1
    bool ok = false;
57
1
    Status st;
58
3
    for (auto& stream : _streams) {
59
3
        DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", {
60
3
            if (fault_injection_skipped_streams < 1) {
61
3
                fault_injection_skipped_streams++;
62
3
                continue;
63
3
            }
64
3
        });
65
3
        DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", {
66
3
            if (fault_injection_skipped_streams < 2) {
67
3
                fault_injection_skipped_streams++;
68
3
                continue;
69
3
            }
70
3
        });
71
3
        DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
72
3
                        { continue; });
73
3
        st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, _bytes_appended,
74
3
                                 slices, false, _file_type);
75
3
        ok = ok || st.ok();
76
3
        if (!st.ok()) {
77
0
            LOG(WARNING) << "failed to send segment data to backend " << stream->dst_id()
78
0
                         << ", load_id: " << print_id(_load_id) << ", index_id: " << _index_id
79
0
                         << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id
80
0
                         << ", data_length: " << bytes_req << ", reason: " << st;
81
0
        }
82
3
    }
83
1
    if (!ok) {
84
0
        std::stringstream ss;
85
0
        for (auto& stream : _streams) {
86
0
            ss << " " << stream->dst_id();
87
0
        }
88
0
        LOG(WARNING) << "failed to send segment data to any replicas, load_id: "
89
0
                     << print_id(_load_id) << ", index_id: " << _index_id
90
0
                     << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id
91
0
                     << ", data_length: " << bytes_req << ", backends:" << ss.str();
92
0
        return Status::InternalError(
93
0
                "failed to send segment data to any replicas, tablet_id={}, segment_id={}",
94
0
                _tablet_id, _segment_id);
95
0
    }
96
1
    _bytes_appended += bytes_req;
97
1
    return Status::OK();
98
1
}
99
100
3
Status StreamSinkFileWriter::close(bool non_block) {
101
3
    if (_state == State::CLOSED) {
102
0
        return Status::InternalError("StreamSinkFileWriter already closed, load id {}",
103
0
                                     print_id(_load_id));
104
0
    }
105
3
    if (_state == State::ASYNC_CLOSING) {
106
1
        if (non_block) {
107
0
            return Status::InternalError("Don't submit async close multi times");
108
0
        }
109
        // Actually, the first call to close(true) returns the value of _finalize(); if it returned an
110
        // error status, the caller would never make the second close() call.
111
1
        _state = State::CLOSED;
112
1
        return Status::OK();
113
1
    }
114
2
    if (non_block) {
115
1
        _state = State::ASYNC_CLOSING;
116
1
    } else {
117
1
        _state = State::CLOSED;
118
1
    }
119
2
    return _finalize();
120
3
}
121
122
2
Status StreamSinkFileWriter::_finalize() {
123
2
    VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", index_id: " << _index_id
124
0
               << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id;
125
    // TODO(zhengyu): update get_inverted_index_file_size into stat
126
2
    size_t fault_injection_skipped_streams = 0;
127
2
    bool ok = false;
128
6
    for (auto& stream : _streams) {
129
6
        DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", {
130
6
            if (fault_injection_skipped_streams < 1) {
131
6
                fault_injection_skipped_streams++;
132
6
                continue;
133
6
            }
134
6
        });
135
6
        DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", {
136
6
            if (fault_injection_skipped_streams < 2) {
137
6
                fault_injection_skipped_streams++;
138
6
                continue;
139
6
            }
140
6
        });
141
6
        DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
142
6
                        { continue; });
143
6
        auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id,
144
6
                                      _bytes_appended, {}, true, _file_type);
145
6
        ok = ok || st.ok();
146
6
        if (!st.ok()) {
147
0
            LOG(WARNING) << "failed to send segment eos to backend " << stream->dst_id()
148
0
                         << ", load_id: " << print_id(_load_id) << ", index_id: " << _index_id
149
0
                         << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id
150
0
                         << ", reason: " << st;
151
0
        }
152
6
    }
153
2
    DBUG_EXECUTE_IF("StreamSinkFileWriter.finalize.finalize_failed", { ok = false; });
154
2
    if (!ok) {
155
0
        std::stringstream ss;
156
0
        for (auto& stream : _streams) {
157
0
            ss << " " << stream->dst_id();
158
0
        }
159
0
        LOG(WARNING) << "failed to send segment eos to any replicas, load_id: "
160
0
                     << print_id(_load_id) << ", index_id: " << _index_id
161
0
                     << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id
162
0
                     << ", backends:" << ss.str();
163
0
        return Status::InternalError(
164
0
                "failed to send segment eos to any replicas, tablet_id={}, segment_id={}",
165
0
                _tablet_id, _segment_id);
166
0
    }
167
2
    return Status::OK();
168
2
}
169
170
} // namespace doris::io