Coverage Report

Created: 2026-04-16 17:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/delta_writer/delta_writer_v2.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/delta_writer/delta_writer_v2.h"
19
20
#include <brpc/controller.h>
21
#include <butil/errno.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/internal_service.pb.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <filesystem>
27
#include <ostream>
28
#include <string>
29
#include <utility>
30
31
#include "common/compiler_util.h" // IWYU pragma: keep
32
#include "common/config.h"
33
#include "common/logging.h"
34
#include "common/status.h"
35
#include "core/block/block.h"
36
#include "exec/sink/load_stream_stub.h"
37
#include "io/fs/file_writer.h" // IWYU pragma: keep
38
#include "runtime/exec_env.h"
39
#include "runtime/query_context.h"
40
#include "service/backend_options.h"
41
#include "storage/data_dir.h"
42
#include "storage/index/inverted/inverted_index_desc.h"
43
#include "storage/olap_define.h"
44
#include "storage/rowset/beta_rowset.h"
45
#include "storage/rowset/beta_rowset_writer_v2.h"
46
#include "storage/rowset/rowset_meta.h"
47
#include "storage/rowset/rowset_writer.h"
48
#include "storage/rowset/rowset_writer_context.h"
49
#include "storage/schema.h"
50
#include "storage/schema_change/schema_change.h"
51
#include "storage/segment/segment.h"
52
#include "storage/storage_engine.h"
53
#include "storage/tablet/tablet_manager.h"
54
#include "storage/tablet/tablet_schema.h"
55
#include "storage/tablet_info.h"
56
#include "util/brpc_client_cache.h"
57
#include "util/brpc_closure.h"
58
#include "util/debug_points.h"
59
#include "util/mem_info.h"
60
#include "util/stopwatch.hpp"
61
#include "util/time.h"
62
63
namespace doris {
64
using namespace ErrorCode;
65
66
DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
67
                             const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
68
                             RuntimeState* state)
69
2
        : _state(state),
70
2
          _req(*req),
71
2
          _tablet_schema(new TabletSchema),
72
2
          _memtable_writer(new MemTableWriter(*req)),
73
2
          _streams(streams) {}
74
75
0
void DeltaWriterV2::_update_profile(RuntimeProfile* profile) {
76
0
    auto child = profile->create_child(fmt::format("DeltaWriterV2 {}", _req.tablet_id), true, true);
77
0
    auto write_memtable_timer = ADD_TIMER(child, "WriteMemTableTime");
78
0
    auto wait_flush_limit_timer = ADD_TIMER(child, "WaitFlushLimitTime");
79
0
    auto close_wait_timer = ADD_TIMER(child, "CloseWaitTime");
80
0
    COUNTER_SET(write_memtable_timer, _write_memtable_time);
81
0
    COUNTER_SET(wait_flush_limit_timer, _wait_flush_limit_time);
82
0
    COUNTER_SET(close_wait_timer, _close_wait_time);
83
0
}
84
85
2
DeltaWriterV2::~DeltaWriterV2() {
86
2
    if (!_is_init) {
87
2
        return;
88
2
    }
89
90
    // cancel and wait all memtables in flush queue to be finished
91
0
    static_cast<void>(_memtable_writer->cancel());
92
0
}
93
94
1
Status DeltaWriterV2::init() {
95
1
    if (_is_init) {
96
0
        return Status::OK();
97
0
    }
98
    // build tablet schema in request level
99
1
    DBUG_EXECUTE_IF("DeltaWriterV2.init.stream_size", { _streams.clear(); });
100
1
    if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) {
101
1
        return Status::InternalError("failed to find tablet schema for {}", _req.index_id);
102
1
    }
103
0
    RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
104
0
                                                 *_streams[0]->tablet_schema(_req.index_id)));
105
0
    RowsetWriterContext context;
106
0
    context.txn_id = _req.txn_id;
107
0
    context.load_id = _req.load_id;
108
0
    context.index_id = _req.index_id;
109
0
    context.partition_id = _req.partition_id;
110
0
    context.rowset_state = PREPARED;
111
0
    context.segments_overlap = OVERLAPPING;
112
0
    context.tablet_schema = _tablet_schema;
113
0
    context.newest_write_timestamp = UnixSeconds();
114
0
    context.tablet = nullptr;
115
0
    context.write_type = DataWriteType::TYPE_DIRECT;
116
0
    context.tablet_id = _req.tablet_id;
117
0
    context.partition_id = _req.partition_id;
118
0
    context.tablet_schema_hash = _req.schema_hash;
119
0
    context.enable_unique_key_merge_on_write = _streams[0]->enable_unique_mow(_req.index_id);
120
0
    context.rowset_type = RowsetTypePB::BETA_ROWSET;
121
0
    context.rowset_id = ExecEnv::GetInstance()->storage_engine().next_rowset_id();
122
0
    context.data_dir = nullptr;
123
0
    context.partial_update_info = _partial_update_info;
124
0
    context.memtable_on_sink_support_index_v2 = true;
125
0
    context.encrypt_algorithm = EncryptionAlgorithmPB::PLAINTEXT;
126
127
0
    _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
128
0
    RETURN_IF_ERROR(_rowset_writer->init(context));
129
0
    std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
130
0
    if (_state->get_query_ctx()) {
131
0
        wg_sptr = _state->get_query_ctx()->workload_group();
132
0
    }
133
0
    RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
134
0
                                           wg_sptr, _streams[0]->enable_unique_mow(_req.index_id)));
135
0
    ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
136
0
    _is_init = true;
137
0
    _streams.clear();
138
0
    return Status::OK();
139
0
}
140
141
0
Status DeltaWriterV2::write(const Block* block, const DorisVector<uint32_t>& row_idxs) {
142
0
    if (UNLIKELY(row_idxs.empty())) {
143
0
        return Status::OK();
144
0
    }
145
0
    _lock_watch.start();
146
0
    std::lock_guard<std::mutex> l(_lock);
147
0
    _lock_watch.stop();
148
0
    if (!_is_init && !_is_cancelled) {
149
0
        RETURN_IF_ERROR(init());
150
0
    }
151
0
    {
152
0
        SCOPED_RAW_TIMER(&_wait_flush_limit_time);
153
0
        auto memtable_flush_running_count_limit = config::memtable_flush_running_count_limit;
154
0
        DBUG_EXECUTE_IF("DeltaWriterV2.write.back_pressure",
155
0
                        { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
156
0
        while (_memtable_writer->flush_running_count() >= memtable_flush_running_count_limit) {
157
0
            if (_state->is_cancelled()) {
158
0
                return _state->cancel_reason();
159
0
            }
160
0
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
161
0
        }
162
0
    }
163
0
    SCOPED_RAW_TIMER(&_write_memtable_time);
164
0
    return _memtable_writer->write(block, row_idxs);
165
0
}
166
167
1
Status DeltaWriterV2::close() {
168
1
    _lock_watch.start();
169
1
    std::lock_guard<std::mutex> l(_lock);
170
1
    _lock_watch.stop();
171
1
    if (!_is_init && !_is_cancelled) {
172
        // if this delta writer is not initialized, but close() is called.
173
        // which means this tablet has no data loaded, but at least one tablet
174
        // in same partition has data loaded.
175
        // so we have to also init this DeltaWriterV2, so that it can create an empty rowset
176
        // for this tablet when being closed.
177
1
        RETURN_IF_ERROR(init());
178
1
    }
179
0
    return _memtable_writer->close();
180
1
}
181
182
0
Status DeltaWriterV2::close_wait(int32_t& num_segments, RuntimeProfile* profile) {
183
0
    SCOPED_RAW_TIMER(&_close_wait_time);
184
0
    std::lock_guard<std::mutex> l(_lock);
185
0
    DCHECK(_is_init)
186
0
            << "delta writer is supposed be to initialized before close_wait() being called";
187
188
0
    if (profile != nullptr) {
189
0
        _update_profile(profile);
190
0
    }
191
0
    RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
192
0
    num_segments = _rowset_writer->next_segment_id();
193
194
0
    _delta_written_success = true;
195
0
    return Status::OK();
196
0
}
197
198
0
Status DeltaWriterV2::cancel() {
199
0
    return cancel_with_status(Status::Cancelled("already cancelled"));
200
0
}
201
202
0
Status DeltaWriterV2::cancel_with_status(const Status& st) {
203
0
    std::lock_guard<std::mutex> l(_lock);
204
0
    if (_is_cancelled) {
205
0
        return Status::OK();
206
0
    }
207
0
    RETURN_IF_ERROR(_memtable_writer->cancel_with_status(st));
208
0
    _is_cancelled = true;
209
0
    return Status::OK();
210
0
}
211
212
Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
213
                                                   const OlapTableSchemaParam* table_schema_param,
214
0
                                                   const TabletSchema& ori_tablet_schema) {
215
0
    _tablet_schema->copy_from(ori_tablet_schema);
216
    // find the right index id
217
0
    int i = 0;
218
0
    auto indexes = table_schema_param->indexes();
219
0
    for (; i < indexes.size(); i++) {
220
0
        if (indexes[i]->index_id == index_id) {
221
0
            break;
222
0
        }
223
0
    }
224
225
0
    if (!indexes.empty() && !indexes[i]->columns.empty() &&
226
0
        indexes[i]->columns[0]->unique_id() >= 0) {
227
0
        _tablet_schema->build_current_tablet_schema(
228
0
                index_id, static_cast<int32_t>(table_schema_param->version()), indexes[i],
229
0
                ori_tablet_schema);
230
0
    }
231
232
0
    _tablet_schema->set_table_id(table_schema_param->table_id());
233
0
    _tablet_schema->set_db_id(table_schema_param->db_id());
234
0
    if (table_schema_param->is_partial_update()) {
235
0
        _tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
236
0
    }
237
    // set partial update columns info
238
0
    _partial_update_info = std::make_shared<PartialUpdateInfo>();
239
0
    RETURN_IF_ERROR(_partial_update_info->init(
240
0
            _req.tablet_id, _req.txn_id, *_tablet_schema,
241
0
            table_schema_param->unique_key_update_mode(),
242
0
            table_schema_param->partial_update_new_key_policy(),
243
0
            table_schema_param->partial_update_input_columns(),
244
0
            table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
245
0
            table_schema_param->nano_seconds(), table_schema_param->timezone(),
246
0
            table_schema_param->auto_increment_coulumn()));
247
0
    return Status::OK();
248
0
}
249
250
} // namespace doris