/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.CheckpointMark>
extends UnboundedSource<String, CheckpointMarkT> {
    private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
    private static final long serialVersionUID = 1L;
    private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
    private static final int CONNECTION_TIMEOUT_TIME = 0;
    private final String hostname;
    private final int port;
    private final char delimiter;
    private final long maxNumRetries;
    private final long delayBetweenRetries;

    public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) {
        this(hostname, port, delimiter, maxNumRetries, 500L);
    }

    public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
        this.hostname = hostname;
        this.port = port;
        this.delimiter = delimiter;
        this.maxNumRetries = maxNumRetries;
        this.delayBetweenRetries = delayBetweenRetries;
    }

    public String getHostname() {
        return this.hostname;
    }

    public int getPort() {
        return this.port;
    }

    public char getDelimiter() {
        return this.delimiter;
    }

    public long getMaxNumRetries() {
        return this.maxNumRetries;
    }

    public long getDelayBetweenRetries() {
        return this.delayBetweenRetries;
    }

    public List<? extends UnboundedSource<String, CheckpointMarkT>> split(int desiredNumSplits, PipelineOptions options) throws Exception {
        return Collections.singletonList(this);
    }

    public UnboundedSource.UnboundedReader<String> createReader(PipelineOptions options, @Nullable CheckpointMarkT checkpointMark) {
        return new UnboundedSocketReader(this);
    }

    @Nullable
    public Coder getCheckpointMarkCoder() {
        return null;
    }

    public void validate() {
        Preconditions.checkArgument((this.port > 0 && this.port < 65536 ? 1 : 0) != 0, (Object)"port is out of range");
        Preconditions.checkArgument((this.maxNumRetries >= -1L ? 1 : 0) != 0, (Object)"maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
        Preconditions.checkArgument((this.delayBetweenRetries >= 0L ? 1 : 0) != 0, (Object)"delayBetweenRetries must be zero or positive");
    }

    public Coder<String> getOutputCoder() {
        return DEFAULT_SOCKET_CODER;
    }

    public static class UnboundedSocketReader
    extends UnboundedSource.UnboundedReader<String> {
        private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
        private final UnboundedSocketSource source;
        private Socket socket;
        private BufferedReader reader;
        private boolean isRunning;
        private String currentRecord;

        public UnboundedSocketReader(UnboundedSocketSource source) {
            this.source = source;
        }

        private void openConnection() throws IOException {
            this.socket = new Socket();
            this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), 0);
            this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream(), StandardCharsets.UTF_8));
            this.isRunning = true;
        }

        public boolean start() throws IOException {
            int attempt = 0;
            while (!this.isRunning) {
                try {
                    this.openConnection();
                    LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort());
                    return this.advance();
                }
                catch (IOException e) {
                    LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' + this.source.getPort() + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs...");
                    if (this.source.getMaxNumRetries() == -1L || (long)attempt++ < this.source.getMaxNumRetries()) {
                        try {
                            Thread.sleep(this.source.getDelayBetweenRetries());
                        }
                        catch (InterruptedException e1) {
                            LOG.error("Interrupted during retry delay", (Throwable)e1);
                        }
                        continue;
                    }
                    this.isRunning = false;
                    break;
                }
            }
            LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort());
            return false;
        }

        public boolean advance() throws IOException {
            int data;
            StringBuilder buffer = new StringBuilder();
            while (this.isRunning && (data = this.reader.read()) != -1) {
                if (data != this.source.getDelimiter()) {
                    buffer.append((char)data);
                    continue;
                }
                if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
                    buffer.setLength(buffer.length() - 1);
                }
                this.currentRecord = buffer.toString();
                buffer.setLength(0);
                return true;
            }
            return false;
        }

        public byte[] getCurrentRecordId() throws NoSuchElementException {
            return new byte[0];
        }

        public String getCurrent() throws NoSuchElementException {
            return this.currentRecord;
        }

        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return Instant.now();
        }

        public void close() throws IOException {
            this.reader.close();
            this.socket.close();
            this.isRunning = false;
            LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + ".");
        }

        public Instant getWatermark() {
            return Instant.now();
        }

        public UnboundedSource.CheckpointMark getCheckpointMark() {
            return UnboundedSource.CheckpointMark.NOOP_CHECKPOINT_MARK;
        }

        public UnboundedSource<String, ?> getCurrentSource() {
            return this.source;
        }
    }
}

