Coverage Report

Created: 2024-11-21 23:45

/root/doris/be/src/runtime/load_stream.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bthread/mutex.h>
21
#include <gen_cpp/olap_common.pb.h>
22
23
#include <condition_variable>
24
#include <memory>
25
#include <mutex>
26
#include <unordered_map>
27
#include <utility>
28
29
#include "brpc/stream.h"
30
#include "butil/iobuf.h"
31
#include "common/compiler_util.h" // IWYU pragma: keep
32
#include "common/status.h"
33
#include "runtime/load_stream_writer.h"
34
#include "util/runtime_profile.h"
35
36
namespace doris {
37
38
class LoadStreamMgr;
39
class ThreadPoolToken;
40
class OlapTableSchemaParam;
41
42
// origin_segid(index) -> new_segid(value in vector)
43
using SegIdMapping = std::vector<uint32_t>;
44
using FailedTablets = std::vector<std::pair<int64_t, Status>>;
45
class TabletStream {
46
public:
47
    TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr,
48
                 RuntimeProfile* profile);
49
50
    Status init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
51
                int64_t partition_id);
52
53
    Status append_data(const PStreamHeader& header, butil::IOBuf* data);
54
    Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
55
16
    void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }
56
0
    void disable_num_segments_check() { _check_num_segments = false; }
57
    void pre_close();
58
    Status close();
59
14
    int64_t id() const { return _id; }
60
61
    friend std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_stream);
62
63
private:
64
    Status _run_in_heavy_work_pool(std::function<Status()> fn);
65
66
    int64_t _id;
67
    LoadStreamWriterSharedPtr _load_stream_writer;
68
    std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
69
    std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping;
70
    std::atomic<uint32_t> _next_segid;
71
    int64_t _num_segments = 0;
72
    bool _check_num_segments = true;
73
    bthread::Mutex _lock;
74
    AtomicStatus _status;
75
    PUniqueId _load_id;
76
    int64_t _txn_id;
77
    RuntimeProfile* _profile = nullptr;
78
    RuntimeProfile::Counter* _append_data_timer = nullptr;
79
    RuntimeProfile::Counter* _add_segment_timer = nullptr;
80
    RuntimeProfile::Counter* _close_wait_timer = nullptr;
81
    LoadStreamMgr* _load_stream_mgr = nullptr;
82
};
83
84
using TabletStreamSharedPtr = std::shared_ptr<TabletStream>;
85
86
class IndexStream {
87
public:
88
    IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
89
                std::shared_ptr<OlapTableSchemaParam> schema, LoadStreamMgr* load_stream_mgr,
90
                RuntimeProfile* profile);
91
92
    Status append_data(const PStreamHeader& header, butil::IOBuf* data);
93
94
    void close(const std::vector<PTabletID>& tablets_to_commit,
95
               std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);
96
97
private:
98
    void _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
99
                             int64_t partition_id);
100
101
private:
102
    int64_t _id;
103
    std::unordered_map<int64_t /*tabletid*/, TabletStreamSharedPtr> _tablet_streams_map;
104
    bthread::Mutex _lock;
105
    PUniqueId _load_id;
106
    int64_t _txn_id;
107
    std::shared_ptr<OlapTableSchemaParam> _schema;
108
    std::unordered_map<int64_t, int64_t> _tablet_partitions;
109
    RuntimeProfile* _profile = nullptr;
110
    RuntimeProfile::Counter* _append_data_timer = nullptr;
111
    RuntimeProfile::Counter* _close_wait_timer = nullptr;
112
    LoadStreamMgr* _load_stream_mgr = nullptr;
113
};
114
using IndexStreamSharedPtr = std::shared_ptr<IndexStream>;
115
116
using StreamId = brpc::StreamId;
117
class LoadStream : public brpc::StreamInputHandler {
118
public:
119
    LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile);
120
    ~LoadStream() override;
121
122
    Status init(const POpenLoadStreamRequest* request);
123
124
16
    void add_source(int64_t src_id) {
125
16
        std::lock_guard lock_guard(_lock);
126
16
        _open_streams[src_id]++;
127
16
        if (_is_incremental) {
128
0
            _total_streams++;
129
0
        }
130
16
    }
131
132
    void close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
133
               std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);
134
135
    // callbacks called by brpc
136
    int on_received_messages(StreamId id, butil::IOBuf* const messages[], size_t size) override;
137
    void on_idle_timeout(StreamId id) override;
138
    void on_closed(StreamId id) override;
139
140
    friend std::ostream& operator<<(std::ostream& ostr, const LoadStream& load_stream);
141
142
private:
143
    void _parse_header(butil::IOBuf* const message, PStreamHeader& hdr);
144
    void _dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data);
145
    Status _append_data(const PStreamHeader& header, butil::IOBuf* data);
146
147
    void _report_result(StreamId stream, const Status& status,
148
                        const std::vector<int64_t>& success_tablet_ids,
149
                        const FailedTablets& failed_tablets, bool eos);
150
    void _report_schema(StreamId stream, const PStreamHeader& hdr);
151
152
    // report failure for one message
153
20
    void _report_failure(StreamId stream, const Status& status, const PStreamHeader& header) {
154
20
        FailedTablets failed_tablets;
155
20
        if (header.has_tablet_id()) {
156
4
            failed_tablets.emplace_back(header.tablet_id(), status);
157
4
        }
158
20
        _report_result(stream, status, {}, failed_tablets, false);
159
20
    }
160
161
    Status _write_stream(StreamId stream, butil::IOBuf& buf);
162
163
private:
164
    PUniqueId _load_id;
165
    std::unordered_map<int64_t, IndexStreamSharedPtr> _index_streams_map;
166
    int32_t _total_streams = 0;
167
    int32_t _close_load_cnt = 0;
168
    std::atomic<int32_t> _close_rpc_cnt = 0;
169
    std::vector<PTabletID> _tablets_to_commit;
170
    bthread::Mutex _lock;
171
    std::unordered_map<int64_t, int32_t> _open_streams;
172
    int64_t _txn_id = 0;
173
    std::shared_ptr<OlapTableSchemaParam> _schema;
174
    bool _enable_profile = false;
175
    std::unique_ptr<RuntimeProfile> _profile;
176
    RuntimeProfile::Counter* _append_data_timer = nullptr;
177
    RuntimeProfile::Counter* _close_wait_timer = nullptr;
178
    LoadStreamMgr* _load_stream_mgr = nullptr;
179
    QueryThreadContext _query_thread_context;
180
    bool _is_incremental = false;
181
};
182
183
using LoadStreamPtr = std::unique_ptr<LoadStream>;
184
185
} // namespace doris