Coverage Report

Created: 2025-03-29 15:49

/root/doris/be/src/exec/schema_scanner.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Data_types.h>
21
#include <gen_cpp/Descriptors_types.h>
22
#include <stddef.h>
23
#include <stdint.h>
24
25
#include <condition_variable>
26
#include <memory>
27
#include <string>
28
#include <vector>
29
30
#include "common/factory_creator.h"
31
#include "common/status.h"
32
#include "runtime/define_primitive_type.h"
33
#include "util/runtime_profile.h"
34
35
namespace doris {
36
37
// forehead declare class, because jni function init in DorisServer.
38
39
class RuntimeState;
40
class ObjectPool;
41
class TUserIdentity;
42
43
namespace vectorized {
44
class Block;
45
}
46
47
namespace pipeline {
48
class Dependency;
49
}
50
51
struct SchemaScannerCommonParam {
52
    SchemaScannerCommonParam()
53
            : db(nullptr),
54
              table(nullptr),
55
              wild(nullptr),
56
              user(nullptr),
57
              user_ip(nullptr),
58
              current_user_ident(nullptr),
59
              ip(nullptr),
60
              port(0),
61
0
              catalog(nullptr) {}
62
    const std::string* db = nullptr;
63
    const std::string* table = nullptr;
64
    const std::string* wild = nullptr;
65
    const std::string* user = nullptr;                 // deprecated
66
    const std::string* user_ip = nullptr;              // deprecated
67
    const TUserIdentity* current_user_ident = nullptr; // to replace the user and user ip
68
    const std::string* ip = nullptr;                   // frontend ip
69
    int32_t port;                                      // frontend thrift port
70
    int64_t thread_id;
71
    const std::string* catalog = nullptr;
72
    std::set<TNetworkAddress> fe_addr_list;
73
};
74
75
// scanner parameter from frontend
76
struct SchemaScannerParam {
77
    std::shared_ptr<SchemaScannerCommonParam> common_param;
78
    std::unique_ptr<RuntimeProfile> profile;
79
80
0
    SchemaScannerParam() : common_param(new SchemaScannerCommonParam()) {}
81
};
82
83
// virtual scanner for all schema table
84
class SchemaScanner {
85
    ENABLE_FACTORY_CREATOR(SchemaScanner);
86
87
public:
88
    struct ColumnDesc {
89
        const char* name = nullptr;
90
        PrimitiveType type;
91
        int size;
92
        bool is_null;
93
        /// Only set if type == TYPE_DECIMAL or DATETIMEV2
94
        int precision = -1;
95
        int scale = -1;
96
    };
97
    SchemaScanner(const std::vector<ColumnDesc>& columns);
98
    SchemaScanner(const std::vector<ColumnDesc>& columns, TSchemaTableType::type type);
99
    virtual ~SchemaScanner();
100
101
    // init object need information, schema etc.
102
    virtual Status init(SchemaScannerParam* param, ObjectPool* pool);
103
    Status get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos);
104
    // Start to work
105
    virtual Status start(RuntimeState* state);
106
    virtual Status get_next_block_internal(vectorized::Block* block, bool* eos);
107
0
    const std::vector<ColumnDesc>& get_column_desc() const { return _columns; }
108
    // factory function
109
    static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
110
0
    TSchemaTableType::type type() const { return _schema_table_type; }
111
0
    void set_dependency(std::shared_ptr<pipeline::Dependency> dep) { _dependency = dep; }
112
    Status get_next_block_async(RuntimeState* state);
113
114
protected:
115
    void _init_block(vectorized::Block* src_block);
116
    Status fill_dest_column_for_range(vectorized::Block* block, size_t pos,
117
                                      const std::vector<void*>& datas);
118
119
    Status insert_block_column(TCell cell, int col_index, vectorized::Block* block,
120
                               PrimitiveType type);
121
122
    // get dbname from catalogname.dbname
123
    // if full_name does not have catalog part, just return origin name.
124
    std::string get_db_from_full_name(const std::string& full_name);
125
126
    bool _is_init;
127
    // this is used for sub class
128
    SchemaScannerParam* _param = nullptr;
129
    // schema table's column desc
130
    std::vector<ColumnDesc> _columns;
131
132
    TSchemaTableType::type _schema_table_type;
133
134
    RuntimeProfile::Counter* _get_db_timer = nullptr;
135
    RuntimeProfile::Counter* _get_table_timer = nullptr;
136
    RuntimeProfile::Counter* _get_describe_timer = nullptr;
137
    RuntimeProfile::Counter* _fill_block_timer = nullptr;
138
139
    std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
140
141
    std::unique_ptr<vectorized::Block> _data_block;
142
    AtomicStatus _scanner_status;
143
    std::atomic<bool> _eos = false;
144
    std::atomic<bool> _opened = false;
145
    std::atomic<bool> _async_thread_running = false;
146
};
147
148
} // namespace doris