/*
 * Decompiled with CFR 0.152.
 */
package com.gojek.mqtt.client;

import com.gojek.courier.extensions.TimeUnitExtensionsKt;
import com.gojek.courier.logging.ILogger;
import com.gojek.courier.utils.Clock;
import com.gojek.mqtt.client.IncomingMsgController;
import com.gojek.mqtt.client.listener.MessageListener;
import com.gojek.mqtt.event.EventHandler;
import com.gojek.mqtt.event.MqttEvent;
import com.gojek.mqtt.exception.CourierExceptionKt;
import com.gojek.mqtt.persistence.IMqttReceivePersistence;
import com.gojek.mqtt.persistence.model.MqttReceivePacket;
import com.gojek.mqtt.persistence.model.MqttReceivePacketKt;
import com.gojek.mqtt.utils.MqttUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@Metadata(mv={1, 5, 1}, k=1, xi=48, d1={"\u0000v\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\t\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u000b\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u0002\n\u0002\b\t\b\u0000\u0018\u0000 +2\u00020\u0001:\u0003*+,B=\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\u0006\u0010\n\u001a\u00020\u000b\u0012\u0006\u0010\f\u001a\u00020\u000b\u0012\u0006\u0010\r\u001a\u00020\u000e\u00a2\u0006\u0002\u0010\u000fJ\u0010\u0010\u001f\u001a\u00020 2\u0006\u0010!\u001a\u00020\"H\u0002J\u0018\u0010#\u001a\u00020$2\u0006\u0010%\u001a\u00020\u001c2\u0006\u0010&\u001a\u00020\u001eH\u0016J\b\u0010'\u001a\u00020$H\u0002J\b\u0010(\u001a\u00020$H\u0016J\u0018\u0010)\u001a\u00020$2\u0006\u0010%\u001a\u00020\u001c2\u0006\u0010&\u001a\u00020\u001eH\u0016R\u0014\u0010\u0010\u001a\b\u0012\u0002\b\u0003\u0018\u00010\u0011X\u0082\u000e\u00a2\u0006\u0002\n\u0000R\u000e\u0010\f\u001a\u00020\u000bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0012\u0010\u0012\u001a\u00060\u0013R\u00020\u0000X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0014\u001a\u00020\u0015X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\r\u001a\u00020\u000eX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0012\u0010\u0016\u001a\u00060\u0017R\u00020\u0000X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0018\u001a\u00020\u0019X\u0082\u0004\u00a2\u0006\u0002\n\u0000R \u0010\u001a\u001a\u0014\u0012\u0004\u0012\u00020\u001c\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u001e0\u001d0\u001bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\n\u001a\u00020\u000bX\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006-"}, d2={"Lcom/gojek/mqtt/client/IncomingMsgControllerImpl;", "Lcom/gojek/mqtt/client/IncomingMsgController;", "mqttUtils", "Lcom/gojek/mqtt/utils/MqttUtils;", "mqttReceivePersistence", "Lcom/gojek/mqtt/persistence/IMqttReceivePersistence;", "logger", "Lcom/gojek/courier/logging/ILogger;", "eventHandler", "Lcom/gojek/mqtt/event/EventHandler;", "ttlSeconds", "", "cleanupIntervalSeconds", "clock", "Lcom/gojek/courier/utils/Clock;", "(Lcom/gojek/mqtt/utils/MqttUtils;Lcom/gojek/mqtt/persistence/IMqttReceivePersistence;Lcom/gojek/courier/logging/ILogger;Lcom/gojek/mqtt/event/EventHandler;JJLcom/gojek/courier/utils/Clock;)V", "cleanupFuture", "Ljava/util/concurrent/ScheduledFuture;", "cleanupMessagesTrigger", "Lcom/gojek/mqtt/client/IncomingMsgControllerImpl$CleanupExpiredMessages;", "cleanupThreadPool", "Ljava/util/concurrent/ScheduledThreadPoolExecutor;", "handleMessageTrigger", "Lcom/gojek/mqtt/client/IncomingMsgControllerImpl$HandleMessage;", "handleMsgThreadPool", "Ljava/util/concurrent/ThreadPoolExecutor;", "listenerMap", "Ljava/util/concurrent/ConcurrentHashMap;", "", "", "Lcom/gojek/mqtt/client/listener/MessageListener;", "notifyListeners", "", "message", "Lcom/gojek/mqtt/persistence/model/MqttReceivePacket;", "registerListener", "", "topic", "listener", "scheduleMessagesCleanup", "triggerHandleMessage", "unregisterListener", "CleanupExpiredMessages", "Companion", "HandleMessage", "mqtt-client_release"})
public final class IncomingMsgControllerImpl
implements IncomingMsgController {
    @NotNull
    public static final Companion Companion = new Companion(null);
    @NotNull
    private final MqttUtils mqttUtils;
    @NotNull
    private final IMqttReceivePersistence mqttReceivePersistence;
    @NotNull
    private final ILogger logger;
    @NotNull
    private final EventHandler eventHandler;
    private final long ttlSeconds;
    private final long cleanupIntervalSeconds;
    @NotNull
    private final Clock clock;
    @NotNull
    private final ThreadPoolExecutor handleMsgThreadPool;
    @NotNull
    private final ScheduledThreadPoolExecutor cleanupThreadPool;
    @NotNull
    private final HandleMessage handleMessageTrigger;
    @NotNull
    private final CleanupExpiredMessages cleanupMessagesTrigger;
    @NotNull
    private final ConcurrentHashMap<String, List<MessageListener>> listenerMap;
    @Nullable
    private ScheduledFuture<?> cleanupFuture;
    @NotNull
    public static final String TAG = "IncomingMsgController";

    /*
     * WARNING - void declaration
     */
    public IncomingMsgControllerImpl(@NotNull MqttUtils mqttUtils, @NotNull IMqttReceivePersistence mqttReceivePersistence, @NotNull ILogger logger, @NotNull EventHandler eventHandler, long ttlSeconds, long cleanupIntervalSeconds, @NotNull Clock clock) {
        void $this$handleMsgThreadPool_u24lambda_u2d0;
        Intrinsics.checkNotNullParameter((Object)mqttUtils, (String)"mqttUtils");
        Intrinsics.checkNotNullParameter((Object)mqttReceivePersistence, (String)"mqttReceivePersistence");
        Intrinsics.checkNotNullParameter((Object)logger, (String)"logger");
        Intrinsics.checkNotNullParameter((Object)eventHandler, (String)"eventHandler");
        Intrinsics.checkNotNullParameter((Object)clock, (String)"clock");
        this.mqttUtils = mqttUtils;
        this.mqttReceivePersistence = mqttReceivePersistence;
        this.logger = logger;
        this.eventHandler = eventHandler;
        this.ttlSeconds = ttlSeconds;
        this.cleanupIntervalSeconds = cleanupIntervalSeconds;
        this.clock = clock;
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, (BlockingQueue<Runnable>)new LinkedBlockingQueue(1), this.mqttUtils.threadFactory("msg-store", false));
        boolean bl = false;
        boolean bl2 = false;
        ThreadPoolExecutor threadPoolExecutor2 = threadPoolExecutor;
        IncomingMsgControllerImpl incomingMsgControllerImpl = this;
        boolean bl3 = false;
        $this$handleMsgThreadPool_u24lambda_u2d0.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        Unit unit = Unit.INSTANCE;
        incomingMsgControllerImpl.handleMsgThreadPool = threadPoolExecutor;
        this.cleanupThreadPool = new ScheduledThreadPoolExecutor(1, this.mqttUtils.threadFactory("msg-store-cleanup", false), new ThreadPoolExecutor.DiscardPolicy());
        this.handleMessageTrigger = new HandleMessage();
        this.cleanupMessagesTrigger = new CleanupExpiredMessages();
        this.listenerMap = new ConcurrentHashMap();
    }

    @Override
    public void triggerHandleMessage() {
        this.handleMsgThreadPool.submit(this.handleMessageTrigger);
    }

    private final void scheduleMessagesCleanup() {
        ScheduledFuture<?> scheduledFuture = this.cleanupFuture;
        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
        }
        this.cleanupFuture = this.cleanupThreadPool.schedule(this.cleanupMessagesTrigger, this.cleanupIntervalSeconds, TimeUnit.SECONDS);
    }

    @Override
    public synchronized void registerListener(@NotNull String topic, @NotNull MessageListener listener2) {
        Intrinsics.checkNotNullParameter((Object)topic, (String)"topic");
        Intrinsics.checkNotNullParameter((Object)listener2, (String)"listener");
        Map map = this.listenerMap;
        List list = this.listenerMap.get(topic);
        list = CollectionsKt.plus((Collection)(list == null ? CollectionsKt.emptyList() : list), (Object)listener2);
        boolean bl = false;
        map.put(topic, list);
        this.triggerHandleMessage();
    }

    @Override
    public synchronized void unregisterListener(@NotNull String topic, @NotNull MessageListener listener2) {
        Intrinsics.checkNotNullParameter((Object)topic, (String)"topic");
        Intrinsics.checkNotNullParameter((Object)listener2, (String)"listener");
        Map map = this.listenerMap;
        List list = this.listenerMap.get(topic);
        list = CollectionsKt.minus((Iterable)(list == null ? CollectionsKt.emptyList() : list), (Object)listener2);
        boolean bl = false;
        map.put(topic, list);
        List<MessageListener> list2 = this.listenerMap.get(topic);
        Intrinsics.checkNotNull(list2);
        if (list2.isEmpty()) {
            this.listenerMap.remove(topic);
        }
    }

    private final boolean notifyListeners(MqttReceivePacket message) {
        boolean notified = false;
        try {
            List<MessageListener> list = this.listenerMap.get(message.getTopic());
            Intrinsics.checkNotNull(list);
            List<MessageListener> list2 = list;
            Intrinsics.checkNotNullExpressionValue(list2, (String)"listenerMap[message.topic]!!");
            Iterable $this$forEach$iv = list2;
            boolean $i$f$forEach = false;
            for (Object element$iv : $this$forEach$iv) {
                MessageListener it = (MessageListener)element$iv;
                boolean bl = false;
                notified = true;
                it.onMessageReceived(MqttReceivePacketKt.toMqttMessage(message));
            }
            return notified;
        }
        catch (Throwable e) {
            this.logger.d(TAG, Intrinsics.stringPlus((String)"Exception while processing message ", (Object)e));
            this.eventHandler.onEvent(new MqttEvent.MqttMessageReceiveErrorEvent(message.getTopic(), message.getMessage().length, CourierExceptionKt.toCourierException(e), null, 8, null));
            return notified;
        }
    }

    @Metadata(mv={1, 5, 1}, k=1, xi=48, d1={"\u0000\"\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n\u0000\n\u0002\u0010 \n\u0002\u0010\t\n\u0000\n\u0002\u0010\u0002\n\u0000\b\u0082\u0004\u0018\u00002\u00020\u0001B\u0005\u00a2\u0006\u0002\u0010\u0002J\u0016\u0010\u0003\u001a\u00020\u00042\f\u0010\u0005\u001a\b\u0012\u0004\u0012\u00020\u00070\u0006H\u0002J\b\u0010\b\u001a\u00020\tH\u0016\u00a8\u0006\n"}, d2={"Lcom/gojek/mqtt/client/IncomingMsgControllerImpl$HandleMessage;", "Ljava/lang/Runnable;", "(Lcom/gojek/mqtt/client/IncomingMsgControllerImpl;)V", "deleteMessages", "", "messageIds", "", "", "run", "", "mqtt-client_release"})
    private final class HandleMessage
    implements Runnable {
        public HandleMessage() {
            Intrinsics.checkNotNullParameter((Object)IncomingMsgControllerImpl.this, (String)"this$0");
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                if (IncomingMsgControllerImpl.this.listenerMap.keySet().isEmpty()) {
                    IncomingMsgControllerImpl.this.logger.d(IncomingMsgControllerImpl.TAG, "No listeners registered");
                    return;
                }
                IMqttReceivePersistence iMqttReceivePersistence = IncomingMsgControllerImpl.this.mqttReceivePersistence;
                Set set = IncomingMsgControllerImpl.this.listenerMap.keySet();
                Intrinsics.checkNotNullExpressionValue((Object)set, (String)"listenerMap.keys");
                List<MqttReceivePacket> messages = iMqttReceivePersistence.getAllIncomingMessagesWithTopicFilter(set);
                if (IncomingMsgControllerImpl.this.mqttUtils.isEmpty((Iterable)messages)) {
                    IncomingMsgControllerImpl.this.logger.d(IncomingMsgControllerImpl.TAG, "No Messages in Table");
                    return;
                }
                boolean bl = false;
                List deletedMsgIds = new ArrayList();
                for (MqttReceivePacket message : messages) {
                    IncomingMsgControllerImpl.this.logger.d(IncomingMsgControllerImpl.TAG, Intrinsics.stringPlus((String)"Going to process ", (Object)message.getMessageId()));
                    boolean listenersNotified = IncomingMsgControllerImpl.this.notifyListeners(message);
                    if (listenersNotified) {
                        deletedMsgIds.add(message.getMessageId());
                    }
                    IncomingMsgControllerImpl.this.logger.d(IncomingMsgControllerImpl.TAG, Intrinsics.stringPlus((String)"Successfully Processed Message ", (Object)message.getMessageId()));
                }
                Collection collection = deletedMsgIds;
                boolean bl2 = false;
                if (!collection.isEmpty()) {
                    int deletedMessagesCount = this.deleteMessages(deletedMsgIds);
                    IncomingMsgControllerImpl.this.logger.d(IncomingMsgControllerImpl.TAG, "Deleted " + deletedMessagesCount + " messages");
                }
            }
            finally {
                IncomingMsgControllerImpl.this.scheduleMessagesCleanup();
            }
        }

        private final int deleteMessages(List<Long> messageIds) {
            return IncomingMsgControllerImpl.this.mqttReceivePersistence.removeReceivedMessages(messageIds);
        }
    }

    @Metadata(mv={1, 5, 1}, k=1, xi=48, d1={"\u0000\u0012\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n\u0000\b\u0082\u0004\u0018\u00002\u00020\u0001B\u0005\u00a2\u0006\u0002\u0010\u0002J\b\u0010\u0003\u001a\u00020\u0004H\u0016\u00a8\u0006\u0005"}, d2={"Lcom/gojek/mqtt/client/IncomingMsgControllerImpl$CleanupExpiredMessages;", "Ljava/lang/Runnable;", "(Lcom/gojek/mqtt/client/IncomingMsgControllerImpl;)V", "run", "", "mqtt-client_release"})
    private final class CleanupExpiredMessages
    implements Runnable {
        public CleanupExpiredMessages() {
            Intrinsics.checkNotNullParameter((Object)IncomingMsgControllerImpl.this, (String)"this$0");
        }

        @Override
        public void run() {
            IncomingMsgControllerImpl.this.logger.d(IncomingMsgControllerImpl.TAG, "Deleting expired messages");
            long currentTime = IncomingMsgControllerImpl.this.clock.nanoTime();
            long expiryTime = currentTime - TimeUnitExtensionsKt.fromSecondsToNanos((long)IncomingMsgControllerImpl.this.ttlSeconds);
            int deletedMsgsCount = IncomingMsgControllerImpl.this.mqttReceivePersistence.removeMessagesWithOlderTimestamp(expiryTime);
            IncomingMsgControllerImpl.this.logger.d(IncomingMsgControllerImpl.TAG, "Deleted " + deletedMsgsCount + " expired messages");
        }
    }

    @Metadata(mv={1, 5, 1}, k=1, xi=48, d1={"\u0000\u0012\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0002\b\u0002\n\u0002\u0010\u000e\n\u0000\b\u0086\u0003\u0018\u00002\u00020\u0001B\u0007\b\u0002\u00a2\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0086T\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u0005"}, d2={"Lcom/gojek/mqtt/client/IncomingMsgControllerImpl$Companion;", "", "()V", "TAG", "", "mqtt-client_release"})
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker $constructor_marker) {
            this();
        }
    }
}

