/*
 * Decompiled with CFR 0.152.
 */
package io.joynr.messaging.routing;

import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.joynr.exceptions.JoynrDelayMessageException;
import io.joynr.exceptions.JoynrIllegalStateException;
import io.joynr.exceptions.JoynrMessageNotSentException;
import io.joynr.exceptions.JoynrRuntimeException;
import io.joynr.exceptions.JoynrShutdownException;
import io.joynr.messaging.FailureAction;
import io.joynr.messaging.IMessagingMulticastSubscriber;
import io.joynr.messaging.IMessagingSkeleton;
import io.joynr.messaging.IMessagingStub;
import io.joynr.messaging.MessagingSkeletonFactory;
import io.joynr.messaging.SuccessAction;
import io.joynr.messaging.routing.AddressManager;
import io.joynr.messaging.routing.DelayableImmutableMessage;
import io.joynr.messaging.routing.MessageProcessedListener;
import io.joynr.messaging.routing.MessageQueue;
import io.joynr.messaging.routing.MessageRouter;
import io.joynr.messaging.routing.MessagingStubFactory;
import io.joynr.messaging.routing.MulticastReceiverRegistry;
import io.joynr.messaging.routing.RoutingTable;
import io.joynr.runtime.ShutdownListener;
import io.joynr.runtime.ShutdownNotifier;
import io.joynr.statusmetrics.MessageWorkerStatus;
import io.joynr.statusmetrics.StatusReceiver;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.inject.Singleton;
import joynr.ImmutableMessage;
import joynr.system.RoutingTypes.Address;
import joynr.system.RoutingTypes.WebSocketClientAddress;
import org.apache.commons.lang.time.FastDateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractMessageRouter
implements MessageRouter,
ShutdownListener {
    private static final Logger logger = LoggerFactory.getLogger(AbstractMessageRouter.class);
    // Timestamp format (UTC) used in log and error messages. "SSS" is the millisecond field;
    // the former "sss" only repeated the seconds value, zero-padded to three digits.
    private static final FastDateFormat dateFormatter = FastDateFormat.getInstance("dd/MM/yyyy HH:mm:ss:SSS z", TimeZone.getTimeZone("UTC"));
    protected final RoutingTable routingTable;
    private ScheduledExecutorService scheduler;
    private long sendMsgRetryIntervalMs;
    private long routingTableCleanupIntervalMs;
    @Inject(optional=true)
    @Named(value="joynr.messaging.routingmaxretrycount")
    private long maxRetryCount = -1L;
    @Inject(optional=true)
    @Named(value="joynr.messaging.maxDelayWithExponentialBackoffMs")
    private long maxDelayMs = -1L;
    private MessagingStubFactory messagingStubFactory;
    private final MessagingSkeletonFactory messagingSkeletonFactory;
    private AddressManager addressManager;
    protected final MulticastReceiverRegistry multicastReceiverRegistry;
    private final MessageQueue messageQueue;
    private final StatusReceiver statusReceiver;
    private List<MessageProcessedListener> messageProcessedListeners;
    private List<MessageWorker> messageWorkers;
    private final ConcurrentHashMap<WeakReference<Object>, ProxyInformation> proxyMap;
    private final ReferenceQueue<Object> garbageCollectedProxiesQueue;
    private final ShutdownNotifier shutdownNotifier;

    /**
     * Stores the injected collaborators, registers this router with the shutdown notifier and
     * schedules the message workers as well as the periodic routing table cleanup.
     *
     * @param routingTable table used to look up, add and remove participant addresses
     * @param scheduler executor on which the message workers and the cleanup task are scheduled
     * @param sendMsgRetryIntervalMs base interval in milliseconds used when computing retry delays
     * @param maxParallelSends number of message workers to start
     * @param routingTableCleanupIntervalMs initial delay and delay between two cleanup runs, in milliseconds
     * @param messagingStubFactory factory creating the stubs through which messages are transmitted
     * @param messagingSkeletonFactory factory used to look up the skeleton for multicast (un)subscription
     * @param addressManager resolves the destination addresses of a message
     * @param multicastReceiverRegistry registry in which multicast receivers are (un)registered
     * @param messageQueue queue the message workers poll for messages to send
     * @param shutdownNotifier notifier this router registers with; also used to unregister the
     *            shutdown listeners of garbage collected proxies
     * @param statusReceiver receives the status updates published by the message workers
     */
    @Inject
    @Singleton
    public AbstractMessageRouter(RoutingTable routingTable, @Named(value="io.joynr.messaging.scheduledthreadpool") ScheduledExecutorService scheduler, @Named(value="joynr.messaging.sendmsgretryintervalms") long sendMsgRetryIntervalMs, @Named(value="joynr.messaging.maximumparallelsends") int maxParallelSends, @Named(value="joynr.messaging.routingtablecleanupintervalms") long routingTableCleanupIntervalMs, MessagingStubFactory messagingStubFactory, MessagingSkeletonFactory messagingSkeletonFactory, AddressManager addressManager, MulticastReceiverRegistry multicastReceiverRegistry, MessageQueue messageQueue, ShutdownNotifier shutdownNotifier, StatusReceiver statusReceiver) {
        this.routingTable = routingTable;
        this.scheduler = scheduler;
        this.sendMsgRetryIntervalMs = sendMsgRetryIntervalMs;
        this.routingTableCleanupIntervalMs = routingTableCleanupIntervalMs;
        this.messagingStubFactory = messagingStubFactory;
        this.messagingSkeletonFactory = messagingSkeletonFactory;
        this.addressManager = addressManager;
        this.multicastReceiverRegistry = multicastReceiverRegistry;
        this.messageQueue = messageQueue;
        this.statusReceiver = statusReceiver;
        // Diamond instead of raw types: keeps the assignments free of unchecked warnings.
        this.proxyMap = new ConcurrentHashMap<>();
        this.garbageCollectedProxiesQueue = new ReferenceQueue<>();
        this.shutdownNotifier = shutdownNotifier;
        shutdownNotifier.registerForShutdown(this);
        this.messageProcessedListeners = new ArrayList<>();
        // Background work is started last, once all fields it reads have been assigned.
        this.startMessageWorkerThreads(maxParallelSends);
        this.startRoutingTableCleanupThread();
    }

    /**
     * Creates the message workers, hands each of them to the scheduler for immediate execution
     * and keeps them in {@code messageWorkers} so that they can be stopped on shutdown.
     *
     * @param numberOfWorkThreads number of workers to create; workers are numbered from 0
     */
    private void startMessageWorkerThreads(int numberOfWorkThreads) {
        messageWorkers = new ArrayList<MessageWorker>(numberOfWorkThreads);
        int workerNumber = 0;
        while (workerNumber < numberOfWorkThreads) {
            MessageWorker worker = new MessageWorker(workerNumber);
            scheduler.schedule(worker, 0L, TimeUnit.MILLISECONDS);
            messageWorkers.add(worker);
            workerNumber++;
        }
    }

    /**
     * Schedules the periodic cleanup task. Each run purges the routing table and then drains the
     * queue of garbage collected proxy references: for every such proxy the routing entry of its
     * participant is removed, its shutdown listener is unregistered and its bookkeeping entry is
     * dropped from {@code proxyMap}.
     */
    private void startRoutingTableCleanupThread() {
        scheduler.scheduleWithFixedDelay(new Runnable() {

            @Override
            public void run() {
                routingTable.purge();

                Reference<? extends Object> collectedProxy;
                synchronized (garbageCollectedProxiesQueue) {
                    collectedProxy = garbageCollectedProxiesQueue.poll();
                }
                while (collectedProxy != null) {
                    ProxyInformation proxyInformation = proxyMap.get(collectedProxy);
                    if (proxyInformation != null) {
                        logger.debug("removing garbage collected proxy participantId {}", proxyInformation.participantId);
                        removeNextHop(proxyInformation.participantId);
                        shutdownNotifier.unregister(proxyInformation.shutdownListener);
                        proxyMap.remove(collectedProxy);
                    } else {
                        // Without this guard a missing entry would raise a NullPointerException,
                        // and an exception escaping a fixed-delay task suppresses all later runs.
                        logger.debug("no proxy information found for garbage collected proxy reference, skipping");
                    }
                    synchronized (garbageCollectedProxiesQueue) {
                        collectedProxy = garbageCollectedProxiesQueue.poll();
                    }
                }
            }
        }, routingTableCleanupIntervalMs, routingTableCleanupIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Adds a listener whose {@code messageProcessed(messageId)} callback is invoked by this
     * router. The listener list is modified while holding its monitor.
     *
     * @param messageProcessedListener the listener to add
     */
    @Override
    public void registerMessageProcessedListener(MessageProcessedListener messageProcessedListener) {
        synchronized (messageProcessedListeners) {
            messageProcessedListeners.add(messageProcessedListener);
        }
    }

    /**
     * Removes a previously registered listener. The listener list is modified while holding
     * its monitor.
     *
     * @param messageProcessedListener the listener to remove
     */
    @Override
    public void unregisterMessageProcessedListener(MessageProcessedListener messageProcessedListener) {
        synchronized (messageProcessedListeners) {
            messageProcessedListeners.remove(messageProcessedListener);
        }
    }

    /**
     * Removes the routing table entry of the given participant.
     *
     * @param participantId participant whose entry is removed
     */
    @Override
    public void removeNextHop(String participantId) {
        routingTable.remove(participantId);
    }

    /**
     * Tells whether the routing table has an entry for the given participant.
     *
     * @param participantId participant to look up
     * @return {@code true} if the routing table contains the participant id
     */
    @Override
    public boolean resolveNextHop(String participantId) {
        final boolean known = routingTable.containsKey(participantId);
        return known;
    }

    /**
     * Registers the subscriber in the multicast receiver registry and, if the provider's
     * messaging skeleton supports multicast subscriptions, registers the multicast there too.
     *
     * @param multicastId id of the multicast being subscribed to
     * @param subscriberParticipantId participant id of the subscriber
     * @param providerParticipantId participant id of the provider; used to look up its address
     */
    @Override
    public void addMulticastReceiver(final String multicastId, String subscriberParticipantId, String providerParticipantId) {
        logger.trace("Adding multicast receiver {} for multicast {} on provider {}", subscriberParticipantId, multicastId, providerParticipantId);
        multicastReceiverRegistry.registerMulticastReceiver(multicastId, subscriberParticipantId);
        SubscriptionOperation registerSubscription = subscriber -> subscriber.registerMulticastSubscription(multicastId);
        performSubscriptionOperation(multicastId, providerParticipantId, registerSubscription);
    }

    /**
     * Unregisters the subscriber from the multicast receiver registry and, if the provider's
     * messaging skeleton supports multicast subscriptions, unregisters the multicast there too.
     *
     * @param multicastId id of the multicast being unsubscribed from
     * @param subscriberParticipantId participant id of the subscriber
     * @param providerParticipantId participant id of the provider; used to look up its address
     */
    @Override
    public void removeMulticastReceiver(final String multicastId, String subscriberParticipantId, String providerParticipantId) {
        multicastReceiverRegistry.unregisterMulticastReceiver(multicastId, subscriberParticipantId);
        SubscriptionOperation unregisterSubscription = subscriber -> subscriber.unregisterMulticastSubscription(multicastId);
        performSubscriptionOperation(multicastId, providerParticipantId, unregisterSubscription);
    }

    /**
     * Looks up the messaging skeleton responsible for the provider's address and applies the
     * operation to it, provided the skeleton is an {@code IMessagingMulticastSubscriber}.
     * Otherwise only a trace message is logged.
     *
     * @param multicastId id of the multicast (not used by this method itself)
     * @param providerParticipantId participant id whose address is read from the routing table
     * @param operation operation to apply to the multicast-capable skeleton
     */
    private void performSubscriptionOperation(String multicastId, String providerParticipantId, SubscriptionOperation operation) {
        Address providerAddress = routingTable.get(providerParticipantId);
        IMessagingSkeleton skeleton = messagingSkeletonFactory.getSkeleton(providerAddress);
        // instanceof is false for null, so a missing skeleton takes this branch as well.
        if (!(skeleton instanceof IMessagingMulticastSubscriber)) {
            logger.trace("No messaging skeleton found for address {}, not performing multicast subscription.", providerAddress);
            return;
        }
        operation.perform((IMessagingMulticastSubscriber) skeleton);
    }

    /**
     * Adds a routing table entry for the participant, using {@link Long#MAX_VALUE} as its
     * expiry date.
     *
     * @param participantId participant the entry is created for
     * @param address address under which the participant is reachable
     * @param isGloballyVisible visibility flag stored with the entry
     */
    @Override
    public void addNextHop(String participantId, Address address, boolean isGloballyVisible) {
        // The named constant is passed on instead of being left behind as a dead local.
        final long expiryDateMs = Long.MAX_VALUE;
        addToRoutingTable(participantId, address, isGloballyVisible, expiryDateMs);
    }

    /**
     * Stores the given entry in the routing table.
     *
     * @param participantId participant the entry is created for
     * @param address address under which the participant is reachable
     * @param isGloballyVisible visibility flag stored with the entry
     * @param expiryDateMs expiry date of the entry in milliseconds
     */
    @Override
    public void addToRoutingTable(String participantId, Address address, boolean isGloballyVisible, long expiryDateMs) {
        routingTable.put(participantId, address, isGloballyVisible, expiryDateMs);
    }

    /**
     * Entry point for routing a message: verifies that the message has not expired and then
     * schedules it without delay and with a retry count of zero.
     *
     * @param message the message to route
     */
    @Override
    public void route(ImmutableMessage message) {
        checkExpiry(message);
        final long initialDelayMs = 0L;
        final int initialRetriesCount = 0;
        routeInternal(message, initialDelayMs, initialRetriesCount);
    }

    /**
     * Asks the address manager for the destination addresses of the message.
     *
     * @param message message whose destinations are resolved
     * @return the addresses returned by the address manager
     */
    protected Set<Address> getAddresses(ImmutableMessage message) {
        final Set<Address> resolvedAddresses = addressManager.getAddresses(message);
        return resolvedAddresses;
    }

    /**
     * Verifies that at least one destination address was found for the message.
     *
     * @param foundAddresses addresses resolved for the message
     * @param message the message being routed
     * @throws JoynrMessageNotSentException if no address was found and the message has type
     *             {@code "m"} or {@code "p"}, or is a reply
     * @throws JoynrIllegalStateException if no address was found for any other message
     */
    private void checkFoundAddresses(Set<Address> foundAddresses, ImmutableMessage message) {
        if (!foundAddresses.isEmpty()) {
            return;
        }
        final String messageType = message.getType();
        if ("m".equals(messageType)) {
            throw new JoynrMessageNotSentException("Failed to route multicast publication: No address found for given message: " + message);
        } else if ("p".equals(messageType)) {
            throw new JoynrMessageNotSentException("Failed to route publication: No address found for given message: " + message);
        } else if (message.isReply()) {
            throw new JoynrMessageNotSentException("Failed to route reply: No address found for given message: " + message);
        }
        throw new JoynrIllegalStateException("Unable to find address for recipient with participant ID " + message.getRecipient());
    }

    /**
     * Resolves the destination addresses of the message and puts it on the message queue.
     * <p>
     * If the address check fails with a {@code JoynrMessageNotSentException}, the message is
     * dropped and the message-processed listeners are notified. Any other exception from the
     * address check leads to the message being scheduled anyway, with a freshly computed
     * backoff delay instead of {@code delayMs}.
     *
     * @param message the message to schedule
     * @param delayMs delay in milliseconds before the message becomes due
     * @param retriesCount number of retries already performed for this message
     */
    private void routeInternal(ImmutableMessage message, long delayMs, int retriesCount) {
        logger.trace("Scheduling message {} with delay {} and retries {}", message, delayMs, retriesCount);
        final Set<Address> destinationAddresses = getAddresses(message);
        long effectiveDelayMs = delayMs;
        try {
            checkFoundAddresses(destinationAddresses, message);
        } catch (JoynrMessageNotSentException notSent) {
            logger.error(" ERROR SENDING: aborting send of messageId: {}. Error: {}", message.getId(), notSent.getMessage());
            callMessageProcessedListeners(message.getId());
            return;
        } catch (Exception retryable) {
            logger.warn("PROBLEM SENDING, will retry. messageId: {}. Error: {} Message: {}", message.getId(), retryable.getClass().getName(), retryable.getMessage());
            effectiveDelayMs = createDelayWithExponentialBackoff(sendMsgRetryIntervalMs, retriesCount);
            logger.error("Rescheduling messageId: {} with delay {} ms, TTL is: {}", message.getId(), effectiveDelayMs, dateFormatter.format(message.getTtlMs()));
        }
        scheduleMessage(new DelayableImmutableMessage(message, effectiveDelayMs, destinationAddresses, retriesCount));
    }

    /**
     * Puts the message on the message queue unless a retry limit is configured
     * ({@code maxRetryCount > -1}) and the message has exceeded it; in that case the message is
     * dropped and the message-processed listeners are notified.
     *
     * @param delayableMessage the message to enqueue
     */
    private void scheduleMessage(DelayableImmutableMessage delayableMessage) {
        final String messageId = delayableMessage.getMessage().getId();
        final boolean retryLimitConfigured = maxRetryCount > -1L;
        if (retryLimitConfigured) {
            final int retries = delayableMessage.getRetriesCount();
            if (retries > maxRetryCount) {
                logger.error("Max-retry-count (" + maxRetryCount + ") reached. Dropping message " + messageId);
                callMessageProcessedListeners(messageId);
                return;
            }
            if (retries > 0) {
                logger.debug("Retry {}/{} sending message {}", retries, maxRetryCount, messageId);
            }
        }
        messageQueue.put(delayableMessage);
    }

    /**
     * Verifies that the message carries an absolute TTL that lies in the future. In both
     * failure cases the message-processed listeners are notified before the exception is thrown.
     *
     * @param message the message to check
     * @throws JoynrRuntimeException if the TTL of the message is not absolute
     * @throws JoynrMessageNotSentException if the TTL timestamp is not later than the current time
     */
    private void checkExpiry(ImmutableMessage message) {
        if (!message.isTtlAbsolute()) {
            callMessageProcessedListeners(message.getId());
            throw new JoynrRuntimeException("Relative ttl not supported");
        }
        long currentTimeMillis = System.currentTimeMillis();
        long ttlExpirationDateMs = message.getTtlMs();
        if (ttlExpirationDateMs <= currentTimeMillis) {
            // The raw timestamps are passed as strings: MessageFormat would otherwise render
            // numeric arguments with locale-specific grouping separators (e.g. 1,600,000,000,000).
            String errorMessage = MessageFormat.format("ttl must be greater than 0 / ttl timestamp must be in the future: now: {0} ({1}) abs_ttl: {2} ({3}) msg_id: {4}",
                                                       Long.toString(currentTimeMillis),
                                                       dateFormatter.format(currentTimeMillis),
                                                       Long.toString(ttlExpirationDateMs),
                                                       dateFormatter.format(ttlExpirationDateMs),
                                                       message.getId());
            logger.error(errorMessage);
            callMessageProcessedListeners(message.getId());
            throw new JoynrMessageNotSentException(errorMessage);
        }
    }

    private FailureAction createFailureAction(final DelayableImmutableMessage delayableMessage) {
        FailureAction failureAction = new FailureAction(){
            final String messageId;
            private boolean failureActionExecutedOnce;
            {
                this.messageId = delayableMessage.getMessage().getId();
                this.failureActionExecutedOnce = false;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void execute(Throwable error) {
                4 var2_2 = this;
                synchronized (var2_2) {
                    if (this.failureActionExecutedOnce) {
                        logger.trace("Failure action for message with id {} already executed once. Ignoring further call.", (Object)this.messageId);
                        return;
                    }
                    this.failureActionExecutedOnce = true;
                }
                if (error instanceof JoynrShutdownException) {
                    logger.warn("{}", (Object)error.getMessage());
                    return;
                }
                if (error instanceof JoynrMessageNotSentException) {
                    logger.error(" ERROR SENDING:  aborting send of messageId: {}. Error: {}", new Object[]{this.messageId, error.getMessage()});
                    AbstractMessageRouter.this.callMessageProcessedListeners(this.messageId);
                    return;
                }
                logger.warn("PROBLEM SENDING, will retry. messageId: {}. Error: {} Message: {}", new Object[]{this.messageId, error.getClass().getName(), error.getMessage()});
                long delayMs = error instanceof JoynrDelayMessageException ? ((JoynrDelayMessageException)error).getDelayMs() : AbstractMessageRouter.this.createDelayWithExponentialBackoff(AbstractMessageRouter.this.sendMsgRetryIntervalMs, delayableMessage.getRetriesCount());
                if ("m".equals(delayableMessage.getMessage().getType()) || delayableMessage.getDestinationAddresses().removeIf(address -> address instanceof WebSocketClientAddress)) {
                    AbstractMessageRouter.this.routeInternal(delayableMessage.getMessage(), delayMs, delayableMessage.getRetriesCount() + 1);
                    return;
                }
                delayableMessage.setDelay(delayMs);
                delayableMessage.setRetriesCount(delayableMessage.getRetriesCount() + 1);
                logger.error("Rescheduling messageId: {} with delay {} ms, TTL: {}, retries: {}", new Object[]{this.messageId, delayMs, dateFormatter.format(delayableMessage.getMessage().getTtlMs()), delayableMessage.getRetriesCount()});
                try {
                    AbstractMessageRouter.this.scheduleMessage(delayableMessage);
                }
                catch (Exception e) {
                    logger.warn("Rescheduling of message failed (messageId {})", (Object)this.messageId);
                    AbstractMessageRouter.this.callMessageProcessedListeners(this.messageId);
                }
            }
        };
        return failureAction;
    }

    /**
     * Invokes {@code messageProcessed(messageId)} on every registered listener. The listeners
     * are called while the monitor of the listener list is held.
     *
     * @param messageId id of the message that has been processed
     */
    private void callMessageProcessedListeners(String messageId) {
        synchronized (messageProcessedListeners) {
            for (MessageProcessedListener listener : messageProcessedListeners) {
                listener.messageProcessed(messageId);
            }
        }
    }

    /**
     * Creates the success callback for a message that is transmitted to several addresses. The
     * message-processed listeners are notified once the callback has been invoked
     * {@code numberOfCalls} times.
     *
     * @param messageId id of the message being transmitted
     * @param numberOfCalls number of invocations after which the listeners are notified
     * @return the success callback
     */
    private SuccessAction createMessageProcessedAction(final String messageId, final int numberOfCalls) {
        return new SuccessAction() {
            private int remainingCalls = numberOfCalls;

            @Override
            public void execute() {
                // The same callback is handed to one stub per address. The decrement is guarded
                // so that concurrent invocations cannot lose an update, mirroring the once-only
                // guard of the failure action.
                final boolean lastCall;
                synchronized (this) {
                    --remainingCalls;
                    lastCall = remainingCalls == 0;
                }
                // Listeners are notified outside the callback's own lock.
                if (lastCall) {
                    callMessageProcessedListeners(messageId);
                }
            }
        };
    }

    /**
     * Shutdown preparation step: delegates to the message queue's
     * {@code waitForQueueToDrain()}.
     */
    @Override
    public void prepareForShutdown() {
        messageQueue.waitForQueueToDrain();
    }

    /**
     * Asks all message workers to stop and waits up to 1500 ms for them to acknowledge via the
     * shared latch. If the waiting thread is interrupted, the interruption is logged and the
     * interrupt flag is restored for the caller.
     */
    @Override
    public void shutdown() {
        CountDownLatch countDownLatch = new CountDownLatch(messageWorkers.size());
        for (MessageWorker worker : messageWorkers) {
            worker.stopWorker(countDownLatch);
        }
        try {
            countDownLatch.await(1500L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            logger.error("Interrupted while waiting for message workers to stop.", (Throwable) e);
            // Restore the interrupt status so that callers further up can react to it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Remembers the participant id and shutdown listener of a proxy. The proxy itself is only
     * referenced weakly, through a reference registered with the queue of garbage collected
     * proxies; the periodic cleanup task uses the stored information once that reference has
     * been enqueued.
     *
     * @param proxy the proxy object to track
     * @param proxyParticipantId participant id belonging to the proxy
     * @param shutdownListener shutdown listener belonging to the proxy
     */
    @Override
    public void registerProxy(Object proxy, String proxyParticipantId, ShutdownListener shutdownListener) {
        synchronized (garbageCollectedProxiesQueue) {
            WeakReference<Object> proxyReference = new WeakReference<Object>(proxy, garbageCollectedProxiesQueue);
            ProxyInformation proxyInformation = new ProxyInformation(proxyParticipantId, shutdownListener);
            proxyMap.putIfAbsent(proxyReference, proxyInformation);
        }
    }

    /**
     * Computes a randomized retry delay that grows exponentially with the number of retries:
     * {@code sendMsgRetryIntervalMs + random * 2^retries * sendMsgRetryIntervalMs}, with
     * {@code random} drawn from [0, 1). The result is limited to {@code maxDelayMs} when that
     * limit is at least {@code sendMsgRetryIntervalMs}; otherwise (including the default of
     * -1) no limit is applied.
     *
     * @param sendMsgRetryIntervalMs base retry interval in milliseconds; expected to be positive
     * @param retries number of retries performed so far
     * @return the delay in milliseconds before the next attempt
     */
    private long createDelayWithExponentialBackoff(long sendMsgRetryIntervalMs, int retries) {
        logger.trace("TRIES: " + retries);
        // Math.pow, not '^': in Java '^' is bitwise XOR, so "2 ^ retries" never was a power of
        // two. Computing in double cannot overflow; the narrowing cast below saturates.
        double backoffFactor = Math.pow(2.0, retries);
        long randomPartMs = (long) (backoffFactor * sendMsgRetryIntervalMs * Math.random());
        long millis;
        if (randomPartMs > 0 && sendMsgRetryIntervalMs > Long.MAX_VALUE - randomPartMs) {
            // The sum would overflow; saturate instead of wrapping to a negative delay.
            millis = Long.MAX_VALUE;
        } else {
            millis = sendMsgRetryIntervalMs + randomPartMs;
        }
        if (this.maxDelayMs >= sendMsgRetryIntervalMs && millis > this.maxDelayMs) {
            millis = this.maxDelayMs;
            logger.trace("set MILLIS to " + millis + " since maxDelayMs is " + this.maxDelayMs);
        }
        logger.trace("MILLIS: " + millis);
        return millis;
    }

    /**
     * Worker that repeatedly polls the message queue and transmits each message to all of its
     * destination addresses through the matching messaging stubs. It runs until it is stopped
     * via {@link #stopWorker(CountDownLatch)} or its thread is interrupted.
     */
    class MessageWorker
    implements Runnable {
        private Logger logger = LoggerFactory.getLogger(MessageWorker.class);
        private int number;
        private volatile CountDownLatch countDownLatch;
        private volatile boolean stopped;

        /**
         * @param number number of this worker; used in the thread name and in status updates
         */
        public MessageWorker(int number) {
            this.number = number;
            this.countDownLatch = null;
            this.stopped = false;
        }

        /**
         * Requests the worker to leave its loop after the current iteration.
         *
         * @param countDownLatch latch that is counted down once the loop has been left
         */
        void stopWorker(CountDownLatch countDownLatch) {
            // The latch is written before the stop flag: run() reads it after seeing the flag.
            this.countDownLatch = countDownLatch;
            this.stopped = true;
        }

        @Override
        public void run() {
            Thread.currentThread().setName("joynrMessageWorker-" + this.number);
            while (!this.stopped) {
                DelayableImmutableMessage delayableMessage = null;
                try {
                    // Status flag true while waiting for a message, false while processing one.
                    statusReceiver.updateMessageWorkerStatus(this.number, new MessageWorkerStatus(System.currentTimeMillis(), true));
                    delayableMessage = messageQueue.poll(1000L, TimeUnit.MILLISECONDS);
                    if (delayableMessage == null) {
                        continue;
                    }
                    statusReceiver.updateMessageWorkerStatus(this.number, new MessageWorkerStatus(System.currentTimeMillis(), false));
                    ImmutableMessage message = delayableMessage.getMessage();
                    this.logger.trace("Starting processing of message {}", message);
                    checkExpiry(message);
                    Set<Address> addresses = delayableMessage.getDestinationAddresses();
                    if (addresses.isEmpty()) {
                        // No destination known yet: resolve the addresses again, without delay.
                        routeInternal(message, 0L, delayableMessage.getRetriesCount() + 1);
                        continue;
                    }
                    SuccessAction messageProcessedAction = createMessageProcessedAction(message.getId(), addresses.size());
                    FailureAction failureAction = createFailureAction(delayableMessage);
                    for (Address address : addresses) {
                        this.logger.trace(">>>>> SEND message {} to address {}", message.getId(), address);
                        IMessagingStub messagingStub = messagingStubFactory.create(address);
                        messagingStub.transmit(message, messageProcessedAction, failureAction);
                    }
                }
                catch (InterruptedException e) {
                    this.logger.trace("Message Worker interrupted. Stopping.");
                    Thread.currentThread().interrupt();
                    return;
                }
                catch (Exception error) {
                    this.logger.error("error in scheduled message router thread: {}", error.getMessage());
                    if (delayableMessage == null) {
                        // The failure happened before a message was taken from the queue, so
                        // there is nothing to fail or reschedule. Building a failure action
                        // for null would throw and terminate this worker.
                        continue;
                    }
                    FailureAction failureAction = createFailureAction(delayableMessage);
                    failureAction.execute(error);
                }
            }
            this.countDownLatch.countDown();
        }
    }

    private static interface SubscriptionOperation {
        public void perform(IMessagingMulticastSubscriber var1);
    }

    /**
     * Immutable bookkeeping record for a registered proxy: its participant id and the shutdown
     * listener that has to be unregistered once the proxy has been garbage collected.
     */
    private static class ProxyInformation {
        // final: assigned once in the constructor and only read afterwards
        public final String participantId;
        public final ShutdownListener shutdownListener;

        public ProxyInformation(String participantId, ShutdownListener shutdownListener) {
            this.participantId = participantId;
            this.shutdownListener = shutdownListener;
        }
    }
}

