/*
 * Decompiled with CFR 0.152.
 */
package org.cojen.dirmi.io;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.cojen.dirmi.ClosedException;
import org.cojen.dirmi.RejectedException;
import org.cojen.dirmi.RemoteTimeoutException;
import org.cojen.dirmi.io.Channel;
import org.cojen.dirmi.io.ChannelAcceptWaiter;
import org.cojen.dirmi.io.ChannelAcceptor;
import org.cojen.dirmi.io.ChannelBroker;
import org.cojen.dirmi.io.ChannelConnector;
import org.cojen.dirmi.io.CloseableGroup;
import org.cojen.dirmi.io.IOExecutor;
import org.cojen.dirmi.io.ListenerQueue;
import org.cojen.dirmi.io.PipedChannel;
import org.cojen.dirmi.io.PipedInputStream;
import org.cojen.dirmi.io.PipedOutputStream;
import org.cojen.dirmi.util.Timer;

public class PipedChannelBroker
implements ChannelBroker {
    private static final int DEFAULT_BUFFER_SIZE = 100;
    private final IOExecutor mExecutor;
    private final int mBufferSize;
    private final CloseableGroup<Channel> mAllChannels;
    volatile PipedChannelBroker mEndpoint;
    private final ListenerQueue<ChannelAcceptor.Listener> mAcceptListenerQueue;

    public static ChannelBroker[] newPair(IOExecutor executor) {
        return PipedChannelBroker.newPair(executor, 100);
    }

    public static ChannelBroker[] newPair(IOExecutor executor, int bufferSize) {
        if (bufferSize < 2) {
            bufferSize = 2;
        }
        PipedChannelBroker broker_0 = new PipedChannelBroker(executor, bufferSize);
        PipedChannelBroker broker_1 = new PipedChannelBroker(executor, bufferSize, broker_0);
        return new ChannelBroker[]{broker_0, broker_1};
    }

    private PipedChannelBroker(IOExecutor executor, int bufferSize) {
        this.mExecutor = executor;
        this.mBufferSize = bufferSize;
        this.mAllChannels = new CloseableGroup();
        this.mAcceptListenerQueue = new ListenerQueue<ChannelAcceptor.Listener>(this.mExecutor, ChannelAcceptor.Listener.class);
    }

    private PipedChannelBroker(IOExecutor executor, int bufferSize, PipedChannelBroker endpoint) {
        this(executor, bufferSize);
        this.mEndpoint = endpoint;
        endpoint.mEndpoint = this;
    }

    @Override
    public Object getLocalAddress() {
        return null;
    }

    @Override
    public Object getRemoteAddress() {
        return null;
    }

    @Override
    public Channel connect() throws IOException {
        PipedChannelBroker endpoint = this.endpoint();
        PipedInputStream cin = new PipedInputStream();
        PipedInputStream ain = new PipedInputStream();
        PipedOutputStream aout = new PipedOutputStream(cin);
        PipedOutputStream cout = new PipedOutputStream(ain);
        PipedChannel channel = new PipedChannel(this.mExecutor, cin, cout, this.mBufferSize);
        channel.register(this.mAllChannels);
        endpoint.accepted(ain, aout);
        return channel;
    }

    private void accepted(PipedInputStream ain, PipedOutputStream aout) throws IOException {
        final PipedChannel channel = new PipedChannel(this.mExecutor, ain, aout, this.mBufferSize);
        channel.register(this.mAllChannels);
        this.mExecutor.execute(new Runnable(){

            @Override
            public void run() {
                ((ChannelAcceptor.Listener)PipedChannelBroker.this.mAcceptListenerQueue.dequeue()).accepted(channel);
            }
        });
    }

    @Override
    public Channel connect(long timeout, TimeUnit unit) throws IOException {
        return this.connect();
    }

    @Override
    public Channel connect(Timer timer) throws IOException {
        return this.connect(RemoteTimeoutException.checkRemaining(timer), timer.unit());
    }

    @Override
    public void connect(final ChannelConnector.Listener listener) {
        try {
            this.mExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    Channel channel;
                    try {
                        channel = PipedChannelBroker.this.connect();
                    }
                    catch (IOException e) {
                        listener.failed(e);
                        return;
                    }
                    listener.connected(channel);
                }
            });
        }
        catch (RejectedException e) {
            listener.rejected(e);
        }
    }

    @Override
    public Channel accept() throws IOException {
        ChannelAcceptWaiter listener = new ChannelAcceptWaiter();
        this.accept(listener);
        return listener.waitForChannel();
    }

    @Override
    public Channel accept(long timeout, TimeUnit unit) throws IOException {
        ChannelAcceptWaiter listener = new ChannelAcceptWaiter();
        this.accept(listener);
        return listener.waitForChannel(timeout, unit);
    }

    @Override
    public Channel accept(Timer timer) throws IOException {
        return this.accept(RemoteTimeoutException.checkRemaining(timer), timer.unit());
    }

    @Override
    public void accept(ChannelAcceptor.Listener listener) {
        try {
            this.mAcceptListenerQueue.enqueue(listener);
        }
        catch (RejectedException e) {
            this.mAcceptListenerQueue.dequeue().rejected(e);
        }
    }

    @Override
    public void close() {
        PipedChannelBroker endpoint = this.mEndpoint;
        if (endpoint != null) {
            this.mEndpoint = null;
            this.mAllChannels.close();
            endpoint.close();
            this.mAcceptListenerQueue.dequeueForClose().closed(new ClosedException());
        }
    }

    private PipedChannelBroker endpoint() throws IOException {
        PipedChannelBroker endpoint = this.mEndpoint;
        if (endpoint == null) {
            throw new ClosedException();
        }
        return endpoint;
    }
}

