Coverage Report

Created: 2024-11-21 14:46

/root/doris/be/src/olap/push_handler.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
21
#include <gen_cpp/AgentService_types.h>
22
#include <gen_cpp/Exprs_types.h>
23
#include <stdint.h>
24
25
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
28
29
#include "common/factory_creator.h"
30
#include "common/object_pool.h"
31
#include "common/status.h"
32
#include "exec/olap_common.h"
33
#include "olap/olap_common.h"
34
#include "olap/rowset/pending_rowset_helper.h"
35
#include "olap/rowset/rowset.h"
36
#include "olap/tablet.h"
37
#include "olap/tablet_schema.h"
38
#include "runtime/runtime_state.h"
39
#include "vec/exec/format/generic_reader.h"
40
41
namespace doris {
42
43
class DescriptorTbl;
44
class RuntimeProfile;
45
class Schema;
46
class TBrokerScanRange;
47
class TDescriptorTable;
48
class TTabletInfo;
49
50
namespace vectorized {
51
class Block;
52
class GenericReader;
53
class VExprContext;
54
} // namespace vectorized
55
56
class PushHandler {
57
public:
58
0
    PushHandler() = default;
59
0
    ~PushHandler() = default;
60
61
    // Load local data file into specified tablet.
62
    Status process_streaming_ingestion(TabletSharedPtr tablet, const TPushReq& request,
63
                                       PushType push_type,
64
                                       std::vector<TTabletInfo>* tablet_info_vec);
65
66
0
    int64_t write_bytes() const { return _write_bytes; }
67
0
    int64_t write_rows() const { return _write_rows; }
68
69
private:
70
    Status _convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset,
71
                       TabletSchemaSPtr tablet_schema, PushType push_type);
72
73
    // Only for debug
74
    std::string _debug_version_list(const Versions& versions) const;
75
76
    Status _do_streaming_ingestion(TabletSharedPtr tablet, const TPushReq& request,
77
                                   PushType push_type, std::vector<TTabletInfo>* tablet_info_vec);
78
79
    // mainly tablet_id, version and delta file path
80
    TPushReq _request;
81
82
    ObjectPool _pool;
83
    DescriptorTbl* _desc_tbl = nullptr;
84
85
    int64_t _write_bytes = 0;
86
    int64_t _write_rows = 0;
87
    PendingRowsetGuard _pending_rs_guard;
88
    DISALLOW_COPY_AND_ASSIGN(PushHandler);
89
};
90
91
class PushBrokerReader {
92
    ENABLE_FACTORY_CREATOR(PushBrokerReader);
93
94
public:
95
    PushBrokerReader(const Schema* schema, const TBrokerScanRange& t_scan_range,
96
                     const TDescriptorTable& t_desc_tbl);
97
0
    ~PushBrokerReader() = default;
98
    Status init();
99
    Status next(vectorized::Block* block);
100
    void print_profile();
101
102
    Status close();
103
0
    bool eof() const { return _eof; }
104
105
protected:
106
    Status _get_next_reader();
107
    Status _init_src_block();
108
    Status _cast_to_input_block();
109
    Status _convert_to_output_block(vectorized::Block* block);
110
    Status _init_expr_ctxes();
111
112
private:
113
    bool _ready;
114
    bool _eof;
115
    int _next_range;
116
    vectorized::Block* _src_block_ptr = nullptr;
117
    vectorized::Block _src_block;
118
    const TDescriptorTable& _t_desc_tbl;
119
    std::unordered_map<std::string, TypeDescriptor> _name_to_col_type;
120
    std::unordered_set<std::string> _missing_cols;
121
    std::unordered_map<std::string, size_t> _src_block_name_to_idx;
122
    vectorized::VExprContextSPtrs _dest_expr_ctxs;
123
    vectorized::VExprContextSPtr _pre_filter_ctx_ptr;
124
    std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
125
    std::unordered_map<int, int> _dest_slot_to_src_slot_index;
126
127
    std::vector<SlotDescriptor*> _src_slot_descs;
128
    std::unique_ptr<RowDescriptor> _row_desc;
129
    const TupleDescriptor* _dest_tuple_desc = nullptr;
130
131
    std::unique_ptr<RuntimeState> _runtime_state;
132
    RuntimeProfile* _runtime_profile = nullptr;
133
    std::unique_ptr<vectorized::GenericReader> _cur_reader;
134
    bool _cur_reader_eof;
135
    const TBrokerScanRangeParams& _params;
136
    const std::vector<TBrokerRangeDesc>& _ranges;
137
    TFileScanRangeParams _file_params;
138
    std::vector<TFileRangeDesc> _file_ranges;
139
140
    std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
141
    std::unique_ptr<io::IOContext> _io_ctx;
142
143
    // col names from _slot_descs
144
    std::vector<std::string> _all_col_names;
145
    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
146
    vectorized::VExprContextSPtrs _push_down_exprs;
147
    const std::unordered_map<std::string, int>* _col_name_to_slot_id;
148
    // single slot filter conjuncts
149
    std::unordered_map<int, vectorized::VExprContextSPtrs> _slot_id_to_filter_conjuncts;
150
    // not single(zero or multi) slot filter conjuncts
151
    vectorized::VExprContextSPtrs _not_single_slot_filter_conjuncts;
152
    // File source slot descriptors
153
    std::vector<SlotDescriptor*> _file_slot_descs;
154
    // row desc for default exprs
155
    std::unique_ptr<RowDescriptor> _default_val_row_desc;
156
    const TupleDescriptor* _real_tuple_desc = nullptr;
157
158
    // Not used, just for placeholding
159
    std::vector<TExpr> _pre_filter_texprs;
160
};
161
162
} // namespace doris