/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.control;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
import org.apache.beam.sdk.fn.channel.AddHarnessIdInterceptor;
import org.apache.beam.sdk.fn.server.FnService;
import org.apache.beam.sdk.fn.server.GrpcContextHeaderAccessorProvider;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.HeaderAccessor;
import org.apache.beam.sdk.fn.server.InProcessServerFactory;
import org.apache.beam.sdk.fn.server.ServerFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientInterceptor;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.StatusException;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mockito;

@RunWith(value=JUnit4.class)
public class FnApiControlClientPoolServiceTest {
    @Rule
    public transient Timeout globalTimeout = Timeout.seconds((long)600L);
    private static final String WORKER_ID = "test_worker_id";
    private final ControlClientPool pool = MapControlClientPool.create();
    private final FnApiControlClientPoolService controlService = FnApiControlClientPoolService.offeringClientsToPool((ControlClientPool.Sink)this.pool.getSink(), (HeaderAccessor)GrpcContextHeaderAccessorProvider.getHeaderAccessor());
    private GrpcFnServer<FnApiControlClientPoolService> server;
    private BeamFnControlGrpc.BeamFnControlStub stub;

    @Before
    public void setup() throws IOException {
        this.server = GrpcFnServer.allocatePortAndCreateFor((FnService)this.controlService, (ServerFactory)InProcessServerFactory.create());
        this.stub = BeamFnControlGrpc.newStub((Channel)((InProcessChannelBuilder)InProcessChannelBuilder.forName((String)this.server.getApiServiceDescriptor().getUrl()).intercept(new ClientInterceptor[]{AddHarnessIdInterceptor.create((String)WORKER_ID)})).build());
    }

    @After
    public void teardown() throws Exception {
        this.server.close();
    }

    @Test
    public void testIncomingConnection() throws Exception {
        String id = "fakeInstruction";
        CompletableFuture<StreamObserver> clientResponseStream = new CompletableFuture<StreamObserver>();
        clientResponseStream.complete(this.stub.control((StreamObserver)TestStreams.withOnNext(request -> {
            try {
                ((StreamObserver)clientResponseStream.get()).onNext((Object)BeamFnApi.InstructionResponse.newBuilder().setInstructionId(request.getInstructionId()).build());
            }
            catch (Exception e) {
                Assert.fail((String)"Unexpected failure");
                throw new RuntimeException(e);
            }
        }).build()));
        InstructionRequestHandler client = this.pool.getSource().take(WORKER_ID, Duration.ofSeconds(2L));
        CompletionStage responseFuture = client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
        Assert.assertEquals((Object)id, (Object)((BeamFnApi.InstructionResponse)MoreFutures.get((CompletionStage)responseFuture)).getInstructionId());
    }

    @Test
    public void testCloseCompletesClients() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicBoolean sawComplete = new AtomicBoolean();
        this.stub.control((StreamObserver)new StreamObserver<BeamFnApi.InstructionRequest>(){

            public void onNext(BeamFnApi.InstructionRequest value) {
                Assert.fail((String)"Should never see a request");
            }

            public void onError(Throwable t) {
                latch.countDown();
            }

            public void onCompleted() {
                sawComplete.set(true);
                latch.countDown();
            }
        });
        this.pool.getSource().take(WORKER_ID, Duration.ofSeconds(2L));
        this.server.close();
        latch.await();
        MatcherAssert.assertThat((Object)sawComplete.get(), (Matcher)org.hamcrest.Matchers.is((Object)true));
    }

    @Test
    public void testUnknownBundle() throws Exception {
        BeamFnApi.GetProcessBundleDescriptorRequest request = BeamFnApi.GetProcessBundleDescriptorRequest.newBuilder().setProcessBundleDescriptorId("missing").build();
        StreamObserver responseObserver = (StreamObserver)Mockito.mock(StreamObserver.class);
        this.controlService.getProcessBundleDescriptor(request, responseObserver);
        ((StreamObserver)Mockito.verify((Object)responseObserver)).onError((Throwable)Matchers.argThat(e -> e instanceof StatusException && ((StatusException)e).getStatus().getCode() == Status.Code.NOT_FOUND));
    }
}

