Coverage Report

Created: 2026-06-17 02:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/delta_writer/delta_writer_v2.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/delta_writer/delta_writer_v2.h"
19
20
#include <brpc/controller.h>
21
#include <butil/errno.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/internal_service.pb.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <filesystem>
27
#include <ostream>
28
#include <string>
29
#include <utility>
30
31
#include "common/compiler_util.h" // IWYU pragma: keep
32
#include "common/config.h"
33
#include "common/logging.h"
34
#include "common/status.h"
35
#include "core/block/block.h"
36
#include "exec/sink/load_stream_stub.h"
37
#include "io/fs/file_writer.h" // IWYU pragma: keep
38
#include "runtime/exec_env.h"
39
#include "runtime/query_context.h"
40
#include "service/backend_options.h"
41
#include "storage/data_dir.h"
42
#include "storage/index/inverted/inverted_index_desc.h"
43
#include "storage/olap_define.h"
44
#include "storage/rowset/beta_rowset.h"
45
#include "storage/rowset/beta_rowset_writer_v2.h"
46
#include "storage/rowset/rowset_meta.h"
47
#include "storage/rowset/rowset_writer.h"
48
#include "storage/rowset/rowset_writer_context.h"
49
#include "storage/schema.h"
50
#include "storage/schema_change/schema_change.h"
51
#include "storage/segment/segment.h"
52
#include "storage/storage_engine.h"
53
#include "storage/tablet/tablet_manager.h"
54
#include "storage/tablet/tablet_schema.h"
55
#include "storage/tablet/tablet_schema_cache.h"
56
#include "storage/tablet_info.h"
57
#include "util/brpc_client_cache.h"
58
#include "util/brpc_closure.h"
59
#include "util/debug_points.h"
60
#include "util/mem_info.h"
61
#include "util/stopwatch.hpp"
62
#include "util/time.h"
63
64
namespace doris {
65
using namespace ErrorCode;
66
67
DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
68
                             const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
69
                             std::shared_ptr<WorkloadGroup> workload_group)
70
3
        : _req(*req),
71
3
          _workload_group(std::move(workload_group)),
72
3
          _tablet_schema(new TabletSchema),
73
3
          _memtable_writer(new MemTableWriter(*req)),
74
3
          _streams(streams) {}
75
76
0
void DeltaWriterV2::_update_profile(RuntimeProfile* profile) {
77
0
    auto child = profile->create_child(fmt::format("DeltaWriterV2 {}", _req.tablet_id), true, true);
78
0
    auto write_memtable_timer = ADD_TIMER(child, "WriteMemTableTime");
79
0
    auto wait_flush_limit_timer = ADD_TIMER(child, "WaitFlushLimitTime");
80
0
    auto close_wait_timer = ADD_TIMER(child, "CloseWaitTime");
81
0
    COUNTER_SET(write_memtable_timer, _write_memtable_time);
82
0
    COUNTER_SET(wait_flush_limit_timer, _wait_flush_limit_time);
83
0
    COUNTER_SET(close_wait_timer, _close_wait_time);
84
0
}
85
86
3
DeltaWriterV2::~DeltaWriterV2() {
87
3
    if (!_is_init) {
88
2
        return;
89
2
    }
90
91
    // cancel and wait all memtables in flush queue to be finished
92
1
    static_cast<void>(_memtable_writer->cancel());
93
1
}
94
95
2
Status DeltaWriterV2::init() {
96
2
    if (_is_init) {
97
0
        return Status::OK();
98
0
    }
99
    // build tablet schema in request level
100
2
    DBUG_EXECUTE_IF("DeltaWriterV2.init.stream_size", { _streams.clear(); });
101
2
    if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) {
102
1
        return Status::InternalError("failed to find tablet schema for {}", _req.index_id);
103
1
    }
104
1
    RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
105
1
                                                 *_streams[0]->tablet_schema(_req.index_id)));
106
1
    RowsetWriterContext context;
107
1
    context.txn_id = _req.txn_id;
108
1
    context.load_id = _req.load_id;
109
1
    context.index_id = _req.index_id;
110
1
    context.partition_id = _req.partition_id;
111
1
    context.rowset_state = PREPARED;
112
1
    context.segments_overlap = OVERLAPPING;
113
1
    context.tablet_schema = _tablet_schema;
114
1
    context.db_id = _tablet_schema->db_id();
115
1
    context.table_id = _tablet_schema->table_id();
116
1
    context.newest_write_timestamp = UnixSeconds();
117
1
    context.tablet = nullptr;
118
1
    context.write_type = DataWriteType::TYPE_DIRECT;
119
1
    context.tablet_id = _req.tablet_id;
120
1
    context.partition_id = _req.partition_id;
121
1
    context.tablet_schema_hash = _req.schema_hash;
122
1
    context.enable_unique_key_merge_on_write = _streams[0]->enable_unique_mow(_req.index_id);
123
1
    context.rowset_type = RowsetTypePB::BETA_ROWSET;
124
1
    context.rowset_id = ExecEnv::GetInstance()->storage_engine().next_rowset_id();
125
1
    context.data_dir = nullptr;
126
1
    context.partial_update_info = _partial_update_info;
127
1
    context.memtable_on_sink_support_index_v2 = true;
128
1
    context.encrypt_algorithm = EncryptionAlgorithmPB::PLAINTEXT;
129
130
1
    _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
131
1
    RETURN_IF_ERROR(_rowset_writer->init(context));
132
1
    RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
133
1
                                           _workload_group,
134
1
                                           _streams[0]->enable_unique_mow(_req.index_id)));
135
1
    ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
136
1
    _is_init = true;
137
1
    _streams.clear();
138
1
    return Status::OK();
139
1
}
140
141
Status DeltaWriterV2::write(const Block* block, const DorisVector<uint32_t>& row_idxs,
142
1
                            const std::function<Status()>& cancel_check) {
143
1
    if (UNLIKELY(row_idxs.empty())) {
144
0
        return Status::OK();
145
0
    }
146
1
    _lock_watch.start();
147
1
    std::lock_guard<std::mutex> l(_lock);
148
1
    _lock_watch.stop();
149
1
    if (!_is_init && !_is_cancelled) {
150
1
        RETURN_IF_ERROR(init());
151
1
    }
152
1
    {
153
1
        SCOPED_RAW_TIMER(&_wait_flush_limit_time);
154
1
        auto memtable_flush_running_count_limit = config::memtable_flush_running_count_limit;
155
1
        DBUG_EXECUTE_IF("DeltaWriterV2.write.back_pressure",
156
1
                        { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
157
1
        while (_memtable_writer->flush_running_count() >= memtable_flush_running_count_limit) {
158
0
            DBUG_EXECUTE_IF("DeltaWriterV2.write.flush_limit_wait", DBUG_RUN_CALLBACK());
159
0
            RETURN_IF_ERROR(cancel_check());
160
0
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
161
0
        }
162
1
    }
163
1
    SCOPED_RAW_TIMER(&_write_memtable_time);
164
1
    return _memtable_writer->write(block, row_idxs);
165
1
}
166
167
1
Status DeltaWriterV2::close() {
168
1
    _lock_watch.start();
169
1
    std::lock_guard<std::mutex> l(_lock);
170
1
    _lock_watch.stop();
171
1
    if (!_is_init && !_is_cancelled) {
172
        // if this delta writer is not initialized, but close() is called.
173
        // which means this tablet has no data loaded, but at least one tablet
174
        // in same partition has data loaded.
175
        // so we have to also init this DeltaWriterV2, so that it can create an empty rowset
176
        // for this tablet when being closed.
177
1
        RETURN_IF_ERROR(init());
178
1
    }
179
0
    return _memtable_writer->close();
180
1
}
181
182
0
Status DeltaWriterV2::close_wait(int32_t& num_segments, RuntimeProfile* profile) {
183
0
    SCOPED_RAW_TIMER(&_close_wait_time);
184
0
    std::lock_guard<std::mutex> l(_lock);
185
0
    DCHECK(_is_init)
186
0
            << "delta writer is supposed be to initialized before close_wait() being called";
187
188
0
    if (profile != nullptr) {
189
0
        _update_profile(profile);
190
0
    }
191
0
    RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
192
0
    num_segments = _rowset_writer->next_segment_id();
193
194
0
    _delta_written_success = true;
195
0
    return Status::OK();
196
0
}
197
198
0
Status DeltaWriterV2::cancel() {
199
0
    return cancel_with_status(Status::Cancelled("already cancelled"));
200
0
}
201
202
1
Status DeltaWriterV2::cancel_with_status(const Status& st) {
203
1
    std::lock_guard<std::mutex> l(_lock);
204
1
    if (_is_cancelled) {
205
0
        return Status::OK();
206
0
    }
207
1
    RETURN_IF_ERROR(_memtable_writer->cancel_with_status(st));
208
1
    _is_cancelled = true;
209
1
    return Status::OK();
210
1
}
211
212
Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
213
                                                   const OlapTableSchemaParam* table_schema_param,
214
1
                                                   const TabletSchema& ori_tablet_schema) {
215
    // find the right index id
216
1
    const OlapTableIndexSchema* index_schema = nullptr;
217
1
    for (const auto* schema : table_schema_param->indexes()) {
218
1
        if (schema->index_id == index_id) {
219
1
            index_schema = schema;
220
1
            break;
221
1
        }
222
1
    }
223
224
1
    auto cache_key = TabletSchemaCache::build_load_schema_cache_key(
225
1
            index_id, table_schema_param, ori_tablet_schema, index_schema);
226
1
    auto cached_schema = TabletSchemaCache::instance()->lookup_schema(cache_key);
227
1
    if (cached_schema.first != nullptr) {
228
0
        _tablet_schema = cached_schema.second;
229
0
        TabletSchemaCache::instance()->release(cached_schema.first);
230
1
    } else {
231
1
        _tablet_schema->copy_from(ori_tablet_schema);
232
1
        if (index_schema != nullptr && !index_schema->columns.empty() &&
233
1
            index_schema->columns[0]->unique_id() >= 0) {
234
1
            _tablet_schema->build_current_tablet_schema(
235
1
                    index_id, static_cast<int32_t>(table_schema_param->version()), index_schema,
236
1
                    ori_tablet_schema);
237
1
        }
238
1
        _tablet_schema->set_table_id(table_schema_param->table_id());
239
1
        _tablet_schema->set_db_id(table_schema_param->db_id());
240
1
        if (table_schema_param->is_partial_update()) {
241
0
            _tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
242
0
        }
243
1
        auto inserted_schema = TabletSchemaCache::instance()->insert(cache_key, _tablet_schema);
244
1
        _tablet_schema = inserted_schema.second;
245
1
        TabletSchemaCache::instance()->release(inserted_schema.first);
246
1
    }
247
248
    // set partial update columns info
249
1
    _partial_update_info = std::make_shared<PartialUpdateInfo>();
250
1
    RETURN_IF_ERROR(_partial_update_info->init(
251
1
            _req.tablet_id, _req.txn_id, *_tablet_schema,
252
1
            table_schema_param->unique_key_update_mode(),
253
1
            table_schema_param->partial_update_new_key_policy(),
254
1
            table_schema_param->partial_update_input_columns(),
255
1
            table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
256
1
            table_schema_param->nano_seconds(), table_schema_param->timezone(),
257
1
            table_schema_param->auto_increment_coulumn()));
258
1
    return Status::OK();
259
1
}
260
261
} // namespace doris