ParquetReader.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.parquet;

import org.apache.doris.analysis.BrokerDesc;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

import java.io.File;
import java.io.IOException;
import java.util.List;
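
/**
 * Thin wrapper around Parquet's ParquetFileReader that exposes a file's column
 * schema and a row-limited preview of its data. Files can be opened from the
 * local filesystem or through a broker (see the create() overloads below).
 *
 * <p>Illustrative usage (the file path below is hypothetical):
 * <pre>{@code
 *   ParquetReader reader = ParquetReader.create("/path/to/file.parquet");
 *   try {
 *       List<String> columns = reader.getSchema(false);
 *       List<List<String>> preview = reader.getLines(10);
 *   } finally {
 *       reader.close();
 *   }
 * }</pre>
 */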
public class ParquetReader {

    private ParquetFileReader fileReader;

    private ParquetReader(InputFile inputFile) throws IOException {
        ParquetReadOptions readOptions = ParquetReadOptions.builder().build();
        this.fileReader = new ParquetFileReader(inputFile, readOptions);
    }
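
    // Open a Parquet file through the broker described by brokerDesc.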
    public static ParquetReader create(String filePath, BrokerDesc brokerDesc) throws IOException {
        BrokerInputFile inputFile = BrokerInputFile.create(filePath, brokerDesc);
        return new ParquetReader(inputFile);
    }
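
    // Open a Parquet file from the local filesystem.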
    public static ParquetReader create(String filePath) throws IOException {
        LocalInputFile inputFile = new LocalInputFile(new File(filePath));
        return new ParquetReader(inputFile);
    }

    // For test only. The given ip and port identify the broker to connect to.
    public static ParquetReader create(String filePath, BrokerDesc brokerDesc, String ip, int port)
            throws IOException {
        BrokerInputFile inputFile = BrokerInputFile.create(filePath, brokerDesc, ip, port);
        return new ParquetReader(inputFile);
    }

    // Get the file schema as a list of column names. If `debug` is true,
    // return the full ColumnDescriptor string for each column instead.
    public List<String> getSchema(boolean debug) {
        List<String> colNames = Lists.newArrayList();
        FileMetaData metaData = fileReader.getFileMetaData();
        MessageType messageType = metaData.getSchema();
        List<ColumnDescriptor> columnDescriptors = messageType.getColumns();
        for (ColumnDescriptor column : columnDescriptors) {
            if (debug) {
                colNames.add(column.toString());
            } else {
                String colName = column.getPath()[0];
                if (column.getMaxDefinitionLevel() > 1) {
                    // This is a nested column, so append its max definition level.
                    colName += " (" + column.getMaxDefinitionLevel() + ")";
                }
                colNames.add(colName);
            }
        }
        return colNames;
    }

    // Get at most `limit` rows of file content as a two-dimensional array of strings.
    public List<List<String>> getLines(int limit) throws IOException {
        List<List<String>> lines = Lists.newArrayList();
        FileMetaData metaData = fileReader.getFileMetaData();
        MessageType schema = metaData.getSchema();
        final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
        int readLines = 0;
        PageReadStore pages = null;
        // Read row groups one by one and stop as soon as `limit` rows have been collected.
        while (null != (pages = fileReader.readNextRowGroup()) && readLines < limit) {
            final long rows = pages.getRowCount();
            final RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
            for (int i = 0; i < rows && readLines < limit; i++) {
                List<String> line = Lists.newArrayList();
                final Group g = recordReader.read();
                parseGroup(g, line);
                lines.add(line);
                readLines++;
            }
        }
        return lines;
    }
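
    // Convert one record (Group) into a list of cell strings, one per top-level field.
    // Repeated fields are rendered as "[v1,v2,...]"; only primitive values inside a
    // repeated field are stringified, and a missing field becomes an empty string.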
    private void parseGroup(Group g, List<String> line) {
        int fieldCount = g.getType().getFieldCount();
        for (int field = 0; field < fieldCount; field++) {
            int valueCount = g.getFieldRepetitionCount(field);
            Type fieldType = g.getType().getType(field);
            if (valueCount == 1) {
                line.add(g.getValueToString(field, 0));
            } else if (valueCount > 1) {
                List<String> array = Lists.newArrayList();
                for (int index = 0; index < valueCount; index++) {
                    if (fieldType.isPrimitive()) {
                        array.add(g.getValueToString(field, index));
                    }
                }
                line.add("[" + Joiner.on(",").join(array) + "]");
            } else {
                line.add("");
            }
        }
    }

    public void close() throws IOException {
        fileReader.close();
    }
}