Coverage Report

Created: 2026-03-18 19:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/delta_writer_v2_pool.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <brpc/controller.h>
20
#include <bthread/types.h>
21
#include <butil/errno.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/Types_types.h>
25
#include <gen_cpp/internal_service.pb.h>
26
#include <gen_cpp/types.pb.h>
27
#include <glog/logging.h>
28
#include <google/protobuf/stubs/callback.h>
29
#include <stddef.h>
30
#include <stdint.h>
31
32
#include <atomic>
33
// IWYU pragma: no_include <bits/chrono.h>
34
35
#include <chrono> // IWYU pragma: keep
36
#include <functional>
37
#include <initializer_list>
38
#include <map>
39
#include <memory>
40
#include <mutex>
41
#include <ostream>
42
#include <queue>
43
#include <set>
44
#include <string>
45
#include <unordered_map>
46
#include <unordered_set>
47
#include <utility>
48
#include <vector>
49
50
#include "common/config.h"
51
#include "util/uid_util.h"
52
53
namespace doris {
54
#include "common/compile_check_begin.h"
55
56
class DeltaWriterV2;
57
class RuntimeProfile;
58
59
class DeltaWriterV2Pool;
60
61
// Collection of DeltaWriterV2 instances for a single load, keyed by tablet id.
// Holds a mutex, an atomic use counter and an optional pointer to a
// DeltaWriterV2Pool.
class DeltaWriterV2Map {
62
public:
63
    // load_id: id of the load this map belongs to.
    // num_use: defaults to 1; presumably the initial value of _use_cnt
    //          (number of users sharing this map) -- confirm in the .cpp.
    // pool:    optional pool pointer, nullptr by default.
    DeltaWriterV2Map(UniqueId load_id, int num_use = 1, DeltaWriterV2Pool* pool = nullptr);
64
65
    ~DeltaWriterV2Map();
66
67
    // Get or create the delta writer for the given tablet; memory is managed by
    // DeltaWriterV2Map. `creator` is a factory that returns a
    // std::unique_ptr<DeltaWriterV2>; the result is handed back as a shared_ptr.
68
    std::shared_ptr<DeltaWriterV2> get_or_create(
69
            int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> creator);
70
71
    // Close all delta writers in this DeltaWriterV2Map if there are no other users.
    // segments_for_tablet is taken by non-const reference (int64_t -> int32_t map);
    // NOTE(review): whether it is read or filled is not visible from this header.
    // profile is optional and defaults to nullptr.
72
    Status close(std::unordered_map<int64_t, int32_t>& segments_for_tablet,
73
                 RuntimeProfile* profile = nullptr);
74
75
    // Cancel all delta writers in this DeltaWriterV2Map.
76
    void cancel(Status status);
77
78
1
    // Number of entries currently in _map.
    // NOTE(review): reads _map without taking _mutex -- confirm callers do not
    // race with get_or_create().
    size_t size() const { return _map.size(); }
79
80
private:
81
    // Id of the load, as passed to the constructor's load_id parameter.
    UniqueId _load_id;
82
    // NOTE(review): presumably guards _map -- confirm in the .cpp.
    std::mutex _mutex;
83
    // Delta writers keyed by int64_t (tablet id, per get_or_create's parameter).
    std::unordered_map<int64_t, std::shared_ptr<DeltaWriterV2>> _map;
84
    // Atomic use counter; presumably seeded from num_use -- confirm in the .cpp.
    std::atomic<int> _use_cnt;
85
    // Raw pointer to a pool, nullptr by default; ownership is not expressed here.
    DeltaWriterV2Pool* _pool = nullptr;
86
};
87
88
// Registry of DeltaWriterV2Map instances keyed by load id (UniqueId).
class DeltaWriterV2Pool {
89
public:
90
    DeltaWriterV2Pool();
91
92
    ~DeltaWriterV2Pool();
93
94
    // Returns a shared DeltaWriterV2Map for load_id. The name suggests an
    // existing entry is reused and a new one is created otherwise -- confirm
    // in the .cpp. num_sink defaults to 1.
    // Note: load_id is a PUniqueId here, while _pool and erase() use UniqueId.
    std::shared_ptr<DeltaWriterV2Map> get_or_create(PUniqueId load_id, int num_sink = 1);
95
96
    // Takes the load id as a UniqueId; presumably removes that load's entry
    // from _pool -- confirm in the .cpp.
    void erase(UniqueId load_id);
97
98
4
    // Number of maps currently held in _pool, read while holding _mutex.
    // Not const: it locks _mutex, which is not declared mutable.
    size_t size() {
99
4
        std::lock_guard<std::mutex> lock(_mutex);
100
4
        return _pool.size();
101
4
    }
102
103
private:
104
    // Held by size() while reading _pool.
    std::mutex _mutex;
105
    // Load id -> shared DeltaWriterV2Map.
    std::unordered_map<UniqueId, std::shared_ptr<DeltaWriterV2Map>> _pool;
106
};
107
108
} // namespace doris
109
110
#include "common/compile_check_end.h"