/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.dataset.partition;

import edu.iu.dsc.tws.api.comms.messaging.types.MessageType;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.data.FileStatus;
import edu.iu.dsc.tws.api.data.FileSystem;
import edu.iu.dsc.tws.api.data.Path;
import edu.iu.dsc.tws.api.dataset.DataPartitionConsumer;
import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
import edu.iu.dsc.tws.dataset.partition.CollectionPartition;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public abstract class BufferedCollectionPartition<T>
extends CollectionPartition<T>
implements Closeable {
    private static final Logger LOG = Logger.getLogger(BufferedCollectionPartition.class.getName());
    private static final long DEFAULT_MAX_BUFFERED_BYTES = 10000000L;
    private static final MessageType DEFAULT_DATA_TYPE = MessageTypes.OBJECT;
    private static final String EXTENSION = ".pbck";
    private long maxFramesInMemory;
    private MessageType dataType;
    private List<Path> filesList = new ArrayList<Path>();
    private long fileCounter;
    private List<byte[]> buffers = new ArrayList<byte[]>();
    private long bufferedBytes = 0L;
    private long maxBufferedBytes;
    private FileSystem fileSystem;
    private Path rootPath;
    private String reference;
    private List<byte[]> currentFileCache = new ArrayList<byte[]>();
    private int cachedFileIndex = -1;

    public BufferedCollectionPartition(long maxFramesInMemory, MessageType dataType, long bufferedBytes, Config config, String reference) {
        this.reference = reference;
        this.maxFramesInMemory = maxFramesInMemory;
        this.maxBufferedBytes = bufferedBytes;
        this.dataType = dataType;
        try {
            this.fileSystem = this.getFileSystem(config);
            this.rootPath = this.getRootPath(config);
            this.fileSystem.mkdirs(this.rootPath);
        }
        catch (IOException e) {
            throw new Twister2RuntimeException("Failed to initialize and create a directory to hold the partition", (Throwable)e);
        }
    }

    public BufferedCollectionPartition(long maxFramesInMemory, Config config) {
        this(maxFramesInMemory, DEFAULT_DATA_TYPE, 10000000L, config, UUID.randomUUID().toString());
    }

    public BufferedCollectionPartition(long maxFramesInMemory, Config config, String reference) {
        this(maxFramesInMemory, DEFAULT_DATA_TYPE, 10000000L, config, reference);
    }

    public BufferedCollectionPartition(long maxFramesInMemory, MessageType dataType, Config config) {
        this(maxFramesInMemory, dataType, 10000000L, config, UUID.randomUUID().toString());
    }

    public BufferedCollectionPartition(MessageType dataType, long bufferedBytes, Config config) {
        this(0L, dataType, bufferedBytes, config, UUID.randomUUID().toString());
    }

    public BufferedCollectionPartition(MessageType dataType, long bufferedBytes, Config config, String reference) {
        this(0L, dataType, bufferedBytes, config, reference);
    }

    protected abstract FileSystem getFileSystem(Config var1) throws IOException;

    protected abstract Path getRootPath(Config var1);

    private void loadFromFS() {
        try {
            FileStatus[] fileStatuses = this.fileSystem.listFiles(this.rootPath);
            this.filesList = Arrays.stream(fileStatuses).map(FileStatus::getPath).filter(p -> p.getName().contains(EXTENSION)).sorted(Comparator.comparingLong(path -> Long.parseLong(path.getName().replace(EXTENSION, "")))).collect(Collectors.toList());
            this.fileCounter = fileStatuses.length;
        }
        catch (IOException e) {
            throw new Twister2RuntimeException("Failed to load frames from file system", (Throwable)e);
        }
    }

    @Override
    public void add(T val) {
        if ((long)this.dataList.size() < this.maxFramesInMemory) {
            super.add(val);
        } else {
            LOG.info("Writing to disk...");
            byte[] bytes = this.dataType.getDataPacker().packToByteArray(val);
            this.buffers.add(bytes);
            this.bufferedBytes += (long)bytes.length;
            if (this.bufferedBytes > this.maxBufferedBytes) {
                this.flush();
            }
        }
    }

    @Override
    public void addAll(Collection<T> frames) {
        for (T frame : frames) {
            this.add(frame);
        }
    }

    @Override
    public DataPartitionConsumer<T> getConsumer() {
        final Iterator inMemoryIterator = this.dataList.iterator();
        final Iterator<Path> fileIterator = this.filesList.iterator();
        final Iterator<byte[]> buffersIterator = this.buffers.iterator();
        return new DataPartitionConsumer<T>(){
            private Queue<byte[]> bufferFromDisk = new LinkedList<byte[]>();

            public boolean hasNext() {
                return inMemoryIterator.hasNext() || fileIterator.hasNext() || buffersIterator.hasNext() || !this.bufferFromDisk.isEmpty();
            }

            public T next() {
                if (!this.bufferFromDisk.isEmpty()) {
                    return BufferedCollectionPartition.this.dataType.getDataPacker().unpackFromByteArray(this.bufferFromDisk.poll());
                }
                if (inMemoryIterator.hasNext()) {
                    return inMemoryIterator.next();
                }
                if (fileIterator.hasNext()) {
                    Path nextFile = (Path)fileIterator.next();
                    try {
                        DataInputStream reader = new DataInputStream((InputStream)BufferedCollectionPartition.this.fileSystem.open(nextFile));
                        long noOfFrames = reader.readLong();
                        for (long i = 0L; i < noOfFrames; ++i) {
                            int size = reader.readInt();
                            byte[] data = new byte[size];
                            reader.read(data);
                            this.bufferFromDisk.add(data);
                        }
                        return this.next();
                    }
                    catch (IOException e) {
                        throw new Twister2RuntimeException("Failed to read value from the temp file : " + nextFile.toString(), (Throwable)e);
                    }
                }
                if (buffersIterator.hasNext()) {
                    return BufferedCollectionPartition.this.dataType.getDataPacker().unpackFromByteArray((byte[])buffersIterator.next());
                }
                throw new Twister2RuntimeException("No more frames available in this partition");
            }
        };
    }

    public void dispose() {
        this.buffers = null;
        this.dataList = null;
    }

    @Override
    public void clear() {
        for (Path path : this.filesList) {
            try {
                this.fileSystem.delete(path, true);
            }
            catch (IOException e) {
                throw new Twister2RuntimeException("Failed to delete the temporary file : " + path.toString(), (Throwable)e);
            }
        }
        super.clear();
        this.filesList.clear();
        this.buffers.clear();
        this.bufferedBytes = 0L;
        this.fileCounter = 0L;
    }

    public void flush() {
        Path filePath = new Path(this.rootPath, this.fileCounter++ + EXTENSION);
        try (DataOutputStream outputStream = new DataOutputStream((OutputStream)this.fileSystem.create(filePath));){
            outputStream.writeLong(this.buffers.size());
            for (byte[] next : this.buffers) {
                outputStream.writeInt(next.length);
                outputStream.write(next);
            }
        }
        catch (IOException e) {
            throw new Twister2RuntimeException("Couldn't flush partitions to the disk", (Throwable)e);
        }
        this.filesList.add(filePath);
        this.buffers.clear();
        this.bufferedBytes = 0L;
    }

    public boolean hasIndexInMemory(int index) {
        return index < this.dataList.size();
    }

    public T get(int index) {
        if (index < this.dataList.size()) {
            return (T)this.dataList.get(index);
        }
        long currentSize = this.dataList.size();
        for (int fileIndex = 0; fileIndex < this.filesList.size(); ++fileIndex) {
            Path nextFile = this.filesList.get(fileIndex);
            try {
                DataInputStream reader = new DataInputStream((InputStream)this.fileSystem.open(nextFile));
                long noOfFrames = reader.readLong();
                if ((long)index < currentSize + noOfFrames) {
                    if (this.cachedFileIndex != fileIndex) {
                        this.cachedFileIndex = fileIndex;
                        this.currentFileCache = new ArrayList<byte[]>();
                        for (long i = 0L; i < noOfFrames; ++i) {
                            int size = reader.readInt();
                            byte[] data = new byte[size];
                            reader.read(data);
                            this.currentFileCache.add(data);
                        }
                    }
                    return (T)this.dataType.getDataPacker().unpackFromByteArray(this.currentFileCache.get((int)((long)(index - this.dataList.size()) - currentSize)));
                }
                currentSize += noOfFrames;
                continue;
            }
            catch (IOException ioex) {
                throw new Twister2RuntimeException("Failed to read from file : " + nextFile);
            }
        }
        return (T)this.dataType.getDataPacker().unpackFromByteArray(this.buffers.get((int)((long)index - currentSize)));
    }

    @Override
    public void close() {
        this.flush();
    }

    public String getReference() {
        return this.reference;
    }
}

