RangePartitionInfo.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.analysis.AllPartitionDesc;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.RangePartitionDesc;
import org.apache.doris.analysis.SinglePartitionDesc;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.RangeUtils;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;

import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class RangePartitionInfo extends PartitionInfo {

    public RangePartitionInfo() {
        // for persist
        super();
    }

    public RangePartitionInfo(List<Column> partitionColumns) {
        super(PartitionType.RANGE);
        this.partitionColumns = partitionColumns;
        this.isMultiColumnPartition = partitionColumns.size() > 1;
    }

    public RangePartitionInfo(boolean isAutoCreatePartitions, ArrayList<Expr> exprs, List<Column> partitionColumns) {
        super(PartitionType.RANGE, partitionColumns);
        this.isAutoCreatePartitions = isAutoCreatePartitions;
        if (exprs != null) {
            this.partitionExprs.addAll(exprs);
        }
    }

    public Map<Long, PartitionItem> getPartitionItems(Collection<Long> partitionIds) {
        Map<Long, PartitionItem> columnRanges = Maps.newLinkedHashMapWithExpectedSize(partitionIds.size());
        for (Long partitionId : partitionIds) {
            PartitionItem partitionItem = idToItem.get(partitionId);
            if (partitionItem == null) {
                partitionItem = idToTempItem.get(partitionId);
            }
            if (partitionItem == null) {
                throw new IllegalStateException("Can not found partition item: " + partitionId);
            }
            columnRanges.put(partitionId, partitionItem);
        }
        return columnRanges;
    }

    @Override
    public PartitionItem createAndCheckPartitionItem(SinglePartitionDesc desc, boolean isTemp) throws DdlException {
        Range<PartitionKey> newRange = null;
        PartitionKeyDesc partitionKeyDesc = desc.getPartitionKeyDesc();
        // check range
        if (partitionKeyDesc.hasInValues()) {
            throw new DdlException("Range partition expected 'VALUES [LESS THAN or [(\"xxx\" ,...), (\"xxx\", ...))]'");
        }
        try {
            newRange = createAndCheckNewRange(partitionKeyDesc, isTemp);
        } catch (AnalysisException e) {
            throw new DdlException("Invalid range value format: " + e.getMessage());
        }

        Preconditions.checkNotNull(newRange);
        return new RangePartitionItem(newRange);
    }

    @Override
    public List<Map.Entry<Long, PartitionItem>> getPartitionItemEntryList(boolean isTemp, boolean isSorted) {
        Map<Long, PartitionItem> tmpMap = idToItem;
        if (isTemp) {
            tmpMap = idToTempItem;
        }
        List<Map.Entry<Long, PartitionItem>> itemEntryList = Lists.newArrayList(tmpMap.entrySet());
        if (isSorted) {
            Collections.sort(itemEntryList, RangeUtils.RANGE_MAP_ENTRY_COMPARATOR);
        }
        return itemEntryList;
    }

    public List<Map.Entry<Long, PartitionItem>> getAllPartitionItemEntryList(boolean isSorted) {
        Map<Long, PartitionItem> tmpMap = Maps.newHashMap();

        tmpMap.putAll(idToItem);
        tmpMap.putAll(idToTempItem);

        List<Map.Entry<Long, PartitionItem>> itemEntryList = Lists.newArrayList(tmpMap.entrySet());
        if (isSorted) {
            Collections.sort(itemEntryList, RangeUtils.RANGE_MAP_ENTRY_COMPARATOR);
        }
        return itemEntryList;
    }

    // create a new range and check it.
    private Range<PartitionKey> createAndCheckNewRange(PartitionKeyDesc partKeyDesc, boolean isTemp)
            throws AnalysisException, DdlException {
        boolean isFixedPartitionKeyValueType
                = partKeyDesc.getPartitionType() == PartitionKeyDesc.PartitionKeyValueType.FIXED;

        // generate partitionItemEntryList
        List<Map.Entry<Long, PartitionItem>> partitionItemEntryList = isFixedPartitionKeyValueType
                        ? getPartitionItemEntryList(isTemp, false) : getPartitionItemEntryList(isTemp, true);

        if (isFixedPartitionKeyValueType) {
            return createNewRangeForFixedPartitionValueType(partKeyDesc, partitionItemEntryList);
        } else {
            Range<PartitionKey> newRange = null;
            // create upper values for new range
            PartitionKey newRangeUpper = null;
            if (partKeyDesc.isMax()) {
                newRangeUpper = PartitionKey.createInfinityPartitionKey(partitionColumns, true);
            } else {
                newRangeUpper = PartitionKey.createPartitionKey(partKeyDesc.getUpperValues(), partitionColumns);
            }
            if (newRangeUpper.isMinValue()) {
                throw new DdlException("Partition's upper value should not be MIN VALUE: " + partKeyDesc.toSql());
            }

            Range<PartitionKey> lastRange = null;
            Range<PartitionKey> currentRange = null;
            for (Map.Entry<Long, PartitionItem> entry : partitionItemEntryList) {
                currentRange = entry.getValue().getItems();
                // check if equals to upper bound
                PartitionKey upperKey = currentRange.upperEndpoint();
                if (upperKey.compareTo(newRangeUpper) >= 0) {
                    newRange = createNewRangeForLessThanPartitionValueType(newRangeUpper, lastRange, currentRange);
                    break;
                } else {
                    lastRange = currentRange;
                }
            } // end for ranges

            if (newRange == null) /* the new range's upper value is larger than any existing ranges */ {
                newRange = createNewRangeForLessThanPartitionValueType(newRangeUpper, lastRange, currentRange);
            }
            return newRange;
        }
    }

    private Range<PartitionKey> createNewRangeForFixedPartitionValueType(PartitionKeyDesc partKeyDesc,
            List<Map.Entry<Long, PartitionItem>> partitionItemEntryList)
            throws AnalysisException, DdlException {
        PartitionKey lowKey = PartitionKey.createPartitionKey(partKeyDesc.getLowerValues(), partitionColumns);
        PartitionKey upperKey =  PartitionKey.createPartitionKey(partKeyDesc.getUpperValues(), partitionColumns);
        if (lowKey.compareTo(upperKey) >= 0) {
            throw new AnalysisException("The lower values must smaller than upper values");
        }
        Range<PartitionKey> newRange = Range.closedOpen(lowKey, upperKey);
        for (Map.Entry<Long, PartitionItem> partitionItemEntry : partitionItemEntryList) {
            RangeUtils.checkRangeIntersect(newRange, partitionItemEntry.getValue().getItems());
        }
        return newRange;
    }

    private Range<PartitionKey> createNewRangeForLessThanPartitionValueType(PartitionKey newRangeUpper,
            Range<PartitionKey> lastRange, Range<PartitionKey> currentRange) throws AnalysisException, DdlException {
        PartitionKey lowKey = lastRange == null
                ? PartitionKey.createInfinityPartitionKey(partitionColumns, false) : lastRange.upperEndpoint();

        // check: [left, right), error if left equal right
        if (lowKey.compareTo(newRangeUpper) >= 0) {
            throw new AnalysisException("The lower values must smaller than upper values");
        }
        Range<PartitionKey> newRange = Range.closedOpen(lowKey, newRangeUpper);

        if (currentRange != null) {
            // check if range intersected
            RangeUtils.checkRangeIntersect(newRange, currentRange);
        }
        return newRange;
    }

    public static void checkPartitionColumn(Column column) throws AnalysisException {
        PrimitiveType type = column.getDataType();
        if (!type.isFixedPointType() && !type.isDateType()) {
            throw new AnalysisException("Column[" + column.getName() + "] type[" + type
                    + "] cannot be a range partition key.");
        }
    }

    @Override
    public void checkPartitionItemListsMatch(List<PartitionItem> list1, List<PartitionItem> list2)
            throws DdlException {
        RangeUtils.checkPartitionItemListsMatch(list1, list2);
    }

    @Override
    public void checkPartitionItemListsConflict(List<PartitionItem> list1, List<PartitionItem> list2)
            throws DdlException {
        RangeUtils.checkRangeConflict(list1, list2);
    }

    @Deprecated
    public static PartitionInfo read(DataInput in) throws IOException {
        if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_136) {
            return GsonUtils.GSON.fromJson(Text.readString(in), RangePartitionInfo.class);
        }

        PartitionInfo partitionInfo = new RangePartitionInfo();
        partitionInfo.readFields(in);
        return partitionInfo;
    }

    @Deprecated
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);

        int counter = in.readInt();
        for (int i = 0; i < counter; i++) {
            Column column = Column.read(in);
            partitionColumns.add(column);
        }

        this.isMultiColumnPartition = partitionColumns.size() > 1;

        counter = in.readInt();
        for (int i = 0; i < counter; i++) {
            long partitionId = in.readLong();
            Range<PartitionKey> range = RangeUtils.readRange(in);
            idToItem.put(partitionId, new RangePartitionItem(range));
        }

        counter = in.readInt();
        for (int i = 0; i < counter; i++) {
            long partitionId = in.readLong();
            Range<PartitionKey> range = RangeUtils.readRange(in);
            idToTempItem.put(partitionId, new RangePartitionItem(range));
        }
    }

    @Override
    public String toSql(OlapTable table, List<Long> partitionId) {
        StringBuilder sb = new StringBuilder();
        int idx = 0;
        if (enableAutomaticPartition()) {
            sb.append("AUTO PARTITION BY RANGE ");
            for (Expr e : partitionExprs) {
                sb.append("(");
                sb.append(e.toSql());
                sb.append(")");
            }
            sb.append("\n(");
        } else {
            sb.append("PARTITION BY RANGE(");
            for (Column column : partitionColumns) {
                if (idx != 0) {
                    sb.append(", ");
                }
                sb.append("`").append(column.getName()).append("`");
                idx++;
            }
            sb.append(")\n(");
        }

        // sort range
        List<Map.Entry<Long, PartitionItem>> entries = new ArrayList<>(this.idToItem.entrySet());
        Collections.sort(entries, RangeUtils.RANGE_MAP_ENTRY_COMPARATOR);

        idx = 0;
        for (Map.Entry<Long, PartitionItem> entry : entries) {
            Partition partition = table.getPartition(entry.getKey());
            String partitionName = partition.getName();
            Range<PartitionKey> range = entry.getValue().getItems();

            // print all partitions' range is fixed range, even if some of them is created by less than range
            sb.append("PARTITION ").append(partitionName).append(" VALUES [");
            sb.append(range.lowerEndpoint().toSql());
            sb.append(", ").append(range.upperEndpoint().toSql()).append(")");
            if (!"".equals(getStoragePolicy(entry.getKey()))) {
                sb.append("(\"storage_policy\" = \"").append(getStoragePolicy(entry.getKey())).append("\")");
            }

            if (partitionId != null) {
                partitionId.add(entry.getKey());
                break;
            }

            if (idx != entries.size() - 1) {
                sb.append(",\n");
            }
            idx++;
        }
        sb.append(")");
        return sb.toString();
    }

    @Override
    public PartitionDesc toPartitionDesc(OlapTable table) throws AnalysisException {
        List<String> partitionColumnNames = partitionColumns.stream().map(Column::getName).collect(Collectors.toList());
        List<AllPartitionDesc> allPartitionDescs = Lists.newArrayListWithCapacity(this.idToItem.size());

        // sort range
        List<Map.Entry<Long, PartitionItem>> entries = new ArrayList<>(this.idToItem.entrySet());
        Collections.sort(entries, RangeUtils.RANGE_MAP_ENTRY_COMPARATOR);
        for (Map.Entry<Long, PartitionItem> entry : entries) {
            Partition partition = table.getPartition(entry.getKey());
            String partitionName = partition.getName();

            Range<PartitionKey> range = entry.getValue().getItems();
            PartitionKeyDesc partitionKeyDesc = PartitionKeyDesc.createFixed(
                    PartitionInfo.toPartitionValue(range.lowerEndpoint()),
                    PartitionInfo.toPartitionValue(range.upperEndpoint()));

            Map<String, String> properties = Maps.newHashMap();
            Optional.ofNullable(this.idToStoragePolicy.get(entry.getKey())).ifPresent(p -> {
                if (!p.equals("")) {
                    properties.put("STORAGE POLICY", p);
                }
            });

            allPartitionDescs.add(new SinglePartitionDesc(false, partitionName, partitionKeyDesc, properties));
        }
        return new RangePartitionDesc(this.partitionExprs, partitionColumnNames, allPartitionDescs);
    }
}