LoadSubmitter.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.httpv2.util;

import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.httpv2.rest.UploadAction;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;

import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

public class LoadSubmitter {
    private static final Logger LOG = LogManager.getLogger(LoadSubmitter.class);

    private ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
                        Config.http_load_submitter_max_worker_threads, "load-submitter", true);

    private static final String STREAM_LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";

    public Future<SubmitResult> submit(UploadAction.LoadContext loadContext) {
        LoadSubmitter.Worker worker = new LoadSubmitter.Worker(loadContext);
        return executor.submit(worker);
    }

    private static class Worker implements Callable<SubmitResult> {

        private UploadAction.LoadContext loadContext;

        public Worker(UploadAction.LoadContext loadContext) {
            this.loadContext = loadContext;
        }

        @Override
        public SubmitResult call() throws Exception {
            try {
                return load();
            } catch (Throwable e) {
                LOG.warn("failed to submit load. label: {}", loadContext.label, e);
                throw e;
            }
        }

        private SubmitResult load() throws Exception {
            // choose a backend to submit the stream load
            Backend be = selectOneBackend();

            String hostPort = NetUtils.getHostPortInAccessibleFormat(be.getHost(), be.getHttpPort());
            String loadUrlStr = String.format(STREAM_LOAD_URL_PATTERN, hostPort, loadContext.db, loadContext.tbl);
            URL loadUrl = new URL(loadUrlStr);
            HttpURLConnection conn = (HttpURLConnection) loadUrl.openConnection();
            conn.setRequestMethod("PUT");
            String auth = String.format("%s:%s", ClusterNamespace.getNameFromFullName(loadContext.user),
                    loadContext.passwd);
            String authEncoding = Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
            conn.setRequestProperty("Authorization", "Basic " + authEncoding);
            conn.addRequestProperty("Expect", "100-continue");
            conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
            if (!Strings.isNullOrEmpty(loadContext.columns)) {
                conn.addRequestProperty("columns", loadContext.columns);
            }
            if (!Strings.isNullOrEmpty(loadContext.columnSeparator)) {
                conn.addRequestProperty("column_separator", loadContext.columnSeparator);
            }
            if (!Strings.isNullOrEmpty(loadContext.label)) {
                conn.addRequestProperty("label", loadContext.label);
            }
            conn.setDoOutput(true);
            conn.setDoInput(true);

            File loadFile = checkAndGetFile(loadContext.file);
            try (BufferedOutputStream bos = new BufferedOutputStream(conn.getOutputStream());
                    BufferedInputStream bis = new BufferedInputStream(new FileInputStream(loadFile));) {
                int i;
                while ((i = bis.read()) > 0) {
                    bos.write(i);
                }
            }

            int status = conn.getResponseCode();
            String respMsg = conn.getResponseMessage();

            LOG.info("get status: {}, response msg: {}", status, respMsg);

            InputStream stream = (InputStream) conn.getContent();
            BufferedReader br = new BufferedReader(new InputStreamReader(stream));
            StringBuilder sb = new StringBuilder();
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line);
            }
            Type type = new TypeToken<SubmitResult>() {
            }.getType();
            SubmitResult result = new Gson().fromJson(sb.toString(), type);
            return result;
        }

        private File checkAndGetFile(TmpFileMgr.TmpFile tmpFile) {
            File file = new File(tmpFile.absPath);
            return file;
        }

        private Backend selectOneBackend() throws LoadException {
            BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
            List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
            if (backendIds.isEmpty()) {
                throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
            }
            Backend backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
            if (backend == null) {
                throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
            }
            return backend;
        }
    }

    // CHECKSTYLE OFF: These name must match the name in json, case-sensitive.
    public static class SubmitResult {
        public String TxnId;
        public String Label;
        public String Status;
        public String ExistingJobStatus;
        public String Message;
        public String NumberTotalRows;
        public String NumberLoadedRows;
        public String NumberFilteredRows;
        public String NumberUnselectedRows;
        public String LoadBytes;
        public String LoadTimeMs;
        public String BeginTxnTimeMs;
        public String StreamLoadPutTimeMs;
        public String ReadDataTimeMs;
        public String WriteDataTimeMs;
        public String CommitAndPublishTimeMs;
        public String ErrorURL;
    }
    // CHECKSTYLE ON
}