/*
 * Decompiled with CFR 0.152.
 */
package com.milaboratory.util;

import cc.redberry.pipe.CUtils;
import cc.redberry.pipe.OutputPort;
import cc.redberry.pipe.OutputPortCloseable;
import cc.redberry.pipe.blocks.Merger;
import cc.redberry.pipe.util.Chunk;
import com.milaboratory.util.ObjectSerializer;
import gnu.trove.list.array.TLongArrayList;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Comparator;
import java.util.PriorityQueue;
import org.apache.commons.io.output.CloseShieldOutputStream;
import org.apache.commons.io.output.CountingOutputStream;

public final class Sorter<T> {
    private final OutputPort<T> initialSource;
    private final Comparator<T> comparator;
    private final int chunkSize;
    private final ObjectSerializer<T> serializer;
    private final File tempFile;
    private final TLongArrayList chunkOffsets = new TLongArrayList();
    private int lastChunkSize = -1;
    private long memoryBudget = -1L;

    public Sorter(OutputPort<T> initialSource, Comparator<T> comparator, int chunkSize, ObjectSerializer<T> serializer, File tempFile) {
        this.initialSource = initialSource;
        this.comparator = comparator;
        this.chunkSize = chunkSize;
        this.serializer = serializer;
        this.tempFile = tempFile;
    }

    public static <T> OutputPortCloseable<T> sort(OutputPort<T> initialSource, Comparator<T> comparator, int chunkSize, Class<T> clazz, File tempFile) throws IOException {
        return Sorter.sort(initialSource, comparator, chunkSize, new ObjectSerializer.PrimitivIOObjectSerializer<T>(clazz), tempFile);
    }

    public static <T> OutputPortCloseable<T> sort(OutputPort<T> initialSource, Comparator<T> comparator, int chunkSize, ObjectSerializer<T> serializer, File tempFile) throws IOException {
        Sorter<T> sorter = new Sorter<T>(initialSource, comparator, chunkSize, serializer, tempFile);
        sorter.build();
        return sorter.getSorted();
    }

    public void build() throws IOException {
        try (CountingOutputStream output = new CountingOutputStream((OutputStream)new BufferedOutputStream(new FileOutputStream(this.tempFile), 0x100000));){
            Chunk chunk;
            Merger chunked = CUtils.buffered((OutputPort)CUtils.chunked(this.initialSource, (int)this.chunkSize), (int)1);
            long maxBlockSize = 0L;
            long previousPosition = 0L;
            while ((chunk = (Chunk)chunked.take()) != null) {
                Object[] data = chunk.toArray();
                Arrays.sort(data, this.comparator);
                maxBlockSize = Math.max(maxBlockSize, output.getByteCount() - previousPosition);
                previousPosition = output.getByteCount();
                this.chunkOffsets.add(output.getByteCount());
                this.serializer.write(Arrays.asList(data), (OutputStream)new CloseShieldOutputStream((OutputStream)output));
                this.lastChunkSize = data.length;
            }
            this.memoryBudget = maxBlockSize;
        }
    }

    public OutputPortCloseable<T> getSorted() throws IOException {
        if (this.lastChunkSize == -1) {
            throw new IllegalStateException();
        }
        return new MergeSortingPort();
    }

    private final class SortedBlockReader
    implements Comparable<SortedBlockReader>,
    AutoCloseable,
    Closeable {
        final DataInputStream input;
        final int chunkSize;
        private int position = 0;
        private final OutputPort<T> port;
        private T current = null;

        public SortedBlockReader(File file, long chunkOffset, int chunkSize, int bufferSize) throws IOException {
            this.chunkSize = chunkSize;
            FileInputStream fo = new FileInputStream(file);
            fo.getChannel().position(chunkOffset);
            this.input = new DataInputStream(new BufferedInputStream(fo, bufferSize));
            this.port = Sorter.this.serializer.read(this.input);
        }

        public void advance() throws IOException {
            if (this.position == this.chunkSize) {
                this.current = null;
            } else {
                ++this.position;
                this.current = this.port.take();
            }
        }

        public T current() {
            return this.current;
        }

        @Override
        public void close() throws IOException {
            this.input.close();
        }

        @Override
        public int compareTo(SortedBlockReader o) {
            return Sorter.this.comparator.compare(this.current, o.current);
        }
    }

    private final class MergeSortingPort
    implements OutputPortCloseable<T> {
        final PriorityQueue<SortedBlockReader> queue = new PriorityQueue();
        private boolean closed = false;

        public MergeSortingPort() throws IOException {
            int bufferSize = (int)Math.min(Math.max(1024L, Sorter.this.memoryBudget / (long)Sorter.this.chunkOffsets.size()), Integer.MAX_VALUE);
            for (int i = 0; i < Sorter.this.chunkOffsets.size(); ++i) {
                SortedBlockReader block = new SortedBlockReader(Sorter.this.tempFile, Sorter.this.chunkOffsets.get(i), i == Sorter.this.chunkOffsets.size() - 1 ? Sorter.this.lastChunkSize : Sorter.this.chunkSize, bufferSize);
                block.advance();
                this.queue.add(block);
            }
        }

        public synchronized T take() {
            if (this.queue.isEmpty()) {
                return null;
            }
            SortedBlockReader head = this.queue.poll();
            Object current = head.current();
            try {
                head.advance();
                if (head.current() != null) {
                    this.queue.add(head);
                } else {
                    head.close();
                }
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            return current;
        }

        public synchronized void close() {
            if (this.closed) {
                return;
            }
            for (SortedBlockReader block : this.queue) {
                try {
                    block.close();
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            Sorter.this.tempFile.delete();
            this.closed = true;
        }
    }
}

