Coverage Report

Created: 2026-06-29 16:39

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/jni/jdbc_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format_v2/jni/jdbc_reader.h"
19
20
#include <memory>
21
#include <utility>
22
23
#include "common/cast_set.h"
24
#include "core/assert_cast.h"
25
#include "core/block/block.h"
26
#include "core/block/columns_with_type_and_name.h"
27
#include "core/column/column_nullable.h"
28
#include "core/data_type/data_type_nullable.h"
29
#include "core/data_type/data_type_string.h"
30
#include "exprs/function/simple_function_factory.h"
31
#include "exprs/vexpr_context.h"
32
#include "format_v2/table_reader.h"
33
#include "util/jdbc_utils.h"
34
35
namespace doris::format::jdbc {
36
37
0
// Returns the slash-separated class name of the Java-side JDBC scanner used by
// this reader.
std::string JdbcJniReader::connector_class() const {
    static constexpr const char* kJdbcScannerClass = "org/apache/doris/jdbc/JdbcJniScanner";
    return kJdbcScannerClass;
}
40
41
0
// Captures the JDBC parameters carried by the split's scan range, then defers
// to the base JniTableReader for the rest of the split preparation.
//
// `_jdbc_params` is always reset first, so parameters from a previous split are
// never reused; it is only repopulated when the range carries table format
// params whose type is "jdbc".
Status JdbcJniReader::prepare_split(const format::SplitReadOptions& options) {
    _jdbc_params.clear();

    const auto& range = options.current_range;
    const bool is_jdbc_split = range.__isset.table_format_params &&
                               range.table_format_params.table_format_type == "jdbc";
    if (is_jdbc_split) {
        const auto& source_params = range.table_format_params.jdbc_params;
        _jdbc_params =
                std::map<std::string, std::string>(source_params.begin(), source_params.end());
    }

    return format::JniTableReader::prepare_split(options);
}
51
52
// These params are passed to the Java side so that the Java scanner can parse them and construct the JDBC connection.
53
0
// Fills `*params` with a copy of the JDBC parameters captured for this split.
//
// If a "jdbc_driver_url" entry is present, it is passed through
// JdbcUtils::resolve_driver_url and replaced with the resolved value when that
// call succeeds. A failed resolution is not reported: the entry keeps its
// original value and the function still returns OK.
//
// `params` must not be null.
Status JdbcJniReader::build_scanner_params(std::map<std::string, std::string>* params) const {
    DORIS_CHECK(params != nullptr);
    *params = _jdbc_params;

    const auto driver_url_it = params->find("jdbc_driver_url");
    if (driver_url_it == params->end()) {
        return Status::OK();
    }

    std::string resolved_url;
    if (JdbcUtils::resolve_driver_url(driver_url_it->second, &resolved_url).ok()) {
        driver_url_it->second = resolved_url;
    }
    return Status::OK();
}
64
65
// Rebuilds `*columns` with one JniColumn descriptor per projected column, in
// projection order.
//
// For each projected column the descriptor records its name, its position in
// the projection (used as the output index), its declared output type, the
// type used to transfer it (see _transfer_type_for) and the replace tag derived
// from its non-nullable primitive type (see _replace_type_for).
//
// `columns` must not be null; any previous content is discarded.
Status JdbcJniReader::build_jni_columns(
        std::vector<format::JniTableReader::JniColumn>* columns) const {
    DORIS_CHECK(columns != nullptr);

    const size_t projected_count = _projected_columns.size();
    columns->clear();
    columns->reserve(projected_count);

    for (size_t position = 0; position < projected_count; ++position) {
        const auto& projected = _projected_columns[position];
        // The replace tag is keyed on the nested type, so strip nullability first.
        const auto nested_primitive = remove_nullable(projected.type)->get_primitive_type();
        format::JniTableReader::JniColumn descriptor {
                .java_name = projected.name,
                .output_index = position,
                .output_type = projected.type,
                .transfer_type = _transfer_type_for(projected.type),
                .replace_type = _replace_type_for(nested_primitive),
        };
        columns->push_back(std::move(descriptor));
    }
    return Status::OK();
}
83
84
0
// Moves the columns read through JNI into `output_block`, then applies the
// reader's conjuncts.
//
// Each JNI column is matched positionally with the descriptor returned by
// jni_columns(). Columns whose non-nullable primitive type is a special type
// (see _is_special_type) are converted through _cast_string_to_special_type;
// all other columns are placed into the output block as-is, with the output
// slot's type set to the descriptor's output type.
//
// On entry `*rows` is the number of rows in the batch; on success it is updated
// to the number of rows left in `output_block` after filtering.
// All three pointers must be non-null.
Status JdbcJniReader::finalize_jni_block(Block* jni_block, Block* output_block, size_t* rows) {
    DORIS_CHECK(jni_block != nullptr);
    DORIS_CHECK(output_block != nullptr);
    DORIS_CHECK(rows != nullptr);

    const size_t expected_rows = *rows;
    const auto& descriptors = jni_columns();
    DORIS_CHECK(descriptors.size() == jni_block->columns());

    for (size_t jni_idx = 0; jni_idx < descriptors.size(); ++jni_idx) {
        const auto& descriptor = descriptors[jni_idx];
        DORIS_CHECK(descriptor.output_type != nullptr);
        DORIS_CHECK(descriptor.output_index < output_block->columns());

        const auto nested_primitive =
                remove_nullable(descriptor.output_type)->get_primitive_type();
        if (!_is_special_type(nested_primitive)) {
            // Regular column: hand the JNI column over without conversion.
            output_block->get_by_position(descriptor.output_index).type = descriptor.output_type;
            output_block->replace_by_position(descriptor.output_index,
                                              jni_block->get_by_position(jni_idx).column);
        } else {
            // Special column: it was transferred in another type and must be cast.
            RETURN_IF_ERROR(_cast_string_to_special_type(descriptor, jni_block, jni_idx,
                                                         output_block, expected_rows));
        }
    }

    // Every output column must now carry exactly the batch's row count.
    DORIS_CHECK(output_block->rows() == expected_rows);

    if (!_conjuncts.empty()) {
        RETURN_IF_ERROR(
                VExprContext::filter_block(_conjuncts, output_block, output_block->columns()));
    }
    *rows = output_block->rows();
    return Status::OK();
}
113
114
0
// Maps a primitive type to the replace tag stored in the column descriptor:
// "bitmap", "hll", "quantile_state" or "jsonb" for the corresponding types, and
// "not_replace" for every other type.
std::string JdbcJniReader::_replace_type_for(PrimitiveType type) const {
    if (type == PrimitiveType::TYPE_BITMAP) {
        return "bitmap";
    }
    if (type == PrimitiveType::TYPE_HLL) {
        return "hll";
    }
    if (type == PrimitiveType::TYPE_QUANTILE_STATE) {
        return "quantile_state";
    }
    if (type == PrimitiveType::TYPE_JSONB) {
        return "jsonb";
    }
    return "not_replace";
}
128
129
0
// Returns true for the primitive types this reader treats specially: BITMAP,
// HLL, QUANTILE_STATE and JSONB. Every other type yields false.
bool JdbcJniReader::_is_special_type(PrimitiveType type) const {
    switch (type) {
    case PrimitiveType::TYPE_BITMAP:
    case PrimitiveType::TYPE_HLL:
    case PrimitiveType::TYPE_QUANTILE_STATE:
    case PrimitiveType::TYPE_JSONB:
        return true;
    default:
        return false;
    }
}
133
134
0
// Chooses the data type used to transfer a column with the given output type.
//
// Columns that are not special (see _is_special_type) are transferred in their
// own output type. Special columns are transferred as a string type, wrapped as
// nullable when the output type itself is nullable.
//
// `output_type` must not be null.
DataTypePtr JdbcJniReader::_transfer_type_for(const DataTypePtr& output_type) const {
    DORIS_CHECK(output_type != nullptr);

    const auto nested_primitive = remove_nullable(output_type)->get_primitive_type();
    if (_is_special_type(nested_primitive)) {
        DataTypePtr carrier_type = std::make_shared<DataTypeString>();
        // Keep the nullability of the declared output type on the carrier.
        return output_type->is_nullable() ? make_nullable(carrier_type) : carrier_type;
    }
    return output_type;
}
145
146
// Converts one JNI column from its transfer type into the column's declared
// output type and stores the result in `output_block`.
//
// The conversion runs the "CAST" function obtained from SimpleFunctionFactory
// over a temporary block holding the JNI column, a constant column of the
// target type, and a result slot typed as nullable(target type). The output
// slot's type is set to the target type. When the target type is nullable the
// cast result is stored directly; otherwise only the nested column of the
// nullable result is stored.
//
// Returns InternalError if no CAST function is found, or the error returned by
// executing the cast. `column.output_type` and `column.transfer_type` must be
// non-null.
Status JdbcJniReader::_cast_string_to_special_type(const format::JniTableReader::JniColumn& column,
                                                   Block* jni_block, size_t jni_column_index,
                                                   Block* output_block, size_t rows) {
    DORIS_CHECK(column.output_type != nullptr);
    DORIS_CHECK(column.transfer_type != nullptr);

    const auto& destination_type = column.output_type;
    const auto destination_type_name = destination_type->get_name();
    // The cast result is always requested as nullable, whatever the target is.
    const auto nullable_result_type = make_nullable(destination_type);

    // Argument 0 is the value to cast; argument 1 is a constant column that
    // carries the target type.
    ColumnsWithTypeAndName cast_arguments;
    cast_arguments.reserve(2);
    cast_arguments.emplace_back(jni_block->get_by_position(jni_column_index).column,
                                column.transfer_type, "java.sql.String");
    cast_arguments.emplace_back(destination_type->create_column_const_with_default_value(1),
                                destination_type, destination_type_name);

    FunctionBasePtr cast_function = SimpleFunctionFactory::instance().get_function(
            "CAST", cast_arguments, nullable_result_type);
    if (cast_function == nullptr) {
        return Status::InternalError("Failed to find CAST function for type {}",
                                     destination_type_name);
    }

    // The result slot is appended after the arguments, so its index equals the
    // argument count at this point.
    Block cast_block(cast_arguments);
    const auto result_idx = cast_set<uint32_t>(cast_block.columns());
    cast_block.insert({nullptr, nullable_result_type, "cast_result"});
    RETURN_IF_ERROR(
            cast_function->execute(nullptr, cast_block, {0}, result_idx, cast_set<int>(rows)));

    auto cast_result = cast_block.get_by_position(result_idx).column;
    output_block->get_by_position(column.output_index).type = destination_type;
    if (!destination_type->is_nullable()) {
        // Non-nullable target: drop the nullable wrapper and keep the nested data.
        const auto* nullable_result = assert_cast<const ColumnNullable*>(cast_result.get());
        output_block->replace_by_position(column.output_index,
                                          nullable_result->get_nested_column_ptr());
        return Status::OK();
    }
    output_block->replace_by_position(column.output_index, cast_result);
    return Status::OK();
}
186
187
} // namespace doris::format::jdbc