Coverage Report

Created: 2026-03-17 00:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/jdbc_scanner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "jdbc_scanner.h"
19
20
#include <new>
21
#include <ostream>
22
#include <utility>
23
#include <vector>
24
25
#include "common/logging.h"
26
#include "core/block/block.h"
27
#include "exprs/vexpr_context.h"
28
#include "format/table/jdbc_jni_reader.h"
29
#include "runtime/descriptors.h"
30
#include "runtime/runtime_profile.h"
31
#include "runtime/runtime_state.h"
32
#include "util/jdbc_utils.h"
33
34
namespace doris {
35
36
// Constructs a scanner for one JDBC query.
//
// The constructor only records its arguments: `state`, `local_state`, `limit`
// and `profile` are forwarded to the Scanner base class, the rest are stored in
// members. The tuple descriptor is left null here and looked up later in init().
JdbcScanner::JdbcScanner(RuntimeState* state, doris::JDBCScanLocalState* local_state, int64_t limit,
                         const TupleId& tuple_id, const std::string& query_string,
                         TOdbcTableType::type table_type, bool is_tvf, RuntimeProfile* profile)
        : Scanner(state, local_state, limit, profile),
          _jdbc_eos(false),             // no end-of-stream seen yet
          _tuple_id(tuple_id),          // resolved to a descriptor in init()
          _query_string(query_string),  // SQL text handed to the reader as "query_sql"
          _tuple_desc(nullptr),
          _table_type(table_type),
          _is_tvf(is_tvf) {
    // Assigned in the body rather than the initializer list.
    // NOTE(review): presumably a member inherited from Scanner - confirm.
    _has_prepared = false;
}
48
49
std::map<std::string, std::string> JdbcScanner::_build_jdbc_params(
50
0
        const TupleDescriptor* tuple_desc) {
51
0
    const JdbcTableDescriptor* jdbc_table =
52
0
            static_cast<const JdbcTableDescriptor*>(tuple_desc->table_desc());
53
54
0
    std::map<std::string, std::string> params;
55
0
    params["jdbc_url"] = jdbc_table->jdbc_url();
56
0
    params["jdbc_user"] = jdbc_table->jdbc_user();
57
0
    params["jdbc_password"] = jdbc_table->jdbc_passwd();
58
0
    params["jdbc_driver_class"] = jdbc_table->jdbc_driver_class();
59
    // Resolve jdbc_driver_url to absolute file:// URL
60
    // FE sends just the JAR filename; we need to resolve it to a full path.
61
0
    std::string driver_url;
62
0
    auto resolve_st = JdbcUtils::resolve_driver_url(jdbc_table->jdbc_driver_url(), &driver_url);
63
0
    if (!resolve_st.ok()) {
64
0
        LOG(WARNING) << "Failed to resolve JDBC driver URL: " << resolve_st.to_string();
65
0
        driver_url = jdbc_table->jdbc_driver_url();
66
0
    }
67
0
    params["jdbc_driver_url"] = driver_url;
68
0
    params["jdbc_driver_checksum"] = jdbc_table->jdbc_driver_checksum();
69
0
    params["query_sql"] = _query_string;
70
0
    params["catalog_id"] = std::to_string(jdbc_table->jdbc_catalog_id());
71
0
    params["table_type"] = _odbc_table_type_to_string(_table_type);
72
0
    params["connection_pool_min_size"] = std::to_string(jdbc_table->connection_pool_min_size());
73
0
    params["connection_pool_max_size"] = std::to_string(jdbc_table->connection_pool_max_size());
74
0
    params["connection_pool_max_wait_time"] =
75
0
            std::to_string(jdbc_table->connection_pool_max_wait_time());
76
0
    params["connection_pool_max_life_time"] =
77
0
            std::to_string(jdbc_table->connection_pool_max_life_time());
78
0
    params["connection_pool_keep_alive"] =
79
0
            jdbc_table->connection_pool_keep_alive() ? "true" : "false";
80
0
    return params;
81
0
}
82
83
0
// Initializes the scanner: runs the base-class init, resolves the tuple and
// JDBC table descriptors, records connection details in the scanner profile,
// and creates the JNI reader.
//
// Returns InternalError when `state` is null, when the tuple descriptor for
// `_tuple_id` cannot be found, or when the tuple has no table descriptor.
// Any error from Scanner::init() is returned unchanged.
Status JdbcScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
    VLOG_CRITICAL << "JdbcScanner::init";

    // Reject a null state before it is handed to the base class; checking it
    // afterwards would be too late to protect that call.
    if (state == nullptr) {
        return Status::InternalError("input pointer is NULL of JdbcScanner::init.");
    }

    RETURN_IF_ERROR(Scanner::init(state, conjuncts));

    // get tuple desc
    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
    if (_tuple_desc == nullptr) {
        return Status::InternalError("Failed to get tuple descriptor.");
    }

    // get jdbc table info
    const JdbcTableDescriptor* jdbc_table =
            static_cast<const JdbcTableDescriptor*>(_tuple_desc->table_desc());
    if (jdbc_table == nullptr) {
        return Status::InternalError("jdbc table pointer is NULL of JdbcScanner::init.");
    }

    // Expose the connection details and the query text in the scanner profile.
    _local_state->scanner_profile()->add_info_string("JdbcDriverClass",
                                                     jdbc_table->jdbc_driver_class());
    _local_state->scanner_profile()->add_info_string("JdbcDriverUrl",
                                                     jdbc_table->jdbc_driver_url());
    _local_state->scanner_profile()->add_info_string("JdbcUrl", jdbc_table->jdbc_url());
    _local_state->scanner_profile()->add_info_string("QuerySql", _query_string);

    // Build reader params from tuple descriptor
    auto jdbc_params = _build_jdbc_params(_tuple_desc);

    // Pass _tuple_desc->slots() directly. JniReader stores _file_slot_descs as a reference,
    // so the vector must outlive the reader; a function-local vector would leave
    // the reader with a dangling reference once init() returns.
    _jni_reader = JdbcJniReader::create_unique(_tuple_desc->slots(), state, _profile, jdbc_params);

    return Status::OK();
}
121
122
0
// Opens the scanner: validates preconditions, runs the base-class open step,
// then initializes the JNI reader.
//
// Returns InternalError when `state` is null or when `_has_prepared` is false.
// Cancellation and any failure from Scanner::_open_impl() or the reader's
// init_reader() are returned through the RETURN_IF_* macros.
// NOTE(review): `_has_prepared` is never set to true in this file; presumably
// the base class does that - confirm.
Status JdbcScanner::_open_impl(RuntimeState* state) {
    VLOG_CRITICAL << "JdbcScanner::open";

    // Precondition: a runtime state must be supplied.
    if (nullptr == state) {
        return Status::InternalError("input pointer is NULL of JdbcScanner::open.");
    }

    // Precondition: the scanner must already be marked as prepared.
    if (!_has_prepared) {
        return Status::InternalError("used before initialize of JdbcScanner::open.");
    }

    RETURN_IF_CANCELLED(state);

    // Base-class open first, reader initialization second.
    RETURN_IF_ERROR(Scanner::_open_impl(state));
    RETURN_IF_ERROR(_jni_reader->init_reader());

    return Status::OK();
}
136
137
0
// Fills `block` with the next batch of rows from the JNI reader.
//
// Keeps asking the reader for data until the block holds at least one row, the
// reader reports end of stream, or `*eof` is already true. `*eof` is set to
// true only when the stream has ended and the block is empty; this function
// never writes false to it. After the reader reports end of stream, every later
// call returns immediately with `*eof == true`.
//
// Returns InternalError for a null argument or when `_has_prepared` is false.
Status JdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
    VLOG_CRITICAL << "JdbcScanner::_get_block_impl";

    if (state == nullptr || block == nullptr || eof == nullptr) {
        return Status::InternalError("input is NULL pointer");
    }

    if (!_has_prepared) {
        return Status::InternalError("used before initialize of JdbcScanner::get_next.");
    }

    // The reader was already drained by an earlier call.
    if (_jdbc_eos) {
        *eof = true;
        return Status::OK();
    }

    // only empty block should be here
    DCHECK(block->rows() == 0);

    while (true) {
        RETURN_IF_CANCELLED(state);

        size_t rows_fetched = 0;
        bool reader_finished = false;
        RETURN_IF_ERROR(_jni_reader->get_next_block(block, &rows_fetched, &reader_finished));

        if (reader_finished) {
            // Remember the end of stream for later calls. Report eof now only if
            // this final read produced no rows; otherwise the rows are returned
            // first and eof is reported on the next call.
            _jdbc_eos = true;
            if (block->rows() == 0) {
                *eof = true;
            }
            break;
        }

        VLOG_ROW << "JdbcScanner output rows: " << block->rows();

        // Stop once rows are available or eof is already flagged.
        if (block->rows() != 0 || *eof) {
            break;
        }
    }

    return Status::OK();
}
174
175
0
// Closes the scanner and its JNI reader.
//
// Returns OK immediately when `_try_close()` reports false. Otherwise the base
// class and the JNI reader (if one was created) are both closed, and the first
// error encountered is returned.
//
// The reader is closed even when Scanner::close() fails. Returning early on the
// base-class error would skip the reader's close(), and because `_try_close()`
// has already been consumed, a later close() call would presumably not reach
// the reader either (NOTE(review): confirm `_try_close()` is a one-shot guard).
Status JdbcScanner::close(RuntimeState* state) {
    if (!_try_close()) {
        return Status::OK();
    }

    Status base_status = Scanner::close(state);

    if (_jni_reader) {
        Status reader_status = _jni_reader->close();
        if (base_status.ok()) {
            // Base close succeeded, so the reader's result decides the outcome.
            return reader_status;
        }
        if (!reader_status.ok()) {
            // The base-class error takes precedence; log the second failure so
            // it is not lost.
            LOG(WARNING) << "Failed to close JDBC JNI reader: " << reader_status.to_string();
        }
    }

    return base_status;
}
185
} // namespace doris