/root/doris/be/src/olap/options.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
#include "olap/options.h"

#include <ctype.h>
#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
#include <rapidjson/rapidjson.h>
#include <stdlib.h>

#include <algorithm>
#include <limits>
#include <memory>
#include <ostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "gutil/strings/split.h"
#include "gutil/strings/strip.h"
#include "io/fs/local_file_system.h"
#include "olap/olap_define.h"
#include "olap/utils.h"
#include "util/path_util.h"
#include "util/string_util.h"
40 | | |
41 | | namespace doris { |
42 | | using namespace ErrorCode; |
43 | | |
44 | | using std::string; |
45 | | using std::vector; |
46 | | |
// Upper-cased property names and medium values recognised when parsing a
// store path entry. They are compared against upper-cased user input, so
// matching is case-insensitive. Declared const: nothing in this file is
// supposed to modify them.
static const std::string CAPACITY_UC = "CAPACITY";
static const std::string MEDIUM_UC = "MEDIUM";
static const std::string SSD_UC = "SSD";
static const std::string HDD_UC = "HDD";
static const std::string REMOTE_CACHE_UC = "REMOTE_CACHE";

// JSON member names read from each object of the cache path configuration.
static const std::string CACHE_PATH = "path";
static const std::string CACHE_TOTAL_SIZE = "total_size";
static const std::string CACHE_QUERY_LIMIT_SIZE = "query_limit";
static const std::string CACHE_NORMAL_PERCENT = "normal_percent";
static const std::string CACHE_DISPOSABLE_PERCENT = "disposable_percent";
static const std::string CACHE_INDEX_PERCENT = "index_percent";
59 | | |
60 | | // TODO: should be a general util method |
61 | | // static std::string to_upper(const std::string& str) { |
62 | | // std::string out = str; |
63 | | // std::transform(out.begin(), out.end(), out.begin(), [](auto c) { return std::toupper(c); }); |
64 | | // return out; |
65 | | // } |
66 | | |
67 | | // Currently, both of three following formats are supported(see be.conf), remote cache is the |
68 | | // local cache path for remote storage. |
69 | | // format 1: /home/disk1/palo.HDD,50 |
70 | | // format 2: /home/disk1/palo,medium:ssd,capacity:50 |
71 | | // remote cache format: /home/disk/palo/cache,medium:remote_cache,capacity:50 |
72 | 16 | Status parse_root_path(const string& root_path, StorePath* path) { |
73 | 16 | std::vector<string> tmp_vec = strings::Split(root_path, ",", strings::SkipWhitespace()); |
74 | | |
75 | | // parse root path name |
76 | 16 | StripWhiteSpace(&tmp_vec[0]); |
77 | 16 | tmp_vec[0].erase(tmp_vec[0].find_last_not_of('/') + 1); |
78 | 16 | if (tmp_vec[0].empty() || tmp_vec[0][0] != '/') { |
79 | 0 | return Status::Error<INVALID_ARGUMENT>("invalid store path. path={}", tmp_vec[0]); |
80 | 0 | } |
81 | | |
82 | 16 | string canonicalized_path; |
83 | 16 | RETURN_IF_ERROR(io::global_local_filesystem()->canonicalize(tmp_vec[0], &canonicalized_path)); |
84 | 16 | path->path = tmp_vec[0]; |
85 | | |
86 | | // parse root path capacity and storage medium |
87 | 16 | string capacity_str; |
88 | 16 | string medium_str = HDD_UC; |
89 | | |
90 | 16 | string extension = path_util::file_extension(canonicalized_path); |
91 | 16 | if (!extension.empty()) { |
92 | 5 | medium_str = to_upper(extension.substr(1)); |
93 | 5 | } |
94 | | |
95 | 30 | for (int i = 1; i < tmp_vec.size(); i++) { |
96 | | // <property>:<value> or <value> |
97 | 14 | string property; |
98 | 14 | string value; |
99 | 14 | std::pair<string, string> pair = |
100 | 14 | strings::Split(tmp_vec[i], strings::delimiter::Limit(":", 1)); |
101 | 14 | if (pair.second.empty()) { |
102 | | // format_1: <value> only supports setting capacity |
103 | 4 | property = CAPACITY_UC; |
104 | 4 | value = tmp_vec[i]; |
105 | 10 | } else { |
106 | | // format_2 |
107 | 10 | property = to_upper(pair.first); |
108 | 10 | value = pair.second; |
109 | 10 | } |
110 | | |
111 | 14 | StripWhiteSpace(&property); |
112 | 14 | StripWhiteSpace(&value); |
113 | 14 | if (property == CAPACITY_UC) { |
114 | 8 | capacity_str = value; |
115 | 8 | } else if (property == MEDIUM_UC) { |
116 | | // property 'medium' has a higher priority than the extension of |
117 | | // path, so it can override medium_str |
118 | 6 | medium_str = to_upper(value); |
119 | 6 | } else { |
120 | 0 | return Status::Error<INVALID_ARGUMENT>("invalid property of store path, {}", |
121 | 0 | tmp_vec[i]); |
122 | 0 | } |
123 | 14 | } |
124 | | |
125 | 16 | path->capacity_bytes = -1; |
126 | 16 | if (!capacity_str.empty()) { |
127 | 6 | if (!valid_signed_number<int64_t>(capacity_str) || |
128 | 6 | strtol(capacity_str.c_str(), nullptr, 10) < 0) { |
129 | 0 | LOG(WARNING) << "invalid capacity of store path, capacity=" << capacity_str; |
130 | 0 | return Status::Error<INVALID_ARGUMENT>("invalid capacity of store path, capacity={}", |
131 | 0 | capacity_str); |
132 | 0 | } |
133 | 6 | path->capacity_bytes = strtol(capacity_str.c_str(), nullptr, 10) * GB_EXCHANGE_BYTE; |
134 | 6 | } |
135 | | |
136 | 16 | path->storage_medium = TStorageMedium::HDD; |
137 | 16 | if (!medium_str.empty()) { |
138 | 16 | if (medium_str == SSD_UC) { |
139 | 7 | path->storage_medium = TStorageMedium::SSD; |
140 | 9 | } else if (medium_str == HDD_UC) { |
141 | 9 | path->storage_medium = TStorageMedium::HDD; |
142 | 9 | } else if (medium_str == REMOTE_CACHE_UC) { |
143 | 0 | path->storage_medium = TStorageMedium::REMOTE_CACHE; |
144 | 0 | } else { |
145 | 0 | return Status::Error<INVALID_ARGUMENT>("invalid storage medium. medium={}", medium_str); |
146 | 0 | } |
147 | 16 | } |
148 | | |
149 | 16 | return Status::OK(); |
150 | 16 | } |
151 | | |
152 | 5 | Status parse_conf_store_paths(const string& config_path, std::vector<StorePath>* paths) { |
153 | 5 | std::vector<string> path_vec = strings::Split(config_path, ";", strings::SkipWhitespace()); |
154 | 5 | if (path_vec.empty()) { |
155 | | // means compute node |
156 | 0 | return Status::OK(); |
157 | 0 | } |
158 | 5 | if (path_vec.back().empty()) { |
159 | | // deal with the case that user add `;` to the tail |
160 | 0 | path_vec.pop_back(); |
161 | 0 | } |
162 | | |
163 | 5 | std::set<std::string> real_paths; |
164 | 8 | for (auto& item : path_vec) { |
165 | 8 | StorePath path; |
166 | 8 | auto res = parse_root_path(item, &path); |
167 | 8 | if (res.ok()) { |
168 | 8 | auto success = real_paths.emplace(path.path).second; |
169 | 8 | if (success) { |
170 | 7 | paths->emplace_back(std::move(path)); |
171 | 7 | } else { |
172 | 1 | LOG(WARNING) << "a duplicated path is found " << path.path; |
173 | 1 | return Status::Error<INVALID_ARGUMENT>("a duplicated path is found, path={}", |
174 | 1 | path.path); |
175 | 1 | } |
176 | 8 | } else { |
177 | 0 | LOG(WARNING) << "failed to parse store path " << item << ", res=" << res; |
178 | 0 | } |
179 | 8 | } |
180 | 4 | if ((path_vec.size() != paths->size() && !config::ignore_broken_disk)) { |
181 | 0 | return Status::Error<INVALID_ARGUMENT>("fail to parse storage_root_path config. value={}", |
182 | 0 | config_path); |
183 | 0 | } |
184 | 4 | return Status::OK(); |
185 | 4 | } |
186 | | |
187 | 3 | void parse_conf_broken_store_paths(const string& config_path, std::set<std::string>* paths) { |
188 | 3 | std::vector<string> path_vec = strings::Split(config_path, ";", strings::SkipWhitespace()); |
189 | 3 | if (path_vec.empty()) { |
190 | 0 | return; |
191 | 0 | } |
192 | 3 | if (path_vec.back().empty()) { |
193 | | // deal with the case that user add `;` to the tail |
194 | 0 | path_vec.pop_back(); |
195 | 0 | } |
196 | 5 | for (auto& item : path_vec) { |
197 | 5 | paths->emplace(item); |
198 | 5 | } |
199 | 3 | return; |
200 | 3 | } |
201 | | |
202 | | /** format: |
203 | | * [ |
204 | | * {"path": "storage1", "total_size":53687091200,"query_limit": "10737418240"}, |
205 | | * {"path": "storage2", "total_size":53687091200}, |
206 | | * {"path": "storage3", "total_size":53687091200, "normal_percent":85, "disposable_percent":10, "index_percent":5} |
207 | | * ] |
208 | | */ |
209 | 6 | Status parse_conf_cache_paths(const std::string& config_path, std::vector<CachePath>& paths) { |
210 | 6 | using namespace rapidjson; |
211 | 6 | Document document; |
212 | 6 | document.Parse(config_path.c_str()); |
213 | 6 | DCHECK(document.IsArray()) << config_path << " " << document.GetType(); |
214 | 9 | for (auto& config : document.GetArray()) { |
215 | 9 | auto map = config.GetObject(); |
216 | 9 | DCHECK(map.HasMember(CACHE_PATH.c_str())); |
217 | 9 | std::string path = map.FindMember(CACHE_PATH.c_str())->value.GetString(); |
218 | 9 | int64_t total_size = 0, query_limit_bytes = 0; |
219 | 9 | if (map.HasMember(CACHE_TOTAL_SIZE.c_str())) { |
220 | 8 | auto& value = map.FindMember(CACHE_TOTAL_SIZE.c_str())->value; |
221 | 8 | if (value.IsInt64()) { |
222 | 7 | total_size = value.GetInt64(); |
223 | 7 | } else { |
224 | 1 | return Status::InvalidArgument("total_size should be int64"); |
225 | 1 | } |
226 | 8 | } |
227 | 8 | if (config::enable_file_cache_query_limit) { |
228 | 8 | if (map.HasMember(CACHE_QUERY_LIMIT_SIZE.c_str())) { |
229 | 8 | auto& value = map.FindMember(CACHE_QUERY_LIMIT_SIZE.c_str())->value; |
230 | 8 | if (value.IsInt64()) { |
231 | 7 | query_limit_bytes = value.GetInt64(); |
232 | 7 | } else { |
233 | 1 | return Status::InvalidArgument("query_limit should be int64"); |
234 | 1 | } |
235 | 8 | } |
236 | 8 | } |
237 | 7 | if (total_size <= 0 || (config::enable_file_cache_query_limit && query_limit_bytes <= 0)) { |
238 | 0 | return Status::InvalidArgument( |
239 | 0 | "total_size or query_limit should not less than or equal to zero"); |
240 | 0 | } |
241 | | |
242 | | // percent |
243 | 9 | auto get_percent_value = [&](const std::string& key, size_t& percent) { |
244 | 9 | auto& value = map.FindMember(key.c_str())->value; |
245 | 9 | if (value.IsUint()) { |
246 | 9 | percent = value.GetUint(); |
247 | 9 | } else { |
248 | 0 | return Status::InvalidArgument("percent should be uint"); |
249 | 0 | } |
250 | 9 | return Status::OK(); |
251 | 9 | }; |
252 | | |
253 | 7 | size_t normal_percent = 85; |
254 | 7 | size_t disposable_percent = 10; |
255 | 7 | size_t index_percent = 5; |
256 | 7 | bool has_normal_percent = map.HasMember(CACHE_NORMAL_PERCENT.c_str()); |
257 | 7 | bool has_disposable_percent = map.HasMember(CACHE_DISPOSABLE_PERCENT.c_str()); |
258 | 7 | bool has_index_percent = map.HasMember(CACHE_INDEX_PERCENT.c_str()); |
259 | 7 | if (has_normal_percent && has_disposable_percent && has_index_percent) { |
260 | 3 | RETURN_IF_ERROR(get_percent_value(CACHE_NORMAL_PERCENT, normal_percent)); |
261 | 3 | RETURN_IF_ERROR(get_percent_value(CACHE_DISPOSABLE_PERCENT, disposable_percent)); |
262 | 3 | RETURN_IF_ERROR(get_percent_value(CACHE_INDEX_PERCENT, index_percent)); |
263 | 4 | } else if (has_normal_percent || has_disposable_percent || has_index_percent) { |
264 | 1 | return Status::InvalidArgument( |
265 | 1 | "cache percent config must either be all set or all unset."); |
266 | 1 | } |
267 | 6 | if ((normal_percent + disposable_percent + index_percent) != 100) { |
268 | 1 | return Status::InvalidArgument("The sum of cache percent config must equal 100."); |
269 | 1 | } |
270 | | |
271 | 5 | paths.emplace_back(std::move(path), total_size, query_limit_bytes, normal_percent, |
272 | 5 | disposable_percent, index_percent); |
273 | 5 | } |
274 | 2 | if (paths.empty()) { |
275 | 0 | return Status::InvalidArgument("fail to parse storage_root_path config. value={}", |
276 | 0 | config_path); |
277 | 0 | } |
278 | 2 | return Status::OK(); |
279 | 2 | } |
280 | | |
281 | 5 | io::FileCacheSettings CachePath::init_settings() const { |
282 | 5 | io::FileCacheSettings settings; |
283 | 5 | settings.total_size = total_bytes; |
284 | 5 | settings.max_file_segment_size = config::file_cache_max_file_segment_size; |
285 | 5 | settings.max_query_cache_size = query_limit_bytes; |
286 | 5 | size_t per_size = settings.total_size / 100; |
287 | 5 | settings.disposable_queue_size = per_size * disposable_percent; |
288 | 5 | settings.disposable_queue_elements = |
289 | 5 | std::max(settings.disposable_queue_size / settings.max_file_segment_size, |
290 | 5 | io::REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS); |
291 | | |
292 | 5 | settings.index_queue_size = per_size * index_percent; |
293 | 5 | settings.index_queue_elements = |
294 | 5 | std::max(settings.index_queue_size / settings.max_file_segment_size, |
295 | 5 | io::REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS); |
296 | | |
297 | 5 | settings.query_queue_size = |
298 | 5 | settings.total_size - settings.disposable_queue_size - settings.index_queue_size; |
299 | 5 | settings.query_queue_elements = |
300 | 5 | std::max(settings.query_queue_size / settings.max_file_segment_size, |
301 | 5 | io::REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS); |
302 | 5 | return settings; |
303 | 5 | } |
304 | | |
305 | | } // end namespace doris |