Coverage Report

Created: 2026-04-13 11:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/autoinc_buffer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <list>
20
21
#include "common/cast_set.h"
22
#include "common/config.h"
23
#include "common/factory_creator.h"
24
#include "common/status.h"
25
#include "util/threadpool.h"
26
27
namespace doris {
28
29
class VOlapTableSink;
30
class OlapTableBlockConvertor;
31
32
struct AutoIncIDAllocator {
33
65.4k
    int64_t next_id() {
34
65.4k
        DCHECK(!ids.empty());
35
65.4k
        if (ids.front().second > 0) {
36
65.4k
            --ids.front().second;
37
65.4k
            --total_count;
38
65.4k
            return ids.front().first++;
39
65.4k
        }
40
12
        ids.pop_front();
41
12
        DCHECK(!ids.empty() && ids.front().second > 0);
42
12
        --ids.front().second;
43
12
        --total_count;
44
12
        return ids.front().first++;
45
65.4k
    }
46
47
188
    void insert_ids(int64_t start, size_t length) {
48
188
        total_count += length;
49
188
        ids.emplace_back(start, length);
50
188
    }
51
52
    size_t total_count {0};
53
    std::list<std::pair<int64_t, size_t>> ids;
54
};
55
56
class AutoIncIDBuffer {
57
    ENABLE_FACTORY_CREATOR(AutoIncIDBuffer);
58
    // GenericReader::_MIN_BATCH_SIZE = 4064
59
    static constexpr size_t MIN_BATCH_SIZE = 4064;
60
61
public:
62
    // all public functions are thread safe
63
    AutoIncIDBuffer(int64_t _db_id, int64_t _table_id, int64_t column_id);
64
    void set_batch_size_at_least(size_t batch_size);
65
    Status sync_request_ids(size_t request_length, std::vector<std::pair<int64_t, size_t>>* result);
66
67
    struct AutoIncRange {
68
        int64_t start;
69
        size_t length;
70
71
188
        bool empty() const { return length == 0; }
72
73
188
        void consume(size_t l) {
74
188
            start += l;
75
188
            length -= l;
76
188
        }
77
    };
78
79
private:
80
85
    [[nodiscard]] size_t _prefetch_size() const {
81
85
        return _batch_size * config::auto_inc_prefetch_size_ratio;
82
85
    }
83
84
193
    [[nodiscard]] size_t _low_water_level_mark() const {
85
193
        return _batch_size * config::auto_inc_low_water_level_mark_size_ratio;
86
193
    };
87
88
    void _get_autoinc_ranges_from_buffers(size_t& request_length,
89
                                          std::vector<std::pair<int64_t, size_t>>* result);
90
91
    Status _launch_async_fetch_task(size_t length);
92
93
    Result<int64_t> _fetch_ids_from_fe(size_t length);
94
95
    std::atomic<size_t> _batch_size {MIN_BATCH_SIZE};
96
97
    int64_t _db_id;
98
    int64_t _table_id;
99
    int64_t _column_id;
100
101
    std::unique_ptr<ThreadPoolToken> _rpc_token;
102
    Status _rpc_status {Status::OK()};
103
104
    std::atomic<bool> _is_fetching {false};
105
106
    std::mutex _mutex;
107
108
    mutable std::mutex _latch;
109
    std::list<AutoIncRange> _buffers;
110
    size_t _current_volume {0};
111
};
112
113
class GlobalAutoIncBuffers {
114
public:
115
462
    static GlobalAutoIncBuffers* GetInstance() {
116
462
        static GlobalAutoIncBuffers buffers;
117
462
        return &buffers;
118
462
    }
119
120
3
    GlobalAutoIncBuffers() {
121
3
        static_cast<void>(ThreadPoolBuilder("AsyncFetchAutoIncIDExecutor")
122
3
                                  .set_min_threads(cast_set<int>(config::auto_inc_fetch_thread_num))
123
3
                                  .set_max_threads(cast_set<int>(config::auto_inc_fetch_thread_num))
124
3
                                  .set_max_queue_size(std::numeric_limits<int>::max())
125
3
                                  .build(&_fetch_autoinc_id_executor));
126
3
    }
127
1
    ~GlobalAutoIncBuffers() = default;
128
129
108
    std::unique_ptr<ThreadPoolToken> create_token() {
130
108
        return _fetch_autoinc_id_executor->new_token(ThreadPool::ExecutionMode::CONCURRENT);
131
108
    }
132
133
    std::shared_ptr<AutoIncIDBuffer> get_auto_inc_buffer(int64_t db_id, int64_t table_id,
134
355
                                                         int64_t column_id) {
135
355
        std::lock_guard<std::mutex> lock(_mutex);
136
355
        auto key = std::make_tuple(db_id, table_id, column_id);
137
355
        auto it = _buffers.find(key);
138
355
        if (it == _buffers.end()) {
139
108
            _buffers.emplace(key, AutoIncIDBuffer::create_shared(db_id, table_id, column_id));
140
108
        }
141
355
        return _buffers[{db_id, table_id, column_id}];
142
355
    }
143
144
private:
145
    std::unique_ptr<ThreadPool> _fetch_autoinc_id_executor;
146
    std::map<std::tuple<int64_t, int64_t, int64_t>, std::shared_ptr<AutoIncIDBuffer>> _buffers;
147
    std::mutex _mutex;
148
};
149
150
} // namespace doris