Coverage Report

Created: 2026-03-19 11:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/compaction/collection_statistics.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/compaction/collection_statistics.h"
19
20
#include <set>
21
#include <sstream>
22
23
#include "common/exception.h"
24
#include "exprs/vexpr.h"
25
#include "exprs/vexpr_context.h"
26
#include "exprs/vliteral.h"
27
#include "exprs/vslot_ref.h"
28
#include "storage/index/index_file_reader.h"
29
#include "storage/index/index_reader_helper.h"
30
#include "storage/index/inverted/analyzer/analyzer.h"
31
#include "storage/index/inverted/util/string_helper.h"
32
#include "storage/index/inverted/util/term_iterator.h"
33
#include "storage/rowset/rowset.h"
34
#include "storage/rowset/rowset_reader.h"
35
#include "util/uid_util.h"
36
37
namespace doris {
38
#include "common/compile_check_begin.h"
39
40
Status CollectionStatistics::collect(RuntimeState* state,
41
                                     const std::vector<RowSetSplits>& rs_splits,
42
                                     const TabletSchemaSPtr& tablet_schema,
43
                                     const VExprContextSPtrs& common_expr_ctxs_push_down,
44
24
                                     io::IOContext* io_ctx) {
45
24
    std::unordered_map<std::wstring, CollectInfo> collect_infos;
46
24
    RETURN_IF_ERROR(
47
24
            extract_collect_info(state, common_expr_ctxs_push_down, tablet_schema, &collect_infos));
48
24
    if (collect_infos.empty()) {
49
10
        LOG(WARNING) << "Index statistics collection: no collect info extracted.";
50
10
        return Status::OK();
51
10
    }
52
53
70
    for (const auto& rs_split : rs_splits) {
54
70
        const auto& rs_reader = rs_split.rs_reader;
55
70
        auto rowset = rs_reader->rowset();
56
70
        auto num_segments = rowset->num_segments();
57
133
        for (int32_t seg_id = 0; seg_id < num_segments; ++seg_id) {
58
63
            auto status =
59
63
                    process_segment(rowset, seg_id, tablet_schema.get(), collect_infos, io_ctx);
60
63
            if (!status.ok()) {
61
0
                if (status.code() == ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND ||
62
0
                    status.code() == ErrorCode::INVERTED_INDEX_BYPASS) {
63
0
                    LOG(ERROR) << "Index statistics collection failed: " << status.to_string();
64
0
                } else {
65
0
                    return status;
66
0
                }
67
0
            }
68
63
        }
69
70
    }
70
71
    // Build a single-line log with query_id, tablet_ids, and per-field term statistics
72
14
    if (VLOG_IS_ON(1)) {
73
0
        std::set<int64_t> tablet_ids;
74
0
        for (const auto& rs_split : rs_splits) {
75
0
            if (rs_split.rs_reader && rs_split.rs_reader->rowset()) {
76
0
                tablet_ids.insert(rs_split.rs_reader->rowset()->rowset_meta()->tablet_id());
77
0
            }
78
0
        }
79
80
0
        std::ostringstream oss;
81
0
        oss << "CollectionStatistics: query_id=" << print_id(state->query_id());
82
83
0
        oss << ", tablet_ids=[";
84
0
        bool first_tablet = true;
85
0
        for (int64_t tid : tablet_ids) {
86
0
            if (!first_tablet) oss << ",";
87
0
            oss << tid;
88
0
            first_tablet = false;
89
0
        }
90
0
        oss << "]";
91
92
0
        oss << ", total_num_docs=" << _total_num_docs;
93
94
0
        for (const auto& [ws_field_name, num_tokens] : _total_num_tokens) {
95
0
            oss << ", {field=" << StringHelper::to_string(ws_field_name)
96
0
                << ", num_tokens=" << num_tokens << ", terms=[";
97
98
0
            bool first_term = true;
99
0
            for (const auto& [term, doc_freq] : _term_doc_freqs.at(ws_field_name)) {
100
0
                if (!first_term) oss << ", ";
101
0
                oss << "(" << StringHelper::to_string(term) << ":" << doc_freq << ")";
102
0
                first_term = false;
103
0
            }
104
0
            oss << "]}";
105
0
        }
106
107
0
        VLOG(1) << oss.str();
108
0
    }
109
110
14
    return Status::OK();
111
14
}
112
113
Status CollectionStatistics::extract_collect_info(
114
        RuntimeState* state, const VExprContextSPtrs& common_expr_ctxs_push_down,
115
24
        const TabletSchemaSPtr& tablet_schema, CollectInfoMap* collect_infos) {
116
24
    DCHECK(collect_infos != nullptr);
117
118
24
    std::unordered_map<TExprNodeType::type, PredicateCollectorPtr> collectors;
119
24
    collectors[TExprNodeType::MATCH_PRED] = std::make_unique<MatchPredicateCollector>();
120
24
    collectors[TExprNodeType::SEARCH_EXPR] = std::make_unique<SearchPredicateCollector>();
121
122
24
    for (const auto& root_expr_ctx : common_expr_ctxs_push_down) {
123
24
        const auto& root_expr = root_expr_ctx->root();
124
24
        if (root_expr == nullptr) {
125
0
            continue;
126
0
        }
127
128
24
        std::stack<VExprSPtr> stack;
129
24
        stack.emplace(root_expr);
130
131
99
        while (!stack.empty()) {
132
75
            auto expr = stack.top();
133
75
            stack.pop();
134
135
75
            if (!expr) {
136
0
                continue;
137
0
            }
138
139
75
            auto collector_it = collectors.find(expr->node_type());
140
75
            if (collector_it != collectors.end()) {
141
23
                RETURN_IF_ERROR(
142
23
                        collector_it->second->collect(state, tablet_schema, expr, collect_infos));
143
23
            }
144
145
75
            const auto& children = expr->children();
146
75
            for (const auto& child : children) {
147
51
                stack.push(child);
148
51
            }
149
75
        }
150
24
    }
151
152
24
    LOG(INFO) << "Extracted collect info for " << collect_infos->size() << " fields";
153
154
24
    return Status::OK();
155
24
}
156
157
Status CollectionStatistics::process_segment(const RowsetSharedPtr& rowset, int32_t seg_id,
158
                                             const TabletSchema* tablet_schema,
159
                                             const CollectInfoMap& collect_infos,
160
63
                                             io::IOContext* io_ctx) {
161
63
    auto seg_path = DORIS_TRY(rowset->segment_path(seg_id));
162
63
    auto rowset_meta = rowset->rowset_meta();
163
164
63
    auto idx_file_reader = std::make_unique<IndexFileReader>(
165
63
            rowset_meta->fs(),
166
63
            std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
167
63
            tablet_schema->get_inverted_index_storage_format(),
168
63
            rowset_meta->inverted_index_file_info(seg_id));
169
63
    RETURN_IF_ERROR(idx_file_reader->init(config::inverted_index_read_buffer_size, io_ctx));
170
171
63
    int32_t total_seg_num_docs = 0;
172
173
63
    for (const auto& [ws_field_name, collect_info] : collect_infos) {
174
63
        lucene::search::IndexSearcher* index_searcher = nullptr;
175
63
        lucene::index::IndexReader* index_reader = nullptr;
176
177
#ifdef BE_TEST
178
        auto compound_reader = DORIS_TRY(idx_file_reader->open(collect_info.index_meta, io_ctx));
179
        auto* reader = lucene::index::IndexReader::open(compound_reader.get());
180
        auto searcher_ptr = std::make_shared<lucene::search::IndexSearcher>(reader, true);
181
        index_searcher = searcher_ptr.get();
182
        index_reader = index_searcher->getReader();
183
#else
184
63
        InvertedIndexCacheHandle inverted_index_cache_handle;
185
63
        auto index_file_key = idx_file_reader->get_index_file_cache_key(collect_info.index_meta);
186
63
        InvertedIndexSearcherCache::CacheKey searcher_cache_key(index_file_key);
187
188
63
        if (!InvertedIndexSearcherCache::instance()->lookup(searcher_cache_key,
189
63
                                                            &inverted_index_cache_handle)) {
190
9
            auto compound_reader =
191
9
                    DORIS_TRY(idx_file_reader->open(collect_info.index_meta, io_ctx));
192
9
            auto* reader = lucene::index::IndexReader::open(compound_reader.get());
193
9
            size_t reader_size = reader->getTermInfosRAMUsed();
194
9
            auto searcher_ptr = std::make_shared<lucene::search::IndexSearcher>(reader, true);
195
9
            auto* cache_value = new InvertedIndexSearcherCache::CacheValue(
196
9
                    std::move(searcher_ptr), reader_size, UnixMillis());
197
9
            InvertedIndexSearcherCache::instance()->insert(searcher_cache_key, cache_value,
198
9
                                                           &inverted_index_cache_handle);
199
9
        }
200
201
63
        auto searcher_variant = inverted_index_cache_handle.get_index_searcher();
202
63
        auto index_searcher_ptr = std::get<FulltextIndexSearcherPtr>(searcher_variant);
203
63
        index_searcher = index_searcher_ptr.get();
204
63
        index_reader = index_searcher->getReader();
205
63
#endif
206
63
        total_seg_num_docs = std::max(total_seg_num_docs, index_reader->maxDoc());
207
208
63
        _total_num_tokens[ws_field_name] +=
209
63
                index_reader->sumTotalTermFreq(ws_field_name.c_str()).value_or(0);
210
211
81
        for (const auto& term_info : collect_info.term_infos) {
212
81
            auto iter = TermIterator::create(io_ctx, false, index_reader, ws_field_name,
213
81
                                             term_info.get_single_term());
214
81
            _term_doc_freqs[ws_field_name][iter->term()] += iter->doc_freq();
215
81
        }
216
63
    }
217
218
63
    _total_num_docs += total_seg_num_docs;
219
220
63
    return Status::OK();
221
63
}
222
223
uint64_t CollectionStatistics::get_term_doc_freq_by_col(const std::wstring& lucene_col_name,
224
48
                                                        const std::wstring& term) {
225
48
    if (!_term_doc_freqs.contains(lucene_col_name)) {
226
1
        throw Exception(ErrorCode::INVERTED_INDEX_CLUCENE_ERROR,
227
1
                        "Index statistics collection failed: Not such column {}",
228
1
                        StringHelper::to_string(lucene_col_name));
229
1
    }
230
231
47
    if (!_term_doc_freqs[lucene_col_name].contains(term)) {
232
0
        throw Exception(ErrorCode::INVERTED_INDEX_CLUCENE_ERROR,
233
0
                        "Index statistics collection failed: Not such term {}",
234
0
                        StringHelper::to_string(term));
235
0
    }
236
237
47
    return _term_doc_freqs[lucene_col_name][term];
238
47
}
239
240
27
uint64_t CollectionStatistics::get_total_term_cnt_by_col(const std::wstring& lucene_col_name) {
241
27
    if (!_total_num_tokens.contains(lucene_col_name)) {
242
2
        throw Exception(ErrorCode::INVERTED_INDEX_CLUCENE_ERROR,
243
2
                        "Index statistics collection failed: Not such column {}",
244
2
                        StringHelper::to_string(lucene_col_name));
245
2
    }
246
247
25
    return _total_num_tokens[lucene_col_name];
248
27
}
249
250
72
uint64_t CollectionStatistics::get_doc_num() const {
251
72
    if (_total_num_docs == 0) {
252
3
        throw Exception(
253
3
                ErrorCode::INVERTED_INDEX_CLUCENE_ERROR,
254
3
                "Index statistics collection failed: No data available for SimilarityCollector");
255
3
    }
256
257
69
    return _total_num_docs;
258
72
}
259
260
91
float CollectionStatistics::get_or_calculate_avg_dl(const std::wstring& lucene_col_name) {
261
91
    auto iter = _avg_dl_by_col.find(lucene_col_name);
262
91
    if (iter != _avg_dl_by_col.end()) {
263
67
        return iter->second;
264
67
    }
265
266
24
    const uint64_t total_term_cnt = get_total_term_cnt_by_col(lucene_col_name);
267
24
    const uint64_t total_doc_cnt = get_doc_num();
268
24
    float avg_dl = total_doc_cnt > 0 ? float((double)total_term_cnt / (double)total_doc_cnt) : 0.0F;
269
24
    _avg_dl_by_col[lucene_col_name] = avg_dl;
270
24
    return avg_dl;
271
91
}
272
273
float CollectionStatistics::get_or_calculate_idf(const std::wstring& lucene_col_name,
274
103
                                                 const std::wstring& term) {
275
103
    auto iter = _idf_by_col_term.find(lucene_col_name);
276
103
    if (iter != _idf_by_col_term.end()) {
277
79
        auto term_iter = iter->second.find(term);
278
79
        if (term_iter != iter->second.end()) {
279
57
            return term_iter->second;
280
57
        }
281
79
    }
282
283
46
    const uint64_t doc_num = get_doc_num();
284
46
    const uint64_t doc_freq = get_term_doc_freq_by_col(lucene_col_name, term);
285
46
    auto idf = (float)std::log(1 + ((double)doc_num - (double)doc_freq + (double)0.5) /
286
46
                                           ((double)doc_freq + (double)0.5));
287
46
    _idf_by_col_term[lucene_col_name][term] = idf;
288
46
    return idf;
289
103
}
290
291
#include "common/compile_check_end.h"
292
} // namespace doris