/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.operator;

import com.facebook.airlift.concurrent.Threads;
import com.facebook.presto.block.BlockAssertions;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockEncoding;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.block.TestingBlockEncodingSerde;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.operator.FileFragmentResultCacheConfig;
import com.facebook.presto.operator.FileFragmentResultCacheManager;
import com.facebook.presto.operator.FragmentCacheResult;
import com.facebook.presto.operator.FragmentCacheStats;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

/**
 * TestNG tests for {@link FileFragmentResultCacheManager}.
 *
 * <p>Each test creates its own temporary cache directory and its own {@link FragmentCacheStats},
 * then checks the hit/miss/entry/size counters after every cache operation. The write and
 * removal executors are shared by all cache managers created in this class and are shut down
 * in {@link #close()}.
 */
public class TestFileFragmentResultCacheManager {
    private static final String SERIALIZED_PLAN_FRAGMENT_1 = "test plan fragment 1";
    private static final String SERIALIZED_PLAN_FRAGMENT_2 = "test plan fragment 2";
    private static final Split SPLIT_1 = new Split(new ConnectorId("test"), new ConnectorTransactionHandle(){}, new TestingSplit(1));
    private static final Split SPLIT_2 = new Split(new ConnectorId("test"), new ConnectorTransactionHandle(){}, new TestingSplit(2));
    private static final long INPUT_DATA_SIZE_1 = 1000L;
    private static final long INPUT_DATA_SIZE_2 = 2000L;

    // Executors handed to every cache manager built by fileFragmentResultCacheManager(...).
    private final ExecutorService writeExecutor = Executors.newScheduledThreadPool(5, Threads.daemonThreadsNamed("test-cache-flusher-%s"));
    private final ExecutorService removalExecutor = Executors.newScheduledThreadPool(5, Threads.daemonThreadsNamed("test-cache-remover-%s"));
    // Runs the concurrent put/get tasks of testThreadWrite.
    private final ExecutorService multithreadingWriteExecutor = Executors.newScheduledThreadPool(10, Threads.daemonThreadsNamed("test-cache-multithreading-flusher-%s"));

    /**
     * Shuts down all executors owned by this test class and waits up to 30 seconds for the
     * removal executor to terminate.
     *
     * @throws InterruptedException if interrupted while waiting for the removal executor
     */
    @AfterClass
    public void close() throws IOException, InterruptedException {
        writeExecutor.shutdown();
        removalExecutor.shutdown();
        removalExecutor.awaitTermination(30L, TimeUnit.SECONDS);
        multithreadingWriteExecutor.shutdown();
    }

    /**
     * Creates a fresh temporary directory and returns its URI.
     *
     * @param prefix prefix for the generated directory name
     */
    private URI getNewCacheDirectory(String prefix) throws Exception {
        return Files.createTempDirectory(prefix).toUri();
    }

    /**
     * Deletes every file directly inside {@code cacheDirectory} and then the directory itself.
     * Entries that have already disappeared are ignored.
     *
     * @param cacheDirectory directory to remove; must not be null
     * @throws IOException if a deletion fails
     */
    private void cleanupCacheDirectory(URI cacheDirectory) throws IOException {
        Preconditions.checkState(cacheDirectory != null);
        File directory = new File(cacheDirectory);
        File[] files = directory.listFiles();
        if (files != null) {
            for (File file : files) {
                Files.deleteIfExists(file.toPath());
            }
        }
        Files.deleteIfExists(directory.toPath());
    }

    /**
     * Walks through miss, put, hit and invalidation on a cache manager with the default
     * configuration, asserting the statistics after each step.
     */
    @Test(timeOut=30000L)
    public void testBasic() throws Exception {
        URI cacheDirectory = getNewCacheDirectory("testBasic");
        FragmentCacheStats stats = new FragmentCacheStats();
        FileFragmentResultCacheManager cacheManager = fileFragmentResultCacheManager(stats, cacheDirectory);

        // Lookup on an empty cache is a miss.
        FragmentCacheResult fragmentCacheResult = cacheManager.get(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_1);
        Assert.assertFalse(fragmentCacheResult.getPages().isPresent());
        Assert.assertEquals(fragmentCacheResult.getInputDataSize(), 0L);
        Assert.assertEquals(stats.getCacheMiss(), 1L);
        Assert.assertEquals(stats.getCacheHit(), 0L);
        Assert.assertEquals(stats.getCacheEntries(), 0L);
        Assert.assertEquals(stats.getCacheSizeInBytes(), 0L);

        // An empty page list is cached as an entry of size zero.
        List<Page> noPages = ImmutableList.of();
        cacheManager.put(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_1, noPages, INPUT_DATA_SIZE_1).get();
        fragmentCacheResult = cacheManager.get(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_1);
        Optional<Iterator<Page>> result = fragmentCacheResult.getPages();
        Assert.assertTrue(result.isPresent());
        Assert.assertFalse(result.get().hasNext());
        Assert.assertEquals(fragmentCacheResult.getInputDataSize(), INPUT_DATA_SIZE_1);
        Assert.assertEquals(stats.getCacheMiss(), 1L);
        Assert.assertEquals(stats.getCacheHit(), 1L);
        Assert.assertEquals(stats.getCacheEntries(), 1L);
        Assert.assertEquals(stats.getCacheSizeInBytes(), 0L);

        // A non-empty entry round-trips and the reported size matches what is on disk.
        List<Page> pages = ImmutableList.of(new Page(new Block[]{BlockAssertions.createStringsBlock("plan-1-split-2")}));
        cacheManager.put(SERIALIZED_PLAN_FRAGMENT_2, SPLIT_2, pages, INPUT_DATA_SIZE_2).get();
        fragmentCacheResult = cacheManager.get(SERIALIZED_PLAN_FRAGMENT_2, SPLIT_2);
        result = fragmentCacheResult.getPages();
        Assert.assertTrue(result.isPresent());
        assertPagesEqual(result.get(), pages.iterator());
        Assert.assertEquals(fragmentCacheResult.getInputDataSize(), INPUT_DATA_SIZE_2);
        Assert.assertEquals(stats.getCacheMiss(), 1L);
        Assert.assertEquals(stats.getCacheHit(), 2L);
        Assert.assertEquals(stats.getCacheEntries(), 2L);
        Assert.assertEquals(stats.getCacheSizeInBytes(), getCachePhysicalSize(cacheDirectory));

        // Cached plan with a different split is a miss.
        cacheManager.get(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_2);
        Assert.assertEquals(stats.getCacheMiss(), 2L);
        Assert.assertEquals(stats.getCacheHit(), 2L);
        Assert.assertEquals(stats.getCacheEntries(), 2L);

        // Cached split with a different plan is a miss.
        cacheManager.get(SERIALIZED_PLAN_FRAGMENT_2, SPLIT_1);
        Assert.assertEquals(stats.getCacheMiss(), 3L);
        Assert.assertEquals(stats.getCacheHit(), 2L);
        Assert.assertEquals(stats.getCacheEntries(), 2L);
        Assert.assertEquals(stats.getCacheSizeInBytes(), getCachePhysicalSize(cacheDirectory));

        // Invalidation drops both entries and resets the tracked size.
        cacheManager.invalidateAllCache();
        Assert.assertEquals(stats.getCacheMiss(), 3L);
        Assert.assertEquals(stats.getCacheHit(), 2L);
        Assert.assertEquals(stats.getCacheEntries(), 0L);
        Assert.assertEquals(stats.getCacheRemoval(), 2L);
        Assert.assertEquals(stats.getCacheSizeInBytes(), 0L);

        cleanupCacheDirectory(cacheDirectory);
    }

    /**
     * Configures a 71-byte maximum cache size and checks that a first page is cached, a second
     * put of the same page under another key is not stored, and an empty page list is still
     * accepted afterwards.
     */
    @Test(timeOut=30000L)
    public void testMaxCacheSize() throws Exception {
        List<Page> pages = ImmutableList.of(new Page(new Block[]{BlockAssertions.createStringsBlock("plan-1-split-2")}));
        URI cacheDirectory = getNewCacheDirectory("testMaxCacheSize");
        FragmentCacheStats stats = new FragmentCacheStats();
        FileFragmentResultCacheConfig config = new FileFragmentResultCacheConfig();
        // NOTE(review): 71 bytes is presumably just enough for one serialized test page - confirm.
        config.setMaxCacheSize(new DataSize(71.0, DataSize.Unit.BYTE));
        FileFragmentResultCacheManager cacheManager = fileFragmentResultCacheManager(stats, config, cacheDirectory);

        // First entry is stored.
        cacheManager.put(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_1, pages, INPUT_DATA_SIZE_1).get();
        FragmentCacheResult fragmentCacheResult = cacheManager.get(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_1);
        Optional<Iterator<Page>> result = fragmentCacheResult.getPages();
        Assert.assertTrue(result.isPresent());
        assertPagesEqual(result.get(), pages.iterator());
        Assert.assertEquals(fragmentCacheResult.getInputDataSize(), INPUT_DATA_SIZE_1);
        Assert.assertEquals(stats.getCacheMiss(), 0L);
        Assert.assertEquals(stats.getCacheHit(), 1L);
        Assert.assertEquals(stats.getCacheEntries(), 1L);
        Assert.assertEquals(stats.getCacheSizeInBytes(), getCachePhysicalSize(cacheDirectory));

        // Second non-empty entry is not stored: the lookup misses and the entry count stays at 1.
        Assert.assertNull(cacheManager.put(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_2, pages, INPUT_DATA_SIZE_2).get());
        fragmentCacheResult = cacheManager.get(SERIALIZED_PLAN_FRAGMENT_1, SPLIT_2);
        result = fragmentCacheResult.getPages();
        Assert.assertFalse(result.isPresent());
        Assert.assertEquals(fragmentCacheResult.getInputDataSize(), 0L);
        Assert.assertEquals(stats.getCacheMiss(), 1L);
        Assert.assertEquals(stats.getCacheHit(), 1L);
        Assert.assertEquals(stats.getCacheEntries(), 1L);
        Assert.assertEquals(stats.getCacheSizeInBytes(), getCachePhysicalSize(cacheDirectory));

        // An empty page list is still accepted.
        List<Page> noPages = ImmutableList.of();
        cacheManager.put(SERIALIZED_PLAN_FRAGMENT_2, SPLIT_1, noPages, 0L).get();
        fragmentCacheResult = cacheManager.get(SERIALIZED_PLAN_FRAGMENT_2, SPLIT_1);
        result = fragmentCacheResult.getPages();
        Assert.assertTrue(result.isPresent());
        Assert.assertEquals(fragmentCacheResult.getInputDataSize(), 0L);
        Assert.assertFalse(result.get().hasNext());
        Assert.assertEquals(stats.getCacheMiss(), 1L);
        Assert.assertEquals(stats.getCacheHit(), 2L);
        Assert.assertEquals(stats.getCacheEntries(), 2L);
        Assert.assertEquals(stats.getCacheSizeInBytes(), getCachePhysicalSize(cacheDirectory));

        cacheManager.invalidateAllCache();
        Assert.assertEquals(stats.getCacheMiss(), 1L);
        Assert.assertEquals(stats.getCacheHit(), 2L);
        Assert.assertEquals(stats.getCacheEntries(), 0L);
        Assert.assertEquals(stats.getCacheRemoval(), 2L);
        Assert.assertEquals(stats.getCacheSizeInBytes(), 0L);

        cleanupCacheDirectory(cacheDirectory);
    }

    /**
     * Asserts that both iterators yield the same number of pages and that corresponding pages
     * have the same channel count, the same position count, and equal slice content at every
     * position of every channel.
     *
     * @param pages1 pages under test; fully consumed by this call
     * @param pages2 expected pages; fully consumed by this call
     */
    private static void assertPagesEqual(Iterator<Page> pages1, Iterator<Page> pages2) {
        while (pages1.hasNext() && pages2.hasNext()) {
            Page page1 = pages1.next();
            Page page2 = pages2.next();
            Assert.assertEquals(page1.getChannelCount(), page2.getChannelCount());
            Assert.assertEquals(page1.getPositionCount(), page2.getPositionCount());
            for (int channel = 0; channel < page1.getChannelCount(); ++channel) {
                Block block1 = page1.getBlock(channel);
                Block block2 = page2.getBlock(channel);
                for (int position = 0; position < page1.getPositionCount(); ++position) {
                    // Take the length from the channel being compared and require both sides to
                    // agree on it, so a longer value cannot pass as a prefix match.
                    int length = block1.getSliceLength(position);
                    Assert.assertEquals(length, block2.getSliceLength(position));
                    Assert.assertTrue(block1.equals(position, 0, block2, position, 0, length));
                }
            }
        }
        // Neither side may have pages left over.
        Assert.assertFalse(pages1.hasNext());
        Assert.assertFalse(pages2.hasNext());
    }

    /**
     * Submits ten concurrent tasks that each put a page keyed by the executing thread's name
     * and id and read it back, then checks the tracked cache size before and after
     * invalidation.
     */
    @Test(timeOut=30000L)
    public void testThreadWrite() throws Exception {
        URI cacheDirectory = getNewCacheDirectory("testThreadWrite");
        String writeThreadNameFormat = "test write content,thread %s,%s";
        FragmentCacheStats stats = new FragmentCacheStats();
        FileFragmentResultCacheManager threadWriteCacheManager = fileFragmentResultCacheManager(stats, cacheDirectory);
        ImmutableList.Builder<Future<Boolean>> futures = ImmutableList.builder();
        for (int i = 0; i < 10; ++i) {
            // Failures are deliberately not caught here: they surface through Future.get below
            // with the original cause attached instead of collapsing into a bare "false".
            Future<Boolean> future = multithreadingWriteExecutor.submit(() -> {
                String threadInfo = String.format(writeThreadNameFormat, Thread.currentThread().getName(), Thread.currentThread().getId());
                List<Page> pages = ImmutableList.of(new Page(new Block[]{BlockAssertions.createStringsBlock(threadInfo)}));
                threadWriteCacheManager.put(threadInfo, SPLIT_2, pages, INPUT_DATA_SIZE_2).get();
                FragmentCacheResult fragmentCacheResult = threadWriteCacheManager.get(threadInfo, SPLIT_2);
                Optional<Iterator<Page>> result = fragmentCacheResult.getPages();
                Assert.assertTrue(result.isPresent());
                assertPagesEqual(result.get(), pages.iterator());
                Assert.assertEquals(fragmentCacheResult.getInputDataSize(), INPUT_DATA_SIZE_2);
                return true;
            });
            futures.add(future);
        }
        for (Future<Boolean> future : futures.build()) {
            Assert.assertTrue(future.get(30L, TimeUnit.SECONDS));
        }
        Assert.assertTrue(stats.getCacheSizeInBytes() > 0L);
        threadWriteCacheManager.invalidateAllCache();
        Assert.assertEquals(stats.getCacheSizeInBytes(), 0L);
        cleanupCacheDirectory(cacheDirectory);
    }

    /**
     * Returns the summed {@link File#length()} of the entries directly inside
     * {@code cacheDirectory}, or 0 if the directory cannot be listed.
     *
     * @param cacheDirectory directory to measure; must not be null
     */
    private long getCachePhysicalSize(URI cacheDirectory) {
        Preconditions.checkState(cacheDirectory != null);
        File[] files = new File(cacheDirectory).listFiles();
        long physicalSize = 0L;
        if (files != null) {
            for (File file : files) {
                physicalSize += file.length();
            }
        }
        return physicalSize;
    }

    /**
     * Builds a cache manager over {@code cacheDirectory} using a default
     * {@link FileFragmentResultCacheConfig}.
     */
    private FileFragmentResultCacheManager fileFragmentResultCacheManager(FragmentCacheStats fragmentCacheStats, URI cacheDirectory) {
        return fileFragmentResultCacheManager(fragmentCacheStats, new FileFragmentResultCacheConfig(), cacheDirectory);
    }

    /**
     * Builds a cache manager from {@code cacheConfig}, after pointing it at
     * {@code cacheDirectory} and enabling input data stats, wired to this class's write and
     * removal executors.
     */
    private FileFragmentResultCacheManager fileFragmentResultCacheManager(FragmentCacheStats fragmentCacheStats, FileFragmentResultCacheConfig cacheConfig, URI cacheDirectory) {
        return new FileFragmentResultCacheManager(
                cacheConfig.setBaseDirectory(cacheDirectory).setInputDataStatsEnabled(true),
                new TestingBlockEncodingSerde(new BlockEncoding[0]),
                fragmentCacheStats,
                writeExecutor,
                removalExecutor);
    }

    /**
     * Minimal {@link ConnectorSplit} identified solely by an integer id; equality, hash code
     * and the split identifier are all derived from that id.
     */
    private static class TestingSplit
    implements ConnectorSplit {
        private final int id;

        public TestingSplit(int id) {
            this.id = id;
        }

        @Override
        public NodeSelectionStrategy getNodeSelectionStrategy() {
            return NodeSelectionStrategy.NO_PREFERENCE;
        }

        @Override
        public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider) {
            return ImmutableList.of();
        }

        @Override
        public Object getInfo() {
            return this;
        }

        @Override
        public Object getSplitIdentifier() {
            return this.id;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TestingSplit that = (TestingSplit)o;
            return this.id == that.id;
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.id);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("id", this.id).toString();
        }
    }
}

