PluginMgr.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.plugin;
import org.apache.doris.analysis.InstallPluginStmt;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.nereids.parser.Dialect;
import org.apache.doris.plugin.PluginInfo.PluginType;
import org.apache.doris.plugin.PluginLoader.PluginStatus;
import org.apache.doris.plugin.audit.AuditLoader;
import org.apache.doris.plugin.audit.AuditLogBuilder;
import org.apache.doris.plugin.dialect.HttpDialectConverterPlugin;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
public class PluginMgr implements Writable {
private static final Logger LOG = LogManager.getLogger(PluginMgr.class);
public static final String BUILTIN_PLUGIN_PREFIX = "__builtin_";
private final Map<String, PluginLoader>[] plugins;
// use this for
private final Map<String, DialectConverterPlugin>[] dialectPlugins;
// all dynamic plugins should have unique names,
private final Set<String> dynamicPluginNames;
// Save this handler for external call
private AuditLoader auditLoader = null;
public PluginMgr() {
plugins = new Map[PluginType.MAX_PLUGIN_TYPE_SIZE];
for (int i = 0; i < PluginType.MAX_PLUGIN_TYPE_SIZE; i++) {
// use synchronized wrapper for thread-safe
plugins[i] = Collections.synchronizedSortedMap(Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER));
}
// use synchronized wrapper for thread-safe
dynamicPluginNames = Collections.synchronizedSortedSet(Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER));
dialectPlugins = new Map[Dialect.MAX_DIALECT_SIZE];
for (int i = 0; i < Dialect.MAX_DIALECT_SIZE; i++) {
// use synchronized wrapper for thread-safe
dialectPlugins[i] = Collections.synchronizedSortedMap(Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER));
}
}
// create the plugin dir if missing
public void init() throws PluginException {
File file = new File(Config.plugin_dir);
if (file.exists() && !file.isDirectory()) {
throw new PluginException("FE plugin dir " + Config.plugin_dir + " is not a directory");
}
if (!file.exists()) {
if (!file.mkdir()) {
throw new PluginException("failed to create FE plugin dir " + Config.plugin_dir);
}
}
initBuiltinPlugins();
}
private boolean checkDynamicPluginNameExist(String name) {
return dynamicPluginNames.contains(name);
}
private boolean addDynamicPluginNameIfAbsent(String name) {
return dynamicPluginNames.add(name);
}
private boolean removeDynamicPluginName(String name) {
return dynamicPluginNames.remove(name);
}
private void initBuiltinPlugins() {
// AuditLog: log audit log to file
AuditLogBuilder auditLogBuilder = new AuditLogBuilder();
if (!registerBuiltinPlugin(auditLogBuilder.getPluginInfo(), auditLogBuilder)) {
LOG.warn("failed to register audit log builder");
}
// AuditLoader: log audit log to internal table
this.auditLoader = new AuditLoader();
if (!registerBuiltinPlugin(auditLoader.getPluginInfo(), auditLoader)) {
LOG.warn("failed to register audit log builder");
}
// sql dialect converter
HttpDialectConverterPlugin httpDialectConverterPlugin = new HttpDialectConverterPlugin();
if (!registerBuiltinPlugin(httpDialectConverterPlugin.getPluginInfo(), httpDialectConverterPlugin)) {
LOG.warn("failed to register http dialect converter plugin");
}
// other builtin plugins
}
// install a plugin from user's command.
// install should be successfully, or nothing should be left if failed to install.
public PluginInfo installPlugin(String pluginPath, Map<String, String> properties, String md5Sum)
throws IOException, UserException {
PluginLoader pluginLoader = new DynamicPluginLoader(Config.plugin_dir, pluginPath, md5Sum);
pluginLoader.setStatus(PluginStatus.INSTALLING);
try {
PluginInfo info = pluginLoader.getPluginInfo();
if (properties != null) {
info.setProperties(properties);
}
if (checkDynamicPluginNameExist(info.getName())) {
throw new UserException("plugin " + info.getName() + " has already been installed.");
}
// install plugin
pluginLoader.install();
pluginLoader.setStatus(PluginStatus.INSTALLED);
if (!addDynamicPluginNameIfAbsent(info.getName())) {
throw new UserException("plugin " + info.getName() + " has already been installed.");
}
plugins[info.getTypeId()].put(info.getName(), pluginLoader);
// add dialect plugin
Plugin plugin = pluginLoader.getPlugin();
if (plugin instanceof DialectConverterPlugin) {
addDialectPlugin((DialectConverterPlugin) plugin, info);
}
Env.getCurrentEnv().getEditLog().logInstallPlugin(info);
LOG.info("install plugin {}", info.getName());
return info;
} catch (IOException | UserException e) {
pluginLoader.uninstall();
throw e;
}
}
// install a plugin from user's command.
// install should be successfully, or nothing should be left if failed to install.
public PluginInfo installPlugin(InstallPluginStmt stmt) throws IOException, UserException {
return installPlugin(stmt.getPluginPath(), stmt.getProperties(), stmt.getMd5sum());
}
private void addDialectPlugin(DialectConverterPlugin plugin, PluginInfo info) {
for (Dialect dialect : plugin.acceptDialects()) {
dialectPlugins[dialect.ordinal()].put(info.getName(), plugin);
}
}
private void removeDialectPlugin(String name) {
for (int i = 0; i < Dialect.MAX_DIALECT_SIZE; i++) {
dialectPlugins[i].remove(name);
}
}
/**
* Dynamic uninstall plugin.
* If uninstall failed, the plugin should NOT be removed from plugin manager.
*/
public PluginInfo uninstallPlugin(String name) throws IOException, UserException {
if (!checkDynamicPluginNameExist(name)) {
throw new DdlException("Plugin " + name + " does not exist");
}
for (int i = 0; i < PluginType.MAX_PLUGIN_TYPE_SIZE; i++) {
if (plugins[i].containsKey(name)) {
PluginLoader loader = plugins[i].get(name);
if (loader == null) {
// this is not an atomic operation, so even if containsKey() is true,
// we may still get null object by get() method
continue;
}
if (!loader.isDynamicPlugin()) {
throw new DdlException("Only support uninstall dynamic plugins");
}
loader.pluginUninstallValid();
loader.setStatus(PluginStatus.UNINSTALLING);
// uninstall plugin
loader.uninstall();
// uninstall succeed, remove the plugin
plugins[i].remove(name);
loader.setStatus(PluginStatus.UNINSTALLED);
removeDynamicPluginName(name);
removeDialectPlugin(name);
// do not get plugin info by calling loader.getPluginInfo(). That method will try to
// reload the plugin properties from source if this plugin is not installed successfully.
// Here we only need the plugin's name for persisting.
// TODO(cmy): This is a bad design to couple the persist info with PluginInfo, but for
// the compatibility, I till use this method.
return new PluginInfo(name);
}
}
throw new DdlException("Plugin " + name + " does not exist");
}
/**
* For built-in Plugin register
*/
public boolean registerBuiltinPlugin(PluginInfo pluginInfo, Plugin plugin) {
if (Objects.isNull(pluginInfo) || Objects.isNull(plugin) || Objects.isNull(pluginInfo.getType())
|| Strings.isNullOrEmpty(pluginInfo.getName())) {
return false;
}
PluginLoader loader = new BuiltinPluginLoader(Config.plugin_dir, pluginInfo, plugin);
try {
loader.install();
} catch (Exception e) {
LOG.warn("failed to register builtin plugin {}", pluginInfo.getName(), e);
return false;
}
// add dialect plugin
if (plugin instanceof DialectConverterPlugin) {
addDialectPlugin((DialectConverterPlugin) plugin, pluginInfo);
}
PluginLoader checkLoader = plugins[pluginInfo.getTypeId()].putIfAbsent(pluginInfo.getName(), loader);
return checkLoader == null;
}
/*
* replay load plugin.
* It must add the plugin to the "plugins" and "dynamicPluginNames", even if the plugin
* is not loaded successfully.
*/
public void replayLoadDynamicPlugin(PluginInfo info) throws IOException, UserException {
DynamicPluginLoader pluginLoader = new DynamicPluginLoader(Config.plugin_dir, info);
try {
// should add to "plugins" first before loading.
PluginLoader checkLoader = plugins[info.getTypeId()].putIfAbsent(info.getName(), pluginLoader);
if (checkLoader != null) {
throw new UserException("plugin " + info.getName() + " has already been installed.");
}
pluginLoader.setStatus(PluginStatus.INSTALLING);
// install plugin
pluginLoader.reload();
pluginLoader.setStatus(PluginStatus.INSTALLED);
// add dialect plugin
Plugin plugin = pluginLoader.getPlugin();
if (plugin instanceof DialectConverterPlugin) {
addDialectPlugin((DialectConverterPlugin) plugin, info);
}
} catch (IOException | UserException e) {
pluginLoader.setStatus(PluginStatus.ERROR, e.getMessage());
throw e;
} finally {
// this is a replay process, so whether it is successful or not, add it's name.
addDynamicPluginNameIfAbsent(info.getName());
}
}
public final Plugin getActivePlugin(String name, PluginType type) {
PluginLoader loader = plugins[type.ordinal()].get(name);
if (null != loader && loader.getStatus() == PluginStatus.INSTALLED) {
return loader.getPlugin();
}
return null;
}
public final List<Plugin> getActivePluginList(PluginType type) {
Map<String, PluginLoader> m = plugins[type.ordinal()];
List<Plugin> l = Lists.newArrayListWithCapacity(m.size());
m.values().forEach(d -> {
if (d.getStatus() == PluginStatus.INSTALLED) {
if (d.getPlugin() == null) {
LOG.warn("PluginLoader({}) status is INSTALLED, but plugin is null", d);
return;
}
l.add(d.getPlugin());
}
});
return ImmutableList.copyOf(l);
}
public final List<DialectConverterPlugin> getActiveDialectPluginList(Dialect dialect) {
Map<String, DialectConverterPlugin> m = dialectPlugins[dialect.ordinal()];
return ImmutableList.copyOf(m.values());
}
public final List<PluginInfo> getAllDynamicPluginInfo() {
List<PluginInfo> list = Lists.newArrayList();
for (Map<String, PluginLoader> map : plugins) {
map.values().forEach(loader -> {
try {
if (loader.isDynamicPlugin()) {
list.add(loader.getPluginInfo());
}
} catch (Exception e) {
LOG.warn("load plugin from {} failed", loader.source, e);
}
});
}
return list;
}
public List<List<String>> getPluginShowInfos() {
List<List<String>> rows = Lists.newArrayList();
for (Map<String, PluginLoader> map : plugins) {
for (Map.Entry<String, PluginLoader> entry : map.entrySet()) {
List<String> r = Lists.newArrayList();
PluginLoader loader = entry.getValue();
PluginInfo pi = null;
try {
pi = loader.getPluginInfo();
} catch (Exception e) {
// plugin may not be loaded successfully
LOG.warn("failed to get plugin info for plugin: {}", entry.getKey(), e);
}
r.add(entry.getKey());
r.add(pi != null ? pi.getType().name() : "UNKNOWN");
r.add(pi != null ? pi.getDescription() : "UNKNOWN");
r.add(pi != null ? pi.getVersion().toString() : "UNKNOWN");
r.add(pi != null ? pi.getJavaVersion().toString() : "UNKNOWN");
r.add(pi != null ? pi.getClassName() : "UNKNOWN");
r.add(pi != null ? pi.getSoName() : "UNKNOWN");
if (Strings.isNullOrEmpty(loader.source)) {
r.add("Builtin");
} else {
r.add(loader.source);
}
r.add(loader.getStatus().toString());
r.add(pi != null ? "{" + new PrintableMap<>(pi.getProperties(),
"=", true, false, true) + "}" : "UNKNOWN");
rows.add(r);
}
}
return rows;
}
public void flushAuditLog() {
if (auditLoader != null) {
auditLoader.loadIfNecessary(true);
}
}
public void readFields(DataInputStream dis) throws IOException {
int size = dis.readInt();
for (int i = 0; i < size; i++) {
try {
PluginInfo pluginInfo = PluginInfo.read(dis);
replayLoadDynamicPlugin(pluginInfo);
} catch (Exception e) {
LOG.warn("load plugin failed.", e);
}
}
}
@Override
public void write(DataOutput out) throws IOException {
// only need to persist dynamic plugins
List<PluginInfo> list = getAllDynamicPluginInfo();
int size = list.size();
out.writeInt(size);
for (PluginInfo pc : list) {
pc.write(out);
}
}
}