/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.status;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.status.BeamFnStatusClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnWorkerStatusGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(value=JUnit4.class)
public class BeamFnStatusClientTest {
    private final Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()).build();

    @Test
    public void testActiveBundleState() {
        ProcessBundleHandler handler = Mockito.mock(ProcessBundleHandler.class);
        ProcessBundleHandler.BundleProcessorCache processorCache = Mockito.mock(ProcessBundleHandler.BundleProcessorCache.class);
        HashMap<String, ProcessBundleHandler.BundleProcessor> bundleProcessorMap = new HashMap<String, ProcessBundleHandler.BundleProcessor>();
        for (int i = 0; i < 11; ++i) {
            ProcessBundleHandler.BundleProcessor processor = Mockito.mock(ProcessBundleHandler.BundleProcessor.class);
            ExecutionStateSampler.ExecutionStateTracker executionStateTracker = Mockito.mock(ExecutionStateSampler.ExecutionStateTracker.class);
            Mockito.when(processor.getStateTracker()).thenReturn(executionStateTracker);
            Mockito.when(executionStateTracker.getStatus()).thenReturn(ExecutionStateSampler.ExecutionStateTrackerStatus.create((String)"ptransformId", (String)"ptransformIdName", (Thread)Thread.currentThread(), (long)(i * 1000), null));
            String instruction = Integer.toString(i);
            Mockito.when(processorCache.find(instruction)).thenReturn(processor);
            bundleProcessorMap.put(instruction, processor);
        }
        Mockito.when(handler.getBundleProcessorCache()).thenReturn(processorCache);
        Mockito.when(processorCache.getActiveBundleProcessors()).thenReturn(bundleProcessorMap);
        ManagedChannelFactory channelFactory = ManagedChannelFactory.createInProcess();
        BeamFnStatusClient client = new BeamFnStatusClient(this.apiServiceDescriptor, arg_0 -> ((ManagedChannelFactory)channelFactory).forDescriptor(arg_0), handler.getBundleProcessorCache(), PipelineOptionsFactory.create(), Caches.noop());
        StringJoiner joiner = new StringJoiner("\n");
        joiner.add(client.getActiveProcessBundleState());
        String actualState = joiner.toString();
        ArrayList<String> expectedInstructions = new ArrayList<String>();
        for (int i = 0; i < 10; ++i) {
            expectedInstructions.add(String.format("Instruction %d", i));
        }
        MatcherAssert.assertThat(actualState, Matchers.stringContainsInOrder(expectedInstructions));
        MatcherAssert.assertThat(actualState, Matchers.not(Matchers.containsString("Instruction 10")));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testWorkerStatusResponse() throws Exception {
        LinkedBlockingQueue values = new LinkedBlockingQueue();
        final LinkedBlockingQueue requestObservers = new LinkedBlockingQueue();
        CallStreamObserver inboundServerObserver = TestStreams.withOnNext(values::add).build();
        Server server = ((InProcessServerBuilder)InProcessServerBuilder.forName((String)this.apiServiceDescriptor.getUrl()).addService((BindableService)new BeamFnWorkerStatusGrpc.BeamFnWorkerStatusImplBase((StreamObserver)inboundServerObserver){
            final /* synthetic */ StreamObserver val$inboundServerObserver;
            {
                this.val$inboundServerObserver = streamObserver;
            }

            public StreamObserver<BeamFnApi.WorkerStatusResponse> workerStatus(StreamObserver<BeamFnApi.WorkerStatusRequest> responseObserver) {
                Uninterruptibles.putUninterruptibly((BlockingQueue)requestObservers, responseObserver);
                return this.val$inboundServerObserver;
            }
        })).build();
        server.start();
        try {
            ProcessBundleHandler.BundleProcessorCache processorCache = Mockito.mock(ProcessBundleHandler.BundleProcessorCache.class);
            Mockito.when(processorCache.getActiveBundleProcessors()).thenReturn(Collections.emptyMap());
            ManagedChannelFactory channelFactory = ManagedChannelFactory.createInProcess();
            new BeamFnStatusClient(this.apiServiceDescriptor, arg_0 -> ((ManagedChannelFactory)channelFactory).forDescriptor(arg_0), processorCache, PipelineOptionsFactory.create(), Caches.noop());
            StreamObserver requestObserver = (StreamObserver)requestObservers.take();
            requestObserver.onNext((Object)BeamFnApi.WorkerStatusRequest.newBuilder().setId("id").build());
            BeamFnApi.WorkerStatusResponse response = (BeamFnApi.WorkerStatusResponse)values.take();
            MatcherAssert.assertThat(response.getStatusInfo(), Matchers.containsString("No active processing bundles."));
            MatcherAssert.assertThat(response.getId(), Matchers.is("id"));
        }
        finally {
            server.shutdownNow();
        }
    }

    @Test
    public void testCacheStatsExist() {
        ManagedChannelFactory channelFactory = ManagedChannelFactory.createInProcess();
        BeamFnStatusClient client = new BeamFnStatusClient(this.apiServiceDescriptor, arg_0 -> ((ManagedChannelFactory)channelFactory).forDescriptor(arg_0), Mockito.mock(ProcessBundleHandler.BundleProcessorCache.class), PipelineOptionsFactory.create(), Caches.fromOptions((PipelineOptions)PipelineOptionsFactory.fromArgs((String[])new String[]{"--maxCacheMemoryUsageMb=234"}).create()));
        MatcherAssert.assertThat(client.getCacheStats(), Matchers.containsString("used/max 0/234 MB"));
    }
}

