Coverage Report

Created: 2024-11-21 18:14

/root/doris/be/src/exec/schema_scanner.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Data_types.h>
21
#include <gen_cpp/Descriptors_types.h>
22
23
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
29
30
#include "common/factory_creator.h"
31
#include "common/status.h"
32
#include "runtime/define_primitive_type.h"
33
#include "util/runtime_profile.h"
34
35
namespace doris {
36
37
// forward declare class, because jni function init in DorisServer.
38
39
class RuntimeState;
40
class ObjectPool;
41
class TUserIdentity;
42
43
namespace vectorized {
44
class Block;
45
}
46
47
namespace pipeline {
48
class Dependency;
49
}
50
51
struct SchemaScannerCommonParam {
52
    SchemaScannerCommonParam()
53
            : db(nullptr),
54
              table(nullptr),
55
              wild(nullptr),
56
              user(nullptr),
57
              user_ip(nullptr),
58
              current_user_ident(nullptr),
59
              ip(nullptr),
60
              port(0),
61
0
              catalog(nullptr) {}
62
    const std::string* db = nullptr;
63
    const std::string* table = nullptr;
64
    const std::string* wild = nullptr;
65
    const std::string* user = nullptr;                 // deprecated
66
    const std::string* user_ip = nullptr;              // deprecated
67
    const TUserIdentity* current_user_ident = nullptr; // to replace the user and user ip
68
    const std::string* ip = nullptr;                   // frontend ip
69
    int32_t port;                                      // frontend thrift port
70
    int64_t thread_id;
71
    const std::string* catalog = nullptr;
72
    std::set<TNetworkAddress> fe_addr_list;
73
};
74
75
// scanner parameter from frontend
76
struct SchemaScannerParam {
77
    std::shared_ptr<SchemaScannerCommonParam> common_param;
78
    std::unique_ptr<RuntimeProfile> profile;
79
80
0
    SchemaScannerParam() : common_param(new SchemaScannerCommonParam()) {}
81
};
82
83
// virtual scanner for all schema tables
84
class SchemaScanner {
85
public:
86
    struct ColumnDesc {
87
        const char* name = nullptr;
88
        PrimitiveType type;
89
        int size;
90
        bool is_null;
91
        /// Only set if type == TYPE_DECIMAL or DATETIMEV2
92
        int precision = -1;
93
        int scale = -1;
94
    };
95
    SchemaScanner(const std::vector<ColumnDesc>& columns,
96
                  TSchemaTableType::type type = TSchemaTableType::SCH_INVALID);
97
    virtual ~SchemaScanner();
98
99
    // init object need information, schema etc.
100
    virtual Status init(SchemaScannerParam* param, ObjectPool* pool);
101
    Status get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos);
102
    // Start to work
103
    virtual Status start(RuntimeState* state);
104
    virtual Status get_next_block_internal(vectorized::Block* block, bool* eos) = 0;
105
0
    const std::vector<ColumnDesc>& get_column_desc() const { return _columns; }
106
    // factory function
107
    static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
108
0
    TSchemaTableType::type type() const { return _schema_table_type; }
109
    void set_dependency(std::shared_ptr<pipeline::Dependency> dep,
110
0
                        std::shared_ptr<pipeline::Dependency> fin_dep) {
111
0
        _dependency = dep;
112
0
        _finish_dependency = fin_dep;
113
0
    }
114
    Status get_next_block_async(RuntimeState* state);
115
116
protected:
117
    void _init_block(vectorized::Block* src_block);
118
    Status fill_dest_column_for_range(vectorized::Block* block, size_t pos,
119
                                      const std::vector<void*>& datas);
120
121
    Status insert_block_column(TCell cell, int col_index, vectorized::Block* block,
122
                               PrimitiveType type);
123
124
    // get dbname from catalogname.dbname
125
    // if full_name does not have catalog part, just return origin name.
126
    std::string get_db_from_full_name(const std::string& full_name);
127
128
    bool _is_init;
129
    // this is used for sub class
130
    SchemaScannerParam* _param = nullptr;
131
    // schema table's column desc
132
    std::vector<ColumnDesc> _columns;
133
134
    TSchemaTableType::type _schema_table_type;
135
136
    RuntimeProfile::Counter* _get_db_timer = nullptr;
137
    RuntimeProfile::Counter* _get_table_timer = nullptr;
138
    RuntimeProfile::Counter* _get_describe_timer = nullptr;
139
    RuntimeProfile::Counter* _fill_block_timer = nullptr;
140
141
    std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
142
    std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;
143
144
    std::unique_ptr<vectorized::Block> _data_block;
145
    AtomicStatus _scanner_status;
146
    std::atomic<bool> _eos = false;
147
    std::atomic<bool> _opened = false;
148
    std::atomic<bool> _async_thread_running = false;
149
};
150
151
} // namespace doris