/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.connectors.ws.internal.connection;

import com.mulesoft.connectors.ws.internal.connection.FluxCapacitor;
import com.mulesoft.connectors.ws.internal.util.SynchronizedWebSocketDecorator;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.retry.policy.RetryPolicyTemplate;
import org.mule.runtime.core.api.util.func.CheckedSupplier;
import org.mule.runtime.http.api.ws.WebSocket;
import org.mule.runtime.http.api.ws.WebSocketCloseCode;

/**
 * {@link SynchronizedWebSocketDecorator} that coordinates outgoing traffic with
 * reconnection: {@link #send} and {@link #sendFrame} wait, while holding the socket
 * lock, until no reconnection is in flight, and concurrent {@link #reconnect} calls
 * share a single in-flight reconnection future.
 */
class ReconnectableWebSocketDecorator
extends SynchronizedWebSocketDecorator {
    private final FluxCapacitor fluxCapacitor;
    /** Future of the reconnection currently in flight, or {@code null} when there is none. Accessed under {@code socketLock}. */
    private final AtomicReference<CompletableFuture<WebSocket>> ongoingReconnection = new AtomicReference<>();
    /** Signalled (under {@code socketLock}) every time an in-flight reconnection ends, successfully or not. */
    private final Condition reconnectionEnded = this.socketLock.newCondition();

    /**
     * @param delegate      the socket being decorated
     * @param fluxCapacitor notified through {@code notifyClosed} when a reconnection is
     *                      requested on a delegate that does not support reconnection
     */
    public ReconnectableWebSocketDecorator(WebSocket delegate, FluxCapacitor fluxCapacitor) {
        super(delegate);
        this.fluxCapacitor = fluxCapacitor;
    }

    /**
     * Sends the content through the superclass implementation once no reconnection is in flight.
     */
    @Override
    public CompletableFuture<Void> send(InputStream content, MediaType mediaType) {
        return this.reconnectionSafe(() -> super.send(content, mediaType));
    }

    /**
     * Sends the frame through the superclass implementation once no reconnection is in flight.
     */
    @Override
    public CompletableFuture<Void> sendFrame(byte[] frameBytes) {
        return this.reconnectionSafe(() -> super.sendFrame(frameBytes));
    }

    /**
     * Reconnects the delegate socket.
     * <p>
     * If the delegate does not support reconnection, the flux capacitor is notified of a
     * {@code PROTOCOL_ERROR} close and the call is forwarded to the superclass. Otherwise,
     * if a reconnection is already in flight its future is returned; if not, a new
     * reconnection is started on the delegate. When that reconnection completes with a
     * non-null socket, the socket becomes the new delegate. In every case waiting senders
     * are woken up before the returned future is completed.
     *
     * @return a future completed with this decorator on success, or exceptionally with
     *         the delegate's failure
     */
    @Override
    public CompletableFuture<WebSocket> reconnect(RetryPolicyTemplate retryPolicyTemplate, Scheduler scheduler) {
        if (!this.delegate.supportsReconnection()) {
            this.fluxCapacitor.notifyClosed(this.delegate, WebSocketCloseCode.PROTOCOL_ERROR, "", true);
            return super.reconnect(retryPolicyTemplate, scheduler);
        }

        final CompletableFuture<WebSocket> reconnection;
        this.socketLock.lock();
        try {
            CompletableFuture<WebSocket> inFlight = this.ongoingReconnection.get();
            if (inFlight != null) {
                // Somebody else is already reconnecting: piggyback on that attempt.
                return inFlight;
            }
            reconnection = new CompletableFuture<>();
            this.ongoingReconnection.set(reconnection);
        }
        finally {
            this.socketLock.unlock();
        }

        try {
            this.delegate.reconnect(retryPolicyTemplate, scheduler).whenComplete((newSocket, failure) -> {
                this.endReconnection(newSocket);
                // Complete outside the lock so dependent stages never run while it is held.
                if (failure != null) {
                    reconnection.completeExceptionally(failure);
                } else {
                    reconnection.complete(this);
                }
            });
        }
        catch (RuntimeException startFailure) {
            // The reconnection never started (the delegate threw synchronously or returned null).
            // Without this cleanup the in-flight marker would stay set forever, blocking every
            // later send and handing out a future that never completes.
            this.endReconnection(null);
            reconnection.completeExceptionally(startFailure);
            throw startFailure;
        }
        return reconnection;
    }

    /**
     * Marks the in-flight reconnection as finished and wakes up every thread waiting on it.
     *
     * @param newSocket the socket produced by the reconnection, or {@code null} to keep the current delegate
     */
    private void endReconnection(WebSocket newSocket) {
        this.socketLock.lock();
        try {
            if (newSocket != null) {
                this.delegate = newSocket;
            }
            this.ongoingReconnection.set(null);
            this.reconnectionEnded.signalAll();
        }
        finally {
            this.socketLock.unlock();
        }
    }

    /**
     * Blocks until no reconnection is in flight. Must be called with {@code socketLock} held,
     * since it awaits a condition created from that lock.
     *
     * @throws IllegalStateException if the thread is interrupted while waiting; the thread's
     *                               interrupt status is restored before throwing
     */
    private void awaitReconnection() {
        try {
            // Loop to guard against spurious wakeups.
            while (this.ongoingReconnection.get() != null) {
                this.reconnectionEnded.await();
            }
        }
        catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(String.format("Socket '%s' Interrupted while waiting for reconnection to finish", this.getId()), e);
        }
    }

    /**
     * Runs {@code action} while holding the socket lock, after waiting for any in-flight
     * reconnection to end.
     *
     * @param action the operation to execute
     * @return whatever {@code action} returns
     */
    private <T> T reconnectionSafe(CheckedSupplier<T> action) {
        this.socketLock.lock();
        try {
            this.awaitReconnection();
            return action.get();
        }
        finally {
            this.socketLock.unlock();
        }
    }
}

