be/src/exec/scan/jdbc_scanner.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "jdbc_scanner.h" |
19 | | |
20 | | #include <new> |
21 | | #include <ostream> |
22 | | #include <utility> |
23 | | #include <vector> |
24 | | |
25 | | #include "common/logging.h" |
26 | | #include "core/block/block.h" |
27 | | #include "exprs/vexpr_context.h" |
28 | | #include "format/table/jdbc_jni_reader.h" |
29 | | #include "runtime/descriptors.h" |
30 | | #include "runtime/runtime_profile.h" |
31 | | #include "runtime/runtime_state.h" |
32 | | #include "util/jdbc_utils.h" |
33 | | |
34 | | namespace doris { |
35 | | |
36 | | JdbcScanner::JdbcScanner(RuntimeState* state, doris::JDBCScanLocalState* local_state, int64_t limit, |
37 | | const TupleId& tuple_id, const std::string& query_string, |
38 | | TOdbcTableType::type table_type, bool is_tvf, RuntimeProfile* profile) |
39 | | : Scanner(state, local_state, limit, profile), |
40 | | _jdbc_eos(false), |
41 | 2.31k | _tuple_id(tuple_id), |
42 | 2.31k | _query_string(query_string), |
43 | 2.31k | _tuple_desc(nullptr), |
44 | 2.31k | _table_type(table_type), |
45 | 2.31k | _is_tvf(is_tvf) { |
46 | 2.31k | _has_prepared = false; |
47 | 2.31k | } |
48 | 2.31k | |
49 | 2.31k | std::map<std::string, std::string> JdbcScanner::_build_jdbc_params( |
50 | 2.31k | const TupleDescriptor* tuple_desc) { |
51 | | const JdbcTableDescriptor* jdbc_table = |
52 | 2.32k | static_cast<const JdbcTableDescriptor*>(tuple_desc->table_desc()); |
53 | 2.32k |
|
54 | 2.32k | std::map<std::string, std::string> params; |
55 | | params["jdbc_url"] = jdbc_table->jdbc_url(); |
56 | 2.32k | params["jdbc_user"] = jdbc_table->jdbc_user(); |
57 | 0 | params["jdbc_password"] = jdbc_table->jdbc_passwd(); |
58 | 0 | params["jdbc_driver_class"] = jdbc_table->jdbc_driver_class(); |
59 | | // Resolve jdbc_driver_url to absolute file:// URL |
60 | | // FE sends just the JAR filename; we need to resolve it to a full path. |
61 | 2.32k | std::string driver_url; |
62 | 2.32k | auto resolve_st = JdbcUtils::resolve_driver_url(jdbc_table->jdbc_driver_url(), &driver_url); |
63 | 0 | if (!resolve_st.ok()) { |
64 | 0 | LOG(WARNING) << "Failed to resolve JDBC driver URL: " << resolve_st.to_string(); |
65 | | driver_url = jdbc_table->jdbc_driver_url(); |
66 | | } |
67 | 2.32k | params["jdbc_driver_url"] = driver_url; |
68 | 2.32k | params["jdbc_driver_checksum"] = jdbc_table->jdbc_driver_checksum(); |
69 | 2.32k | params["query_sql"] = _query_string; |
70 | 0 | params["catalog_id"] = std::to_string(jdbc_table->jdbc_catalog_id()); |
71 | 0 | params["table_type"] = _odbc_table_type_to_string(_table_type); |
72 | 2.32k | params["connection_pool_min_size"] = std::to_string(jdbc_table->connection_pool_min_size()); |
73 | 2.32k | params["connection_pool_max_size"] = std::to_string(jdbc_table->connection_pool_max_size()); |
74 | 2.32k | params["connection_pool_max_wait_time"] = |
75 | 2.32k | std::to_string(jdbc_table->connection_pool_max_wait_time()); |
76 | 2.32k | params["connection_pool_max_life_time"] = |
77 | 2.32k | std::to_string(jdbc_table->connection_pool_max_life_time()); |
78 | 2.32k | params["connection_pool_keep_alive"] = |
79 | 2.32k | jdbc_table->connection_pool_keep_alive() ? "true" : "false"; |
80 | 2.32k | return params; |
81 | 2.32k | } |
82 | 2.32k | |
83 | 2.32k | Status JdbcScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) { |
84 | 2.32k | VLOG_CRITICAL << "JdbcScanner::init"; |
85 | 2.32k | RETURN_IF_ERROR(Scanner::init(state, conjuncts)); |
86 | 2.32k | |
87 | 2.32k | if (state == nullptr) { |
88 | 2.32k | return Status::InternalError("input pointer is NULL of JdbcScanner::init."); |
89 | 2.32k | } |
90 | | |
91 | 2.32k | // get tuple desc |
92 | 2.32k | _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); |
93 | 2.32k | if (_tuple_desc == nullptr) { |
94 | 2.32k | return Status::InternalError("Failed to get tuple descriptor."); |
95 | | } |
96 | 2.32k | |
97 | 2.32k | // get jdbc table info |
98 | 0 | const JdbcTableDescriptor* jdbc_table = |
99 | 0 | static_cast<const JdbcTableDescriptor*>(_tuple_desc->table_desc()); |
100 | | if (jdbc_table == nullptr) { |
101 | 2.32k | return Status::InternalError("jdbc table pointer is NULL of JdbcScanner::init."); |
102 | 2.32k | } |
103 | | |
104 | 2.31k | _local_state->scanner_profile()->add_info_string("JdbcDriverClass", |
105 | 2.31k | jdbc_table->jdbc_driver_class()); |
106 | 2.31k | _local_state->scanner_profile()->add_info_string("JdbcDriverUrl", |
107 | 0 | jdbc_table->jdbc_driver_url()); |
108 | 0 | _local_state->scanner_profile()->add_info_string("JdbcUrl", jdbc_table->jdbc_url()); |
109 | | _local_state->scanner_profile()->add_info_string("QuerySql", _query_string); |
110 | 2.31k |
|
111 | 0 | // Build reader params from tuple descriptor |
112 | 0 | auto jdbc_params = _build_jdbc_params(_tuple_desc); |
113 | 2.31k | |
114 | 2.31k | // Pass _tuple_desc->slots() directly. JniReader stores _file_slot_descs as a reference, |
115 | 2.31k | // so we must pass a vector whose lifetime outlives the reader (i.e., _tuple_desc->slots()). |
116 | 2.31k | // Previously a local vector was passed, causing a dangling reference after init() returned. |
117 | 2.31k | _jni_reader = JdbcJniReader::create_unique(_tuple_desc->slots(), state, _profile, jdbc_params); |
118 | 2.31k | |
119 | | return Status::OK(); |
120 | 4.55k | } |
121 | 4.55k |
|
122 | 4.55k | Status JdbcScanner::_open_impl(RuntimeState* state) { |
123 | 0 | VLOG_CRITICAL << "JdbcScanner::open"; |
124 | 0 | if (state == nullptr) { |
125 | | return Status::InternalError("input pointer is NULL of JdbcScanner::open."); |
126 | 4.55k | } |
127 | 0 |
|
128 | 0 | if (!_has_prepared) { |
129 | | return Status::InternalError("used before initialize of JdbcScanner::open."); |
130 | 4.55k | } |
131 | 0 | RETURN_IF_CANCELLED(state); |
132 | 0 | RETURN_IF_ERROR(Scanner::_open_impl(state)); |
133 | 0 | RETURN_IF_ERROR(_jni_reader->init_reader()); |
134 | 0 | return Status::OK(); |
135 | | } |
136 | | |
137 | 4.55k | Status JdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { |
138 | | VLOG_CRITICAL << "JdbcScanner::_get_block_impl"; |
139 | 4.55k | if (nullptr == state || nullptr == block || nullptr == eof) { |
140 | 4.55k | return Status::InternalError("input is NULL pointer"); |
141 | | } |
142 | 4.55k | |
143 | | if (!_has_prepared) { |
144 | 4.55k | return Status::InternalError("used before initialize of JdbcScanner::get_next."); |
145 | 2.30k | } |
146 | 2.30k | |
147 | 2.30k | if (_jdbc_eos) { |
148 | 2.30k | *eof = true; |
149 | 2.30k | return Status::OK(); |
150 | 2.30k | } |
151 | | |
152 | 2.25k | // only empty block should be here |
153 | 2.25k | DCHECK(block->rows() == 0); |
154 | 4.55k | |
155 | 4.55k | do { |
156 | | RETURN_IF_CANCELLED(state); |
157 | 2.31k | |
158 | 2.31k | size_t read_rows = 0; |
159 | 2.31k | bool reader_eof = false; |
160 | 2.31k | RETURN_IF_ERROR(_jni_reader->get_next_block(block, &read_rows, &reader_eof)); |
161 | 2.31k | |
162 | 2.31k | if (reader_eof) { |
163 | 2.31k | _jdbc_eos = true; |
164 | 2.31k | if (block->rows() == 0) { |
165 | 2.31k | *eof = true; |
166 | 2.31k | } |
167 | 2.31k | break; |
168 | 2.31k | } |
169 | 2.31k | |
170 | 2.31k | VLOG_ROW << "JdbcScanner output rows: " << block->rows(); |
171 | 2.31k | } while (block->rows() == 0 && !(*eof)); |
172 | | return Status::OK(); |
173 | 2.30k | } |
174 | 2.30k | |
175 | 2.30k | Status JdbcScanner::close(RuntimeState* state) { |
176 | 2.30k | if (!_try_close()) { |
177 | 2.30k | return Status::OK(); |
178 | 2.30k | } |
179 | 2.30k | RETURN_IF_ERROR(Scanner::close(state)); |
180 | 2.30k | if (_jni_reader) { |
181 | 2.30k | RETURN_IF_ERROR(_jni_reader->close()); |
182 | 2.30k | } |
183 | 2.30k | return Status::OK(); |
184 | 2.30k | } |
185 | 2.30k | } // namespace doris |