be/src/storage/rowset/rowset_writer_context.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/olap_file.pb.h> |
21 | | #include <glog/logging.h> |
22 | | |
23 | | #include <functional> |
24 | | #include <optional> |
25 | | #include <string_view> |
26 | | #include <unordered_map> |
27 | | |
28 | | #include "cloud/config.h" |
29 | | #include "common/status.h" |
30 | | #include "io/fs/encrypted_fs_factory.h" |
31 | | #include "io/fs/file_system.h" |
32 | | #include "io/fs/file_writer.h" |
33 | | #include "io/fs/packed_file_system.h" |
34 | | #include "runtime/exec_env.h" |
35 | | #include "storage/binlog.h" |
36 | | #include "storage/olap_define.h" |
37 | | #include "storage/partial_update_info.h" |
38 | | #include "storage/segment/historical_row_retriever.h" |
39 | | #include "storage/storage_policy.h" |
40 | | #include "storage/tablet/tablet.h" |
41 | | #include "storage/tablet/tablet_schema.h" |
42 | | |
43 | | namespace doris { |
44 | | |
45 | | class RowsetWriterContextBuilder; |
46 | | using RowsetWriterContextBuilderSharedPtr = std::shared_ptr<RowsetWriterContextBuilder>; |
47 | | class DataDir; |
48 | | class Tablet; |
49 | | class FileWriterCreator; |
50 | | class SegmentCollector; |
51 | | |
52 | | namespace segment_v2 { |
53 | | struct HistoricalRowRetrieverContext; |
54 | | } |
55 | | |
56 | | struct RowsetWriterContext { |
57 | 2.10k | RowsetWriterContext() : schema_lock(new std::mutex) { |
58 | 2.10k | load_id.set_hi(0); |
59 | 2.10k | load_id.set_lo(0); |
60 | 2.10k | } |
61 | | |
62 | | RowsetId rowset_id; |
63 | | int64_t db_id {0}; |
64 | | int64_t table_id {0}; |
65 | | int64_t tablet_id {0}; |
66 | | int32_t tablet_schema_hash {0}; |
67 | | int64_t index_id {0}; |
68 | | int64_t partition_id {0}; |
69 | | RowsetTypePB rowset_type {BETA_ROWSET}; |
70 | | |
71 | | TabletSchemaSPtr tablet_schema; |
72 | | |
73 | | // PREPARED/COMMITTED for pending rowset |
74 | | // VISIBLE for non-pending rowset |
75 | | RowsetStatePB rowset_state {PREPARED}; |
76 | | // properties for non-pending rowset |
77 | | Version version {0, 0}; |
78 | | |
79 | | // properties for pending rowset |
80 | | int64_t txn_id {0}; |
81 | | int64_t txn_expiration {0}; // For cloud mode |
82 | | PUniqueId load_id; |
83 | | TabletUid tablet_uid {0, 0}; |
84 | | // indicate whether the data among segments is overlapping. |
85 | | // default is OVERLAP_UNKNOWN. |
86 | | SegmentsOverlapPB segments_overlap {OVERLAP_UNKNOWN}; |
87 | | // Segment files use uint32 to represent the row number, therefore the maximum is UINT32_MAX. |
88 | | // The default is set to INT32_MAX to avoid overflow issues when casting from uint32_t to int. |
89 | | // Test cases can change this value to control flush timing. |
90 | | uint32_t max_rows_per_segment = INT32_MAX; |
91 | | // Not owned; points to the data dir of this rowset, |
92 | | // used for checking disk capacity when writing data to disk. |
93 | | // ATTN: not supported for RowsetConvertor |
94 | | // (because it is hard to refactor, and RowsetConvertor will be deprecated in the future). |
95 | | DataDir* data_dir = nullptr; |
96 | | |
97 | | int64_t newest_write_timestamp = -1; |
98 | | bool enable_unique_key_merge_on_write = false; |
99 | | // store column_unique_id to do index compaction |
100 | | std::set<int32_t> columns_to_do_index_compaction; |
101 | | DataWriteType write_type = DataWriteType::TYPE_DEFAULT; |
102 | | // need to figure out the sub type of compaction |
103 | | ReaderType compaction_type = ReaderType::UNKNOWN; |
104 | | BaseTabletSPtr tablet = nullptr; |
105 | | |
106 | | std::shared_ptr<MowContext> mow_context; |
107 | | std::shared_ptr<FileWriterCreator> file_writer_creator; |
108 | | std::shared_ptr<SegmentCollector> segment_collector; |
109 | | |
110 | | // When memtable_on_sink_support_index_v2 is true, we will create SinkFileWriter to send the inverted index file |
111 | | bool memtable_on_sink_support_index_v2 = false; |
112 | | |
113 | | /// begin file cache opts |
114 | | bool write_file_cache = false; |
115 | | bool is_hot_data = false; |
116 | | uint64_t file_cache_ttl_sec = 0; |
117 | | uint64_t approximate_bytes_to_write = 0; |
118 | | // If true, compaction output only writes index files to file cache, not data files |
119 | | bool compaction_output_write_index_only = false; |
120 | | /// end file cache opts |
121 | | |
122 | | // segcompaction for this RowsetWriter, only enabled when importing data |
123 | | bool enable_segcompaction = false; |
124 | | |
125 | | std::shared_ptr<PartialUpdateInfo> partial_update_info; |
126 | | |
127 | | bool is_transient_rowset_writer = false; |
128 | | |
129 | | segment_v2::HistoricalRowRetrieverContext make_historical_row_retriever_context(); |
130 | | |
131 | | // Intent flag: caller can actively turn the packed-file feature on/off for this rowset. |
132 | | // This describes whether we *want* to try packing small files. |
133 | | bool allow_packed_file = true; |
134 | | |
135 | | // Effective flag: whether this context actually ends up using PackedFileSystem for writes. |
136 | | // This is decided inside fs() based on allow_packed_file plus other conditions |
137 | | // (cloud mode, S3 filesystem, V1 inverted index, global config, etc.), and once |
138 | | // set to true it remains stable even if config::enable_packed_file changes later. |
139 | | mutable bool packed_file_active = false; |
140 | | |
141 | | // Cached FileSystem instance to ensure consistency across multiple fs() calls. |
142 | | // This prevents creating multiple PackedFileSystem instances and ensures |
143 | | // that the packed_file_active flag remains consistent. |
144 | | mutable io::FileSystemSPtr _cached_fs = nullptr; |
145 | | |
146 | | // For collecting segment statistics for compaction |
147 | | std::vector<RowsetReaderSharedPtr> input_rs_readers; |
148 | | |
149 | | // TODO(lihangyu) remove this lock |
150 | | // In semi-structured scenarios tablet_schema will be updated concurrently, |
151 | | // so this lock needs to be held when updating it. Use shared_ptr to avoid a deleted copy constructor |
152 | | std::shared_ptr<std::mutex> schema_lock; |
153 | | |
154 | | int64_t compaction_level = 0; |
155 | | |
156 | | // For local rowset |
157 | | std::string tablet_path; |
158 | | |
159 | | // For remote rowset |
160 | | std::optional<StorageResource> storage_resource; |
161 | | |
162 | | std::optional<EncryptionAlgorithmPB> encrypt_algorithm; |
163 | | |
164 | | std::string job_id; |
165 | | |
166 | 4.07k | bool is_local_rowset() const { return !storage_resource; } |
167 | | |
168 | 2.93k | std::string segment_path(int seg_id) const { |
169 | 2.93k | if (is_local_rowset()) { |
170 | 2.93k | return local_segment_path(tablet_path, rowset_id.to_string(), seg_id); |
171 | 2.93k | } else { |
172 | 0 | return storage_resource->remote_segment_path(tablet_id, rowset_id.to_string(), seg_id); |
173 | 0 | } |
174 | 2.93k | } |
175 | | |
176 | 2.98k | io::FileSystemSPtr fs() const { |
177 | | // Return cached instance if available to ensure consistency across multiple calls |
178 | 2.98k | if (_cached_fs != nullptr) { |
179 | 2.31k | return _cached_fs; |
180 | 2.31k | } |
181 | | |
182 | 671 | auto fs = [this]() -> io::FileSystemSPtr { |
183 | 671 | if (is_local_rowset()) { |
184 | 671 | return io::global_local_filesystem(); |
185 | 671 | } else { |
186 | 0 | return storage_resource->fs; |
187 | 0 | } |
188 | 671 | }(); |
189 | | |
190 | 671 | bool is_s3_fs = fs->type() == io::FileSystemType::S3; |
191 | | |
192 | 671 | auto algorithm = encrypt_algorithm; |
193 | | |
194 | 671 | if (!algorithm.has_value()) { |
195 | | #ifndef BE_TEST |
196 | | constexpr std::string_view msg = |
197 | | "RowsetWriterContext::determine_encryption is not called when creating this " |
198 | | "RowsetWriterContext, it will result in encrypted rowsets left unencrypted"; |
199 | | auto st = Status::InternalError(msg); |
200 | | |
201 | | LOG(WARNING) << st; |
202 | | DCHECK(false) << st; |
203 | | #else |
204 | 596 | algorithm = EncryptionAlgorithmPB::PLAINTEXT; |
205 | 596 | #endif |
206 | 596 | } |
207 | | |
208 | | // Apply packed file system first for write path if enabled |
209 | | // Create empty index_map for write path |
210 | | // Index information will be populated after write completes |
211 | 671 | bool has_v1_inverted_index = tablet_schema != nullptr && |
212 | 671 | tablet_schema->has_inverted_index() && |
213 | 671 | tablet_schema->get_inverted_index_storage_format() == |
214 | 118 | InvertedIndexStorageFormatPB::V1; |
215 | | |
216 | 671 | if (has_v1_inverted_index && allow_packed_file && config::enable_packed_file) { |
217 | 0 | static constexpr std::string_view kMsg = |
218 | 0 | "Disable packed file for V1 inverted index tablet to avoid missing index " |
219 | 0 | "metadata (temporary workaround)"; |
220 | 0 | LOG(INFO) << kMsg << ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id; |
221 | 0 | } |
222 | | |
223 | | // Only enable packed file for S3 file system, not for HDFS or other remote file systems |
224 | 671 | packed_file_active = allow_packed_file && config::is_cloud_mode() && |
225 | 671 | config::enable_packed_file && !has_v1_inverted_index && is_s3_fs; |
226 | | |
227 | 671 | if (packed_file_active) { |
228 | 0 | io::PackedAppendContext append_info; |
229 | 0 | append_info.tablet_id = tablet_id; |
230 | 0 | append_info.rowset_id = rowset_id.to_string(); |
231 | 0 | append_info.txn_id = txn_id; |
232 | 0 | append_info.expiration_time = file_cache_ttl_sec > 0 && newest_write_timestamp > 0 |
233 | 0 | ? newest_write_timestamp + file_cache_ttl_sec |
234 | 0 | : 0; |
235 | 0 | fs = std::make_shared<io::PackedFileSystem>(fs, append_info); |
236 | 0 | } |
237 | | |
238 | | // Then apply encryption on top |
239 | 671 | if (algorithm.has_value()) { |
240 | 671 | fs = io::make_file_system(fs, algorithm.value()); |
241 | 671 | } |
242 | | |
243 | | // Cache the result to ensure consistency across multiple calls |
244 | 671 | _cached_fs = fs; |
245 | 671 | return fs; |
246 | 2.98k | } |
247 | | |
248 | 0 | io::FileSystem& fs_ref() const { return *fs(); } |
249 | | |
250 | 2.93k | io::FileWriterOptions get_file_writer_options(bool is_index_file = false) { |
251 | 2.93k | bool should_write_cache = write_file_cache; |
252 | | // If configured to only write index files to cache, skip cache for data files |
253 | 2.93k | if (compaction_output_write_index_only && !is_index_file) { |
254 | 4 | should_write_cache = false; |
255 | 4 | } |
256 | | |
257 | 2.93k | return io::FileWriterOptions {.write_file_cache = should_write_cache, |
258 | 2.93k | .is_cold_data = is_hot_data, |
259 | 2.93k | .file_cache_expiration_time = file_cache_ttl_sec, |
260 | 2.93k | .approximate_bytes_to_write = approximate_bytes_to_write}; |
261 | 2.93k | } |
262 | | |
263 | | struct BinlogOptions { |
264 | | public: |
265 | 11 | void mark_primary_writer() { binlog_write_type = BinlogWriteType::PrimaryWriter; } |
266 | | |
267 | 12 | void mark_binlog_writer() { binlog_write_type = BinlogWriteType::BinlogWriter; } |
268 | | |
269 | 0 | bool is_primary_writer() const { |
270 | 0 | return binlog_write_type == BinlogWriteType::PrimaryWriter; |
271 | 0 | } |
272 | | |
273 | 3.65k | bool is_binlog_writer() const { return binlog_write_type == BinlogWriteType::BinlogWriter; } |
274 | | |
275 | 3 | bool need_build_binlog() const { return binlog_write_type != BinlogWriteType::Unknown; } |
276 | | |
277 | 9 | void set_need_before(bool need_before) { |
278 | 9 | this->_need_before = need_before; |
279 | 9 | _segment_write_binlog_opt.write_before = need_before; |
280 | 9 | } |
281 | | |
282 | 8 | segment_v2::SegmentWriteBinlogOptions& write_binlog_config() { |
283 | 8 | return _segment_write_binlog_opt; |
284 | 8 | } |
285 | | |
286 | 1 | const segment_v2::SegmentWriteBinlogOptions& write_binlog_config() const { |
287 | 1 | return _segment_write_binlog_opt; |
288 | 1 | } |
289 | | |
290 | | private: |
291 | | // if row_binlog does not need to be built, binlog_write_type stays `Unknown` (neither `PrimaryWriter` nor `BinlogWriter`) |
292 | | // if row_binlog needs to be built, the normal rowset writer is marked `PrimaryWriter`, so its `is_primary_writer()` is true |
293 | | enum BinlogWriteType { |
294 | | PrimaryWriter, |
295 | | BinlogWriter, |
296 | | Unknown |
297 | | } binlog_write_type = BinlogWriteType::Unknown; |
298 | | bool _need_before = false; |
299 | | segment_v2::SegmentWriteBinlogOptions _segment_write_binlog_opt; |
300 | | } _write_binlog_opt; |
301 | | |
302 | 2.67k | BinlogOptions& write_binlog_opt() { return _write_binlog_opt; } |
303 | | |
304 | 1.01k | const BinlogOptions& write_binlog_opt() const { return _write_binlog_opt; } |
305 | | }; |
306 | | |
307 | | inline segment_v2::HistoricalRowRetrieverContext |
308 | 0 | RowsetWriterContext::make_historical_row_retriever_context() { |
309 | 0 | return segment_v2::HistoricalRowRetrieverContext { |
310 | 0 | .tablet = tablet, |
311 | 0 | .tablet_schema = tablet_schema, |
312 | 0 | .rowset_writer_ctx = this, |
313 | 0 | .partial_update_info = partial_update_info, |
314 | 0 | .is_transient_rowset_writer = is_transient_rowset_writer, |
315 | 0 | .write_type = write_type}; |
316 | 0 | } |
317 | | |
318 | | } // namespace doris |