Coverage Report

Created: 2026-06-09 14:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/channel/tablets_channel.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <glog/logging.h>

#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "common/status.h"
#include "core/custom_allocator.h"
#include "exec/sink/vtablet_finder.h"
#include "runtime/runtime_profile.h"
#include "util/bitmap.h"
#include "util/uid_util.h"
38
39
namespace google::protobuf {
40
template <typename Element>
41
class RepeatedField;
42
template <typename Key, typename T>
43
class Map;
44
template <typename T>
45
class RepeatedPtrField;
46
} // namespace google::protobuf
47
48
namespace doris {
49
class PSlaveTabletNodes;
50
class PSuccessSlaveTabletNodeIds;
51
class PTabletError;
52
class PTabletInfo;
53
class PTabletWriterOpenRequest;
54
class PTabletWriterOpenResult;
55
class PTabletWriterAddBlockRequest;
56
class PTabletWriterAddBlockResult;
57
class PUniqueId;
58
class TupleDescriptor;
59
class OpenPartitionRequest;
60
class StorageEngine;
61
62
struct TabletsChannelKey {
63
    UniqueId id;
64
    int64_t index_id;
65
66
0
    TabletsChannelKey(const PUniqueId& pid, int64_t index_id_) : id(pid), index_id(index_id_) {}
67
68
0
    ~TabletsChannelKey() noexcept = default;
69
70
0
    bool operator==(const TabletsChannelKey& rhs) const noexcept {
71
0
        return index_id == rhs.index_id && id == rhs.id;
72
0
    }
73
74
    std::string to_string() const;
75
};
76
77
std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key);
78
79
class BaseDeltaWriter;
80
class MemTableWriter;
81
class OlapTableSchemaParam;
82
class LoadChannel;
83
struct WriteRequest;
84
85
// Write channel for a particular (load, index).
86
class BaseTabletsChannel {
87
public:
88
    BaseTabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, bool is_high_priority,
89
                       RuntimeProfile* profile);
90
91
    virtual ~BaseTabletsChannel();
92
93
    Status open(const PTabletWriterOpenRequest& request);
94
    // open + open writers
95
    Status incremental_open(const PTabletWriterOpenRequest& params);
96
97
    virtual std::unique_ptr<BaseDeltaWriter> create_delta_writer(const WriteRequest& request) = 0;
98
99
    // no-op when this channel has been closed or cancelled
100
    virtual Status add_batch(const PTabletWriterAddBlockRequest& request,
101
                             PTabletWriterAddBlockResult* response) = 0;
102
103
    // Mark sender with 'sender_id' as closed.
104
    // If all senders are closed, close this channel, set '*finished' to true, update 'tablet_vec'
105
    // to include all tablets written in this channel.
106
    // no-op when this channel has been closed or cancelled
107
    virtual Status close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req,
108
                         PTabletWriterAddBlockResult* res, bool* finished) = 0;
109
110
    // no-op when this channel has been closed or cancelled
111
    virtual Status cancel();
112
113
    void refresh_profile();
114
115
0
    size_t total_received_rows() const { return _total_received_rows; }
116
117
0
    size_t num_rows_filtered() const { return _num_rows_filtered; }
118
119
    // means this tablets in this BE is incremental opened partitions.
120
0
    bool is_incremental_channel() const { return _open_by_incremental; }
121
122
0
    bool is_finished() const { return _state == kFinished; }
123
124
protected:
125
    struct RandomBucketPartitionParam {
126
        std::vector<int64_t> ordered_tablet_ids;
127
    };
128
129
    void _init_receiver_side_random_bucket_state(const PTabletWriterOpenRequest& request);
130
    Status _write_block_data(const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
131
                             std::unordered_map<int64_t, DorisVector<uint32_t>>& tablet_to_rowidxs,
132
                             PTabletWriterAddBlockResult* response);
133
    Status _write_block_data_for_receiver_side_random_bucket(
134
            const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
135
            std::unordered_map<int64_t, DorisVector<uint32_t>>& partition_to_rowidxs,
136
            PTabletWriterAddBlockResult* response);
137
    void _build_partition_to_rowidxs_for_receiver_side_random_bucket(
138
            const PTabletWriterAddBlockRequest& request,
139
            std::unordered_map<int64_t, DorisVector<uint32_t>>* partition_to_rowidxs);
140
    std::shared_ptr<std::mutex> _get_partition_route_lock(int64_t partition_id);
141
142
    Status _get_current_seq(int64_t& cur_seq, const PTabletWriterAddBlockRequest& request);
143
144
    // open all writer
145
    Status _open_all_writers(const PTabletWriterOpenRequest& request);
146
147
    void _add_broken_tablet(int64_t tablet_id);
148
    // thread-unsafe, add a shared lock for `_tablet_writers_lock` if needed
149
    bool _is_broken_tablet(int64_t tablet_id) const;
150
    void _add_error_tablet(google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
151
                           int64_t tablet_id, Status error) const;
152
    void _build_tablet_to_rowidxs(
153
            const PTabletWriterAddBlockRequest& request,
154
            std::unordered_map<int64_t /* tablet_id */, DorisVector<uint32_t> /* row index */>*
155
                    tablet_to_rowidxs);
156
    virtual void _init_profile(RuntimeProfile* profile);
157
158
    // id of this load channel
159
    TabletsChannelKey _key;
160
161
    // protect _state change. open and close. when add_batch finished, lock to change _next_seqs also
162
    std::mutex _lock;
163
    enum State {
164
        kInitialized,
165
        kOpened,
166
        kFinished // closed or cancelled
167
    };
168
    State _state;
169
170
    UniqueId _load_id;
171
172
    // initialized in open function
173
    int64_t _txn_id = -1;
174
    int64_t _index_id = -1;
175
    std::shared_ptr<OlapTableSchemaParam> _schema;
176
    TupleDescriptor* _tuple_desc = nullptr;
177
    bool _open_by_incremental = false;
178
179
    // next sequence we expect
180
    std::set<int32_t> _recieved_senders;
181
    int _num_remaining_senders = 0;
182
    std::vector<int64_t> _next_seqs;
183
    Bitmap _closed_senders;
184
    // status to return when operate on an already closed/cancelled channel
185
    // currently it's OK.
186
    Status _close_status;
187
188
    // tablet_id -> TabletChannel. it will only be changed in open() or inc_open()
189
    std::unordered_map<int64_t, std::unique_ptr<BaseDeltaWriter>> _tablet_writers;
190
    // protect _tablet_writers
191
    std::mutex _tablet_writers_lock;
192
    // broken tablet ids.
193
    // If a tablet write fails, it's id will be added to this set.
194
    // So that following batch will not handle this tablet anymore.
195
    std::unordered_set<int64_t> _broken_tablets;
196
197
    std::shared_mutex _broken_tablets_lock;
198
199
    std::unordered_set<int64_t> _reducing_tablets;
200
201
    std::unordered_set<int64_t> _partition_ids;
202
    std::unordered_map<int64_t, RandomBucketPartitionParam> _random_bucket_partition_params;
203
    std::shared_ptr<AdaptiveRandomBucketState> _adaptive_random_bucket_state;
204
    std::mutex _partition_route_locks_lock;
205
    std::unordered_map<int64_t, std::shared_ptr<std::mutex>> _partition_route_locks;
206
207
    static std::atomic<uint64_t> _s_tablet_writer_count;
208
209
    bool _is_high_priority = false;
210
211
    RuntimeProfile* _profile = nullptr;
212
    RuntimeProfile::Counter* _add_batch_number_counter = nullptr;
213
    RuntimeProfile::HighWaterMarkCounter* _memory_usage_counter = nullptr;
214
    RuntimeProfile::HighWaterMarkCounter* _write_memory_usage_counter = nullptr;
215
    RuntimeProfile::HighWaterMarkCounter* _flush_memory_usage_counter = nullptr;
216
    RuntimeProfile::HighWaterMarkCounter* _max_tablet_memory_usage_counter = nullptr;
217
    RuntimeProfile::HighWaterMarkCounter* _max_tablet_write_memory_usage_counter = nullptr;
218
    RuntimeProfile::HighWaterMarkCounter* _max_tablet_flush_memory_usage_counter = nullptr;
219
    RuntimeProfile::Counter* _add_batch_timer = nullptr;
220
    RuntimeProfile::Counter* _write_block_timer = nullptr;
221
    RuntimeProfile::Counter* _incremental_open_timer = nullptr;
222
223
    // record rows received and filtered
224
    size_t _total_received_rows = 0;
225
    size_t _num_rows_filtered = 0;
226
};
227
228
class DeltaWriter;
229
230
// `StorageEngine` mixin for `BaseTabletsChannel`
231
class TabletsChannel final : public BaseTabletsChannel {
232
public:
233
    TabletsChannel(StorageEngine& engine, const TabletsChannelKey& key, const UniqueId& load_id,
234
                   bool is_high_priority, RuntimeProfile* profile);
235
236
    ~TabletsChannel() override;
237
238
    std::unique_ptr<BaseDeltaWriter> create_delta_writer(const WriteRequest& request) override;
239
240
    Status add_batch(const PTabletWriterAddBlockRequest& request,
241
                     PTabletWriterAddBlockResult* response) override;
242
243
    Status close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req,
244
                 PTabletWriterAddBlockResult* res, bool* finished) override;
245
246
    Status cancel() override;
247
248
private:
249
    void _init_profile(RuntimeProfile* profile) override;
250
251
    // deal with DeltaWriter commit_txn(), add tablet to list for return.
252
    void _commit_txn(DeltaWriter* writer, const PTabletWriterAddBlockRequest& req,
253
                     PTabletWriterAddBlockResult* res);
254
255
    StorageEngine& _engine;
256
    bool _write_single_replica = false;
257
    RuntimeProfile::Counter* _slave_replica_timer = nullptr;
258
};
259
260
} // namespace doris