Profile.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.profile;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.stats.HboPlanInfoProvider;
import org.apache.doris.nereids.stats.HboPlanStatisticsManager;
import org.apache.doris.nereids.stats.HboUtils;
import org.apache.doris.nereids.stats.MemoryHboPlanStatisticsProvider;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanNodeAndHash;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.planner.Planner;
import org.apache.doris.statistics.hbo.InputTableStatisticsInfo;
import org.apache.doris.statistics.hbo.PlanStatistics;
import org.apache.doris.statistics.hbo.PlanStatisticsMatchStrategy;
import org.apache.doris.statistics.hbo.PlanStatisticsWithInputInfo;
import org.apache.doris.statistics.hbo.RecentRunsPlanStatistics;
import org.apache.doris.statistics.hbo.RecentRunsPlanStatisticsEntry;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TPlanNodeRuntimeStatsItem;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
/**
* Profile is a class to record the execution time of a query. It has the
* following structure: root profile: // summary of this profile, such as start
* time, end time, query id, etc. [SummaryProfile] // each execution profile is
* a complete execution of a query, a job may contain multiple queries.
* [List<ExecutionProfile>].
* There maybe multi execution profiles for one job, for example broker load job.
* It will create one execution profile for every single load task.
*
* SummaryProfile: Summary: Execution Summary:
*
*
* ExecutionProfile1: Fragment 0: Fragment 1: ...
* ExecutionProfile2: Fragment 0: Fragment 1: ...
*
* ExecutionProfile: Fragment 0: Fragment 1: ...
* And also summary profile contains plan information, but execution profile is for
* be execution time.
* StmtExecutor(Profile) ---> Coordinator(ExecutionProfile)
*/
public class Profile {
private static final Logger LOG = LogManager.getLogger(Profile.class);
private static final int MergedProfileLevel = 1;
// profile file name format: time_id
private static final String SEPERATOR = "_";
private static final String PROFILE_ENTRY_SUFFIX = ".profile";
// summaryProfile will be serialized to storage as JSON, and we can recover it from storage
// recover of SummaryProfile is important, because it contains the meta information of the profile
// we need it to construct memory index for profile retrieving.
private SummaryProfile summaryProfile = new SummaryProfile();
// executionProfiles will be stored to storage as text, when getting profile content, we will read
// from storage directly.
List<ExecutionProfile> executionProfiles = Lists.newArrayList();
// profileStoragePath will only be assigned when:
// 1. profile is stored to storage
// 2. or profile is loaded from storage
private String profileStoragePath = "";
// isQueryFinished means the coordinator or stmt executor is finished.
// does not mean the profile report has finished, since the report is async.
// finish of collection of profile is marked by isCompleted of ExecutionProfiles.
boolean isQueryFinished = false;
// when coordinator finishes, it will mark finish time.
// we will wait for about 5 seconds to see if all profiles have been reported.
// if not, we will store the profile to storage, and release the memory,
// further report will be ignored.
// why MAX_VALUE? So that we can use PriorityQueue to sort profile by finish time decreasing order.
private long queryFinishTimestamp = Long.MAX_VALUE;
private Map<Integer, String> planNodeMap = Maps.newHashMap();
private int profileLevel = MergedProfileLevel;
protected long autoProfileDurationMs = -1;
// Profile size is the size of profile file
private long profileSize = 0;
private PhysicalPlan physicalPlan;
public Map<String, Long> rowsProducedMap = new HashMap<>();
private Set<PhysicalRelation> physicalRelations = new LinkedHashSet<>();
private String changedSessionVarCache = "";
// Need default constructor for read from storage
public Profile() {}
public Profile(boolean isEnable, int profileLevel, long autoProfileDurationMs) {
this.summaryProfile = new SummaryProfile();
// if disabled, just set isFinished to true, so that update() will do nothing
this.isQueryFinished = !isEnable;
this.profileLevel = profileLevel;
this.autoProfileDurationMs = autoProfileDurationMs;
}
// check if the profile file is valid and create a file input stream
// user need to close the file stream.
static FileInputStream createPorfileFileInputStream(String path) {
File profileFile = new File(path);
if (!profileFile.isFile()) {
LOG.warn("Profile storage path {} is invalid, its not a file.", profileFile.getAbsolutePath());
return null;
}
String[] parts = path.split(File.separator);
if (parts.length < 1) {
LOG.warn("Profile storage path {} is invalid", profileFile.getAbsolutePath());
return null;
}
// Profile could be a load task with multiple queries, so we call it id.
if (parseProfileFileName(parts[parts.length - 1]) == null) {
LOG.warn("{} is not a valid profile file", profileFile.getAbsolutePath());
return null;
}
FileInputStream profileMetaFileInputStream = null;
try {
profileMetaFileInputStream = new FileInputStream(path);
} catch (Exception e) {
LOG.warn("Open profile file {} failed", path, e);
}
return profileMetaFileInputStream;
}
// For normal profile, the profile id is a TUniqueId, but for broker load, the profile id is a long.
public static String[] parseProfileFileName(String profileFileName) {
if (!profileFileName.endsWith(".zip")) {
LOG.warn("Invalid profile name {}", profileFileName);
return null;
} else {
profileFileName = profileFileName.substring(0, profileFileName.length() - 4);
}
String [] timeAndID = profileFileName.split(SEPERATOR);
if (timeAndID.length != 2) {
return null;
}
try {
DebugUtil.parseTUniqueIdFromString(timeAndID[1]);
} catch (NumberFormatException e) {
try {
Long.parseLong(timeAndID[1]);
} catch (NumberFormatException e2) {
return null;
}
}
return timeAndID;
}
public static Profile read(String path) {
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream(path);
File profileFile = new File(path);
long fileSize = profileFile.length();
ZipInputStream zipIn = new ZipInputStream(fileInputStream);
ZipEntry entry = zipIn.getNextEntry();
if (entry == null) {
LOG.error("Invalid zip profile file, {}", path);
return null;
}
// Read zip entry content into memory
ByteArrayOutputStream entryContent = new ByteArrayOutputStream();
byte[] buffer = new byte[1024 * 50];
int readBytes;
while ((readBytes = zipIn.read(buffer)) != -1) {
entryContent.write(buffer, 0, readBytes);
}
// Parse profile data using memory stream
DataInputStream memoryDataInput = new DataInputStream(
new ByteArrayInputStream(entryContent.toByteArray()));
Profile res = new Profile();
res.summaryProfile = SummaryProfile.read(memoryDataInput);
res.profileStoragePath = path;
res.isQueryFinished = true;
res.profileSize = fileSize;
String[] parts = path.split(File.separator);
String filename = parts[parts.length - 1];
String queryFinishTimeStr = parseProfileFileName(filename)[0];
res.queryFinishTimestamp = Long.valueOf(queryFinishTimeStr);
if (LOG.isDebugEnabled()) {
LOG.debug("Read profile from storage: {}", res.summaryProfile.getProfileId());
}
return res;
} catch (Exception exception) {
LOG.error("read profile failed", exception);
return null;
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (Exception e) {
LOG.warn("close profile file {} failed", path, e);
}
}
}
}
// For load task, the profile contains many execution profiles
public void addExecutionProfile(ExecutionProfile executionProfile) {
if (executionProfile == null) {
LOG.warn("try to set a null execution profile, it is abnormal", new Exception());
return;
}
executionProfile.setSummaryProfile(summaryProfile);
this.executionProfiles.add(executionProfile);
}
public List<ExecutionProfile> getExecutionProfiles() {
return this.executionProfiles;
}
// This API will also add the profile to ProfileManager, so that we could get the profile from ProfileManager.
// isFinished ONLY means the coordinator or stmt executor is finished.
public synchronized void updateSummary(Map<String, String> summaryInfo, boolean isFinished,
Planner planner) {
try {
if (this.isQueryFinished) {
return;
}
if (planner != null && planner instanceof NereidsPlanner) {
NereidsPlanner nereidsPlanner = ((NereidsPlanner) planner);
physicalPlan = nereidsPlanner.getPhysicalPlan();
physicalRelations.addAll(nereidsPlanner.getPhysicalRelations());
if (profileLevel >= 3) {
FragmentIdMapping<DistributedPlan> distributedPlans = nereidsPlanner.getDistributedPlans();
if (distributedPlans != null) {
summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN,
DistributedPlan.toString(Lists.newArrayList(distributedPlans.values()))
.replace("\n", "\n ")
);
}
}
}
summaryProfile.update(summaryInfo);
if (isFinished) {
this.markQueryFinished();
long durationMs = this.queryFinishTimestamp - summaryProfile.getQueryBeginTime();
// Duration ls less than autoProfileDuration, remove it from memory.
long durationThreshold = executionProfiles.isEmpty()
? autoProfileDurationMs : executionProfiles.size() * autoProfileDurationMs;
if (this.queryFinishTimestamp != Long.MAX_VALUE && durationMs < durationThreshold) {
ProfileManager.getInstance().removeProfile(this.getId());
if (LOG.isDebugEnabled()) {
LOG.debug("Removed profile {} because it's costs {} is less than {}", this.getId(),
durationMs, autoProfileDurationMs * this.executionProfiles.size());
}
return;
}
summaryProfile.queryFinished();
}
// Nereids native insert not set planner, so it is null
if (planner != null) {
this.planNodeMap = planner.getExplainStringMap();
}
ProfileManager.getInstance().pushProfile(this);
} catch (Throwable t) {
LOG.warn("update profile {} failed", getId(), t);
throw t;
}
}
public SummaryProfile getSummaryProfile() {
return summaryProfile;
}
public String getProfileByLevel() {
StringBuilder builder = new StringBuilder();
// add summary to builder
summaryProfile.prettyPrint(builder);
getChangedSessionVars(builder);
getExecutionProfileContent(builder);
getOnStorageProfile(builder);
return builder.toString();
}
// If the query is already finished, and user wants to get the profile, we should check
// if BE has reported all profiles, if not, sleep 2s.
private void waitProfileCompleteIfNeeded() {
if (!this.isQueryFinished) {
return;
}
boolean allCompleted = true;
for (ExecutionProfile executionProfile : executionProfiles) {
if (!executionProfile.isCompleted()) {
allCompleted = false;
break;
}
}
if (!allCompleted) {
try {
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {
// Do nothing
}
}
}
private RuntimeProfile composeRootProfile() {
RuntimeProfile rootProfile = new RuntimeProfile(getId());
rootProfile.addChild(summaryProfile.getSummary(), true);
rootProfile.addChild(summaryProfile.getExecutionSummary(), true);
for (ExecutionProfile executionProfile : executionProfiles) {
rootProfile.addChild(executionProfile.getRoot(), true);
}
rootProfile.computeTimeInProfile();
return rootProfile;
}
public String getProfileBrief() {
RuntimeProfile rootProfile = composeRootProfile();
Gson gson = new GsonBuilder().setPrettyPrinting().create();
return gson.toJson(rootProfile.toBrief());
}
public void publishHboPlanStatistics(String queryId, List<TPlanNodeRuntimeStatsItem> curPlanNodeRuntimeStats) {
HboPlanStatisticsManager hboManager = Env.getCurrentEnv().getHboPlanStatisticsManager();
MemoryHboPlanStatisticsProvider hboPlanStatisticsProvider = (MemoryHboPlanStatisticsProvider)
hboManager.getHboPlanStatisticsProvider();
HboPlanInfoProvider planInfoProvider = hboManager.getHboPlanInfoProvider();
if (hboPlanStatisticsProvider != null && planInfoProvider != null) {
Map<Integer, PhysicalPlan> idToPlanMap = planInfoProvider.getIdToPlanMap(queryId);
Map<PhysicalPlan, Integer> planToIdMap = planInfoProvider.getPlanToIdMap(queryId);
Map<RelationId, Set<Expression>> scanToFilterMap = planInfoProvider.getScanToFilterMap(queryId);
if (!idToPlanMap.isEmpty() && idToPlanMap.size() == planToIdMap.size()) {
Map<PlanNodeAndHash, PlanStatisticsWithInputInfo> curPlanStatistics = HboUtils.genPlanStatisticsMap(
idToPlanMap, planToIdMap, scanToFilterMap, curPlanNodeRuntimeStats);
Map<PlanNodeAndHash, RecentRunsPlanStatistics> recentRunsPlanStatisticsMap =
hboPlanStatisticsProvider.getHboPlanStats(
curPlanStatistics.keySet().stream().collect(Collectors.toList()));
// update plan statistics
Map<PlanNodeAndHash, RecentRunsPlanStatistics> newPlanStatistics = curPlanStatistics.entrySet().stream()
.filter(entry -> entry.getKey().getHash().isPresent()
&& entry.getValue().getInputTableInfo().getInputTableStatistics().isPresent())
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> {
RecentRunsPlanStatistics recentRunsPlanStatistics = Optional.ofNullable(
recentRunsPlanStatisticsMap.get(entry.getKey()))
.orElseGet(RecentRunsPlanStatistics::empty);
InputTableStatisticsInfo curInputTableStatisticsInfo = entry
.getValue().getInputTableInfo();
// find the most matching entry to do the refreshment.
return updatePlanStatistics(
recentRunsPlanStatistics,
curInputTableStatisticsInfo.getInputTableStatistics().get(),
entry.getValue().getPlanStatistics());
}));
// publish stats and refresh cache on current matching key hashing
if (!newPlanStatistics.isEmpty()) {
hboPlanStatisticsProvider.putHboPlanStats(ImmutableMap.copyOf(newPlanStatistics));
for (Entry<PlanNodeAndHash, RecentRunsPlanStatistics> entry : newPlanStatistics.entrySet()) {
PlanNodeAndHash planHash = entry.getKey();
RecentRunsPlanStatistics planEntries = entry.getValue();
hboPlanStatisticsProvider.syncHboPlanStats(planHash, planEntries);
}
}
}
}
}
private RecentRunsPlanStatistics updatePlanStatistics(
RecentRunsPlanStatistics recentRunsPlanStatistics,
List<PlanStatistics> curInputTableStatistics,
PlanStatistics newPlanStatistics) {
List<RecentRunsPlanStatisticsEntry> recentRunsStatistics = recentRunsPlanStatistics.getRecentRunsStatistics();
List<RecentRunsPlanStatisticsEntry> newRecentRunsStatistics = new ArrayList<>(recentRunsStatistics);
Optional<Integer> accurateStatsIndex = HboUtils.getAccurateStatsIndex(
recentRunsPlanStatistics, curInputTableStatistics, -1, false,
PlanStatisticsMatchStrategy.FULL_MATCH);
if (accurateStatsIndex.isPresent()) {
newRecentRunsStatistics.remove(accurateStatsIndex.get().intValue());
}
// the newRecentRunsStatistics performs as FIFO way
newRecentRunsStatistics.add(new RecentRunsPlanStatisticsEntry(newPlanStatistics, curInputTableStatistics));
int maxEntryNumber = curInputTableStatistics.isEmpty() ? 1 : Config.hbo_plan_stats_cache_recent_runs_entry_num;
if (newRecentRunsStatistics.size() > maxEntryNumber) {
// entry 0 means the FIFO list's earliest entry.
newRecentRunsStatistics.remove(0);
}
return new RecentRunsPlanStatistics(newRecentRunsStatistics);
}
// Return if profile has been stored to storage
public void getExecutionProfileContent(StringBuilder builder) {
if (builder == null) {
builder = new StringBuilder();
}
if (profileHasBeenStored()) {
return;
}
// For broker load, if it has more than one execution profile, we will not generate merged profile.
RuntimeProfile mergedProfile = null;
if (this.executionProfiles.size() == 1) {
try {
mergedProfile = this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap);
this.rowsProducedMap.putAll(mergedProfile.rowsProducedMap);
if (physicalPlan != null) {
updateActualRowCountOnPhysicalPlan(physicalPlan);
}
} catch (Throwable aggProfileException) {
LOG.warn("build merged simple profile {} failed", getId(), aggProfileException);
}
}
if (physicalPlan != null) {
builder.append("\nPhysical Plan \n");
StringBuilder physcialPlanBuilder = new StringBuilder();
physcialPlanBuilder.append(physicalPlan.treeString());
physcialPlanBuilder.append("\n");
for (PhysicalRelation relation : physicalRelations) {
if (relation.getStats() != null) {
physcialPlanBuilder.append(relation).append("\n")
.append(relation.getStats().printColumnStats());
}
}
builder.append(
physcialPlanBuilder.toString().replace("\n", "\n "));
}
if (this.executionProfiles.size() == 1) {
List<TPlanNodeRuntimeStatsItem> planNodeRuntimeStatsItems = null;
builder.append("\nMergedProfile \n");
if (mergedProfile != null) {
mergedProfile.prettyPrint(builder, " ");
planNodeRuntimeStatsItems = RuntimeProfile.toTPlanNodeRuntimeStatsItem(mergedProfile, null);
planNodeRuntimeStatsItems = RuntimeProfile.mergeTPlanNodeRuntimeStatsItem(planNodeRuntimeStatsItems);
// TODO: failed sql supporting rely on profile's extension.
boolean isEnableHboInfoCollection = StatisticsUtil.isEnableHboInfoCollection();
if (isEnableHboInfoCollection && isHealthyForHbo() && isSlowQueryForHbo()) {
// publish to hbo manager, currently only support healthy sql.
// NOTE: all statements which no need to collect profile have been excluded.
String queryId = DebugUtil.printId(this.executionProfiles.get(0).getQueryId());
publishHboPlanStatistics(queryId, planNodeRuntimeStatsItems);
}
builder.append("\nHBOStatics \n");
builder.append(DebugUtil.prettyPrintPlanNodeRuntimeStatsItems(planNodeRuntimeStatsItems));
} else {
builder.append("build merged simple profile failed");
}
}
try {
// For load task, they will have multiple execution_profiles.
for (ExecutionProfile executionProfile : executionProfiles) {
builder.append("\n");
executionProfile.getRoot().prettyPrint(builder, "");
}
} catch (Throwable aggProfileException) {
LOG.warn("build profile failed", aggProfileException);
builder.append("build profile failed");
}
}
public long getQueryFinishTimestamp() {
return this.queryFinishTimestamp;
}
// For UT
public void setSummaryProfile(SummaryProfile summaryProfile) {
this.summaryProfile = summaryProfile;
}
public void releaseMemory() {
this.executionProfiles.clear();
this.changedSessionVarCache = "";
}
public boolean shouldStoreToStorage() {
if (profileHasBeenStored()) {
return false;
}
if (!isQueryFinished) {
return false;
}
if (this.queryFinishTimestamp == Long.MAX_VALUE) {
LOG.warn("Logical error, query {} has finished, but queryFinishTimestamp is not set,", getId());
return false;
}
// below is the case where query has finished
boolean hasReportingProfile = false;
for (ExecutionProfile executionProfile : executionProfiles) {
if (!executionProfile.isCompleted()) {
hasReportingProfile = true;
break;
}
}
if (!hasReportingProfile) {
return true;
} else {
long currentTimeMillis = System.currentTimeMillis();
if (this.queryFinishTimestamp != Long.MAX_VALUE
&& (currentTimeMillis - this.queryFinishTimestamp)
> Config.profile_waiting_time_for_spill_seconds * 1000) {
LOG.warn("Profile {} should be stored to storage without waiting for incoming profile,"
+ " since it has been waiting for {} ms, current time {} query finished time: {}",
getId(), currentTimeMillis - this.queryFinishTimestamp, currentTimeMillis,
this.queryFinishTimestamp);
this.summaryProfile.setSystemMessage(
"This profile is not complete, since its collection does not finish in time."
+ " Maybe increase profile_waiting_time_for_spill_secs in fe.conf current val: "
+ String.valueOf(Config.profile_waiting_time_for_spill_seconds));
return true;
}
}
// query finished, wait a while for reporting profile
return false;
}
public String getProfileStoragePath() {
return this.profileStoragePath;
}
public boolean profileHasBeenStored() {
return !Strings.isNullOrEmpty(profileStoragePath);
}
// Profile IO threads races with Coordinator threads.
public void markQueryFinished() {
try {
if (this.profileHasBeenStored()) {
LOG.error("Logical error, profile {} has already been stored to storage", getId());
return;
}
this.isQueryFinished = true;
this.queryFinishTimestamp = System.currentTimeMillis();
} catch (Throwable t) {
LOG.warn("mark query finished failed", t);
throw t;
}
}
public void writeToStorage(String systemProfileStorageDir) {
if (Strings.isNullOrEmpty(getId())) {
LOG.warn("store profile failed, name is empty");
return;
}
if (!Strings.isNullOrEmpty(profileStoragePath)) {
LOG.error("Logical error, profile {} has already been stored to storage", getId());
return;
}
final String profileId = this.summaryProfile.getProfileId();
final String profileFilePath = systemProfileStorageDir + File.separator
+ String.valueOf(this.queryFinishTimestamp)
+ SEPERATOR + profileId + ".zip";
File profileFile = new File(profileFilePath);
if (profileFile.exists()) {
LOG.warn("profile directory {} already exists, remove it", profileFile.getAbsolutePath());
profileFile.delete();
}
FileOutputStream fileOutputStream = null;
ZipOutputStream zipOut = null;
try {
fileOutputStream = new FileOutputStream(profileFilePath);
zipOut = new ZipOutputStream(fileOutputStream);
// First create memory stream to hold all data
ByteArrayOutputStream memoryStream = new ByteArrayOutputStream();
DataOutputStream memoryDataStream = new DataOutputStream(memoryStream);
// Write summary profile and execution profile content to memory
this.summaryProfile.write(memoryDataStream);
StringBuilder builder = new StringBuilder();
getChangedSessionVars(builder);
getExecutionProfileContent(builder);
byte[] executionProfileBytes = builder.toString().getBytes(StandardCharsets.UTF_8);
memoryDataStream.writeInt(executionProfileBytes.length);
memoryDataStream.write(executionProfileBytes);
memoryDataStream.flush();
// Create zip entry with profileId based name
ZipEntry zipEntry = new ZipEntry(profileId + PROFILE_ENTRY_SUFFIX);
zipOut.putNextEntry(zipEntry);
zipOut.write(memoryStream.toByteArray());
zipOut.closeEntry();
this.profileSize = profileFile.length();
this.profileStoragePath = profileFilePath;
} catch (Exception e) {
LOG.error("write {} summary profile failed", getId(), e);
return;
} finally {
try {
if (zipOut != null) {
zipOut.close();
}
if (fileOutputStream != null) {
fileOutputStream.close();
}
} catch (Exception e) {
LOG.warn("close profile file {} failed", profileFilePath, e);
}
}
}
// remove profile from storage
public void deleteFromStorage() {
if (!profileHasBeenStored()) {
return;
}
String storagePath = getProfileStoragePath();
if (Strings.isNullOrEmpty(storagePath)) {
LOG.warn("remove profile failed, storage path is empty");
return;
}
File profileFile = new File(storagePath);
if (!profileFile.exists()) {
LOG.warn("Profile {} does not exist", profileFile.getAbsolutePath());
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Remove profile: {}", getProfileStoragePath());
}
if (!FileUtils.deleteQuietly(profileFile)) {
LOG.warn("remove profile {} failed", profileFile.getAbsolutePath());
}
}
public long getProfileSize() {
return this.profileSize;
}
public boolean shouldBeRemoveFromMemory() {
if (!this.isQueryFinished) {
return false;
}
if (this.profileHasBeenStored()) {
return false;
}
if (this.queryFinishTimestamp - this.summaryProfile.getQueryBeginTime() >= autoProfileDurationMs) {
return false;
}
return true;
}
public PhysicalPlan getPhysicalPlan() {
return physicalPlan;
}
public void setPhysicalPlan(PhysicalPlan physicalPlan) {
this.physicalPlan = physicalPlan;
}
private void updateActualRowCountOnPhysicalPlan(Plan plan) {
if (plan == null || rowsProducedMap.isEmpty()) {
return;
}
Long actualRowCount = rowsProducedMap.get(String.valueOf(((AbstractPlan) plan).getId()));
if (actualRowCount != null) {
((AbstractPlan) plan).updateActualRowCount(actualRowCount);
}
for (Plan child : plan.children()) {
updateActualRowCountOnPhysicalPlan(child);
}
}
public void setChangedSessionVar(String changedSessionVar) {
this.changedSessionVarCache = changedSessionVar;
}
private void getChangedSessionVars(StringBuilder builder) {
if (builder == null) {
builder = new StringBuilder();
}
if (profileHasBeenStored()) {
return;
}
builder.append("\nChanged Session Variables:\n");
builder.append(changedSessionVarCache);
builder.append("\n");
}
private boolean isHealthyForHbo() {
if (this.summaryProfile.getAsInfoStings() == null
|| this.summaryProfile.getAsInfoStings().isEmpty()) {
return false;
} else {
boolean isOk = this.summaryProfile.getAsInfoStings().get(SummaryProfile.TASK_STATE)
.equalsIgnoreCase("ok");
boolean isEof = this.summaryProfile.getAsInfoStings().get(SummaryProfile.TASK_STATE)
.equalsIgnoreCase("eof");
// TODO: zhiqiang will fix the following flag
boolean noErrorMessage = true; //this.summaryProfile.getExecutionSummary()
//.getInfoString(SummaryProfile.SYSTEM_MESSAGE).equalsIgnoreCase("N/A");
return (isOk || isEof) && noErrorMessage;
}
}
private boolean isSlowQueryForHbo() {
long durationMs = this.queryFinishTimestamp - summaryProfile.getQueryBeginTime();
return durationMs > Config.qe_slow_log_ms;
}
void getOnStorageProfile(StringBuilder builder) {
if (!profileHasBeenStored()) {
return;
}
LOG.info("Profile {} has been stored to storage, reading it from storage", getId());
FileInputStream fileInputStream = null;
ZipInputStream zipIn = null;
try {
fileInputStream = createPorfileFileInputStream(profileStoragePath);
if (fileInputStream == null) {
builder.append("Failed to read profile from " + profileStoragePath);
return;
}
// Directly create ZipInputStream from file input stream
zipIn = new ZipInputStream(fileInputStream);
ZipEntry entry = zipIn.getNextEntry();
String expectedEntryName = summaryProfile.getProfileId() + PROFILE_ENTRY_SUFFIX;
if (entry == null || !entry.getName().equals(expectedEntryName)) {
throw new IOException("Invalid zip file format - missing entry: " + expectedEntryName);
}
// Read zip entry content into memory
ByteArrayOutputStream entryContent = new ByteArrayOutputStream();
byte[] buffer = new byte[1024 * 50];
int readBytes;
while ((readBytes = zipIn.read(buffer)) != -1) {
entryContent.write(buffer, 0, readBytes);
}
// Parse profile data using memory stream
DataInputStream memoryDataInput = new DataInputStream(
new ByteArrayInputStream(entryContent.toByteArray()));
// Skip summary profile data
Text.readString(memoryDataInput);
// Read execution profile length and content
int executionProfileLength = memoryDataInput.readInt();
byte[] executionProfileBytes = new byte[executionProfileLength];
memoryDataInput.readFully(executionProfileBytes);
// Append execution profile content
builder.append(new String(executionProfileBytes, StandardCharsets.UTF_8));
} catch (Exception e) {
LOG.error("Failed to read profile from storage: {}", profileStoragePath, e);
builder.append("Failed to read profile from " + profileStoragePath);
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (Exception e) {
LOG.warn("Close profile {} failed", profileStoragePath, e);
}
}
}
}
public String debugInfo() {
StringBuilder builder = new StringBuilder();
builder.append("ProfileId:").append(getId()).append("|");
builder.append("StoragePath:").append(profileStoragePath).append("|");
builder.append("StartTimeStamp:").append(summaryProfile.getQueryBeginTime()).append("|");
builder.append("IsFinished:").append(isQueryFinished).append("|");
builder.append("FinishTimestamp:").append(queryFinishTimestamp).append("|");
builder.append("AutoProfileDuration: ").append(autoProfileDurationMs).append("|");
builder.append("ExecutionProfileCnt: ").append(executionProfiles.size()).append("|");
builder.append("ProfileOnStorageSize:").append(profileSize);
return builder.toString();
}
public void setQueryFinishTimestamp(long l) {
this.queryFinishTimestamp = l;
}
public String getId() {
return summaryProfile.getProfileId();
}
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
getExecutionProfileContent(stringBuilder);
return stringBuilder.toString();
}
}