BrokerTable.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.thrift.TBrokerTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.List;
import java.util.Map;
public class BrokerTable extends Table {
private static final Logger LOG = LogManager.getLogger(BrokerTable.class);
private static final String BROKER_NAME = "broker_name";
private static final String PATH = "path";
private static final String COLUMN_SEPARATOR = "column_separator";
private static final String LINE_DELIMITER = "line_delimiter";
private static final String FILE_FORMAT = "format";
@SerializedName("bn")
private String brokerName;
@SerializedName("ps")
private List<String> paths;
@SerializedName("cs")
private String columnSeparator;
@SerializedName("ld")
private String lineDelimiter;
private String fileFormat;
@SerializedName("bp")
private Map<String, String> brokerProperties;
public BrokerTable() {
super(TableType.BROKER);
}
public BrokerTable(long id, String name, List<Column> schema, Map<String, String> properties)
throws DdlException {
super(id, name, TableType.BROKER, schema);
validate(properties);
}
public void setBrokerProperties(Map<String, String> brokerProperties) {
this.brokerProperties = brokerProperties;
if (this.brokerProperties == null) {
this.brokerProperties = Maps.newHashMap();
}
}
public String getBrokerName() {
return brokerName;
}
public List<String> getPaths() {
return paths;
}
public List<String> getEncodedPaths() {
List<String> encodedPath = Lists.newArrayList();
// we encode ',' and '%' to %2c and %25
for (String path : paths) {
encodedPath.add(path.replaceAll(",", "%2c").replaceAll("%", "%25"));
}
return encodedPath;
}
public String getColumnSeparator() {
return columnSeparator;
}
public String getReadableColumnSeparator() {
return StringEscapeUtils.escapeJava(columnSeparator);
}
public String getLineDelimiter() {
return lineDelimiter;
}
public String getReadableLineDelimiter() {
return StringEscapeUtils.escapeJava(lineDelimiter);
}
public String getFileFormat() {
return fileFormat;
}
public Map<String, String> getBrokerProperties() {
return brokerProperties;
}
// Check whether insert into table
public boolean isWritable() {
for (String path : paths) {
if (path.endsWith("/*")) {
return true;
}
}
return false;
}
public String getWritablePath() {
for (String path : paths) {
if (path.endsWith("/*")) {
return path.substring(0, path.lastIndexOf("*"));
}
}
return null;
}
private void validate(Map<String, String> properties) throws DdlException {
if (properties == null) {
throw new DdlException("Please set properties of broker table, "
+ "they are: broker_name, path, column_delimiter, line_delimiter and format.");
}
Map<String, String> copiedProps = Maps.newHashMap(properties);
brokerName = copiedProps.get(BROKER_NAME);
if (Strings.isNullOrEmpty(brokerName)) {
throw new DdlException("Broker name is null. "
+ "Please add properties('broker_name'='xxx') when create table");
}
copiedProps.remove(BROKER_NAME);
// TODO(zc)
// check if broker name exist
String pathsStr = copiedProps.get(PATH);
if (Strings.isNullOrEmpty(pathsStr)) {
throw new DdlException("Path is null. "
+ "Please add properties('path'='xxx') when create table");
}
copiedProps.remove(PATH);
String[] origPaths = pathsStr.split(",");
paths = Lists.newArrayList();
// user will write %2c and %25 instead of ',' and '%'
// we need to decode these escape character to what they really are.
try {
for (String origPath : origPaths) {
origPath = origPath.trim();
origPath = URLDecoder.decode(origPath, "UTF-8");
paths.add(origPath);
}
} catch (UnsupportedEncodingException e) {
throw new DdlException("Encounter path encoding exception: " + e.getMessage());
}
columnSeparator = copiedProps.get(COLUMN_SEPARATOR);
if (Strings.isNullOrEmpty(columnSeparator)) {
// use default separator
columnSeparator = "\t";
}
copiedProps.remove(COLUMN_SEPARATOR);
lineDelimiter = copiedProps.get(LINE_DELIMITER);
if (Strings.isNullOrEmpty(lineDelimiter)) {
// use default delimiter
lineDelimiter = "\n";
}
copiedProps.remove(LINE_DELIMITER);
fileFormat = copiedProps.get(FILE_FORMAT);
if (fileFormat != null) {
fileFormat = fileFormat.toLowerCase();
switch (fileFormat) {
case "csv":
case "parquet":
break;
default:
throw new DdlException("Invalid file type: " + copiedProps + ".Only support csv and parquet.");
}
}
copiedProps.remove(FILE_FORMAT);
if (!copiedProps.isEmpty()) {
throw new DdlException("Unknown table properties: " + copiedProps);
}
}
public TTableDescriptor toThrift() {
TBrokerTable tBrokerTable = new TBrokerTable();
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE,
fullSchema.size(), 0, getName(), "");
tTableDescriptor.setBrokerTable(tBrokerTable);
return tTableDescriptor;
}
@Deprecated
public void readFields(DataInput in) throws IOException {
super.readFields(in);
brokerName = Text.readString(in);
int size = in.readInt();
paths = Lists.newArrayList();
for (int i = 0; i < size; i++) {
paths.add(Text.readString(in));
}
columnSeparator = Text.readString(in);
lineDelimiter = Text.readString(in);
brokerProperties = Maps.newHashMap();
size = in.readInt();
for (int i = 0; i < size; i++) {
String key = Text.readString(in);
String val = Text.readString(in);
brokerProperties.put(key, val);
}
}
}