be/src/io/fs/stream_sink_file_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/stream_sink_file_writer.h" |
19 | | |
20 | | #include <gen_cpp/internal_service.pb.h> |
21 | | |
22 | | #include "exec/sink/load_stream_stub.h" |
23 | | #include "storage/olap_common.h" |
24 | | #include "storage/rowset/beta_rowset_writer.h" |
25 | | #include "util/debug_points.h" |
26 | | #include "util/uid_util.h" |
27 | | |
28 | | namespace doris::io { |
29 | | |
30 | | void StreamSinkFileWriter::init(PUniqueId load_id, int64_t partition_id, int64_t index_id, |
31 | 1 | int64_t tablet_id, int32_t segment_id, FileType file_type) { |
32 | 1 | VLOG_DEBUG << "init stream writer, load id(" << UniqueId(load_id).to_string() |
33 | 0 | << "), partition id(" << partition_id << "), index id(" << index_id |
34 | 0 | << "), tablet_id(" << tablet_id << "), segment_id(" << segment_id << ")" |
35 | 0 | << ", file_type(" << file_type << ")"; |
36 | 1 | _load_id = load_id; |
37 | 1 | _partition_id = partition_id; |
38 | 1 | _index_id = index_id; |
39 | 1 | _tablet_id = tablet_id; |
40 | 1 | _segment_id = segment_id; |
41 | 1 | _file_type = file_type; |
42 | 1 | } |
43 | | |
44 | 1 | Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) { |
45 | 1 | size_t bytes_req = 0; |
46 | 3 | for (int i = 0; i < data_cnt; i++) { |
47 | 2 | bytes_req += data[i].get_size(); |
48 | 2 | } |
49 | | |
50 | 1 | VLOG_DEBUG << "writer appendv, load_id: " << print_id(_load_id) << ", index_id: " << _index_id |
51 | 0 | << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id |
52 | 0 | << ", data_length: " << bytes_req << "file_type" << _file_type; |
53 | | |
54 | 1 | std::span<const Slice> slices {data, data_cnt}; |
55 | 1 | size_t fault_injection_skipped_streams = 0; |
56 | 1 | bool ok = false; |
57 | 1 | Status st; |
58 | 3 | for (auto& stream : _streams) { |
59 | 3 | DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", { |
60 | 3 | if (fault_injection_skipped_streams < 1) { |
61 | 3 | fault_injection_skipped_streams++; |
62 | 3 | continue; |
63 | 3 | } |
64 | 3 | }); |
65 | 3 | DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", { |
66 | 3 | if (fault_injection_skipped_streams < 2) { |
67 | 3 | fault_injection_skipped_streams++; |
68 | 3 | continue; |
69 | 3 | } |
70 | 3 | }); |
71 | 3 | DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", |
72 | 3 | { continue; }); |
73 | 3 | st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, _bytes_appended, |
74 | 3 | slices, false, _file_type); |
75 | 3 | ok = ok || st.ok(); |
76 | 3 | if (!st.ok()) { |
77 | 0 | LOG(WARNING) << "failed to send segment data to backend " << stream->dst_id() |
78 | 0 | << ", load_id: " << print_id(_load_id) << ", index_id: " << _index_id |
79 | 0 | << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id |
80 | 0 | << ", data_length: " << bytes_req << ", reason: " << st; |
81 | 0 | } |
82 | 3 | } |
83 | 1 | if (!ok) { |
84 | 0 | std::stringstream ss; |
85 | 0 | for (auto& stream : _streams) { |
86 | 0 | ss << " " << stream->dst_id(); |
87 | 0 | } |
88 | 0 | LOG(WARNING) << "failed to send segment data to any replicas, load_id: " |
89 | 0 | << print_id(_load_id) << ", index_id: " << _index_id |
90 | 0 | << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id |
91 | 0 | << ", data_length: " << bytes_req << ", backends:" << ss.str(); |
92 | 0 | return Status::InternalError( |
93 | 0 | "failed to send segment data to any replicas, tablet_id={}, segment_id={}", |
94 | 0 | _tablet_id, _segment_id); |
95 | 0 | } |
96 | 1 | _bytes_appended += bytes_req; |
97 | 1 | return Status::OK(); |
98 | 1 | } |
99 | | |
100 | 3 | Status StreamSinkFileWriter::close(bool non_block) { |
101 | 3 | if (_state == State::CLOSED) { |
102 | 0 | return Status::InternalError("StreamSinkFileWriter already closed, load id {}", |
103 | 0 | print_id(_load_id)); |
104 | 0 | } |
105 | 3 | if (_state == State::ASYNC_CLOSING) { |
106 | 1 | if (non_block) { |
107 | 0 | return Status::InternalError("Don't submit async close multi times"); |
108 | 0 | } |
109 | | // Actucally the first time call to close(true) would return the value of _finalize, if it returned one |
110 | | // error status then the code would never call the second close(true) |
111 | 1 | _state = State::CLOSED; |
112 | 1 | return Status::OK(); |
113 | 1 | } |
114 | 2 | if (non_block) { |
115 | 1 | _state = State::ASYNC_CLOSING; |
116 | 1 | } else { |
117 | 1 | _state = State::CLOSED; |
118 | 1 | } |
119 | 2 | return _finalize(); |
120 | 3 | } |
121 | | |
122 | 2 | Status StreamSinkFileWriter::_finalize() { |
123 | 2 | VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ", index_id: " << _index_id |
124 | 0 | << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id; |
125 | | // TODO(zhengyu): update get_inverted_index_file_size into stat |
126 | 2 | size_t fault_injection_skipped_streams = 0; |
127 | 2 | bool ok = false; |
128 | 6 | for (auto& stream : _streams) { |
129 | 6 | DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica", { |
130 | 6 | if (fault_injection_skipped_streams < 1) { |
131 | 6 | fault_injection_skipped_streams++; |
132 | 6 | continue; |
133 | 6 | } |
134 | 6 | }); |
135 | 6 | DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica", { |
136 | 6 | if (fault_injection_skipped_streams < 2) { |
137 | 6 | fault_injection_skipped_streams++; |
138 | 6 | continue; |
139 | 6 | } |
140 | 6 | }); |
141 | 6 | DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", |
142 | 6 | { continue; }); |
143 | 6 | auto st = stream->append_data(_partition_id, _index_id, _tablet_id, _segment_id, |
144 | 6 | _bytes_appended, {}, true, _file_type); |
145 | 6 | ok = ok || st.ok(); |
146 | 6 | if (!st.ok()) { |
147 | 0 | LOG(WARNING) << "failed to send segment eos to backend " << stream->dst_id() |
148 | 0 | << ", load_id: " << print_id(_load_id) << ", index_id: " << _index_id |
149 | 0 | << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id |
150 | 0 | << ", reason: " << st; |
151 | 0 | } |
152 | 6 | } |
153 | 2 | DBUG_EXECUTE_IF("StreamSinkFileWriter.finalize.finalize_failed", { ok = false; }); |
154 | 2 | if (!ok) { |
155 | 0 | std::stringstream ss; |
156 | 0 | for (auto& stream : _streams) { |
157 | 0 | ss << " " << stream->dst_id(); |
158 | 0 | } |
159 | 0 | LOG(WARNING) << "failed to send segment eos to any replicas, load_id: " |
160 | 0 | << print_id(_load_id) << ", index_id: " << _index_id |
161 | 0 | << ", tablet_id: " << _tablet_id << ", segment_id: " << _segment_id |
162 | 0 | << ", backends:" << ss.str(); |
163 | 0 | return Status::InternalError( |
164 | 0 | "failed to send segment eos to any replicas, tablet_id={}, segment_id={}", |
165 | 0 | _tablet_id, _segment_id); |
166 | 0 | } |
167 | 2 | return Status::OK(); |
168 | 2 | } |
169 | | |
170 | | } // namespace doris::io |