/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.io.fs.AbstractReconciler;
import com.datatorrent.lib.io.fs.Synchronizer;
import com.google.common.collect.Queues;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Queue;
import javax.validation.constraints.NotNull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reconciler that assembles ("stitches") block files into a single output file.
 *
 * <p>For every committed tuple the blocks listed by
 * {@link Synchronizer.StitchedFileMetaData#getStitchBlocksList()} are written, in list order,
 * into a temporary file below {@link #filePath}. The temporary file carries the
 * {@link #PART_FILE_EXTENTION} suffix and is renamed to its final name once all blocks
 * have been written. Completed tuples are emitted on {@link #completedFilesMetaOutput}
 * from {@link #endWindow()}.
 *
 * @param <T> metadata type describing the file to stitch and the blocks it consists of
 */
public class FileStitcher<T extends Synchronizer.StitchedFileMetaData>
extends AbstractReconciler<T, T> {
    /** File system holding the block files; opened in {@link #setup} and closed in {@link #teardown}. */
    protected transient FileSystem appFS;
    /** File system the stitched files are written to; opened in {@link #setup} and closed in {@link #teardown}. */
    protected transient FileSystem outputFS;
    /** Root directory (on {@link #outputFS}) under which stitched files are created. */
    @NotNull
    protected String filePath;
    /** Absolute blocks directory: application path + "/" + {@link #blocksDirectory}; computed in {@link #setup}. */
    protected transient String blocksDirectoryPath;
    /** Name of the blocks directory relative to the application path. */
    private String blocksDirectory = "blocks";
    /** Suffix of temporary files that are still being written. (Spelling kept for compatibility.) */
    protected static final String PART_FILE_EXTENTION = "._COPYING_";
    /** Tuples whose merge finished; drained in {@link #endWindow()}. */
    protected Queue<T> successfulFiles = Queues.newLinkedBlockingQueue();
    /** Tuples that were skipped; nothing in this class adds to it, presumably subclasses do. */
    protected Queue<T> skippedFiles = Queues.newLinkedBlockingQueue();
    /** Tuples that failed; nothing in this class adds to it, presumably subclasses do. */
    protected Queue<T> failedFiles = Queues.newLinkedBlockingQueue();
    /** Optional port on which the metadata of every completed file is emitted. */
    @OutputPortFieldAnnotation(optional=true)
    public final transient DefaultOutputPort<T> completedFilesMetaOutput = new DefaultOutputPort<T>();
    /** Passed to {@link FileSystem#setWriteChecksum(boolean)} on the output file system. */
    private boolean writeChecksum = true;
    /** Temporary file currently being written by {@link #mergeBlocks}. */
    protected transient Path tempOutFilePath;
    protected static final Logger LOG = LoggerFactory.getLogger(FileStitcher.class);

    /**
     * Resolves the blocks directory and opens the output and application file systems
     * before delegating to the superclass.
     *
     * @param context operator context supplying the application path
     * @throws RuntimeException if either file system cannot be obtained, or if the output
     *         file system cannot be closed after the application file system failed to open
     */
    @Override
    public void setup(Context.OperatorContext context) {
        this.blocksDirectoryPath = (String)context.getValue(Context.DAGContext.APPLICATION_PATH) + "/" + this.blocksDirectory;
        try {
            this.outputFS = this.getOutputFSInstance();
            this.outputFS.setWriteChecksum(this.writeChecksum);
        }
        catch (IOException ex) {
            throw new RuntimeException("Exception in getting output file system.", ex);
        }
        try {
            this.appFS = this.getAppFSInstance();
        }
        catch (IOException ex) {
            // Do not leak the already opened output file system.
            try {
                this.outputFS.close();
            }
            catch (IOException e) {
                RuntimeException closeFailure = new RuntimeException("Exception in closing output file system.", e);
                // Keep the failure that triggered the cleanup visible in the stack trace.
                closeFailure.addSuppressed(ex);
                throw closeFailure;
            }
            throw new RuntimeException("Exception in getting application file system.", ex);
        }
        super.setup(context);
    }

    /**
     * Emits every tuple that was in {@code doneTuples} when the window ended and removes it
     * from the bookkeeping queues.
     *
     * @throws RuntimeException if a done tuple is in none of the successful, skipped or
     *         failed queues
     */
    @Override
    public void endWindow() {
        // Snapshot the size so only tuples that were already done at this point are handled.
        int pending = this.doneTuples.size();
        for (int i = 0; i < pending; ++i) {
            // The cast is a no-op if doneTuples is declared as Queue<T> in the superclass
            // (not visible here); it keeps this code valid for a less specific declaration.
            @SuppressWarnings("unchecked")
            T stitchedFileMetaData = (T)this.doneTuples.peek();
            // remove() reports whether the element was present, so a separate contains() scan is unnecessary.
            if (this.successfulFiles.remove(stitchedFileMetaData)) {
                LOG.debug("File copy successful: {}", stitchedFileMetaData.getStitchedFileRelativePath());
            } else if (this.skippedFiles.remove(stitchedFileMetaData)) {
                LOG.debug("File copy skipped: {}", stitchedFileMetaData.getStitchedFileRelativePath());
            } else if (this.failedFiles.remove(stitchedFileMetaData)) {
                LOG.debug("File copy failed: {}", stitchedFileMetaData.getStitchedFileRelativePath());
            } else {
                throw new RuntimeException("Tuple present in doneTuples but not in sucessful /skipped/ failed files: " + stitchedFileMetaData.getStitchedFileRelativePath());
            }
            this.completedFilesMetaOutput.emit(stitchedFileMetaData);
            this.committedTuples.remove(stitchedFileMetaData);
            // Dequeue only after the tuple was emitted and removed from committedTuples.
            this.doneTuples.poll();
        }
    }

    /**
     * Creates a new file system instance for the blocks directory.
     *
     * @return a new {@link FileSystem} for {@link #blocksDirectoryPath}
     * @throws IOException if the file system cannot be created
     */
    protected FileSystem getAppFSInstance() throws IOException {
        return FileSystem.newInstance(new Path(this.blocksDirectoryPath).toUri(), new Configuration());
    }

    /**
     * Creates a new file system instance for the output directory.
     *
     * @return a new {@link FileSystem} for {@link #filePath}
     * @throws IOException if the file system cannot be created
     */
    protected FileSystem getOutputFSInstance() throws IOException {
        return FileSystem.newInstance(new Path(this.filePath).toUri(), new Configuration());
    }

    /**
     * Closes both file systems. Closing the second one is attempted even if the first fails.
     *
     * @throws RuntimeException if closing either file system fails; the first failure is the
     *         cause and a second failure, if any, is attached as a suppressed exception
     */
    @Override
    public void teardown() {
        super.teardown();
        IOException closeFailure = null;
        try {
            if (this.appFS != null) {
                this.appFS.close();
                this.appFS = null;
            }
        }
        catch (IOException e) {
            closeFailure = e;
        }
        try {
            if (this.outputFS != null) {
                this.outputFS.close();
                this.outputFS = null;
            }
        }
        catch (IOException e) {
            if (closeFailure == null) {
                closeFailure = e;
            } else {
                closeFailure.addSuppressed(e);
            }
        }
        if (closeFailure != null) {
            throw new RuntimeException("Exception while closing file systems.", closeFailure);
        }
    }

    /**
     * Queues the incoming tuple for processing via {@code enqueueForProcessing}.
     *
     * @param stitchedFileMetaData metadata of the file to stitch
     */
    @Override
    protected void processTuple(T stitchedFileMetaData) {
        LOG.debug("stitchedFileMetaData: {}", stitchedFileMetaData);
        this.enqueueForProcessing(stitchedFileMetaData);
    }

    /**
     * Merges the blocks of a committed tuple into its output file.
     *
     * @param stitchedFileMetaData metadata of the file to stitch
     * @throws RuntimeException wrapping any {@link IOException} raised while merging
     */
    @Override
    protected void processCommittedData(T stitchedFileMetaData) {
        try {
            this.mergeOutputFile(stitchedFileMetaData);
        }
        catch (IOException e) {
            throw new RuntimeException("Unable to merge file: " + stitchedFileMetaData.getStitchedFileRelativePath(), e);
        }
    }

    /**
     * Merges the blocks and records the tuple as successful.
     *
     * <p>Note that a missing block is handled inside {@link #mergeBlocks} without an
     * exception, so such a tuple is recorded as successful as well.
     *
     * @param stitchedFileMetaData metadata of the file to stitch
     * @throws IOException if writing or moving the output file fails
     */
    protected void mergeOutputFile(T stitchedFileMetaData) throws IOException {
        this.mergeBlocks(stitchedFileMetaData);
        this.successfulFiles.add(stitchedFileMetaData);
        LOG.debug("Completed processing file: {} ", stitchedFileMetaData.getStitchedFileRelativePath());
    }

    /**
     * Writes all blocks into a fresh temporary file and renames it to the final file.
     *
     * <p>Before writing, leftover temporary files in the destination directory whose name
     * starts with the destination file name and ends with {@link #PART_FILE_EXTENTION} are
     * deleted. If a block is missing, the temporary file is deleted and the method returns
     * normally.
     *
     * @param stitchedFileMetaData metadata of the file to stitch
     * @throws IOException if listing, deleting, writing or moving files fails
     */
    protected void mergeBlocks(T stitchedFileMetaData) throws IOException {
        final Path dst = new Path(this.filePath, stitchedFileMetaData.getStitchedFileRelativePath());
        PathFilter tempFileFilter = new PathFilter(){

            @Override
            public boolean accept(Path path) {
                return path.getName().startsWith(dst.getName()) && path.getName().endsWith(FileStitcher.PART_FILE_EXTENTION);
            }
        };
        if (this.outputFS.exists(dst.getParent())) {
            FileStatus[] statuses = this.outputFS.listStatus(dst.getParent(), tempFileFilter);
            for (FileStatus status : statuses) {
                String statusName = status.getPath().getName();
                LOG.debug("deleting vagrant file {}", statusName);
                this.outputFS.delete(status.getPath(), true);
            }
        }
        // The timestamp makes the temporary name differ between attempts for the same file.
        this.tempOutFilePath = new Path(this.filePath, stitchedFileMetaData.getStitchedFileRelativePath() + '.' + System.currentTimeMillis() + PART_FILE_EXTENTION);
        try {
            this.writeTempOutputFile(stitchedFileMetaData);
            this.moveToFinalFile(stitchedFileMetaData);
        }
        catch (BlockNotFoundException e) {
            LOG.warn("Block file {} not found. Assuming recovery mode for file {}. ", e.getBlockPath(), stitchedFileMetaData.getStitchedFileRelativePath());
            // Remove the partially written temporary file.
            this.outputFS.delete(this.tempOutFilePath, false);
        }
    }

    /**
     * Writes every block of the file, in list order, into {@link #tempOutFilePath}.
     *
     * @param stitchedFileMetaData metadata listing the blocks to write
     * @return the stream that was written to; it has already been closed when this method returns
     * @throws IOException if creating, writing or closing the temporary file fails
     * @throws BlockNotFoundException declared for block writers that cannot find their block
     *         file; it is handled in {@link #mergeBlocks}
     */
    protected OutputStream writeTempOutputFile(T stitchedFileMetaData) throws IOException, BlockNotFoundException {
        // Declared outside the try so it can still be returned after being closed.
        OutputStream outputStream = this.getOutputStream(this.tempOutFilePath);
        try (OutputStream closing = outputStream) {
            for (Synchronizer.StitchBlock outputBlock : stitchedFileMetaData.getStitchBlocksList()) {
                outputBlock.writeTo(this.appFS, this.blocksDirectoryPath, closing);
            }
        }
        return outputStream;
    }

    /**
     * Opens an output stream for the given path on the output file system.
     *
     * @param partFilePath path of the file to create
     * @return stream writing to {@code partFilePath}
     * @throws IOException if the file cannot be created
     */
    protected OutputStream getOutputStream(Path partFilePath) throws IOException {
        return this.outputFS.create(partFilePath);
    }

    /**
     * Moves {@link #tempOutFilePath} to the final location of the given file.
     *
     * @param stitchedFileMetaData metadata providing the relative destination path
     * @throws IOException if a file system operation fails
     */
    protected void moveToFinalFile(T stitchedFileMetaData) throws IOException {
        Path destination = new Path(this.filePath, stitchedFileMetaData.getStitchedFileRelativePath());
        this.moveToFinalFile(this.tempOutFilePath, destination);
    }

    /**
     * Renames the temporary file to the destination, creating the destination's parent
     * directory if needed and deleting an existing destination file first.
     *
     * @param tempOutFilePath temporary file to move
     * @param destination final file path
     * @throws IOException if a file system operation fails
     * @throws RuntimeException if the rename reports failure
     */
    protected void moveToFinalFile(Path tempOutFilePath, Path destination) throws IOException {
        Path src = Path.getPathWithoutSchemeAndAuthority(tempOutFilePath);
        Path dst = Path.getPathWithoutSchemeAndAuthority(destination);
        if (!this.outputFS.exists(dst.getParent())) {
            this.outputFS.mkdirs(dst.getParent());
        }
        if (this.outputFS.exists(dst)) {
            this.outputFS.delete(dst, false);
        }
        if (!this.outputFS.rename(src, dst)) {
            throw new RuntimeException("Unable to move file from " + src + " to " + dst);
        }
        LOG.debug("File {} moved successfully to destination folder.", dst);
    }

    /** @return name of the blocks directory relative to the application path */
    public String getBlocksDirectory() {
        return this.blocksDirectory;
    }

    /** @param blocksDirectory name of the blocks directory relative to the application path */
    public void setBlocksDirectory(String blocksDirectory) {
        this.blocksDirectory = blocksDirectory;
    }

    /** @return root directory under which stitched files are created */
    public String getFilePath() {
        return this.filePath;
    }

    /** @param filePath root directory under which stitched files are created */
    public void setFilePath(String filePath) {
        this.filePath = filePath;
    }

    /** @return the flag passed to the output file system's {@code setWriteChecksum} */
    public boolean isWriteChecksum() {
        return this.writeChecksum;
    }

    /** @param writeChecksum flag passed to the output file system's {@code setWriteChecksum} during setup */
    public void setWriteChecksum(boolean writeChecksum) {
        this.writeChecksum = writeChecksum;
    }

    /**
     * Checked exception carrying the path of a block file that could not be found.
     */
    public static class BlockNotFoundException
    extends Exception {
        private static final long serialVersionUID = -7409415466834194798L;
        /** Path of the missing block file. */
        Path blockPath;

        /**
         * @param blockPath path of the missing block file
         */
        public BlockNotFoundException(Path blockPath) {
            this.blockPath = blockPath;
        }

        /** @return path of the missing block file */
        public Path getBlockPath() {
            return this.blockPath;
        }
    }
}

