/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Context;
import com.datatorrent.lib.io.fs.FileMerger;
import com.datatorrent.lib.io.fs.FileStitcher;
import com.datatorrent.lib.io.fs.Synchronizer;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FileMerger} variant that tries to stitch block files together with
 * {@link FileSystem#concat(Path, Path[])} ("fast merge") instead of copying
 * their bytes, and falls back to the inherited merge when that is not possible.
 *
 * <p>Fast merge is only attempted when all of the following hold:
 * <ul>
 *   <li>the output file system configuration has {@code dfs.support.append}
 *       set to {@code true} (the lookup defaults to {@code true});</li>
 *   <li>the application file system and the output file system have the same URI;</li>
 *   <li>the file has at least one block;</li>
 *   <li>{@link FastMergerDecisionMaker#isFastMergePossible} accepts the block files.</li>
 * </ul>
 */
@InterfaceStability.Evolving
public class HDFSFileMerger
extends FileMerger {
    /** Whether the file-system level preconditions for fast merge were met in {@link #setup}. */
    private transient boolean fastMergeActive;
    /** Default block size reported by the output file system for {@code filePath}. */
    private transient long defaultBlockSize;
    /** Per-file check of whether the block files qualify for fast merge. */
    private transient FastMergerDecisionMaker fastMergerDecisionMaker;
    private static final Logger LOG = LoggerFactory.getLogger(HDFSFileMerger.class);

    /**
     * Initialises the parent operator, then works out whether fast merge can be
     * used at all and prepares the per-file decision maker.
     *
     * @param context operator context, forwarded to the parent implementation
     */
    @Override
    public void setup(Context.OperatorContext context) {
        super.setup(context);
        boolean appendSupported = outputFS.getConf().getBoolean("dfs.support.append", true);
        // Blocks can only be concatenated in place when they live on the output file system.
        fastMergeActive = appendSupported && appFS.getUri().equals(outputFS.getUri());
        LOG.debug("appFS.getUri():{}", appFS.getUri());
        LOG.debug("outputFS.getUri():{}", outputFS.getUri());
        defaultBlockSize = outputFS.getDefaultBlockSize(new Path(filePath));
        fastMergerDecisionMaker = new FastMergerDecisionMaker(blocksDirectoryPath, appFS, defaultBlockSize);
    }

    /**
     * Merges the blocks of one output file, preferring the fast (concat based)
     * path and falling back to the inherited merge otherwise.
     *
     * <p>If a block file turns out to be missing, {@link #recover} is attempted.
     * The file is then recorded in exactly one of {@code successfulFiles}
     * (recovery succeeded) or {@code failedFiles} (recovery failed).
     *
     * @param outputFileMetadata metadata of the file whose blocks are to be merged
     * @throws IOException if a file system operation fails
     */
    @Override
    protected void mergeBlocks(Synchronizer.OutputFileMetadata outputFileMetadata) throws IOException {
        try {
            LOG.debug("fastMergeActive: {}", fastMergeActive);
            // Cheap in-memory checks first; the decision maker probes the file system per block.
            if (fastMergeActive
                    && outputFileMetadata.getNumberOfBlocks() > 0
                    && fastMergerDecisionMaker.isFastMergePossible(outputFileMetadata)) {
                LOG.debug("Using fast merge on HDFS.");
                concatBlocks(outputFileMetadata);
                return;
            }
            LOG.debug("Falling back to slow merge on HDFS.");
            super.mergeBlocks(outputFileMetadata);
        }
        catch (FileStitcher.BlockNotFoundException e) {
            LOG.debug("Block not found while merging, attempting recovery.", e);
            if (recover(outputFileMetadata)) {
                LOG.debug("Recovery attempt successful.");
                successfulFiles.add(outputFileMetadata);
            } else {
                // Only a file that could NOT be recovered counts as failed.
                failedFiles.add(outputFileMetadata);
            }
        }
    }

    /**
     * Fast merge: concatenates every block after the first onto the first block
     * file, then moves that first block file to the final output path.
     *
     * <p>Callers must ensure the file has at least one block; the first block id
     * is read unconditionally.
     *
     * @param outputFileMetadata metadata of the file being assembled
     * @throws IOException if the concat or the move fails
     */
    private void concatBlocks(Synchronizer.OutputFileMetadata outputFileMetadata) throws IOException {
        Path outputFilePath = new Path(filePath, outputFileMetadata.getRelativePath());
        int numBlocks = outputFileMetadata.getNumberOfBlocks();
        long[] blocksArray = outputFileMetadata.getBlockIds();
        Path firstBlock = new Path(blocksDirectoryPath, Long.toString(blocksArray[0]));
        if (numBlocks > 1) {
            // The first block is the concat target, so only the remaining blocks are sources.
            Path[] blockFiles = new Path[numBlocks - 1];
            for (int index = 1; index < numBlocks; ++index) {
                blockFiles[index - 1] = new Path(blocksDirectoryPath, Long.toString(blocksArray[index]));
            }
            outputFS.concat(firstBlock, blockFiles);
        }
        moveToFinalFile(firstBlock, outputFilePath);
    }

    /**
     * Attempts to finish a merge that was interrupted after the blocks were
     * already concatenated (or after the output file was already moved).
     *
     * <ul>
     *   <li>If the first block file exists and its length equals the expected
     *       file length, it is moved to the final output path.</li>
     *   <li>If the first block file exists with a different length, recovery fails.</li>
     *   <li>If the first block file is gone but the output file exists, there is
     *       nothing left to do and recovery is reported as successful.</li>
     *   <li>Otherwise recovery fails.</li>
     * </ul>
     *
     * @param outputFileMetadata metadata of the file to recover
     * @return {@code true} if the output file is (now) in place, {@code false} otherwise
     * @throws IOException if a file system operation fails
     */
    @VisibleForTesting
    protected boolean recover(Synchronizer.OutputFileMetadata outputFileMetadata) throws IOException {
        Path firstBlockPath = new Path(blocksDirectoryPath + Path.SEPARATOR + outputFileMetadata.getBlockIds()[0]);
        Path outputFilePath = new Path(filePath, outputFileMetadata.getRelativePath());
        if (appFS.exists(firstBlockPath)) {
            FileStatus status = appFS.getFileStatus(firstBlockPath);
            if (status.getLen() == outputFileMetadata.getFileLength()) {
                // First block already holds the complete file; only the final move is missing.
                moveToFinalFile(firstBlockPath, outputFilePath);
                return true;
            }
            LOG.error("Unable to recover in FileMerger for file: {}", outputFilePath);
            return false;
        }
        if (outputFS.exists(outputFilePath)) {
            LOG.debug("Output file already present at the destination, nothing to recover.");
            return true;
        }
        LOG.error("Unable to recover in FileMerger for file: {}", outputFilePath);
        return false;
    }

    /**
     * Decides, per output file, whether its block files can be merged with a
     * file-system concat rather than a byte copy.
     *
     * <p>NOTE(review): the two conditions checked here (equal replication and
     * block-size-aligned lengths) presumably mirror restrictions of HDFS
     * {@code concat}; confirm against the targeted Hadoop version.
     */
    public static class FastMergerDecisionMaker {
        private final String blocksDir;
        private final FileSystem appFS;
        private final long defaultBlockSize;

        /**
         * @param blocksDir directory containing the block files, named by block id
         * @param appFS file system on which the block files are looked up
         * @param defaultBlockSize block size every non-final block file length must be a multiple of
         */
        public FastMergerDecisionMaker(String blocksDir, FileSystem appFS, long defaultBlockSize) {
            this.blocksDir = blocksDir;
            this.appFS = appFS;
            this.defaultBlockSize = defaultBlockSize;
        }

        /**
         * Checks whether all block files of the given output file qualify for fast merge:
         * every block file must have the same replication factor as the first one, and
         * every block file except the last must have a length that is a multiple of the
         * default block size. Scanning stops at the first block that violates either rule.
         *
         * @param outputFileMetadata metadata listing the block ids of the file
         * @return {@code true} if fast merge is possible (also when there are no blocks)
         * @throws IOException if a file system operation fails
         * @throws FileStitcher.BlockNotFoundException if a block file does not exist
         */
        public boolean isFastMergePossible(Synchronizer.OutputFileMetadata outputFileMetadata) throws IOException, FileStitcher.BlockNotFoundException {
            short replicationFactor = 0;
            boolean sameReplicationFactor = true;
            boolean multipleOfBlockSize = true;
            int numBlocks = outputFileMetadata.getNumberOfBlocks();
            LOG.debug("fileMetadata.getNumberOfBlocks(): {}", numBlocks);
            long[] blocksArray = outputFileMetadata.getBlockIds();
            LOG.debug("fileMetadata.getBlockIds().len: {}", blocksArray.length);
            for (int index = 0; index < numBlocks && sameReplicationFactor && multipleOfBlockSize; ++index) {
                Path blockFilePath = new Path(blocksDir + Path.SEPARATOR + blocksArray[index]);
                if (!appFS.exists(blockFilePath)) {
                    throw new FileStitcher.BlockNotFoundException(blockFilePath);
                }
                FileStatus status = appFS.getFileStatus(blockFilePath);
                if (index == 0) {
                    // The first block defines the replication factor the others must match.
                    replicationFactor = status.getReplication();
                    LOG.debug("replicationFactor: {}", replicationFactor);
                } else {
                    sameReplicationFactor = replicationFactor == status.getReplication();
                    LOG.debug("sameReplicationFactor: {}", sameReplicationFactor);
                }
                // The last block is exempt from the block-size alignment requirement.
                if (index != numBlocks - 1) {
                    multipleOfBlockSize = status.getLen() % defaultBlockSize == 0L;
                    LOG.debug("multipleOfBlockSize: {}", multipleOfBlockSize);
                }
            }
            return sameReplicationFactor && multipleOfBlockSize;
        }
    }
}

