// PluginDrivenMvccExternalTable.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.connector.api.Connector;
import org.apache.doris.connector.api.ConnectorMetadata;
import org.apache.doris.connector.api.ConnectorPartitionInfo;
import org.apache.doris.connector.api.ConnectorSession;
import org.apache.doris.connector.api.ConnectorTableSchema;
import org.apache.doris.connector.api.handle.ConnectorTableHandle;
import org.apache.doris.connector.api.mvcc.ConnectorMvccSnapshot;
import org.apache.doris.connector.api.mvcc.ConnectorTimeTravelSpec;
import org.apache.doris.datasource.hive.HiveUtil;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccTable;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.mtmv.MTMVBaseTableIf;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIdSnapshot;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Generic MVCC/MTMV-capable {@link PluginDrivenExternalTable} for connectors that expose a
* point-in-time snapshot (e.g. Paimon, Iceberg, Hudi). All behavior is source-agnostic and driven
* through the connector SPI; the data-source-specific rendering of partition names/dates happens in
* the connector, so this class never parses raw values or imports any data-source library.
*
* <p>Selected by a capability factory and wired into the scan node in a later dispatch; until then
* it has no production caller and is exercised only by direct-construction unit tests.</p>
*
* <p><b>MVCC/MTMV contract:</b> a connector that advertises this capability MUST supply a real
* per-partition {@code lastModifiedMillis}. An {@link ConnectorPartitionInfo#UNKNOWN}(-1) is not a
* valid timestamp: it pins {@code MTMVTimestampSnapshot(-1)} in {@link #getPartitionSnapshot}, which
* degrades MTMV to conservative over-refresh (the partition never matches its prior snapshot).</p>
*/
public class PluginDrivenMvccExternalTable extends PluginDrivenExternalTable
implements MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable {
/** Class logger; used in this class to warn about partition names that cannot be turned into partition items. */
private static final Logger LOG = LogManager.getLogger(PluginDrivenMvccExternalTable.class);
/**
* Matches a non-empty, all-digits string (epoch millis / snapshot id). Parity with
* {@code PaimonUtil.isDigitalString}. Compiled once and shared by {@code isDigital}.
*/
private static final Pattern DIGITAL_REGEX = Pattern.compile("\\d+");
/**
* No-arg constructor for GSON deserialization. Only invokes the superclass no-arg constructor;
* this class adds no state of its own to initialize.
*/
public PluginDrivenMvccExternalTable() {
super();
}
/**
* Creates the table by forwarding every argument, unchanged and in the same order, to the
* {@link PluginDrivenExternalTable} constructor.
*
* @param id table id (NOTE(review): presumably the Doris-internal id; confirm against the superclass)
* @param name table name (NOTE(review): presumably the name as seen inside Doris; confirm)
* @param remoteName table name in the remote source (NOTE(review): presumably what
*        {@code getRemoteName()} returns; confirm)
* @param catalog owning external catalog; other methods of this class cast the {@code catalog}
*        field to {@link PluginDrivenExternalCatalog}
* @param db owning external database
*/
public PluginDrivenMvccExternalTable(long id, String name, String remoteName,
ExternalCatalog catalog, ExternalDatabase db) {
super(id, name, remoteName, catalog, db);
}
// ──────────────────── snapshot materialization ────────────────────
/**
 * Resolves the snapshot an accessor must read from.
 *
 * <p>When the caller already holds a pin it is used as-is (read the PIN, do NOT re-list);
 * otherwise the LATEST snapshot is materialized from the connector. The query-begin pin is
 * produced by {@link #loadSnapshot}, which calls {@link #materializeLatest()} once; later
 * accessors hand that pin back in here and never re-query the connector (single-pin invariant).
 *
 * @param snapshot the caller-supplied pin, or empty to read LATEST
 * @return the supplied pin cast to {@link PluginDrivenMvccSnapshot}, or a freshly materialized one
 */
private PluginDrivenMvccSnapshot getOrMaterialize(Optional<MvccSnapshot> snapshot) {
    // A present pin is only cast; the connector is contacted solely on the empty branch.
    return snapshot
            .map(pinned -> (PluginDrivenMvccSnapshot) pinned)
            .orElseGet(this::materializeLatest);
}
/**
 * Builds a pin for the LATEST state of the table: the connector's query-begin snapshot plus the
 * partition set listed at LATEST. The per-partition work is done by
 * {@link #listLatestPartitions} (shared with the @incr path, which legacy also reads at LATEST).
 *
 * @return a non-null pin; an empty one (snapshot id -1, no partitions) when the connector table
 *         handle cannot be resolved
 */
private PluginDrivenMvccSnapshot materializeLatest() {
    makeSureInitialized();
    final PluginDrivenExternalCatalog owningCatalog = (PluginDrivenExternalCatalog) catalog;
    final Connector conn = owningCatalog.getConnector();
    final ConnectorSession connSession = owningCatalog.buildConnectorSession();
    final ConnectorMetadata connMetadata = conn.getMetadata(connSession);

    final Optional<ConnectorTableHandle> maybeHandle = resolveConnectorTableHandle(connSession, connMetadata);
    if (maybeHandle.isPresent()) {
        final ConnectorTableHandle tableHandle = maybeHandle.get();
        // A connector without any snapshot still yields a pin: substitute snapshot id -1.
        final ConnectorMvccSnapshot latest =
                connMetadata.beginQuerySnapshot(connSession, tableHandle).orElseGet(this::emptySnapshot);
        final Map<String, PartitionItem> itemsByName = new HashMap<>();
        final Map<String, Long> lastModifiedByName = new HashMap<>();
        listLatestPartitions(connMetadata, connSession, tableHandle, itemsByName, lastModifiedByName);
        return new PluginDrivenMvccSnapshot(latest, itemsByName, lastModifiedByName);
    }

    // No handle (e.g. table dropped): hand back a valid empty pin so callers degrade to
    // UNPARTITIONED / snapshot id -1 rather than hitting a NullPointerException.
    return new PluginDrivenMvccSnapshot(emptySnapshot(),
            Collections.emptyMap(), Collections.emptyMap());
}
/**
 * Fills the two supplied maps with the partition set at LATEST, keyed by rendered partition name.
 *
 * <p>Mirrors legacy {@code PaimonUtil.generatePartitionInfo}: every listed name gets a
 * last-modified entry, while the {@link PartitionItem} is built inside a try/catch. A name that
 * cannot be parsed is logged and skipped, which leaves more listed names than built items;
 * {@link PluginDrivenMvccSnapshot#isPartitionInvalid} then treats the table as UNPARTITIONED
 * instead of the whole query failing.
 *
 * @param metadata connector metadata used to list partitions
 * @param session connector session passed through to the listing call
 * @param handle table handle to list partitions for
 * @param nameToPartitionItem output: rendered name -> built partition item
 * @param nameToLastModifiedMillis output: rendered name -> last-modified epoch millis
 */
private void listLatestPartitions(ConnectorMetadata metadata, ConnectorSession session,
        ConnectorTableHandle handle, Map<String, PartitionItem> nameToPartitionItem,
        Map<String, Long> nameToLastModifiedMillis) {
    final List<Column> partitionCols = getPartitionColumns();
    final List<Type> columnTypes = Lists.newArrayListWithCapacity(partitionCols.size());
    for (Column col : partitionCols) {
        columnTypes.add(col.getType());
    }

    final List<ConnectorPartitionInfo> listed = metadata.listPartitions(session, handle, Optional.empty());
    for (ConnectorPartitionInfo info : listed) {
        final String renderedName = info.getPartitionName();
        // Recorded unconditionally, even if building the item below fails.
        nameToLastModifiedMillis.put(renderedName, info.getLastModifiedMillis());
        try {
            // The connector has already rendered the values (dates included) into the name, so
            // parsing the rendered name is byte-parity with legacy. Values may be malformed;
            // swallow the failure so one bad partition does not break the query.
            final ListPartitionItem item = toListPartitionItem(renderedName, columnTypes);
            nameToPartitionItem.put(renderedName, item);
        } catch (Exception e) {
            LOG.warn("toListPartitionItem failed, partitionColumns: {}, partitionName: {}",
                    partitionCols, renderedName, e);
        }
    }
}
/**
* Builds the placeholder connector snapshot used when there is no real snapshot to pin: a
* {@link ConnectorMvccSnapshot} whose snapshot id is {@code -1} and with no other builder
* property set here.
*/
private ConnectorMvccSnapshot emptySnapshot() {
return ConnectorMvccSnapshot.builder().snapshotId(-1L).build();
}
/**
 * Builds a {@link ListPartitionItem} from a RENDERED partition name (e.g. {@code "dt=2024-01-01"}).
 * Derived from legacy {@code PaimonUtil.toListPartitionItem}; it is source-agnostic.
 *
 * @param partitionName rendered name in the form {@code nation=cn/city=beijing}
 * @param types partition column types, one per value in {@code partitionName}, in order
 * @return a list partition item holding a single key built from the parsed values
 * @throws AnalysisException declared for the partition-key construction call
 * @throws IllegalStateException if the number of parsed values differs from {@code types.size()}
 */
private static ListPartitionItem toListPartitionItem(String partitionName, List<Type> types)
        throws AnalysisException {
    // "nation=cn/city=beijing" -> ["cn", "beijing"]
    List<String> partitionValues = HiveUtil.toPartitionValues(partitionName);
    // Template overload: the message (including types.toString()) is rendered only when the check
    // fails. This method runs once per listed partition, so eager concatenation was wasted work
    // on every successful call. The resulting message text is unchanged.
    Preconditions.checkState(partitionValues.size() == types.size(), "%s vs. %s", partitionName, types);
    List<PartitionValue> values = Lists.newArrayListWithExpectedSize(types.size());
    for (String partitionValue : partitionValues) {
        values.add(new PartitionValue(partitionValue, false));
    }
    PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types, true);
    return new ListPartitionItem(Lists.newArrayList(key));
}
// ──────────────────── MvccTable ────────────────────
/**
 * Produces the pin a query reads from.
 *
 * <ul>
 *   <li>Neither argument present: the implicit query-begin (LATEST) pin.</li>
 *   <li>Both present: rejected.</li>
 *   <li>{@code @incr}: LATEST partitions + LATEST schema, with the connector-resolved snapshot
 *       carried on the pin.</li>
 *   <li>Any other time travel: the resolved snapshot, EMPTY partition maps and the schema
 *       resolved at that snapshot.</li>
 * </ul>
 *
 * @param tableSnapshot the {@code FOR VERSION/TIME AS OF} clause, if any
 * @param scanParams the {@code @tag/@branch/@incr} scan params, if any
 * @return the pin as a {@link PluginDrivenMvccSnapshot}
 * @throws RuntimeException if both arguments are present, if the connector table handle cannot
 *         be resolved, or if the connector reports the time-travel target as not found
 */
@Override
public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot, Optional<TableScanParams> scanParams) {
    final boolean hasTableSnapshot = tableSnapshot.isPresent();
    final boolean hasScanParams = scanParams.isPresent();
    // Mutual exclusion, parity with legacy PaimonScanNode.getProcessedTable:891-892.
    if (hasTableSnapshot && hasScanParams) {
        throw new RuntimeException("Can not specify scan params and table snapshot at same time.");
    }
    if (!hasTableSnapshot && !hasScanParams) {
        // B5a implicit query-begin (latest) pin.
        return materializeLatest();
    }

    final ConnectorTimeTravelSpec travelSpec = toTimeTravelSpec(tableSnapshot, scanParams);
    makeSureInitialized();
    final PluginDrivenExternalCatalog owningCatalog = (PluginDrivenExternalCatalog) catalog;
    final Connector conn = owningCatalog.getConnector();
    final ConnectorSession connSession = owningCatalog.buildConnectorSession();
    final ConnectorMetadata connMetadata = conn.getMetadata(connSession);
    final String remoteDb = (db == null) ? "" : db.getRemoteName();
    final String remoteTable = getRemoteName();
    final ConnectorTableHandle baseHandle = resolveConnectorTableHandle(connSession, connMetadata)
            .orElseThrow(() -> new RuntimeException(
                    "can not find table for time travel: " + remoteDb + "." + remoteTable));

    // All provider-specific resolution (snapshot-id lookup, datetime parsing, tag/branch
    // resolution, incremental-window validation) belongs to the connector. Empty means "target
    // not found"; a DorisConnectorException (TZ-alias / incremental-validation) is not caught
    // here and propagates as-is. Fail loud: a degraded result would read the wrong rows.
    final ConnectorMvccSnapshot resolvedSnapshot =
            connMetadata.resolveTimeTravel(connSession, baseHandle, travelSpec)
                    .orElseThrow(() -> new RuntimeException(notFoundMessage(travelSpec)));

    if (travelSpec.getKind() != ConnectorTimeTravelSpec.Kind.INCREMENTAL) {
        // Schema-at-snapshot. The pin is threaded onto the handle FIRST (applySnapshot), so that a
        // BRANCH pin yields a branch-aware handle whose schemaManager resolves the branch schema;
        // only then is the schema resolved AT the pinned schemaId. For non-branch specs
        // applySnapshot merely adds scan options that getTableSchema ignores, so the pinned handle
        // is harmless there. Resolving against the base handle would look up the branch schemaId
        // in the base table's schemaManager and return the wrong schema.
        final ConnectorTableHandle snapshotHandle =
                connMetadata.applySnapshot(connSession, baseHandle, resolvedSnapshot);
        final ConnectorTableSchema schemaAtSnapshot =
                connMetadata.getTableSchema(connSession, snapshotHandle, resolvedSnapshot);
        final PluginDrivenSchemaCacheValue schemaPin =
                toSchemaCacheValue(connMetadata, connSession, remoteDb, remoteTable, schemaAtSnapshot);
        // Explicit point-in-time travel (snapshot id / tag / timestamp / branch) lists NO
        // partitions (EMPTY maps), parity with legacy PaimonPartitionInfo.EMPTY. With empty maps
        // isPartitionInvalid() == (0!=0) == false, so getPartitionColumns(snapshot) flows through
        // super -> the schema-aware getSchemaCacheValue() -> the pinned schema's partition columns.
        // Pruning is left to the connector's predicate pushdown (the generic scan node's
        // resolveRequiredPartitions treats this empty-universe pin as scan-all).
        return new PluginDrivenMvccSnapshot(resolvedSnapshot,
                Collections.emptyMap(), Collections.emptyMap(), schemaPin);
    }

    // @incr is NOT a point-in-time pin. Legacy PaimonExternalTable.getPaimonSnapshotCacheValue falls
    // through (neither tag/branch nor FOR VERSION/TIME AS OF) to getLatestSnapshotCacheValue, i.e.
    // the LATEST partition view + LATEST schema, and applies the incremental-between window at SCAN
    // time. Same here: LATEST partitions, LATEST schema (pinnedSchema == null makes
    // getSchemaCacheValue() fall back to latest), and the incremental scan options travel on the pin
    // (resolvedSnapshot.getProperties()) for the scan node's applySnapshot to thread onto the handle.
    // Partitions are listed on the BASE handle at latest (the same full set as the normal-read
    // materializeLatest path), NOT on a snapshot-pinned handle.
    final Map<String, PartitionItem> itemsByName = new HashMap<>();
    final Map<String, Long> lastModifiedByName = new HashMap<>();
    listLatestPartitions(connMetadata, connSession, baseHandle, itemsByName, lastModifiedByName);
    return new PluginDrivenMvccSnapshot(resolvedSnapshot, itemsByName, lastModifiedByName, null);
}
/**
 * Source-agnostic translation of the analyzer's time-travel request into a
 * {@link ConnectorTimeTravelSpec}. Mirrors the legacy
 * {@code PaimonExternalTable.getPaimonSnapshotCacheValue} + {@code PaimonScanNode} dispatch.
 *
 * <p>{@code ts} wins when present: {@code FOR TIME AS OF} becomes a timestamp spec, a digital
 * {@code FOR VERSION AS OF} a snapshot id, a non-digital one a tag name. Otherwise {@code sp}
 * is read and mapped to tag / branch / incremental.
 *
 * @param ts the {@code FOR VERSION/TIME AS OF} clause, if any
 * @param sp the {@code @tag/@branch/@incr} scan params; must be present when {@code ts} is empty
 * @return the matching spec
 * @throws RuntimeException if the scan params are none of tag, branch or incremental
 */
private ConnectorTimeTravelSpec toTimeTravelSpec(Optional<TableSnapshot> ts, Optional<TableScanParams> sp) {
    if (!ts.isPresent()) {
        final TableScanParams scan = sp.get();
        if (scan.isTag()) {
            return ConnectorTimeTravelSpec.tag(extractBranchOrTagName(scan));
        }
        if (scan.isBranch()) {
            return ConnectorTimeTravelSpec.branch(extractBranchOrTagName(scan));
        }
        if (scan.incrementalRead()) {
            return ConnectorTimeTravelSpec.incremental(scan.getMapParams());
        }
        throw new RuntimeException("unsupported scan params: " + scan.getParamType());
    }

    final TableSnapshot asOf = ts.get();
    final String literal = asOf.getValue();
    final boolean digital = isDigital(literal);
    if (asOf.getType() == TableSnapshot.VersionType.TIME) {
        // FOR TIME AS OF
        return ConnectorTimeTravelSpec.timestamp(literal, digital);
    }
    // FOR VERSION AS OF: digital -> snapshot id, non-digital -> tag name.
    if (digital) {
        return ConnectorTimeTravelSpec.snapshotId(literal);
    }
    return ConnectorTimeTravelSpec.tag(literal);
}
/**
 * Tells whether {@code value} consists solely of one or more digits. Parity:
 * {@code PaimonUtil.isDigitalString}.
 *
 * @param value the string to test; may be {@code null}
 * @return {@code true} only for a non-null string that fully matches {@code DIGITAL_REGEX}
 */
private static boolean isDigital(String value) {
    if (value == null) {
        return false;
    }
    return DIGITAL_REGEX.matcher(value).matches();
}
/**
 * Pulls the branch/tag name out of the scan params. Parity:
 * {@code PaimonUtil.extractBranchOrTagName} (uses {@code TableScanParams.PARAMS_NAME == "name"}).
 *
 * <p>Non-empty map params take precedence and must contain the name key; otherwise the first
 * list param is the name.
 *
 * @param params the tag/branch scan params
 * @return the value stored under the name key, or the first list param
 * @throws IllegalArgumentException if the map params lack the name key, or if the list params
 *         are empty or start with {@code null}
 */
private static String extractBranchOrTagName(TableScanParams params) {
    if (params.getMapParams().isEmpty()) {
        // Positional form: the name is the first list entry.
        if (params.getListParams().isEmpty() || params.getListParams().get(0) == null) {
            throw new IllegalArgumentException("must contain a branch/tag name in params");
        }
        return params.getListParams().get(0);
    }
    // Keyed form: the name must be stored under PARAMS_NAME.
    if (params.getMapParams().containsKey(TableScanParams.PARAMS_NAME)) {
        return params.getMapParams().get(TableScanParams.PARAMS_NAME);
    }
    throw new IllegalArgumentException("must contain key 'name' in params");
}
/**
 * Turns an empty {@code resolveTimeTravel} result into a user-facing error message that depends
 * on the kind of time travel requested.
 *
 * @param spec the spec the connector could not resolve
 * @return the kind-specific prefix followed by the spec's string value, or a generic message
 *         containing the whole spec for any other kind
 */
private static String notFoundMessage(ConnectorTimeTravelSpec spec) {
    final String prefix;
    switch (spec.getKind()) {
        case SNAPSHOT_ID:
            prefix = "can't find snapshot by id: "; // parity PaimonUtil:687
            break;
        case TAG:
            prefix = "can't find snapshot by tag: "; // parity PaimonUtil:694
            break;
        case BRANCH:
            prefix = "can't find branch: "; // parity PaimonUtil:707
            break;
        case TIMESTAMP:
            // Best-effort: the connector returned empty and owns the parsed millis + earliest
            // snapshot (not visible to fe-core), so only the TEXT differs from legacy's detailed
            // "...the earliest snapshot's timestamp is [...]" message; the error condition is the
            // same. Documented divergence.
            prefix = "can't find snapshot earlier than or equal to time: ";
            break;
        default:
            return "can't resolve time travel: " + spec;
    }
    return prefix + spec.getStringValue();
}
// ──────────────────── schema (snapshot-aware) ────────────────────
/**
 * Snapshot-aware schema lookup. If the context holds a {@link PluginDrivenMvccSnapshot} that
 * carries a pinned schema (explicit time travel, schema-at-snapshot under schema evolution), that
 * schema is returned; in every other case the latest schema is. Parity with legacy
 * {@code PaimonExternalTable.getSchemaCacheValue}, which returns the schema of the context-pinned
 * snapshot.
 *
 * @return the pinned schema when available, else the result of {@link #getLatestSchemaCacheValue()}
 */
@Override
public Optional<SchemaCacheValue> getSchemaCacheValue() {
    final Optional<MvccSnapshot> contextPin = MvccUtil.getSnapshotFromContext(this);
    if (!contextPin.isPresent() || !(contextPin.get() instanceof PluginDrivenMvccSnapshot)) {
        // No pin in context, or one of a foreign type: latest.
        return getLatestSchemaCacheValue();
    }
    final SchemaCacheValue schemaAtPin = ((PluginDrivenMvccSnapshot) contextPin.get()).getPinnedSchema();
    if (schemaAtPin == null) {
        // The B5a (latest) pin carries no pinned schema: latest.
        return getLatestSchemaCacheValue();
    }
    // Time travel: schema AS OF the pinned snapshot.
    return Optional.of(schemaAtPin);
}
/**
* Seam for the LATEST (non-pinned) schema. The default implementation delegates to the
* superclass {@code getSchemaCacheValue()}, bypassing the snapshot-aware override in this class.
* Overridable in tests.
*
* @return whatever the superclass schema lookup returns
*/
protected Optional<SchemaCacheValue> getLatestSchemaCacheValue() {
return super.getSchemaCacheValue();
}
// ──────────────────── partition view (snapshot-aware) ────────────────────
/**
 * Returns the partition items of the given pin, keyed by rendered partition name. An empty
 * {@code snapshot} makes this materialize LATEST from the connector.
 *
 * @param snapshot the caller-supplied pin, or empty to read LATEST
 * @return the pin's own name -> partition item map (not a copy)
 */
@Override
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
    final PluginDrivenMvccSnapshot pin = getOrMaterialize(snapshot);
    return pin.getNameToPartitionItem();
}
/**
 * Same content as {@link #getNameToPartitionItems}, but returned in a fresh {@link HashMap} so
 * the caller may modify the result without touching the pin's map.
 *
 * @param snapshot the caller-supplied pin, or empty to read LATEST
 * @return a new mutable map holding the pin's name -> partition item entries
 */
@Override
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
    final Map<String, PartitionItem> pinned = getNameToPartitionItems(snapshot);
    return Maps.newHashMap(pinned);
}
/**
 * Reports how the table is partitioned as seen through the given pin.
 *
 * <p>The pin is resolved exactly once and then passed on to
 * {@link #getPartitionColumns(Optional)}. Forwarding the caller's (possibly empty)
 * {@code snapshot} instead would make an unpinned call materialize LATEST twice: two connector
 * listings, with the validity check and the column lookup potentially observing different
 * partition sets.
 *
 * @param snapshot the caller-supplied pin, or empty to read LATEST
 * @return {@link PartitionType#UNPARTITIONED} when the pin's partition set is invalid or the
 *         table has no partition columns, otherwise {@link PartitionType#LIST}
 */
@Override
public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
    PluginDrivenMvccSnapshot pin = getOrMaterialize(snapshot);
    if (pin.isPartitionInvalid()) {
        return PartitionType.UNPARTITIONED;
    }
    // Reuse the pin resolved above so the column lookup does not re-query the connector.
    Optional<MvccSnapshot> pinned = Optional.of(pin);
    return getPartitionColumns(pinned).isEmpty() ? PartitionType.UNPARTITIONED : PartitionType.LIST;
}
/**
 * Returns the partition columns as seen through the given pin.
 *
 * @param snapshot the caller-supplied pin, or empty to read LATEST
 * @return an empty list when the pin's partition set is invalid, otherwise the superclass
 *         partition columns
 */
@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
    if (getOrMaterialize(snapshot).isPartitionInvalid()) {
        // Legacy drops the partition columns for an invalid partition set, so everything
        // downstream sees the table as UNPARTITIONED.
        return Collections.emptyList();
    }
    return super.getPartitionColumns();
}
/**
 * Returns the lower-cased names of the partition columns visible through the given pin.
 *
 * @param snapshot the caller-supplied pin, or empty to read LATEST
 * @return the set of lower-cased partition column names; empty when there are no partition columns
 */
@Override
public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {
    final List<Column> partitionCols = getPartitionColumns(snapshot);
    return partitionCols.stream()
            .map(Column::getName)
            .map(String::toLowerCase)
            .collect(Collectors.toSet());
}
// ──────────────────── MTMV snapshots ────────────────────
/**
 * Returns the MTMV snapshot of one partition: a timestamp snapshot built from the last-modified
 * millis the pin recorded for that partition name.
 *
 * @param partitionName rendered partition name to look up
 * @param context MTMV refresh context; not read by this implementation
 * @param snapshot the caller-supplied pin, or empty to read LATEST
 * @return an {@link MTMVTimestampSnapshot} wrapping the recorded last-modified value
 * @throws AnalysisException if the pin has no entry for {@code partitionName}
 */
@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
        Optional<MvccSnapshot> snapshot) throws AnalysisException {
    final PluginDrivenMvccSnapshot pin = getOrMaterialize(snapshot);
    final Long lastModified = pin.getNameToLastModifiedMillis().get(partitionName);
    if (lastModified != null) {
        return new MTMVTimestampSnapshot(lastModified);
    }
    throw new AnalysisException("can not find partition: " + partitionName);
}
/**
* Context-taking variant of the table snapshot lookup. The {@code context} argument is not
* read; the call is forwarded to {@link #getTableSnapshot(Optional)} with the same
* {@code snapshot}.
*
* @param context MTMV refresh context; ignored by this implementation
* @param snapshot the caller-supplied pin, or empty to read LATEST
* @return the result of {@link #getTableSnapshot(Optional)}
* @throws AnalysisException if the delegate throws it
*/
@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
throws AnalysisException {
return getTableSnapshot(snapshot);
}
/**
 * Returns the table-level MTMV snapshot: the snapshot id of the connector snapshot held by the
 * given pin, wrapped in an {@link MTMVSnapshotIdSnapshot}.
 *
 * @param snapshot the caller-supplied pin, or empty to read LATEST
 * @return a snapshot-id based MTMV snapshot
 * @throws AnalysisException declared by the interface; not thrown directly by this body
 */
@Override
public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) throws AnalysisException {
    final PluginDrivenMvccSnapshot pin = getOrMaterialize(snapshot);
    return new MTMVSnapshotIdSnapshot(pin.getConnectorSnapshot().getSnapshotId());
}
/**
 * Returns the largest known per-partition last-modified time at LATEST.
 *
 * <p>Dictionary-update path: LATEST is always probed afresh (any context pin is bypassed),
 * mirroring legacy, which passes empty/empty to force a new listing.
 *
 * @return the maximum non-negative last-modified millis over all listed partitions, or
 *         {@code 0} when there is none
 */
@Override
public long getNewestUpdateVersionOrTime() {
    long newest = 0L;
    for (Long recorded : materializeLatest().getNameToLastModifiedMillis().values()) {
        final long millis = recorded.longValue();
        // Negative values are the UNKNOWN(-1) sentinel of a connector that collected no modified
        // time. Legacy relied on Paimon's lastFileCreationTime(), which has no such sentinel, so
        // the sentinel must never win: an all-unknown table yields the legacy 0, not -1.
        if (millis >= 0 && millis > newest) {
            newest = millis;
        }
    }
    return newest;
}
/**
* Always reports that partition columns may be null.
*
* @return {@code true}, unconditionally
*/
@Override
public boolean isPartitionColumnAllowNull() {
// Returns true so MTMV creation over a snapshot connector is not blocked: a source may write a
// physical "null" partition that does not match Doris' empty-partition semantics (e.g. paimon
// writes both null and the literal 'null' to the 'null' partition). Returning false would
// reject the MV; the connector owns the null-partition semantics, so we allow it. Parity with
// legacy PaimonExternalTable.isPartitionColumnAllowNull.
return true;
}
// ──────────────────── MTMVBaseTableIf ────────────────────
/**
* Hook invoked before an MTMV refresh; intentionally does nothing here.
*
* @param mtmv the materialized view about to be refreshed; not read by this implementation
*/
@Override
public void beforeMTMVRefresh(MTMV mtmv) {
// No-op: parity with legacy PaimonExternalTable.beforeMTMVRefresh.
}
}