/*
 * Decompiled with CFR 0.152.
 */
package io.trino.spiller;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.slice.OutputStreamSliceOutput;
import io.airlift.slice.Slice;
import io.trino.execution.buffer.PageDeserializer;
import io.trino.execution.buffer.PageSerializer;
import io.trino.execution.buffer.PagesSerdeFactory;
import io.trino.execution.buffer.PagesSerdeUtil;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.operator.SpillContext;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.Page;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spiller.FileHolder;
import io.trino.spiller.SingleStreamSpiller;
import io.trino.spiller.SpillerStats;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import javax.annotation.concurrent.NotThreadSafe;
import javax.crypto.SecretKey;

@NotThreadSafe
public class FileSingleStreamSpiller
implements SingleStreamSpiller {
    @VisibleForTesting
    static final int BUFFER_SIZE = 4096;
    private final FileHolder targetFile;
    private final Closer closer = Closer.create();
    private final PagesSerdeFactory serdeFactory;
    private volatile Optional<SecretKey> encryptionKey;
    private final boolean encrypted;
    private final SpillerStats spillerStats;
    private final SpillContext localSpillContext;
    private final LocalMemoryContext memoryContext;
    private final ListeningExecutorService executor;
    private boolean writable = true;
    private long spilledPagesInMemorySize;
    private ListenableFuture<Void> spillInProgress = Futures.immediateVoidFuture();
    private final Runnable fileSystemErrorHandler;

    public FileSingleStreamSpiller(PagesSerdeFactory serdeFactory, Optional<SecretKey> encryptionKey, ListeningExecutorService executor, Path spillPath, SpillerStats spillerStats, SpillContext spillContext, LocalMemoryContext memoryContext, Runnable fileSystemErrorHandler) {
        this.serdeFactory = Objects.requireNonNull(serdeFactory, "serdeFactory is null");
        this.encryptionKey = Objects.requireNonNull(encryptionKey, "encryptionKey is null");
        this.encrypted = encryptionKey.isPresent();
        this.executor = Objects.requireNonNull(executor, "executor is null");
        this.spillerStats = Objects.requireNonNull(spillerStats, "spillerStats is null");
        this.localSpillContext = spillContext.newLocalSpillContext();
        this.memoryContext = Objects.requireNonNull(memoryContext, "memoryContext is null");
        this.memoryContext.setBytes(4096L);
        this.fileSystemErrorHandler = Objects.requireNonNull(fileSystemErrorHandler, "filesystemErrorHandler is null");
        try {
            this.targetFile = (FileHolder)this.closer.register((Closeable)new FileHolder(Files.createTempFile(spillPath, "spill", ".bin", new FileAttribute[0])));
        }
        catch (IOException e) {
            this.fileSystemErrorHandler.run();
            throw new TrinoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to create spill file", (Throwable)e);
        }
    }

    @Override
    public ListenableFuture<Void> spill(Iterator<Page> pageIterator) {
        Objects.requireNonNull(pageIterator, "pageIterator is null");
        this.checkNoSpillInProgress();
        this.spillInProgress = Futures.submit(() -> this.writePages(pageIterator), (Executor)this.executor);
        return this.spillInProgress;
    }

    @Override
    public long getSpilledPagesInMemorySize() {
        return this.spilledPagesInMemorySize;
    }

    @Override
    public Iterator<Page> getSpilledPages() {
        this.checkNoSpillInProgress();
        return this.readPages();
    }

    @Override
    public ListenableFuture<List<Page>> getAllSpilledPages() {
        return this.executor.submit(() -> ImmutableList.copyOf(this.getSpilledPages()));
    }

    private void writePages(Iterator<Page> pageIterator) {
        Preconditions.checkState((boolean)this.writable, (Object)"Spilling no longer allowed. The spiller has been made non-writable on first read for subsequent reads to be consistent");
        Optional<SecretKey> encryptionKey = this.encryptionKey;
        Preconditions.checkState((this.encrypted == encryptionKey.isPresent() ? 1 : 0) != 0, (Object)"encryptionKey has been discarded");
        PageSerializer serializer = this.serdeFactory.createSerializer(encryptionKey);
        try (OutputStreamSliceOutput output = new OutputStreamSliceOutput(this.targetFile.newOutputStream(StandardOpenOption.APPEND), 4096);){
            while (pageIterator.hasNext()) {
                Page page = pageIterator.next();
                this.spilledPagesInMemorySize += page.getSizeInBytes();
                Slice serializedPage = serializer.serialize(page);
                long pageSize = serializedPage.length();
                this.localSpillContext.updateBytes(pageSize);
                this.spillerStats.addToTotalSpilledBytes(pageSize);
                output.writeBytes(serializedPage);
            }
        }
        catch (IOException | UncheckedIOException e) {
            this.fileSystemErrorHandler.run();
            throw new TrinoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to spill pages", (Throwable)e);
        }
    }

    private Iterator<Page> readPages() {
        Preconditions.checkState((boolean)this.writable, (Object)"Repeated reads are disallowed to prevent potential resource leaks");
        this.writable = false;
        try {
            Optional<SecretKey> encryptionKey = this.encryptionKey;
            Preconditions.checkState((this.encrypted == encryptionKey.isPresent() ? 1 : 0) != 0, (Object)"encryptionKey has been discarded");
            PageDeserializer deserializer = this.serdeFactory.createDeserializer(encryptionKey);
            this.encryptionKey = Optional.empty();
            InputStream input = (InputStream)this.closer.register((Closeable)this.targetFile.newInputStream(new OpenOption[0]));
            Iterator<Page> pages = PagesSerdeUtil.readPages(deserializer, input);
            return FileSingleStreamSpiller.closeWhenExhausted(pages, input);
        }
        catch (IOException e) {
            this.fileSystemErrorHandler.run();
            throw new TrinoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to read spilled pages", (Throwable)e);
        }
    }

    @Override
    public void close() {
        this.encryptionKey = Optional.empty();
        this.closer.register((Closeable)this.localSpillContext);
        this.closer.register(() -> this.memoryContext.setBytes(0L));
        try {
            this.closer.close();
        }
        catch (IOException e) {
            this.fileSystemErrorHandler.run();
            throw new TrinoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to close spiller", (Throwable)e);
        }
    }

    private void checkNoSpillInProgress() {
        Preconditions.checkState((boolean)this.spillInProgress.isDone(), (Object)"spill in progress");
    }

    private static <T> Iterator<T> closeWhenExhausted(final Iterator<T> iterator, final Closeable resource) {
        Objects.requireNonNull(iterator, "iterator is null");
        Objects.requireNonNull(resource, "resource is null");
        return new AbstractIterator<T>(){

            protected T computeNext() {
                if (iterator.hasNext()) {
                    return iterator.next();
                }
                try {
                    resource.close();
                }
                catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                return this.endOfData();
            }
        };
    }
}

