ColocateGroupSchema.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Writable;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/*
* This class saves the schema of a colocation group
*/
public class ColocateGroupSchema implements Writable {
@SerializedName(value = "groupId")
private GroupId groupId;
@SerializedName(value = "distributionColTypes")
private List<Type> distributionColTypes = Lists.newArrayList();
@SerializedName(value = "bucketsNum")
private int bucketsNum;
@SerializedName(value = "replicaAlloc")
private ReplicaAllocation replicaAlloc;
private ColocateGroupSchema() {
}
public ColocateGroupSchema(GroupId groupId, List<Column> distributionCols,
int bucketsNum, ReplicaAllocation replicaAlloc) {
this.groupId = groupId;
this.distributionColTypes = distributionCols.stream().map(c -> c.getType()).collect(Collectors.toList());
this.bucketsNum = bucketsNum;
this.replicaAlloc = replicaAlloc;
}
public GroupId getGroupId() {
return groupId;
}
public int getBucketsNum() {
return bucketsNum;
}
public ReplicaAllocation getReplicaAlloc() {
return replicaAlloc;
}
public void setReplicaAlloc(ReplicaAllocation replicaAlloc) {
this.replicaAlloc = replicaAlloc;
}
public List<Type> getDistributionColTypes() {
return distributionColTypes;
}
public void checkColocateSchema(OlapTable tbl) throws DdlException {
checkDistribution(tbl.getDefaultDistributionInfo());
// We add a table with many partitions to the colocate group,
// we need to check whether all partitions comply with the colocate group specification
for (Partition partition : tbl.getAllPartitions()) {
checkDistribution(partition.getDistributionInfo());
}
checkReplicaAllocation(tbl.getPartitionInfo());
}
public void checkDistribution(DistributionInfo distributionInfo) throws DdlException {
if (distributionInfo instanceof HashDistributionInfo) {
HashDistributionInfo info = (HashDistributionInfo) distributionInfo;
// buckets num
if (info.getBucketNum() != bucketsNum) {
ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_HAS_SAME_BUCKET_NUM,
info.getBucketNum(), bucketsNum);
}
// distribution col size
if (info.getDistributionColumns().size() != distributionColTypes.size()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_HAS_SAME_DISTRIBUTION_COLUMN_SIZE,
info.getDistributionColumns().size(), distributionColTypes.size());
}
// distribution col type
for (int i = 0; i < distributionColTypes.size(); i++) {
Type targetColType = distributionColTypes.get(i);
// varchar and string has same distribution hash value if it's data is same
if (targetColType.isVarcharOrStringType() && info.getDistributionColumns().get(i).getType()
.isVarcharOrStringType()) {
continue;
}
if (!targetColType.equals(info.getDistributionColumns().get(i).getType())) {
String typeName = info.getDistributionColumns().get(i).getType().toString();
String colName = info.getDistributionColumns().get(i).getName();
String formattedString = colName + "(" + typeName + ")";
ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_HAS_SAME_DISTRIBUTION_COLUMN_TYPE,
formattedString, targetColType);
}
}
}
}
private void checkReplicaAllocation(PartitionInfo partitionInfo) throws DdlException {
for (ReplicaAllocation alloc : partitionInfo.idToReplicaAllocation.values()) {
checkReplicaAllocation(alloc);
}
}
public void checkReplicaAllocation(ReplicaAllocation replicaAlloc) throws DdlException {
if (!replicaAlloc.equals(this.replicaAlloc)) {
ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_HAS_SAME_REPLICATION_ALLOCATION,
replicaAlloc, this.replicaAlloc);
}
}
public void checkDynamicPartition(Map<String, String> properties,
DistributionInfo distributionInfo) throws DdlException {
if (properties.get(DynamicPartitionProperty.BUCKETS) != null) {
HashDistributionInfo info = (HashDistributionInfo) distributionInfo;
if (info.getBucketNum() != Integer.parseInt(properties.get(DynamicPartitionProperty.BUCKETS))) {
ErrorReport.reportDdlException(
ErrorCode.ERR_DYNAMIC_PARTITION_MUST_HAS_SAME_BUCKET_NUM_WITH_COLOCATE_TABLE, bucketsNum);
}
}
}
public static ColocateGroupSchema read(DataInput in) throws IOException {
ColocateGroupSchema schema = new ColocateGroupSchema();
schema.readFields(in);
return schema;
}
@Override
public void write(DataOutput out) throws IOException {
groupId.write(out);
out.writeInt(distributionColTypes.size());
for (Type type : distributionColTypes) {
ColumnType.write(out, type);
}
out.writeInt(bucketsNum);
this.replicaAlloc.write(out);
}
public void readFields(DataInput in) throws IOException {
groupId = GroupId.read(in);
int size = in.readInt();
for (int i = 0; i < size; i++) {
distributionColTypes.add(ColumnType.read(in));
}
bucketsNum = in.readInt();
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) {
short replicationNum = in.readShort();
this.replicaAlloc = new ReplicaAllocation(replicationNum);
} else {
this.replicaAlloc = ReplicaAllocation.read(in);
}
}
}