Coverage Report

Created: 2025-09-12 18:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/exec/schema_scanner.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Data_types.h>
21
#include <gen_cpp/Descriptors_types.h>
22
23
#include <condition_variable>
24
#include <cstddef>
25
#include <cstdint>
26
#include <memory>
27
#include <string>
28
#include <vector>
29
30
#include "cctz/time_zone.h"
31
#include "common/factory_creator.h"
32
#include "common/status.h"
33
#include "runtime/define_primitive_type.h"
34
#include "util/runtime_profile.h"
35
36
namespace doris {
37
38
// Forward-declare classes, because the JNI functions are initialized in DorisServer.
39
40
class RuntimeState;
41
class ObjectPool;
42
class TUserIdentity;
43
44
namespace vectorized {
45
class Block;
46
}
47
48
namespace pipeline {
49
class Dependency;
50
}
51
52
// Parameters shared by schema scanners. All pointer members are raw pointers
// and this struct declares no destructor, so it never frees what they point to.
// NOTE(review): std::set is used below, but <set> is not among this header's
// direct includes.
struct SchemaScannerCommonParam {
53
    SchemaScannerCommonParam()
54
0
            : db(nullptr),
55
0
              table(nullptr),
56
0
              wild(nullptr),
57
0
              user(nullptr),
58
0
              user_ip(nullptr),
59
0
              current_user_ident(nullptr),
60
0
              frontend_conjuncts(nullptr),
61
0
              ip(nullptr),
62
0
              port(0),
63
0
              catalog(nullptr) {}
64
    const std::string* db = nullptr;
65
    const std::string* table = nullptr;
66
    const std::string* wild = nullptr;
67
    const std::string* user = nullptr;                 // deprecated
68
    const std::string* user_ip = nullptr;              // deprecated
69
    const TUserIdentity* current_user_ident = nullptr; // to replace the user and user ip
70
    const std::string* frontend_conjuncts = nullptr;   // frontend_conjuncts
71
    const std::string* ip = nullptr;                   // frontend ip
72
    int32_t port = 0;                                  // frontend thrift port
73
    // Zero by default: the constructor's initializer list does not mention this
    // member, so without an in-class initializer it would be left indeterminate.
    int64_t thread_id = 0;
74
    const std::string* catalog = nullptr;
75
    std::set<TNetworkAddress> fe_addr_list;
76
};
77
78
// scanner parameter from frontend
79
// Bundles the shared common parameters with a profile for one scanner.
struct SchemaScannerParam {
80
    // Shared common parameters; the constructor points this at a fresh,
    // default-constructed SchemaScannerCommonParam.
    std::shared_ptr<SchemaScannerCommonParam> common_param;
81
    // Exclusively owned profile; the constructor leaves it null.
    std::unique_ptr<RuntimeProfile> profile;
82
83
0
    // make_shared value-initializes the object exactly like `new T()` did, but
    // allocates the object and its control block together and avoids a raw new.
    SchemaScannerParam() : common_param(std::make_shared<SchemaScannerCommonParam>()) {}
84
};
85
86
// virtual scanner for all schema tables
87
// Abstract base class for schema-table scanners. It cannot be instantiated
// directly because get_next_block_internal() is pure virtual; the static
// create() function is the declared factory for obtaining a scanner.
// NOTE(review): std::atomic is used below, but <atomic> is not among this
// header's direct includes.
class SchemaScanner {
88
public:
89
    // Description of a single column.
    // NOTE: type, size and is_null have no default member initializer, so
    // whoever creates a ColumnDesc has to supply them.
    struct ColumnDesc {
90
        const char* name = nullptr;
91
        PrimitiveType type;
92
        int size;
93
        bool is_null;
94
        /// Only set if type == TYPE_DECIMAL
95
        int precision = -1;
96
        /// Only set if type == TYPE_DECIMAL or DATETIMEV2
97
        int scale = -1;
98
    };
99
    // `type` defaults to TSchemaTableType::SCH_INVALID when omitted.
    SchemaScanner(const std::vector<ColumnDesc>& columns,
100
                  TSchemaTableType::type type = TSchemaTableType::SCH_INVALID);
101
    virtual ~SchemaScanner();
102
103
    // Initialize the object with the information it needs (schema, etc.).
104
    virtual Status init(RuntimeState* state, SchemaScannerParam* param, ObjectPool* pool);
105
    // Non-virtual entry point; `eos` is an out-parameter (pointer to bool).
    Status get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos);
106
    // Start to work
107
    virtual Status start(RuntimeState* state);
108
    // Pure virtual: every concrete scanner must provide its own implementation.
    virtual Status get_next_block_internal(vectorized::Block* block, bool* eos) = 0;
109
1
    // Returns the stored column descriptors by const reference (no copy).
    const std::vector<ColumnDesc>& get_column_desc() const { return _columns; }
110
    // factory function
111
    static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
112
0
    // Returns the value held in _schema_table_type.
    TSchemaTableType::type type() const { return _schema_table_type; }
113
0
    // Stores `dep` in _dependency; the shared_ptr is taken by value and then
    // copy-assigned into the member.
    void set_dependency(std::shared_ptr<pipeline::Dependency> dep) { _dependency = dep; }
114
    Status get_next_block_async(RuntimeState* state);
115
116
protected:
117
    void _init_block(vectorized::Block* src_block);
118
    Status fill_dest_column_for_range(vectorized::Block* block, size_t pos,
119
                                      const std::vector<void*>& datas);
120
121
    // Note: `cell` is passed by value.
    Status insert_block_column(TCell cell, int col_index, vectorized::Block* block,
122
                               PrimitiveType type);
123
124
    // get dbname from catalogname.dbname
125
    // if full_name does not have catalog part, just return origin name.
126
    std::string get_db_from_full_name(const std::string& full_name);
127
128
    // No default member initializer here — presumably set by the constructor,
    // whose definition is not in this header; TODO confirm.
    bool _is_init;
129
    // this is used for sub class
130
    SchemaScannerParam* _param = nullptr;
131
    // schema table's column desc
132
    std::vector<ColumnDesc> _columns;
133
134
    // No default member initializer; exposed read-only through type().
    TSchemaTableType::type _schema_table_type;
135
136
    // Raw pointers to profile counters; all start out null.
    RuntimeProfile::Counter* _get_db_timer = nullptr;
137
    RuntimeProfile::Counter* _get_table_timer = nullptr;
138
    RuntimeProfile::Counter* _get_describe_timer = nullptr;
139
    RuntimeProfile::Counter* _fill_block_timer = nullptr;
140
141
    // Assigned through set_dependency(); null until then.
    std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
142
143
    std::unique_ptr<vectorized::Block> _data_block;
144
    AtomicStatus _scanner_status;
145
    // Atomic flags, all initially false. Presumably shared with the work
    // started by get_next_block_async() — TODO confirm against the .cpp.
    std::atomic<bool> _eos = false;
146
    std::atomic<bool> _opened = false;
147
    std::atomic<bool> _async_thread_running = false;
148
    cctz::time_zone _timezone_obj;
149
};
150
151
} // namespace doris