/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.control;

import java.util.EnumMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.fn.harness.control.BeamFnControlClient;
import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC;
import org.apache.beam.fn.harness.logging.RestoreBeamFnLoggingMDC;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class BeamFnControlClientTest {
    private static final BeamFnApi.InstructionRequest SUCCESSFUL_REQUEST = BeamFnApi.InstructionRequest.newBuilder().setInstructionId("1L").setProcessBundle(BeamFnApi.ProcessBundleRequest.getDefaultInstance()).build();
    private static final BeamFnApi.InstructionResponse SUCCESSFUL_RESPONSE = BeamFnApi.InstructionResponse.newBuilder().setInstructionId("1L").setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance()).build();
    private static final BeamFnApi.InstructionRequest UNKNOWN_HANDLER_REQUEST = BeamFnApi.InstructionRequest.newBuilder().setInstructionId("2L").build();
    private static final BeamFnApi.InstructionResponse UNKNOWN_HANDLER_RESPONSE = BeamFnApi.InstructionResponse.newBuilder().setInstructionId("2L").setError("Unknown InstructionRequest type " + BeamFnApi.InstructionRequest.RequestCase.REQUEST_NOT_SET).build();
    private static final RuntimeException FAILURE = new RuntimeException("TestFailure");
    private static final BeamFnApi.InstructionRequest FAILURE_REQUEST = BeamFnApi.InstructionRequest.newBuilder().setInstructionId("3L").setRegister(BeamFnApi.RegisterRequest.getDefaultInstance()).build();
    private static final BeamFnApi.InstructionResponse FAILURE_RESPONSE = BeamFnApi.InstructionResponse.newBuilder().setInstructionId("3L").setError(Throwables.getStackTraceAsString((Throwable)FAILURE)).build();
    @Rule
    public TestRule restoreMDCAfterTest = new RestoreBeamFnLoggingMDC();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDelegation() throws Exception {
        AtomicBoolean clientClosedStream = new AtomicBoolean();
        LinkedBlockingQueue values = new LinkedBlockingQueue();
        final LinkedBlockingQueue outboundServerObservers = new LinkedBlockingQueue();
        final CallStreamObserver inboundServerObserver = TestStreams.withOnNext(values::add).withOnCompleted(() -> clientClosedStream.set(true)).build();
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()).build();
        Server server = ((InProcessServerBuilder)InProcessServerBuilder.forName((String)apiServiceDescriptor.getUrl()).addService((BindableService)new BeamFnControlGrpc.BeamFnControlImplBase(){

            public StreamObserver<BeamFnApi.InstructionResponse> control(StreamObserver<BeamFnApi.InstructionRequest> outboundObserver) {
                Uninterruptibles.putUninterruptibly((BlockingQueue)outboundServerObservers, outboundObserver);
                return inboundServerObserver;
            }
        })).build();
        server.start();
        try {
            EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction> handlers = new EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction>(BeamFnApi.InstructionRequest.RequestCase.class);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE, value -> {
                Assert.assertEquals(value.getInstructionId(), BeamFnLoggingMDC.getInstructionId());
                return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance());
            });
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER, value -> {
                Assert.assertEquals(value.getInstructionId(), BeamFnLoggingMDC.getInstructionId());
                throw FAILURE;
            });
            ExecutorService executor = Executors.newCachedThreadPool();
            BeamFnControlClient client = new BeamFnControlClient(apiServiceDescriptor, ManagedChannelFactory.createInProcess(), OutboundObserverFactory.trivial(), (Executor)executor, handlers);
            StreamObserver outboundServerObserver = (StreamObserver)outboundServerObservers.take();
            outboundServerObserver.onNext((Object)SUCCESSFUL_REQUEST);
            Assert.assertEquals(SUCCESSFUL_RESPONSE, values.take());
            outboundServerObserver.onNext((Object)UNKNOWN_HANDLER_REQUEST);
            Assert.assertEquals(UNKNOWN_HANDLER_RESPONSE, values.take());
            outboundServerObserver.onNext((Object)FAILURE_REQUEST);
            Assert.assertEquals(FAILURE_RESPONSE, values.take());
            outboundServerObserver.onCompleted();
            client.terminationFuture().get();
        }
        finally {
            server.shutdownNow();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testJavaErrorResponse() throws Exception {
        final LinkedBlockingQueue outboundServerObservers = new LinkedBlockingQueue();
        LinkedBlockingQueue error = new LinkedBlockingQueue();
        final CallStreamObserver inboundServerObserver = TestStreams.withOnNext(response -> Assert.fail(String.format("Unexpected Response %s", response))).withOnError(error::add).build();
        Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()).build();
        Server server = ((InProcessServerBuilder)InProcessServerBuilder.forName((String)apiServiceDescriptor.getUrl()).addService((BindableService)new BeamFnControlGrpc.BeamFnControlImplBase(){

            public StreamObserver<BeamFnApi.InstructionResponse> control(StreamObserver<BeamFnApi.InstructionRequest> outboundObserver) {
                Uninterruptibles.putUninterruptibly((BlockingQueue)outboundServerObservers, outboundObserver);
                return inboundServerObserver;
            }
        })).build();
        server.start();
        try {
            EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction> handlers = new EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction>(BeamFnApi.InstructionRequest.RequestCase.class);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER, value -> {
                Assert.assertEquals(value.getInstructionId(), BeamFnLoggingMDC.getInstructionId());
                throw new Error("Test Error");
            });
            ExecutorService executor = Executors.newCachedThreadPool();
            BeamFnControlClient client = new BeamFnControlClient(apiServiceDescriptor, ManagedChannelFactory.createInProcess(), OutboundObserverFactory.trivial(), (Executor)executor, handlers);
            StreamObserver outboundServerObserver = (StreamObserver)outboundServerObservers.take();
            outboundServerObserver.onNext((Object)BeamFnApi.InstructionRequest.newBuilder().setInstructionId("0").setRegister(BeamFnApi.RegisterRequest.getDefaultInstance()).build());
            MatcherAssert.assertThat((Throwable)error.take(), Matchers.not(Matchers.nullValue()));
            try {
                client.terminationFuture().get();
                throw new IllegalStateException("The future should have terminated with an error");
            }
            catch (ExecutionException errorWrapper) {
                MatcherAssert.assertThat(errorWrapper.getCause().getMessage(), Matchers.containsString("Test Error"));
                server.shutdownNow();
            }
        }
        catch (Throwable throwable) {
            server.shutdownNow();
            throw throwable;
        }
    }
}

