/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.sdk.fn.data;

import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.repackaged.direct_java.sdk.fn.CancellableQueue;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.DataEndpoint;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.FnDataReceiver;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.TimerEndpoint;
import org.apache.beam.sdk.coders.Coder;

public class BeamFnDataInboundObserver2
implements CloseableFnDataReceiver<BeamFnApi.Elements> {
    private final Map<String, EndpointStatus<DataEndpoint<?>>> transformIdToDataEndpoint = new HashMap();
    private final Map<String, Map<String, EndpointStatus<TimerEndpoint<?>>>> transformIdToTimerFamilyIdToTimerEndpoint;
    private final CancellableQueue<BeamFnApi.Elements> queue;
    private final int totalNumEndpoints;
    private int numEndpointsThatAreIncomplete;

    public static BeamFnDataInboundObserver2 forConsumers(List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>> timerEndpoints) {
        return new BeamFnDataInboundObserver2(dataEndpoints, timerEndpoints);
    }

    private BeamFnDataInboundObserver2(List<DataEndpoint<?>> dataEndpoints, List<TimerEndpoint<?>> timerEndpoints) {
        for (DataEndpoint<?> dataEndpoint : dataEndpoints) {
            this.transformIdToDataEndpoint.put(dataEndpoint.getTransformId(), new EndpointStatus(dataEndpoint));
        }
        this.transformIdToTimerFamilyIdToTimerEndpoint = new HashMap();
        for (TimerEndpoint timerEndpoint : timerEndpoints) {
            this.transformIdToTimerFamilyIdToTimerEndpoint.computeIfAbsent(timerEndpoint.getTransformId(), unused -> new HashMap()).put(timerEndpoint.getTimerFamilyId(), new EndpointStatus<TimerEndpoint>(timerEndpoint));
        }
        this.queue = new CancellableQueue(100);
        this.numEndpointsThatAreIncomplete = this.totalNumEndpoints = dataEndpoints.size() + timerEndpoints.size();
    }

    @Override
    public void accept(BeamFnApi.Elements elements) throws Exception {
        this.queue.put(elements);
    }

    @Override
    public void flush() throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    public void close() throws Exception {
        this.queue.cancel(new IllegalStateException("Inbound observer closed."));
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void awaitCompletion() throws Exception {
        try {
            block7: while (true) {
                BeamFnApi.Elements elements = this.queue.take();
                for (BeamFnApi.Elements.Data data : elements.getDataList()) {
                    EndpointStatus<DataEndpoint<?>> endpoint = this.transformIdToDataEndpoint.get(data.getTransformId());
                    if (endpoint == null) {
                        throw new IllegalStateException(String.format("Unable to find inbound data receiver for instruction %s and transform %s.", data.getInstructionId(), data.getTransformId()));
                    }
                    if (endpoint.isDone) {
                        throw new IllegalStateException(String.format("Received data after inbound data receiver is done for instruction %s and transform %s.", data.getInstructionId(), data.getTransformId()));
                    }
                    InputStream inputStream = data.getData().newInput();
                    Coder coder = ((DataEndpoint)endpoint.endpoint).getCoder();
                    FnDataReceiver<Object> receiver = ((DataEndpoint)endpoint.endpoint).getReceiver();
                    while (inputStream.available() > 0) {
                        receiver.accept(coder.decode(inputStream));
                    }
                    if (!data.getIsLast()) continue;
                    endpoint.isDone = true;
                    --this.numEndpointsThatAreIncomplete;
                    if (this.numEndpointsThatAreIncomplete != 0) continue;
                    return;
                }
                Iterator iterator = elements.getTimersList().iterator();
                while (true) {
                    if (!iterator.hasNext()) continue block7;
                    BeamFnApi.Elements.Timers timers = (BeamFnApi.Elements.Timers)iterator.next();
                    Map<String, EndpointStatus<TimerEndpoint<?>>> timerFamilyIdToEndpoints = this.transformIdToTimerFamilyIdToTimerEndpoint.get(timers.getTransformId());
                    if (timerFamilyIdToEndpoints == null) {
                        throw new IllegalStateException(String.format("Unable to find inbound timer receiver for instruction %s, transform %s, and timer family %s.", timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId()));
                    }
                    EndpointStatus<TimerEndpoint<?>> endpoint = timerFamilyIdToEndpoints.get(timers.getTimerFamilyId());
                    if (endpoint == null) {
                        throw new IllegalStateException(String.format("Unable to find inbound timer receiver for instruction %s, transform %s, and timer family %s.", timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId()));
                    }
                    if (endpoint.isDone) {
                        throw new IllegalStateException(String.format("Received timer after inbound timer receiver is done for instruction %s, transform %s, and timer family %s.", timers.getInstructionId(), timers.getTransformId(), timers.getTimerFamilyId()));
                    }
                    InputStream inputStream = timers.getTimers().newInput();
                    Coder coder = ((TimerEndpoint)endpoint.endpoint).getCoder();
                    FnDataReceiver<Object> receiver = ((TimerEndpoint)endpoint.endpoint).getReceiver();
                    while (inputStream.available() > 0) {
                        receiver.accept(coder.decode(inputStream));
                    }
                    if (timers.getIsLast()) {
                        --this.numEndpointsThatAreIncomplete;
                        if (this.numEndpointsThatAreIncomplete == 0) {
                            return;
                        }
                    }
                    continue;
                    break;
                }
                break;
            }
        }
        catch (Exception e) {
            this.queue.cancel(e);
            throw e;
        }
        finally {
            this.close();
        }
    }

    public void reset() {
        this.numEndpointsThatAreIncomplete = this.totalNumEndpoints;
        for (EndpointStatus<DataEndpoint<?>> endpointStatus : this.transformIdToDataEndpoint.values()) {
            endpointStatus.isDone = false;
        }
        for (Map map : this.transformIdToTimerFamilyIdToTimerEndpoint.values()) {
            for (EndpointStatus status : map.values()) {
                status.isDone = false;
            }
        }
        this.queue.reset();
    }

    private static class EndpointStatus<T> {
        final T endpoint;
        boolean isDone;

        EndpointStatus(T endpoint) {
            this.endpoint = endpoint;
        }
    }
}

