be/src/storage/rowset/rowset_writer_context.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/olap_file.pb.h> |
21 | | #include <glog/logging.h> |
22 | | |
23 | | #include <functional> |
24 | | #include <optional> |
25 | | #include <string_view> |
26 | | #include <unordered_map> |
27 | | |
28 | | #include "cloud/config.h" |
29 | | #include "common/status.h" |
30 | | #include "io/fs/encrypted_fs_factory.h" |
31 | | #include "io/fs/file_system.h" |
32 | | #include "io/fs/file_writer.h" |
33 | | #include "io/fs/packed_file_system.h" |
34 | | #include "runtime/exec_env.h" |
35 | | #include "storage/olap_define.h" |
36 | | #include "storage/partial_update_info.h" |
37 | | #include "storage/storage_policy.h" |
38 | | #include "storage/tablet/tablet.h" |
39 | | #include "storage/tablet/tablet_schema.h" |
40 | | |
41 | | namespace doris { |
42 | | |
43 | | class RowsetWriterContextBuilder; |
44 | | using RowsetWriterContextBuilderSharedPtr = std::shared_ptr<RowsetWriterContextBuilder>; |
45 | | class DataDir; |
46 | | class Tablet; |
47 | | class FileWriterCreator; |
48 | | class SegmentCollector; |
49 | | |
50 | | struct RowsetWriterContext { |
51 | 2.05k | RowsetWriterContext() : schema_lock(new std::mutex) { |
52 | 2.05k | load_id.set_hi(0); |
53 | 2.05k | load_id.set_lo(0); |
54 | 2.05k | } |
55 | | |
56 | | RowsetId rowset_id; |
57 | | int64_t tablet_id {0}; |
58 | | int32_t tablet_schema_hash {0}; |
59 | | int64_t index_id {0}; |
60 | | int64_t partition_id {0}; |
61 | | RowsetTypePB rowset_type {BETA_ROWSET}; |
62 | | |
63 | | TabletSchemaSPtr tablet_schema; |
64 | | |
65 | | // PREPARED/COMMITTED for a pending rowset |
66 | | // VISIBLE for a non-pending rowset |
67 | | RowsetStatePB rowset_state {PREPARED}; |
68 | | // properties for a non-pending rowset |
69 | | Version version {0, 0}; |
70 | | |
71 | | // properties for a pending rowset |
72 | | int64_t txn_id {0}; |
73 | | int64_t txn_expiration {0}; // For cloud mode |
74 | | PUniqueId load_id; |
75 | | TabletUid tablet_uid {0, 0}; |
76 | | // Indicates whether the data among segments is overlapping. |
77 | | // The default is OVERLAP_UNKNOWN. |
78 | | SegmentsOverlapPB segments_overlap {OVERLAP_UNKNOWN}; |
79 | | // Segment files use uint32 to represent the row number, therefore the maximum is UINT32_MAX. |
80 | | // The default is set to INT32_MAX to avoid overflow issues when casting from uint32_t to int. |
81 | | // Test cases can change this value to control flush timing. |
82 | | uint32_t max_rows_per_segment = INT32_MAX; |
83 | | // Not owned; points to the data dir of this rowset, |
84 | | // used for checking disk capacity when writing data to disk. |
85 | | // ATTN: not supported for RowsetConvertor |
86 | | // (because it is hard to refactor, and RowsetConvertor will be deprecated in the future). |
87 | | DataDir* data_dir = nullptr; |
88 | | |
89 | | int64_t newest_write_timestamp = -1; |
90 | | bool enable_unique_key_merge_on_write = false; |
91 | | // Stores the column_unique_id values used to do index compaction |
92 | | std::set<int32_t> columns_to_do_index_compaction; |
93 | | DataWriteType write_type = DataWriteType::TYPE_DEFAULT; |
94 | | // Needed to figure out the sub type of compaction |
95 | | ReaderType compaction_type = ReaderType::UNKNOWN; |
96 | | BaseTabletSPtr tablet = nullptr; |
97 | | |
98 | | std::shared_ptr<MowContext> mow_context; |
99 | | std::shared_ptr<FileWriterCreator> file_writer_creator; |
100 | | std::shared_ptr<SegmentCollector> segment_collector; |
101 | | |
102 | | // When memtable_on_sink_support_index_v2 is true, a SinkFileWriter is created to send the inverted index file |
103 | | bool memtable_on_sink_support_index_v2 = false; |
104 | | |
105 | | /// begin file cache opts |
106 | | bool write_file_cache = false; |
107 | | bool is_hot_data = false; |
108 | | uint64_t file_cache_ttl_sec = 0; |
109 | | uint64_t approximate_bytes_to_write = 0; |
110 | | // If true, compaction output only writes index files to file cache, not data files |
111 | | bool compaction_output_write_index_only = false; |
112 | | /// end file cache opts |
113 | | |
114 | | // segcompaction for this RowsetWriter, only enabled when importing data |
115 | | bool enable_segcompaction = false; |
116 | | |
117 | | std::shared_ptr<PartialUpdateInfo> partial_update_info; |
118 | | |
119 | | bool is_transient_rowset_writer = false; |
120 | | |
121 | | // Intent flag: the caller can actively turn the packed-file feature on/off for this rowset. |
122 | | // This describes whether we *want* to try small-file packing. |
123 | | bool allow_packed_file = true; |
124 | | |
125 | | // Effective flag: whether this context actually ends up using PackedFileSystem for writes. |
126 | | // This is decided inside fs() based on allow_packed_file plus other conditions |
127 | | // (cloud mode, S3 filesystem, V1 inverted index, config::enable_packed_file), and because |
128 | | // fs() caches its result, the value stays stable even if config::enable_packed_file changes later. |
129 | | mutable bool packed_file_active = false; |
130 | | |
131 | | // Cached FileSystem instance to ensure consistency across multiple fs() calls. |
132 | | // This prevents creating multiple PackedFileSystem instances and ensures |
133 | | // the packed_file_active flag remains consistent. |
134 | | mutable io::FileSystemSPtr _cached_fs = nullptr; |
135 | | |
136 | | // Used to collect segment statistics for compaction |
137 | | std::vector<RowsetReaderSharedPtr> input_rs_readers; |
138 | | |
139 | | // TODO(lihangyu) remove this lock |
140 | | // In the semi-structured scenario tablet_schema will be updated concurrently, |
141 | | // so this lock needs to be held when updating it. A shared_ptr is used so the copy constructor is not deleted. |
142 | | std::shared_ptr<std::mutex> schema_lock; |
143 | | |
144 | | int64_t compaction_level = 0; |
145 | | |
146 | | // For local rowset |
147 | | std::string tablet_path; |
148 | | |
149 | | // For remote rowset |
150 | | std::optional<StorageResource> storage_resource; |
151 | | |
152 | | std::optional<EncryptionAlgorithmPB> encrypt_algorithm; |
153 | | |
154 | | std::string job_id; |
155 | | |
156 | 4.05k | bool is_local_rowset() const { return !storage_resource; } |
157 | | |
158 | 2.93k | std::string segment_path(int seg_id) const { |
159 | 2.93k | if (is_local_rowset()) { |
160 | 2.93k | return local_segment_path(tablet_path, rowset_id.to_string(), seg_id); |
161 | 2.93k | } else { |
162 | 0 | return storage_resource->remote_segment_path(tablet_id, rowset_id.to_string(), seg_id); |
163 | 0 | } |
164 | 2.93k | } |
165 | | |
166 | 2.97k | io::FileSystemSPtr fs() const { |
167 | | // Return the cached instance if available to ensure consistency across multiple calls |
168 | 2.97k | if (_cached_fs != nullptr) { |
169 | 2.31k | return _cached_fs; |
170 | 2.31k | } |
171 | | |
172 | 667 | auto fs = [this]() -> io::FileSystemSPtr { |
173 | 667 | if (is_local_rowset()) { |
174 | 667 | return io::global_local_filesystem(); |
175 | 667 | } else { |
176 | 0 | return storage_resource->fs; |
177 | 0 | } |
178 | 667 | }(); |
179 | | |
180 | 667 | bool is_s3_fs = fs->type() == io::FileSystemType::S3; |
181 | | |
182 | 667 | auto algorithm = encrypt_algorithm; |
183 | | |
184 | 667 | if (!algorithm.has_value()) { |
185 | | #ifndef BE_TEST |
186 | | constexpr std::string_view msg = |
187 | | "RowsetWriterContext::determine_encryption is not called when creating this " |
188 | | "RowsetWriterContext, it will result in encrypted rowsets left unencrypted"; |
189 | | auto st = Status::InternalError(msg); |
190 | | |
191 | | LOG(WARNING) << st; |
192 | | DCHECK(false) << st; |
193 | | #else |
194 | 596 | algorithm = EncryptionAlgorithmPB::PLAINTEXT; |
195 | 596 | #endif |
196 | 596 | } |
197 | | |
198 | | // Apply the packed file system first for the write path, if enabled. |
199 | | // Create empty index_map for write path (NOTE(review): no index_map is built in this function; possibly stale). |
200 | | // Index information will be populated after the write completes. |
201 | 667 | bool has_v1_inverted_index = tablet_schema != nullptr && |
202 | 667 | tablet_schema->has_inverted_index() && |
203 | 667 | tablet_schema->get_inverted_index_storage_format() == |
204 | 118 | InvertedIndexStorageFormatPB::V1; |
205 | | |
206 | 667 | if (has_v1_inverted_index && allow_packed_file && config::enable_packed_file) { |
207 | 0 | static constexpr std::string_view kMsg = |
208 | 0 | "Disable packed file for V1 inverted index tablet to avoid missing index " |
209 | 0 | "metadata (temporary workaround)"; |
210 | 0 | LOG(INFO) << kMsg << ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id; |
211 | 0 | } |
212 | | |
213 | | // Only enable packed file for the S3 file system, not for HDFS or other remote file systems |
214 | 667 | packed_file_active = allow_packed_file && config::is_cloud_mode() && |
215 | 667 | config::enable_packed_file && !has_v1_inverted_index && is_s3_fs; |
216 | | |
217 | 667 | if (packed_file_active) { |
218 | 0 | io::PackedAppendContext append_info; |
219 | 0 | append_info.tablet_id = tablet_id; |
220 | 0 | append_info.rowset_id = rowset_id.to_string(); |
221 | 0 | append_info.txn_id = txn_id; |
222 | 0 | append_info.expiration_time = file_cache_ttl_sec > 0 && newest_write_timestamp > 0 |
223 | 0 | ? newest_write_timestamp + file_cache_ttl_sec |
224 | 0 | : 0; |
225 | 0 | fs = std::make_shared<io::PackedFileSystem>(fs, append_info); |
226 | 0 | } |
227 | | |
228 | | // Then apply encryption on top of whichever file system was selected above |
229 | 667 | if (algorithm.has_value()) { |
230 | 667 | fs = io::make_file_system(fs, algorithm.value()); |
231 | 667 | } |
232 | | |
233 | | // Cache the result to ensure consistency across multiple calls |
234 | 667 | _cached_fs = fs; |
235 | 667 | return fs; |
236 | 2.97k | } |
237 | | |
238 | 0 | io::FileSystem& fs_ref() const { return *fs(); } |
239 | | |
240 | 2.92k | io::FileWriterOptions get_file_writer_options(bool is_index_file = false) { |
241 | 2.92k | bool should_write_cache = write_file_cache; |
242 | | // If configured to only write index files to cache, skip cache for data files. NOTE(review): the returned options set is_cold_data from is_hot_data; confirm the polarity is intended. |
243 | 2.92k | if (compaction_output_write_index_only && !is_index_file) { |
244 | 4 | should_write_cache = false; |
245 | 4 | } |
246 | | |
247 | 2.92k | return io::FileWriterOptions {.write_file_cache = should_write_cache, |
248 | 2.92k | .is_cold_data = is_hot_data, |
249 | 2.92k | .file_cache_expiration_time = file_cache_ttl_sec, |
250 | 2.92k | .approximate_bytes_to_write = approximate_bytes_to_write}; |
251 | 2.92k | } |
252 | | |
253 | | struct BinlogOptions { |
254 | | public: |
255 | 2 | void mark_primary_writer() { binlog_write_type = BinlogWriteType::PrimaryWriter; } |
256 | | |
257 | 2 | void mark_binlog_writer() { binlog_write_type = BinlogWriteType::BinlogWriter; } |
258 | | |
259 | 0 | bool is_primary_writer() const { |
260 | 0 | return binlog_write_type == BinlogWriteType::PrimaryWriter; |
261 | 0 | } |
262 | | |
263 | 2.40k | bool is_binlog_writer() const { return binlog_write_type == BinlogWriteType::BinlogWriter; } |
264 | | |
265 | 0 | bool need_build_binlog() const { return binlog_write_type != BinlogWriteType::Unknown; } |
266 | | |
267 | 2 | void set_need_before(bool need_before) { this->_need_before = need_before; } |
268 | | |
269 | | private: |
270 | | // If row_binlog does not need to be built, the type stays Unknown, so is_primary_writer() and is_binlog_writer() both return false. |
271 | | // If row_binlog needs to be built, the normal rowset writer is expected to be marked via mark_primary_writer(), making is_primary_writer() true. |
272 | | enum BinlogWriteType { |
273 | | PrimaryWriter, |
274 | | BinlogWriter, |
275 | | Unknown |
276 | | } binlog_write_type = BinlogWriteType::Unknown; |
277 | | bool _need_before = false; |
278 | | } _write_binlog_opt; |
279 | | |
280 | 1.40k | BinlogOptions& write_binlog_opt() { return _write_binlog_opt; } |
281 | | |
282 | 1.00k | const BinlogOptions& write_binlog_opt() const { return _write_binlog_opt; } |
283 | | }; |
284 | | |
285 | | } // namespace doris |