/*
 * Decompiled with CFR 0.152.
 */
package org.cojen.tupl.core;

import java.io.IOException;
import org.cojen.tupl.core.DataIn;
import org.cojen.tupl.core.RedoDecoder;
import org.cojen.tupl.repl.StreamReplicator;
import org.cojen.tupl.util.Latch;

/**
 * Redo decoder whose input is read from a {@link StreamReplicator}.
 *
 * <p>The decoder's input is an {@link In} instance which pulls bytes from replicator
 * readers and, when no reader can be obtained, tries to obtain a writer instead and
 * reports end-of-input.
 */
final class ReplDecoder
extends RedoDecoder {
    // Not referenced anywhere in this class.
    // NOTE(review): presumably toggled by other classes in this package; confirm.
    volatile boolean mDeactivated;

    /**
     * @param repl replicator from which readers (and eventually a writer) are obtained
     * @param initialPosition position at which the first reader is opened
     * @param initialTxnId passed through to the superclass constructor
     * @param decodeLatch passed through to the superclass constructor; also used by
     *        {@link #catchup()} when sampling the decode position
     */
    ReplDecoder(StreamReplicator repl, long initialPosition, long initialTxnId, Latch decodeLatch) {
        // NOTE(review): the meaning of the leading 'false' argument is defined by
        // RedoDecoder, which is not visible here.
        super(false, initialTxnId, new In(initialPosition, repl), decodeLatch);
    }

    /**
     * Unconditionally returns {@code true}; this subclass performs no terminator check.
     */
    @Override
    boolean verifyTerminator(DataIn in) {
        return true;
    }

    /**
     * Delegates to {@link In#catchup(Latch)} using this decoder's decode latch.
     *
     * @return see {@link In#catchup(Latch)}
     */
    boolean catchup() {
        return ((In)this.mIn).catchup(this.mDecodeLatch);
    }

    /**
     * Returns the writer captured by the input (if any) and clears the input's reference
     * to it, so a subsequent call returns {@code null} unless a new writer is captured.
     *
     * @return the captured writer, or {@code null} if none is currently held
     */
    StreamReplicator.Writer extractWriter() {
        In in = (In)this.mIn;
        StreamReplicator.Writer writer = in.mWriter;
        in.mWriter = null;
        return writer;
    }

    /**
     * {@link DataIn} implementation backed by {@link StreamReplicator} readers.
     */
    static final class In
    extends DataIn {
        private final StreamReplicator mRepl;
        // Both fields are volatile because they are accessed by catchup(), close() and
        // extractWriter() as well as by doRead(); always snapshot them into a local
        // before a check-then-use sequence.
        private volatile StreamReplicator.Reader mReader;
        private volatile StreamReplicator.Writer mWriter;

        /**
         * Creates an input with a buffer size of 65536.
         */
        In(long position, StreamReplicator repl) {
            this(position, repl, 65536);
        }

        /**
         * @param position position passed to the superclass and used to open the first reader
         * @param repl replicator supplying readers and writers
         * @param bufferSize passed through to the superclass constructor
         */
        In(long position, StreamReplicator repl, int bufferSize) {
            super(position, bufferSize);
            this.mRepl = repl;
            this.mReader = this.mRepl.newReader(position, false);
        }

        /**
         * Reads from the current reader. When the reader returns a negative amount it is
         * closed and a new reader is requested at the current position. While no reader
         * is available, a writer is requested instead; once a writer is obtained the
         * reader reference is cleared and {@code -1} is returned.
         *
         * @return the non-negative amount returned by the reader, or {@code -1} once a
         *         writer has been obtained
         * @throws IOException if thrown by the underlying reader
         */
        @Override
        int doRead(byte[] buf, int off, int len) throws IOException {
            StreamReplicator.Reader reader = this.mReader;
            while (true) {
                if (reader != null) {
                    int amt = reader.read(buf, off, len);
                    if (amt >= 0) {
                        return amt;
                    }
                    // Negative amount: this reader is finished, so release it and
                    // look for a replacement below.
                    reader.close();
                }
                while ((reader = this.mRepl.newReader(this.mPos, false)) == null) {
                    // Decide on a local rather than re-reading the volatile field, which
                    // extractWriter() may clear concurrently.
                    StreamReplicator.Writer writer = this.mRepl.newWriter();
                    this.mWriter = writer;
                    if (writer != null) {
                        this.mReader = null;
                        return -1;
                    }
                    // Neither a reader nor a writer is available yet; retry.
                    Thread.yield();
                }
                this.mReader = reader;
            }
        }

        /**
         * Polls until the decode position has reached the reader's commit position, or
         * until no reader remains.
         *
         * <p>The commit position is sampled from the current reader. Once the reader's
         * own position is at or past it (and before the reader's term end position), the
         * decode position is read under the shared decode latch and compared. The first
         * time the decode position has caught up, the commit position is re-sampled once
         * and the comparison repeated. Between polls the thread sleeps, starting at
         * 1 millisecond and doubling up to a cap of 100 milliseconds. If the reader field
         * changes while polling, the procedure restarts with the new reader.
         *
         * @param decodeLatch latch held in shared mode while the decode position is read
         * @return {@code true} if the reader field was observed to be {@code null};
         *         {@code false} if the decode position reached the commit position or the
         *         thread was interrupted while sleeping
         */
        boolean catchup(Latch decodeLatch) {
            StreamReplicator.Reader reader = this.mReader;
            boolean bumped = false;
            outer: while (reader != null) {
                long commitPosition = reader.commitPosition();
                long delayMillis = 1L;
                while (true) {
                    // Restart with fresh state if doRead() or close() replaced the reader.
                    StreamReplicator.Reader currentReader = this.mReader;
                    if (currentReader != reader) {
                        reader = currentReader;
                        continue outer;
                    }
                    long readerPos = reader.position();
                    if (readerPos >= commitPosition && readerPos < reader.termEndPosition()) {
                        decodeLatch.acquireShared();
                        long decodePos = this.mPos;
                        decodeLatch.releaseShared();
                        if (decodePos >= commitPosition) {
                            if (bumped) {
                                return false;
                            }
                            // Re-sample the commit position exactly once, then re-check
                            // against the same decode position.
                            commitPosition = reader.commitPosition();
                            bumped = true;
                            if (decodePos >= commitPosition) {
                                return false;
                            }
                        }
                    }
                    try {
                        Thread.sleep(delayMillis);
                    }
                    catch (InterruptedException e) {
                        // Interrupt ends the wait. The interrupt status is intentionally
                        // left as-is (not restored) to match existing behavior.
                        // NOTE(review): confirm callers do not need the flag re-set.
                        return false;
                    }
                    delayMillis = Math.min(delayMillis << 1, 100L);
                }
            }
            return true;
        }

        /**
         * Closes the current reader, if any, and clears the reader reference.
         */
        @Override
        public void close() {
            // Snapshot the volatile field: doRead() may null it between a null check
            // and the close call.
            StreamReplicator.Reader reader = this.mReader;
            if (reader != null) {
                reader.close();
                this.mReader = null;
            }
        }
    }
}

