be/src/exec/scan/jdbc_scanner.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "jdbc_scanner.h" |
19 | | |
20 | | #include <new> |
21 | | #include <ostream> |
22 | | #include <utility> |
23 | | #include <vector> |
24 | | |
25 | | #include "common/logging.h" |
26 | | #include "core/block/block.h" |
27 | | #include "exprs/vexpr_context.h" |
28 | | #include "format/table/jdbc_jni_reader.h" |
29 | | #include "runtime/descriptors.h" |
30 | | #include "runtime/runtime_profile.h" |
31 | | #include "runtime/runtime_state.h" |
32 | | #include "util/jdbc_utils.h" |
33 | | |
34 | | namespace doris { |
35 | | |
36 | | JdbcScanner::JdbcScanner(RuntimeState* state, doris::JDBCScanLocalState* local_state, int64_t limit, |
37 | | const TupleId& tuple_id, const std::string& query_string, |
38 | | TOdbcTableType::type table_type, bool is_tvf, RuntimeProfile* profile) |
39 | 0 | : Scanner(state, local_state, limit, profile), |
40 | 0 | _jdbc_eos(false), |
41 | 0 | _tuple_id(tuple_id), |
42 | 0 | _query_string(query_string), |
43 | 0 | _tuple_desc(nullptr), |
44 | 0 | _table_type(table_type), |
45 | 0 | _is_tvf(is_tvf) { |
46 | 0 | _has_prepared = false; |
47 | 0 | } |
48 | | |
49 | | std::map<std::string, std::string> JdbcScanner::_build_jdbc_params( |
50 | 0 | const TupleDescriptor* tuple_desc) { |
51 | 0 | const JdbcTableDescriptor* jdbc_table = |
52 | 0 | static_cast<const JdbcTableDescriptor*>(tuple_desc->table_desc()); |
53 | |
|
54 | 0 | std::map<std::string, std::string> params; |
55 | 0 | params["jdbc_url"] = jdbc_table->jdbc_url(); |
56 | 0 | params["jdbc_user"] = jdbc_table->jdbc_user(); |
57 | 0 | params["jdbc_password"] = jdbc_table->jdbc_passwd(); |
58 | 0 | params["jdbc_driver_class"] = jdbc_table->jdbc_driver_class(); |
59 | | // Resolve jdbc_driver_url to absolute file:// URL |
60 | | // FE sends just the JAR filename; we need to resolve it to a full path. |
61 | 0 | std::string driver_url; |
62 | 0 | auto resolve_st = JdbcUtils::resolve_driver_url(jdbc_table->jdbc_driver_url(), &driver_url); |
63 | 0 | if (!resolve_st.ok()) { |
64 | 0 | LOG(WARNING) << "Failed to resolve JDBC driver URL: " << resolve_st.to_string(); |
65 | 0 | driver_url = jdbc_table->jdbc_driver_url(); |
66 | 0 | } |
67 | 0 | params["jdbc_driver_url"] = driver_url; |
68 | 0 | params["jdbc_driver_checksum"] = jdbc_table->jdbc_driver_checksum(); |
69 | 0 | params["query_sql"] = _query_string; |
70 | 0 | params["catalog_id"] = std::to_string(jdbc_table->jdbc_catalog_id()); |
71 | 0 | params["table_type"] = _odbc_table_type_to_string(_table_type); |
72 | 0 | params["connection_pool_min_size"] = std::to_string(jdbc_table->connection_pool_min_size()); |
73 | 0 | params["connection_pool_max_size"] = std::to_string(jdbc_table->connection_pool_max_size()); |
74 | 0 | params["connection_pool_max_wait_time"] = |
75 | 0 | std::to_string(jdbc_table->connection_pool_max_wait_time()); |
76 | 0 | params["connection_pool_max_life_time"] = |
77 | 0 | std::to_string(jdbc_table->connection_pool_max_life_time()); |
78 | 0 | params["connection_pool_keep_alive"] = |
79 | 0 | jdbc_table->connection_pool_keep_alive() ? "true" : "false"; |
80 | 0 | return params; |
81 | 0 | } |
82 | | |
83 | 0 | Status JdbcScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) { |
84 | 0 | VLOG_CRITICAL << "JdbcScanner::init"; |
85 | 0 | RETURN_IF_ERROR(Scanner::init(state, conjuncts)); |
86 | | |
87 | 0 | if (state == nullptr) { |
88 | 0 | return Status::InternalError("input pointer is NULL of JdbcScanner::init."); |
89 | 0 | } |
90 | | |
91 | | // get tuple desc |
92 | 0 | _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); |
93 | 0 | if (_tuple_desc == nullptr) { |
94 | 0 | return Status::InternalError("Failed to get tuple descriptor."); |
95 | 0 | } |
96 | | |
97 | | // get jdbc table info |
98 | 0 | const JdbcTableDescriptor* jdbc_table = |
99 | 0 | static_cast<const JdbcTableDescriptor*>(_tuple_desc->table_desc()); |
100 | 0 | if (jdbc_table == nullptr) { |
101 | 0 | return Status::InternalError("jdbc table pointer is NULL of JdbcScanner::init."); |
102 | 0 | } |
103 | | |
104 | 0 | _local_state->scanner_profile()->add_info_string("JdbcDriverClass", |
105 | 0 | jdbc_table->jdbc_driver_class()); |
106 | 0 | _local_state->scanner_profile()->add_info_string("JdbcDriverUrl", |
107 | 0 | jdbc_table->jdbc_driver_url()); |
108 | 0 | _local_state->scanner_profile()->add_info_string("JdbcUrl", jdbc_table->jdbc_url()); |
109 | 0 | _local_state->scanner_profile()->add_info_string("QuerySql", _query_string); |
110 | | |
111 | | // Build reader params from tuple descriptor |
112 | 0 | auto jdbc_params = _build_jdbc_params(_tuple_desc); |
113 | | |
114 | | // Pass _tuple_desc->slots() directly. JniReader stores _file_slot_descs as a reference, |
115 | | // so we must pass a vector whose lifetime outlives the reader (i.e., _tuple_desc->slots()). |
116 | | // Previously a local vector was passed, causing a dangling reference after init() returned. |
117 | 0 | _jni_reader = JdbcJniReader::create_unique(_tuple_desc->slots(), state, _profile, jdbc_params); |
118 | |
|
119 | 0 | return Status::OK(); |
120 | 0 | } |
121 | | |
122 | 0 | Status JdbcScanner::_open_impl(RuntimeState* state) { |
123 | 0 | VLOG_CRITICAL << "JdbcScanner::open"; |
124 | 0 | if (state == nullptr) { |
125 | 0 | return Status::InternalError("input pointer is NULL of JdbcScanner::open."); |
126 | 0 | } |
127 | | |
128 | 0 | if (!_has_prepared) { |
129 | 0 | return Status::InternalError("used before initialize of JdbcScanner::open."); |
130 | 0 | } |
131 | 0 | RETURN_IF_CANCELLED(state); |
132 | 0 | RETURN_IF_ERROR(Scanner::_open_impl(state)); |
133 | 0 | RETURN_IF_ERROR(_jni_reader->init_reader()); |
134 | 0 | return Status::OK(); |
135 | 0 | } |
136 | | |
137 | 0 | Status JdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { |
138 | 0 | VLOG_CRITICAL << "JdbcScanner::_get_block_impl"; |
139 | 0 | if (nullptr == state || nullptr == block || nullptr == eof) { |
140 | 0 | return Status::InternalError("input is NULL pointer"); |
141 | 0 | } |
142 | | |
143 | 0 | if (!_has_prepared) { |
144 | 0 | return Status::InternalError("used before initialize of JdbcScanner::get_next."); |
145 | 0 | } |
146 | | |
147 | 0 | if (_jdbc_eos) { |
148 | 0 | *eof = true; |
149 | 0 | return Status::OK(); |
150 | 0 | } |
151 | | |
152 | | // only empty block should be here |
153 | 0 | DCHECK(block->rows() == 0); |
154 | |
|
155 | 0 | do { |
156 | 0 | RETURN_IF_CANCELLED(state); |
157 | | |
158 | 0 | size_t read_rows = 0; |
159 | 0 | bool reader_eof = false; |
160 | 0 | RETURN_IF_ERROR(_jni_reader->get_next_block(block, &read_rows, &reader_eof)); |
161 | | |
162 | 0 | if (reader_eof) { |
163 | 0 | _jdbc_eos = true; |
164 | 0 | if (block->rows() == 0) { |
165 | 0 | *eof = true; |
166 | 0 | } |
167 | 0 | break; |
168 | 0 | } |
169 | | |
170 | 0 | VLOG_ROW << "JdbcScanner output rows: " << block->rows(); |
171 | 0 | } while (block->rows() == 0 && !(*eof)); |
172 | 0 | return Status::OK(); |
173 | 0 | } |
174 | | |
175 | 0 | Status JdbcScanner::close(RuntimeState* state) { |
176 | 0 | if (!_try_close()) { |
177 | 0 | return Status::OK(); |
178 | 0 | } |
179 | 0 | RETURN_IF_ERROR(Scanner::close(state)); |
180 | 0 | if (_jni_reader) { |
181 | 0 | RETURN_IF_ERROR(_jni_reader->close()); |
182 | 0 | } |
183 | 0 | return Status::OK(); |
184 | 0 | } |
185 | | } // namespace doris |