/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.FakeBeamFnStateClient;
import org.apache.beam.fn.harness.state.StateBackedIterable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterator;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;

@RunWith(value=Enclosed.class)
public class StateBackedIterableTest {
    /**
     * Builds a runner-scoped {@link BeamFnApi.StateKey} whose key bytes are the
     * {@link StringUtf8Coder} encoding of {@code id}.
     *
     * @param id identifier to encode into the runner key
     * @return the assembled state key
     * @throws IOException if encoding {@code id} fails
     */
    private static BeamFnApi.StateKey key(String id) throws IOException {
        ByteString encodedId = StateBackedIterableTest.encode(id);
        BeamFnApi.StateKey.Runner.Builder runnerKey = BeamFnApi.StateKey.Runner.newBuilder().setKey(encodedId);
        return BeamFnApi.StateKey.newBuilder().setRunner(runnerKey).build();
    }

    /**
     * Encodes every value with {@link StringUtf8Coder}, one after another, into a single
     * {@link ByteString}.
     *
     * @param values strings to encode, in order
     * @return the concatenated encoded bytes (empty when no values are given)
     * @throws IOException if the coder fails to write a value
     */
    private static ByteString encode(String ... values) throws IOException {
        StringUtf8Coder utf8 = StringUtf8Coder.of();
        ByteStringOutputStream sink = new ByteStringOutputStream();
        for (int i = 0; i < values.length; i++) {
            utf8.encode(values[i], (OutputStream)sink);
        }
        return sink.toByteString();
    }

    /**
     * Round-trip tests for {@link StateBackedIterable.Coder} and for Java serialization of
     * {@link StateBackedIterable}.
     */
    @RunWith(JUnit4.class)
    public static class CoderTest {
        /**
         * Creates the coder shared by the encode/decode tests: no-op cache, no state client,
         * fixed instruction id, UTF-8 string elements.
         */
        private static StateBackedIterable.Coder<String> newCoder() {
            return new StateBackedIterable.Coder<>(() -> Caches.noop(), null, () -> "instructionId", StringUtf8Coder.of());
        }

        /** A plain (non state-backed) iterable must survive an encode/decode round trip. */
        @Test
        public void testDecodeEncodeRegularIterable() throws Exception {
            FluentIterable<String> iterable = FluentIterable.of("A", "B", "C");
            StateBackedIterable.Coder<String> coder = newCoder();

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            coder.encode(iterable, baos);
            Iterable<String> result = coder.decode(new ByteArrayInputStream(baos.toByteArray()));

            Assert.assertEquals(Lists.newArrayList(iterable), Lists.newArrayList(result));
        }

        /**
         * A {@link StateBackedIterable} must decode back to a {@link StateBackedIterable} with the
         * same prefix and state request.
         */
        @Test
        public void testEncodeDecodeStateBackedIterable() throws Exception {
            StateBackedIterable<String> iterable = new StateBackedIterable<>(Caches.noop(), null, "instructionId", StateBackedIterableTest.key("key"), StringUtf8Coder.of(), Arrays.asList("A", "B"));
            StateBackedIterable.Coder<String> coder = newCoder();

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            coder.encode(iterable, baos);
            StateBackedIterable<String> result = (StateBackedIterable<String>)coder.decode(new ByteArrayInputStream(baos.toByteArray()));

            Assert.assertEquals(iterable.prefix, result.prefix);
            Assert.assertEquals(iterable.request, result.request);
        }

        /**
         * Writing the iterable through Java serialization and reading it back must yield the
         * prefix followed by the state-backed suffix; the original must still iterate afterwards.
         */
        @Test
        public void testSerializability() throws Exception {
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of(StateBackedIterableTest.key("suffix"), Arrays.asList("C", "D", "E"), StateBackedIterableTest.key("emptySuffix"), Arrays.asList(new String[0])));
            StateBackedIterable<String> iterable = new StateBackedIterable<>(Caches.noop(), fakeBeamFnStateClient, "instruction", StateBackedIterableTest.key("suffix"), StringUtf8Coder.of(), ImmutableList.of("A", "B"));
            List<String> expected = ImmutableList.of("A", "B", "C", "D", "E");

            ByteArrayOutputStream bout = new ByteArrayOutputStream();
            // try-with-resources closes (and thereby flushes) the object stream before the bytes are read.
            try (ObjectOutputStream out = new ObjectOutputStream(bout)) {
                out.writeObject(iterable);
            }

            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bout.toByteArray()))) {
                // Wildcard element type avoids an unchecked cast; only list equality is asserted.
                Iterable<?> deserialized = (Iterable<?>)in.readObject();
                Assert.assertEquals(expected, Lists.newArrayList(deserialized));
            }
            Assert.assertEquals(expected, Lists.newArrayList(iterable));
        }
    }

    /**
     * Iteration behaviour of {@link StateBackedIterable}; every test runs once per row returned
     * by {@link #data()}.
     */
    @RunWith(Parameterized.class)
    public static class IterationTest {
        /** Elements passed to the iterable's constructor as its prefix. */
        @Parameterized.Parameter(0)
        public List<String> prefix;

        /** Identifier turned into the state key that backs the rest of the iterable. */
        @Parameterized.Parameter(1)
        public String suffixKey;

        /** Complete sequence the iterable is expected to produce. */
        @Parameterized.Parameter(2)
        public List<String> expected;

        /**
         * Parameter rows of the form {@code {prefix, suffixKey, expected}}: empty and non-empty
         * prefix combined with the empty and non-empty suffix.
         */
        @Parameterized.Parameters
        public static Iterable<Object[]> data() {
            return ImmutableList.<Object[]>builder()
                .add(new Object[]{Collections.emptyList(), "emptySuffix", ImmutableList.of()})
                .add(new Object[]{ImmutableList.of("A", "B"), "emptySuffix", ImmutableList.of("A", "B")})
                .add(new Object[]{Collections.emptyList(), "nonEmptySuffix", ImmutableList.of("C", "D", "E", "F", "G", "H", "I", "J", "K")})
                .add(new Object[]{ImmutableList.of("A", "B"), "nonEmptySuffix", ImmutableList.of("A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K")})
                .build();
        }

        /** Creates a fake state client holding the "nonEmptySuffix" and "emptySuffix" entries. */
        private static FakeBeamFnStateClient newStateClient() throws IOException {
            return new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of(StateBackedIterableTest.key("nonEmptySuffix"), Arrays.asList("C", "D", "E", "F", "G", "H", "I", "J", "K"), StateBackedIterableTest.key("emptySuffix"), Arrays.asList(new String[0])));
        }

        /** Creates the iterable under test for the current parameter row. */
        private StateBackedIterable<String> newIterable(Cache<?, ?> cache, BeamFnStateClient stateClient) throws IOException {
            return new StateBackedIterable<>(cache, stateClient, "instruction", StateBackedIterableTest.key(this.suffixKey), StringUtf8Coder.of(), this.prefix);
        }

        /**
         * Size of one element as {@link StringUtf8Coder} reports it, plus one.
         * NOTE(review): the extra 1 presumably models per-element framing overhead of the
         * iterable coder - confirm against the coder implementation.
         */
        private static long sizePlusOne(String element) {
            try {
                return 1L + StringUtf8Coder.of().getEncodedElementByteSize(element);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Three consecutive full iterations must all yield the expected elements. */
        @Test
        public void testReiteration() throws Exception {
            FakeBeamFnStateClient stateClient = newStateClient();
            StateBackedIterable<String> iterable = newIterable(Caches.noop(), stateClient);
            for (int pass = 0; pass < 3; pass++) {
                Assert.assertEquals(this.expected, Lists.newArrayList(iterable));
            }
        }

        /**
         * With an eternal cache, iterations after the first must not increase the state client's
         * call count.
         */
        @Test
        public void testReiterationCached() throws Exception {
            FakeBeamFnStateClient stateClient = newStateClient();
            StateBackedIterable<String> iterable = newIterable(Caches.eternal(), stateClient);
            Assert.assertEquals(0L, stateClient.getCallCount());

            Assert.assertEquals(this.expected, Lists.newArrayList(iterable));
            int callsAfterFirstPass = stateClient.getCallCount();

            Assert.assertEquals(this.expected, Lists.newArrayList(iterable));
            Assert.assertEquals(this.expected, Lists.newArrayList(iterable));
            Assert.assertEquals(callsAfterFirstPass, stateClient.getCallCount());
        }

        /**
         * Two iterables sharing one cache but using different state keys must each see their own
         * contents, and re-iteration must still be served without further state calls.
         */
        @Test
        public void testCacheKeyIsUnique() throws Exception {
            Cache<?, ?> sharedCache = Caches.eternal();
            FakeBeamFnStateClient stateClient = new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of(StateBackedIterableTest.key("nonEmptySuffix"), Arrays.asList("C", "D", "E", "F", "G", "H", "I", "J", "K"), StateBackedIterableTest.key("emptySuffix"), Arrays.asList(new String[0]), StateBackedIterableTest.key("otherIterable"), Arrays.asList("Z")));

            StateBackedIterable<String> otherIterable = new StateBackedIterable<>(sharedCache, stateClient, "instruction", StateBackedIterableTest.key("otherIterable"), StringUtf8Coder.of(), Collections.emptyList());
            Assert.assertEquals(0L, stateClient.getCallCount());
            Assert.assertEquals(Arrays.asList("Z"), Lists.newArrayList(otherIterable));

            StateBackedIterable<String> iterable = newIterable(sharedCache, stateClient);
            Assert.assertEquals(this.expected, Lists.newArrayList(iterable));
            int callsAfterFirstPass = stateClient.getCallCount();

            Assert.assertEquals(this.expected, Lists.newArrayList(iterable));
            Assert.assertEquals(this.expected, Lists.newArrayList(iterable));
            Assert.assertEquals(callsAfterFirstPass, stateClient.getCallCount());
        }

        /**
         * Advances three independent iterators in a pseudo-random (fixed seed) interleaving and
         * checks that each one yields the full expected sequence.
         */
        @Test
        public void testUsingInterleavedReiteration() throws Exception {
            FakeBeamFnStateClient stateClient = newStateClient();
            StateBackedIterable<String> iterable = newIterable(Caches.noop(), stateClient);

            // Parallel lists: collected.get(i) accumulates what cursors.get(i) has produced.
            List<Iterator<String>> cursors = new ArrayList<>();
            List<List<String>> collected = new ArrayList<>();
            for (int i = 0; i < 3; ++i) {
                cursors.add(iterable.iterator());
                collected.add(new ArrayList<>());
            }

            Random picker = new Random(42L);
            while (!cursors.isEmpty()) {
                int chosen = picker.nextInt(cursors.size());
                if (cursors.get(chosen).hasNext()) {
                    collected.get(chosen).add(cursors.get(chosen).next());
                } else {
                    // Exhausted: drop the cursor and verify everything it produced.
                    cursors.remove(chosen);
                    Assert.assertEquals(this.expected, collected.remove(chosen));
                }
            }
        }

        /**
         * Registers a byte-size observer through the coder, iterates the iterable, and compares
         * the observed total with the sum of per-element sizes (plus 5, within a tolerance of 3).
         */
        @Test
        public void testByteObservingStateBackedIterable() throws Exception {
            FakeBeamFnStateClient stateClient = newStateClient();
            StateBackedIterable<String> iterable = newIterable(Caches.noop(), stateClient);
            StateBackedIterable.Coder<String> coder = new StateBackedIterable.Coder<>(() -> Caches.noop(), stateClient, () -> "instructionId", StringUtf8Coder.of());

            Assert.assertTrue(coder.isRegisterByteSizeObserverCheap(iterable));
            TestByteObserver observer = new TestByteObserver();
            coder.registerByteSizeObserver(iterable, observer);
            Assert.assertTrue(observer.getIsLazy());

            long iterateBytes = Streams.stream(iterable).mapToLong(IterationTest::sizePlusOne).sum();
            observer.advance();
            Assert.assertEquals((float)iterateBytes + 5.0f, (float)observer.total, 3.0f);
        }

        /** Observer that simply sums every reported element size into {@link #total}. */
        private static class TestByteObserver
        extends ElementByteSizeObserver {
            public long total = 0L;

            @Override
            protected void reportElementSize(long elementByteSize) {
                this.total += elementByteSize;
            }
        }
    }
}

