/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.exchange.filesystem;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.concurrent.MoreFutures;
import io.airlift.slice.SizeOf;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.trino.plugin.exchange.filesystem.ExchangeStorageWriter;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeStats;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.exchange.ExchangeSink;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

@ThreadSafe
public class FileSystemExchangeSink
implements ExchangeSink {
    /** Name of the empty marker file that {@code finish()} creates in the output directory after all writers have finished. */
    public static final String COMMITTED_MARKER_FILE_NAME = "committed";
    /** Suffix of the per-partition data files created by the partition writers. */
    public static final String DATA_FILE_SUFFIX = ".data";
    private static final int INSTANCE_SIZE = SizeOf.instanceSize(FileSystemExchangeSink.class);

    private final FileSystemExchangeStorage exchangeStorage;
    private final FileSystemExchangeStats stats;
    private final URI outputDirectory;
    private final int outputPartitionCount;
    private final boolean preserveOrderWithinPartition;
    private final int maxPageStorageSizeInBytes;
    private final long maxFileSizeInBytes;
    private final BufferPool bufferPool;
    // One writer per partition id, created on first use by add().
    private final Map<Integer, BufferedStorageWriter> writersMap = new ConcurrentHashMap<>();
    // First failure reported by an asynchronous storage write; rethrown by throwIfFailed().
    private final AtomicReference<Throwable> failure = new AtomicReference<>();
    // Set by abort() and by the success callback of finish().
    private volatile boolean closed;

    public FileSystemExchangeSink(FileSystemExchangeStorage exchangeStorage, FileSystemExchangeStats stats, URI outputDirectory, int outputPartitionCount, boolean preserveOrderWithinPartition, int maxPageStorageSizeInBytes, int exchangeSinkBufferPoolMinSize, int exchangeSinkBuffersPerPartition, long maxFileSizeInBytes) {
        Preconditions.checkArgument(((long)maxPageStorageSizeInBytes <= maxFileSizeInBytes ? 1 : 0) != 0, (Object)String.format("maxPageStorageSizeInBytes %s exceeded maxFileSizeInBytes %s", DataSize.succinctBytes((long)maxPageStorageSizeInBytes), DataSize.succinctBytes((long)maxFileSizeInBytes)));
        this.exchangeStorage = Objects.requireNonNull(exchangeStorage, "exchangeStorage is null");
        this.stats = Objects.requireNonNull(stats, "stats is null");
        this.outputDirectory = Objects.requireNonNull(outputDirectory, "outputDirectory is null");
        this.outputPartitionCount = outputPartitionCount;
        this.preserveOrderWithinPartition = preserveOrderWithinPartition;
        this.maxPageStorageSizeInBytes = maxPageStorageSizeInBytes;
        this.maxFileSizeInBytes = maxFileSizeInBytes;
        this.bufferPool = new BufferPool(stats, Math.max(outputPartitionCount * exchangeSinkBuffersPerPartition, exchangeSinkBufferPoolMinSize), exchangeStorage.getWriteBufferSize());
    }

    /**
     * Reports whether this sink needs a new instance handle.
     *
     * @return always {@code false}
     */
    public boolean isHandleUpdateRequired() {
        return false;
    }

    /**
     * Not supported by this sink.
     *
     * @param handle ignored
     * @throws UnsupportedOperationException always
     */
    public void updateHandle(ExchangeSinkInstanceHandle handle) {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the blocked state of the shared buffer pool.
     *
     * @return the result of {@code bufferPool.isBlocked()}: {@code ExchangeSink.NOT_BLOCKED} while the pool
     *         can hand out a buffer, otherwise a future that is completed when a buffer is returned to
     *         the pool or the pool is closed
     */
    public CompletableFuture<Void> isBlocked() {
        return this.bufferPool.isBlocked();
    }

    /**
     * Appends one chunk of data to the writer of the given partition, creating the writer on first use.
     *
     * <p>Rethrows the first failure recorded by an earlier asynchronous write. If the sink is already
     * closed, the data is dropped silently.
     *
     * @param partitionId target partition, in the range {@code [0, outputPartitionCount)}
     * @param data bytes to append
     * @throws IllegalArgumentException if {@code partitionId} is negative or not less than the partition count
     */
    public void add(int partitionId, Slice data) {
        throwIfFailed();
        // A negative id would otherwise pass the upper-bound check and create a writer for a bogus file name.
        Preconditions.checkArgument(partitionId >= 0, "partition id is expected to be non-negative: %s", partitionId);
        Preconditions.checkArgument(partitionId < outputPartitionCount, "partition id is expected to be less than %s: %s", outputPartitionCount, partitionId);

        BufferedStorageWriter writer;
        // Writers are looked up / created under the sink lock, which abort() also holds while setting
        // closed, so no new writer can be created once the sink has been aborted.
        synchronized (this) {
            if (closed) {
                return;
            }
            writer = writersMap.computeIfAbsent(partitionId, this::createWriter);
        }
        // The write may block waiting for a free buffer, so it runs outside the sink lock.
        writer.write(data);
    }

    /**
     * Creates the buffered writer for one partition, wired to this sink's storage, stats, output
     * directory, shared buffer pool, shared failure holder and size limits. Used by {@code add} as
     * the factory for missing {@code writersMap} entries.
     *
     * @param partitionId partition the new writer is responsible for
     * @return a new writer for {@code partitionId}
     */
    private BufferedStorageWriter createWriter(int partitionId) {
        return new BufferedStorageWriter(this.exchangeStorage, this.stats, this.outputDirectory, this.preserveOrderWithinPartition, partitionId, this.bufferPool, this.failure, this.maxPageStorageSizeInBytes, this.maxFileSizeInBytes);
    }

    /**
     * Estimates the memory retained by this sink.
     *
     * @return the shallow instance size plus the retained size of the buffer pool and of the
     *         partition-writer map (keys and writers)
     */
    public long getMemoryUsage() {
        long pooledBufferBytes = bufferPool.getRetainedSize();
        long writerMapBytes = SizeOf.estimatedSizeOf(writersMap, SizeOf::sizeOf, BufferedStorageWriter::getRetainedSize);
        return INSTANCE_SIZE + pooledBufferBytes + writerMapBytes;
    }

    /**
     * Finishes every partition writer and then creates the {@code committed} marker file in the
     * output directory.
     *
     * <p>Once all writers have finished successfully the writers and the buffer pool are released.
     * If the whole chain succeeds the sink is marked closed; if any step fails the sink is aborted.
     *
     * @return a future tracking the writer completion and marker-file creation, registered with the
     *         sink-finish stats; already failed with {@link IllegalStateException} if the sink is closed
     */
    public synchronized CompletableFuture<Void> finish() {
        if (closed) {
            return CompletableFuture.failedFuture(new IllegalStateException("Exchange sink has already closed"));
        }

        List<ListenableFuture<Void>> writerFutures = writersMap.values().stream()
                .map(BufferedStorageWriter::finish)
                .collect(ImmutableList.toImmutableList());
        ListenableFuture<Void> finishFuture = MoreFutures.asVoid(Futures.allAsList(writerFutures));
        // Release writers and pooled buffers as soon as every writer has finished.
        MoreFutures.addSuccessCallback(finishFuture, this::destroy);
        // The marker file is only created after all writers have finished successfully.
        finishFuture = Futures.transformAsync(
                finishFuture,
                ignored -> exchangeStorage.createEmptyFile(outputDirectory.resolve(COMMITTED_MARKER_FILE_NAME)),
                MoreExecutors.directExecutor());
        Futures.addCallback(finishFuture, new FutureCallback<Void>() {
            public void onSuccess(Void result) {
                FileSystemExchangeSink.this.closed = true;
            }

            public void onFailure(Throwable ignored) {
                FileSystemExchangeSink.this.abort();
            }
        }, MoreExecutors.directExecutor());
        return stats.getExchangeSinkFinish().record(MoreFutures.toCompletableFuture(finishFuture));
    }

    /**
     * Aborts every partition writer and then recursively deletes the output directory.
     *
     * <p>The sink is marked closed immediately. Once all writers have aborted successfully the
     * writers and the buffer pool are released.
     *
     * @return a future tracking the writer aborts and the directory deletion, registered with the
     *         sink-abort stats; an already completed future if the sink was closed before the call
     */
    public synchronized CompletableFuture<Void> abort() {
        if (closed) {
            return CompletableFuture.completedFuture(null);
        }
        closed = true;

        List<ListenableFuture<Void>> writerFutures = writersMap.values().stream()
                .map(BufferedStorageWriter::abort)
                .collect(ImmutableList.toImmutableList());
        ListenableFuture<Void> abortFuture = MoreFutures.asVoid(Futures.allAsList(writerFutures));
        MoreFutures.addSuccessCallback(abortFuture, this::destroy);
        return stats.getExchangeSinkAbort().record(MoreFutures.toCompletableFuture(Futures.transformAsync(
                abortFuture,
                ignored -> exchangeStorage.deleteRecursively(ImmutableList.of(outputDirectory)),
                MoreExecutors.directExecutor())));
    }

    /**
     * Rethrows the recorded write failure, if there is one.
     *
     * <p>Unchecked failures are rethrown as-is; any other throwable is wrapped in a
     * {@link RuntimeException}. Does nothing when no failure has been recorded.
     */
    private void throwIfFailed() {
        Throwable recorded = failure.get();
        if (recorded == null) {
            return;
        }
        Throwables.throwIfUnchecked(recorded);
        throw new RuntimeException(recorded);
    }

    /**
     * Drops all partition writers and closes the shared buffer pool. Registered by {@code finish()}
     * and {@code abort()} as a success callback on their combined writer futures.
     */
    private void destroy() {
        this.writersMap.clear();
        // Closing the pool wakes any thread waiting in BufferPool.take() and completes the blocked future.
        this.bufferPool.close();
    }

    /**
     * Bounded pool of equally sized write buffers shared by all partition writers of one sink.
     *
     * <p>One buffer is allocated up front and further buffers are allocated lazily, up to
     * {@code maxNumBuffers}. {@link #take()} blocks while the pool is exhausted,
     * {@link #offer(SliceOutput)} returns a buffer for reuse, and {@link #close()} releases all
     * waiters and drops the free buffers.
     */
    @ThreadSafe
    private static class BufferPool {
        private static final int INSTANCE_SIZE = SizeOf.instanceSize(BufferPool.class);

        private final FileSystemExchangeStats stats;
        private final int maxNumBuffers;
        private final int writeBufferSize;
        // Retained size of a single buffer, measured once on the first allocated buffer.
        private final long bufferRetainedSize;
        @GuardedBy("this")
        private final Queue<SliceOutput> freeBuffersQueue;
        // Future handed out by isBlocked(); completed whenever a buffer is offered or the pool is closed.
        @GuardedBy("this")
        private CompletableFuture<Void> blockedFuture = new CompletableFuture<>();
        @GuardedBy("this")
        private boolean closed;
        @GuardedBy("this")
        private int numBuffersCreated;

        /**
         * Creates the pool and eagerly allocates its first buffer.
         *
         * @param stats stats holder used to record blocked futures
         * @param maxNumBuffers upper bound on the number of buffers ever allocated; must be at least one
         * @param writeBufferSize size in bytes of each buffer
         * @throws IllegalArgumentException if {@code maxNumBuffers} is less than one
         */
        public BufferPool(FileSystemExchangeStats stats, int maxNumBuffers, int writeBufferSize) {
            this.stats = Objects.requireNonNull(stats, "stats is null");
            Preconditions.checkArgument(maxNumBuffers >= 1, "maxNumBuffers must be at least one");

            this.maxNumBuffers = maxNumBuffers;
            this.writeBufferSize = writeBufferSize;
            this.numBuffersCreated = 1;
            this.freeBuffersQueue = new ArrayDeque<>(maxNumBuffers);
            this.freeBuffersQueue.add(Slices.allocate(writeBufferSize).getOutput());
            this.bufferRetainedSize = this.freeBuffersQueue.peek().getRetainedSize();
        }

        /**
         * Returns {@code ExchangeSink.NOT_BLOCKED} if a buffer is available (allocating one if the cap
         * allows), otherwise a future that completes on the next {@link #offer} or {@link #close()}.
         */
        public synchronized CompletableFuture<Void> isBlocked() {
            if (hasFreeBuffers()) {
                return ExchangeSink.NOT_BLOCKED;
            }
            // Replace the future only if the previous one has already been completed.
            if (blockedFuture.isDone()) {
                blockedFuture = new CompletableFuture<>();
                stats.getExchangeSinkBlocked().record(blockedFuture);
            }
            return blockedFuture;
        }

        /**
         * Takes a buffer from the pool, waiting until one becomes available.
         *
         * @return a free buffer, or {@code null} if the pool is (or becomes) closed
         * @throws RuntimeException wrapping {@link InterruptedException} if the wait is interrupted;
         *         the thread's interrupt flag is restored first
         */
        public synchronized SliceOutput take() {
            while (!closed) {
                if (hasFreeBuffers()) {
                    return freeBuffersQueue.poll();
                }
                try {
                    wait();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            return null;
        }

        /**
         * Resets {@code buffer} and returns it to the pool, waking one waiter and completing the
         * current blocked future. If the pool is closed the buffer is reset but not retained.
         */
        public void offer(SliceOutput buffer) {
            buffer.reset();

            CompletableFuture<Void> futureToComplete;
            synchronized (this) {
                if (closed) {
                    return;
                }
                futureToComplete = blockedFuture;
                freeBuffersQueue.add(buffer);
                notify();
            }
            // Completed outside the lock so that callbacks on the future do not run while holding it.
            futureToComplete.complete(null);
        }

        /**
         * Returns the shallow instance size plus the retained size of every buffer created so far,
         * or only the shallow instance size once the pool is closed.
         */
        public synchronized long getRetainedSize() {
            if (closed) {
                return INSTANCE_SIZE;
            }
            return INSTANCE_SIZE + (long) numBuffersCreated * bufferRetainedSize;
        }

        /**
         * Closes the pool: wakes all threads waiting in {@link #take()}, drops the free buffers and
         * completes the current blocked future. Subsequent calls are no-ops.
         */
        public void close() {
            CompletableFuture<Void> futureToComplete;
            synchronized (this) {
                if (closed) {
                    return;
                }
                closed = true;
                notifyAll();
                futureToComplete = blockedFuture;
                freeBuffersQueue.clear();
            }
            // Completed outside the lock so that callbacks on the future do not run while holding it.
            futureToComplete.complete(null);
        }

        /**
         * Checks for a free buffer, allocating a new one when the queue is empty and fewer than
         * {@code maxNumBuffers} buffers have been created.
         */
        @GuardedBy("this")
        private boolean hasFreeBuffers() {
            if (!freeBuffersQueue.isEmpty()) {
                return true;
            }
            if (numBuffersCreated < maxNumBuffers) {
                freeBuffersQueue.add(Slices.allocate(writeBufferSize).getOutput());
                numBuffersCreated++;
                return true;
            }
            return false;
        }
    }

    @ThreadSafe
    private static class BufferedStorageWriter {
        private static final int INSTANCE_SIZE = SizeOf.instanceSize(BufferedStorageWriter.class);
        private final FileSystemExchangeStorage exchangeStorage;
        private final FileSystemExchangeStats stats;
        private final URI outputDirectory;
        private final boolean preserveOrderWithinPartition;
        private final int partitionId;
        private final BufferPool bufferPool;
        private final AtomicReference<Throwable> failure;
        private final int maxPageStorageSizeInBytes;
        private final long maxFileSizeInBytes;
        @GuardedBy(value="this")
        private ExchangeStorageWriter currentWriter;
        @GuardedBy(value="this")
        private long currentFileSize;
        @GuardedBy(value="this")
        private SliceOutput currentBuffer;
        @GuardedBy(value="this")
        private final List<ExchangeStorageWriter> writers = new ArrayList<ExchangeStorageWriter>();
        @GuardedBy(value="this")
        private boolean closed;

        public BufferedStorageWriter(FileSystemExchangeStorage exchangeStorage, FileSystemExchangeStats stats, URI outputDirectory, boolean preserveOrderWithinPartition, int partitionId, BufferPool bufferPool, AtomicReference<Throwable> failure, int maxPageStorageSizeInBytes, long maxFileSizeInBytes) {
            this.exchangeStorage = Objects.requireNonNull(exchangeStorage, "exchangeStorage is null");
            this.stats = Objects.requireNonNull(stats, "stats is null");
            this.outputDirectory = Objects.requireNonNull(outputDirectory, "outputDirectory is null");
            this.preserveOrderWithinPartition = preserveOrderWithinPartition;
            this.partitionId = partitionId;
            this.bufferPool = Objects.requireNonNull(bufferPool, "bufferPool is null");
            this.failure = Objects.requireNonNull(failure, "failure is null");
            this.maxPageStorageSizeInBytes = maxPageStorageSizeInBytes;
            this.maxFileSizeInBytes = maxFileSizeInBytes;
            this.setupWriterForNextPart();
        }

        public synchronized void write(Slice data) {
            if (this.closed) {
                return;
            }
            int requiredPageStorageSize = 4 + data.length();
            if (requiredPageStorageSize > this.maxPageStorageSizeInBytes) {
                throw new TrinoException((ErrorCodeSupplier)StandardErrorCode.NOT_SUPPORTED, String.format("Max row size of %s exceeded: %s", DataSize.succinctBytes((long)this.maxPageStorageSizeInBytes), DataSize.succinctBytes((long)requiredPageStorageSize)));
            }
            if (this.currentFileSize + (long)requiredPageStorageSize > this.maxFileSizeInBytes && !this.preserveOrderWithinPartition) {
                this.stats.getFileSizeInBytes().add(this.currentFileSize);
                this.flushIfNeeded(true);
                this.setupWriterForNextPart();
                this.currentFileSize = 0L;
                this.currentBuffer = null;
            }
            Slice sizeSlice = Slices.allocate((int)4);
            sizeSlice.setInt(0, data.length());
            this.writeInternal(sizeSlice);
            this.writeInternal(data);
            this.currentFileSize += (long)requiredPageStorageSize;
        }

        public synchronized ListenableFuture<Void> finish() {
            if (this.closed) {
                return Futures.immediateFailedFuture((Throwable)new IllegalStateException("BufferedStorageWriter has already closed"));
            }
            this.stats.getFileSizeInBytes().add(this.currentFileSize);
            this.flushIfNeeded(true);
            if (this.writers.size() == 1) {
                return this.currentWriter.finish();
            }
            return MoreFutures.asVoid((ListenableFuture)Futures.allAsList((Iterable)((Iterable)this.writers.stream().map(ExchangeStorageWriter::finish).collect(ImmutableList.toImmutableList()))));
        }

        public synchronized ListenableFuture<Void> abort() {
            if (this.closed) {
                return Futures.immediateVoidFuture();
            }
            this.closed = true;
            if (this.writers.size() == 1) {
                return this.currentWriter.abort();
            }
            return MoreFutures.asVoid((ListenableFuture)Futures.allAsList((Iterable)((Iterable)this.writers.stream().map(ExchangeStorageWriter::abort).collect(ImmutableList.toImmutableList()))));
        }

        public synchronized long getRetainedSize() {
            return (long)INSTANCE_SIZE + SizeOf.estimatedSizeOf(this.writers, ExchangeStorageWriter::getRetainedSize);
        }

        @GuardedBy(value="this")
        private void setupWriterForNextPart() {
            this.currentWriter = this.exchangeStorage.createExchangeStorageWriter(this.outputDirectory.resolve(this.partitionId + "_" + this.writers.size() + FileSystemExchangeSink.DATA_FILE_SUFFIX));
            this.writers.add(this.currentWriter);
        }

        @GuardedBy(value="this")
        private void writeInternal(Slice slice) {
            int writableBytes;
            for (int position = 0; position < slice.length(); position += writableBytes) {
                if (this.currentBuffer == null) {
                    this.currentBuffer = this.bufferPool.take();
                    if (this.currentBuffer == null) {
                        return;
                    }
                }
                writableBytes = Math.min(this.currentBuffer.writableBytes(), slice.length() - position);
                this.currentBuffer.writeBytes(slice.getBytes(position, writableBytes));
                this.flushIfNeeded(false);
            }
        }

        @GuardedBy(value="this")
        private void flushIfNeeded(boolean finished) {
            SliceOutput buffer = this.currentBuffer;
            if (buffer != null && (!buffer.isWritable() || finished)) {
                if (!buffer.isWritable()) {
                    this.currentBuffer = null;
                }
                ListenableFuture<Void> writeFuture = this.currentWriter.write(buffer.slice());
                writeFuture.addListener(() -> this.bufferPool.offer(buffer), MoreExecutors.directExecutor());
                MoreFutures.addExceptionCallback(writeFuture, throwable -> this.failure.compareAndSet((Throwable)null, (Throwable)throwable));
            }
        }
    }
}

