/*
 * Decompiled with CFR 0.152.
 */
package com.slimgears.rxrpc.jettywebsocket;

import com.slimgears.rxrpc.core.RxTransport;
import com.slimgears.rxrpc.core.util.Emitters;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Emitter;
import io.reactivex.FlowableSubscriber;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.CompletableSubject;
import io.reactivex.subjects.Subject;
import io.reactivex.subscribers.DisposableSubscriber;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.WebSocketUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JettyWebSocketRxTransport
implements RxTransport,
WebSocketListener {
    /** Logger used by the transport and by the nested client helper. */
    private static final Logger log = LoggerFactory.getLogger(JettyWebSocketRxTransport.class);

    /** Completes once the WebSocket session is connected; errors if the socket reports an error first. */
    private final CompletableSubject connected = CompletableSubject.create();

    /** Text messages received from the remote peer. */
    private final Subject<String> incoming = BehaviorSubject.create();

    /** Messages waiting to be written to the remote peer. */
    private final Subject<String> outgoingSubject = BehaviorSubject.create();

    /** Emitter view over {@link #outgoingSubject}; assigned in the constructor. */
    private final Emitter<String> outgoing;

    /** Subscription that forwards outgoing messages to the session; an empty disposable until connected. */
    private final AtomicReference<Disposable> disposable = new AtomicReference<>(Disposables.empty());

    /**
     * Creates a transport that is not yet attached to a WebSocket session.
     * The outgoing emitter is built on top of the outgoing subject.
     */
    public JettyWebSocketRxTransport() {
        outgoing = Emitters.fromObserver(outgoingSubject);
    }

    /**
     * Returns the sink for messages that should be sent to the remote peer.
     *
     * @return the emitter created over the outgoing subject
     */
    public Emitter<String> outgoing() {
        return outgoing;
    }

    /**
     * Returns the stream of text messages received from the remote peer.
     *
     * @return the subject that receives every incoming text message
     */
    public Subject<String> incoming() {
        return incoming;
    }

    /**
     * Called for binary WebSocket messages. This transport only handles text,
     * so binary payloads are ignored.
     *
     * @param payload the buffer holding the binary data
     * @param offset  start of the data within {@code payload}
     * @param len     number of bytes of data
     */
    @Override
    public void onWebSocketBinary(byte[] payload, int offset, int len) {
        // Intentionally empty: binary messages are dropped.
    }

    /**
     * Publishes a received text message on the incoming stream.
     *
     * @param message the text message received from the remote peer
     */
    @Override
    public void onWebSocketText(String message) {
        incoming.onNext(message);
    }

    /**
     * Handles the WebSocket being closed: stops forwarding outgoing messages
     * and completes the incoming stream unless it already terminated.
     *
     * @param statusCode the close status code reported for the session
     * @param reason     the close reason reported for the session
     */
    @Override
    public void onWebSocketClose(int statusCode, String reason) {
        // The closed session can no longer accept writes, so release the
        // subscription that forwards outgoing messages to it.
        this.disposable.getAndSet(Disposables.empty()).dispose();
        JettyWebSocketRxTransport.completeIfNotTerminated(this.incoming);
    }

    /**
     * Handles a newly connected session: starts forwarding outgoing messages
     * to it and signals that the transport is connected.
     *
     * @param session the session that has just been established
     */
    @Override
    public synchronized void onWebSocketConnect(Session session) {
        Disposable previous = this.disposable.getAndSet(this.subscribeOutgoing(session));
        // Release whatever was registered before so that a stale subscription
        // does not keep writing to an older session. Disposing the initial
        // empty disposable is a no-op.
        previous.dispose();
        JettyWebSocketRxTransport.completeIfNotTerminated(this.connected);
    }

    /**
     * Propagates a WebSocket error to the connection signal and to the
     * incoming stream, skipping whichever of them has already terminated.
     *
     * @param cause the error reported for the socket
     */
    @Override
    public void onWebSocketError(Throwable cause) {
        errorIfNotTerminated(connected, cause);
        errorIfNotTerminated(incoming, cause);
    }

    /**
     * Closes the transport by completing first the outgoing emitter and then
     * the incoming stream.
     */
    public void close() {
        Emitter<String> sink = outgoing();
        sink.onComplete();
        Subject<String> source = incoming();
        source.onComplete();
    }

    /**
     * Tells whether the subject has already completed or failed.
     *
     * @param subject the subject to inspect
     * @return {@code true} if the subject has completed or holds an error
     */
    private static boolean isTerminated(Subject<?> subject) {
        if (subject.hasComplete()) {
            return true;
        }
        return subject.hasThrowable();
    }

    /**
     * Tells whether the completable subject has already completed or failed.
     *
     * @param subject the completable subject to inspect
     * @return {@code true} if the subject has completed or holds an error
     */
    private static boolean isTerminated(CompletableSubject subject) {
        if (subject.hasComplete()) {
            return true;
        }
        return subject.hasThrowable();
    }

    /**
     * Signals {@code error} on the completable subject unless it has already terminated.
     *
     * @param subject the completable subject to fail
     * @param error   the error to signal
     */
    private static void errorIfNotTerminated(CompletableSubject subject, Throwable error) {
        if (isTerminated(subject)) {
            return;
        }
        subject.onError(error);
    }

    /**
     * Completes the completable subject unless it has already terminated.
     *
     * @param subject the completable subject to complete
     */
    private static void completeIfNotTerminated(CompletableSubject subject) {
        if (isTerminated(subject)) {
            return;
        }
        subject.onComplete();
    }

    /**
     * Signals {@code error} on the subject unless it has already terminated.
     *
     * @param subject the subject to fail
     * @param error   the error to signal
     */
    private static void errorIfNotTerminated(Subject<?> subject, Throwable error) {
        if (isTerminated(subject)) {
            return;
        }
        subject.onError(error);
    }

    /**
     * Completes the subject unless it has already terminated.
     *
     * @param subject the subject to complete
     */
    private static void completeIfNotTerminated(Subject<?> subject) {
        if (isTerminated(subject)) {
            return;
        }
        subject.onComplete();
    }

    /**
     * Subscribes to the outgoing messages and writes each of them to the
     * given session as a text message.
     *
     * <p>Messages are requested one at a time: the first on subscription and
     * each following one only after the previous write reported success.
     * The outgoing subject is converted with {@link BackpressureStrategy#BUFFER}
     * and observed on {@code Schedulers.io()}.
     *
     * @param session the session to write outgoing messages to
     * @return the subscriber, which can be disposed to stop forwarding
     */
    private Disposable subscribeOutgoing(final Session session) {
        DisposableSubscriber<String> subscriber = new DisposableSubscriber<String>() {

            @Override
            public void onStart() {
                request(1L);
            }

            @Override
            public void onNext(String s) {
                session.getRemote().sendString(s, new WriteCallback() {

                    @Override
                    public void writeFailed(Throwable x) {
                        // Unqualified on purpose: WriteCallback has no onError, so
                        // this resolves to the enclosing subscriber's onError.
                        onError(x);
                    }

                    @Override
                    public void writeSuccess() {
                        // Unqualified on purpose: asks the enclosing subscriber
                        // for the next message now that this one is written.
                        request(1L);
                    }
                });
            }

            @Override
            public void onError(Throwable t) {
                session.close(1006, t.getMessage());
            }

            @Override
            public void onComplete() {
                session.close();
            }
        };
        this.outgoingSubject
                .toFlowable(BackpressureStrategy.BUFFER)
                .observeOn(Schedulers.io())
                .subscribe(subscriber);
        return subscriber;
    }

    /**
     * Creates a builder for the client side of the transport.
     *
     * @return a new {@link ClientBuilder}
     */
    public static ClientBuilder clientBuilder() {
        return new ClientBuilder();
    }

    /**
     * Creates a builder for the server side of the transport.
     *
     * @return a new {@link ServerBuilder}
     */
    public static ServerBuilder serverBuilder() {
        return new ServerBuilder();
    }

    public static class Client
    implements RxTransport.Client {
        private final Supplier<HttpClient> httpClientFactory;
        private final Consumer<WebSocketPolicy> policyConfigurator;
        private final Consumer<ClientUpgradeRequest> requestConfigurator;

        private Client(Supplier<HttpClient> httpClientFactory, Consumer<WebSocketPolicy> policyConfigurator, Consumer<ClientUpgradeRequest> requestConfigurator) {
            this.policyConfigurator = policyConfigurator;
            this.httpClientFactory = httpClientFactory;
            this.requestConfigurator = requestConfigurator;
        }

        public Single<RxTransport> connect(URI uri) {
            try {
                HttpClient httpClient = this.httpClientFactory.get();
                boolean isStarted = httpClient.isStarted();
                if (!isStarted) {
                    httpClient.start();
                }
                WebSocketClient webSocketClient = new WebSocketClient(httpClient);
                this.policyConfigurator.accept(webSocketClient.getPolicy());
                webSocketClient.start();
                JettyWebSocketRxTransport transport = new JettyWebSocketRxTransport();
                WebSocketUpgradeRequest webSocketUpgradeRequest = new WebSocketUpgradeRequest(webSocketClient, httpClient, uri, transport);
                ClientUpgradeRequest request = new ClientUpgradeRequest(webSocketUpgradeRequest);
                request.getCookies().addAll(httpClient.getCookieStore().getCookies());
                this.requestConfigurator.accept(request);
                webSocketClient.connect(transport, uri, request);
                transport.incoming().doFinally(() -> {
                    webSocketClient.stop();
                    if (!isStarted) {
                        httpClient.stop();
                    }
                }).subscribe();
                return transport.connected.toSingle(() -> transport);
            }
            catch (Exception e) {
                log.error("Could not connect to: " + uri, (Throwable)e);
                return Single.error((Throwable)e);
            }
        }
    }

    /**
     * Server side of the transport: a WebSocket servlet that creates one
     * transport per upgrade request and publishes it on {@link #connections()}.
     */
    public static class Server
    extends WebSocketServlet
    implements RxTransport.Server {
        private final Consumer<WebSocketPolicy> policyConfigurator;
        private final Subject<RxTransport> connections = BehaviorSubject.create();

        private Server(Consumer<WebSocketPolicy> policyConfigurator) {
            this.policyConfigurator = policyConfigurator;
        }

        /**
         * Applies the policy configuration to the factory and installs a
         * creator that hands out a fresh transport for every upgrade request.
         *
         * @param factory the servlet factory to configure
         */
        @Override
        public void configure(WebSocketServletFactory factory) {
            policyConfigurator.accept(factory.getPolicy());
            factory.setCreator((upgradeRequest, upgradeResponse) -> publishNewTransport());
        }

        /**
         * Returns the stream of transports created for upgrade requests.
         *
         * @return the connections subject as an observable
         */
        public Observable<RxTransport> connections() {
            return connections;
        }

        /** Creates a transport, emits it on the connections subject and returns it as the socket endpoint. */
        private JettyWebSocketRxTransport publishNewTransport() {
            JettyWebSocketRxTransport created = new JettyWebSocketRxTransport();
            connections.onNext(created);
            return created;
        }
    }

    public static class ClientBuilder
    extends Builder<ClientBuilder> {
        private Supplier<SslContextFactory> sslContextFactorySupplier = SslContextFactory.Client::new;
        private Supplier<HttpClient> httpClientSupplier = () -> new HttpClient(this.sslContextFactorySupplier.get());
        private Consumer<ClientUpgradeRequest> requestConfigurator = request -> {};

        public ClientBuilder configureRequest(Consumer<ClientUpgradeRequest> requestConfigurator) {
            this.requestConfigurator = this.requestConfigurator.andThen(requestConfigurator);
            return this;
        }

        public ClientBuilder sslContextFactory(Supplier<SslContextFactory> contextFactorySupplier) {
            this.sslContextFactorySupplier = contextFactorySupplier;
            return this;
        }

        public ClientBuilder sslContextFactory(SslContextFactory contextFactory) {
            return this.sslContextFactory(() -> contextFactory);
        }

        public ClientBuilder httpClient(Supplier<HttpClient> httpClientSupplier) {
            this.httpClientSupplier = httpClientSupplier;
            return this;
        }

        public ClientBuilder httpClient(HttpClient httpClient) {
            return this.httpClient(() -> httpClient);
        }

        public Client build() {
            return new Client(this.httpClientSupplier, (Consumer)this.policyConfigurator.get(), this.requestConfigurator);
        }
    }

    /** Builder for {@link Server}; only carries the policy configuration inherited from {@link Builder}. */
    public static class ServerBuilder
    extends Builder<ServerBuilder> {
        /**
         * Builds the server from the current policy configuration.
         *
         * @return a new {@link Server}
         */
        public Server build() {
            Consumer<WebSocketPolicy> policyConfig = policyConfigurator.get();
            return new Server(policyConfig);
        }
    }

    /**
     * Base builder that accumulates WebSocket policy configuration steps.
     *
     * @param <B> the concrete builder type returned by the fluent methods
     */
    public static class Builder<B extends Builder<B>> {
        protected final AtomicReference<Consumer<WebSocketPolicy>> policyConfigurator = new AtomicReference<Consumer<WebSocketPolicy>>(policy -> {});

        /**
         * Returns this builder typed as the concrete builder.
         *
         * @return this instance cast to {@code B}
         */
        @SuppressWarnings("unchecked")
        protected B self() {
            return (B) this;
        }

        /**
         * Registers a step that sets the policy's idle timeout.
         *
         * @param idleTimeout the idle timeout; converted to milliseconds when the step runs
         * @return this builder
         */
        public B idleTimeout(Duration idleTimeout) {
            Consumer<WebSocketPolicy> applyTimeout = policy -> policy.setIdleTimeout(idleTimeout.toMillis());
            return addPolicyConfig(applyTimeout);
        }

        /**
         * Registers a step that sets the policy's input buffer size.
         *
         * @param bytes the buffer size in bytes
         * @return this builder
         */
        public B inputBufferSize(int bytes) {
            Consumer<WebSocketPolicy> applyInputBuffer = policy -> policy.setInputBufferSize(bytes);
            return addPolicyConfig(applyInputBuffer);
        }

        /**
         * Registers steps that set both the policy's maximum text message
         * buffer size and its maximum text message size to the same value.
         *
         * @param bytes the size in bytes
         * @return this builder
         */
        public B outputBufferSize(int bytes) {
            Consumer<WebSocketPolicy> applyTextBuffer = policy -> policy.setMaxTextMessageBufferSize(bytes);
            Consumer<WebSocketPolicy> applyTextSize = policy -> policy.setMaxTextMessageSize(bytes);
            return addPolicyConfig(applyTextBuffer, applyTextSize);
        }

        /** Appends the given steps, in order, after the steps registered so far. */
        @SafeVarargs
        private final B addPolicyConfig(Consumer<WebSocketPolicy> ... config) {
            policyConfigurator.updateAndGet(current -> {
                Consumer<WebSocketPolicy> combined = current;
                for (Consumer<WebSocketPolicy> step : config) {
                    combined = combined.andThen(step);
                }
                return combined;
            });
            return self();
        }
    }
}

