CloudPartition.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cloud.catalog;

import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.Partition;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
import org.apache.doris.cloud.rpc.VersionHelper;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.RpcException;

import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
 * Internal representation of partition-related metadata.
 */
public class CloudPartition extends Partition {
    private static final Logger LOG = LogManager.getLogger(CloudPartition.class);

    // not Serialized
    @SerializedName(value = "dbId")
    private long dbId;
    @SerializedName(value = "tableId")
    private long tableId;

    // This value is set when get the version from meta-service, 0 means version is not cached yet
    private long lastVersionCachedTimeMs = 0;

    private ReentrantLock lock = new ReentrantLock(true);

    public CloudPartition(long id, String name, MaterializedIndex baseIndex,
                          DistributionInfo distributionInfo, long dbId, long tableId) {
        super(id, name, baseIndex, distributionInfo);
        super.setVisibleVersion(-1); // cloud partition version is not resident in FE memory, -1 mean unknown
        super.nextVersion = -1;
        this.dbId = dbId;
        this.tableId = tableId;
    }

    public CloudPartition() {
        super();
    }

    public long getDbId() {
        return this.dbId;
    }

    public void setDbId(long dbId) {
        this.dbId = dbId;
    }

    public long getTableId() {
        return this.tableId;
    }

    public void setTableId(long tableId) {
        this.tableId = tableId;
    }

    protected void setVisibleVersion(long visibleVersion) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("setVisibleVersion use CloudPartition {}", super.getName());
        }
        return;
    }

    public void setCachedVisibleVersion(long version, Long versionUpdateTimeMs) {
        // we only care the version should increase monotonically and ignore the readers
        LOG.debug("setCachedVisibleVersion use CloudPartition {}, version: {}, old version: {}",
                super.getId(), version, super.getVisibleVersion());
        lock.lock();
        if (version > super.getVisibleVersion()) {
            super.setVisibleVersionAndTime(version, versionUpdateTimeMs);
        }
        lock.unlock();

        // versionUpdateTimeMs is the version mtime in MS, which is unlikely equal to lastVersionCachedTimeMs in FE
        lastVersionCachedTimeMs = System.currentTimeMillis();
    }

    @Override
    public long getCachedVisibleVersion() {
        return super.getVisibleVersion();
    }

    public boolean isCachedVersionExpired() {
        long cacheExpirationMs = SessionVariable.cloudPartitionVersionCacheTtlMs;
        if (cacheExpirationMs <= 0) { // always expired
            return true;
        }
        return System.currentTimeMillis() - lastVersionCachedTimeMs > cacheExpirationMs;
    }

    @Override
    public long getVisibleVersion() {
        if (Env.isCheckpointThread() || Config.enable_check_compatibility_mode) {
            return super.getVisibleVersion();
        }

        if (!isCachedVersionExpired()) {
            return getCachedVisibleVersion();
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("getVisibleVersion use CloudPartition {}", super.getName());
        }

        Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder()
                .setDbId(this.dbId)
                .setTableId(this.tableId)
                .setPartitionId(super.getId())
                .setBatchMode(false)
                .build();

        try {
            Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(request);
            long version = -1;
            long mTime = -1;
            if (resp.getStatus().getCode() == MetaServiceCode.OK) {
                version = resp.getVersion();
                // Cache visible version, see hasData() for details.
                mTime = resp.getVersionUpdateTimeMsList().size() == 1 ? resp.getVersionUpdateTimeMs(0) : 0;
            } else {
                assert resp.getStatus().getCode() == MetaServiceCode.VERSION_NOT_FOUND;
                version = Partition.PARTITION_INIT_VERSION;
                mTime = System.currentTimeMillis();
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("get version from meta service, version: {}, partition: {}", version, super.getId());
            }
            setCachedVisibleVersion(version, mTime);
            return version;
        } catch (RpcException e) {
            throw new RuntimeException("get version from meta service failed");
        }
    }

    // Select the non-empty partitions and return the ids.
    public static List<Long> selectNonEmptyPartitionIds(List<CloudPartition> partitions) {
        List<Long> nonEmptyPartitionIds = partitions.stream()
                .filter(CloudPartition::hasDataCached)
                .map(CloudPartition::getId)
                .collect(Collectors.toList());
        if (nonEmptyPartitionIds.size() == partitions.size()) {
            return nonEmptyPartitionIds;
        }

        List<CloudPartition> unknowns = partitions.stream()
                .filter(p -> !p.hasDataCached())
                .collect(Collectors.toList());

        SummaryProfile profile = getSummaryProfile();
        if (profile != null) {
            profile.incGetPartitionVersionByHasDataCount();
        }

        try {
            List<Long> versions = CloudPartition.getSnapshotVisibleVersion(unknowns);

            int size = versions.size();
            for (int i = 0; i < size; i++) {
                if (versions.get(i) > Partition.PARTITION_INIT_VERSION) {
                    nonEmptyPartitionIds.add(unknowns.get(i).getId());
                }
            }

            return nonEmptyPartitionIds;
        } catch (RpcException e) {
            throw new RuntimeException("get version from meta service failed");
        }
    }

    // Get visible version from the specified partitions;
    //
    // Return the visible version in order of the specified partition ids
    public static List<Long> getSnapshotVisibleVersionFromMs(List<CloudPartition> partitions) throws RpcException {
        if (partitions.isEmpty()) {
            return new ArrayList<>();
        }

        List<Long> dbIds = new ArrayList<>();
        List<Long> tableIds = new ArrayList<>();
        List<Long> partitionIds = new ArrayList<>();
        List<Long> versionUpdateTimesMs = new ArrayList<>();
        for (CloudPartition partition : partitions) {
            dbIds.add(partition.getDbId());
            tableIds.add(partition.getTableId());
            partitionIds.add(partition.getId());
        }

        List<Long> versions = getSnapshotVisibleVersion(dbIds, tableIds, partitionIds, versionUpdateTimesMs);

        // Cache visible version, see hasData() for details.
        int size = versions.size();
        for (int i = 0; i < size; ++i) {
            Long version = versions.get(i);
            if (version > Partition.PARTITION_INIT_VERSION) {
                // For compatibility, the existing partitions may not have mtime
                long mTime = versions.size() == versionUpdateTimesMs.size() ? versionUpdateTimesMs.get(i) : 0;
                partitions.get(i).setCachedVisibleVersion(versions.get(i), mTime);
            } else { // No data has been written to this partition
                partitions.get(i).setCachedVisibleVersion(Partition.PARTITION_INIT_VERSION, System.currentTimeMillis());
            }
        }

        return versions;
    }

    // Get visible version from the specified partitions;
    //
    // Return the visible version in order of the specified partition ids, -1 means version NOT FOUND.
    public static List<Long> getSnapshotVisibleVersion(List<CloudPartition> partitions) throws RpcException {
        if (partitions.isEmpty()) {
            return new ArrayList<>();
        }

        if (SessionVariable.cloudPartitionVersionCacheTtlMs <= 0) { // No cached versions will be used
            return getSnapshotVisibleVersionFromMs(partitions);
        }

        // partitionId -> cachedVersion
        List<Pair<Long, Long>> allVersions = new ArrayList<>();
        List<CloudPartition> expiredPartitions = new ArrayList<>();
        for (CloudPartition partition : partitions) {
            long ver = partition.getCachedVisibleVersion();
            if (partition.isCachedVersionExpired()) {
                expiredPartitions.add(partition);
                ver = 0L; // 0 means to be get from meta-service
            }
            allVersions.add(Pair.of(partition.getId(), ver));
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("cloudPartitionVersionCacheTtlMs={}, numPartitions={}, numFilteredPartitions={}",
                    SessionVariable.cloudPartitionVersionCacheTtlMs,
                    partitions.size(), partitions.size() - expiredPartitions.size());
        }

        List<Long> versions = null;
        if (!expiredPartitions.isEmpty()) { // Not all partition versions are from cache
            versions = getSnapshotVisibleVersionFromMs(expiredPartitions); // Get the rest versions from meta-service
        }
        int verMsIdx = 0;
        for (Pair<Long, Long> v : allVersions) { // ATTN: keep the assigning order!!!
            if (v.second == 0L && versions != null) {
                v.second = versions.get(verMsIdx++);
            }
        }
        if (!expiredPartitions.isEmpty()) { // Not all partition versions are from cache
            assert verMsIdx == versions.size() : "size not match, idx=" + verMsIdx + " verSize=" + versions.size();
        }
        versions = allVersions.stream().map(i -> i.second).collect(Collectors.toList());

        return versions;
    }

    // Get visible versions for the specified partitions.
    //
    // Return the visible version in order of the specified partition ids
    private static List<Long> getSnapshotVisibleVersion(List<Long> dbIds, List<Long> tableIds, List<Long> partitionIds,
            List<Long> versionUpdateTimesMs)
            throws RpcException {
        assert dbIds.size() == partitionIds.size() :
                "partition ids size: " + partitionIds.size() + " should equals to db ids size: " + dbIds.size();
        assert tableIds.size() == partitionIds.size() :
                "partition ids size: " + partitionIds.size() + " should equals to tablet ids size: " + tableIds.size();

        Cloud.GetVersionRequest req = Cloud.GetVersionRequest.newBuilder()
                .setDbId(-1)
                .setTableId(-1)
                .setPartitionId(-1)
                .setBatchMode(true)
                .addAllDbIds(dbIds)
                .addAllTableIds(tableIds)
                .addAllPartitionIds(partitionIds)
                .build();

        if (LOG.isDebugEnabled()) {
            LOG.debug("getVisibleVersion use CloudPartition {}", partitionIds.toString());
        }
        Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(req);
        if (resp.getStatus().getCode() != MetaServiceCode.OK) {
            throw new RpcException("get visible version", "unexpected status " + resp.getStatus());
        }

        List<Long> versions = resp.getVersionsList();
        if (versions.size() != partitionIds.size()) {
            throw new RpcException("get visible version",
                    "wrong number of versions, required " + partitionIds.size() + ", but got " + versions.size());
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("get version from meta service, partitions: {}, versions: {}", partitionIds, versions);
        }

        if (versionUpdateTimesMs != null) {
            versionUpdateTimesMs.addAll(resp.getVersionUpdateTimeMsList());
        }

        ArrayList<Long> news = new ArrayList<>();
        for (Long v : versions) { // -1 means version NOT FOUND ==> no data has been written
            news.add(v == -1 ?  Partition.PARTITION_INIT_VERSION : v);
        }
        return news;
    }

    @Override
    public long getNextVersion() {
        // use meta service visibleVersion
        if (LOG.isDebugEnabled()) {
            LOG.debug("getNextVersion use CloudPartition {}", super.getName());
        }
        return -1;
    }

    @Override
    public void setNextVersion(long nextVersion) {
        // use meta service visibleVersion
        if (LOG.isDebugEnabled()) {
            LOG.debug("setNextVersion use CloudPartition {} Version {}", super.getName(), nextVersion);
        }
        return;
    }

    @Override
    public void updateVersionForRestore(long visibleVersion) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("updateVersionForRestore use CloudPartition {} version for restore: visible: {}",
                    super.getName(), visibleVersion);
        }
        return;
    }

    @Override
    public void updateVisibleVersion(long visibleVersion) {
        // use meta service visibleVersion
        if (LOG.isDebugEnabled()) {
            LOG.debug("updateVisibleVersion use CloudPartition {} version for restore: visible: {}",
                    super.getName(), visibleVersion);
        }

        return;
    }

    // Determine whether data this partition has, according to the cached visible version.
    public boolean hasDataCached() {
        // In order to determine whether a partition is empty, a get_version RPC is issued to
        // the meta service. The pruning process will be very slow when there are lots of empty
        // partitions. This option disables the empty partition prune optimization to speed SQL
        // analysis/plan phase.
        if (isEmptyPartitionPruneDisabled()) {
            return true;
        }

        // Every partition starts from version 1, version 1 has no data.
        // So as long as version is greater than 1, it can be determined that there is data here.
        return super.getVisibleVersion() > Partition.PARTITION_INIT_VERSION;
    }

    /**
     * CloudPartition always has data
     */
    @Override
    public boolean hasData() {
        // To avoid sending an RPC request, see the cached visible version here first.
        if (hasDataCached()) {
            return true;
        }

        SummaryProfile profile = getSummaryProfile();
        if (profile != null) {
            profile.incGetPartitionVersionByHasDataCount();
        }

        return getVisibleVersion() > Partition.PARTITION_INIT_VERSION;
    }

    private static boolean isEmptyPartitionPruneDisabled() {
        ConnectContext ctx = ConnectContext.get();
        if (ctx != null && (ctx.getSessionVariable().getDisableNereidsRules().get(RuleType.valueOf(
                "PRUNE_EMPTY_PARTITION").type()) || ctx.getSessionVariable().getDisableEmptyPartitionPrune())) {
            return true;
        }
        return false;
    }

    private static SummaryProfile getSummaryProfile() {
        ConnectContext ctx = ConnectContext.get();
        if (ctx != null) {
            StmtExecutor executor = ctx.getExecutor();
            if (executor != null) {
                return executor.getSummaryProfile();
            }
        }
        return null;
    }

    @Deprecated
    @Override
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);
        this.dbId = in.readLong();
        this.tableId = in.readLong();
    }

    public boolean equals(Object obj) {
        if (!super.equals(obj)) {
            return false;
        }

        if (!(obj instanceof CloudPartition)) {
            return false;
        }
        CloudPartition cloudPartition = (CloudPartition) obj;
        return (dbId == cloudPartition.dbId) && (tableId == cloudPartition.tableId);
    }

    public String toString() {
        StringBuilder buffer = new StringBuilder();
        buffer.append(super.toString());
        buffer.append("dbId: ").append(this.dbId).append("; ");
        buffer.append("tableId: ").append(this.tableId).append("; ");
        return buffer.toString();
    }
}