Coverage Report

Created: 2026-07-02 13:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner_v2.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <map>
21
#include <memory>
22
#include <optional>
23
#include <string>
24
#include <unordered_map>
25
#include <vector>
26
27
#include "common/factory_creator.h"
28
#include "common/status.h"
29
#include "core/block/block.h"
30
#include "exec/operator/file_scan_operator.h"
31
#include "exec/scan/scanner.h"
32
#include "exec/scan/split_source_connector.h"
33
#include "exprs/vexpr_fwd.h"
34
#include "format_v2/column_mapper.h"
35
#include "format_v2/table_reader.h"
36
#include "gen_cpp/Descriptors_types.h"
37
#include "gen_cpp/PlanNodes_types.h"
38
#include "io/io_common.h"
39
#include "runtime/runtime_profile.h"
40
#include "storage/segment/adaptive_block_size_predictor.h"
41
42
namespace doris {
43
44
class RuntimeState;
45
class SlotDescriptor;
46
class TFileRangeDesc;
47
class TFileScanRangeParams;
48
class ShardedKVCache;
49
50
// Scanner subclass for file scan ranges. This header only declares the interface; apart from the
// two inline getters below, no member function body is visible here.
class FileScannerV2 final : public Scanner {
51
    // Project macro (see common/factory_creator.h). NOTE(review): presumably injects the
    // factory-style creation helpers for this class — confirm against that header.
    ENABLE_FACTORY_CREATOR(FileScannerV2);
52
53
public:
54
    // Scanner name; this is the value get_name() returns.
    static constexpr const char* NAME = "FileScannerV2";
55
    // NOTE(review): presumably the row count requested for the first adaptive-batch probe
    // reads — confirm against its use in the implementation file.
    static constexpr size_t ADAPTIVE_BATCH_INITIAL_PROBE_ROWS = 32;
56
57
    // Static predicate over a (scan params, file range) pair. NOTE(review): presumably tells
    // callers whether this scanner can handle the range; the criteria are not visible here.
    static bool is_supported(const TFileScanRangeParams& params, const TFileRangeDesc& range);
58
    // The TEST_* entry points below exist only in builds that define BE_TEST.
    // NOTE(review): by name they appear to expose private helpers (e.g. _to_file_format) to
    // unit tests — confirm in the implementation file.
#ifdef BE_TEST
59
    static Status TEST_to_file_format(TFileFormatType::type format_type,
60
                                      format::FileFormat* file_format);
61
    static bool TEST_is_partition_slot(const TFileScanSlotInfo& slot_info,
62
                                       const std::string& column_name);
63
    static bool TEST_is_data_file_slot(const TFileScanSlotInfo& slot_info,
64
                                       const std::string& column_name);
65
    static Status TEST_rewrite_slot_refs_to_global_index(
66
            VExprSPtr* expr,
67
            const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index);
68
#endif
69
70
    // split_source is taken as a shared_ptr, matching the _split_source member. state, parent,
    // profile, kv_cache and colname_to_slot_id are raw pointers.
    // NOTE(review): presumably non-owning and required to outlive the scanner — confirm.
    FileScannerV2(RuntimeState* state, FileScanLocalState* parent, int64_t limit,
71
                  std::shared_ptr<SplitSourceConnector> split_source, RuntimeProfile* profile,
72
                  ShardedKVCache* kv_cache,
73
                  const std::unordered_map<std::string, int>* colname_to_slot_id);
74
75
    // Overrides of the Scanner base-class interface.
    Status init(RuntimeState* state, const VExprContextSPtrs& conjuncts) override;
76
    Status _open_impl(RuntimeState* state) override;
77
    Status close(RuntimeState* state) override;
78
    void try_stop() override;
79
0
    // Returns NAME converted to std::string.
    std::string get_name() override { return FileScannerV2::NAME; }
80
0
    // Returns a copy of _current_range_path.
    std::string get_current_scan_range_name() override { return _current_range_path; }
81
    void update_realtime_counters() override;
82
83
protected:
84
    // Protected Scanner hooks overridden by this class.
    Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
85
    void _collect_profile_before_close() override;
86
    bool _should_update_load_counters() const override;
87
88
private:
89
    // Private helpers; their bodies are not in this header. The grouping comments below are
    // inferred from the names only — confirm details in the implementation file.
    TFileFormatType::type _get_current_format_type() const;
90
    Status _init_io_ctx();
91
    Status _init_expr_ctxes();
92
    // Split / table-reader setup. _prepare_next_split reports through the `eos` out-parameter.
    Status _prepare_next_split(bool* eos);
93
    Status _init_table_reader(const TFileRangeDesc& range);
94
    // Const; hands the created reader back through the `reader` out-parameter.
    Status _create_table_reader_for_format(const TFileRangeDesc& range,
95
                                           std::unique_ptr<format::TableReader>* reader) const;
96
    Status _prepare_table_reader_split(const TFileRangeDesc& range);
97
    bool _should_enable_file_meta_cache() const;
98
    // Returns std::nullopt or a context value; the condition for each is not visible here.
    std::optional<format::GlobalRowIdContext> _create_global_rowid_context(
99
            const TFileRangeDesc& range) const;
100
    // Partition-value handling; results are written through the pointer out-parameters.
    Status _generate_partition_values(const TFileRangeDesc& range,
101
                                      std::map<std::string, Field>* partition_values) const;
102
    Status _parse_partition_value(const SlotDescriptor* slot_desc, const std::string& value,
103
                                  bool is_null, Field* field) const;
104
    // Projection, default-expression and predicate construction.
    Status _build_projected_columns(const format::TableReader& table_reader);
105
    Status _build_default_expr(const TFileScanSlotInfo& slot_info, VExprContextSPtr* ctx) const;
106
    static format::ColumnDefinition _build_table_column(const SlotDescriptor* slot_desc);
107
    Status _build_table_column_predicates(format::TableColumnPredicates* predicates) const;
108
    Status _build_table_conjuncts(VExprContextSPtrs* conjuncts) const;
109
    static Status _to_file_format(TFileFormatType::type format_type,
110
                                  format::FileFormat* file_format);
111
    // Adaptive batch-size helpers (see _block_size_predictor and the _adaptive_batch_* counters).
    void _reset_adaptive_batch_size_state();
112
    void _init_adaptive_batch_size_state(TFileFormatType::type format_type);
113
    bool _should_enable_adaptive_batch_size(TFileFormatType::type format_type) const;
114
    bool _should_run_adaptive_batch_size() const;
115
    size_t _predict_reader_batch_rows();
116
    void _update_adaptive_batch_size(const Block& block);
117
    // Profile reporting helpers (see the _reported_* members at the end of the class).
    void _report_file_reader_predicate_filtered_rows();
118
    void _report_condition_cache_profile();
119
120
    // Pairs a slot descriptor with a canonical name string; used as the mapped type of
    // _partition_slot_descs.
    struct PartitionSlotInfo {
121
        const SlotDescriptor* slot_desc = nullptr;
122
        std::string canonical_name;
123
    };
124
125
    // Scan-range state. _params starts null; both flags start false.
    const TFileScanRangeParams* _params = nullptr;
126
    std::shared_ptr<SplitSourceConnector> _split_source;
127
    bool _first_scan_range = false;
128
    bool _has_prepared_split = false;
129
    TFileRangeDesc _current_range;
130
    // Returned by get_current_scan_range_name().
    std::string _current_range_path;
131
132
    // Reader owned by this scanner, plus the column/slot lookup tables declared below.
    std::unique_ptr<format::TableReader> _table_reader;
133
    std::vector<format::ColumnDefinition> _projected_columns;
134
    // File formats without embedded schema, such as CSV, still need the FE slot descriptors in
135
    // file-column order. This mirrors old FileScanner::_file_slot_descs and is passed only to
136
    // readers that cannot derive their schema from file metadata.
137
    std::vector<SlotDescriptor*> _file_slot_descs;
138
    bool _need_global_rowid_column = false;
139
    // Keyed by int32_t slot id (per the member names).
    std::unordered_map<int32_t, const SlotDescriptor*> _slot_id_to_desc;
140
    std::unordered_map<int32_t, format::GlobalIndex> _slot_id_to_global_index;
141
    // Keyed by string. NOTE(review): presumably the partition column name — confirm.
    std::unordered_map<std::string, PartitionSlotInfo> _partition_slot_descs;
142
143
    // IO statistics and context objects held by this scanner.
    std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
144
    std::unique_ptr<io::FileReaderStats> _file_reader_stats;
145
    std::shared_ptr<io::IOContext> _io_ctx;
146
    // Raw pointer, null by default. NOTE(review): presumably the non-owning kv_cache passed to
    // the constructor — confirm.
    ShardedKVCache* _kv_cache = nullptr;
147
148
    // Profile counters; all default to nullptr.
    // NOTE(review): presumably registered on the RuntimeProfile during init/open — confirm.
    RuntimeProfile::Counter* _get_block_timer = nullptr;
149
    RuntimeProfile::Counter* _file_counter = nullptr;
150
    RuntimeProfile::Counter* _file_read_bytes_counter = nullptr;
151
    RuntimeProfile::Counter* _file_read_calls_counter = nullptr;
152
    RuntimeProfile::Counter* _file_read_time_counter = nullptr;
153
    RuntimeProfile::Counter* _adaptive_batch_predicted_rows_counter = nullptr;
154
    RuntimeProfile::Counter* _adaptive_batch_actual_bytes_counter = nullptr;
155
    RuntimeProfile::Counter* _adaptive_batch_probe_count_counter = nullptr;
156
    // Owned predictor; empty until assigned.
    std::unique_ptr<AdaptiveBlockSizePredictor> _block_size_predictor;
157
    // Running totals, starting at 0.
    // NOTE(review): presumably the amounts already pushed to the profile, so later reports can
    // add only the delta — confirm in the _report_* helpers.
    int64_t _reported_predicate_filtered_rows = 0;
158
    int64_t _reported_condition_cache_hit_count = 0;
159
    int64_t _reported_condition_cache_filtered_rows = 0;
160
};
161
162
} // namespace doris