be/src/exec/sink/delta_writer_v2_pool.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/delta_writer_v2_pool.h" |
19 | | |
20 | | #include "load/delta_writer/delta_writer_v2.h" |
21 | | #include "runtime/runtime_profile.h" |
22 | | |
23 | | namespace doris { |
24 | | #include "common/compile_check_begin.h" |
25 | | class TExpr; |
26 | | |
27 | | DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id, int num_use, DeltaWriterV2Pool* pool) |
28 | 3 | : _load_id(load_id), _use_cnt(num_use), _pool(pool) {} |
29 | | |
30 | 3 | DeltaWriterV2Map::~DeltaWriterV2Map() = default; |
31 | | |
32 | | std::shared_ptr<DeltaWriterV2> DeltaWriterV2Map::get_or_create( |
33 | 3 | int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> creator) { |
34 | 3 | std::lock_guard lock(_mutex); |
35 | 3 | if (_map.contains(tablet_id)) { |
36 | 1 | return _map.at(tablet_id); |
37 | 1 | } |
38 | 2 | std::shared_ptr<DeltaWriterV2> writer = creator(); |
39 | 2 | if (writer != nullptr) { |
40 | 2 | _map[tablet_id] = writer; |
41 | 2 | } |
42 | 2 | return writer; |
43 | 3 | } |
44 | | |
45 | | Status DeltaWriterV2Map::close(std::unordered_map<int64_t, int32_t>& segments_for_tablet, |
46 | 4 | RuntimeProfile* profile) { |
47 | 4 | int num_use = --_use_cnt; |
48 | 4 | if (num_use > 0) { |
49 | 0 | LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " , use_cnt=" << num_use; |
50 | 0 | return Status::OK(); |
51 | 0 | } |
52 | 4 | if (_pool != nullptr) { |
53 | 4 | _pool->erase(_load_id); |
54 | 4 | } |
55 | 4 | LOG(INFO) << "closing DeltaWriterV2Map, load_id=" << _load_id; |
56 | 4 | std::lock_guard lock(_mutex); |
57 | 4 | for (auto& [_, writer] : _map) { |
58 | 1 | RETURN_IF_ERROR(writer->close()); |
59 | 1 | } |
60 | 4 | LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id; |
61 | 3 | for (auto& [tablet_id, writer] : _map) { |
62 | 0 | int32_t num_segments; |
63 | 0 | RETURN_IF_ERROR(writer->close_wait(num_segments, profile)); |
64 | 0 | segments_for_tablet[tablet_id] = num_segments; |
65 | 0 | } |
66 | 3 | return Status::OK(); |
67 | 3 | } |
68 | | |
69 | 0 | void DeltaWriterV2Map::cancel(Status status) { |
70 | 0 | int num_use = --_use_cnt; |
71 | 0 | LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt=" << num_use; |
72 | 0 | if (num_use == 0 && _pool != nullptr) { |
73 | 0 | _pool->erase(_load_id); |
74 | 0 | } |
75 | 0 | std::lock_guard lock(_mutex); |
76 | 0 | for (auto& [_, writer] : _map) { |
77 | 0 | static_cast<void>(writer->cancel_with_status(status)); |
78 | 0 | } |
79 | 0 | } |
80 | | |
81 | 2 | DeltaWriterV2Pool::DeltaWriterV2Pool() = default; |
82 | | |
83 | 2 | DeltaWriterV2Pool::~DeltaWriterV2Pool() = default; |
84 | | |
85 | | std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId load_id, |
86 | 4 | int num_sink) { |
87 | 4 | UniqueId id {load_id}; |
88 | 4 | std::lock_guard<std::mutex> lock(_mutex); |
89 | 4 | std::shared_ptr<DeltaWriterV2Map> map = _pool[id]; |
90 | 4 | if (map) { |
91 | 1 | return map; |
92 | 1 | } |
93 | 3 | map = std::make_shared<DeltaWriterV2Map>(id, num_sink, this); |
94 | 3 | _pool[id] = map; |
95 | 3 | return map; |
96 | 4 | } |
97 | | |
98 | 4 | void DeltaWriterV2Pool::erase(UniqueId load_id) { |
99 | 4 | std::lock_guard<std::mutex> lock(_mutex); |
100 | | LOG(INFO) << "erasing DeltaWriterV2Map, load_id=" << load_id; |
101 | 4 | _pool.erase(load_id); |
102 | 4 | } |
103 | | |
104 | | } // namespace doris |