ShowLoadWarningsCommand.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.LimitElement;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cloud.load.CloudLoadManager;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.CompoundPredicate;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLikeLiteral;
import org.apache.doris.nereids.trees.expressions.literal.StringLikeLiteral;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * Command that serves {@code SHOW LOAD WARNINGS}.
 *
 * <p>The command runs in one of two modes, decided by whether a raw url was supplied:
 * <ul>
 *   <li><b>url mode</b> ({@code rawUrl != null}): the url must point at the
 *       {@code /api/_load_error_log} endpoint of a known, alive backend; up to
 *       {@link #MAX_ERROR_LOG_LINES} lines of that error log are returned. No database,
 *       where clause or limit may be combined with it.</li>
 *   <li><b>job mode</b> ({@code rawUrl == null}): a where clause of the form
 *       {@code LABEL = "..."} and/or {@code LOAD_JOB_ID = n} (joined by AND) selects the
 *       load job(s) whose show-info is returned.</li>
 * </ul>
 */
public class ShowLoadWarningsCommand extends ShowCommand {
    private static final Logger LOG = LogManager.getLogger(ShowLoadWarningsCommand.class);
    private static final ShowResultSetMetaData META_DATA =
            ShowResultSetMetaData.builder()
                .addColumn(new Column("JobId", ScalarType.createVarchar(128)))
                .addColumn(new Column("Label", ScalarType.createVarchar(128)))
                .addColumn(new Column("ErrorMsgDetail", ScalarType.createVarchar(200)))
                .build();
    /** Name of the where-clause key that selects a job by label (matched case-insensitively). */
    private static final String LABEL = "label";
    /** Name of the where-clause key that selects a job by id (matched case-insensitively). */
    private static final String LOAD_JOB_ID = "load_job_id";
    /** Upper bound on the number of error-log lines returned when reading from a url. */
    private static final int MAX_ERROR_LOG_LINES = 100;

    /** Target database; falls back to the session's current database during validation. */
    private String dbName;
    /** Parsed form of {@link #rawUrl}; only set in url mode, after validation. */
    private URL url;
    /** Label extracted from the where clause, or {@code null} if none was given. */
    private String label;
    /** Load job id extracted from the where clause. */
    private long jobId;
    private final String rawUrl;
    private final Expression wildWhere;
    private final long limit;
    private final long offset;

    /**
     * constructor for show load warnings
     *
     * @param dbName database name, may be null/empty to use the session database
     * @param wildWhere where clause, required when {@code rawUrl} is null
     * @param limit limit clause value; a value greater than 0 counts as "limit was set"
     * @param offset offset clause value; {@code -1} is treated as 0
     * @param rawUrl error log url, or null to look the job up by label / job id
     */
    public ShowLoadWarningsCommand(String dbName, Expression wildWhere, long limit, long offset, String rawUrl) {
        super(PlanType.SHOW_LOAD_WARNINGS_COMMAND);
        this.dbName = dbName;
        this.wildWhere = wildWhere;
        this.limit = limit;
        this.offset = offset;
        this.rawUrl = rawUrl;
    }

    /**
     * Parses {@link #rawUrl} into {@link #url}.
     *
     * @throws AnalysisException if the raw url is malformed
     */
    private void analyzeUrl() throws AnalysisException {
        try {
            url = new URL(rawUrl);
        } catch (MalformedURLException e) {
            throw new AnalysisException("Invalid url: " + e.getMessage());
        }
    }

    /**
     * Validates a single where-clause predicate and records its value into
     * {@link #label} or {@link #jobId}.
     *
     * @param subExpr the predicate to check, expected to be {@code LABEL = "str"} or
     *                {@code LOAD_JOB_ID = int}
     * @throws AnalysisException if the predicate has any other shape
     */
    private void analyzeSubPredicate(Expression subExpr) throws AnalysisException {
        if (!applySubPredicate(subExpr)) {
            throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\","
                + " or LOAD_JOB_ID = $job_id");
        }
    }

    /**
     * Checks the shape of one predicate and, when it is acceptable, stores its value.
     *
     * @return true if the predicate was accepted, false if it is not supported
     */
    private boolean applySubPredicate(Expression subExpr) {
        // Only an equality comparison is accepted; instanceof is false for null as well.
        if (!(subExpr instanceof ComparisonPredicate) || !(subExpr instanceof EqualTo)) {
            return false;
        }

        Expression left = subExpr.child(0);
        if (!(left instanceof UnboundSlot)) {
            return false;
        }
        String key = ((UnboundSlot) left).getName();

        if (key.equalsIgnoreCase(LABEL)) {
            Expression right = subExpr.child(1);
            if (!(right instanceof StringLikeLiteral)) {
                return false;
            }
            String value = ((StringLikeLiteral) right).getStringValue();
            if (Strings.isNullOrEmpty(value)) {
                return false;
            }
            label = value;
            return true;
        }

        if (key.equalsIgnoreCase(LOAD_JOB_ID)) {
            Expression right = subExpr.child(1);
            if (!(right instanceof IntegerLikeLiteral)) {
                LOG.warn("load_job_id is not IntLiteral. value: {}", subExpr.toSql());
                return false;
            }
            jobId = ((IntegerLikeLiteral) right).getLongValue();
            return true;
        }

        return false;
    }

    /**
     * Checks the combination of arguments and fills in the derived fields
     * ({@link #url}, {@link #dbName}, {@link #label}, {@link #jobId}).
     *
     * @param ctx connection context, used to resolve the current database
     * @throws AnalysisException if the arguments are missing, conflicting or malformed
     */
    private void validate(ConnectContext ctx) throws AnalysisException {
        if (rawUrl != null) {
            // get load error from url
            if (rawUrl.isEmpty()) {
                throw new AnalysisException("Error load url is missing");
            }

            LimitElement limitElement = null;
            if (limit > 0) {
                limitElement = new LimitElement(offset == -1L ? 0 : offset, limit);
            }
            if (dbName != null || wildWhere != null || limitElement != null) {
                throw new AnalysisException(
                        "Can not set database, where or limit clause if getting error log from url");
            }

            // url should like:
            // http://be_ip:be_http_port/api/_load_error_log?file=__shard_xxx/error_log_xxx
            analyzeUrl();
            return;
        }

        if (Strings.isNullOrEmpty(dbName)) {
            dbName = ctx.getDatabase();
            if (Strings.isNullOrEmpty(dbName)) {
                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
            }
        }

        // without a url the where clause is mandatory
        if (wildWhere == null) {
            throw new AnalysisException("should supply condition like: LABEL = \"your_load_label\","
                + " or LOAD_JOB_ID = $job_id");
        }

        if (wildWhere instanceof CompoundPredicate) {
            if (!(wildWhere instanceof And)) {
                throw new AnalysisException("Only allow compound predicate with operator AND");
            }
            for (Expression child : wildWhere.children()) {
                analyzeSubPredicate(child);
            }
        } else {
            analyzeSubPredicate(wildWhere);
        }
    }

    /**
     * Fetches the load error log from a backend over HTTP.
     *
     * <p>The url's host/port must match a registered, alive backend and its path must be
     * {@code /api/_load_error_log}. At most {@link #MAX_ERROR_LOG_LINES} lines are read;
     * each becomes one row {@code ("-1", null_string, line)}.
     *
     * @param url the error log url
     * @return one row per error-log line
     * @throws AnalysisException if the url does not target a valid backend endpoint, or the
     *                           log cannot be read
     */
    private ShowResultSet handleShowLoadWarningsFromURL(URL url) throws AnalysisException {
        String host = url.getHost();
        // strip the brackets of an IPv6 literal such as "[::1]"
        if (host.startsWith("[") && host.endsWith("]")) {
            host = host.substring(1, host.length() - 1);
        }
        int port = url.getPort();
        SystemInfoService infoService = Env.getCurrentSystemInfo();
        Backend be = infoService.getBackendWithHttpPort(host, port);
        if (be == null) {
            throw new AnalysisException(NetUtils.getHostPortInAccessibleFormat(host, port) + " is not a valid backend");
        }
        if (!be.isAlive()) {
            throw new AnalysisException(
                "Backend " + NetUtils.getHostPortInAccessibleFormat(host, port) + " is not alive");
        }

        if (!url.getPath().equals("/api/_load_error_log")) {
            throw new AnalysisException(
                "Invalid error log path: " + url.getPath() + ". path should be: /api/_load_error_log");
        }

        List<List<String>> rows = Lists.newArrayList();
        try {
            URLConnection urlConnection = url.openConnection();
            try (InputStream inputStream = urlConnection.getInputStream();
                    BufferedReader reader = new BufferedReader(
                            new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
                // Read until end of stream. Do not gate on reader.ready(): on a network
                // stream it only tells whether data is available without blocking, so a
                // slow response would be cut short.
                for (int lineNo = 0; lineNo < MAX_ERROR_LOG_LINES; lineNo++) {
                    String line = reader.readLine();
                    if (line == null) {
                        break;
                    }
                    rows.add(Lists.newArrayList("-1", FeConstants.null_string, line));
                }
            }
        } catch (Exception e) {
            LOG.warn("failed to get error log from url: " + url, e);
            throw new AnalysisException(
                "failed to get error log from url: " + url + ". reason: " + e.getMessage());
        }

        return new ShowResultSet(getMetaData(), rows);
    }

    /**
     * Looks the load job(s) up in the load manager, by label when one was given and by
     * job id otherwise, and returns their show-info as string rows.
     *
     * @param db database used for the label lookup
     * @throws AnalysisException if no matching job exists or its info cannot be produced
     */
    private ShowResultSet handleShowLoadWarningV2(Database db) throws AnalysisException {
        LoadManager loadManager = Env.getCurrentEnv().getLoadManager();
        if (label != null) {
            List<List<Comparable>> loadJobInfosByDb;
            if (Config.isCloudMode()) {
                loadJobInfosByDb = ((CloudLoadManager) loadManager)
                    .getLoadJobInfosByDb(db.getId(), label, true, null, null, null, false, null, false, null, false);
            } else {
                loadJobInfosByDb = loadManager.getLoadJobInfosByDb(db.getId(), label, true, null);
            }
            if (CollectionUtils.isEmpty(loadJobInfosByDb)) {
                throw new AnalysisException("job does not exist");
            }
            List<List<String>> infoList = Lists.newArrayListWithCapacity(loadJobInfosByDb.size());
            for (List<Comparable> comparables : loadJobInfosByDb) {
                // Objects::toString (as in the job-id path below) renders a null cell as
                // "null" instead of failing with a NullPointerException.
                List<String> singleInfo = comparables.stream().map(Objects::toString).collect(Collectors.toList());
                infoList.add(singleInfo);
            }
            return new ShowResultSet(getMetaData(), infoList);
        }

        org.apache.doris.load.loadv2.LoadJob loadJob = loadManager.getLoadJob(jobId);
        if (loadJob == null) {
            throw new AnalysisException("job does not exist");
        }
        List<String> singleInfo;
        try {
            singleInfo = loadJob
                .getShowInfo()
                .stream()
                .map(Objects::toString)
                .collect(Collectors.toList());
        } catch (DdlException e) {
            throw new AnalysisException(e.getMessage());
        }
        return new ShowResultSet(getMetaData(), Lists.newArrayList(Collections.singleton(singleInfo)));
    }

    /**
     * Validates the command and dispatches to the matching handler.
     *
     * <p>Order of resolution: an explicit url; otherwise, when a label was given, the error
     * url the mysql load manager reports for that label; otherwise the load job lookup.
     *
     * @param ctx connection context
     * @param executor statement executor (not used by this method)
     * @return the rows to show
     * @throws Exception if validation fails or the warnings cannot be fetched
     */
    @VisibleForTesting
    protected ShowResultSet handleShowLoadWarnings(ConnectContext ctx, StmtExecutor executor) throws Exception {
        validate(ctx);

        if (url != null) {
            return handleShowLoadWarningsFromURL(url);
        }

        Env env = Env.getCurrentEnv();
        // try to fetch load id from mysql load first and mysql load only support find by label.
        if (label != null) {
            String urlString = env.getLoadManager().getMysqlLoadManager().getErrorUrlByLoadId(label);
            if (!Strings.isNullOrEmpty(urlString)) {
                URL mysqlLoadUrl;
                try {
                    mysqlLoadUrl = new URL(urlString);
                } catch (MalformedURLException e) {
                    throw new AnalysisException("Invalid url: " + e.getMessage());
                }
                return handleShowLoadWarningsFromURL(mysqlLoadUrl);
            }
        }

        Database db = env.getInternalCatalog().getDbOrAnalysisException(dbName);
        return handleShowLoadWarningV2(db);
    }

    @Override
    public ShowResultSetMetaData getMetaData() {
        return META_DATA;
    }

    @Override
    public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
        return handleShowLoadWarnings(ctx, executor);
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitShowLoadWarningsCommand(this, context);
    }
}