JobWarmUpStats.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.cloud;
import org.apache.doris.monitor.unit.ByteSizeValue;
import com.google.gson.JsonObject;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
/**
* Per-Job aggregated warmup statistics.
* Aggregates requested (from source cluster) and finished/failed (from target cluster)
* across all matched tables, then computes gap = requested - finished.
*/
public class JobWarmUpStats {
private static final DateTimeFormatter TIME_FMT = DateTimeFormatter.ofPattern("HH:mm:ss");
// Aggregated requested
public long requestedSegmentNum5m;
public long requestedSegmentNum30m;
public long requestedSegmentNum1h;
public long requestedSegmentSize5m;
public long requestedSegmentSize30m;
public long requestedSegmentSize1h;
public long requestedIndexNum5m;
public long requestedIndexNum30m;
public long requestedIndexNum1h;
public long requestedIndexSize5m;
public long requestedIndexSize30m;
public long requestedIndexSize1h;
public long lastTriggerTs;
// Aggregated finished
public long finishSegmentNum5m;
public long finishSegmentNum30m;
public long finishSegmentNum1h;
public long finishSegmentSize5m;
public long finishSegmentSize30m;
public long finishSegmentSize1h;
public long finishIndexNum5m;
public long finishIndexNum30m;
public long finishIndexNum1h;
public long finishIndexSize5m;
public long finishIndexSize30m;
public long finishIndexSize1h;
// Aggregated failed
public long failSegmentNum5m;
public long failSegmentNum30m;
public long failSegmentNum1h;
public long failSegmentSize5m;
public long failSegmentSize30m;
public long failSegmentSize1h;
public long failIndexNum5m;
public long failIndexNum30m;
public long failIndexNum1h;
public long failIndexSize5m;
public long failIndexSize30m;
public long failIndexSize1h;
public long lastFinishTs;
// Aggregated from target BEs. FE takes the minimum positive target progress watermark so the
// slowest target BE decides how far the job has caught up to source-side triggers.
public long progressTriggerTs;
// gap = requested - finished
public long gapSegmentNum5m;
public long gapSegmentNum30m;
public long gapSegmentNum1h;
public long gapSegmentSize5m;
public long gapSegmentSize30m;
public long gapSegmentSize1h;
public long gapIndexNum5m;
public long gapIndexNum30m;
public long gapIndexNum1h;
public long gapIndexSize5m;
public long gapIndexSize30m;
public long gapIndexSize1h;
// Source last trigger timestamp minus target progress watermark. A caught-up target reports its
// latest finished trigger as progress, so this value naturally becomes 0.
public long triggerGapMs;
/** Accumulate requested stats from a table in the source cluster. */
public void mergeRequested(TableWarmUpWindowedStats t) {
requestedSegmentNum5m += t.requestedSegmentNum5m;
requestedSegmentNum30m += t.requestedSegmentNum30m;
requestedSegmentNum1h += t.requestedSegmentNum1h;
requestedSegmentSize5m += t.requestedSegmentSize5m;
requestedSegmentSize30m += t.requestedSegmentSize30m;
requestedSegmentSize1h += t.requestedSegmentSize1h;
requestedIndexNum5m += t.requestedIndexNum5m;
requestedIndexNum30m += t.requestedIndexNum30m;
requestedIndexNum1h += t.requestedIndexNum1h;
requestedIndexSize5m += t.requestedIndexSize5m;
requestedIndexSize30m += t.requestedIndexSize30m;
requestedIndexSize1h += t.requestedIndexSize1h;
lastTriggerTs = Math.max(lastTriggerTs, t.lastTriggerTs);
}
/** Accumulate finished/failed stats from a table in the target cluster. */
public void mergeFinished(TableWarmUpWindowedStats t) {
finishSegmentNum5m += t.finishSegmentNum5m;
finishSegmentNum30m += t.finishSegmentNum30m;
finishSegmentNum1h += t.finishSegmentNum1h;
finishSegmentSize5m += t.finishSegmentSize5m;
finishSegmentSize30m += t.finishSegmentSize30m;
finishSegmentSize1h += t.finishSegmentSize1h;
finishIndexNum5m += t.finishIndexNum5m;
finishIndexNum30m += t.finishIndexNum30m;
finishIndexNum1h += t.finishIndexNum1h;
finishIndexSize5m += t.finishIndexSize5m;
finishIndexSize30m += t.finishIndexSize30m;
finishIndexSize1h += t.finishIndexSize1h;
failSegmentNum5m += t.failSegmentNum5m;
failSegmentNum30m += t.failSegmentNum30m;
failSegmentNum1h += t.failSegmentNum1h;
failSegmentSize5m += t.failSegmentSize5m;
failSegmentSize30m += t.failSegmentSize30m;
failSegmentSize1h += t.failSegmentSize1h;
failIndexNum5m += t.failIndexNum5m;
failIndexNum30m += t.failIndexNum30m;
failIndexNum1h += t.failIndexNum1h;
failIndexSize5m += t.failIndexSize5m;
failIndexSize30m += t.failIndexSize30m;
failIndexSize1h += t.failIndexSize1h;
lastFinishTs = Math.max(lastFinishTs, t.lastFinishTs);
progressTriggerTs = minPositive(progressTriggerTs, t.progressTriggerTs);
}
/** Compute gap = requested - finished for all window/metric combinations. */
public void computeGap() {
gapSegmentNum5m = requestedSegmentNum5m - finishSegmentNum5m;
gapSegmentNum30m = requestedSegmentNum30m - finishSegmentNum30m;
gapSegmentNum1h = requestedSegmentNum1h - finishSegmentNum1h;
gapSegmentSize5m = requestedSegmentSize5m - finishSegmentSize5m;
gapSegmentSize30m = requestedSegmentSize30m - finishSegmentSize30m;
gapSegmentSize1h = requestedSegmentSize1h - finishSegmentSize1h;
gapIndexNum5m = requestedIndexNum5m - finishIndexNum5m;
gapIndexNum30m = requestedIndexNum30m - finishIndexNum30m;
gapIndexNum1h = requestedIndexNum1h - finishIndexNum1h;
gapIndexSize5m = requestedIndexSize5m - finishIndexSize5m;
gapIndexSize30m = requestedIndexSize30m - finishIndexSize30m;
gapIndexSize1h = requestedIndexSize1h - finishIndexSize1h;
triggerGapMs = lastTriggerTs > 0 && progressTriggerTs > 0
? Math.max(0, lastTriggerTs - progressTriggerTs) : 0;
}
/** Serialize compact 30m SyncStats summary for SHOW WARM UP JOB list output. */
public String toSummaryJsonString() {
JsonObject root = new JsonObject();
root.addProperty("window", "30m");
long srcSize = requestedSegmentSize30m + requestedIndexSize30m;
long dstSize = finishSegmentSize30m + finishIndexSize30m;
root.addProperty("src_size", humanReadableSize(srcSize));
root.addProperty("dst_size", humanReadableSize(dstSize));
root.addProperty("gap_size", humanReadableSize(srcSize - dstSize));
// Compact SHOW WARM UP JOB output still exposes the active incremental warm-up time lag.
root.addProperty("trigger_gap_ms", triggerGapMs);
return root.toString();
}
/** Serialize detailed SyncStats JSON for SHOW WARM UP JOB WHERE ID = ... output. */
public String toJsonString() {
JsonObject root = new JsonObject();
// seg_num
JsonObject segNum = new JsonObject();
segNum.addProperty("requested_5m", requestedSegmentNum5m);
segNum.addProperty("finish_5m", finishSegmentNum5m);
segNum.addProperty("gap_5m", gapSegmentNum5m);
segNum.addProperty("fail_5m", failSegmentNum5m);
segNum.addProperty("requested_30m", requestedSegmentNum30m);
segNum.addProperty("finish_30m", finishSegmentNum30m);
segNum.addProperty("gap_30m", gapSegmentNum30m);
segNum.addProperty("fail_30m", failSegmentNum30m);
segNum.addProperty("requested_1h", requestedSegmentNum1h);
segNum.addProperty("finish_1h", finishSegmentNum1h);
segNum.addProperty("gap_1h", gapSegmentNum1h);
segNum.addProperty("fail_1h", failSegmentNum1h);
root.add("seg_num", segNum);
// seg_size
JsonObject segSize = new JsonObject();
segSize.addProperty("requested_5m", humanReadableSize(requestedSegmentSize5m));
segSize.addProperty("finish_5m", humanReadableSize(finishSegmentSize5m));
segSize.addProperty("gap_5m", humanReadableSize(gapSegmentSize5m));
segSize.addProperty("fail_5m", humanReadableSize(failSegmentSize5m));
segSize.addProperty("requested_30m", humanReadableSize(requestedSegmentSize30m));
segSize.addProperty("finish_30m", humanReadableSize(finishSegmentSize30m));
segSize.addProperty("gap_30m", humanReadableSize(gapSegmentSize30m));
segSize.addProperty("fail_30m", humanReadableSize(failSegmentSize30m));
segSize.addProperty("requested_1h", humanReadableSize(requestedSegmentSize1h));
segSize.addProperty("finish_1h", humanReadableSize(finishSegmentSize1h));
segSize.addProperty("gap_1h", humanReadableSize(gapSegmentSize1h));
segSize.addProperty("fail_1h", humanReadableSize(failSegmentSize1h));
root.add("seg_size", segSize);
// idx_num
JsonObject idxNum = new JsonObject();
idxNum.addProperty("requested_5m", requestedIndexNum5m);
idxNum.addProperty("finish_5m", finishIndexNum5m);
idxNum.addProperty("gap_5m", gapIndexNum5m);
idxNum.addProperty("fail_5m", failIndexNum5m);
idxNum.addProperty("requested_30m", requestedIndexNum30m);
idxNum.addProperty("finish_30m", finishIndexNum30m);
idxNum.addProperty("gap_30m", gapIndexNum30m);
idxNum.addProperty("fail_30m", failIndexNum30m);
idxNum.addProperty("requested_1h", requestedIndexNum1h);
idxNum.addProperty("finish_1h", finishIndexNum1h);
idxNum.addProperty("gap_1h", gapIndexNum1h);
idxNum.addProperty("fail_1h", failIndexNum1h);
root.add("idx_num", idxNum);
// idx_size
JsonObject idxSize = new JsonObject();
idxSize.addProperty("requested_5m", humanReadableSize(requestedIndexSize5m));
idxSize.addProperty("finish_5m", humanReadableSize(finishIndexSize5m));
idxSize.addProperty("gap_5m", humanReadableSize(gapIndexSize5m));
idxSize.addProperty("fail_5m", humanReadableSize(failIndexSize5m));
idxSize.addProperty("requested_30m", humanReadableSize(requestedIndexSize30m));
idxSize.addProperty("finish_30m", humanReadableSize(finishIndexSize30m));
idxSize.addProperty("gap_30m", humanReadableSize(gapIndexSize30m));
idxSize.addProperty("fail_30m", humanReadableSize(failIndexSize30m));
idxSize.addProperty("requested_1h", humanReadableSize(requestedIndexSize1h));
idxSize.addProperty("finish_1h", humanReadableSize(finishIndexSize1h));
idxSize.addProperty("gap_1h", humanReadableSize(gapIndexSize1h));
idxSize.addProperty("fail_1h", humanReadableSize(failIndexSize1h));
root.add("idx_size", idxSize);
// timestamps
root.addProperty("last_trigger_ts", formatEpochMs(lastTriggerTs));
root.addProperty("last_finish_ts", formatEpochMs(lastFinishTs));
root.addProperty("progress_trigger_ts", formatEpochMs(progressTriggerTs));
root.addProperty("trigger_gap_ms", triggerGapMs);
return root.toString();
}
private static long minPositive(long current, long candidate) {
if (current <= 0) {
return Math.max(candidate, 0);
}
if (candidate <= 0) {
return current;
}
return Math.min(current, candidate);
}
private static String humanReadableSize(long bytes) {
if (bytes < 0) {
return "-" + new ByteSizeValue(-bytes).toString();
}
return new ByteSizeValue(bytes).toString();
}
private static String formatEpochMs(long epochMs) {
if (epochMs <= 0) {
return "";
}
try {
return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMs), ZoneId.systemDefault())
.format(TIME_FMT);
} catch (Exception e) {
return "";
}
}
}