/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.invest.openapi.okhttp;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.tinkoff.invest.openapi.StreamingContext;
import ru.tinkoff.invest.openapi.model.streaming.StreamingEvent;
import ru.tinkoff.invest.openapi.model.streaming.StreamingRequest;

class StreamingContextImpl
implements StreamingContext {
    private static final TypeReference<StreamingEvent> streamingEventTypeReference = new TypeReference<StreamingEvent>(){};
    private final WebSocket[] wsClients;
    private final ArrayList<Set<StreamingRequest.ActivatingRequest>> requestsHistory;
    private final ObjectMapper mapper;
    private final Logger logger = LoggerFactory.getLogger(StreamingContextImpl.class);
    private final List<SubscriptionImpl> subscriptions;
    private final OkHttpClient client;
    private final Executor executor;
    private final okhttp3.Request wsRequest;
    private boolean isTerminated;

    StreamingContextImpl(@NotNull OkHttpClient client, @NotNull String streamingUrl, @NotNull String authToken, int streamingParallelism, @NotNull Executor executor) {
        this.client = client;
        this.subscriptions = new LinkedList<SubscriptionImpl>();
        this.executor = executor;
        this.mapper = new ObjectMapper();
        this.mapper.registerModule((Module)new JavaTimeModule());
        this.isTerminated = false;
        this.wsClients = new WebSocket[streamingParallelism];
        this.requestsHistory = new ArrayList(streamingParallelism);
        this.wsRequest = new Request.Builder().url(streamingUrl).header("Authorization", authToken).build();
        for (int i = 0; i < streamingParallelism; ++i) {
            StreamingApiListener streamingCallback = new StreamingApiListener(i + 1);
            this.wsClients[i] = this.client.newWebSocket(this.wsRequest, (WebSocketListener)streamingCallback);
            this.requestsHistory.add(new HashSet());
        }
    }

    public void sendRequest(@NotNull StreamingRequest request) {
        if (this.isTerminated) {
            throw new IllegalStateException("\u0421\u043e\u0435\u0434\u0438\u043d\u0435\u043d\u0438\u0435 \u0437\u0430\u043a\u0440\u044b\u0442\u043e");
        }
        try {
            String message = this.mapper.writeValueAsString((Object)request);
            int clientIndex = Math.abs(request.onOffPairId().hashCode()) % this.wsClients.length;
            WebSocket wsClient = this.wsClients[clientIndex];
            Set<StreamingRequest.ActivatingRequest> wsClientHistory = this.requestsHistory.get(clientIndex);
            wsClientHistory.removeIf(hr -> hr.onOffPairId().equals(request.onOffPairId()));
            if (request instanceof StreamingRequest.ActivatingRequest) {
                wsClientHistory.add((StreamingRequest.ActivatingRequest)request);
            }
            wsClient.send(message);
        }
        catch (JsonProcessingException ex) {
            this.logger.error("\u041d\u0435 \u0443\u0434\u0430\u043b\u043e\u0441\u044c \u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u043e\u0432\u0430\u0442\u044c \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u0435 \u0432 JSON", (Throwable)ex);
            throw new RuntimeException(ex);
        }
    }

    private void restore(@NotNull StreamingApiListener listener) throws Exception {
        WebSocket newWsClient;
        int id = listener.id;
        int index = listener.id - 1;
        WebSocket webSocket = Objects.requireNonNull(this.wsClients[index]);
        this.logger.info("\u041f\u043e\u043f\u044b\u0442\u043a\u0430 \u0432\u043e\u0441\u0441\u0442\u0430\u043d\u043e\u0432\u043b\u0435\u043d\u0438\u044f Streaming API \u043a\u043b\u0438\u0435\u043d\u0442\u0430 #" + id);
        webSocket.close(1000, null);
        Thread.sleep(1000L);
        this.wsClients[index] = newWsClient = this.client.newWebSocket(this.wsRequest, (WebSocketListener)listener);
        Set<StreamingRequest.ActivatingRequest> history = this.requestsHistory.get(index);
        this.logger.info("\u0423 \u043a\u043b\u0438\u0435\u043d\u0442\u0430 #" + id + " \u0430\u043a\u0442\u0438\u0432\u043d\u043e " + history.size() + " \u043f\u043e\u0434\u043f\u0438\u0441\u043e\u043a");
        for (StreamingRequest.ActivatingRequest request : history) {
            String message = this.mapper.writeValueAsString((Object)request);
            newWsClient.send(message);
        }
    }

    public void subscribe(Subscriber<? super StreamingEvent> s) {
        SubscriptionImpl sub = new SubscriptionImpl(s);
        this.subscriptions.add(sub);
        if (this.isTerminated) {
            sub.terminateDueTo(new IllegalStateException("\u0421\u043e\u0435\u0434\u0438\u043d\u0435\u043d\u0438\u0435 \u0437\u0430\u043a\u0440\u044b\u0442\u043e"));
        } else {
            sub.init();
        }
    }

    private final class SubscriptionImpl
    implements Subscription,
    Runnable {
        final Subscriber<? super StreamingEvent> subscriber;
        private boolean cancelled = false;
        private long demand = 0L;
        private final ConcurrentLinkedDeque<Signal> inboundSignals = new ConcurrentLinkedDeque();
        private final AtomicBoolean on = new AtomicBoolean(false);

        SubscriptionImpl(Subscriber<? super StreamingEvent> subscriber) {
            this.subscriber = subscriber;
        }

        private void doRequest(long n) {
            if (n < 1L) {
                this.terminateDueTo(new IllegalArgumentException(this.subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
            } else {
                this.demand = this.demand + n < 1L ? Long.MAX_VALUE : (this.demand += n);
            }
        }

        private void doCancel() {
            this.cancelled = true;
            StreamingContextImpl.this.subscriptions.remove(this);
        }

        private void doSubscribe() {
            if (!this.cancelled) {
                try {
                    this.subscriber.onSubscribe((Subscription)this);
                }
                catch (Throwable t) {
                    this.terminateDueTo(new IllegalStateException(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t));
                }
            }
        }

        private void doSend(@NotNull StreamingEvent next) {
            try {
                this.subscriber.onNext((Object)next);
                --this.demand;
            }
            catch (Throwable t) {
                this.doCancel();
                new IllegalStateException(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext or onComplete.", t).printStackTrace(System.err);
            }
        }

        private void terminateDueTo(Throwable t) {
            this.cancelled = true;
            try {
                this.subscriber.onError(t);
            }
            catch (Throwable t2) {
                new IllegalStateException(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2).printStackTrace(System.err);
            }
        }

        private void signal(Signal signal) {
            if (signal instanceof Send) {
                this.inboundSignals.offerLast(signal);
            } else {
                this.inboundSignals.offerFirst(signal);
            }
            this.tryScheduleToExecute();
        }

        @Override
        public final void run() {
            if (this.on.get()) {
                try {
                    Signal s = this.inboundSignals.peek();
                    if (!this.cancelled) {
                        if (s instanceof Request) {
                            this.inboundSignals.poll();
                            this.doRequest(((Request)s).n);
                        } else if (s instanceof Send && this.demand > 0L) {
                            this.inboundSignals.poll();
                            this.doSend(((Send)s).payload);
                        } else if (s == Cancel.Instance) {
                            this.inboundSignals.poll();
                            this.doCancel();
                        } else if (s == Subscribe.Instance) {
                            this.inboundSignals.poll();
                            this.doSubscribe();
                        }
                    }
                }
                finally {
                    this.on.set(false);
                    if (!this.inboundSignals.isEmpty()) {
                        this.tryScheduleToExecute();
                    }
                }
            }
        }

        private void tryScheduleToExecute() {
            block6: {
                if (this.on.compareAndSet(false, true)) {
                    try {
                        StreamingContextImpl.this.executor.execute(this);
                    }
                    catch (Throwable t) {
                        if (this.cancelled) break block6;
                        this.doCancel();
                        try {
                            this.terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));
                        }
                        finally {
                            this.inboundSignals.clear();
                            this.on.set(false);
                        }
                    }
                }
            }
        }

        public void request(long n) {
            this.signal(new Request(n));
        }

        public void cancel() {
            this.signal(Cancel.Instance);
        }

        void init() {
            this.signal(Subscribe.Instance);
        }
    }

    static final class Request
    implements Signal {
        final long n;

        Request(long n) {
            this.n = n;
        }

        public String toString() {
            return "Signal.Request";
        }
    }

    static final class Send
    implements Signal {
        @NotNull
        final StreamingEvent payload;

        Send(@NotNull StreamingEvent payload) {
            this.payload = payload;
        }

        public String toString() {
            return "Signal.Send";
        }
    }

    static enum Subscribe implements Signal
    {
        Instance;


        public String toString() {
            return "Signal.Subscribe";
        }
    }

    static enum Cancel implements Signal
    {
        Instance;


        public String toString() {
            return "Signal.Cancel";
        }
    }

    static interface Signal {
    }

    private class StreamingApiListener
    extends WebSocketListener {
        final int id;

        StreamingApiListener(int id) {
            this.id = id;
        }

        public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
            super.onOpen(webSocket, response);
            StreamingContextImpl.this.logger.info("Streaming API \u043a\u043b\u0438\u0435\u043d\u0442 #" + this.id + " \u043f\u043e\u0434\u043a\u043b\u044e\u0447\u0451\u043d");
        }

        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            super.onMessage(webSocket, text);
            try {
                StreamingEvent event = (StreamingEvent)StreamingContextImpl.this.mapper.readValue(text, streamingEventTypeReference);
                Send signal = new Send(event);
                for (SubscriptionImpl sub : StreamingContextImpl.this.subscriptions) {
                    sub.signal(signal);
                }
            }
            catch (JsonProcessingException ex) {
                StreamingContextImpl.this.logger.error("\u041d\u0435 \u0443\u0434\u0430\u043b\u043e\u0441\u044c \u0434\u0435\u0441\u0435\u0440\u0438\u0430\u043b\u0438\u0437\u043e\u0432\u0430\u0442\u044c JSON \u043f\u0440\u0438\u0448\u0435\u0434\u0448\u0438\u0439 \u0438\u0437 Streaming API", (Throwable)ex);
            }
        }

        public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            super.onClosed(webSocket, code, reason);
            StreamingContextImpl.this.logger.info("Streaming API #" + this.id + " \u043a\u043b\u0438\u0435\u043d\u0442 \u043e\u0441\u0442\u0430\u043d\u043e\u0432\u043b\u0435\u043d");
            for (Subscription sub : StreamingContextImpl.this.subscriptions) {
                sub.cancel();
            }
        }

        public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            super.onClosing(webSocket, code, reason);
            StreamingContextImpl.this.logger.info("\u0421\u0435\u0440\u0432\u0435\u0440 Streaming API \u0438\u043d\u0438\u0446\u0438\u0438\u0440\u043e\u0432\u0430\u043b \u043e\u0441\u0442\u0430\u043d\u043e\u0432\u043a\u0443 \u0434\u043b\u044f \u043a\u043b\u0438\u0435\u043d\u0442\u0430 #" + this.id);
        }

        public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString bytes) {
            super.onMessage(webSocket, bytes);
            StreamingContextImpl.this.logger.warn("Streaming API #" + this.id + " \u043a\u043b\u0438\u0435\u043d\u0442 \u043f\u043e\u043b\u0443\u0447\u0438\u043b \u0431\u0430\u0439\u0442\u043e\u0432\u044b\u0439 \u0442\u0438\u043f \u0441\u043e\u043e\u0431\u0449\u0435\u043d\u0438\u044f!");
        }

        public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) {
            int responseCode;
            super.onFailure(webSocket, t, response);
            if (response != null && ((responseCode = response.code()) == 401 || responseCode == 403)) {
                StreamingContextImpl.this.isTerminated = true;
                StreamingContextImpl.this.logger.error("\u0414\u043b\u044f Streaming API \u043f\u0435\u0440\u0435\u0434\u0430\u043d \u043d\u0435\u0432\u0435\u0440\u043d\u044b\u0439 \u0442\u043e\u043a\u0435\u043d.", t);
                for (WebSocket ws : StreamingContextImpl.this.wsClients) {
                    ws.close(1000, null);
                }
                for (Subscription sub : StreamingContextImpl.this.subscriptions) {
                    sub.cancel();
                }
                return;
            }
            try {
                StreamingContextImpl.this.logger.error("\u0427\u0442\u043e-\u0442\u043e \u043f\u0440\u043e\u0438\u0437\u043e\u0448\u043b\u043e \u0432 Streaming API \u043a\u043b\u0438\u0435\u043d\u0442\u0435 #" + this.id, t);
                StreamingContextImpl.this.restore(this);
            }
            catch (Exception ex) {
                StreamingContextImpl.this.isTerminated = true;
                for (Subscription sub : StreamingContextImpl.this.subscriptions) {
                    sub.cancel();
                }
                StreamingContextImpl.this.logger.error("\u041f\u0440\u0438 \u0432\u043e\u0441\u0441\u0442\u0430\u043d\u043e\u0432\u043b\u0435\u043d\u0438\u0438 Streaming API \u043a\u043b\u0438\u0435\u043d\u0442\u0430 #" + this.id + " \u0447\u0442\u043e-\u0442\u043e \u043f\u0440\u043e\u0438\u0437\u043e\u0448\u043b\u043e", (Throwable)ex);
            }
        }
    }
}

