Coverage Report

Created: 2026-05-12 14:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/rowset_writer_context.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>

#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <set>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>

#include "cloud/config.h"
#include "common/status.h"
#include "io/fs/encrypted_fs_factory.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/packed_file_system.h"
#include "runtime/exec_env.h"
#include "storage/olap_define.h"
#include "storage/partial_update_info.h"
#include "storage/storage_policy.h"
#include "storage/tablet/tablet.h"
#include "storage/tablet/tablet_schema.h"
40
41
namespace doris {
42
43
// Builder handle: callers share ownership of a RowsetWriterContextBuilder.
class RowsetWriterContextBuilder;
using RowsetWriterContextBuilderSharedPtr = std::shared_ptr<RowsetWriterContextBuilder>;

// Forward declarations of types this header refers to by pointer or shared_ptr.
// (Tablet is also available through storage/tablet/tablet.h; redeclaring it is harmless.)
class DataDir;
class FileWriterCreator;
class SegmentCollector;
class Tablet;
49
50
struct RowsetWriterContext {
51
2.05k
    RowsetWriterContext() : schema_lock(new std::mutex) {
52
2.05k
        load_id.set_hi(0);
53
2.05k
        load_id.set_lo(0);
54
2.05k
    }
55
56
    RowsetId rowset_id;
57
    int64_t tablet_id {0};
58
    int32_t tablet_schema_hash {0};
59
    int64_t index_id {0};
60
    int64_t partition_id {0};
61
    RowsetTypePB rowset_type {BETA_ROWSET};
62
63
    TabletSchemaSPtr tablet_schema;
64
65
    // PREPARED/COMMITTED for pending rowset
66
    // VISIBLE for non-pending rowset
67
    RowsetStatePB rowset_state {PREPARED};
68
    // properties for non-pending rowset
69
    Version version {0, 0};
70
71
    // properties for pending rowset
72
    int64_t txn_id {0};
73
    int64_t txn_expiration {0}; // For cloud mode
74
    PUniqueId load_id;
75
    TabletUid tablet_uid {0, 0};
76
    // indicate whether the data among segments is overlapping.
77
    // default is OVERLAP_UNKNOWN.
78
    SegmentsOverlapPB segments_overlap {OVERLAP_UNKNOWN};
79
    // segment file use uint32 to represent row number, therefore the maximum is UINT32_MAX.
80
    // the default is set to INT32_MAX to avoid overflow issue when casting from uint32_t to int.
81
    // test cases can change this value to control flush timing
82
    uint32_t max_rows_per_segment = INT32_MAX;
83
    // not owned, point to the data dir of this rowset
84
    // for checking disk capacity when write data to disk.
85
    // ATTN: not support for RowsetConvertor.
86
    // (because it hard to refactor, and RowsetConvertor will be deprecated in future)
87
    DataDir* data_dir = nullptr;
88
89
    int64_t newest_write_timestamp = -1;
90
    bool enable_unique_key_merge_on_write = false;
91
    // store column_unique_id to do index compaction
92
    std::set<int32_t> columns_to_do_index_compaction;
93
    DataWriteType write_type = DataWriteType::TYPE_DEFAULT;
94
    // need to figure out the sub type of compaction
95
    ReaderType compaction_type = ReaderType::UNKNOWN;
96
    BaseTabletSPtr tablet = nullptr;
97
98
    std::shared_ptr<MowContext> mow_context;
99
    std::shared_ptr<FileWriterCreator> file_writer_creator;
100
    std::shared_ptr<SegmentCollector> segment_collector;
101
102
    // memtable_on_sink_support_index_v2 = true, we will create SinkFileWriter to send inverted index file
103
    bool memtable_on_sink_support_index_v2 = false;
104
105
    /// begin file cache opts
106
    bool write_file_cache = false;
107
    bool is_hot_data = false;
108
    uint64_t file_cache_ttl_sec = 0;
109
    uint64_t approximate_bytes_to_write = 0;
110
    // If true, compaction output only writes index files to file cache, not data files
111
    bool compaction_output_write_index_only = false;
112
    /// end file cache opts
113
114
    // segcompaction for this RowsetWriter, only enabled when importing data
115
    bool enable_segcompaction = false;
116
117
    std::shared_ptr<PartialUpdateInfo> partial_update_info;
118
119
    bool is_transient_rowset_writer = false;
120
121
    // Intent flag: caller can actively turn merge-file feature on/off for this rowset.
122
    // This describes whether we *want* to try small-file merging.
123
    bool allow_packed_file = true;
124
125
    // Effective flag: whether this context actually ends up using MergeFileSystem for writes.
126
    // This is decided inside fs() based on enable_merge_file plus other conditions
127
    // (cloud mode, S3 filesystem, V1 inverted index, global config, etc.), and once
128
    // set to true it remains stable even if config::enable_merge_file changes later.
129
    mutable bool packed_file_active = false;
130
131
    // Cached FileSystem instance to ensure consistency across multiple fs() calls.
132
    // This prevents creating multiple MergeFileSystem instances and ensures
133
    // packed_file_active flag remains consistent.
134
    mutable io::FileSystemSPtr _cached_fs = nullptr;
135
136
    // For collect segment statistics for compaction
137
    std::vector<RowsetReaderSharedPtr> input_rs_readers;
138
139
    // TODO(lihangyu) remove this lock
140
    // In semi-structure senario tablet_schema will be updated concurrently,
141
    // this lock need to be held when update.Use shared_ptr to avoid delete copy contructor
142
    std::shared_ptr<std::mutex> schema_lock;
143
144
    int64_t compaction_level = 0;
145
146
    // For local rowset
147
    std::string tablet_path;
148
149
    // For remote rowset
150
    std::optional<StorageResource> storage_resource;
151
152
    std::optional<EncryptionAlgorithmPB> encrypt_algorithm;
153
154
    std::string job_id;
155
156
4.05k
    bool is_local_rowset() const { return !storage_resource; }
157
158
2.93k
    std::string segment_path(int seg_id) const {
159
2.93k
        if (is_local_rowset()) {
160
2.93k
            return local_segment_path(tablet_path, rowset_id.to_string(), seg_id);
161
2.93k
        } else {
162
0
            return storage_resource->remote_segment_path(tablet_id, rowset_id.to_string(), seg_id);
163
0
        }
164
2.93k
    }
165
166
2.97k
    io::FileSystemSPtr fs() const {
167
        // Return cached instance if available to ensure consistency across multiple calls
168
2.97k
        if (_cached_fs != nullptr) {
169
2.31k
            return _cached_fs;
170
2.31k
        }
171
172
667
        auto fs = [this]() -> io::FileSystemSPtr {
173
667
            if (is_local_rowset()) {
174
667
                return io::global_local_filesystem();
175
667
            } else {
176
0
                return storage_resource->fs;
177
0
            }
178
667
        }();
179
180
667
        bool is_s3_fs = fs->type() == io::FileSystemType::S3;
181
182
667
        auto algorithm = encrypt_algorithm;
183
184
667
        if (!algorithm.has_value()) {
185
#ifndef BE_TEST
186
            constexpr std::string_view msg =
187
                    "RowsetWriterContext::determine_encryption is not called when creating this "
188
                    "RowsetWriterContext, it will result in encrypted rowsets left unencrypted";
189
            auto st = Status::InternalError(msg);
190
191
            LOG(WARNING) << st;
192
            DCHECK(false) << st;
193
#else
194
596
            algorithm = EncryptionAlgorithmPB::PLAINTEXT;
195
596
#endif
196
596
        }
197
198
        // Apply packed file system first for write path if enabled
199
        // Create empty index_map for write path
200
        // Index information will be populated after write completes
201
667
        bool has_v1_inverted_index = tablet_schema != nullptr &&
202
667
                                     tablet_schema->has_inverted_index() &&
203
667
                                     tablet_schema->get_inverted_index_storage_format() ==
204
118
                                             InvertedIndexStorageFormatPB::V1;
205
206
667
        if (has_v1_inverted_index && allow_packed_file && config::enable_packed_file) {
207
0
            static constexpr std::string_view kMsg =
208
0
                    "Disable packed file for V1 inverted index tablet to avoid missing index "
209
0
                    "metadata (temporary workaround)";
210
0
            LOG(INFO) << kMsg << ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id;
211
0
        }
212
213
        // Only enable merge file for S3 file system, not for HDFS or other remote file systems
214
667
        packed_file_active = allow_packed_file && config::is_cloud_mode() &&
215
667
                             config::enable_packed_file && !has_v1_inverted_index && is_s3_fs;
216
217
667
        if (packed_file_active) {
218
0
            io::PackedAppendContext append_info;
219
0
            append_info.tablet_id = tablet_id;
220
0
            append_info.rowset_id = rowset_id.to_string();
221
0
            append_info.txn_id = txn_id;
222
0
            append_info.expiration_time = file_cache_ttl_sec > 0 && newest_write_timestamp > 0
223
0
                                                  ? newest_write_timestamp + file_cache_ttl_sec
224
0
                                                  : 0;
225
0
            fs = std::make_shared<io::PackedFileSystem>(fs, append_info);
226
0
        }
227
228
        // Then apply encryption on top
229
667
        if (algorithm.has_value()) {
230
667
            fs = io::make_file_system(fs, algorithm.value());
231
667
        }
232
233
        // Cache the result to ensure consistency across multiple calls
234
667
        _cached_fs = fs;
235
667
        return fs;
236
2.97k
    }
237
238
0
    io::FileSystem& fs_ref() const { return *fs(); }
239
240
2.92k
    io::FileWriterOptions get_file_writer_options(bool is_index_file = false) {
241
2.92k
        bool should_write_cache = write_file_cache;
242
        // If configured to only write index files to cache, skip cache for data files
243
2.92k
        if (compaction_output_write_index_only && !is_index_file) {
244
4
            should_write_cache = false;
245
4
        }
246
247
2.92k
        return io::FileWriterOptions {.write_file_cache = should_write_cache,
248
2.92k
                                      .is_cold_data = is_hot_data,
249
2.92k
                                      .file_cache_expiration_time = file_cache_ttl_sec,
250
2.92k
                                      .approximate_bytes_to_write = approximate_bytes_to_write};
251
2.92k
    }
252
253
    struct BinlogOptions {
254
    public:
255
2
        void mark_primary_writer() { binlog_write_type = BinlogWriteType::PrimaryWriter; }
256
257
2
        void mark_binlog_writer() { binlog_write_type = BinlogWriteType::BinlogWriter; }
258
259
0
        bool is_primary_writer() const {
260
0
            return binlog_write_type == BinlogWriteType::PrimaryWriter;
261
0
        }
262
263
2.40k
        bool is_binlog_writer() const { return binlog_write_type == BinlogWriteType::BinlogWriter; }
264
265
0
        bool need_build_binlog() const { return binlog_write_type != BinlogWriteType::Unknown; }
266
267
2
        void set_need_before(bool need_before) { this->_need_before = need_before; }
268
269
    private:
270
        // if you don't need to build row_binlog, `PrimaryWriter` and `BinlogWriter` are both false
271
        // if you need to build row_binlog, the `is_primary_writer` of normal rowset writer is true
272
        enum BinlogWriteType {
273
            PrimaryWriter,
274
            BinlogWriter,
275
            Unknown
276
        } binlog_write_type = BinlogWriteType::Unknown;
277
        bool _need_before = false;
278
    } _write_binlog_opt;
279
280
1.40k
    BinlogOptions& write_binlog_opt() { return _write_binlog_opt; }
281
282
1.00k
    const BinlogOptions& write_binlog_opt() const { return _write_binlog_opt; }
283
};
284
285
} // namespace doris