/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream.processor;

import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.initializer.WithInitializer;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.dsl.ChannelSupplierTransformer;
import io.activej.csp.file.ChannelFileReader;
import io.activej.csp.file.ChannelFileWriter;
import io.activej.csp.process.ChannelByteChunker;
import io.activej.csp.process.frames.ChannelFrameDecoder;
import io.activej.csp.process.frames.ChannelFrameEncoder;
import io.activej.csp.process.frames.FrameFormat;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.datastream.processor.StreamSorterStorage;
import io.activej.promise.Promise;
import io.activej.serializer.BinarySerializer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * File-based {@link StreamSorterStorage}: every partition is kept in its own file inside a
 * single directory.
 *
 * <p>The file of a partition is resolved against the storage directory using
 * {@code String.format(filePattern, partitionId)}. Items are serialized with the supplied
 * {@link BinarySerializer} and framed with the supplied {@link FrameFormat} when written, and
 * are decoded with the same pair when read back.
 *
 * <p>Partition ids come from a static counter, so they are shared by all instances of this
 * class that live in the same class loader.
 *
 * @param <T> type of the items stored in partitions
 */
public final class StreamSorterStorageImpl<T>
        implements StreamSorterStorage<T>, WithInitializer<StreamSorterStorageImpl<T>> {
    private static final Logger logger = LoggerFactory.getLogger(StreamSorterStorageImpl.class);

    /** Default pattern of partition file names: the bare partition id. */
    public static final String DEFAULT_FILE_PATTERN = "%d";

    /** Default value of the write block size. */
    public static final MemSize DEFAULT_SORTER_BLOCK_SIZE = MemSize.kilobytes(256);

    /** Source of partition ids; static, hence shared between all storages. */
    private static final AtomicInteger PARTITION = new AtomicInteger();

    private final Executor executor;
    private final BinarySerializer<T> serializer;
    private final FrameFormat frameFormat;
    private final Path path;

    private String filePattern = DEFAULT_FILE_PATTERN;
    private MemSize readBlockSize = ChannelSerializer.DEFAULT_INITIAL_BUFFER_SIZE;
    private MemSize writeBlockSize = DEFAULT_SORTER_BLOCK_SIZE;

    private StreamSorterStorageImpl(Executor executor, BinarySerializer<T> serializer, FrameFormat frameFormat, Path path) {
        this.executor = executor;
        this.serializer = serializer;
        this.frameFormat = frameFormat;
        this.path = path;
    }

    /**
     * Creates a storage rooted at {@code path}, creating the directory (and any missing
     * parents) if necessary.
     *
     * @param executor    executor used for file operations
     * @param serializer  serializer of the stored items
     * @param frameFormat frame format applied to the partition files
     * @param path        directory that holds the partition files
     * @return a new storage instance
     * @throws IllegalArgumentException if the last element of {@code path} contains {@code %d}
     * @throws UncheckedIOException     if the directory cannot be created
     */
    public static <T> StreamSorterStorageImpl<T> create(Executor executor, BinarySerializer<T> serializer, FrameFormat frameFormat, Path path) {
        // getFileName() is null for a root or empty path; such a path has no name to validate.
        Path directoryName = path.getFileName();
        Checks.checkArgument(directoryName == null || !directoryName.toString().contains("%d"),
                "Filename should not contain '%d'");
        try {
            Files.createDirectories(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new StreamSorterStorageImpl<>(executor, serializer, frameFormat, path);
    }

    /**
     * Sets the pattern of partition file names.
     *
     * <p>The pattern is passed to {@link String#format(String, Object...)} with the partition id
     * as the only argument, so it has to reference that argument (for example {@code "%d"} or
     * {@code "part-%05d.bin"}). A pattern that ignores the id would map every partition to the
     * same file and is rejected.
     *
     * @param filePattern format string of partition file names
     * @return this storage
     * @throws IllegalArgumentException if the pattern does not produce different names for
     *                                  different partition ids, or is not a valid format string
     */
    public StreamSorterStorageImpl<T> withFilePattern(String filePattern) {
        // Probe with two ids instead of searching for the literal "%d" so that variants such as
        // "%05d" are accepted as well.
        boolean distinguishesPartitions = !String.format(filePattern, 0).equals(String.format(filePattern, 1));
        Checks.checkArgument(distinguishesPartitions,
                "File pattern should contain a partition placeholder such as '%d'");
        this.filePattern = filePattern;
        return this;
    }

    /**
     * Sets the read block size.
     *
     * <p>NOTE(review): despite its name, this value is only used by {@link #write(int)}, as the
     * initial buffer size of the serializer; {@link #read(int)} does not use it. Confirm whether
     * that is intended.
     *
     * @param readBlockSize new value
     * @return this storage
     */
    public StreamSorterStorageImpl<T> withReadBlockSize(MemSize readBlockSize) {
        this.readBlockSize = readBlockSize;
        return this;
    }

    /**
     * Sets the write block size. {@link #write(int)} configures its byte chunkers with half of
     * this size and this size.
     *
     * @param writeBlockSize new value
     * @return this storage
     */
    public StreamSorterStorageImpl<T> withWriteBlockSize(MemSize writeBlockSize) {
        this.writeBlockSize = writeBlockSize;
        return this;
    }

    /** Resolves the file of partition {@code i} inside the storage directory. */
    private Path partitionPath(int i) {
        return path.resolve(String.format(filePattern, i));
    }

    /**
     * Allocates the next partition id from the shared static counter.
     *
     * @return promise of the new partition id; the first id handed out is 1
     */
    @Override
    public Promise<Integer> newPartitionId() {
        return Promise.of(PARTITION.incrementAndGet());
    }

    /**
     * Returns a consumer that stores the received items in the file of the given partition.
     *
     * <p>Pipeline: serialize items, re-chunk the bytes, encode frames, re-chunk again, stream
     * to the file. Both chunkers are created with {@code writeBlockSize / 2} and
     * {@code writeBlockSize} (presumably the minimum and maximum chunk size).
     *
     * @param partition id of the partition to write
     * @return promise of the consumer
     */
    @Override
    public Promise<StreamConsumer<T>> write(int partition) {
        Path partitionFile = partitionPath(partition);
        return Promise.of(StreamConsumer.ofSupplier(
                supplier -> supplier
                        .transformWith(ChannelSerializer.create(serializer)
                                .withInitialBufferSize(readBlockSize))
                        .transformWith(ChannelByteChunker.create(writeBlockSize.map(bytes -> bytes / 2L), writeBlockSize))
                        .transformWith(ChannelFrameEncoder.create(frameFormat))
                        .transformWith(ChannelByteChunker.create(writeBlockSize.map(bytes -> bytes / 2L), writeBlockSize))
                        .streamTo(ChannelFileWriter.open(executor, partitionFile))));
    }

    /**
     * Opens the file of the given partition and returns a supplier of the items stored in it.
     * The bytes of the file are frame-decoded and then deserialized.
     *
     * @param partition id of the partition to read
     * @return promise of the supplier, derived from the promise of opening the file
     */
    @Override
    public Promise<StreamSupplier<T>> read(int partition) {
        Path partitionFile = partitionPath(partition);
        return ChannelFileReader.open(executor, partitionFile)
                .map(file -> file
                        .transformWith(ChannelFrameDecoder.create(frameFormat))
                        .transformWith(ChannelDeserializer.create(serializer)));
    }

    /**
     * Deletes the files of the given partitions as a blocking task on the executor.
     *
     * <p>Deletion is best effort: a file that cannot be deleted is logged at WARN level and the
     * remaining partitions are still processed.
     *
     * @param partitionsToDelete ids of the partitions whose files should be removed
     * @return promise of the cleanup task
     */
    @Override
    public Promise<Void> cleanup(List<Integer> partitionsToDelete) {
        return Promise.ofBlocking(executor, () -> {
            for (Integer partitionToDelete : partitionsToDelete) {
                Path partitionFile = partitionPath(partitionToDelete);
                try {
                    Files.delete(partitionFile);
                } catch (IOException e) {
                    // Deliberately not rethrown: one failed delete must not abort the cleanup.
                    logger.warn("Could not delete {}", partitionFile, e);
                }
            }
        });
    }
}

