SessionController.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.httpv2.controller;
import org.apache.doris.catalog.Env;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.rest.RestBaseController;
import org.apache.doris.httpv2.rest.manager.HttpUtils;
import org.apache.doris.httpv2.rest.manager.NodeAction;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.system.Frontend;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
@RestController
@RequestMapping("/rest/v1")
public class SessionController extends RestBaseController {
private static final List<String> SESSION_TABLE_HEADER = Lists.newArrayList();
private static final Logger LOG = LogManager.getLogger(SessionController.class);
static {
SESSION_TABLE_HEADER.add("CurrentConnected");
SESSION_TABLE_HEADER.add("Id");
SESSION_TABLE_HEADER.add("User");
SESSION_TABLE_HEADER.add("Host");
SESSION_TABLE_HEADER.add("LoginTime");
SESSION_TABLE_HEADER.add("Catalog");
SESSION_TABLE_HEADER.add("Db");
SESSION_TABLE_HEADER.add("Command");
SESSION_TABLE_HEADER.add("Time");
SESSION_TABLE_HEADER.add("State");
SESSION_TABLE_HEADER.add("QueryId");
SESSION_TABLE_HEADER.add("Info");
SESSION_TABLE_HEADER.add("FE");
SESSION_TABLE_HEADER.add("CloudCluster");
}
@RequestMapping(path = "/session/all", method = RequestMethod.GET)
public Object allSession(HttpServletRequest request) {
Map<String, Object> result = Maps.newHashMap();
result.put("column_names", SESSION_TABLE_HEADER);
List<Map<String, String>> sessionInfo = Env.getCurrentEnv().getFrontends(null)
.stream()
.filter(Frontend::isAlive)
.map(frontend -> {
try {
return Env.getCurrentEnv().getSelfNode().getHost().equals(frontend.getHost())
? getSessionInfo()
: getOtherSessionInfo(request, frontend);
} catch (IOException e) {
LOG.warn("", e);
return null;
}
})
.filter(Objects::nonNull)
.flatMap(Collection::stream)
.collect(Collectors.toList());
result.put("rows", sessionInfo);
ResponseEntity entity = ResponseEntityBuilder.ok(result);
((ResponseBody) entity.getBody()).setCount(sessionInfo.size());
return entity;
}
@RequestMapping(path = "/session", method = RequestMethod.GET)
public Object session() {
Map<String, Object> result = Maps.newHashMap();
result.put("column_names", SESSION_TABLE_HEADER);
result.put("rows", getSessionInfo());
ResponseEntity entity = ResponseEntityBuilder.ok(result);
((ResponseBody) entity.getBody()).setCount(result.size());
return entity;
}
private List<Map<String, String>> getSessionInfo() {
List<ConnectContext.ThreadInfo> threadInfos = ExecuteEnv.getInstance().getScheduler()
.listConnection("root", false);
long nowMs = System.currentTimeMillis();
return threadInfos.stream()
.map(info -> info.toRow(-1, nowMs, Optional.empty()))
.map(row -> {
Map<String, String> record = new HashMap<>();
for (int i = 0; i < row.size(); i++) {
record.put(SESSION_TABLE_HEADER.get(i), row.get(i));
}
return record;
})
.collect(Collectors.toList());
}
private List<Map<String, String>> getOtherSessionInfo(HttpServletRequest request,
Frontend frontend) throws IOException {
Map<String, String> header = Maps.newHashMap();
header.put(NodeAction.AUTHORIZATION, request.getHeader(NodeAction.AUTHORIZATION));
String res = HttpUtils.doGet(String.format("http://%s:%s/rest/v1/session",
frontend.getHost(), Env.getCurrentEnv().getMasterHttpPort()), header);
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> jsonMap = objectMapper.readValue(res,
new TypeReference<Map<String, Object>>() {});
List<Map<String, String>> maps = (List<Map<String, String>>)
((Map<String, Object>) jsonMap.get("data")).get("rows");
return maps;
}
}