AvroJNIScanner.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.avro;

import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.thrift.TFileType;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

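/**
 * JNI scanner that reads Avro files from HDFS or S3, deserializing records
 * through the Hive SerDe named in the required params. It serves both data
 * scans and table schema fetches.
 */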
public class AvroJNIScanner extends JniScanner {

    private static final Logger LOG = LogManager.getLogger(AvroJNIScanner.class);
    private final TFileType fileType;
    private final String uri;
    private final Map<String, String> requiredParams;
    private final Integer fetchSize;
    private final ClassLoader classLoader;
    private int[] requiredColumnIds;
    private String[] columnTypes;
    private String[] requiredFields;
    private Set<String> requiredFieldSet;
    private ColumnType[] requiredTypes;
    private AvroReader avroReader;
    private final boolean isGetTableSchema;
    private StructObjectInspector rowInspector;
    private Deserializer deserializer;
    private StructField[] structFields;
    private ObjectInspector[] fieldInspectors;
    private String serde;
    private AvroFileContext avroFileContext;
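    // Reusable key/value holders handed to the underlying record reader on each hasNext() call.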
    private AvroWrapper<Pair<Integer, Long>> inputPair;
    private NullWritable ignore;
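    // Split boundaries parsed from the required params: the split's start offset
    // and length within the file, and the total file size.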
    private Long splitStartOffset;
    private Long splitSize;
    private Long splitFileSize;

    /**
     * Called by JNI to read table data or to fetch the table schema.
     *
     * @param fetchSize      the number of rows to fetch per batch
     * @param requiredParams the parameters required to locate and read the file
     */
    public AvroJNIScanner(int fetchSize, Map<String, String> requiredParams) {
        this.classLoader = this.getClass().getClassLoader();
        this.requiredParams = requiredParams;
        this.fetchSize = fetchSize;
        this.isGetTableSchema = Boolean.parseBoolean(requiredParams.get(AvroProperties.IS_GET_TABLE_SCHEMA));
        this.fileType = TFileType.findByValue(Integer.parseInt(requiredParams.get(AvroProperties.FILE_TYPE)));
        this.uri = requiredParams.get(AvroProperties.URI);
        if (!isGetTableSchema) {
            this.columnTypes = requiredParams.get(AvroProperties.COLUMNS_TYPES)
                    .split(AvroProperties.COLUMNS_TYPE_DELIMITER);
            this.requiredFields = requiredParams.get(AvroProperties.REQUIRED_FIELDS)
                    .split(AvroProperties.FIELDS_DELIMITER);
            this.requiredFieldSet = new HashSet<>(Arrays.asList(requiredFields));
            this.requiredTypes = new ColumnType[requiredFields.length];
            this.serde = requiredParams.get(AvroProperties.HIVE_SERDE);
            this.structFields = new StructField[requiredFields.length];
            this.fieldInspectors = new ObjectInspector[requiredFields.length];
            this.inputPair = new AvroWrapper<>(null);
            this.ignore = NullWritable.get();
            this.splitStartOffset = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_START_OFFSET));
            this.splitSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_SIZE));
            this.splitFileSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_FILE_SIZE));
        }
    }

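    /**
     * Parses the required column types and resolves a Hive StructField and
     * ObjectInspector for each required field from the deserializer's row inspector.
     */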
    private void initFieldInspector() throws Exception {
        requiredColumnIds = new int[requiredFields.length];
        for (int i = 0; i < requiredFields.length; i++) {
            ColumnType columnType = ColumnType.parseType(requiredFields[i], columnTypes[i]);
            requiredTypes[i] = columnType;
            requiredColumnIds[i] = i;
        }

        Properties properties = createProperties();
        deserializer = getDeserializer(new Configuration(), properties, this.serde);
        rowInspector = (StructObjectInspector) deserializer.getObjectInspector();

        for (int i = 0; i < requiredFields.length; i++) {
            StructField field = rowInspector.getStructFieldRef(requiredFields[i]);
            structFields[i] = field;
            fieldInspectors[i] = field.getFieldObjectInspector();
        }
    }

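    /**
     * Builds the SerDe properties describing the projected columns (ids, names
     * and types) used to initialize the Hive deserializer.
     */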
    public Properties createProperties() {
        Properties properties = new Properties();
        properties.setProperty(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
                Arrays.stream(this.requiredColumnIds).mapToObj(String::valueOf).collect(Collectors.joining(",")));
        properties.setProperty(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, String.join(",", requiredFields));
        properties.setProperty(AvroProperties.COLUMNS, String.join(",", requiredFields));
        properties.setProperty(AvroProperties.COLUMNS2TYPES, String.join(",", columnTypes));
        properties.setProperty(serdeConstants.SERIALIZATION_LIB, this.serde);
        return properties;
    }

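    /**
     * Loads the Hive SerDe class by name, instantiates it, and initializes it
     * with the given configuration and table properties.
     */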
    private Deserializer getDeserializer(Configuration configuration, Properties properties, String name)
            throws Exception {
        Class<? extends Deserializer> deserializerClass = Class.forName(name, true, JavaUtils.getClassLoader())
                .asSubclass(Deserializer.class);
        Deserializer deserializer = deserializerClass.getConstructor().newInstance();
        deserializer.initialize(configuration, properties);
        return deserializer;
    }

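    /**
     * Creates a file reader for the configured storage backend (HDFS or S3),
     * initializes the data reader unless only the schema is requested, and opens the file.
     */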
    @Override
    public void open() throws IOException {
        Thread.currentThread().setContextClassLoader(classLoader);
        switch (fileType) {
            case FILE_HDFS:
                this.avroReader = new HDFSFileReader(uri);
                break;
            case FILE_S3:
                String accessKey = requiredParams.get(AvroProperties.S3_ACCESS_KEY);
                String secretKey = requiredParams.get(AvroProperties.S3_SECRET_KEY);
                String endpoint = requiredParams.get(AvroProperties.S3_ENDPOINT);
                String region = requiredParams.get(AvroProperties.S3_REGION);
                this.avroReader = new S3FileReader(accessKey, secretKey, endpoint, region, uri);
                break;
            default:
                LOG.warn("Unsupported " + fileType.name() + " file type.");
                throw new IOException("Unsupported " + fileType.name() + " file type.");
        }
        if (!isGetTableSchema) {
            initDataReader();
        }
        this.avroReader.open(avroFileContext, isGetTableSchema);
    }

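    /**
     * Prepares everything needed for a data scan: the file split context, the
     * field inspectors, and the table info (types, fields, batch size).
     */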
    private void initDataReader() {
        try {
            initAvroFileContext();
            initFieldInspector();
            initTableInfo(requiredTypes, requiredFields, fetchSize);
        } catch (Exception e) {
            LOG.warn("Failed to init avro scanner. ", e);
            throw new RuntimeException(e);
        }
    }

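    /**
     * Populates the AvroFileContext with the projected field names and the split boundaries.
     */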
    private void initAvroFileContext() {
        avroFileContext = new AvroFileContext();
        avroFileContext.setRequiredFields(requiredFieldSet);
        avroFileContext.setSplitStartOffset(splitStartOffset);
        avroFileContext.setSplitSize(splitSize);
    }

    @Override
    public void close() throws IOException {
        if (Objects.nonNull(avroReader)) {
            avroReader.close();
        }
    }

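    /**
     * Reads up to a batch of records from the Avro reader, appending each
     * required field of every record to the output columns.
     *
     * @return the number of rows actually read
     */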
    @Override
    protected int getNext() throws IOException {
        int numRows = 0;
        for (; numRows < getBatchSize(); numRows++) {
            if (!avroReader.hasNext(inputPair, ignore)) {
                break;
            }
            GenericRecord rowRecord = (GenericRecord) avroReader.getNext();
            for (int i = 0; i < requiredFields.length; i++) {
                Object fieldData = rowRecord.get(requiredFields[i]);
                if (fieldData == null) {
                    appendData(i, null);
                } else {
                    AvroColumnValue fieldValue = new AvroColumnValue(fieldInspectors[i], fieldData);
                    appendData(i, fieldValue);
                }
            }
        }
        return numRows;
    }

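    /**
     * Reads the Avro schema of the opened file and converts it into a Doris table schema.
     */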
    @Override
    protected TableSchema parseTableSchema() throws UnsupportedOperationException {
        Schema schema = avroReader.getSchema();
        return AvroTypeUtils.parseTableSchema(schema);
    }

}