AcidUtil.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hive;

import org.apache.doris.backup.Status;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
import org.apache.doris.fsv2.FileSystem;
import org.apache.doris.fsv2.remote.RemoteFile;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AcidUtil {
    private static final Logger LOG = LogManager.getLogger(AcidUtil.class);

    public static final String VALID_TXNS_KEY = "hive.txn.valid.txns";
    public static final String VALID_WRITEIDS_KEY = "hive.txn.valid.writeids";

    private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_";
    private static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
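    // e.g. a delta dir contains data files like "bucket_00000"; streaming ingest may also leave a
    // side file "bucket_00000_flush_length", which FullAcidFileFilter below skips.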

    // An `_orc_acid_version` file is written to each base/delta/delete_delta dir created by a full ACID write
    // or compaction.  This is the primary mechanism for versioning acid data.
    // Each individual ORC file also stores the current version as a user property in its footer. All data files
    // produced by an ACID write should have this (starting with Hive 3.0), including those written by the
    // compactor. This is mostly a sanity check in case someone moved the files around.
    // In Hive, the methods for reading the version from files were moved into tests, which are the only place
    // they are used (after HIVE-23506), in order to keep devs out of temptation, since they access the
    // FileSystem, which is expensive.
    // Since `HIVE-23825: Create a flag to turn off _orc_acid_version file creation`, a flag controls whether
    // the `_orc_acid_version` file is generated at all, so we must not rely on this file existing.
    private static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version";

    @Getter
    @ToString
    private static class ParsedBase {
        private final long writeId;
        private final long visibilityId;

        public ParsedBase(long writeId, long visibilityId) {
            this.writeId = writeId;
            this.visibilityId = visibilityId;
        }
    }

    private static ParsedBase parseBase(String name) {
        //format1 : base_writeId
        //format2 : base_writeId_visibilityId  detail: https://issues.apache.org/jira/browse/HIVE-20823
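        //examples: "base_0000005"          -> writeId=5, visibilityId=0
        //          "base_0000005_v0000123" -> writeId=5, visibilityId=123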
        name = name.substring("base_".length());
        int index = name.indexOf("_v");
        if (index == -1) {
            return new ParsedBase(Long.parseLong(name), 0);
        }
        return new ParsedBase(
                Long.parseLong(name.substring(0, index)),
                Long.parseLong(name.substring(index + 2)));
    }

    @Getter
    @ToString
    @EqualsAndHashCode
    private static class ParsedDelta implements Comparable<ParsedDelta> {
        private final long min;
        private final long max;
        private final String path;
        private final int statementId;
        private final boolean deleteDelta;
        private final long visibilityId;

        public ParsedDelta(long min, long max, @NonNull String path, int statementId,
                boolean deleteDelta, long visibilityId) {
            this.min = min;
            this.max = max;
            this.path = path;
            this.statementId = statementId;
            this.deleteDelta = deleteDelta;
            this.visibilityId = visibilityId;
        }

        /*
         * Smaller minWID sorts first;
         * if minWID is the same, larger maxWID sorts first;
         * otherwise, sort by stmtID; files without a stmtID sort first.
         *
         * Compactions (Major/Minor) merge deltas/bases, but deletion of the old files
         * happens in a different process; thus it's possible to have bases/deltas with
         * overlapping writeId boundaries.  The sort order helps figure out the "best" set of files
         * to use to get the data.
         * This sorts a "wider" delta before a "narrower" one, i.e. delta_5_20 sorts before delta_5_10
         * (and delta_11_20).
         */
        @Override
        public int compareTo(ParsedDelta other) {
            return min != other.min ? Long.compare(min, other.min) :
                    other.max != max ? Long.compare(other.max, max) :
                            statementId != other.statementId
                                    ? Integer.compare(statementId, other.statementId) :
                                    path.compareTo(other.path);
        }
    }


    private static boolean isValidMetaDataFile(FileSystem fileSystem, String baseDir)
            throws IOException {
        String fileLocation = baseDir + "_metadata_acid";
        Status status = fileSystem.exists(fileLocation);
        if (!status.ok()) {
            return false;
        }
        // To avoid the cost of reading the file contents, we only check that the file exists.
        // File contents: {"thisFileVersion":"0","dataFormat":"compacted"}
        //
        // Map<String, String> metadata;
        // try (var in = read(fileLocation)) {
        //     metadata = new ObjectMapper().readValue(in, new TypeReference<>() {});
        // } catch (IOException e) {
        //     throw new IOException(String.format("Failed to read %s: %s", fileLocation, e.getMessage()), e);
        // }
        //
        // String version = metadata.get("thisFileVersion");
        // if (!"0".equals(version)) {
        //     throw new IOException("Unexpected ACID metadata version: " + version);
        // }
        //
        // String format = metadata.get("dataFormat");
        // if (!"compacted".equals(format)) {
        //     throw new IOException("Unexpected value for ACID dataFormat: " + format);
        // }
        return true;
    }

    private static boolean isValidBase(FileSystem fileSystem, String baseDir,
            ParsedBase base, ValidWriteIdList writeIdList) throws IOException {
        if (base.writeId == Long.MIN_VALUE) {
            //Ref: https://issues.apache.org/jira/browse/HIVE-13369
            //Such a base is created by the first compaction when a non-ACID table is converted to ACID
            //(you will get the dir `base_-9223372036854775808`).
            //By definition there are no open txns with id < 1.
            //After https://issues.apache.org/jira/browse/HIVE-18192, txns (global transaction IDs) => writeIds.
            return true;
        }

        // Hive 4: just check for the "_v" suffix; before Hive 4: check for the `_metadata_acid` file in baseDir.
        if ((base.visibilityId > 0) || isValidMetaDataFile(fileSystem, baseDir)) {
            return writeIdList.isValidBase(base.writeId);
        }

        // If we get here, this base is the result of an IOW (insert overwrite).
        return writeIdList.isWriteIdValid(base.writeId);
    }

    private static ParsedDelta parseDelta(String fileName, String deltaPrefix, String path) {
        // format1: delta_min_max_statementId_visibilityId, delete_delta_min_max_statementId_visibilityId
        //     the _visibilityId suffix may be absent.
        //     detail: https://issues.apache.org/jira/browse/HIVE-20823
        // format2: delta_min_max_visibilityId, delete_delta_min_max_visibilityId
        //     when minor compaction runs, per-statement delta files inside a single
        //     transaction are collapsed, so a statementId is no longer needed in the file name
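        // e.g. "delta_0000001_0000001_0000"            -> min=1,  max=1,  statementId=0,  visibilityId=0
        //      "delete_delta_0000012_0000015_v0000098" -> min=12, max=15, statementId=-1, visibilityId=98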

        long visibilityId = 0;
        int visibilityIdx = fileName.indexOf("_v");
        if (visibilityIdx != -1) {
            visibilityId = Long.parseLong(fileName.substring(visibilityIdx + 2));
            fileName = fileName.substring(0, visibilityIdx);
        }

        boolean deleteDelta = deltaPrefix.equals("delete_delta_");

        String rest = fileName.substring(deltaPrefix.length());
        int split = rest.indexOf('_');
        int split2 = rest.indexOf('_', split + 1);
        long min = Long.parseLong(rest.substring(0, split));

        if (split2 == -1) {
            long max = Long.parseLong(rest.substring(split + 1));
            return new ParsedDelta(min, max, path, -1, deleteDelta, visibilityId);
        }

        long max = Long.parseLong(rest.substring(split + 1, split2));
        int statementId = Integer.parseInt(rest.substring(split2 + 1));
        return new ParsedDelta(min, max, path, statementId, deleteDelta, visibilityId);
    }

    public interface FileFilter {
        boolean accept(String fileName);
    }

    public static final class FullAcidFileFilter implements FileFilter {
        @Override
        public boolean accept(String fileName) {
            return fileName.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)
                    && !fileName.endsWith(DELTA_SIDE_FILE_SUFFIX);
        }
    }

    public static final class InsertOnlyFileFilter implements FileFilter {
        @Override
        public boolean accept(String fileName) {
            return true;
        }
    }

    //Since the Hive 3 library cannot read Hive 4 transactional tables correctly, and using the Hive 4
    // library directly causes many problems, this method re-implements the logic instead.
    //Ref: hive/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java#getAcidState
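    // Hypothetical usage sketch (caller-side values are assumptions, not defined in this file):
    //   Map<String, String> txnValidIds = ...;  // must contain VALID_TXNS_KEY and VALID_WRITEIDS_KEY
    //   FileCacheValue value = AcidUtil.getAcidState(fs, partition, txnValidIds, catalogProps, true);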
    public static FileCacheValue getAcidState(FileSystem fileSystem, HivePartition partition,
            Map<String, String> txnValidIds, Map<String, String> catalogProps, boolean isFullAcid) throws Exception {

        // Ref: https://issues.apache.org/jira/browse/HIVE-18192
        // Readers should use the combination of ValidTxnList and ValidWriteIdList(Table) for snapshot isolation.
        // ValidReadTxnList implements ValidTxnList
        // ValidReaderWriteIdList implements ValidWriteIdList
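        // Illustrative serialized forms (as produced by Hive's writeToString; "hwm:minOpen:open:aborted"):
        //   hive.txn.valid.txns     -> "100:9223372036854775807::"
        //   hive.txn.valid.writeids -> "db.tbl:100:9223372036854775807::"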
        ValidTxnList validTxnList = null;
        if (txnValidIds.containsKey(VALID_TXNS_KEY)) {
            validTxnList = new ValidReadTxnList();
            validTxnList.readFromString(
                    txnValidIds.get(VALID_TXNS_KEY)
            );
        } else {
            throw new RuntimeException("Miss ValidTxnList");
        }

        ValidWriteIdList validWriteIdList = null;
        if (txnValidIds.containsKey(VALID_WRITEIDS_KEY)) {
            validWriteIdList = new ValidReaderWriteIdList();
            validWriteIdList.readFromString(
                    txnValidIds.get(VALID_WRITEIDS_KEY)
            );
        } else {
            throw new RuntimeException("Miss ValidWriteIdList");
        }

        String partitionPath = partition.getPath();
        // e.g. hdfs://xxxxx/user/hive/warehouse/username/data_id=200103

        // List all files and directories directly under the partition path, without recursion.
        List<RemoteFile> lsPartitionPath = new ArrayList<>();
        Status status = fileSystem.globList(partitionPath + "/*", lsPartitionPath);
        if (!status.ok()) {
            throw new IOException(status.toString());
        }

        String oldestBase = null;
        long oldestBaseWriteId = Long.MAX_VALUE;
        String bestBasePath = null;
        long bestBaseWriteId = 0;
        boolean haveOriginalFiles = false;
        List<ParsedDelta> workingDeltas = new ArrayList<>();
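
        // A typical full-ACID partition directory might contain (illustrative):
        //   base_0000005_v0000123/bucket_00000
        //   delta_0000006_0000006_0000/bucket_00000
        //   delete_delta_0000007_0000007_0000/bucket_00000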

        for (RemoteFile remotePath : lsPartitionPath) {
            if (remotePath.isDirectory()) {
                String dirName = remotePath.getName(); //dirName: base_xxx,delta_xxx,...
                String dirPath = partitionPath + "/" + dirName;

                if (dirName.startsWith("base_")) {
                    ParsedBase base = parseBase(dirName);
                    if (!validTxnList.isTxnValid(base.visibilityId)) {
                        //Check the visibility txnId to see if it was committed in the current snapshot.
                        continue;
                    }

                    long writeId = base.writeId;
                    if (oldestBaseWriteId > writeId) {
                        oldestBase = dirPath;
                        oldestBaseWriteId = writeId;
                    }

                    if (((bestBasePath == null) || (bestBaseWriteId < writeId))
                            && isValidBase(fileSystem, dirPath, base, validWriteIdList)) {
                        //IOW will generate a base_N/ directory: https://issues.apache.org/jira/browse/HIVE-14988
                        //So we may also need to consider: https://issues.apache.org/jira/browse/HIVE-25777

                        bestBasePath = dirPath;
                        bestBaseWriteId = writeId;
                    }
                } else if (dirName.startsWith("delta_") || dirName.startsWith("delete_delta_")) {
                    String deltaPrefix = dirName.startsWith("delta_") ? "delta_" : "delete_delta_";
                    ParsedDelta delta = parseDelta(dirName, deltaPrefix, dirPath);

                    if (!validTxnList.isTxnValid(delta.visibilityId)) {
                        continue;
                    }

                    // No need to check (validWriteIdList.isWriteIdRangeAborted(min, max) != RangeResponse.ALL):
                    // it is a subset of (validWriteIdList.isWriteIdRangeValid(min, max) != RangeResponse.NONE).
                    if (validWriteIdList.isWriteIdRangeValid(delta.min, delta.max) != RangeResponse.NONE) {
                        workingDeltas.add(delta);
                    }
                } else {
                    //Sometimes Hive generates temporary directories (`.hive-staging_hive_xxx`)
                    // which do not need to be read.
                    LOG.warn("Reading Hive ACID table: ignoring the contents of directory: " + dirName);
                }
            } else {
                haveOriginalFiles = true;
            }
        }

        if (bestBasePath == null && haveOriginalFiles) {
            // e.g. ALTER TABLE nonAcidTbl SET TBLPROPERTIES ('transactional'='true');
            throw new UnsupportedOperationException(
                    "For a non-ACID table converted to ACID, please run a major compaction first.");
        }

        if ((oldestBase != null) && (bestBasePath == null)) {
            /*
             * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
             * {@link writeIdList}.  Note that 'original' files are logically a base_Long.MIN_VALUE and thus
             * cannot have any data for an open txn.  We could check {@link deltas} has files to cover
             * [1,n] w/o gaps but this would almost never happen...
             *
             * We only throw for base_x produced by Compactor since that base erases all history and
             * cannot be used for a client that has a snapshot in which something inside this base is
             * open.  (Nor can we ignore this base of course)  But base_x which is a result of IOW,
             * contains all history so we treat it just like delta wrt visibility.  Imagine, IOW which
             * aborts. It creates a base_x, which can and should just be ignored.*/
            long[] exceptions = validWriteIdList.getInvalidWriteIds();
            String minOpenWriteId = ((exceptions != null)
                    && (exceptions.length > 0)) ? String.valueOf(exceptions[0]) : "x";
            throw new IOException(
                    String.format("Not enough history available for ({},{}). Oldest available base: {}",
                            validWriteIdList.getHighWatermark(), minOpenWriteId, oldestBase));
        }

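        // sort(null) uses the natural ordering, i.e. ParsedDelta.compareTo above.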
        workingDeltas.sort(null);

        List<ParsedDelta> deltas = new ArrayList<>();
        long current = bestBaseWriteId;
        int lastStatementId = -1;
        ParsedDelta prev = null;
        // Select the delta/delete_delta directories that need to be read.
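        // e.g. with bestBaseWriteId=5 and sorted deltas [delta_6_6_0, delta_6_6_1, delta_7_8] (illustrative):
        //   delta_6_6_0: max 6 > current 5 and writeId 6 valid -> kept; current=6, lastStatementId=0
        //   delta_6_6_1: max == current and lastStatementId >= 0 -> kept (same txn, later statement)
        //   delta_7_8:   max 8 > current 6 -> kept if any writeId in (6, 8] is valid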
        for (ParsedDelta next : workingDeltas) {
            if (next.max > current) {
                if (validWriteIdList.isWriteIdRangeValid(current + 1, next.max) != RangeResponse.NONE) {
                    deltas.add(next);
                    current = next.max;
                    lastStatementId = next.statementId;
                    prev = next;
                }
            } else if ((next.max == current) && (lastStatementId >= 0)) {
                //make sure to get all deltas within a single transaction; a multi-statement txn
                //generates multiple delta files with the same writeId range.
                //Of course, if maxWriteId has already been minor compacted,
                // all per-statement deltas are obsolete.

                deltas.add(next);
                prev = next;
            } else if ((prev != null)
                    && (next.max == prev.max)
                    && (next.min == prev.min)
                    && (next.statementId == prev.statementId)) {
                // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except
                // the path. This may happen when we have split update and we have two types of delta
                // directories: 'delta_x_y' and 'delete_delta_x_y' for the SAME txn range.

                // Also note that any delete_deltas in between a given delta_x_y range would be made
                // obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete.
                // This is valid because minor compaction always compacts the normal deltas and the delete
                // deltas for the same range. That is, if we had 3 directories, delta_30_30,
                // delete_delta_40_40 and delta_50_50, then running minor compaction would produce
                // delta_30_50 and delete_delta_30_50.
                deltas.add(next);
                prev = next;
            }
        }

        FileCacheValue fileCacheValue = new FileCacheValue();
        List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();

        FileFilter fileFilter = isFullAcid ? new FullAcidFileFilter() : new InsertOnlyFileFilter();

        // delta directories
        for (ParsedDelta delta : deltas) {
            String location = delta.getPath();

            List<RemoteFile> remoteFiles = new ArrayList<>();
            status = fileSystem.listFiles(location, false, remoteFiles);
            if (status.ok()) {
                if (delta.isDeleteDelta()) {
                    List<String> deleteDeltaFileNames = remoteFiles.stream()
                            .map(RemoteFile::getName).filter(fileFilter::accept)
                            .collect(Collectors.toList());
                    deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames));
                    continue;
                }
                remoteFiles.stream().filter(f -> fileFilter.accept(f.getName())).forEach(file -> {
                    LocationPath path = new LocationPath(file.getPath().toString(), catalogProps);
                    fileCacheValue.addFile(file, path);
                });
            } else {
                throw new RuntimeException(status.getErrMsg());
            }
        }

        // base
        if (bestBasePath != null) {
            List<RemoteFile> remoteFiles = new ArrayList<>();
            status = fileSystem.listFiles(bestBasePath, false, remoteFiles);
            if (status.ok()) {
                remoteFiles.stream().filter(f -> fileFilter.accept(f.getName()))
                        .forEach(file -> {
                            LocationPath path = new LocationPath(file.getPath().toString(), catalogProps);
                            fileCacheValue.addFile(file, path);
                        });
            } else {
                throw new RuntimeException(status.getErrMsg());
            }
        }

        if (isFullAcid) {
            fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas));
        } else if (!deleteDeltas.isEmpty()) {
            throw new RuntimeException("No Hive Full Acid Table have delete_delta_* Dir.");
        }
        return fileCacheValue;
    }
}