Coverage Report

Created: 2026-03-12 17:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/function/dictionary_factory.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <gen_cpp/BackendService_types.h>
19
20
#include <mutex>
21
22
#include "common/config.h"
23
#include "common/logging.h"
24
#include "exprs/function/dictionary.h"
25
26
namespace doris {
27
class MemTrackerLimiter;
28
}
29
namespace doris {
30
31
class DictionaryFactory : private boost::noncopyable {
32
public:
33
    DictionaryFactory();
34
    ~DictionaryFactory();
35
36
    // Returns nullptr if failed
37
97
    std::shared_ptr<const IDictionary> get(int64_t dict_id, int64_t version_id) {
38
97
        std::unique_lock lc(_mutex);
39
        // dict_id and version_id must match
40
97
        if (_dict_id_to_dict_map.contains(dict_id) &&
41
97
            _dict_id_to_version_id_map[dict_id] == version_id) {
42
95
            return _dict_id_to_dict_map[dict_id];
43
95
        }
44
2
        return nullptr;
45
97
    }
46
47
98
    Status refresh_dict(int64_t dict_id, int64_t version_id, DictionaryPtr dict) {
48
98
        VLOG_DEBUG << "DictionaryFactory refresh dictionary"
49
0
                   << " dict_id: " << dict_id << " version_id: " << version_id
50
0
                   << " dict name: " << dict->dict_name();
51
98
        std::unique_lock lc(_mutex);
52
98
        dict->_mem_tracker = _mem_tracker;
53
98
        _refreshing_dict_map[dict_id] = std::make_pair(version_id, dict);
54
        // Set the mem tracker for the dictionary
55
98
        return Status::OK();
56
98
    }
57
58
6
    Status abort_refresh_dict(int64_t dict_id, int64_t version_id) {
59
6
        VLOG_DEBUG << "DictionaryFactory abort refresh dictionary"
60
0
                   << " dict_id: " << dict_id << " version_id: " << version_id;
61
6
        std::unique_lock lc(_mutex);
62
6
        if (!_refreshing_dict_map.contains(dict_id)) {
63
            // FE will abort all, including succeed and failed.
64
1
            return Status::OK();
65
1
        }
66
5
        auto [refresh_version_id, dict] = _refreshing_dict_map[dict_id];
67
5
        if (version_id != refresh_version_id) {
68
1
            return Status::InvalidArgument(
69
1
                    "Version ID is not equal to the refreshing version ID. {} : {}", version_id,
70
1
                    refresh_version_id);
71
1
        }
72
4
        _refreshing_dict_map.erase(dict_id);
73
4
        return Status::OK();
74
5
    }
75
76
94
    Status commit_refresh_dict(int64_t dict_id, int64_t version_id) {
77
94
        VLOG_DEBUG << "DictionaryFactory commit refresh dictionary"
78
0
                   << " dict_id: " << dict_id << " version_id: " << version_id;
79
94
        std::unique_lock lc(_mutex);
80
94
        if (!_refreshing_dict_map.contains(dict_id)) {
81
1
            return Status::InvalidArgument("Dictionary is not refreshing dict_id: {}", dict_id);
82
1
        }
83
93
        auto [refresh_version_id, dict] = _refreshing_dict_map[dict_id];
84
93
        if (version_id != refresh_version_id) {
85
2
            return Status::InvalidArgument(
86
2
                    "Version ID is not equal to the refreshing version ID. {} : {}", version_id,
87
2
                    refresh_version_id);
88
2
        }
89
91
        {
90
            // commit the dictionary
91
91
            if (_dict_id_to_version_id_map.contains(dict_id)) {
92
                // check version_id
93
13
                if (version_id <= _dict_id_to_version_id_map[dict_id]) {
94
1
                    LOG_WARNING(
95
1
                            "DictionaryFactory Failed to commit dictionary because version ID "
96
1
                            "is not greater than the existing version ID")
97
1
                            .tag("dict_id", dict_id)
98
1
                            .tag("version_id", version_id)
99
1
                            .tag("dict name", dict->dict_name())
100
1
                            .tag("existing version ID", _dict_id_to_version_id_map[dict_id]);
101
1
                    return Status::InvalidArgument(
102
1
                            "Version ID is not greater than the existing version ID for the "
103
1
                            "dictionary. {} : {}",
104
1
                            version_id, _dict_id_to_version_id_map[dict_id]);
105
1
                }
106
13
            }
107
90
            LOG_INFO("DictionaryFactory Successfully commit dictionary")
108
90
                    .tag("dict_id", dict_id)
109
90
                    .tag("version_id", version_id)
110
90
                    .tag("dict name", dict->dict_name());
111
90
            _dict_id_to_dict_map[dict_id] = dict;
112
90
            _dict_id_to_version_id_map[dict_id] = version_id;
113
90
            _refreshing_dict_map.erase(dict_id);
114
90
        }
115
0
        return Status::OK();
116
91
    }
117
118
3
    Status delete_dict(int64_t dict_id) {
119
3
        VLOG_DEBUG << "DictionaryFactory delete dictionary, dict_id: " << dict_id;
120
3
        std::unique_lock lc(_mutex);
121
3
        if (!_dict_id_to_dict_map.contains(dict_id)) {
122
0
            LOG_WARNING("DictionaryFactory Failed to delete dictionary").tag("dict_id", dict_id);
123
0
            return Status::OK();
124
0
        }
125
3
        auto dict = _dict_id_to_dict_map[dict_id];
126
3
        LOG_INFO("DictionaryFactory Successfully delete dictionary")
127
3
                .tag("dict_id", dict_id)
128
3
                .tag("dict name", dict->dict_name());
129
3
        _dict_id_to_dict_map.erase(dict_id);
130
3
        _dict_id_to_version_id_map.erase(dict_id);
131
3
        return Status::OK();
132
3
    }
133
134
272
    std::shared_ptr<MemTrackerLimiter> mem_tracker() const { return _mem_tracker; }
135
136
    void get_dictionary_status(std::vector<TDictionaryStatus>& result,
137
                               std::vector<int64_t> dict_ids);
138
139
private:
140
    std::map<int64_t, DictionaryPtr> _dict_id_to_dict_map;
141
    std::map<int64_t, int64_t> _dict_id_to_version_id_map;
142
143
    std::map<int64_t, std::pair<int64_t, DictionaryPtr>>
144
            _refreshing_dict_map; // dict_id -> (version_id, dict)
145
146
    std::shared_mutex _mutex;
147
148
    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
149
};
150
151
} // namespace doris