/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.service.http.impl.netty;

import com.mulesoft.service.http.impl.service.ws.WebSocketUtils;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.api.metadata.MediaTypeUtils;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.retry.policy.RetryPolicyTemplate;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.http.api.ws.WebSocket;
import org.mule.runtime.http.api.ws.WebSocketBroadcaster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link WebSocketBroadcaster} that reads a payload stream in fixed-size chunks and sends every chunk,
 * as one frame, to each of the target {@link WebSocket}s.
 *
 * <p>The stream is consumed on the calling thread, and each fragment is awaited before the next one is
 * read, so the returned future is already completed (successfully or exceptionally) when
 * {@code broadcast} returns.
 */
public class NettyWebSocketBroadcaster
implements WebSocketBroadcaster {
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyWebSocketBroadcaster.class);

    /**
     * Broadcasts {@code content} to {@code sockets} without attempting any reconnection.
     *
     * @param sockets       the target sockets
     * @param content       the payload to stream
     * @param errorCallback invoked with the socket and the failure each time delivery to a socket fails
     * @return a future that is completed by the time this method returns
     */
    public CompletableFuture<Void> broadcast(Collection<WebSocket> sockets, TypedValue<InputStream> content, BiConsumer<WebSocket, Throwable> errorCallback) {
        return this.broadcast(sockets, content, errorCallback, RetryPolicyTemplate.NO_RETRY_POLICY, null);
    }

    /**
     * Broadcasts {@code content} to {@code sockets}, reconnecting disconnected sockets when
     * {@code retryPolicyTemplate} is enabled and the socket supports it.
     *
     * @param sockets               the target sockets
     * @param content               the payload to stream
     * @param errorCallback         invoked with the socket and the failure each time delivery to a socket fails
     * @param retryPolicyTemplate   the policy handed to {@link WebSocket#reconnect}
     * @param reconnectionScheduler the scheduler handed to {@link WebSocket#reconnect}
     * @return a future that is completed by the time this method returns
     */
    public CompletableFuture<Void> broadcast(Collection<WebSocket> sockets, TypedValue<InputStream> content, BiConsumer<WebSocket, Throwable> errorCallback, RetryPolicyTemplate retryPolicyTemplate, Scheduler reconnectionScheduler) {
        return new BroadcastAction(sockets, content, errorCallback, retryPolicyTemplate, reconnectionScheduler).broadcast();
    }

    /**
     * Variant accepting the {@code org.mule.runtime.core.api} retry policy type; delegates to the
     * overload that takes the {@code org.mule.runtime.api} type.
     */
    public CompletableFuture<Void> broadcast(Collection<WebSocket> sockets, TypedValue<InputStream> content, BiConsumer<WebSocket, Throwable> errorCallback, org.mule.runtime.core.api.retry.policy.RetryPolicyTemplate retryPolicyTemplate, Scheduler reconnectionScheduler) {
        // The cast is what selects the API-typed overload; without it this call would resolve to this same method.
        return this.broadcast(sockets, content, errorCallback, (RetryPolicyTemplate) retryPolicyTemplate, reconnectionScheduler);
    }

    /**
     * State of a single broadcast: the recipients, the payload, and the ids of the sockets that have
     * already failed and must be skipped for the remaining fragments.
     */
    private static class BroadcastAction {
        /** Number of bytes read from the payload stream per fragment. */
        private static final int CHUNK_SIZE = 8192;

        private final Collection<WebSocket> sockets;
        private final TypedValue<InputStream> content;
        private final BiConsumer<WebSocket, Throwable> errorCallback;
        // Concurrent set: it is written from send-completion callbacks and read from the broadcasting thread.
        private final Set<String> failedSockets = ConcurrentHashMap.newKeySet();
        private final RetryPolicyTemplate retryPolicyTemplate;
        private final Scheduler reconnectionScheduler;
        private final boolean isText;
        private final Charset textCharset;

        public BroadcastAction(Collection<WebSocket> sockets, TypedValue<InputStream> content, BiConsumer<WebSocket, Throwable> errorCallback, RetryPolicyTemplate retryPolicyTemplate, Scheduler reconnectionScheduler) {
            this.sockets = sockets;
            this.content = content;
            this.errorCallback = errorCallback;
            this.retryPolicyTemplate = retryPolicyTemplate;
            this.reconnectionScheduler = reconnectionScheduler;
            MediaType mediaType = content.getDataType().getMediaType();
            this.isText = MediaTypeUtils.isStringRepresentable(mediaType);
            // Decode text with the charset declared by the payload; fall back to the platform default
            // (the previous behaviour) when the media type does not declare one.
            this.textCharset = mediaType.getCharset().orElseGet(Charset::defaultCharset);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Creating broadcast action. Sockets: [{}]; isText={}", this.socketsToString(), this.isText);
            }
        }

        /**
         * Streams the payload to the sockets chunk by chunk.
         *
         * <p>A chunk is only sent once the following read is known, so that the final chunk can be
         * flagged as the last fragment. The loop stops early once every socket has failed.
         *
         * @return an already completed future; it is completed exceptionally (wrapping the cause in a
         *         {@link MuleRuntimeException}) if reading or sending throws
         */
        private CompletableFuture<Void> broadcast() {
            byte[] readBuffer = new byte[CHUNK_SIZE];
            byte[] previousChunk = new byte[CHUNK_SIZE];
            int previousChunkLength = 0;
            boolean isEndOfStreamReached = false;
            InputStream value = this.content.getValue();
            try {
                while (!this.allSocketsFailed() && !isEndOfStreamReached) {
                    int readLen = value.read(readBuffer, 0, readBuffer.length);
                    isEndOfStreamReached = readLen <= 0;
                    if (previousChunkLength > 0) {
                        // Wait for every socket to finish this fragment before the buffer is reused.
                        this.broadcastFragment(this.shrinkIfNeeded(previousChunk, previousChunkLength), previousChunkLength, isEndOfStreamReached).get();
                    }
                    if (!isEndOfStreamReached) {
                        System.arraycopy(readBuffer, 0, previousChunk, 0, readLen);
                        previousChunkLength = readLen;
                    }
                }
                this.logBroadcastCompleted();
                return CompletableFuture.completedFuture(null);
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Could not broadcast message: " + e.getMessage() + ". Target sockets were: " + this.socketsToString(), e);
                }
                return WebSocketUtils.failedFuture(new MuleRuntimeException(I18nMessageFactory.createStaticMessage("Could not perform broadcast: " + e.getMessage()), e));
            }
        }

        /**
         * Sends one fragment to every socket that is still eligible.
         *
         * @param buf    the fragment bytes, already trimmed to {@code len}
         * @param len    number of valid bytes in {@code buf}
         * @param isLast whether this is the final fragment of the message
         * @return a future completed once every socket has either finished sending, been skipped or failed
         */
        private CompletableFuture<Void> broadcastFragment(byte[] buf, int len, boolean isLast) {
            int totalSockets = this.sockets.size();
            AtomicInteger finishedSocketsCount = new AtomicInteger(0);
            CompletableFuture<Void> allSocketsFinishedFuture = new CompletableFuture<Void>();
            if (totalSockets == 0) {
                // Nothing will ever bump the counter, so complete right away instead of never completing.
                allSocketsFinishedFuture.complete(null);
                return allSocketsFinishedFuture;
            }
            for (WebSocket ws : this.sockets) {
                try {
                    WebSocket socket = this.assureConnected(ws, this.retryPolicyTemplate.isEnabled());
                    if (socket == null) {
                        this.incrementAndComplete(finishedSocketsCount, totalSockets, allSocketsFinishedFuture);
                        continue;
                    }
                    // NOTE(review): each chunk is decoded on its own, so a multi-byte character that straddles
                    // a chunk boundary would not be decoded correctly -- confirm whether payloads can hit this.
                    byte[] frameData = this.isText ? socket.toTextFrame(new String(buf, 0, len, this.textCharset), isLast) : socket.toBinaryFrame(buf, isLast);
                    socket.sendFrame(frameData).whenComplete((v, e) -> {
                        try {
                            if (e != null) {
                                this.handleSocketError(socket, e);
                            }
                        }
                        finally {
                            // Count the socket only after its failure has been recorded: completing the aggregate
                            // future releases the broadcasting thread, which reads failedSockets right away.
                            this.incrementAndComplete(finishedSocketsCount, totalSockets, allSocketsFinishedFuture);
                        }
                    });
                }
                catch (Throwable t) {
                    try {
                        this.handleSocketError(ws, t);
                    }
                    finally {
                        this.incrementAndComplete(finishedSocketsCount, totalSockets, allSocketsFinishedFuture);
                    }
                }
            }
            return allSocketsFinishedFuture;
        }

        /**
         * Returns {@code buf} itself when it is exactly {@code len} bytes long, otherwise a copy of its
         * first {@code len} bytes.
         */
        private byte[] shrinkIfNeeded(byte[] buf, int len) {
            if (buf.length == len) {
                return buf;
            }
            byte[] newBuf = new byte[len];
            System.arraycopy(buf, 0, newBuf, 0, len);
            return newBuf;
        }

        /** Increments {@code count} and completes {@code sink} once it reaches {@code top}. */
        private void incrementAndComplete(AtomicInteger count, int top, CompletableFuture<Void> sink) {
            if (count.incrementAndGet() >= top) {
                sink.complete(null);
            }
        }

        /** Marks {@code socket} as failed for the rest of the broadcast and notifies the error callback. */
        private void handleSocketError(WebSocket socket, Throwable e) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Found exception while broadcasting to WebSocket. " + e.getMessage() + ". Socket was: " + socket.toString(), e);
            }
            this.failedSockets.add(socket.getId());
            this.errorCallback.accept(socket, e);
        }

        /** @return {@code true} when at least as many socket ids have failed as there are target sockets */
        private boolean allSocketsFailed() {
            return this.failedSockets.size() >= this.sockets.size();
        }

        /** Logs, at debug level, the outcome of the broadcast and the recipient list. */
        private void logBroadcastCompleted() {
            if (!LOGGER.isDebugEnabled()) {
                return;
            }
            String recipients = "Recipient list was: " + this.socketsToString();
            if (this.failedSockets.isEmpty()) {
                LOGGER.debug("Broadcast successful to all target WebSockets. {}", recipients);
            } else {
                String failed = String.join(", ", this.failedSockets);
                LOGGER.debug("Broadcast completed, but delivery to the following WebSockets failed: {}. {}", failed, recipients);
            }
        }

        /** @return the ids of all target sockets, comma separated */
        private String socketsToString() {
            return this.sockets.stream().map(WebSocket::getId).collect(Collectors.joining(", "));
        }

        /**
         * Resolves the socket a fragment should be sent through.
         *
         * @param socket    the candidate socket
         * @param reconnect whether a reconnection may be attempted if the socket is not connected
         * @return the connected socket to use, or {@code null} if this socket must be skipped (it already
         *         failed, is closed, cannot reconnect, or the reconnection attempt did not succeed)
         */
        private WebSocket assureConnected(WebSocket socket, boolean reconnect) {
            if (this.failedSockets.contains(socket.getId())) {
                return null;
            }
            if (socket.isConnected()) {
                return socket;
            }
            if (!reconnect || socket.isClosed()) {
                LOGGER.info("WebSocket '{}' is closed. Will skip from broadcast", socket.getId());
                return null;
            }
            if (!socket.supportsReconnection()) {
                LOGGER.info("WebSocket '{}' is not connected and does not support reconnections. Will skip from broadcast", socket.getId());
                return null;
            }
            try {
                // Re-check the reconnected socket, but never chain a second reconnection attempt.
                return this.assureConnected(socket.reconnect(this.retryPolicyTemplate, this.reconnectionScheduler).get(), false);
            }
            catch (ExecutionException e) {
                LOGGER.error(String.format("WebSocket '%s' found exception during reconnection. Will skip from broadcast", socket.getId()), e);
            }
            catch (InterruptedException e) {
                LOGGER.error(String.format("WebSocket '%s' got interrupted during reconnection. Will skip from broadcast", socket.getId()), e);
                Thread.currentThread().interrupt();
            }
            return null;
        }
    }
}

