Coverage Report

Created: 2026-06-09 14:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/delta_writer/delta_writer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Types_types.h>
21
#include <gen_cpp/internal_service.pb.h>
22
#include <gen_cpp/types.pb.h>
23
24
#include <atomic>
25
#include <memory>
26
#include <mutex>
27
#include <shared_mutex>
28
#include <unordered_set>
29
#include <vector>
30
31
#include "common/status.h"
32
#include "load/delta_writer/delta_writer_context.h"
33
#include "load/memtable/memtable_writer.h"
34
#include "storage/olap_common.h"
35
#include "storage/rowset/rowset.h"
36
#include "storage/tablet/tablet.h"
37
#include "storage/tablet/tablet_meta.h"
38
#include "storage/tablet/tablet_schema.h"
39
#include "util/uid_util.h"
40
41
namespace doris {
42
43
class FlushToken;
44
class MemTable;
45
class StorageEngine;
46
class TupleDescriptor;
47
class SlotDescriptor;
48
class OlapTableSchemaParam;
49
class RowsetWriter;
50
51
class Block;
52
53
class BaseRowsetBuilder;
54
class RowsetBuilder;
55
56
// Writer for a particular (load, index, tablet).
57
// This class is NOT thread-safe; external synchronization is required.
// Abstract base class: write() and close() are pure virtual, so only a concrete
// subclass can be instantiated.
58
class BaseDeltaWriter {
59
public:
60
    // NOTE(review): defined out of line. Presumably `req` is kept in `_req` and `profile`
    // in `_profile` -- confirm against the .cpp.
    BaseDeltaWriter(const WriteRequest& req, RuntimeProfile* profile, const UniqueId& load_id);
61
62
    virtual ~BaseDeltaWriter();
63
64
    // Pure virtual write entry point. `memtable_flushed` is an optional pointer that
    // defaults to nullptr.
    // NOTE(review): presumably `row_idxs` selects the rows of `block` to write and
    // `memtable_flushed` is an out-flag -- confirm against the implementations.
    virtual Status write(const Block* block, const DorisVector<uint32_t>& row_idxs,
65
                         bool* memtable_flushed = nullptr) = 0;
66
67
    // Flush the last memtable to the flush queue; must be called before build_rowset().
68
    virtual Status close() = 0;
69
    // Wait for all memtables to be flushed.
70
    // mem_consumption() should be 0 after this function returns.
71
    virtual Status build_rowset();
72
    Status submit_calc_delete_bitmap_task();
73
    Status wait_calc_delete_bitmap();
74
75
    // Abandon the current memtable and wait for all pending-flushing memtables to be destructed.
76
    // mem_consumption() should be 0 after this function returns.
77
    Status cancel();
    // Virtual variant of cancel() that additionally receives a Status.
    // NOTE(review): presumably `st` is the reason for the cancellation -- confirm.
78
    virtual Status cancel_with_status(const Status& st);
79
80
    int64_t mem_consumption(MemType mem);
81
82
    // Wait for all memtables in the flush queue to be flushed.
83
    Status wait_flush();
84
85
    virtual Status flush_memtable_async();
86
87
0
    // Inline accessors: partition_id(), tablet_id() and txn_id() return the
    // corresponding field of the stored write request `_req`.
    int64_t partition_id() const { return _req.partition_id; }
88
89
    int64_t table_id() const;
90
91
0
    int64_t tablet_id() const { return _req.tablet_id; }
92
93
0
    int64_t txn_id() const { return _req.txn_id; }
94
95
0
    // Forwards to `_memtable_writer`, which is dereferenced unconditionally and so must
    // be non-null when this is called. The `_total_received_rows` member is not read here.
    int64_t total_received_rows() const { return _memtable_writer->total_received_rows(); }
96
97
    int64_t num_rows_filtered() const;
98
99
    // Static helper: does not need a writer instance, only a tablet and the protobuf
    // repeated field passed in by pointer.
    // NOTE(review): presumably appends load-rowset-count info for `tablet` to
    // `tablet_infos` -- confirm against the .cpp.
    static void collect_tablet_load_rowset_num_info(
100
            BaseTablet* tablet,
101
            google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_infos);
102
103
    void set_tablet_load_rowset_num_info(
104
            google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_info);
105
106
protected:
107
    // Virtual hook so that subclasses can extend profile setup.
    virtual void _init_profile(RuntimeProfile* profile);
108
109
    Status init();
110
111
    // State flags; both start out false.
    bool _is_init = false;
112
    bool _is_cancelled = false;
113
    // Write request held by value; source of partition_id(), tablet_id() and txn_id().
    WriteRequest _req;
114
    std::unique_ptr<BaseRowsetBuilder> _rowset_builder;
115
    std::shared_ptr<MemTableWriter> _memtable_writer;
116
117
    // Total number of rows written by DeltaWriter.
    // NOTE(review): total_received_rows() does not read this counter (it forwards to
    // `_memtable_writer`); check the .cpp for whether this member is still in use.
118
    std::atomic<int64_t> _total_received_rows = 0;
119
120
    // Profile and counter pointers; all default to nullptr.
    RuntimeProfile* _profile = nullptr;
121
    RuntimeProfile::Counter* _close_wait_timer = nullptr;
122
    RuntimeProfile::Counter* _wait_flush_limit_timer = nullptr;
123
124
    MonotonicStopWatch _lock_watch;
125
};
126
127
// `StorageEngine` mixin for `BaseDeltaWriter`.
// Final (non-derivable) subclass that overrides write(), close(), flush_memtable_async(),
// cancel_with_status(), build_rowset() and _init_profile(), and adds transaction commit
// and slave-replica bookkeeping declarations.
128
class DeltaWriter final : public BaseDeltaWriter {
129
public:
130
    // `engine` is taken by reference; the class keeps a `StorageEngine&` member (`_engine`),
    // so the engine is not owned by the writer.
    DeltaWriter(StorageEngine& engine, const WriteRequest& req, RuntimeProfile* profile,
131
                const UniqueId& load_id);
    // Overload taking three write requests instead of one.
    // NOTE(review): presumably a group request plus separate data and row-binlog
    // sub-requests -- confirm the exact roles against the .cpp.
132
    DeltaWriter(StorageEngine& engine, const WriteRequest& group_build_req,
133
                const WriteRequest& sub_data_req, const WriteRequest& sub_row_binlog_req,
134
                RuntimeProfile* profile, const UniqueId& load_id);
135
136
    ~DeltaWriter() override;
137
138
    // Overrides of the BaseDeltaWriter virtual interface.
    Status write(const Block* block, const DorisVector<uint32_t>& row_idxs,
139
                 bool* memtable_flushed = nullptr) override;
140
141
    Status close() override;
142
143
    Status flush_memtable_async() override;
144
145
    Status cancel_with_status(const Status& st) override;
146
147
    Status build_rowset() override;
148
149
    // NOTE(review): presumably commits the load transaction and involves the given slave
    // tablet nodes -- confirm against the .cpp.
    Status commit_txn(const PSlaveTabletNodes& slave_tablet_nodes);
150
151
    // The three methods below deal with slave replicas; the map is passed by pointer.
    // NOTE(review): presumably the map is keyed by tablet id and is filled in by these
    // calls -- confirm against the .cpp.
    bool check_slave_replicas_done(google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>*
152
                                           success_slave_tablet_node_ids);
153
154
    void add_finished_slave_replicas(google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>*
155
                                             success_slave_tablet_node_ids);
156
157
    void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
158
159
private:
160
    void _init_profile(RuntimeProfile* profile) override;
161
162
    void _request_slave_tablet_pull_rowset(const PNodeInfo& node_info);
163
164
    std::mutex _lock;
165
166
    // Non-owning reference to the storage engine.
    StorageEngine& _engine;
167
    // Slave-replica bookkeeping.
    // NOTE(review): presumably both containers are guarded by `_slave_node_lock` -- confirm.
    std::unordered_set<int64_t> _unfinished_slave_node;
168
    PSuccessSlaveTabletNodeIds _success_slave_node_ids;
169
    std::shared_mutex _slave_node_lock;
170
171
    // Profile counter pointer; defaults to nullptr.
    RuntimeProfile::Counter* _commit_txn_timer = nullptr;
172
};
173
174
} // namespace doris