Coverage Report

Created: 2026-03-12 14:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/autoinc_buffer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <list>
20
21
#include "common/cast_set.h"
22
#include "common/config.h"
23
#include "common/factory_creator.h"
24
#include "common/status.h"
25
#include "util/threadpool.h"
26
27
namespace doris {
28
#include "common/compile_check_begin.h"
29
30
class VOlapTableSink;
31
class OlapTableBlockConvertor;
32
33
// Hands out auto-increment ids one at a time from a FIFO of pre-fetched ranges.
// Each entry of `ids` is (next id to hand out, number of ids still left in that range).
struct AutoIncIDAllocator {
    // Returns the next unused id and shrinks the range it was taken from.
    //
    // Precondition: at least one id is still available. This is only asserted via
    // DCHECK; the function has no way to report exhaustion to the caller.
    int64_t next_id() {
        DCHECK(!ids.empty());
        // Drop every exhausted range at the head, not just one: zero-length ranges
        // (e.g. produced by insert_ids(start, 0)) may sit back to back.
        while (ids.front().second == 0) {
            ids.pop_front();
            DCHECK(!ids.empty());
        }
        auto& [next, remaining] = ids.front();
        --remaining;
        --total_count;
        return next++;
    }

    // Appends the range [start, start + length) to the back of the queue and
    // accounts for it in `total_count`.
    void insert_ids(int64_t start, size_t length) {
        total_count += length;
        ids.emplace_back(start, length);
    }

    // Sum of the remaining lengths of all queued ranges.
    size_t total_count {0};
    // Queued ranges, consumed front to back.
    std::list<std::pair<int64_t, size_t>> ids;
};
56
57
class AutoIncIDBuffer {
58
    ENABLE_FACTORY_CREATOR(AutoIncIDBuffer);
59
    // GenericReader::_MIN_BATCH_SIZE = 4064
60
    static constexpr size_t MIN_BATCH_SIZE = 4064;
61
62
public:
63
    // all public functions are thread safe
64
    AutoIncIDBuffer(int64_t _db_id, int64_t _table_id, int64_t column_id);
65
    void set_batch_size_at_least(size_t batch_size);
66
    Status sync_request_ids(size_t request_length, std::vector<std::pair<int64_t, size_t>>* result);
67
68
    struct AutoIncRange {
69
        int64_t start;
70
        size_t length;
71
72
4
        bool empty() const { return length == 0; }
73
74
4
        void consume(size_t l) {
75
4
            start += l;
76
4
            length -= l;
77
4
        }
78
    };
79
80
private:
81
4
    [[nodiscard]] size_t _prefetch_size() const {
82
4
        return _batch_size * config::auto_inc_prefetch_size_ratio;
83
4
    }
84
85
4
    [[nodiscard]] size_t _low_water_level_mark() const {
86
4
        return _batch_size * config::auto_inc_low_water_level_mark_size_ratio;
87
4
    };
88
89
    void _get_autoinc_ranges_from_buffers(size_t& request_length,
90
                                          std::vector<std::pair<int64_t, size_t>>* result);
91
92
    Status _launch_async_fetch_task(size_t length);
93
94
    Result<int64_t> _fetch_ids_from_fe(size_t length);
95
96
    std::atomic<size_t> _batch_size {MIN_BATCH_SIZE};
97
98
    int64_t _db_id;
99
    int64_t _table_id;
100
    int64_t _column_id;
101
102
    std::unique_ptr<ThreadPoolToken> _rpc_token;
103
    Status _rpc_status {Status::OK()};
104
105
    std::atomic<bool> _is_fetching {false};
106
107
    std::mutex _mutex;
108
109
    mutable std::mutex _latch;
110
    std::list<AutoIncRange> _buffers;
111
    size_t _current_volume {0};
112
};
113
114
class GlobalAutoIncBuffers {
115
public:
116
12
    static GlobalAutoIncBuffers* GetInstance() {
117
12
        static GlobalAutoIncBuffers buffers;
118
12
        return &buffers;
119
12
    }
120
121
2
    GlobalAutoIncBuffers() {
122
2
        static_cast<void>(ThreadPoolBuilder("AsyncFetchAutoIncIDExecutor")
123
2
                                  .set_min_threads(cast_set<int>(config::auto_inc_fetch_thread_num))
124
2
                                  .set_max_threads(cast_set<int>(config::auto_inc_fetch_thread_num))
125
2
                                  .set_max_queue_size(std::numeric_limits<int>::max())
126
2
                                  .build(&_fetch_autoinc_id_executor));
127
2
    }
128
1
    ~GlobalAutoIncBuffers() = default;
129
130
4
    std::unique_ptr<ThreadPoolToken> create_token() {
131
4
        return _fetch_autoinc_id_executor->new_token(ThreadPool::ExecutionMode::CONCURRENT);
132
4
    }
133
134
    std::shared_ptr<AutoIncIDBuffer> get_auto_inc_buffer(int64_t db_id, int64_t table_id,
135
8
                                                         int64_t column_id) {
136
8
        std::lock_guard<std::mutex> lock(_mutex);
137
8
        auto key = std::make_tuple(db_id, table_id, column_id);
138
8
        auto it = _buffers.find(key);
139
8
        if (it == _buffers.end()) {
140
4
            _buffers.emplace(key, AutoIncIDBuffer::create_shared(db_id, table_id, column_id));
141
4
        }
142
8
        return _buffers[{db_id, table_id, column_id}];
143
8
    }
144
145
private:
146
    std::unique_ptr<ThreadPool> _fetch_autoinc_id_executor;
147
    std::map<std::tuple<int64_t, int64_t, int64_t>, std::shared_ptr<AutoIncIDBuffer>> _buffers;
148
    std::mutex _mutex;
149
};
150
151
} // namespace doris
152
#include "common/compile_check_end.h"