/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.athena.connector.lambda.data;

import com.amazonaws.athena.connector.lambda.data.Block;
import com.amazonaws.athena.connector.lambda.data.BlockAllocator;
import com.amazonaws.athena.connector.lambda.data.BlockSpiller;
import com.amazonaws.athena.connector.lambda.data.BlockWriter;
import com.amazonaws.athena.connector.lambda.data.SpillConfig;
import com.amazonaws.athena.connector.lambda.domain.predicate.ConstraintEvaluator;
import com.amazonaws.athena.connector.lambda.domain.spill.S3SpillLocation;
import com.amazonaws.athena.connector.lambda.domain.spill.SpillLocation;
import com.amazonaws.athena.connector.lambda.security.AesGcmBlockCrypto;
import com.amazonaws.athena.connector.lambda.security.BlockCrypto;
import com.amazonaws.athena.connector.lambda.security.EncryptionKey;
import com.amazonaws.athena.connector.lambda.security.NoOpBlockCrypto;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.google.common.io.ByteStreams;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.StampedLock;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3BlockSpiller
implements AutoCloseable,
BlockSpiller {
    private static final Logger logger = LoggerFactory.getLogger(S3BlockSpiller.class);
    private static final long ASYNC_SHUTDOWN_MILLIS = 10000L;
    private static final int MAX_ROWS_PER_CALL = 100;
    private final AmazonS3 amazonS3;
    private final BlockCrypto blockCrypto;
    private final BlockAllocator allocator;
    private final SpillConfig spillConfig;
    private final Schema schema;
    private final long maxRowsPerCall;
    private final List<SpillLocation> spillLocations = new ArrayList<SpillLocation>();
    private final AtomicReference<Block> inProgressBlock = new AtomicReference();
    private final ExecutorService asyncSpillPool;
    private final ReadWriteLock spillLock = new StampedLock().asReadWriteLock();
    private final AtomicLong spillNumber = new AtomicLong(0L);
    private final AtomicReference<RuntimeException> asyncException = new AtomicReference<Object>(null);
    private final ConstraintEvaluator constraintEvaluator;
    private final AtomicLong totalBytesSpilled = new AtomicLong();
    private final long startTime = System.currentTimeMillis();

    public S3BlockSpiller(AmazonS3 amazonS3, SpillConfig spillConfig, BlockAllocator allocator, Schema schema, ConstraintEvaluator constraintEvaluator) {
        this(amazonS3, spillConfig, allocator, schema, constraintEvaluator, 100);
    }

    public S3BlockSpiller(AmazonS3 amazonS3, SpillConfig spillConfig, BlockAllocator allocator, Schema schema, ConstraintEvaluator constraintEvaluator, int maxRowsPerCall) {
        this.amazonS3 = Objects.requireNonNull(amazonS3, "amazonS3 was null");
        this.spillConfig = Objects.requireNonNull(spillConfig, "spillConfig was null");
        this.allocator = Objects.requireNonNull(allocator, "allocator was null");
        this.schema = Objects.requireNonNull(schema, "schema was null");
        this.blockCrypto = spillConfig.getEncryptionKey() != null ? new AesGcmBlockCrypto(allocator) : new NoOpBlockCrypto(allocator);
        this.asyncSpillPool = spillConfig.getNumSpillThreads() <= 0 ? null : Executors.newFixedThreadPool(spillConfig.getNumSpillThreads());
        this.maxRowsPerCall = maxRowsPerCall;
        this.constraintEvaluator = constraintEvaluator;
    }

    @Override
    public ConstraintEvaluator getConstraintEvaluator() {
        return this.constraintEvaluator;
    }

    @Override
    public void writeRows(BlockWriter.RowWriter rowWriter) {
        int rows;
        this.ensureInit();
        Block block = this.inProgressBlock.get();
        int rowCount = block.getRowCount();
        try {
            rows = rowWriter.writeRows(block, rowCount);
        }
        catch (Exception ex) {
            throw ex instanceof RuntimeException ? (RuntimeException)ex : new RuntimeException(ex);
        }
        if ((long)rows > this.maxRowsPerCall) {
            throw new RuntimeException("Call generated more than " + this.maxRowsPerCall + "rows. Generating too many rows per call to writeRows(...) can result in blocks that exceed the max size.");
        }
        if (rows > 0) {
            block.setRowCount(rowCount + rows);
        }
        if (block.getSize() > this.spillConfig.getMaxBlockBytes()) {
            logger.info("writeRow: Spilling block with {} rows and {} bytes and config {} bytes", new Object[]{block.getRowCount(), block.getSize(), this.spillConfig.getMaxBlockBytes()});
            this.spillBlock(block);
            this.inProgressBlock.set(this.allocator.createBlock(this.schema));
            this.inProgressBlock.get().constrain(this.constraintEvaluator);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean spilled() {
        if (this.asyncException.get() != null) {
            throw this.asyncException.get();
        }
        Lock lock = this.spillLock.writeLock();
        try {
            lock.lock();
            this.ensureInit();
            Block block = this.inProgressBlock.get();
            boolean bl = !this.spillLocations.isEmpty() || block.getSize() >= this.spillConfig.getMaxInlineBlockSize();
            return bl;
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public Block getBlock() {
        if (this.spilled()) {
            throw new RuntimeException("Blocks have spilled, calls to getBlock not permitted. use getSpillLocations instead.");
        }
        logger.info("getBlock: Inline Block size[{}] bytes vs {}", (Object)this.inProgressBlock.get().getSize(), (Object)this.spillConfig.getMaxInlineBlockSize());
        return this.inProgressBlock.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<SpillLocation> getSpillLocations() {
        if (!this.spilled()) {
            throw new RuntimeException("Blocks have not spilled, calls to getSpillLocations not permitted. use getBlock instead.");
        }
        Lock lock = this.spillLock.writeLock();
        try {
            Block block = this.inProgressBlock.get();
            if (block.getRowCount() > 0) {
                logger.info("getSpillLocations: Spilling final block with {} rows and {} bytes and config {} bytes", new Object[]{block.getRowCount(), block.getSize(), this.spillConfig.getMaxBlockBytes()});
                this.spillBlock(block);
                this.inProgressBlock.set(this.allocator.createBlock(this.schema));
                this.inProgressBlock.get().constrain(this.constraintEvaluator);
            }
            lock.lock();
            List<SpillLocation> list = this.spillLocations;
            return list;
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    public void close() {
        logger.info("close: Spilled a total of {} bytes in {} ms", (Object)this.totalBytesSpilled.get(), (Object)(System.currentTimeMillis() - this.startTime));
        if (this.asyncSpillPool == null) {
            return;
        }
        this.asyncSpillPool.shutdown();
        try {
            if (!this.asyncSpillPool.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
                this.asyncSpillPool.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.asyncSpillPool.shutdownNow();
        }
    }

    protected SpillLocation write(Block block) {
        try {
            S3SpillLocation spillLocation = this.makeSpillLocation();
            EncryptionKey encryptionKey = this.spillConfig.getEncryptionKey();
            logger.info("write: Started encrypting block for write to {}", (Object)spillLocation);
            byte[] bytes = this.blockCrypto.encrypt(encryptionKey, block);
            this.totalBytesSpilled.addAndGet(bytes.length);
            logger.info("write: Started spilling block of size {} bytes", (Object)bytes.length);
            this.amazonS3.putObject(spillLocation.getBucket(), spillLocation.getKey(), (InputStream)new ByteArrayInputStream(bytes), new ObjectMetadata());
            logger.info("write: Completed spilling block of size {} bytes", (Object)bytes.length);
            return spillLocation;
        }
        catch (RuntimeException ex) {
            this.asyncException.compareAndSet(null, ex);
            logger.warn("write: Encountered error while writing block.", (Throwable)ex);
            throw ex;
        }
    }

    protected Block read(S3SpillLocation spillLocation, EncryptionKey key, Schema schema) {
        try {
            logger.debug("write: Started reading block from S3");
            S3Object fullObject = this.amazonS3.getObject(spillLocation.getBucket(), spillLocation.getKey());
            logger.debug("write: Completed reading block from S3");
            Block block = this.blockCrypto.decrypt(key, ByteStreams.toByteArray((InputStream)fullObject.getObjectContent()), schema);
            logger.debug("write: Completed decrypting block of size.");
            return block;
        }
        catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }

    private void spillBlock(Block block) {
        if (this.asyncSpillPool != null) {
            Lock lock = this.spillLock.readLock();
            try {
                lock.lock();
                this.asyncSpillPool.submit(() -> {
                    try {
                        SpillLocation spillLocation = this.write(block);
                        this.spillLocations.add(spillLocation);
                        this.safeClose(block);
                    }
                    finally {
                        lock.unlock();
                    }
                });
            }
            catch (Exception ex) {
                lock.unlock();
                throw ex;
            }
        } else {
            SpillLocation spillLocation = this.write(block);
            this.spillLocations.add(spillLocation);
            this.safeClose(block);
        }
    }

    private void ensureInit() {
        if (this.inProgressBlock.get() == null) {
            this.inProgressBlock.set(this.allocator.createBlock(this.schema));
            this.inProgressBlock.get().constrain(this.constraintEvaluator);
        }
    }

    private S3SpillLocation makeSpillLocation() {
        S3SpillLocation splitSpillLocation = (S3SpillLocation)this.spillConfig.getSpillLocation();
        if (!splitSpillLocation.isDirectory()) {
            throw new RuntimeException("Split's SpillLocation must be a directory because multiple blocks may be spilled.");
        }
        String blockKey = splitSpillLocation.getKey() + "." + this.spillNumber.getAndIncrement();
        return new S3SpillLocation(splitSpillLocation.getBucket(), blockKey, false);
    }

    private void safeClose(AutoCloseable block) {
        try {
            block.close();
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }
}

