/*
 * Decompiled with CFR 0.152.
 */
package org.cojen.tupl.core;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import org.cojen.tupl.ConfirmationFailureException;
import org.cojen.tupl.ConfirmationInterruptedException;
import org.cojen.tupl.ConfirmationTimeoutException;
import org.cojen.tupl.DatabaseException;
import org.cojen.tupl.DurabilityMode;
import org.cojen.tupl.UnmodifiableReplicaException;
import org.cojen.tupl.WriteFailureException;
import org.cojen.tupl.core.LocalTransaction;
import org.cojen.tupl.core.PendingTxn;
import org.cojen.tupl.core.RedoWriter;
import org.cojen.tupl.core.ReplEngine;
import org.cojen.tupl.core.TransactionContext;
import org.cojen.tupl.core.Utils;
import org.cojen.tupl.repl.StreamReplicator;
import org.cojen.tupl.util.Latch;
import org.cojen.tupl.util.Parker;

class ReplWriter
extends RedoWriter {
    final ReplEngine mEngine;
    final StreamReplicator.Writer mReplWriter;
    long mLastCommitPos;
    long mLastCommitTxnId;
    private final Latch mBufferLatch;
    private Thread mProducer;
    private Thread mConsumer;
    private boolean mConsumerParked;
    private byte[] mBuffer;
    private int mBufferHead;
    private int mBufferTail = -1;
    private long mWritePos;
    private Throwable mConsumerException;
    volatile boolean mUnmodifiable;
    private volatile PendingTxn mFirstPending;
    private volatile PendingTxn mLastPending;
    private static final VarHandle cFirstPendingHandle;
    private static final VarHandle cLastPendingHandle;

    ReplWriter(ReplEngine engine, StreamReplicator.Writer writer) {
        this.mEngine = engine;
        this.mReplWriter = writer;
        if (writer == null) {
            this.mBufferLatch = null;
        } else {
            this.mBufferLatch = new Latch();
            writer.addCommitListener(this::finishPending);
        }
    }

    void start() {
        this.mBufferLatch.acquireExclusive();
        try {
            if (this.mEngine.mDatabase.isClosed()) {
                return;
            }
            this.mWritePos = this.mReplWriter.position();
            this.mBuffer = new byte[65536];
            this.mConsumer = new Thread(this::consume);
            this.mConsumer.setName("WriteConsumer-" + Parker.threadId(this.mConsumer));
            this.mConsumer.setDaemon(true);
            this.mConsumer.start();
        }
        finally {
            this.mBufferLatch.releaseExclusive();
        }
    }

    @Override
    final boolean failover() throws IOException {
        if (!this.mEngine.mRepl.failover()) {
            return false;
        }
        this.drain(true);
        return true;
    }

    @Override
    public final void txnCommitSync(long commitPos) throws IOException {
        StreamReplicator.Writer writer = this.mReplWriter;
        if (writer == null) {
            throw this.unmodifiable();
        }
        if (!ReplWriter.confirm(writer, commitPos, -1L)) {
            throw this.nowUnmodifiable();
        }
    }

    @Override
    public final long encoding() {
        return this.mEngine.mRepl.encoding();
    }

    @Override
    public ReplWriter txnRedoWriter() {
        return this;
    }

    @Override
    boolean shouldCheckpoint(long sizeThreshold) {
        return false;
    }

    @Override
    void checkpointPrepare() throws IOException {
        throw ReplWriter.fail();
    }

    @Override
    void checkpointSwitch(TransactionContext[] contexts) throws IOException {
        throw ReplWriter.fail();
    }

    @Override
    long checkpointNumber() {
        throw ReplWriter.fail();
    }

    @Override
    long checkpointPosition() {
        throw ReplWriter.fail();
    }

    @Override
    long checkpointTransactionId() {
        throw ReplWriter.fail();
    }

    @Override
    void checkpointAborted() {
    }

    @Override
    void checkpointStarted() throws IOException {
        throw ReplWriter.fail();
    }

    @Override
    void checkpointFlushed() throws IOException {
        throw ReplWriter.fail();
    }

    @Override
    void checkpointFinished() throws IOException {
        throw ReplWriter.fail();
    }

    private static UnsupportedOperationException fail() {
        return new UnsupportedOperationException();
    }

    @Override
    DurabilityMode opWriteCheck(DurabilityMode mode) throws IOException {
        return DurabilityMode.SYNC;
    }

    @Override
    boolean shouldWriteTerminators() {
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    final long write(boolean flush, byte[] bytes, int offset, int length, int commitLen, PendingTxn pending) throws IOException {
        if (this.mReplWriter == null) {
            throw this.unmodifiable();
        }
        this.mBufferLatch.acquireExclusive();
        try {
            byte[] buffer = this.mBuffer;
            if (buffer == null) {
                throw this.nowUnmodifiable();
            }
            if (commitLen > 0) {
                this.mLastCommitPos = this.mWritePos + (long)commitLen;
                this.mLastCommitTxnId = this.mLastTxnId;
                if (pending != null) {
                    pending.commitPos(this.mLastCommitPos);
                    PendingTxn last = this.mLastPending;
                    while (true) {
                        if (last == null) {
                            cLastPendingHandle.set(this, pending);
                            while (!cFirstPendingHandle.compareAndSet(this, null, pending)) {
                                Thread.onSpinWait();
                            }
                            break;
                        }
                        PendingTxn newLast = cLastPendingHandle.compareAndExchange(this, last, pending);
                        if (newLast == pending) {
                            last.setNextVolatile(pending);
                            break;
                        }
                        last = newLast;
                    }
                }
            }
            while (true) {
                int amt;
                if (this.mBufferHead == this.mBufferTail) {
                    this.mProducer = Thread.currentThread();
                    try {
                        Thread consumer = this.mConsumer;
                        do {
                            boolean parked;
                            if (parked = this.mConsumerParked) {
                                this.mConsumerParked = false;
                            }
                            this.mBufferLatch.releaseExclusive();
                            if (parked) {
                                Parker.unpark(consumer);
                            }
                            Parker.park(this.mBufferLatch);
                            this.mBufferLatch.acquireExclusive();
                            buffer = this.mBuffer;
                            if (buffer == null) {
                                throw this.nowUnmodifiable();
                            }
                            this.checkConsumerException();
                        } while (this.mBufferHead == this.mBufferTail);
                    }
                    finally {
                        this.mProducer = null;
                    }
                }
                if (this.mBufferHead < this.mBufferTail) {
                    amt = buffer.length - this.mBufferTail;
                } else if (this.mBufferTail >= 0) {
                    amt = this.mBufferHead - this.mBufferTail;
                } else {
                    amt = buffer.length;
                    if (length != 0) {
                        if (length >= amt) {
                            this.mWritePos += (long)length;
                            if (this.replWrite(bytes, offset, length) > 0) {
                                long parked = this.mWritePos;
                                return parked;
                            }
                            this.mReplWriter.close();
                            throw this.nowUnmodifiable();
                        }
                        this.mBufferHead = 0;
                        this.mBufferTail = 0;
                    }
                }
                if (length <= amt) {
                    try {
                        System.arraycopy(bytes, offset, buffer, this.mBufferTail, length);
                    }
                    catch (Throwable e) {
                        if (this.mBufferHead == this.mBufferTail) {
                            this.mBufferTail = -1;
                        }
                        throw e;
                    }
                    this.mWritePos += (long)length;
                    if ((this.mBufferTail += length) >= buffer.length) {
                        this.mBufferTail = 0;
                    }
                    if (this.mConsumerParked) {
                        this.mConsumerParked = false;
                        Parker.unpark(this.mConsumer);
                    }
                    long e = this.mWritePos;
                    return e;
                }
                try {
                    System.arraycopy(bytes, offset, buffer, this.mBufferTail, amt);
                }
                catch (Throwable e) {
                    if (this.mBufferHead == this.mBufferTail) {
                        this.mBufferTail = -1;
                    }
                    throw e;
                }
                this.mWritePos += (long)amt;
                length -= amt;
                offset += amt;
                if ((this.mBufferTail += amt) < buffer.length) continue;
                this.mBufferTail = 0;
            }
        }
        finally {
            this.mBufferLatch.releaseExclusive();
        }
    }

    @Override
    final void alwaysFlush(boolean enable) {
    }

    @Override
    public final void flush() {
    }

    @Override
    void force(boolean metadata, long nanosTimeout) throws IOException {
        long commitPos = this.drain(false);
        if (commitPos > 0L) {
            ReplWriter.confirm(this.mReplWriter, commitPos, nanosTimeout);
        }
        this.mEngine.mRepl.sync();
    }

    private long drain(boolean checkWrite) throws IOException {
        return this.txnRedoWriter().doDrain(checkWrite);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long doDrain(boolean checkWrite) throws IOException {
        if (this.mBufferLatch == null) {
            return 0L;
        }
        long commitPos = 0L;
        this.acquireExclusive();
        this.mBufferLatch.acquireExclusive();
        try {
            PendingTxn pending = this.mLastPending;
            if (pending != null) {
                commitPos = pending.commitPos();
            }
            if (this.mBufferTail >= 0 && this.mBuffer != null) {
                while (true) {
                    this.mProducer = Thread.currentThread();
                    try {
                        this.mBufferLatch.releaseExclusive();
                        Parker.park(this.mBufferLatch);
                        this.mBufferLatch.acquireExclusive();
                    }
                    finally {
                        this.mProducer = null;
                    }
                    if (this.mBufferTail < 0 || this.mBuffer == null) break;
                    this.checkConsumerException();
                }
            }
            if (checkWrite && this.mBuffer != null && this.replWrite(this.mBuffer, 0, 0) <= 0) {
                this.mReplWriter.close();
            }
        }
        finally {
            this.mBufferLatch.releaseExclusive();
            this.releaseExclusive();
        }
        return commitPos;
    }

    @Override
    public void close() throws IOException {
        this.mEngine.mRepl.close();
        this.mEngine.interrupt();
        if (this.mBufferLatch == null) {
            return;
        }
        this.closeConsumerThread();
    }

    @Override
    void stashForRecovery(LocalTransaction txn) {
        this.mEngine.stashForRecovery(txn);
    }

    private void finishPending(long commitPos) {
        PendingTxn pending;
        PendingTxn first;
        block10: {
            PendingTxn prev;
            first = this.mFirstPending;
            if (first == null) {
                return;
            }
            PendingTxn next = first.getNextVolatile();
            if (commitPos < 0L) {
                first.commitPos(-1L);
            } else if (commitPos < first.commitPos()) {
                return;
            }
            pending = first;
            while (true) {
                if (next == null) {
                    PendingTxn last = this.mLastPending;
                    if (last == pending && cLastPendingHandle.compareAndSet(this, last, null)) {
                        cFirstPendingHandle.compareAndSet(this, first, null);
                    } else {
                        while ((next = pending.getNextVolatile()) == null) {
                            Thread.onSpinWait();
                        }
                        cFirstPendingHandle.set(this, next);
                    }
                    break block10;
                }
                prev = pending;
                pending = next;
                next = pending.getNextVolatile();
                if (commitPos < 0L) {
                    pending.commitPos(-1L);
                    continue;
                }
                if (commitPos < pending.commitPos()) break;
            }
            cFirstPendingHandle.set(this, pending);
            pending = prev;
        }
        pending.setNextPlain(null);
        this.mEngine.mFinisher.enqueue(first, pending);
    }

    void closeConsumerThread() {
        this.mBufferLatch.acquireExclusive();
        Thread consumer = this.mConsumer;
        this.mConsumer = null;
        this.mConsumerParked = false;
        this.mBufferLatch.releaseExclusive();
        if (consumer != null) {
            Parker.unpark(consumer);
            try {
                consumer.join();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    private void checkConsumerException() throws WriteFailureException {
        if (this.mConsumerException != null) {
            WriteFailureException e = new WriteFailureException(this.mConsumerException);
            this.mConsumerException = null;
            throw e;
        }
    }

    UnmodifiableReplicaException unmodifiable() throws DatabaseException {
        this.mEngine.mDatabase.checkClosed();
        return new UnmodifiableReplicaException();
    }

    private UnmodifiableReplicaException nowUnmodifiable() throws DatabaseException {
        this.mUnmodifiable = true;
        return this.mEngine.mController.nowUnmodifiable(this.mReplWriter);
    }

    static boolean confirm(StreamReplicator.Writer writer, long commitPos, long nanosTimeout) throws ConfirmationFailureException {
        long pos;
        try {
            pos = writer.waitForCommit(commitPos, nanosTimeout);
        }
        catch (InterruptedIOException e) {
            throw new ConfirmationInterruptedException();
        }
        if (pos >= commitPos) {
            return true;
        }
        if (pos == -1L) {
            return false;
        }
        throw ReplWriter.failedConfirmation(pos, nanosTimeout);
    }

    static long confirmEnd(StreamReplicator.Writer writer) throws ConfirmationFailureException {
        long pos;
        try {
            pos = writer.waitForEndCommit(-1L);
        }
        catch (InterruptedIOException e) {
            throw new ConfirmationInterruptedException();
        }
        if (pos >= 0L) {
            return pos;
        }
        if (pos == -1L) {
            throw new ConfirmationFailureException("Closed");
        }
        throw ReplWriter.failedConfirmation(pos, -1L);
    }

    static ConfirmationFailureException failedConfirmation(long pos, long nanosTimeout) {
        if (pos == -2L) {
            return new ConfirmationTimeoutException(nanosTimeout);
        }
        return new ConfirmationFailureException("Unexpected result: " + pos);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void consume() {
        this.mBufferLatch.acquireExclusive();
        byte[] buffer = this.mBuffer;
        while (this.mConsumer != null) {
            block15: {
                int head = this.mBufferHead;
                int tail = this.mBufferTail;
                try {
                    int result;
                    if (head == tail) {
                        int result2 = this.replWrite(buffer, head, buffer.length - head);
                        if (result2 <= 0) {
                            if (result2 != 0 || head <= 0) break;
                            this.mBufferHead = 0;
                            this.replWrite(buffer, 0, tail);
                            break;
                        }
                        if (head > 0) {
                            this.mBufferHead = 0;
                            if (this.replWrite(buffer, 0, tail) <= 0) break;
                        }
                        this.mBufferTail = -1;
                        break block15;
                    }
                    if (tail < 0) break block15;
                    this.mBufferLatch.releaseExclusive();
                    try {
                        if (head < tail) {
                            result = this.replWrite(buffer, head, tail - head);
                            head = tail;
                        } else {
                            result = this.replWrite(buffer, head, buffer.length - head);
                            head = 0;
                        }
                    }
                    finally {
                        this.mBufferLatch.acquireExclusive();
                    }
                    if (result <= 0 && (result < 0 || head == this.mBufferTail)) break;
                    if (head != this.mBufferTail) {
                        this.mBufferHead = head;
                        continue;
                    }
                    this.mBufferTail = -1;
                }
                catch (Throwable e) {
                    if (this.mConsumerException == null) {
                        this.mConsumerException = e;
                    }
                    if (this.mProducer != null) break block15;
                    this.mBufferLatch.releaseExclusive();
                    Thread.yield();
                    this.mBufferLatch.acquireExclusive();
                    continue;
                }
            }
            this.mConsumerParked = true;
            Thread producer = this.mProducer;
            this.mBufferLatch.releaseExclusive();
            Parker.unpark(producer);
            Parker.park(this.mBufferLatch);
            this.mBufferLatch.acquireExclusive();
        }
        if (this.mConsumerException != null) {
            if (!(this.mConsumerException instanceof IOException)) {
                Utils.uncaught(this.mConsumerException);
            }
            this.mConsumerException = null;
        }
        this.mReplWriter.close();
        this.mConsumer = null;
        this.mBuffer = null;
        Parker.unpark(this.mProducer);
        this.mBufferLatch.releaseExclusive();
        this.mEngine.mController.switchToReplica(this.mReplWriter);
        this.mReplWriter.addCommitListener(this::finishPending);
    }

    private int replWrite(byte[] buf, int off, int len) throws IOException {
        return this.mReplWriter.write(buf, off, len, this.mLastCommitPos);
    }

    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            cFirstPendingHandle = lookup.findVarHandle(ReplWriter.class, "mFirstPending", PendingTxn.class);
            cLastPendingHandle = lookup.findVarHandle(ReplWriter.class, "mLastPending", PendingTxn.class);
        }
        catch (Throwable e) {
            throw Utils.rethrow(e);
        }
    }
}

