/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.MessageOrBuilder;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ServerBuilder;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.InOrder;
import org.mockito.Mockito;

@RunWith(value=JUnit4.class)
public class FnHarnessTest {
    private static final BeamFnApi.InstructionRequest INSTRUCTION_REQUEST = BeamFnApi.InstructionRequest.newBuilder().setInstructionId("999L").setRegister(BeamFnApi.RegisterRequest.getDefaultInstance()).build();
    private static final BeamFnApi.InstructionResponse INSTRUCTION_RESPONSE = BeamFnApi.InstructionResponse.newBuilder().setInstructionId("999L").setRegister(BeamFnApi.RegisterResponse.getDefaultInstance()).build();
    private static Runnable onStartupMock = Mockito.mock(Runnable.class);
    private static Consumer<PipelineOptions> beforeProcessingMock = Mockito.mock(Consumer.class);
    @Rule
    public Timeout timeout = Timeout.builder().withLookingForStuckThread(true).withTimeout(1L, TimeUnit.MINUTES).build();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testLaunchFnHarnessAndTeardownCleanly() throws Exception {
        Function environmentVariableMock = Mockito.mock(Function.class);
        final PipelineOptions options = PipelineOptionsFactory.create();
        Mockito.when((String)environmentVariableMock.apply("HARNESS_ID")).thenReturn("id");
        Mockito.when((String)environmentVariableMock.apply("PIPELINE_OPTIONS")).thenReturn(PipelineOptionsTranslation.toJson((PipelineOptions)options));
        final ArrayList logEntries = new ArrayList();
        final List instructionResponses = Mockito.mock(List.class);
        BeamFnLoggingGrpc.BeamFnLoggingImplBase loggingService = new BeamFnLoggingGrpc.BeamFnLoggingImplBase(){

            public StreamObserver<BeamFnApi.LogEntry.List> logging(StreamObserver<BeamFnApi.LogControl> responseObserver) {
                return TestStreams.withOnNext(entries -> logEntries.addAll(entries.getLogEntriesList())).withOnCompleted(() -> responseObserver.onCompleted()).build();
            }
        };
        BeamFnControlGrpc.BeamFnControlImplBase controlService = new BeamFnControlGrpc.BeamFnControlImplBase(){

            public StreamObserver<BeamFnApi.InstructionResponse> control(StreamObserver<BeamFnApi.InstructionRequest> responseObserver) {
                CountDownLatch waitForResponses = new CountDownLatch(1);
                ((ExecutorOptions)options.as(ExecutorOptions.class)).getScheduledExecutorService().submit(() -> {
                    responseObserver.onNext((Object)INSTRUCTION_REQUEST);
                    Uninterruptibles.awaitUninterruptibly((CountDownLatch)waitForResponses);
                    responseObserver.onCompleted();
                });
                return TestStreams.withOnNext(t -> {
                    instructionResponses.add(t);
                    waitForResponses.countDown();
                }).withOnCompleted(waitForResponses::countDown).build();
            }
        };
        Server loggingServer = ServerBuilder.forPort((int)0).addService((BindableService)loggingService).build();
        loggingServer.start();
        try {
            Server controlServer = ServerBuilder.forPort((int)0).addService((BindableService)controlService).build();
            controlServer.start();
            try {
                Endpoints.ApiServiceDescriptor loggingDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:" + loggingServer.getPort()).build();
                Endpoints.ApiServiceDescriptor controlDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl("localhost:" + controlServer.getPort()).build();
                Mockito.when((String)environmentVariableMock.apply("LOGGING_API_SERVICE_DESCRIPTOR")).thenReturn(TextFormat.printer().printToString((MessageOrBuilder)loggingDescriptor));
                Mockito.when((String)environmentVariableMock.apply("CONTROL_API_SERVICE_DESCRIPTOR")).thenReturn(TextFormat.printer().printToString((MessageOrBuilder)controlDescriptor));
                FnHarness.main((Function)environmentVariableMock);
            }
            finally {
                controlServer.shutdownNow();
            }
        }
        finally {
            loggingServer.shutdownNow();
        }
        InOrder inOrder = Mockito.inOrder(onStartupMock, beforeProcessingMock, environmentVariableMock, instructionResponses);
        inOrder.verify(onStartupMock).run();
        inOrder.verify(environmentVariableMock, Mockito.atLeastOnce()).apply((String)Mockito.any());
        inOrder.verify(beforeProcessingMock).accept((PipelineOptions)Mockito.any());
        inOrder.verify(instructionResponses).add(INSTRUCTION_RESPONSE);
    }

    @AutoService(value={JvmInitializer.class})
    public static class FnHarnessTestInitializer
    implements JvmInitializer {
        public void onStartup() {
            onStartupMock.run();
        }

        public void beforeProcessing(PipelineOptions options) {
            beforeProcessingMock.accept(options);
        }
    }
}

