/*
 * Decompiled with CFR 0.152.
 */
package org.somda.sdc.dpws.udp;

import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.somda.sdc.common.logging.InstanceLogger;
import org.somda.sdc.dpws.soap.exception.TransportException;
import org.somda.sdc.dpws.udp.UdpBindingService;
import org.somda.sdc.dpws.udp.UdpMessage;
import org.somda.sdc.dpws.udp.UdpMessageQueueObserver;
import org.somda.sdc.dpws.udp.UdpMessageQueueService;

/**
 * Queue-backed implementation of {@link UdpMessageQueueService}.
 *
 * <p>The service decouples producers and consumers of {@link UdpMessage}s with two queues, each drained
 * by its own daemon thread while the service is running:
 * <ul>
 * <li>messages handed to {@link #receive(UdpMessage)} are queued and then posted to the injected
 * {@link EventBus};</li>
 * <li>messages handed to {@link #sendMessage(UdpMessage)} are queued and then passed to
 * {@link UdpBindingService#sendMessage} of the binding set via {@link #setUdpBinding(UdpBindingService)}.</li>
 * </ul>
 *
 * <p>A UDP binding has to be set before the service is started, otherwise startup fails.
 */
public class UdpMessageQueueServiceImpl extends AbstractIdleService implements Service, UdpMessageQueueService {
    private static final Logger LOG = LogManager.getLogger(UdpMessageQueueServiceImpl.class);

    /** Source of the per-instance number that prefixes log lines and worker thread names. */
    private static final AtomicInteger INSTANCE_ID_COUNTER = new AtomicInteger(0);

    private final int instanceId;
    private final LinkedBlockingDeque<UdpMessage> incomingMessageQueue;
    private final LinkedBlockingDeque<UdpMessage> outgoingMessageQueue;
    private final EventBus eventBus;
    private final Logger instanceLogger;
    private UdpBindingService udpBinding;
    private Thread outgoingThread;
    private Thread incomingThread;

    /**
     * Creates a stopped message queue service without a UDP binding.
     *
     * @param eventBus            event bus that incoming messages are posted to.
     * @param frameworkIdentifier identifier handed to {@link InstanceLogger#wrapLogger} to create the
     *                            logger used by this instance.
     */
    @Inject
    UdpMessageQueueServiceImpl(EventBus eventBus,
                               @Named(value = "Common.InstanceIdentifier") String frameworkIdentifier) {
        this.instanceLogger = InstanceLogger.wrapLogger(LOG, frameworkIdentifier);
        this.instanceId = INSTANCE_ID_COUNTER.getAndIncrement();
        this.incomingMessageQueue = new LinkedBlockingDeque<>();
        this.outgoingMessageQueue = new LinkedBlockingDeque<>();
        this.eventBus = eventBus;
        this.udpBinding = null;
    }

    /**
     * Sets the binding that outgoing messages are sent through.
     *
     * @param udpBinding the binding to use; startup fails while this is {@code null}.
     */
    @Override
    public void setUdpBinding(UdpBindingService udpBinding) {
        this.udpBinding = udpBinding;
    }

    /**
     * Queues a message for sending.
     *
     * @param message the message to send.
     * @return {@code true} if the service is running and the message was added to the outgoing queue,
     * {@code false} otherwise. A {@code true} result only means the message was queued, not that it
     * has been sent.
     */
    @Override
    public boolean sendMessage(UdpMessage message) {
        return isRunning() && outgoingMessageQueue.offer(message);
    }

    /**
     * Registers an observer at the event bus that incoming messages are posted to.
     *
     * @param observer the observer to register.
     */
    @Override
    public void registerUdpMessageQueueObserver(UdpMessageQueueObserver observer) {
        eventBus.register(observer);
    }

    /**
     * Unregisters an observer from the event bus that incoming messages are posted to.
     *
     * @param observer the observer to unregister.
     */
    @Override
    public void unregisterUdpMessageQueueObserver(UdpMessageQueueObserver observer) {
        eventBus.unregister(observer);
    }

    /**
     * Starts the two worker threads that drain the incoming and the outgoing queue.
     *
     * @throws IllegalStateException if no UDP binding has been set.
     * @throws Exception             declared for compatibility with the overridden method.
     */
    @Override
    protected void startUp() throws Exception {
        instanceLogger.info("[{}] Start UDP message queue for binding {}", instanceId, udpBinding);
        if (udpBinding == null) {
            String msg = "Cannot startup without UDP binding";
            instanceLogger.warn("[{}] {}", instanceId, msg);
            throw new IllegalStateException(msg);
        }
        startProcessingOfIncomingMessages();
        startProcessingOfOutgoingMessages();
        instanceLogger.info("[{}] UDP message queue for binding {} is running", instanceId, udpBinding);
    }

    /** Creates and starts the daemon thread that drains the outgoing queue. */
    private void startProcessingOfOutgoingMessages() {
        outgoingThread = newDaemonThread(
                String.format("[%s] Outgoing UdpMessageQueueService", instanceId),
                this::processOutgoingMessages);
        outgoingThread.start();
    }

    /** Creates and starts the daemon thread that drains the incoming queue. */
    private void startProcessingOfIncomingMessages() {
        incomingThread = newDaemonThread(
                String.format("[%s] Incoming UdpMessageQueueService", instanceId),
                this::processIncomingMessages);
        incomingThread.start();
    }

    /** Builds a named daemon thread for the given task without starting it. */
    private static Thread newDaemonThread(String name, Runnable task) {
        Thread worker = new Thread(task);
        worker.setName(name);
        worker.setDaemon(true);
        return worker;
    }

    /**
     * Worker loop of the outgoing thread: takes messages from the outgoing queue and sends them through
     * the UDP binding until the thread is interrupted. A failure to send one message is logged and does
     * not stop the loop.
     */
    private void processOutgoingMessages() {
        try {
            while (true) {
                try {
                    UdpMessage message = outgoingMessageQueue.take();
                    instanceLogger.trace(
                            "[{}] Outgoing UdpMessageQueueService received UDP message, sending: {}",
                            instanceId, message);
                    udpBinding.sendMessage(message);
                } catch (IOException e) {
                    instanceLogger.warn("[{}] Outgoing UdpMessageQueueService IO exception caught",
                            instanceId, e);
                } catch (InterruptedException e) {
                    // Interruption is the shutdown signal. The thread terminates right away, so the
                    // interrupt flag is not restored.
                    instanceLogger.info("[{}] Outgoing UdpMessageQueueService interrupted", instanceId);
                    instanceLogger.trace("[{}] Outgoing UdpMessageQueueService interrupted", instanceId, e);
                    return;
                } catch (TransportException e) {
                    instanceLogger.info("[{}] Outgoing UdpMessageQueueService transport exception caught",
                            instanceId, e);
                } catch (RuntimeException e) {
                    // Without this, one unchecked exception would end the thread while the service
                    // still reports running and keeps accepting messages that are never sent.
                    instanceLogger.warn(
                            "[{}] Outgoing UdpMessageQueueService encountered an unexpected error on sending",
                            instanceId, e);
                }
            }
        } finally {
            instanceLogger.info("[{}] Outgoing UdpMessageQueueService ended", instanceId);
        }
    }

    /**
     * Worker loop of the incoming thread: takes messages from the incoming queue and posts them to the
     * event bus until the thread is interrupted. A failure while posting one message is logged and does
     * not stop the loop.
     */
    private void processIncomingMessages() {
        try {
            while (true) {
                try {
                    UdpMessage message = incomingMessageQueue.take();
                    instanceLogger.trace(
                            "[{}] Incoming UdpMessageQueueService received UDP message, posting to EventBus: {}",
                            instanceId, message);
                    eventBus.post(message);
                } catch (InterruptedException e) {
                    // Interruption is the shutdown signal. The thread terminates right away, so the
                    // interrupt flag is not restored.
                    instanceLogger.info("[{}] Incoming UdpMessageQueueService interrupted", instanceId);
                    instanceLogger.trace("[{}] Incoming UdpMessageQueueService interrupted", instanceId, e);
                    return;
                } catch (Exception e) {
                    instanceLogger.warn(
                            "[{}] Incoming UdpMessageQueueService encountered an error on event dissemination",
                            instanceId, e);
                }
            }
        } finally {
            instanceLogger.info("[{}] Incoming UdpMessageQueueService ended", instanceId);
        }
    }

    /**
     * Discards all queued messages and interrupts both worker threads. The method does not wait for the
     * threads to terminate.
     */
    @Override
    protected void shutDown() {
        instanceLogger.info("[{}] Shut down UDP message queue for binding {}", instanceId, udpBinding);
        incomingMessageQueue.clear();
        outgoingMessageQueue.clear();
        incomingThread.interrupt();
        outgoingThread.interrupt();
        instanceLogger.info("[{}] UDP message queue for binding {} shut down", instanceId, udpBinding);
    }

    /**
     * Queues a received message for dissemination on the event bus. The message is queued regardless of
     * the service state; an error is logged if the queue rejects it.
     *
     * @param udpMessage the received message.
     */
    @Override
    public void receive(UdpMessage udpMessage) {
        instanceLogger.debug("[{}] Received UDP message, adding to queue", instanceId);
        if (!incomingMessageQueue.offer(udpMessage)) {
            instanceLogger.error("[{}] Lost incoming UDP message in message queue: {}", instanceId, udpMessage);
        }
    }
}

