/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.FSWALEntry;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.RingBufferTruck;
import org.apache.hadoop.hbase.regionserver.wal.SyncFuture;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.com.lmax.disruptor.RingBuffer;
import org.apache.hadoop.hbase.shaded.com.lmax.disruptor.Sequence;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.SingleThreadEventExecutor;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.htrace.NullScope;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.LimitedPrivate(value={"Configuration"})
public class AsyncFSWAL
extends AbstractFSWAL<WALProvider.AsyncWriter> {
    /** Logger for this class. */
    private static final Log LOG = LogFactory.getLog(AsyncFSWAL.class);
    /**
     * Orders sync futures by txid; futures with equal txid are ordered by identity hash code so that
     * distinct futures for the same txid are normally kept apart in the sorted set.
     * NOTE(review): identity hash codes are not guaranteed unique, so two distinct futures could
     * still compare as equal -- confirm this is acceptable.
     */
    private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
        int c = Long.compare(o1.getTxid(), o2.getTxid());
        return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
    };
    /** Configuration key for {@link #batchSize}. */
    public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
    /** Default value for {@link #WAL_BATCH_SIZE} (compared against writer length deltas). */
    public static final long DEFAULT_WAL_BATCH_SIZE = 65536L;
    /** Configuration key for {@link #createMaxRetries}. */
    public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries";
    /** Default value for {@link #ASYNC_WAL_CREATE_MAX_RETRIES}. */
    public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10;
    /** Event loop on which {@link #consumer} is scheduled; also handed to the writer factory. */
    private final EventLoop eventLoop;
    /** Channel class handed to the writer factory in createWriterInstance. */
    private final Class<? extends Channel> channelClass;
    /** Lock taken around the roll/broken-writer state checks and updates in this class. */
    private final Lock consumeLock = new ReentrantLock();
    /** The task submitted to the event loop; runs {@link #consume()}. */
    private final Runnable consumer = this::consume;
    /**
     * Reports whether the head of the event loop's task queue is {@link #consumer}. Falls back to a
     * supplier that always returns false when the queue cannot be obtained (see the constructor).
     */
    private final Supplier<Boolean> hasConsumerTask;
    /** Set to true by waitForSafePoint and back to false by doReplaceWriter. */
    private volatile boolean waitingRoll;
    /** Flag that waitForSafePoint blocks on; every write to it in this class happens under consumeLock. */
    private boolean readyForRolling;
    /** Signalled whenever {@link #readyForRolling} is set to true. */
    private final Condition readyForRollingCond = this.consumeLock.newCondition();
    /** Ring buffer that append and sync requests are published to and that consume() drains. */
    private final RingBuffer<RingBufferTruck> waitingConsumePayloads;
    /** Gating sequence of the ring buffer; advanced by consume() after each slot is unloaded. */
    private final Sequence waitingConsumePayloadsGatingSequence;
    /** True while a consumer task is considered scheduled; claimed with compareAndSet by producers. */
    private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);
    /** Set by syncFailed; cleared when the writer is replaced in doReplaceWriter. */
    private volatile boolean writerBroken;
    /** Number of unsynced bytes (writer length minus fileLengthAtLastSync) that triggers a sync. */
    private final long batchSize;
    /** Retry limit checked in createWriterInstance. */
    private final int createMaxRetries;
    /** Daemon thread pool used to close replaced writers. */
    private final ExecutorService closeExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
    /** Output of the current writer when it is an AsyncProtobufLogWriter; read by getPipeline. */
    private volatile AsyncFSOutput fsOut;
    /** Entries unloaded from the ring buffer (or re-queued after a failed sync) awaiting append. */
    private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<FSWALEntry>();
    /** Entries appended to the writer that have not yet been covered by a completed sync. */
    private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<FSWALEntry>();
    /** Pending sync requests, ordered by {@link #SEQ_COMPARATOR}. */
    private final SortedSet<SyncFuture> syncFutures = new TreeSet<SyncFuture>(SEQ_COMPARATOR);
    /** Txid of the last entry handed to the writer by appendAndSync. */
    private long highestProcessedAppendTxid;
    /** Writer length recorded when the last sync was issued; reset to 0 when the writer is replaced. */
    private long fileLengthAtLastSync;
    /** Value of highestProcessedAppendTxid when the last sync was issued; reset to 0 on writer replace. */
    private long highestProcessedAppendTxidAtLastSync;

    /**
     * Builds the WAL: wires up the event loop, sets up the ring buffer that carries append/sync
     * requests to the consumer, reads the batch-size and create-retry settings, and finally rolls
     * the writer once.
     *
     * @param eventLoop event loop the consumer task runs on
     * @param channelClass channel class passed through to the async writer factory
     * @throws FailedLogCloseException declared by the signature (raised by the initial roll, if at all)
     * @throws IOException if the superclass constructor or the initial roll fails
     */
    public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, EventLoop eventLoop, Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
        super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
        this.eventLoop = eventLoop;
        this.channelClass = channelClass;

        // Best effort: peek into the event loop's private task queue so consume() can tell whether
        // another consumer task is already next in line. Without it the probe always answers false.
        Supplier<Boolean> consumerTaskProbe = () -> false;
        if (eventLoop instanceof SingleThreadEventExecutor) {
            try {
                Field taskQueueField = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
                taskQueueField.setAccessible(true);
                Queue<?> taskQueue = (Queue<?>) taskQueueField.get(eventLoop);
                consumerTaskProbe = () -> taskQueue.peek() == this.consumer;
            } catch (Exception e) {
                LOG.warn("Can not get task queue of " + eventLoop + ", this is not necessary, just give up", e);
            }
        }
        this.hasConsumerTask = consumerTaskProbe;

        int ringBufferSlots = this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 16384);
        this.waitingConsumePayloads = RingBuffer.createMultiProducer(RingBufferTruck::new, ringBufferSlots);
        this.waitingConsumePayloadsGatingSequence = new Sequence(-1L);
        this.waitingConsumePayloads.addGatingSequences(this.waitingConsumePayloadsGatingSequence);

        // Claim and publish one empty slot up front and mark it consumed, so the first real
        // request gets the following sequence number.
        long firstSequence = this.waitingConsumePayloads.next();
        this.waitingConsumePayloads.publish(firstSequence);
        this.waitingConsumePayloadsGatingSequence.set(this.waitingConsumePayloads.getCursor());

        this.batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
        this.createMaxRetries = conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
        this.rollWriter();
    }

    /**
     * Marks the WAL as ready for rolling when a roll is pending and no appended entry is still
     * unacknowledged.
     *
     * @return true if {@code readyForRolling} was set and waiters were signalled, false otherwise
     */
    private boolean trySetReadyForRolling() {
        // Cheap check without the lock first; most calls end here.
        boolean rollPendingAndDrained = this.waitingRoll && this.unackedAppends.isEmpty();
        if (!rollPendingAndDrained) {
            return false;
        }
        this.consumeLock.lock();
        try {
            // Re-check under the lock: the roll may have completed in the meantime.
            if (!this.waitingRoll) {
                return false;
            }
            this.readyForRolling = true;
            this.readyForRollingCond.signalAll();
            return true;
        } finally {
            this.consumeLock.unlock();
        }
    }

    /**
     * Handles a failed writer sync: marks the writer broken (once), releases a thread waiting for
     * a roll, moves every unacknowledged entry back to the front of the write queue in its
     * original order, rewinds {@code highestUnsyncedTxid} and requests a log roll.
     *
     * @param error the failure reported by the writer's sync
     */
    private void syncFailed(Throwable error) {
        LOG.warn("sync failed", error);
        this.consumeLock.lock();
        try {
            if (this.writerBroken) {
                // Already handled by an earlier failure on the same writer.
                return;
            }
            this.writerBroken = true;
            if (this.waitingRoll) {
                this.readyForRolling = true;
                this.readyForRollingCond.signalAll();
            }
        } finally {
            this.consumeLock.unlock();
        }
        // Walk newest-to-oldest and push each entry to the head, which keeps the original order.
        this.unackedAppends.descendingIterator().forEachRemaining(this.toWriteAppends::addFirst);
        this.highestUnsyncedTxid = this.highestSyncedTxid.get();
        this.requestLogRoll();
    }

    /**
     * Handles a successful writer sync: advances {@code highestSyncedTxid}, drops the entries the
     * sync covered from the unacked queue, completes eligible sync futures, and then either
     * releases a pending roll or requests one if the file has grown past {@code logrollsize}.
     *
     * @param writer the writer the sync was issued on
     * @param processedTxid highest append txid that had been handed to the writer when the sync was issued
     * @param startTimeNs {@link System#nanoTime()} value taken when the sync was issued
     */
    private void syncCompleted(WALProvider.AsyncWriter writer, long processedTxid, long startTimeNs) {
        this.highestSyncedTxid.set(processedTxid);
        for (Iterator<FSWALEntry> acked = this.unackedAppends.iterator(); acked.hasNext();) {
            if (acked.next().getTxid() > processedTxid) {
                break;
            }
            acked.remove();
        }
        long elapsedNs = System.nanoTime() - startTimeNs;
        this.postSync(elapsedNs, this.finishSync(true));
        if (this.trySetReadyForRolling()) {
            // A roll is pending and has just been released; no size check needed.
            return;
        }
        if (writer.getLength() < this.logrollsize || !this.rollWriterLock.tryLock()) {
            return;
        }
        try {
            this.requestLogRoll();
        } finally {
            this.rollWriterLock.unlock();
        }
    }

    /**
     * Issues a sync on the given writer, recording the writer length and the highest processed
     * append txid at the moment of the call. Completion is routed to {@code syncCompleted} or
     * {@code syncFailed}.
     *
     * @param writer the writer to sync
     */
    private void sync(WALProvider.AsyncWriter writer) {
        this.fileLengthAtLastSync = writer.getLength();
        final long txidAtSync = this.highestProcessedAppendTxid;
        this.highestProcessedAppendTxidAtLastSync = txidAtSync;
        final long issuedAtNs = System.nanoTime();
        writer.sync().whenComplete((ignoredResult, failure) -> {
            if (failure == null) {
                this.syncCompleted(writer, txidAtSync, issuedAtNs);
            } else {
                this.syncFailed((Throwable) failure);
            }
        });
    }

    /**
     * Adds a timeline annotation to the span carried by the given sync future, then stores the
     * detached span back on the future.
     *
     * @param future the sync future whose span is annotated
     * @param annotation the annotation text
     */
    private void addTimeAnnotation(SyncFuture future, String annotation) {
        TraceScope resumed = Trace.continueSpan(future.getSpan());
        Trace.addTimelineAnnotation(annotation);
        Span detached = resumed.detach();
        future.setSpan(detached);
    }

    /**
     * Completes, in txid order, every pending sync future whose txid is less than or equal to
     * {@code txid}, and removes it from the pending set.
     *
     * @param txid the txid each completed future is marked done with
     * @param addSyncTrace whether to add a "writer synced" annotation to each completed future
     * @return the number of futures completed
     */
    private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
        int finished = 0;
        for (Iterator<SyncFuture> pendingIter = this.syncFutures.iterator(); pendingIter.hasNext();) {
            SyncFuture pending = pendingIter.next();
            if (pending.getTxid() > txid) {
                // The set is sorted by txid, so nothing further can qualify.
                break;
            }
            pending.done(txid, null);
            pendingIter.remove();
            finished++;
            if (addSyncTrace) {
                this.addTimeAnnotation(pending, "writer synced");
            }
        }
        return finished;
    }

    /**
     * Completes as many pending sync futures as the current queue state allows and advances
     * {@code highestSyncedTxid} accordingly.
     * <ul>
     * <li>Unacked appends exist: syncs below the first unacked txid (but never below the current
     * highest synced txid) are completed.</li>
     * <li>No unacked appends but entries are waiting to be written: syncs below the first waiting
     * txid are completed.</li>
     * <li>Both queues empty: every pending sync is completed.</li>
     * </ul>
     *
     * @param addSyncTrace whether to add a "writer synced" annotation to each completed future
     * @return the number of futures completed
     */
    private int finishSync(boolean addSyncTrace) {
        if (!this.unackedAppends.isEmpty()) {
            long firstUnackedTxid = this.unackedAppends.peek().getTxid();
            long doneTxid = Math.max(firstUnackedTxid - 1L, this.highestSyncedTxid.get());
            this.highestSyncedTxid.set(doneTxid);
            return this.finishSyncLowerThanTxid(doneTxid, addSyncTrace);
        }
        if (!this.toWriteAppends.isEmpty()) {
            long firstWaitingTxid = this.toWriteAppends.peek().getTxid();
            assert (firstWaitingTxid > this.highestProcessedAppendTxid);
            long doneTxid = firstWaitingTxid - 1L;
            this.highestSyncedTxid.set(doneTxid);
            return this.finishSyncLowerThanTxid(doneTxid, addSyncTrace);
        }
        // Nothing unacked and nothing waiting: all pending syncs can be completed.
        long maxSyncTxid = this.highestSyncedTxid.get();
        for (SyncFuture pending : this.syncFutures) {
            maxSyncTxid = Math.max(maxSyncTxid, pending.getTxid());
            pending.done(maxSyncTxid, null);
            if (addSyncTrace) {
                this.addTimeAnnotation(pending, "writer synced");
            }
        }
        this.highestSyncedTxid.set(maxSyncTxid);
        int finished = this.syncFutures.size();
        this.syncFutures.clear();
        return finished;
    }

    /**
     * Appends queued entries to the current writer until the queue is empty or a batch worth of
     * unsynced bytes has accumulated, then decides whether a sync must be issued.
     * <p>
     * Each entry is appended exactly once. When the entry carries a trace span, the append runs
     * inside the continued span and the scope is closed afterwards; otherwise it is appended
     * without tracing. (Previously the untraced append ran unconditionally after the traced one,
     * so traced entries were handed to the writer twice.)
     */
    private void appendAndSync() {
        WALProvider.AsyncWriter writer = (WALProvider.AsyncWriter)this.writer;
        // A sync request may have arrived after the last sync was issued; finish what we can first.
        this.finishSync(false);
        long newHighestProcessedAppendTxid = -1L;
        Iterator<FSWALEntry> iter = this.toWriteAppends.iterator();
        while (iter.hasNext()) {
            FSWALEntry entry = iter.next();
            // NOTE(review): the span is presumably null for entries that are retried after a failed
            // sync, since they were already detached once -- confirm against FSWALEntry.detachSpan.
            Span span = entry.detachSpan();
            TraceScope scope = span != null ? Trace.continueSpan(span) : null;
            boolean appended;
            try {
                appended = this.append(writer, entry);
            } catch (IOException e) {
                throw new AssertionError("should not happen", e);
            } finally {
                if (scope != null) {
                    assert (scope == NullScope.INSTANCE || !scope.isDetached());
                    scope.close();
                }
            }
            newHighestProcessedAppendTxid = entry.getTxid();
            iter.remove();
            if (appended) {
                this.unackedAppends.addLast(entry);
                if (writer.getLength() - this.fileLengthAtLastSync >= this.batchSize) {
                    // Enough unsynced data for one batch; stop and sync below.
                    break;
                }
            }
        }
        // Adopt the new txid if anything was processed; otherwise fall back to the previous one.
        if (newHighestProcessedAppendTxid > 0L) {
            this.highestProcessedAppendTxid = newHighestProcessedAppendTxid;
        } else {
            newHighestProcessedAppendTxid = this.highestProcessedAppendTxid;
        }
        if (writer.getLength() - this.fileLengthAtLastSync >= this.batchSize) {
            // Sync because the batch size limit was reached.
            this.sync(writer);
            return;
        }
        if (writer.getLength() == this.fileLengthAtLastSync) {
            // Nothing new was written since the last sync. If nothing is outstanding either, the
            // synced txid can simply be advanced and pending syncs/rolls released.
            if (this.unackedAppends.isEmpty()) {
                this.highestSyncedTxid.set(this.highestProcessedAppendTxid);
                this.finishSync(false);
                this.trySetReadyForRolling();
            }
            return;
        }
        // Some unsynced data below the batch size: sync only if a pending request needs it.
        if (!this.syncFutures.isEmpty() && this.syncFutures.last().getTxid() > this.highestProcessedAppendTxidAtLastSync) {
            this.sync(writer);
        }
    }

    /**
     * The consumer task body. It drains published requests from the ring buffer into the
     * append/sync queues, writes and syncs them via {@code appendAndSync()}, and then decides
     * whether to reschedule itself on the event loop.
     * <p>
     * It does nothing while the writer is marked broken. While a roll is pending it only issues a
     * final sync or signals that rolling may proceed.
     */
    private void consume() {
        this.consumeLock.lock();
        try {
            if (this.writerBroken) {
                return;
            }
            if (this.waitingRoll) {
                if (((WALProvider.AsyncWriter)this.writer).getLength() > this.fileLengthAtLastSync) {
                    // Unsynced bytes remain in the current writer; flush them before the roll.
                    this.sync((WALProvider.AsyncWriter)this.writer);
                } else if (this.unackedAppends.isEmpty()) {
                    // Everything written has been acknowledged; release the thread waiting to roll.
                    this.readyForRolling = true;
                    this.readyForRollingCond.signalAll();
                }
                return;
            }
        }
        finally {
            this.consumeLock.unlock();
        }
        // Unload every published slot between the gating sequence and the cursor observed now.
        // Stops early at the first slot that has been claimed but not yet published.
        long cursorBound = this.waitingConsumePayloads.getCursor();
        for (long nextCursor = this.waitingConsumePayloadsGatingSequence.get() + 1L; nextCursor <= cursorBound && this.waitingConsumePayloads.isPublished(nextCursor); ++nextCursor) {
            RingBufferTruck truck = this.waitingConsumePayloads.get(nextCursor);
            switch (truck.type()) {
                case APPEND: {
                    this.toWriteAppends.addLast(truck.unloadAppend());
                    break;
                }
                case SYNC: {
                    this.syncFutures.add(truck.unloadSync());
                    break;
                }
                default: {
                    LOG.warn((Object)("RingBufferTruck with unexpected type: " + (Object)((Object)truck.type())));
                }
            }
            // Mark the slot consumed so producers may reuse it.
            this.waitingConsumePayloadsGatingSequence.set(nextCursor);
        }
        this.appendAndSync();
        if (this.hasConsumerTask.get().booleanValue()) {
            // Another consumer task is already at the head of the event loop's queue.
            return;
        }
        if (this.toWriteAppends.isEmpty() && this.waitingConsumePayloadsGatingSequence.get() == this.waitingConsumePayloads.getCursor()) {
            // Nothing left to do: give up the "scheduled" flag, then re-check the cursor because a
            // producer may have published after the check above but before the flag was cleared
            // (in which case it saw the flag still set and did not schedule the consumer).
            this.consumerScheduled.set(false);
            if (this.waitingConsumePayloadsGatingSequence.get() == this.waitingConsumePayloads.getCursor()) {
                return;
            }
            // New work arrived; re-claim the flag unless someone else has already done so.
            if (!this.consumerScheduled.compareAndSet(false, true)) {
                return;
            }
        }
        // There is still something to write or consume; run again.
        this.eventLoop.execute(this.consumer);
    }

    /**
     * Decides whether the caller should submit the consumer task to the event loop.
     *
     * @return true only if the writer is not broken, no roll is pending, and this call won the
     *         race to flip {@code consumerScheduled} from false to true
     */
    private boolean shouldScheduleConsumer() {
        return !this.writerBroken
            && !this.waitingRoll
            && this.consumerScheduled.compareAndSet(false, true);
    }

    /**
     * Publishes the edit to the ring buffer and, if no consumer task is currently scheduled,
     * schedules one on the event loop.
     *
     * @return the txid returned by {@code stampSequenceIdAndPublishToRingBuffer}
     * @throws IOException if stamping/publishing fails
     */
    @Override
    public long append(HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore) throws IOException {
        final long txid = this.stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, this.waitingConsumePayloads);
        boolean claimedScheduling = this.shouldScheduleConsumer();
        if (claimedScheduling) {
            this.eventLoop.execute(this.consumer);
        }
        return txid;
    }

    /**
     * Publishes a sync request to the ring buffer, schedules the consumer if needed, and blocks
     * until the request completes. The ring buffer sequence claimed for the request is used as
     * the txid of the sync future.
     *
     * @throws IOException propagated from {@code blockOnSync}
     */
    @Override
    public void sync() throws IOException {
        TraceScope scope = Trace.startSpan((String)"AsyncFSWAL.sync");
        try {
            SyncFuture future;
            long txid = this.waitingConsumePayloads.next();
            try {
                // Hand the span over to the future; the scope is re-attached after blockOnSync.
                future = this.getSyncFuture(txid, scope.detach());
                RingBufferTruck truck = this.waitingConsumePayloads.get(txid);
                truck.load(future);
            }
            finally {
                // Always publish the claimed slot, even on failure, so the sequence is not left
                // claimed-but-unpublished.
                this.waitingConsumePayloads.publish(txid);
            }
            if (this.shouldScheduleConsumer()) {
                this.eventLoop.execute(this.consumer);
            }
            scope = Trace.continueSpan((Span)this.blockOnSync(future));
        }
        finally {
            assert (scope == NullScope.INSTANCE || !scope.isDetached());
            scope.close();
        }
    }

    /**
     * Blocks until the given txid has been synced. Returns immediately when
     * {@code highestSyncedTxid} already covers it; otherwise publishes a sync request for that
     * txid to the ring buffer, schedules the consumer if needed, and waits for completion.
     *
     * @param txid the transaction id that must be synced before this method returns
     * @throws IOException propagated from {@code blockOnSync}
     */
    @Override
    public void sync(long txid) throws IOException {
        if (this.highestSyncedTxid.get() >= txid) {
            // Already synced.
            return;
        }
        TraceScope scope = Trace.startSpan((String)"AsyncFSWAL.sync");
        try {
            SyncFuture future;
            // The ring buffer sequence is only the slot index here; the future carries the
            // caller-supplied txid.
            long sequence = this.waitingConsumePayloads.next();
            try {
                // Hand the span over to the future; the scope is re-attached after blockOnSync.
                future = this.getSyncFuture(txid, scope.detach());
                RingBufferTruck truck = this.waitingConsumePayloads.get(sequence);
                truck.load(future);
            }
            finally {
                // Always publish the claimed slot, even on failure.
                this.waitingConsumePayloads.publish(sequence);
            }
            if (this.shouldScheduleConsumer()) {
                this.eventLoop.execute(this.consumer);
            }
            scope = Trace.continueSpan((Span)this.blockOnSync(future));
        }
        finally {
            assert (scope == NullScope.INSTANCE || !scope.isDetached());
            scope.close();
        }
    }

    /**
     * Creates an async writer for the given path, retrying on failure up to
     * {@code createMaxRetries} times.
     * <ul>
     * <li>A {@link RemoteException} that {@code shouldRetryCreate} accepts is retried immediately
     * (previously it fell through to the rethrow and was never retried).</li>
     * <li>Any other {@link RemoteException} is unwrapped and rethrown; if its message says the
     * parent directory is missing, all pending sync futures are failed first so that blocked
     * callers are released.</li>
     * <li>A {@code NameNodeException} is rethrown as is.</li>
     * <li>Any other {@link IOException} is retried after a back-off pause, overwriting the
     * possibly half-created file.</li>
     * </ul>
     *
     * @param path path of the WAL file to create
     * @return the newly created writer
     * @throws InterruptedIOException if the thread is interrupted during the back-off pause
     * @throws IOException if creation fails permanently or the retries are exhausted
     */
    @Override
    protected WALProvider.AsyncWriter createWriterInstance(Path path) throws IOException {
        boolean overwrite = false;
        for (int retry = 0;; retry++) {
            try {
                return AsyncFSWALProvider.createAsyncWriter(this.conf, this.fs, path, overwrite, this.eventLoop, this.channelClass);
            } catch (RemoteException e) {
                LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
                if (FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate(e)) {
                    if (retry >= this.createMaxRetries) {
                        break;
                    }
                    // Retryable and under the limit: loop around and try again.
                } else {
                    IOException ioe = e.unwrapRemoteException();
                    if (e.getMessage().contains("Parent directory doesn't exist:")) {
                        // Fail pending syncs so threads blocked on them are woken up.
                        this.syncFutures.forEach(f -> f.done(f.getTxid(), ioe));
                    }
                    throw ioe;
                }
            } catch (FanOutOneBlockAsyncDFSOutputHelper.NameNodeException e) {
                throw e;
            } catch (IOException e) {
                LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
                if (retry >= this.createMaxRetries) {
                    break;
                }
                // Overwrite the old, possibly broken file on the next attempt.
                overwrite = true;
                try {
                    Thread.sleep(ConnectionUtils.getPauseTime(100L, retry));
                } catch (InterruptedException ie) {
                    InterruptedIOException interrupted = new InterruptedIOException();
                    interrupted.initCause(ie);
                    throw interrupted;
                }
            }
        }
        throw new IOException("Failed to create wal log writer " + path + " after retrying " + this.createMaxRetries + " time(s)");
    }

    /**
     * Blocks until the consumer reports that the writer can be replaced or closed, i.e. until
     * {@code readyForRolling} is set. Returns immediately if the writer is already marked broken
     * or there is no writer.
     */
    private void waitForSafePoint() {
        this.consumeLock.lock();
        try {
            if (this.writerBroken || this.writer == null) {
                return;
            }
            // Claim the scheduled flag and raise waitingRoll so producers stop scheduling the
            // consumer (see shouldScheduleConsumer), then run the consumer once ourselves.
            this.consumerScheduled.set(true);
            this.waitingRoll = true;
            this.readyForRolling = false;
            this.eventLoop.execute(this.consumer);
            // Loop guards against wake-ups that happen before the flag is actually set.
            while (!this.readyForRolling) {
                this.readyForRollingCond.awaitUninterruptibly();
            }
        }
        finally {
            this.consumeLock.unlock();
        }
    }

    /**
     * Swaps in {@code nextWriter} once the consumer has reached a safe point, resets the per-writer
     * sync bookkeeping, restarts the consumer, and closes the previous writer on a background
     * thread.
     *
     * @param oldPath not used by this implementation
     * @param newPath not used by this implementation
     * @param nextWriter the writer to install; may be null
     * @return the length of the previous writer, or 0 if there was none
     * @throws IOException declared by the overridden method
     */
    @Override
    protected long doReplaceWriter(Path oldPath, Path newPath, WALProvider.AsyncWriter nextWriter) throws IOException {
        this.waitForSafePoint();
        final WALProvider.AsyncWriter previousWriter = (WALProvider.AsyncWriter) this.writer;
        this.writer = nextWriter;
        // instanceof is false for null, so no separate null check is required.
        if (nextWriter instanceof AsyncProtobufLogWriter) {
            this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
        }
        this.fileLengthAtLastSync = 0L;
        this.highestProcessedAppendTxidAtLastSync = 0L;

        // Leave roll mode, clear the broken flag and kick the consumer so queued work resumes.
        this.consumeLock.lock();
        try {
            this.consumerScheduled.set(true);
            this.waitingRoll = false;
            this.writerBroken = false;
            this.eventLoop.execute(this.consumer);
        } finally {
            this.consumeLock.unlock();
        }

        if (previousWriter == null) {
            return 0L;
        }
        long previousLength = previousWriter.getLength();
        // Close off-thread; a failure here is only logged.
        this.closeExecutor.execute(() -> {
            try {
                previousWriter.close();
            } catch (IOException e) {
                LOG.warn("close old writer failed", e);
            }
        });
        return previousLength;
    }

    /**
     * Shuts the WAL down: waits for a safe point, closes and clears the current writer, stops the
     * close executor, and fails every still-pending sync future with a "WAL has been closed"
     * error.
     *
     * @throws IOException if closing the writer fails
     */
    @Override
    protected void doShutdown() throws IOException {
        this.waitForSafePoint();
        if (this.writer != null) {
            ((WALProvider.AsyncWriter) this.writer).close();
            this.writer = null;
        }
        this.closeExecutor.shutdown();
        IOException closedError = new IOException("WAL has been closed");
        for (SyncFuture pending : this.syncFutures) {
            pending.done(pending.getTxid(), closedError);
        }
    }

    /**
     * Hands the entry to the given writer.
     *
     * @param writer the writer to append to
     * @param entry the WAL entry to append
     */
    @Override
    protected void doAppend(WALProvider.AsyncWriter writer, FSWALEntry entry) {
        writer.append(entry);
    }

    /**
     * Returns the datanode pipeline of the current output.
     *
     * @return the pipeline reported by the current output, or an empty array if no output is set
     */
    @Override
    DatanodeInfo[] getPipeline() {
        // Read the volatile field once so the null check and the call see the same instance.
        AsyncFSOutput currentOutput = this.fsOut;
        if (currentOutput == null) {
            return new DatanodeInfo[0];
        }
        return currentOutput.getPipeline();
    }

    /**
     * Returns the replication of the current WAL file, taken as the size of its pipeline.
     *
     * @return the number of datanodes in the current pipeline (0 if there is no output)
     */
    @Override
    int getLogReplication() {
        DatanodeInfo[] pipeline = this.getPipeline();
        return pipeline.length;
    }
}

