be/src/exec/sink/load_stream_map_pool.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/load_stream_map_pool.h" |
19 | | |
20 | | #include "util/debug_points.h" |
21 | | |
22 | | namespace doris { |
23 | | #include "common/compile_check_begin.h" |
24 | | class TExpr; |
25 | | |
26 | | LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use, |
27 | | LoadStreamMapPool* pool) |
28 | 15 | : _load_id(load_id), |
29 | 15 | _src_id(src_id), |
30 | 15 | _num_streams(num_streams), |
31 | 15 | _use_cnt(num_use), |
32 | 15 | _num_incremental_streams(0), |
33 | 15 | _pool(pool), |
34 | 15 | _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()), |
35 | 15 | _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) { |
36 | 15 | DCHECK(num_streams > 0) << "stream num should be greater than 0"; |
37 | 15 | DCHECK(num_use > 0) << "use num should be greater than 0"; |
38 | 15 | } |
39 | | |
40 | 37 | std::shared_ptr<LoadStreamStubs> LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) { |
41 | 37 | std::lock_guard<std::mutex> lock(_mutex); |
42 | 37 | std::shared_ptr<LoadStreamStubs> streams = _streams_for_node[dst_id]; |
43 | 37 | if (streams != nullptr) { |
44 | 1 | return streams; |
45 | 1 | } |
46 | 36 | if (incremental) { |
47 | 0 | _num_incremental_streams.fetch_add(1); |
48 | 0 | } |
49 | 36 | streams = std::make_shared<LoadStreamStubs>(_num_streams, _load_id, _src_id, |
50 | 36 | _tablet_schema_for_index, |
51 | 36 | _enable_unique_mow_for_index, incremental); |
52 | 36 | _streams_for_node[dst_id] = streams; |
53 | 36 | return streams; |
54 | 37 | } |
55 | | |
56 | 0 | std::shared_ptr<LoadStreamStubs> LoadStreamMap::at(int64_t dst_id) { |
57 | 0 | std::lock_guard<std::mutex> lock(_mutex); |
58 | 0 | return _streams_for_node.at(dst_id); |
59 | 0 | } |
60 | | |
61 | 0 | bool LoadStreamMap::contains(int64_t dst_id) { |
62 | 0 | std::lock_guard<std::mutex> lock(_mutex); |
63 | 0 | return _streams_for_node.contains(dst_id); |
64 | 0 | } |
65 | | |
66 | 13 | void LoadStreamMap::for_each(std::function<void(int64_t, LoadStreamStubs&)> fn) { |
67 | 13 | decltype(_streams_for_node) snapshot; |
68 | 13 | { |
69 | 13 | std::lock_guard<std::mutex> lock(_mutex); |
70 | 13 | snapshot = _streams_for_node; |
71 | 13 | } |
72 | 33 | for (auto& [dst_id, streams] : snapshot) { |
73 | 33 | fn(dst_id, *streams); |
74 | 33 | } |
75 | 13 | } |
76 | | |
77 | 0 | Status LoadStreamMap::for_each_st(std::function<Status(int64_t, LoadStreamStubs&)> fn) { |
78 | 0 | decltype(_streams_for_node) snapshot; |
79 | 0 | { |
80 | 0 | std::lock_guard<std::mutex> lock(_mutex); |
81 | 0 | snapshot = _streams_for_node; |
82 | 0 | } |
83 | 0 | Status status = Status::OK(); |
84 | 0 | for (auto& [dst_id, streams] : snapshot) { |
85 | 0 | auto st = fn(dst_id, *streams); |
86 | 0 | if (!st.ok() && status.ok()) { |
87 | 0 | status = st; |
88 | 0 | } |
89 | 0 | } |
90 | 0 | return status; |
91 | 0 | } |
92 | | |
93 | | void LoadStreamMap::save_tablets_to_commit(int64_t dst_id, |
94 | 0 | const std::vector<PTabletID>& tablets_to_commit) { |
95 | 0 | std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex); |
96 | 0 | auto& tablets = _tablets_to_commit[dst_id]; |
97 | 0 | for (const auto& tablet : tablets_to_commit) { |
98 | 0 | tablets.emplace(tablet.tablet_id(), tablet); |
99 | 0 | } |
100 | 0 | } |
101 | | |
102 | 3 | bool LoadStreamMap::release() { |
103 | 3 | int num_use = --_use_cnt; |
104 | 3 | if (num_use == 0) { |
105 | 2 | LOG(INFO) << "releasing streams, load_id=" << _load_id; |
106 | 2 | _pool->erase(_load_id); |
107 | 2 | return true; |
108 | 2 | } |
109 | 3 | LOG(INFO) << "keeping streams, load_id=" << _load_id << ", use_cnt=" << num_use; |
110 | 1 | return false; |
111 | 3 | } |
112 | | |
113 | 0 | void LoadStreamMap::close_load(bool incremental) { |
114 | 0 | for (auto& [dst_id, streams] : _streams_for_node) { |
115 | 0 | if (streams->is_incremental() != incremental) { |
116 | 0 | continue; |
117 | 0 | } |
118 | 0 | std::vector<PTabletID> tablets_to_commit; |
119 | 0 | const auto& tablets = _tablets_to_commit[dst_id]; |
120 | 0 | tablets_to_commit.reserve(tablets.size()); |
121 | 0 | for (const auto& [tablet_id, tablet] : tablets) { |
122 | 0 | tablets_to_commit.push_back(tablet); |
123 | 0 | tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]); |
124 | 0 | } |
125 | 0 | auto st = streams->close_load(tablets_to_commit, _num_incremental_streams.load()); |
126 | 0 | if (!st.ok()) { |
127 | 0 | LOG(WARNING) << "close_load for " << (incremental ? "incremental" : "non-incremental") |
128 | 0 | << " streams failed: " << st << ", load_id=" << _load_id; |
129 | 0 | } |
130 | 0 | } |
131 | 0 | } |
132 | | |
133 | 1 | LoadStreamMapPool::LoadStreamMapPool() = default; |
134 | | |
135 | 1 | LoadStreamMapPool::~LoadStreamMapPool() = default; |
136 | | std::shared_ptr<LoadStreamMap> LoadStreamMapPool::get_or_create(UniqueId load_id, int64_t src_id, |
137 | 3 | int num_streams, int num_use) { |
138 | 3 | std::lock_guard<std::mutex> lock(_mutex); |
139 | 3 | std::shared_ptr<LoadStreamMap> streams = _pool[load_id]; |
140 | 3 | if (streams != nullptr) { |
141 | 1 | return streams; |
142 | 1 | } |
143 | 2 | streams = std::make_shared<LoadStreamMap>(load_id, src_id, num_streams, num_use, this); |
144 | 2 | _pool[load_id] = streams; |
145 | 2 | return streams; |
146 | 3 | } |
147 | | |
148 | 2 | void LoadStreamMapPool::erase(UniqueId load_id) { |
149 | 2 | std::lock_guard<std::mutex> lock(_mutex); |
150 | 2 | _pool.erase(load_id); |
151 | 2 | } |
152 | | |
153 | | } // namespace doris |