Coverage Report

Created: 2026-03-12 14:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/olap_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/olap_scanner.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
#include <stdlib.h>
25
#include <thrift/protocol/TDebugProtocol.h>
26
27
#include <algorithm>
28
#include <atomic>
29
#include <iterator>
30
#include <ostream>
31
#include <set>
32
33
#include "cloud/cloud_storage_engine.h"
34
#include "cloud/cloud_tablet_hotspot.h"
35
#include "cloud/config.h"
36
#include "common/config.h"
37
#include "common/consts.h"
38
#include "common/logging.h"
39
#include "common/metrics/doris_metrics.h"
40
#include "core/block/block.h"
41
#include "exec/common/variant_util.h"
42
#include "exec/operator/olap_scan_operator.h"
43
#include "exec/scan/scan_node.h"
44
#include "exprs/function_filter.h"
45
#include "exprs/vexpr.h"
46
#include "exprs/vexpr_context.h"
47
#include "io/cache/block_file_cache_profile.h"
48
#include "io/io_common.h"
49
#include "runtime/descriptors.h"
50
#include "runtime/exec_env.h"
51
#include "runtime/runtime_profile.h"
52
#include "runtime/runtime_state.h"
53
#include "service/backend_options.h"
54
#include "storage/cache/schema_cache.h"
55
#include "storage/id_manager.h"
56
#include "storage/index/inverted/inverted_index_profile.h"
57
#include "storage/iterator/block_reader.h"
58
#include "storage/olap_common.h"
59
#include "storage/olap_tuple.h"
60
#include "storage/olap_utils.h"
61
#include "storage/storage_engine.h"
62
#include "storage/tablet/tablet_schema.h"
63
#include "util/json/path_in_data.h"
64
65
namespace doris {
66
#include "common/compile_check_avoid_begin.h"
67
68
using ReadSource = TabletReadSource;
69
70
// Builds a scanner from the moved-in Params bundle.
//
// The base Scanner receives the runtime state, parent local state, limit and
// profile. The key ranges and tablet handle are moved out of `params`. All
// other reader parameters start empty, except the aggregation flag, the
// version range {0, params.version} and the condition-cache digest taken from
// `parent`.
OlapScanner::OlapScanner(ScanLocalStateBase* parent, OlapScanner::Params&& params)
        : Scanner(params.state, parent, params.limit, params.profile),
          _key_ranges(std::move(params.key_ranges)),
          // NOTE: designated initializers must stay in this order.
          _tablet_reader_params({.tablet = std::move(params.tablet),
                                 .tablet_schema {},
                                 .aggregation = params.aggregation,
                                 .version = {0, params.version},
                                 .start_key {},
                                 .end_key {},
                                 .predicates {},
                                 .function_filters {},
                                 .delete_predicates {},
                                 .target_cast_type_for_variants {},
                                 .all_access_paths {},
                                 .predicate_access_paths {},
                                 .rs_splits {},
                                 .return_columns {},
                                 .output_columns {},
                                 .remaining_conjunct_roots {},
                                 .common_expr_ctxs_push_down {},
                                 .topn_filter_source_node_ids {},
                                 .filter_block_conjuncts {},
                                 .key_group_cluster_key_idxes {},
                                 .virtual_column_exprs {},
                                 .vir_cid_to_idx_in_block {},
                                 .vir_col_idx_to_type {},
                                 .score_runtime {},
                                 .collection_statistics {},
                                 .ann_topn_runtime {},
                                 .condition_cache_digest = parent->get_condition_cache_digest()}) {
    // prepare() flips this to true once it has completed successfully.
    _has_prepared = false;
    _vector_search_params = params.state->get_vector_search_params();

    // Hand the pre-captured read source over to the reader params.
    _tablet_reader_params.set_read_source(std::move(params.read_source),
                                          _state->skip_delete_bitmap());
}
105
106
static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema,
107
0
                                          const std::vector<uint32_t>& read_columns) {
108
    // avoid too long for one line,
109
    // it is hard to display in `show profile` stmt if one line is too long.
110
0
    const int col_per_line = 10;
111
0
    int i = 0;
112
0
    std::string read_columns_string;
113
0
    read_columns_string += "[";
114
0
    for (auto it = read_columns.cbegin(); it != read_columns.cend(); it++) {
115
0
        if (it != read_columns.cbegin()) {
116
0
            read_columns_string += ", ";
117
0
        }
118
0
        read_columns_string += tablet_schema->columns().at(*it)->name();
119
0
        if (i >= col_per_line) {
120
0
            read_columns_string += "\n";
121
0
            i = 0;
122
0
        } else {
123
0
            ++i;
124
0
        }
125
0
    }
126
0
    read_columns_string += "]";
127
0
    return read_columns_string;
128
0
}
129
130
319k
// Prepares the scanner before it is opened.
//
// Steps, in order:
//   1. Clone the pushed-down common expressions and the virtual column
//      expressions from the local state into this scanner.
//   2. Copy the slot/index/type maps and the score / ANN top-n runtimes.
//   3. Create the BlockReader and set its batch size.
//   4. Resolve the tablet schema: take it from SchemaCache when allowed,
//      otherwise build it from the tablet schema plus the plan's column and
//      index descriptions.
//   5. Capture a read source if none was supplied, then fill in the reader
//      parameters via _init_tablet_reader_params().
//   6. Optionally record the read columns in the profile, insert a freshly
//      built schema into SchemaCache, and collect collection statistics when
//      a score runtime is present.
//
// @return Status::OK() on success (and sets _has_prepared), otherwise the
//         first error produced by any of the steps above.
Status OlapScanner::prepare() {
    auto* local_state = static_cast<OlapScanLocalState*>(_local_state);
    auto& tablet = _tablet_reader_params.tablet;
    auto& tablet_schema = _tablet_reader_params.tablet_schema;
    DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
        LOG_WARNING("CloudTablet.capture_rs_readers.return e-230 init")
                .tag("tablet_id", tablet->tablet_id());
        return Status::Error<false>(-230, "injected error");
    });

    for (auto& ctx : local_state->_common_expr_ctxs_push_down) {
        VExprContextSPtr context;
        RETURN_IF_ERROR(ctx->clone(_state, context));
        _common_expr_ctxs_push_down.emplace_back(context);
        context->prepare_ann_range_search(_vector_search_params);
    }

    // Iterate by const reference: copying each entry would also copy the
    // expression context shared_ptr it holds.
    for (const auto& entry : local_state->_slot_id_to_virtual_column_expr) {
        // Scanner will be executed in a different thread, so we need to clone the context.
        VExprContextSPtr context;
        RETURN_IF_ERROR(entry.second->clone(_state, context));
        _slot_id_to_virtual_column_expr[entry.first] = context;
    }

    _slot_id_to_index_in_block = local_state->_slot_id_to_index_in_block;
    _slot_id_to_col_type = local_state->_slot_id_to_col_type;
    _score_runtime = local_state->_score_runtime;
    // All scanners share the same ann_topn_runtime.
    _ann_topn_runtime = local_state->_ann_topn_runtime;

    // set limit to reduce end of rowset and segment mem use
    _tablet_reader = std::make_unique<BlockReader>();
    // batch size is passed down to segment iterator, use _state->batch_size()
    // instead of _parent->limit(), because if _parent->limit() is a very small
    // value (e.g. select a from t where a .. and b ... limit 1),
    // it will be very slow when reading data in segment iterator
    _tablet_reader->set_batch_size(_state->batch_size());
    TabletSchemaSPtr cached_schema;
    std::string schema_key;
    {
        TOlapScanNode& olap_scan_node = local_state->olap_scan_node();

        // The schema cache may be used only when the plan carries a schema
        // version and column descriptions, the tablet schema has no variant or
        // virtual columns, and no complex-typed output slot is pruned by
        // access paths.
        const bool can_use_schema_cache = [&]() {
            if (!(olap_scan_node.__isset.schema_version && olap_scan_node.__isset.columns_desc &&
                  !olap_scan_node.columns_desc.empty() &&
                  olap_scan_node.columns_desc[0].col_unique_id >= 0 && // Why check first column?
                  tablet->tablet_schema()->num_variant_columns() == 0 &&
                  tablet->tablet_schema()->num_virtual_columns() == 0)) {
                return false;
            }

            const bool has_pruned_column =
                    std::ranges::any_of(_output_tuple_desc->slots(), [](const auto& slot) {
                        if ((slot->type()->get_primitive_type() == PrimitiveType::TYPE_STRUCT ||
                             slot->type()->get_primitive_type() == PrimitiveType::TYPE_MAP ||
                             slot->type()->get_primitive_type() == PrimitiveType::TYPE_ARRAY) &&
                            !slot->all_access_paths().empty()) {
                            return true;
                        }
                        return false;
                    });
            return !has_pruned_column;
        }();

        if (can_use_schema_cache) {
            schema_key =
                    SchemaCache::get_schema_key(tablet->tablet_id(), olap_scan_node.columns_desc,
                                                olap_scan_node.schema_version);
            cached_schema = SchemaCache::instance()->get_schema(schema_key);
        }
        if (cached_schema && cached_schema->num_virtual_columns() == 0) {
            tablet_schema = cached_schema;
        } else {
            // If schema is not cached or cached schema has virtual columns,
            // we need to create a new TabletSchema.
            tablet_schema = std::make_shared<TabletSchema>();
            tablet_schema->copy_from(*tablet->tablet_schema());
            if (olap_scan_node.__isset.columns_desc && !olap_scan_node.columns_desc.empty() &&
                olap_scan_node.columns_desc[0].col_unique_id >= 0) {
                // Originally the scanner got the TabletSchema from the tablet object in BE.
                // To support lightweight schema change for adding / dropping columns,
                // the tablet schema is bound to the rowset and the tablet's schema may be
                // outdated, so we have to use the schema which FE puts in the query plan.
                tablet_schema->clear_columns();
                for (const auto& column_desc : olap_scan_node.columns_desc) {
                    tablet_schema->append_column(TabletColumn(column_desc));
                }
                if (olap_scan_node.__isset.schema_version) {
                    tablet_schema->set_schema_version(olap_scan_node.schema_version);
                }
            }
            if (olap_scan_node.__isset.indexes_desc) {
                tablet_schema->update_indexes_from_thrift(olap_scan_node.indexes_desc);
            }
        }

        if (_tablet_reader_params.rs_splits.empty()) {
            // Non-pipeline mode, Tablet : Scanner = 1 : 1
            // acquire tablet rowset readers at the beginning of the scan node
            // to prevent this case: when there are lots of olap scanners to run for example 10000
            // the rowsets maybe compacted when the last olap scanner starts
            ReadSource read_source;

            if (config::is_cloud_mode()) {
                // FIXME(plat1ko): Avoid pointer cast
                ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
            }

            auto maybe_read_source = tablet->capture_read_source(
                    _tablet_reader_params.version,
                    {
                            .skip_missing_versions = _state->skip_missing_version(),
                            .enable_fetch_rowsets_from_peers =
                                    config::enable_fetch_rowsets_from_peer_replicas,
                            .enable_prefer_cached_rowset =
                                    config::is_cloud_mode() ? _state->enable_prefer_cached_rowset()
                                                            : false,
                            .query_freshness_tolerance_ms =
                                    config::is_cloud_mode() ? _state->query_freshness_tolerance_ms()
                                                            : -1,
                    });
            if (!maybe_read_source) {
                LOG(WARNING) << "fail to init reader. res=" << maybe_read_source.error();
                return maybe_read_source.error();
            }

            read_source = std::move(maybe_read_source.value());

            if (config::enable_mow_verbose_log && tablet->enable_unique_key_merge_on_write()) {
                LOG_INFO("finish capture_rs_readers for tablet={}, query_id={}",
                         tablet->tablet_id(), print_id(_state->query_id()));
            }

            if (!_state->skip_delete_predicate()) {
                read_source.fill_delete_predicates();
            }
            _tablet_reader_params.set_read_source(std::move(read_source));
        }

        // Initialize tablet_reader_params
        RETURN_IF_ERROR(_init_tablet_reader_params(
                local_state->_parent->cast<OlapScanOperatorX>()._slot_id_to_slot_desc, _key_ranges,
                local_state->_slot_id_to_predicates, local_state->_push_down_functions));
    }

    // add read columns in profile
    if (_state->enable_profile()) {
        _profile->add_info_string("ReadColumns",
                                  read_columns_to_string(tablet_schema, _return_columns));
    }

    // Add newly created tablet schema to schema cache if it does not have virtual columns.
    if (cached_schema == nullptr && !schema_key.empty() &&
        tablet_schema->num_virtual_columns() == 0 && !tablet_schema->has_pruned_columns()) {
        SchemaCache::instance()->insert_schema(schema_key, tablet_schema);
    }

    if (_tablet_reader_params.score_runtime) {
        SCOPED_TIMER(local_state->_statistics_collect_timer);
        _tablet_reader_params.collection_statistics = std::make_shared<CollectionStatistics>();

        io::IOContext io_ctx {
                .reader_type = ReaderType::READER_QUERY,
                .expiration_time = tablet->ttl_seconds(),
                .query_id = &_state->query_id(),
                .file_cache_stats = &_tablet_reader->mutable_stats()->file_cache_stats,
                .is_inverted_index = true,
        };

        RETURN_IF_ERROR(_tablet_reader_params.collection_statistics->collect(
                _state, _tablet_reader_params.rs_splits, _tablet_reader_params.tablet_schema,
                _tablet_reader_params.common_expr_ctxs_push_down, &io_ctx));
    }

    _has_prepared = true;
    return Status::OK();
}
307
308
318k
// Opens the scanner: runs the base-class open, then initializes the tablet
// reader with the parameters assembled by prepare(). The time spent is
// charged to the local state's _reader_init_timer.
//
// @return Status::OK() on success; on reader init failure, the reader's error
//         with the tablet id and backend host appended.
Status OlapScanner::_open_impl(RuntimeState* state) {
    RETURN_IF_ERROR(Scanner::_open_impl(state));
    SCOPED_TIMER(_local_state->cast<OlapScanLocalState>()._reader_init_timer);

    auto init_status = _tablet_reader->init(_tablet_reader_params);
    if (init_status.ok()) {
        // Do not hold rs_splits any more to release memory.
        _tablet_reader_params.rs_splits.clear();
        return Status::OK();
    }

    // Failure path: attach enough context to locate the failing tablet/backend.
    std::string detail = "failed to initialize storage reader. tablet=";
    detail += std::to_string(_tablet_reader_params.tablet->tablet_id());
    detail += ", backend=";
    detail += BackendOptions::get_localhost();
    init_status.append(detail);
    return init_status;
}
325
326
// it will be called under tablet read lock because capture rs readers need
327
Status OlapScanner::_init_tablet_reader_params(
328
        const phmap::flat_hash_map<int, SlotDescriptor*>& slot_id_to_slot_desc,
329
        const std::vector<OlapScanRange*>& key_ranges,
330
        const phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
331
                slot_to_predicates,
332
318k
        const std::vector<FunctionFilter>& function_filters) {
333
    // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
334
318k
    const bool single_version = _tablet_reader_params.has_single_version();
335
336
318k
    if (_state->skip_storage_engine_merge()) {
337
0
        _tablet_reader_params.direct_mode = true;
338
0
        _tablet_reader_params.aggregation = true;
339
318k
    } else {
340
318k
        auto push_down_agg_type = _local_state->get_push_down_agg_type();
341
318k
        _tablet_reader_params.direct_mode = _tablet_reader_params.aggregation || single_version ||
342
318k
                                            (push_down_agg_type != TPushAggOp::NONE &&
343
618
                                             push_down_agg_type != TPushAggOp::COUNT_ON_INDEX);
344
318k
    }
345
346
318k
    RETURN_IF_ERROR(_init_variant_columns());
347
318k
    RETURN_IF_ERROR(_init_return_columns());
348
349
318k
    _tablet_reader_params.reader_type = ReaderType::READER_QUERY;
350
318k
    _tablet_reader_params.push_down_agg_type_opt = _local_state->get_push_down_agg_type();
351
352
    // TODO: If a new runtime filter arrives after `_conjuncts` move to `_common_expr_ctxs_push_down`,
353
318k
    if (_common_expr_ctxs_push_down.empty()) {
354
318k
        for (auto& conjunct : _conjuncts) {
355
0
            _tablet_reader_params.remaining_conjunct_roots.emplace_back(conjunct->root());
356
0
        }
357
18.4E
    } else {
358
18.4E
        for (auto& ctx : _common_expr_ctxs_push_down) {
359
186
            _tablet_reader_params.remaining_conjunct_roots.emplace_back(ctx->root());
360
186
        }
361
18.4E
    }
362
363
318k
    _tablet_reader_params.common_expr_ctxs_push_down = _common_expr_ctxs_push_down;
364
318k
    _tablet_reader_params.virtual_column_exprs = _virtual_column_exprs;
365
318k
    _tablet_reader_params.vir_cid_to_idx_in_block = _vir_cid_to_idx_in_block;
366
318k
    _tablet_reader_params.vir_col_idx_to_type = _vir_col_idx_to_type;
367
318k
    _tablet_reader_params.score_runtime = _score_runtime;
368
318k
    _tablet_reader_params.output_columns = ((OlapScanLocalState*)_local_state)->_output_column_ids;
369
318k
    _tablet_reader_params.ann_topn_runtime = _ann_topn_runtime;
370
318k
    for (const auto& ele : ((OlapScanLocalState*)_local_state)->_cast_types_for_variants) {
371
4
        _tablet_reader_params.target_cast_type_for_variants[ele.first] = ele.second;
372
4
    };
373
318k
    auto& tablet_schema = _tablet_reader_params.tablet_schema;
374
3.55M
    for (auto& predicates : slot_to_predicates) {
375
3.55M
        const int sid = predicates.first;
376
3.55M
        DCHECK(slot_id_to_slot_desc.contains(sid));
377
3.55M
        int32_t index =
378
3.55M
                tablet_schema->field_index(slot_id_to_slot_desc.find(sid)->second->col_name());
379
3.55M
        if (index < 0) {
380
0
            throw Exception(
381
0
                    Status::InternalError("Column {} not found in tablet schema",
382
0
                                          slot_id_to_slot_desc.find(sid)->second->col_name()));
383
0
        }
384
3.55M
        for (auto& predicate : predicates.second) {
385
299k
            _tablet_reader_params.predicates.push_back(predicate->clone(index));
386
299k
        }
387
3.55M
    }
388
389
318k
    std::copy(function_filters.cbegin(), function_filters.cend(),
390
318k
              std::inserter(_tablet_reader_params.function_filters,
391
318k
                            _tablet_reader_params.function_filters.begin()));
392
393
    // Merge the columns in delete predicate that not in latest schema in to current tablet schema
394
318k
    for (auto& del_pred : _tablet_reader_params.delete_predicates) {
395
18
        tablet_schema->merge_dropped_columns(*del_pred->tablet_schema());
396
18
    }
397
398
    // Range
399
323k
    for (auto* key_range : key_ranges) {
400
323k
        if (key_range->begin_scan_range.size() == 1 &&
401
323k
            key_range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) {
402
16.0k
            continue;
403
16.0k
        }
404
405
307k
        _tablet_reader_params.start_key_include = key_range->begin_include;
406
307k
        _tablet_reader_params.end_key_include = key_range->end_include;
407
408
307k
        _tablet_reader_params.start_key.push_back(key_range->begin_scan_range);
409
307k
        _tablet_reader_params.end_key.push_back(key_range->end_scan_range);
410
307k
    }
411
412
318k
    _tablet_reader_params.profile = _local_state->custom_profile();
413
318k
    _tablet_reader_params.runtime_state = _state;
414
415
318k
    _tablet_reader_params.origin_return_columns = &_return_columns;
416
318k
    _tablet_reader_params.tablet_columns_convert_to_null_set = &_tablet_columns_convert_to_null_set;
417
418
318k
    if (_tablet_reader_params.direct_mode) {
419
317k
        _tablet_reader_params.return_columns = _return_columns;
420
317k
    } else {
421
        // we need to fetch all key columns to do the right aggregation on storage engine side.
422
2.23k
        for (size_t i = 0; i < tablet_schema->num_key_columns(); ++i) {
423
1.28k
            _tablet_reader_params.return_columns.push_back(i);
424
1.28k
        }
425
1.86k
        for (auto index : _return_columns) {
426
1.86k
            if (tablet_schema->column(index).is_key()) {
427
1.25k
                continue;
428
1.25k
            }
429
612
            _tablet_reader_params.return_columns.push_back(index);
430
612
        }
431
        // expand the sequence column
432
946
        if (tablet_schema->has_sequence_col() || tablet_schema->has_seq_map()) {
433
0
            bool has_replace_col = false;
434
0
            for (auto col : _return_columns) {
435
0
                if (tablet_schema->column(col).aggregation() ==
436
0
                    FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) {
437
0
                    has_replace_col = true;
438
0
                    break;
439
0
                }
440
0
            }
441
0
            if (auto sequence_col_idx = tablet_schema->sequence_col_idx();
442
0
                has_replace_col && tablet_schema->has_sequence_col() &&
443
0
                std::find(_return_columns.begin(), _return_columns.end(), sequence_col_idx) ==
444
0
                        _return_columns.end()) {
445
0
                _tablet_reader_params.return_columns.push_back(sequence_col_idx);
446
0
            }
447
0
            if (has_replace_col) {
448
0
                const auto& val_to_seq = tablet_schema->value_col_idx_to_seq_col_idx();
449
0
                std::set<uint32_t> return_seq_columns;
450
451
0
                for (auto col : _tablet_reader_params.return_columns) {
452
                    // we need to add the necessary sequence column in _return_columns, and
453
                    // Avoid adding the same seq column twice
454
0
                    const auto val_iter = val_to_seq.find(col);
455
0
                    if (val_iter != val_to_seq.end()) {
456
0
                        auto seq = val_iter->second;
457
0
                        if (std::find(_tablet_reader_params.return_columns.begin(),
458
0
                                      _tablet_reader_params.return_columns.end(),
459
0
                                      seq) == _tablet_reader_params.return_columns.end()) {
460
0
                            return_seq_columns.insert(seq);
461
0
                        }
462
0
                    }
463
0
                }
464
0
                _tablet_reader_params.return_columns.insert(
465
0
                        std::end(_tablet_reader_params.return_columns),
466
0
                        std::begin(return_seq_columns), std::end(return_seq_columns));
467
0
            }
468
0
        }
469
946
    }
470
471
318k
    _tablet_reader_params.use_page_cache = _state->enable_page_cache();
472
473
318k
    DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block", DBUG_BLOCK);
474
475
318k
    if (!_state->skip_storage_engine_merge()) {
476
318k
        auto* olap_scan_local_state = (OlapScanLocalState*)_local_state;
477
318k
        TOlapScanNode& olap_scan_node = olap_scan_local_state->olap_scan_node();
478
        // order by table keys optimization for topn
479
        // will only read head/tail of data file since it's already sorted by keys
480
318k
        if (olap_scan_node.__isset.sort_info && !olap_scan_node.sort_info.is_asc_order.empty()) {
481
0
            _limit = _local_state->limit_per_scanner();
482
0
            _tablet_reader_params.read_orderby_key = true;
483
0
            if (!olap_scan_node.sort_info.is_asc_order[0]) {
484
0
                _tablet_reader_params.read_orderby_key_reverse = true;
485
0
            }
486
0
            _tablet_reader_params.read_orderby_key_num_prefix_columns =
487
0
                    olap_scan_node.sort_info.is_asc_order.size();
488
0
            _tablet_reader_params.read_orderby_key_limit = _limit;
489
490
0
            if (_tablet_reader_params.read_orderby_key_limit > 0 &&
491
0
                olap_scan_local_state->_storage_no_merge()) {
492
0
                _tablet_reader_params.filter_block_conjuncts = _conjuncts;
493
0
                _conjuncts.clear();
494
0
            }
495
0
        }
496
497
        // set push down topn filter
498
318k
        _tablet_reader_params.topn_filter_source_node_ids =
499
318k
                olap_scan_local_state->get_topn_filter_source_node_ids(_state, true);
500
318k
        if (!_tablet_reader_params.topn_filter_source_node_ids.empty()) {
501
0
            _tablet_reader_params.topn_filter_target_node_id =
502
0
                    olap_scan_local_state->parent()->node_id();
503
0
        }
504
318k
    }
505
506
    // If this is a Two-Phase read query, and we need to delay the release of Rowset
507
    // by rowset->update_delayed_expired_timestamp().This could expand the lifespan of Rowset
508
318k
    if (tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) {
509
0
        constexpr static int delayed_s = 60;
510
0
        for (auto rs_reader : _tablet_reader_params.rs_splits) {
511
0
            uint64_t delayed_expired_timestamp =
512
0
                    UnixSeconds() + _tablet_reader_params.runtime_state->execution_timeout() +
513
0
                    delayed_s;
514
0
            rs_reader.rs_reader->rowset()->update_delayed_expired_timestamp(
515
0
                    delayed_expired_timestamp);
516
0
            ExecEnv::GetInstance()->storage_engine().add_quering_rowset(
517
0
                    rs_reader.rs_reader->rowset());
518
0
        }
519
0
    }
520
521
318k
    if (tablet_schema->has_global_row_id()) {
522
10
        auto& id_file_map = _state->get_id_file_map();
523
10
        for (auto rs_reader : _tablet_reader_params.rs_splits) {
524
10
            id_file_map->add_temp_rowset(rs_reader.rs_reader->rowset());
525
10
        }
526
10
    }
527
528
318k
    return Status::OK();
529
318k
}
530
531
318k
// Registers a materialized sub-column in the tablet schema for every
// VARIANT-typed output slot that the schema does not already contain, then
// lets variant_util propagate column attributes. Does nothing when the schema
// has no variant columns.
//
// @return always Status::OK().
Status OlapScanner::_init_variant_columns() {
    auto& tablet_schema = _tablet_reader_params.tablet_schema;
    if (tablet_schema->num_variant_columns() == 0) {
        return Status::OK();
    }

    // Parent column has path info to distinction from each other
    for (auto* slot : _output_tuple_desc->slots()) {
        if (slot->type()->get_primitive_type() != PrimitiveType::TYPE_VARIANT) {
            continue;
        }

        // Such columns are not exist in frontend schema info, so we need to
        // add them into tablet_schema for later column indexing.
        const auto max_subcolumns =
                assert_cast<const DataTypeVariant&>(*remove_nullable(slot->type()))
                        .variant_max_subcolumns_count();
        TabletColumn materialized = TabletColumn::create_materialized_variant_column(
                tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(),
                slot->column_paths(), slot->col_unique_id(), max_subcolumns);

        const bool already_present = tablet_schema->field_index(*materialized.path_info_ptr()) >= 0;
        if (!already_present) {
            tablet_schema->append_column(materialized, TabletSchema::ColumnType::VARIANT);
        }
    }

    variant_util::inherit_column_attributes(tablet_schema);
    return Status::OK();
}
554
555
318k
// Resolves every output slot to a column index in the tablet schema and
// records it in _return_columns.
//
// Along the way it also:
//   - registers virtual column expressions, their block indexes and types;
//   - records per-column access paths (all / predicate) in the reader params;
//   - registers the slot type of STRUCT / MAP / ARRAY columns that have access
//     paths as a pruned column type on the schema;
//   - remembers columns that must be converted to nullable because the slot
//     is nullable while the tablet column is not.
//
// @return Status::OK() on success; InternalError when a slot cannot be found
//         in the schema or no column was selected; INVALID_SCHEMA when a
//         non-nullable slot maps to a nullable tablet column.
Status OlapScanner::_init_return_columns() {
    auto& tablet_schema = _tablet_reader_params.tablet_schema;

    for (auto* slot : _output_tuple_desc->slots()) {
        const auto slot_primitive_type = slot->type()->get_primitive_type();

        // variant column using path to index a column
        int32_t index = 0;
        if (slot_primitive_type == PrimitiveType::TYPE_VARIANT) {
            index = tablet_schema->field_index(PathInData(
                    tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(),
                    slot->column_paths()));
        } else {
            index = slot->col_unique_id() >= 0 ? tablet_schema->field_index(slot->col_unique_id())
                                               : tablet_schema->field_index(slot->col_name());
        }

        if (index < 0) {
            return Status::InternalError(
                    "field name is invalid. field={}, field_name_to_index={}, col_unique_id={}",
                    slot->col_name(), tablet_schema->get_all_field_names(), slot->col_unique_id());
        }

        if (slot->get_virtual_column_expr()) {
            ColumnId virtual_column_cid = index;
            _virtual_column_exprs[virtual_column_cid] = _slot_id_to_virtual_column_expr[slot->id()];
            size_t idx_in_block = _slot_id_to_index_in_block[slot->id()];
            _vir_cid_to_idx_in_block[virtual_column_cid] = idx_in_block;
            _vir_col_idx_to_type[idx_in_block] = _slot_id_to_col_type[slot->id()];

            VLOG_DEBUG << fmt::format(
                    "Virtual column, slot id: {}, cid {}, column index: {}, type: {}", slot->id(),
                    virtual_column_cid, _vir_cid_to_idx_in_block[virtual_column_cid],
                    _vir_col_idx_to_type[idx_in_block]->get_name());
        }

        const auto& column = tablet_schema->column(index);
        // Fall back to the parent's unique id when the column has none of its own.
        int32_t unique_id = column.unique_id() > 0 ? column.unique_id() : column.parent_unique_id();

        const bool has_access_paths = !slot->all_access_paths().empty();
        if (has_access_paths) {
            _tablet_reader_params.all_access_paths.insert({unique_id, slot->all_access_paths()});
        }

        if (!slot->predicate_access_paths().empty()) {
            _tablet_reader_params.predicate_access_paths.insert(
                    {unique_id, slot->predicate_access_paths()});
        }

        const bool is_complex_type = slot_primitive_type == PrimitiveType::TYPE_STRUCT ||
                                     slot_primitive_type == PrimitiveType::TYPE_MAP ||
                                     slot_primitive_type == PrimitiveType::TYPE_ARRAY;
        if (is_complex_type && has_access_paths) {
            tablet_schema->add_pruned_columns_data_type(column.unique_id(), slot->type());
        }

        _return_columns.push_back(index);
        if (slot->is_nullable() && !tablet_schema->column(index).is_nullable()) {
            _tablet_columns_convert_to_null_set.emplace(index);
        } else if (!slot->is_nullable() && tablet_schema->column(index).is_nullable()) {
            // The value reported here is tablet_schema->table_id(), so the
            // message labels it "table id".
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
                    "slot(id: {}, name: {})'s nullable does not match "
                    "column(table id: {}, index: {}, name: {}) ",
                    slot->id(), slot->col_name(), tablet_schema->table_id(), index,
                    tablet_schema->column(index).name());
        }
    }

    if (_return_columns.empty()) {
        return Status::InternalError("failed to build storage scanner, no materialized slot!");
    }

    return Status::OK();
}
624
625
319k
// Classifies where this scanner's rowsets live.
//
// @return STORAGE_TYPE_LOCAL in cloud mode, or when every rowset split is
//         local (including when there are no splits); STORAGE_TYPE_REMOTE when
//         none is local; STORAGE_TYPE_REMOTE_AND_LOCAL otherwise.
doris::TabletStorageType OlapScanner::get_storage_type() {
    if (config::is_cloud_mode()) {
        // we don't have cold storage in cloud mode, all storage is treated as local
        return doris::TabletStorageType::STORAGE_TYPE_LOCAL;
    }

    const auto& splits = _tablet_reader_params.rs_splits;
    int local_count = 0;
    for (const auto& split : splits) {
        if (split.rs_reader->rowset()->is_local()) {
            ++local_count;
        }
    }
    const int total_count = splits.size();

    if (local_count == total_count) {
        return doris::TabletStorageType::STORAGE_TYPE_LOCAL;
    }
    return local_count == 0 ? doris::TabletStorageType::STORAGE_TYPE_REMOTE
                            : doris::TabletStorageType::STORAGE_TYPE_REMOTE_AND_LOCAL;
}
643
644
336k
// Reads the next block from the tablet reader.
//
// ATTN: this method guarantees the interface contract that `*eof` may only be
// true when the returned block is empty. Whenever rows were produced, `*eof`
// is forced to false and the tablet's read_block_count is incremented.
//
// @return the reader's error if reading fails, Status::OK() otherwise.
Status OlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
    RETURN_IF_ERROR(_tablet_reader->next_block_with_aggregation(block, eof));

    const bool produced_rows = block->rows() > 0;
    if (!produced_rows) {
        return Status::OK();
    }

    _tablet_reader_params.tablet->read_block_count.fetch_add(1, std::memory_order_relaxed);
    *eof = false;
    return Status::OK();
}
655
656
319k
// Closes the scanner. When _try_close() returns false this is a no-op that
// reports success; otherwise the result of the base-class close is returned.
Status OlapScanner::close(RuntimeState* state) {
    if (_try_close()) {
        return Scanner::close(state);
    }
    return Status::OK();
}
663
664
16
void OlapScanner::update_realtime_counters() {
665
16
    OlapScanLocalState* local_state = static_cast<OlapScanLocalState*>(_local_state);
666
16
    const OlapReaderStatistics& stats = _tablet_reader->stats();
667
16
    COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read);
668
16
    COUNTER_UPDATE(local_state->_read_uncompressed_counter, stats.uncompressed_bytes_read);
669
16
    COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read);
670
16
    COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read);
671
672
    // Make sure the scan bytes and scan rows counter in audit log is the same as the counter in
673
    // doris metrics.
674
    // ScanBytes is the uncompressed bytes read from local + remote
675
    // bytes_read_from_local is the compressed bytes read from local
676
    // bytes_read_from_remote is the compressed bytes read from remote
677
    // scan bytes > bytes_read_from_local + bytes_read_from_remote
678
16
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(stats.raw_rows_read);
679
16
    _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
680
16
            stats.uncompressed_bytes_read);
681
682
    // In case of no cache, we still need to update the IO stats. uncompressed bytes read == local + remote
683
16
    if (stats.file_cache_stats.bytes_read_from_local == 0 &&
684
16
        stats.file_cache_stats.bytes_read_from_remote == 0) {
685
16
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
686
16
                stats.compressed_bytes_read);
687
16
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
688
16
                stats.compressed_bytes_read);
689
16
    } else {
690
0
        _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
691
0
                stats.file_cache_stats.bytes_read_from_local - _bytes_read_from_local);
692
0
        _state->get_query_ctx()
693
0
                ->resource_ctx()
694
0
                ->io_context()
695
0
                ->update_scan_bytes_from_remote_storage(
696
0
                        stats.file_cache_stats.bytes_read_from_remote - _bytes_read_from_remote);
697
698
0
        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
699
0
                stats.file_cache_stats.bytes_read_from_local - _bytes_read_from_local);
700
0
        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
701
0
                stats.file_cache_stats.bytes_read_from_remote - _bytes_read_from_remote);
702
0
    }
703
704
16
    _tablet_reader->mutable_stats()->compressed_bytes_read = 0;
705
16
    _tablet_reader->mutable_stats()->uncompressed_bytes_read = 0;
706
16
    _tablet_reader->mutable_stats()->raw_rows_read = 0;
707
708
16
    _bytes_read_from_local = _tablet_reader->stats().file_cache_stats.bytes_read_from_local;
709
16
    _bytes_read_from_remote = _tablet_reader->stats().file_cache_stats.bytes_read_from_remote;
710
16
}
711
712
318k
void OlapScanner::_collect_profile_before_close() {
713
    //  Please don't directly enable the profile here, we need to set QueryStatistics using the counter inside.
714
318k
    if (_has_updated_counter) {
715
0
        return;
716
0
    }
717
318k
    _has_updated_counter = true;
718
318k
    _tablet_reader->update_profile(_profile);
719
720
318k
    Scanner::_collect_profile_before_close();
721
722
    // Update counters for OlapScanner
723
    // Update counters from tablet reader's stats
724
318k
    auto& stats = _tablet_reader->stats();
725
318k
    auto* local_state = (OlapScanLocalState*)_local_state;
726
318k
    COUNTER_UPDATE(local_state->_io_timer, stats.io_ns);
727
318k
    COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read);
728
318k
    COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read);
729
318k
    COUNTER_UPDATE(local_state->_decompressor_timer, stats.decompress_ns);
730
318k
    COUNTER_UPDATE(local_state->_read_uncompressed_counter, stats.uncompressed_bytes_read);
731
318k
    COUNTER_UPDATE(local_state->_block_load_timer, stats.block_load_ns);
732
318k
    COUNTER_UPDATE(local_state->_block_load_counter, stats.blocks_load);
733
318k
    COUNTER_UPDATE(local_state->_block_fetch_timer, stats.block_fetch_ns);
734
318k
    COUNTER_UPDATE(local_state->_delete_bitmap_get_agg_timer, stats.delete_bitmap_get_agg_ns);
735
318k
    COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read);
736
318k
    COUNTER_UPDATE(local_state->_vec_cond_timer, stats.vec_cond_ns);
737
318k
    COUNTER_UPDATE(local_state->_short_cond_timer, stats.short_cond_ns);
738
318k
    COUNTER_UPDATE(local_state->_expr_filter_timer, stats.expr_filter_ns);
739
318k
    COUNTER_UPDATE(local_state->_block_init_timer, stats.block_init_ns);
740
318k
    COUNTER_UPDATE(local_state->_block_init_seek_timer, stats.block_init_seek_ns);
741
318k
    COUNTER_UPDATE(local_state->_block_init_seek_counter, stats.block_init_seek_num);
742
318k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_keys_timer,
743
318k
                   stats.generate_row_ranges_by_keys_ns);
744
318k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_column_conditions_timer,
745
318k
                   stats.generate_row_ranges_by_column_conditions_ns);
746
318k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_bf_timer,
747
318k
                   stats.generate_row_ranges_by_bf_ns);
748
318k
    COUNTER_UPDATE(local_state->_collect_iterator_merge_next_timer,
749
318k
                   stats.collect_iterator_merge_next_timer);
750
318k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_zonemap_timer,
751
318k
                   stats.generate_row_ranges_by_zonemap_ns);
752
318k
    COUNTER_UPDATE(local_state->_segment_generate_row_range_by_dict_timer,
753
318k
                   stats.generate_row_ranges_by_dict_ns);
754
318k
    COUNTER_UPDATE(local_state->_predicate_column_read_timer, stats.predicate_column_read_ns);
755
318k
    COUNTER_UPDATE(local_state->_non_predicate_column_read_timer, stats.non_predicate_read_ns);
756
318k
    COUNTER_UPDATE(local_state->_predicate_column_read_seek_timer,
757
318k
                   stats.predicate_column_read_seek_ns);
758
318k
    COUNTER_UPDATE(local_state->_predicate_column_read_seek_counter,
759
318k
                   stats.predicate_column_read_seek_num);
760
318k
    COUNTER_UPDATE(local_state->_lazy_read_timer, stats.lazy_read_ns);
761
318k
    COUNTER_UPDATE(local_state->_lazy_read_seek_timer, stats.block_lazy_read_seek_ns);
762
318k
    COUNTER_UPDATE(local_state->_lazy_read_seek_counter, stats.block_lazy_read_seek_num);
763
318k
    COUNTER_UPDATE(local_state->_output_col_timer, stats.output_col_ns);
764
318k
    COUNTER_UPDATE(local_state->_rows_vec_cond_filtered_counter, stats.rows_vec_cond_filtered);
765
318k
    COUNTER_UPDATE(local_state->_rows_short_circuit_cond_filtered_counter,
766
318k
                   stats.rows_short_circuit_cond_filtered);
767
318k
    COUNTER_UPDATE(local_state->_rows_expr_cond_filtered_counter, stats.rows_expr_cond_filtered);
768
318k
    COUNTER_UPDATE(local_state->_rows_vec_cond_input_counter, stats.vec_cond_input_rows);
769
318k
    COUNTER_UPDATE(local_state->_rows_short_circuit_cond_input_counter,
770
318k
                   stats.short_circuit_cond_input_rows);
771
318k
    COUNTER_UPDATE(local_state->_rows_expr_cond_input_counter, stats.expr_cond_input_rows);
772
318k
    COUNTER_UPDATE(local_state->_stats_filtered_counter, stats.rows_stats_filtered);
773
318k
    COUNTER_UPDATE(local_state->_stats_rp_filtered_counter, stats.rows_stats_rp_filtered);
774
318k
    COUNTER_UPDATE(local_state->_dict_filtered_counter, stats.segment_dict_filtered);
775
318k
    COUNTER_UPDATE(local_state->_bf_filtered_counter, stats.rows_bf_filtered);
776
318k
    COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_del_filtered);
777
318k
    COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_del_by_bitmap);
778
318k
    COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_vec_del_cond_filtered);
779
318k
    COUNTER_UPDATE(local_state->_conditions_filtered_counter, stats.rows_conditions_filtered);
780
318k
    COUNTER_UPDATE(local_state->_key_range_filtered_counter, stats.rows_key_range_filtered);
781
318k
    COUNTER_UPDATE(local_state->_total_pages_num_counter, stats.total_pages_num);
782
318k
    COUNTER_UPDATE(local_state->_cached_pages_num_counter, stats.cached_pages_num);
783
318k
    COUNTER_UPDATE(local_state->_inverted_index_filter_counter, stats.rows_inverted_index_filtered);
784
318k
    COUNTER_UPDATE(local_state->_inverted_index_filter_timer, stats.inverted_index_filter_timer);
785
318k
    COUNTER_UPDATE(local_state->_inverted_index_query_cache_hit_counter,
786
318k
                   stats.inverted_index_query_cache_hit);
787
318k
    COUNTER_UPDATE(local_state->_inverted_index_query_cache_miss_counter,
788
318k
                   stats.inverted_index_query_cache_miss);
789
318k
    COUNTER_UPDATE(local_state->_inverted_index_query_timer, stats.inverted_index_query_timer);
790
318k
    COUNTER_UPDATE(local_state->_inverted_index_query_null_bitmap_timer,
791
318k
                   stats.inverted_index_query_null_bitmap_timer);
792
318k
    COUNTER_UPDATE(local_state->_inverted_index_query_bitmap_copy_timer,
793
318k
                   stats.inverted_index_query_bitmap_copy_timer);
794
318k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_open_timer,
795
318k
                   stats.inverted_index_searcher_open_timer);
796
318k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_search_timer,
797
318k
                   stats.inverted_index_searcher_search_timer);
798
318k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_search_init_timer,
799
318k
                   stats.inverted_index_searcher_search_init_timer);
800
318k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_search_exec_timer,
801
318k
                   stats.inverted_index_searcher_search_exec_timer);
802
318k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_cache_hit_counter,
803
318k
                   stats.inverted_index_searcher_cache_hit);
804
318k
    COUNTER_UPDATE(local_state->_inverted_index_searcher_cache_miss_counter,
805
318k
                   stats.inverted_index_searcher_cache_miss);
806
318k
    COUNTER_UPDATE(local_state->_inverted_index_downgrade_count_counter,
807
318k
                   stats.inverted_index_downgrade_count);
808
318k
    COUNTER_UPDATE(local_state->_inverted_index_analyzer_timer,
809
318k
                   stats.inverted_index_analyzer_timer);
810
318k
    COUNTER_UPDATE(local_state->_inverted_index_lookup_timer, stats.inverted_index_lookup_timer);
811
318k
    COUNTER_UPDATE(local_state->_variant_scan_sparse_column_timer,
812
318k
                   stats.variant_scan_sparse_column_timer_ns);
813
318k
    COUNTER_UPDATE(local_state->_variant_scan_sparse_column_bytes,
814
318k
                   stats.variant_scan_sparse_column_bytes);
815
318k
    COUNTER_UPDATE(local_state->_variant_fill_path_from_sparse_column_timer,
816
318k
                   stats.variant_fill_path_from_sparse_column_timer_ns);
817
318k
    COUNTER_UPDATE(local_state->_variant_subtree_default_iter_count,
818
318k
                   stats.variant_subtree_default_iter_count);
819
318k
    COUNTER_UPDATE(local_state->_variant_subtree_leaf_iter_count,
820
318k
                   stats.variant_subtree_leaf_iter_count);
821
318k
    COUNTER_UPDATE(local_state->_variant_subtree_hierarchical_iter_count,
822
318k
                   stats.variant_subtree_hierarchical_iter_count);
823
318k
    COUNTER_UPDATE(local_state->_variant_subtree_sparse_iter_count,
824
318k
                   stats.variant_subtree_sparse_iter_count);
825
318k
    COUNTER_UPDATE(local_state->_variant_doc_value_column_iter_count,
826
318k
                   stats.variant_doc_value_column_iter_count);
827
828
318k
    InvertedIndexProfileReporter inverted_index_profile;
829
318k
    inverted_index_profile.update(local_state->_index_filter_profile.get(),
830
318k
                                  &stats.inverted_index_stats);
831
832
    // only cloud deploy mode will use file cache.
833
318k
    if (config::is_cloud_mode() && config::enable_file_cache) {
834
0
        io::FileCacheProfileReporter cache_profile(local_state->_segment_profile.get());
835
0
        cache_profile.update(&stats.file_cache_stats);
836
0
        _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache(
837
0
                stats.file_cache_stats.bytes_write_into_cache);
838
0
    }
839
318k
    COUNTER_UPDATE(local_state->_output_index_result_column_timer,
840
318k
                   stats.output_index_result_column_timer);
841
318k
    COUNTER_UPDATE(local_state->_filtered_segment_counter, stats.filtered_segment_number);
842
318k
    COUNTER_UPDATE(local_state->_total_segment_counter, stats.total_segment_number);
843
318k
    COUNTER_UPDATE(local_state->_condition_cache_hit_segment_counter,
844
318k
                   stats.condition_cache_hit_seg_nums);
845
318k
    COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
846
318k
                   stats.condition_cache_filtered_rows);
847
848
318k
    COUNTER_UPDATE(local_state->_tablet_reader_init_timer, stats.tablet_reader_init_timer_ns);
849
318k
    COUNTER_UPDATE(local_state->_tablet_reader_capture_rs_readers_timer,
850
318k
                   stats.tablet_reader_capture_rs_readers_timer_ns);
851
318k
    COUNTER_UPDATE(local_state->_tablet_reader_init_return_columns_timer,
852
318k
                   stats.tablet_reader_init_return_columns_timer_ns);
853
318k
    COUNTER_UPDATE(local_state->_tablet_reader_init_keys_param_timer,
854
318k
                   stats.tablet_reader_init_keys_param_timer_ns);
855
318k
    COUNTER_UPDATE(local_state->_tablet_reader_init_orderby_keys_param_timer,
856
318k
                   stats.tablet_reader_init_orderby_keys_param_timer_ns);
857
318k
    COUNTER_UPDATE(local_state->_tablet_reader_init_conditions_param_timer,
858
318k
                   stats.tablet_reader_init_conditions_param_timer_ns);
859
318k
    COUNTER_UPDATE(local_state->_tablet_reader_init_delete_condition_param_timer,
860
318k
                   stats.tablet_reader_init_delete_condition_param_timer_ns);
861
318k
    COUNTER_UPDATE(local_state->_block_reader_vcollect_iter_init_timer,
862
318k
                   stats.block_reader_vcollect_iter_init_timer_ns);
863
318k
    COUNTER_UPDATE(local_state->_block_reader_rs_readers_init_timer,
864
318k
                   stats.block_reader_rs_readers_init_timer_ns);
865
318k
    COUNTER_UPDATE(local_state->_block_reader_build_heap_init_timer,
866
318k
                   stats.block_reader_build_heap_init_timer_ns);
867
868
318k
    COUNTER_UPDATE(local_state->_rowset_reader_get_segment_iterators_timer,
869
318k
                   stats.rowset_reader_get_segment_iterators_timer_ns);
870
318k
    COUNTER_UPDATE(local_state->_rowset_reader_create_iterators_timer,
871
318k
                   stats.rowset_reader_create_iterators_timer_ns);
872
318k
    COUNTER_UPDATE(local_state->_rowset_reader_init_iterators_timer,
873
318k
                   stats.rowset_reader_init_iterators_timer_ns);
874
318k
    COUNTER_UPDATE(local_state->_rowset_reader_load_segments_timer,
875
318k
                   stats.rowset_reader_load_segments_timer_ns);
876
877
318k
    COUNTER_UPDATE(local_state->_segment_iterator_init_timer, stats.segment_iterator_init_timer_ns);
878
318k
    COUNTER_UPDATE(local_state->_segment_iterator_init_return_column_iterators_timer,
879
318k
                   stats.segment_iterator_init_return_column_iterators_timer_ns);
880
318k
    COUNTER_UPDATE(local_state->_segment_iterator_init_index_iterators_timer,
881
318k
                   stats.segment_iterator_init_index_iterators_timer_ns);
882
883
318k
    COUNTER_UPDATE(local_state->_segment_create_column_readers_timer,
884
318k
                   stats.segment_create_column_readers_timer_ns);
885
318k
    COUNTER_UPDATE(local_state->_segment_load_index_timer, stats.segment_load_index_timer_ns);
886
887
    // Update metrics
888
318k
    DorisMetrics::instance()->query_scan_bytes->increment(
889
318k
            local_state->_read_uncompressed_counter->value());
890
318k
    DorisMetrics::instance()->query_scan_rows->increment(local_state->_scan_rows->value());
891
318k
    auto& tablet = _tablet_reader_params.tablet;
892
318k
    tablet->query_scan_bytes->increment(local_state->_read_uncompressed_counter->value());
893
318k
    tablet->query_scan_rows->increment(local_state->_scan_rows->value());
894
318k
    tablet->query_scan_count->increment(1);
895
896
318k
    COUNTER_UPDATE(local_state->_ann_range_search_filter_counter,
897
318k
                   stats.rows_ann_index_range_filtered);
898
318k
    COUNTER_UPDATE(local_state->_ann_topn_filter_counter, stats.rows_ann_index_topn_filtered);
899
318k
    COUNTER_UPDATE(local_state->_ann_index_load_costs, stats.ann_index_load_ns);
900
318k
    COUNTER_UPDATE(local_state->_ann_range_search_costs, stats.ann_index_range_search_ns);
901
318k
    COUNTER_UPDATE(local_state->_ann_range_search_cnt, stats.ann_index_range_search_cnt);
902
318k
    COUNTER_UPDATE(local_state->_ann_range_engine_search_costs, stats.ann_range_engine_search_ns);
903
    // Engine prepare before search
904
318k
    COUNTER_UPDATE(local_state->_ann_range_pre_process_costs, stats.ann_range_pre_process_ns);
905
    // Post process parent: Doris result process + engine convert
906
318k
    COUNTER_UPDATE(local_state->_ann_range_post_process_costs,
907
318k
                   stats.ann_range_result_convert_ns + stats.ann_range_engine_convert_ns);
908
    // Engine convert (child under post-process)
909
318k
    COUNTER_UPDATE(local_state->_ann_range_engine_convert_costs, stats.ann_range_engine_convert_ns);
910
    // Doris-side result convert (child under post-process)
911
318k
    COUNTER_UPDATE(local_state->_ann_range_result_convert_costs, stats.ann_range_result_convert_ns);
912
913
318k
    COUNTER_UPDATE(local_state->_ann_topn_search_costs, stats.ann_topn_search_ns);
914
318k
    COUNTER_UPDATE(local_state->_ann_topn_search_cnt, stats.ann_index_topn_search_cnt);
915
916
    // Detailed ANN timers
917
    // ANN TopN timers with hierarchy
918
    // Engine search time (FAISS)
919
318k
    COUNTER_UPDATE(local_state->_ann_topn_engine_search_costs,
920
318k
                   stats.ann_index_topn_engine_search_ns);
921
    // Engine prepare time (allocations/buffer setup before search)
922
318k
    COUNTER_UPDATE(local_state->_ann_topn_pre_process_costs,
923
318k
                   stats.ann_index_topn_engine_prepare_ns);
924
    // Post process parent includes Doris result processing + engine convert
925
318k
    COUNTER_UPDATE(local_state->_ann_topn_post_process_costs,
926
318k
                   stats.ann_index_topn_result_process_ns + stats.ann_index_topn_engine_convert_ns);
927
    // Engine-side conversion time inside FAISS wrappers (child under post-process)
928
318k
    COUNTER_UPDATE(local_state->_ann_topn_engine_convert_costs,
929
318k
                   stats.ann_index_topn_engine_convert_ns);
930
931
    // Doris-side result convert costs (show separately as another child counter); use pure process time
932
318k
    COUNTER_UPDATE(local_state->_ann_topn_result_convert_costs,
933
318k
                   stats.ann_index_topn_result_process_ns);
934
935
    // Overhead counter removed; precise instrumentation is reported via engine_prepare above.
936
318k
}
937
938
#include "common/compile_check_avoid_end.h"
939
} // namespace doris