/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.service.http.impl.functional.ws.netty;

import com.mulesoft.service.http.impl.functional.ws.netty.AbstractNettyWebSocketTestCase;
import io.qameta.allure.Issue;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.retry.policy.RetryPolicyTemplate;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.retry.policy.SimpleRetryPolicyTemplate;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.core.api.util.func.CheckedConsumer;
import org.mule.runtime.http.api.client.HttpRequestOptions;
import org.mule.runtime.http.api.client.ws.WebSocketCallback;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.server.ws.WebSocketConnectionHandler;
import org.mule.runtime.http.api.server.ws.WebSocketHandler;
import org.mule.runtime.http.api.server.ws.WebSocketMessageHandler;
import org.mule.runtime.http.api.server.ws.WebSocketRequest;
import org.mule.runtime.http.api.ws.WebSocket;
import org.mule.runtime.http.api.ws.WebSocketCloseCode;
import org.mule.tck.SimpleUnitTestSupportCustomScheduler;
import org.mule.tck.probe.PollingProber;

/**
 * Functional tests for the broadcaster returned by {@code service.newWebSocketBroadcaster()}.
 *
 * <p>Before each test a {@link SaveMessagesWebSocketHandler} is registered on the server under
 * {@link #PATH} and {@link #CLIENT_COUNT} client sockets are opened against it. The server side
 * stores the last message received per socket id in {@link #messages}; the tests then broadcast a
 * payload to the client sockets and verify what the server recorded.
 *
 * <p>Socket lists are filled from connection callbacks and read by the test thread, so they are
 * kept in synchronized collections and iterated through snapshots.
 */
@Issue(value="W-16264887")
public class NettyWebSocketBroadcasterTestCase
extends AbstractNettyWebSocketTestCase {
    private static final String PATH = "/quotes";
    private static final String SHORT_TEXT_MESSAGE = "Hello There!";
    private static final int TIMEOUT_MILLIS = 15000;
    private static final int POLL_DELAY_MILLIS = 1000;
    private static final int SHORT_BINARY_MESSAGE_SIZE = 30;
    private static final int LARGE_MESSAGE_SIZE = 24676;
    private static final int CLIENT_COUNT = 4;

    // Appended to from connection callbacks while the test thread reads them.
    private final List<WebSocket> serverSockets = Collections.synchronizedList(new ArrayList<WebSocket>(CLIENT_COUNT));
    private final List<WebSocket> clientSockets = Collections.synchronizedList(new ArrayList<WebSocket>(CLIENT_COUNT));
    // Last message received by the server, keyed by server-side socket id.
    private final Map<String, TypedValue<InputStream>> messages = new ConcurrentHashMap<>();
    // Errors reported through the broadcast error callback, keyed by socket id.
    private final Map<String, List<Throwable>> exceptions = new ConcurrentHashMap<>();
    private final AtomicInteger serverSocketCounter = new AtomicInteger(0);
    private final AtomicInteger clientSocketCounter = new AtomicInteger(0);
    private final AtomicInteger closedClientSocketCounter = new AtomicInteger(0);
    private final long idleSocketTimeout = 15000L;
    private BiConsumer<WebSocket, Throwable> errorCallback;
    private RetryPolicyTemplate retryPolicyTemplate = RetryPolicyTemplate.NO_RETRY_POLICY;
    private final SimpleUnitTestSupportCustomScheduler scheduler = new SimpleUnitTestSupportCustomScheduler(1, Thread::new, new ThreadPoolExecutor.AbortPolicy());

    /**
     * Registers the recording handler on the server and opens {@link #CLIENT_COUNT} client sockets.
     *
     * @throws Exception if a client socket cannot be opened within the polling timeout
     */
    @Before
    public void before() throws Exception {
        // The per-socket list is synchronized because add() runs outside computeIfAbsent's atomic section.
        this.errorCallback = (ws, t) -> this.exceptions
            .computeIfAbsent(ws.getId(), k -> Collections.synchronizedList(new LinkedList<Throwable>()))
            .add(t);
        this.handlerManager = this.muleHttpServer.addWebSocketHandler(
            new SaveMessagesWebSocketHandler(this.serverSocketCounter, this.serverSockets, this.messages, this.idleSocketTimeout));
        this.handlerManager.start();
        for (int i = 0; i < CLIENT_COUNT; ++i) {
            this.connect(new SaveClientsWebSocketCallback(this.closedClientSocketCounter, this.clientSockets));
        }
        MatcherAssert.assertThat(this.clientSockets, Matchers.hasSize(CLIENT_COUNT));
        MatcherAssert.assertThat(this.serverSockets, Matchers.hasSize(CLIENT_COUNT));
    }

    /** Stops the scheduler handed to the broadcaster for reconnection attempts. */
    @After
    public void after() {
        this.scheduler.stop();
    }

    /** Broadcasting to no sockets must deliver nothing and never invoke the error callback. */
    @Test
    public void broadcastToEmptyCollection() {
        @SuppressWarnings("unchecked")
        BiConsumer<WebSocket, Throwable> mockErrorCallback = Mockito.mock(BiConsumer.class);
        this.service.newWebSocketBroadcaster().broadcast(
            Collections.<WebSocket>emptyList(),
            new TypedValue<InputStream>(null, DataType.INPUT_STREAM),
            mockErrorCallback);
        // Holds for the whole timeout: the server must not record any message.
        PollingProber.checkNot(TIMEOUT_MILLIS, POLL_DELAY_MILLIS, () -> !this.messages.isEmpty());
        Mockito.verifyNoInteractions(mockErrorCallback);
    }

    @Test
    public void broadcastShortTextMessage() throws Exception {
        this.assertTextBroadcast(SHORT_TEXT_MESSAGE);
    }

    @Test
    public void broadcastLargeTextMessage() throws Exception {
        this.assertTextBroadcast(RandomStringUtils.randomAlphanumeric(LARGE_MESSAGE_SIZE));
    }

    @Test
    public void broadcastShortBinaryMessage() throws Exception {
        this.assertBinaryBroadcast(RandomUtils.nextBytes(SHORT_BINARY_MESSAGE_SIZE));
    }

    @Test
    public void broadcastLargeBinaryMessage() throws Exception {
        this.assertBinaryBroadcast(RandomUtils.nextBytes(LARGE_MESSAGE_SIZE));
    }

    /**
     * Waits until at least one client socket has reported {@code onClose}, then broadcasts using a
     * {@link SimpleRetryPolicyTemplate} so the broadcaster is exercised against closed sockets.
     */
    @Test
    public void broadcastToReconnectableSockets() throws Exception {
        // Wait one second past the handler's idle timeout; presumably that timeout is what closes the sockets.
        PollingProber.check(this.idleSocketTimeout + 1000L, 5000L, () -> this.closedClientSocketCounter.get() > 0);
        this.retryPolicyTemplate = new SimpleRetryPolicyTemplate(1000L, 2);
        this.assertTextBroadcast(SHORT_TEXT_MESSAGE);
    }

    /**
     * Broadcasts {@code text} as {@link DataType#TEXT_STRING} content and checks that the server
     * received it as an {@link InputStream} carrying the same media type.
     */
    private void assertTextBroadcast(String text) throws Exception {
        TypedValue<InputStream> content = new TypedValue<InputStream>(new ByteArrayInputStream(text.getBytes()), DataType.TEXT_STRING);
        this.assertBroadcast(content, message -> {
            MatcherAssert.assertThat(message.getDataType().getType(), Matchers.equalTo(InputStream.class));
            MatcherAssert.assertThat(message.getDataType().getMediaType(), Matchers.equalTo(DataType.TEXT_STRING.getMediaType()));
        });
    }

    /**
     * Broadcasts {@code data} as {@link DataType#INPUT_STREAM} content and checks that the server
     * received it with the {@link MediaType#BINARY} media type.
     */
    private void assertBinaryBroadcast(byte[] data) throws Exception {
        TypedValue<InputStream> content = new TypedValue<InputStream>(new ByteArrayInputStream(data), DataType.INPUT_STREAM);
        this.assertBroadcast(content, message ->
            MatcherAssert.assertThat(message.getDataType().getMediaType(), Matchers.equalTo(MediaType.BINARY)));
    }

    /**
     * Broadcasts {@code content} to every client socket, waits until the server has recorded one
     * message per client, and verifies the payload received on each still-connected server socket.
     *
     * @param content            the payload to broadcast; its stream must support mark/reset
     * @param mediaTypeAssertion extra assertion run against every received message
     * @throws Exception if the messages do not arrive in time or a payload cannot be read
     */
    private void assertBroadcast(TypedValue<InputStream> content, CheckedConsumer<TypedValue<InputStream>> mediaTypeAssertion) throws Exception {
        InputStream payload = content.getValue();
        // Remember the start so the expected bytes can be re-read after the broadcast consumed the stream.
        payload.mark(Integer.MAX_VALUE);
        this.service.newWebSocketBroadcaster().broadcast(
            new ArrayList<WebSocket>(this.clientSockets), content, this.errorCallback, this.retryPolicyTemplate, this.scheduler);
        PollingProber.probe(TIMEOUT_MILLIS, POLL_DELAY_MILLIS, () -> this.messages.size() == CLIENT_COUNT);

        // The expected bytes are the same for every socket, so read them once.
        payload.reset();
        byte[] expected = IOUtils.toByteArray(payload);

        // Iterate a snapshot: connection callbacks may still append to the live list.
        for (WebSocket socket : new ArrayList<WebSocket>(this.serverSockets)) {
            if (!socket.isConnected()) {
                continue;
            }
            TypedValue<InputStream> message = this.messages.get(socket.getId());
            MatcherAssert.assertThat(message, Matchers.is(Matchers.notNullValue()));
            mediaTypeAssertion.accept(message);
            MatcherAssert.assertThat(IOUtils.toByteArray(message.getValue()), Matchers.equalTo(expected));
        }
    }

    /**
     * Opens a client WebSocket against the test server, retrying until it succeeds or the polling
     * timeout elapses. Every attempt uses a fresh socket id taken from {@link #clientSocketCounter}.
     */
    private void connect(WebSocketCallback callback) throws Exception {
        String uri = String.format("ws://localhost:%d%s", this.mulePort.getNumber(), PATH);
        PollingProber.probe(TIMEOUT_MILLIS, POLL_DELAY_MILLIS, () -> {
            HttpRequest request = HttpRequest.builder().uri(uri).method("GET").build();
            HttpRequestOptions options = HttpRequestOptions.builder()
                .responseTimeout(3000)
                .followsRedirect(true)
                .authentication(null)
                .build();
            String socketId = String.valueOf(this.clientSocketCounter.incrementAndGet());
            // get() blocks until the socket is open and throws if the attempt failed.
            this.muleHttpClient.openWebSocket(request, options, socketId, callback).get();
            return true;
        });
    }

    /** Server-side handler that records connected sockets and the last message received per socket id. */
    private static class SaveMessagesWebSocketHandler
    implements WebSocketHandler {
        private final AtomicInteger serverSocketCounter;
        private final List<WebSocket> serverSockets;
        private final Map<String, TypedValue<InputStream>> messages;
        private final long idleSocketTimeout;

        SaveMessagesWebSocketHandler(AtomicInteger serverSocketCounter, List<WebSocket> serverSockets, Map<String, TypedValue<InputStream>> messages, long idleSocketTimeout) {
            this.serverSocketCounter = serverSocketCounter;
            this.serverSockets = serverSockets;
            this.messages = messages;
            this.idleSocketTimeout = idleSocketTimeout;
        }

        @Override
        public String getPath() {
            return NettyWebSocketBroadcasterTestCase.PATH;
        }

        @Override
        public WebSocketConnectionHandler getConnectionHandler() {
            return new SaveSocketsConnectionHandler(this.serverSocketCounter, this.serverSockets);
        }

        @Override
        public WebSocketMessageHandler getMessageHandler() {
            return message -> this.messages.put(message.getSocket().getId(), message.getContent());
        }

        @Override
        public long getIdleSocketTimeoutMills() {
            return this.idleSocketTimeout;
        }
    }

    /** Client-side callback that records every connected socket and counts closed ones. */
    private static class SaveClientsWebSocketCallback
    implements WebSocketCallback {
        private final AtomicInteger closedClientSocketCounter;
        private final List<WebSocket> clientSockets;

        public SaveClientsWebSocketCallback(AtomicInteger closedClientSocketCounter, List<WebSocket> clientSockets) {
            this.closedClientSocketCounter = closedClientSocketCounter;
            this.clientSockets = clientSockets;
        }

        @Override
        public void onConnect(WebSocket webSocket) {
            this.clientSockets.add(webSocket);
        }

        @Override
        public void onClose(WebSocket webSocket, WebSocketCloseCode code, String reason) {
            this.closedClientSocketCounter.incrementAndGet();
        }

        @Override
        public void onMessage(WebSocket webSocket, TypedValue<InputStream> content) {
            // Messages sent to the clients are irrelevant for these tests.
        }
    }

    /** Server-side connection handler that assigns sequential socket ids and records each connected socket. */
    private static class SaveSocketsConnectionHandler
    implements WebSocketConnectionHandler {
        private final AtomicInteger serverSocketCounter;
        private final List<WebSocket> serverSockets;

        SaveSocketsConnectionHandler(AtomicInteger serverSocketCounter, List<WebSocket> serverSockets) {
            this.serverSocketCounter = serverSocketCounter;
            this.serverSockets = serverSockets;
        }

        @Override
        public String getSocketId(WebSocketRequest request) {
            return String.valueOf(this.serverSocketCounter.incrementAndGet());
        }

        @Override
        public void onConnect(WebSocket socket, WebSocketRequest request) {
            this.serverSockets.add(socket);
        }

        @Override
        public void onClose(WebSocket socket, WebSocketRequest request, WebSocketCloseCode closeCode, String reason) {
            // Closed sockets stay in the list; callers filter on isConnected().
        }
    }
}

