JsonStreamResponse.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.httpv2.util.streamresponse;

import org.apache.doris.httpv2.rest.RestApiStatusCode;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.stream.JsonWriter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletResponse;

/**
 * Serialize the ResultSet to JSON, and then response to client
 */
public class JsonStreamResponse extends StreamResponseInf {
    private static final Logger LOG = LogManager.getLogger(JsonStreamResponse.class);
    private static final Gson gson = new Gson();
    private JsonWriter jsonWriter;

    public static final String Name = "Json";

    public JsonStreamResponse(HttpServletResponse response) {
        super(response);
    }

    /**
     * Result json sample:
     * {
     *  "data": {
     *      "type": "result_set",
     *      "meta": {},
     *      "data": [],
     *      "time": 10
     *  },
     *  "msg" : "success",
     *  "code" : 0
     * }
     */
    @Override
    public void handleQueryAndShow(ResultSet rs, long startTime) throws Exception {
        response.setContentType("application/json;charset=utf-8");
        out = response.getWriter();
        jsonWriter = new JsonWriter(out);
        jsonWriter.setIndent("    ");
        try {
            // begin write response
            jsonWriter.beginObject();
            // data
            writeResultSetData(rs, jsonWriter, startTime);
            // suffix contains msg, code.
            writeResponseSuffix(jsonWriter);
            jsonWriter.endObject();
        } catch (SQLException e) {
            LOG.warn("Write response error.", e);
        } finally {
            jsonWriter.flush();
            try {
                jsonWriter.close();
            } catch (IOException e) {
                LOG.warn("JSONWriter close exception: ", e);
            }
        }
    }

    /**
     * Result json sample:
     * {
     *  "data": {
     *      "type": "exec_status",
     *      "status": {},
     *      "time": 10
     *  },
     *  "msg" : "success",
     *  "code" : 0
     * }
     */
    @Override
    public void handleDdlAndExport(long startTime) throws Exception {
        response.setContentType("application/json;charset=utf-8");
        out = response.getWriter();
        jsonWriter = new JsonWriter(out);
        jsonWriter.setIndent("    ");
        jsonWriter.beginObject();
        jsonWriter.name("msg").value("success")
                .name("code").value(RestApiStatusCode.OK.code);
        writeExecStatusData(startTime);
        jsonWriter.endObject();

    }

    public StreamResponseType getType() {
        return StreamResponseType.JSON;
    }

    private void writeResultSetData(ResultSet rs, JsonWriter jsonWriter, long startTime)
                                    throws IOException, SQLException {
        // data
        jsonWriter.name("data");
        jsonWriter.beginObject();
        // data-type
        jsonWriter.name("type").value(StreamResponseInf.TYPE_RESULT_SET);
        if (rs == null) {
            jsonWriter.endObject(); // data
            return;
        }
        ResultSetMetaData metaData = rs.getMetaData();
        int colNum = metaData.getColumnCount();
        List<Map<String, String>> metaFields = Lists.newArrayList();
        // index start from 1
        for (int i = 1; i <= colNum; ++i) {
            Map<String, String> field = Maps.newHashMap();
            field.put("name", metaData.getColumnName(i));
            field.put("type", metaData.getColumnTypeName(i));
            metaFields.add(field);
        }
        // data-meta
        String metaJson = gson.toJson(metaFields);
        jsonWriter.name("meta").jsonValue(metaJson);
        // data-data
        jsonWriter.name("data");
        jsonWriter.beginArray();
        // when bufferSize == batchSize, flush jsonWriter.
        int bufferSize = 0;
        long firstRowTime = 0;
        boolean begin = false;
        while (rs.next()) {
            List<Object> row = Lists.newArrayListWithCapacity(colNum);
            // index start from 1
            for (int i = 1; i <= colNum; ++i) {
                String type = rs.getMetaData().getColumnTypeName(i);
                if ("DATE".equalsIgnoreCase(type) || "DATETIME".equalsIgnoreCase(type)
                        || "DATEV2".equalsIgnoreCase(type) || "DATETIMEV2".equalsIgnoreCase(type)) {
                    row.add(rs.getString(i));
                } else {
                    row.add(rs.getObject(i));
                }
            }
            if (begin == false) {
                firstRowTime = (System.currentTimeMillis() - startTime);
                begin = true;
            }
            String rowJson = gson.toJson(row);
            jsonWriter.jsonValue(rowJson);
            ++bufferSize;
            if (bufferSize == streamBatchSize) {
                jsonWriter.flush();
                bufferSize = 0;
            }
        }
        jsonWriter.endArray();
        // data-time
        // the time seems no meaning, because the time contains json serialize time.
        jsonWriter.name("time").value(firstRowTime);
        jsonWriter.endObject(); // data
        jsonWriter.flush();
    }

    private void writeExecStatusData(long startTime) throws IOException {
        // data
        jsonWriter.name("data");
        jsonWriter.beginObject();
        // data-type
        jsonWriter.name("type").value(StreamResponseInf.TYPE_EXEC_STATUS);
        String statusJson = gson.toJson(Maps.newHashMap());
        // data-status
        jsonWriter.name("status").jsonValue(statusJson);
        // data-time
        jsonWriter.name("time").value((System.currentTimeMillis() - startTime));
        jsonWriter.endObject();
    }

    private void writeResponseSuffix(JsonWriter jsonWriter) throws IOException {
        // msg
        jsonWriter.name("msg").value("success");
        // code
        jsonWriter.name("code").value(RestApiStatusCode.OK.code);
    }
}