Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/connector/vjdbc_connector.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/connector/vjdbc_connector.h"
19
20
#include <gen_cpp/Types_types.h>
21
22
#include <algorithm>
23
// IWYU pragma: no_include <bits/std_abs.h>
24
#include <cmath> // IWYU pragma: keep
25
#include <memory>
26
#include <ostream>
27
#include <utility>
28
29
#include "absl/strings/substitute.h"
30
#include "cloud/config.h"
31
#include "common/logging.h"
32
#include "common/status.h"
33
#include "core/block/block.h"
34
#include "core/column/column_nullable.h"
35
#include "core/data_type/data_type_nullable.h"
36
#include "core/data_type/data_type_string.h"
37
#include "exec/connector/jni_connector.h"
38
#include "exec/table_connector.h"
39
#include "exprs/function/simple_function_factory.h"
40
#include "exprs/vexpr.h"
41
#include "jni.h"
42
#include "runtime/descriptors.h"
43
#include "runtime/plugin/cloud_plugin_downloader.h"
44
#include "runtime/runtime_profile.h"
45
#include "runtime/runtime_state.h"
46
#include "runtime/user_function_cache.h"
47
#include "util/jni-util.h"
48
49
namespace doris {
50
#include "common/compile_check_begin.h"
51
// JNI name of the Java factory class that is asked which executor class to use.
char const* JDBC_EXECUTOR_FACTORY_CLASS = "org/apache/doris/jdbc/JdbcExecutorFactory";

// JNI method descriptors used when looking up methods on the executor class.
char const* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V";                   // <init>(byte[])
char const* JDBC_EXECUTOR_STMT_WRITE_SIGNATURE = "(Ljava/util/Map;)I"; // write(Map) -> int
char const* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z";                 // hasNext() -> boolean
char const* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V";                    // close()
char const* JDBC_EXECUTOR_TRANSACTION_SIGNATURE = "()V";              // open/commit/rollbackTrans()
57
58
JdbcConnector::JdbcConnector(const JdbcConnectorParam& param)
59
11
        : TableConnector(param.tuple_desc, param.use_transaction, param.table_name,
60
11
                         param.query_string),
61
11
          _conn_param(param),
62
11
          _closed(false) {}
63
64
11
JdbcConnector::~JdbcConnector() {
65
11
    if (!_closed) {
66
11
        static_cast<void>(close());
67
11
    }
68
11
}
69
70
11
// Releases the Java-side executor.
//
// The Status argument is unused. Returns OK when the connector was already closed
// or was never opened; otherwise returns the status of obtaining the JNI
// environment and of the Java `close()` call.
//
// The connector is marked closed *before* the JNI teardown is attempted. This way
// a failure while closing does not leave `_closed == false`, which would make the
// destructor (or a second explicit close()) roll back the transaction and invoke
// the Java `close()` a second time.
Status JdbcConnector::close(Status /*unused*/) {
    SCOPED_RAW_TIMER(&_jdbc_statistic._connector_close_timer);
    if (_closed) {
        return Status::OK();
    }
    _closed = true;

    if (!_is_open) {
        // Nothing was created on the Java side, so there is nothing to release.
        return Status::OK();
    }

    // Try to abort transaction and call Java close(), but don't block cleanup
    if (_is_in_transaction) {
        Status abort_status = abort_trans();
        if (!abort_status.ok()) {
            LOG(WARNING) << "Failed to abort transaction: " << abort_status.to_string();
        }
    }

    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));
    RETURN_IF_ERROR(
            _executor_obj.call_nonvirtual_void_method(env, _executor_clazz, _executor_close_id)
                    .call());
    return Status::OK();
}
96
97
0
// Creates the Java executor object for this connector and, if configured, begins
// a transaction.
//
// `state` may be null (see test_connection()); in that case a read uses a batch
// size of 1. `read` selects TJdbcOperation::READ vs WRITE; writes always use a
// batch size of 0. Calling open() on an already-open connector is a logged no-op.
Status JdbcConnector::open(RuntimeState* state, bool read) {
    if (_is_open) {
        LOG(INFO) << "this scanner of jdbc already opened";
        return Status::OK();
    }

    JNIEnv* jni_env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&jni_env));

    // Step 1: ask the Java factory which executor class serves this table type.
    RETURN_IF_ERROR(Jni::Util::get_jni_scanner_class(jni_env, JDBC_EXECUTOR_FACTORY_CLASS,
                                                     &_executor_factory_clazz));
    RETURN_IF_ERROR(_executor_factory_clazz.get_static_method(
            jni_env, "getExecutorClass",
            "(Lorg/apache/doris/thrift/TOdbcTableType;)Ljava/lang/String;",
            &_executor_factory_ctor_id));

    Jni::LocalObject java_table_type;
    RETURN_IF_ERROR(_get_java_table_type(jni_env, _conn_param.table_type, &java_table_type));

    Jni::LocalString executor_class_name;
    RETURN_IF_ERROR(
            _executor_factory_clazz.call_static_object_method(jni_env, _executor_factory_ctor_id)
                    .with_arg(java_table_type)
                    .call(&executor_class_name));

    Jni::LocalStringBufferGuard executor_class_chars;
    RETURN_IF_ERROR(executor_class_name.get_string_chars(jni_env, &executor_class_chars));

    // Step 2: load that executor class and resolve the method ids used later.
    RETURN_IF_ERROR(Jni::Util::get_jni_scanner_class(jni_env, executor_class_chars.get(),
                                                     &_executor_clazz));

#undef GET_BASIC_JAVA_CLAZZ
    RETURN_IF_ERROR(_register_func_id(jni_env));

    // Step 3: assemble the thrift constructor parameters.
    std::string resolved_driver_path;
    RETURN_IF_ERROR(_get_real_url(_conn_param.driver_path, &resolved_driver_path));

    TJdbcExecutorCtorParams executor_params;
    executor_params.__set_statement(_sql_str);
    executor_params.__set_catalog_id(_conn_param.catalog_id);
    executor_params.__set_jdbc_url(_conn_param.jdbc_url);
    executor_params.__set_jdbc_user(_conn_param.user);
    executor_params.__set_jdbc_password(_conn_param.passwd);
    executor_params.__set_jdbc_driver_class(_conn_param.driver_class);
    executor_params.__set_driver_path(resolved_driver_path);
    executor_params.__set_jdbc_driver_checksum(_conn_param.driver_checksum);
    if (state != nullptr) {
        executor_params.__set_batch_size(read ? state->batch_size() : 0);
    } else {
        executor_params.__set_batch_size(read ? 1 : 0);
    }
    executor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE);
    executor_params.__set_table_type(_conn_param.table_type);
    executor_params.__set_connection_pool_min_size(_conn_param.connection_pool_min_size);
    executor_params.__set_connection_pool_max_size(_conn_param.connection_pool_max_size);
    executor_params.__set_connection_pool_max_wait_time(
            _conn_param.connection_pool_max_wait_time);
    executor_params.__set_connection_pool_max_life_time(
            _conn_param.connection_pool_max_life_time);
    executor_params.__set_connection_pool_cache_clear_time(
            config::jdbc_connection_pool_cache_clear_time_sec);
    executor_params.__set_connection_pool_keep_alive(_conn_param.connection_pool_keep_alive);
    executor_params.__set_is_tvf(_conn_param.is_tvf);

    Jni::LocalArray serialized_params;
    RETURN_IF_ERROR(Jni::Util::SerializeThriftMsg(jni_env, &executor_params, &serialized_params));

    // Step 4: construct the Java executor; only the construction is timed.
    {
        SCOPED_RAW_TIMER(&_jdbc_statistic._init_connector_timer);
        RETURN_IF_ERROR(_executor_clazz.new_object(jni_env, _executor_ctor_id)
                                .with_arg(serialized_params)
                                .call(&_executor_obj));
    }

    // The connector is flagged open before the transaction starts, so it stays
    // flagged open even if begin_trans() fails.
    _is_open = true;
    RETURN_IF_ERROR(begin_trans());

    return Status::OK();
}
173
174
0
// Opens the connector in read mode without a RuntimeState and invokes the Java
// executor's `testConnection()`; returns that call's status.
Status JdbcConnector::test_connection() {
    RETURN_IF_ERROR(open(nullptr, true));

    JNIEnv* jni_env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&jni_env));

    auto test_call = _executor_obj.call_nonvirtual_void_method(jni_env, _executor_clazz,
                                                               _executor_test_connection_id);
    return test_call.call();
}
184
185
0
// Invokes the Java executor's `cleanDataSource()`. A connector that was never
// opened has no executor object, so this is a no-op returning OK in that case.
Status JdbcConnector::clean_datasource() {
    if (_is_open) {
        JNIEnv* jni_env = nullptr;
        RETURN_IF_ERROR(Jni::Env::Get(&jni_env));

        auto clean_call = _executor_obj.call_nonvirtual_void_method(
                jni_env, _executor_clazz, _executor_clean_datasource_id);
        return clean_call.call();
    }
    return Status::OK();
}
196
197
0
// Executes the read statement through the Java executor's `read()` method and
// verifies that the result set has at least as many columns as the tuple
// descriptor has slots.
//
// Returns InternalError when called before open(), when the Java call fails, or
// when the reported column count is negative or smaller than the slot count.
Status JdbcConnector::query() {
    if (!_is_open) {
        return Status::InternalError("Query before open of JdbcConnector.");
    }
    // check materialize num equal
    const size_t materialize_num = _tuple_desc->slots().size();

    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));
    {
        SCOPED_RAW_TIMER(&_jdbc_statistic._execte_read_timer);

        jint column_count = 0;
        auto st = _executor_obj.call_nonvirtual_int_method(env, _executor_clazz, _executor_read_id)
                          .call(&column_count);
        if (!st.ok()) {
            return Status::InternalError("GetJniExceptionMsg meet error, query={}, msg={}",
                                         _conn_param.query_string, st.to_string());
        }
        // jint is signed while the slot count is unsigned. Comparing them directly
        // would convert a negative count to a huge unsigned value and let it pass,
        // so reject negatives explicitly before the widening cast.
        if (column_count < 0 || static_cast<size_t>(column_count) < materialize_num) {
            return Status::InternalError(
                    "JDBC query returned fewer columns ({}) than required ({}).", column_count,
                    materialize_num);
        }
    }

    LOG(INFO) << "JdbcConnector::query has exec success: " << _sql_str;
    return Status::OK();
}
226
227
0
// Fetches the next batch from the Java executor into `block`.
//
// Sets `*eos = true` and returns OK when the executor reports no more rows;
// `*eos` is left untouched otherwise. `batch_size` is forwarded to the Java
// `getBlockAddress` call. Returns the first error from the JNI calls, from
// filling the block, or from converting string columns back to HLL / BITMAP /
// JSONB.
Status JdbcConnector::get_next(bool* eos, Block* block, int batch_size) {
    SCOPED_RAW_TIMER(&_jdbc_statistic._get_data_timer); // Timer for the entire method

    if (!_is_open) {
        return Status::InternalError("get_next before open of jdbc connector.");
    }

    JNIEnv* env = nullptr;
    {
        SCOPED_RAW_TIMER(&_jdbc_statistic._jni_setup_timer); // Timer for setting up JNI environment
        RETURN_IF_ERROR(Jni::Env::Get(&env));
    } // _jni_setup_timer stops when going out of this scope

    jboolean has_next = JNI_FALSE;
    {
        SCOPED_RAW_TIMER(&_jdbc_statistic._has_next_timer); // Timer for hasNext check

        RETURN_IF_ERROR(
                _executor_obj
                        .call_nonvirtual_boolean_method(env, _executor_clazz, _executor_has_next_id)
                        .call(&has_next));
    } // _has_next_timer stops here

    if (has_next != JNI_TRUE) {
        *eos = true;
        return Status::OK();
    }

    const auto column_size = _tuple_desc->slots().size();

    Jni::LocalObject map;
    {
        SCOPED_RAW_TIMER(&_jdbc_statistic._prepare_params_timer); // Timer for preparing params
        RETURN_IF_ERROR(_get_reader_params(block, env, column_size, &map));
    } // _prepare_params_timer stops here

    long address = 0;
    {
        SCOPED_RAW_TIMER(
                &_jdbc_statistic
                         ._read_and_fill_vector_table_timer); // Timer for getBlockAddress call
        RETURN_IF_ERROR(_executor_obj.call_long_method(env, _executor_get_block_address_id)
                                .with_arg(batch_size)
                                .with_arg(map)
                                .call(&address));
    } // _read_and_fill_vector_table_timer stops here

    // Every slot is materialized, so the column index list is simply 0..column_size-1.
    std::vector<uint32_t> all_columns;
    all_columns.reserve(column_size);
    for (uint32_t i = 0; i < column_size; ++i) {
        all_columns.push_back(i);
    }

    Status fill_block_status;
    {
        SCOPED_RAW_TIMER(&_jdbc_statistic._fill_block_timer); // Timer for fill_block
        fill_block_status = JniConnector::fill_block(block, all_columns, address);
    } // _fill_block_timer stops here

    if (!fill_block_status.ok()) {
        return fill_block_status;
    }

    Status cast_status;
    {
        SCOPED_RAW_TIMER(&_jdbc_statistic._cast_timer); // Timer for casting process
        cast_status = _cast_string_to_special(block, env, column_size);
    } // _cast_timer stops here

    // Propagate a failed cast instead of reporting success with a block whose
    // special-typed columns were not converted.
    return cast_status;
}
298
299
// Writes `block` through exec_stmt_write() and adds the number of rows it reports
// to the sent-rows counter. `start_send_row` and `table_type` are not used here.
Status JdbcConnector::append(Block* block, const VExprContextSPtrs& output_vexpr_ctxs,
                             uint32_t start_send_row, uint32_t* num_rows_sent,
                             TOdbcTableType::type table_type) {
    Status write_status = exec_stmt_write(block, output_vexpr_ctxs, num_rows_sent);
    if (!write_status.ok()) {
        return write_status;
    }
    COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
    return Status::OK();
}
306
307
// Hands `block` to the Java executor's `write(Map)` method.
//
// The block is exported via JniConnector::to_java_table(); the address of the
// resulting meta array and the block schema are passed to Java as string map
// entries. On success `*num_rows_sent` is set to the block's row count.
// `output_vexpr_ctxs` is not used here.
Status JdbcConnector::exec_stmt_write(Block* block, const VExprContextSPtrs& output_vexpr_ctxs,
                                      uint32_t* num_rows_sent) {
    SCOPED_TIMER(_result_send_timer);
    JNIEnv* jni_env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&jni_env));

    // prepare table meta information
    std::unique_ptr<long[]> table_meta;
    RETURN_IF_ERROR(JniConnector::to_java_table(block, table_meta));
    const long table_meta_address = reinterpret_cast<long>(table_meta.get());
    const auto schema = JniConnector::parse_table_schema(block);

    // prepare constructor parameters
    std::map<String, String> params;
    params["meta_address"] = std::to_string(table_meta_address);
    params["required_fields"] = schema.first;
    params["columns_types"] = schema.second;

    Jni::LocalObject java_params;
    RETURN_IF_ERROR(Jni::Util::convert_to_java_map(jni_env, params, &java_params));

    RETURN_IF_ERROR(_executor_obj
                            .call_nonvirtual_int_method(jni_env, _executor_clazz,
                                                        _executor_stmt_write_id)
                            .with_arg(java_params)
                            .call());

    *num_rows_sent = static_cast<uint32_t>(block->rows());
    return Status::OK();
}
334
335
0
// Calls the Java executor's `openTrans()` when transactions are enabled for this
// connector and records that a transaction is in progress. Does nothing and
// returns OK when transactions are disabled.
Status JdbcConnector::begin_trans() {
    if (!_use_tranaction) {
        return Status::OK();
    }

    JNIEnv* jni_env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&jni_env));

    auto begin_call = _executor_obj.call_nonvirtual_void_method(jni_env, _executor_clazz,
                                                                _executor_begin_trans_id);
    RETURN_IF_ERROR(begin_call.call());

    _is_in_transaction = true;
    return Status::OK();
}
348
349
0
// Calls the Java executor's `rollbackTrans()`. Returns InternalError when no
// transaction is in progress. The in-transaction flag is left unchanged.
Status JdbcConnector::abort_trans() {
    if (!_is_in_transaction) {
        return Status::InternalError("Abort transaction before begin trans.");
    }

    JNIEnv* jni_env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&jni_env));

    auto rollback_call = _executor_obj.call_nonvirtual_void_method(jni_env, _executor_clazz,
                                                                   _executor_abort_trans_id);
    RETURN_IF_ERROR(rollback_call.call());
    return Status::OK();
}
362
363
0
// Calls the Java executor's `commitTrans()` and clears the in-transaction flag.
// Does nothing and returns OK unless transactions are enabled and one is in
// progress.
Status JdbcConnector::finish_trans() {
    const bool has_open_transaction = _use_tranaction && _is_in_transaction;
    if (!has_open_transaction) {
        return Status::OK();
    }

    JNIEnv* jni_env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&jni_env));

    auto commit_call = _executor_obj.call_nonvirtual_void_method(jni_env, _executor_clazz,
                                                                 _executor_finish_trans_id);
    RETURN_IF_ERROR(commit_call.call());

    _is_in_transaction = false;
    return Status::OK();
}
377
378
0
// Resolves, on the already-loaded executor class, the id of every Java method
// this connector calls and stores each in its member slot. Stops at and returns
// the first lookup failure.
Status JdbcConnector::_register_func_id(JNIEnv* env) {
    // Small local shorthand so each lookup reads as (java name, descriptor, slot).
    auto resolve = [&](const char* java_name, const char* descriptor, auto* id_slot) {
        return _executor_clazz.get_method(env, java_name, descriptor, id_slot);
    };

    // Construction, data transfer and teardown.
    RETURN_IF_ERROR(resolve("<init>", JDBC_EXECUTOR_CTOR_SIGNATURE, &_executor_ctor_id));
    RETURN_IF_ERROR(resolve("write", JDBC_EXECUTOR_STMT_WRITE_SIGNATURE, &_executor_stmt_write_id));
    RETURN_IF_ERROR(resolve("read", "()I", &_executor_read_id));
    RETURN_IF_ERROR(resolve("close", JDBC_EXECUTOR_CLOSE_SIGNATURE, &_executor_close_id));
    RETURN_IF_ERROR(resolve("hasNext", JDBC_EXECUTOR_HAS_NEXT_SIGNATURE, &_executor_has_next_id));
    RETURN_IF_ERROR(resolve("getBlockAddress", "(ILjava/util/Map;)J",
                            &_executor_get_block_address_id));
    RETURN_IF_ERROR(resolve("getCurBlockRows", "()I", &_executor_block_rows_id));

    // Transaction control.
    RETURN_IF_ERROR(
            resolve("openTrans", JDBC_EXECUTOR_TRANSACTION_SIGNATURE, &_executor_begin_trans_id));
    RETURN_IF_ERROR(resolve("commitTrans", JDBC_EXECUTOR_TRANSACTION_SIGNATURE,
                            &_executor_finish_trans_id));
    RETURN_IF_ERROR(resolve("rollbackTrans", JDBC_EXECUTOR_TRANSACTION_SIGNATURE,
                            &_executor_abort_trans_id));

    // Connection maintenance.
    RETURN_IF_ERROR(resolve("testConnection", "()V", &_executor_test_connection_id));
    RETURN_IF_ERROR(resolve("cleanDataSource", "()V", &_executor_clean_datasource_id));

    return Status::OK();
}
406
407
// Builds the Java parameter map for `getBlockAddress` and prepares `block` to
// receive the data.
//
// For each of the first `column_size` slots this records:
//   - "is_nullable":     "true"/"false", each followed by ','
//   - "replace_string":  "bitmap" / "hll" / "jsonb" / "not_replace", each followed by ','
//   - "required_fields": column names joined by ','
//   - "columns_types":   JNI type names joined by '#'
// BITMAP, HLL and JSONB columns are read as strings: their column and type in
// `block` are replaced by an empty (optionally nullable) string column here, and
// their JNI type is reported as "string".
//
// The resulting Java map is returned through `ans`.
Status JdbcConnector::_get_reader_params(Block* block, JNIEnv* env, size_t column_size,
                                         Jni::LocalObject* ans) {
    std::ostringstream columns_nullable;
    std::ostringstream columns_replace_string;
    std::ostringstream required_fields;
    std::ostringstream columns_types;

    const auto& slots = _tuple_desc->slots();
    // size_t index: `column_size` is unsigned, so a signed index would be a
    // signed/unsigned comparison.
    for (size_t i = 0; i < column_size; ++i) {
        auto* slot = slots[i];
        const auto primitive_type = slot->type()->get_primitive_type();
        const bool nullable = slot->is_nullable();

        // Record if column is nullable
        columns_nullable << (nullable ? "true" : "false") << ",";

        // Check column type and replace accordingly
        std::string replace_type = "not_replace";
        if (primitive_type == PrimitiveType::TYPE_BITMAP) {
            replace_type = "bitmap";
        } else if (primitive_type == PrimitiveType::TYPE_HLL) {
            replace_type = "hll";
        } else if (primitive_type == PrimitiveType::TYPE_JSONB) {
            replace_type = "jsonb";
        }
        columns_replace_string << replace_type << ",";

        // One flag drives both the block rewrite and the JNI type below, so the
        // set of special types is spelled out only once.
        const bool read_as_string = replace_type != "not_replace";
        if (read_as_string) {
            auto& entry = block->get_by_position(i);
            entry.column = std::make_shared<DataTypeString>()
                                   ->create_column()
                                   ->convert_to_full_column_if_const();
            entry.type = std::make_shared<DataTypeString>();
            if (nullable) {
                entry.column = make_nullable(entry.column);
                entry.type = make_nullable(entry.type);
            }
        }

        // Record required fields and column types
        std::string jni_type;
        if (read_as_string) {
            jni_type = "string";
        } else {
            jni_type = JniConnector::get_jni_type_with_different_string(slot->type());
        }
        required_fields << (i != 0 ? "," : "") << slot->col_name();
        columns_types << (i != 0 ? "#" : "") << jni_type;
    }

    std::map<String, String> reader_params = {{"is_nullable", columns_nullable.str()},
                                              {"replace_string", columns_replace_string.str()},
                                              {"required_fields", required_fields.str()},
                                              {"columns_types", columns_types.str()}};
    return Jni::Util::convert_to_java_map(env, reader_params, ans);
}
459
460
0
// Walks the first `column_size` slots and converts every HLL, JSONB or BITMAP
// column of `block` from its string form back to the slot's declared type. The
// row count is queried from the Java executor (`getCurBlockRows`) once per
// column. Columns of any other type are left untouched.
Status JdbcConnector::_cast_string_to_special(Block* block, JNIEnv* env, size_t column_size) {
    for (size_t idx = 0; idx != column_size; ++idx) {
        auto* slot = _tuple_desc->slots()[idx];

        jint block_rows = 0;
        RETURN_IF_ERROR(
                _executor_obj
                        .call_nonvirtual_int_method(env, _executor_clazz, _executor_block_rows_id)
                        .call(&block_rows));

        const int column_pos = static_cast<int>(idx);
        const int row_count = static_cast<int>(block_rows);

        switch (slot->type()->get_primitive_type()) {
        case PrimitiveType::TYPE_HLL:
            RETURN_IF_ERROR(_cast_string_to_hll(slot, block, column_pos, row_count));
            break;
        case PrimitiveType::TYPE_JSONB:
            RETURN_IF_ERROR(_cast_string_to_json(slot, block, column_pos, row_count));
            break;
        case PrimitiveType::TYPE_BITMAP:
            RETURN_IF_ERROR(_cast_string_to_bitmap(slot, block, column_pos, row_count));
            break;
        default:
            break;
        }
    }
    return Status::OK();
}
482
483
// Converts column `column_index` of `block` from string to the slot's HLL type
// by running the "CAST" function over `rows` rows.
//
// The string input type used for the cast is appended to
// `_input_hll_string_types` and its index is remembered per column. The cast
// always produces a nullable result; when the slot type is not nullable the
// nested column is unwrapped before it is put back into `block`.
Status JdbcConnector::_cast_string_to_hll(const SlotDescriptor* slot_desc, Block* block,
                                          int column_index, int rows) {
    // Register the (optionally nullable) string source type for this column.
    const int cast_idx = static_cast<int>(_input_hll_string_types.size());
    _map_column_idx_to_cast_idx_hll[column_index] = cast_idx;
    DataTypePtr source_type = std::make_shared<DataTypeString>();
    if (slot_desc->is_nullable()) {
        source_type = make_nullable(source_type);
    }
    _input_hll_string_types.push_back(source_type);

    DataTypePtr target_type = slot_desc->get_data_type_ptr();
    std::string target_type_name = target_type->get_name();
    // Second CAST argument: a constant column of the target type.
    ColumnPtr target_type_arg = target_type->create_column_const_with_default_value(1);

    // The source column is moved out of `block`; it is replaced further down.
    auto& source_column = block->get_by_position(column_index).column;

    ColumnsWithTypeAndName cast_args;
    cast_args.reserve(2);
    cast_args.emplace_back(std::move(source_column), _input_hll_string_types[cast_idx],
                           "java.sql.String");
    cast_args.emplace_back(target_type_arg, target_type, target_type_name);
    FunctionBasePtr cast_fn = SimpleFunctionFactory::instance().get_function(
            "CAST", cast_args, make_nullable(target_type));

    Block cast_block(cast_args);
    int result_idx = cast_block.columns();
    cast_block.insert({nullptr, make_nullable(target_type), "cast_result"});
    RETURN_IF_ERROR(cast_fn->execute(nullptr, cast_block, {0}, result_idx, rows));

    auto result_column = cast_block.get_by_position(result_idx).column;
    block->get_by_position(column_index).type = target_type;
    if (!target_type->is_nullable()) {
        auto nested_column = reinterpret_cast<const ColumnNullable*>(result_column.get())
                                     ->get_nested_column_ptr();
        block->replace_by_position(column_index, nested_column);
    } else {
        block->replace_by_position(column_index, result_column);
    }

    return Status::OK();
}
527
528
// Converts column `column_index` of `block` from string to the slot's BITMAP
// type by running the "CAST" function over `rows` rows.
//
// The string input type used for the cast is appended to
// `_input_bitmap_string_types` and its index is remembered per column. The cast
// always produces a nullable result; when the slot type is not nullable the
// nested column is unwrapped before it is put back into `block`.
Status JdbcConnector::_cast_string_to_bitmap(const SlotDescriptor* slot_desc, Block* block,
                                             int column_index, int rows) {
    // Register the (optionally nullable) string source type for this column.
    const int cast_idx = static_cast<int>(_input_bitmap_string_types.size());
    _map_column_idx_to_cast_idx_bitmap[column_index] = cast_idx;
    DataTypePtr source_type = std::make_shared<DataTypeString>();
    if (slot_desc->is_nullable()) {
        source_type = make_nullable(source_type);
    }
    _input_bitmap_string_types.push_back(source_type);

    DataTypePtr target_type = slot_desc->get_data_type_ptr();
    std::string target_type_name = target_type->get_name();
    // Second CAST argument: a constant column of the target type.
    ColumnPtr target_type_arg = target_type->create_column_const_with_default_value(1);

    // The source column is moved out of `block`; it is replaced further down.
    auto& source_column = block->get_by_position(column_index).column;

    ColumnsWithTypeAndName cast_args;
    cast_args.reserve(2);
    cast_args.emplace_back(std::move(source_column), _input_bitmap_string_types[cast_idx],
                           "java.sql.String");
    cast_args.emplace_back(target_type_arg, target_type, target_type_name);
    FunctionBasePtr cast_fn = SimpleFunctionFactory::instance().get_function(
            "CAST", cast_args, make_nullable(target_type));

    Block cast_block(cast_args);
    int result_idx = cast_block.columns();
    cast_block.insert({nullptr, make_nullable(target_type), "cast_result"});
    RETURN_IF_ERROR(cast_fn->execute(nullptr, cast_block, {0}, result_idx, rows));

    auto result_column = cast_block.get_by_position(result_idx).column;
    block->get_by_position(column_index).type = target_type;
    if (!target_type->is_nullable()) {
        auto nested_column = reinterpret_cast<const ColumnNullable*>(result_column.get())
                                     ->get_nested_column_ptr();
        block->replace_by_position(column_index, nested_column);
    } else {
        block->replace_by_position(column_index, result_column);
    }

    return Status::OK();
}
572
573
// Deprecated: kept only so that queries keep working while upgrading from versions that mapped JSON to JSONB. It will be removed in a later version.
574
// Converts column `column_index` of `block` from string to the slot's JSONB
// type by running the "CAST" function over `rows` rows.
//
// The string input type used for the cast is appended to
// `_input_json_string_types` and its index is remembered per column. Unlike the
// HLL/BITMAP variants, the second CAST argument is a constant column built from
// the string field "{}". The cast always produces a nullable result; when the
// slot type is not nullable the nested column is unwrapped before it is put back
// into `block`.
Status JdbcConnector::_cast_string_to_json(const SlotDescriptor* slot_desc, Block* block,
                                           int column_index, int rows) {
    // Register the (optionally nullable) string source type for this column.
    const int cast_idx = static_cast<int>(_input_json_string_types.size());
    _map_column_idx_to_cast_idx_json[column_index] = cast_idx;
    DataTypePtr source_type = std::make_shared<DataTypeString>();
    if (slot_desc->is_nullable()) {
        source_type = make_nullable(source_type);
    }
    _input_json_string_types.push_back(source_type);

    DataTypePtr target_type = slot_desc->get_data_type_ptr();
    std::string target_type_name = target_type->get_name();
    ColumnPtr target_type_arg =
            target_type->create_column_const(1, Field::create_field<TYPE_STRING>("{}"));

    // The source column is moved out of `block`; it is replaced further down.
    auto& source_column = block->get_by_position(column_index).column;

    ColumnsWithTypeAndName cast_args;
    cast_args.reserve(2);
    cast_args.emplace_back(std::move(source_column), _input_json_string_types[cast_idx],
                           "java.sql.String");
    cast_args.emplace_back(target_type_arg, target_type, target_type_name);
    FunctionBasePtr cast_fn = SimpleFunctionFactory::instance().get_function(
            "CAST", cast_args, make_nullable(target_type));

    Block cast_block(cast_args);
    int result_idx = cast_block.columns();
    cast_block.insert({nullptr, make_nullable(target_type), "cast_result"});
    RETURN_IF_ERROR(cast_fn->execute(nullptr, cast_block, {0}, result_idx, rows));

    auto result_column = cast_block.get_by_position(result_idx).column;
    block->get_by_position(column_index).type = target_type;
    if (!target_type->is_nullable()) {
        auto nested_column = reinterpret_cast<const ColumnNullable*>(result_column.get())
                                     ->get_nested_column_ptr();
        block->replace_by_position(column_index, nested_column);
    } else {
        block->replace_by_position(column_index, result_column);
    }

    return Status::OK();
}
618
619
// Maps the C++ thrift enum value `table_type` to the corresponding Java
// `org.apache.doris.thrift.TOdbcTableType` object by calling its static
// `findByValue(int)`; the Java object is returned through `java_enum_obj`.
Status JdbcConnector::_get_java_table_type(JNIEnv* env, TOdbcTableType::type table_type,
                                           Jni::LocalObject* java_enum_obj) {
    Jni::LocalClass table_type_class;
    RETURN_IF_ERROR(Jni::Util::find_class(env, "org/apache/doris/thrift/TOdbcTableType",
                                          &table_type_class));

    Jni::MethodId find_by_value_id;
    RETURN_IF_ERROR(table_type_class.get_static_method(
            env, "findByValue", "(I)Lorg/apache/doris/thrift/TOdbcTableType;",
            &find_by_value_id));

    const jint enum_value = static_cast<jint>(table_type);
    return table_type_class.call_static_object_method(env, find_by_value_id)
            .with_arg(enum_value)
            .call(java_enum_obj);
}
634
635
15
// Produces the driver URL to hand to Java. A value containing ":/" is taken to
// be a URL already and is copied to `result_url` unchanged; anything else is
// treated as a file name and resolved against the driver directories.
Status JdbcConnector::_get_real_url(const std::string& url, std::string* result_url) {
    const bool looks_like_url = url.find(":/") != std::string::npos;
    if (looks_like_url) {
        *result_url = url;
        return Status::OK();
    }
    return _check_and_return_default_driver_url(url, result_url);
}
642
643
// Resolves the driver file name `url` to a "file://" URL, written to `result_url`.
//
// - If `config::jdbc_drivers_dir` differs from `$DORIS_HOME/plugins/jdbc_drivers`,
//   the user configured a custom directory and it is used directly, without
//   checking that the file exists.
// - Otherwise the new default directory is tried first, then the old default
//   `$DORIS_HOME/jdbc_drivers`. In 2.1.8 the default value of `jdbc_drivers_dir`
//   changed from the latter to the former, so the old one is still checked for
//   compatibility.
// - If the file is in neither place: in cloud mode a download into the new
//   default directory is attempted; outside cloud mode InternalError is returned.
//
// An unset DORIS_HOME is treated as an empty prefix.
Status JdbcConnector::_check_and_return_default_driver_url(const std::string& url,
                                                           std::string* result_url) {
    // std::getenv returns nullptr when the variable is not set, and constructing
    // a std::string from nullptr is undefined behavior.
    const char* doris_home_env = std::getenv("DORIS_HOME");
    const std::string doris_home = doris_home_env != nullptr ? doris_home_env : "";
    const std::string default_url = doris_home + "/plugins/jdbc_drivers";
    const std::string default_old_url = doris_home + "/jdbc_drivers";

    if (config::jdbc_drivers_dir != default_url) {
        // User specified custom directory - use directly
        *result_url = "file://" + config::jdbc_drivers_dir + "/" + url;
        return Status::OK();
    }

    // From here on the user did not set `jdbc_drivers_dir` and uses the default one.
    const std::string target_path = default_url + "/" + url;
    const std::string old_target_path = default_old_url + "/" + url;

    if (std::filesystem::exists(target_path)) {
        // File exists in new default directory
        *result_url = "file://" + target_path;
        return Status::OK();
    }
    if (std::filesystem::exists(old_target_path)) {
        // File exists in old default directory
        *result_url = "file://" + old_target_path;
        return Status::OK();
    }
    if (!config::is_cloud_mode()) {
        return Status::InternalError("JDBC driver file does not exist: " + url);
    }

    // Cloud mode: try to download from cloud to new default directory
    std::string downloaded_path;
    Status status = CloudPluginDownloader::download_from_cloud(
            CloudPluginDownloader::PluginType::JDBC_DRIVERS, url, target_path, &downloaded_path);
    if (status.ok() && !downloaded_path.empty()) {
        *result_url = "file://" + downloaded_path;
        return Status::OK();
    }

    // Download failed: log a warning and actually perform the fallback the
    // message announces, so the caller never receives OK with `*result_url`
    // left unwritten.
    LOG(WARNING) << "Failed to download JDBC driver from cloud: " << status.to_string()
                 << ", fallback to old directory";
    *result_url = "file://" + old_target_path;
    return Status::OK();
}
686
#include "common/compile_check_end.h"
687
} // namespace doris