/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.tserver.log;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.crypto.CryptoServiceFactory;
import org.apache.accumulo.core.crypto.CryptoUtils;
import org.apache.accumulo.core.crypto.streams.NoFlushOutputStream;
import org.apache.accumulo.core.cryptoImpl.CryptoEnvironmentImpl;
import org.apache.accumulo.core.cryptoImpl.NoCryptoService;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.spi.crypto.FileDecrypter;
import org.apache.accumulo.core.spi.crypto.FileEncrypter;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.fate.util.LoggingRunnable;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeChooserEnvironment;
import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.tserver.TabletMutations;
import org.apache.accumulo.tserver.logger.LogEvents;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.accumulo.tserver.tablet.CommitSession;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DfsLogger
implements Comparable<DfsLogger> {
    public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
    public static final String LOG_FILE_HEADER_V4 = "--- Log File Header (v4) ---";
    private static final Logger log = LoggerFactory.getLogger(DfsLogger.class);
    private static final DatanodeInfo[] EMPTY_PIPELINE = new DatanodeInfo[0];
    private final LinkedBlockingQueue<LogWork> workQueue = new LinkedBlockingQueue();
    private final Object closeLock = new Object();
    private static final LogWork CLOSED_MARKER = new LogWork(null, Durability.FLUSH);
    private static final LogFileValue EMPTY = new LogFileValue();
    private boolean closed = false;
    static final LoggerOperation NO_WAIT_LOGGER_OP = new NoWaitLoggerOperation();
    private final ServerContext context;
    private final ServerResources conf;
    private FSDataOutputStream logFile;
    private DataOutputStream encryptingLogFile = null;
    private Method sync;
    private Method flush;
    private String logPath;
    private Daemon syncThread;
    private String metaReference;
    private AtomicLong syncCounter;
    private AtomicLong flushCounter;
    private final long slowFlushMillis;
    private long writes = 0L;

    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (obj instanceof DfsLogger) {
            return this.getFileName().equals(((DfsLogger)obj).getFileName());
        }
        return false;
    }

    public int hashCode() {
        return this.getFileName().hashCode();
    }

    private DfsLogger(ServerContext context, ServerResources conf) {
        this.context = context;
        this.conf = conf;
        this.slowFlushMillis = conf.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FLUSH_MILLIS);
    }

    public DfsLogger(ServerContext context, ServerResources conf, AtomicLong syncCounter, AtomicLong flushCounter) {
        this(context, conf);
        this.syncCounter = syncCounter;
        this.flushCounter = flushCounter;
    }

    public DfsLogger(ServerContext context, ServerResources conf, String filename, String meta) {
        this(context, conf);
        this.logPath = filename;
        this.metaReference = meta;
    }

    public static DFSLoggerInputStreams readHeaderAndReturnStream(FSDataInputStream input, AccumuloConfiguration conf) throws IOException {
        FSDataInputStream decryptingInput;
        block6: {
            byte[] magic3;
            byte[] magic4 = LOG_FILE_HEADER_V4.getBytes(StandardCharsets.UTF_8);
            if (magic4.length != (magic3 = LOG_FILE_HEADER_V3.getBytes(StandardCharsets.UTF_8)).length) {
                throw new AssertionError((Object)("Always expect log file headers to be same length : " + magic4.length + " != " + magic3.length));
            }
            byte[] magicBuffer = new byte[magic4.length];
            try {
                input.readFully(magicBuffer);
                if (Arrays.equals(magicBuffer, magic4)) {
                    byte[] params = CryptoUtils.readParams((DataInputStream)input);
                    CryptoService cryptoService = CryptoServiceFactory.newInstance((AccumuloConfiguration)conf, (CryptoServiceFactory.ClassloaderType)CryptoServiceFactory.ClassloaderType.ACCUMULO);
                    CryptoEnvironmentImpl env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.WAL, params);
                    FileDecrypter decrypter = cryptoService.getFileDecrypter((CryptoEnvironment)env);
                    log.debug("Using {} for decrypting WAL", (Object)cryptoService.getClass().getSimpleName());
                    decryptingInput = cryptoService instanceof NoCryptoService ? input : new DataInputStream(decrypter.decryptStream((InputStream)input));
                    break block6;
                }
                if (Arrays.equals(magicBuffer, magic3)) {
                    String cryptoModuleClassname = input.readUTF();
                    if (!cryptoModuleClassname.equals("NullCryptoModule")) {
                        throw new IllegalArgumentException("Old encryption modules not supported at this time.  Unsupported module : " + cryptoModuleClassname);
                    }
                    decryptingInput = input;
                    break block6;
                }
                throw new IllegalArgumentException("Unsupported write ahead log version " + new String(magicBuffer));
            }
            catch (EOFException e) {
                throw new LogHeaderIncompleteException(e);
            }
        }
        return new DFSLoggerInputStreams(input, (DataInputStream)decryptingInput);
    }

    public synchronized void open(String address) throws IOException {
        String filename = UUID.randomUUID().toString();
        log.debug("Address is {}", (Object)address);
        String logger = Joiner.on((String)"+").join((Object[])address.split(":"));
        log.debug("DfsLogger.open() begin");
        VolumeManager fs = this.conf.getFileSystem();
        VolumeChooserEnvironmentImpl chooserEnv = new VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.ChooserScope.LOGGER, this.context);
        this.logPath = fs.choose((VolumeChooserEnvironment)chooserEnv, ServerConstants.getBaseUris((ServerContext)this.context)) + "/" + "wal" + "/" + logger + "/" + filename;
        this.metaReference = this.toString();
        LoggerOperation op = null;
        try {
            short replication = (short)this.conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
            if (replication == 0) {
                replication = fs.getDefaultReplication(new Path(this.logPath));
            }
            long blockSize = DfsLogger.getWalBlockSize(this.conf.getConfiguration());
            this.logFile = this.conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC) ? fs.createSyncable(new Path(this.logPath), 0, replication, blockSize) : fs.create(new Path(this.logPath), true, 0, replication, blockSize);
            this.sync = this.logFile.getClass().getMethod("hsync", new Class[0]);
            this.flush = this.logFile.getClass().getMethod("hflush", new Class[0]);
            CryptoService cryptoService = this.context.getCryptoService();
            this.logFile.write(LOG_FILE_HEADER_V4.getBytes(StandardCharsets.UTF_8));
            log.debug("Using {} for encrypting WAL {}", (Object)cryptoService.getClass().getSimpleName(), (Object)filename);
            CryptoEnvironmentImpl env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.WAL, null);
            FileEncrypter encrypter = cryptoService.getFileEncrypter((CryptoEnvironment)env);
            byte[] cryptoParams = encrypter.getDecryptionParameters();
            CryptoUtils.writeParams((byte[])cryptoParams, (DataOutputStream)this.logFile);
            OutputStream encryptedStream = encrypter.encryptStream((OutputStream)new NoFlushOutputStream((OutputStream)this.logFile));
            this.encryptingLogFile = encryptedStream instanceof NoFlushOutputStream ? (NoFlushOutputStream)encryptedStream : new DataOutputStream(encryptedStream);
            LogFileKey key = new LogFileKey();
            key.event = LogEvents.OPEN;
            key.tserverSession = filename;
            key.filename = filename;
            op = this.logKeyData(key, Durability.SYNC);
        }
        catch (Exception ex) {
            if (this.logFile != null) {
                this.logFile.close();
            }
            this.logFile = null;
            this.encryptingLogFile = null;
            throw new IOException(ex);
        }
        this.syncThread = new Daemon((Runnable)new LoggingRunnable(log, (Runnable)new LogSyncingTask()));
        this.syncThread.setName("Accumulo WALog thread " + this);
        this.syncThread.start();
        op.await();
        log.debug("Got new write-ahead log: {}", (Object)this);
    }

    static long getWalBlockSize(AccumuloConfiguration conf) {
        long blockSize = conf.getAsBytes(Property.TSERV_WAL_BLOCKSIZE);
        if (blockSize == 0L) {
            blockSize = (long)((double)conf.getAsBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
        }
        return blockSize;
    }

    public String toString() {
        String fileName = this.getFileName();
        if (fileName.contains(":")) {
            return this.getLogger() + "/" + this.getFileName();
        }
        return fileName;
    }

    public String getMeta() {
        if (this.metaReference == null) {
            throw new IllegalStateException("logger doesn't have meta reference. " + this);
        }
        return this.metaReference;
    }

    public String getFileName() {
        return this.logPath;
    }

    public Path getPath() {
        return new Path(this.logPath);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws IOException {
        Object object = this.closeLock;
        synchronized (object) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.workQueue.add(CLOSED_MARKER);
        }
        if (this.syncThread != null) {
            try {
                this.syncThread.join();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        if (this.workQueue.size() != 0) {
            log.error("WAL work queue not empty after sync thread exited");
            throw new IllegalStateException("WAL work queue not empty after sync thread exited");
        }
        if (this.encryptingLogFile != null) {
            try {
                this.logFile.close();
            }
            catch (IOException ex) {
                log.error("Failed to close log file", (Throwable)ex);
                throw new LogClosedException();
            }
        }
    }

    public synchronized long getWrites() {
        Preconditions.checkState((this.writes >= 0L ? 1 : 0) != 0);
        return this.writes;
    }

    public LoggerOperation defineTablet(CommitSession cs) throws IOException {
        LogFileKey key = new LogFileKey();
        key.event = LogEvents.DEFINE_TABLET;
        key.seq = cs.getWALogSeq();
        key.tabletId = cs.getLogId();
        key.tablet = cs.getExtent();
        return this.logKeyData(key, Durability.LOG);
    }

    private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
        key.write(this.encryptingLogFile);
        value.write(this.encryptingLogFile);
        this.encryptingLogFile.flush();
        ++this.writes;
    }

    private LoggerOperation logKeyData(LogFileKey key, Durability d) throws IOException {
        return this.logFileData(Collections.singletonList(new Pair((Object)key, (Object)EMPTY)), d);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private LoggerOperation logFileData(List<Pair<LogFileKey, LogFileValue>> keys, Durability durability) throws IOException {
        LogWork work = new LogWork(new CountDownLatch(1), durability);
        try {
            for (Pair<LogFileKey, LogFileValue> pair : keys) {
                this.write((LogFileKey)pair.getFirst(), (LogFileValue)pair.getSecond());
            }
        }
        catch (ClosedChannelException ex) {
            throw new LogClosedException();
        }
        catch (Exception e) {
            log.error("Failed to write log entries", (Throwable)e);
            work.exception = e;
        }
        Object object = this.closeLock;
        synchronized (object) {
            if (this.closed) {
                throw new LogClosedException();
            }
            if (durability == Durability.LOG) {
                return NO_WAIT_LOGGER_OP;
            }
            this.workQueue.add(work);
        }
        return new LoggerOperation(work);
    }

    public LoggerOperation logManyTablets(Collection<TabletMutations> mutations) throws IOException {
        Durability durability = Durability.NONE;
        ArrayList<Pair<LogFileKey, LogFileValue>> data = new ArrayList<Pair<LogFileKey, LogFileValue>>();
        for (TabletMutations tabletMutations : mutations) {
            LogFileKey key = new LogFileKey();
            key.event = LogEvents.MANY_MUTATIONS;
            key.seq = tabletMutations.getSeq();
            key.tabletId = tabletMutations.getTid();
            LogFileValue value = new LogFileValue();
            value.mutations = tabletMutations.getMutations();
            data.add((Pair<LogFileKey, LogFileValue>)new Pair((Object)key, (Object)value));
            durability = DfsLogger.maxDurability(tabletMutations.getDurability(), durability);
        }
        return this.logFileData(data, durability);
    }

    public LoggerOperation log(CommitSession cs, Mutation m, Durability d) throws IOException {
        LogFileKey key = new LogFileKey();
        key.event = LogEvents.MUTATION;
        key.seq = cs.getWALogSeq();
        key.tabletId = cs.getLogId();
        LogFileValue value = new LogFileValue();
        value.mutations = Collections.singletonList(m);
        return this.logFileData(Collections.singletonList(new Pair((Object)key, (Object)value)), d);
    }

    static Durability maxDurability(Durability dur1, Durability dur2) {
        if (dur1.ordinal() > dur2.ordinal()) {
            return dur1;
        }
        return dur2;
    }

    public LoggerOperation minorCompactionFinished(long seq, int tid, Durability durability) throws IOException {
        LogFileKey key = new LogFileKey();
        key.event = LogEvents.COMPACTION_FINISH;
        key.seq = seq;
        key.tabletId = tid;
        return this.logKeyData(key, durability);
    }

    public LoggerOperation minorCompactionStarted(long seq, int tid, String fqfn, Durability durability) throws IOException {
        LogFileKey key = new LogFileKey();
        key.event = LogEvents.COMPACTION_START;
        key.seq = seq;
        key.tabletId = tid;
        key.filename = fqfn;
        return this.logKeyData(key, durability);
    }

    public String getLogger() {
        String[] parts = this.logPath.split("/");
        return Joiner.on((String)":").join((Object[])parts[parts.length - 2].split("[+]"));
    }

    @Override
    public int compareTo(DfsLogger o) {
        return this.getFileName().compareTo(o.getFileName());
    }

    DatanodeInfo[] getPipeLine() {
        OutputStream os;
        if (this.logFile != null && (os = this.logFile.getWrappedStream()) instanceof DFSOutputStream) {
            return ((DFSOutputStream)os).getPipeline();
        }
        return EMPTY_PIPELINE;
    }

    private static class NoWaitLoggerOperation
    extends LoggerOperation {
        public NoWaitLoggerOperation() {
            super(null);
        }

        @Override
        public void await() {
        }
    }

    static class LoggerOperation {
        private final LogWork work;

        public LoggerOperation(LogWork work) {
            this.work = work;
        }

        public void await() throws IOException {
            try {
                this.work.latch.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            if (this.work.exception != null) {
                if (this.work.exception instanceof IOException) {
                    throw (IOException)this.work.exception;
                }
                if (this.work.exception instanceof RuntimeException) {
                    throw (RuntimeException)this.work.exception;
                }
                throw new RuntimeException(this.work.exception);
            }
        }
    }

    private static class LogWork {
        final CountDownLatch latch;
        final Durability durability;
        volatile Exception exception;

        public LogWork(CountDownLatch latch, Durability durability) {
            this.latch = latch;
            this.durability = durability;
        }
    }

    private class LogSyncingTask
    implements Runnable {
        private int expectedReplication = 0;

        private LogSyncingTask() {
        }

        @Override
        public void run() {
            ArrayList<LogWork> work = new ArrayList<LogWork>();
            boolean sawClosedMarker = false;
            while (!sawClosedMarker) {
                work.clear();
                try {
                    work.add((LogWork)DfsLogger.this.workQueue.take());
                }
                catch (InterruptedException ex) {
                    continue;
                }
                DfsLogger.this.workQueue.drainTo(work);
                Method durabilityMethod = null;
                block14: for (LogWork logWork : work) {
                    switch (logWork.durability) {
                        case DEFAULT: 
                        case NONE: 
                        case LOG: {
                            throw new IllegalArgumentException("unexpected durability " + logWork.durability);
                        }
                        case SYNC: {
                            durabilityMethod = DfsLogger.this.sync;
                            break block14;
                        }
                        case FLUSH: {
                            if (durabilityMethod == null) {
                                durabilityMethod = DfsLogger.this.flush;
                            }
                        }
                        default: {
                            continue block14;
                        }
                    }
                }
                long start = System.currentTimeMillis();
                try {
                    if (durabilityMethod != null) {
                        durabilityMethod.invoke((Object)DfsLogger.this.logFile, new Object[0]);
                        if (durabilityMethod == DfsLogger.this.sync) {
                            DfsLogger.this.syncCounter.incrementAndGet();
                        } else {
                            DfsLogger.this.flushCounter.incrementAndGet();
                        }
                    }
                }
                catch (Exception ex) {
                    this.fail(work, ex, "synching");
                }
                long duration = System.currentTimeMillis() - start;
                if (duration > DfsLogger.this.slowFlushMillis) {
                    String msg = new StringBuilder(128).append("Slow sync cost: ").append(duration).append(" ms, current pipeline: ").append(Arrays.toString(DfsLogger.this.getPipeLine())).toString();
                    log.info(msg);
                    if (this.expectedReplication > 0) {
                        int current = this.expectedReplication;
                        try {
                            current = ((DFSOutputStream)DfsLogger.this.logFile.getWrappedStream()).getCurrentBlockReplication();
                        }
                        catch (IOException e) {
                            this.fail(work, e, "getting replication level");
                        }
                        if (current < this.expectedReplication) {
                            this.fail(work, new IOException("replication of " + current + " is less than " + this.expectedReplication), "replication check");
                        }
                    }
                }
                if (this.expectedReplication == 0 && DfsLogger.this.logFile.getWrappedStream() instanceof DFSOutputStream) {
                    try {
                        this.expectedReplication = ((DFSOutputStream)DfsLogger.this.logFile.getWrappedStream()).getCurrentBlockReplication();
                    }
                    catch (IOException e) {
                        this.fail(work, e, "getting replication level");
                    }
                }
                for (LogWork logWork : work) {
                    if (logWork == CLOSED_MARKER) {
                        sawClosedMarker = true;
                        continue;
                    }
                    logWork.latch.countDown();
                }
            }
        }

        private void fail(ArrayList<LogWork> work, Exception ex, String why) {
            log.warn("Exception " + why + " " + ex);
            for (LogWork logWork : work) {
                logWork.exception = ex;
            }
        }
    }

    public static interface ServerResources {
        public AccumuloConfiguration getConfiguration();

        public VolumeManager getFileSystem();
    }

    public static class DFSLoggerInputStreams {
        private FSDataInputStream originalInput;
        private DataInputStream decryptingInputStream;

        public DFSLoggerInputStreams(FSDataInputStream originalInput, DataInputStream decryptingInputStream) {
            this.originalInput = originalInput;
            this.decryptingInputStream = decryptingInputStream;
        }

        public FSDataInputStream getOriginalInput() {
            return this.originalInput;
        }

        public DataInputStream getDecryptingInputStream() {
            return this.decryptingInputStream;
        }
    }

    public static class LogHeaderIncompleteException
    extends IOException {
        private static final long serialVersionUID = 1L;

        public LogHeaderIncompleteException(Throwable cause) {
            super(cause);
        }
    }

    public static class LogClosedException
    extends IOException {
        private static final long serialVersionUID = 1L;

        public LogClosedException() {
            super("LogClosed");
        }
    }
}

