Coverage Report

Created: 2026-04-16 15:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/delta_writer_v2_pool.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/delta_writer_v2_pool.h"
19
20
#include "load/delta_writer/delta_writer_v2.h"
21
#include "runtime/runtime_profile.h"
22
23
namespace doris {
24
class TExpr;
25
26
// Constructs the writer map for one load.
//
// Stores the load id, seeds the use counter with `num_use`, and keeps a
// non-owning pointer to `pool` (which may be null; callers of `_pool` in this
// file check for that before dereferencing).
DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id, int num_use, DeltaWriterV2Pool* pool)
        : _load_id(load_id),
          _use_cnt(num_use),
          _pool(pool) {}
28
29
3
// Compiler-generated destructor, defined out of line in this translation unit.
// NOTE(review): presumably kept out of the header so that DeltaWriterV2 only
// needs to be a complete type here — confirm against the header.
DeltaWriterV2Map::~DeltaWriterV2Map() = default;
30
31
std::shared_ptr<DeltaWriterV2> DeltaWriterV2Map::get_or_create(
32
3
        int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> creator) {
33
3
    std::lock_guard lock(_mutex);
34
3
    if (_map.contains(tablet_id)) {
35
1
        return _map.at(tablet_id);
36
1
    }
37
2
    std::shared_ptr<DeltaWriterV2> writer = creator();
38
2
    if (writer != nullptr) {
39
2
        _map[tablet_id] = writer;
40
2
    }
41
2
    return writer;
42
3
}
43
44
// Releases one use of this map and, once the use counter is no longer
// positive, closes every writer it holds.
//
// @param segments_for_tablet  Output map; for each tablet whose writer finished
//                             `close_wait`, receives the segment count that
//                             call reported.
// @param profile              Forwarded unchanged to each writer's `close_wait`.
// @return `Status::OK()` when other users still hold the map (nothing is
//         closed in that case) or when every writer closed successfully;
//         otherwise the failing status propagated by RETURN_IF_ERROR from the
//         first failing `close` / `close_wait`.
//
// Sequence once the counter is no longer positive:
//   1. remove this map from the owning pool (when a pool was supplied),
//   2. under `_mutex`, call `close()` on every writer,
//   3. still under `_mutex`, call `close_wait()` on every writer and record
//      its segment count.
Status DeltaWriterV2Map::close(std::unordered_map<int64_t, int32_t>& segments_for_tablet,
                               RuntimeProfile* profile) {
    int num_use = --_use_cnt;
    if (num_use > 0) {
        LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " , use_cnt=" << num_use;
        return Status::OK();
    }
    if (_pool != nullptr) {
        _pool->erase(_load_id);
    }
    LOG(INFO) << "closing DeltaWriterV2Map, load_id=" << _load_id;
    std::lock_guard lock(_mutex);
    // First pass: issue close() on all writers before waiting on any of them.
    for (auto& [_, writer] : _map) {
        RETURN_IF_ERROR(writer->close());
    }
    LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id;
    // Second pass: wait for each writer and collect its segment count.
    for (auto& [tablet_id, writer] : _map) {
        // Initialized so the value stored below is never indeterminate, even if
        // close_wait() were to succeed without assigning its out-parameter.
        int32_t num_segments = 0;
        RETURN_IF_ERROR(writer->close_wait(num_segments, profile));
        segments_for_tablet[tablet_id] = num_segments;
    }
    return Status::OK();
}
67
68
0
// Releases one use of this map and cancels every writer it holds with `status`.
//
// The map is removed from the owning pool only when the use counter reaches
// exactly zero and a pool was supplied. The writers are cancelled on every
// call regardless of the counter value; the status returned by each writer's
// `cancel_with_status` is deliberately discarded.
void DeltaWriterV2Map::cancel(Status status) {
    const int remaining = --_use_cnt;
    LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt=" << remaining;

    if (_pool != nullptr && remaining == 0) {
        _pool->erase(_load_id);
    }

    std::lock_guard guard(_mutex);
    for (auto& entry : _map) {
        static_cast<void>(entry.second->cancel_with_status(status));
    }
}
79
80
2
// Compiler-generated default constructor, defined out of line in this
// translation unit; members take their default-initialized state.
DeltaWriterV2Pool::DeltaWriterV2Pool() = default;
81
82
2
// Compiler-generated destructor, defined out of line in this translation unit.
DeltaWriterV2Pool::~DeltaWriterV2Pool() = default;
83
84
std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId load_id,
85
4
                                                                   int num_sink) {
86
4
    UniqueId id {load_id};
87
4
    std::lock_guard<std::mutex> lock(_mutex);
88
4
    std::shared_ptr<DeltaWriterV2Map> map = _pool[id];
89
4
    if (map) {
90
1
        return map;
91
1
    }
92
3
    map = std::make_shared<DeltaWriterV2Map>(id, num_sink, this);
93
3
    _pool[id] = map;
94
3
    return map;
95
4
}
96
97
4
// Removes the entry for `load_id` from the pool, if present.
//
// The log line and the removal both happen while holding `_mutex`.
void DeltaWriterV2Pool::erase(UniqueId load_id) {
    std::lock_guard guard(_mutex);
    LOG(INFO) << "erasing DeltaWriterV2Map, load_id=" << load_id;
    _pool.erase(load_id);
}
102
103
} // namespace doris