Coverage Report

Created: 2026-05-12 22:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/rowset_writer_context.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <glog/logging.h>
22
23
#include <functional>
24
#include <optional>
25
#include <string_view>
26
#include <unordered_map>
27
28
#include "cloud/config.h"
29
#include "common/status.h"
30
#include "io/fs/encrypted_fs_factory.h"
31
#include "io/fs/file_system.h"
32
#include "io/fs/file_writer.h"
33
#include "io/fs/packed_file_system.h"
34
#include "runtime/exec_env.h"
35
#include "storage/binlog.h"
36
#include "storage/olap_define.h"
37
#include "storage/partial_update_info.h"
38
#include "storage/segment/historical_row_retriever.h"
39
#include "storage/storage_policy.h"
40
#include "storage/tablet/tablet.h"
41
#include "storage/tablet/tablet_schema.h"
42
43
namespace doris {
44
45
class RowsetWriterContextBuilder;
46
using RowsetWriterContextBuilderSharedPtr = std::shared_ptr<RowsetWriterContextBuilder>;
47
class DataDir;
48
class Tablet;
49
class FileWriterCreator;
50
class SegmentCollector;
51
52
namespace segment_v2 {
53
struct HistoricalRowRetrieverContext;
54
}
55
56
struct RowsetWriterContext {
57
2.10k
    RowsetWriterContext() : schema_lock(new std::mutex) {
58
2.10k
        load_id.set_hi(0);
59
2.10k
        load_id.set_lo(0);
60
2.10k
    }
61
62
    RowsetId rowset_id;
63
    int64_t db_id {0};
64
    int64_t table_id {0};
65
    int64_t tablet_id {0};
66
    int32_t tablet_schema_hash {0};
67
    int64_t index_id {0};
68
    int64_t partition_id {0};
69
    RowsetTypePB rowset_type {BETA_ROWSET};
70
71
    TabletSchemaSPtr tablet_schema;
72
73
    // PREPARED/COMMITTED for pending rowset
74
    // VISIBLE for non-pending rowset
75
    RowsetStatePB rowset_state {PREPARED};
76
    // properties for non-pending rowset
77
    Version version {0, 0};
78
79
    // properties for pending rowset
80
    int64_t txn_id {0};
81
    int64_t txn_expiration {0}; // For cloud mode
82
    PUniqueId load_id;
83
    TabletUid tablet_uid {0, 0};
84
    // indicate whether the data among segments is overlapping.
85
    // default is OVERLAP_UNKNOWN.
86
    SegmentsOverlapPB segments_overlap {OVERLAP_UNKNOWN};
87
    // segment file use uint32 to represent row number, therefore the maximum is UINT32_MAX.
88
    // the default is set to INT32_MAX to avoid overflow issue when casting from uint32_t to int.
89
    // test cases can change this value to control flush timing
90
    uint32_t max_rows_per_segment = INT32_MAX;
91
    // not owned, point to the data dir of this rowset
92
    // for checking disk capacity when write data to disk.
93
    // ATTN: not supported for RowsetConvertor.
94
    // (because it is hard to refactor, and RowsetConvertor will be deprecated in the future)
95
    DataDir* data_dir = nullptr;
96
97
    int64_t newest_write_timestamp = -1;
98
    bool enable_unique_key_merge_on_write = false;
99
    // store column_unique_id to do index compaction
100
    std::set<int32_t> columns_to_do_index_compaction;
101
    DataWriteType write_type = DataWriteType::TYPE_DEFAULT;
102
    // need to figure out the sub type of compaction
103
    ReaderType compaction_type = ReaderType::UNKNOWN;
104
    BaseTabletSPtr tablet = nullptr;
105
106
    std::shared_ptr<MowContext> mow_context;
107
    std::shared_ptr<FileWriterCreator> file_writer_creator;
108
    std::shared_ptr<SegmentCollector> segment_collector;
109
110
    // memtable_on_sink_support_index_v2 = true, we will create SinkFileWriter to send inverted index file
111
    bool memtable_on_sink_support_index_v2 = false;
112
113
    /// begin file cache opts
114
    bool write_file_cache = false;
115
    bool is_hot_data = false;
116
    uint64_t file_cache_ttl_sec = 0;
117
    uint64_t approximate_bytes_to_write = 0;
118
    // If true, compaction output only writes index files to file cache, not data files
119
    bool compaction_output_write_index_only = false;
120
    /// end file cache opts
121
122
    // segcompaction for this RowsetWriter, only enabled when importing data
123
    bool enable_segcompaction = false;
124
125
    std::shared_ptr<PartialUpdateInfo> partial_update_info;
126
127
    bool is_transient_rowset_writer = false;
128
129
    segment_v2::HistoricalRowRetrieverContext make_historical_row_retriever_context();
130
131
    // Intent flag: caller can actively turn the packed-file feature on/off for this rowset.
132
    // This describes whether we *want* to try small-file merging.
133
    bool allow_packed_file = true;
134
135
    // Effective flag: whether this context actually ends up using PackedFileSystem for writes.
136
    // This is decided inside fs() based on allow_packed_file plus other conditions
137
    // (cloud mode, S3 filesystem, V1 inverted index, global config, etc.), and once
138
    // set to true it remains stable even if config::enable_packed_file changes later.
139
    mutable bool packed_file_active = false;
140
141
    // Cached FileSystem instance to ensure consistency across multiple fs() calls.
142
    // This prevents creating multiple PackedFileSystem instances and ensures
143
    // packed_file_active flag remains consistent.
144
    mutable io::FileSystemSPtr _cached_fs = nullptr;
145
146
    // For collecting segment statistics for compaction
147
    std::vector<RowsetReaderSharedPtr> input_rs_readers;
148
149
    // TODO(lihangyu) remove this lock
150
    // In the semi-structured scenario tablet_schema will be updated concurrently,
151
    // this lock needs to be held when updating it. Use shared_ptr to avoid deleting the copy constructor
152
    std::shared_ptr<std::mutex> schema_lock;
153
154
    int64_t compaction_level = 0;
155
156
    // For local rowset
157
    std::string tablet_path;
158
159
    // For remote rowset
160
    std::optional<StorageResource> storage_resource;
161
162
    std::optional<EncryptionAlgorithmPB> encrypt_algorithm;
163
164
    std::string job_id;
165
166
4.07k
    bool is_local_rowset() const { return !storage_resource; }
167
168
2.93k
    std::string segment_path(int seg_id) const {
169
2.93k
        if (is_local_rowset()) {
170
2.93k
            return local_segment_path(tablet_path, rowset_id.to_string(), seg_id);
171
2.93k
        } else {
172
0
            return storage_resource->remote_segment_path(tablet_id, rowset_id.to_string(), seg_id);
173
0
        }
174
2.93k
    }
175
176
2.98k
    io::FileSystemSPtr fs() const {
177
        // Return cached instance if available to ensure consistency across multiple calls
178
2.98k
        if (_cached_fs != nullptr) {
179
2.31k
            return _cached_fs;
180
2.31k
        }
181
182
671
        auto fs = [this]() -> io::FileSystemSPtr {
183
671
            if (is_local_rowset()) {
184
671
                return io::global_local_filesystem();
185
671
            } else {
186
0
                return storage_resource->fs;
187
0
            }
188
671
        }();
189
190
671
        bool is_s3_fs = fs->type() == io::FileSystemType::S3;
191
192
671
        auto algorithm = encrypt_algorithm;
193
194
671
        if (!algorithm.has_value()) {
195
#ifndef BE_TEST
196
            constexpr std::string_view msg =
197
                    "RowsetWriterContext::determine_encryption is not called when creating this "
198
                    "RowsetWriterContext, it will result in encrypted rowsets left unencrypted";
199
            auto st = Status::InternalError(msg);
200
201
            LOG(WARNING) << st;
202
            DCHECK(false) << st;
203
#else
204
596
            algorithm = EncryptionAlgorithmPB::PLAINTEXT;
205
596
#endif
206
596
        }
207
208
        // Apply packed file system first for write path if enabled
209
        // Create empty index_map for write path
210
        // Index information will be populated after write completes
211
671
        bool has_v1_inverted_index = tablet_schema != nullptr &&
212
671
                                     tablet_schema->has_inverted_index() &&
213
671
                                     tablet_schema->get_inverted_index_storage_format() ==
214
118
                                             InvertedIndexStorageFormatPB::V1;
215
216
671
        if (has_v1_inverted_index && allow_packed_file && config::enable_packed_file) {
217
0
            static constexpr std::string_view kMsg =
218
0
                    "Disable packed file for V1 inverted index tablet to avoid missing index "
219
0
                    "metadata (temporary workaround)";
220
0
            LOG(INFO) << kMsg << ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id;
221
0
        }
222
223
        // Only enable packed file for S3 file system, not for HDFS or other remote file systems
224
671
        packed_file_active = allow_packed_file && config::is_cloud_mode() &&
225
671
                             config::enable_packed_file && !has_v1_inverted_index && is_s3_fs;
226
227
671
        if (packed_file_active) {
228
0
            io::PackedAppendContext append_info;
229
0
            append_info.tablet_id = tablet_id;
230
0
            append_info.rowset_id = rowset_id.to_string();
231
0
            append_info.txn_id = txn_id;
232
0
            append_info.expiration_time = file_cache_ttl_sec > 0 && newest_write_timestamp > 0
233
0
                                                  ? newest_write_timestamp + file_cache_ttl_sec
234
0
                                                  : 0;
235
0
            fs = std::make_shared<io::PackedFileSystem>(fs, append_info);
236
0
        }
237
238
        // Then apply encryption on top
239
671
        if (algorithm.has_value()) {
240
671
            fs = io::make_file_system(fs, algorithm.value());
241
671
        }
242
243
        // Cache the result to ensure consistency across multiple calls
244
671
        _cached_fs = fs;
245
671
        return fs;
246
2.98k
    }
247
248
0
    io::FileSystem& fs_ref() const { return *fs(); }
249
250
2.93k
    io::FileWriterOptions get_file_writer_options(bool is_index_file = false) { // Builds FileWriterOptions from this context's file cache fields.
251
2.93k
        bool should_write_cache = write_file_cache;
252
        // When compaction_output_write_index_only is set, only index files are written to the file cache; data files skip it.
253
2.93k
        if (compaction_output_write_index_only && !is_index_file) {
254
4
            should_write_cache = false;
255
4
        }
256
257
2.93k
        return io::FileWriterOptions {.write_file_cache = should_write_cache,
258
2.93k
                                      .is_cold_data = is_hot_data, // NOTE(review): is_cold_data is fed from is_hot_data without negation - confirm this is intended
259
2.93k
                                      .file_cache_expiration_time = file_cache_ttl_sec, // NOTE(review): a TTL in seconds is passed as the expiration time - confirm a duration, not an absolute timestamp, is expected
260
2.93k
                                      .approximate_bytes_to_write = approximate_bytes_to_write};
261
2.93k
    }
262
263
    struct BinlogOptions {
264
    public:
265
11
        void mark_primary_writer() { binlog_write_type = BinlogWriteType::PrimaryWriter; }
266
267
12
        void mark_binlog_writer() { binlog_write_type = BinlogWriteType::BinlogWriter; }
268
269
0
        bool is_primary_writer() const {
270
0
            return binlog_write_type == BinlogWriteType::PrimaryWriter;
271
0
        }
272
273
3.65k
        bool is_binlog_writer() const { return binlog_write_type == BinlogWriteType::BinlogWriter; }
274
275
3
        bool need_build_binlog() const { return binlog_write_type != BinlogWriteType::Unknown; }
276
277
9
        void set_need_before(bool need_before) {
278
9
            this->_need_before = need_before;
279
9
            _segment_write_binlog_opt.write_before = need_before;
280
9
        }
281
282
8
        segment_v2::SegmentWriteBinlogOptions& write_binlog_config() {
283
8
            return _segment_write_binlog_opt;
284
8
        }
285
286
1
        const segment_v2::SegmentWriteBinlogOptions& write_binlog_config() const {
287
1
            return _segment_write_binlog_opt;
288
1
        }
289
290
    private:
291
        // if you don't need to build row_binlog, the write type stays `Unknown` (neither `PrimaryWriter` nor `BinlogWriter`)
292
        // if you need to build row_binlog, the `is_primary_writer` of normal rowset writer is true
293
        enum BinlogWriteType {
294
            PrimaryWriter,
295
            BinlogWriter,
296
            Unknown
297
        } binlog_write_type = BinlogWriteType::Unknown;
298
        bool _need_before = false;
299
        segment_v2::SegmentWriteBinlogOptions _segment_write_binlog_opt;
300
    } _write_binlog_opt;
301
302
2.67k
    BinlogOptions& write_binlog_opt() { return _write_binlog_opt; }
303
304
1.01k
    const BinlogOptions& write_binlog_opt() const { return _write_binlog_opt; }
305
};
306
307
inline segment_v2::HistoricalRowRetrieverContext
308
0
RowsetWriterContext::make_historical_row_retriever_context() {
309
0
    return segment_v2::HistoricalRowRetrieverContext {
310
0
            .tablet = tablet,
311
0
            .tablet_schema = tablet_schema,
312
0
            .rowset_writer_ctx = this,
313
0
            .partial_update_info = partial_update_info,
314
0
            .is_transient_rowset_writer = is_transient_rowset_writer,
315
0
            .write_type = write_type};
316
0
}
317
318
} // namespace doris