/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream.processor;

import io.activej.async.AsyncAccumulator;
import io.activej.common.initializer.WithInitializer;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.processor.StreamReducer;
import io.activej.datastream.processor.StreamReducers;
import io.activej.datastream.processor.StreamSorterStorage;
import io.activej.datastream.processor.StreamTransformer;
import io.activej.promise.Promise;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class StreamSorter<K, T>
implements StreamTransformer<T, T>,
WithInitializer<StreamSorter<K, T>> {
    private static final Logger logger = LoggerFactory.getLogger(StreamSorter.class);
    private final AsyncAccumulator<? extends List<Integer>> temporaryStreamsAccumulator;
    private final StreamSorterStorage<T> storage;
    private final Function<T, K> keyFunction;
    private final Comparator<K> keyComparator;
    private final Comparator<T> itemComparator;
    private final boolean distinct;
    private final int itemsInMemory;
    private final Input input;
    private final StreamSupplier<T> output;
    private Executor sortingExecutor = Runnable::run;

    private StreamSorter(StreamSorterStorage<T> storage, Function<T, K> keyFunction, Comparator<K> keyComparator, boolean deduplicate, int itemsInMemory) {
        this.storage = storage;
        this.keyFunction = keyFunction;
        this.keyComparator = keyComparator;
        this.itemComparator = (item1, item2) -> {
            Object key1 = keyFunction.apply(item1);
            Object key2 = keyFunction.apply(item2);
            return keyComparator.compare(key1, key2);
        };
        this.distinct = deduplicate;
        this.itemsInMemory = itemsInMemory;
        ArrayList partitionIds = new ArrayList();
        this.input = new Input(partitionIds);
        this.temporaryStreamsAccumulator = AsyncAccumulator.create(partitionIds);
        this.output = StreamSupplier.ofPromise(this.temporaryStreamsAccumulator.get().then(streamIds -> {
            ArrayList sortedList = this.input.list;
            this.input.list = null;
            return Promise.ofBlocking((Executor)this.sortingExecutor, () -> sortedList.sort(this.itemComparator)).map($ -> {
                StreamSupplier listSupplier = StreamSupplier.ofIterator(deduplicate ? new DistinctIterator(sortedList, keyFunction, keyComparator) : sortedList.iterator());
                logger.info("Items in memory: {}, files: {}", (Object)sortedList.size(), (Object)streamIds.size());
                if (streamIds.isEmpty()) {
                    return listSupplier;
                }
                StreamReducer streamMerger = StreamReducer.create(keyComparator);
                listSupplier.streamTo(streamMerger.newInput(keyFunction, deduplicate ? StreamReducers.deduplicateReducer() : StreamReducers.mergeReducer()));
                for (Integer streamId : streamIds) {
                    StreamSupplier.ofPromise(storage.read(streamId)).streamTo(streamMerger.newInput(keyFunction, deduplicate ? StreamReducers.deduplicateReducer() : StreamReducers.mergeReducer()));
                }
                return streamMerger.getOutput();
            });
        }));
    }

    public StreamSorter<K, T> withSortingExecutor(Executor executor) {
        this.sortingExecutor = executor;
        return this;
    }

    public static <K, T> StreamSorter<K, T> create(StreamSorterStorage<T> storage, Function<T, K> keyFunction, Comparator<K> keyComparator, boolean distinct, int itemsInMemorySize) {
        return new StreamSorter<K, T>(storage, keyFunction, keyComparator, distinct, itemsInMemorySize);
    }

    @Override
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    @Override
    public StreamSupplier<T> getOutput() {
        return this.output;
    }

    private final class Input
    extends AbstractStreamConsumer<T>
    implements StreamDataAcceptor<T> {
        private final List<Integer> partitionIds;
        private ArrayList<T> list = new ArrayList();
        private Promise<Void> cleanupPromise;

        private Input(List<Integer> partitionIds) {
            this.partitionIds = partitionIds;
        }

        @Override
        protected void onStarted() {
            this.resume(this);
        }

        @Override
        public void accept(T item) {
            this.list.add(item);
            if (this.list.size() < StreamSorter.this.itemsInMemory) {
                return;
            }
            ArrayList sortedList = this.list;
            this.list = new ArrayList(StreamSorter.this.itemsInMemory);
            StreamSorter.this.temporaryStreamsAccumulator.addPromise(Promise.ofBlocking((Executor)StreamSorter.this.sortingExecutor, () -> sortedList.sort(StreamSorter.this.itemComparator)).then($ -> {
                Iterator<Object> iterator = StreamSorter.this.distinct ? new DistinctIterator(sortedList, StreamSorter.this.keyFunction, StreamSorter.this.keyComparator) : sortedList.iterator();
                return StreamSorter.this.storage.newPartitionId().then(partitionId -> StreamSorter.this.storage.write((int)partitionId).then(consumer -> StreamSupplier.ofIterator(iterator).streamTo(consumer)).map($2 -> partitionId));
            }).whenResult(this::suspendOrResume).whenException(this::closeEx), List::add);
            this.suspendOrResume();
        }

        private void suspendOrResume() {
            if (StreamSorter.this.temporaryStreamsAccumulator.getActivePromises() > 2) {
                this.suspend();
            } else {
                this.resume(this);
            }
        }

        @Override
        protected void onEndOfStream() {
            StreamSorter.this.temporaryStreamsAccumulator.run();
            StreamSorter.this.output.getAcknowledgement().then((ackRes, e) -> this.cleanup().then(($, e1) -> Promise.of((Object)ackRes, (Exception)e))).whenResult(this::acknowledge).whenException(this::closeEx);
        }

        @Override
        protected void onError(Exception e) {
            StreamSorter.this.temporaryStreamsAccumulator.closeEx(e);
        }

        @Override
        protected void onCleanup() {
            this.list = null;
            this.cleanup();
        }

        private Promise<Void> cleanup() {
            if (this.cleanupPromise != null) {
                return this.cleanupPromise;
            }
            this.cleanupPromise = this.partitionIds.isEmpty() ? Promise.complete() : StreamSorter.this.storage.cleanup(this.partitionIds);
            return this.cleanupPromise;
        }
    }

    private static final class DistinctIterator<K, T>
    implements Iterator<T> {
        private final ArrayList<T> sortedList;
        private final Function<T, K> keyFunction;
        private final Comparator<K> keyComparator;
        int i = 0;

        private DistinctIterator(ArrayList<T> sortedList, Function<T, K> keyFunction, Comparator<K> keyComparator) {
            this.sortedList = sortedList;
            this.keyFunction = keyFunction;
            this.keyComparator = keyComparator;
        }

        @Override
        public boolean hasNext() {
            return this.i < this.sortedList.size();
        }

        @Override
        public T next() {
            T next = this.sortedList.get(this.i++);
            K nextKey = this.keyFunction.apply(next);
            while (this.i < this.sortedList.size() && this.keyComparator.compare(nextKey, this.keyFunction.apply(this.sortedList.get(this.i))) == 0) {
                ++this.i;
            }
            return next;
        }
    }
}

