Coverage Report

Created: 2026-03-19 10:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/jdbc_jni_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "jdbc_jni_reader.h"
19
20
#include <sstream>
21
22
#include "core/block/block.h"
23
#include "core/column/column_nullable.h"
24
#include "core/data_type/data_type_nullable.h"
25
#include "core/data_type/data_type_string.h"
26
#include "core/types.h"
27
#include "exprs/function/simple_function_factory.h"
28
#include "format/jni/jni_data_bridge.h"
29
#include "runtime/descriptors.h"
30
#include "util/jdbc_utils.h"
31
32
namespace doris {
33
#include "common/compile_check_begin.h"
34
35
JdbcJniReader::JdbcJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
36
                             RuntimeState* state, RuntimeProfile* profile,
37
                             const std::map<std::string, std::string>& jdbc_params)
38
0
        : JniReader(
39
0
                  file_slot_descs, state, profile, "org/apache/doris/jdbc/JdbcJniScanner",
40
0
                  [&]() {
41
0
                      std::ostringstream required_fields;
42
0
                      std::ostringstream columns_types;
43
0
                      std::ostringstream replace_string;
44
0
                      int index = 0;
45
0
                      for (const auto& desc : file_slot_descs) {
46
0
                          std::string field = desc->col_name();
47
0
                          std::string type =
48
0
                                  JniDataBridge::get_jni_type_with_different_string(desc->type());
49
50
                          // Determine replace_string for special types
51
                          // (bitmap, hll, quantile_state, jsonb)
52
0
                          std::string replace_type = "not_replace";
53
0
                          auto ptype = desc->type()->get_primitive_type();
54
0
                          if (ptype == PrimitiveType::TYPE_BITMAP) {
55
0
                              replace_type = "bitmap";
56
0
                          } else if (ptype == PrimitiveType::TYPE_HLL) {
57
0
                              replace_type = "hll";
58
0
                          } else if (ptype == PrimitiveType::TYPE_JSONB) {
59
0
                              replace_type = "jsonb";
60
0
                          } else if (ptype == PrimitiveType::TYPE_QUANTILE_STATE) {
61
0
                              replace_type = "quantile_state";
62
0
                          }
63
64
0
                          if (index == 0) {
65
0
                              required_fields << field;
66
0
                              columns_types << type;
67
0
                              replace_string << replace_type;
68
0
                          } else {
69
0
                              required_fields << "," << field;
70
0
                              columns_types << "#" << type;
71
0
                              replace_string << "," << replace_type;
72
0
                          }
73
0
                          index++;
74
0
                      }
75
                      // Merge JDBC-specific params with schema params
76
0
                      std::map<std::string, std::string> params = jdbc_params;
77
0
                      params["required_fields"] = required_fields.str();
78
0
                      params["columns_types"] = columns_types.str();
79
0
                      params["replace_string"] = replace_string.str();
80
                      // Resolve jdbc_driver_url to absolute file:// URL
81
0
                      if (params.count("jdbc_driver_url")) {
82
0
                          std::string resolved;
83
0
                          if (JdbcUtils::resolve_driver_url(params["jdbc_driver_url"], &resolved)
84
0
                                      .ok()) {
85
0
                              params["jdbc_driver_url"] = resolved;
86
0
                          }
87
0
                      }
88
0
                      return params;
89
0
                  }(),
90
0
                  [&]() {
91
0
                      std::vector<std::string> names;
92
0
                      for (const auto& desc : file_slot_descs) {
93
0
                          names.emplace_back(desc->col_name());
94
0
                      }
95
0
                      return names;
96
0
                  }()),
97
0
          _jdbc_params(jdbc_params) {}
98
99
0
// Initializes the reader by delegating to open() with the stored runtime
// state and profile; the resulting Status is returned unchanged.
Status JdbcJniReader::init_reader() {
    Status open_status = open(_state, _profile);
    return open_status;
}
102
103
0
// Returns true for the primitive types that get_next_block() reads as strings
// and casts back afterwards: BITMAP, HLL, QUANTILE_STATE and JSONB.
bool JdbcJniReader::_is_special_type(PrimitiveType type) {
    switch (type) {
    case PrimitiveType::TYPE_BITMAP:
    case PrimitiveType::TYPE_HLL:
    case PrimitiveType::TYPE_QUANTILE_STATE:
    case PrimitiveType::TYPE_JSONB:
        return true;
    default:
        return false;
    }
}
107
108
0
// Reads the next batch of rows into `block`.
//
// Columns whose slot type is special (bitmap, HLL, quantile_state, JSONB) are
// temporarily replaced by (nullable) string columns for the JNI transfer and
// cast back to their target type afterwards. This follows the same pattern as
// the old vjdbc_connector.cpp _get_reader_params.
//
// @param block      block to fill; its special columns are swapped in place.
// @param read_rows  out: number of rows read, as set by JniReader::get_next_block.
// @param eof        out: end-of-stream flag, as set by JniReader::get_next_block.
// @return OK on success; InternalError if a special slot has no column of the
//         same name in `block`; otherwise the error reported by the parent
//         read or by the cast back to the special type.
Status JdbcJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    // Everything needed to cast a swapped column back, or to undo the swap.
    struct SpecialColumnInfo {
        const SlotDescriptor* slot;
        int block_idx;
        DataTypePtr original_type;
        ColumnPtr original_column;
    };
    std::vector<SpecialColumnInfo> special_columns;

    // Puts the original type/column back for every column swapped so far.
    auto restore_original_columns = [&]() {
        for (auto& info : special_columns) {
            auto& entry = block->get_by_position(info.block_idx);
            entry.type = info.original_type;
            entry.column = info.original_column;
        }
    };

    const auto name_to_pos_map = block->get_name_to_pos_map();
    for (const auto* slot : _file_slot_descs) {
        if (!_is_special_type(slot->type()->get_primitive_type())) {
            continue;
        }

        // Use find() rather than operator[]: operator[] would default-insert
        // position 0 for an unknown name and silently clobber the first column.
        const auto pos_it = name_to_pos_map.find(slot->col_name());
        if (pos_it == name_to_pos_map.end()) {
            restore_original_columns();
            return Status::InternalError("Column {} not found in block for JDBC reader",
                                         slot->col_name());
        }
        const int block_idx = static_cast<int>(pos_it->second);
        auto& col_with_type = block->get_by_position(block_idx);

        SpecialColumnInfo info;
        info.slot = slot;
        info.block_idx = block_idx;
        info.original_type = col_with_type.type;
        info.original_column = col_with_type.column;
        special_columns.push_back(info);

        // Replace block column with string type
        DataTypePtr string_type = std::make_shared<DataTypeString>();
        if (slot->is_nullable()) {
            string_type = make_nullable(string_type);
        }
        col_with_type.column = string_type->create_column()->convert_to_full_column_if_const();
        col_with_type.type = string_type;
    }

    // Call parent to do the actual JNI read with string columns. On failure,
    // undo the swap so the caller's block keeps its declared column types.
    Status read_status = JniReader::get_next_block(block, read_rows, eof);
    if (!read_status.ok()) {
        restore_original_columns();
        return read_status;
    }

    if (special_columns.empty()) {
        // No special columns, nothing to do
        return Status::OK();
    }

    if (*read_rows == 0) {
        // No rows read but we replaced columns, restore original types for next call
        restore_original_columns();
        return Status::OK();
    }

    // Cast string columns back to their target types
    for (const auto& info : special_columns) {
        RETURN_IF_ERROR(_cast_string_to_special_type(info.slot, block, info.block_idx,
                                                     static_cast<int>(*read_rows)));
    }
    return Status::OK();
}
172
173
// Casts the string column at `column_index` of `block` to the data type of
// `slot_desc` by running the "CAST" function, then stores the result (and the
// target type) back into the block at the same position.
//
// @param slot_desc     slot describing the target type and nullability.
// @param block         block holding the string column; updated in place.
// @param column_index  position of the column inside `block`.
// @param num_rows      number of rows passed to the CAST execution.
// @return OK on success; InternalError if no CAST function is found;
//         otherwise the error returned by the CAST execution. On any error
//         the block's column is left untouched.
Status JdbcJniReader::_cast_string_to_special_type(const SlotDescriptor* slot_desc, Block* block,
                                                   int column_index, int num_rows) {
    DataTypePtr target_data_type = slot_desc->get_data_type_ptr();
    std::string target_data_type_name = target_data_type->get_name();

    // Build input string type (nullable if slot is nullable)
    DataTypePtr input_string_type = std::make_shared<DataTypeString>();
    if (slot_desc->is_nullable()) {
        input_string_type = make_nullable(input_string_type);
    }

    // Share the input column instead of moving it out of the block: a move
    // would leave a null column behind in `block` on every error return below.
    ColumnPtr input_col = block->get_by_position(column_index).column;

    // Build CAST function arguments: the value to cast plus a constant column
    // that only carries the target type.
    ColumnPtr cast_param = target_data_type->create_column_const_with_default_value(1);

    ColumnsWithTypeAndName argument_template;
    argument_template.reserve(2);
    argument_template.emplace_back(input_col, input_string_type, "java.sql.String");
    argument_template.emplace_back(cast_param, target_data_type, target_data_type_name);

    FunctionBasePtr func_cast = SimpleFunctionFactory::instance().get_function(
            "CAST", argument_template, make_nullable(target_data_type));

    if (func_cast == nullptr) {
        return Status::InternalError("Failed to find CAST function for type {}",
                                     target_data_type_name);
    }

    Block cast_block(argument_template);
    int result_idx = cast_block.columns();
    cast_block.insert({nullptr, make_nullable(target_data_type), "cast_result"});
    RETURN_IF_ERROR(func_cast->execute(nullptr, cast_block, {0}, result_idx, num_rows));

    auto res_col = cast_block.get_by_position(result_idx).column;
    block->get_by_position(column_index).type = target_data_type;
    if (target_data_type->is_nullable()) {
        block->replace_by_position(column_index, res_col);
    } else {
        // The CAST was requested with a nullable result type, so the result is
        // treated as a ColumnNullable; a non-nullable target keeps only the
        // nested column and drops the null map.
        // NOTE(review): rows the CAST marked as null lose that marker here —
        // confirm this is acceptable for non-nullable special columns.
        const auto* nullable_col = static_cast<const ColumnNullable*>(res_col.get());
        block->replace_by_position(column_index, nullable_col->get_nested_column_ptr());
    }

    return Status::OK();
}
222
223
#include "common/compile_check_end.h"
224
} // namespace doris