/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.service.http.impl.functional.ws;

import com.mulesoft.service.http.impl.functional.ws.AbstractWebSocketTestCase;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.retry.policy.NoRetryPolicyTemplate;
import org.mule.runtime.core.api.retry.policy.RetryPolicyTemplate;
import org.mule.runtime.core.api.retry.policy.SimpleRetryPolicyTemplate;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.core.api.util.func.CheckedConsumer;
import org.mule.runtime.http.api.client.HttpRequestOptions;
import org.mule.runtime.http.api.client.ws.WebSocketCallback;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.server.ws.WebSocketConnectionHandler;
import org.mule.runtime.http.api.server.ws.WebSocketConnectionRejectedException;
import org.mule.runtime.http.api.server.ws.WebSocketHandler;
import org.mule.runtime.http.api.server.ws.WebSocketMessageHandler;
import org.mule.runtime.http.api.server.ws.WebSocketRequest;
import org.mule.runtime.http.api.ws.WebSocket;
import org.mule.runtime.http.api.ws.WebSocketCloseCode;
import org.mule.tck.SimpleUnitTestSupportCustomScheduler;
import org.mule.tck.probe.PollingProber;

public class BroadcastWebSocketTestCase
extends AbstractWebSocketTestCase {
    private static final String PATH = "/quotes";
    private static final String SHORT_TEXT_MESSAGE = "Hello There!";
    private static final int TIMEOUT_MILLIS = 15000;
    private static final int POLL_DELAY_MILLIS = 1000;
    private static final int LARGE_MESSAGE_SIZE = 24676;
    private static final int CLIENT_COUNT = 3;
    private final List<WebSocket> serverSockets = new ArrayList<WebSocket>(3);
    private final List<WebSocket> clientSockets = new ArrayList<WebSocket>(3);
    private final Map<String, TypedValue<InputStream>> messages = new ConcurrentHashMap<String, TypedValue<InputStream>>();
    private final Map<String, List<Throwable>> exceptions = new ConcurrentHashMap<String, List<Throwable>>();
    private final AtomicInteger serverSocketCounter = new AtomicInteger(0);
    private final AtomicInteger clientSocketCounter = new AtomicInteger(0);
    private final AtomicInteger closedClientSocketCounter = new AtomicInteger(0);
    private final long idleSocketTimeout = 15000L;
    private BiConsumer<WebSocket, Throwable> errorCallback;
    private RetryPolicyTemplate retryPolicyTemplate = new NoRetryPolicyTemplate();
    private SimpleUnitTestSupportCustomScheduler scheduler = new SimpleUnitTestSupportCustomScheduler(1, Thread::new, (RejectedExecutionHandler)new ThreadPoolExecutor.AbortPolicy());

    @Before
    public void before() throws Exception {
        this.errorCallback = (ws, t) -> this.exceptions.computeIfAbsent(ws.getId(), k -> new LinkedList()).add(t);
        this.handlerManager = server.addWebSocketHandler(new WebSocketHandler(){

            public String getPath() {
                return BroadcastWebSocketTestCase.PATH;
            }

            public WebSocketConnectionHandler getConnectionHandler() {
                return new WebSocketConnectionHandler(){

                    public String getSocketId(WebSocketRequest request) {
                        return "" + BroadcastWebSocketTestCase.this.serverSocketCounter.addAndGet(1);
                    }

                    public void onConnect(WebSocket socket, WebSocketRequest request) throws WebSocketConnectionRejectedException {
                        BroadcastWebSocketTestCase.this.serverSockets.add(socket);
                    }

                    public void onClose(WebSocket socket, WebSocketRequest request, WebSocketCloseCode closeCode, String reason) {
                    }
                };
            }

            public WebSocketMessageHandler getMessageHandler() {
                return message -> BroadcastWebSocketTestCase.this.messages.put(message.getSocket().getId(), message.getContent());
            }

            public long getIdleSocketTimeoutMills() {
                return 15000L;
            }
        });
        this.handlerManager.start();
        for (int i = 0; i < 3; ++i) {
            this.connect(new WebSocketCallback(){

                public void onConnect(WebSocket webSocket) {
                    BroadcastWebSocketTestCase.this.clientSockets.add(webSocket);
                }

                public void onClose(WebSocket webSocket, WebSocketCloseCode code, String reason) {
                    BroadcastWebSocketTestCase.this.closedClientSocketCounter.addAndGet(1);
                }

                public void onMessage(WebSocket webSocket, TypedValue<InputStream> content) {
                }
            });
        }
        Assert.assertThat(this.clientSockets, (Matcher)Matchers.hasSize((int)3));
        Assert.assertThat(this.serverSockets, (Matcher)Matchers.hasSize((int)3));
    }

    @Override
    @After
    public void after() {
        super.after();
        this.scheduler.stop();
    }

    @Test
    public void broadcastToEmptyCollection() {
        BiConsumer errorCallback = (BiConsumer)Mockito.mock(BiConsumer.class);
        service.newWebSocketBroadcaster().broadcast(Collections.emptyList(), new TypedValue(null, DataType.INPUT_STREAM), errorCallback);
        PollingProber.checkNot((long)15000L, (long)1000L, () -> !this.messages.isEmpty());
        Mockito.verifyNoInteractions((Object[])new Object[]{errorCallback});
    }

    @Test
    public void broadcastShortTextMessage() throws Exception {
        this.assertTextBroadcast(SHORT_TEXT_MESSAGE);
    }

    @Test
    public void broadcastLargeTextMessage() throws Exception {
        this.assertTextBroadcast(RandomStringUtils.randomAlphanumeric((int)24676));
    }

    @Test
    public void broadcastShortBinaryMessage() throws Exception {
        this.assertBinaryBroadcast(RandomUtils.nextBytes((int)30));
    }

    @Test
    public void broadcastLargeBinaryMessage() throws Exception {
        this.assertBinaryBroadcast(RandomUtils.nextBytes((int)24676));
    }

    @Test
    public void broadcastToReconnectableSockets() throws Exception {
        PollingProber.check((long)16000L, (long)5000L, () -> this.closedClientSocketCounter.get() > 0);
        this.retryPolicyTemplate = new SimpleRetryPolicyTemplate(1000L, 2);
        this.assertTextBroadcast(SHORT_TEXT_MESSAGE);
    }

    private void assertTextBroadcast(String text) throws Exception {
        TypedValue content = new TypedValue((Object)new ByteArrayInputStream(text.getBytes()), DataType.TEXT_STRING);
        this.assertBroadcast((TypedValue<InputStream>)content, (CheckedConsumer<TypedValue<InputStream>>)((CheckedConsumer)message -> {
            Assert.assertThat((Object)message.getDataType().getType(), (Matcher)Matchers.equalTo(InputStream.class));
            Assert.assertThat((Object)message.getDataType().getMediaType(), (Matcher)Matchers.equalTo((Object)DataType.TEXT_STRING.getMediaType()));
        }));
    }

    private void assertBinaryBroadcast(byte[] data) throws Exception {
        TypedValue content = new TypedValue((Object)new ByteArrayInputStream(data), DataType.INPUT_STREAM);
        this.assertBroadcast((TypedValue<InputStream>)content, (CheckedConsumer<TypedValue<InputStream>>)((CheckedConsumer)message -> Assert.assertThat((Object)message.getDataType().getMediaType(), (Matcher)Matchers.equalTo((Object)MediaType.BINARY))));
    }

    private void assertBroadcast(TypedValue<InputStream> content, CheckedConsumer<TypedValue<InputStream>> mediaTypeAssertion) throws Exception {
        ((InputStream)content.getValue()).mark(Integer.MAX_VALUE);
        service.newWebSocketBroadcaster().broadcast(new ArrayList<WebSocket>(this.clientSockets), content, this.errorCallback, this.retryPolicyTemplate, (Scheduler)this.scheduler);
        PollingProber.probe((long)15000L, (long)1000L, () -> this.messages.size() == 3);
        for (WebSocket socket : this.serverSockets) {
            if (!socket.isConnected()) continue;
            TypedValue<InputStream> message = this.messages.get(socket.getId());
            Assert.assertThat(message, (Matcher)Matchers.is((Matcher)Matchers.notNullValue()));
            mediaTypeAssertion.accept(message);
            ((InputStream)content.getValue()).reset();
            Assert.assertThat((Object)IOUtils.toByteArray((InputStream)((InputStream)message.getValue())), (Matcher)Matchers.equalTo((Object)IOUtils.toByteArray((InputStream)((InputStream)content.getValue()))));
        }
    }

    private void connect(WebSocketCallback callback) throws Exception {
        PollingProber.probe((long)15000L, (long)1000L, () -> {
            String uri = String.format("ws://localhost:%d%s", port.getNumber(), PATH);
            client.openWebSocket(HttpRequest.builder().uri(uri).method("GET").build(), HttpRequestOptions.builder().responseTimeout(3000).followsRedirect(true).authentication(null).build(), "" + this.clientSocketCounter.addAndGet(1), callback).get();
            return true;
        });
    }
}

