/*
 * Decompiled with CFR 0.152.
 */
package org.cojen.tupl.repl;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.cojen.tupl.io.Utils;
import org.cojen.tupl.repl.Controller;
import org.cojen.tupl.repl.MessageReplicator;
import org.cojen.tupl.repl.Role;
import org.cojen.tupl.repl.SnapshotReceiver;
import org.cojen.tupl.repl.SnapshotSender;
import org.cojen.tupl.repl.StreamReplicator;

final class MessageStreamReplicator
implements MessageReplicator {
    private final StreamReplicator mRepl;
    private boolean mWriterExists;
    private MsgWriter mWriter;

    MessageStreamReplicator(StreamReplicator repl) {
        this.mRepl = repl;
        repl.controlMessageAcceptor(this::writeControl);
    }

    @Override
    public long encoding() {
        return this.mRepl.encoding() ^ 0x6F478E323184A48EL;
    }

    @Override
    public long localMemberId() {
        return this.mRepl.localMemberId();
    }

    @Override
    public SocketAddress localAddress() {
        return this.mRepl.localAddress();
    }

    @Override
    public Role localRole() {
        return this.mRepl.localRole();
    }

    @Override
    public Socket connect(SocketAddress addr) throws IOException {
        return this.mRepl.connect(addr);
    }

    @Override
    public void socketAcceptor(Consumer<Socket> acceptor) {
        this.mRepl.socketAcceptor(acceptor);
    }

    @Override
    public void sync() throws IOException {
        this.mRepl.sync();
    }

    @Override
    public boolean syncCommit(long position, long nanosTimeout) throws IOException {
        return this.mRepl.syncCommit(position, nanosTimeout);
    }

    @Override
    public boolean failover() throws IOException {
        return this.mRepl.failover();
    }

    @Override
    public void compact(long position) throws IOException {
        this.mRepl.compact(position);
    }

    @Override
    public long commitPosition() {
        return this.mRepl.commitPosition();
    }

    @Override
    public void start() throws IOException {
        this.mRepl.start();
    }

    @Override
    public SnapshotReceiver restore(Map<String, String> options) throws IOException {
        return this.mRepl.restore(options);
    }

    @Override
    public SnapshotReceiver requestSnapshot(Map<String, String> options) throws IOException {
        return this.mRepl.requestSnapshot(options);
    }

    @Override
    public void snapshotRequestAcceptor(Consumer<SnapshotSender> acceptor) {
        this.mRepl.snapshotRequestAcceptor(acceptor);
    }

    @Override
    public void close() throws IOException {
        this.mRepl.close();
    }

    void partitioned(boolean enable) {
        ((Controller)this.mRepl).partitioned(enable);
    }

    @Override
    public boolean isReadable(long position) {
        return this.mRepl.isReadable(position);
    }

    @Override
    public MessageReplicator.Reader newReader(long position, boolean follow) {
        StreamReplicator.Reader source = this.mRepl.newReader(position, follow);
        if (source == null) {
            return null;
        }
        try {
            return new MsgReader(source);
        }
        catch (Throwable e) {
            Utils.closeQuietly(source);
            throw e;
        }
    }

    @Override
    public MessageReplicator.Writer newWriter() {
        return this.createWriter(-1L);
    }

    @Override
    public MessageReplicator.Writer newWriter(long position) {
        if (position < 0L) {
            throw new IllegalArgumentException();
        }
        return this.createWriter(position);
    }

    private synchronized MessageReplicator.Writer createWriter(long position) {
        if (this.mWriter != null) {
            if (this.mWriterExists) {
                throw new IllegalStateException("Writer already exists");
            }
            if (position >= 0L && position != this.mWriter.position()) {
                return null;
            }
        } else {
            StreamReplicator.Writer source = position < 0L ? this.mRepl.newWriter() : this.mRepl.newWriter(position);
            if (source == null) {
                return null;
            }
            try {
                this.mWriter = new MsgWriter(source);
            }
            catch (Throwable e) {
                Utils.closeQuietly(source);
                throw e;
            }
        }
        this.mWriterExists = true;
        return this.mWriter;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writeControl(byte[] message) {
        long commitPosition;
        long position;
        MsgWriter writer;
        MessageStreamReplicator messageStreamReplicator = this;
        synchronized (messageStreamReplicator) {
            writer = this.mWriter;
            if (writer == null) {
                StreamReplicator.Writer source = this.mRepl.newWriter();
                if (source == null) {
                    return;
                }
                try {
                    this.mWriter = writer = new MsgWriter(source);
                }
                catch (Throwable e) {
                    Utils.closeQuietly(source);
                    throw e;
                }
            }
        }
        try {
            position = writer.writeControl(message);
        }
        catch (IOException e) {
            throw Utils.rethrow(e);
        }
        if (position < 0L) {
            return;
        }
        try {
            commitPosition = writer.waitForCommit(position, 1000000000L);
        }
        catch (InterruptedIOException e) {
            return;
        }
        if (commitPosition >= 0L) {
            try {
                this.mRepl.controlMessageReceived(position, message);
            }
            catch (Throwable e) {
                Utils.closeQuietly(this);
                Utils.uncaught(e);
            }
        }
    }

    synchronized void closed(MsgWriter writer) {
        if (this.mWriter == writer) {
            this.mWriterExists = false;
            this.mWriter = null;
        }
    }

    private final class MsgReader
    implements MessageReplicator.Reader {
        private final StreamReplicator.Reader mSource;
        private final byte[] mBuffer;
        private int mPos;
        private int mEnd;
        private int mRemaining;

        MsgReader(StreamReplicator.Reader source) {
            this.mSource = source;
            this.mBuffer = new byte[8192];
        }

        @Override
        public long term() {
            return this.mSource.term();
        }

        @Override
        public long termStartPosition() {
            return this.mSource.termStartPosition();
        }

        @Override
        public long termEndPosition() {
            return this.mSource.termEndPosition();
        }

        @Override
        public long position() {
            return this.mSource.position();
        }

        @Override
        public long commitPosition() {
            return this.mSource.commitPosition();
        }

        @Override
        public void addCommitListener(LongConsumer listener) {
            this.mSource.addCommitListener(listener);
        }

        @Override
        public void close() {
            this.mSource.close();
        }

        @Override
        public void uponCommit(long position, LongConsumer task) {
            this.mSource.uponCommit(position, task);
        }

        @Override
        public long waitForCommit(long position, long nanosTimeout) throws InterruptedIOException {
            return this.mSource.waitForCommit(position, nanosTimeout);
        }

        @Override
        public long waitForEndCommit(long nanosTimeout) throws InterruptedIOException {
            return this.mSource.waitForEndCommit(nanosTimeout);
        }

        @Override
        public byte[] readMessage() throws IOException {
            if (this.mRemaining != 0) {
                throw new IllegalStateException("Partial message remains: " + this.mRemaining);
            }
            try {
                while (true) {
                    int avail;
                    if ((avail = this.mEnd - this.mPos) <= 0) {
                        avail = this.mSource.read(this.mBuffer);
                        if (avail <= 0) {
                            return null;
                        }
                        this.mPos = 0;
                        this.mEnd = avail;
                    }
                    int length = this.readLength(avail);
                    byte[] message = new byte[length & Integer.MAX_VALUE];
                    this.decodeMessage(message);
                    if (length >= 0) {
                        return message;
                    }
                    MessageStreamReplicator.this.mRepl.controlMessageReceived(this.position() - (long)(this.mEnd - this.mPos), message);
                }
            }
            catch (Throwable e) {
                Utils.closeQuietly(this);
                throw e;
            }
        }

        private void decodeMessage(byte[] message) throws IOException {
            this.decodeMessage(message, 0, message.length);
        }

        private void decodeMessage(byte[] message, int offset, int length) throws IOException {
            int avail = this.mEnd - this.mPos;
            int rem = length - avail;
            if (rem <= 0) {
                System.arraycopy(this.mBuffer, this.mPos, message, offset, length);
                this.mPos += length;
            } else {
                System.arraycopy(this.mBuffer, this.mPos, message, offset, avail);
                offset += avail;
                this.mPos = 0;
                this.mEnd = 0;
                if (rem >= this.mBuffer.length) {
                    this.mSource.readFully(message, offset, rem);
                } else {
                    this.fillBuffer(rem, 0);
                    System.arraycopy(this.mBuffer, this.mPos, message, offset, rem);
                    this.mPos += rem;
                }
            }
        }

        @Override
        public int readMessage(byte[] buf, int offset, int length) throws IOException {
            try {
                int rem = this.mRemaining;
                if (rem == 0) {
                    while (true) {
                        int avail;
                        if ((avail = this.mEnd - this.mPos) <= 0) {
                            avail = this.mSource.read(this.mBuffer);
                            if (avail <= 0) {
                                return -1;
                            }
                            this.mPos = 0;
                            this.mEnd = avail;
                        }
                        if ((rem = this.readLength(avail)) >= 0) break;
                        byte[] message = new byte[rem & Integer.MAX_VALUE];
                        this.decodeMessage(message);
                        MessageStreamReplicator.this.mRepl.controlMessageReceived(this.position() - (long)(this.mEnd - this.mPos), message);
                    }
                }
                if (rem <= length) {
                    this.decodeMessage(buf, offset, rem);
                    this.mRemaining = 0;
                    return rem;
                }
                this.decodeMessage(buf, offset, length);
                this.mRemaining = rem -= length;
                return ~rem;
            }
            catch (Throwable e) {
                Utils.closeQuietly(this);
                throw e;
            }
        }

        private int readLength(int avail) throws IOException {
            byte b;
            if ((b = this.mBuffer[this.mPos++]) >= 0) {
                return b;
            }
            --avail;
            if ((b & 0xC0) == 128) {
                if (avail <= 0) {
                    this.fillBuffer(1, 0);
                }
                return 128 + ((b & 0x3F) << 8 | this.mBuffer[this.mPos++] & 0xFF);
            }
            if (avail < 4) {
                this.fillBuffer(4, avail);
            }
            int length = Utils.decodeIntLE(this.mBuffer, this.mPos);
            this.mPos += 4;
            if (length >= 0) {
                if (b == -32) {
                    return length;
                }
                if (b == -1) {
                    return length | Integer.MIN_VALUE;
                }
            }
            throw new IllegalStateException("Illegal message: " + length + ", " + b);
        }

        private void fillBuffer(int required, int avail) throws IOException {
            byte[] buf = this.mBuffer;
            int end = this.mEnd;
            int tail = buf.length - end;
            if (tail < required) {
                System.arraycopy(buf, this.mPos, buf, 0, avail);
                this.mPos = 0;
                this.mEnd = end = avail;
                tail = buf.length - end;
            }
            while (true) {
                if ((avail = this.mSource.read(buf, end, tail)) <= 0) {
                    throw new EOFException();
                }
                this.mEnd = end += avail;
                if ((required -= avail) <= 0) break;
                tail -= avail;
            }
        }
    }

    private final class MsgWriter
    implements MessageReplicator.Writer {
        private final StreamReplicator.Writer mSource;
        private final byte[] mShortPrefix;
        private final byte[] mMediumPrefix;
        private final byte[] mLongPrefix;
        private boolean mFinished;

        MsgWriter(StreamReplicator.Writer source) {
            this.mSource = source;
            this.mShortPrefix = new byte[1];
            this.mMediumPrefix = new byte[2];
            this.mLongPrefix = new byte[5];
            this.mFinished = true;
        }

        @Override
        public long term() {
            return this.mSource.term();
        }

        @Override
        public long termStartPosition() {
            return this.mSource.termStartPosition();
        }

        @Override
        public long termEndPosition() {
            return this.mSource.termEndPosition();
        }

        @Override
        public long position() {
            return this.mSource.position();
        }

        @Override
        public long commitPosition() {
            return this.mSource.commitPosition();
        }

        @Override
        public void addCommitListener(LongConsumer listener) {
            this.mSource.addCommitListener(listener);
        }

        @Override
        public void close() {
            MessageStreamReplicator.this.closed(this);
            this.mSource.close();
        }

        @Override
        public void uponCommit(long position, LongConsumer task) {
            this.mSource.uponCommit(position, task);
        }

        @Override
        public long waitForCommit(long position, long nanosTimeout) throws InterruptedIOException {
            return this.mSource.waitForCommit(position, nanosTimeout);
        }

        @Override
        public long waitForEndCommit(long nanosTimeout) throws InterruptedIOException {
            return this.mSource.waitForEndCommit(nanosTimeout);
        }

        @Override
        public synchronized int writeMessage(byte[] message, int offset, int length, boolean finished) throws IOException {
            byte[] prefix;
            if (length < 128) {
                prefix = this.mShortPrefix;
                prefix[0] = (byte)length;
            } else if (length < 16512) {
                prefix = this.mMediumPrefix;
                int v = length - 128;
                prefix[0] = (byte)(0x80 | v >> 8);
                prefix[1] = (byte)v;
            } else {
                prefix = this.mLongPrefix;
                prefix[0] = -32;
                Utils.encodeIntLE(prefix, 1, length);
            }
            this.mFinished = finished;
            long highestPosition = 0L;
            if (finished) {
                highestPosition = this.position() + (long)prefix.length + (long)length;
            }
            return this.mSource.write(prefix, message, offset, length, highestPosition);
        }

        synchronized long writeControl(byte[] message) throws IOException {
            byte[] prefix = this.mLongPrefix;
            prefix[0] = -1;
            Utils.encodeIntLE(prefix, 1, message.length);
            long highestPosition = 0L;
            if (this.mFinished) {
                highestPosition = this.position() + (long)prefix.length + (long)message.length;
            }
            if (this.mSource.write(prefix, message, 0, message.length, highestPosition) <= 0) {
                return -1L;
            }
            return this.position();
        }
    }
}

