/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.CombinedAsyncWriter;
import org.apache.hadoop.hbase.regionserver.wal.FSWALEntry;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.LimitedPrivate(value={"Configuration"})
public class AsyncFSWAL
extends AbstractFSWAL<WALProvider.AsyncWriter> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);
    public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
    public static final long DEFAULT_WAL_BATCH_SIZE = 65536L;
    public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP = "hbase.wal.async.use-shared-event-loop";
    public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;
    public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = "hbase.wal.async.wait.on.shutdown.seconds";
    public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;
    private final EventLoopGroup eventLoopGroup;
    private final Class<? extends Channel> channelClass;
    private volatile AsyncFSOutput fsOut;
    private final StreamSlowMonitor streamSlowMonitor;

    public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, FileSystem remoteFs, Path remoteWALDir, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor) throws FailedLogCloseException, IOException {
        super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, remoteFs, remoteWALDir);
        this.eventLoopGroup = eventLoopGroup;
        this.channelClass = channelClass;
        this.streamSlowMonitor = monitor;
        if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, false)) {
            this.consumeExecutor = eventLoopGroup.next();
            this.shouldShutDownConsumeExecutorWhenClose = false;
            if (this.consumeExecutor instanceof SingleThreadEventExecutor) {
                try {
                    Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
                    field.setAccessible(true);
                    Queue queue = (Queue)field.get(this.consumeExecutor);
                    this.hasConsumerTask = () -> queue.peek() == this.consumer;
                }
                catch (Exception e) {
                    LOG.warn("Can not get task queue of " + this.consumeExecutor + ", this is not necessary, just give up", (Throwable)e);
                    this.hasConsumerTask = () -> false;
                }
            } else {
                this.hasConsumerTask = () -> false;
            }
        } else {
            this.createSingleThreadPoolConsumeExecutor("AsyncFSWAL", rootDir, prefix);
        }
        this.setWaitOnShutdownInSeconds(conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS, 5), ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
    }

    @Override
    protected CompletableFuture<Long> doWriterSync(WALProvider.AsyncWriter writer, boolean shouldUseHsync, long txidWhenSyn) {
        return writer.sync(shouldUseHsync);
    }

    protected final WALProvider.AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {
        return AsyncFSWALProvider.createAsyncWriter(this.conf, fs, path, false, this.blocksize, this.eventLoopGroup, this.channelClass, this.streamSlowMonitor);
    }

    @Override
    protected WALProvider.AsyncWriter createWriterInstance(FileSystem fs, Path path) throws IOException {
        return this.createAsyncWriter(fs, path);
    }

    @Override
    protected void onWriterReplaced(WALProvider.AsyncWriter nextWriter) {
        if (nextWriter instanceof AsyncProtobufLogWriter) {
            this.fsOut = ((AsyncProtobufLogWriter)nextWriter).getOutput();
        }
    }

    @Override
    protected void doAppend(WALProvider.AsyncWriter writer, FSWALEntry entry) {
        writer.append(entry);
    }

    @Override
    DatanodeInfo[] getPipeline() {
        AsyncFSOutput output = this.fsOut;
        return output != null ? output.getPipeline() : new DatanodeInfo[]{};
    }

    @Override
    int getLogReplication() {
        return this.getPipeline().length;
    }

    @Override
    protected boolean doCheckLogLowReplication() {
        AsyncFSOutput output = this.fsOut;
        return output != null && output.isBroken();
    }

    @Override
    protected WALProvider.AsyncWriter createCombinedWriter(WALProvider.AsyncWriter localWriter, WALProvider.AsyncWriter remoteWriter) {
        return CombinedAsyncWriter.create(remoteWriter, localWriter);
    }
}

