be/src/format_v2/jni/jdbc_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format_v2/jni/jdbc_reader.h" |
19 | | |
20 | | #include <memory> |
21 | | #include <utility> |
22 | | |
23 | | #include "common/cast_set.h" |
24 | | #include "core/assert_cast.h" |
25 | | #include "core/block/block.h" |
26 | | #include "core/block/columns_with_type_and_name.h" |
27 | | #include "core/column/column_nullable.h" |
28 | | #include "core/data_type/data_type_nullable.h" |
29 | | #include "core/data_type/data_type_string.h" |
30 | | #include "exprs/function/simple_function_factory.h" |
31 | | #include "exprs/vexpr_context.h" |
32 | | #include "format_v2/table_reader.h" |
33 | | #include "util/jdbc_utils.h" |
34 | | |
35 | | namespace doris::format::jdbc { |
36 | | |
37 | 0 | std::string JdbcJniReader::connector_class() const { |
38 | 0 | return "org/apache/doris/jdbc/JdbcJniScanner"; |
39 | 0 | } |
40 | | |
41 | 0 | Status JdbcJniReader::prepare_split(const format::SplitReadOptions& options) { |
42 | 0 | _jdbc_params.clear(); |
43 | 0 | if (options.current_range.__isset.table_format_params && |
44 | 0 | options.current_range.table_format_params.table_format_type == "jdbc") { |
45 | 0 | _jdbc_params = std::map<std::string, std::string>( |
46 | 0 | options.current_range.table_format_params.jdbc_params.begin(), |
47 | 0 | options.current_range.table_format_params.jdbc_params.end()); |
48 | 0 | } |
49 | 0 | return format::JniTableReader::prepare_split(options); |
50 | 0 | } |
51 | | |
52 | | // need pass to the java side, so the java scanner can parse the params and construct the JDBC connection |
53 | 0 | Status JdbcJniReader::build_scanner_params(std::map<std::string, std::string>* params) const { |
54 | 0 | DORIS_CHECK(params != nullptr); |
55 | 0 | *params = _jdbc_params; |
56 | 0 | if (params->contains("jdbc_driver_url")) { |
57 | 0 | std::string resolved; |
58 | 0 | if (JdbcUtils::resolve_driver_url((*params)["jdbc_driver_url"], &resolved).ok()) { |
59 | 0 | (*params)["jdbc_driver_url"] = resolved; |
60 | 0 | } |
61 | 0 | } |
62 | 0 | return Status::OK(); |
63 | 0 | } |
64 | | |
65 | | Status JdbcJniReader::build_jni_columns( |
66 | 0 | std::vector<format::JniTableReader::JniColumn>* columns) const { |
67 | 0 | DORIS_CHECK(columns != nullptr); |
68 | 0 | columns->clear(); |
69 | 0 | columns->reserve(_projected_columns.size()); |
70 | 0 | for (size_t i = 0; i < _projected_columns.size(); ++i) { |
71 | 0 | const auto& table_column = _projected_columns[i]; |
72 | 0 | const auto primitive_type = remove_nullable(table_column.type)->get_primitive_type(); |
73 | 0 | columns->push_back({ |
74 | 0 | .java_name = table_column.name, |
75 | 0 | .output_index = i, |
76 | 0 | .output_type = table_column.type, |
77 | 0 | .transfer_type = _transfer_type_for(table_column.type), |
78 | 0 | .replace_type = _replace_type_for(primitive_type), |
79 | 0 | }); |
80 | 0 | } |
81 | 0 | return Status::OK(); |
82 | 0 | } |
83 | | |
84 | 0 | Status JdbcJniReader::finalize_jni_block(Block* jni_block, Block* output_block, size_t* rows) { |
85 | 0 | DORIS_CHECK(jni_block != nullptr); |
86 | 0 | DORIS_CHECK(output_block != nullptr); |
87 | 0 | DORIS_CHECK(rows != nullptr); |
88 | 0 | const auto original_rows = *rows; |
89 | 0 | const auto& columns = jni_columns(); |
90 | 0 | DORIS_CHECK(columns.size() == jni_block->columns()); |
91 | |
|
92 | 0 | for (size_t i = 0; i < columns.size(); ++i) { |
93 | 0 | const auto& column = columns[i]; |
94 | 0 | DORIS_CHECK(column.output_type != nullptr); |
95 | 0 | DORIS_CHECK(column.output_index < output_block->columns()); |
96 | 0 | if (_is_special_type(remove_nullable(column.output_type)->get_primitive_type())) { |
97 | 0 | RETURN_IF_ERROR(_cast_string_to_special_type(column, jni_block, i, output_block, |
98 | 0 | original_rows)); |
99 | 0 | continue; |
100 | 0 | } |
101 | 0 | output_block->get_by_position(column.output_index).type = column.output_type; |
102 | 0 | output_block->replace_by_position(column.output_index, |
103 | 0 | jni_block->get_by_position(i).column); |
104 | 0 | } |
105 | 0 | DORIS_CHECK(output_block->rows() == original_rows); |
106 | 0 | if (!_conjuncts.empty()) { |
107 | 0 | RETURN_IF_ERROR( |
108 | 0 | VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); |
109 | 0 | } |
110 | 0 | *rows = output_block->rows(); |
111 | 0 | return Status::OK(); |
112 | 0 | } |
113 | | |
114 | 0 | std::string JdbcJniReader::_replace_type_for(PrimitiveType type) const { |
115 | 0 | switch (type) { |
116 | 0 | case PrimitiveType::TYPE_BITMAP: |
117 | 0 | return "bitmap"; |
118 | 0 | case PrimitiveType::TYPE_HLL: |
119 | 0 | return "hll"; |
120 | 0 | case PrimitiveType::TYPE_QUANTILE_STATE: |
121 | 0 | return "quantile_state"; |
122 | 0 | case PrimitiveType::TYPE_JSONB: |
123 | 0 | return "jsonb"; |
124 | 0 | default: |
125 | 0 | return "not_replace"; |
126 | 0 | } |
127 | 0 | } |
128 | | |
129 | 0 | bool JdbcJniReader::_is_special_type(PrimitiveType type) const { |
130 | 0 | return type == PrimitiveType::TYPE_BITMAP || type == PrimitiveType::TYPE_HLL || |
131 | 0 | type == PrimitiveType::TYPE_QUANTILE_STATE || type == PrimitiveType::TYPE_JSONB; |
132 | 0 | } |
133 | | |
134 | 0 | DataTypePtr JdbcJniReader::_transfer_type_for(const DataTypePtr& output_type) const { |
135 | 0 | DORIS_CHECK(output_type != nullptr); |
136 | 0 | if (!_is_special_type(remove_nullable(output_type)->get_primitive_type())) { |
137 | 0 | return output_type; |
138 | 0 | } |
139 | 0 | DataTypePtr string_type = std::make_shared<DataTypeString>(); |
140 | 0 | if (output_type->is_nullable()) { |
141 | 0 | string_type = make_nullable(string_type); |
142 | 0 | } |
143 | 0 | return string_type; |
144 | 0 | } |
145 | | |
146 | | Status JdbcJniReader::_cast_string_to_special_type(const format::JniTableReader::JniColumn& column, |
147 | | Block* jni_block, size_t jni_column_index, |
148 | 0 | Block* output_block, size_t rows) { |
149 | 0 | DORIS_CHECK(column.output_type != nullptr); |
150 | 0 | DORIS_CHECK(column.transfer_type != nullptr); |
151 | 0 | const auto target_type = column.output_type; |
152 | 0 | const auto target_type_name = target_type->get_name(); |
153 | |
|
154 | 0 | ColumnPtr input_column = jni_block->get_by_position(jni_column_index).column; |
155 | 0 | ColumnPtr cast_param = target_type->create_column_const_with_default_value(1); |
156 | |
|
157 | 0 | ColumnsWithTypeAndName argument_template; |
158 | 0 | argument_template.reserve(2); |
159 | 0 | argument_template.emplace_back(std::move(input_column), column.transfer_type, |
160 | 0 | "java.sql.String"); |
161 | 0 | argument_template.emplace_back(std::move(cast_param), target_type, target_type_name); |
162 | |
|
163 | 0 | FunctionBasePtr cast_function = SimpleFunctionFactory::instance().get_function( |
164 | 0 | "CAST", argument_template, make_nullable(target_type)); |
165 | 0 | if (cast_function == nullptr) { |
166 | 0 | return Status::InternalError("Failed to find CAST function for type {}", target_type_name); |
167 | 0 | } |
168 | | |
169 | 0 | Block cast_block(argument_template); |
170 | 0 | const auto result_idx = cast_set<uint32_t>(cast_block.columns()); |
171 | 0 | cast_block.insert({nullptr, make_nullable(target_type), "cast_result"}); |
172 | 0 | RETURN_IF_ERROR( |
173 | 0 | cast_function->execute(nullptr, cast_block, {0}, result_idx, cast_set<int>(rows))); |
174 | | |
175 | 0 | auto result_column = cast_block.get_by_position(result_idx).column; |
176 | 0 | output_block->get_by_position(column.output_index).type = target_type; |
177 | 0 | if (target_type->is_nullable()) { |
178 | 0 | output_block->replace_by_position(column.output_index, result_column); |
179 | 0 | } else { |
180 | 0 | const auto* nullable_column = assert_cast<const ColumnNullable*>(result_column.get()); |
181 | 0 | output_block->replace_by_position(column.output_index, |
182 | 0 | nullable_column->get_nested_column_ptr()); |
183 | 0 | } |
184 | 0 | return Status::OK(); |
185 | 0 | } |
186 | | |
187 | | } // namespace doris::format::jdbc |