/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.ConnectException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.TestServer;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.spark_project.guava.io.Closeables;

public class JavaReceiverAPISuite
implements Serializable {
    @Before
    public void setUp() {
        System.clearProperty("spark.streaming.clock");
    }

    @After
    public void tearDown() {
        System.clearProperty("spark.streaming.clock");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReceiver() throws InterruptedException {
        TestServer server = new TestServer(0);
        server.start();
        AtomicLong dataCounter = new AtomicLong(0L);
        try {
            JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200L));
            JavaReceiverInputDStream input = ssc.receiverStream((Receiver)new JavaSocketReceiver("localhost", server.port()));
            JavaDStream mapped = input.map((Function & Serializable)v1 -> v1 + ".");
            mapped.foreachRDD((VoidFunction & Serializable)rdd -> {
                long count = rdd.count();
                dataCounter.addAndGet(count);
            });
            ssc.start();
            long startTime = System.currentTimeMillis();
            long timeout = 10000L;
            Thread.sleep(200L);
            for (int i = 0; i < 6; ++i) {
                server.send(i + "\n");
                Thread.sleep(100L);
            }
            while (dataCounter.get() == 0L && System.currentTimeMillis() - startTime < timeout) {
                Thread.sleep(100L);
            }
            ssc.stop();
            Assert.assertTrue((dataCounter.get() > 0L ? 1 : 0) != 0);
        }
        finally {
            server.stop();
        }
    }

    private static class JavaSocketReceiver
    extends Receiver<String> {
        private String host = null;
        private int port = -1;

        JavaSocketReceiver(String host_, int port_) {
            super(StorageLevel.MEMORY_AND_DISK());
            this.host = host_;
            this.port = port_;
        }

        public void onStart() {
            new Thread(this::receive).start();
        }

        public void onStop() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void receive() {
            try {
                Socket socket = null;
                BufferedReader in = null;
                try {
                    String userInput;
                    socket = new Socket(this.host, this.port);
                    in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
                    while ((userInput = in.readLine()) != null) {
                        this.store(userInput);
                    }
                }
                catch (Throwable throwable) {
                    Closeables.close(in, (boolean)true);
                    Closeables.close((Closeable)socket, (boolean)true);
                    throw throwable;
                }
                Closeables.close((Closeable)in, (boolean)true);
                Closeables.close((Closeable)socket, (boolean)true);
            }
            catch (ConnectException ce) {
                ce.printStackTrace();
                this.restart("Could not connect", ce);
            }
            catch (Throwable t) {
                t.printStackTrace();
                this.restart("Error receiving data", t);
            }
        }
    }
}

