/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.control;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.runners.fnexecution.control.SingleEnvironmentInstanceJobBundleFactory;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.fn.server.FnService;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.InProcessServerFactory;
import org.apache.beam.sdk.fn.server.ServerFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.construction.Environments;
import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
import org.apache.beam.sdk.util.construction.graph.GreedyPipelineFuser;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(value=JUnit4.class)
public class SingleEnvironmentInstanceJobBundleFactoryTest {
    @Rule
    public transient Timeout globalTimeout = Timeout.seconds((long)600L);
    @Mock
    private EnvironmentFactory environmentFactory;
    @Mock
    private InstructionRequestHandler instructionRequestHandler;
    private ExecutorService executor = Executors.newCachedThreadPool();
    private GrpcFnServer<GrpcDataService> dataServer;
    private GrpcFnServer<GrpcStateService> stateServer;
    private JobBundleFactory factory;
    private static final String GENERATED_ID = "staticId";

    @Before
    public void setup() throws Exception {
        MockitoAnnotations.initMocks((Object)this);
        Mockito.when((Object)this.instructionRequestHandler.handle((BeamFnApi.InstructionRequest)Matchers.any(BeamFnApi.InstructionRequest.class))).thenReturn(CompletableFuture.completedFuture(BeamFnApi.InstructionResponse.getDefaultInstance()));
        InProcessServerFactory serverFactory = InProcessServerFactory.create();
        this.dataServer = GrpcFnServer.allocatePortAndCreateFor((FnService)GrpcDataService.create((PipelineOptions)PipelineOptionsFactory.create(), (ExecutorService)this.executor, (OutboundObserverFactory)OutboundObserverFactory.serverDirect()), (ServerFactory)serverFactory);
        this.stateServer = GrpcFnServer.allocatePortAndCreateFor((FnService)GrpcStateService.create(), (ServerFactory)serverFactory);
        this.factory = SingleEnvironmentInstanceJobBundleFactory.create((EnvironmentFactory)this.environmentFactory, this.dataServer, this.stateServer, () -> GENERATED_ID);
    }

    @After
    public void teardown() throws Exception {
        try (GrpcFnServer<GrpcDataService> data = this.dataServer;
             GrpcFnServer<GrpcStateService> state = this.stateServer;){
            this.executor.shutdownNow();
        }
    }

    @Test
    public void closeShutsDownEnvironments() throws Exception {
        Pipeline p = Pipeline.create();
        ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"beam_fn_api");
        p.apply("Create", (PTransform)Create.of((Object)1, (Object[])new Integer[]{2, 3}));
        ExecutableStage stage = (ExecutableStage)GreedyPipelineFuser.fuse((RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p)).getFusedStages().stream().findFirst().get();
        RemoteEnvironment remoteEnv = (RemoteEnvironment)Mockito.mock(RemoteEnvironment.class);
        Mockito.when((Object)remoteEnv.getInstructionRequestHandler()).thenReturn((Object)this.instructionRequestHandler);
        Mockito.when((Object)this.environmentFactory.createEnvironment(stage.getEnvironment(), GENERATED_ID)).thenReturn((Object)remoteEnv);
        this.factory.forStage(stage);
        this.factory.close();
        ((RemoteEnvironment)Mockito.verify((Object)remoteEnv)).close();
    }

    @Test
    public void closeShutsDownEnvironmentsWhenSomeFail() throws Exception {
        Pipeline p = Pipeline.create();
        ExperimentalOptions.addExperiment((ExperimentalOptions)((ExperimentalOptions)p.getOptions().as(ExperimentalOptions.class)), (String)"beam_fn_api");
        p.apply("Create", (PTransform)Create.of((Object)1, (Object[])new Integer[]{2, 3}));
        ExecutableStage firstEnvStage = (ExecutableStage)GreedyPipelineFuser.fuse((RunnerApi.Pipeline)PipelineTranslation.toProto((Pipeline)p)).getFusedStages().stream().findFirst().get();
        RunnerApi.ExecutableStagePayload basePayload = RunnerApi.ExecutableStagePayload.parseFrom((ByteString)firstEnvStage.toPTransform("foo").getSpec().getPayload());
        RunnerApi.Environment secondEnv = Environments.createDockerEnvironment((String)"second_env");
        ExecutableStage secondEnvStage = ExecutableStage.fromPayload((RunnerApi.ExecutableStagePayload)basePayload.toBuilder().setEnvironment(secondEnv).build());
        RunnerApi.Environment thirdEnv = Environments.createDockerEnvironment((String)"third_env");
        ExecutableStage thirdEnvStage = ExecutableStage.fromPayload((RunnerApi.ExecutableStagePayload)basePayload.toBuilder().setEnvironment(thirdEnv).build());
        RemoteEnvironment firstRemoteEnv = (RemoteEnvironment)Mockito.mock(RemoteEnvironment.class, (String)"First Remote Env");
        RemoteEnvironment secondRemoteEnv = (RemoteEnvironment)Mockito.mock(RemoteEnvironment.class, (String)"Second Remote Env");
        RemoteEnvironment thirdRemoteEnv = (RemoteEnvironment)Mockito.mock(RemoteEnvironment.class, (String)"Third Remote Env");
        Mockito.when((Object)this.environmentFactory.createEnvironment(firstEnvStage.getEnvironment(), GENERATED_ID)).thenReturn((Object)firstRemoteEnv);
        Mockito.when((Object)this.environmentFactory.createEnvironment(secondEnvStage.getEnvironment(), GENERATED_ID)).thenReturn((Object)secondRemoteEnv);
        Mockito.when((Object)this.environmentFactory.createEnvironment(thirdEnvStage.getEnvironment(), GENERATED_ID)).thenReturn((Object)thirdRemoteEnv);
        Mockito.when((Object)firstRemoteEnv.getInstructionRequestHandler()).thenReturn((Object)this.instructionRequestHandler);
        Mockito.when((Object)secondRemoteEnv.getInstructionRequestHandler()).thenReturn((Object)this.instructionRequestHandler);
        Mockito.when((Object)thirdRemoteEnv.getInstructionRequestHandler()).thenReturn((Object)this.instructionRequestHandler);
        this.factory.forStage(firstEnvStage);
        this.factory.forStage(secondEnvStage);
        this.factory.forStage(thirdEnvStage);
        IllegalStateException firstException = new IllegalStateException("first stage");
        ((RemoteEnvironment)Mockito.doThrow((Throwable[])new Throwable[]{firstException}).when((Object)firstRemoteEnv)).close();
        IllegalStateException thirdException = new IllegalStateException("third stage");
        ((RemoteEnvironment)Mockito.doThrow((Throwable[])new Throwable[]{thirdException}).when((Object)thirdRemoteEnv)).close();
        try {
            this.factory.close();
            Assert.fail((String)"Factory close should have thrown");
        }
        catch (IllegalStateException expected) {
            if (expected.equals(firstException)) {
                MatcherAssert.assertThat((Object)ImmutableList.copyOf((Object[])expected.getSuppressed()), (Matcher)org.hamcrest.Matchers.contains((Object[])new Throwable[]{thirdException}));
            } else if (expected.equals(thirdException)) {
                MatcherAssert.assertThat((Object)ImmutableList.copyOf((Object[])expected.getSuppressed()), (Matcher)org.hamcrest.Matchers.contains((Object[])new Throwable[]{firstException}));
            } else {
                throw expected;
            }
            ((RemoteEnvironment)Mockito.verify((Object)firstRemoteEnv)).close();
            ((RemoteEnvironment)Mockito.verify((Object)secondRemoteEnv)).close();
            ((RemoteEnvironment)Mockito.verify((Object)thirdRemoteEnv)).close();
        }
    }
}

