PlsqlMetaClient.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.plsql.metastore;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TAddPlsqlPackageRequest;
import org.apache.doris.thrift.TAddPlsqlStoredProcedureRequest;
import org.apache.doris.thrift.TDropPlsqlPackageRequest;
import org.apache.doris.thrift.TDropPlsqlStoredProcedureRequest;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPlsqlPackage;
import org.apache.doris.thrift.TPlsqlProcedureKey;
import org.apache.doris.thrift.TPlsqlStoredProcedure;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;

import org.apache.thrift.TException;

import java.util.Map;
import java.util.Objects;

public class PlsqlMetaClient {
    public PlsqlMetaClient() {
    }

    public void addPlsqlStoredProcedure(String name, long catalogId, long dbId, String packageName,
            String ownerName, String source, String createTime, String modifyTime,
            boolean isForce) {
        checkPriv();
        if (Env.getCurrentEnv().isMaster()) {
            Env.getCurrentEnv().getPlsqlManager()
                    .addPlsqlStoredProcedure(
                            new PlsqlStoredProcedure(name, catalogId, dbId, packageName, ownerName, source,
                            createTime, modifyTime),
                            isForce);
        } else {
            addPlsqlStoredProcedureThrift(name, catalogId, dbId, packageName, ownerName, source,
                                            createTime, modifyTime, isForce);
        }
    }

    public void dropPlsqlStoredProcedure(String name, long catalogId, long dbId) {
        checkPriv();
        if (Env.getCurrentEnv().isMaster()) {
            Env.getCurrentEnv().getPlsqlManager()
                    .dropPlsqlStoredProcedure(new PlsqlProcedureKey(name, catalogId, dbId));
        } else {
            dropStoredProcedureThrift(name, catalogId, dbId);
        }
    }

    public PlsqlStoredProcedure getPlsqlStoredProcedure(String name, long catalogId, long dbId) {
        return Env.getCurrentEnv().getPlsqlManager()
                .getPlsqlStoredProcedure(new PlsqlProcedureKey(name, catalogId, dbId));
    }

    public Map<PlsqlProcedureKey, PlsqlStoredProcedure> getAllPlsqlStoredProcedures() {
        return Env.getCurrentEnv().getPlsqlManager()
                .getAllPlsqlStoredProcedures();
    }

    public void addPlsqlPackage(String name, long catalogId, long dbId, String ownerName, String header,
            String body) {
        checkPriv();
        if (Env.getCurrentEnv().isMaster()) {
            Env.getCurrentEnv().getPlsqlManager()
                    .addPackage(new PlsqlPackage(name, catalogId, dbId, ownerName, header, body),
                            false);
        } else {
            addPlsqlPackageThrift(name, catalogId, dbId, ownerName, header, body);
        }
    }

    public void dropPlsqlPackage(String name, long catalogId, long dbId) {
        checkPriv();
        if (Env.getCurrentEnv().isMaster()) {
            Env.getCurrentEnv().getPlsqlManager().dropPackage(new PlsqlProcedureKey(name, catalogId, dbId));
        } else {
            dropPlsqlPackageThrift(name, catalogId, dbId);
        }
    }

    public PlsqlPackage getPlsqlPackage(String name, long catalogId, long dbId) {
        return Env.getCurrentEnv().getPlsqlManager().getPackage(new PlsqlProcedureKey(name, catalogId, dbId));
    }

    protected void addPlsqlStoredProcedureThrift(String name, long catalogId, long dbId, String packageName,
            String ownerName,
            String source, String createTime, String modifyTime, boolean isForce) {
        TPlsqlStoredProcedure tPlsqlStoredProcedure = new TPlsqlStoredProcedure().setName(name)
                .setCatalogId(catalogId).setDbId(dbId)
                .setPackageName(packageName).setOwnerName(ownerName).setSource(source)
                .setCreateTime(createTime).setModifyTime(modifyTime);

        TAddPlsqlStoredProcedureRequest tAddPlsqlStoredProcedureRequest = new TAddPlsqlStoredProcedureRequest()
                .setPlsqlStoredProcedure(tPlsqlStoredProcedure);
        tAddPlsqlStoredProcedureRequest.setIsForce(isForce);

        try {
            sendUpdateRequest(tAddPlsqlStoredProcedureRequest,
                    (request, client) -> client.addPlsqlStoredProcedure(request).getStatus());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    protected void dropStoredProcedureThrift(String name, long catalogId, long dbId) {
        TPlsqlProcedureKey tPlsqlProcedureKey = new TPlsqlProcedureKey().setName(name).setCatalogId(catalogId)
                .setDbId(dbId);
        TDropPlsqlStoredProcedureRequest tDropPlsqlStoredProcedureRequest
                = new TDropPlsqlStoredProcedureRequest().setPlsqlProcedureKey(
                tPlsqlProcedureKey);

        try {
            sendUpdateRequest(tDropPlsqlStoredProcedureRequest,
                    (request, client) -> client.dropPlsqlStoredProcedure(request).getStatus());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    protected void addPlsqlPackageThrift(String name, long catalogId, long dbId, String ownerName,
            String header, String body) {
        TPlsqlPackage tPlsqlPackage = new TPlsqlPackage().setName(name).setCatalogId(catalogId)
                .setDbId(dbId).setOwnerName(ownerName).setHeader(header).setBody(body);
        TAddPlsqlPackageRequest tAddPlsqlPackageRequest = new TAddPlsqlPackageRequest()
                .setPlsqlPackage(tPlsqlPackage);

        try {
            sendUpdateRequest(tAddPlsqlPackageRequest,
                    (request, client) -> client.addPlsqlPackage(request).getStatus());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    protected void dropPlsqlPackageThrift(String name, long catalogId, long dbId) {
        TPlsqlProcedureKey tPlsqlProcedureKey = new TPlsqlProcedureKey().setName(name).setCatalogId(catalogId)
                .setDbId(dbId);
        TDropPlsqlPackageRequest tDropPlsqlPackageRequest = new TDropPlsqlPackageRequest().setPlsqlProcedureKey(
                tPlsqlProcedureKey);

        try {
            sendUpdateRequest(tDropPlsqlPackageRequest,
                    (request, client) -> client.dropPlsqlPackage(request).getStatus());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void checkPriv() {
        if (!Env.getCurrentEnv().getAccessManager()
                .checkGlobalPriv(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN)) {
            throw new RuntimeException(
                    "Access denied; you need (at least one of) the ADMIN privilege(s) for this operation");
        }
    }

    private <Request> void sendUpdateRequest(Request request,
            BiFunction<Request, FrontendService.Client, TStatus> sendRequest) throws Exception {
        TNetworkAddress masterAddress = new TNetworkAddress(Env.getCurrentEnv().getMasterHost(),
                Env.getCurrentEnv().getMasterRpcPort());
        FrontendService.Client client = ClientPool.frontendPool.borrowObject(masterAddress);
        TStatus status;
        boolean isReturnToPool = true;
        try {
            status = sendRequest.apply(request, client);
            checkResult(status);
        } catch (Exception e) {
            if (!ClientPool.frontendPool.reopen(client)) {
                isReturnToPool = false;
                throw e;
            }

            status = sendRequest.apply(request, client); // retry once
            checkResult(status);
        } finally {
            if (isReturnToPool) {
                ClientPool.frontendPool.returnObject(masterAddress, client);
            } else {
                ClientPool.frontendPool.invalidateObject(masterAddress, client);
            }
        }
    }

    private void checkResult(TStatus status) throws Exception {
        if (Objects.isNull(status) || !status.isSetStatusCode()) {
            throw new TException("Access master error, no status set.");
        }
        if (status.getStatusCode().equals(TStatusCode.OK)) {
            return;
        }
        throw new Exception(
                "Access fe error, code:" + status.getStatusCode().name() + ", mgs:" + status.getErrorMsgs());
    }

    @FunctionalInterface
    public interface BiFunction<T, U, R> {
        R apply(T t, U u) throws Exception;
    }
}