/*
 * Decompiled with CFR 0.152.
 */
package de.esoco.coroutine;

import de.esoco.coroutine.ChannelClosedException;
import de.esoco.coroutine.ChannelId;
import de.esoco.coroutine.CoroutineException;
import de.esoco.coroutine.Suspension;
import de.esoco.lib.concurrent.RunLock;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * A channel that transfers values of type {@code T} from senders to
 * receivers through a bounded queue.
 *
 * <p>Values can be exchanged either by blocking the calling thread
 * ({@link #sendBlocking(Object)}, {@link #receiveBlocking()}) or through
 * {@link Suspension} objects that are resumed as soon as the requested
 * operation can be performed ({@link #sendSuspending(Suspension)},
 * {@link #receiveSuspending(Suspension)}). Suspensions that cannot be served
 * immediately are kept in a send or a receive queue until data or capacity
 * becomes available or the channel is closed.</p>
 *
 * <p>Modifications of the suspension queues are performed through the
 * channel's access lock. The data queue itself is a
 * {@link LinkedBlockingQueue} and therefore safe to access without it.</p>
 *
 * @param <T> The type of the values transferred through this channel
 */
public class Channel<T> implements AutoCloseable {

    private final ChannelId<T> id;

    private final BlockingQueue<T> channelData;

    // Suspensions waiting for free capacity (send) or for data (receive).
    private final Deque<Suspension<T>> sendQueue = new LinkedList<>();

    private final Deque<Suspension<T>> receiveQueue = new LinkedList<>();

    private final RunLock accessLock = new RunLock();

    // volatile because isClosed() and checkClosed() read the flag without
    // holding the access lock.
    private volatile boolean closed = false;

    /**
     * Creates a new channel.
     *
     * @param id       The identifier of this channel
     * @param capacity The maximum number of values the channel can buffer;
     *                 handed to the {@link LinkedBlockingQueue} constructor,
     *                 which rejects values that are not greater than zero
     */
    protected Channel(ChannelId<T> id, int capacity) {
        this.id = id;
        this.channelData = new LinkedBlockingQueue<>(capacity);
    }

    /**
     * Verifies that this channel is still open.
     *
     * @throws ChannelClosedException If the channel has been closed
     */
    public final void checkClosed() {
        if (isClosed()) {
            throw new ChannelClosedException(id);
        }
    }

    /**
     * Marks this channel as closed and fails every suspension that is still
     * waiting in the receive or send queue with a
     * {@link ChannelClosedException}. The queues are emptied afterwards so
     * that a failed suspension is neither failed again by a repeated call
     * nor resumed by a later queue scan.
     *
     * <p>NOTE(review): a thread that is already waiting inside
     * {@link #receiveBlocking()} or {@link #sendBlocking(Object)} is not
     * woken up by this method.</p>
     */
    @Override
    public void close() {
        accessLock.runLocked(() -> {
            closed = true;

            ChannelClosedException closedException =
                new ChannelClosedException(id);

            for (Suspension<T> receiver : receiveQueue) {
                receiver.fail(closedException);
            }
            receiveQueue.clear();

            for (Suspension<T> sender : sendQueue) {
                sender.fail(closedException);
            }
            sendQueue.clear();
        });
    }

    /**
     * Returns the identifier of this channel.
     *
     * @return The channel ID
     */
    public ChannelId<T> getId() {
        return id;
    }

    /**
     * Checks whether this channel has been closed.
     *
     * @return TRUE if {@link #close()} has been invoked
     */
    public final boolean isClosed() {
        return closed;
    }

    /**
     * Receives a value from this channel, blocking the calling thread until
     * a value is available. After a value has been taken, queued senders
     * are resumed as far as capacity permits.
     *
     * <p>The wait is deliberately performed without holding the access
     * lock: a receiver that kept the lock while waiting on an empty channel
     * would prevent any sender from acquiring it to provide a value.</p>
     *
     * @return The received value
     *
     * @throws ChannelClosedException If the channel is closed when the
     *                                method is entered
     * @throws CoroutineException     If the thread is interrupted while
     *                                waiting; the interrupt status of the
     *                                thread is restored before throwing
     */
    public T receiveBlocking() {
        checkClosed();

        T value;

        try {
            value = channelData.take();
        } catch (InterruptedException e) {
            // Keep the interruption visible to code further up the stack.
            Thread.currentThread().interrupt();
            throw new CoroutineException(e);
        }

        accessLock.runLocked(() -> resumeSenders());

        return value;
    }

    /**
     * Receives a value through a suspension. If the channel holds a value
     * the suspension is resumed with it immediately and queued senders are
     * resumed afterwards; otherwise the suspension is appended to the
     * receive queue.
     *
     * @param suspension The suspension to resume with the received value
     *
     * @throws ChannelClosedException If the channel has been closed
     */
    public void receiveSuspending(Suspension<T> suspension) {
        accessLock.runLocked(() -> {
            checkClosed();

            T value = channelData.poll();

            if (value != null) {
                suspension.resume(value);
                resumeSenders();
            } else {
                receiveQueue.add(suspension);
            }
        });
    }

    /**
     * Returns the number of values that can currently be added to this
     * channel without blocking, as reported by the underlying queue.
     *
     * @return The remaining capacity
     */
    public int remainingCapacity() {
        return channelData.remainingCapacity();
    }

    /**
     * Sends a value into this channel, blocking the calling thread until
     * capacity is available. After the value has been stored, queued
     * receivers are resumed as far as data is available.
     *
     * <p>The wait is deliberately performed without holding the access
     * lock: a sender that kept the lock while waiting on a full channel
     * would prevent any receiver from acquiring it to free capacity.</p>
     *
     * @param value The value to send
     *
     * @throws ChannelClosedException If the channel is closed when the
     *                                method is entered
     * @throws CoroutineException     If the thread is interrupted while
     *                                waiting; the interrupt status of the
     *                                thread is restored before throwing
     */
    public void sendBlocking(T value) {
        checkClosed();

        try {
            channelData.put(value);
        } catch (InterruptedException e) {
            // Keep the interruption visible to code further up the stack.
            Thread.currentThread().interrupt();
            throw new CoroutineException(e);
        }

        accessLock.runLocked(() -> resumeReceivers());
    }

    /**
     * Sends the value of a suspension into this channel. If the channel has
     * capacity the value is stored, the suspension is resumed immediately
     * and queued receivers are resumed afterwards; otherwise the suspension
     * is appended to the send queue.
     *
     * @param suspension The suspension that provides the value to send
     *
     * @throws ChannelClosedException If the channel has been closed
     */
    public void sendSuspending(Suspension<T> suspension) {
        accessLock.runLocked(() -> {
            checkClosed();

            if (channelData.offer(suspension.value())) {
                suspension.resume();
                resumeReceivers();
            } else {
                sendQueue.add(suspension);
            }
        });
    }

    /**
     * Returns the number of values currently buffered in this channel.
     *
     * @return The current number of buffered values
     */
    public int size() {
        return channelData.size();
    }

    /**
     * Returns a string consisting of the simple class name and the channel
     * ID, separated by a hyphen.
     *
     * @return The string representation of this channel
     */
    @Override
    public String toString() {
        return String.format("%s-%s", getClass().getSimpleName(), id);
    }

    /**
     * Resumes queued receivers for as long as the channel contains data.
     * Cancelled suspensions are dropped from the queue. Invoked with the
     * access lock held.
     */
    private void resumeReceivers() {
        while (channelData.size() > 0 && !receiveQueue.isEmpty()) {
            Suspension<T> receiver = receiveQueue.remove();

            receiver.ifNotCancelled(() -> {
                // poll() instead of remove(): the value may have been taken
                // by a blocking receiver since the size check, in which case
                // the suspension returns to the head of the queue.
                T value = channelData.poll();

                if (value != null) {
                    receiver.resume(value);
                } else {
                    receiveQueue.push(receiver);
                }
            });
        }
    }

    /**
     * Resumes queued senders for as long as the channel has free capacity.
     * Cancelled suspensions are dropped from the queue. Invoked with the
     * access lock held.
     */
    private void resumeSenders() {
        while (channelData.remainingCapacity() > 0 && !sendQueue.isEmpty()) {
            Suspension<T> sender = sendQueue.remove();

            sender.ifNotCancelled(() -> {
                if (channelData.offer(sender.value())) {
                    sender.resume();
                } else {
                    // The capacity was used up in the meantime; retry this
                    // sender first the next time capacity becomes available.
                    sendQueue.push(sender);
                }
            });
        }
    }
}

