/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.fnexecution.control;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.BundleCheckpointHandler;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.BundleFinalizationHandler;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.RemoteOutputReceiver;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.data.FnDataService;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.data.RemoteInputDestination;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.state.StateDelegator;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.repackaged.direct_java.sdk.fn.IdGenerator;
import org.apache.beam.repackaged.direct_java.sdk.fn.IdGenerators;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.FnDataReceiver;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.InboundDataClient;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SdkHarnessClient
implements AutoCloseable {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(SdkHarnessClient.class);
    /** Source of the instruction ids used for register requests and of the ids given to new bundles. */
    private final IdGenerator idGenerator;
    /** Handler through which register and process-bundle instructions are submitted. */
    private final InstructionRequestHandler fnApiControlClient;
    /** Data service used to open the per-bundle input senders and output receivers. */
    private final FnDataService fnApiDataService;
    /** Processors created so far, keyed by the id of their ProcessBundleDescriptor. */
    private final ConcurrentHashMap<String, BundleProcessor> clientProcessors;

    /**
     * Creates a client with an empty processor cache.
     *
     * @param fnApiControlClient handler that instruction requests are submitted to
     * @param fnApiDataService service used to exchange bundle data
     * @param idGenerator source of instruction and bundle ids
     */
    private SdkHarnessClient(InstructionRequestHandler fnApiControlClient, FnDataService fnApiDataService, IdGenerator idGenerator) {
        this.fnApiControlClient = fnApiControlClient;
        this.fnApiDataService = fnApiDataService;
        this.idGenerator = idGenerator;
        // Diamond rather than a raw ConcurrentHashMap so the assignment is type-checked.
        this.clientProcessors = new ConcurrentHashMap<>();
    }

    /**
     * Builds a client over the given control handler and data service, using
     * {@code IdGenerators.incrementingLongs()} as the id source.
     *
     * @param fnApiControlClient handler that instruction requests are submitted to
     * @param fnApiDataService service used to exchange bundle data
     * @return a new client with an empty processor cache
     */
    public static SdkHarnessClient usingFnApiClient(InstructionRequestHandler fnApiControlClient, FnDataService fnApiDataService) {
        IdGenerator defaultIds = IdGenerators.incrementingLongs();
        return new SdkHarnessClient(fnApiControlClient, fnApiDataService, defaultIds);
    }

    /**
     * Returns a new client that shares this client's control handler and data service
     * but draws ids from {@code idGenerator}. The returned client starts with its own
     * empty processor cache; this instance is left untouched.
     *
     * @param idGenerator id source for the new client
     * @return the new client
     */
    public SdkHarnessClient withIdGenerator(IdGenerator idGenerator) {
        InstructionRequestHandler controlClient = this.fnApiControlClient;
        FnDataService dataService = this.fnApiDataService;
        return new SdkHarnessClient(controlClient, dataService, idGenerator);
    }

    /**
     * Returns the processor for a descriptor that does not declare a state API service
     * descriptor, delegating to the three-argument overload with a no-op state delegator.
     *
     * @param descriptor the bundle descriptor to register or look up
     * @param remoteInputDesinations the remote inputs, keyed by name
     * @return the (possibly cached) processor for {@code descriptor}
     * @throws IllegalStateException if {@code descriptor} has a state API service descriptor
     */
    public BundleProcessor getProcessor(BeamFnApi.ProcessBundleDescriptor descriptor, Map<String, RemoteInputDestination> remoteInputDesinations) {
        boolean hasNoStateService = !descriptor.hasStateApiServiceDescriptor();
        Preconditions.checkState(hasNoStateService, "The %s cannot support a %s containing a state %s.", BundleProcessor.class.getSimpleName(), BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(), Endpoints.ApiServiceDescriptor.class.getSimpleName());
        return this.getProcessor(descriptor, remoteInputDesinations, NoOpStateDelegator.INSTANCE);
    }

    /**
     * Returns the processor cached under the descriptor's id, creating (and thereby
     * registering) one on first use.
     *
     * @param descriptor the bundle descriptor to register or look up
     * @param remoteInputDestinations the remote inputs, keyed by name
     * @param stateDelegator delegator used to register state handlers for new bundles
     * @return the processor associated with the descriptor's id
     * @throws IllegalArgumentException if a processor is already cached under the same id
     *     but was created from a descriptor that is not equal to {@code descriptor}
     */
    public BundleProcessor getProcessor(BeamFnApi.ProcessBundleDescriptor descriptor, Map<String, RemoteInputDestination> remoteInputDestinations, StateDelegator stateDelegator) {
        String descriptorId = descriptor.getId();
        BundleProcessor cached = this.clientProcessors.computeIfAbsent(descriptorId, unusedId -> this.create(descriptor, remoteInputDestinations, stateDelegator));
        // An id hit is only valid when the cached descriptor has the same contents.
        boolean sameContents = cached.processBundleDescriptor.equals(descriptor);
        String descriptorTypeName = BeamFnApi.ProcessBundleDescriptor.class.getSimpleName();
        Preconditions.checkArgument(sameContents, "The provided %s with id %s collides with an existing %s with the same id but containing different contents.", descriptorTypeName, descriptorId, descriptorTypeName);
        return cached;
    }

    /**
     * Submits a register request for the descriptor through the control handler and wraps
     * the pending response in a new {@link BundleProcessor}. Does not wait for the
     * registration to complete.
     *
     * @param processBundleDescriptor the descriptor to register
     * @param remoteInputDestinations the remote inputs, keyed by name
     * @param stateDelegator delegator handed to the new processor
     * @return a processor holding the pending registration response
     */
    private BundleProcessor create(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, Map<String, RemoteInputDestination> remoteInputDestinations, StateDelegator stateDelegator) {
        LOG.debug("Registering {}", processBundleDescriptor);
        String instructionId = this.idGenerator.getId();
        BeamFnApi.RegisterRequest registerRequest = BeamFnApi.RegisterRequest.newBuilder()
                .addProcessBundleDescriptor(processBundleDescriptor)
                .build();
        BeamFnApi.InstructionRequest instruction = BeamFnApi.InstructionRequest.newBuilder()
                .setInstructionId(instructionId)
                .setRegister(registerRequest)
                .build();
        CompletionStage<BeamFnApi.InstructionResponse> pendingResponse = this.fnApiControlClient.handle(instruction);
        // Narrow the generic instruction response down to its register payload.
        CompletionStage<BeamFnApi.RegisterResponse> pendingRegistration = pendingResponse.thenApply(BeamFnApi.InstructionResponse::getRegister);
        return new BundleProcessor(processBundleDescriptor, pendingRegistration, remoteInputDestinations, stateDelegator);
    }

    /**
     * Does nothing: this method releases no resources and leaves the control handler,
     * the data service and the cached processors untouched.
     */
    @Override
    public void close() {
    }

    /**
     * A {@link StateDelegator} that ignores every registration. It is the delegator used
     * by the two-argument {@code getProcessor} overload, which only accepts descriptors
     * without a state API service descriptor.
     */
    private static class NoOpStateDelegator
    implements StateDelegator {
        /** The single shared instance. */
        private static final NoOpStateDelegator INSTANCE = new NoOpStateDelegator();

        /** A registration handle whose operations have no effect. */
        private static class Registration
        implements StateDelegator.Registration {
            /** The single shared instance. */
            private static final Registration INSTANCE = new Registration();

            private Registration() {
                // Singleton; use INSTANCE.
            }

            @Override
            public void deregister() {
                // Nothing was registered, so there is nothing to undo.
            }

            @Override
            public void abort() {
                // Nothing was registered, so there is nothing to abort.
            }
        }

        private NoOpStateDelegator() {
            // Singleton; use INSTANCE.
        }

        /**
         * Ignores both arguments and hands back the shared no-op registration.
         */
        @Override
        public Registration registerForProcessBundleInstructionId(String processBundleInstructionId, StateRequestHandler handler) {
            return Registration.INSTANCE;
        }
    }

    /**
     * A bundle that has been submitted for processing. Callers push elements through
     * {@link #getInputReceivers()} and then call {@link #close()}, which waits for the
     * process-bundle response, notifies the handlers and releases the state registration
     * and output clients.
     */
    public static class ActiveBundle
    implements RemoteBundle {
        /** Id of the process-bundle instruction this bundle corresponds to. */
        private final String bundleId;
        /** Pending response to the process-bundle instruction. */
        private final CompletionStage<BeamFnApi.ProcessBundleResponse> response;
        /** Input senders, keyed by name; closed first by {@link #close()}. */
        private final Map<String, CloseableFnDataReceiver> inputReceivers;
        /** Output clients, keyed by name; awaited or cancelled last by {@link #close()}. */
        private final Map<String, InboundDataClient> outputClients;
        /** State handler registration; deregistered on success, aborted on failure. */
        private final StateDelegator.Registration stateRegistration;
        /** Notified with the completed response. */
        private final BundleProgressHandler progressHandler;
        /** Notified when the completed response carries residual roots. */
        private final BundleCheckpointHandler checkpointHandler;
        /** Notified when the completed response requires finalization. */
        private final BundleFinalizationHandler finalizationHandler;

        private ActiveBundle(String bundleId, CompletionStage<BeamFnApi.ProcessBundleResponse> response, Map<String, CloseableFnDataReceiver> inputReceivers, Map<String, InboundDataClient> outputClients, StateDelegator.Registration stateRegistration, BundleProgressHandler progressHandler, BundleCheckpointHandler checkpointHandler, BundleFinalizationHandler finalizationHandler) {
            this.bundleId = bundleId;
            this.response = response;
            this.inputReceivers = inputReceivers;
            this.outputClients = outputClients;
            this.stateRegistration = stateRegistration;
            this.progressHandler = progressHandler;
            this.checkpointHandler = checkpointHandler;
            this.finalizationHandler = finalizationHandler;
        }

        /** Returns the id of this bundle. */
        @Override
        public String getId() {
            return this.bundleId;
        }

        /**
         * Returns the input receivers of this bundle, keyed by name. The same map instance
         * that was passed to the constructor is returned.
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Map<String, FnDataReceiver> getInputReceivers() {
            // Map<String, CloseableFnDataReceiver> is not assignable to
            // Map<String, FnDataReceiver> (generics are invariant), so go through a raw cast.
            return (Map)this.inputReceivers;
        }

        /**
         * Finishes the bundle. The steps always run in this order, and a failure in one
         * step does not prevent the later ones from running:
         * <ol>
         *   <li>close every input receiver;</li>
         *   <li>if nothing failed so far, wait for the response and notify the progress,
         *       checkpoint and finalization handlers as applicable; otherwise record an
         *       {@link IllegalStateException};</li>
         *   <li>deregister the state registration on success, abort it on failure;</li>
         *   <li>await each output client on success, cancel it on failure.</li>
         * </ol>
         *
         * @throws Exception the first failure encountered, with every later failure
         *     attached to it as a suppressed exception
         */
        @Override
        public void close() throws Exception {
            Exception exception = null;
            for (CloseableFnDataReceiver inputReceiver : this.inputReceivers.values()) {
                try {
                    inputReceiver.close();
                }
                catch (Exception e) {
                    exception = ActiveBundle.accumulate(exception, e);
                }
            }
            try {
                if (exception != null) {
                    throw new IllegalStateException("Processing bundle failed, TODO: [BEAM-3962] abort bundle.");
                }
                BeamFnApi.ProcessBundleResponse completedResponse = MoreFutures.get(this.response);
                this.progressHandler.onCompleted(completedResponse);
                if (completedResponse.getResidualRootsCount() > 0) {
                    this.checkpointHandler.onCheckpoint(completedResponse);
                }
                if (completedResponse.getRequiresFinalization()) {
                    this.finalizationHandler.requestsFinalization(this.bundleId);
                }
            }
            catch (Exception e) {
                exception = ActiveBundle.accumulate(exception, e);
            }
            try {
                if (exception == null) {
                    this.stateRegistration.deregister();
                } else {
                    this.stateRegistration.abort();
                }
            }
            catch (Exception e) {
                exception = ActiveBundle.accumulate(exception, e);
            }
            for (InboundDataClient outputClient : this.outputClients.values()) {
                try {
                    // Re-evaluated per client: once any step has failed, the remaining
                    // clients are cancelled instead of awaited.
                    if (exception == null) {
                        outputClient.awaitCompletion();
                    } else {
                        outputClient.cancel();
                    }
                }
                catch (Exception e) {
                    exception = ActiveBundle.accumulate(exception, e);
                }
            }
            if (exception != null) {
                throw exception;
            }
        }

        /**
         * Folds {@code next} into the failure collected so far.
         *
         * @param primary the first failure seen, or {@code null} if none yet
         * @param next the failure that just occurred
         * @return {@code next} when there was no earlier failure; otherwise {@code primary}
         *     with {@code next} added as suppressed
         */
        private static Exception accumulate(Exception primary, Exception next) {
            if (primary == null) {
                return next;
            }
            // Throwable.addSuppressed rejects self-suppression with IllegalArgumentException.
            if (primary != next) {
                primary.addSuppressed(next);
            }
            return primary;
        }
    }

    /**
     * Starts bundles for one registered ProcessBundleDescriptor. Instances are inner
     * objects of the enclosing client and use its id generator, control handler and
     * data service.
     */
    public class BundleProcessor {
        /** The descriptor this processor was created for. */
        private final BeamFnApi.ProcessBundleDescriptor processBundleDescriptor;
        /** Pending response to the register request; bundle requests are chained onto it. */
        private final CompletionStage<BeamFnApi.RegisterResponse> registrationFuture;
        /** Remote inputs, keyed by name; one sender is opened per entry for every bundle. */
        private final Map<String, RemoteInputDestination> remoteInputs;
        /** Used to register the state request handler of every new bundle. */
        private final StateDelegator stateDelegator;

        private BundleProcessor(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, CompletionStage<BeamFnApi.RegisterResponse> registrationFuture, Map<String, RemoteInputDestination> remoteInputs, StateDelegator stateDelegator) {
            this.processBundleDescriptor = processBundleDescriptor;
            this.registrationFuture = registrationFuture;
            this.remoteInputs = remoteInputs;
            this.stateDelegator = stateDelegator;
        }

        /** Returns the pending response to the register request for this descriptor. */
        public CompletionStage<BeamFnApi.RegisterResponse> getRegistrationFuture() {
            return this.registrationFuture;
        }

        /**
         * Starts a bundle whose state request handler throws
         * {@link UnsupportedOperationException} when invoked.
         *
         * @param outputReceivers receivers for the bundle's outputs, keyed by name
         * @param progressHandler notified when the bundle completes
         * @return the started bundle
         */
        public ActiveBundle newBundle(Map<String, RemoteOutputReceiver<?>> outputReceivers, BundleProgressHandler progressHandler) {
            return this.newBundle(outputReceivers, request -> {
                throw new UnsupportedOperationException(String.format("The %s does not have a registered state handler.", ActiveBundle.class.getSimpleName()));
            }, progressHandler);
        }

        /**
         * Starts a bundle whose checkpoint and finalization handlers throw
         * {@link UnsupportedOperationException} when invoked.
         *
         * @param outputReceivers receivers for the bundle's outputs, keyed by name
         * @param stateRequestHandler handler registered for the bundle's state requests
         * @param progressHandler notified when the bundle completes
         * @return the started bundle
         */
        public ActiveBundle newBundle(Map<String, RemoteOutputReceiver<?>> outputReceivers, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler) {
            return this.newBundle(outputReceivers, stateRequestHandler, progressHandler, request -> {
                throw new UnsupportedOperationException(String.format("The %s does not have a registered bundle checkpoint handler.", ActiveBundle.class.getSimpleName()));
            }, bundleId -> {
                throw new UnsupportedOperationException(String.format("The %s does not have a registered bundle finalization handler.", ActiveBundle.class.getSimpleName()));
            });
        }

        /**
         * Starts a bundle: chains a process-bundle request onto the registration future,
         * attaches every output receiver, opens a sender for every remote input and
         * registers the state request handler under the new bundle id.
         *
         * @param outputReceivers receivers for the bundle's outputs, keyed by name
         * @param stateRequestHandler handler registered for the bundle's state requests;
         *     also the source of the cache tokens put on the request
         * @param progressHandler notified when the bundle completes
         * @param checkpointHandler notified when the response carries residual roots
         * @param finalizationHandler notified when the response requires finalization
         * @return the started bundle; it must be closed to complete processing
         */
        public ActiveBundle newBundle(Map<String, RemoteOutputReceiver<?>> outputReceivers, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler, BundleCheckpointHandler checkpointHandler, BundleFinalizationHandler finalizationHandler) {
            String bundleId = SdkHarnessClient.this.idGenerator.getId();
            // The control handler yields a generic InstructionResponse; it is narrowed to the
            // process-bundle payload below.
            CompletionStage<BeamFnApi.InstructionResponse> genericResponse = this.registrationFuture.thenCompose(registration -> SdkHarnessClient.this.fnApiControlClient.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(bundleId).setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId(this.processBundleDescriptor.getId()).addAllCacheTokens(stateRequestHandler.getCacheTokens())).build()));
            // Logged once the request is chained; if registration is still pending, the
            // request itself is only submitted when registration completes.
            LOG.debug("Sent {} with ID {} for {} with ID {}", BeamFnApi.ProcessBundleRequest.class.getSimpleName(), bundleId, BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(), this.processBundleDescriptor.getId());
            CompletionStage<BeamFnApi.ProcessBundleResponse> specificResponse = genericResponse.thenApply(BeamFnApi.InstructionResponse::getProcessBundle);
            Map<String, InboundDataClient> outputClients = new HashMap<>();
            for (Map.Entry<String, RemoteOutputReceiver<?>> receiver : outputReceivers.entrySet()) {
                InboundDataClient outputClient = this.attachReceiver(bundleId, receiver.getKey(), receiver.getValue());
                outputClients.put(receiver.getKey(), outputClient);
            }
            // Raw builder retained: the element type of each sender follows from the raw
            // RemoteInputDestination coder and is not expressible here.
            ImmutableMap.Builder dataReceiversBuilder = ImmutableMap.builder();
            for (Map.Entry<String, RemoteInputDestination> remoteInput : this.remoteInputs.entrySet()) {
                dataReceiversBuilder.put((Object)remoteInput.getKey(), SdkHarnessClient.this.fnApiDataService.send(LogicalEndpoint.of(bundleId, remoteInput.getValue().getPTransformId()), remoteInput.getValue().getCoder()));
            }
            return new ActiveBundle(bundleId, specificResponse, (Map)dataReceiversBuilder.build(), outputClients, this.stateDelegator.registerForProcessBundleInstructionId(bundleId, stateRequestHandler), progressHandler, checkpointHandler, finalizationHandler);
        }

        /**
         * Opens an inbound data client for one output of the bundle. The generic method
         * captures the receiver's wildcard so that its coder and receiver agree on the
         * element type.
         */
        private <OutputT> InboundDataClient attachReceiver(String bundleId, String ptransformId, RemoteOutputReceiver<OutputT> receiver) {
            return SdkHarnessClient.this.fnApiDataService.receive(LogicalEndpoint.of(bundleId, ptransformId), receiver.getCoder(), receiver.getReceiver());
        }
    }
}

