/*
 * Decompiled with CFR 0.152.
 */
package org.cojen.dirmi.io;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.cojen.dirmi.ClosedException;
import org.cojen.dirmi.RejectedException;
import org.cojen.dirmi.RemoteTimeoutException;
import org.cojen.dirmi.io.BasicChannelBroker;
import org.cojen.dirmi.io.Channel;
import org.cojen.dirmi.io.ChannelBroker;
import org.cojen.dirmi.io.ChannelBrokerConnector;
import org.cojen.dirmi.io.ChannelConnector;
import org.cojen.dirmi.io.ChannelTimeout;
import org.cojen.dirmi.io.CloseableGroup;
import org.cojen.dirmi.io.IOExecutor;
import org.cojen.dirmi.util.Timer;

public class BasicChannelBrokerConnector
implements ChannelBrokerConnector {
    private final IOExecutor mExecutor;
    private final ChannelConnector mConnector;
    private final CloseableGroup<Broker> mConnectedBrokers;

    public BasicChannelBrokerConnector(IOExecutor executor, ChannelConnector connector) {
        this.mExecutor = executor;
        this.mConnector = connector;
        this.mConnectedBrokers = new CloseableGroup();
    }

    @Override
    public Object getRemoteAddress() {
        return this.mConnector.getRemoteAddress();
    }

    @Override
    public Object getLocalAddress() {
        return this.mConnector.getLocalAddress();
    }

    @Override
    public ChannelBroker connect() throws IOException {
        this.mConnectedBrokers.checkClosed();
        return this.connected(this.mConnector.connect(), null);
    }

    @Override
    public ChannelBroker connect(long timeout, TimeUnit unit) throws IOException {
        return timeout < 0L ? this.connect() : this.connect(new Timer(timeout, unit));
    }

    @Override
    public ChannelBroker connect(Timer timer) throws IOException {
        return this.connected(this.mConnector.connect(timer), timer);
    }

    @Override
    public void connect(final ChannelBrokerConnector.Listener listener) {
        this.mConnector.connect(new ChannelConnector.Listener(){

            @Override
            public void connected(Channel channel) {
                ChannelBroker broker;
                if (BasicChannelBrokerConnector.this.mConnectedBrokers.isClosed()) {
                    listener.closed(new ClosedException());
                }
                try {
                    broker = BasicChannelBrokerConnector.this.connected(channel, null);
                }
                catch (IOException e) {
                    listener.failed(e);
                    return;
                }
                listener.connected(broker);
            }

            @Override
            public void rejected(RejectedException e) {
                listener.rejected(e);
            }

            @Override
            public void failed(IOException e) {
                listener.failed(e);
            }

            @Override
            public void closed(IOException e) {
                listener.closed(e);
            }
        });
    }

    @Override
    public void close() {
        this.mConnectedBrokers.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ChannelBroker connected(Channel channel, Timer timer) throws IOException {
        if (timer == null) {
            timer = new Timer(15L, TimeUnit.SECONDS);
        }
        try {
            long id;
            ChannelTimeout timeout = new ChannelTimeout(this.mExecutor, channel, timer);
            try {
                channel.getOutputStream().write(1);
                channel.flush();
                id = new DataInputStream(channel.getInputStream()).readLong();
            }
            finally {
                timeout.cancel();
            }
            return new Broker(id, channel);
        }
        catch (IOException e) {
            channel.disconnect();
            throw e;
        }
    }

    private class Broker
    extends BasicChannelBroker {
        private volatile int mProtocol;

        Broker(long id, Channel control) throws RejectedException {
            super(BasicChannelBrokerConnector.this.mExecutor, id, control);
            BasicChannelBrokerConnector.this.mConnectedBrokers.add(this);
            this.mControl.inputNotify(new Channel.Listener(){

                @Override
                public void ready() {
                    Channel channel;
                    int op;
                    Channel control = Broker.this.mControl;
                    try {
                        op = control.getInputStream().read();
                        if (op != 7 && op != 5) {
                            if (op < 0) {
                                throw new ClosedException("Control channel is closed");
                            }
                            throw new IOException("Invalid operation from control channel: " + op);
                        }
                        control.inputNotify(this);
                    }
                    catch (IOException e) {
                        this.closed(e);
                        return;
                    }
                    if (op == 7) {
                        try {
                            if (BasicChannelBroker.cPingLogger != null) {
                                Broker.this.logPingMessage("Ping request from " + Broker.this.mControl);
                            }
                            control.getOutputStream().write(8);
                            control.getOutputStream().flush();
                            if (BasicChannelBroker.cPingLogger != null) {
                                Broker.this.logPingMessage("Ping response to " + Broker.this.mControl);
                            }
                            Broker.this.pinged();
                        }
                        catch (IOException e) {
                            if (BasicChannelBroker.cPingLogger != null) {
                                Broker.this.logPingMessage("Ping response failure to " + Broker.this.mControl + ": " + e);
                            }
                            Broker.this.close(new ClosedException("Ping failure", e));
                        }
                        return;
                    }
                    try {
                        channel = BasicChannelBrokerConnector.this.mConnector.connect();
                        DataOutputStream dout = new DataOutputStream(channel.getOutputStream());
                        dout.writeByte(6);
                        dout.writeLong(Broker.this.mId);
                        dout.flush();
                    }
                    catch (IOException e) {
                        this.closed(e);
                        return;
                    }
                    Broker.this.accepted(channel);
                }

                @Override
                public void rejected(RejectedException e) {
                    this.closed(e);
                    if (BasicChannelBroker.cPingLogger != null) {
                        Broker.this.logPingMessage("Ping check stopping for " + Broker.this.mControl + ": " + e);
                    }
                }

                @Override
                public void closed(IOException e) {
                    Broker.this.close(e);
                    if (BasicChannelBroker.cPingLogger != null) {
                        Broker.this.logPingMessage("Ping check stopping for " + Broker.this.mControl + ": " + e);
                    }
                }
            });
        }

        @Override
        public Channel connect() throws IOException {
            return this.connect(new Timer(15L, TimeUnit.SECONDS));
        }

        @Override
        public Channel connect(long timeout, TimeUnit unit) throws IOException {
            if (timeout < 0L) {
                Channel channel = BasicChannelBrokerConnector.this.mConnector.connect();
                if (!this.sendRequest(channel)) {
                    channel = BasicChannelBrokerConnector.this.mConnector.connect();
                    this.sendRequest(channel);
                }
                return channel;
            }
            return this.connect(new Timer(timeout, unit));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Channel connect(Timer timer) throws IOException {
            Channel channel = BasicChannelBrokerConnector.this.mConnector.connect(RemoteTimeoutException.checkRemaining(timer), timer.unit());
            ChannelTimeout timeoutTask = new ChannelTimeout(BasicChannelBrokerConnector.this.mExecutor, channel, RemoteTimeoutException.checkRemaining(timer), timer.unit());
            try {
                if (!this.sendRequest(channel)) {
                    channel = BasicChannelBrokerConnector.this.mConnector.connect(RemoteTimeoutException.checkRemaining(timer), timer.unit());
                    this.sendRequest(channel);
                }
                Channel channel2 = channel;
                return channel2;
            }
            finally {
                timeoutTask.cancel();
            }
        }

        @Override
        public void connect(final ChannelConnector.Listener listener) {
            BasicChannelBrokerConnector.this.mConnector.connect(new ChannelConnector.Listener(){

                @Override
                public void connected(Channel channel) {
                    try {
                        if (!Broker.this.sendRequest(channel)) {
                            channel = Broker.this.connect(15L, TimeUnit.SECONDS);
                        }
                    }
                    catch (IOException e) {
                        listener.failed(e);
                        return;
                    }
                    listener.connected(channel);
                }

                @Override
                public void rejected(RejectedException e) {
                    listener.rejected(e);
                }

                @Override
                public void failed(IOException e) {
                    listener.failed(e);
                }

                @Override
                public void closed(IOException e) {
                    listener.closed(e);
                }
            });
        }

        @Override
        public void close() {
            BasicChannelBrokerConnector.this.mConnectedBrokers.remove(this);
            super.close();
        }

        @Override
        protected boolean requirePingTask() {
            return false;
        }

        @Override
        protected boolean doPing() {
            throw new AssertionError();
        }

        private boolean sendRequest(Channel channel) throws IOException {
            DataOutputStream dout = new DataOutputStream(channel.getOutputStream());
            if (this.mProtocol == 1) {
                dout.writeByte(3);
                dout.writeLong(this.mId);
                dout.flush();
            } else {
                dout.writeByte(9);
                dout.writeLong(this.mId);
                dout.flush();
                int response = channel.getInputStream().read();
                if (response < 0) {
                    if (this.mProtocol == 0) {
                        this.mProtocol = 1;
                        channel.disconnect();
                        return false;
                    }
                    throw new ClosedException("New connection immediately closed");
                }
                this.mProtocol = 2;
                if (response != 10) {
                    channel.disconnect();
                    ClosedException exception = new ClosedException("Stale session");
                    this.close(exception);
                    throw exception;
                }
            }
            channel.register(this.mAllChannels);
            return true;
        }
    }
}

