/*
 * Decompiled with CFR 0.152.
 */
package com.github.twitch4j.pubsub;

import com.github.philippheuer.credentialmanager.domain.OAuth2Credential;
import com.github.philippheuer.events4j.core.EventManager;
import com.github.twitch4j.client.websocket.WebsocketConnection;
import com.github.twitch4j.client.websocket.domain.WebsocketConnectionState;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.events.TwitchEvent;
import com.github.twitch4j.common.util.CryptoUtils;
import com.github.twitch4j.common.util.TimeUtils;
import com.github.twitch4j.common.util.TypeConvert;
import com.github.twitch4j.pubsub.ITwitchPubSub;
import com.github.twitch4j.pubsub.PubSubSubscription;
import com.github.twitch4j.pubsub.domain.PubSubRequest;
import com.github.twitch4j.pubsub.domain.PubSubResponse;
import com.github.twitch4j.pubsub.domain.PubSubResponsePayload;
import com.github.twitch4j.pubsub.enums.PubSubType;
import com.github.twitch4j.pubsub.events.PubSubAuthRevokeEvent;
import com.github.twitch4j.pubsub.events.PubSubConnectionStateEvent;
import com.github.twitch4j.pubsub.events.PubSubListenResponseEvent;
import com.github.twitch4j.pubsub.handlers.HandlerRegistry;
import com.github.twitch4j.pubsub.handlers.TopicHandler;
import com.github.twitch4j.util.IBackoffStrategy;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.ApiStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TwitchPubSub
implements ITwitchPubSub {
    /** Class logger (marked as Lombok-generated). */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TwitchPubSub.class);
    /** Private monitor used by {@link #reconnect()} so that reconnects are serialized. */
    @Generated
    private final Object $lock = new Object[0];
    // NOTE(review): presumably the minimum number of executor threads this module needs — confirm against callers.
    public static final int REQUIRED_THREAD_COUNT = 1;
    /**
     * Matches a LISTEN command and captures the text before (group 1) and after (group 2) the
     * {@code auth_token} value, so the token itself can be replaced by a placeholder when logging.
     */
    private static final Pattern LISTEN_AUTH_TOKEN = Pattern.compile("(\\{.*\"type\"\\s*?:\\s*?\"LISTEN\".*\"data\"\\s*?:\\s*?\\{.*\"auth_token\"\\s*?:\\s*?\").+(\".*}\\s*?})");
    /** Event manager that receives every event published by this client. */
    private final EventManager eventManager;
    /** Underlying websocket connection; either supplied by the caller or created in the constructor. */
    private final WebsocketConnection connection;
    /** Endpoint used when the constructor creates its own websocket connection. */
    private static final String WEB_SOCKET_SERVER = "wss://pubsub-edge.twitch.tv:443";
    /** {@code true} while {@link #flushCommand} is draining the command queue; prevents concurrent flushes. */
    private final AtomicBoolean flushing = new AtomicBoolean();
    /** {@code true} once an on-demand flush has been scheduled by queueRequest; cleared when a flush finishes. */
    private final AtomicBoolean flushRequested = new AtomicBoolean();
    /** Task that drains {@link #commandQueue} to the websocket; assigned in the constructor. */
    private final Runnable flushCommand;
    /** Handle of the periodic flush task (cancelled in {@link #close()}). */
    protected final Future<?> queueTask;
    /** Handle of the periodic PING task (cancelled in {@link #close()}). */
    protected final Future<?> heartbeatTask;
    /** Set by {@link #close()}; the periodic tasks stop doing work once this is {@code true}. */
    protected volatile boolean isClosed = false;
    /** Serialized commands waiting to be sent; bounded to 128 entries. */
    protected final BlockingQueue<String> commandQueue = new ArrayBlockingQueue<String>(128);
    /** LISTEN requests that are currently active; they are re-queued on every (re)connect. */
    protected final Set<PubSubRequest> subscribedTopics = ConcurrentHashMap.newKeySet();
    /** Timestamp (ms) of the last PING sent; starts 240000 ms (the 4-minute heartbeat period) in the past. */
    protected volatile long lastPing = TimeUtils.getCurrentTimeInMillis() - 240000L;
    /** Timestamp (ms) of the last PONG received. */
    protected volatile long lastPong = TimeUtils.getCurrentTimeInMillis();
    /** Executor that runs the heartbeat, the periodic flush and on-demand flushes. */
    protected final ScheduledExecutorService taskExecutor;
    /** Bot owner ids handed to topic handlers when parsing incoming messages. */
    private final Collection<String> botOwnerIds;
    /** Receives payloads for which no handler produced an event; never {@code null} after construction. */
    private final Consumer<PubSubResponsePayload> fallbackTopicHandler;

    /**
     * Creates the PubSub client, registers it with the event manager's service mediator, opens the
     * connection and schedules the heartbeat and queue-flush tasks on {@code taskExecutor}.
     *
     * @param websocketConnection       connection to use; when {@code null} a new one is created against
     *                                  {@code wss://pubsub-edge.twitch.tv:443} and wired to this instance's callbacks.
     *                                  A caller-supplied connection is used as-is (no callbacks are registered here).
     * @param eventManager              event manager that receives all published events
     * @param taskExecutor              executor for the heartbeat, the periodic flush and on-demand flushes
     * @param proxyConfig               proxy settings forwarded to a newly created connection
     * @param botOwnerIds               bot owner ids passed to topic handlers
     * @param wsPingPeriod              websocket ping period forwarded to a newly created connection
     * @param connectionBackoffStrategy reconnect backoff strategy; only applied when non-{@code null}
     * @param wsCloseDelay              close delay forwarded to a newly created connection
     * @param fallbackTopicHandler      consumer for payloads no handler could turn into an event;
     *                                  when {@code null} such payloads are logged at WARN level
     * @deprecated scheduled for removal in 2.0.0 (see {@code @ApiStatus.ScheduledForRemoval}).
     */
    @Deprecated
    @ApiStatus.ScheduledForRemoval(inVersion="2.0.0")
    public TwitchPubSub(WebsocketConnection websocketConnection, EventManager eventManager, ScheduledThreadPoolExecutor taskExecutor, ProxyConfig proxyConfig, Collection<String> botOwnerIds, int wsPingPeriod, IBackoffStrategy connectionBackoffStrategy, int wsCloseDelay, Consumer<PubSubResponsePayload> fallbackTopicHandler) {
        this.eventManager = eventManager;
        this.taskExecutor = taskExecutor;
        this.botOwnerIds = botOwnerIds;
        this.fallbackTopicHandler = fallbackTopicHandler != null ? fallbackTopicHandler : message -> log.warn("Unparsable Message: " + message.getTopic() + "|" + message.getMessage());
        this.connection = websocketConnection == null ? new WebsocketConnection(spec -> {
            spec.baseUrl(WEB_SOCKET_SERVER);
            spec.closeDelay(wsCloseDelay);
            spec.wsPingPeriod(wsPingPeriod);
            spec.onStateChanged((oldState, newState) -> eventManager.publish((Object)new PubSubConnectionStateEvent((WebsocketConnectionState)oldState, (WebsocketConnectionState)newState, this)));
            spec.onPreConnect(this::onPreConnect);
            spec.onConnected(this::onConnected);
            spec.onTextMessage(this::onTextMessage);
            // Commands queued for the old connection are discarded once it is gone.
            spec.onPostDisconnect(this.commandQueue::clear);
            spec.taskExecutor((ScheduledExecutorService)taskExecutor);
            spec.proxyConfig(proxyConfig);
            if (connectionBackoffStrategy != null) {
                spec.backoffStrategy(connectionBackoffStrategy);
            }
        }) : websocketConnection;
        this.eventManager.getServiceMediator().addService("twitch4j-pubsub", (Object)this);
        this.connect();

        // Heartbeat: send a PING every 4 minutes while connected.
        this.heartbeatTask = taskExecutor.scheduleAtFixedRate(() -> {
            // An uncaught exception would make the executor suppress every later run of this
            // periodic task, silently stopping the heartbeat; hence the catch-all guard.
            try {
                if (this.isClosed || this.connection.getConnectionState() != WebsocketConnectionState.CONNECTED) {
                    return;
                }
                PubSubRequest request = new PubSubRequest();
                request.setType(PubSubType.PING);
                this.sendCommand(TypeConvert.objectToJson((Object)request));
                log.debug("PubSub: Sending PING!");
                this.lastPing = TimeUtils.getCurrentTimeInMillis();
            }
            catch (Exception ex) {
                log.error("PubSub: Unexpected error while sending PING", (Throwable)ex);
            }
        }, 0L, 4L, TimeUnit.MINUTES);

        // Queue worker: drains the command queue to the websocket and watches for missing PONGs.
        this.flushCommand = () -> {
            // Only one flush may run at a time; a concurrent invocation bails out immediately.
            if (this.flushing.getAndSet(true)) {
                return;
            }
            try {
                while (!this.isClosed) {
                    // A PING is outstanding for more than 10 seconds without a PONG: reconnect.
                    if (this.lastPong < this.lastPing && TimeUtils.getCurrentTimeInMillis() >= this.lastPing + 10000L) {
                        log.warn("PubSub: Didn't receive a PONG response in time, reconnecting to obtain a connection to a different server.");
                        this.reconnect();
                        break;
                    }
                    // Leave queued commands in place until the connection is established.
                    if (!WebsocketConnectionState.CONNECTED.equals((Object)this.connection.getConnectionState())) {
                        break;
                    }
                    String command = this.commandQueue.poll();
                    if (command == null) {
                        break;
                    }
                    this.sendCommand(command);
                    if (!log.isDebugEnabled()) {
                        continue;
                    }
                    // Never write the OAuth token to the log; replace it with a placeholder.
                    Matcher matcher = LISTEN_AUTH_TOKEN.matcher(command);
                    String cmd = matcher.find() ? matcher.group(1) + "\u2022\u2022\u2022" + matcher.group(2) : command;
                    log.debug("Processed command from queue: [{}].", (Object)cmd);
                }
            }
            catch (Exception ex) {
                log.error("PubSub: Unexpected error in worker thread", (Throwable)ex);
            }
            finally {
                // Always release the flags, otherwise no further flush could ever start.
                this.flushRequested.set(false);
                this.flushing.set(false);
            }
        };
        this.queueTask = taskExecutor.scheduleWithFixedDelay(this.flushCommand, 0L, 2500L, TimeUnit.MILLISECONDS);
        log.debug("PubSub: Started Queue Worker Thread");
    }

    /**
     * Opens the underlying websocket connection by delegating to {@code connection.connect()}.
     */
    public void connect() {
        connection.connect();
    }

    /**
     * Closes the underlying websocket connection by delegating to {@code connection.disconnect()}.
     */
    public void disconnect() {
        connection.disconnect();
    }

    /**
     * Reconnects the underlying websocket connection.
     * <p>
     * The call is made while holding the private {@code $lock} monitor, so concurrent reconnect
     * requests issued through this method run one after another.
     */
    public void reconnect() {
        synchronized (this.$lock) {
            connection.reconnect();
        }
    }

    /**
     * Resets the heartbeat bookkeeping before a connection attempt: the last PONG is set to "now"
     * and the last PING to 240000 ms earlier, so no PONG is considered outstanding.
     */
    protected void onPreConnect() {
        final long now = TimeUtils.getCurrentTimeInMillis();
        lastPong = now;
        lastPing = now - 240000L;
    }

    /**
     * Invoked once the connection is established: logs the endpoint and re-queues every request in
     * {@code subscribedTopics} so the subscriptions are sent over the new connection.
     */
    protected void onConnected() {
        log.info("Connected to Twitch PubSub {}", (Object) WEB_SOCKET_SERVER);
        for (PubSubRequest listenRequest : subscribedTopics) {
            queueRequest(listenRequest);
        }
    }

    /**
     * Parses an incoming websocket text frame and dispatches it by message type.
     * <ul>
     *   <li>{@code MESSAGE}: handed to the registered topic handler, see {@link #handleTopicMessage}.</li>
     *   <li>{@code RESPONSE}: published as a listen response, see {@link #handleListenResponse}.</li>
     *   <li>{@code PONG}: updates {@code lastPong}.</li>
     *   <li>{@code RECONNECT}: triggers {@link #reconnect()}.</li>
     *   <li>{@code AUTH_REVOKED}: drops the revoked subscriptions, see {@link #handleAuthRevocation}.</li>
     *   <li>anything else (including a missing type): logged at DEBUG level.</li>
     * </ul>
     * Any exception raised while parsing or dispatching is caught and logged at WARN level; this
     * method never throws.
     *
     * @param text raw JSON text received from the websocket
     */
    protected void onTextMessage(String text) {
        try {
            log.trace("Received WebSocketMessage: {}", (Object)text);
            PubSubResponse message = (PubSubResponse)TypeConvert.jsonToObject((String)text, PubSubResponse.class);
            // Identity comparison is null-safe, so a message without a type reaches the final branch.
            PubSubType type = message.getType();
            if (type == PubSubType.MESSAGE) {
                this.handleTopicMessage(message.getData());
            } else if (type == PubSubType.RESPONSE) {
                this.handleListenResponse(message);
            } else if (type == PubSubType.PONG) {
                log.debug("PubSub: Received PONG response!");
                this.lastPong = TimeUtils.getCurrentTimeInMillis();
            } else if (type == PubSubType.RECONNECT) {
                log.warn("PubSub: Server instance we're connected to will go down for maintenance soon, reconnecting to obtain a new connection!");
                this.reconnect();
            } else if (type == PubSubType.AUTH_REVOKED) {
                this.handleAuthRevocation(text);
            } else {
                log.debug("PubSub: Unknown Message Type: {}", (Object)message);
            }
        }
        catch (Exception ex) {
            log.warn("PubSub: Unparsable Message: {} - [{}]", text, ex.getMessage(), ex);
        }
    }

    /**
     * Converts a topic payload into an event via the handler registered for the first topic
     * segment and publishes it; falls back to {@code fallbackTopicHandler} when no handler exists,
     * the handler throws, or it yields no event.
     */
    private void handleTopicMessage(PubSubResponsePayload payload) {
        String[] topicParts = StringUtils.split((String)payload.getTopic(), (char)'.');
        TopicHandler handler = HandlerRegistry.INSTANCE.getHandlers().get(topicParts[0]);
        TwitchEvent event = null;
        if (handler != null) {
            try {
                event = (TwitchEvent)handler.apply(new TopicHandler.Args(topicParts, payload.getMessage(), this.botOwnerIds));
            }
            catch (Exception e) {
                log.warn("PubSub: Encountered exception when parsing message", (Throwable)e);
            }
        }
        if (event == null) {
            this.fallbackTopicHandler.accept(payload);
            return;
        }
        try {
            this.eventManager.publish((Object)event);
        }
        catch (Exception e) {
            // A failing consumer must not be reported as an unparsable message.
            log.warn("An event consumer threw an exception while processing a PubSub event", (Throwable)e);
        }
    }

    /**
     * Publishes a {@link PubSubListenResponseEvent} for a RESPONSE message and logs a non-empty
     * error. A missing ({@code null}) error is treated like an empty one.
     */
    private void handleListenResponse(PubSubResponse message) {
        // Lazily resolves the subscribed request whose nonce matches this response.
        Supplier<PubSubRequest> findListenRequest = () -> {
            for (PubSubRequest topic : this.subscribedTopics) {
                if (topic != null && StringUtils.equals((CharSequence)message.getNonce(), (CharSequence)topic.getNonce())) {
                    return topic;
                }
            }
            return null;
        };
        String error = message.getError();
        this.eventManager.publish((Object)new PubSubListenResponseEvent(message.getNonce(), error, findListenRequest));
        if (StringUtils.isNotEmpty((CharSequence)error)) {
            if (error.equalsIgnoreCase("ERR_BADAUTH")) {
                log.error("PubSub: You used a invalid oauth token to subscribe to the topic. Please use a token that is authorized for the specified channel.");
            } else {
                log.error("PubSub: Failed to subscribe to topic - [{}]", (Object)error);
            }
        }
    }

    /**
     * Handles an AUTH_REVOKED frame: removes every single-topic subscription whose topic was
     * revoked and publishes a {@link PubSubAuthRevokeEvent} mapping each revoked topic to the
     * removed request ({@code null} when no matching subscription was found).
     */
    private void handleAuthRevocation(String text) {
        PubSubRequest revocation = (PubSubRequest)TypeConvert.jsonToObject((String)text, PubSubRequest.class);
        Object topicsObj = revocation.getData().get("topics");
        if (!(topicsObj instanceof Collection)) {
            log.warn("Unparsable Revocation: {}", (Object)text);
            return;
        }
        // HashMap on purpose: topics start out mapped to null until a matching request is found.
        HashMap<String, PubSubRequest> revoked = new HashMap<>();
        for (Object topicObj : (Collection<?>)topicsObj) {
            if (topicObj instanceof String) {
                revoked.put((String)topicObj, null);
            } else {
                log.warn("Unparsable Revocation Topic: {}", topicObj);
            }
        }
        if (revoked.isEmpty()) {
            return;
        }
        this.subscribedTopics.removeIf(req -> {
            Object topics = req.getData().get("topics");
            if (topics instanceof Collection && ((Collection<?>)topics).size() == 1) {
                Object topic = ((Collection<?>)topics).iterator().next();
                // replace(key, null, req) succeeds only for a revoked topic not yet claimed by another request.
                return topic instanceof String && revoked.replace((String)topic, null, req);
            }
            return false;
        });
        this.eventManager.publish((Object)new PubSubAuthRevokeEvent(this, Collections.unmodifiableMap(revoked)));
    }

    /**
     * Sends a serialized command over the websocket when the connection is CONNECTED or
     * CONNECTING; otherwise the command is dropped and a warning is logged.
     * <p>
     * The warning never contains the OAuth token: for LISTEN commands the {@code auth_token}
     * value is replaced by a placeholder, matching what the queue worker does for its debug log.
     *
     * @param command JSON command to send
     */
    private void sendCommand(String command) {
        // Read the state once so both comparisons see the same value.
        WebsocketConnectionState state = this.connection.getConnectionState();
        if (state == WebsocketConnectionState.CONNECTED || state == WebsocketConnectionState.CONNECTING) {
            this.connection.sendText(command);
            return;
        }
        if (log.isWarnEnabled()) {
            String loggable = command;
            if (command != null) {
                Matcher matcher = LISTEN_AUTH_TOKEN.matcher(command);
                if (matcher.find()) {
                    loggable = matcher.group(1) + "\u2022\u2022\u2022" + matcher.group(2);
                }
            }
            log.warn("Can't send PubSub Command [{}]", (Object)loggable);
        }
    }

    /**
     * Serializes a request into the command queue and, unless a flush is already running or has
     * already been requested, schedules a flush 50 ms from now.
     * <p>
     * When the request carries a credential, its access token is written into the request data
     * under {@code "auth_token"} before serialization (the request object itself is modified).
     *
     * @param request the request to enqueue
     * @throws IllegalStateException if the bounded command queue is full (behavior of {@code BlockingQueue.add})
     */
    private void queueRequest(PubSubRequest request) {
        final OAuth2Credential auth = request.getCredential();
        if (auth != null) {
            request.getData().put("auth_token", auth.getAccessToken());
        }

        final String json = TypeConvert.objectToJson((Object) request);
        commandQueue.add(json);

        // A running flush picks the command up by itself.
        if (flushing.get()) {
            return;
        }
        // Somebody else already scheduled the on-demand flush.
        if (flushRequested.getAndSet(true)) {
            return;
        }
        taskExecutor.schedule(flushCommand, 50L, TimeUnit.MILLISECONDS);
    }

    /**
     * Subscribes to the topic(s) described by {@code request}.
     * <p>
     * The request is sent only if it was not already tracked in {@code subscribedTopics}; a
     * subscription object wrapping the request is returned in either case.
     *
     * @param request the LISTEN request
     * @return a subscription wrapping {@code request}
     */
    @Override
    public PubSubSubscription listenOnTopic(PubSubRequest request) {
        final boolean newlyTracked = subscribedTopics.add(request);
        if (newlyTracked) {
            checkListenCount(request);
            queueRequest(request);
        }
        return new PubSubSubscription(request);
    }

    /**
     * Cancels a subscription previously created via {@link #listenOnTopic(PubSubRequest)}.
     * <p>
     * The wrapped request must be of type LISTEN and must currently be tracked; otherwise a
     * warning is logged and nothing is sent. On success an UNLISTEN request with a fresh nonce
     * and the same data is queued.
     *
     * @param subscription the subscription to cancel
     * @return {@code true} if an UNLISTEN request was queued, {@code false} otherwise
     */
    @Override
    public boolean unsubscribeFromTopic(PubSubSubscription subscription) {
        final PubSubRequest listenRequest = subscription.getRequest();

        final PubSubType requestType = listenRequest.getType();
        if (requestType != PubSubType.LISTEN) {
            log.warn("Cannot unsubscribe using request with unexpected type: {}", (Object) requestType);
            return false;
        }

        if (!subscribedTopics.remove(listenRequest)) {
            log.warn("Not subscribed to topic: {}", (Object) listenRequest);
            return false;
        }

        final PubSubRequest unlisten = new PubSubRequest();
        unlisten.setType(PubSubType.UNLISTEN);
        unlisten.setNonce(CryptoUtils.generateNonce((int) 30));
        unlisten.setData(listenRequest.getData());
        queueRequest(unlisten);
        return true;
    }

    /**
     * Returns the latency reported by the underlying websocket connection.
     *
     * @return the value of {@code connection.getLatency()}
     */
    @Override
    public long getLatency() {
        return connection.getLatency();
    }

    /**
     * Shuts the client down: marks it closed, cancels the heartbeat and queue tasks (without
     * interrupting a run in progress) and closes the connection. Calls after the first one
     * have no effect.
     */
    @Override
    public void close() {
        if (isClosed) {
            return;
        }
        isClosed = true;
        heartbeatTask.cancel(false);
        queueTask.cancel(false);
        connection.close();
    }

    /**
     * Logs a warning when a request lists more than one topic, because revocation handling only
     * manages requests that carry exactly one topic.
     *
     * @param request the request whose {@code "topics"} entry is inspected
     */
    private void checkListenCount(PubSubRequest request) {
        final Object topicList = request.getData().get("topics");
        if (!(topicList instanceof Collection)) {
            return;
        }
        final int topicCount = ((Collection<?>) topicList).size();
        if (topicCount > 1) {
            log.warn("Listening to multiple PubSub topics in a single request is not recommended; automatic topic management can degrade upon PubSubAuthRevokeEvent");
        }
    }

    /**
     * Returns the event manager this client publishes its events to.
     *
     * @return the event manager passed to the constructor
     */
    @Override
    @Generated
    public EventManager getEventManager() {
        return eventManager;
    }
}

