/*
 * Decompiled with CFR 0.152.
 */
package stream.net;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.data.DataFactory;
import stream.io.AbstractStream;
import stream.io.SourceURL;
import streams.runtime.Hook;
import streams.runtime.Signals;

/**
 * A stream that listens on a UDP port and turns every received datagram into
 * one data item.
 *
 * <p>{@link #init()} opens the socket and starts a daemon thread that executes
 * {@link #run()}. The receiver thread stores each datagram in an internal queue
 * from which {@link #read()} and {@link #readNext()} take their items. Every
 * item carries the keys {@code udp:data} (payload bytes), {@code udp:source}
 * (sender host address), {@code udp:port} (sender port) and {@code udp:size}
 * (payload length).
 *
 * <p>At most {@code backlog} items are buffered; when the buffer is full the
 * oldest item is discarded in favour of the newly received one.
 *
 * <p>Note: the {@code address} property is stored but not used when the socket
 * is bound; the socket is created from the port alone.
 */
public class UDPStream
extends AbstractStream
implements Runnable {
    static Logger log = LoggerFactory.getLogger(UDPStream.class);
    protected String address = "0.0.0.0";
    protected Integer port;
    protected DatagramSocket socket;
    /** Marker item signalling the end of the stream; compared by identity. */
    static final Data eof = DataFactory.create();
    protected AtomicBoolean running = new AtomicBoolean(false);
    /** Size in bytes of the receive buffer allocated for each datagram. */
    protected Integer packetSize = 1024;
    /** Maximum number of items buffered before the oldest ones are dropped. */
    protected Integer backlog = 100;
    protected final LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue<Data>();
    protected Thread t;
    protected String id;

    public UDPStream() {
        super((SourceURL)null);
    }

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    /**
     * Opens the UDP socket on the configured port, starts the receiver thread
     * and registers a shutdown hook that stops the stream.
     *
     * <p>If the receiver thread of this stream is already running, an error is
     * logged and nothing else happens.
     *
     * @throws IllegalStateException if no port has been configured
     * @throws Exception if the socket cannot be opened
     */
    public void init() throws Exception {
        // Check for a live receiver BEFORE touching the socket: opening a second
        // socket on the same port would fail or leak the descriptor.
        Thread receiver = this.t;
        if (this.running.get() && receiver != null && receiver.isAlive()) {
            log.error("UDP-Stream {} already running.", this);
            return;
        }
        if (this.port == null) {
            throw new IllegalStateException("No port configured for UDP-Stream '" + this.id + "'");
        }

        // Release a socket left over from an earlier initialisation, if any.
        this.closeSocket();
        this.socket = new DatagramSocket(this.port.intValue());

        this.t = new Thread(this);
        this.t.setDaemon(true);
        this.t.start();

        Signals.register(new Hook() {

            public void signal(int flags) {
                log.debug("Setting running=false");
                UDPStream.this.running.set(false);
                log.debug("Adding EOF item");
                UDPStream.this.queue.add(eof);
                try {
                    Thread worker = UDPStream.this.t;
                    if (worker != null) {
                        worker.interrupt();
                    }
                }
                catch (Exception e) {
                    log.error("Failed to interrupt receiver thread of UDP-Stream", e);
                }
                // An interrupt does not wake a thread blocked in
                // DatagramSocket.receive(); closing the socket does.
                UDPStream.this.closeSocket();
            }
        });
    }

    /**
     * Blocks until an item is available and returns a copy of it.
     *
     * @return a copy of the next item, or {@code null} once the end-of-stream
     *         marker is reached or the calling thread is interrupted while the
     *         socket is closed (or was never opened)
     */
    public Data read() throws Exception {
        Data item = null;
        while (item == null) {
            try {
                item = this.queue.take();
            }
            catch (InterruptedException ie) {
                DatagramSocket current = this.socket;
                if (current == null || current.isClosed()) {
                    // Giving up: keep the interrupt visible to the caller.
                    Thread.currentThread().interrupt();
                    return null;
                }
                // Socket still open: treat the interrupt as a wake-up and keep waiting.
            }
        }
        if (item == eof) {
            return null;
        }
        Data datum = DataFactory.create();
        datum.putAll(item);
        return datum;
    }

    /**
     * Stops the receiver loop and closes the socket, which also releases a
     * receiver thread that is blocked waiting for a datagram.
     */
    public void close() throws Exception {
        this.running.set(false);
        this.closeSocket();
    }

    /**
     * Receiver loop: reads datagrams from the socket and queues one item per
     * datagram until the stream is stopped or the socket is closed.
     */
    @Override
    public void run() {
        // Work on the socket this thread was started for, even if init() later
        // replaces the field.
        DatagramSocket sock = this.socket;
        if (sock == null) {
            log.error("UDP-Stream {} has no socket, init() has to be called first.", this);
            return;
        }
        this.running.set(true);
        // The isClosed() check guarantees termination even if close() ran
        // before this thread set the running flag.
        while (this.running.get() && !sock.isClosed()) {
            try {
                byte[] buf = new byte[this.packetSize.intValue()];
                DatagramPacket packet = new DatagramPacket(buf, buf.length);
                sock.receive(packet);
                this.enqueue(this.toItem(packet));
            }
            catch (Exception e) {
                if (!this.running.get() || sock.isClosed()) {
                    // Expected while shutting down: receive() fails once the socket is closed.
                    log.debug("UDP-Stream {} stopped receiving: {}", this, e.getMessage());
                    break;
                }
                log.error("Failed to receive UDP packet", e);
            }
        }
    }

    /** Converts a received datagram into a data item holding payload and sender details. */
    private Data toItem(DatagramPacket packet) {
        int off = packet.getOffset();
        // getLength() already is the payload length, it must not be reduced by the offset.
        int len = packet.getLength();
        byte[] data = new byte[len];
        System.arraycopy(packet.getData(), off, data, 0, len);

        Data item = DataFactory.create();
        item.put("udp:data", data);
        item.put("udp:source", packet.getAddress().getHostAddress());
        item.put("udp:port", Integer.valueOf(packet.getPort()));
        item.put("udp:size", Integer.valueOf(len));
        return item;
    }

    /** Appends an item to the queue, discarding the oldest items once the backlog limit is reached. */
    private void enqueue(Data item) {
        Integer limit = this.backlog;
        synchronized (this.queue) {
            boolean droppedEof = false;
            if (limit != null && limit.intValue() > 0) {
                // The queue itself is unbounded, so the limit is enforced here.
                while (this.queue.size() >= limit.intValue()) {
                    Data dropped = this.queue.poll();
                    if (dropped == null) {
                        break;
                    }
                    if (dropped == eof) {
                        droppedEof = true;
                    }
                }
            }
            this.queue.offer(item);
            if (droppedEof) {
                // Never lose the end-of-stream marker, readers rely on it to terminate.
                this.queue.offer(eof);
            }
        }
    }

    /** Closes the current socket if there is one and it is still open. */
    private void closeSocket() {
        DatagramSocket current = this.socket;
        if (current != null && !current.isClosed()) {
            current.close();
        }
    }

    public String getAddress() {
        return this.address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public Integer getPort() {
        return this.port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public Integer getBacklog() {
        return this.backlog;
    }

    public void setBacklog(Integer backlog) {
        this.backlog = backlog;
    }

    /**
     * Blocks until an item is available and returns it without copying.
     *
     * @return the next queued item, or {@code null} once the end-of-stream
     *         marker is reached
     * @throws Exception if the calling thread is interrupted while waiting
     */
    public Data readNext() throws Exception {
        Data item = this.queue.take();
        if (item == eof) {
            return null;
        }
        return item;
    }
}

