/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.stream;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.fn.CancellableQueue;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.Pure;

@ThreadSafe
public final class BufferingStreamObserver<@UnknownKeyFor T>
implements StreamObserver<T> {
    private static final @UnknownKeyFor @NonNull @Initialized Object POISON_PILL = new Object();
    private final @UnknownKeyFor @NonNull @Initialized CancellableQueue<T> queue;
    private final @UnknownKeyFor @NonNull @Initialized Phaser phaser;
    private final @UnknownKeyFor @NonNull @Initialized CallStreamObserver<T> outboundObserver;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Future<@UnknownKeyFor @KeyForBottom @Nullable @Initialized @NonNull @Initialized ?> queueDrainer;
    private final @UnknownKeyFor @NonNull @Initialized int bufferSize;

    public BufferingStreamObserver(@UnknownKeyFor @NonNull @Initialized Phaser phaser, @UnknownKeyFor @NonNull @Initialized CallStreamObserver<T> outboundObserver, @UnknownKeyFor @NonNull @Initialized ExecutorService executor, @UnknownKeyFor @NonNull @Initialized int bufferSize) {
        this.phaser = phaser;
        this.bufferSize = bufferSize;
        this.queue = new CancellableQueue(bufferSize);
        this.outboundObserver = outboundObserver;
        this.queueDrainer = executor.submit(this::drainQueue);
    }

    private void drainQueue() {
        try {
            while (true) {
                int currentPhase = this.phaser.getPhase();
                while (this.outboundObserver.isReady()) {
                    T value = this.queue.take();
                    if (value != POISON_PILL) {
                        this.outboundObserver.onNext(value);
                        continue;
                    }
                    this.outboundObserver.onCompleted();
                    return;
                }
                this.phaser.awaitAdvance(currentPhase);
            }
        }
        catch (OnErrorException e) {
            this.outboundObserver.onError(e.getCause());
        }
        catch (Exception e) {
            this.queue.cancel(e);
            this.outboundObserver.onError((Throwable)e);
        }
    }

    public void onNext(T value) {
        try {
            this.queue.put(value);
        }
        catch (InterruptedException e) {
            this.queue.cancel(e);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void onError(@UnknownKeyFor @NonNull @Initialized Throwable t2) {
        this.queue.cancel(new OnErrorException(t2));
        try {
            this.queueDrainer.get();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void onCompleted() {
        try {
            this.queue.put(POISON_PILL);
            this.queueDrainer.get();
        }
        catch (Exception e) {
            this.queue.cancel(e);
            throw new RuntimeException(e);
        }
    }

    @VisibleForTesting
    public @UnknownKeyFor @NonNull @Initialized int getBufferSize() {
        return this.bufferSize;
    }

    private static class OnErrorException
    extends Exception {
        public OnErrorException(@NonNull @UnknownKeyFor @Initialized Throwable throwable) {
            super(throwable);
        }

        @Override
        @Pure
        public synchronized @NonNull @UnknownKeyFor @Initialized Throwable getCause() {
            return super.getCause();
        }
    }
}

