PostgresResourceValidator.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.extensions.insert.streaming;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.util.StreamingJobUtils;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Fail-fast validation of PostgreSQL replication slot and publication at CREATE JOB time,
* before the CDC client connects. Catches mistakes (user-provided slot/publication missing,
* conflicting jobId) with actionable errors. Validation runs only on initial create; restarts
* skip this path by design, so an active slot held by the previous BE does not self-conflict.
*/
public class PostgresResourceValidator {
public static void validate(Map<String, String> sourceProperties, String jobId, List<String> tableNames)
throws JobException {
String slotName = resolveSlotName(sourceProperties, jobId);
String publicationName = resolvePublicationName(sourceProperties, jobId);
// Pattern-match ownership: name equals the default = Doris-owned (auto); otherwise user.
String defaultSlot = DataSourceConfigKeys.defaultSlotName(jobId);
String defaultPub = DataSourceConfigKeys.defaultPublicationName(jobId);
boolean slotUserProvided = !defaultSlot.equals(slotName);
boolean pubUserProvided = !defaultPub.equals(publicationName);
String pgSchema = sourceProperties.get(DataSourceConfigKeys.SCHEMA);
List<String> qualifiedTables = new ArrayList<>();
for (String name : tableNames) {
qualifiedTables.add(pgSchema + "." + name);
}
JdbcClient jdbcClient = StreamingJobUtils.getJdbcClient(DataSourceType.POSTGRES, sourceProperties);
try (Connection conn = jdbcClient.getConnection()) {
boolean pubExists = publicationExists(conn, publicationName);
if (!pubExists && pubUserProvided) {
throw new JobException(
"publication does not exist: " + publicationName
+ ". Create it before starting the job or omit "
+ DataSourceConfigKeys.PUBLICATION_NAME
+ " to let Doris create one.");
}
if (pubExists) {
List<String> missing = findMissingTables(conn, publicationName, qualifiedTables);
if (!missing.isEmpty()) {
if (pubUserProvided) {
throw new JobException(
"publication " + publicationName
+ " is missing required tables: " + missing
+ ". Add them via ALTER PUBLICATION ... ADD TABLE before starting.");
} else {
throw new JobException(
"publication " + publicationName
+ " already exists but does not cover the configured"
+ " include_tables (missing: " + missing
+ "). Another Doris cluster may be using the same jobId."
+ " Please set " + DataSourceConfigKeys.PUBLICATION_NAME
+ " explicitly to avoid the conflict.");
}
}
}
Boolean slotActive = queryReplicationSlotActive(conn, slotName);
if (slotUserProvided && slotActive == null) {
throw new JobException(
"replication slot does not exist: " + slotName
+ ". Create it before starting the job or omit "
+ DataSourceConfigKeys.SLOT_NAME
+ " to let Doris create one.");
}
if (!slotUserProvided && Boolean.TRUE.equals(slotActive)) {
throw new JobException(
"replication slot " + slotName
+ " is active, held by another consumer. Another Doris"
+ " cluster may be using the same jobId. Please set "
+ DataSourceConfigKeys.SLOT_NAME
+ " explicitly to avoid the conflict.");
}
} catch (JobException e) {
throw e;
} catch (Exception e) {
throw new JobException(
"Failed to validate PG resources for publication " + publicationName
+ ": " + e.getMessage(), e);
} finally {
jdbcClient.closeClient();
}
}
private static String resolveSlotName(Map<String, String> config, String jobId) {
String name = config.get(DataSourceConfigKeys.SLOT_NAME);
return StringUtils.isNotBlank(name) ? name : DataSourceConfigKeys.defaultSlotName(jobId);
}
private static String resolvePublicationName(Map<String, String> config, String jobId) {
String name = config.get(DataSourceConfigKeys.PUBLICATION_NAME);
return StringUtils.isNotBlank(name) ? name : DataSourceConfigKeys.defaultPublicationName(jobId);
}
private static boolean publicationExists(Connection conn, String publicationName) throws Exception {
try (PreparedStatement ps = conn.prepareStatement("SELECT 1 FROM pg_publication WHERE pubname = ?")) {
ps.setString(1, publicationName);
try (ResultSet rs = ps.executeQuery()) {
return rs.next();
}
}
}
private static List<String> findMissingTables(Connection conn, String publicationName, List<String> tables)
throws Exception {
Set<String> covered = new HashSet<>();
try (PreparedStatement ps = conn.prepareStatement(
"SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = ?")) {
ps.setString(1, publicationName);
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
covered.add(rs.getString(1) + "." + rs.getString(2));
}
}
}
List<String> missing = new ArrayList<>();
for (String table : tables) {
if (!covered.contains(table)) {
missing.add(table);
}
}
return missing;
}
/** Returns the slot's active flag, or null when the slot does not exist. */
private static Boolean queryReplicationSlotActive(Connection conn, String slotName) throws Exception {
try (PreparedStatement ps = conn.prepareStatement(
"SELECT active FROM pg_replication_slots WHERE slot_name = ?")) {
ps.setString(1, slotName);
try (ResultSet rs = ps.executeQuery()) {
if (!rs.next()) {
return null;
}
return rs.getBoolean(1);
}
}
}
}