Coverage Report

Created: 2026-03-14 20:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/delta_writer/delta_writer_v2.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/delta_writer/delta_writer_v2.h"
19
20
#include <brpc/controller.h>
21
#include <butil/errno.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/internal_service.pb.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <filesystem>
27
#include <ostream>
28
#include <string>
29
#include <utility>
30
31
#include "common/compiler_util.h" // IWYU pragma: keep
32
#include "common/config.h"
33
#include "common/logging.h"
34
#include "common/status.h"
35
#include "core/block/block.h"
36
#include "exec/sink/load_stream_stub.h"
37
#include "io/fs/file_writer.h" // IWYU pragma: keep
38
#include "runtime/exec_env.h"
39
#include "runtime/query_context.h"
40
#include "service/backend_options.h"
41
#include "storage/data_dir.h"
42
#include "storage/index/inverted/inverted_index_desc.h"
43
#include "storage/olap_define.h"
44
#include "storage/rowset/beta_rowset.h"
45
#include "storage/rowset/beta_rowset_writer_v2.h"
46
#include "storage/rowset/rowset_meta.h"
47
#include "storage/rowset/rowset_writer.h"
48
#include "storage/rowset/rowset_writer_context.h"
49
#include "storage/schema.h"
50
#include "storage/schema_change/schema_change.h"
51
#include "storage/segment/segment.h"
52
#include "storage/storage_engine.h"
53
#include "storage/tablet/tablet_manager.h"
54
#include "storage/tablet/tablet_schema.h"
55
#include "storage/tablet_info.h"
56
#include "util/brpc_client_cache.h"
57
#include "util/brpc_closure.h"
58
#include "util/debug_points.h"
59
#include "util/mem_info.h"
60
#include "util/stopwatch.hpp"
61
#include "util/time.h"
62
63
namespace doris {
64
#include "common/compile_check_begin.h"
65
using namespace ErrorCode;
66
67
DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
68
                             const std::vector<std::shared_ptr<LoadStreamStub>>& streams,
69
                             RuntimeState* state)
70
2
        : _state(state),
71
2
          _req(*req),
72
2
          _tablet_schema(new TabletSchema),
73
2
          _memtable_writer(new MemTableWriter(*req)),
74
2
          _streams(streams) {}
75
76
0
void DeltaWriterV2::_update_profile(RuntimeProfile* profile) {
77
0
    auto child = profile->create_child(fmt::format("DeltaWriterV2 {}", _req.tablet_id), true, true);
78
0
    auto write_memtable_timer = ADD_TIMER(child, "WriteMemTableTime");
79
0
    auto wait_flush_limit_timer = ADD_TIMER(child, "WaitFlushLimitTime");
80
0
    auto close_wait_timer = ADD_TIMER(child, "CloseWaitTime");
81
0
    COUNTER_SET(write_memtable_timer, _write_memtable_time);
82
0
    COUNTER_SET(wait_flush_limit_timer, _wait_flush_limit_time);
83
0
    COUNTER_SET(close_wait_timer, _close_wait_time);
84
0
}
85
86
2
DeltaWriterV2::~DeltaWriterV2() {
87
2
    if (!_is_init) {
88
2
        return;
89
2
    }
90
91
    // cancel and wait all memtables in flush queue to be finished
92
0
    static_cast<void>(_memtable_writer->cancel());
93
0
}
94
95
1
Status DeltaWriterV2::init() {
96
1
    if (_is_init) {
97
0
        return Status::OK();
98
0
    }
99
    // build tablet schema in request level
100
1
    DBUG_EXECUTE_IF("DeltaWriterV2.init.stream_size", { _streams.clear(); });
101
1
    if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) {
102
1
        return Status::InternalError("failed to find tablet schema for {}", _req.index_id);
103
1
    }
104
0
    RETURN_IF_ERROR(_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
105
0
                                                 *_streams[0]->tablet_schema(_req.index_id)));
106
0
    RowsetWriterContext context;
107
0
    context.txn_id = _req.txn_id;
108
0
    context.load_id = _req.load_id;
109
0
    context.index_id = _req.index_id;
110
0
    context.partition_id = _req.partition_id;
111
0
    context.rowset_state = PREPARED;
112
0
    context.segments_overlap = OVERLAPPING;
113
0
    context.tablet_schema = _tablet_schema;
114
0
    context.newest_write_timestamp = UnixSeconds();
115
0
    context.tablet = nullptr;
116
0
    context.write_type = DataWriteType::TYPE_DIRECT;
117
0
    context.tablet_id = _req.tablet_id;
118
0
    context.partition_id = _req.partition_id;
119
0
    context.tablet_schema_hash = _req.schema_hash;
120
0
    context.enable_unique_key_merge_on_write = _streams[0]->enable_unique_mow(_req.index_id);
121
0
    context.rowset_type = RowsetTypePB::BETA_ROWSET;
122
0
    context.rowset_id = ExecEnv::GetInstance()->storage_engine().next_rowset_id();
123
0
    context.data_dir = nullptr;
124
0
    context.partial_update_info = _partial_update_info;
125
0
    context.memtable_on_sink_support_index_v2 = true;
126
0
    context.encrypt_algorithm = EncryptionAlgorithmPB::PLAINTEXT;
127
128
0
    _rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
129
0
    RETURN_IF_ERROR(_rowset_writer->init(context));
130
0
    std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
131
0
    if (_state->get_query_ctx()) {
132
0
        wg_sptr = _state->get_query_ctx()->workload_group();
133
0
    }
134
0
    RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
135
0
                                           wg_sptr, _streams[0]->enable_unique_mow(_req.index_id)));
136
0
    ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
137
0
    _is_init = true;
138
0
    _streams.clear();
139
0
    return Status::OK();
140
0
}
141
142
0
Status DeltaWriterV2::write(const Block* block, const DorisVector<uint32_t>& row_idxs) {
143
0
    if (UNLIKELY(row_idxs.empty())) {
144
0
        return Status::OK();
145
0
    }
146
0
    _lock_watch.start();
147
0
    std::lock_guard<std::mutex> l(_lock);
148
0
    _lock_watch.stop();
149
0
    if (!_is_init && !_is_cancelled) {
150
0
        RETURN_IF_ERROR(init());
151
0
    }
152
0
    {
153
0
        SCOPED_RAW_TIMER(&_wait_flush_limit_time);
154
0
        auto memtable_flush_running_count_limit = config::memtable_flush_running_count_limit;
155
0
        DBUG_EXECUTE_IF("DeltaWriterV2.write.back_pressure",
156
0
                        { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
157
0
        while (_memtable_writer->flush_running_count() >= memtable_flush_running_count_limit) {
158
0
            if (_state->is_cancelled()) {
159
0
                return _state->cancel_reason();
160
0
            }
161
0
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
162
0
        }
163
0
    }
164
0
    SCOPED_RAW_TIMER(&_write_memtable_time);
165
0
    return _memtable_writer->write(block, row_idxs);
166
0
}
167
168
1
Status DeltaWriterV2::close() {
169
1
    _lock_watch.start();
170
1
    std::lock_guard<std::mutex> l(_lock);
171
1
    _lock_watch.stop();
172
1
    if (!_is_init && !_is_cancelled) {
173
        // if this delta writer is not initialized, but close() is called.
174
        // which means this tablet has no data loaded, but at least one tablet
175
        // in same partition has data loaded.
176
        // so we have to also init this DeltaWriterV2, so that it can create an empty rowset
177
        // for this tablet when being closed.
178
1
        RETURN_IF_ERROR(init());
179
1
    }
180
0
    return _memtable_writer->close();
181
1
}
182
183
0
Status DeltaWriterV2::close_wait(int32_t& num_segments, RuntimeProfile* profile) {
184
0
    SCOPED_RAW_TIMER(&_close_wait_time);
185
0
    std::lock_guard<std::mutex> l(_lock);
186
0
    DCHECK(_is_init)
187
0
            << "delta writer is supposed be to initialized before close_wait() being called";
188
189
0
    if (profile != nullptr) {
190
0
        _update_profile(profile);
191
0
    }
192
0
    RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
193
0
    num_segments = _rowset_writer->next_segment_id();
194
195
0
    _delta_written_success = true;
196
0
    return Status::OK();
197
0
}
198
199
0
Status DeltaWriterV2::cancel() {
200
0
    return cancel_with_status(Status::Cancelled("already cancelled"));
201
0
}
202
203
0
Status DeltaWriterV2::cancel_with_status(const Status& st) {
204
0
    std::lock_guard<std::mutex> l(_lock);
205
0
    if (_is_cancelled) {
206
0
        return Status::OK();
207
0
    }
208
0
    RETURN_IF_ERROR(_memtable_writer->cancel_with_status(st));
209
0
    _is_cancelled = true;
210
0
    return Status::OK();
211
0
}
212
213
Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
214
                                                   const OlapTableSchemaParam* table_schema_param,
215
0
                                                   const TabletSchema& ori_tablet_schema) {
216
0
    _tablet_schema->copy_from(ori_tablet_schema);
217
    // find the right index id
218
0
    int i = 0;
219
0
    auto indexes = table_schema_param->indexes();
220
0
    for (; i < indexes.size(); i++) {
221
0
        if (indexes[i]->index_id == index_id) {
222
0
            break;
223
0
        }
224
0
    }
225
226
0
    if (!indexes.empty() && !indexes[i]->columns.empty() &&
227
0
        indexes[i]->columns[0]->unique_id() >= 0) {
228
0
        _tablet_schema->build_current_tablet_schema(
229
0
                index_id, static_cast<int32_t>(table_schema_param->version()), indexes[i],
230
0
                ori_tablet_schema);
231
0
    }
232
233
0
    _tablet_schema->set_table_id(table_schema_param->table_id());
234
0
    _tablet_schema->set_db_id(table_schema_param->db_id());
235
0
    if (table_schema_param->is_partial_update()) {
236
0
        _tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
237
0
    }
238
    // set partial update columns info
239
0
    _partial_update_info = std::make_shared<PartialUpdateInfo>();
240
0
    RETURN_IF_ERROR(_partial_update_info->init(
241
0
            _req.tablet_id, _req.txn_id, *_tablet_schema,
242
0
            table_schema_param->unique_key_update_mode(),
243
0
            table_schema_param->partial_update_new_key_policy(),
244
0
            table_schema_param->partial_update_input_columns(),
245
0
            table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
246
0
            table_schema_param->nano_seconds(), table_schema_param->timezone(),
247
0
            table_schema_param->auto_increment_coulumn()));
248
0
    return Status::OK();
249
0
}
250
251
#include "common/compile_check_end.h"
252
} // namespace doris