/*
 * Decompiled with CFR 0.152.
 */
package com.slimgears.rxrpc.jettyhttp;

import com.slimgears.rxrpc.core.RxTransport;
import com.slimgears.rxrpc.core.util.Emitters;
import com.slimgears.rxrpc.jettyhttp.JettyHttpAttributes;
import com.slimgears.util.rx.Completables;
import io.reactivex.Completable;
import io.reactivex.Emitter;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JettyHttpRxTransportClient
implements RxTransport {
    private static final Logger log = LoggerFactory.getLogger(JettyHttpRxTransportClient.class);
    private static final Duration pollingPeriod = JettyHttpAttributes.ClientPollingPeriod;
    private final Subject<String> incoming = BehaviorSubject.create();
    private final Subject<String> outgoingSubject = BehaviorSubject.create();
    private final Emitter<String> outgoingEmitter = Emitters.fromObserver(this.outgoingSubject);
    private final Disposable outgoingSubscription;
    private final Disposable pollingSubscription;
    private final HttpClient httpClient;
    private final URI uri;
    private final String clientId;

    public JettyHttpRxTransportClient(HttpClient httpClient, URI uri, String clientId) {
        this.httpClient = httpClient;
        this.uri = uri;
        this.clientId = clientId;
        this.outgoingSubscription = this.outgoingSubject.subscribe(this::sendMessage, arg_0 -> this.incoming.onError(arg_0));
        this.pollingSubscription = Observable.interval((long)pollingPeriod.toMillis(), (TimeUnit)TimeUnit.MILLISECONDS).flatMapCompletable(i -> this.poll()).compose(Completables.backOffDelayRetry(e -> true, (Duration)JettyHttpAttributes.ClientPollingRetryInitialDelay, (int)10)).subscribe(() -> this.incoming.onComplete(), arg_0 -> this.incoming.onError(arg_0));
    }

    private void sendMessage(String message) {
        this.httpClient.POST(URI.create(this.uri + "/message")).header("X-RPC-CLIENT-ID", this.clientId).content(new StringContentProvider(message), "text/plain").send(result -> {
            if (result.isFailed()) {
                this.outgoingEmitter.onError(result.getFailure());
            }
        });
    }

    public Emitter<String> outgoing() {
        return this.outgoingEmitter;
    }

    public Subject<String> incoming() {
        return this.incoming;
    }

    public void close() {
        this.outgoingSubscription.dispose();
        this.pollingSubscription.dispose();
        this.incoming().onComplete();
    }

    private Completable poll() {
        return Completable.create(emitter -> this.httpClient.POST(URI.create(this.uri + "/polling")).header("X-RPC-CLIENT-ID", this.clientId).onResponseContent((response, content) -> Arrays.stream(StandardCharsets.UTF_8.decode(content).toString().split("\n")).forEach(arg_0 -> this.incoming.onNext(arg_0))).send(result -> {
            if (result.isSucceeded()) {
                emitter.onComplete();
            } else {
                emitter.onError(result.getFailure());
            }
        }));
    }

    public static Builder builder() {
        return new Builder();
    }

    public static class Client
    implements RxTransport.Client {
        private final HttpClient httpClient;

        public Client(SslContextFactory sslContextFactory) {
            this.httpClient = new HttpClient(sslContextFactory);
        }

        public Single<RxTransport> connect(URI uri) {
            return Single.create(emitter -> {
                try {
                    this.httpClient.start();
                    this.httpClient.POST(URI.create(uri + "/connect")).onResponseHeader((response, field) -> {
                        if (field.getName().equals("X-RPC-CLIENT-ID")) {
                            String clientId = field.getValue();
                            emitter.onSuccess((Object)new JettyHttpRxTransportClient(this.httpClient, uri, clientId));
                        }
                        return true;
                    }).send(result -> {
                        if (result.isFailed()) {
                            log.error("Could not connect to: " + uri, result.getFailure());
                            emitter.onError(result.getFailure());
                        }
                    });
                }
                catch (Exception e) {
                    log.error("Could not start the httpClient", (Throwable)e);
                    emitter.onError((Throwable)e);
                }
            });
        }
    }

    public static class Builder {
        public Client buildClient(SslContextFactory contextFactory) {
            return new Client(contextFactory);
        }

        public Client buildClient() {
            return this.buildClient(new SslContextFactory.Client());
        }
    }
}

