/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.worker.shuffle;

import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class ShuffleMetricsTest {
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Test
    public void testShuffleRequested() {
        ShuffleMetrics metrics = new ShuffleMetrics();
        String supervisorTask1 = "supervisor1";
        String supervisorTask2 = "supervisor2";
        String supervisorTask3 = "supervisor3";
        metrics.shuffleRequested("supervisor1", 1024L);
        metrics.shuffleRequested("supervisor2", 10L);
        metrics.shuffleRequested("supervisor1", 512L);
        metrics.shuffleRequested("supervisor3", 10000L);
        metrics.shuffleRequested("supervisor2", 30L);
        Map snapshot = metrics.snapshotAndReset();
        Assert.assertEquals((Object)ImmutableSet.of((Object)"supervisor1", (Object)"supervisor2", (Object)"supervisor3"), snapshot.keySet());
        ShuffleMetrics.PerDatasourceShuffleMetrics perDatasourceShuffleMetrics = (ShuffleMetrics.PerDatasourceShuffleMetrics)snapshot.get("supervisor1");
        Assert.assertEquals((long)2L, (long)perDatasourceShuffleMetrics.getShuffleRequests());
        Assert.assertEquals((long)1536L, (long)perDatasourceShuffleMetrics.getShuffleBytes());
        perDatasourceShuffleMetrics = (ShuffleMetrics.PerDatasourceShuffleMetrics)snapshot.get("supervisor2");
        Assert.assertEquals((long)2L, (long)perDatasourceShuffleMetrics.getShuffleRequests());
        Assert.assertEquals((long)40L, (long)perDatasourceShuffleMetrics.getShuffleBytes());
        perDatasourceShuffleMetrics = (ShuffleMetrics.PerDatasourceShuffleMetrics)snapshot.get("supervisor3");
        Assert.assertEquals((long)1L, (long)perDatasourceShuffleMetrics.getShuffleRequests());
        Assert.assertEquals((long)10000L, (long)perDatasourceShuffleMetrics.getShuffleBytes());
    }

    @Test
    public void testSnapshotUnmodifiable() {
        this.expectedException.expect(UnsupportedOperationException.class);
        new ShuffleMetrics().snapshotAndReset().put("k", new ShuffleMetrics.PerDatasourceShuffleMetrics());
    }

    @Test
    public void testResetDatasourceMetricsAfterSnapshot() {
        ShuffleMetrics shuffleMetrics = new ShuffleMetrics();
        shuffleMetrics.shuffleRequested("supervisor", 10L);
        shuffleMetrics.shuffleRequested("supervisor", 10L);
        shuffleMetrics.shuffleRequested("supervisor2", 10L);
        shuffleMetrics.snapshotAndReset();
        Assert.assertEquals(Collections.emptyMap(), (Object)shuffleMetrics.getDatasourceMetrics());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=5000L)
    public void testConcurrency() throws ExecutionException, InterruptedException {
        ExecutorService exec = Execs.multiThreaded((int)3, (String)"shuffle-metrics-test-%d");
        try {
            ShuffleMetrics metrics = new ShuffleMetrics();
            String supervisorTask1 = "supervisor1";
            String supervisorTask2 = "supervisor2";
            CountDownLatch firstUpdatelatch = new CountDownLatch(2);
            ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
            futures.add(exec.submit(() -> {
                metrics.shuffleRequested("supervisor1", 1024L);
                metrics.shuffleRequested("supervisor2", 30L);
                firstUpdatelatch.countDown();
                Thread.sleep(ThreadLocalRandom.current().nextInt(10));
                metrics.shuffleRequested("supervisor2", 10L);
                return null;
            }));
            futures.add(exec.submit(() -> {
                metrics.shuffleRequested("supervisor2", 30L);
                metrics.shuffleRequested("supervisor1", 1024L);
                firstUpdatelatch.countDown();
                Thread.sleep(ThreadLocalRandom.current().nextInt(10));
                metrics.shuffleRequested("supervisor1", 32L);
                return null;
            }));
            Map firstSnapshot = exec.submit(() -> {
                firstUpdatelatch.await();
                Thread.sleep(ThreadLocalRandom.current().nextInt(10));
                return metrics.snapshotAndReset();
            }).get();
            int expectedSecondSnapshotSize = 0;
            boolean task1ShouldBeInSecondSnapshot = false;
            boolean task2ShouldBeInSecondSnapshot = false;
            Assert.assertEquals((long)2L, (long)firstSnapshot.size());
            Assert.assertNotNull(firstSnapshot.get("supervisor1"));
            Assert.assertTrue((2048L == ((ShuffleMetrics.PerDatasourceShuffleMetrics)firstSnapshot.get("supervisor1")).getShuffleBytes() || 2080L == ((ShuffleMetrics.PerDatasourceShuffleMetrics)firstSnapshot.get("supervisor1")).getShuffleBytes() ? 1 : 0) != 0);
            Assert.assertTrue((2 == ((ShuffleMetrics.PerDatasourceShuffleMetrics)firstSnapshot.get("supervisor1")).getShuffleRequests() || 3 == ((ShuffleMetrics.PerDatasourceShuffleMetrics)firstSnapshot.get("supervisor1")).getShuffleRequests() ? 1 : 0) != 0);
            if (((ShuffleMetrics.PerDatasourceShuffleMetrics)firstSnapshot.get("supervisor1")).getShuffleRequests() == 2) {
                ++expectedSecondSnapshotSize;
                task1ShouldBeInSecondSnapshot = true;
            }
            Assert.assertNotNull(firstSnapshot.get("supervisor2"));
            Assert.assertTrue((60L == ((ShuffleMetrics.PerDatasourceShuffleMetrics)firstSnapshot.get("supervisor2")).getShuffleBytes() || 70L == ((ShuffleMetrics.PerDatasourceShuffleMetrics)firstSnapshot.get("supervisor2")).getShuffleBytes() ? 1 : 0) != 0);
            Assert.assertTrue((2 == ((ShuffleMetrics.PerDatasourceShuffleMetrics)firstSnapshot.get("supervisor2")).getShuffleRequests() || 3 == ((ShuffleMetrics.PerDatasourceShuffleMetrics)firstSnapshot.get("supervisor2")).getShuffleRequests() ? 1 : 0) != 0);
            if (((ShuffleMetrics.PerDatasourceShuffleMetrics)firstSnapshot.get("supervisor2")).getShuffleRequests() == 2) {
                ++expectedSecondSnapshotSize;
                task2ShouldBeInSecondSnapshot = true;
            }
            for (Future future : futures) {
                future.get();
            }
            Map secondSnapshot = metrics.snapshotAndReset();
            Assert.assertEquals((long)expectedSecondSnapshotSize, (long)secondSnapshot.size());
            Assert.assertEquals((Object)task1ShouldBeInSecondSnapshot, (Object)secondSnapshot.containsKey("supervisor1"));
            if (task1ShouldBeInSecondSnapshot) {
                Assert.assertEquals((long)32L, (long)((ShuffleMetrics.PerDatasourceShuffleMetrics)secondSnapshot.get("supervisor1")).getShuffleBytes());
                Assert.assertEquals((long)1L, (long)((ShuffleMetrics.PerDatasourceShuffleMetrics)secondSnapshot.get("supervisor1")).getShuffleRequests());
            }
            Assert.assertEquals((Object)task2ShouldBeInSecondSnapshot, (Object)secondSnapshot.containsKey("supervisor2"));
            if (task2ShouldBeInSecondSnapshot) {
                Assert.assertEquals((long)10L, (long)((ShuffleMetrics.PerDatasourceShuffleMetrics)secondSnapshot.get("supervisor2")).getShuffleBytes());
                Assert.assertEquals((long)1L, (long)((ShuffleMetrics.PerDatasourceShuffleMetrics)secondSnapshot.get("supervisor2")).getShuffleRequests());
            }
        }
        finally {
            exec.shutdown();
        }
    }
}

