/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.service.http.impl.service.ws;

import com.mulesoft.service.http.impl.service.ws.WebSocketUtils;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.api.metadata.MediaTypeUtils;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.retry.policy.NoRetryPolicyTemplate;
import org.mule.runtime.core.api.retry.policy.RetryPolicyTemplate;
import org.mule.runtime.http.api.ws.WebSocket;
import org.mule.runtime.http.api.ws.WebSocketBroadcaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GrizzlyWebSocketBroadcaster
implements WebSocketBroadcaster {
    private static final Logger LOGGER = LoggerFactory.getLogger(GrizzlyWebSocketBroadcaster.class);

    /**
     * Broadcasts {@code content} to {@code sockets} without any reconnection support.
     *
     * <p>Delegates to the five-argument overload using a {@link NoRetryPolicyTemplate} and a
     * {@code null} reconnection scheduler.
     *
     * @param sockets       the target sockets
     * @param content       the stream holding the payload to send
     * @param errorCallback invoked with the socket and the cause each time delivery to a socket fails
     * @return the future produced by the five-argument overload
     */
    public CompletableFuture<Void> broadcast(Collection<WebSocket> sockets, TypedValue<InputStream> content, BiConsumer<WebSocket, Throwable> errorCallback) {
        RetryPolicyTemplate noRetry = new NoRetryPolicyTemplate();
        Scheduler noScheduler = null;
        return this.broadcast(sockets, content, errorCallback, noRetry, noScheduler);
    }

    /**
     * Broadcasts {@code content} to {@code sockets}, handing the work to a fresh
     * {@code BroadcastAction} created for this single call.
     *
     * @param sockets               the target sockets
     * @param content               the stream holding the payload to send
     * @param errorCallback         invoked with the socket and the cause each time delivery to a socket fails
     * @param retryPolicyTemplate   policy passed to a socket's {@code reconnect} when it is found disconnected
     * @param reconnectionScheduler scheduler passed to a socket's {@code reconnect}
     * @return the future returned by the action's {@code broadcast()}
     */
    public CompletableFuture<Void> broadcast(Collection<WebSocket> sockets, TypedValue<InputStream> content, BiConsumer<WebSocket, Throwable> errorCallback, RetryPolicyTemplate retryPolicyTemplate, Scheduler reconnectionScheduler) {
        BroadcastAction action = new BroadcastAction(sockets, content, errorCallback, retryPolicyTemplate, reconnectionScheduler);
        return action.broadcast();
    }

    /**
     * A single broadcast of one content stream to a fixed collection of sockets.
     *
     * <p>The stream is read in chunks of at most {@code frameSize} bytes. A one-chunk look-ahead is
     * used: a chunk is only sent once the next read shows whether more data follows, so the chunk
     * sent last can be flagged as the closing fragment. Content that fits in one chunk is sent as a
     * single frame instead.
     *
     * <p>Sockets on which delivery fails are remembered by id and skipped for the remainder of the
     * broadcast.
     */
    private class BroadcastAction {
        private final Collection<WebSocket> sockets;
        private final TypedValue<InputStream> content;
        private final BiConsumer<WebSocket, Throwable> errorCallback;
        /** Ids of the sockets on which delivery already failed. Written from send-completion callbacks. */
        private final Set<String> failedSockets = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        private final FrameFactory frameFactory;
        /** Size, in bytes, of the read and write buffers (and so the maximum payload per frame). */
        private final int frameSize = 8192;
        private final RetryPolicyTemplate retryPolicyTemplate;
        private final Scheduler reconnectionScheduler;
        /** Becomes {@code true} once a non-final fragment was sent, so the last chunk is sent as a closing fragment. */
        private boolean streaming = false;

        /**
         * Captures the broadcast arguments and picks the frame factory: text frames when the
         * content's media type is string representable, binary frames otherwise.
         */
        public BroadcastAction(Collection<WebSocket> sockets, TypedValue<InputStream> content, BiConsumer<WebSocket, Throwable> errorCallback, RetryPolicyTemplate retryPolicyTemplate, Scheduler reconnectionScheduler) {
            this.sockets = sockets;
            this.content = content;
            this.errorCallback = errorCallback;
            this.retryPolicyTemplate = retryPolicyTemplate;
            this.reconnectionScheduler = reconnectionScheduler;
            this.frameFactory = MediaTypeUtils.isStringRepresentable(content.getDataType().getMediaType()) ? new TextFrameFactory() : new BinaryFrameFactory();
        }

        /**
         * Reads the content stream and sends it to every socket.
         *
         * <p>Intermediate fragments are sent while reading. The calling thread then blocks until
         * all of them completed before the last chunk is sent.
         *
         * @return a future tied to the delivery of the last chunk; an already completed future when
         *         there was nothing to send or every socket failed; a future failed with a
         *         {@link MuleRuntimeException} when reading or broadcasting throws
         */
        private CompletableFuture<Void> broadcast() {
            byte[] readBuffer = new byte[this.frameSize];
            byte[] writeBuffer = new byte[this.frameSize];
            // Number of valid bytes in writeBuffer that are still waiting to be sent.
            int write = 0;
            CompletableFuture<Void> composedFuture = null;
            Latch latch = new Latch();
            InputStream value = this.content.getValue();
            try {
                int read;
                while (!this.allSocketsFailed() && (read = value.read(readBuffer, 0, readBuffer.length)) != -1) {
                    if (write > 0) {
                        // More data followed the pending chunk, so it goes out as a non-final fragment.
                        this.streaming = true;
                        int len = write;
                        CompletableFuture<Void> future = this.doBroadcast(this.sockets, writeBuffer, (ws, data) -> this.frameFactory.asFragment(ws, data, 0, len, false)).whenComplete((v, e) -> {
                            if (e != null) {
                                latch.release();
                            }
                        });
                        if (composedFuture == null) {
                            composedFuture = future;
                        } else {
                            composedFuture = composedFuture.thenCompose(ignored -> future);
                        }
                    }
                    System.arraycopy(readBuffer, 0, writeBuffer, 0, read);
                    write = read;
                }
                if (composedFuture != null) {
                    // Wait for every intermediate fragment before deciding how to finish.
                    composedFuture.whenComplete((v, e) -> latch.release());
                    latch.await();
                }
                if (write == 0 || this.allSocketsFailed()) {
                    this.logBroadcastCompleted();
                    return CompletableFuture.completedFuture(null);
                }
                // Trim the last chunk so the frame factories see an array of exactly the payload size.
                byte[] lastChunk = writeBuffer;
                if (write < writeBuffer.length) {
                    lastChunk = new byte[write];
                    System.arraycopy(writeBuffer, 0, lastChunk, 0, write);
                }
                return this.doBroadcast(this.sockets, lastChunk, (ws, data) -> this.streaming ? this.frameFactory.asFragment(ws, data, true) : this.frameFactory.asFrame(ws, data)).whenComplete((v, e) -> this.logBroadcastCompleted());
            }
            catch (Throwable t) {
                if (t instanceof InterruptedException) {
                    // Keep the interruption visible to the caller; it is otherwise lost in the failed future.
                    Thread.currentThread().interrupt();
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Could not broadcast message: " + t.getMessage() + ". Target sockets were: " + this.socketsToString(), t);
                }
                return WebSocketUtils.failedFuture(new MuleRuntimeException(I18nMessageFactory.createStaticMessage("Could not perform broadcast: " + t.getMessage()), t));
            }
        }

        /**
         * Sends one chunk to every socket that is (or can be made) connected.
         *
         * <p>The frame is built once, from the first usable socket, and reused for the others.
         *
         * @param sockets      the target sockets
         * @param data         the chunk to send
         * @param frameFactory turns the chunk into the frame bytes to put on the wire
         * @return a future completed (never exceptionally) once every socket was either skipped,
         *         failed, or finished sending
         */
        private CompletableFuture<Void> doBroadcast(Collection<WebSocket> sockets, byte[] data, BiFunction<WebSocket, byte[], byte[]> frameFactory) {
            CompletableFuture<Void> sink = new CompletableFuture<>();
            int totalSockets = sockets.size();
            if (totalSockets == 0) {
                // Nothing will ever increment the counter, so complete right away instead of hanging.
                sink.complete(null);
                return sink;
            }
            AtomicInteger socketCount = new AtomicInteger(0);
            byte[] frame = null;
            for (WebSocket ws : sockets) {
                try {
                    WebSocket socket = this.assureConnected(ws, this.retryPolicyTemplate.isEnabled());
                    if (socket == null) {
                        this.incrementAndComplete(socketCount, totalSockets, sink);
                        continue;
                    }
                    if (frame == null) {
                        frame = frameFactory.apply(socket, data);
                    }
                    socket.sendFrame(frame).whenComplete((v, e) -> {
                        // Record the failure before counting the socket as done, so that whoever
                        // observes the completed sink also observes an up to date failedSockets.
                        try {
                            if (e != null) {
                                this.handleSocketError(socket, e);
                            }
                        } finally {
                            this.incrementAndComplete(socketCount, totalSockets, sink);
                        }
                    });
                }
                catch (Throwable t) {
                    try {
                        this.handleSocketError(ws, t);
                    } finally {
                        this.incrementAndComplete(socketCount, totalSockets, sink);
                    }
                }
            }
            return sink;
        }

        /** Counts one more processed socket and completes {@code sink} once {@code top} is reached. */
        private void incrementAndComplete(AtomicInteger count, int top, CompletableFuture<Void> sink) {
            if (count.incrementAndGet() >= top) {
                sink.complete(null);
            }
        }

        /** Marks the socket as failed for the rest of this broadcast and notifies the error callback. */
        private void handleSocketError(WebSocket socket, Throwable e) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Found exception while broadcasting to WebSocket. " + e.getMessage() + ". Socket was: " + socket.toString(), e);
            }
            this.failedSockets.add(socket.getId());
            this.errorCallback.accept(socket, e);
        }

        /**
         * Resolves the socket that should actually be written to.
         *
         * @param socket    the candidate socket
         * @param reconnect whether a disconnected, non-closed socket may be reconnected (blocking
         *                  the calling thread until the reconnection finishes)
         * @return {@code socket} when connected, the reconnected socket when reconnection succeeded,
         *         or {@code null} when the socket must be skipped
         */
        private WebSocket assureConnected(WebSocket socket, boolean reconnect) {
            if (this.failedSockets.contains(socket.getId())) {
                return null;
            }
            if (socket.isConnected()) {
                return socket;
            }
            if (reconnect && !socket.isClosed()) {
                if (socket.supportsReconnection()) {
                    try {
                        // reconnect=false on the nested call: a single reconnection attempt per socket.
                        return this.assureConnected(socket.reconnect(this.retryPolicyTemplate, this.reconnectionScheduler).get(), false);
                    }
                    catch (ExecutionException e) {
                        LOGGER.error(String.format("WebSocket '%s' found exception during reconnection. Will skip from broadcast", socket.getId()), e);
                        return null;
                    }
                    catch (InterruptedException e) {
                        LOGGER.error(String.format("WebSocket '%s' got interrupted during reconnection. Will skip from broadcast", socket.getId()), e);
                        // Restore the flag so the interruption is not silently lost.
                        Thread.currentThread().interrupt();
                    }
                } else {
                    LOGGER.info("WebSocket '{}' is not connected and is not reconnectable. Will skip from broadcast", socket.getId());
                }
            } else {
                LOGGER.info("WebSocket '{}' is not connected. Will skip from broadcast", socket.getId());
            }
            return null;
        }

        /** Whether at least as many socket ids failed as there are target sockets (true for an empty target list). */
        private boolean allSocketsFailed() {
            return this.failedSockets.size() >= this.sockets.size();
        }

        /** Logs, at debug level, the outcome of the broadcast along with the recipient list. */
        private void logBroadcastCompleted() {
            if (LOGGER.isDebugEnabled()) {
                String recipients = "Recipient list was: " + this.socketsToString();
                if (this.failedSockets.isEmpty()) {
                    LOGGER.debug("Broadcast successful to all target WebSockets. " + recipients);
                } else {
                    String failed = String.join(", ", this.failedSockets);
                    LOGGER.debug("Broadcast completed, but delivery to the following WebSockets failed: {}. {}", failed, recipients);
                }
            }
        }

        /** Comma separated ids of all the target sockets. */
        private String socketsToString() {
            return this.sockets.stream().map(WebSocket::getId).collect(Collectors.joining(", "));
        }
    }

    /**
     * {@link FrameFactory} producing binary frames through {@code WebSocket.toBinaryFrame}.
     */
    private class BinaryFrameFactory
    implements FrameFactory {
        /** Builds a complete binary frame, i.e. a fragment flagged as last. */
        @Override
        public byte[] asFrame(WebSocket socket, byte[] bytes, int offset, int len) {
            return this.asFragment(socket, bytes, offset, len, true);
        }

        /** Builds a binary fragment out of {@code len} bytes of {@code bytes}. */
        @Override
        public byte[] asFragment(WebSocket socket, byte[] bytes, int offset, int len, boolean last) {
            byte[] payload = this.slice(bytes, offset, len);
            return socket.toBinaryFrame(payload, last);
        }

        /**
         * Returns the requested region of {@code bytes}.
         *
         * <p>When {@code len} equals the array length the array itself is returned, without
         * copying and without consulting {@code offset}; otherwise a fresh copy of the region is made.
         */
        private byte[] slice(byte[] bytes, int offset, int len) {
            if (bytes.length != len) {
                byte[] region = new byte[len];
                System.arraycopy(bytes, offset, region, 0, len);
                return region;
            }
            return bytes;
        }
    }

    /**
     * {@link FrameFactory} producing text frames through {@code WebSocket.toTextFrame}.
     *
     * <p>NOTE(review): the bytes are decoded with {@code new String(byte[], int, int)}, which uses
     * the platform default charset rather than the charset of the content being broadcast, and a
     * multi-byte character split across two chunks would not decode correctly. Confirm whether the
     * content charset should be passed in.
     */
    private class TextFrameFactory
    implements FrameFactory {
        /** Builds a complete text frame, i.e. a fragment flagged as last. */
        @Override
        public byte[] asFrame(WebSocket socket, byte[] bytes, int offset, int len) {
            return this.asFragment(socket, bytes, offset, len, true);
        }

        /** Builds a text fragment out of {@code len} bytes of {@code bytes}, starting at {@code offset}. */
        @Override
        public byte[] asFragment(WebSocket socket, byte[] bytes, int offset, int len, boolean last) {
            String text = new String(bytes, offset, len);
            return socket.toTextFrame(text, last);
        }
    }

    private static interface FrameFactory {
        default public byte[] asFrame(WebSocket socket, byte[] bytes) {
            return this.asFrame(socket, bytes, 0, bytes.length);
        }

        default public byte[] asFragment(WebSocket socket, byte[] bytes, boolean last) {
            return this.asFragment(socket, bytes, 0, bytes.length, last);
        }

        public byte[] asFrame(WebSocket var1, byte[] var2, int var3, int var4);

        public byte[] asFragment(WebSocket var1, byte[] var2, int var3, int var4, boolean var5);
    }
}

