be/src/exec/scan/jdbc_scanner.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/scan/jdbc_scanner.h" |
19 | | |
20 | | #include <new> |
21 | | #include <ostream> |
22 | | #include <utility> |
23 | | #include <vector> |
24 | | |
25 | | #include "common/logging.h" |
26 | | #include "core/block/block.h" |
27 | | #include "core/block/column_with_type_and_name.h" |
28 | | #include "core/column/column.h" |
29 | | #include "core/data_type/data_type.h" |
30 | | #include "exec/connector/vjdbc_connector.h" |
31 | | #include "exprs/vexpr_context.h" |
32 | | #include "runtime/descriptors.h" |
33 | | #include "runtime/runtime_profile.h" |
34 | | #include "runtime/runtime_state.h" |
35 | | |
36 | | namespace doris { |
37 | | |
38 | | JdbcScanner::JdbcScanner(RuntimeState* state, doris::JDBCScanLocalState* local_state, int64_t limit, |
39 | | const TupleId& tuple_id, const std::string& query_string, |
40 | | TOdbcTableType::type table_type, bool is_tvf, RuntimeProfile* profile) |
41 | 0 | : Scanner(state, local_state, limit, profile), |
42 | 0 | _jdbc_eos(false), |
43 | 0 | _tuple_id(tuple_id), |
44 | 0 | _query_string(query_string), |
45 | 0 | _tuple_desc(nullptr), |
46 | 0 | _table_type(table_type), |
47 | 0 | _is_tvf(is_tvf) { |
48 | 0 | _init_profile(local_state->_scanner_profile); |
49 | 0 | _has_prepared = false; |
50 | 0 | } |
51 | | |
52 | 0 | Status JdbcScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) { |
53 | 0 | VLOG_CRITICAL << "JdbcScanner::init"; |
54 | 0 | RETURN_IF_ERROR(Scanner::init(state, conjuncts)); |
55 | | |
56 | 0 | if (state == nullptr) { |
57 | 0 | return Status::InternalError("input pointer is NULL of VJdbcScanNode::init."); |
58 | 0 | } |
59 | | |
60 | | // get tuple desc |
61 | 0 | _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); |
62 | 0 | if (_tuple_desc == nullptr) { |
63 | 0 | return Status::InternalError("Failed to get tuple descriptor."); |
64 | 0 | } |
65 | | |
66 | | // get jdbc table info |
67 | 0 | const JdbcTableDescriptor* jdbc_table = |
68 | 0 | static_cast<const JdbcTableDescriptor*>(_tuple_desc->table_desc()); |
69 | 0 | if (jdbc_table == nullptr) { |
70 | 0 | return Status::InternalError("jdbc table pointer is NULL of VJdbcScanNode::init."); |
71 | 0 | } |
72 | 0 | _jdbc_param.catalog_id = jdbc_table->jdbc_catalog_id(); |
73 | 0 | _jdbc_param.driver_class = jdbc_table->jdbc_driver_class(); |
74 | 0 | _jdbc_param.driver_path = jdbc_table->jdbc_driver_url(); |
75 | 0 | _jdbc_param.resource_name = jdbc_table->jdbc_resource_name(); |
76 | 0 | _jdbc_param.driver_checksum = jdbc_table->jdbc_driver_checksum(); |
77 | 0 | _jdbc_param.jdbc_url = jdbc_table->jdbc_url(); |
78 | 0 | _jdbc_param.user = jdbc_table->jdbc_user(); |
79 | 0 | _jdbc_param.passwd = jdbc_table->jdbc_passwd(); |
80 | 0 | _jdbc_param.tuple_desc = _tuple_desc; |
81 | 0 | _jdbc_param.query_string = std::move(_query_string); |
82 | 0 | _jdbc_param.use_transaction = false; // not useful for scanner but only sink. |
83 | 0 | _jdbc_param.table_type = _table_type; |
84 | 0 | _jdbc_param.is_tvf = _is_tvf; |
85 | 0 | _jdbc_param.connection_pool_min_size = jdbc_table->connection_pool_min_size(); |
86 | 0 | _jdbc_param.connection_pool_max_size = jdbc_table->connection_pool_max_size(); |
87 | 0 | _jdbc_param.connection_pool_max_life_time = jdbc_table->connection_pool_max_life_time(); |
88 | 0 | _jdbc_param.connection_pool_max_wait_time = jdbc_table->connection_pool_max_wait_time(); |
89 | 0 | _jdbc_param.connection_pool_keep_alive = jdbc_table->connection_pool_keep_alive(); |
90 | |
|
91 | 0 | _local_state->scanner_profile()->add_info_string("JdbcDriverClass", _jdbc_param.driver_class); |
92 | 0 | _local_state->scanner_profile()->add_info_string("JdbcDriverUrl", _jdbc_param.driver_path); |
93 | 0 | _local_state->scanner_profile()->add_info_string("JdbcUrl", _jdbc_param.jdbc_url); |
94 | 0 | _local_state->scanner_profile()->add_info_string("QuerySql", _jdbc_param.query_string); |
95 | |
|
96 | 0 | _jdbc_connector.reset(new (std::nothrow) JdbcConnector(_jdbc_param)); |
97 | 0 | if (_jdbc_connector == nullptr) { |
98 | 0 | return Status::InternalError("new a jdbc scanner failed."); |
99 | 0 | } |
100 | | |
101 | 0 | return Status::OK(); |
102 | 0 | } |
103 | | |
104 | 0 | Status JdbcScanner::_open_impl(RuntimeState* state) { |
105 | 0 | VLOG_CRITICAL << "JdbcScanner::open"; |
106 | 0 | if (state == nullptr) { |
107 | 0 | return Status::InternalError("input pointer is NULL of VJdbcScanNode::open."); |
108 | 0 | } |
109 | | |
110 | 0 | if (!_has_prepared) { |
111 | 0 | return Status::InternalError("used before initialize of VJdbcScanNode::open."); |
112 | 0 | } |
113 | 0 | RETURN_IF_CANCELLED(state); |
114 | 0 | RETURN_IF_ERROR(Scanner::_open_impl(state)); |
115 | 0 | RETURN_IF_ERROR(_jdbc_connector->open(state, true)); |
116 | 0 | RETURN_IF_ERROR(_jdbc_connector->query()); |
117 | 0 | return Status::OK(); |
118 | 0 | } |
119 | | |
120 | 0 | Status JdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { |
121 | 0 | VLOG_CRITICAL << "JdbcScanner::_get_block_impl"; |
122 | 0 | if (nullptr == state || nullptr == block || nullptr == eof) { |
123 | 0 | return Status::InternalError("input is NULL pointer"); |
124 | 0 | } |
125 | | |
126 | 0 | if (!_has_prepared) { |
127 | 0 | return Status::InternalError("used before initialize of VJdbcScanNode::get_next."); |
128 | 0 | } |
129 | | |
130 | 0 | if (_jdbc_eos == true) { |
131 | 0 | *eof = true; |
132 | 0 | _update_profile(); |
133 | 0 | return Status::OK(); |
134 | 0 | } |
135 | | |
136 | | // only empty block should be here |
137 | 0 | DCHECK(block->rows() == 0); |
138 | |
|
139 | 0 | do { |
140 | 0 | RETURN_IF_CANCELLED(state); |
141 | | |
142 | 0 | RETURN_IF_ERROR(_jdbc_connector->get_next(&_jdbc_eos, block, state->batch_size())); |
143 | | |
144 | 0 | if (_jdbc_eos == true) { |
145 | 0 | if (block->rows() == 0) { |
146 | 0 | _update_profile(); |
147 | 0 | *eof = true; |
148 | 0 | } |
149 | 0 | break; |
150 | 0 | } |
151 | | |
152 | 0 | VLOG_ROW << "NewJdbcScanNode output rows: " << block->rows(); |
153 | 0 | } while (block->rows() == 0 && !(*eof)); |
154 | 0 | return Status::OK(); |
155 | 0 | } |
156 | | |
157 | 0 | void JdbcScanner::_init_profile(const std::shared_ptr<RuntimeProfile>& profile) { |
158 | 0 | _load_jar_timer = ADD_TIMER(profile, "LoadJarTime"); |
159 | 0 | _init_connector_timer = ADD_TIMER(profile, "InitConnectorTime"); |
160 | 0 | _check_type_timer = ADD_TIMER(profile, "CheckTypeTime"); |
161 | 0 | _get_data_timer = ADD_TIMER(profile, "GetDataTime"); |
162 | 0 | _read_and_fill_vector_table_timer = |
163 | 0 | ADD_CHILD_TIMER(profile, "ReadAndFillVectorTableTime", "GetDataTime"); |
164 | 0 | _jni_setup_timer = ADD_CHILD_TIMER(profile, "JniSetupTime", "GetDataTime"); |
165 | 0 | _has_next_timer = ADD_CHILD_TIMER(profile, "HasNextTime", "GetDataTime"); |
166 | 0 | _prepare_params_timer = ADD_CHILD_TIMER(profile, "PrepareParamsTime", "GetDataTime"); |
167 | 0 | _fill_block_timer = ADD_CHILD_TIMER(profile, "FillBlockTime", "GetDataTime"); |
168 | 0 | _cast_timer = ADD_CHILD_TIMER(profile, "CastTime", "GetDataTime"); |
169 | 0 | _execte_read_timer = ADD_TIMER(profile, "ExecteReadTime"); |
170 | 0 | _connector_close_timer = ADD_TIMER(profile, "ConnectorCloseTime"); |
171 | 0 | } |
172 | | |
173 | 0 | void JdbcScanner::_update_profile() { |
174 | 0 | JdbcConnector::JdbcStatistic& jdbc_statistic = _jdbc_connector->get_jdbc_statistic(); |
175 | 0 | COUNTER_UPDATE(_load_jar_timer, jdbc_statistic._load_jar_timer); |
176 | 0 | COUNTER_UPDATE(_init_connector_timer, jdbc_statistic._init_connector_timer); |
177 | 0 | COUNTER_UPDATE(_check_type_timer, jdbc_statistic._check_type_timer); |
178 | 0 | COUNTER_UPDATE(_get_data_timer, jdbc_statistic._get_data_timer); |
179 | 0 | COUNTER_UPDATE(_jni_setup_timer, jdbc_statistic._jni_setup_timer); |
180 | 0 | COUNTER_UPDATE(_has_next_timer, jdbc_statistic._has_next_timer); |
181 | 0 | COUNTER_UPDATE(_prepare_params_timer, jdbc_statistic._prepare_params_timer); |
182 | 0 | COUNTER_UPDATE(_read_and_fill_vector_table_timer, |
183 | 0 | jdbc_statistic._read_and_fill_vector_table_timer); |
184 | 0 | COUNTER_UPDATE(_fill_block_timer, jdbc_statistic._fill_block_timer); |
185 | 0 | COUNTER_UPDATE(_cast_timer, jdbc_statistic._cast_timer); |
186 | 0 | COUNTER_UPDATE(_execte_read_timer, jdbc_statistic._execte_read_timer); |
187 | 0 | COUNTER_UPDATE(_connector_close_timer, jdbc_statistic._connector_close_timer); |
188 | 0 | } |
189 | | |
190 | 0 | Status JdbcScanner::close(RuntimeState* state) { |
191 | 0 | if (!_try_close()) { |
192 | 0 | return Status::OK(); |
193 | 0 | } |
194 | 0 | RETURN_IF_ERROR(Scanner::close(state)); |
195 | 0 | RETURN_IF_ERROR(_jdbc_connector->close()); |
196 | 0 | return Status::OK(); |
197 | 0 | } |
198 | | } // namespace doris |