/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.data;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.DataEndpoint;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.server.FnService;
import org.apache.beam.sdk.fn.server.GrpcFnServer;
import org.apache.beam.sdk.fn.server.InProcessServerFactory;
import org.apache.beam.sdk.fn.server.ServerFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class GrpcDataServiceTest {
    @Rule
    public transient Timeout globalTimeout = Timeout.seconds((long)600L);
    private static final String TRANSFORM_ID = "888";
    private static final Coder<WindowedValue<String>> CODER = LengthPrefixCoder.of((Coder)WindowedValue.getValueOnlyCoder((Coder)StringUtf8Coder.of()));

    @Test
    public void testMessageReceivedBySingleClientWhenThereAreMultipleClients() throws Exception {
        LinkedBlockingQueue clientInboundElements = new LinkedBlockingQueue();
        ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch waitForInboundElements = new CountDownLatch(1);
        GrpcDataService service = GrpcDataService.create((PipelineOptions)PipelineOptionsFactory.create(), (ExecutorService)Executors.newCachedThreadPool(), (OutboundObserverFactory)OutboundObserverFactory.serverDirect());
        try (GrpcFnServer server = GrpcFnServer.allocatePortAndCreateFor((FnService)service, (ServerFactory)InProcessServerFactory.create());){
            int i;
            ArrayList<Future<Void>> clientFutures = new ArrayList<Future<Void>>();
            for (i = 0; i < 3; ++i) {
                clientFutures.add(executorService.submit(() -> {
                    ManagedChannel channel = ((InProcessChannelBuilder)InProcessChannelBuilder.forName((String)server.getApiServiceDescriptor().getUrl()).directExecutor()).build();
                    StreamObserver outboundObserver = BeamFnDataGrpc.newStub((Channel)channel).data((StreamObserver)TestStreams.withOnNext(clientInboundElements::add).build());
                    waitForInboundElements.await();
                    outboundObserver.onCompleted();
                    return null;
                }));
            }
            for (i = 0; i < 3; ++i) {
                String string = Integer.toString(i);
                BeamFnDataOutboundAggregator aggregator = service.createOutboundAggregator(() -> instructionId, false);
                aggregator.start();
                FnDataReceiver consumer = aggregator.registerOutputDataLocation(TRANSFORM_ID, CODER);
                consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)("A" + i)));
                consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)("B" + i)));
                consumer.accept((Object)WindowedValue.valueInGlobalWindow((Object)("C" + i)));
                aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
            }
            waitForInboundElements.countDown();
            for (Future future : clientFutures) {
                future.get();
            }
            MatcherAssert.assertThat(clientInboundElements, (Matcher)Matchers.containsInAnyOrder((Object[])new BeamFnApi.Elements[]{this.elementsWithData("0"), this.elementsWithData("1"), this.elementsWithData("2")}));
        }
    }

    @Test
    public void testMultipleClientsSendMessagesAreDirectedToProperConsumers() throws Exception {
        LinkedBlockingQueue clientInboundElements = new LinkedBlockingQueue();
        ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch waitForInboundElements = new CountDownLatch(1);
        GrpcDataService service = GrpcDataService.create((PipelineOptions)PipelineOptionsFactory.create(), (ExecutorService)Executors.newCachedThreadPool(), (OutboundObserverFactory)OutboundObserverFactory.serverDirect());
        try (GrpcFnServer server = GrpcFnServer.allocatePortAndCreateFor((FnService)service, (ServerFactory)InProcessServerFactory.create());){
            ArrayList<Future<Void>> clientFutures = new ArrayList<Future<Void>>();
            for (int i = 0; i < 3; ++i) {
                String instructionId = Integer.toString(i);
                clientFutures.add(executorService.submit(() -> {
                    ManagedChannel channel = InProcessChannelBuilder.forName((String)server.getApiServiceDescriptor().getUrl()).build();
                    StreamObserver outboundObserver = BeamFnDataGrpc.newStub((Channel)channel).data((StreamObserver)TestStreams.withOnNext(clientInboundElements::add).build());
                    outboundObserver.onNext((Object)this.elementsWithData(instructionId));
                    waitForInboundElements.await();
                    outboundObserver.onCompleted();
                    return null;
                }));
            }
            ArrayList serverInboundValues = new ArrayList();
            ArrayList<BeamFnDataInboundObserver> inboundObservers = new ArrayList<BeamFnDataInboundObserver>();
            for (int i = 0; i < 3; ++i) {
                ArrayList arrayList = new ArrayList();
                serverInboundValues.add(arrayList);
                DataEndpoint[] dataEndpointArray = new DataEndpoint[1];
                dataEndpointArray[0] = DataEndpoint.create((String)TRANSFORM_ID, CODER, arrayList::add);
                BeamFnDataInboundObserver inboundObserver = BeamFnDataInboundObserver.forConsumers(Arrays.asList(dataEndpointArray), Collections.emptyList());
                service.registerReceiver(Integer.toString(i), (CloseableFnDataReceiver)inboundObserver);
                inboundObservers.add(inboundObserver);
            }
            for (BeamFnDataInboundObserver beamFnDataInboundObserver : inboundObservers) {
                beamFnDataInboundObserver.awaitCompletion();
            }
            waitForInboundElements.countDown();
            for (Future future : clientFutures) {
                future.get();
            }
            for (int i = 0; i < 3; ++i) {
                MatcherAssert.assertThat((Object)((Collection)serverInboundValues.get(i)), (Matcher)Matchers.contains((Object[])new WindowedValue[]{WindowedValue.valueInGlobalWindow((Object)("A" + i)), WindowedValue.valueInGlobalWindow((Object)("B" + i)), WindowedValue.valueInGlobalWindow((Object)("C" + i))}));
            }
            MatcherAssert.assertThat(clientInboundElements, (Matcher)Matchers.empty());
        }
    }

    private BeamFnApi.Elements elementsWithData(String id) throws CoderException {
        return BeamFnApi.Elements.newBuilder().addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(id).setTransformId(TRANSFORM_ID).setData(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray(CODER, (Object)WindowedValue.valueInGlobalWindow((Object)("A" + id)))).concat(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray(CODER, (Object)WindowedValue.valueInGlobalWindow((Object)("B" + id))))).concat(ByteString.copyFrom((byte[])CoderUtils.encodeToByteArray(CODER, (Object)WindowedValue.valueInGlobalWindow((Object)("C" + id))))))).addData(BeamFnApi.Elements.Data.newBuilder().setInstructionId(id).setTransformId(TRANSFORM_ID).setIsLast(true)).build();
    }
}

