/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.lib.wal;

import com.datatorrent.netlet.util.Slice;
import com.google.common.base.Preconditions;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.utils.FileContextUtils;
import org.apache.apex.malhar.lib.utils.IOUtils;
import org.apache.apex.malhar.lib.wal.WAL;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.local.LocalFs;
import org.apache.hadoop.fs.local.RawLocalFs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileSystemWAL
implements WAL<FileSystemWALReader, FileSystemWALWriter> {
    /** Base path of the WAL; part file paths are derived from it as {@code filePath + "_" + partNumber}. */
    @NotNull
    private String filePath;
    /**
     * Size threshold in bytes that the writer compares the current part offset against when deciding to
     * rotate. A value of 0 is replaced in {@link #setup()} by the block size of the default file system.
     */
    @Min(value=0L)
    private long maxLength;
    /**
     * Pointer to the logical start of the WAL. Replaced by the writer's {@code delete} and returned by the
     * reader's {@code getStartPointer}. Declared before the reader because the reader's constructor copies it.
     */
    private FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0L);
    @NotNull
    private FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
    @NotNull
    private FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
    /**
     * Part number -> path of the temporary file backing that part, ordered by part number. The writer adds
     * an entry when it creates a part file and removes it when the part is finalized (renamed).
     */
    private final Map<Integer, String> tempPartFiles = new TreeMap<Integer, String>();
    /** Window id passed to the most recent {@link #beforeCheckpoint(long)} call; -1 until the first call. */
    private long lastCheckpointedWindow = -1L;
    /** Suffix appended to the names of temporary part files. */
    private static final String TMP_EXTENSION = ".tmp";
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemWAL.class);

    /**
     * Resolves the file context for {@code filePath}, defaults {@code maxLength} to the block size of the
     * default file system when it is 0, and then opens the writer followed by the reader.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised while doing so
     */
    @Override
    public void setup() {
        try {
            final FileContext context = FileContextUtils.getFileContext(filePath);
            final boolean lengthUnset = maxLength == 0L;
            if (lengthUnset) {
                maxLength = context.getDefaultFileSystem().getServerDefaults().getBlockSize();
            }
            // The writer is opened first; its open() runs recovery before the reader gets the context.
            fileSystemWALWriter.open(context);
            fileSystemWALReader.open(context);
        } catch (IOException ioe) {
            throw new RuntimeException("during setup", ioe);
        }
    }

    /**
     * Records {@code window} as the last checkpointed window and flushes the writer.
     *
     * @param window id of the window being checkpointed
     * @throws RuntimeException wrapping any {@link IOException} raised by the flush
     */
    @Override
    public void beforeCheckpoint(long window) {
        // Recorded before flushing, so the value is updated even when the flush fails.
        lastCheckpointedWindow = window;
        try {
            fileSystemWALWriter.flush();
        } catch (IOException ioe) {
            throw new RuntimeException("during before cp", ioe);
        }
    }

    /**
     * Asks the writer to finalize the part files that became eligible with the commit of {@code window}.
     *
     * @param window id of the committed window
     * @throws RuntimeException wrapping any {@link IOException} raised while finalizing
     */
    @Override
    public void committed(long window) {
        try {
            fileSystemWALWriter.finalizeFiles(window);
        } catch (IOException ioe) {
            throw new RuntimeException("during committed", ioe);
        }
    }

    /**
     * Closes the reader and then the writer.
     *
     * <p>The writer is closed even when closing the reader fails, so its output stream is not leaked.
     * If both fail, the reader's exception is reported and the writer's is attached as suppressed.
     *
     * @throws RuntimeException wrapping the {@link IOException} raised while closing
     */
    @Override
    public void teardown() {
        IOException failure = null;
        try {
            this.fileSystemWALReader.close();
        } catch (IOException e) {
            failure = e;
        }
        try {
            this.fileSystemWALWriter.close();
        } catch (IOException e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw new RuntimeException("during teardown", failure);
        }
    }

    /**
     * @return the window id given to the latest {@code beforeCheckpoint} call, or -1 if there was none
     */
    protected long getLastCheckpointedWindow() {
        return lastCheckpointedWindow;
    }

    /**
     * Builds the final (non-temporary) path of a part file.
     *
     * @param partNumber number of the part
     * @return {@code filePath}, an underscore and the part number
     */
    protected String getPartFilePath(int partNumber) {
        final String partSuffix = "_" + partNumber;
        return filePath + partSuffix;
    }

    /**
     * @return the reader of this WAL
     */
    @Override
    public FileSystemWALReader getReader() {
        return fileSystemWALReader;
    }

    /**
     * Replaces the reader of this WAL.
     *
     * @param fileSystemWALReader the new reader
     * @throws NullPointerException if {@code fileSystemWALReader} is null
     */
    public void setFileSystemWALReader(@NotNull FileSystemWALReader fileSystemWALReader) {
        Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
        this.fileSystemWALReader = fileSystemWALReader;
    }

    /**
     * @return the writer of this WAL
     */
    @Override
    public FileSystemWALWriter getWriter() {
        return fileSystemWALWriter;
    }

    /**
     * Replaces the writer of this WAL.
     *
     * @param fileSystemWALWriter the new writer
     * @throws NullPointerException if {@code fileSystemWALWriter} is null
     */
    public void setFileSystemWALWriter(@NotNull FileSystemWALWriter fileSystemWALWriter) {
        Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
        this.fileSystemWALWriter = fileSystemWALWriter;
    }

    /**
     * @return the base path from which part file paths are derived
     */
    public String getFilePath() {
        return filePath;
    }

    /**
     * Sets the base path from which part file paths are derived.
     *
     * @param filePath the base path
     * @throws NullPointerException if {@code filePath} is null
     */
    public void setFilePath(@NotNull String filePath) {
        Preconditions.checkNotNull(filePath, "filePath");
        this.filePath = filePath;
    }

    /**
     * @return the size threshold, in bytes, used to decide when a part file is rotated
     */
    public long getMaxLength() {
        return maxLength;
    }

    /**
     * Sets the size threshold used to decide when a part file is rotated. A value of 0 is replaced
     * during {@code setup()} by the block size of the default file system.
     *
     * @param maxLength threshold in bytes
     */
    public void setMaxLength(long maxLength) {
        this.maxLength = maxLength;
    }

    /**
     * Derives a temporary file path from {@code filePath} by appending a dot, the current wall-clock time
     * in milliseconds and the temporary-file extension.
     *
     * @param filePath path the temporary file stands in for
     * @return the temporary file path
     */
    private static String getTmpFilePath(String filePath) {
        final long now = System.currentTimeMillis();
        return new StringBuilder()
            .append(filePath)
            .append('.')
            .append(now)
            .append(TMP_EXTENSION)
            .toString();
    }

    public static class FileSystemWALWriter
    implements WAL.WALWriter<FileSystemWALPointer> {
        /** Position (part number and byte offset within that part) at which the next entry is appended. */
        private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0L);
        /** Stream to the file of the current part; null while no part file is open. */
        private transient DataOutputStream outputStream;
        /**
         * Last checkpointed window at the time of a rotation -> part number that was rotated, ordered by
         * window. Filled by {@code rotate} and drained by {@code finalizeFiles}.
         */
        private final Map<Long, Integer> pendingFinalization = new TreeMap<Long, Integer>();
        /** The WAL this writer belongs to; null only when created through the private no-arg constructor. */
        private final FileSystemWAL fileSystemWAL;
        /** File context supplied through {@code open}. */
        private transient FileContext fileContext;
        /** Part number most recently renamed to its final name by {@code finalizeFiles}; -1 if none. */
        private int latestFinalizedPart = -1;
        /**
         * Part number most recently deleted by {@code deleteFinalizedParts}; the next deletion pass starts at
         * {@code lowestDeletedPart + 1}. -1 if nothing has been deleted.
         */
        private int lowestDeletedPart = -1;

        /**
         * Creates a writer that is not attached to any WAL.
         * NOTE(review): presumably exists only for reflective instantiation by a serializer - confirm.
         */
        private FileSystemWALWriter() {
            fileSystemWAL = null;
        }

        /**
         * Creates a writer for the given WAL.
         *
         * @param fileSystemWal the WAL this writer appends to
         * @throws NullPointerException if {@code fileSystemWal} is null
         */
        public FileSystemWALWriter(@NotNull FileSystemWAL fileSystemWal) {
            Preconditions.checkNotNull(fileSystemWal, "wal");
            fileSystemWAL = fileSystemWal;
        }

        /**
         * Stores the file context and runs recovery.
         *
         * @param fileContext file context used for all file operations of this writer
         * @throws NullPointerException if {@code fileContext} is null
         * @throws IOException if recovery fails
         */
        protected void open(@NotNull FileContext fileContext) throws IOException {
            Preconditions.checkNotNull(fileContext, "file context");
            this.fileContext = fileContext;
            recover();
        }

        /**
         * Restores the on-disk state to match {@code currentPointer}.
         *
         * <ol>
         *   <li>If a temporary file is registered for the current part and exists, a fresh temporary file is
         *       created for that part, the old one is copied into it via
         *       {@code IOUtils.copyPartial(in, currentPointer.offset, out)}, and the old one is deleted.</li>
         *   <li>Every file in the WAL's parent directory whose name starts with the WAL file name, ends with
         *       the temporary extension and is not registered for a part up to the current one is deleted.</li>
         * </ol>
         *
         * @throws IOException if any file operation fails
         */
        private void recover() throws IOException {
            LOG.debug("current point {}", this.currentPointer);
            String tmpFilePath = this.fileSystemWAL.tempPartFiles.get(this.currentPointer.getPartNum());
            if (tmpFilePath != null) {
                Path tmpPath = new Path(tmpFilePath);
                if (this.fileContext.util().exists(tmpPath)) {
                    LOG.debug("tmp path exists {}", tmpPath);
                    // Registers a new tmp file for the part; tmpPath still refers to the old one.
                    this.outputStream = this.getOutputStream(new FileSystemWALPointer(this.currentPointer.partNum, 0L));
                    // try-with-resources: the old tmp stream is released even when the copy or flush fails.
                    try (FSDataInputStream inputStreamOldTmp = this.fileContext.open(tmpPath)) {
                        IOUtils.copyPartial((InputStream)inputStreamOldTmp, this.currentPointer.offset, this.outputStream);
                        this.outputStream.flush();
                    }
                    LOG.debug("delete tmp {}", tmpPath);
                    this.fileContext.delete(tmpPath, true);
                }
            }

            // Names of the tmp files that are still referenced by parts up to the current one.
            HashSet<String> validPathNames = new HashSet<String>();
            for (Map.Entry<Integer, String> entry : this.fileSystemWAL.tempPartFiles.entrySet()) {
                if (entry.getKey() > this.currentPointer.partNum) {
                    continue;
                }
                validPathNames.add(new Path(entry.getValue()).getName());
            }
            LOG.debug("valid names {}", validPathNames);

            Path walPath = new Path(this.fileSystemWAL.filePath);
            Path parentWAL = walPath.getParent();
            if (parentWAL != null && this.fileContext.util().exists(parentWAL)) {
                RemoteIterator<FileStatus> remoteIterator = this.fileContext.listStatus(parentWAL);
                while (remoteIterator.hasNext()) {
                    FileStatus status = remoteIterator.next();
                    String fileName = status.getPath().getName();
                    boolean strayTmp = fileName.startsWith(walPath.getName())
                        && fileName.endsWith(FileSystemWAL.TMP_EXTENSION)
                        && !validPathNames.contains(fileName);
                    if (!strayTmp) {
                        continue;
                    }
                    LOG.debug("delete stray tmp {}", status.getPath());
                    this.fileContext.delete(status.getPath(), true);
                }
            }
        }

        /**
         * Closes the output stream of the current part, if one is open, and forgets it.
         *
         * @throws IOException if closing the stream fails; the stream reference is kept in that case
         */
        protected void close() throws IOException {
            if (outputStream == null) {
                return;
            }
            outputStream.close();
            outputStream = null;
            LOG.debug("closed {}", currentPointer.partNum);
        }

        /**
         * Appends an entry as a 4-byte length followed by the entry bytes, rotating to the next part before
         * the write when the entry would push the part past {@code maxLength}, and after the write when the
         * part has reached exactly {@code maxLength}.
         *
         * @param entry bytes to append
         * @return number of bytes written: the entry length plus 4
         * @throws IOException if opening, rotating or writing fails
         */
        @Override
        public int append(Slice entry) throws IOException {
            if (outputStream == null) {
                outputStream = getOutputStream(currentPointer);
            }

            final int written = entry.length + 4;
            if (shouldRotate(written)) {
                rotate(true);
            }

            outputStream.writeInt(entry.length);
            outputStream.write(entry.buffer, entry.offset, entry.length);
            currentPointer.offset += written;

            final boolean partExactlyFull = currentPointer.offset == fileSystemWAL.maxLength;
            if (partExactlyFull) {
                // The part is complete; the next part file is opened lazily by the next append.
                rotate(false);
            }
            return written;
        }

        /**
         * @return the pointer at which the next entry will be appended; this is the writer's own instance,
         *         not a copy
         */
        @Override
        public FileSystemWALPointer getPointer() {
            return currentPointer;
        }

        /**
         * Moves the start of the WAL to {@code pointer} and removes finalized data before it. Pointers that
         * lie beyond the current write position are ignored.
         *
         * @param pointer new start of the WAL; stored by reference as the WAL's start pointer
         * @throws IOException if removing data fails
         */
        @Override
        public void delete(FileSystemWALPointer pointer) throws IOException {
            final boolean beyondWritePosition = pointer.compareTo(currentPointer) > 0;
            if (beyondWritePosition) {
                return;
            }
            fileSystemWAL.walStartPointer = pointer;
            deleteFinalizedParts(pointer);
        }

        /**
         * Deletes finalized part files that lie entirely before {@code pointer} and, when the pointer's own
         * part is finalized and the pointer has a non-zero offset, rewrites that part so that it starts at
         * the pointer's offset.
         *
         * <p>The rewrite copies the tail of the part into a temporary file and renames it over the part.
         * Once the rename has succeeded, the offset of the WAL start pointer is reset to 0 if it refers to
         * the rewritten part. When called from {@code delete}, the start pointer is the same object as
         * {@code pointer}, so the caller's pointer is updated as well.
         *
         * @param pointer position before which data is no longer needed
         * @throws IOException if a file operation fails
         */
        private void deleteFinalizedParts(FileSystemWALPointer pointer) throws IOException {
            int lastPartDeleted = -1;
            for (int i = this.lowestDeletedPart + 1; i < pointer.partNum; ++i) {
                if (i > this.latestFinalizedPart) {
                    // Only finalized parts are deleted; every later part number is unfinalized too.
                    break;
                }
                Path partPath = new Path(this.fileSystemWAL.getPartFilePath(i));
                if (!this.fileContext.util().exists(partPath)) {
                    break;
                }
                LOG.debug("delete {}", partPath);
                this.fileContext.delete(partPath, true);
                lastPartDeleted = i;
            }
            if (lastPartDeleted != -1) {
                this.lowestDeletedPart = lastPartDeleted;
            }

            if (pointer.partNum <= this.latestFinalizedPart && pointer.offset > 0L) {
                String part = this.fileSystemWAL.getPartFilePath(pointer.partNum);
                Path inputPartPath = new Path(part);
                long length = this.fileContext.getFileStatus(inputPartPath).getLen();
                LOG.debug("truncate {} from {} length {}", new Object[]{part, pointer.offset, length});
                if (length > pointer.offset) {
                    Path tmpPart = new Path(FileSystemWAL.getTmpFilePath(part));
                    // try-with-resources: both streams are released even when the copy fails.
                    try (FSDataInputStream inputStream = this.fileContext.open(inputPartPath);
                         FSDataOutputStream outputStream = this.fileContext.create(tmpPart, EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), new Options.CreateOpts[]{Options.CreateOpts.CreateParent.createParent()})) {
                        IOUtils.copyPartial((InputStream)inputStream, pointer.offset, length - pointer.offset, (OutputStream)outputStream);
                    }
                    this.fileContext.rename(tmpPart, inputPartPath, new Options.Rename[]{Options.Rename.OVERWRITE});
                    // Reset only after the rename succeeded, so a failed rename cannot leave the start
                    // pointer at offset 0 of a part that still contains the old leading entries.
                    if (this.fileSystemWAL.walStartPointer.partNum == pointer.partNum) {
                        this.fileSystemWAL.walStartPointer.offset = 0L;
                    }
                }
            }
        }

        /**
         * Makes the data written so far durable. Does nothing when no part file is open. On a local file
         * system the stream is closed instead of synced; otherwise the stream is cast to {@link Syncable}
         * and {@code hflush()} followed by {@code hsync()} is invoked.
         * NOTE(review): presumably the local stream does not support hflush/hsync - confirm.
         *
         * @throws IOException if closing or syncing fails
         */
        protected void flush() throws IOException {
            if (outputStream == null) {
                return;
            }
            final boolean onLocalFs = fileContext.getDefaultFileSystem() instanceof LocalFs
                || fileContext.getDefaultFileSystem() instanceof RawLocalFs;
            if (onLocalFs) {
                close();
                return;
            }
            final Syncable syncable = (Syncable) outputStream;
            syncable.hflush();
            syncable.hsync();
        }

        /**
         * Tells whether writing {@code entryLength} more bytes would push the current part past
         * {@code maxLength}.
         *
         * @param entryLength number of bytes about to be written
         * @return true if the current offset plus {@code entryLength} exceeds {@code maxLength}
         */
        protected boolean shouldRotate(int entryLength) {
            final long offsetAfterWrite = currentPointer.offset + (long) entryLength;
            return offsetAfterWrite > fileSystemWAL.maxLength;
        }

        /**
         * Finishes the current part and moves the write position to the start of the next one.
         * The finished part is registered for finalization under the last checkpointed window.
         *
         * @param openNextFile whether to open the file of the next part immediately
         * @throws IOException if flushing, closing or opening fails
         */
        protected void rotate(boolean openNextFile) throws IOException {
            flush();
            close();

            final int finishedPart = currentPointer.partNum;
            final int nextPart = finishedPart + 1;
            pendingFinalization.put(fileSystemWAL.getLastCheckpointedWindow(), finishedPart);
            LOG.debug("rotate {} to {}", finishedPart, nextPart);

            currentPointer = new FileSystemWALPointer(nextPart, 0L);
            if (!openNextFile) {
                return;
            }
            outputStream = getOutputStream(currentPointer);
        }

        /**
         * Renames the temporary files of rotated parts to their final names and then deletes finalized
         * parts that lie before the WAL start pointer.
         *
         * <p>Only rotations registered under a window strictly smaller than {@code window} are processed.
         * For each of them, every part from the lowest still-temporary part up to the rotated part is
         * removed from the temporary-file map and, if its temporary file exists, renamed.
         *
         * @param window id of the committed window
         * @throws IOException if a file operation fails
         */
        protected void finalizeFiles(long window) throws IOException {
            Map<Integer, String> tempPartFiles = this.fileSystemWAL.tempPartFiles;
            if (!tempPartFiles.isEmpty()) {
                // The map is sorted by part number, so the first key is the lowest part that is still temporary.
                int nextPartToFinalize = tempPartFiles.keySet().iterator().next();
                Iterator<Map.Entry<Long, Integer>> pendingFinalizeIter = this.pendingFinalization.entrySet().iterator();
                while (pendingFinalizeIter.hasNext()) {
                    Map.Entry<Long, Integer> entry = pendingFinalizeIter.next();
                    if (entry.getKey() >= window) {
                        break;
                    }
                    // Read the value before removing: a Map.Entry is undefined once its mapping has been
                    // removed from the backing map.
                    int partToFinalizeTill = entry.getValue();
                    pendingFinalizeIter.remove();
                    for (int i = nextPartToFinalize; i <= partToFinalizeTill; ++i) {
                        String tmpToFinalize = tempPartFiles.remove(i);
                        if (tmpToFinalize == null) {
                            // No temporary file is registered for this part, so there is nothing to rename.
                            continue;
                        }
                        Path tmpPath = new Path(tmpToFinalize);
                        if (!this.fileContext.util().exists(tmpPath)) {
                            continue;
                        }
                        LOG.debug("finalize {} of part {}", tmpToFinalize, i);
                        this.fileContext.rename(tmpPath, new Path(this.fileSystemWAL.getPartFilePath(i)), new Options.Rename[]{Options.Rename.OVERWRITE});
                        this.latestFinalizedPart = i;
                    }
                    nextPartToFinalize = partToFinalizeTill + 1;
                }
            }
            if (this.lowestDeletedPart != -1 && this.lowestDeletedPart < this.fileSystemWAL.walStartPointer.partNum) {
                this.deleteFinalizedParts(this.fileSystemWAL.walStartPointer);
            }
        }

        /**
         * Opens an output stream for the part addressed by {@code pointer}.
         *
         * <p>On a local file system with a non-zero offset, the temporary file already registered for the
         * part is reopened in append mode and returned; {@code outputStream} is not assigned in that case,
         * so the caller has to store the result. Otherwise the offset must be 0: a new temporary file is
         * registered for the part, created with overwrite, stored in {@code outputStream} and returned.
         *
         * @param pointer part and offset to open a stream for
         * @return the opened stream
         * @throws IllegalArgumentException if a stream is already open, or if the offset is non-zero on a
         *         file system that is not local
         * @throws IOException if the file cannot be created
         */
        private DataOutputStream getOutputStream(FileSystemWALPointer pointer) throws IOException {
            Preconditions.checkArgument(this.outputStream == null, "output stream is not null");
            if (pointer.offset > 0L && (this.fileContext.getDefaultFileSystem() instanceof LocalFs || this.fileContext.getDefaultFileSystem() instanceof RawLocalFs)) {
                return this.fileContext.create(new Path(this.fileSystemWAL.tempPartFiles.get(pointer.partNum)), EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), new Options.CreateOpts[]{Options.CreateOpts.CreateParent.createParent()});
            }
            // Validate before touching tempPartFiles: a rejected call must not replace the mapping of the
            // existing temporary file with the path of a file that is never created.
            Preconditions.checkArgument(pointer.offset == 0L, "offset > 0");
            String partFile = this.fileSystemWAL.getPartFilePath(pointer.partNum);
            String tmpFilePath = FileSystemWAL.getTmpFilePath(partFile);
            this.fileSystemWAL.tempPartFiles.put(pointer.partNum, tmpFilePath);
            LOG.debug("open {} => {}", pointer.partNum, tmpFilePath);
            this.outputStream = this.fileContext.create(new Path(tmpFilePath), EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), new Options.CreateOpts[]{Options.CreateOpts.CreateParent.createParent()});
            return this.outputStream;
        }
    }

    public static class FileSystemWALReader
    implements WAL.WALReader<FileSystemWALPointer> {
        /** Position (part number and byte offset within that part) of the next entry to read. */
        private FileSystemWALPointer currentPointer;
        /** Stream of the part currently being read; null while no part file is open. */
        private transient DataInputStream inputStream;
        /** Path of the file most recently opened by {@code getInputStream}. */
        private transient Path currentOpenPath;
        /** Whether the most recent {@code getInputStream} call chose the temporary file of the part. */
        private transient boolean isOpenPathTmp;
        /** The WAL this reader belongs to; null only when created through the private no-arg constructor. */
        private final FileSystemWAL fileSystemWAL;
        /** File context supplied through {@code open}. */
        private transient FileContext fileContext;

        /**
         * Creates a reader that is not attached to any WAL.
         * NOTE(review): presumably exists only for reflective instantiation by a serializer - confirm.
         */
        private FileSystemWALReader() {
            fileSystemWAL = null;
        }

        public FileSystemWALReader(@NotNull FileSystemWAL fileSystemWal) {
            this.fileSystemWAL = (FileSystemWAL)Preconditions.checkNotNull((Object)fileSystemWal, (Object)"wal");
            this.currentPointer = new FileSystemWALPointer(fileSystemWal.walStartPointer.partNum, fileSystemWal.walStartPointer.offset);
        }

        /**
         * Stores the file context used for all file operations of this reader.
         *
         * @param fileContext the file context
         * @throws NullPointerException if {@code fileContext} is null
         * @throws IOException declared by the signature; this implementation performs no I/O
         */
        protected void open(@NotNull FileContext fileContext) throws IOException {
            Preconditions.checkNotNull(fileContext, "fileContext");
            this.fileContext = fileContext;
        }

        /**
         * Closes the input stream of the current part, if one is open, and forgets it.
         *
         * @throws IOException if closing the stream fails; the stream reference is kept in that case
         */
        protected void close() throws IOException {
            if (inputStream == null) {
                return;
            }
            inputStream.close();
            inputStream = null;
        }

        /**
         * Positions the reader at {@code pointer}, closing any stream that is currently open.
         *
         * <p>The reader keeps its own copy of the pointer. {@code next()} advances the reader's pointer in
         * place, so keeping the caller's instance would silently modify it; after
         * {@code seek(getStartPointer())} that instance is the WAL's start pointer itself.
         *
         * @param pointer position to continue reading from; must not lie before the WAL start pointer
         * @throws IllegalArgumentException if {@code pointer} lies before the WAL start pointer
         * @throws IOException if closing the old stream or opening the new one fails
         */
        @Override
        public void seek(FileSystemWALPointer pointer) throws IOException {
            Preconditions.checkArgument(pointer.compareTo(this.fileSystemWAL.walStartPointer) >= 0, "invalid pointer");
            if (this.inputStream != null) {
                this.close();
            }
            this.inputStream = this.getInputStream(pointer);
            // Defensive copy, as in the constructor.
            this.currentPointer = new FileSystemWALPointer(pointer.partNum, pointer.offset);
        }

        /**
         * Closes the current part, if open, and moves the reader to the beginning of the next part.
         *
         * @return true if a file for the next part exists and was opened, false otherwise
         * @throws IOException if closing or opening fails
         */
        private boolean nextSegment() throws IOException {
            if (inputStream != null) {
                close();
            }
            final int followingPart = currentPointer.partNum + 1;
            currentPointer = new FileSystemWALPointer(followingPart, 0L);
            inputStream = getInputStream(currentPointer);
            return inputStream != null;
        }

        /**
         * Opens the file backing the part addressed by {@code walPointer} and positions the stream at the
         * pointer's offset. The temporary file registered for the part is preferred; otherwise the final
         * part file is used. {@code isOpenPathTmp} is updated to reflect that choice even when the chosen
         * file does not exist; {@code currentOpenPath} is updated only when a stream is returned.
         *
         * <p>Positioning loops until the whole offset has been skipped or the end of the file is reached,
         * because a single {@code skip} call may skip fewer bytes than requested. If positioning fails,
         * the stream is closed before the exception is rethrown.
         *
         * @param walPointer part and offset to read from
         * @return the positioned stream, or null if the chosen file does not exist
         * @throws IllegalArgumentException if a stream is already open
         * @throws IOException if opening or positioning the stream fails
         */
        private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException {
            Path pathToReadFrom;
            Preconditions.checkArgument(this.inputStream == null, "input stream not null");
            String tmpPath = this.fileSystemWAL.tempPartFiles.get(walPointer.getPartNum());
            if (tmpPath != null) {
                pathToReadFrom = new Path(tmpPath);
                this.isOpenPathTmp = true;
            } else {
                pathToReadFrom = new Path(this.fileSystemWAL.getPartFilePath(walPointer.partNum));
                this.isOpenPathTmp = false;
            }
            LOG.debug("path to read {} and pointer {}", pathToReadFrom, walPointer);
            if (!this.fileContext.util().exists(pathToReadFrom)) {
                return null;
            }

            FSDataInputStream stream = this.fileContext.open(pathToReadFrom);
            try {
                long remaining = walPointer.offset;
                while (remaining > 0L) {
                    long skipped = stream.skip(remaining);
                    if (skipped > 0L) {
                        remaining -= skipped;
                    } else if (stream.read() < 0) {
                        // End of file: stay there, as a single short skip would have done.
                        break;
                    } else {
                        // skip() made no progress although data was available; the probe consumed one byte.
                        --remaining;
                    }
                }
            } catch (IOException | RuntimeException e) {
                try {
                    stream.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
            this.currentOpenPath = pathToReadFrom;
            return stream;
        }

        /**
         * Reads the next entry, moving on to following parts as long as their files exist.
         *
         * <p>An entry is a 4-byte length followed by that many bytes. An entry is read only while the
         * reader's offset is smaller than the current length of the open file; otherwise the reader
         * advances to the next part. When no further part file exists, the reader is closed and null is
         * returned.
         *
         * @return the next entry, or null when there is nothing more to read
         * @throws IllegalStateException if a negative entry length is read
         * @throws IOException if a file operation fails
         */
        @Override
        public Slice next() throws IOException {
            while (true) {
                if (inputStream == null) {
                    inputStream = getInputStream(currentPointer);
                }

                // The open file was a temporary file that is no longer registered for this part:
                // reopen so that the path is resolved again.
                final boolean tmpNoLongerRegistered = inputStream != null
                    && isOpenPathTmp
                    && !fileSystemWAL.tempPartFiles.containsKey(currentPointer.partNum);
                if (tmpNoLongerRegistered) {
                    close();
                    inputStream = getInputStream(currentPointer);
                }

                if (inputStream != null
                    && currentPointer.offset < fileContext.getFileStatus(currentOpenPath).getLen()) {
                    final int payloadLength = inputStream.readInt();
                    Preconditions.checkState(payloadLength >= 0, "negative length");
                    final byte[] payload = new byte[payloadLength];
                    inputStream.readFully(payload);
                    currentPointer.offset += payload.length + 4;
                    return new Slice(payload);
                }

                if (!nextSegment()) {
                    break;
                }
            }
            close();
            return null;
        }

        /**
         * @return the WAL's start pointer; this is the WAL's own instance, not a copy
         */
        @Override
        public FileSystemWALPointer getStartPointer() {
            return fileSystemWAL.walStartPointer;
        }
    }

    /**
     * A position in the WAL: a part number plus a byte offset within that part. Pointers are ordered by
     * part number first and by offset second. The part number is fixed; the offset is updated in place
     * by the enclosing class.
     */
    public static class FileSystemWALPointer
    implements Comparable<FileSystemWALPointer> {
        private final int partNum;
        private long offset;

        /** Creates a pointer with part number -1 and offset 0. */
        private FileSystemWALPointer() {
            partNum = -1;
        }

        /**
         * @param partNum number of the part
         * @param offset byte offset within the part
         */
        public FileSystemWALPointer(int partNum, long offset) {
            this.partNum = partNum;
            this.offset = offset;
        }

        /**
         * Compares by part number and, for equal part numbers, by offset.
         *
         * @param o the pointer to compare with
         * @return a negative value, zero or a positive value if this pointer is before, at or after {@code o}
         */
        @Override
        public int compareTo(@NotNull FileSystemWALPointer o) {
            if (partNum != o.partNum) {
                return Integer.compare(partNum, o.partNum);
            }
            return Long.compare(offset, o.offset);
        }

        /** @return the part number */
        public int getPartNum() {
            return partNum;
        }

        /** @return the byte offset within the part */
        public long getOffset() {
            return offset;
        }

        @Override
        public String toString() {
            return new StringBuilder("FileSystemWalPointer{partNum=")
                .append(partNum)
                .append(", offset=")
                .append(offset)
                .append('}')
                .toString();
        }
    }
}

