/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.ingest.internal.apache.hadoop.fs.impl.prefetch;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import net.snowflake.ingest.internal.apache.hadoop.conf.Configuration;
import net.snowflake.ingest.internal.apache.hadoop.fs.LocalDirAllocator;
import net.snowflake.ingest.internal.apache.hadoop.fs.impl.prefetch.BlockCache;
import net.snowflake.ingest.internal.apache.hadoop.fs.impl.prefetch.BlockData;
import net.snowflake.ingest.internal.apache.hadoop.fs.impl.prefetch.BlockManager;
import net.snowflake.ingest.internal.apache.hadoop.fs.impl.prefetch.BlockOperations;
import net.snowflake.ingest.internal.apache.hadoop.fs.impl.prefetch.BufferData;
import net.snowflake.ingest.internal.apache.hadoop.fs.impl.prefetch.BufferPool;
import net.snowflake.ingest.internal.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import net.snowflake.ingest.internal.apache.hadoop.fs.impl.prefetch.PrefetchingStatistics;
import net.snowflake.ingest.internal.apache.hadoop.fs.impl.prefetch.Retryer;
import net.snowflake.ingest.internal.apache.hadoop.fs.impl.prefetch.SingleFilePerBlockCache;
import net.snowflake.ingest.internal.apache.hadoop.fs.impl.prefetch.Validate;
import net.snowflake.ingest.internal.apache.hadoop.fs.statistics.DurationTracker;
import net.snowflake.ingest.internal.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class CachingBlockManager
extends BlockManager {
    private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class);
    private static final int TIMEOUT_MINUTES = 60;
    private final ExecutorServiceFuturePool futurePool;
    private BufferPool bufferPool;
    private final int bufferPoolSize;
    private BlockCache cache;
    private final AtomicInteger numCachingErrors;
    private final AtomicInteger numReadErrors;
    private final BlockOperations ops;
    private boolean closed;
    private static final int SLOW_CACHING_THRESHOLD = 5;
    private final AtomicBoolean cachingDisabled;
    private final PrefetchingStatistics prefetchingStatistics;
    private final Configuration conf;
    private final LocalDirAllocator localDirAllocator;
    private static final BufferData.State[] EXPECTED_STATE_AT_CACHING = new BufferData.State[]{BufferData.State.PREFETCHING, BufferData.State.READY};

    public CachingBlockManager(ExecutorServiceFuturePool futurePool, BlockData blockData, int bufferPoolSize, PrefetchingStatistics prefetchingStatistics, Configuration conf, LocalDirAllocator localDirAllocator) {
        super(blockData);
        Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
        this.futurePool = Objects.requireNonNull(futurePool);
        this.bufferPoolSize = bufferPoolSize;
        this.numCachingErrors = new AtomicInteger();
        this.numReadErrors = new AtomicInteger();
        this.cachingDisabled = new AtomicBoolean();
        this.prefetchingStatistics = Objects.requireNonNull(prefetchingStatistics);
        if (this.getBlockData().getFileSize() > 0L) {
            this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(), this.prefetchingStatistics);
            this.cache = this.createCache();
        }
        this.ops = new BlockOperations();
        this.ops.setDebug(false);
        this.conf = Objects.requireNonNull(conf);
        this.localDirAllocator = localDirAllocator;
    }

    @Override
    public BufferData get(int blockNumber) throws IOException {
        BufferData data;
        boolean done;
        Validate.checkNotNegative(blockNumber, "blockNumber");
        int maxRetryDelayMs = this.bufferPoolSize * 120 * 1000;
        int statusUpdateDelayMs = 120000;
        Retryer retryer = new Retryer(10, maxRetryDelayMs, 120000);
        do {
            if (this.closed) {
                throw new IOException("this stream is already closed");
            }
            data = this.bufferPool.acquire(blockNumber);
            done = this.getInternal(data);
            if (!retryer.updateStatus()) continue;
            LOG.warn("waiting to get block: {}", (Object)blockNumber);
            LOG.info("state = {}", (Object)this.toString());
        } while (!done && retryer.continueRetry());
        if (done) {
            return data;
        }
        String message = String.format("Wait failed for get(%d)", blockNumber);
        throw new IllegalStateException(message);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean getInternal(BufferData data) throws IOException {
        Validate.checkNotNull(data, "data");
        if (data.stateEqualsOneOf(BufferData.State.PREFETCHING, BufferData.State.CACHING, BufferData.State.DONE)) {
            return false;
        }
        BufferData bufferData = data;
        synchronized (bufferData) {
            if (data.stateEqualsOneOf(BufferData.State.PREFETCHING, BufferData.State.CACHING, BufferData.State.DONE)) {
                return false;
            }
            int blockNumber = data.getBlockNumber();
            if (data.getState() == BufferData.State.READY) {
                BlockOperations.Operation op = this.ops.getPrefetched(blockNumber);
                this.ops.end(op);
                return true;
            }
            data.throwIfStateIncorrect(BufferData.State.BLANK);
            this.read(data);
            return true;
        }
    }

    @Override
    public void release(BufferData data) {
        if (this.closed) {
            return;
        }
        Validate.checkNotNull(data, "data");
        BlockOperations.Operation op = this.ops.release(data.getBlockNumber());
        this.bufferPool.release(data);
        this.ops.end(op);
    }

    @Override
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        BlockOperations.Operation op = this.ops.close();
        this.cancelPrefetches();
        IOUtils.cleanupWithLogger(LOG, this.cache);
        this.ops.end(op);
        LOG.info(this.ops.getSummary(false));
        this.bufferPool.close();
        this.bufferPool = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void requestPrefetch(int blockNumber) {
        Validate.checkNotNegative(blockNumber, "blockNumber");
        if (this.closed) {
            return;
        }
        BufferData data = this.bufferPool.tryAcquire(blockNumber);
        if (data == null) {
            return;
        }
        if (!data.stateEqualsOneOf(BufferData.State.BLANK)) {
            return;
        }
        BufferData bufferData = data;
        synchronized (bufferData) {
            if (!data.stateEqualsOneOf(BufferData.State.BLANK)) {
                return;
            }
            BlockOperations.Operation op = this.ops.requestPrefetch(blockNumber);
            PrefetchTask prefetchTask = new PrefetchTask(data, this, Instant.now());
            Future<Void> prefetchFuture = this.futurePool.executeFunction(prefetchTask);
            data.setPrefetch(prefetchFuture);
            this.ops.end(op);
        }
    }

    @Override
    public void cancelPrefetches() {
        BlockOperations.Operation op = this.ops.cancelPrefetches();
        for (BufferData data : this.bufferPool.getAll()) {
            if (!data.stateEqualsOneOf(BufferData.State.PREFETCHING, BufferData.State.READY)) continue;
            this.requestCaching(data);
        }
        this.ops.end(op);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void read(BufferData data) throws IOException {
        BufferData bufferData = data;
        synchronized (bufferData) {
            try {
                this.readBlock(data, false, BufferData.State.BLANK);
            }
            catch (IOException e) {
                LOG.error("error reading block {}", (Object)data.getBlockNumber(), (Object)e);
                throw e;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void prefetch(BufferData data, Instant taskQueuedStartTime) throws IOException {
        BufferData bufferData = data;
        synchronized (bufferData) {
            this.prefetchingStatistics.executorAcquired(Duration.between(taskQueuedStartTime, Instant.now()));
            this.readBlock(data, true, BufferData.State.PREFETCHING, BufferData.State.CACHING);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void readBlock(BufferData data, boolean isPrefetch, BufferData.State ... expectedState) throws IOException {
        if (this.closed) {
            return;
        }
        BlockOperations.Operation op = null;
        DurationTracker tracker = null;
        BufferData bufferData = data;
        synchronized (bufferData) {
            block20: {
                block21: {
                    block18: {
                        block19: {
                            try {
                                if (data.stateEqualsOneOf(BufferData.State.DONE, BufferData.State.READY)) {
                                    return;
                                }
                                data.throwIfStateIncorrect(expectedState);
                                int blockNumber = data.getBlockNumber();
                                if (this.cache.containsBlock(blockNumber)) {
                                    op = this.ops.getCached(blockNumber);
                                    this.cache.get(blockNumber, data.getBuffer());
                                    data.setReady(expectedState);
                                    if (op == null) break block18;
                                    break block19;
                                }
                                if (isPrefetch) {
                                    tracker = this.prefetchingStatistics.prefetchOperationStarted();
                                    op = this.ops.prefetch(data.getBlockNumber());
                                } else {
                                    op = this.ops.getRead(data.getBlockNumber());
                                }
                                long offset = this.getBlockData().getStartOffset(data.getBlockNumber());
                                int size = this.getBlockData().getSize(data.getBlockNumber());
                                ByteBuffer buffer = data.getBuffer();
                                buffer.clear();
                                this.read(buffer, offset, size);
                                buffer.flip();
                                data.setReady(expectedState);
                                if (op == null) break block20;
                                break block21;
                            }
                            catch (Exception e) {
                                if (isPrefetch && tracker != null) {
                                    tracker.failed();
                                }
                                this.numReadErrors.incrementAndGet();
                                data.setDone();
                                throw e;
                            }
                        }
                        this.ops.end(op);
                    }
                    if (!isPrefetch) return;
                    this.prefetchingStatistics.prefetchOperationCompleted();
                    if (tracker == null) return;
                    tracker.close();
                    return;
                }
                this.ops.end(op);
            }
            if (!isPrefetch) return;
            this.prefetchingStatistics.prefetchOperationCompleted();
            if (tracker == null) return;
            tracker.close();
            return;
            finally {
                if (op != null) {
                    this.ops.end(op);
                }
                if (isPrefetch) {
                    this.prefetchingStatistics.prefetchOperationCompleted();
                    if (tracker != null) {
                        tracker.close();
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void requestCaching(BufferData data) {
        if (this.closed) {
            return;
        }
        if (this.cachingDisabled.get()) {
            data.setDone();
            return;
        }
        Validate.checkNotNull(data, "data");
        if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
            return;
        }
        BufferData bufferData = data;
        synchronized (bufferData) {
            Future<Void> blockFuture;
            if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
                return;
            }
            if (this.cache.containsBlock(data.getBlockNumber())) {
                data.setDone();
                return;
            }
            BufferData.State state = data.getState();
            BlockOperations.Operation op = this.ops.requestCaching(data.getBlockNumber());
            if (state == BufferData.State.PREFETCHING) {
                blockFuture = data.getActionFuture();
            } else {
                CompletableFuture<Void> cf = new CompletableFuture<Void>();
                cf.complete(null);
                blockFuture = cf;
            }
            CachePutTask task = new CachePutTask(data, blockFuture, this, Instant.now());
            Future<Void> actionFuture = this.futurePool.executeFunction(task);
            data.setCaching(actionFuture);
            this.ops.end(op);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture, Instant taskQueuedStartTime) {
        this.prefetchingStatistics.executorAcquired(Duration.between(taskQueuedStartTime, Instant.now()));
        if (this.closed) {
            return;
        }
        if (this.cachingDisabled.get()) {
            data.setDone();
            return;
        }
        try {
            blockFuture.get(60L, TimeUnit.MINUTES);
            if (data.stateEqualsOneOf(BufferData.State.DONE)) {
                return;
            }
        }
        catch (Exception e) {
            LOG.info("error waiting on blockFuture: {}. {}", (Object)data, (Object)e.getMessage());
            LOG.debug("error waiting on blockFuture: {}", (Object)data, (Object)e);
            data.setDone();
            return;
        }
        if (this.cachingDisabled.get()) {
            data.setDone();
            return;
        }
        BlockOperations.Operation op = null;
        BufferData bufferData = data;
        synchronized (bufferData) {
            BlockOperations.End endOp;
            try {
                if (data.stateEqualsOneOf(BufferData.State.DONE)) {
                    return;
                }
                if (this.cache.containsBlock(data.getBlockNumber())) {
                    data.setDone();
                    return;
                }
                op = this.ops.addToCache(data.getBlockNumber());
                ByteBuffer buffer = data.getBuffer().duplicate();
                buffer.rewind();
                this.cachePut(data.getBlockNumber(), buffer);
                data.setDone();
            }
            catch (Exception e) {
                this.numCachingErrors.incrementAndGet();
                LOG.info("error adding block to cache after wait: {}. {}", (Object)data, (Object)e.getMessage());
                LOG.debug("error adding block to cache after wait: {}", (Object)data, (Object)e);
                data.setDone();
            }
            if (op != null && (endOp = (BlockOperations.End)this.ops.end(op)).duration() > 5.0 && !this.cachingDisabled.getAndSet(true)) {
                String message = String.format("Caching disabled because of slow operation (%.1f sec)", endOp.duration());
                LOG.warn(message);
            }
        }
    }

    protected BlockCache createCache() {
        return new SingleFilePerBlockCache(this.prefetchingStatistics);
    }

    protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
        if (this.closed) {
            return;
        }
        this.cache.put(blockNumber, buffer, this.conf, this.localDirAllocator);
    }

    public int numAvailable() {
        return this.bufferPool.numAvailable();
    }

    public int numCached() {
        return this.cache.size();
    }

    public int numCachingErrors() {
        return this.numCachingErrors.get();
    }

    public int numReadErrors() {
        return this.numReadErrors.get();
    }

    BufferData getData(int blockNumber) {
        return this.bufferPool.tryAcquire(blockNumber);
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("cache(");
        sb.append(this.cache.toString());
        sb.append("); ");
        sb.append("pool: ");
        sb.append(this.bufferPool.toString());
        return sb.toString();
    }

    private static class CachePutTask
    implements Supplier<Void> {
        private final BufferData data;
        private final Future<Void> blockFuture;
        private final CachingBlockManager blockManager;
        private final Instant taskQueuedStartTime;

        CachePutTask(BufferData data, Future<Void> blockFuture, CachingBlockManager blockManager, Instant taskQueuedStartTime) {
            this.data = data;
            this.blockFuture = blockFuture;
            this.blockManager = blockManager;
            this.taskQueuedStartTime = taskQueuedStartTime;
        }

        @Override
        public Void get() {
            this.blockManager.addToCacheAndRelease(this.data, this.blockFuture, this.taskQueuedStartTime);
            return null;
        }
    }

    private static class PrefetchTask
    implements Supplier<Void> {
        private final BufferData data;
        private final CachingBlockManager blockManager;
        private final Instant taskQueuedStartTime;

        PrefetchTask(BufferData data, CachingBlockManager blockManager, Instant taskQueuedStartTime) {
            this.data = data;
            this.blockManager = blockManager;
            this.taskQueuedStartTime = taskQueuedStartTime;
        }

        @Override
        public Void get() {
            try {
                this.blockManager.prefetch(this.data, this.taskQueuedStartTime);
            }
            catch (Exception e) {
                LOG.info("error prefetching block {}. {}", (Object)this.data.getBlockNumber(), (Object)e.getMessage());
                LOG.debug("error prefetching block {}", (Object)this.data.getBlockNumber(), (Object)e);
            }
            return null;
        }
    }
}

