/*
 * Decompiled with CFR 0.152.
 */
package stream.net;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.net.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.annotations.Parameter;
import stream.io.DataObjectStream;
import stream.io.SourceURL;
import stream.util.parser.TimeParser;

/**
 * A {@link DataObjectStream} that reads serialized {@link Data} items through a
 * {@link BufferedInputStream} and transparently (re-)establishes the underlying
 * connection when it is missing or when a read fails.
 *
 * <p>The connection is opened lazily on the first call to {@link #readNext()}.
 * Failed connection attempts are repeated up to {@link #getConnectionRetries()}
 * times, pausing {@link #getReconnectInterval()} milliseconds between attempts.
 *
 * <p>NOTE(review): items are read with Java native deserialization
 * ({@link ObjectInputStream#readObject()}). If the remote endpoint is not fully
 * trusted, an {@code ObjectInputFilter} (or a different wire format) should be
 * considered.
 */
public class BufferedDataObjectStream
extends DataObjectStream {
    private static final Logger log = LoggerFactory.getLogger(BufferedDataObjectStream.class);

    /** Default pause between two connection attempts, in milliseconds. */
    private static final long DEFAULT_RECONNECT_INTERVAL_MS = 5000L;

    /** Placeholder used in log messages when no source URL is available. */
    private static final String UNKNOWN_SOURCE = "<unknown source>";

    // Defaults live in the field initializers (rather than in init()) so that
    // values configured through the setters are kept no matter whether init()
    // is invoked before or after the setters.
    protected int bufferSize = 0;
    protected int connectionRetries = Integer.MAX_VALUE;
    protected long reconnectInterval = DEFAULT_RECONNECT_INTERVAL_MS;
    protected String urlCache;
    protected boolean connected;

    public BufferedDataObjectStream(InputStream in) {
        super(in);
    }

    public BufferedDataObjectStream(SourceURL url) {
        super(url);
    }

    /**
     * @return the configured buffer size in bytes; values {@code <= 0} select
     *         the default buffer size of {@link BufferedInputStream}
     */
    public int getBufferSize() {
        return this.bufferSize;
    }

    @Parameter(required=false, defaultValue="0", description="The buffer size (in bytes) used by this stream. If set to <= 0: The BufferedInputStream's default buffer size is taken (usually 8192)")
    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /**
     * @return the maximum number of retries after the first failed connection attempt
     */
    public int getConnectionRetries() {
        return this.connectionRetries;
    }

    @Parameter(required=false, defaultValue="Integer.MAX_VALUE", description="the maximum amount of connection retries before the general connection attempt finally fails")
    public void setConnectionRetries(int connectionRetries) {
        this.connectionRetries = connectionRetries;
    }

    /**
     * @return the pause between two connection attempts, in milliseconds
     */
    public long getReconnectInterval() {
        return this.reconnectInterval;
    }

    /**
     * Sets the pause between two connection attempts.
     *
     * @param reconnectIntervalString the interval in the textual format understood by
     *        {@link TimeParser#parseTime}, e.g. {@code "5s"}
     * @throws Exception if the string cannot be parsed
     */
    @Parameter(required=false, defaultValue="5s", description="the time between two connection attempts if the first connection attempt was unsuccessful")
    public void setReconnectInterval(String reconnectIntervalString) throws Exception {
        this.reconnectInterval = TimeParser.parseTime(reconnectIntervalString);
    }

    /**
     * Resets the connection state so that the next {@link #readNext()} opens a
     * fresh connection. Configuration values (buffer size, retries, reconnect
     * interval) are deliberately left untouched.
     */
    @Override
    public void init() throws Exception {
        this.input = null;
        this.in = null;
        this.urlCache = null;
        this.connected = false;
    }

    /**
     * Closes any existing connection and opens a new one, retrying on failure.
     *
     * <p>At most {@code connectionRetries + 1} attempts are made. Between two
     * attempts the calling thread sleeps for {@code reconnectInterval}
     * milliseconds. On success {@link #connected} is set to {@code true}.
     *
     * @throws ConnectException if every attempt failed, or if the thread was
     *         interrupted while waiting for the next attempt (the interrupt
     *         flag is restored in that case)
     */
    protected void connect() throws ConnectException {
        this.close();
        final String target = this.getConnectionString();
        boolean established = false;
        Exception lastFailure = null;
        int attempt = 0;
        while (!established && attempt <= this.connectionRetries) {
            InputStream raw = null;
            try {
                log.info("Trying to open connection to {} ...", target);
                if (attempt > 0) {
                    log.info("Connection retry counter: {}", attempt);
                }
                raw = this.getInputStream();
                InputStream buffered = this.bufferSize > 0
                        ? new BufferedInputStream(raw, this.bufferSize)
                        : new BufferedInputStream(raw);
                this.input = new ObjectInputStream(buffered);
                established = true;
            }
            catch (Exception e) {
                lastFailure = e;
                log.warn("Unable to connect to {}: {}", target, e.toString());
                this.close();
                // The freshly opened stream may not be tracked by close() if wrapping
                // it failed, so release it here. Closing twice is harmless.
                if (raw != null) {
                    try {
                        raw.close();
                    }
                    catch (Exception ignored) {
                        // best effort: the connection attempt has already failed
                    }
                }
                // Only wait when another attempt will actually follow.
                if (attempt < this.connectionRetries && this.reconnectInterval > 0L) {
                    try {
                        Thread.sleep(this.reconnectInterval);
                    }
                    catch (InterruptedException interrupted) {
                        // Honour the interruption: restore the flag and stop retrying.
                        Thread.currentThread().interrupt();
                        ConnectException aborted = new ConnectException("Interrupted while waiting to reconnect to " + target);
                        aborted.initCause(interrupted);
                        throw aborted;
                    }
                }
                ++attempt;
            }
        }
        if (!established) {
            log.error("Giving up connection attempt after {} retries. Connection to {} unavailable.", attempt - 1, target);
            this.close();
            ConnectException failure = new ConnectException("Connection to " + target + " unavailable");
            if (lastFailure != null) {
                failure.initCause(lastFailure);
            }
            throw failure;
        }
        this.connected = true;
        log.info("Successfully connected to {}", target);
    }

    /**
     * Reads the next item from the stream, connecting first if necessary.
     *
     * <p>If reading fails for any reason, the connection is re-established via
     * {@link #connect()} and the read is repeated.
     *
     * @return the next deserialized item
     * @throws ConnectException if the connection cannot be (re-)established
     */
    @Override
    public Data readNext() throws ConnectException {
        if (!this.connected) {
            this.connect();
            this.connected = true;
        }
        while (true) {
            try {
                return (Data)this.input.readObject();
            }
            catch (Exception e) {
                log.error("Exception while reading data from socket stream {}: {}", this.getConnectionString(), e.toString());
                log.info("Trying to restart connection to {} ...", this.getConnectionString());
                this.connect();
                // Set explicitly in case an overriding connect() does not maintain the flag.
                this.connected = true;
            }
        }
    }

    /**
     * Builds (and caches) a {@code protocol://host:port} description of the
     * source URL for use in log messages.
     *
     * @return the connection description, or a placeholder if no URL is set
     */
    protected String getConnectionString() {
        if (this.urlCache == null) {
            if (this.url == null) {
                // No URL available (e.g. constructed from a plain InputStream).
                return UNKNOWN_SOURCE;
            }
            this.urlCache = this.url.getProtocol() + "://" + this.url.getHost() + ":" + this.url.getPort();
        }
        return this.urlCache;
    }

    /**
     * Closes the object stream and the underlying input stream, if present, and
     * marks this stream as disconnected. Errors while closing are logged and
     * otherwise ignored; each stream is closed independently so that a failure
     * on one does not prevent the other from being released.
     */
    @Override
    public void close() {
        this.connected = false;
        if (this.input == null && this.in == null) {
            return;
        }
        final String target = this.getConnectionString();
        log.info("Closing connection to {} ...", target);
        if (this.input != null) {
            try {
                this.input.close();
            }
            catch (Exception e) {
                log.warn("Exception while closing connection to {}: {}", target, e.toString());
            }
        }
        if (this.in != null) {
            try {
                this.in.close();
            }
            catch (Exception e) {
                log.warn("Exception while closing connection to {}: {}", target, e.toString());
            }
        }
        this.input = null;
        this.in = null;
        log.info("Connection to {} closed.", target);
    }
}

