/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class BeamFnStateGrpcClientCacheTest {
    private static final String SUCCESS = "SUCCESS";
    private static final String FAIL = "FAIL";
    private static final String TEST_ERROR = "TEST ERROR";
    private static final String SERVER_ERROR = "SERVER ERROR";
    @Rule
    public TestExecutors.TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);
    private Endpoints.ApiServiceDescriptor apiServiceDescriptor;
    private Server testServer;
    private BeamFnStateGrpcClientCache clientCache;
    private BlockingQueue<StreamObserver<BeamFnApi.StateResponse>> outboundServerObservers;
    private BlockingQueue<BeamFnApi.StateRequest> values;

    @Before
    public void setUp() throws Exception {
        this.values = new LinkedBlockingQueue<BeamFnApi.StateRequest>();
        this.outboundServerObservers = new LinkedBlockingQueue<StreamObserver<BeamFnApi.StateResponse>>();
        final CallStreamObserver inboundServerObserver = TestStreams.withOnNext(this.values::add).build();
        this.apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.getClass().getName() + "-" + UUID.randomUUID()).build();
        this.testServer = ((InProcessServerBuilder)InProcessServerBuilder.forName((String)this.apiServiceDescriptor.getUrl()).addService((BindableService)new BeamFnStateGrpc.BeamFnStateImplBase(){

            public StreamObserver<BeamFnApi.StateRequest> state(StreamObserver<BeamFnApi.StateResponse> outboundObserver) {
                Uninterruptibles.putUninterruptibly((BlockingQueue)BeamFnStateGrpcClientCacheTest.this.outboundServerObservers, outboundObserver);
                return inboundServerObserver;
            }
        })).build();
        this.testServer.start();
        this.clientCache = new BeamFnStateGrpcClientCache(IdGenerators.decrementingLongs(), ManagedChannelFactory.createInProcess(), OutboundObserverFactory.trivial());
    }

    @After
    public void tearDown() throws Exception {
        this.testServer.shutdownNow();
    }

    @Test
    public void testCachingOfClient() throws Exception {
        Endpoints.ApiServiceDescriptor otherApiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.apiServiceDescriptor.getUrl() + "-other").build();
        Server testServer2 = ((InProcessServerBuilder)InProcessServerBuilder.forName((String)otherApiServiceDescriptor.getUrl()).addService((BindableService)new BeamFnStateGrpc.BeamFnStateImplBase(){

            public StreamObserver<BeamFnApi.StateRequest> state(StreamObserver<BeamFnApi.StateResponse> outboundObserver) {
                throw new RuntimeException();
            }
        })).build();
        testServer2.start();
        try {
            Assert.assertSame(this.clientCache.forApiServiceDescriptor(this.apiServiceDescriptor), this.clientCache.forApiServiceDescriptor(this.apiServiceDescriptor));
            Assert.assertNotSame(this.clientCache.forApiServiceDescriptor(this.apiServiceDescriptor), this.clientCache.forApiServiceDescriptor(otherApiServiceDescriptor));
        }
        finally {
            testServer2.shutdownNow();
        }
    }

    @Test
    public void testRequestResponses() throws Exception {
        BeamFnStateClient client = this.clientCache.forApiServiceDescriptor(this.apiServiceDescriptor);
        CompletableFuture successfulResponse = client.handle(BeamFnApi.StateRequest.newBuilder().setInstructionId(SUCCESS));
        CompletableFuture unsuccessfulResponse = client.handle(BeamFnApi.StateRequest.newBuilder().setInstructionId(FAIL));
        StreamObserver<BeamFnApi.StateResponse> outboundServerObserver = this.outboundServerObservers.take();
        outboundServerObserver.onNext((Object)BeamFnApi.StateResponse.newBuilder().setId("UNKNOWN ID").build());
        this.handleServerRequest(outboundServerObserver, this.values.take());
        this.handleServerRequest(outboundServerObserver, this.values.take());
        Assert.assertNotNull(successfulResponse.get());
        try {
            unsuccessfulResponse.get();
            Assert.fail("Expected unsuccessful response");
        }
        catch (ExecutionException e) {
            MatcherAssert.assertThat(e.toString(), Matchers.containsString(TEST_ERROR));
        }
    }

    @Test
    public void testServerErrorCausesPendingAndFutureCallsToFail() throws Exception {
        BeamFnStateClient client = this.clientCache.forApiServiceDescriptor(this.apiServiceDescriptor);
        Future stateResponse = this.executor.submit(() -> client.handle(BeamFnApi.StateRequest.newBuilder().setInstructionId(SUCCESS)));
        Future serverResponse = this.executor.submit(() -> {
            StreamObserver<BeamFnApi.StateResponse> outboundServerObserver = this.outboundServerObservers.take();
            outboundServerObserver.onError((Throwable)new StatusRuntimeException(Status.INTERNAL.withDescription(SERVER_ERROR)));
            return null;
        });
        CompletableFuture inflight = (CompletableFuture)stateResponse.get();
        serverResponse.get();
        try {
            inflight.get();
            Assert.fail("Expected unsuccessful response due to server error");
        }
        catch (ExecutionException e) {
            MatcherAssert.assertThat(e.toString(), Matchers.containsString(SERVER_ERROR));
        }
    }

    @Test
    public void testServerCompletionCausesPendingAndFutureCallsToFail() throws Exception {
        BeamFnStateClient client = this.clientCache.forApiServiceDescriptor(this.apiServiceDescriptor);
        Future stateResponse = this.executor.submit(() -> client.handle(BeamFnApi.StateRequest.newBuilder().setInstructionId(SUCCESS)));
        Future serverResponse = this.executor.submit(() -> {
            StreamObserver<BeamFnApi.StateResponse> outboundServerObserver = this.outboundServerObservers.take();
            outboundServerObserver.onCompleted();
            return null;
        });
        CompletableFuture inflight = (CompletableFuture)stateResponse.get();
        serverResponse.get();
        try {
            inflight.get();
            Assert.fail("Expected unsuccessful response due to server error");
        }
        catch (ExecutionException e) {
            MatcherAssert.assertThat(e.toString(), Matchers.containsString("Server hanged up"));
        }
    }

    private void handleServerRequest(StreamObserver<BeamFnApi.StateResponse> outboundObserver, BeamFnApi.StateRequest value) {
        switch (value.getInstructionId()) {
            case "SUCCESS": {
                outboundObserver.onNext((Object)BeamFnApi.StateResponse.newBuilder().setId(value.getId()).build());
                return;
            }
            case "FAIL": {
                outboundObserver.onNext((Object)BeamFnApi.StateResponse.newBuilder().setId(value.getId()).setError(TEST_ERROR).build());
                return;
            }
        }
        outboundObserver.onNext((Object)BeamFnApi.StateResponse.newBuilder().setId(value.getId()).build());
    }
}

