/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.data;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueingBeamFnDataClient
implements BeamFnDataClient {
    private static final Logger LOG = LoggerFactory.getLogger(QueueingBeamFnDataClient.class);
    private final BeamFnDataClient mainClient;
    @GuardedBy(value="inboundDataClients")
    private final HashSet<InboundDataClient> inboundDataClients;
    @GuardedBy(value="inboundDataClients")
    private final ArrayList<InboundDataClient> finishedClients;
    @GuardedBy(value="inboundDataClients")
    private boolean isDraining = false;
    private final int queueSize;
    private ClosableQueue queue;

    public QueueingBeamFnDataClient(BeamFnDataClient mainClient, int queueSize) {
        this.mainClient = mainClient;
        this.inboundDataClients = new HashSet();
        this.finishedClients = new ArrayList();
        this.queueSize = queueSize;
        this.queue = new ClosableQueue(queueSize);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public InboundDataClient receive(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint inputLocation, FnDataReceiver<ByteString> consumer) {
        InboundDataClient inboundDataClient;
        LOG.debug("Registering consumer for instruction {} and transform {}", (Object)inputLocation.getInstructionId(), (Object)inputLocation.getTransformId());
        QueueingFnDataReceiver<ByteString> queueingConsumer = new QueueingFnDataReceiver<ByteString>(consumer, this.queue);
        queueingConsumer.inboundDataClient = inboundDataClient = this.mainClient.receive(apiServiceDescriptor, inputLocation, queueingConsumer);
        HashSet<InboundDataClient> hashSet = this.inboundDataClients;
        synchronized (hashSet) {
            Preconditions.checkState(!this.isDraining);
            if (this.inboundDataClients.add(inboundDataClient)) {
                inboundDataClient.runWhenComplete(() -> this.completeInbound(inboundDataClient));
            }
        }
        return inboundDataClient;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void completeInbound(InboundDataClient client) {
        Preconditions.checkState(client.isDone());
        HashSet<InboundDataClient> hashSet = this.inboundDataClients;
        synchronized (hashSet) {
            if (!this.inboundDataClients.remove(client)) {
                return;
            }
            this.finishedClients.add(client);
            if (this.inboundDataClients.isEmpty() && this.isDraining) {
                this.queue.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void drainAndBlock() throws Exception {
        HashSet<InboundDataClient> hashSet = this.inboundDataClients;
        synchronized (hashSet) {
            Preconditions.checkState(!this.isDraining);
            this.isDraining = true;
            if (this.inboundDataClients.isEmpty()) {
                this.queue.close();
            }
        }
        try {
            ConsumerAndData<?> tuple;
            while ((tuple = this.queue.take()) != null) {
                tuple.accept();
            }
        }
        catch (Exception e) {
            LOG.error("Client failed to deque and process the value", e);
            HashSet<InboundDataClient> clients = new HashSet<InboundDataClient>();
            HashSet<InboundDataClient> hashSet2 = this.inboundDataClients;
            synchronized (hashSet2) {
                clients.addAll(this.inboundDataClients);
                clients.addAll(this.finishedClients);
            }
            for (InboundDataClient inboundDataClient : clients) {
                inboundDataClient.fail(e);
            }
            throw e;
        }
        hashSet = this.inboundDataClients;
        synchronized (hashSet) {
            Preconditions.checkState(this.inboundDataClients.isEmpty());
            Preconditions.checkState(this.isDraining);
        }
        Preconditions.checkState(this.queue.isEmpty());
    }

    @Override
    public <T> CloseableFnDataReceiver<T> send(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint outputLocation, Coder<T> coder) {
        LOG.debug("Creating output consumer for instruction {} and transform {}", (Object)outputLocation.getInstructionId(), (Object)outputLocation.getTransformId());
        return this.mainClient.send(apiServiceDescriptor, outputLocation, coder);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void reset() {
        HashSet<InboundDataClient> hashSet = this.inboundDataClients;
        synchronized (hashSet) {
            this.inboundDataClients.clear();
            this.isDraining = false;
            this.finishedClients.clear();
        }
        this.queue = new ClosableQueue(this.queueSize);
    }

    private static class ConsumerAndData<T> {
        private final FnDataReceiver<T> consumer;
        private final T data;

        public ConsumerAndData(FnDataReceiver<T> receiver, T data) {
            this.consumer = receiver;
            this.data = data;
        }

        void accept() throws Exception {
            this.consumer.accept(this.data);
        }
    }

    private static class QueueingFnDataReceiver<T>
    implements FnDataReceiver<T> {
        private final FnDataReceiver<T> consumer;
        private final ClosableQueue queue;
        public @Nullable InboundDataClient inboundDataClient;

        public QueueingFnDataReceiver(FnDataReceiver<T> consumer, ClosableQueue queue) {
            this.queue = queue;
            this.consumer = consumer;
        }

        @Override
        public void accept(T value) throws Exception {
            @NonNull InboundDataClient client = this.inboundDataClient;
            try {
                ConsumerAndData<T> offering = new ConsumerAndData<T>(this.consumer, value);
                while (!this.queue.offer(offering, 200L, TimeUnit.MILLISECONDS) && !client.isDone()) {
                }
            }
            catch (Exception e) {
                LOG.error("Failed to insert the value into the queue", e);
                client.fail(e);
                throw e;
            }
        }
    }

    private static class ClosableQueue {
        private static final ConsumerAndData<Object> POISON = new ConsumerAndData<Object>(input -> {
            throw new RuntimeException("Unable to accept poison.");
        }, new Object());
        private final BlockingQueue<ConsumerAndData<?>> queue;
        private final AtomicBoolean closed = new AtomicBoolean();

        ClosableQueue(int queueSize) {
            this.queue = new ArrayBlockingQueue(queueSize);
        }

        void close() {
            Preconditions.checkArgument(!this.closed.getAndSet(true));
            if (!this.queue.offer(POISON)) {
                LOG.debug("Queue was full, not adding poison");
            }
        }

        boolean offer(ConsumerAndData<?> e, long l, TimeUnit t) throws InterruptedException {
            return this.queue.offer(e, l, t);
        }

        @Nullable ConsumerAndData<?> take() throws InterruptedException {
            @Nullable ConsumerAndData<Object> result = (ConsumerAndData<?>)this.queue.poll();
            if (result == null) {
                result = this.closed.get() ? (ConsumerAndData)this.queue.poll() : this.queue.take();
            }
            if (result == POISON) {
                return null;
            }
            return result;
        }

        boolean isEmpty() {
            return this.queue.isEmpty() || this.queue.peek() == POISON;
        }
    }
}

