Coverage Report

Created: 2026-03-13 09:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/compaction/collection_statistics.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/compaction/collection_statistics.h"
19
20
#include <set>
21
#include <sstream>
22
23
#include "common/exception.h"
24
#include "exprs/vexpr.h"
25
#include "exprs/vexpr_context.h"
26
#include "exprs/vliteral.h"
27
#include "exprs/vslot_ref.h"
28
#include "storage/index/index_file_reader.h"
29
#include "storage/index/index_reader_helper.h"
30
#include "storage/index/inverted/analyzer/analyzer.h"
31
#include "storage/index/inverted/util/string_helper.h"
32
#include "storage/rowset/rowset.h"
33
#include "storage/rowset/rowset_reader.h"
34
#include "util/uid_util.h"
35
36
namespace doris {
37
#include "common/compile_check_begin.h"
38
39
// Gathers the statistics used to score MATCH predicates: the total document count, per-field
// token totals and per-term document frequencies, summed over every segment of every rowset
// in `rs_splits`.
//
// Only the fields referenced by MATCH predicates in `common_expr_ctxs_push_down` are examined.
// A segment whose processing fails with INVERTED_INDEX_FILE_NOT_FOUND or INVERTED_INDEX_BYPASS
// is logged and skipped; any other segment error is returned to the caller.
Status CollectionStatistics::collect(RuntimeState* state,
                                     const std::vector<RowSetSplits>& rs_splits,
                                     const TabletSchemaSPtr& tablet_schema,
                                     const VExprContextSPtrs& common_expr_ctxs_push_down,
                                     io::IOContext* io_ctx) {
    std::unordered_map<std::wstring, CollectInfo> infos_by_field;
    RETURN_IF_ERROR(extract_collect_info(state, common_expr_ctxs_push_down, tablet_schema,
                                         &infos_by_field));
    if (infos_by_field.empty()) {
        LOG(WARNING) << "Index statistics collection: no collect info extracted.";
        return Status::OK();
    }

    for (const auto& split : rs_splits) {
        auto rowset = split.rs_reader->rowset();
        const auto segment_count = rowset->num_segments();
        for (int32_t segment_id = 0; segment_id < segment_count; ++segment_id) {
            auto seg_status = process_segment(rowset, segment_id, tablet_schema.get(),
                                              infos_by_field, io_ctx);
            if (seg_status.ok()) {
                continue;
            }
            // A missing or bypassed index only costs this segment's contribution.
            const bool skippable =
                    seg_status.code() == ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND ||
                    seg_status.code() == ErrorCode::INVERTED_INDEX_BYPASS;
            if (!skippable) {
                return seg_status;
            }
            LOG(ERROR) << "Index statistics collection failed: " << seg_status.to_string();
        }
    }

    // Verbose diagnostics: a single line with query id, tablet ids and per-field term stats.
    if (VLOG_IS_ON(1)) {
        std::set<int64_t> unique_tablet_ids;
        for (const auto& split : rs_splits) {
            if (split.rs_reader && split.rs_reader->rowset()) {
                unique_tablet_ids.insert(split.rs_reader->rowset()->rowset_meta()->tablet_id());
            }
        }

        std::ostringstream line;
        line << "CollectionStatistics: query_id=" << print_id(state->query_id());

        line << ", tablet_ids=[";
        const char* id_sep = "";
        for (int64_t tablet_id : unique_tablet_ids) {
            line << id_sep << tablet_id;
            id_sep = ",";
        }
        line << "]";

        line << ", total_num_docs=" << _total_num_docs;

        for (const auto& [field, token_count] : _total_num_tokens) {
            line << ", {field=" << StringHelper::to_string(field)
                 << ", num_tokens=" << token_count << ", terms=[";
            const char* term_sep = "";
            for (const auto& [term, doc_freq] : _term_doc_freqs.at(field)) {
                line << term_sep << "(" << StringHelper::to_string(term) << ":" << doc_freq
                     << ")";
                term_sep = ", ";
            }
            line << "]}";
        }

        VLOG(1) << line.str();
    }

    return Status::OK();
}
111
112
28
// Returns the first slot reference found by a pre-order, left-to-right search of `expr`,
// or nullptr if there is none (or `expr` itself is null). Each node is first passed through
// VExpr::expr_without_cast (presumably stripping wrapping casts) before being inspected.
// The returned pointer is non-owning.
VSlotRef* find_slot_ref(const VExprSPtr& expr) {
    if (expr == nullptr) {
        return nullptr;
    }
    auto stripped = VExpr::expr_without_cast(expr);
    if (stripped->node_type() != TExprNodeType::SLOT_REF) {
        for (const auto& child : stripped->children()) {
            auto* found = find_slot_ref(child);
            if (found != nullptr) {
                return found;
            }
        }
        return nullptr;
    }
    return static_cast<VSlotRef*>(stripped.get());
}
123
124
// Collects the analyzed query terms of one MATCH predicate into `collect_infos`.
//
// `expr` is a MATCH_PRED node (see extract_collect_info) whose first child contains the column
// slot reference and whose second child is the literal being matched. For every inverted index
// on that column that should_analyzer() accepts and that needs a similarity score for
// `expr->op()`, the literal is tokenized with the index's properties and the resulting terms
// are merged into the entry keyed by the field name: the index's first column unique id,
// followed by ".<suffix_path>" when the column has a suffix path.
//
// Returns INVERTED_INDEX_NOT_SUPPORTED when the predicate shape is unexpected, the slot or
// column cannot be resolved, or (outside BE_TEST) the column has no inverted index.
Status handle_match_pred(RuntimeState* state, const TabletSchemaSPtr& tablet_schema,
                         const VExprSPtr& expr,
                         std::unordered_map<std::wstring, CollectInfo>* collect_infos) {
    const auto& match_children = expr->children();
    // Guard the positional accesses below; indexing past the end would be undefined behavior.
    if (match_children.size() < 2) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Match predicate expects 2 children, got {}",
                match_children.size());
    }

    auto* left_slot_ref = find_slot_ref(match_children[0]);
    if (left_slot_ref == nullptr) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Cannot find slot reference in match predicate "
                "left expression");
    }

    // Checked downcast: a right operand that is not a VLiteral becomes an error instead of
    // undefined behavior when value() is called on it below.
    auto* right_literal = dynamic_cast<VLiteral*>(match_children[1].get());
    if (right_literal == nullptr) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Match predicate right expression is not a "
                "literal");
    }

    const auto* sd = state->desc_tbl().get_slot_descriptor(left_slot_ref->slot_id());
    if (sd == nullptr) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Cannot find slot descriptor for slot_id={}",
                left_slot_ref->slot_id());
    }
    int32_t col_idx = tablet_schema->field_index(left_slot_ref->column_name());
    if (col_idx == -1) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Cannot find column index for column={}",
                left_slot_ref->column_name());
    }

    const auto& column = tablet_schema->column(col_idx);
    auto index_metas = tablet_schema->inverted_indexs(sd->col_unique_id(), column.suffix_path());
#ifndef BE_TEST
    if (index_metas.empty()) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Score query is not supported without inverted "
                "index for column={}",
                left_slot_ref->column_name());
    }
#endif

    auto format_options = DataTypeSerDe::get_default_format_options();
    format_options.timezone = &state->timezone_obj();
    for (const auto* index_meta : index_metas) {
        if (!InvertedIndexAnalyzer::should_analyzer(index_meta->properties())) {
            continue;
        }
        if (!segment_v2::IndexReaderHelper::is_need_similarity_score(expr->op(), index_meta)) {
            continue;
        }

        auto term_infos = InvertedIndexAnalyzer::get_analyse_result(
                right_literal->value(format_options), index_meta->properties());
        if (term_infos.empty()) {
            LOG(WARNING) << "Index statistics collection: no terms extracted from literal value, "
                         << "col_unique_id=" << index_meta->col_unique_ids()[0];
            continue;
        }

        std::string field_name = std::to_string(index_meta->col_unique_ids()[0]);
        if (!column.suffix_path().empty()) {
            field_name += "." + column.suffix_path();
        }
        std::wstring ws_field_name = StringHelper::to_wstring(field_name);

        // Several MATCH predicates may target the same field: the first one records the index
        // meta, and every one of them contributes its terms.
        auto [iter, inserted] = collect_infos->try_emplace(ws_field_name);
        if (inserted) {
            iter->second.index_meta = index_meta;
        }
        iter->second.term_infos.insert(term_infos.begin(), term_infos.end());
    }
    return Status::OK();
}
195
196
// Walks every pushed-down expression tree and calls handle_match_pred for each MATCH_PRED
// node, accumulating the per-field query terms into `collect_infos`.
//
// The traversal is an explicit-stack pre-order walk. Children are pushed right-to-left so the
// leftmost child is visited first. Children without children of their own are not pushed,
// since a MATCH_PRED node always has operands. The first error from handle_match_pred is
// returned immediately.
Status CollectionStatistics::extract_collect_info(
        RuntimeState* state, const VExprContextSPtrs& common_expr_ctxs_push_down,
        const TabletSchemaSPtr& tablet_schema,
        std::unordered_map<std::wstring, CollectInfo>* collect_infos) {
    for (const auto& root_expr_ctx : common_expr_ctxs_push_down) {
        const auto& root_expr = root_expr_ctx->root();
        if (root_expr == nullptr) {
            continue;
        }

        std::stack<VExprSPtr> stack;
        stack.emplace(root_expr);

        while (!stack.empty()) {
            // Take the element out by value BEFORE popping. A reference bound to stack.top()
            // dangles once pop() destroys the element, which made every later use of `expr`
            // undefined behavior.
            VExprSPtr expr = std::move(stack.top());
            stack.pop();

            if (expr->node_type() == TExprNodeType::MATCH_PRED) {
                RETURN_IF_ERROR(handle_match_pred(state, tablet_schema, expr, collect_infos));
            }

            const auto& children = expr->children();
            for (auto it = children.rbegin(); it != children.rend(); ++it) {
                if (!(*it)->children().empty()) {
                    stack.emplace(*it);
                }
            }
        }
    }
    return Status::OK();
}
227
228
// Adds the statistics of segment `seg_id` of `rowset` to the member counters.
//
// For every field in `collect_infos` this obtains an index reader for the field's inverted
// index (through InvertedIndexSearcherCache outside BE_TEST, by opening it directly under
// BE_TEST) and adds:
//   * the field's sumTotalTermFreq() (0 when absent) to _total_num_tokens[field];
//   * each query term's doc_freq() to _term_doc_freqs[field][term].
// The segment's document count is the largest maxDoc() seen across those fields, and is added
// once to _total_num_docs.
Status CollectionStatistics::process_segment(
        const RowsetSharedPtr& rowset, int32_t seg_id, const TabletSchema* tablet_schema,
        const std::unordered_map<std::wstring, CollectInfo>& collect_infos, io::IOContext* io_ctx) {
    auto segment_path = DORIS_TRY(rowset->segment_path(seg_id));
    auto meta = rowset->rowset_meta();

    auto index_file = std::make_unique<IndexFileReader>(
            meta->fs(),
            std::string {InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)},
            tablet_schema->get_inverted_index_storage_format(),
            meta->inverted_index_file_info(seg_id));
    RETURN_IF_ERROR(index_file->init(config::inverted_index_read_buffer_size, io_ctx));

    int32_t segment_doc_count = 0;
    for (const auto& [field_name, info] : collect_infos) {
#ifdef BE_TEST
        // Tests bypass the searcher cache and open the index directly.
        auto directory = DORIS_TRY(index_file->open(info.index_meta, io_ctx));
        auto* lucene_reader = lucene::index::IndexReader::open(directory.get());
        auto index_searcher = std::make_shared<lucene::search::IndexSearcher>(lucene_reader, true);

        auto* index_reader = index_searcher->getReader();
#else
        InvertedIndexCacheHandle cache_handle;
        auto file_cache_key = index_file->get_index_file_cache_key(info.index_meta);
        InvertedIndexSearcherCache::CacheKey cache_key(file_cache_key);
        const bool cache_hit =
                InvertedIndexSearcherCache::instance()->lookup(cache_key, &cache_handle);
        if (!cache_hit) {
            // Cache miss: open the index, wrap it in a searcher and publish it to the cache,
            // which then backs `cache_handle`.
            // NOTE(review): `directory` is destroyed at the end of this scope while the cached
            // reader was opened from it -- confirm the reader does not keep using it.
            auto directory = DORIS_TRY(index_file->open(info.index_meta, io_ctx));
            auto* lucene_reader = lucene::index::IndexReader::open(directory.get());
            size_t reader_mem = lucene_reader->getTermInfosRAMUsed();
            auto fresh_searcher =
                    std::make_shared<lucene::search::IndexSearcher>(lucene_reader, true);
            auto* value = new InvertedIndexSearcherCache::CacheValue(std::move(fresh_searcher),
                                                                     reader_mem, UnixMillis());
            InvertedIndexSearcherCache::instance()->insert(cache_key, value, &cache_handle);
        }

        auto searcher_variant = cache_handle.get_index_searcher();
        auto index_searcher = std::get<FulltextIndexSearcherPtr>(searcher_variant);
        auto* index_reader = index_searcher->getReader();
#endif

        segment_doc_count = std::max(segment_doc_count, index_reader->maxDoc());
        _total_num_tokens[field_name] +=
                index_reader->sumTotalTermFreq(field_name.c_str()).value_or(0);

        for (const auto& term_info : info.term_infos) {
            auto term_iter = TermIterator::create(io_ctx, false, index_reader, field_name,
                                                  term_info.get_single_term());
            _term_doc_freqs[field_name][term_iter->term()] += term_iter->doc_freq();
        }
    }
    _total_num_docs += segment_doc_count;
    return Status::OK();
}
284
285
uint64_t CollectionStatistics::get_term_doc_freq_by_col(const std::wstring& lucene_col_name,
286
48
                                                        const std::wstring& term) {
287
48
    if (!_term_doc_freqs.contains(lucene_col_name)) {
288
1
        throw Exception(ErrorCode::INVERTED_INDEX_CLUCENE_ERROR,
289
1
                        "Index statistics collection failed: Not such column {}",
290
1
                        StringHelper::to_string(lucene_col_name));
291
1
    }
292
293
47
    if (!_term_doc_freqs[lucene_col_name].contains(term)) {
294
0
        throw Exception(ErrorCode::INVERTED_INDEX_CLUCENE_ERROR,
295
0
                        "Index statistics collection failed: Not such term {}",
296
0
                        StringHelper::to_string(term));
297
0
    }
298
299
47
    return _term_doc_freqs[lucene_col_name][term];
300
47
}
301
302
27
uint64_t CollectionStatistics::get_total_term_cnt_by_col(const std::wstring& lucene_col_name) {
303
27
    if (!_total_num_tokens.contains(lucene_col_name)) {
304
2
        throw Exception(ErrorCode::INVERTED_INDEX_CLUCENE_ERROR,
305
2
                        "Index statistics collection failed: Not such column {}",
306
2
                        StringHelper::to_string(lucene_col_name));
307
2
    }
308
309
25
    return _total_num_tokens[lucene_col_name];
310
27
}
311
312
72
uint64_t CollectionStatistics::get_doc_num() const {
313
72
    if (_total_num_docs == 0) {
314
3
        throw Exception(
315
3
                ErrorCode::INVERTED_INDEX_CLUCENE_ERROR,
316
3
                "Index statistics collection failed: No data available for SimilarityCollector");
317
3
    }
318
319
69
    return _total_num_docs;
320
72
}
321
322
59
float CollectionStatistics::get_or_calculate_avg_dl(const std::wstring& lucene_col_name) {
323
59
    auto iter = _avg_dl_by_col.find(lucene_col_name);
324
59
    if (iter != _avg_dl_by_col.end()) {
325
35
        return iter->second;
326
35
    }
327
328
24
    const uint64_t total_term_cnt = get_total_term_cnt_by_col(lucene_col_name);
329
24
    const uint64_t total_doc_cnt = get_doc_num();
330
24
    float avg_dl = total_doc_cnt > 0 ? float((double)total_term_cnt / (double)total_doc_cnt) : 0.0F;
331
24
    _avg_dl_by_col[lucene_col_name] = avg_dl;
332
24
    return avg_dl;
333
59
}
334
335
float CollectionStatistics::get_or_calculate_idf(const std::wstring& lucene_col_name,
336
71
                                                 const std::wstring& term) {
337
71
    auto iter = _idf_by_col_term.find(lucene_col_name);
338
71
    if (iter != _idf_by_col_term.end()) {
339
47
        auto term_iter = iter->second.find(term);
340
47
        if (term_iter != iter->second.end()) {
341
25
            return term_iter->second;
342
25
        }
343
47
    }
344
345
46
    const uint64_t doc_num = get_doc_num();
346
46
    const uint64_t doc_freq = get_term_doc_freq_by_col(lucene_col_name, term);
347
46
    auto idf = (float)std::log(1 + ((double)doc_num - (double)doc_freq + (double)0.5) /
348
46
                                           ((double)doc_freq + (double)0.5));
349
46
    _idf_by_col_term[lucene_col_name][term] = idf;
350
46
    return idf;
351
71
}
352
353
#include "common/compile_check_end.h"
354
} // namespace doris