/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.state.BagUserState;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.FakeBeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link BagUserState} backed by a {@link FakeBeamFnStateClient}.
 *
 * <p>Every test operates on the single state key produced by {@code key("A")} and uses
 * {@link StringUtf8Coder} for the bag's values. The "cached" variants share one
 * {@link Cache} between successive {@link BagUserState} instances and swap in a client
 * that throws on requests the cache is expected to make unnecessary.
 */
@RunWith(JUnit4.class)
public class BagUserStateTest {
    private static final String INSTRUCTION_ID = "instructionId";
    private static final String UNEXPECTED_CALL = "Unexpected call for test.";
    private static final int MIB = 1024 * 1024;
    // NOTE(review): 10 MiB presumably mirrors BagUserState's append batching limit; the
    // decompiled source only carried the literal 0xA00000 - confirm against BagUserState.
    private static final int BATCHING_LIMIT_BYTES = 10 * MIB;

    /** Reads the initial values, then verifies that {@code get()} fails once the state is closed. */
    @Test
    public void testGet() throws Exception {
        FakeBeamFnStateClient fakeClient = clientWithInitialValues("A1", "A2", "A3");
        BagUserState<String> userState = newState(Caches.noop(), fakeClient);

        assertContents(userState.get(), "A1", "A2", "A3");

        userState.asyncClose();
        Assert.assertThrows(IllegalStateException.class, userState::get);
    }

    /**
     * Reads the values once through the fake client, then reads them again through a second
     * state instance that shares the cache but whose client throws on every request.
     */
    @Test
    public void testGetCached() throws Exception {
        FakeBeamFnStateClient fakeClient = clientWithInitialValues("A1", "A2", "A3");
        Cache<?, ?> cache = Caches.eternal();

        BagUserState<String> firstState = newState(cache, fakeClient);
        assertContents(firstState.get(), "A1", "A2", "A3");
        firstState.asyncClose();

        // Any request reaching this client fails the test.
        BeamFnStateClient rejectEverything = requestBuilder -> {
            throw new IllegalStateException(UNEXPECTED_CALL);
        };
        BagUserState<String> secondState = newState(cache, rejectEverything);
        assertContents(secondState.get(), "A1", "A2", "A3");
        secondState.asyncClose();
    }

    /**
     * Appends values and checks that an iterable obtained before a later append keeps its
     * earlier contents, that closing writes all values to the client, and that appending
     * after close fails.
     */
    @Test
    public void testAppend() throws Exception {
        FakeBeamFnStateClient fakeClient = clientWithInitialValues("A1");
        BagUserState<String> userState = newState(Caches.noop(), fakeClient);

        userState.append("A2");
        PrefetchableIterable<String> stateBeforeA3 = userState.get();
        assertContents(stateBeforeA3, "A1", "A2");

        userState.append("A3");
        // The previously obtained iterable must not observe the new element.
        assertContents(stateBeforeA3, "A1", "A2");
        assertContents(userState.get(), "A1", "A2", "A3");

        userState.asyncClose();
        Assert.assertEquals(encode("A1", "A2", "A3"), fakeClient.getData().get(key("A")));
        Assert.assertThrows(IllegalStateException.class, () -> userState.append("A4"));
    }

    /**
     * Appends a mix of small and multi-MiB values and checks both the logical contents and the
     * raw chunks recorded by the fake client after close.
     */
    @Test
    public void testAppendBatchingLimit() throws Exception {
        String a1 = "A1";
        FakeBeamFnStateClient fakeClient = clientWithInitialValues(a1);
        BagUserState<String> userState = newState(Caches.noop(), fakeClient);

        // Each repeated token is two ASCII characters, so the payloads are roughly
        // 4 MiB (a2), 8 MiB (a3) and 22 MiB (a6) of UTF-8 text.
        String a2 = Strings.repeat("A2", 2 * MIB);
        userState.append(a2);
        String a3 = Strings.repeat("A3", 4 * MIB);
        userState.append(a3);
        String a4 = "A4";
        userState.append(a4);
        String a5 = Strings.repeat("A5", 100);
        userState.append(a5);
        String a6 = Strings.repeat("A6", 11 * MIB);
        userState.append(a6);
        String a7 = "A7";
        userState.append(a7);

        PrefetchableIterable<String> allValues = userState.get();
        assertContents(allValues, a1, a2, a3, a4, a5, a6, a7);

        userState.asyncClose();
        Assert.assertEquals(
            encode(a1, a2, a3, a4, a5, a6, a7), fakeClient.getData().get(key("A")));

        // Sanity-check the sizes that motivate the expected grouping below.
        Assert.assertTrue(encode(a2, a3).size() > BATCHING_LIMIT_BYTES);
        Assert.assertTrue(encode(a3, a4, a5).size() < BATCHING_LIMIT_BYTES);
        Assert.assertTrue(encode(a3, a4, a5, a6).size() > BATCHING_LIMIT_BYTES);
        Assert.assertTrue(encode(a6, a7).size() > BATCHING_LIMIT_BYTES);

        // Expected raw entries: the initial value, a2 alone, a3+a4+a5 together, a6 alone, a7 alone.
        Assert.assertArrayEquals(
            new ByteString[] {encode(a1), encode(a2), encode(a3, a4, a5), encode(a6), encode(a7)},
            Iterables.toArray(fakeClient.getRawData().get(key("A")), ByteString.class));
    }

    /**
     * Same as {@link #testAppend()} but continues with a second state instance that shares the
     * cache and whose client throws on get requests while forwarding everything else.
     */
    @Test
    public void testAppendCached() throws Exception {
        FakeBeamFnStateClient fakeClient = clientWithInitialValues("A1");
        Cache<?, ?> cache = Caches.eternal();

        BagUserState<String> firstState = newState(cache, fakeClient);
        firstState.append("A2");
        PrefetchableIterable<String> stateBeforeA3 = firstState.get();
        assertContents(stateBeforeA3, "A1", "A2");
        firstState.append("A3");
        assertContents(stateBeforeA3, "A1", "A2");
        assertContents(firstState.get(), "A1", "A2", "A3");
        firstState.asyncClose();

        BagUserState<String> secondState = newState(cache, rejectingGets(fakeClient));
        secondState.append("A4");
        PrefetchableIterable<String> stateBeforeA5 = secondState.get();
        assertContents(stateBeforeA5, "A1", "A2", "A3", "A4");
        secondState.append("A5");
        assertContents(stateBeforeA5, "A1", "A2", "A3", "A4");
        assertContents(secondState.get(), "A1", "A2", "A3", "A4", "A5");
        secondState.asyncClose();

        Assert.assertEquals(
            encode("A1", "A2", "A3", "A4", "A5"), fakeClient.getData().get(key("A")));
    }

    /**
     * Clears the bag (before and after an append), verifies the client holds no data for the key
     * after close, and that clearing after close fails.
     */
    @Test
    public void testClear() throws Exception {
        FakeBeamFnStateClient fakeClient = clientWithInitialValues("A1", "A2", "A3");
        BagUserState<String> userState = newState(Caches.noop(), fakeClient);

        assertContents(userState.get(), "A1", "A2", "A3");
        userState.clear();
        Assert.assertFalse(userState.get().iterator().hasNext());

        userState.append("A4");
        assertContents(userState.get(), "A4");
        userState.clear();
        Assert.assertFalse(userState.get().iterator().hasNext());

        userState.asyncClose();
        Assert.assertNull(fakeClient.getData().get(key("A")));
        Assert.assertThrows(IllegalStateException.class, userState::clear);
    }

    /**
     * Exercises clear/append across three state instances sharing one cache; the second and
     * third use a client that throws on get requests while forwarding everything else.
     */
    @Test
    public void testClearCached() throws Exception {
        FakeBeamFnStateClient fakeClient = clientWithInitialValues("A1", "A2", "A3");
        Cache<?, ?> cache = Caches.eternal();

        BagUserState<String> firstState = newState(cache, fakeClient);
        assertContents(firstState.get(), "A1", "A2", "A3");
        firstState.clear();
        Assert.assertFalse(firstState.get().iterator().hasNext());
        firstState.append("A4");
        assertContents(firstState.get(), "A4");
        firstState.asyncClose();

        BagUserState<String> secondState = newState(cache, rejectingGets(fakeClient));
        assertContents(secondState.get(), "A4");
        secondState.clear();
        Assert.assertFalse(secondState.get().iterator().hasNext());
        secondState.asyncClose();

        BagUserState<String> thirdState = newState(cache, rejectingGets(fakeClient));
        assertContents(thirdState.get());
        thirdState.asyncClose();

        Assert.assertNull(fakeClient.getData().get(key("A")));
    }

    /**
     * Creates a fake state client whose only entry maps {@code key("A")} to the given values.
     *
     * @param values initial bag contents, in order
     * @return the fake client
     * @throws IOException if building the state key fails
     */
    private FakeBeamFnStateClient clientWithInitialValues(String... values) throws IOException {
        return new FakeBeamFnStateClient(
            StringUtf8Coder.of(), ImmutableMap.of(key("A"), Arrays.asList(values)));
    }

    /**
     * Creates a string-valued {@link BagUserState} for {@code key("A")} under the fixed test
     * instruction id.
     *
     * @param cache cache handed to the state
     * @param client state client handed to the state
     * @return the new state instance
     * @throws IOException if building the state key fails
     */
    private BagUserState<String> newState(Cache<?, ?> cache, BeamFnStateClient client)
            throws IOException {
        return new BagUserState<>(cache, client, INSTRUCTION_ID, key("A"), StringUtf8Coder.of());
    }

    /**
     * Wraps {@code delegate} in a client that throws {@link IllegalStateException} for any request
     * that has a get and forwards every other request unchanged.
     *
     * @param delegate client that handles the non-get requests
     * @return the wrapping client
     */
    private static BeamFnStateClient rejectingGets(FakeBeamFnStateClient delegate) {
        return requestBuilder -> {
            if (requestBuilder.hasGet()) {
                throw new IllegalStateException(UNEXPECTED_CALL);
            }
            return delegate.handle(requestBuilder);
        };
    }

    /**
     * Asserts that iterating {@code actual} yields exactly {@code expected}, in order.
     *
     * @param actual iterable under test
     * @param expected expected elements; none means the iterable must be empty
     */
    private static void assertContents(Iterable<String> actual, String... expected) {
        Assert.assertArrayEquals(expected, Iterables.toArray(actual, String.class));
    }

    /**
     * Builds a bag-user-state {@link BeamFnApi.StateKey} with fixed transform id, user state id
     * and window bytes, whose key field is {@code encode(id)}.
     *
     * @param id value encoded into the state key's key field
     * @return the state key
     * @throws IOException if encoding {@code id} fails
     */
    private BeamFnApi.StateKey key(String id) throws IOException {
        BeamFnApi.StateKey.BagUserState.Builder bagKey =
            BeamFnApi.StateKey.BagUserState.newBuilder()
                .setTransformId("ptransformId")
                .setUserStateId("stateId")
                .setWindow(ByteString.copyFromUtf8("encodedWindow"))
                .setKey(encode(id));
        return BeamFnApi.StateKey.newBuilder().setBagUserState(bagKey).build();
    }

    /**
     * Encodes each value with {@link StringUtf8Coder} into one stream, in argument order.
     *
     * @param values strings to encode
     * @return the concatenated encodings
     * @throws IOException if the coder fails to write
     */
    private ByteString encode(String... values) throws IOException {
        StringUtf8Coder coder = StringUtf8Coder.of();
        ByteStringOutputStream out = new ByteStringOutputStream();
        for (String value : values) {
            coder.encode(value, out);
        }
        return out.toByteString();
    }
}

