/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.io.fs.FilterStreamContext;
import com.datatorrent.lib.io.fs.FilterStreamProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@OperatorAnnotation(checkpointableWithinAppWindow=false)
public abstract class AbstractFileOutputOperator<INPUT>
extends BaseOperator
implements Operator.CheckpointListener {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileOutputOperator.class);
    /** Suffix of temporary files; tmp names are built as part name + '.' + timestamp + this suffix. */
    private static final String TMP_EXTENSION = ".tmp";
    /** Upper bound on the number of file names listed in the teardown failure message. */
    private static final int MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION = 25;
    /** Size in bytes of the copy buffer; presumably the one recoverFile allocates (same value, 1024). */
    private static final int COPY_BUFFER_SIZE = 1024;
    /** Default for {@link #maxOpenFiles}. */
    public static final int DEFAULT_MAX_OPEN_FILES = 100;
    /** File name -> index of the part currently open; populated lazily by getPartFileNamePri. */
    protected Map<String, MutableInt> openPart;
    /** File name -> number of bytes written to the current part; advanced in processTuple. */
    protected Map<String, MutableLong> endOffsets;
    /** File name -> number of tuples written; incremented in processTuple, removed in rotate. */
    protected Map<String, MutableLong> counts;
    /** File name -> rotation flags (see RotationState). */
    @NotNull
    protected Map<String, RotationState> rotationStates;
    /** Directory under which all files are created (paths are built as filePath + "/" + name). */
    @NotNull
    protected String filePath;
    /** Running total of tuple bytes written; published as a counter in endWindow. */
    protected long totalBytesWritten = 0L;
    /** Replication passed to fs.create; values <= 0 are replaced in setup by the file system default. */
    @Min(value=0L)
    protected int replication = 0;
    /** Maximum size of the stream cache built in setup. */
    @Min(value=1L)
    protected int maxOpenFiles = 100;
    /** Byte threshold for size-based rotation; Long.MAX_VALUE leaves size-based rotation off. */
    @Min(value=1L)
    protected Long maxLength = Long.MAX_VALUE;
    /** Number of windows between window-based rotations; 0 leaves window-based rotation off. */
    @Min(value=0L)
    protected int rotationWindows = 0;
    /** Derived in setup: true when maxLength < Long.MAX_VALUE or rotationWindows > 0. */
    protected transient boolean rollingFile = false;
    /** File system obtained from getFSInstance in setup and closed in teardown. */
    protected transient FileSystem fs;
    /** Created lazily by rename. */
    protected transient FileContext fileContext;
    /** Permission bits applied to newly created files; 511 decimal is 0777 octal. */
    protected short filePermission = (short)511;
    /** File name -> open stream context; entries are loaded on demand and closed on removal. */
    protected transient LoadingCache<String, FSFilterStreamContext> streamsCache;
    /** Operator context captured in setup; used in endWindow to publish counters. */
    protected transient Context.OperatorContext context;
    /** Accumulated milliseconds spent in write, finalize and close calls. */
    private transient long totalWritingTime;
    protected final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class);
    /** Optional codec returned by the input port; when null the port's default is used. */
    protected StreamCodec<INPUT> streamCodec;
    /** Windows counted in endWindow since the last window-based rotation pass. */
    private int rotationCount;
    /** Optional provider of filter streams layered over the file output stream. */
    protected FilterStreamProvider filterStreamProvider;
    /** When true, data is written to a tmp file that is renamed to its final name in committed. */
    protected boolean alwaysWriteToTmp = true;
    /** Part file name -> name of the tmp file currently backing it. */
    private final Map<String, String> fileNameToTmpName;
    /** Window id -> files whose finalization was requested in that window; drained in committed. */
    private final Map<Long, Set<String>> finalizedFiles;
    /** File name -> highest part index for which finalization has been requested. */
    protected final Map<String, MutableInt> finalizedPart;
    /** Id of the window most recently passed to beginWindow. */
    protected long currentWindow;
    /** Idle time after which a cached stream expires; defaulted in setup when null. */
    private Long expireStreamAfterAccessMillis;
    /** Names of files that currently have an open stream. */
    private final Set<String> filesWithOpenStreams;
    /**
     * Input port. Each tuple is forwarded to {@code processTuple}; the stream codec is the
     * operator's {@code streamCodec} when one is set, otherwise the port's default.
     */
    public final transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>(){

        public void process(INPUT tuple) {
            AbstractFileOutputOperator.this.processTuple(tuple);
        }

        public StreamCodec<INPUT> getStreamCodec() {
            StreamCodec<INPUT> configured = AbstractFileOutputOperator.this.streamCodec;
            return configured != null ? configured : super.getStreamCodec();
        }
    };

    /**
     * Creates the operator with empty bookkeeping collections. {@code finalizedFiles} is a
     * sorted map so that {@code committed} can walk windows in ascending order.
     */
    public AbstractFileOutputOperator() {
        // Per-file write state.
        endOffsets = Maps.newHashMap();
        counts = Maps.newHashMap();
        openPart = Maps.newHashMap();
        rotationStates = Maps.newHashMap();
        // Tmp-file and finalization state.
        fileNameToTmpName = Maps.newHashMap();
        finalizedFiles = Maps.newTreeMap();
        finalizedPart = Maps.newHashMap();
        filesWithOpenStreams = Sets.newHashSet();
    }

    /**
     * Creates a new file system instance for {@code filePath}.
     *
     * @return the file system; when it is a {@code LocalFileSystem} its raw file system is
     *         returned instead
     * @throws IOException if the file system cannot be created
     */
    protected FileSystem getFSInstance() throws IOException {
        FileSystem created = FileSystem.newInstance(new Path(filePath).toUri(), new Configuration());
        return created instanceof LocalFileSystem ? ((LocalFileSystem)created).getRaw() : created;
    }

    /**
     * Initializes the operator: resolves the stream expiry default, obtains the file system,
     * builds the stream cache, and reconciles files on disk with the per-file state held in
     * {@code endOffsets} and {@code openPart}.
     *
     * @param context operator context; read for the streaming window size and the checkpoint
     *        window count, and stored for use in {@code endWindow}
     * @throws RuntimeException wrapping any {@code IOException} or {@code ExecutionException}
     *         raised while accessing the file system
     */
    public void setup(Context.OperatorContext context) {
        LOG.debug("setup initiated");
        // Default stream expiry: streaming window size (ms) times checkpoint window count.
        if (this.expireStreamAfterAccessMillis == null) {
            this.expireStreamAfterAccessMillis = (Integer)context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * (Integer)context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT);
        }
        // Rolling is on when either a size limit or a window-based rotation is configured.
        this.rollingFile = this.maxLength < Long.MAX_VALUE || this.rotationWindows > 0;
        try {
            this.fs = this.getFSInstance();
        }
        catch (IOException ex) {
            throw new RuntimeException(ex);
        }
        // A non-positive replication means "use the file system default".
        if (this.replication <= 0) {
            this.replication = this.fs.getDefaultReplication(new Path(this.filePath));
        }
        LOG.debug("FS class {}", this.fs.getClass());
        RemovalListener<String, FSFilterStreamContext> removalListener = this.createCacheRemoveListener();
        CacheLoader<String, FSFilterStreamContext> loader = this.createCacheLoader();
        // Streams are opened by the loader on first access and closed by the removal listener
        // when evicted by size, idle expiry, or explicit invalidation.
        this.streamsCache = CacheBuilder.newBuilder().maximumSize((long)this.maxOpenFiles).expireAfterAccess(this.expireStreamAfterAccessMillis.longValue(), TimeUnit.MILLISECONDS).removalListener(removalListener).build(loader);
        LOG.debug("File system class: {}", this.fs.getClass());
        LOG.debug("end-offsets {}", this.endOffsets);
        try {
            Path writerPath = new Path(this.filePath);
            // Pass 1: for every known file whose active (tmp or final) path exists, run recovery.
            if (this.fs.exists(writerPath)) {
                for (String seenFileName : this.endOffsets.keySet()) {
                    Path activeFilePath;
                    String seenFileNamePart = this.getPartFileNamePri(seenFileName);
                    LOG.debug("seenFileNamePart: {}", (Object)seenFileNamePart);
                    if (this.alwaysWriteToTmp) {
                        // NOTE(review): tmpFileName may be null here, yielding a ".../null" path
                        // that is then skipped by the exists() check below.
                        String tmpFileName = this.fileNameToTmpName.get(seenFileNamePart);
                        activeFilePath = new Path(this.filePath + "/" + tmpFileName);
                    } else {
                        activeFilePath = new Path(this.filePath + "/" + seenFileNamePart);
                    }
                    if (!this.fs.exists(activeFilePath)) continue;
                    this.recoverFile(seenFileName, seenFileNamePart, activeFilePath);
                }
            }
            // Pass 2 (rolling only): delete parts beyond the recorded open part, then rotate the
            // open part if it is already over the size limit.
            if (this.rollingFile) {
                for (String seenFileName : this.endOffsets.keySet()) {
                    try {
                        String tmpFileName;
                        Path activePath;
                        String seenPartFileName;
                        Integer fileOpenPart = this.openPart.get(seenFileName).getValue();
                        int nextPart = fileOpenPart + 1;
                        // Walk consecutive part numbers after the open part and delete each one
                        // found on disk; stop at the first part that has no file.
                        while (true) {
                            seenPartFileName = this.getPartFileName(seenFileName, nextPart);
                            activePath = null;
                            if (this.alwaysWriteToTmp) {
                                tmpFileName = this.fileNameToTmpName.get(seenPartFileName);
                                if (tmpFileName != null) {
                                    activePath = new Path(this.filePath + "/" + tmpFileName);
                                }
                            } else {
                                activePath = new Path(this.filePath + "/" + seenPartFileName);
                            }
                            if (activePath == null || !this.fs.exists(activePath)) break;
                            this.fs.delete(activePath, true);
                            ++nextPart;
                        }
                        // Resolve the path of the currently open part.
                        seenPartFileName = this.getPartFileName(seenFileName, fileOpenPart);
                        activePath = null;
                        if (this.alwaysWriteToTmp) {
                            tmpFileName = this.fileNameToTmpName.get(seenPartFileName);
                            if (tmpFileName != null) {
                                activePath = new Path(this.filePath + "/" + this.fileNameToTmpName.get(seenPartFileName));
                            }
                        } else {
                            activePath = new Path(this.filePath + "/" + seenPartFileName);
                        }
                        // Nothing to do unless the open part exceeds maxLength.
                        if (activePath == null || this.fs.getFileStatus(activePath).getLen() <= this.maxLength) continue;
                        LOG.debug("rotating file at setup.");
                        this.rotate(seenFileName);
                    }
                    catch (IOException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
            LOG.debug("setup completed");
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.context = context;
        // Register the two counters that endWindow updates.
        this.fileCounters.setCounter(Counters.TOTAL_BYTES_WRITTEN, new MutableLong());
        this.fileCounters.setCounter(Counters.TOTAL_TIME_WRITING_MILLISECONDS, new MutableLong());
    }

    /**
     * Brings the file at {@code filepath} back in line with the offset recorded in
     * {@code endOffsets} for {@code filename}.
     *
     * <p>If the on-disk length differs from the recorded offset, the first {@code offset}
     * bytes are copied into a fresh tmp file. That file either becomes the tmp file mapped to
     * {@code partFileName} (when writing to tmp files) or is renamed over the original.
     * If the length matches, and the operator writes to tmp files and the file had an open
     * stream, the content is copied into a new tmp file whose stream is placed in the cache.
     *
     * @param filename     logical file name, the key into {@code endOffsets}
     * @param partFileName name of the part currently open for {@code filename}
     * @param filepath     path of the file on disk that is being recovered
     * @throws IOException on any file system failure, or if the file ends before
     *         {@code offset} bytes could be read
     */
    private void recoverFile(String filename, String partFileName, Path filepath) throws IOException {
        LOG.debug("path exists {}", (Object)filepath);
        long offset = this.endOffsets.get(filename).longValue();
        FileStatus status = this.fs.getFileStatus(filepath);
        String recoveryFileName = null;
        Path recoveryFilePath = null;
        try (FSDataInputStream inputStream = this.fs.open(filepath)) {
            if (status.getLen() != offset) {
                LOG.info("path corrupted {} {} {}", new Object[]{filepath, offset, status.getLen()});
                recoveryFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
                recoveryFilePath = new Path(this.filePath + "/" + recoveryFileName);
                try (FSDataOutputStream fsOutput = this.openStream(recoveryFilePath, false)) {
                    byte[] buffer = new byte[COPY_BUFFER_SIZE];
                    long remainingBytes = offset;
                    while (remainingBytes > 0L) {
                        int bytesToRead = (int)Math.min((long)buffer.length, remainingBytes);
                        // read() may return fewer bytes than requested; write only what was read.
                        int bytesRead = inputStream.read(buffer, 0, bytesToRead);
                        if (bytesRead < 0) {
                            // The file is shorter than the recorded offset; fail instead of spinning.
                            throw new IOException("Unexpected end of " + filepath + " after " + (offset - remainingBytes) + " of " + offset + " bytes");
                        }
                        fsOutput.write(buffer, 0, bytesRead);
                        remainingBytes -= (long)bytesRead;
                    }
                    this.flush(fsOutput);
                }
            } else if (this.alwaysWriteToTmp && this.filesWithOpenStreams.contains(filename)) {
                String currentTmp = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
                FSDataOutputStream outputStream = this.openStream(new Path(this.filePath + "/" + currentTmp), false);
                try {
                    IOUtils.copy(inputStream, outputStream);
                    // The cache owns the stream from here on and closes it on removal.
                    this.streamsCache.put(filename, new FSFilterStreamContext(outputStream));
                }
                catch (IOException | RuntimeException e) {
                    IOUtils.closeQuietly(outputStream);
                    throw e;
                }
                this.fileNameToTmpName.put(partFileName, currentTmp);
            }
        }
        // Both streams are closed at this point, so the original can be safely replaced.
        if (recoveryFilePath != null) {
            LOG.debug("active {} recovery {} ", (Object)filepath, (Object)recoveryFilePath);
            if (this.alwaysWriteToTmp) {
                this.fileNameToTmpName.put(partFileName, recoveryFileName);
            } else {
                LOG.debug("recovery path {} actual path {} ", (Object)recoveryFilePath, (Object)status.getPath());
                this.rename(recoveryFilePath, status.getPath());
            }
        }
    }

    /**
     * Builds the loader that opens the output stream for a file the first time it is
     * requested from {@code streamsCache}.
     *
     * @return a loader whose {@code load} returns a stream context for the file's active path
     *         and wraps any {@code IOException} in a {@code RuntimeException}
     */
    private CacheLoader<String, FSFilterStreamContext> createCacheLoader() {
        return new CacheLoader<String, FSFilterStreamContext>(){

            public FSFilterStreamContext load(@Nonnull String filename) {
                Path activeFilePath;
                // After a rotation the next stream goes to a new part: advance the part number,
                // clear the flag and restart the offset at zero.
                if (AbstractFileOutputOperator.this.rollingFile) {
                    RotationState state = AbstractFileOutputOperator.this.getRotationState(filename);
                    if (AbstractFileOutputOperator.this.rollingFile && state.rotated) {
                        AbstractFileOutputOperator.this.openPart.get(filename).add(1);
                        state.rotated = false;
                        MutableLong offset = AbstractFileOutputOperator.this.endOffsets.get(filename);
                        offset.setValue(0L);
                    }
                }
                String partFileName = AbstractFileOutputOperator.this.getPartFileNamePri(filename);
                Path originalFilePath = new Path(AbstractFileOutputOperator.this.filePath + "/" + partFileName);
                // The active path is the final path, or a timestamped tmp file that is allocated
                // (and remembered) on first use.
                if (!AbstractFileOutputOperator.this.alwaysWriteToTmp) {
                    activeFilePath = originalFilePath;
                } else {
                    String tmpFileName = (String)AbstractFileOutputOperator.this.fileNameToTmpName.get(partFileName);
                    if (tmpFileName == null) {
                        tmpFileName = partFileName + '.' + System.currentTimeMillis() + AbstractFileOutputOperator.TMP_EXTENSION;
                        AbstractFileOutputOperator.this.fileNameToTmpName.put(partFileName, tmpFileName);
                    }
                    activeFilePath = new Path(AbstractFileOutputOperator.this.filePath + "/" + tmpFileName);
                }
                // A file is "seen before" when an end offset is already tracked for it.
                boolean sawThisFileBefore = AbstractFileOutputOperator.this.endOffsets.containsKey(filename);
                try {
                    FSDataOutputStream fsOutput;
                    if (AbstractFileOutputOperator.this.fs.exists(originalFilePath) || AbstractFileOutputOperator.this.alwaysWriteToTmp && AbstractFileOutputOperator.this.fs.exists(activeFilePath)) {
                        if (sawThisFileBefore) {
                            // Known file: resync the offset with the on-disk length and append.
                            // NOTE(review): when only originalFilePath exists while writing to
                            // tmp, getFileStatus(activeFilePath) would fail - confirm reachability.
                            FileStatus fileStatus = AbstractFileOutputOperator.this.fs.getFileStatus(activeFilePath);
                            MutableLong endOffset = AbstractFileOutputOperator.this.endOffsets.get(filename);
                            if (endOffset != null) {
                                endOffset.setValue(fileStatus.getLen());
                            } else {
                                AbstractFileOutputOperator.this.endOffsets.put(filename, new MutableLong(fileStatus.getLen()));
                            }
                            fsOutput = AbstractFileOutputOperator.this.openStream(activeFilePath, true);
                            LOG.debug("appending to {}", (Object)activeFilePath);
                        } else if (AbstractFileOutputOperator.this.rollingFile) {
                            // Unknown rolling file: delete consecutive existing parts from 0 up,
                            // then start a new file.
                            Path seenPartFilePath;
                            int part = 0;
                            while (AbstractFileOutputOperator.this.fs.exists(seenPartFilePath = new Path(AbstractFileOutputOperator.this.filePath + "/" + AbstractFileOutputOperator.this.getPartFileName(filename, part)))) {
                                AbstractFileOutputOperator.this.fs.delete(seenPartFilePath, true);
                                ++part;
                            }
                            fsOutput = AbstractFileOutputOperator.this.openStream(activeFilePath, false);
                        } else {
                            // Unknown non-rolling file: delete what is on disk and start over.
                            AbstractFileOutputOperator.this.fs.delete(activeFilePath, true);
                            if (AbstractFileOutputOperator.this.alwaysWriteToTmp && AbstractFileOutputOperator.this.fs.exists(originalFilePath)) {
                                AbstractFileOutputOperator.this.fs.delete(originalFilePath, true);
                            }
                            fsOutput = AbstractFileOutputOperator.this.openStream(activeFilePath, false);
                        }
                    } else {
                        // Nothing on disk yet: create the file.
                        fsOutput = AbstractFileOutputOperator.this.openStream(activeFilePath, false);
                    }
                    AbstractFileOutputOperator.this.filesWithOpenStreams.add(filename);
                    LOG.info("opened {}, active {}", (Object)partFileName, (Object)activeFilePath);
                    return new FSFilterStreamContext(fsOutput);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * Builds the listener that closes a file's stream when its entry leaves
     * {@code streamsCache}.
     *
     * @return a listener that closes the removed stream context, drops the file from
     *         {@code filesWithOpenStreams}, adds the elapsed time to the writing-time total,
     *         and rethrows any {@code IOException} as a {@code RuntimeException}
     */
    private RemovalListener<String, FSFilterStreamContext> createCacheRemoveListener() {
        return new RemovalListener<String, FSFilterStreamContext>(){

            public void onRemoval(@Nonnull RemovalNotification<String, FSFilterStreamContext> notification) {
                FSFilterStreamContext evicted = notification.getValue();
                if (evicted == null) {
                    // Nothing was cached for this key, so there is no stream to close.
                    return;
                }
                try {
                    String key = notification.getKey();
                    LOG.info("closing {}", (Object)AbstractFileOutputOperator.this.getPartFileNamePri(key));
                    long closeStartedAt = System.currentTimeMillis();
                    AbstractFileOutputOperator.this.closeStream(evicted);
                    AbstractFileOutputOperator.this.filesWithOpenStreams.remove(key);
                    long elapsed = System.currentTimeMillis() - closeStartedAt;
                    AbstractFileOutputOperator.this.totalWritingTime += elapsed;
                }
                catch (IOException e) {
                    LOG.error("removing {}", notification.getValue(), (Object)e);
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * Opens an output stream on {@code filepath}.
     *
     * @param filepath path to open
     * @param append   {@code true} to append to the existing file; {@code false} to create it
     *                 with the configured replication and then apply {@code filePermission}
     * @return the opened stream
     * @throws IOException if the file system call fails
     */
    protected FSDataOutputStream openStream(Path filepath, boolean append) throws IOException {
        if (append) {
            return fs.append(filepath);
        }
        FSDataOutputStream created = fs.create(filepath, (short)replication);
        fs.setPermission(filepath, FsPermission.createImmutable(filePermission));
        return created;
    }

    /**
     * Closes the given stream context. Called from the cache removal listener and from
     * {@code teardown}.
     *
     * @param streamContext context to close
     * @throws IOException if closing fails
     */
    protected void closeStream(FSFilterStreamContext streamContext) throws IOException {
        streamContext.close();
    }

    /**
     * Renames {@code source} to {@code destination}, overwriting the destination. The
     * {@code FileContext} used for the rename is created from the file system URI on first
     * use and then kept.
     *
     * @param source      existing path
     * @param destination target path
     * @throws IOException if the rename fails
     */
    protected void rename(Path source, Path destination) throws IOException {
        FileContext renamer = fileContext;
        if (renamer == null) {
            renamer = FileContext.getFileContext(fs.getUri());
            fileContext = renamer;
        }
        renamer.rename(source, destination, Options.Rename.OVERWRITE);
    }

    /**
     * Records that {@code fileName} should be finalized once the current window is committed.
     *
     * <p>For rolling files, every part after the last one already requested, up to and
     * including the currently open part, is recorded, and the finalized-part marker is
     * advanced to the open part.
     *
     * @param fileName logical file name; for rolling files it must already have an entry in
     *                 {@code openPart}
     */
    protected void requestFinalize(String fileName) {
        // Declared as Set<String>: that is what the map holds (the former raw HashSet local
        // did not match the map's value type).
        Set<String> filesPerWindow = this.finalizedFiles.get(this.currentWindow);
        if (filesPerWindow == null) {
            filesPerWindow = Sets.newHashSet();
            this.finalizedFiles.put(this.currentWindow, filesPerWindow);
        }
        if (this.rollingFile) {
            MutableInt part = this.finalizedPart.get(fileName);
            if (part == null) {
                // -1 means no part of this file has been requested yet.
                part = new MutableInt(-1);
                this.finalizedPart.put(fileName, part);
            }
            MutableInt currentOpenPart = this.openPart.get(fileName);
            for (int x = part.intValue() + 1; x <= currentOpenPart.intValue(); ++x) {
                String prevPartNotFinalized = this.getPartFileName(fileName, x);
                LOG.debug("request finalize {}", (Object)prevPartNotFinalized);
                filesPerWindow.add(prevPartNotFinalized);
            }
            fileName = this.getPartFileNamePri(fileName);
            part.setValue(currentOpenPart.intValue());
        }
        filesPerWindow.add(fileName);
    }

    /**
     * Closes every cached stream and then the file system, attempting all of them even if
     * some fail.
     *
     * @throws RuntimeException if any close failed; the message names up to
     *         {@code MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION} files and the cause is the last
     *         exception that was recorded
     */
    public void teardown() {
        ArrayList<String> failedFileNames = new ArrayList<String>();
        int numberOfFailures = 0;
        IOException savedException = null;
        // Iterate entries so each key is paired with the value seen at iteration time.
        Map<String, FSFilterStreamContext> openStreams = this.streamsCache.asMap();
        for (Map.Entry<String, FSFilterStreamContext> openStream : openStreams.entrySet()) {
            String seenFileName = openStream.getKey();
            try {
                long start = System.currentTimeMillis();
                this.closeStream(openStream.getValue());
                this.filesWithOpenStreams.remove(seenFileName);
                this.totalWritingTime += System.currentTimeMillis() - start;
            }
            catch (IOException ex) {
                ++numberOfFailures;
                // Only the first N failures are named in the message.
                if (failedFileNames.size() >= MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) continue;
                failedFileNames.add(seenFileName);
                savedException = ex;
            }
        }
        boolean fsFailed = false;
        try {
            this.fs.close();
        }
        catch (IOException ex) {
            savedException = ex;
            fsFailed = true;
        }
        if (savedException == null) {
            return;
        }
        StringBuilder errorMessage = new StringBuilder();
        if (fsFailed) {
            errorMessage.append("Closing the fileSystem failed. ");
        }
        if (!failedFileNames.isEmpty()) {
            errorMessage.append("The following files failed closing: ");
        }
        for (String failedFileName : failedFileNames) {
            errorMessage.append(failedFileName).append(", ");
        }
        if (numberOfFailures > MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) {
            errorMessage.append(numberOfFailures - MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION).append(" more files failed.");
        }
        throw new RuntimeException(errorMessage.toString(), savedException);
    }

    /**
     * Writes one tuple to the file chosen by {@code getFileName}. Tuples whose file name is
     * null or empty are dropped. After the write the per-file offset and tuple count are
     * updated, and the file is rotated when rolling is on and the offset exceeds
     * {@code maxLength}.
     *
     * @param tuple tuple to write
     * @throws RuntimeException wrapping any {@code IOException} or {@code ExecutionException}
     */
    protected void processTuple(INPUT tuple) {
        String fileName = getFileName(tuple);
        if (Strings.isNullOrEmpty(fileName)) {
            return;
        }
        try {
            FilterOutputStream target = streamsCache.get(fileName).getFilterStream();
            byte[] payload = getBytesForTuple(tuple);

            long writeStartedAt = System.currentTimeMillis();
            target.write(payload);
            totalWritingTime += System.currentTimeMillis() - writeStartedAt;
            totalBytesWritten += (long)payload.length;

            MutableLong offset = endOffsets.get(fileName);
            if (offset == null) {
                offset = new MutableLong(0L);
                endOffsets.put(fileName, offset);
            }
            offset.add((long)payload.length);

            if (rotationWindows > 0) {
                // Remember that this file received data during the current rotation period.
                getRotationState(fileName).notEmpty = true;
            }
            if (rollingFile && offset.longValue() > maxLength) {
                LOG.debug("Rotating file {} {} {}", new Object[]{fileName, openPart.get(fileName), offset.longValue()});
                rotate(fileName);
            }

            // Looked up after a possible rotate(), which removes the file's count.
            MutableLong tupleCount = counts.get(fileName);
            if (tupleCount == null) {
                tupleCount = new MutableLong(0L);
                counts.put(fileName, tupleCount);
            }
            tupleCount.add(1L);
        }
        catch (IOException | ExecutionException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Rotates {@code fileName}: requests finalization, clears its tuple count, invalidates
     * its cached stream, calls {@code rotateHook} with the finished part's name, and marks
     * the file as rotated.
     *
     * @param fileName logical file name to rotate
     * @throws IllegalArgumentException declared for callers; not thrown directly here
     * @throws IOException declared for callers; not thrown directly here
     * @throws ExecutionException declared for callers; not thrown directly here
     */
    protected void rotate(String fileName) throws IllegalArgumentException, IOException, ExecutionException {
        requestFinalize(fileName);
        counts.remove(fileName);
        streamsCache.invalidate(fileName);

        Integer finishedPart = openPart.get(fileName).getValue();
        LOG.debug("Part file rotated {} : {}", (Object)fileName, (Object)finishedPart);
        rotateHook(getPartFileName(fileName, finishedPart));

        RotationState state = getRotationState(fileName);
        state.rotated = true;
    }

    /**
     * Returns the rotation state for {@code fileName}, creating and registering a fresh one
     * when none exists yet.
     *
     * @param fileName logical file name
     * @return the existing or newly created state; never null
     */
    private RotationState getRotationState(String fileName) {
        RotationState existing = rotationStates.get(fileName);
        if (existing != null) {
            return existing;
        }
        RotationState created = new RotationState();
        rotationStates.put(fileName, created);
        return created;
    }

    /**
     * Hook called by {@code rotate} with the name of the part file that was just finished.
     * The default implementation does nothing.
     *
     * @param finishedFile name of the finished part file
     */
    @Deprecated
    protected void rotateHook(String finishedFile) {
    }

    /**
     * Flushes {@code fsOutput}: a plain {@code flush()} on local file systems and
     * {@code hflush()} on every other file system.
     *
     * @param fsOutput stream to flush
     * @throws IOException if the flush fails
     */
    protected void flush(FSDataOutputStream fsOutput) throws IOException {
        boolean localFileSystem = fs instanceof LocalFileSystem || fs instanceof RawLocalFileSystem;
        if (!localFileSystem) {
            fsOutput.hflush();
            return;
        }
        fsOutput.flush();
    }

    /**
     * Returns the name of the part currently open for {@code fileName}. Without rolling this
     * is {@code fileName} itself. With rolling, a file seen for the first time is registered
     * with part number 0.
     *
     * @param fileName logical file name
     * @return the current part file name
     */
    protected String getPartFileNamePri(String fileName) {
        if (!rollingFile) {
            return fileName;
        }
        MutableInt currentPart = openPart.get(fileName);
        if (currentPart == null) {
            currentPart = new MutableInt(0);
            openPart.put(fileName, currentPart);
            LOG.debug("First file part number {}", (Object)currentPart);
        }
        int partNumber = currentPart.intValue();
        return getPartFileName(fileName, partNumber);
    }

    /**
     * Builds the name of a part file as {@code fileName + "." + part}.
     *
     * @param fileName logical file name
     * @param part     part number
     * @return the part file name
     */
    protected String getPartFileName(String fileName, int part) {
        StringBuilder partName = new StringBuilder();
        partName.append(fileName).append(".").append(part);
        return partName.toString();
    }

    /**
     * Re-initializes the context of every cached stream and records the window id.
     *
     * @param windowId id of the window that is starting
     * @throws RuntimeException wrapping an {@code IOException} from context initialization
     */
    public void beginWindow(long windowId) {
        try {
            // Iterate the typed view directly; a raw map here would lose the element type.
            for (FSFilterStreamContext streamContext : this.streamsCache.asMap().values()) {
                streamContext.initializeContext();
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.currentWindow = windowId;
    }

    /**
     * Finalizes the context of every cached stream, performs window-based rotation when due,
     * and publishes the writing-time and bytes-written counters.
     *
     * @throws RuntimeException wrapping any {@code IOException} or {@code ExecutionException}
     */
    public void endWindow() {
        try {
            for (FSFilterStreamContext streamContext : this.streamsCache.asMap().values()) {
                long start = System.currentTimeMillis();
                streamContext.finalizeContext();
                this.totalWritingTime += System.currentTimeMillis() - start;
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        // ">=" rather than "==": if rotationWindows is lowered below the running count, the
        // rotation pass must still fire instead of being skipped indefinitely.
        if (this.rotationWindows > 0 && ++this.rotationCount >= this.rotationWindows) {
            this.rotationCount = 0;
            for (String filename : this.openPart.keySet()) {
                RotationState rotationState = this.rotationStates.get(filename);
                if (rotationState == null) {
                    continue;
                }
                // Rotate only files that received data this period and are not already rotated.
                boolean rotate = !rotationState.rotated && rotationState.notEmpty;
                rotationState.notEmpty = false;
                if (!rotate) {
                    continue;
                }
                try {
                    this.rotate(filename);
                }
                catch (IOException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        this.fileCounters.getCounter(Counters.TOTAL_TIME_WRITING_MILLISECONDS).setValue(this.totalWritingTime);
        this.fileCounters.getCounter(Counters.TOTAL_BYTES_WRITTEN).setValue(this.totalBytesWritten);
        this.context.setCounters(this.fileCounters);
    }

    /**
     * Chooses the file a tuple is written to. {@code processTuple} drops the tuple when the
     * returned name is null or empty.
     *
     * @param var1 the tuple
     * @return the logical file name for the tuple
     */
    protected abstract String getFileName(INPUT var1);

    /**
     * Converts a tuple to the bytes that {@code processTuple} writes to the file.
     *
     * @param var1 the tuple
     * @return the bytes to write
     */
    protected abstract byte[] getBytesForTuple(INPUT var1);

    /**
     * Sets the directory under which files are written.
     *
     * @param dir directory path
     */
    public void setFilePath(@Nonnull String dir) {
        filePath = dir;
    }

    /**
     * @return the directory under which files are written
     */
    public String getFilePath() {
        return filePath;
    }

    /**
     * Sets the size in bytes above which a file is rotated.
     *
     * @param maxLength size threshold in bytes
     */
    public void setMaxLength(long maxLength) {
        this.maxLength = Long.valueOf(maxLength);
    }

    /**
     * @return the size in bytes above which a file is rotated
     */
    public long getMaxLength() {
        return maxLength.longValue();
    }

    /**
     * @return the number of windows between window-based rotations
     */
    public int getRotationWindows() {
        return rotationWindows;
    }

    /**
     * Sets the number of windows between window-based rotations.
     *
     * @param rotationWindows window count
     */
    public void setRotationWindows(int rotationWindows) {
        this.rotationWindows = rotationWindows;
    }

    /**
     * Sets the maximum number of streams kept open in the cache.
     *
     * @param maxOpenFiles cache size limit
     */
    public void setMaxOpenFiles(int maxOpenFiles) {
        this.maxOpenFiles = maxOpenFiles;
    }

    /**
     * @return the maximum number of streams kept open in the cache
     */
    public int getMaxOpenFiles() {
        return maxOpenFiles;
    }

    /**
     * @return the permission bits applied to newly created files
     */
    public short getFilePermission() {
        return filePermission;
    }

    /**
     * Sets the permission bits applied to newly created files.
     *
     * @param filePermission permission bits
     */
    public void setFilePermission(short filePermission) {
        this.filePermission = filePermission;
    }

    /**
     * @return the provider of filter streams, or null when none is configured
     */
    public FilterStreamProvider getFilterStreamProvider() {
        return filterStreamProvider;
    }

    /**
     * Sets the provider of filter streams layered over the file output stream.
     *
     * @param filterStreamProvider provider to use
     */
    public void setFilterStreamProvider(FilterStreamProvider filterStreamProvider) {
        this.filterStreamProvider = filterStreamProvider;
    }

    /**
     * Checkpoint callback; this operator takes no action.
     *
     * @param l window id passed by the caller (unused)
     */
    public void checkpointed(long l) {
    }

    /**
     * Finalizes every file whose finalization was requested in a window with id at most
     * {@code l}, and removes those windows from the pending map. Does nothing unless the
     * operator writes to tmp files.
     *
     * @param l id of the committed window
     * @throws RuntimeException with message "failed to commit" wrapping an {@code IOException}
     */
    public void committed(long l) {
        if (!alwaysWriteToTmp) {
            return;
        }
        Iterator<Map.Entry<Long, Set<String>>> pendingWindows = finalizedFiles.entrySet().iterator();
        try {
            while (pendingWindows.hasNext()) {
                Map.Entry<Long, Set<String>> windowFiles = pendingWindows.next();
                if (windowFiles.getKey() > l) {
                    // Entries are visited in iteration order; stop at the first later window.
                    break;
                }
                for (String file : windowFiles.getValue()) {
                    finalizeFile(file);
                }
                pendingWindows.remove();
            }
        }
        catch (IOException e) {
            throw new RuntimeException("failed to commit", e);
        }
    }

    /**
     * Moves the tmp file backing {@code fileName} to its final name and removes leftover
     * tmp files for the same name.
     *
     * <p>If the destination does not exist, the tmp file is renamed to it. If both exist,
     * the tmp file is deleted. The file's bookkeeping entries are then dropped, and every
     * sibling named {@code <fileName>.<something>.tmp} is deleted.
     *
     * @param fileName name of the file to finalize, relative to {@code filePath}
     * @throws IOException if a file system operation fails
     */
    protected void finalizeFile(String fileName) throws IOException {
        String tmpFileName = this.fileNameToTmpName.get(fileName);
        Path srcPath = new Path(this.filePath + "/" + tmpFileName);
        Path destPath = new Path(this.filePath + "/" + fileName);
        if (!this.fs.exists(destPath)) {
            LOG.debug("rename from tmp {} actual {} ", (Object)tmpFileName, (Object)fileName);
            this.rename(srcPath, destPath);
        } else if (this.fs.exists(srcPath)) {
            LOG.debug("deleting tmp {}", (Object)tmpFileName);
            this.fs.delete(srcPath, true);
        }
        this.endOffsets.remove(fileName);
        this.fileNameToTmpName.remove(fileName);
        String destName = destPath.getName();
        for (FileStatus status : this.fs.listStatus(destPath.getParent())) {
            String statusName = status.getPath().getName();
            if (!statusName.endsWith(TMP_EXTENSION) || !statusName.startsWith(destName)) {
                continue;
            }
            // Tmp names are <name>.<timestamp>.tmp; cut at the second-to-last dot.
            int extensionDot = statusName.lastIndexOf('.');
            int timestampDot = statusName.lastIndexOf('.', extensionDot - 1);
            if (timestampDot < 0) {
                // Only one dot (e.g. "abc.tmp"): not a tmp file of ours, and substring(0, -1)
                // would throw, so skip it.
                continue;
            }
            if (!fileName.equals(statusName.substring(0, timestampDot))) {
                continue;
            }
            LOG.debug("deleting stray file {}", (Object)statusName);
            this.fs.delete(status.getPath(), true);
        }
    }

    /**
     * @return true when data is first written to tmp files
     */
    public boolean isAlwaysWriteToTmp() {
        return alwaysWriteToTmp;
    }

    /**
     * Sets whether data is first written to tmp files.
     *
     * @param alwaysWriteToTmp true to write to tmp files
     */
    public void setAlwaysWriteToTmp(boolean alwaysWriteToTmp) {
        this.alwaysWriteToTmp = alwaysWriteToTmp;
    }

    /**
     * @return the live map from part file name to tmp file name (not a copy)
     */
    @VisibleForTesting
    protected Map<String, String> getFileNameToTmpName() {
        return fileNameToTmpName;
    }

    /**
     * @return the live map from window id to files awaiting finalization (not a copy)
     */
    @VisibleForTesting
    protected Map<Long, Set<String>> getFinalizedFiles() {
        return finalizedFiles;
    }

    /**
     * @return the idle time in milliseconds after which a cached stream expires, or null
     *         when it has not been set or defaulted yet
     */
    public Long getExpireStreamAfterAccessMillis() {
        return expireStreamAfterAccessMillis;
    }

    /**
     * Sets the idle time after which a cached stream expires.
     *
     * @param millis idle time in milliseconds; null lets {@code setup} choose a default
     */
    public void setExpireStreamAfterAccessMillis(Long millis) {
        expireStreamAfterAccessMillis = millis;
    }

    /**
     * Output stream wrapper whose {@code close()} does nothing, so closing a stream layered
     * on top of it leaves the wrapped stream open.
     */
    private static class NonCloseableFilterOutputStream
    extends FilterOutputStream {
        public NonCloseableFilterOutputStream(OutputStream out) {
            super(out);
        }

        /**
         * Forwards the whole range in one call. The inherited implementation writes the
         * range one byte at a time through {@code write(int)}.
         */
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            this.out.write(b, off, len);
        }

        /** Intentionally a no-op: the wrapped stream is closed by its owner. */
        @Override
        public void close() throws IOException {
        }
    }

    /**
     * Pairs a file output stream with an optional filter stream context obtained from the
     * operator's {@code filterStreamProvider}. The filter stream is layered over a
     * non-closing wrapper, so closing the filter does not close the file stream.
     */
    protected class FSFilterStreamContext
    implements FilterStreamContext<FilterOutputStream> {
        private final FSDataOutputStream outputStream;
        private FilterStreamContext<? extends FilterOutputStream> filterContext;
        private final NonCloseableFilterOutputStream outputWrapper;

        /**
         * @param outputStream file stream that receives the data
         * @throws IOException if the filter context cannot be created
         */
        public FSFilterStreamContext(FSDataOutputStream outputStream) throws IOException {
            this.outputStream = outputStream;
            this.outputWrapper = new NonCloseableFilterOutputStream(outputStream);
            this.initializeContext();
        }

        /**
         * @return the filter stream when a filter context exists, otherwise the file stream
         */
        @Override
        public FilterOutputStream getFilterStream() {
            if (this.filterContext != null) {
                return this.filterContext.getFilterStream();
            }
            return this.outputStream;
        }

        /**
         * Finalizes the filter context (if any), flushes through to the file stream, and
         * hands the filter context back to the provider.
         */
        @Override
        public void finalizeContext() throws IOException {
            if (this.filterContext != null) {
                this.filterContext.finalizeContext();
                this.outputWrapper.flush();
            }
            this.outputStream.hflush();
            if (AbstractFileOutputOperator.this.filterStreamProvider != null) {
                AbstractFileOutputOperator.this.filterStreamProvider.reclaimFilterStreamContext(this.filterContext);
            }
        }

        /**
         * Obtains a filter context from the provider when one is configured.
         *
         * @throws IOException if the provider fails to create the context
         */
        public void initializeContext() throws IOException {
            if (AbstractFileOutputOperator.this.filterStreamProvider != null) {
                this.filterContext = AbstractFileOutputOperator.this.filterStreamProvider.getFilterStreamContext(this.outputWrapper);
            }
        }

        /**
         * Closes the filter stream (if any) and then the file stream. The file stream is
         * closed even when closing the filter stream throws.
         *
         * @throws IOException if either close fails
         */
        public void close() throws IOException {
            try {
                if (this.filterContext != null) {
                    this.filterContext.getFilterStream().close();
                }
            }
            finally {
                this.outputStream.close();
            }
        }
    }

    /** Keys of the counters that this operator registers in setup and updates in endWindow. */
    public static enum Counters {
        /** Total number of tuple bytes written. */
        TOTAL_BYTES_WRITTEN,
        /** Total milliseconds accumulated in the operator's writing-time total. */
        TOTAL_TIME_WRITING_MILLISECONDS;

    }

    /** Per-file rotation flags. */
    private static class RotationState {
        /** Set in processTuple when window-based rotation is on; cleared in endWindow. */
        boolean notEmpty;
        /** Set by rotate; cleared by the cache loader when the next part is opened. */
        boolean rotated;

        private RotationState() {
        }
    }
}

