be/src/exec/connector/vjdbc_connector.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/Types_types.h> |
22 | | #include <jni.h> |
23 | | #include <stdint.h> |
24 | | |
25 | | #include <map> |
26 | | #include <string> |
27 | | #include <vector> |
28 | | |
29 | | #include "common/status.h" |
30 | | #include "core/data_type/data_type.h" |
31 | | #include "exec/table_connector.h" |
32 | | #include "exprs/aggregate/aggregate_function.h" |
33 | | #include "util/jni-util.h" |
34 | | |
35 | | namespace doris { |
36 | | class RuntimeState; |
37 | | class SlotDescriptor; |
38 | | class TupleDescriptor; |
39 | | |
40 | | class Block; |
41 | | class IColumn; |
42 | | class VExprContext; |
43 | | |
44 | | /* Parameter bundle accepted by JdbcConnector's constructor: driver location and class, JDBC URL, credentials, query text and connection-pool settings. */ struct JdbcConnectorParam { |
45 | | // -1 is a deliberately invalid default so that a catalog id that was never set is noticed early. |
46 | | int64_t catalog_id = -1; |
47 | | std::string driver_path; |
48 | | std::string driver_class; |
49 | | std::string resource_name; |
50 | | std::string driver_checksum; |
51 | | std::string jdbc_url; |
52 | | std::string user; |
53 | | std::string passwd; |
54 | | std::string query_string; |
55 | | std::string table_name; |
56 | | bool use_transaction = false; |
57 | | TOdbcTableType::type table_type; /* NOTE(review): the only non-string field declared without a default member initializer; confirm every creator assigns it before it is read. */ |
58 | | bool is_tvf = false; |
59 | | int32_t connection_pool_min_size = -1; /* NOTE(review): the four pool sizes/times below default to -1, presumably meaning "not set"; confirm how the implementation treats -1. */ |
60 | | int32_t connection_pool_max_size = -1; |
61 | | int32_t connection_pool_max_wait_time = -1; |
62 | | int32_t connection_pool_max_life_time = -1; |
63 | | bool connection_pool_keep_alive = false; |
64 | | /* Defaults to nullptr; this struct declares no destructor, so the pointee is not released here. */ |
65 | | const TupleDescriptor* tuple_desc = nullptr; |
66 | | }; |
67 | | |
68 | | /* TableConnector subclass for JDBC; holds JNI class, object and method handles as private members. */ class JdbcConnector : public TableConnector { |
69 | | public: |
70 | | struct JdbcStatistic { /* int64_t counters, all zero-initialized; exposed through get_jdbc_statistic(). Units are not stated in this header. */ |
71 | | int64_t _load_jar_timer = 0; |
72 | | int64_t _init_connector_timer = 0; |
73 | | int64_t _get_data_timer = 0; |
74 | | int64_t _read_and_fill_vector_table_timer = 0; |
75 | | int64_t _jni_setup_timer = 0; |
76 | | int64_t _has_next_timer = 0; |
77 | | int64_t _prepare_params_timer = 0; |
78 | | int64_t _fill_block_timer = 0; |
79 | | int64_t _cast_timer = 0; |
80 | | int64_t _check_type_timer = 0; |
81 | | int64_t _execte_read_timer = 0; /* NOTE(review): "execte" is presumably a misspelling of "execute"; it is a public field, so a rename would affect its users. */ |
82 | | int64_t _connector_close_timer = 0; |
83 | | }; |
84 | | /* NOTE(review): this single-argument constructor is not marked explicit, so a JdbcConnectorParam converts implicitly to a JdbcConnector. */ |
85 | | JdbcConnector(const JdbcConnectorParam& param); |
86 | | |
87 | | ~JdbcConnector() override; |
88 | | |
89 | | Status open(RuntimeState* state, bool read = false); |
90 | | |
91 | | Status query() override; |
92 | | |
93 | | Status get_next(bool* eos, Block* block, int batch_size); |
94 | | |
95 | | Status append(Block* block, const VExprContextSPtrs& _output_vexpr_ctxs, |
96 | | uint32_t start_send_row, uint32_t* num_rows_sent, |
97 | | TOdbcTableType::type table_type = TOdbcTableType::MYSQL) override; |
98 | | |
99 | | Status exec_stmt_write(Block* block, const VExprContextSPtrs& output_vexpr_ctxs, |
100 | | uint32_t* num_rows_sent) override; |
101 | | |
102 | | // Used for JDBC transactions. |
103 | | Status begin_trans() override; // should be called after connect and before query or init_to_write |
104 | | Status abort_trans() override; // should be called after transaction abort |
105 | | Status finish_trans() override; // should be called after transaction commit |
106 | | /* Hands the operator profile to init_profile() and always returns Status::OK(). */ |
107 | 0 | Status init_to_write(doris::RuntimeProfile* operator_profile) override { |
108 | 0 | init_profile(operator_profile); |
109 | 0 | return Status::OK(); |
110 | 0 | } |
111 | | /* Returns a mutable reference to the internal statistics member, not a copy. */ |
112 | 0 | JdbcStatistic& get_jdbc_statistic() { return _jdbc_statistic; } |
113 | | |
114 | | Status close(Status s = Status::OK()) override; |
115 | | |
116 | | Status test_connection(); |
117 | | Status clean_datasource(); |
118 | | |
119 | | protected: |
120 | | JdbcConnectorParam _conn_param; |
121 | | |
122 | | private: |
123 | | Status _register_func_id(JNIEnv* env); |
124 | | |
125 | | Status _get_reader_params(Block* block, JNIEnv* env, size_t column_size, Jni::LocalObject* ans); |
126 | | |
127 | | Status _cast_string_to_special(Block* block, JNIEnv* env, size_t column_size); |
128 | | Status _cast_string_to_hll(const SlotDescriptor* slot_desc, Block* block, int column_index, |
129 | | int rows); |
130 | | Status _cast_string_to_bitmap(const SlotDescriptor* slot_desc, Block* block, int column_index, |
131 | | int rows); |
132 | | Status _cast_string_to_json(const SlotDescriptor* slot_desc, Block* block, int column_index, |
133 | | int rows); |
134 | | |
135 | | Status _get_java_table_type(JNIEnv* env, TOdbcTableType::type table_type, |
136 | | Jni::LocalObject* java_enum_obj); |
137 | | |
138 | | Status _get_real_url(const std::string& url, std::string* result_url); |
139 | | Status _check_and_return_default_driver_url(const std::string& url, std::string* result_url); |
140 | | |
141 | | bool _closed = false; |
142 | | /* JNI handles for the Java-side executor and its factory: classes, the executor object, and method ids. NOTE(review): presumably populated by _register_func_id(); confirm in the implementation. */ |
143 | | Jni::GlobalClass _executor_factory_clazz; |
144 | | Jni::GlobalClass _executor_clazz; |
145 | | Jni::GlobalObject _executor_obj; |
146 | | Jni::MethodId _executor_factory_ctor_id; |
147 | | Jni::MethodId _executor_ctor_id; |
148 | | Jni::MethodId _executor_stmt_write_id; |
149 | | Jni::MethodId _executor_read_id; |
150 | | Jni::MethodId _executor_has_next_id; |
151 | | Jni::MethodId _executor_get_block_address_id; |
152 | | Jni::MethodId _executor_block_rows_id; |
153 | | Jni::MethodId _executor_close_id; |
154 | | Jni::MethodId _executor_begin_trans_id; |
155 | | Jni::MethodId _executor_finish_trans_id; |
156 | | Jni::MethodId _executor_abort_trans_id; |
157 | | Jni::MethodId _executor_test_connection_id; |
158 | | Jni::MethodId _executor_clean_datasource_id; |
159 | | /* For each of HLL, bitmap and JSON: a column-index to cast-index map plus a list of DataTypePtr. NOTE(review): presumably consumed by the matching _cast_string_to_* helper; confirm in the implementation. */ |
160 | | std::map<int, int> _map_column_idx_to_cast_idx_hll; |
161 | | std::vector<DataTypePtr> _input_hll_string_types; |
162 | | |
163 | | std::map<int, int> _map_column_idx_to_cast_idx_bitmap; |
164 | | std::vector<DataTypePtr> _input_bitmap_string_types; |
165 | | |
166 | | std::map<int, int> _map_column_idx_to_cast_idx_json; |
167 | | std::vector<DataTypePtr> _input_json_string_types; |
168 | | |
169 | | JdbcStatistic _jdbc_statistic; |
170 | | }; |
171 | | |
172 | | } // namespace doris |