/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.dstream;

import java.io.EOFException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.storage.StreamBlockId;
import org.apache.spark.streaming.dstream.NetworkReceiver;
import scala.Function0;
import scala.None$;
import scala.Serializable;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001!4Q!\u0001\u0002\u0001\t1\u0011!CU1x\u001d\u0016$xo\u001c:l%\u0016\u001cW-\u001b<fe*\u00111\u0001B\u0001\bIN$(/Z1n\u0015\t)a!A\u0005tiJ,\u0017-\\5oO*\u0011q\u0001C\u0001\u0006gB\f'o\u001b\u0006\u0003\u0013)\ta!\u00199bG\",'\"A\u0006\u0002\u0007=\u0014xm\u0005\u0002\u0001\u001bA\u0019abD\t\u000e\u0003\tI!\u0001\u0005\u0002\u0003\u001f9+Go^8sWJ+7-Z5wKJ\u0004\"AE\u000b\u000e\u0003MQ\u0011\u0001F\u0001\u0006g\u000e\fG.Y\u0005\u0003-M\u00111!\u00118z\u0011!A\u0002A!A!\u0002\u0013Q\u0012\u0001\u00025pgR\u001c\u0001\u0001\u0005\u0002\u001c=9\u0011!\u0003H\u0005\u0003;M\ta\u0001\u0015:fI\u00164\u0017BA\u0010!\u0005\u0019\u0019FO]5oO*\u0011Qd\u0005\u0005\tE\u0001\u0011\t\u0011)A\u0005G\u0005!\u0001o\u001c:u!\t\u0011B%\u0003\u0002&'\t\u0019\u0011J\u001c;\t\u0011\u001d\u0002!\u0011!Q\u0001\n!\nAb\u001d;pe\u0006<W\rT3wK2\u0004\"!\u000b\u0017\u000e\u0003)R!a\u000b\u0004\u0002\u000fM$xN]1hK&\u0011QF\u000b\u0002\r'R|'/Y4f\u0019\u00164X\r\u001c\u0005\u0006_\u0001!\t\u0001M\u0001\u0007y%t\u0017\u000e\u001e \u0015\tE\u00124\u0007\u000e\t\u0003\u001d\u0001AQ\u0001\u0007\u0018A\u0002iAQA\t\u0018A\u0002\rBQa\n\u0018A\u0002!BqA\u000e\u0001A\u0002\u0013\u0005q'\u0001\ncY>\u001c7\u000eU;tQ&tw\r\u00165sK\u0006$W#\u0001\u001d\u0011\u0005erT\"\u0001\u001e\u000b\u0005mb\u0014\u0001\u00027b]\u001eT\u0011!P\u0001\u0005U\u00064\u0018-\u0003\u0002@u\t1A\u000b\u001b:fC\u0012Dq!\u0011\u0001A\u0002\u0013\u0005!)\u0001\fcY>\u001c7\u000eU;tQ&tw\r\u00165sK\u0006$w\fJ3r)\t\u0019e\t\u0005\u0002\u0013\t&\u0011Qi\u0005\u0002\u0005+:LG\u000fC\u0004H\u0001\u0006\u0005\t\u0019\u0001\u001d\u0002\u0007a$\u0013\u0007\u0003\u0004J\u0001\u0001\u0006K\u0001O\u0001\u0014E2|7m\u001b)vg\"Lgn\u001a+ie\u0016\fG\r\t\u0005\u0006\u0017\u0002!\t\u0005T\u0001\u0016O\u0016$Hj\\2bi&|g\u000e\u0015:fM\u0016\u0014XM\\2f)\u0005ieB\u0001\nO\u0013\ty5#\u0001\u0003O_:,\u0007\"B)\u0001\t\u0003\u0011\u0016aB8o'R\f'\u000f\u001e\u000b\u0002\u0007\")A\u000b\u0001C\u0001%\u00061qN\\*u_BDQA\u0016\u0001\u0005\n]\u000b\u0011B]3bI\u001a+H\u000e\\=\u0015\u0007\rC&\rC\u0003Z+\u0002\u0007!,A\u0004dQ\u0006tg.\u001a7\u0011\u0005m\u0003W\"\u0001/\u000b\u0005us\u0016\u0001C2iC:tW\r\\:\u000b\u0005}c\u0014a\u00018j_&\u0011\u0011\r\u0018\u0002\u0014%\u0016\fG-\u00192mK\nKH/Z\"iC:tW\r\u001c\u0005\u0006GV\u0003\r\u0001Z\u0001\u0005I\u0016\u001cH\u000f\u0005\u0002fM6\ta,\u0003\u0002h=\nQ!)\u001f;f\u0005V4g-\u001a:")
public class RawNetworkReceiver
extends NetworkReceiver<Object> {
    public final String org$apache$spark$streaming$dstream$RawNetworkReceiver$$host;
    public final int org$apache$spark$streaming$dstream$RawNetworkReceiver$$port;
    public final StorageLevel org$apache$spark$streaming$dstream$RawNetworkReceiver$$storageLevel;
    private Thread blockPushingThread;

    public Thread blockPushingThread() {
        return this.blockPushingThread;
    }

    public void blockPushingThread_$eq(Thread x$1) {
        this.blockPushingThread = x$1;
    }

    public None$ getLocationPreference() {
        return None$.MODULE$;
    }

    @Override
    public void onStart() {
        this.logInfo((Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ RawNetworkReceiver $outer;

            public final String apply() {
                return new StringBuilder().append((Object)"Connecting to ").append((Object)this.$outer.org$apache$spark$streaming$dstream$RawNetworkReceiver$$host).append((Object)":").append((Object)BoxesRunTime.boxToInteger((int)this.$outer.org$apache$spark$streaming$dstream$RawNetworkReceiver$$port)).toString();
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(true);
        channel.connect(new InetSocketAddress(this.org$apache$spark$streaming$dstream$RawNetworkReceiver$$host, this.org$apache$spark$streaming$dstream$RawNetworkReceiver$$port));
        this.logInfo((Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ RawNetworkReceiver $outer;

            public final String apply() {
                return new StringBuilder().append((Object)"Connected to ").append((Object)this.$outer.org$apache$spark$streaming$dstream$RawNetworkReceiver$$host).append((Object)":").append((Object)BoxesRunTime.boxToInteger((int)this.$outer.org$apache$spark$streaming$dstream$RawNetworkReceiver$$port)).toString();
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
        ArrayBlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<ByteBuffer>(2);
        this.blockPushingThread_$eq(new Thread(this, queue){
            private final /* synthetic */ RawNetworkReceiver $outer;
            private final ArrayBlockingQueue queue$1;

            public void run() {
                int nextBlockNumber = 0;
                while (true) {
                    ByteBuffer buffer = (ByteBuffer)this.queue$1.take();
                    StreamBlockId blockId = new StreamBlockId(this.$outer.streamId(), (long)nextBlockNumber);
                    ++nextBlockNumber;
                    this.$outer.pushBlock((BlockId)blockId, buffer, null, this.$outer.org$apache$spark$streaming$dstream$RawNetworkReceiver$$storageLevel);
                }
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.queue$1 = queue$1;
                this.setDaemon(true);
            }
        });
        this.blockPushingThread().start();
        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
        while (true) {
            lengthBuffer.clear();
            this.readFully(channel, lengthBuffer);
            lengthBuffer.flip();
            int length = lengthBuffer.getInt();
            ByteBuffer dataBuffer = ByteBuffer.allocate(length);
            this.readFully(channel, dataBuffer);
            dataBuffer.flip();
            this.logInfo((Function0<String>)new Serializable(this, length){
                public static final long serialVersionUID = 0L;
                private final int length$1;

                public final String apply() {
                    return new StringBuilder().append((Object)"Read a block with ").append((Object)BoxesRunTime.boxToInteger((int)this.length$1)).append((Object)" bytes").toString();
                }
                {
                    this.length$1 = length$1;
                }
            });
            queue.put(dataBuffer);
        }
    }

    @Override
    public void onStop() {
        if (this.blockPushingThread() != null) {
            this.blockPushingThread().interrupt();
        }
    }

    private void readFully(ReadableByteChannel channel, ByteBuffer dest) {
        while (dest.position() < dest.limit()) {
            if (channel.read(dest) != -1) continue;
            throw new EOFException("End of channel");
        }
    }

    public RawNetworkReceiver(String host, int port, StorageLevel storageLevel) {
        this.org$apache$spark$streaming$dstream$RawNetworkReceiver$$host = host;
        this.org$apache$spark$streaming$dstream$RawNetworkReceiver$$port = port;
        this.org$apache$spark$streaming$dstream$RawNetworkReceiver$$storageLevel = storageLevel;
        super(ClassTag$.MODULE$.Any());
        this.blockPushingThread = null;
    }
}

