OlapTableStreamWrapper.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog.stream;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.Util;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TPrimitiveType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.List;
import java.util.Map;
// runtime-only class for unified query/insert experience, created when bind relation with OlapTableStream
public class OlapTableStreamWrapper extends OlapTable {
private OlapTableStream stream;
private OlapTable baseTable;
protected Map<Long, Pair<Long, Long>> outputUpdateMap = Maps.newHashMap();
public OlapTableStreamWrapper(OlapTableStream stream, OlapTable baseTable) {
super(stream.getId(), stream.getName(), stream.getFullSchema(), baseTable.getKeysType(),
baseTable.getPartitionInfo(), baseTable.getDefaultDistributionInfo());
this.stream = stream;
this.baseTable = baseTable;
this.getOrCreatTableProperty().setEnableUniqueKeyMergeOnWrite(baseTable.getEnableUniqueKeyMergeOnWrite());
}
@Override
public List<Column> getBaseSchema(boolean full) {
return baseTable.getBaseSchema(full);
}
@Override
public List<Column> getBaseSchema() {
return baseTable.getBaseSchema();
}
// for display
public String getIndexNameById(long indexId) {
// always returns base index name
return baseTable.getName();
}
// for olap table to thrift
@Override
public void getColumnDesc(long selectedIndexId, List<TColumn> columnsDesc, List<String> keyColumnNames,
List<TPrimitiveType> keyColumnTypes) {
baseTable.getColumnDesc(selectedIndexId, columnsDesc, keyColumnNames, keyColumnTypes);
}
@Override
public int getIndexSchemaVersion(long indexId) {
return baseTable.getIndexSchemaVersion(indexId);
}
// no need for pre agg on olap table stream
@Override
public boolean isDupKeysOrMergeOnWrite() {
return false;
}
@Override
public long getBaseIndexId() {
return baseTable.getBaseIndexId();
}
@Override
public MaterializedIndexMeta getIndexMetaByIndexId(long indexId) {
return baseTable.getIndexMetaByIndexId(indexId);
}
@Override
public List<Column> getSchemaByIndexId(Long indexId) {
// here is base table indexId, we can ignore it and use olap table stream schema
return getBaseSchema(Util.showHiddenColumns());
}
// override all partition methods, olap table stream inherit all partitions from base table
@Override
public Partition getPartition(String partitionName) {
return baseTable.getPartition(partitionName);
}
@Override
public Partition getPartition(long partitionId) {
return baseTable.getPartition(partitionId);
}
@Override
public Partition getPartition(String partitionName, boolean isTempPartition) {
return baseTable.getPartition(partitionName, isTempPartition);
}
@Override
public List<Long> getPartitionIds() {
return baseTable.getPartitionIds();
}
public Pair<Long, Long> getStreamUpdate(Long partitionId) {
if (!outputUpdateMap.containsKey(partitionId)) {
outputUpdateMap.put(partitionId, stream.getStreamUpdate(partitionId));
}
return stream.getStreamUpdate(partitionId);
}
public Long getStreamDbId() {
return stream.getDatabase().getId();
}
public Long getStreamId() {
return stream.getId();
}
@Override
public boolean hasDeleteSign() {
return getDeleteSignColumn() != null;
}
@Override
public boolean getEnableUniqueKeyMergeOnWrite() {
return baseTable.getEnableUniqueKeyMergeOnWrite();
}
@Override
public boolean isMorTable() {
return baseTable.isMorTable();
}
@Override
public Collection<Partition> getPartitions() {
return baseTable.getPartitions();
}
@Override
public List<Long> selectNonEmptyPartitionIds(Collection<Long> partitionIds) {
List<Long> nonEmptyIds = Lists.newArrayListWithCapacity(partitionIds.size());
for (Long partitionId : partitionIds) {
if (stream.hasData(getPartition(partitionId))) {
nonEmptyIds.add(partitionId);
}
}
return nonEmptyIds;
}
}