Coverage Report

Created: 2026-04-11 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/jdbc_jni_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "jdbc_jni_reader.h"
19
20
#include <sstream>
21
22
#include "core/block/block.h"
23
#include "core/column/column_nullable.h"
24
#include "core/data_type/data_type_nullable.h"
25
#include "core/data_type/data_type_string.h"
26
#include "core/types.h"
27
#include "exprs/function/simple_function_factory.h"
28
#include "format/jni/jni_data_bridge.h"
29
#include "runtime/descriptors.h"
30
#include "util/jdbc_utils.h"
31
32
namespace doris {
33
34
JdbcJniReader::JdbcJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
35
                             RuntimeState* state, RuntimeProfile* profile,
36
                             const std::map<std::string, std::string>& jdbc_params)
37
0
        : JniReader(
38
0
                  file_slot_descs, state, profile, "org/apache/doris/jdbc/JdbcJniScanner",
39
0
                  [&]() {
40
0
                      std::ostringstream required_fields;
41
0
                      std::ostringstream columns_types;
42
0
                      std::ostringstream replace_string;
43
0
                      int index = 0;
44
0
                      for (const auto& desc : file_slot_descs) {
45
0
                          std::string field = desc->col_name();
46
0
                          std::string type =
47
0
                                  JniDataBridge::get_jni_type_with_different_string(desc->type());
48
49
                          // Determine replace_string for special types
50
                          // (bitmap, hll, quantile_state, jsonb)
51
0
                          std::string replace_type = "not_replace";
52
0
                          auto ptype = desc->type()->get_primitive_type();
53
0
                          if (ptype == PrimitiveType::TYPE_BITMAP) {
54
0
                              replace_type = "bitmap";
55
0
                          } else if (ptype == PrimitiveType::TYPE_HLL) {
56
0
                              replace_type = "hll";
57
0
                          } else if (ptype == PrimitiveType::TYPE_JSONB) {
58
0
                              replace_type = "jsonb";
59
0
                          } else if (ptype == PrimitiveType::TYPE_QUANTILE_STATE) {
60
0
                              replace_type = "quantile_state";
61
0
                          }
62
63
0
                          if (index == 0) {
64
0
                              required_fields << field;
65
0
                              columns_types << type;
66
0
                              replace_string << replace_type;
67
0
                          } else {
68
0
                              required_fields << "," << field;
69
0
                              columns_types << "#" << type;
70
0
                              replace_string << "," << replace_type;
71
0
                          }
72
0
                          index++;
73
0
                      }
74
                      // Merge JDBC-specific params with schema params
75
0
                      std::map<std::string, std::string> params = jdbc_params;
76
0
                      params["required_fields"] = required_fields.str();
77
0
                      params["columns_types"] = columns_types.str();
78
0
                      params["replace_string"] = replace_string.str();
79
                      // Resolve jdbc_driver_url to absolute file:// URL
80
0
                      if (params.count("jdbc_driver_url")) {
81
0
                          std::string resolved;
82
0
                          if (JdbcUtils::resolve_driver_url(params["jdbc_driver_url"], &resolved)
83
0
                                      .ok()) {
84
0
                              params["jdbc_driver_url"] = resolved;
85
0
                          }
86
0
                      }
87
0
                      return params;
88
0
                  }(),
89
0
                  [&]() {
90
0
                      std::vector<std::string> names;
91
0
                      for (const auto& desc : file_slot_descs) {
92
0
                          names.emplace_back(desc->col_name());
93
0
                      }
94
0
                      return names;
95
0
                  }()),
96
0
          _jdbc_params(jdbc_params) {}
97
98
0
// Initializes the reader by delegating to open() with the runtime state and
// profile held by this object; the resulting status is returned unchanged.
Status JdbcJniReader::init_reader() {
    Status open_status = open(_state, _profile);
    return open_status;
}
101
102
0
// Returns true when `type` is one of BITMAP, HLL, QUANTILE_STATE or JSONB,
// i.e. the types that get_next_block() reads as strings and casts afterwards.
bool JdbcJniReader::_is_special_type(PrimitiveType type) {
    if (type == PrimitiveType::TYPE_BITMAP) {
        return true;
    }
    if (type == PrimitiveType::TYPE_HLL) {
        return true;
    }
    if (type == PrimitiveType::TYPE_QUANTILE_STATE) {
        return true;
    }
    return type == PrimitiveType::TYPE_JSONB;
}
106
107
0
// Reads the next batch of rows into `block`.
//
// Columns whose slot type is special (bitmap, HLL, quantile_state, JSONB) are
// temporarily replaced by string columns (nullable when the slot is nullable)
// so the parent JniReader can fill them, and are cast back to their target type
// once rows have been read.
//
// @param block      block to fill; its special columns are swapped in place.
// @param read_rows  set by the parent reader to the number of rows read.
// @param eof        set by the parent reader.
// @return the parent's error if the read fails, an InternalError if a special
//         column cannot be located in `block`, a cast error if the conversion
//         fails, otherwise OK.
//
// The original columns and types are put back when the column lookup fails,
// when the parent read fails, and when no rows were read, so the block never
// keeps the temporary string columns in those cases.
Status JdbcJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    // Everything needed to either cast a swapped column or undo the swap.
    struct SpecialColumnInfo {
        const SlotDescriptor* slot;
        int block_idx;
        DataTypePtr original_type;
        ColumnPtr original_column;
    };
    std::vector<SpecialColumnInfo> special_columns;

    // Undo every swap recorded so far.
    auto restore_original_columns = [&]() {
        for (auto& info : special_columns) {
            auto& entry = block->get_by_position(info.block_idx);
            entry.type = info.original_type;
            entry.column = info.original_column;
        }
    };

    const auto name_to_pos_map = block->get_name_to_pos_map();
    for (auto* slot : _file_slot_descs) {
        if (!_is_special_type(slot->type()->get_primitive_type())) {
            continue;
        }

        // Look the column up with find(): operator[] would default-insert
        // position 0 for a missing name and silently clobber the first column.
        const auto pos_it = name_to_pos_map.find(slot->col_name());
        if (pos_it == name_to_pos_map.end()) {
            restore_original_columns();
            return Status::InternalError("Column {} not found in block for JDBC JNI reader",
                                         slot->col_name());
        }
        const int block_idx = static_cast<int>(pos_it->second);

        auto& entry = block->get_by_position(block_idx);
        special_columns.push_back({slot, block_idx, entry.type, entry.column});

        // Replace block column with string type
        DataTypePtr string_type = std::make_shared<DataTypeString>();
        if (slot->is_nullable()) {
            string_type = make_nullable(string_type);
        }
        entry.column = string_type->create_column()->convert_to_full_column_if_const();
        entry.type = string_type;
    }

    // Call parent to do the actual JNI read with string columns. On failure the
    // swap is undone before the error is propagated.
    Status read_status = JniReader::get_next_block(block, read_rows, eof);
    if (!read_status.ok()) {
        restore_original_columns();
        return read_status;
    }

    if (special_columns.empty()) {
        // No special columns, nothing to do
        return Status::OK();
    }

    if (*read_rows == 0) {
        // No rows read but we replaced columns, restore original types for next call
        restore_original_columns();
        return Status::OK();
    }

    // Cast string columns back to their target types
    for (const auto& info : special_columns) {
        RETURN_IF_ERROR(_cast_string_to_special_type(info.slot, block, info.block_idx,
                                                     static_cast<int>(*read_rows)));
    }

    return Status::OK();
}
171
172
// Converts the string column at `column_index` of `block` into the slot's
// target data type by running the "CAST" function from SimpleFunctionFactory,
// then stores the converted column and the target type back into the block.
//
// @param slot_desc     slot describing the target type and nullability.
// @param block         block holding the string column to convert.
// @param column_index  position of that column inside `block`.
// @param num_rows      number of rows passed to the cast function.
// @return InternalError if no CAST function is found or the cast result has an
//         unexpected shape, the cast's own error if execution fails, else OK.
//
// The block is only modified after the cast has succeeded, so on any error it
// still holds the original string column.
Status JdbcJniReader::_cast_string_to_special_type(const SlotDescriptor* slot_desc, Block* block,
                                                   int column_index, int num_rows) {
    DataTypePtr target_data_type = slot_desc->get_data_type_ptr();
    const std::string target_data_type_name = target_data_type->get_name();
    // The cast always produces the nullable form of the target type.
    DataTypePtr nullable_target_type = make_nullable(target_data_type);

    // Build input string type (nullable if slot is nullable)
    DataTypePtr input_string_type = std::make_shared<DataTypeString>();
    if (slot_desc->is_nullable()) {
        input_string_type = make_nullable(input_string_type);
    }

    // Build CAST function arguments: the string input plus a constant column of
    // the target type that tells CAST what to convert to.
    ColumnPtr cast_param = target_data_type->create_column_const_with_default_value(1);

    ColumnsWithTypeAndName argument_template;
    argument_template.reserve(2);
    // Share the input column instead of moving it out of the block; moving would
    // leave a null column in `block` if one of the error returns below is taken.
    argument_template.emplace_back(block->get_by_position(column_index).column,
                                   input_string_type, "java.sql.String");
    argument_template.emplace_back(cast_param, target_data_type, target_data_type_name);

    FunctionBasePtr func_cast = SimpleFunctionFactory::instance().get_function(
            "CAST", argument_template, nullable_target_type);

    if (func_cast == nullptr) {
        return Status::InternalError("Failed to find CAST function for type {}",
                                     target_data_type_name);
    }

    Block cast_block(argument_template);
    int result_idx = cast_block.columns();
    cast_block.insert({nullptr, nullable_target_type, "cast_result"});
    RETURN_IF_ERROR(func_cast->execute(nullptr, cast_block, {0}, result_idx, num_rows));

    ColumnPtr res_col = cast_block.get_by_position(result_idx).column;

    if (target_data_type->is_nullable()) {
        block->get_by_position(column_index).type = target_data_type;
        block->replace_by_position(column_index, res_col);
        return Status::OK();
    }

    // Target is not nullable: strip the nullable wrapper and keep only the nested
    // column. Verify the wrapper is really there before downcasting.
    if (!res_col->is_nullable()) {
        return Status::InternalError("CAST to type {} did not return a nullable column",
                                     target_data_type_name);
    }
    auto nested_ptr = static_cast<const ColumnNullable*>(res_col.get())->get_nested_column_ptr();
    block->get_by_position(column_index).type = target_data_type;
    block->replace_by_position(column_index, nested_ptr);

    return Status::OK();
}
221
222
} // namespace doris