Coverage Report

Created: 2026-01-04 14:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/olap/collection_statistics.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "collection_statistics.h"
19
20
#include <sstream>
21
22
#include "common/exception.h"
23
#include "olap/rowset/rowset.h"
24
#include "olap/rowset/rowset_reader.h"
25
#include "olap/rowset/segment_v2/index_file_reader.h"
26
#include "olap/rowset/segment_v2/index_reader_helper.h"
27
#include "olap/rowset/segment_v2/inverted_index/analyzer/analyzer.h"
28
#include "olap/rowset/segment_v2/inverted_index/util/string_helper.h"
29
#include "vec/exprs/vexpr.h"
30
#include "vec/exprs/vexpr_context.h"
31
#include "vec/exprs/vliteral.h"
32
#include "vec/exprs/vslot_ref.h"
33
34
namespace doris {
35
#include "common/compile_check_begin.h"
36
37
// Collects index statistics for every segment of every rowset split in `rs_splits`,
// limited to the fields and terms extracted from the MATCH predicates reachable from
// `common_expr_ctxs_push_down`.
//
// Returns OK immediately (after logging a warning) when no collect info could be
// extracted. A segment that fails with INVERTED_INDEX_FILE_NOT_FOUND or
// INVERTED_INDEX_BYPASS is logged and skipped; any other segment failure aborts the
// collection and is returned to the caller.
Status CollectionStatistics::collect(
        RuntimeState* state, const std::vector<RowSetSplits>& rs_splits,
        const TabletSchemaSPtr& tablet_schema,
        const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down, io::IOContext* io_ctx) {
    std::unordered_map<std::wstring, CollectInfo> collect_infos;
    RETURN_IF_ERROR(
            extract_collect_info(state, common_expr_ctxs_push_down, tablet_schema, &collect_infos));
    if (collect_infos.empty()) {
        LOG(WARNING) << "Index statistics collection: no collect info extracted.";
        return Status::OK();
    }

    for (const auto& split : rs_splits) {
        auto rowset = split.rs_reader->rowset();
        const auto segment_count = rowset->num_segments();
        for (int32_t seg_id = 0; seg_id < segment_count; ++seg_id) {
            Status st = process_segment(rowset, seg_id, tablet_schema.get(), collect_infos, io_ctx);
            if (st.ok()) {
                continue;
            }
            // Only these two error codes are tolerated; everything else is fatal.
            const bool tolerated = st.code() == ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND ||
                                   st.code() == ErrorCode::INVERTED_INDEX_BYPASS;
            if (!tolerated) {
                return st;
            }
            LOG(ERROR) << "Index statistics collection failed: " << st.to_string();
        }
    }

#ifndef NDEBUG
    // Debug builds only: dump everything that was accumulated.
    std::stringstream ss;
    ss << "term_num_docs: " << _total_num_docs;
    for (const auto& [ws_field_name, num_tokens] : _total_num_tokens) {
        ss << ", [field_name: " << StringHelper::to_string(ws_field_name)
           << ", num_tokens: " << num_tokens;
        const auto freq_it = _term_doc_freqs.find(ws_field_name);
        if (freq_it == _term_doc_freqs.end()) {
            ss << ", (no term stats)";
        } else {
            ss << ", terms: {";
            // Empty before the first entry, ", " before every following one.
            const char* separator = "";
            for (const auto& [term, doc_freq] : freq_it->second) {
                ss << separator << StringHelper::to_string(term) << ": " << doc_freq;
                separator = ", ";
            }
            ss << "}";
        }
        ss << "]";
    }
    LOG(INFO) << "CollectionStatistics: " << ss.str();
#endif

    return Status::OK();
}
95
96
14
// Searches `expr` depth-first, children in order, and returns the first node whose
// type is SLOT_REF. Each visited node is first passed through
// VExpr::expr_without_cast (presumably unwrapping cast expressions — confirm against
// VExpr). Returns nullptr for a null `expr` or when no slot reference is found.
vectorized::VSlotRef* find_slot_ref(const vectorized::VExprSPtr& expr) {
    if (expr == nullptr) {
        return nullptr;
    }

    auto node = vectorized::VExpr::expr_without_cast(expr);
    if (node->node_type() == TExprNodeType::SLOT_REF) {
        return static_cast<vectorized::VSlotRef*>(node.get());
    }

    for (auto& child : node->children()) {
        auto* found = find_slot_ref(child);
        if (found != nullptr) {
            return found;
        }
    }
    return nullptr;
}
107
108
// Extracts statistics-collection targets from a single MATCH predicate `expr`.
//
// children()[0] is searched for a slot reference and children()[1] is treated as the
// literal being matched. For every inverted index on the referenced column that is
// analyzed and needs a similarity score for this operator, the literal is run through
// the analyzer and the resulting terms are merged into `collect_infos`, keyed by
// "<col_unique_id>[.<suffix_path>]" converted to a wide string.
//
// Returns INVERTED_INDEX_NOT_SUPPORTED when the slot reference, slot descriptor or
// column index cannot be resolved, or (outside BE_TEST builds) when the column has no
// inverted index.
Status handle_match_pred(RuntimeState* state, const TabletSchemaSPtr& tablet_schema,
                         const vectorized::VExprSPtr& expr,
                         std::unordered_map<std::wstring, CollectInfo>* collect_infos) {
    auto* slot_ref = find_slot_ref(expr->children()[0]);
    if (slot_ref == nullptr) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Cannot find slot reference in match predicate "
                "left expression");
    }
    auto* literal = static_cast<vectorized::VLiteral*>(expr->children()[1].get());
    DCHECK(literal != nullptr);

    const auto* slot_desc = state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
    if (slot_desc == nullptr) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Cannot find slot descriptor for slot_id={}",
                slot_ref->slot_id());
    }
    const int32_t column_index = tablet_schema->field_index(slot_ref->column_name());
    if (column_index == -1) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Cannot find column index for column={}",
                slot_ref->column_name());
    }

    const auto& column = tablet_schema->column(column_index);
    auto index_metas =
            tablet_schema->inverted_indexs(slot_desc->col_unique_id(), column.suffix_path());
#ifndef BE_TEST
    if (index_metas.empty()) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Score query is not supported without inverted "
                "index for column={}",
                slot_ref->column_name());
    }
#endif

    auto format_options = vectorized::DataTypeSerDe::get_default_format_options();
    format_options.timezone = &state->timezone_obj();
    for (const auto* index_meta : index_metas) {
        // Skip indexes that are not analyzed or do not need scoring for this operator.
        if (!InvertedIndexAnalyzer::should_analyzer(index_meta->properties()) ||
            !segment_v2::IndexReaderHelper::is_need_similarity_score(expr->op(), index_meta)) {
            continue;
        }

        auto term_infos = InvertedIndexAnalyzer::get_analyse_result(
                literal->value(format_options), index_meta->properties());
        if (term_infos.empty()) {
            LOG(WARNING) << "Index statistics collection: no terms extracted from literal value, "
                         << "col_unique_id=" << index_meta->col_unique_ids()[0];
            continue;
        }

        std::string field_name = std::to_string(index_meta->col_unique_ids()[0]);
        if (!column.suffix_path().empty()) {
            field_name += "." + column.suffix_path();
        }

        // The first predicate seen for a field records its index meta; later
        // predicates on the same field only contribute additional terms.
        auto [info_it, inserted] =
                collect_infos->try_emplace(StringHelper::to_wstring(field_name));
        if (inserted) {
            info_it->second.index_meta = index_meta;
        }
        info_it->second.term_infos.insert(term_infos.begin(), term_infos.end());
    }
    return Status::OK();
}
179
180
// Walks every pushed-down expression tree and feeds each MATCH_PRED node to
// handle_match_pred, which fills `collect_infos`.
//
// The traversal is iterative and uses an explicit stack. Expression contexts whose
// root is null are skipped. Only children that themselves have children are pushed,
// so leaf nodes are never visited. The first error returned by handle_match_pred
// aborts the walk and is propagated to the caller.
Status CollectionStatistics::extract_collect_info(
        RuntimeState* state, const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down,
        const TabletSchemaSPtr& tablet_schema,
        std::unordered_map<std::wstring, CollectInfo>* collect_infos) {
    for (const auto& root_expr_ctx : common_expr_ctxs_push_down) {
        const auto& root_expr = root_expr_ctx->root();
        if (root_expr == nullptr) {
            continue;
        }

        std::stack<vectorized::VExprSPtr> stack;
        stack.emplace(root_expr);

        while (!stack.empty()) {
            // Take ownership of the top element BEFORE popping. Binding a reference
            // to stack.top() and then calling pop() would leave that reference
            // pointing at a destroyed element.
            vectorized::VExprSPtr expr = std::move(stack.top());
            stack.pop();

            if (expr->node_type() == TExprNodeType::MATCH_PRED) {
                RETURN_IF_ERROR(handle_match_pred(state, tablet_schema, expr, collect_infos));
            }

            // Push in reverse so that children are popped in their original order.
            const auto& children = expr->children();
            for (int32_t i = static_cast<int32_t>(children.size()) - 1; i >= 0; --i) {
                if (!children[i]->children().empty()) {
                    stack.emplace(children[i]);
                }
            }
        }
    }
    return Status::OK();
}
211
212
// Accumulates statistics from the inverted index of one segment (`seg_id`) of `rowset`.
//
// For every field in `collect_infos` this obtains an index reader (opened directly in
// BE_TEST builds, otherwise fetched from or inserted into InvertedIndexSearcherCache)
// and then:
//   * adds the field's sumTotalTermFreq (0 when absent) to _total_num_tokens,
//   * adds each requested term's doc_freq to _term_doc_freqs.
// The largest maxDoc() seen across the fields of this segment is added to
// _total_num_docs once, after all fields have been processed.
//
// Failures from resolving the segment path, initializing the index file reader or
// opening an index are returned to the caller.
Status CollectionStatistics::process_segment(
        const RowsetSharedPtr& rowset, int32_t seg_id, const TabletSchema* tablet_schema,
        const std::unordered_map<std::wstring, CollectInfo>& collect_infos, io::IOContext* io_ctx) {
    auto seg_path = DORIS_TRY(rowset->segment_path(seg_id));
    auto rowset_meta = rowset->rowset_meta();

    std::string index_path_prefix {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)};
    auto index_file_reader = std::make_unique<IndexFileReader>(
            rowset_meta->fs(), std::move(index_path_prefix),
            tablet_schema->get_inverted_index_storage_format(),
            rowset_meta->inverted_index_file_info(seg_id));
    RETURN_IF_ERROR(index_file_reader->init(config::inverted_index_read_buffer_size, io_ctx));

    int32_t seg_max_doc = 0;
    for (const auto& [ws_field_name, collect_info] : collect_infos) {
#ifdef BE_TEST
        auto compound_reader = DORIS_TRY(index_file_reader->open(collect_info.index_meta, io_ctx));
        auto* reader = lucene::index::IndexReader::open(compound_reader.get());
        auto index_searcher = std::make_shared<lucene::search::IndexSearcher>(reader, true);

        auto* index_reader = index_searcher->getReader();
#else
        InvertedIndexCacheHandle cache_handle;
        auto index_file_key = index_file_reader->get_index_file_cache_key(collect_info.index_meta);
        InvertedIndexSearcherCache::CacheKey searcher_cache_key(index_file_key);
        const bool cache_hit =
                InvertedIndexSearcherCache::instance()->lookup(searcher_cache_key, &cache_handle);
        if (!cache_hit) {
            // Cache miss: open the index, wrap it in a searcher and publish it to the cache.
            auto compound_reader =
                    DORIS_TRY(index_file_reader->open(collect_info.index_meta, io_ctx));
            auto* reader = lucene::index::IndexReader::open(compound_reader.get());
            size_t reader_size = reader->getTermInfosRAMUsed();
            auto new_searcher = std::make_shared<lucene::search::IndexSearcher>(reader, true);
            auto* cache_value = new InvertedIndexSearcherCache::CacheValue(
                    std::move(new_searcher), reader_size, UnixMillis());
            InvertedIndexSearcherCache::instance()->insert(searcher_cache_key, cache_value,
                                                           &cache_handle);
        }

        auto searcher_variant = cache_handle.get_index_searcher();
        auto index_searcher = std::get<FulltextIndexSearcherPtr>(searcher_variant);
        auto* index_reader = index_searcher->getReader();
#endif

        seg_max_doc = std::max(seg_max_doc, index_reader->maxDoc());
        _total_num_tokens[ws_field_name] +=
                index_reader->sumTotalTermFreq(ws_field_name.c_str()).value_or(0);

        for (const auto& term_info : collect_info.term_infos) {
            auto term_iter = TermIterator::create(io_ctx, false, index_reader, ws_field_name,
                                                  term_info.get_single_term());
            _term_doc_freqs[ws_field_name][term_iter->term()] += term_iter->doc_freq();
        }
    }
    _total_num_docs += seg_max_doc;
    return Status::OK();
}
268
269
uint64_t CollectionStatistics::get_term_doc_freq_by_col(const std::wstring& lucene_col_name,
270
22
                                                        const std::wstring& term) {
271
22
    if (!_term_doc_freqs.contains(lucene_col_name)) {
272
1
        throw Exception(ErrorCode::INVERTED_INDEX_CLUCENE_ERROR,
273
1
                        "Index statistics collection failed: Not such column {}",
274
1
                        StringHelper::to_string(lucene_col_name));
275
1
    }
276
277
21
    if (!_term_doc_freqs[lucene_col_name].contains(term)) {
278
0
        throw Exception(ErrorCode::INVERTED_INDEX_CLUCENE_ERROR,
279
0
                        "Index statistics collection failed: Not such term {}",
280
0
                        StringHelper::to_string(term));
281
0
    }
282
283
21
    return _term_doc_freqs[lucene_col_name][term];
284
21
}
285
286
12
uint64_t CollectionStatistics::get_total_term_cnt_by_col(const std::wstring& lucene_col_name) {
287
12
    if (!_total_num_tokens.contains(lucene_col_name)) {
288
2
        throw Exception(ErrorCode::INVERTED_INDEX_CLUCENE_ERROR,
289
2
                        "Index statistics collection failed: Not such column {}",
290
2
                        StringHelper::to_string(lucene_col_name));
291
2
    }
292
293
10
    return _total_num_tokens[lucene_col_name];
294
12
}
295
296
31
uint64_t CollectionStatistics::get_doc_num() const {
297
31
    if (_total_num_docs == 0) {
298
3
        throw Exception(
299
3
                ErrorCode::INVERTED_INDEX_CLUCENE_ERROR,
300
3
                "Index statistics collection failed: No data available for SimilarityCollector");
301
3
    }
302
303
28
    return _total_num_docs;
304
31
}
305
306
11
float CollectionStatistics::get_or_calculate_avg_dl(const std::wstring& lucene_col_name) {
307
11
    auto iter = _avg_dl_by_col.find(lucene_col_name);
308
11
    if (iter != _avg_dl_by_col.end()) {
309
2
        return iter->second;
310
2
    }
311
312
9
    const uint64_t total_term_cnt = get_total_term_cnt_by_col(lucene_col_name);
313
9
    const uint64_t total_doc_cnt = get_doc_num();
314
9
    float avg_dl = total_doc_cnt > 0 ? float((double)total_term_cnt / (double)total_doc_cnt) : 0.0F;
315
9
    _avg_dl_by_col[lucene_col_name] = avg_dl;
316
9
    return avg_dl;
317
11
}
318
319
float CollectionStatistics::get_or_calculate_idf(const std::wstring& lucene_col_name,
320
21
                                                 const std::wstring& term) {
321
21
    auto iter = _idf_by_col_term.find(lucene_col_name);
322
21
    if (iter != _idf_by_col_term.end()) {
323
12
        auto term_iter = iter->second.find(term);
324
12
        if (term_iter != iter->second.end()) {
325
1
            return term_iter->second;
326
1
        }
327
12
    }
328
329
20
    const uint64_t doc_num = get_doc_num();
330
20
    const uint64_t doc_freq = get_term_doc_freq_by_col(lucene_col_name, term);
331
20
    auto idf = (float)std::log(1 + ((double)doc_num - (double)doc_freq + (double)0.5) /
332
20
                                           ((double)doc_freq + (double)0.5));
333
20
    _idf_by_col_term[lucene_col_name][term] = idf;
334
20
    return idf;
335
21
}
336
337
#include "common/compile_check_end.h"
338
} // namespace doris