MaterializationNode.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.computegroup.ComputeGroup;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TMaterializationNode;
import org.apache.doris.thrift.TNodeInfo;
import org.apache.doris.thrift.TPaloNodesInfo;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import java.util.ArrayList;
import java.util.List;
/**
* struct TMaterializationNode {
* // A Materialization materializes all tuple
* // 如果 child 的output 是[a, row_id1, b, row_id2],
* // row_id1 生成 e f
* // row_id2 生成 c d
* // 那么 tuple_id = [a, e, f, b, c, d]
* 1: optional Types.TTupleId tuple_id
*
* // Nodes in this cluster, used for second phase fetch, BE 节点信息
* 2: optional Descriptors.TPaloNodesInfo nodes_info;
*
* // Separate list of expr for fetch data
* // row_id 字段对应的slotRef 的列表: [row_id1, row_id2]
* 3: optional list<Exprs.TExpr>> fetch_expr_lists
* // Fetch schema
* //[[e, f],[c, d]],
* 4: optional list<list<Descriptors.TColumn>> column_descs_lists;
*
* // 和column_descs_lists 对应,slot_locs_lists=[[1, 2], [4, 5]],
* // 其中 1表示e在tuple_id 中的第1号slotDesc,f 对应tuple_id 中第2号 slotDesc
* 5: optional list<list<int>> slot_locs_lists;
*
* // Whether fetch row store
* // fe根据是否有行存 以及 延迟物化列和总列数的比例 延迟物化列数 判断是否使用行存优化
* 6: optional list<bool> fetch_row_store;
*
* 7:bool do_gc = false; // 最靠近root的 MaterializeNode设置为true,其它M 设置为false
*
* 类似于 slot_locs_lists, 不过其中的数字表示在 table 中的第几列
* 8. optional list<list<int>> table_idx_lists;
*
* }
*/
public class MaterializationNode extends PlanNode {
private TPaloNodesInfo nodesInfo;
private TupleDescriptor materializeTupleDescriptor;
private List<Expr> rowIds;
private List<List<Column>> lazyColumns;
private List<List<Integer>> locations;
private List<List<Integer>> idxs;
private List<Boolean> rowStoreFlags;
private boolean isTopMaterializeNode;
public MaterializationNode(PlanNodeId id, TupleDescriptor desc, PlanNode child) {
super(id, desc.getId().asList(), "MaterializeNode", StatisticalType.DEFAULT);
this.materializeTupleDescriptor = desc;
initNodeInfo();
this.children.add(child);
}
public void initNodeInfo() {
BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
.needQueryAvailable()
.setRequireAliveBe()
.build();
nodesInfo = new TPaloNodesInfo();
ConnectContext context = ConnectContext.get();
if (context == null) {
context = new ConnectContext();
}
ComputeGroup computeGroup = context.getComputeGroupSafely();
for (Backend backend : policy.getCandidateBackends(computeGroup.getBackendList())) {
nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
}
}
@Override
public String getNodeExplainString(String detailPrefix, TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
output.append(detailPrefix)
.append("materialize tuple id:")
.append(materializeTupleDescriptor.getId()).append("\n");
if (!projectList.isEmpty()) {
output.append(detailPrefix)
.append("output tuple id:").append(outputTupleDesc.getId()).append("\n");
output.append(detailPrefix)
.append("projectList:").append(projectList.toString()).append("\n");
}
output.append(detailPrefix).append("column_descs_lists").append(lazyColumns).append("\n");
output.append(detailPrefix).append("locations: ").append(locations).append("\n");
output.append(detailPrefix).append("table_idxs: ").append(idxs).append("\n");
output.append(detailPrefix).append("row_ids: ").append(rowIds).append("\n");
output.append(detailPrefix).append("isTopMaterializeNode: ").append(isTopMaterializeNode).append("\n");
return output.toString();
}
@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.MATERIALIZATION_NODE;
msg.materialization_node = new TMaterializationNode();
msg.materialization_node.setTupleId(tupleIds.get(0).asInt());
msg.materialization_node.setIntermediateTupleId(materializeTupleDescriptor.getId().asInt());
msg.materialization_node.setNodesInfo(nodesInfo);
msg.materialization_node.setFetchExprLists(Expr.treesToThrift(rowIds));
List<List<TColumn>> thriftCols = new ArrayList<>();
for (List<Column> cols : lazyColumns) {
List<TColumn> array = new ArrayList<>();
for (Column col : cols) {
array.add(col.toThrift());
}
thriftCols.add(array);
}
msg.materialization_node.setColumnDescsLists(thriftCols);
msg.materialization_node.setSlotLocsLists(locations);
msg.materialization_node.setColumnIdxsLists(idxs);
msg.materialization_node.setFetchRowStores(rowStoreFlags);
msg.materialization_node.setGcIdMap(isTopMaterializeNode);
}
public void setRowIds(List<Expr> rowIds) {
this.rowIds = rowIds;
}
public void setLazyColumns(List<List<Column>> lazyColumns) {
this.lazyColumns = lazyColumns;
}
public void setLocations(List<List<Integer>> locations) {
this.locations = locations;
}
public void setIdxs(List<List<Integer>> idxs) {
this.idxs = idxs;
}
public void setRowStoreFlags(List<Boolean> rowStoreFlags) {
this.rowStoreFlags = rowStoreFlags;
}
public void setTopMaterializeNode(boolean topMaterializeNode) {
isTopMaterializeNode = topMaterializeNode;
}
public TupleDescriptor getMaterializeTupleDescriptor() {
return materializeTupleDescriptor;
}
public List<Expr> getRowIds() {
return rowIds;
}
public List<List<Column>> getLazyColumns() {
return lazyColumns;
}
public List<List<Integer>> getLocations() {
return locations;
}
public List<Boolean> getRowStoreFlags() {
return rowStoreFlags;
}
public boolean isTopMaterializeNode() {
return isTopMaterializeNode;
}
@Override
public boolean isSerialOperator() {
return true;
}
}