be/src/storage/rowset/rowset_writer_context.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/olap_file.pb.h> |
21 | | #include <glog/logging.h> |
22 | | |
23 | | #include <functional> |
24 | | #include <optional> |
25 | | #include <string_view> |
26 | | #include <unordered_map> |
27 | | |
28 | | #include "cloud/config.h" |
29 | | #include "common/status.h" |
30 | | #include "io/fs/encrypted_fs_factory.h" |
31 | | #include "io/fs/file_system.h" |
32 | | #include "io/fs/file_writer.h" |
33 | | #include "io/fs/packed_file_system.h" |
34 | | #include "runtime/exec_env.h" |
35 | | #include "storage/olap_define.h" |
36 | | #include "storage/partial_update_info.h" |
37 | | #include "storage/storage_policy.h" |
38 | | #include "storage/tablet/tablet.h" |
39 | | #include "storage/tablet/tablet_schema.h" |
40 | | |
41 | | namespace doris { |
42 | | |
// Forward declarations; full definitions live in their own headers.
class DataDir;
class FileWriterCreator;
class SegmentCollector;
class Tablet;

// Shared-ownership handle for a RowsetWriterContextBuilder.
class RowsetWriterContextBuilder;
using RowsetWriterContextBuilderSharedPtr = std::shared_ptr<RowsetWriterContextBuilder>;
49 | | |
50 | | struct RowsetWriterContext { |
51 | 2.01k | RowsetWriterContext() : schema_lock(new std::mutex) { |
52 | 2.01k | load_id.set_hi(0); |
53 | 2.01k | load_id.set_lo(0); |
54 | 2.01k | } |
55 | | |
56 | | RowsetId rowset_id; |
57 | | int64_t tablet_id {0}; |
58 | | int32_t tablet_schema_hash {0}; |
59 | | int64_t index_id {0}; |
60 | | int64_t partition_id {0}; |
61 | | RowsetTypePB rowset_type {BETA_ROWSET}; |
62 | | |
63 | | TabletSchemaSPtr tablet_schema; |
64 | | // PREPARED/COMMITTED for pending rowset |
65 | | // VISIBLE for non-pending rowset |
66 | | RowsetStatePB rowset_state {PREPARED}; |
67 | | // properties for non-pending rowset |
68 | | Version version {0, 0}; |
69 | | |
70 | | // properties for pending rowset |
71 | | int64_t txn_id {0}; |
72 | | int64_t txn_expiration {0}; // For cloud mode |
73 | | PUniqueId load_id; |
74 | | TabletUid tablet_uid {0, 0}; |
75 | | // indicate whether the data among segments is overlapping. |
76 | | // default is OVERLAP_UNKNOWN. |
77 | | SegmentsOverlapPB segments_overlap {OVERLAP_UNKNOWN}; |
78 | | // segment file use uint32 to represent row number, therefore the maximum is UINT32_MAX. |
79 | | // the default is set to INT32_MAX to avoid overflow issue when casting from uint32_t to int. |
80 | | // test cases can change this value to control flush timing |
81 | | uint32_t max_rows_per_segment = INT32_MAX; |
82 | | // not owned, point to the data dir of this rowset |
83 | | // for checking disk capacity when write data to disk. |
84 | | // ATTN: not support for RowsetConvertor. |
85 | | // (because it hard to refactor, and RowsetConvertor will be deprecated in future) |
86 | | DataDir* data_dir = nullptr; |
87 | | |
88 | | int64_t newest_write_timestamp = -1; |
89 | | bool enable_unique_key_merge_on_write = false; |
90 | | // store column_unique_id to do index compaction |
91 | | std::set<int32_t> columns_to_do_index_compaction; |
92 | | DataWriteType write_type = DataWriteType::TYPE_DEFAULT; |
93 | | // need to figure out the sub type of compaction |
94 | | ReaderType compaction_type = ReaderType::UNKNOWN; |
95 | | BaseTabletSPtr tablet = nullptr; |
96 | | |
97 | | std::shared_ptr<MowContext> mow_context; |
98 | | std::shared_ptr<FileWriterCreator> file_writer_creator; |
99 | | std::shared_ptr<SegmentCollector> segment_collector; |
100 | | |
101 | | // memtable_on_sink_support_index_v2 = true, we will create SinkFileWriter to send inverted index file |
102 | | bool memtable_on_sink_support_index_v2 = false; |
103 | | |
104 | | /// begin file cache opts |
105 | | bool write_file_cache = false; |
106 | | bool is_hot_data = false; |
107 | | uint64_t file_cache_ttl_sec = 0; |
108 | | uint64_t approximate_bytes_to_write = 0; |
109 | | // If true, compaction output only writes index files to file cache, not data files |
110 | | bool compaction_output_write_index_only = false; |
111 | | /// end file cache opts |
112 | | |
113 | | // segcompaction for this RowsetWriter, only enabled when importing data |
114 | | bool enable_segcompaction = false; |
115 | | |
116 | | std::shared_ptr<PartialUpdateInfo> partial_update_info; |
117 | | |
118 | | bool is_transient_rowset_writer = false; |
119 | | |
120 | | // Intent flag: caller can actively turn merge-file feature on/off for this rowset. |
121 | | // This describes whether we *want* to try small-file merging. |
122 | | bool allow_packed_file = true; |
123 | | |
124 | | // Effective flag: whether this context actually ends up using MergeFileSystem for writes. |
125 | | // This is decided inside fs() based on enable_merge_file plus other conditions |
126 | | // (cloud mode, S3 filesystem, V1 inverted index, global config, etc.), and once |
127 | | // set to true it remains stable even if config::enable_merge_file changes later. |
128 | | mutable bool packed_file_active = false; |
129 | | |
130 | | // Cached FileSystem instance to ensure consistency across multiple fs() calls. |
131 | | // This prevents creating multiple MergeFileSystem instances and ensures |
132 | | // packed_file_active flag remains consistent. |
133 | | mutable io::FileSystemSPtr _cached_fs = nullptr; |
134 | | |
135 | | // For collect segment statistics for compaction |
136 | | std::vector<RowsetReaderSharedPtr> input_rs_readers; |
137 | | |
138 | | // TODO(lihangyu) remove this lock |
139 | | // In semi-structure senario tablet_schema will be updated concurrently, |
140 | | // this lock need to be held when update.Use shared_ptr to avoid delete copy contructor |
141 | | std::shared_ptr<std::mutex> schema_lock; |
142 | | |
143 | | int64_t compaction_level = 0; |
144 | | |
145 | | // For local rowset |
146 | | std::string tablet_path; |
147 | | |
148 | | // For remote rowset |
149 | | std::optional<StorageResource> storage_resource; |
150 | | |
151 | | std::optional<EncryptionAlgorithmPB> encrypt_algorithm; |
152 | | |
153 | | std::string job_id; |
154 | | |
155 | 6.85k | bool is_local_rowset() const { return !storage_resource; } |
156 | | |
157 | 5.74k | std::string segment_path(int seg_id) const { |
158 | 5.74k | if (is_local_rowset()) { |
159 | 5.74k | return local_segment_path(tablet_path, rowset_id.to_string(), seg_id); |
160 | 5.74k | } else { |
161 | 0 | return storage_resource->remote_segment_path(tablet_id, rowset_id.to_string(), seg_id); |
162 | 0 | } |
163 | 5.74k | } |
164 | | |
165 | 5.78k | io::FileSystemSPtr fs() const { |
166 | | // Return cached instance if available to ensure consistency across multiple calls |
167 | 5.78k | if (_cached_fs != nullptr) { |
168 | 5.13k | return _cached_fs; |
169 | 5.13k | } |
170 | | |
171 | 655 | auto fs = [this]() -> io::FileSystemSPtr { |
172 | 655 | if (is_local_rowset()) { |
173 | 655 | return io::global_local_filesystem(); |
174 | 655 | } else { |
175 | 0 | return storage_resource->fs; |
176 | 0 | } |
177 | 655 | }(); |
178 | | |
179 | 655 | bool is_s3_fs = fs->type() == io::FileSystemType::S3; |
180 | | |
181 | 655 | auto algorithm = encrypt_algorithm; |
182 | | |
183 | 655 | if (!algorithm.has_value()) { |
184 | | #ifndef BE_TEST |
185 | | constexpr std::string_view msg = |
186 | | "RowsetWriterContext::determine_encryption is not called when creating this " |
187 | | "RowsetWriterContext, it will result in encrypted rowsets left unencrypted"; |
188 | | auto st = Status::InternalError(msg); |
189 | | |
190 | | LOG(WARNING) << st; |
191 | | DCHECK(false) << st; |
192 | | #else |
193 | 584 | algorithm = EncryptionAlgorithmPB::PLAINTEXT; |
194 | 584 | #endif |
195 | 584 | } |
196 | | |
197 | | // Apply packed file system first for write path if enabled |
198 | | // Create empty index_map for write path |
199 | | // Index information will be populated after write completes |
200 | 655 | bool has_v1_inverted_index = tablet_schema != nullptr && |
201 | 655 | tablet_schema->has_inverted_index() && |
202 | 655 | tablet_schema->get_inverted_index_storage_format() == |
203 | 118 | InvertedIndexStorageFormatPB::V1; |
204 | | |
205 | 655 | if (has_v1_inverted_index && allow_packed_file && config::enable_packed_file) { |
206 | 0 | static constexpr std::string_view kMsg = |
207 | 0 | "Disable packed file for V1 inverted index tablet to avoid missing index " |
208 | 0 | "metadata (temporary workaround)"; |
209 | 0 | LOG(INFO) << kMsg << ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id; |
210 | 0 | } |
211 | | |
212 | | // Only enable merge file for S3 file system, not for HDFS or other remote file systems |
213 | 655 | packed_file_active = allow_packed_file && config::is_cloud_mode() && |
214 | 655 | config::enable_packed_file && !has_v1_inverted_index && is_s3_fs; |
215 | | |
216 | 655 | if (packed_file_active) { |
217 | 0 | io::PackedAppendContext append_info; |
218 | 0 | append_info.tablet_id = tablet_id; |
219 | 0 | append_info.rowset_id = rowset_id.to_string(); |
220 | 0 | append_info.txn_id = txn_id; |
221 | 0 | append_info.expiration_time = file_cache_ttl_sec > 0 && newest_write_timestamp > 0 |
222 | 0 | ? newest_write_timestamp + file_cache_ttl_sec |
223 | 0 | : 0; |
224 | 0 | fs = std::make_shared<io::PackedFileSystem>(fs, append_info); |
225 | 0 | } |
226 | | |
227 | | // Then apply encryption on top |
228 | 655 | if (algorithm.has_value()) { |
229 | 655 | fs = io::make_file_system(fs, algorithm.value()); |
230 | 655 | } |
231 | | |
232 | | // Cache the result to ensure consistency across multiple calls |
233 | 655 | _cached_fs = fs; |
234 | 655 | return fs; |
235 | 5.78k | } |
236 | | |
237 | 0 | io::FileSystem& fs_ref() const { return *fs(); } |
238 | | |
239 | 5.73k | io::FileWriterOptions get_file_writer_options(bool is_index_file = false) { |
240 | 5.73k | bool should_write_cache = write_file_cache; |
241 | | // If configured to only write index files to cache, skip cache for data files |
242 | 5.73k | if (compaction_output_write_index_only && !is_index_file) { |
243 | 4 | should_write_cache = false; |
244 | 4 | } |
245 | | |
246 | 5.73k | return io::FileWriterOptions {.write_file_cache = should_write_cache, |
247 | 5.73k | .is_cold_data = is_hot_data, |
248 | 5.73k | .file_cache_expiration_time = file_cache_ttl_sec, |
249 | 5.73k | .approximate_bytes_to_write = approximate_bytes_to_write}; |
250 | 5.73k | } |
251 | | }; |
252 | | |
253 | | } // namespace doris |