Coverage Report

Created: 2026-03-13 19:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/connector/vjdbc_connector.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Types_types.h>
22
#include <jni.h>
23
#include <stdint.h>
24
25
#include <map>
26
#include <string>
27
#include <vector>
28
29
#include "common/status.h"
30
#include "core/data_type/data_type.h"
31
#include "exec/table_connector.h"
32
#include "exprs/aggregate/aggregate_function.h"
33
#include "util/jni-util.h"
34
35
namespace doris {
36
class RuntimeState;
37
class SlotDescriptor;
38
class TupleDescriptor;
39
40
class Block;
41
class IColumn;
42
class VExprContext;
43
44
struct JdbcConnectorParam {
45
    // use -1 as default value to find error earlier.
46
    int64_t catalog_id = -1;
47
    std::string driver_path;
48
    std::string driver_class;
49
    std::string resource_name;
50
    std::string driver_checksum;
51
    std::string jdbc_url;
52
    std::string user;
53
    std::string passwd;
54
    std::string query_string;
55
    std::string table_name;
56
    bool use_transaction = false;
57
    TOdbcTableType::type table_type;
58
    bool is_tvf = false;
59
    int32_t connection_pool_min_size = -1;
60
    int32_t connection_pool_max_size = -1;
61
    int32_t connection_pool_max_wait_time = -1;
62
    int32_t connection_pool_max_life_time = -1;
63
    bool connection_pool_keep_alive = false;
64
65
    const TupleDescriptor* tuple_desc = nullptr;
66
};
67
68
class JdbcConnector : public TableConnector {
69
public:
70
    struct JdbcStatistic {
71
        int64_t _load_jar_timer = 0;
72
        int64_t _init_connector_timer = 0;
73
        int64_t _get_data_timer = 0;
74
        int64_t _read_and_fill_vector_table_timer = 0;
75
        int64_t _jni_setup_timer = 0;
76
        int64_t _has_next_timer = 0;
77
        int64_t _prepare_params_timer = 0;
78
        int64_t _fill_block_timer = 0;
79
        int64_t _cast_timer = 0;
80
        int64_t _check_type_timer = 0;
81
        int64_t _execte_read_timer = 0;
82
        int64_t _connector_close_timer = 0;
83
    };
84
85
    JdbcConnector(const JdbcConnectorParam& param);
86
87
    ~JdbcConnector() override;
88
89
    Status open(RuntimeState* state, bool read = false);
90
91
    Status query() override;
92
93
    Status get_next(bool* eos, Block* block, int batch_size);
94
95
    Status append(Block* block, const VExprContextSPtrs& _output_vexpr_ctxs,
96
                  uint32_t start_send_row, uint32_t* num_rows_sent,
97
                  TOdbcTableType::type table_type = TOdbcTableType::MYSQL) override;
98
99
    Status exec_stmt_write(Block* block, const VExprContextSPtrs& output_vexpr_ctxs,
100
                           uint32_t* num_rows_sent) override;
101
102
    // use in JDBC transaction
103
    Status begin_trans() override; // should be call after connect and before query or init_to_write
104
    Status abort_trans() override; // should be call after transaction abort
105
    Status finish_trans() override; // should be call after transaction commit
106
107
0
    Status init_to_write(doris::RuntimeProfile* operator_profile) override {
108
0
        init_profile(operator_profile);
109
0
        return Status::OK();
110
0
    }
111
112
0
    JdbcStatistic& get_jdbc_statistic() { return _jdbc_statistic; }
113
114
    Status close(Status s = Status::OK()) override;
115
116
    Status test_connection();
117
    Status clean_datasource();
118
119
protected:
120
    JdbcConnectorParam _conn_param;
121
122
private:
123
    Status _register_func_id(JNIEnv* env);
124
125
    Status _get_reader_params(Block* block, JNIEnv* env, size_t column_size, Jni::LocalObject* ans);
126
127
    Status _cast_string_to_special(Block* block, JNIEnv* env, size_t column_size);
128
    Status _cast_string_to_hll(const SlotDescriptor* slot_desc, Block* block, int column_index,
129
                               int rows);
130
    Status _cast_string_to_bitmap(const SlotDescriptor* slot_desc, Block* block, int column_index,
131
                                  int rows);
132
    Status _cast_string_to_json(const SlotDescriptor* slot_desc, Block* block, int column_index,
133
                                int rows);
134
135
    Status _get_java_table_type(JNIEnv* env, TOdbcTableType::type table_type,
136
                                Jni::LocalObject* java_enum_obj);
137
138
    Status _get_real_url(const std::string& url, std::string* result_url);
139
    Status _check_and_return_default_driver_url(const std::string& url, std::string* result_url);
140
141
    bool _closed = false;
142
143
    Jni::GlobalClass _executor_factory_clazz;
144
    Jni::GlobalClass _executor_clazz;
145
    Jni::GlobalObject _executor_obj;
146
    Jni::MethodId _executor_factory_ctor_id;
147
    Jni::MethodId _executor_ctor_id;
148
    Jni::MethodId _executor_stmt_write_id;
149
    Jni::MethodId _executor_read_id;
150
    Jni::MethodId _executor_has_next_id;
151
    Jni::MethodId _executor_get_block_address_id;
152
    Jni::MethodId _executor_block_rows_id;
153
    Jni::MethodId _executor_close_id;
154
    Jni::MethodId _executor_begin_trans_id;
155
    Jni::MethodId _executor_finish_trans_id;
156
    Jni::MethodId _executor_abort_trans_id;
157
    Jni::MethodId _executor_test_connection_id;
158
    Jni::MethodId _executor_clean_datasource_id;
159
160
    std::map<int, int> _map_column_idx_to_cast_idx_hll;
161
    std::vector<DataTypePtr> _input_hll_string_types;
162
163
    std::map<int, int> _map_column_idx_to_cast_idx_bitmap;
164
    std::vector<DataTypePtr> _input_bitmap_string_types;
165
166
    std::map<int, int> _map_column_idx_to_cast_idx_json;
167
    std::vector<DataTypePtr> _input_json_string_types;
168
169
    JdbcStatistic _jdbc_statistic;
170
};
171
172
} // namespace doris