/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.fluss.fs;

import com.alibaba.fluss.fs.ClosingFSDataInputStream;
import com.alibaba.fluss.fs.FSDataInputStream;
import com.alibaba.fluss.fs.FSDataOutputStream;
import com.alibaba.fluss.fs.FileSystem;
import com.alibaba.fluss.fs.FileSystemSafetyNet;
import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.fs.SafetyNetCloseableRegistry;
import com.alibaba.fluss.fs.SafetyNetWrapperFileSystem;
import com.alibaba.fluss.fs.WrappingProxyCloseable;
import com.alibaba.fluss.fs.local.LocalFileSystem;
import com.alibaba.fluss.testutils.common.CheckedThread;
import com.alibaba.fluss.utils.AbstractAutoCloseableRegistry;
import com.alibaba.fluss.utils.AbstractAutoCloseableRegistryTest;
import com.alibaba.fluss.utils.ExceptionUtils;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class SafetyNetCloseableRegistryTest
extends AbstractAutoCloseableRegistryTest<Closeable, WrappingProxyCloseable<? extends Closeable>, SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> {
    /**
     * Registers {@code closeable} with the registry under test.
     *
     * <p>The resource is wrapped in an anonymous {@link WrappingProxyCloseable} that forwards
     * {@code close()} to it and reports it as the wrapped delegate.
     *
     * @param closeable the resource to wrap and register
     * @throws IOException if thrown by the registry's {@code registerCloseable}
     */
    @Override
    protected void registerCloseable(final Closeable closeable) throws IOException {
        this.closeableRegistry.registerCloseable(
                new WrappingProxyCloseable<Closeable>() {

                    @Override
                    public Closeable getWrappedDelegate() {
                        return closeable;
                    }

                    @Override
                    public void close() throws IOException {
                        closeable.close();
                    }
                });
    }

    /**
     * Creates the registry under test: a {@link SafetyNetCloseableRegistry} whose reaper thread
     * is supplied as a new {@code JoinOnInterruptReaperThread}.
     *
     * @return a freshly constructed registry
     */
    @Override
    protected AbstractAutoCloseableRegistry<Closeable, WrappingProxyCloseable<? extends Closeable>, SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef, IOException> createRegistry() {
        return new SafetyNetCloseableRegistry(JoinOnInterruptReaperThread::new);
    }

    /**
     * Creates a producer thread whose {@code createAndRegisterStream()} builds a
     * {@link TestFSDataInputStream} on the producer's {@code refCount} and passes it to
     * {@link ClosingFSDataInputStream#wrapSafe} together with the registry and a debug label.
     *
     * @param registry the registry handed to the producer; it is cast to
     *     {@link SafetyNetCloseableRegistry} each time a stream is created
     * @param unclosedCounter counter forwarded to the producer thread constructor
     * @param maxStreams limit forwarded to the producer thread constructor
     * @return the new producer thread (not started here)
     */
    @Override
    protected AbstractAutoCloseableRegistryTest.ProducerThread<Closeable, WrappingProxyCloseable<? extends Closeable>, SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> createProducerThread(AbstractAutoCloseableRegistry<Closeable, WrappingProxyCloseable<? extends Closeable>, SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef, IOException> registry, AtomicInteger unclosedCounter, int maxStreams) {
        return new AbstractAutoCloseableRegistryTest.ProducerThread<Closeable, WrappingProxyCloseable<? extends Closeable>, SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef>(registry, unclosedCounter, maxStreams) {
            /** Number of streams created by this producer so far; only used in the debug label. */
            private int count = 0;

            @Override
            protected void createAndRegisterStream() throws IOException {
                final String label = Thread.currentThread().getName() + " " + this.count;
                final FSDataInputStream rawStream = new TestFSDataInputStream(this.refCount);
                final SafetyNetCloseableRegistry safetyNet = (SafetyNetCloseableRegistry) this.registry;
                // The wrapper returned by wrapSafe is not kept: no reference to it survives this call.
                ClosingFSDataInputStream.wrapSafe(rawStream, safetyNet, label);
                this.count++;
            }
        };
    }

    /** After each test, asserts that the registry reports no running reaper thread. */
    @AfterEach
    void tearDown() {
        final boolean reaperStillRunning = SafetyNetCloseableRegistry.isReaperThreadRunning();
        Assertions.assertThat(reaperStillRunning).isFalse();
    }

    /**
     * Checks that the file-system safety net is scoped to the thread that initialized it.
     *
     * <p>Thread {@code t1} asserts that {@link FileSystem#get} returns a
     * {@link SafetyNetWrapperFileSystem} only between
     * {@code FileSystemSafetyNet.initializeSafetyNetForThread()} and
     * {@code FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread()}. While its safety
     * net is active it opens an output stream and runs a second thread {@code t2} that performs
     * the same wrapped/unwrapped checks for itself. After {@code t2} finishes, {@code t1} can
     * still write to its stream; once {@code t1} closes its own safety net, a further write must
     * throw {@link IOException}.
     *
     * <p>Both threads close their safety net in a {@code finally} block if a failure happens
     * while it is still active, so a failing assertion does not leave it open for later tests.
     *
     * @param tempDir temporary directory in which the test file is created
     * @throws Exception if {@code t1} reports a failure through {@code sync()}
     */
    @Test
    public void testCorrectScopesForSafetyNet(final @TempDir Path tempDir) throws Exception {
        CheckedThread t1 = new CheckedThread() {

            @Override
            public void go() throws Exception {
                // True while this thread's safety net has been initialized but not yet closed.
                boolean safetyNetActive = false;
                try {
                    FileSystem fs1 = FileSystem.get(LocalFileSystem.getLocalFsURI());
                    Assertions.assertThat(fs1).isNotInstanceOf(SafetyNetWrapperFileSystem.class);

                    FileSystemSafetyNet.initializeSafetyNetForThread();
                    safetyNetActive = true;
                    fs1 = FileSystem.get(LocalFileSystem.getLocalFsURI());
                    Assertions.assertThat(fs1).isInstanceOf(SafetyNetWrapperFileSystem.class);

                    FsPath tmp = new FsPath(tempDir.toString(), "test_file");
                    try (FSDataOutputStream stream = fs1.create(tmp, FileSystem.WriteMode.NO_OVERWRITE)) {
                        CheckedThread t2 = new CheckedThread() {

                            @Override
                            public void go() {
                                // Same bookkeeping as in t1, but for this thread's own safety net.
                                boolean innerSafetyNetActive = false;
                                try {
                                    FileSystem fs2 = FileSystem.get(LocalFileSystem.getLocalFsURI());
                                    Assertions.assertThat(fs2).isNotInstanceOf(SafetyNetWrapperFileSystem.class);

                                    FileSystemSafetyNet.initializeSafetyNetForThread();
                                    innerSafetyNetActive = true;
                                    fs2 = FileSystem.get(LocalFileSystem.getLocalFsURI());
                                    Assertions.assertThat(fs2).isInstanceOf(SafetyNetWrapperFileSystem.class);

                                    FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                                    innerSafetyNetActive = false;
                                    fs2 = FileSystem.get(LocalFileSystem.getLocalFsURI());
                                    Assertions.assertThat(fs2).isNotInstanceOf(SafetyNetWrapperFileSystem.class);
                                } catch (Exception e) {
                                    // Keep the original exception as the cause of the failure.
                                    Assertions.fail(ExceptionUtils.stringifyException(e), e);
                                } finally {
                                    if (innerSafetyNetActive) {
                                        FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                                    }
                                }
                            }
                        };
                        t2.start();
                        t2.sync();

                        // t2 closing its safety net must not affect the stream owned by this thread.
                        stream.write(42);

                        FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                        safetyNetActive = false;

                        try {
                            stream.write(43);
                            Assertions.fail("stream should be closed.");
                        } catch (IOException expected) {
                            // Expected: closing the safety net closed the guarded stream.
                        }

                        fs1 = FileSystem.get(LocalFileSystem.getLocalFsURI());
                        Assertions.assertThat(fs1).isNotInstanceOf(SafetyNetWrapperFileSystem.class);
                    } finally {
                        fs1.delete(tmp, false);
                    }
                } catch (Exception e) {
                    // Keep the original exception as the cause of the failure.
                    Assertions.fail(ExceptionUtils.stringifyException(e), e);
                } finally {
                    if (safetyNetActive) {
                        // Failure path only: close the safety net that would otherwise stay open.
                        FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                    }
                }
            }
        };
        t1.start();
        t1.sync();
    }

    /**
     * Runs the inherited producer threads and checks that every stream they created ends up
     * closed, i.e. that {@code unclosedCounter} drops to zero.
     *
     * <p>After the threads are joined, the test calls {@link System#gc()} and sleeps 50 ms, up to
     * five times, for as long as the counter is still positive. The registry is closed in a
     * {@code finally} block so that it is released even when the assertion fails.
     *
     * @throws Exception if setup, thread handling, sleeping or closing the registry fails
     */
    @Test
    void testSafetyNetClose() throws Exception {
        // NOTE(review): 20 is presumably the per-producer stream limit; confirm in the base class.
        this.setup(20);
        try {
            this.startThreads();
            this.joinThreads();

            for (int attempt = 0; attempt < 5 && this.unclosedCounter.get() > 0; ++attempt) {
                System.gc();
                Thread.sleep(50L);
            }

            Assertions.assertThat(this.unclosedCounter.get()).isEqualTo(0);
        } finally {
            // Always close the registry; an open one would leak past a failed assertion.
            this.closeableRegistry.close();
        }
    }

    /**
     * Checks the reaper thread's lifecycle against the number of open registries: not running
     * before any registry exists, running while at least one of two nested registries is open,
     * and not running again once both have been closed.
     *
     * @throws Exception if closing a registry fails
     */
    @Test
    void testReaperThreadSpawnAndStop() throws Exception {
        // No registry has been created yet.
        Assertions.assertThat(SafetyNetCloseableRegistry.isReaperThreadRunning()).isFalse();

        try (SafetyNetCloseableRegistry outer = new SafetyNetCloseableRegistry()) {
            // First registry is open.
            Assertions.assertThat(SafetyNetCloseableRegistry.isReaperThreadRunning()).isTrue();

            try (SafetyNetCloseableRegistry inner = new SafetyNetCloseableRegistry()) {
                // Two registries are open.
                Assertions.assertThat(SafetyNetCloseableRegistry.isReaperThreadRunning()).isTrue();
            }

            // The inner registry is closed, the outer one is still open.
            Assertions.assertThat(SafetyNetCloseableRegistry.isReaperThreadRunning()).isTrue();
        }

        // Both registries are closed.
        Assertions.assertThat(SafetyNetCloseableRegistry.isReaperThreadRunning()).isFalse();
    }

    /**
     * Checks that a reaper thread whose {@code start()} throws {@link OutOfMemoryError} leaves no
     * reaper thread running, and that a registry created afterwards with the default constructor
     * still gets a running reaper thread.
     *
     * <p>The second registry is opened with try-with-resources so that it is closed even when the
     * assertion inside the block fails.
     *
     * @throws Exception if closing the second registry fails
     */
    @Test
    void testReaperThreadStartFailed() throws Exception {
        try {
            new SafetyNetCloseableRegistry(() -> new OutOfMemoryReaperThread());
        } catch (OutOfMemoryError expected) {
            // Tolerated: OutOfMemoryReaperThread.start() always throws this error.
        }
        Assertions.assertThat(SafetyNetCloseableRegistry.isReaperThreadRunning()).isFalse();

        // The earlier start failure must not prevent a later registry from starting its reaper.
        try (SafetyNetCloseableRegistry closeableRegistry = new SafetyNetCloseableRegistry()) {
            Assertions.assertThat(SafetyNetCloseableRegistry.isReaperThreadRunning()).isTrue();
        }
    }

    protected static final class TestFSDataInputStream
    extends FSDataInputStream {
        protected AtomicInteger refCount;

        public TestFSDataInputStream(AtomicInteger refCount) {
            this.refCount = refCount;
            refCount.incrementAndGet();
        }

        public int read() throws IOException {
            return 0;
        }

        public synchronized void close() throws IOException {
            this.refCount.decrementAndGet();
        }

        public void seek(long desired) throws IOException {
        }

        public long getPos() throws IOException {
            return 0L;
        }
    }

    private static class OutOfMemoryReaperThread
    extends SafetyNetCloseableRegistry.CloseableReaperThread {
        private OutOfMemoryReaperThread() {
        }

        public synchronized void start() {
            throw new OutOfMemoryError();
        }
    }

    private static class JoinOnInterruptReaperThread
    extends SafetyNetCloseableRegistry.CloseableReaperThread {
        private JoinOnInterruptReaperThread() {
        }

        public void interrupt() {
            super.interrupt();
            try {
                this.join();
            }
            catch (InterruptedException ex) {
                JoinOnInterruptReaperThread.currentThread().interrupt();
            }
        }
    }
}

