Coverage Report

Created: 2026-03-14 20:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/delta_writer_v2_pool.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/delta_writer_v2_pool.h"
19
20
#include "load/delta_writer/delta_writer_v2.h"
21
#include "runtime/runtime_profile.h"
22
23
namespace doris {
24
#include "common/compile_check_begin.h"
25
class TExpr;
26
27
DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id, int num_use, DeltaWriterV2Pool* pool)
28
3
        : _load_id(load_id), _use_cnt(num_use), _pool(pool) {}
29
30
3
DeltaWriterV2Map::~DeltaWriterV2Map() = default;
31
32
std::shared_ptr<DeltaWriterV2> DeltaWriterV2Map::get_or_create(
33
3
        int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> creator) {
34
3
    std::lock_guard lock(_mutex);
35
3
    if (_map.contains(tablet_id)) {
36
1
        return _map.at(tablet_id);
37
1
    }
38
2
    std::shared_ptr<DeltaWriterV2> writer = creator();
39
2
    if (writer != nullptr) {
40
2
        _map[tablet_id] = writer;
41
2
    }
42
2
    return writer;
43
3
}
44
45
Status DeltaWriterV2Map::close(std::unordered_map<int64_t, int32_t>& segments_for_tablet,
46
4
                               RuntimeProfile* profile) {
47
4
    int num_use = --_use_cnt;
48
4
    if (num_use > 0) {
49
0
        LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " , use_cnt=" << num_use;
50
0
        return Status::OK();
51
0
    }
52
4
    if (_pool != nullptr) {
53
4
        _pool->erase(_load_id);
54
4
    }
55
4
    LOG(INFO) << "closing DeltaWriterV2Map, load_id=" << _load_id;
56
4
    std::lock_guard lock(_mutex);
57
4
    for (auto& [_, writer] : _map) {
58
1
        RETURN_IF_ERROR(writer->close());
59
1
    }
60
4
    LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id;
61
3
    for (auto& [tablet_id, writer] : _map) {
62
0
        int32_t num_segments;
63
0
        RETURN_IF_ERROR(writer->close_wait(num_segments, profile));
64
0
        segments_for_tablet[tablet_id] = num_segments;
65
0
    }
66
3
    return Status::OK();
67
3
}
68
69
0
void DeltaWriterV2Map::cancel(Status status) {
70
0
    int num_use = --_use_cnt;
71
0
    LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt=" << num_use;
72
0
    if (num_use == 0 && _pool != nullptr) {
73
0
        _pool->erase(_load_id);
74
0
    }
75
0
    std::lock_guard lock(_mutex);
76
0
    for (auto& [_, writer] : _map) {
77
0
        static_cast<void>(writer->cancel_with_status(status));
78
0
    }
79
0
}
80
81
2
DeltaWriterV2Pool::DeltaWriterV2Pool() = default;
82
83
2
DeltaWriterV2Pool::~DeltaWriterV2Pool() = default;
84
85
std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId load_id,
86
4
                                                                   int num_sink) {
87
4
    UniqueId id {load_id};
88
4
    std::lock_guard<std::mutex> lock(_mutex);
89
4
    std::shared_ptr<DeltaWriterV2Map> map = _pool[id];
90
4
    if (map) {
91
1
        return map;
92
1
    }
93
3
    map = std::make_shared<DeltaWriterV2Map>(id, num_sink, this);
94
3
    _pool[id] = map;
95
3
    return map;
96
4
}
97
98
4
void DeltaWriterV2Pool::erase(UniqueId load_id) {
99
4
    std::lock_guard<std::mutex> lock(_mutex);
100
    LOG(INFO) << "erasing DeltaWriterV2Map, load_id=" << load_id;
101
4
    _pool.erase(load_id);
102
4
}
103
104
} // namespace doris