/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.jms;

import com.datatorrent.lib.io.jms.JMSBaseTransactionableStore;
import java.io.IOException;
import java.net.URI;
import javax.jms.JMSException;
import javax.validation.constraints.NotNull;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link JMSBaseTransactionableStore} that records the last committed window id on a
 * Hadoop {@link FileSystem}.
 *
 * <p>The committed window of an operator is represented by an empty marker file whose name is
 * the decimal window id, stored under
 * {@code recovery/<appId>/<operatorId>/DT_CMT/}. Transactions themselves are delegated to the
 * JMS session of the base operator; {@link #isExactlyOnce()} reports {@code false}.
 *
 * <p>{@link #connect()} must be called before any of the window id methods, because they
 * operate on the filesystem handle it creates.
 */
@InterfaceStability.Evolving
public class FSPsuedoTransactionableStore extends JMSBaseTransactionableStore {
    private static final Logger logger = LoggerFactory.getLogger(FSPsuedoTransactionableStore.class);

    /** Default value of {@link #recoveryDirectory} and root of the marker file layout. */
    public static final String DEFAULT_RECOVERY_DIRECTORY = "recovery";
    /** Name of the per-operator directory that holds the committed window marker file. */
    public static final String COMMITTED_WINDOW_DIR = "DT_CMT";

    /** Value reported when no committed window has been recorded. */
    private static final long NO_COMMITTED_WINDOW = -1L;

    private transient boolean connected = false;
    private transient boolean inTransaction = false;

    /**
     * Path whose URI selects the filesystem in {@link #getFSInstance()}.
     * NOTE(review): the marker file paths below are always rooted at the relative
     * {@code recovery/} directory and do not use this value -- confirm whether that is intended
     * before changing it, since moving the root would orphan existing recovery data.
     */
    @NotNull
    protected String recoveryDirectory = DEFAULT_RECOVERY_DIRECTORY;

    private transient FileSystem fs;

    /**
     * Creates a new filesystem instance for the URI of {@link #recoveryDirectory}.
     *
     * @return the filesystem; for a {@link LocalFileSystem} the underlying raw filesystem is
     *         returned instead of the wrapper
     * @throws IOException if the filesystem cannot be created
     */
    protected FileSystem getFSInstance() throws IOException {
        FileSystem tempFS = FileSystem.newInstance(new Path(this.recoveryDirectory).toUri(), new Configuration());
        if (tempFS instanceof LocalFileSystem) {
            tempFS = ((LocalFileSystem) tempFS).getRaw();
        }
        return tempFS;
    }

    /**
     * Sets the recovery directory used to select the filesystem on the next {@link #connect()}.
     *
     * @param recoveryDirectory the recovery directory path
     */
    public void setRecoveryDirectory(String recoveryDirectory) {
        this.recoveryDirectory = recoveryDirectory;
    }

    /**
     * @return the configured recovery directory
     */
    public String getRecoveryDirectory() {
        return this.recoveryDirectory;
    }

    /**
     * Returns the largest window id recorded for the given operator.
     *
     * @param appId      the application id
     * @param operatorId the operator id
     * @return the largest window id found, or {@code -1} if the operator's directory does not
     *         exist or contains no window marker
     * @throws RuntimeException wrapping the {@link IOException} if the filesystem cannot be read
     */
    @Override
    public long getCommittedWindowId(String appId, int operatorId) {
        Path recoveryPath = this.getOperatorRecoveryPath(appId, operatorId);
        long maxWindow = NO_COMMITTED_WINDOW;
        boolean found = false;
        try {
            if (!this.fs.exists(recoveryPath)) {
                return NO_COMMITTED_WINDOW;
            }
            for (FileStatus fileStatus : this.fs.listStatus(recoveryPath)) {
                String windowString = fileStatus.getPath().getName();
                long tempWindow;
                try {
                    tempWindow = Long.parseLong(windowString);
                } catch (NumberFormatException ex) {
                    // A stray entry must not prevent recovery; it is removed by the next
                    // storeCommittedWindowId call, which deletes everything but the new marker.
                    logger.warn("Ignoring unexpected entry {} in {}", windowString, recoveryPath);
                    continue;
                }
                if (!found || tempWindow > maxWindow) {
                    maxWindow = tempWindow;
                    found = true;
                }
            }
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
        // An existing but empty directory is reported the same way as a missing one.
        return maxWindow;
    }

    /**
     * Records {@code windowId} as the committed window of the given operator.
     *
     * <p>The new marker file is created first and all other entries in the operator's
     * committed-window directory are deleted afterwards.
     *
     * @param appId      the application id
     * @param operatorId the operator id
     * @param windowId   the window id to record
     * @throws RuntimeException wrapping the {@link IOException} if the filesystem update fails
     */
    @Override
    public void storeCommittedWindowId(String appId, int operatorId, long windowId) {
        Path recoveryPath = this.getOperatorRecoveryPath(appId, operatorId);
        Path windowPath = this.getOperatorWindowRecoveryPath(appId, operatorId, windowId);
        String windowString = Long.toString(windowId);
        try {
            // create() returns an open output stream; close it right away so the empty marker
            // file is finalized and the handle is not leaked once per window.
            this.fs.create(windowPath).close();
            for (FileStatus fileStatus : this.fs.listStatus(recoveryPath)) {
                Path tempPath = fileStatus.getPath();
                if (tempPath.getName().equals(windowString)) {
                    continue;
                }
                this.fs.delete(tempPath, true);
            }
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Recursively deletes the parent of the operator's committed-window directory, i.e.
     * {@code recovery/<appId>/<operatorId>}.
     *
     * @param appId      the application id
     * @param operatorId the operator id
     * @throws RuntimeException wrapping the {@link IOException} if the delete fails
     */
    @Override
    public void removeCommittedWindowId(String appId, int operatorId) {
        try {
            this.fs.delete(this.getOperatorRecoveryPath(appId, operatorId).getParent(), true);
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }

    /** Marks the store as being inside a transaction. */
    @Override
    public void beginTransaction() {
        this.inTransaction = true;
    }

    /**
     * Commits the JMS session of the base operator and leaves the transaction.
     *
     * @throws RuntimeException wrapping the {@link JMSException} if the commit fails; the store
     *         then still reports being in a transaction
     */
    @Override
    public void commitTransaction() {
        try {
            this.getBase().getSession().commit();
        } catch (JMSException ex) {
            throw new RuntimeException(ex);
        }
        this.inTransaction = false;
    }

    /**
     * Rolls back the JMS session of the base operator and leaves the transaction.
     *
     * @throws RuntimeException wrapping the {@link JMSException} if the rollback fails; the
     *         store then still reports being in a transaction
     */
    @Override
    public void rollbackTransaction() {
        try {
            this.getBase().getSession().rollback();
        } catch (JMSException ex) {
            throw new RuntimeException(ex);
        }
        this.inTransaction = false;
    }

    /**
     * @return {@code true} between {@link #beginTransaction()} and a successful commit or
     *         rollback
     */
    @Override
    public boolean isInTransaction() {
        return this.inTransaction;
    }

    /**
     * Creates the filesystem handle used by the window id methods.
     *
     * @throws IOException if the filesystem cannot be created
     */
    @Override
    public void connect() throws IOException {
        this.fs = this.getFSInstance();
        this.connected = true;
    }

    /**
     * Closes the filesystem handle, if one was created, and marks the store as disconnected.
     *
     * @throws IOException if closing the filesystem fails; the store is marked disconnected
     *         regardless
     */
    @Override
    public void disconnect() throws IOException {
        try {
            // fs is null when connect() was never called or failed before assigning it.
            if (this.fs != null) {
                this.fs.close();
            }
        } finally {
            this.connected = false;
        }
    }

    /**
     * @return {@code true} after a successful {@link #connect()} until {@link #disconnect()}
     */
    @Override
    public boolean isConnected() {
        return this.connected;
    }

    /**
     * @return always {@code false}
     */
    @Override
    protected boolean isExactlyOnce() {
        return false;
    }

    /** Builds {@code recovery/<appId>/<operatorId>/DT_CMT}. */
    private Path getOperatorRecoveryPath(String appId, int operatorId) {
        return new Path(DEFAULT_RECOVERY_DIRECTORY + "/" + appId + "/" + operatorId + "/" + COMMITTED_WINDOW_DIR);
    }

    /** Builds {@code recovery/<appId>/<operatorId>/DT_CMT/<windowId>}. */
    private Path getOperatorWindowRecoveryPath(String appId, int operatorId, long windowId) {
        return new Path(DEFAULT_RECOVERY_DIRECTORY + "/" + appId + "/" + operatorId + "/" + COMMITTED_WINDOW_DIR + "/" + windowId);
    }
}

