Coverage Report

Created: 2026-04-03 20:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/scanner.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stdint.h>
21
22
#include <algorithm>
23
#include <atomic>
24
#include <vector>
25
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "io/cache/block_file_cache_profile.h"
29
#include "runtime/exec_env.h"
30
#include "runtime/runtime_state.h"
31
#include "storage/tablet/tablet.h"
32
#include "util/stopwatch.hpp"
33
34
namespace doris {
35
class RuntimeProfile;
36
class TupleDescriptor;
37
38
class VExprContext;
39
40
class ScanLocalStateBase;
41
} // namespace doris
42
43
namespace doris {
44
45
// Counter for load
46
// Pair of row counters kept by a scanner (see Scanner::_counter).
// Both counters start at zero.
struct ScannerCounter {
47
39
    // Zero-initializes both counters.
    ScannerCounter() : num_rows_filtered(0), num_rows_unselected(0) {}
48
49
    int64_t num_rows_filtered;   // unqualified rows (did not match the dest schema, or no partition)
50
    int64_t num_rows_unselected; // rows filtered out by predicates
51
};
52
53
class Scanner {
54
public:
55
    Scanner(RuntimeState* state, ScanLocalStateBase* local_state, int64_t limit,
56
            RuntimeProfile* profile);
57
58
    // Only used by FileScanner to read a single line.
59
    // Reduced constructor: takes only the runtime state and the profile.
    // _local_state is not set here and keeps its in-class default (nullptr);
    // _limit is fixed to 1. The explicit _total_rf_num(0) / _has_prepared(false)
    // repeat the in-class defaults of those members.
    Scanner(RuntimeState* state, RuntimeProfile* profile)
60
16
            : _state(state), _limit(1), _profile(profile), _total_rf_num(0), _has_prepared(false) {
61
16
        // Count this scanner as alive; the destructor applies the matching increment(-1).
        DorisMetrics::instance()->scanner_cnt->increment(1);
62
16
    };
63
64
36
    // Releases the blocks and expression contexts held by the scanner and
    // decrements the scanner_cnt metric incremented at construction.
    // NOTE(review): _state is dereferenced unconditionally below, so a Scanner
    // constructed with a null state would crash here — confirm callers never pass nullptr.
    virtual ~Scanner() {
65
36
        // Presumably attributes the frees below to the query's memory tracker
        // for the rest of this scope — TODO confirm against the macro definition.
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_state->query_mem_tracker());
66
36
        _input_block.clear();
67
36
        _conjuncts.clear();
68
36
        _projections.clear();
69
36
        _origin_block.clear();
70
36
        _common_expr_ctxs_push_down.clear();
71
36
        // NOTE(review): _padding_block and _intermediate_projections are not cleared
        // explicitly here; they are released by their own destructors after this body.
        DorisMetrics::instance()->scanner_cnt->increment(-1);
72
36
    }
73
74
    virtual Status init(RuntimeState* state, const VExprContextSPtrs& conjuncts);
75
0
    // Base implementation: records that prepare() has run (see has_prepared())
    // and always returns OK. Virtual, so subclasses may override it.
    virtual Status prepare() {
76
0
        _has_prepared = true;
77
0
        return Status::OK();
78
0
    }
79
80
0
    // Non-virtual entry point for opening the scanner: delegates to the virtual
    // _open_impl() and returns its status. This wrapper itself does not touch
    // _is_open (see set_opened()).
    Status open(RuntimeState* state) {
81
0
        // Presumably adds the time spent in this scope to _per_scanner_timer — TODO confirm.
        SCOPED_RAW_TIMER(&_per_scanner_timer);
82
0
        return _open_impl(state);
83
0
    }
84
85
    Status get_block(RuntimeState* state, Block* block, bool* eos);
86
    Status get_block_after_projects(RuntimeState* state, Block* block, bool* eos);
87
88
    virtual Status close(RuntimeState* state);
89
90
    // Try to stop scanner, and all running readers.
91
0
    // Base implementation only raises the _should_stop flag; it does not stop any
    // reader by itself. NOTE(review): _should_stop is a plain bool (unlike the atomic
    // _is_closed) — confirm it is never written and read from different threads.
    virtual void try_stop() { _should_stop = true; };
92
93
0
    virtual std::string get_name() { return ""; }
94
95
    // return the readable name of current scan range.
96
    // eg, for file scanner, return the current file path.
97
0
    virtual std::string get_current_scan_range_name() { return "not implemented"; }
98
99
protected:
100
0
    // Default open hook invoked by open(). Seeds _block_avg_bytes from the batch
    // size and always returns OK.
    virtual Status _open_impl(RuntimeState* state) {
101
0
        // The factor 8 is not explained in this file — presumably an assumed number
        // of bytes per row for the initial estimate; TODO confirm.
        _block_avg_bytes = state->batch_size() * 8;
102
0
        return Status::OK();
103
0
    }
104
105
    // Subclass should implement this to return data.
106
    virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0;
107
108
0
    // Moves the contents of _origin_block into _padding_block.
    // Returns OK, or the error reported by the merge (via RETURN_IF_ERROR).
    Status _merge_padding_block() {
109
0
        if (_padding_block.empty()) {
110
0
            // Nothing accumulated yet: exchange the two blocks instead of merging.
            _padding_block.swap(_origin_block);
111
0
        } else if (_origin_block.rows()) {
112
0
            // Both blocks hold data: merge _origin_block into _padding_block.
            RETURN_IF_ERROR(
113
0
                    MutableBlock::build_mutable_block(&_padding_block).merge(_origin_block));
114
0
        }
115
0
        return Status::OK();
116
0
    }
117
118
    // Update the counters before closing this scanner
119
    virtual void _collect_profile_before_close();
120
121
    // Check if scanner is already closed, if not, mark it as closed.
122
    // Returns true if the scanner was successfully marked as closed (first time).
123
    // Returns false if the scanner was already closed.
124
    bool _try_close();
125
126
    // Filter the output block finally.
127
    Status _filter_output_block(Block* block);
128
129
    Status _do_projections(Block* origin_block, Block* output_block);
130
131
private:
132
    // Call _start_wait_worker_timer() when submitting the scanner to the thread pool,
133
    // and call _update_wait_worker_timer() when it actually starts being executed.
134
0
    // Restarts _watch from zero; _update_wait_worker_timer() later reads its elapsed time.
    void _start_wait_worker_timer() {
135
0
        _watch.reset();
136
0
        _watch.start();
137
0
    }
138
139
0
    // Restarts _cpu_watch from zero.
    void _start_scan_cpu_timer() {
140
0
        _cpu_watch.reset();
141
0
        _cpu_watch.start();
142
0
    }
143
144
0
    // Adds the time elapsed on _watch to the accumulated _scanner_wait_worker_timer.
    void _update_wait_worker_timer() { _scanner_wait_worker_timer += _watch.elapsed_time(); }
145
    void _update_scan_cpu_timer();
146
147
public:
148
0
    // Switches from "waiting" to "running" accounting: adds the elapsed wait time
    // to _scanner_wait_worker_timer, then restarts the CPU stopwatch.
    void resume() {
149
0
        _update_wait_worker_timer();
150
0
        _start_scan_cpu_timer();
151
0
    }
152
0
    // Counterpart of resume(): calls _update_scan_cpu_timer() (defined outside this
    // header; presumably folds _cpu_watch into _scan_cpu_timer — TODO confirm),
    // then restarts the wait stopwatch.
    void pause() {
153
0
        _update_scan_cpu_timer();
154
0
        _start_wait_worker_timer();
155
0
    }
156
0
    int64_t get_time_cost_ns() const { return _per_scanner_timer; }
157
158
0
    int64_t projection_time() const { return _projection_timer; }
159
0
    int64_t get_rows_read() const { return _num_rows_read; }
160
161
0
    bool has_prepared() const { return _has_prepared; }
162
163
    Status try_append_late_arrival_runtime_filter();
164
165
0
    int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; }
166
167
    // Some counters need to be updated in real time; for example, the workload group
168
    // policy needs the scanned bytes in order to cancel a query that exceeds its limit.
169
0
    virtual void update_realtime_counters() {}
170
171
330
    RuntimeState* runtime_state() { return _state; }
172
173
0
    bool is_open() const { return _is_open; }
174
0
    void set_opened() { _is_open = true; }
175
176
0
    // Base-class default: reports remote storage. Virtual, so subclasses may
    // return a different storage type.
    virtual doris::TabletStorageType get_storage_type() {
177
0
        return doris::TabletStorageType::STORAGE_TYPE_REMOTE;
178
0
    }
179
180
0
    bool need_to_close() const { return _need_to_close; }
181
182
0
    // Flags the scanner as needing to be closed (see need_to_close()), collecting
    // the profile counters first when the scanner had been opened.
    void mark_to_need_to_close() {
183
        // If the scanner failed during init or open, there is no need to update the
184
        // counters: the query has failed, so the counters are useless, and updating them
185
        // may crash (core dump). For example, updating the counters depends on the
186
        // scanner's tablet, but the tablet is null when init failed.
187
0
        if (_is_open) {
188
0
            _collect_profile_before_close();
189
0
        }
190
0
        _need_to_close = true;
191
0
    }
192
193
0
    void set_status_on_failure(const Status& st) { _status = st; }
194
195
0
    int64_t limit() const { return _limit; }
196
197
0
    auto get_block_avg_bytes() const { return _block_avg_bytes; }
198
199
0
    void update_block_avg_bytes(size_t block_avg_bytes) { _block_avg_bytes = block_avg_bytes; }
200
201
0
    virtual bool is_slow_task() const { return false; }
202
203
protected:
204
    RuntimeState* _state = nullptr;
205
    ScanLocalStateBase* _local_state = nullptr;
206
207
    // Set if scan node has sort limit info
208
    int64_t _limit = -1;
209
210
    RuntimeProfile* _profile = nullptr;
211
212
    const TupleDescriptor* _output_tuple_desc = nullptr;
213
    const RowDescriptor* _output_row_descriptor = nullptr;
214
215
    // If _input_tuple_desc is set, the scanner will read data into
216
    // this _input_block first, then convert to the output block.
217
    Block _input_block;
218
219
    bool _is_open = false;
220
    std::atomic<bool> _is_closed {false};
221
    bool _need_to_close = false;
222
    Status _status;
223
224
    // When _applied_rf_num == _total_rf_num,
225
    // all runtime filters have arrived and been applied.
226
    int _applied_rf_num = 0;
227
    int _total_rf_num = 0;
228
    // Cloned from _conjuncts of scan node.
229
    // It includes predicate in SQL and runtime filters.
230
    VExprContextSPtrs _conjuncts;
231
    VExprContextSPtrs _projections;
232
    // Used in common subexpression elimination to compute intermediate results.
233
    std::vector<VExprContextSPtrs> _intermediate_projections;
234
    Block _origin_block;
235
    Block _padding_block;
236
    bool _alreay_eos = false;
237
238
    VExprContextSPtrs _common_expr_ctxs_push_down;
239
240
    // num of rows read from scanner
241
    int64_t _num_rows_read = 0;
242
243
    int64_t _num_byte_read = 0;
244
245
    // num of rows return from scanner, after filter block
246
    int64_t _num_rows_return = 0;
247
248
    size_t _block_avg_bytes = 0;
249
250
    // Set true after counter is updated finally
251
    bool _has_updated_counter = false;
252
253
    // watch to count the time wait for scanner thread
254
    MonotonicStopWatch _watch;
255
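    // Do not use ScopedTimer here: there is no guarantee that the counter it would update (presumably _scan_cpu_timer) is still alive when the timer stops. NOTE(review): the original sentence was cut off — confirm.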
256
    ThreadCpuStopWatch _cpu_watch;
257
    int64_t _scanner_wait_worker_timer = 0;
258
    int64_t _scan_cpu_timer = 0;
259
260
    bool _is_load = false;
261
262
    bool _has_prepared = false;
263
264
    ScannerCounter _counter;
265
    int64_t _per_scanner_timer = 0;
266
    int64_t _projection_timer = 0;
267
268
    bool _should_stop = false;
269
    std::unique_ptr<io::FileCacheProfileReporter> _file_cache_profile_reporter = nullptr;
270
    const int64_t _remote_slow_task_threshold = 0;
271
};
272
273
using ScannerSPtr = std::shared_ptr<Scanner>;
274
275
} // namespace doris