/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram;

import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.util.FSUtil;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.OutputStream;
import java.net.URI;
import java.util.EnumSet;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FSRecoveryHandler
implements StreamingContainerManager.RecoveryHandler {
    private static final Logger LOG = LoggerFactory.getLogger(FSRecoveryHandler.class);
    private final Path basedir;
    private final Path logPath;
    private final Path logBackupPath;
    private final FileSystem fs;
    private final Path snapshotPath;
    private final Path snapshotBackupPath;
    private final Path heartbeatPath;
    public static final String FILE_LOG = "log";
    public static final String FILE_LOG_BACKUP = "log0";
    public static final String FILE_SNAPSHOT = "snapshot";
    public static final String FILE_SNAPSHOT_BACKUP = "snapshot0";
    private static final String DIRECTORY_RECOVERY = "recovery";
    private static final String FILE_HEARTBEATURI = "heartbeatUri";

    public FSRecoveryHandler(String appDir, Configuration conf) throws IOException {
        this.basedir = new Path(appDir, DIRECTORY_RECOVERY);
        this.fs = FileSystem.newInstance((URI)this.basedir.toUri(), (Configuration)conf);
        this.logPath = new Path(this.basedir, FILE_LOG);
        this.logBackupPath = new Path(this.basedir, FILE_LOG_BACKUP);
        this.snapshotPath = new Path(this.basedir, FILE_SNAPSHOT);
        this.snapshotBackupPath = new Path(this.basedir, FILE_SNAPSHOT_BACKUP);
        this.heartbeatPath = new Path(this.basedir, FILE_HEARTBEATURI);
    }

    public String getDir() {
        return this.basedir.toUri().toString();
    }

    @Override
    public DataOutputStream rotateLog() throws IOException {
        FSDataOutputStream fsOutputStream;
        if (this.fs.exists(this.logBackupPath)) {
            throw new AssertionError((Object)("Snapshot state prior to log rotation: " + this.logBackupPath));
        }
        if (this.fs.exists(this.logPath)) {
            LOG.debug("Creating log backup {}", (Object)this.logBackupPath);
            if (!this.fs.rename(this.logPath, this.logBackupPath)) {
                throw new IOException("Failed to rotate log: " + this.logPath);
            }
        }
        LOG.info("Creating {}", (Object)this.logPath);
        String scheme = null;
        try {
            scheme = this.fs.getScheme();
        }
        catch (UnsupportedOperationException e) {
            LOG.warn("{} doesn't implement getScheme() method", (Object)this.fs.getClass().getName());
        }
        if ("file".equals(scheme)) {
            FSUtil.mkdirs(this.fs, this.logPath.getParent());
            fsOutputStream = new FSDataOutputStream((OutputStream)new FileOutputStream(Path.getPathWithoutSchemeAndAuthority((Path)this.logPath).toString()), null);
        } else {
            fsOutputStream = this.fs.create(this.logPath);
        }
        DataOutputStream osWrapper = new DataOutputStream((OutputStream)fsOutputStream){

            @Override
            public void flush() throws IOException {
                super.flush();
                fsOutputStream.hflush();
            }

            @Override
            public void close() throws IOException {
                LOG.debug("Closing {}", (Object)FSRecoveryHandler.this.logPath);
                super.close();
            }
        };
        return osWrapper;
    }

    @Override
    public DataInputStream getLog() throws IOException {
        if (this.fs.exists(this.logBackupPath)) {
            throw new AssertionError((Object)("Restore state prior to reading log: " + this.logBackupPath));
        }
        if (this.fs.exists(this.logPath)) {
            LOG.debug("Opening existing log ({})", (Object)this.logPath);
            return this.fs.open(this.logPath);
        }
        LOG.debug("No existing log ({})", (Object)this.logPath);
        return new DataInputStream(new ByteArrayInputStream(new byte[0]));
    }

    @Override
    public void save(Object state) throws IOException {
        if (this.fs.exists(this.snapshotBackupPath)) {
            throw new IllegalStateException("Found previous backup " + this.snapshotBackupPath);
        }
        if (this.fs.exists(this.snapshotPath)) {
            LOG.debug("Backup {} to {}", (Object)this.snapshotPath, (Object)this.snapshotBackupPath);
            this.fs.rename(this.snapshotPath, this.snapshotBackupPath);
        }
        LOG.debug("Writing checkpoint to {}", (Object)this.snapshotPath);
        try (FSDataOutputStream fsOutputStream = this.fs.create(this.snapshotPath);
             ObjectOutputStream oos = new ObjectOutputStream((OutputStream)fsOutputStream);){
            oos.writeObject(state);
        }
        if (this.fs.exists(this.snapshotBackupPath) && !this.fs.delete(this.snapshotBackupPath, false)) {
            throw new IOException("Failed to remove " + this.snapshotBackupPath);
        }
        Path logBackup = new Path(this.basedir + "/" + FILE_LOG_BACKUP);
        if (this.fs.exists(logBackup) && !this.fs.delete(logBackup, false)) {
            throw new IOException("Failed to remove " + logBackup);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public Object restore() throws IOException {
        Object object;
        FileContext fc = FileContext.getFileContext((URI)this.fs.getUri());
        if (fc.util().exists(this.snapshotBackupPath)) {
            LOG.warn("Incomplete checkpoint, reverting to {}", (Object)this.snapshotBackupPath);
            fc.rename(this.snapshotBackupPath, this.snapshotPath, new Options.Rename[]{Options.Rename.OVERWRITE});
            Path tmpLogPath = new Path(this.basedir, "log.combined");
            try (FSDataOutputStream fsOut = fc.create(tmpLogPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), new Options.CreateOpts[0]);){
                FSDataInputStream fsIn = fc.open(this.logBackupPath);
                object = null;
                try {
                    IOUtils.copy((InputStream)fsIn, (OutputStream)fsOut);
                }
                catch (Throwable x2) {
                    object = x2;
                    throw x2;
                }
                finally {
                    if (fsIn != null) {
                        if (object != null) {
                            try {
                                fsIn.close();
                            }
                            catch (Throwable x2) {
                                ((Throwable)object).addSuppressed(x2);
                            }
                        } else {
                            fsIn.close();
                        }
                    }
                }
                fsIn = fc.open(this.logPath);
                object = null;
                try {
                    IOUtils.copy((InputStream)fsIn, (OutputStream)fsOut);
                }
                catch (Throwable x2) {
                    object = x2;
                    throw x2;
                }
                finally {
                    if (fsIn != null) {
                        if (object != null) {
                            try {
                                fsIn.close();
                            }
                            catch (Throwable x2) {
                                ((Throwable)object).addSuppressed(x2);
                            }
                        } else {
                            fsIn.close();
                        }
                    }
                }
            }
            fc.rename(tmpLogPath, this.logPath, new Options.Rename[]{Options.Rename.OVERWRITE});
            fc.delete(this.logBackupPath, false);
        } else if (fc.util().exists(this.logBackupPath)) {
            LOG.warn("Found {}, did checkpointing fail?", (Object)this.logBackupPath);
            fc.rename(this.logBackupPath, this.logPath, new Options.Rename[]{Options.Rename.OVERWRITE});
        }
        if (!fc.util().exists(this.snapshotPath)) {
            LOG.debug("No existing checkpoint.");
            return null;
        }
        LOG.debug("Reading checkpoint {}", (Object)this.snapshotPath);
        FSDataInputStream is = fc.open(this.snapshotPath);
        final ClassLoader loader = Thread.currentThread().getContextClassLoader();
        try (ObjectInputStream ois = new ObjectInputStream((InputStream)is){

            @Override
            protected Class<?> resolveClass(ObjectStreamClass objectStreamClass) throws IOException, ClassNotFoundException {
                return Class.forName(objectStreamClass.getName(), true, loader);
            }
        };){
            object = ois.readObject();
            return object;
        }
        catch (ClassNotFoundException cnfe) {
            throw new IOException("Failed to read checkpointed state", cnfe);
        }
    }

    public void writeConnectUri(String uri) throws IOException {
        try (FSDataOutputStream out = this.fs.create(this.heartbeatPath, true);){
            out.write(uri.getBytes());
        }
        LOG.debug("Connect address: {} written to {} ", (Object)uri, (Object)this.heartbeatPath);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public String readConnectUri() throws IOException {
        byte[] bytes;
        try (FSDataInputStream in = this.fs.open(this.heartbeatPath);){
            bytes = IOUtils.toByteArray((InputStream)in);
        }
        String uri = new String(bytes);
        LOG.debug("Connect address: {} from {} ", (Object)uri, (Object)this.heartbeatPath);
        return uri;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void finalize() throws Throwable {
        try {
            this.fs.close();
        }
        finally {
            super.finalize();
        }
    }
}

