Coverage Report

Created: 2026-03-16 21:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/rowset_writer_context.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>

#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <set>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>
27
28
#include "cloud/config.h"
29
#include "common/status.h"
30
#include "io/fs/encrypted_fs_factory.h"
31
#include "io/fs/file_system.h"
32
#include "io/fs/file_writer.h"
33
#include "io/fs/packed_file_system.h"
34
#include "runtime/exec_env.h"
35
#include "storage/olap_define.h"
36
#include "storage/partial_update_info.h"
37
#include "storage/storage_policy.h"
38
#include "storage/tablet/tablet.h"
39
#include "storage/tablet/tablet_schema.h"
40
41
namespace doris {
42
43
// Forward declarations, so the types below can be named in this header
// without pulling in their full definitions.
class DataDir;
class FileWriterCreator;
class SegmentCollector;
class Tablet;

// Builder type and its shared-ownership handle.
class RowsetWriterContextBuilder;
using RowsetWriterContextBuilderSharedPtr = std::shared_ptr<RowsetWriterContextBuilder>;
49
50
struct RowsetWriterContext {
51
2.01k
    RowsetWriterContext() : schema_lock(new std::mutex) {
52
2.01k
        load_id.set_hi(0);
53
2.01k
        load_id.set_lo(0);
54
2.01k
    }
55
56
    RowsetId rowset_id;
57
    int64_t tablet_id {0};
58
    int32_t tablet_schema_hash {0};
59
    int64_t index_id {0};
60
    int64_t partition_id {0};
61
    RowsetTypePB rowset_type {BETA_ROWSET};
62
63
    TabletSchemaSPtr tablet_schema;
64
    // PREPARED/COMMITTED for pending rowset
65
    // VISIBLE for non-pending rowset
66
    RowsetStatePB rowset_state {PREPARED};
67
    // properties for non-pending rowset
68
    Version version {0, 0};
69
70
    // properties for pending rowset
71
    int64_t txn_id {0};
72
    int64_t txn_expiration {0}; // For cloud mode
73
    PUniqueId load_id;
74
    TabletUid tablet_uid {0, 0};
75
    // indicate whether the data among segments is overlapping.
76
    // default is OVERLAP_UNKNOWN.
77
    SegmentsOverlapPB segments_overlap {OVERLAP_UNKNOWN};
78
    // segment file use uint32 to represent row number, therefore the maximum is UINT32_MAX.
79
    // the default is set to INT32_MAX to avoid overflow issue when casting from uint32_t to int.
80
    // test cases can change this value to control flush timing
81
    uint32_t max_rows_per_segment = INT32_MAX;
82
    // not owned, point to the data dir of this rowset
83
    // for checking disk capacity when write data to disk.
84
    // ATTN: not support for RowsetConvertor.
85
    // (because it hard to refactor, and RowsetConvertor will be deprecated in future)
86
    DataDir* data_dir = nullptr;
87
88
    int64_t newest_write_timestamp = -1;
89
    bool enable_unique_key_merge_on_write = false;
90
    // store column_unique_id to do index compaction
91
    std::set<int32_t> columns_to_do_index_compaction;
92
    DataWriteType write_type = DataWriteType::TYPE_DEFAULT;
93
    // need to figure out the sub type of compaction
94
    ReaderType compaction_type = ReaderType::UNKNOWN;
95
    BaseTabletSPtr tablet = nullptr;
96
97
    std::shared_ptr<MowContext> mow_context;
98
    std::shared_ptr<FileWriterCreator> file_writer_creator;
99
    std::shared_ptr<SegmentCollector> segment_collector;
100
101
    // memtable_on_sink_support_index_v2 = true, we will create SinkFileWriter to send inverted index file
102
    bool memtable_on_sink_support_index_v2 = false;
103
104
    /// begin file cache opts
105
    bool write_file_cache = false;
106
    bool is_hot_data = false;
107
    uint64_t file_cache_ttl_sec = 0;
108
    uint64_t approximate_bytes_to_write = 0;
109
    // If true, compaction output only writes index files to file cache, not data files
110
    bool compaction_output_write_index_only = false;
111
    /// end file cache opts
112
113
    // segcompaction for this RowsetWriter, only enabled when importing data
114
    bool enable_segcompaction = false;
115
116
    std::shared_ptr<PartialUpdateInfo> partial_update_info;
117
118
    bool is_transient_rowset_writer = false;
119
120
    // Intent flag: caller can actively turn merge-file feature on/off for this rowset.
121
    // This describes whether we *want* to try small-file merging.
122
    bool allow_packed_file = true;
123
124
    // Effective flag: whether this context actually ends up using MergeFileSystem for writes.
125
    // This is decided inside fs() based on enable_merge_file plus other conditions
126
    // (cloud mode, S3 filesystem, V1 inverted index, global config, etc.), and once
127
    // set to true it remains stable even if config::enable_merge_file changes later.
128
    mutable bool packed_file_active = false;
129
130
    // Cached FileSystem instance to ensure consistency across multiple fs() calls.
131
    // This prevents creating multiple MergeFileSystem instances and ensures
132
    // packed_file_active flag remains consistent.
133
    mutable io::FileSystemSPtr _cached_fs = nullptr;
134
135
    // For collect segment statistics for compaction
136
    std::vector<RowsetReaderSharedPtr> input_rs_readers;
137
138
    // TODO(lihangyu) remove this lock
139
    // In semi-structure senario tablet_schema will be updated concurrently,
140
    // this lock need to be held when update.Use shared_ptr to avoid delete copy contructor
141
    std::shared_ptr<std::mutex> schema_lock;
142
143
    int64_t compaction_level = 0;
144
145
    // For local rowset
146
    std::string tablet_path;
147
148
    // For remote rowset
149
    std::optional<StorageResource> storage_resource;
150
151
    std::optional<EncryptionAlgorithmPB> encrypt_algorithm;
152
153
    std::string job_id;
154
155
6.85k
    bool is_local_rowset() const { return !storage_resource; }
156
157
5.74k
    std::string segment_path(int seg_id) const {
158
5.74k
        if (is_local_rowset()) {
159
5.74k
            return local_segment_path(tablet_path, rowset_id.to_string(), seg_id);
160
5.74k
        } else {
161
0
            return storage_resource->remote_segment_path(tablet_id, rowset_id.to_string(), seg_id);
162
0
        }
163
5.74k
    }
164
165
5.78k
    io::FileSystemSPtr fs() const {
166
        // Return cached instance if available to ensure consistency across multiple calls
167
5.78k
        if (_cached_fs != nullptr) {
168
5.13k
            return _cached_fs;
169
5.13k
        }
170
171
655
        auto fs = [this]() -> io::FileSystemSPtr {
172
655
            if (is_local_rowset()) {
173
655
                return io::global_local_filesystem();
174
655
            } else {
175
0
                return storage_resource->fs;
176
0
            }
177
655
        }();
178
179
655
        bool is_s3_fs = fs->type() == io::FileSystemType::S3;
180
181
655
        auto algorithm = encrypt_algorithm;
182
183
655
        if (!algorithm.has_value()) {
184
#ifndef BE_TEST
185
            constexpr std::string_view msg =
186
                    "RowsetWriterContext::determine_encryption is not called when creating this "
187
                    "RowsetWriterContext, it will result in encrypted rowsets left unencrypted";
188
            auto st = Status::InternalError(msg);
189
190
            LOG(WARNING) << st;
191
            DCHECK(false) << st;
192
#else
193
584
            algorithm = EncryptionAlgorithmPB::PLAINTEXT;
194
584
#endif
195
584
        }
196
197
        // Apply packed file system first for write path if enabled
198
        // Create empty index_map for write path
199
        // Index information will be populated after write completes
200
655
        bool has_v1_inverted_index = tablet_schema != nullptr &&
201
655
                                     tablet_schema->has_inverted_index() &&
202
655
                                     tablet_schema->get_inverted_index_storage_format() ==
203
118
                                             InvertedIndexStorageFormatPB::V1;
204
205
655
        if (has_v1_inverted_index && allow_packed_file && config::enable_packed_file) {
206
0
            static constexpr std::string_view kMsg =
207
0
                    "Disable packed file for V1 inverted index tablet to avoid missing index "
208
0
                    "metadata (temporary workaround)";
209
0
            LOG(INFO) << kMsg << ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id;
210
0
        }
211
212
        // Only enable merge file for S3 file system, not for HDFS or other remote file systems
213
655
        packed_file_active = allow_packed_file && config::is_cloud_mode() &&
214
655
                             config::enable_packed_file && !has_v1_inverted_index && is_s3_fs;
215
216
655
        if (packed_file_active) {
217
0
            io::PackedAppendContext append_info;
218
0
            append_info.tablet_id = tablet_id;
219
0
            append_info.rowset_id = rowset_id.to_string();
220
0
            append_info.txn_id = txn_id;
221
0
            append_info.expiration_time = file_cache_ttl_sec > 0 && newest_write_timestamp > 0
222
0
                                                  ? newest_write_timestamp + file_cache_ttl_sec
223
0
                                                  : 0;
224
0
            fs = std::make_shared<io::PackedFileSystem>(fs, append_info);
225
0
        }
226
227
        // Then apply encryption on top
228
655
        if (algorithm.has_value()) {
229
655
            fs = io::make_file_system(fs, algorithm.value());
230
655
        }
231
232
        // Cache the result to ensure consistency across multiple calls
233
655
        _cached_fs = fs;
234
655
        return fs;
235
5.78k
    }
236
237
0
    io::FileSystem& fs_ref() const { return *fs(); }
238
239
5.73k
    io::FileWriterOptions get_file_writer_options(bool is_index_file = false) {
240
5.73k
        bool should_write_cache = write_file_cache;
241
        // If configured to only write index files to cache, skip cache for data files
242
5.73k
        if (compaction_output_write_index_only && !is_index_file) {
243
4
            should_write_cache = false;
244
4
        }
245
246
5.73k
        return io::FileWriterOptions {.write_file_cache = should_write_cache,
247
5.73k
                                      .is_cold_data = is_hot_data,
248
5.73k
                                      .file_cache_expiration_time = file_cache_ttl_sec,
249
5.73k
                                      .approximate_bytes_to_write = approximate_bytes_to_write};
250
5.73k
    }
251
};
252
253
} // namespace doris