/root/doris/be/src/olap/collection_statistics.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "collection_statistics.h" |
19 | | |
20 | | #include <set> |
21 | | #include <sstream> |
22 | | |
23 | | #include "common/exception.h" |
24 | | #include "olap/rowset/rowset.h" |
25 | | #include "olap/rowset/rowset_reader.h" |
26 | | #include "olap/rowset/segment_v2/index_file_reader.h" |
27 | | #include "olap/rowset/segment_v2/index_reader_helper.h" |
28 | | #include "olap/rowset/segment_v2/inverted_index/analyzer/analyzer.h" |
29 | | #include "olap/rowset/segment_v2/inverted_index/util/string_helper.h" |
30 | | #include "util/uid_util.h" |
31 | | #include "vec/exprs/vexpr.h" |
32 | | #include "vec/exprs/vexpr_context.h" |
33 | | #include "vec/exprs/vliteral.h" |
34 | | #include "vec/exprs/vslot_ref.h" |
35 | | |
36 | | namespace doris { |
37 | | #include "common/compile_check_begin.h" |
38 | | |
39 | | Status CollectionStatistics::collect( |
40 | | RuntimeState* state, const std::vector<RowSetSplits>& rs_splits, |
41 | | const TabletSchemaSPtr& tablet_schema, |
42 | 10 | const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down, io::IOContext* io_ctx) { |
43 | 10 | std::unordered_map<std::wstring, CollectInfo> collect_infos; |
44 | 10 | RETURN_IF_ERROR( |
45 | 10 | extract_collect_info(state, common_expr_ctxs_push_down, tablet_schema, &collect_infos)); |
46 | 10 | if (collect_infos.empty()) { |
47 | 10 | LOG(WARNING) << "Index statistics collection: no collect info extracted."; |
48 | 10 | return Status::OK(); |
49 | 10 | } |
50 | | |
51 | 0 | for (const auto& rs_split : rs_splits) { |
52 | 0 | const auto& rs_reader = rs_split.rs_reader; |
53 | 0 | auto rowset = rs_reader->rowset(); |
54 | 0 | auto num_segments = rowset->num_segments(); |
55 | 0 | for (int32_t seg_id = 0; seg_id < num_segments; ++seg_id) { |
56 | 0 | auto status = |
57 | 0 | process_segment(rowset, seg_id, tablet_schema.get(), collect_infos, io_ctx); |
58 | 0 | if (!status.ok()) { |
59 | 0 | if (status.code() == ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND || |
60 | 0 | status.code() == ErrorCode::INVERTED_INDEX_BYPASS) { |
61 | 0 | LOG(ERROR) << "Index statistics collection failed: " << status.to_string(); |
62 | 0 | } else { |
63 | 0 | return status; |
64 | 0 | } |
65 | 0 | } |
66 | 0 | } |
67 | 0 | } |
68 | | |
69 | | // Build a single-line log with query_id, tablet_ids, and per-field term statistics |
70 | 0 | if (VLOG_IS_ON(1)) { |
71 | 0 | std::set<int64_t> tablet_ids; |
72 | 0 | for (const auto& rs_split : rs_splits) { |
73 | 0 | if (rs_split.rs_reader && rs_split.rs_reader->rowset()) { |
74 | 0 | tablet_ids.insert(rs_split.rs_reader->rowset()->rowset_meta()->tablet_id()); |
75 | 0 | } |
76 | 0 | } |
77 | |
|
78 | 0 | std::ostringstream oss; |
79 | 0 | oss << "CollectionStatistics: query_id=" << print_id(state->query_id()); |
80 | |
|
81 | 0 | oss << ", tablet_ids=["; |
82 | 0 | bool first_tablet = true; |
83 | 0 | for (int64_t tid : tablet_ids) { |
84 | 0 | if (!first_tablet) oss << ","; |
85 | 0 | oss << tid; |
86 | 0 | first_tablet = false; |
87 | 0 | } |
88 | 0 | oss << "]"; |
89 | |
|
90 | 0 | oss << ", total_num_docs=" << _total_num_docs; |
91 | |
|
92 | 0 | for (const auto& [ws_field_name, num_tokens] : _total_num_tokens) { |
93 | 0 | oss << ", {field=" << StringHelper::to_string(ws_field_name) |
94 | 0 | << ", num_tokens=" << num_tokens << ", terms=["; |
95 | |
|
96 | 0 | bool first_term = true; |
97 | 0 | for (const auto& [term, doc_freq] : _term_doc_freqs.at(ws_field_name)) { |
98 | 0 | if (!first_term) oss << ", "; |
99 | 0 | oss << "(" << StringHelper::to_string(term) << ":" << doc_freq << ")"; |
100 | 0 | first_term = false; |
101 | 0 | } |
102 | 0 | oss << "]}"; |
103 | 0 | } |
104 | |
|
105 | 0 | VLOG(1) << oss.str(); |
106 | 0 | } |
107 | |
|
108 | 0 | return Status::OK(); |
109 | 0 | } |
110 | | |
111 | 14 | vectorized::VSlotRef* find_slot_ref(const vectorized::VExprSPtr& expr) { |
112 | 14 | if (!expr) return nullptr; |
113 | 13 | auto cur = vectorized::VExpr::expr_without_cast(expr); |
114 | 13 | if (cur->node_type() == TExprNodeType::SLOT_REF) { |
115 | 12 | return static_cast<vectorized::VSlotRef*>(cur.get()); |
116 | 12 | } |
117 | 1 | for (auto& ch : cur->children()) { |
118 | 1 | if (auto* s = find_slot_ref(ch)) return s; |
119 | 1 | } |
120 | 0 | return nullptr; |
121 | 1 | } |
122 | | |
123 | | Status handle_match_pred(RuntimeState* state, const TabletSchemaSPtr& tablet_schema, |
124 | | const vectorized::VExprSPtr& expr, |
125 | 9 | std::unordered_map<std::wstring, CollectInfo>* collect_infos) { |
126 | 9 | auto* left_slot_ref = find_slot_ref(expr->children()[0]); |
127 | 9 | if (left_slot_ref == nullptr) { |
128 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>( |
129 | 0 | "Index statistics collection failed: Cannot find slot reference in match predicate " |
130 | 0 | "left expression"); |
131 | 0 | } |
132 | 9 | auto* right_literal = static_cast<vectorized::VLiteral*>(expr->children()[1].get()); |
133 | 9 | DCHECK(right_literal != nullptr); |
134 | | |
135 | 9 | const auto* sd = state->desc_tbl().get_slot_descriptor(left_slot_ref->slot_id()); |
136 | 9 | if (sd == nullptr) { |
137 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>( |
138 | 0 | "Index statistics collection failed: Cannot find slot descriptor for slot_id={}", |
139 | 0 | left_slot_ref->slot_id()); |
140 | 0 | } |
141 | 9 | int32_t col_idx = tablet_schema->field_index(left_slot_ref->column_name()); |
142 | 9 | if (col_idx == -1) { |
143 | 0 | return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>( |
144 | 0 | "Index statistics collection failed: Cannot find column index for column={}", |
145 | 0 | left_slot_ref->column_name()); |
146 | 0 | } |
147 | | |
148 | 9 | const auto& column = tablet_schema->column(col_idx); |
149 | 9 | auto index_metas = tablet_schema->inverted_indexs(sd->col_unique_id(), column.suffix_path()); |
150 | | #ifndef BE_TEST |
151 | | if (index_metas.empty()) { |
152 | | return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>( |
153 | | "Index statistics collection failed: Score query is not supported without inverted " |
154 | | "index for column={}", |
155 | | left_slot_ref->column_name()); |
156 | | } |
157 | | #endif |
158 | | |
159 | 9 | auto format_options = vectorized::DataTypeSerDe::get_default_format_options(); |
160 | 9 | format_options.timezone = &state->timezone_obj(); |
161 | 9 | for (const auto* index_meta : index_metas) { |
162 | 0 | if (!InvertedIndexAnalyzer::should_analyzer(index_meta->properties())) { |
163 | 0 | continue; |
164 | 0 | } |
165 | 0 | if (!segment_v2::IndexReaderHelper::is_need_similarity_score(expr->op(), index_meta)) { |
166 | 0 | continue; |
167 | 0 | } |
168 | | |
169 | 0 | auto term_infos = InvertedIndexAnalyzer::get_analyse_result( |
170 | 0 | right_literal->value(format_options), index_meta->properties()); |
171 | 0 | if (term_infos.empty()) { |
172 | 0 | LOG(WARNING) << "Index statistics collection: no terms extracted from literal value, " |
173 | 0 | << "col_unique_id=" << index_meta->col_unique_ids()[0]; |
174 | 0 | continue; |
175 | 0 | } |
176 | | |
177 | 0 | std::string field_name = std::to_string(index_meta->col_unique_ids()[0]); |
178 | 0 | if (!column.suffix_path().empty()) { |
179 | 0 | field_name += "." + column.suffix_path(); |
180 | 0 | } |
181 | 0 | std::wstring ws_field_name = StringHelper::to_wstring(field_name); |
182 | 0 | auto iter = collect_infos->find(ws_field_name); |
183 | 0 | if (iter == collect_infos->end()) { |
184 | 0 | CollectInfo collect_info; |
185 | 0 | collect_info.term_infos.insert(term_infos.begin(), term_infos.end()); |
186 | 0 | collect_info.index_meta = index_meta; |
187 | 0 | (*collect_infos)[ws_field_name] = std::move(collect_info); |
188 | 0 | } else { |
189 | 0 | iter->second.term_infos.insert(term_infos.begin(), term_infos.end()); |
190 | 0 | } |
191 | 0 | } |
192 | 9 | return Status::OK(); |
193 | 9 | } |
194 | | |
195 | | Status CollectionStatistics::extract_collect_info( |
196 | | RuntimeState* state, const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down, |
197 | | const TabletSchemaSPtr& tablet_schema, |
198 | 10 | std::unordered_map<std::wstring, CollectInfo>* collect_infos) { |
199 | 10 | for (const auto& root_expr_ctx : common_expr_ctxs_push_down) { |
200 | 10 | const auto& root_expr = root_expr_ctx->root(); |
201 | 10 | if (root_expr == nullptr) { |
202 | 0 | continue; |
203 | 0 | } |
204 | | |
205 | 10 | std::stack<vectorized::VExprSPtr> stack; |
206 | 10 | stack.emplace(root_expr); |
207 | | |
208 | 24 | while (!stack.empty()) { |
209 | 14 | const auto& expr = stack.top(); |
210 | 14 | stack.pop(); |
211 | | |
212 | 14 | if (expr->node_type() == TExprNodeType::MATCH_PRED) { |
213 | 9 | RETURN_IF_ERROR(handle_match_pred(state, tablet_schema, expr, collect_infos)); |
214 | 9 | } |
215 | | |
216 | 14 | const auto& children = expr->children(); |
217 | 37 | for (int32_t i = static_cast<int32_t>(children.size()) - 1; i >= 0; --i) { |
218 | 23 | if (!children[i]->children().empty()) { |
219 | 4 | stack.emplace(children[i]); |
220 | 4 | } |
221 | 23 | } |
222 | 14 | } |
223 | 10 | } |
224 | 10 | return Status::OK(); |
225 | 10 | } |
226 | | |
227 | | Status CollectionStatistics::process_segment( |
228 | | const RowsetSharedPtr& rowset, int32_t seg_id, const TabletSchema* tablet_schema, |
229 | 0 | const std::unordered_map<std::wstring, CollectInfo>& collect_infos, io::IOContext* io_ctx) { |
230 | 0 | auto seg_path = DORIS_TRY(rowset->segment_path(seg_id)); |
231 | 0 | auto rowset_meta = rowset->rowset_meta(); |
232 | |
|
233 | 0 | auto idx_file_reader = std::make_unique<IndexFileReader>( |
234 | 0 | rowset_meta->fs(), |
235 | 0 | std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)}, |
236 | 0 | tablet_schema->get_inverted_index_storage_format(), |
237 | 0 | rowset_meta->inverted_index_file_info(seg_id)); |
238 | 0 | RETURN_IF_ERROR(idx_file_reader->init(config::inverted_index_read_buffer_size, io_ctx)); |
239 | | |
240 | 0 | int32_t total_seg_num_docs = 0; |
241 | 0 | for (const auto& [ws_field_name, collect_info] : collect_infos) { |
242 | 0 | #ifdef BE_TEST |
243 | 0 | auto compound_reader = DORIS_TRY(idx_file_reader->open(collect_info.index_meta, io_ctx)); |
244 | 0 | auto* reader = lucene::index::IndexReader::open(compound_reader.get()); |
245 | 0 | auto index_searcher = std::make_shared<lucene::search::IndexSearcher>(reader, true); |
246 | |
|
247 | 0 | auto* index_reader = index_searcher->getReader(); |
248 | | #else |
249 | | InvertedIndexCacheHandle inverted_index_cache_handle; |
250 | | auto index_file_key = idx_file_reader->get_index_file_cache_key(collect_info.index_meta); |
251 | | InvertedIndexSearcherCache::CacheKey searcher_cache_key(index_file_key); |
252 | | if (!InvertedIndexSearcherCache::instance()->lookup(searcher_cache_key, |
253 | | &inverted_index_cache_handle)) { |
254 | | auto compound_reader = |
255 | | DORIS_TRY(idx_file_reader->open(collect_info.index_meta, io_ctx)); |
256 | | auto* reader = lucene::index::IndexReader::open(compound_reader.get()); |
257 | | size_t reader_size = reader->getTermInfosRAMUsed(); |
258 | | auto index_searcher = std::make_shared<lucene::search::IndexSearcher>(reader, true); |
259 | | auto* cache_value = new InvertedIndexSearcherCache::CacheValue( |
260 | | std::move(index_searcher), reader_size, UnixMillis()); |
261 | | InvertedIndexSearcherCache::instance()->insert(searcher_cache_key, cache_value, |
262 | | &inverted_index_cache_handle); |
263 | | } |
264 | | |
265 | | auto searcher_variant = inverted_index_cache_handle.get_index_searcher(); |
266 | | auto index_searcher = std::get<FulltextIndexSearcherPtr>(searcher_variant); |
267 | | auto* index_reader = index_searcher->getReader(); |
268 | | #endif |
269 | |
|
270 | 0 | total_seg_num_docs = std::max(total_seg_num_docs, index_reader->maxDoc()); |
271 | 0 | _total_num_tokens[ws_field_name] += |
272 | 0 | index_reader->sumTotalTermFreq(ws_field_name.c_str()).value_or(0); |
273 | |
|
274 | 0 | for (const auto& term_info : collect_info.term_infos) { |
275 | 0 | auto iter = TermIterator::create(io_ctx, false, index_reader, ws_field_name, |
276 | 0 | term_info.get_single_term()); |
277 | 0 | _term_doc_freqs[ws_field_name][iter->term()] += iter->doc_freq(); |
278 | 0 | } |
279 | 0 | } |
280 | 0 | _total_num_docs += total_seg_num_docs; |
281 | 0 | return Status::OK(); |
282 | 0 | } |
283 | | |
284 | | uint64_t CollectionStatistics::get_term_doc_freq_by_col(const std::wstring& lucene_col_name, |
285 | 22 | const std::wstring& term) { |
286 | 22 | if (!_term_doc_freqs.contains(lucene_col_name)) { |
287 | 1 | throw Exception(ErrorCode::INVERTED_INDEX_CLUCENE_ERROR, |
288 | 1 | "Index statistics collection failed: Not such column {}", |
289 | 1 | StringHelper::to_string(lucene_col_name)); |
290 | 1 | } |
291 | | |
292 | 21 | if (!_term_doc_freqs[lucene_col_name].contains(term)) { |
293 | 0 | throw Exception(ErrorCode::INVERTED_INDEX_CLUCENE_ERROR, |
294 | 0 | "Index statistics collection failed: Not such term {}", |
295 | 0 | StringHelper::to_string(term)); |
296 | 0 | } |
297 | | |
298 | 21 | return _term_doc_freqs[lucene_col_name][term]; |
299 | 21 | } |
300 | | |
301 | 12 | uint64_t CollectionStatistics::get_total_term_cnt_by_col(const std::wstring& lucene_col_name) { |
302 | 12 | if (!_total_num_tokens.contains(lucene_col_name)) { |
303 | 2 | throw Exception(ErrorCode::INVERTED_INDEX_CLUCENE_ERROR, |
304 | 2 | "Index statistics collection failed: Not such column {}", |
305 | 2 | StringHelper::to_string(lucene_col_name)); |
306 | 2 | } |
307 | | |
308 | 10 | return _total_num_tokens[lucene_col_name]; |
309 | 12 | } |
310 | | |
311 | 31 | uint64_t CollectionStatistics::get_doc_num() const { |
312 | 31 | if (_total_num_docs == 0) { |
313 | 3 | throw Exception( |
314 | 3 | ErrorCode::INVERTED_INDEX_CLUCENE_ERROR, |
315 | 3 | "Index statistics collection failed: No data available for SimilarityCollector"); |
316 | 3 | } |
317 | | |
318 | 28 | return _total_num_docs; |
319 | 31 | } |
320 | | |
321 | 11 | float CollectionStatistics::get_or_calculate_avg_dl(const std::wstring& lucene_col_name) { |
322 | 11 | auto iter = _avg_dl_by_col.find(lucene_col_name); |
323 | 11 | if (iter != _avg_dl_by_col.end()) { |
324 | 2 | return iter->second; |
325 | 2 | } |
326 | | |
327 | 9 | const uint64_t total_term_cnt = get_total_term_cnt_by_col(lucene_col_name); |
328 | 9 | const uint64_t total_doc_cnt = get_doc_num(); |
329 | 9 | float avg_dl = total_doc_cnt > 0 ? float((double)total_term_cnt / (double)total_doc_cnt) : 0.0F; |
330 | 9 | _avg_dl_by_col[lucene_col_name] = avg_dl; |
331 | 9 | return avg_dl; |
332 | 11 | } |
333 | | |
334 | | float CollectionStatistics::get_or_calculate_idf(const std::wstring& lucene_col_name, |
335 | 21 | const std::wstring& term) { |
336 | 21 | auto iter = _idf_by_col_term.find(lucene_col_name); |
337 | 21 | if (iter != _idf_by_col_term.end()) { |
338 | 12 | auto term_iter = iter->second.find(term); |
339 | 12 | if (term_iter != iter->second.end()) { |
340 | 1 | return term_iter->second; |
341 | 1 | } |
342 | 12 | } |
343 | | |
344 | 20 | const uint64_t doc_num = get_doc_num(); |
345 | 20 | const uint64_t doc_freq = get_term_doc_freq_by_col(lucene_col_name, term); |
346 | 20 | auto idf = (float)std::log(1 + ((double)doc_num - (double)doc_freq + (double)0.5) / |
347 | 20 | ((double)doc_freq + (double)0.5)); |
348 | 20 | _idf_by_col_term[lucene_col_name][term] = idf; |
349 | 20 | return idf; |
350 | 21 | } |
351 | | |
352 | | #include "common/compile_check_end.h" |
353 | | } // namespace doris |