LineageEventProcessor.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.lineage;
import org.apache.doris.common.Config;
import org.apache.doris.extension.loader.ClassLoadingPolicy;
import org.apache.doris.extension.loader.DirectoryPluginRuntimeManager;
import org.apache.doris.extension.loader.LoadFailure;
import org.apache.doris.extension.loader.LoadReport;
import org.apache.doris.extension.loader.PluginHandle;
import org.apache.doris.extension.spi.PluginContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* Processor that queues lineage events and dispatches them to lineage plugins.
* <p>
* Plugins are discovered via two mechanisms (aligned with
* {@code AuthenticationPluginManager} pattern):
* <ol>
* <li>Built-in: {@link ServiceLoader} on classpath</li>
* <li>External: {@link DirectoryPluginRuntimeManager} from
* {@code $plugin_dir/lineage/} directory</li>
* </ol>
* </p>
*/
public class LineageEventProcessor {
private static final Logger LOG = LogManager.getLogger(LineageEventProcessor.class);
private static final long EVENT_POLL_TIMEOUT_SECONDS = 5L;
/** Parent-first prefixes for child-first classloading isolation. */
private static final List<String> LINEAGE_PARENT_FIRST_PREFIXES =
Collections.singletonList("org.apache.doris.nereids.lineage.");
private final AtomicReference<List<LineagePlugin>> lineagePlugins = new AtomicReference<>(Collections.emptyList());
private final BlockingQueue<LineageInfo> eventQueue =
new LinkedBlockingDeque<>(Config.lineage_event_queue_size);
private final AtomicBoolean isInit = new AtomicBoolean(false);
/** Factory registry by plugin name (like AuthenticationPluginManager.factories). */
private final Map<String, LineagePluginFactory> factories = new ConcurrentHashMap<>();
private final DirectoryPluginRuntimeManager<LineagePluginFactory> runtimeManager =
new DirectoryPluginRuntimeManager<>();
private Thread workerThread;
/**
* Create a lineage event processor.
*/
public LineageEventProcessor() {
}
/**
* Start the background worker thread.
*/
public void start() {
if (!isInit.compareAndSet(false, true)) {
return;
}
discoverPlugins();
workerThread = new Thread(new Worker(), "LineageEventProcessor");
workerThread.setDaemon(true);
workerThread.start();
}
/**
* Discover lineage plugins via dual mechanism:
* 1. ServiceLoader for built-in (classpath) factories
* 2. DirectoryPluginRuntimeManager for external (directory) plugins
*
* <p>Aligned with {@code AuthenticationPluginManager} pattern.
*/
private void discoverPlugins() {
// 1. Built-in discovery (classpath ServiceLoader)
try {
ServiceLoader<LineagePluginFactory> serviceLoader = ServiceLoader.load(LineagePluginFactory.class);
Iterator<LineagePluginFactory> iterator = serviceLoader.iterator();
while (true) {
LineagePluginFactory factory;
try {
if (!iterator.hasNext()) {
break;
}
factory = iterator.next();
} catch (ServiceConfigurationError e) {
LOG.warn("Failed to load built-in lineage plugin factory from ServiceLoader, skip provider", e);
continue;
}
String pluginName = safeFactoryName(factory);
if (pluginName.isEmpty()) {
LOG.warn("Skip built-in lineage plugin factory with empty name: {}",
factory == null ? "null" : factory.getClass().getName());
continue;
}
LineagePluginFactory existing = factories.putIfAbsent(pluginName, factory);
if (existing != null) {
LOG.warn("Skip duplicated built-in lineage plugin name: {}", pluginName);
}
}
} catch (Exception e) {
LOG.warn("Failed to discover built-in lineage plugin factories via ServiceLoader", e);
}
// 2. External discovery (plugin_dir/lineage/ directory)
try {
List<Path> pluginRoots = Collections.singletonList(
Paths.get(Config.plugin_dir, "lineage"));
ClassLoadingPolicy policy = new ClassLoadingPolicy(LINEAGE_PARENT_FIRST_PREFIXES);
LoadReport<LineagePluginFactory> report = runtimeManager.loadAll(
pluginRoots, getClass().getClassLoader(),
LineagePluginFactory.class, policy);
for (LoadFailure failure : report.getFailures()) {
LOG.warn("Skip lineage plugin directory due to load failure: pluginDir={}, stage={}, message={}",
failure.getPluginDir(), failure.getStage(), failure.getMessage(), failure.getCause());
}
for (PluginHandle<LineagePluginFactory> handle : report.getSuccesses()) {
String pluginName = handle.getPluginName();
LineagePluginFactory existing = factories.putIfAbsent(pluginName, handle.getFactory());
if (existing != null) {
LOG.warn("Skip duplicated lineage plugin name: {} from directory {}", pluginName,
handle.getPluginDir());
} else {
LOG.info("Loaded external lineage plugin factory: name={}, pluginDir={}, jarCount={}",
pluginName, handle.getPluginDir(), handle.getResolvedJars().size());
}
}
} catch (Exception e) {
LOG.warn("Failed to discover external lineage plugins from plugin directory", e);
}
// 3. Create and initialize plugin instances from all discovered factories
List<LineagePlugin> plugins = new ArrayList<>();
for (Map.Entry<String, LineagePluginFactory> entry : factories.entrySet()) {
String pluginName = entry.getKey();
try {
Map<String, String> props = new HashMap<>();
props.put("plugin.path", resolvePluginPath(pluginName));
props.put("plugin.name", pluginName);
PluginContext context = new PluginContext(props);
LineagePlugin plugin = entry.getValue().create(context);
if (plugin != null) {
plugin.initialize(context);
plugins.add(plugin);
LOG.info("Loaded lineage plugin: {}, pluginPath={}", pluginName, props.get("plugin.path"));
}
} catch (Exception e) {
LOG.warn("Failed to create/initialize lineage plugin: {}", pluginName, e);
}
}
refreshPlugins(plugins);
}
private String safeFactoryName(LineagePluginFactory factory) {
if (factory == null) {
return "";
}
try {
String name = factory.name();
return name == null ? "" : name.trim();
} catch (Throwable t) {
LOG.warn("Failed to get lineage plugin factory name, skip factory class={}",
factory.getClass().getName(), t);
return "";
}
}
/**
* Resolve plugin path: prefer the directory from DirectoryPluginRuntimeManager,
* fallback to convention path.
*/
private String resolvePluginPath(String pluginName) {
return runtimeManager.get(pluginName)
.map(handle -> handle.getPluginDir().toString())
.orElse(Config.plugin_dir + "/lineage/" + pluginName);
}
/**
* Update the active lineage plugin list.
*/
public void refreshPlugins(List<LineagePlugin> plugins) {
List<LineagePlugin> safePlugins = plugins == null ? Collections.emptyList() : plugins;
lineagePlugins.set(safePlugins);
if (safePlugins.isEmpty()) {
clearPendingEvents();
}
}
/**
* Returns true when at least one loaded plugin is currently willing to receive lineage events.
*/
public boolean hasActivePlugins() {
List<LineagePlugin> plugins = lineagePlugins.get();
if (plugins.isEmpty()) {
return false;
}
for (LineagePlugin plugin : plugins) {
if (plugin == null) {
continue;
}
try {
if (plugin.eventFilter()) {
return true;
}
} catch (Throwable t) {
LOG.warn("Failed to evaluate lineage plugin event filter: {}", plugin.getClass().getName(), t);
}
}
return false;
}
private void clearPendingEvents() {
int dropped = 0;
while (eventQueue.poll() != null) {
dropped++;
}
if (dropped > 0) {
LOG.warn("Lineage event queue cleared because no active plugins. dropped={}", dropped);
}
}
/**
* Submit a lineage event to the processing queue.
*
* @param lineageInfo lineage info to submit
* @return true if accepted, false otherwise
*/
public boolean submitLineageEvent(LineageInfo lineageInfo) {
if (lineageInfo == null) {
return false;
}
try {
if (!eventQueue.offer(lineageInfo)) {
String queryId = getQueryId(lineageInfo);
LOG.warn("the lineage event queue is full with size {}, discard the lineage event: {}",
eventQueue.size(), queryId);
return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Lineage event enqueued: queryId={}, queueSize={}",
getQueryId(lineageInfo), eventQueue.size());
}
return true;
} catch (Exception e) {
String queryId = getQueryId(lineageInfo);
LOG.warn("encounter exception when handle lineage event {}, discard the event",
queryId, e);
return false;
}
}
/**
* Worker that polls events and invokes lineage plugins.
*/
public class Worker implements Runnable {
/**
* Run the lineage processing loop.
*/
@Override
public void run() {
LineageInfo lineageInfo;
while (true) {
List<LineagePlugin> currentPlugins = lineagePlugins.get();
try {
lineageInfo = eventQueue.poll(EVENT_POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (lineageInfo == null) {
continue;
}
} catch (InterruptedException e) {
LOG.warn("encounter exception when getting lineage event from queue, ignore", e);
continue;
}
for (LineagePlugin lineagePlugin : currentPlugins) {
try {
if (lineagePlugin == null) {
continue;
}
if (!lineagePlugin.eventFilter()) {
continue;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Lineage plugin start: plugin={}, queryId={}",
lineagePlugin.name(), getQueryId(lineageInfo));
}
lineagePlugin.exec(lineageInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("Lineage plugin end: plugin={}, queryId={}",
lineagePlugin.name(), getQueryId(lineageInfo));
}
} catch (Throwable e) {
LOG.warn("encounter exception when processing lineage event {}, ignore",
getQueryId(lineageInfo), e);
}
}
}
}
}
private static String getQueryId(LineageInfo lineageInfo) {
if (lineageInfo == null) {
return "";
}
LineageContext context = lineageInfo.getContext();
return context == null ? "" : context.getQueryId();
}
}