/*
 * Decompiled with CFR 0.152.
 */
package org.cojen.tupl.core;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Objects;
import org.cojen.tupl.ConfirmationFailureException;
import org.cojen.tupl.ConfirmationTimeoutException;
import org.cojen.tupl.DatabaseException;
import org.cojen.tupl.DurabilityMode;
import org.cojen.tupl.UnmodifiableReplicaException;
import org.cojen.tupl.core.ReplDecoder;
import org.cojen.tupl.core.ReplUtils;
import org.cojen.tupl.core.Utils;
import org.cojen.tupl.core._LocalDatabase;
import org.cojen.tupl.core._ReplEngine;
import org.cojen.tupl.core._ReplWriter;
import org.cojen.tupl.core._TransactionContext;
import org.cojen.tupl.diag.DatabaseStats;
import org.cojen.tupl.repl.StreamReplicator;
import org.cojen.tupl.util.LatchCondition;
import org.cojen.tupl.util.Runner;

final class _ReplController
extends _ReplWriter {
    private static final VarHandle cCheckpointPosHandle;
    private static final VarHandle cSwitchingHandle;
    final StreamReplicator mRepl;
    private LatchCondition mLeaderNotifyCondition;
    private volatile _ReplWriter mTxnRedoWriter;
    private volatile boolean mSwitchingToReplica;
    private _ReplWriter mCheckpointRedoWriter;
    private long mCheckpointPos;
    private long mCheckpointTxnId;
    private long mCheckpointNum;

    _ReplController(_ReplEngine engine) {
        super(engine, null);
        this.mRepl = engine.mRepl;
        this.mTxnRedoWriter = this;
        this.mUnmodifiable = true;
    }

    void initCheckpointNumber(long num) {
        this.acquireExclusive();
        this.mCheckpointNum = num;
        this.releaseExclusive();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ReplDecoder ready(long initialPosition, long initialTxnId) throws IOException {
        this.acquireExclusive();
        try {
            this.mLeaderNotifyCondition = new LatchCondition();
            cCheckpointPosHandle.setOpaque(this, initialPosition | Long.MIN_VALUE);
        }
        finally {
            this.releaseExclusive();
        }
        ReplDecoder decoder = this.mEngine.startReceiving(initialPosition, initialTxnId);
        if (decoder == null) {
            return null;
        }
        _LocalDatabase db = this.mEngine.mDatabase;
        this.mRepl.controlMessageAcceptor(message -> {
            try {
                db.writeControlMessage((byte[])message);
            }
            catch (UnmodifiableReplicaException unmodifiableReplicaException) {
            }
            catch (Throwable e) {
                Utils.uncaught(e);
            }
        });
        this.mRepl.snapshotRequestAcceptor(sender -> {
            block2: {
                try {
                    ReplUtils.sendSnapshot(db, sender);
                }
                catch (Throwable e) {
                    Utils.closeQuietly(sender);
                    if (!(e instanceof DatabaseException) && e instanceof IOException) break block2;
                    Utils.uncaught(e);
                }
            }
        });
        this.mRepl.start();
        return decoder;
    }

    public void catchup(ReplDecoder decoder) throws IOException {
        if (decoder == null) {
            return;
        }
        boolean isLeader = decoder.catchup();
        this.mEngine.suspend();
        this.mEngine.resume();
        this.acquireExclusive();
        try {
            if (isLeader && this.mLeaderNotifyCondition != null) {
                this.mLeaderNotifyCondition.await(this);
            }
        }
        finally {
            this.mLeaderNotifyCondition = null;
            this.releaseExclusive();
        }
    }

    @Override
    public _ReplWriter txnRedoWriter() {
        return this.mTxnRedoWriter;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    boolean shouldCheckpoint(long sizeThreshold) {
        this.acquireShared();
        try {
            StreamReplicator.Writer writer = this.mTxnRedoWriter.mReplWriter;
            long pos = writer == null ? this.mEngine.decodePosition() : writer.position();
            boolean bl = pos - this.checkpointPosition() >= sizeThreshold;
            return bl;
        }
        catch (IllegalStateException e) {
            boolean bl = false;
            return bl;
        }
        finally {
            this.releaseShared();
        }
    }

    @Override
    void checkpointPrepare() throws IOException {
        this.mEngine.suspend();
    }

    @Override
    void checkpointSwitch(_TransactionContext[] contexts) throws IOException {
        _ReplWriter redo;
        ++this.mCheckpointNum;
        this.mCheckpointRedoWriter = redo = this.mTxnRedoWriter;
        if (this.mCheckpointPos <= 0L && this.mCheckpointTxnId == 0L) {
            if (redo.mReplWriter == null) {
                cCheckpointPosHandle.setOpaque(this, this.mEngine.suspendedDecodePosition());
                this.mCheckpointTxnId = this.mEngine.suspendedDecodeTransactionId();
            } else {
                redo.acquireShared();
                cCheckpointPosHandle.set(this, redo.mLastCommitPos);
                this.mCheckpointTxnId = redo.mLastCommitTxnId;
                redo.releaseShared();
            }
        }
    }

    @Override
    long checkpointNumber() {
        return this.mCheckpointNum;
    }

    @Override
    long checkpointPosition() {
        return this.mCheckpointPos & Long.MAX_VALUE;
    }

    @Override
    long checkpointTransactionId() {
        return this.mCheckpointTxnId;
    }

    @Override
    void checkpointAborted() {
        this.mEngine.resume();
        this.mCheckpointRedoWriter = null;
    }

    @Override
    void checkpointStarted() throws IOException {
        this.mEngine.resume();
    }

    @Override
    void checkpointFlushed() throws IOException {
        _ReplWriter redo = this.mCheckpointRedoWriter;
        StreamReplicator.Writer writer = redo.mReplWriter;
        if (writer != null && !_ReplController.confirm(writer, this.mCheckpointPos, -1L)) {
            long endPos = _ReplController.confirmEnd(writer);
            if (endPos < this.mCheckpointPos) {
                cCheckpointPosHandle.setOpaque(this, endPos);
                this.mCheckpointTxnId = 0L;
            }
            this.mCheckpointRedoWriter = this;
            throw this.nowUnmodifiable(writer);
        }
        this.syncConfirm(this.mCheckpointPos, -1L);
    }

    private void syncConfirm(long position, long nanosTimeout) throws IOException {
        if (nanosTimeout < 0L) {
            while (!this.mRepl.syncCommit(position, 1000000000L)) {
                if (!this.mEngine.mDatabase.isClosed()) continue;
                throw new ConfirmationFailureException("Database is closed");
            }
        } else if (!this.mRepl.syncCommit(position, nanosTimeout)) {
            throw new ConfirmationTimeoutException(nanosTimeout);
        }
    }

    @Override
    void checkpointFinished() throws IOException {
        long pos = this.mCheckpointPos;
        this.mRepl.compact(pos);
        this.mCheckpointRedoWriter = null;
        cCheckpointPosHandle.setOpaque(this, pos | Long.MIN_VALUE);
        this.mCheckpointTxnId = 0L;
    }

    @Override
    DurabilityMode opWriteCheck(DurabilityMode mode) throws IOException {
        throw this.unmodifiable();
    }

    @Override
    long adjustTransactionId(long txnId) {
        return -txnId;
    }

    @Override
    void force(boolean metadata, long nanosTimeout) throws IOException {
        if (metadata) {
            try {
                StreamReplicator.Writer writer = this.mTxnRedoWriter.mReplWriter;
                long pos = writer == null ? this.mEngine.decodePosition() : writer.commitPosition();
                this.syncConfirm(pos, nanosTimeout);
                pos = cCheckpointPosHandle.getOpaque(this);
                if (pos < 0L) {
                    try {
                        this.mRepl.compact(pos & Long.MAX_VALUE);
                    }
                    catch (Throwable throwable) {
                        // empty catch block
                    }
                }
                return;
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
        super.force(metadata, nanosTimeout);
    }

    @Override
    void addStats(DatabaseStats stats) {
        if (!this.isLeader()) {
            try {
                long decodePosition = this.mEngine.decodePosition();
                long commitPosition = this.mEngine.mRepl.commitPosition();
                stats.replicationBacklog = commitPosition - decodePosition;
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
        }
    }

    @Override
    boolean isLeader() {
        return !this.mTxnRedoWriter.mUnmodifiable;
    }

    @Override
    void uponLeader(Runnable acquired, Runnable lost) {
        this.acquireExclusive();
        try {
            this.doUponLeader(acquired, lost);
        }
        finally {
            this.releaseExclusive();
        }
    }

    private void doUponLeader(Runnable acquired, Runnable lost) {
        if (this.isLeader()) {
            if (acquired != null) {
                Runner.start(acquired);
            }
            if (lost != null) {
                this.mTxnRedoWriter.mReplWriter.uponCommit(Long.MAX_VALUE, pos -> Runner.start(lost));
            }
        } else {
            if (this.mLeaderNotifyCondition == null) {
                this.mLeaderNotifyCondition = new LatchCondition();
            }
            this.mLeaderNotifyCondition.uponSignal(() -> this.doUponLeader(acquired, lost));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    _ReplWriter leaderNotify(StreamReplicator.Writer writer) throws IOException {
        Objects.requireNonNull(writer);
        this.acquireExclusive();
        try {
            _ReplWriter _ReplWriter2;
            if (this.mLeaderNotifyCondition != null) {
                this.mLeaderNotifyCondition.signalAll(this);
                this.mLeaderNotifyCondition = null;
            }
            if (this.mTxnRedoWriter.mReplWriter != null) {
                _ReplWriter _ReplWriter3 = null;
                return _ReplWriter3;
            }
            _ReplWriter redo = new _ReplWriter(this.mEngine, writer);
            redo.start();
            _TransactionContext context = this.mEngine.mDatabase.anyTransactionContext();
            context.fullAcquireRedoLatch(redo);
            try {
                this.mTxnRedoWriter = redo;
                writer.uponCommit(Long.MAX_VALUE, pos -> this.switchToReplica(writer));
                context.doRedoReset(redo);
                context.doRedoTimestamp(redo, (byte)2, DurabilityMode.NO_FLUSH);
                context.doRedoNopRandom(redo, DurabilityMode.NO_SYNC);
                _ReplWriter2 = redo;
            }
            catch (Throwable throwable) {
                context.releaseRedoLatch();
                throw throwable;
            }
            context.releaseRedoLatch();
            return _ReplWriter2;
        }
        finally {
            this.releaseExclusive();
        }
    }

    UnmodifiableReplicaException nowUnmodifiable(StreamReplicator.Writer expect) throws DatabaseException {
        this.switchToReplica(expect);
        return this.unmodifiable();
    }

    void switchToReplica(StreamReplicator.Writer expect) {
        if (this.mEngine.mDatabase.isClosed()) {
            return;
        }
        _ReplWriter redo = this.mTxnRedoWriter;
        StreamReplicator.Writer writer = redo.mReplWriter;
        if (writer == null || writer != expect) {
            return;
        }
        if (!cSwitchingHandle.compareAndSet(this, false, true)) {
            return;
        }
        try {
            Runner.start(() -> this.doSwitchToReplica(redo));
        }
        catch (Throwable e) {
            this.mEngine.fail(e);
        }
    }

    private void doSwitchToReplica(_ReplWriter redo) {
        long pos;
        try {
            StreamReplicator.Writer writer = redo.mReplWriter;
            pos = _ReplController.confirmEnd(writer);
            writer.close();
            redo.closeConsumerThread();
            this.mEngine.awaitPreparedTransactions();
            this.mEngine.mDatabase.waitForCommitted();
        }
        catch (Throwable e) {
            this.mEngine.fail(e);
            return;
        }
        this.mEngine.startReceiving(pos, 0L);
        this.acquireExclusive();
        this.mTxnRedoWriter = this;
        this.mSwitchingToReplica = false;
        this.releaseExclusive();
        this.mEngine.mDatabase.discardRedoWriter(redo);
    }

    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            cCheckpointPosHandle = lookup.findVarHandle(_ReplController.class, "mCheckpointPos", Long.TYPE);
            cSwitchingHandle = lookup.findVarHandle(_ReplController.class, "mSwitchingToReplica", Boolean.TYPE);
        }
        catch (Throwable e) {
            throw Utils.rethrow(e);
        }
    }
}

