/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.PrecombineGroupingTable;
import org.apache.beam.runners.core.GlobalCombineFnRunner;
import org.apache.beam.runners.core.GlobalCombineFnRunners;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap;
import org.hamcrest.Description;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.hamcrest.core.Is;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class PrecombineGroupingTableTest {
    /**
     * JUnit rule exposing an executor service built from a cached thread pool. It is used by the
     * high-cache-contention test to run its tasks concurrently.
     *
     * <p>NOTE(review): presumably the rule shuts the executor down after each test; confirm
     * against {@code TestExecutors}.
     */
    @Rule
    public TestExecutors.TestExecutorService executorService = TestExecutors.from((ExecutorService)Executors.newCachedThreadPool());
    /**
     * Summing combiner shared by the table tests: integer inputs are added into a {@code Long}
     * accumulator. Its {@code compact} step is deliberately lossy (even sums are divided by four)
     * so that tests can tell from the output whether compaction was applied.
     */
    private static final Combine.CombineFn<Integer, Long, Long> COMBINE_FN = new Combine.CombineFn<Integer, Long, Long>(){

        /** Every key starts from a sum of zero. */
        @Override
        public Long createAccumulator() {
            return Long.valueOf(0L);
        }

        /** Adds one input to the running sum. */
        @Override
        public Long addInput(Long accumulator, Integer value) {
            long runningSum = accumulator.longValue();
            return runningSum + value.longValue();
        }

        /** Sums all partial accumulators. */
        @Override
        public Long mergeAccumulators(Iterable<Long> accumulators) {
            long total = 0L;
            for (Long partial : accumulators) {
                total = total + partial.longValue();
            }
            return total;
        }

        /** Divides even sums by four (integer division); odd sums pass through unchanged. */
        @Override
        public Long compact(Long accumulator) {
            long current = accumulator.longValue();
            return current % 2L == 0L ? current / 4L : current;
        }

        /** The accumulator is already the output. */
        @Override
        public Long extractOutput(Long accumulator) {
            return accumulator;
        }
    };

    /**
     * Checks which timestamp a combined output carries.
     *
     * <p>Key "A" receives 1 + 2 + 4, all at timestamp 1. Key "B" receives 9 at timestamp 21 and
     * then 2 at timestamp 20. After the flush the test expects A=7 at timestamp 1 and B=11 at
     * timestamp 21, which is the timestamp of B's first input. Both sums are odd, so
     * {@code COMBINE_FN.compact} leaves them unchanged.
     */
    @Test
    public void testCombiningInheritsOneOfTheValuesTimestamps() throws Exception {
        PrecombineGroupingTable<String, Integer, Long> table =
            new PrecombineGroupingTable<>(
                PipelineOptionsFactory.create(),
                Caches.forMaximumBytes(2500L),
                StringUtf8Coder.of(),
                GlobalCombineFnRunners.create(COMBINE_FN),
                new TestSizeEstimator(),
                false);
        TestOutputReceiver<WindowedValue<KV<String, Long>>> receiver = new TestOutputReceiver<>();

        table.put(WindowedValue.timestampedValueInGlobalWindow(KV.of("A", 1), new Instant(1L)), receiver);
        table.put(WindowedValue.timestampedValueInGlobalWindow(KV.of("B", 9), new Instant(21L)), receiver);
        table.put(WindowedValue.timestampedValueInGlobalWindow(KV.of("A", 2), new Instant(1L)), receiver);
        table.put(WindowedValue.timestampedValueInGlobalWindow(KV.of("B", 2), new Instant(20L)), receiver);
        table.put(WindowedValue.timestampedValueInGlobalWindow(KV.of("A", 4), new Instant(1L)), receiver);
        table.flush(receiver);

        MatcherAssert.assertThat(
            receiver.outputElems,
            Matchers.containsInAnyOrder(
                WindowedValue.timestampedValueInGlobalWindow(KV.of("A", 7L), new Instant(1L)),
                WindowedValue.timestampedValueInGlobalWindow(KV.of("B", 11L), new Instant(21L))));
    }

    /**
     * Checks that the estimated weight of the keys counts towards the cache limit.
     *
     * <p>{@code TestSizeEstimator} weighs a string key as 10^length, so "AAA" and "CCC" weigh
     * 1000 each and "BB" weighs 100; accumulators weigh their numeric value. The cache is
     * created with a 2500-byte maximum. Nothing is expected to be emitted while only "AAA" is
     * present. After "BB" and "CCC" have been added, "AAA" and "BB" are expected in the output,
     * and "CCC" only appears after the flush. All sums are odd, so compaction does not alter them.
     */
    @Test
    public void testCombiningGroupingTableHonorsKeyWeights() throws Exception {
        PrecombineGroupingTable<String, Integer, Long> table =
            new PrecombineGroupingTable<>(
                PipelineOptionsFactory.create(),
                Caches.forMaximumBytes(2500L),
                StringUtf8Coder.of(),
                GlobalCombineFnRunners.create(COMBINE_FN),
                new TestSizeEstimator(),
                false);
        TestOutputReceiver<WindowedValue<KV<String, Long>>> receiver = new TestOutputReceiver<>();

        // A single heavy key stays in the table.
        table.put(WindowedValue.valueInGlobalWindow(KV.of("AAA", 1)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("AAA", 2)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("AAA", 4)), receiver);
        MatcherAssert.assertThat(receiver.outputElems, Matchers.empty());

        // Two more keys push earlier entries out.
        table.put(WindowedValue.valueInGlobalWindow(KV.of("BB", 509)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("CCC", 11)), receiver);
        MatcherAssert.assertThat(
            receiver.outputElems,
            Matchers.containsInAnyOrder(
                WindowedValue.valueInGlobalWindow(KV.of("AAA", 7L)),
                WindowedValue.valueInGlobalWindow(KV.of("BB", 509L))));

        // Flushing emits whatever was still held.
        table.flush(receiver);
        MatcherAssert.assertThat(
            receiver.outputElems,
            Matchers.containsInAnyOrder(
                WindowedValue.valueInGlobalWindow(KV.of("AAA", 7L)),
                WindowedValue.valueInGlobalWindow(KV.of("BB", 509L)),
                WindowedValue.valueInGlobalWindow(KV.of("CCC", 11L))));
    }

    /**
     * Checks that one oversized accumulator causes every entry to be emitted.
     *
     * <p>Keys "A", "B" and "C" first accumulate small sums (1, 9 and 7) and nothing is expected
     * in the output. Adding 9999 to "C" makes its sum 10006, far above the 2500-byte cache
     * maximum. All three keys are then expected in the output. "C" is expected as 2501, which is
     * 10006 / 4 (integer division) because {@code COMBINE_FN.compact} divides even sums by four.
     * The final flush is expected to add nothing further.
     */
    @Test
    public void testCombiningGroupingTableEvictsAllOnLargeEntry() throws Exception {
        PrecombineGroupingTable<String, Integer, Long> table =
            new PrecombineGroupingTable<>(
                PipelineOptionsFactory.create(),
                Caches.forMaximumBytes(2500L),
                StringUtf8Coder.of(),
                GlobalCombineFnRunners.create(COMBINE_FN),
                new TestSizeEstimator(),
                false);
        TestOutputReceiver<WindowedValue<KV<String, Long>>> receiver = new TestOutputReceiver<>();

        table.put(WindowedValue.valueInGlobalWindow(KV.of("A", 1)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("B", 3)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("B", 6)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("C", 7)), receiver);
        MatcherAssert.assertThat(receiver.outputElems, Matchers.empty());

        // The oversized addition to "C" is expected to empty the table.
        table.put(WindowedValue.valueInGlobalWindow(KV.of("C", 9999)), receiver);
        MatcherAssert.assertThat(
            receiver.outputElems,
            Matchers.containsInAnyOrder(
                WindowedValue.valueInGlobalWindow(KV.of("A", 1L)),
                WindowedValue.valueInGlobalWindow(KV.of("B", 9L)),
                WindowedValue.valueInGlobalWindow(KV.of("C", 2501L))));

        // Nothing is left for the flush to emit.
        table.flush(receiver);
        MatcherAssert.assertThat(
            receiver.outputElems,
            Matchers.containsInAnyOrder(
                WindowedValue.valueInGlobalWindow(KV.of("A", 1L)),
                WindowedValue.valueInGlobalWindow(KV.of("B", 9L)),
                WindowedValue.valueInGlobalWindow(KV.of("C", 2501L))));
    }

    /**
     * Checks that compaction lets entries stay in the table that would not fit uncompacted.
     *
     * <p>The three inputs 804, 904 and 1004 are all even, so {@code COMBINE_FN.compact} reduces
     * them to 201, 226 and 251. Their uncompacted sum is 2712, which exceeds the 2500-byte cache
     * maximum. The test expects no output before the flush and a table weight below 2712. After
     * the flush it expects the compacted values.
     */
    @Test
    public void testCombiningGroupingTableCompactionSaves() throws Exception {
        PrecombineGroupingTable<String, Integer, Long> table =
            new PrecombineGroupingTable<>(
                PipelineOptionsFactory.create(),
                Caches.forMaximumBytes(2500L),
                StringUtf8Coder.of(),
                GlobalCombineFnRunners.create(COMBINE_FN),
                new TestSizeEstimator(),
                false);
        TestOutputReceiver<WindowedValue<KV<String, Long>>> receiver = new TestOutputReceiver<>();

        table.put(WindowedValue.valueInGlobalWindow(KV.of("A", 804)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("B", 904)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("C", 1004)), receiver);
        MatcherAssert.assertThat(receiver.outputElems, Matchers.empty());
        // 2712 = 804 + 904 + 1004, the combined weight of the accumulators before compaction.
        MatcherAssert.assertThat(table.getWeight(), Matchers.lessThan(2712L));

        table.flush(receiver);
        MatcherAssert.assertThat(
            receiver.outputElems,
            Matchers.containsInAnyOrder(
                WindowedValue.valueInGlobalWindow(KV.of("A", 201L)),
                WindowedValue.valueInGlobalWindow(KV.of("B", 226L)),
                WindowedValue.valueInGlobalWindow(KV.of("C", 251L))));
    }

    /**
     * Checks that only part of the table is emitted when compaction cannot shrink the entries.
     *
     * <p>The inputs 801, 901 and 1001 are odd, so {@code COMBINE_FN.compact} leaves them
     * unchanged and together they exceed the 2500-byte cache maximum. After the third put the
     * test expects "A" and "B" in the output while "C" is still held; the flush then emits "C".
     */
    @Test
    public void testCombiningGroupingTablePartialEviction() throws Exception {
        PrecombineGroupingTable<String, Integer, Long> table =
            new PrecombineGroupingTable<>(
                PipelineOptionsFactory.create(),
                Caches.forMaximumBytes(2500L),
                StringUtf8Coder.of(),
                GlobalCombineFnRunners.create(COMBINE_FN),
                new TestSizeEstimator(),
                false);
        TestOutputReceiver<WindowedValue<KV<String, Long>>> receiver = new TestOutputReceiver<>();

        table.put(WindowedValue.valueInGlobalWindow(KV.of("A", 801)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("B", 901)), receiver);
        table.put(WindowedValue.valueInGlobalWindow(KV.of("C", 1001)), receiver);
        MatcherAssert.assertThat(
            receiver.outputElems,
            Matchers.containsInAnyOrder(
                WindowedValue.valueInGlobalWindow(KV.of("A", 801L)),
                WindowedValue.valueInGlobalWindow(KV.of("B", 901L))));

        table.flush(receiver);
        MatcherAssert.assertThat(
            receiver.outputElems,
            Matchers.containsInAnyOrder(
                WindowedValue.valueInGlobalWindow(KV.of("A", 801L)),
                WindowedValue.valueInGlobalWindow(KV.of("B", 901L)),
                WindowedValue.valueInGlobalWindow(KV.of("C", 1001L))));
    }

    /**
     * Runs many tables concurrently against one shared cache and checks every emitted value.
     *
     * <p>1000 tasks are submitted to {@code executorService}. Each task builds its own table over
     * a sub-cache of the shared cache (keyed by the task index), puts the pairs (j, j) for
     * j = 1..1000, flushes, and collects everything the table emits into a multimap. Each task
     * then asserts that exactly the keys 1..1000 were emitted and that every emitted value is the
     * compacted form of its key: key / 4 for even keys, the key itself for odd keys (see
     * {@code COMBINE_FN.compact}). Assertion failures inside a task surface through
     * {@code Future.get()}.
     */
    @Test
    public void testCombiningGroupingTableEmitsCorrectValuesUnderHighCacheContention() throws Exception {
        final int numKeys = 1000;
        final int numThreads = 1000;

        Long[] expectedKeys = new Long[numKeys];
        for (int j = 1; j <= numKeys; ++j) {
            // Explicit widening: an int cannot be boxed directly into a Long slot.
            expectedKeys[j - 1] = (long) j;
        }

        List<Future<?>> futures = new ArrayList<>(numThreads);
        PipelineOptions options = PipelineOptionsFactory.create();
        GlobalCombineFnRunner<Integer, Long, Long> combineFnRunner = GlobalCombineFnRunners.create(COMBINE_FN);
        Cache<?, ?> cache = Caches.forMaximumBytes(numThreads * 50000L);

        for (int i = 0; i < numThreads; ++i) {
            final int currentI = i;
            futures.add(this.executorService.submit(() -> {
                ArrayListMultimap<Long, Long> values = ArrayListMultimap.create();
                // One receiver serves both put-time evictions and the final flush.
                FnDataReceiver<WindowedValue<KV<Long, Long>>> collector =
                    input -> values.put(input.getValue().getKey(), input.getValue().getValue());
                PrecombineGroupingTable<Long, Integer, Long> table =
                    new PrecombineGroupingTable<>(
                        options,
                        Caches.subCache(cache, currentI),
                        VarLongCoder.of(),
                        combineFnRunner,
                        new TestSizeEstimator(),
                        false);

                for (int j = 1; j <= numKeys; ++j) {
                    // The key must be a Long: the table is keyed with VarLongCoder and the
                    // emitted keys are compared against Long values below.
                    table.put(WindowedValue.valueInGlobalWindow(KV.of((long) j, j)), collector);
                }
                // NOTE(review): flush is invoked once per key, as in the original; presumably
                // every call after the first finds an empty table - confirm.
                for (int j = 1; j <= numKeys; ++j) {
                    table.flush(collector);
                }

                MatcherAssert.assertThat(values.keySet(), Matchers.containsInAnyOrder(expectedKeys));
                for (Map.Entry<Long, Long> entry : values.entries()) {
                    long key = entry.getKey();
                    long expectedValue = key % 2L == 0L ? key / 4L : key;
                    MatcherAssert.assertThat(entry.getValue(), Matchers.equalTo(expectedValue));
                }
                return null;
            }));
        }

        // Propagate any failure raised inside a task.
        for (Future<?> future : futures) {
            future.get();
        }
    }

    /**
     * Exercises {@code SamplingSizeEstimator} with elements that all weigh 100.
     *
     * <p>The estimate must always be 100. The first ten calls are each expected to reach the
     * underlying estimator; the next ten are expected to reach it between 1 and 9 more times;
     * over the following 1000 calls between 40 and 60 are expected to reach it.
     *
     * <p>NOTE(review): the constructor arguments (0.05, 1.0, 10) are presumably the minimum
     * sample rate, maximum sample rate and minimum number of sampled elements - confirm.
     */
    @Test
    public void testSampleFlatSizes() throws Exception {
        final TestSizeEstimator delegate = new TestSizeEstimator();
        final PrecombineGroupingTable.SamplingSizeEstimator sampler =
            new PrecombineGroupingTable.SamplingSizeEstimator(delegate, 0.05, 1.0, 10L, new Random(1L));

        // Calls 1..10: every call is forwarded to the delegate.
        for (int callNumber = 1; callNumber <= 10; callNumber++) {
            Assert.assertEquals(100L, sampler.estimateSize(100L));
            Assert.assertEquals(callNumber, delegate.calls);
        }

        // Calls 11..20: only some are forwarded.
        for (int i = 0; i < 10; i++) {
            Assert.assertEquals(100L, sampler.estimateSize(100L));
        }
        MatcherAssert.assertThat(delegate.calls, PrecombineGroupingTableTest.between(11, 19));

        // Next 1000 calls: count how many were forwarded.
        final int callsBefore = delegate.calls;
        for (int i = 0; i < 1000; i++) {
            Assert.assertEquals(100L, sampler.estimateSize(100L));
        }
        final int forwarded = delegate.calls - callsBefore;
        MatcherAssert.assertThat(forwarded, PrecombineGroupingTableTest.between(40, 60));
    }

    /**
     * Exercises {@code SamplingSizeEstimator} with elements alternating between weights 100 and
     * 102.
     *
     * <p>The first ten calls (five pairs) must return the exact weight and each must reach the
     * underlying estimator. Afterwards every estimate only has to lie in [100, 102]. The next
     * ten calls are expected to reach the underlying estimator between 1 and 9 more times, and
     * the following 1000 calls between 40 and 60 times.
     */
    @Test
    public void testSampleBoringSizes() throws Exception {
        final TestSizeEstimator delegate = new TestSizeEstimator();
        final PrecombineGroupingTable.SamplingSizeEstimator sampler =
            new PrecombineGroupingTable.SamplingSizeEstimator(delegate, 0.05, 1.0, 10L, new Random(1L));

        // Pairs 1..5 (calls 1..10): exact answers, every call forwarded.
        for (int pair = 1; pair <= 5; pair++) {
            Assert.assertEquals(100L, sampler.estimateSize(100L));
            Assert.assertEquals(102L, sampler.estimateSize(102L));
            Assert.assertEquals(2 * pair, delegate.calls);
        }

        // Pairs 6..10 (calls 11..20): answers only need to fall inside the observed range.
        for (int pair = 0; pair < 5; pair++) {
            MatcherAssert.assertThat(sampler.estimateSize(100L), PrecombineGroupingTableTest.between(100L, 102L));
            MatcherAssert.assertThat(sampler.estimateSize(102L), PrecombineGroupingTableTest.between(100L, 102L));
        }
        MatcherAssert.assertThat(delegate.calls, PrecombineGroupingTableTest.between(11, 19));

        // 500 more pairs (1000 calls): count how many were forwarded.
        final int callsBefore = delegate.calls;
        for (int pair = 0; pair < 500; pair++) {
            MatcherAssert.assertThat(sampler.estimateSize(100L), PrecombineGroupingTableTest.between(100L, 102L));
            MatcherAssert.assertThat(sampler.estimateSize(102L), PrecombineGroupingTableTest.between(100L, 102L));
        }
        final int forwarded = delegate.calls - callsBefore;
        MatcherAssert.assertThat(forwarded, PrecombineGroupingTableTest.between(40, 60));
    }

    /**
     * Exercises {@code SamplingSizeEstimator} with weights cycling through 1, 10, 100 and 1000.
     *
     * <p>The first twenty calls must return the exact weight and each must reach the underlying
     * estimator. After a warm-up to call 500, the next 1000 estimates must be either one of the
     * four real weights or a value in [250, 350], and between 180 and 220 of them are expected to
     * reach the underlying estimator. After a further 1500 calls the same check is repeated on
     * 1000 calls with an expected 90 to 110 forwarded.
     *
     * <p>NOTE(review): the constructor arguments (0.1, 0.2, 10) are presumably the minimum
     * sample rate, maximum sample rate and minimum number of sampled elements - confirm.
     */
    @Test
    public void testSampleHighVarianceSizes() throws Exception {
        final List<Long> sizes = Arrays.asList(1L, 10L, 100L, 1000L);
        final int cycle = sizes.size();
        final TestSizeEstimator delegate = new TestSizeEstimator();
        final PrecombineGroupingTable.SamplingSizeEstimator sampler =
            new PrecombineGroupingTable.SamplingSizeEstimator(delegate, 0.1, 0.2, 10L, new Random(1L));

        // Calls 0..19: exact answers, every call forwarded.
        for (int index = 0; index < 20; index++) {
            long weight = sizes.get(index % cycle);
            Assert.assertEquals(weight, sampler.estimateSize(weight));
            Assert.assertEquals(index + 1, delegate.calls);
        }

        // Calls 20..499: warm-up, results are not inspected.
        for (int index = 20; index < 500; index++) {
            sampler.estimateSize(sizes.get(index % cycle));
        }

        // Calls 500..1499: first measured window.
        int callsBefore = delegate.calls;
        for (int index = 500; index < 1500; index++) {
            long weight = sizes.get(index % cycle);
            MatcherAssert.assertThat(
                sampler.estimateSize(weight),
                Matchers.anyOf(Is.is(Matchers.in(sizes)), PrecombineGroupingTableTest.between(250L, 350L)));
        }
        MatcherAssert.assertThat(delegate.calls - callsBefore, PrecombineGroupingTableTest.between(180, 220));

        // Calls 1500..2999: second warm-up.
        for (int index = 1500; index < 3000; index++) {
            sampler.estimateSize(sizes.get(index % cycle));
        }

        // Calls 3000..3999: second measured window.
        callsBefore = delegate.calls;
        for (int index = 3000; index < 4000; index++) {
            long weight = sizes.get(index % cycle);
            MatcherAssert.assertThat(
                sampler.estimateSize(weight),
                Matchers.anyOf(Is.is(Matchers.in(sizes)), PrecombineGroupingTableTest.between(250L, 350L)));
        }
        MatcherAssert.assertThat(delegate.calls - callsBefore, PrecombineGroupingTableTest.between(90, 110));
    }

    /**
     * Exercises {@code SamplingSizeEstimator} when the element weight changes after a long run
     * of identical weights.
     *
     * <p>The first part repeats the flat-size scenario (all weights 100, same call-count
     * expectations). The estimator is then queried with weight 1000000 until it stops answering
     * 100, and the very next query with weight 99 must return exactly 99.
     */
    @Test
    public void testSampleChangingSizes() throws Exception {
        final TestSizeEstimator delegate = new TestSizeEstimator();
        final PrecombineGroupingTable.SamplingSizeEstimator sampler =
            new PrecombineGroupingTable.SamplingSizeEstimator(delegate, 0.05, 1.0, 10L, new Random(1L));

        // Calls 1..10: every call is forwarded to the delegate.
        for (int callNumber = 1; callNumber <= 10; callNumber++) {
            Assert.assertEquals(100L, sampler.estimateSize(100L));
            Assert.assertEquals(callNumber, delegate.calls);
        }

        // Calls 11..20: only some are forwarded.
        for (int i = 0; i < 10; i++) {
            Assert.assertEquals(100L, sampler.estimateSize(100L));
        }
        MatcherAssert.assertThat(delegate.calls, PrecombineGroupingTableTest.between(11, 19));

        // Next 1000 calls: count how many were forwarded.
        final int callsBefore = delegate.calls;
        for (int i = 0; i < 1000; i++) {
            Assert.assertEquals(100L, sampler.estimateSize(100L));
        }
        MatcherAssert.assertThat(delegate.calls - callsBefore, PrecombineGroupingTableTest.between(40, 60));

        // Keep feeding the new weight until the estimator stops answering with the old one.
        long estimate;
        do {
            estimate = sampler.estimateSize(1000000L);
        } while (estimate == 100L);

        Assert.assertEquals(99L, sampler.estimateSize(99L));
    }

    /**
     * Creates a matcher that accepts values in the closed interval {@code [min, max]}.
     *
     * @param min smallest accepted value (inclusive)
     * @param max largest accepted value (inclusive)
     * @param <T> a self-comparable type such as {@code Integer} or {@code Long}
     * @return a matcher that describes itself as "is between min and max" and reports the
     *     offending value on mismatch
     */
    private static <T extends Comparable<T>> TypeSafeDiagnosingMatcher<T> between(final T min, final T max) {
        return new TypeSafeDiagnosingMatcher<T>(){

            @Override
            public void describeTo(Description description) {
                description.appendText("is between " + min + " and " + max);
            }

            @Override
            protected boolean matchesSafely(T item, Description mismatchDescription) {
                boolean inRange = min.compareTo(item) <= 0 && item.compareTo(max) <= 0;
                if (!inRange) {
                    // A diagnosing matcher is expected to explain why the item was rejected.
                    mismatchDescription.appendText("was ").appendValue(item);
                }
                return inRange;
            }
        };
    }

    private static class TestSizeEstimator
    implements PrecombineGroupingTable.SizeEstimator {
        int calls = 0;

        private TestSizeEstimator() {
        }

        public long estimateSize(Object element) {
            ++this.calls;
            if (element instanceof PrecombineGroupingTable.GloballyWindowedTableGroupingKey) {
                element = ((PrecombineGroupingTable.GloballyWindowedTableGroupingKey)element).getStructuralKey();
            } else if (element instanceof PrecombineGroupingTable.WindowedGroupingTableKey) {
                element = ((PrecombineGroupingTable.WindowedGroupingTableKey)element).getStructuralKey();
            } else if (element instanceof PrecombineGroupingTable.GroupingTableEntry) {
                element = ((PrecombineGroupingTable.GroupingTableEntry)element).getAccumulator();
            }
            if (element instanceof String) {
                return (long)Math.pow(10.0, ((String)element).length());
            }
            if (element instanceof Long) {
                return (Long)element;
            }
            throw new IllegalArgumentException("Unknown type " + (element == null ? "null" : element.getClass().toString()));
        }
    }

    private static class TestOutputReceiver<T>
    implements FnDataReceiver<T> {
        final List<T> outputElems = new ArrayList<T>();

        private TestOutputReceiver() {
        }

        public void accept(T elem) {
            this.outputElems.add(elem);
        }
    }
}

