/*
 * Decompiled with CFR 0.152.
 */
package com.slimgears.rxrpc.jettyhttp;

import com.slimgears.rxrpc.core.RxTransport;
import com.slimgears.rxrpc.core.util.Emitters;
import com.slimgears.rxrpc.jettyhttp.JettyHttpAttributes;
import com.slimgears.util.stream.Streams;
import io.reactivex.Completable;
import io.reactivex.Emitter;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;
import java.io.IOException;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Server-side {@link RxTransport} endpoint for a single HTTP client that talks to the server
 * through the polling servlet {@link Server}.
 *
 * <p>Messages emitted through {@link #outgoing()} are buffered in memory until the client
 * collects them with {@link #dequePendingMessages()}; messages posted by the client are pushed
 * into {@link #incoming()} by the servlet.
 *
 * <p>Every call to {@link #dequePendingMessages()} (re)starts a keep-alive timer of
 * {@code JettyHttpAttributes.ServerKeepAliveTimeout}; when that timer fires the transport is
 * closed.
 */
public class JettyHttpRxTransportServer implements RxTransport {
    private final Subject<String> outgoingSubject = BehaviorSubject.create();
    private final Emitter<String> outgoing = Emitters.fromObserver(this.outgoingSubject);
    private final Subject<String> incoming = BehaviorSubject.create();

    /** Guards {@link #messageQueue}: producers and the polling request may run on different threads. */
    private final Object queueLock = new Object();
    /** Messages waiting for the next poll; only accessed while holding {@link #queueLock}. */
    private Queue<String> messageQueue = new LinkedList<>();

    private final Disposable outgoingSubscription;
    /** Currently armed keep-alive timer; replaced on every poll. */
    private final AtomicReference<Disposable> disconnectSubscription =
            new AtomicReference<>(Disposables.empty());

    /**
     * Creates a transport whose outgoing messages are buffered for polling and whose outgoing
     * errors are forwarded to {@link #incoming()}.
     */
    public JettyHttpRxTransportServer() {
        this.outgoingSubscription = this.outgoingSubject.subscribe(this::onMessage, this.incoming::onError);
    }

    /** Buffers an outgoing message until the client polls for it. */
    private void onMessage(String message) {
        synchronized (this.queueLock) {
            this.messageQueue.add(message);
        }
    }

    /**
     * Restarts the keep-alive timer and hands over all messages buffered since the previous call.
     *
     * @return the buffered messages in emission order, or an empty collection when nothing is
     *         pending; the returned collection is no longer modified by this transport
     */
    public Iterable<String> dequePendingMessages() {
        // Arm the new timer before cancelling the old one so there is never a moment without one.
        Disposable keepAlive = Completable
                .timer(JettyHttpAttributes.ServerKeepAliveTimeout.toMillis(), TimeUnit.MILLISECONDS)
                .subscribe(this::close);
        this.disconnectSubscription.getAndSet(keepAlive).dispose();

        // Swap the queue under the lock: a concurrent onMessage() either lands in the batch that is
        // returned or in the fresh queue, never in a list that has already been handed out.
        synchronized (this.queueLock) {
            if (this.messageQueue.isEmpty()) {
                return Collections.emptyList();
            }
            Queue<String> pending = this.messageQueue;
            this.messageQueue = new LinkedList<>();
            return pending;
        }
    }

    /** Returns the emitter used to send messages to the client. */
    public Emitter<String> outgoing() {
        return this.outgoing;
    }

    /** Returns the subject carrying messages received from the client. */
    public Subject<String> incoming() {
        return this.incoming;
    }

    /**
     * Cancels the keep-alive timer, stops buffering outgoing messages and completes
     * {@link #incoming()}.
     */
    public void close() {
        this.disconnectSubscription.get().dispose();
        this.outgoingSubscription.dispose();
        this.incoming().onComplete();
    }

    /** Returns a builder for the servlet-based {@link Server}. */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Servlet that multiplexes clients onto {@link JettyHttpRxTransportServer} instances.
     *
     * <p>The last path segment of a POST request selects the operation ({@code connect},
     * {@code disconnect}, {@code message} or {@code polling}); every operation except
     * {@code connect} identifies the client through the {@code X-RPC-CLIENT-ID} header.
     * Unknown operations, unknown or missing client ids and empty messages are answered with
     * HTTP 400.
     */
    public static class Server extends HttpServlet implements RxTransport.Server {
        private static final String CLIENT_ID_HEADER = "X-RPC-CLIENT-ID";
        /** Shared generator for client ids; creating one per connection is needlessly expensive. */
        private static final SecureRandom RANDOM = new SecureRandom();

        private final Subject<RxTransport> connections = BehaviorSubject.create();
        private final Map<String, JettyHttpRxTransportServer> transportMap = new ConcurrentHashMap<>();

        private Server() {
        }

        /**
         * Dispatches a POST request to the operation named by the last segment of the request URI.
         *
         * @param req  the request; its first body line is used as the message payload
         * @param resp the response; receives the client id header, the polled messages or a 400
         * @throws IOException if reading the request body or writing the response fails
         */
        @Override
        protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
            String reqURI = req.getRequestURI();
            String reqType = reqURI.substring(reqURI.lastIndexOf('/') + 1);
            String clientId = req.getHeader(CLIENT_ID_HEADER);
            String message = req.getReader().readLine();
            switch (reqType) {
                case "connect": {
                    this.doConnect(resp);
                    break;
                }
                case "disconnect": {
                    this.doDisconnect(clientId, resp);
                    break;
                }
                case "message": {
                    this.doMessage(clientId, message, resp);
                    break;
                }
                case "polling": {
                    this.doPoll(clientId, resp);
                    break;
                }
                default: {
                    resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
                }
            }
        }

        /**
         * Looks up the transport registered for a client.
         *
         * @param clientId the id taken from the request header, possibly {@code null}
         * @return the transport, or empty when the id is missing or not registered
         */
        private Optional<JettyHttpRxTransportServer> transportById(String clientId) {
            // ConcurrentHashMap rejects null keys, so a missing header must be screened out first.
            if (clientId == null) {
                return Optional.empty();
            }
            return Optional.ofNullable(this.transportMap.get(clientId));
        }

        /** Writes every pending message of the client to the response, one per line. */
        private void doPoll(String clientId, HttpServletResponse response) throws IOException {
            Optional<JettyHttpRxTransportServer> transport = this.transportById(clientId);
            if (!transport.isPresent()) {
                response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
                return;
            }
            for (String pending : transport.get().dequePendingMessages()) {
                response.getWriter().write(pending + "\n");
            }
        }

        /**
         * Registers a new transport under a random id, announces it on {@link #connections()} and
         * returns the id to the client in the {@code X-RPC-CLIENT-ID} response header.
         */
        private void doConnect(HttpServletResponse response) {
            // Decimal rendering is kept on purpose: the previous radix of 64 is outside
            // BigInteger's supported range and silently fell back to 10, so ids keep their format.
            String id = new BigInteger(128, RANDOM).toString(10);
            JettyHttpRxTransportServer transport = new JettyHttpRxTransportServer();
            this.transportMap.put(id, transport);
            // Drop the registration once the transport terminates (explicit disconnect, keep-alive
            // timeout or error); otherwise timed-out transports would stay in the map forever.
            transport.incoming().subscribe(
                    message -> { },
                    error -> this.transportMap.remove(id, transport),
                    () -> this.transportMap.remove(id, transport));
            this.connections.onNext(transport);
            response.addHeader(CLIENT_ID_HEADER, id);
        }

        /** Unregisters and closes the client's transport, or answers 400 when it is unknown. */
        private void doDisconnect(String clientId, HttpServletResponse response) {
            JettyHttpRxTransportServer transport = clientId == null ? null : this.transportMap.remove(clientId);
            if (transport == null) {
                response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
            } else {
                transport.close();
            }
        }

        /** Pushes a message received from the client into its transport's incoming subject. */
        private void doMessage(String clientId, String message, HttpServletResponse response) {
            Optional<JettyHttpRxTransportServer> transport = this.transportById(clientId);
            // An empty request body yields a null line, which the subject would reject.
            if (!transport.isPresent() || message == null) {
                response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
                return;
            }
            transport.get().incoming().onNext(message);
        }

        /** Returns the stream of transports created by {@code connect} requests. */
        public Observable<RxTransport> connections() {
            return this.connections;
        }
    }

    /** Factory for {@link Server} instances. */
    public static class Builder {
        /** Creates a new, empty servlet server. */
        public Server buildServer() {
            return new Server();
        }
    }
}

