// JdbcJniScanner.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.jdbc;

import org.apache.doris.cloud.security.SecurityChecker;
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ColumnValueConverter;

import com.zaxxer.hikari.HikariDataSource;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.lang.reflect.Array;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
* JdbcJniScanner reads data from JDBC sources via the unified JniScanner framework.
* It extends JniScanner to integrate with the JniConnector/JniReader system on the C++ side,
* following the same pattern as PaimonJniScanner, HudiJniScanner, etc.
*
* <p>This class uses the {@link JdbcTypeHandler} strategy pattern for database-specific
* type handling. The appropriate handler is selected via {@link JdbcTypeHandlerFactory}
* based on the "table_type" parameter.
*
* <p>Parameters (passed via constructor params map):
* <ul>
* <li>jdbc_url - JDBC connection URL</li>
* <li>jdbc_user - database user</li>
* <li>jdbc_password - database password</li>
* <li>jdbc_driver_class - JDBC driver class name</li>
* <li>jdbc_driver_url - path to driver JAR</li>
* <li>query_sql - the SELECT SQL to execute</li>
* <li>catalog_id - catalog ID for connection pool keying</li>
* <li>table_type - database type (MYSQL, ORACLE, POSTGRESQL, etc.)</li>
* <li>connection_pool_min_size - min connection pool size</li>
* <li>connection_pool_max_size - max connection pool size</li>
* <li>connection_pool_max_wait_time - max wait time (ms)</li>
* <li>connection_pool_max_life_time - max lifetime (ms)</li>
* <li>connection_pool_keep_alive - "true"/"false"</li>
* <li>required_fields - comma-separated output field names</li>
* <li>columns_types - #-separated column type strings</li>
* </ul>
*/
public class JdbcJniScanner extends JniScanner {
    private static final Logger LOG = Logger.getLogger(JdbcJniScanner.class);
    // JDBC connection parameters, parsed once from the constructor params map.
    private final String jdbcUrl;
    private final String jdbcUser;
    private final String jdbcPassword;
    private final String jdbcDriverClass;
    private final String jdbcDriverUrl;
    // The SELECT statement executed in open().
    private final String querySql;
    // Catalog ID; part of the connection-pool cache key.
    private final long catalogId;
    // HikariCP pool sizing/timeout settings (wait/lifetime in milliseconds).
    private final int connectionPoolMinSize;
    private final int connectionPoolMaxSize;
    private final int connectionPoolMaxWaitTime;
    private final int connectionPoolMaxLifeTime;
    private final boolean connectionPoolKeepAlive;
    // Database-specific type handling strategy
    private final JdbcTypeHandler typeHandler;
    // Per-column output converters, initialized once per scan
    private ColumnValueConverter[] outputConverters;
    // JDBC resources: acquired in open(), released in close().
    private HikariDataSource hikariDataSource = null;
    private Connection conn = null;
    private PreparedStatement stmt = null;
    private ResultSet resultSet = null;
    private ResultSetMetaData resultSetMetaData = null;
    // Classloader that can see the vendor driver JAR; created in
    // initializeClassLoaderAndDataSource().
    private ClassLoader classLoader = null;
    // Read state
    private boolean resultSetOpened = false;
    // Per-batch column buffers, one array per output column; reused across batches.
    private List<Object[]> block = null;
    // Per-column replace strings for special type handling (bitmap, hll)
    private String[] replaceStringList;
    // Mapping from field index (in types[]/fields[]) to JDBC ResultSet column index (1-based).
    // The slot descriptor order may differ from the query SQL column order.
    private int[] columnIndexMapping;
    // Statistics
    private long readRows = 0;   // total rows returned so far
    private long readTime = 0;   // cumulative read time in nanoseconds
public JdbcJniScanner(int batchSize, Map<String, String> params) {
this.jdbcUrl = params.getOrDefault("jdbc_url", "");
this.jdbcUser = params.getOrDefault("jdbc_user", "");
this.jdbcPassword = params.getOrDefault("jdbc_password", "");
this.jdbcDriverClass = params.getOrDefault("jdbc_driver_class", "");
this.jdbcDriverUrl = params.getOrDefault("jdbc_driver_url", "");
this.querySql = params.getOrDefault("query_sql", "");
this.catalogId = Long.parseLong(params.getOrDefault("catalog_id", "0"));
this.connectionPoolMinSize = Integer.parseInt(
params.getOrDefault("connection_pool_min_size", "1"));
this.connectionPoolMaxSize = Integer.parseInt(
params.getOrDefault("connection_pool_max_size", "10"));
this.connectionPoolMaxWaitTime = Integer.parseInt(
params.getOrDefault("connection_pool_max_wait_time", "5000"));
this.connectionPoolMaxLifeTime = Integer.parseInt(
params.getOrDefault("connection_pool_max_life_time", "1800000"));
this.connectionPoolKeepAlive = "true".equalsIgnoreCase(
params.getOrDefault("connection_pool_keep_alive", "false"));
// Select database-specific type handler
String tableType = params.getOrDefault("table_type", "");
this.typeHandler = JdbcTypeHandlerFactory.create(tableType);
String requiredFields = params.getOrDefault("required_fields", "");
String columnsTypes = params.getOrDefault("columns_types", "");
String[] fieldArr = requiredFields.isEmpty() ? new String[0] : requiredFields.split(",");
ColumnType[] typeArr;
if (columnsTypes.isEmpty()) {
typeArr = new ColumnType[0];
} else {
String[] typeStrs = columnsTypes.split("#");
typeArr = new ColumnType[typeStrs.length];
for (int i = 0; i < typeStrs.length; i++) {
typeArr[i] = ColumnType.parseType(fieldArr[i], typeStrs[i]);
}
}
initTableInfo(typeArr, fieldArr, batchSize);
// Parse replace_string for special type handling (bitmap, hll, etc.)
String replaceString = params.getOrDefault("replace_string", "");
if (!replaceString.isEmpty()) {
replaceStringList = replaceString.split(",");
} else {
replaceStringList = new String[fieldArr.length];
for (int i = 0; i < fieldArr.length; i++) {
replaceStringList[i] = "not_replace";
}
}
}
@Override
public void open() throws IOException {
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
try {
// HikariCP's setDriverClassName() uses the thread context classloader
// to load the driver class.
initializeClassLoaderAndDataSource();
conn = hikariDataSource.getConnection();
// Use type handler to create the statement with database-specific settings
stmt = typeHandler.initializeStatement(conn, querySql, batchSize);
LOG.info("JdbcJniScanner: Executing query: " + querySql);
resultSet = stmt.executeQuery();
resultSetMetaData = resultSet.getMetaData();
resultSetOpened = true;
// Build column name -> JDBC ResultSet index mapping.
// The slot descriptors (fields[]) may be in a different order than the
// query SQL columns. We must map by name to avoid reading data with
// the wrong type handler.
columnIndexMapping = new int[fields.length];
int rsColumnCount = resultSetMetaData.getColumnCount();
Map<String, Integer> rsColumnMap = new HashMap<>(rsColumnCount);
for (int i = 1; i <= rsColumnCount; i++) {
String colName = resultSetMetaData.getColumnLabel(i).toLowerCase();
rsColumnMap.put(colName, i);
}
for (int i = 0; i < fields.length; i++) {
String fieldName = fields[i].toLowerCase();
Integer rsIdx = rsColumnMap.get(fieldName);
if (rsIdx != null) {
columnIndexMapping[i] = rsIdx;
} else {
// Fallback to positional mapping if name not found
columnIndexMapping[i] = i + 1;
LOG.warn("Column '" + fields[i] + "' not found in ResultSet by name, "
+ "falling back to positional index " + (i + 1));
}
}
block = new ArrayList<>(types.length);
// Initialize per-column output converters once
outputConverters = new ColumnValueConverter[types.length];
for (int i = 0; i < types.length; i++) {
String replaceStr = (replaceStringList != null && i < replaceStringList.length)
? replaceStringList[i] : "not_replace";
outputConverters[i] = typeHandler.getOutputConverter(types[i], replaceStr);
}
} catch (Exception e) {
LOG.warn("JdbcJniScanner " + jdbcUrl + " open failed: " + e.getMessage(), e);
throw new IOException("JdbcJniScanner open failed: " + e.getMessage(), e);
} finally {
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
}
    /**
     * Reads up to batchSize rows from the open ResultSet into per-column buffers,
     * applies per-column converters, and appends the data to vectorTable.
     * Returns the number of rows read in this call (0 when exhausted or never opened).
     *
     * @throws IOException if reading or conversion fails
     */
    @Override
    protected int getNext() throws IOException {
        try {
            // Nothing to do if open() never succeeded or the scan was not started.
            if (!resultSetOpened || resultSet == null) {
                return 0;
            }
            long startRead = System.nanoTime();
            // Initialize block arrays for this batch
            block.clear();
            for (int i = 0; i < types.length; i++) {
                String replaceStr = (replaceStringList != null && i < replaceStringList.length)
                        ? replaceStringList[i] : "not_replace";
                if ("bitmap".equals(replaceStr) || "hll".equals(replaceStr)
                        || "quantile_state".equals(replaceStr)) {
                    // bitmap/hll/quantile_state columns: use byte[][] for raw binary data
                    block.add(new byte[batchSize][]);
                } else if (outputConverters[i] != null) {
                    // When a converter exists, the raw value from getColumnValue() may have a
                    // different type than the final column type. For example, MySQL returns ARRAY
                    // columns as JSON Strings, but newObjectContainerArray() creates ArrayList[].
                    // Use Object[] to avoid ArrayStoreException; the converter will produce
                    // the correctly typed array before data is written to VectorTable.
                    block.add(new Object[batchSize]);
                } else {
                    block.add(vectorTable.getColumn(i).newObjectContainerArray(batchSize));
                }
            }
            int curRows = 0;
            // Fill up to batchSize rows from the ResultSet.
            while (curRows < batchSize) {
                if (!resultSet.next()) {
                    break;
                }
                for (int col = 0; col < types.length; col++) {
                    // Translate field position to the 1-based ResultSet column index.
                    int columnIndex = columnIndexMapping[col];
                    String replaceStr = (replaceStringList != null && col < replaceStringList.length)
                            ? replaceStringList[col] : "not_replace";
                    if ("bitmap".equals(replaceStr) || "hll".equals(replaceStr)
                            || "quantile_state".equals(replaceStr)) {
                        // bitmap/hll: read raw bytes directly
                        byte[] data = resultSet.getBytes(columnIndex);
                        block.get(col)[curRows] = resultSet.wasNull() ? null : data;
                    } else {
                        // Use type handler for database-specific value extraction
                        Object value = typeHandler.getColumnValue(
                                resultSet, columnIndex, types[col], resultSetMetaData);
                        block.get(col)[curRows] = value;
                    }
                }
                curRows++;
            }
            if (curRows > 0) {
                for (int col = 0; col < types.length; col++) {
                    Object[] columnData = block.get(col);
                    if (curRows < batchSize) {
                        // Trim to actual size
                        Class<?> componentType = columnData.getClass().getComponentType();
                        Object[] trimmed = (Object[]) Array.newInstance(componentType, curRows);
                        System.arraycopy(columnData, 0, trimmed, 0, curRows);
                        columnData = trimmed;
                    }
                    // Apply column-level output converter if present
                    if (outputConverters[col] != null) {
                        columnData = outputConverters[col].convert(columnData);
                    }
                    vectorTable.appendData(col, columnData, null, true);
                }
            }
            readTime += System.nanoTime() - startRead;
            readRows += curRows;
            // NOTE(review): a short batch (curRows < batchSize, possibly 0)
            // presumably signals end-of-stream per the JniScanner contract — confirm.
            return curRows;
        } catch (Exception e) {
            LOG.warn("JdbcJniScanner getNext failed: " + e.getMessage(), e);
            throw new IOException("JdbcJniScanner getNext failed: " + e.getMessage(), e);
        }
    }
@Override
public void close() throws IOException {
try {
// Use type handler for database-specific connection abort
if (conn != null && resultSet != null) {
typeHandler.abortReadConnection(conn, resultSet);
}
} catch (Exception e) {
LOG.warn("JdbcJniScanner abort connection error: " + e.getMessage(), e);
}
try {
if (resultSet != null && !resultSet.isClosed()) {
resultSet.close();
}
if (stmt != null && !stmt.isClosed()) {
stmt.close();
}
if (conn != null && !conn.isClosed()) {
conn.close();
}
} catch (Exception e) {
LOG.warn("JdbcJniScanner close error: " + e.getMessage(), e);
} finally {
resultSet = null;
stmt = null;
conn = null;
if (connectionPoolMinSize == 0 && hikariDataSource != null) {
hikariDataSource.close();
JdbcDataSource.getDataSource().getSourcesMap().remove(createCacheKey());
hikariDataSource = null;
}
}
}
@Override
public Map<String, String> getStatistics() {
Map<String, String> stats = new HashMap<>();
stats.put("counter:ReadRows", String.valueOf(readRows));
stats.put("timer:ReadTime", String.valueOf(readTime));
return stats;
}
private void initializeClassLoaderAndDataSource() throws Exception {
java.net.URL[] urls = {new java.net.URL(jdbcDriverUrl)};
ClassLoader parent = getClass().getClassLoader();
this.classLoader = java.net.URLClassLoader.newInstance(urls, parent);
// Must set thread context classloader BEFORE creating HikariDataSource,
// because HikariCP's setDriverClassName() loads the driver class from
// the thread context classloader.
Thread.currentThread().setContextClassLoader(classLoader);
String cacheKey = createCacheKey();
hikariDataSource = JdbcDataSource.getDataSource().getSource(cacheKey);
if (hikariDataSource == null) {
synchronized (JdbcJniScanner.class) {
hikariDataSource = JdbcDataSource.getDataSource().getSource(cacheKey);
if (hikariDataSource == null) {
HikariDataSource ds = new HikariDataSource();
ds.setDriverClassName(jdbcDriverClass);
ds.setJdbcUrl(SecurityChecker.getInstance().getSafeJdbcUrl(jdbcUrl));
ds.setUsername(jdbcUser);
ds.setPassword(jdbcPassword);
ds.setMinimumIdle(connectionPoolMinSize);
ds.setMaximumPoolSize(connectionPoolMaxSize);
ds.setConnectionTimeout(connectionPoolMaxWaitTime);
ds.setMaxLifetime(connectionPoolMaxLifeTime);
ds.setIdleTimeout(connectionPoolMaxLifeTime / 2L);
// Use type handler for database-specific validation query
typeHandler.setValidationQuery(ds);
if (connectionPoolKeepAlive) {
ds.setKeepaliveTime(connectionPoolMaxLifeTime / 5L);
}
hikariDataSource = ds;
JdbcDataSource.getDataSource().putSource(cacheKey, hikariDataSource);
LOG.info("JdbcJniScanner: Created connection pool for " + jdbcUrl);
}
}
}
}
private String createCacheKey() {
return JdbcDataSource.createCacheKey(catalogId, jdbcUrl, jdbcUser, jdbcPassword,
jdbcDriverUrl, jdbcDriverClass, connectionPoolMinSize, connectionPoolMaxSize,
connectionPoolMaxLifeTime, connectionPoolMaxWaitTime, connectionPoolKeepAlive);
}
}