/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigtable.gaxx.reframing;

import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.bigtable.gaxx.reframing.IncompleteStreamException;
import com.google.cloud.bigtable.gaxx.reframing.Reframer;
import com.google.cloud.bigtable.gaxx.reframing.ReframingResponseObserver;
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi;
import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import com.google.common.truth.Truth;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(value=JUnit4.class)
public class ReframingResponseObserverTest {
    private ExecutorService executor;

    /** Creates a fresh cached thread pool for the tests that need background threads. */
    @Before
    public void setUp() throws Exception {
        executor = Executors.newCachedThreadPool();
    }

    /** Stops the per-test thread pool, interrupting any task that is still running. */
    @After
    public void tearDown() throws Exception {
        executor.shutdownNow();
    }

    /**
     * Pushing a response from upstream while no upstream pull is outstanding must be rejected
     * with an {@link IllegalStateException}.
     */
    @Test
    public void testUnsolicitedResponseError() throws Exception {
        MockStreamingApi.MockResponseObserver downstream = new MockStreamingApi.MockResponseObserver(false);
        ReframingResponseObserver reframingObserver = new ReframingResponseObserver(downstream, new DasherizingReframer(1));
        MockStreamingApi.MockServerStreamingCallable upstreamCallable = new MockStreamingApi.MockServerStreamingCallable();
        upstreamCallable.call("request", (ResponseObserver)reframingObserver);
        MockStreamingApi.MockStreamController upstreamController = upstreamCallable.popLastCall().getController();

        // Sanity check: nothing has been requested from upstream yet.
        Preconditions.checkState(upstreamController.popLastPull() == 0);

        Throwable thrown = null;
        try {
            upstreamController.getObserver().onResponse("a");
        } catch (Throwable t) {
            thrown = t;
        }
        Truth.assertThat(thrown).isInstanceOf(IllegalStateException.class);
    }

    /**
     * Issues a downstream {@code request(1)} while the downstream observer is parked inside its
     * completion callback, and checks that neither the observer nor the completing thread sees
     * an error.
     */
    @Test
    public void testConcurrentRequestAfterClose() throws Exception {
        GatedMockResponseObserver downstream = new GatedMockResponseObserver(false);
        downstream.completeBreakpoint.enable();
        ReframingResponseObserver reframingObserver = new ReframingResponseObserver(downstream, new DasherizingReframer(1));
        MockStreamingApi.MockServerStreamingCallable upstreamCallable = new MockStreamingApi.MockServerStreamingCallable();
        upstreamCallable.call("request", (ResponseObserver)reframingObserver);
        MockStreamingApi.MockStreamController upstreamController = upstreamCallable.popLastCall().getController();

        // Complete the stream on a background thread; it pauses at the enabled breakpoint.
        Future<?> completionTask = executor.submit(() -> upstreamController.getObserver().onComplete());

        // Request more data while the completion callback is still in progress, then let it finish.
        downstream.completeBreakpoint.awaitArrival();
        downstream.getController().request(1);
        downstream.completeBreakpoint.release();

        Truth.assertThat(downstream.getFinalError()).isNull();

        Throwable failure = null;
        try {
            completionTask.get();
        } catch (ExecutionException e) {
            failure = e.getCause();
        }
        Truth.assertThat(failure).isNull();
    }

    /** One upstream item maps to one downstream item under manual flow control. */
    @Test
    public void testOneToOne() throws InterruptedException {
        MockStreamingApi.MockResponseObserver downstream = new MockStreamingApi.MockResponseObserver(false);
        ReframingResponseObserver reframingObserver = new ReframingResponseObserver(downstream, new DasherizingReframer(1));
        FakeStreamingApi.ServerStreamingStashCallable upstreamCallable = new FakeStreamingApi.ServerStreamingStashCallable(ImmutableList.of("a"));
        upstreamCallable.call("request", (ResponseObserver)reframingObserver);

        downstream.getController().request(1);

        Truth.assertThat((String)downstream.popNextResponse()).isEqualTo("a");
        Truth.assertThat(downstream.isDone()).isTrue();
    }

    /** Two upstream items pass through unchanged when the downstream observer uses auto flow control. */
    @Test
    public void testOneToOneAuto() throws InterruptedException {
        MockStreamingApi.MockResponseObserver downstream = new MockStreamingApi.MockResponseObserver(true);
        ReframingResponseObserver reframingObserver = new ReframingResponseObserver(downstream, new DasherizingReframer(1));
        FakeStreamingApi.ServerStreamingStashCallable upstreamCallable = new FakeStreamingApi.ServerStreamingStashCallable(ImmutableList.of("a", "b"));
        upstreamCallable.call("request", (ResponseObserver)reframingObserver);

        Truth.assertThat((String)downstream.popNextResponse()).isEqualTo("a");
        Truth.assertThat((String)downstream.popNextResponse()).isEqualTo("b");
        Truth.assertThat(downstream.isDone()).isTrue();
    }

    /**
     * A single upstream item {@code "a-b"} is split into two downstream items that are handed
     * out one per explicit request; the stream is only done after the second one.
     */
    @Test
    public void testManyToOne() throws InterruptedException {
        MockStreamingApi.MockResponseObserver downstream = new MockStreamingApi.MockResponseObserver(false);
        ReframingResponseObserver reframingObserver = new ReframingResponseObserver(downstream, new DasherizingReframer(1));
        FakeStreamingApi.ServerStreamingStashCallable upstreamCallable = new FakeStreamingApi.ServerStreamingStashCallable(ImmutableList.of("a-b"));
        upstreamCallable.call("request", (ResponseObserver)reframingObserver);

        // Sanity check: nothing is delivered before the first request.
        Preconditions.checkState(downstream.popNextResponse() == null);

        // First request yields exactly one part.
        downstream.getController().request(1);
        Truth.assertThat((String)downstream.popNextResponse()).isEqualTo("a");
        Truth.assertThat((String)downstream.popNextResponse()).isEqualTo(null);
        Truth.assertThat(downstream.isDone()).isFalse();

        // Second request yields the remaining part and finishes the stream.
        downstream.getController().request(1);
        Truth.assertThat((String)downstream.popNextResponse()).isEqualTo("b");
        Truth.assertThat(downstream.isDone()).isTrue();
    }

    /** With auto flow control, both parts of the upstream item {@code "a-b"} arrive without explicit requests. */
    @Test
    public void testManyToOneAuto() throws InterruptedException {
        MockStreamingApi.MockResponseObserver downstream = new MockStreamingApi.MockResponseObserver(true);
        ReframingResponseObserver reframingObserver = new ReframingResponseObserver(downstream, new DasherizingReframer(1));
        FakeStreamingApi.ServerStreamingStashCallable upstreamCallable = new FakeStreamingApi.ServerStreamingStashCallable(ImmutableList.of("a-b"));
        upstreamCallable.call("request", (ResponseObserver)reframingObserver);

        Truth.assertThat((String)downstream.popNextResponse()).isEqualTo("a");
        Truth.assertThat((String)downstream.popNextResponse()).isEqualTo("b");
        Truth.assertThat(downstream.isDone()).isTrue();
    }

    /**
     * Cancelling downstream while a part is still buffered must cancel upstream, and a later
     * upstream error must still surface downstream as a {@link CancellationException}.
     */
    @Test
    public void testManyToOneCancelEarly() throws InterruptedException {
        MockStreamingApi.MockResponseObserver downstream = new MockStreamingApi.MockResponseObserver(false);
        ReframingResponseObserver reframingObserver = new ReframingResponseObserver(downstream, new DasherizingReframer(1));
        MockStreamingApi.MockServerStreamingCallable upstreamCallable = new MockStreamingApi.MockServerStreamingCallable();
        upstreamCallable.call("request", (ResponseObserver)reframingObserver);
        MockStreamingApi.MockStreamController upstreamController = upstreamCallable.popLastCall().getController();

        // Consume only the first of the two parts, then cancel.
        downstream.getController().request(1);
        upstreamController.getObserver().onResponse("a-b");
        downstream.popNextResponse();
        downstream.getController().cancel();
        Truth.assertThat(upstreamController.isCancelled()).isTrue();

        // Upstream reacts to the cancellation with an unrelated error.
        upstreamController.getObserver().onError(new RuntimeException("Some other upstream error"));
        Truth.assertThat(downstream.getFinalError()).isInstanceOf(CancellationException.class);
    }

    /** Two upstream items are merged into one downstream item {@code "a-b"} under manual flow control. */
    @Test
    public void testOneToMany() throws InterruptedException {
        MockStreamingApi.MockResponseObserver downstream = new MockStreamingApi.MockResponseObserver(false);
        ReframingResponseObserver reframingObserver = new ReframingResponseObserver(downstream, new DasherizingReframer(2));
        FakeStreamingApi.ServerStreamingStashCallable upstreamCallable = new FakeStreamingApi.ServerStreamingStashCallable(ImmutableList.of("a", "b"));
        upstreamCallable.call("request", (ResponseObserver)reframingObserver);

        // Sanity check: nothing is delivered before the first request.
        Preconditions.checkState(downstream.popNextResponse() == null);

        downstream.getController().request(1);

        Truth.assertThat((String)downstream.popNextResponse()).isEqualTo("a-b");
        Truth.assertThat(downstream.isDone()).isTrue();
        Truth.assertThat(downstream.getFinalError()).isNull();
    }

    /** Two upstream items are merged into one downstream item {@code "a-b"} under auto flow control. */
    @Test
    public void testOneToManyAuto() throws InterruptedException {
        MockStreamingApi.MockResponseObserver downstream = new MockStreamingApi.MockResponseObserver(true);
        ReframingResponseObserver reframingObserver = new ReframingResponseObserver(downstream, new DasherizingReframer(2));
        FakeStreamingApi.ServerStreamingStashCallable upstreamCallable = new FakeStreamingApi.ServerStreamingStashCallable(ImmutableList.of("a", "b"));
        upstreamCallable.call("request", (ResponseObserver)reframingObserver);

        Truth.assertThat((String)downstream.popNextResponse()).isEqualTo("a-b");
        Truth.assertThat(downstream.isDone()).isTrue();
        Truth.assertThat(downstream.getFinalError()).isNull();
    }

    /**
     * When upstream ends with only one of the two parts a frame needs, downstream must fail
     * with an {@link IncompleteStreamException}.
     */
    @Test
    public void testOneToManyIncomplete() {
        MockStreamingApi.MockResponseObserver downstream = new MockStreamingApi.MockResponseObserver(true);
        ReframingResponseObserver reframingObserver = new ReframingResponseObserver(downstream, new DasherizingReframer(2));
        FakeStreamingApi.ServerStreamingStashCallable upstreamCallable = new FakeStreamingApi.ServerStreamingStashCallable(ImmutableList.of("a"));
        upstreamCallable.call("request", (ResponseObserver)reframingObserver);

        Truth.assertThat(downstream.getFinalError()).isInstanceOf(IncompleteStreamException.class);
    }

    /**
     * Cancels from the test thread while one background thread drains the downstream observer
     * and another feeds responses from upstream; both background loops must terminate within a
     * minute.
     */
    @Test
    public void testConcurrentCancel() throws InterruptedException {
        MockStreamingApi.MockResponseObserver downstream = new MockStreamingApi.MockResponseObserver(true);
        ReframingResponseObserver reframingObserver = new ReframingResponseObserver(downstream, new DasherizingReframer(2));
        MockStreamingApi.MockServerStreamingCallable upstreamCallable = new MockStreamingApi.MockServerStreamingCallable();
        upstreamCallable.call("request", (ResponseObserver)reframingObserver);
        MockStreamingApi.MockStreamController upstreamController = upstreamCallable.popLastCall().getController();
        CountDownLatch bothFinished = new CountDownLatch(2);

        // Consumer: keep popping responses until the downstream observer reports completion.
        executor.submit(() -> {
            while (!downstream.isDone()) {
                downstream.popNextResponse();
            }
            bothFinished.countDown();
        });

        // Producer: answer every positive pull with one response until cancelled, then fail the stream.
        executor.submit(() -> {
            while (!upstreamController.isCancelled()) {
                if (upstreamController.popLastPull() > 0) {
                    upstreamController.getObserver().onResponse("a");
                }
            }
            upstreamController.getObserver().onError(new RuntimeException("Some other upstream error"));
            bothFinished.countDown();
        });

        downstream.getController().cancel();

        Truth.assertThat(bothFinished.await(1L, TimeUnit.MINUTES)).isTrue();
    }

    /**
     * An exception thrown by {@code Reframer.push} must become the downstream final error,
     * with only the responses produced before the failure being delivered.
     */
    @Test
    public void testReframerPushError() throws Exception {
        MockStreamingApi.MockResponseObserver downstream = new MockStreamingApi.MockResponseObserver(true);
        // Behaves like the plain reframer except that the item "boom" makes push() fail.
        DasherizingReframer explodingReframer = new DasherizingReframer(1){

            @Override
            public void push(String item) {
                if ("boom".equals(item)) {
                    throw new IllegalStateException("fake error");
                }
                super.push(item);
            }
        };
        ReframingResponseObserver reframingObserver = new ReframingResponseObserver(downstream, explodingReframer);
        FakeStreamingApi.ServerStreamingStashCallable upstreamCallable = new FakeStreamingApi.ServerStreamingStashCallable(ImmutableList.of("a", "boom", "c"));
        upstreamCallable.call("request", (ResponseObserver)reframingObserver);

        Throwable finalError = downstream.getFinalError();
        Truth.assertThat(finalError).isInstanceOf(IllegalStateException.class);
        Truth.assertThat(downstream.getFinalError()).hasMessageThat().isEqualTo("fake error");
        Truth.assertThat((String)downstream.popNextResponse()).isEqualTo("a");
        Truth.assertThat((String)downstream.popNextResponse()).isNull();
    }

    /**
     * An exception thrown by the second {@code Reframer.pop} call must become the downstream
     * final error; the upstream call is expected to end with a {@link CancellationException}
     * after two delivered items.
     */
    @Test
    public void testReframerPopError() {
        AtomicInteger popCalls = new AtomicInteger();
        MockStreamingApi.MockResponseObserver downstream = new MockStreamingApi.MockResponseObserver(true);
        // Behaves like the plain reframer except that the second pop() fails.
        DasherizingReframer explodingReframer = new DasherizingReframer(1){

            @Override
            public String pop() {
                if (popCalls.incrementAndGet() == 2) {
                    throw new IllegalStateException("fake error");
                }
                return super.pop();
            }
        };
        ReframingResponseObserver reframingObserver = new ReframingResponseObserver(downstream, explodingReframer);
        FakeStreamingApi.ServerStreamingStashCallable upstreamCallable = new FakeStreamingApi.ServerStreamingStashCallable(ImmutableList.of("a", "boom", "c"));
        upstreamCallable.call("request", (ResponseObserver)reframingObserver);
        FakeStreamingApi.ServerStreamingStashCallable.StreamControllerStash upstreamCall = upstreamCallable.popLastCall();

        // Downstream view: the reframer failure is the final error and only "a" got through.
        Truth.assertThat(downstream.getFinalError()).isInstanceOf(IllegalStateException.class);
        Truth.assertThat(downstream.getFinalError()).hasMessageThat().isEqualTo("fake error");
        Truth.assertThat((String)downstream.popNextResponse()).isEqualTo("a");
        Truth.assertThat((String)downstream.popNextResponse()).isNull();
        Truth.assertThat(popCalls.get()).isEqualTo(2);

        // Upstream view: the call was cancelled after two items had been delivered.
        Truth.assertThat(upstreamCall.getError()).isInstanceOf(CancellationException.class);
        Truth.assertThat((Long)upstreamCall.getNumDelivered()).isEqualTo(2);
    }

    /**
     * When the reframer fails and the follow-up upstream {@code cancel()} fails as well, the
     * reframer error must be reported downstream with the cancel failure attached as its single
     * suppressed exception.
     */
    @Test
    public void testFailedRecoveryHandling() {
        MockStreamingApi.MockResponseObserver outerObserver = new MockStreamingApi.MockResponseObserver(true);
        final RuntimeException fakeReframerError = new RuntimeException("fake reframer error");
        // A reframer whose push() always fails and which never reports any buffered data.
        Reframer<String, String> brokenReframer = new Reframer<String, String>(){

            @Override
            public void push(String ignored) {
                throw fakeReframerError;
            }

            @Override
            public boolean hasFullFrame() {
                return false;
            }

            @Override
            public boolean hasPartialFrame() {
                return false;
            }

            @Override
            public String pop() {
                throw new IllegalStateException("should not be called");
            }
        };
        ReframingResponseObserver middleware = new ReframingResponseObserver(outerObserver, brokenReframer);

        // Upstream controller whose cancel() throws, so the recovery path itself fails.
        StreamController mockInnerController = Mockito.mock(StreamController.class);
        RuntimeException fakeCancelError = new RuntimeException("fake cancel error");
        Mockito.doThrow(fakeCancelError).when(mockInnerController).cancel();

        middleware.onStartImpl(mockInnerController);
        middleware.onResponseImpl("1");

        Throwable finalError = outerObserver.getFinalError();
        Truth.assertThat(finalError).isSameInstanceAs(fakeReframerError);

        // Assert on the array itself: wrapping it with ImmutableList.of(array) produces a
        // one-element list containing the array, which would make a size check always pass.
        Throwable[] suppressed = finalError.getSuppressed();
        Truth.assertThat(suppressed).hasLength(1);
        Truth.assertThat(suppressed[0]).isInstanceOf(IllegalStateException.class);
        Truth.assertThat(suppressed[0]).hasMessageThat().isEqualTo("Failed to cancel upstream while recovering from an unexpected error");
        Truth.assertThat(suppressed[0].getCause()).isSameInstanceAs(fakeCancelError);
    }

    /**
     * Stress test: runs the request/complete race iteration 20000 times on each of 20 worker
     * threads and rethrows the underlying cause of the first worker failure encountered.
     */
    @Test
    public void testRequestAndCompleteRaceCondition() throws Throwable {
        final int workerCount = 20;
        final int iterationsPerWorker = 20000;
        ExecutorService pool = Executors.newFixedThreadPool(workerCount);
        ArrayList<Future<Void>> pending = new ArrayList<Future<Void>>();

        for (int worker = 0; worker < workerCount; ++worker) {
            Future<Void> submitted = pool.submit(() -> {
                for (int round = 0; round < iterationsPerWorker; ++round) {
                    requestAndCompleteRaceConditionIteration();
                }
                return null;
            });
            pending.add(submitted);
        }
        // No further tasks will be submitted; already queued ones keep running.
        pool.shutdown();

        for (Future<Void> outcome : pending) {
            try {
                outcome.get();
            } catch (ExecutionException e) {
                // Surface the worker's own failure rather than the wrapper.
                throw e.getCause();
            }
        }
    }

    /**
     * Runs one round of the request/complete race.
     *
     * <p>One thread calls {@code onComplete()} on the observer under test while a second thread,
     * having already started the stream, requested one item and delivered {@code "moo"}, issues
     * another {@code request(1)}. Both threads are released by the same start signal. Afterwards
     * the downstream observer must report that it is done.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if either racing task fails
     */
    private static void requestAndCompleteRaceConditionIteration() throws InterruptedException, ExecutionException {
        MockStreamingApi.MockResponseObserver downstream = new MockStreamingApi.MockResponseObserver(false);
        ReframingResponseObserver underTest = new ReframingResponseObserver(downstream, new DasherizingReframer(1));
        CountDownLatch bothReady = new CountDownLatch(2);
        CompletableFuture<Object> go = new CompletableFuture<Object>();
        ExecutorService racers = Executors.newFixedThreadPool(2);

        // Racer 1: complete the stream as soon as the start signal fires.
        Future<Void> completer = racers.submit(() -> {
            bothReady.countDown();
            go.get();
            underTest.onComplete();
            return null;
        });

        // Racer 2: set up the stream with one delivered item, then request again on the start signal.
        Future<Void> requester = racers.submit(() -> {
            StreamController noOpController = new StreamController(){

                @Override
                public void cancel() {
                }

                @Override
                public void disableAutoInboundFlowControl() {
                }

                @Override
                public void request(int count) {
                }
            };
            underTest.onStart(noOpController);
            downstream.getController().request(1);
            underTest.onResponse("moo");
            bothReady.countDown();
            go.get();
            downstream.getController().request(1);
            return null;
        });
        racers.shutdown();

        // Release both racers together once each has reached its wait point.
        bothReady.await();
        go.complete(null);
        completer.get();
        requester.get();

        Truth.assertWithMessage("outer observer should not hang").that(downstream.isDone()).isTrue();
    }

    static class DasherizingReframer
    implements Reframer<String, String> {
        final Queue<String> buffer = Queues.newArrayDeque();
        final int partsPerResponse;

        DasherizingReframer(int partsPerResponse) {
            this.partsPerResponse = partsPerResponse;
        }

        public void push(String response) {
            this.buffer.addAll(Arrays.asList(response.split("-")));
        }

        public boolean hasFullFrame() {
            return this.buffer.size() >= this.partsPerResponse;
        }

        public boolean hasPartialFrame() {
            return !this.buffer.isEmpty();
        }

        public String pop() {
            Object[] parts = new String[this.partsPerResponse];
            for (int i = 0; i < this.partsPerResponse; ++i) {
                parts[i] = this.buffer.poll();
            }
            return Joiner.on((String)"-").join(parts);
        }
    }

    /**
     * Mock observer whose terminal callbacks first delegate to the superclass and then stop at a
     * {@link Breakpoint}, letting a test act while the callback is still on the stack.
     */
    static class GatedMockResponseObserver
    extends MockStreamingApi.MockResponseObserver<String> {
        /** Reached at the end of {@link #onErrorImpl(Throwable)}. */
        final Breakpoint errorBreakpoint = new Breakpoint();
        /** Reached at the end of {@link #onCompleteImpl()}. */
        final Breakpoint completeBreakpoint = new Breakpoint();

        public GatedMockResponseObserver(boolean autoFlowControl) {
            super(autoFlowControl);
        }

        @Override
        protected void onCompleteImpl() {
            super.onCompleteImpl();
            completeBreakpoint.arrive();
        }

        @Override
        protected void onErrorImpl(Throwable t) {
            super.onErrorImpl(t);
            errorBreakpoint.arrive();
        }
    }

    /**
     * A two-latch rendezvous point for coordinating a test thread with a thread under test.
     *
     * <p>A new breakpoint is disabled: both latches start at zero, so {@link #arrive()} and
     * {@link #awaitArrival()} return immediately. After {@link #enable()}, {@code arrive()}
     * signals arrival and then waits until {@link #release()} is called, while
     * {@code awaitArrival()} waits for that arrival signal. Each wait is capped at one second and
     * the timeout result is ignored, so a wait that times out simply returns.
     */
    static class Breakpoint {
        private volatile CountDownLatch arriveLatch = new CountDownLatch(0);
        private volatile CountDownLatch leaveLatch = new CountDownLatch(0);

        Breakpoint() {
        }

        /** Arms the breakpoint by replacing both latches with fresh single-count latches. */
        public void enable() {
            this.arriveLatch = new CountDownLatch(1);
            this.leaveLatch = new CountDownLatch(1);
        }

        /**
         * Signals arrival and waits (up to one second) to be released.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
         *     interrupted; the thread's interrupt flag is restored first
         */
        public void arrive() {
            this.arriveLatch.countDown();
            Breakpoint.awaitUpToOneSecond(this.leaveLatch);
        }

        /**
         * Waits (up to one second) until some thread has called {@link #arrive()}.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
         *     interrupted; the thread's interrupt flag is restored first
         */
        void awaitArrival() {
            Breakpoint.awaitUpToOneSecond(this.arriveLatch);
        }

        /** Lets a thread waiting in {@link #arrive()} continue. */
        public void release() {
            this.leaveLatch.countDown();
        }

        /** Waits on {@code latch} for at most one second, preserving interruption for callers. */
        private static void awaitUpToOneSecond(CountDownLatch latch) {
            try {
                latch.await(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                // await() cleared the interrupt flag when it threw; set it again so code further
                // up the stack (for example a pool worker being shut down) can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }
}

