Coverage Report

Created: 2025-04-14 13:20

/root/doris/be/src/exec/schema_scanner.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Data_types.h>
21
#include <gen_cpp/Descriptors_types.h>
22
23
#include <condition_variable>
24
#include <cstddef>
25
#include <cstdint>
26
#include <memory>
27
#include <string>
28
#include <vector>
29
30
#include "cctz/time_zone.h"
31
#include "common/factory_creator.h"
32
#include "common/status.h"
33
#include "runtime/define_primitive_type.h"
34
#include "util/runtime_profile.h"
35
36
namespace doris {
37
38
// forehead declare class, because jni function init in DorisServer.
39
40
class RuntimeState;
41
class ObjectPool;
42
class TUserIdentity;
43
44
namespace vectorized {
45
class Block;
46
}
47
48
namespace pipeline {
49
class Dependency;
50
}
51
52
struct SchemaScannerCommonParam {
53
    SchemaScannerCommonParam()
54
            : db(nullptr),
55
              table(nullptr),
56
              wild(nullptr),
57
              user(nullptr),
58
              user_ip(nullptr),
59
              current_user_ident(nullptr),
60
              ip(nullptr),
61
              port(0),
62
0
              catalog(nullptr) {}
63
    const std::string* db = nullptr;
64
    const std::string* table = nullptr;
65
    const std::string* wild = nullptr;
66
    const std::string* user = nullptr;                 // deprecated
67
    const std::string* user_ip = nullptr;              // deprecated
68
    const TUserIdentity* current_user_ident = nullptr; // to replace the user and user ip
69
    const std::string* ip = nullptr;                   // frontend ip
70
    int32_t port;                                      // frontend thrift port
71
    int64_t thread_id;
72
    const std::string* catalog = nullptr;
73
    std::set<TNetworkAddress> fe_addr_list;
74
};
75
76
// scanner parameter from frontend
77
struct SchemaScannerParam {
78
    std::shared_ptr<SchemaScannerCommonParam> common_param;
79
    std::unique_ptr<RuntimeProfile> profile;
80
81
0
    SchemaScannerParam() : common_param(new SchemaScannerCommonParam()) {}
82
};
83
84
// virtual scanner for all schema table
85
class SchemaScanner {
86
public:
87
    struct ColumnDesc {
88
        const char* name = nullptr;
89
        PrimitiveType type;
90
        int size;
91
        bool is_null;
92
        /// Only set if type == TYPE_DECIMAL
93
        int precision = -1;
94
        /// Only set if type == TYPE_DECIMAL or DATETIMEV2
95
        int scale = -1;
96
    };
97
    SchemaScanner(const std::vector<ColumnDesc>& columns,
98
                  TSchemaTableType::type type = TSchemaTableType::SCH_INVALID);
99
    virtual ~SchemaScanner();
100
101
    // init object need information, schema etc.
102
    virtual Status init(RuntimeState* state, SchemaScannerParam* param, ObjectPool* pool);
103
    Status get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos);
104
    // Start to work
105
    virtual Status start(RuntimeState* state);
106
    virtual Status get_next_block_internal(vectorized::Block* block, bool* eos) = 0;
107
0
    const std::vector<ColumnDesc>& get_column_desc() const { return _columns; }
108
    // factory function
109
    static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
110
0
    TSchemaTableType::type type() const { return _schema_table_type; }
111
0
    void set_dependency(std::shared_ptr<pipeline::Dependency> dep) { _dependency = dep; }
112
    Status get_next_block_async(RuntimeState* state);
113
114
protected:
115
    void _init_block(vectorized::Block* src_block);
116
    Status fill_dest_column_for_range(vectorized::Block* block, size_t pos,
117
                                      const std::vector<void*>& datas);
118
119
    Status insert_block_column(TCell cell, int col_index, vectorized::Block* block,
120
                               PrimitiveType type);
121
122
    // get dbname from catalogname.dbname
123
    // if full_name does not have catalog part, just return origin name.
124
    std::string get_db_from_full_name(const std::string& full_name);
125
126
    bool _is_init;
127
    // this is used for sub class
128
    SchemaScannerParam* _param = nullptr;
129
    // schema table's column desc
130
    std::vector<ColumnDesc> _columns;
131
132
    TSchemaTableType::type _schema_table_type;
133
134
    RuntimeProfile::Counter* _get_db_timer = nullptr;
135
    RuntimeProfile::Counter* _get_table_timer = nullptr;
136
    RuntimeProfile::Counter* _get_describe_timer = nullptr;
137
    RuntimeProfile::Counter* _fill_block_timer = nullptr;
138
139
    std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
140
141
    std::unique_ptr<vectorized::Block> _data_block;
142
    AtomicStatus _scanner_status;
143
    std::atomic<bool> _eos = false;
144
    std::atomic<bool> _opened = false;
145
    std::atomic<bool> _async_thread_running = false;
146
    cctz::time_zone _timezone_obj;
147
};
148
149
} // namespace doris