/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.impl.transaction.log;

import java.io.File;
import java.io.Flushable;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
import org.neo4j.adversaries.Adversary;
import org.neo4j.adversaries.ClassGuardedAdversary;
import org.neo4j.adversaries.CountingAdversary;
import org.neo4j.adversaries.fs.AdversarialFileSystemAbstraction;
import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.FileSystemLifecycleAdapter;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.transaction.SimpleLogVersionRepository;
import org.neo4j.kernel.impl.transaction.SimpleTransactionIdStore;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.command.Command;
import org.neo4j.kernel.impl.transaction.log.BatchingTransactionAppender;
import org.neo4j.kernel.impl.transaction.log.InMemoryClosableChannel;
import org.neo4j.kernel.impl.transaction.log.LogVersionRepository;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache;
import org.neo4j.kernel.impl.transaction.log.files.LogFile;
import org.neo4j.kernel.impl.transaction.log.files.LogFiles;
import org.neo4j.kernel.impl.transaction.log.files.LogFilesBuilder;
import org.neo4j.kernel.impl.transaction.log.files.TransactionLogFiles;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;
import org.neo4j.kernel.impl.transaction.tracing.LogForceEvents;
import org.neo4j.kernel.impl.transaction.tracing.LogForceWaitEvent;
import org.neo4j.kernel.impl.util.IdOrderingQueue;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.lifecycle.LifeRule;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.logging.Log;
import org.neo4j.logging.NullLog;
import org.neo4j.test.DoubleLatch;
import org.neo4j.test.Race;
import org.neo4j.test.ThreadTestUtils;
import org.neo4j.test.rule.fs.EphemeralFileSystemRule;

public class BatchingTransactionAppenderConcurrencyTest {
    private static final long MILLISECONDS_TO_WAIT = TimeUnit.SECONDS.toMillis(10L);
    private static ExecutorService executor;
    private final LifeRule life = new LifeRule();
    private final EphemeralFileSystemRule fileSystemRule = new EphemeralFileSystemRule();
    @Rule
    public final RuleChain ruleChain = RuleChain.outerRule((TestRule)this.fileSystemRule).around((TestRule)this.life);
    private final LogAppendEvent logAppendEvent = LogAppendEvent.NULL;
    private final LogFiles logFiles = (LogFiles)Mockito.mock(TransactionLogFiles.class);
    private final LogFile logFile = (LogFile)Mockito.mock(LogFile.class);
    private final LogRotation logRotation = LogRotation.NO_ROTATION;
    private final TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache(10);
    private final TransactionIdStore transactionIdStore = new SimpleTransactionIdStore();
    private final SimpleLogVersionRepository logVersionRepository = new SimpleLogVersionRepository();
    private final IdOrderingQueue explicitIndexTransactionOrdering = IdOrderingQueue.BYPASS;
    private final DatabaseHealth databaseHealth = (DatabaseHealth)Mockito.mock(DatabaseHealth.class);
    private final Semaphore forceSemaphore = new Semaphore(0);
    private final BlockingQueue<ChannelCommand> channelCommandQueue = new LinkedBlockingQueue<ChannelCommand>(2);

    @BeforeClass
    public static void setUpExecutor() {
        executor = Executors.newCachedThreadPool();
    }

    @AfterClass
    public static void tearDownExecutor() {
        executor.shutdown();
        executor = null;
    }

    @Before
    public void setUp() {
        Mockito.when((Object)this.logFiles.getLogFile()).thenReturn((Object)this.logFile);
        Mockito.when((Object)this.logFile.getWriter()).thenReturn((Object)new CommandQueueChannel());
    }

    @Test
    public void shouldForceLogChannel() throws Throwable {
        BatchingTransactionAppender appender = (BatchingTransactionAppender)this.life.add((Lifecycle)this.createTransactionAppender());
        this.life.start();
        appender.forceAfterAppend((LogForceEvents)this.logAppendEvent);
        Assert.assertThat((Object)((Object)this.channelCommandQueue.take()), (Matcher)Matchers.is((Object)((Object)ChannelCommand.emptyBufferIntoChannelAndClearIt)));
        Assert.assertThat((Object)((Object)this.channelCommandQueue.take()), (Matcher)Matchers.is((Object)((Object)ChannelCommand.force)));
        Assert.assertTrue((boolean)this.channelCommandQueue.isEmpty());
    }

    @Test
    public void shouldWaitForOngoingForceToCompleteBeforeForcingAgain() throws Throwable {
        this.channelCommandQueue.put(ChannelCommand.dummy);
        BatchingTransactionAppender appender = (BatchingTransactionAppender)this.life.add((Lifecycle)this.createTransactionAppender());
        this.life.start();
        Runnable runnable = this.createForceAfterAppendRunnable(appender);
        Future<?> future = executor.submit(runnable);
        this.forceSemaphore.acquire();
        Thread otherThread = ThreadTestUtils.fork((Runnable)runnable);
        ThreadTestUtils.awaitThreadState((Thread)otherThread, (long)MILLISECONDS_TO_WAIT, (Thread.State)Thread.State.TIMED_WAITING, (Thread.State[])new Thread.State[0]);
        Assert.assertThat((Object)((Object)this.channelCommandQueue.take()), (Matcher)Matchers.is((Object)((Object)ChannelCommand.dummy)));
        Assert.assertThat((Object)((Object)this.channelCommandQueue.take()), (Matcher)Matchers.is((Object)((Object)ChannelCommand.emptyBufferIntoChannelAndClearIt)));
        Assert.assertThat((Object)((Object)this.channelCommandQueue.take()), (Matcher)Matchers.is((Object)((Object)ChannelCommand.force)));
        Assert.assertThat((Object)((Object)this.channelCommandQueue.take()), (Matcher)Matchers.is((Object)((Object)ChannelCommand.emptyBufferIntoChannelAndClearIt)));
        Assert.assertThat((Object)((Object)this.channelCommandQueue.take()), (Matcher)Matchers.is((Object)((Object)ChannelCommand.force)));
        future.get();
        otherThread.join();
        Assert.assertTrue((boolean)this.channelCommandQueue.isEmpty());
    }

    @Test
    public void shouldBatchUpMultipleWaitingForceRequests() throws Throwable {
        this.channelCommandQueue.put(ChannelCommand.dummy);
        BatchingTransactionAppender appender = (BatchingTransactionAppender)this.life.add((Lifecycle)this.createTransactionAppender());
        this.life.start();
        Runnable runnable = this.createForceAfterAppendRunnable(appender);
        Future<?> future = executor.submit(runnable);
        this.forceSemaphore.acquire();
        Thread[] otherThreads = new Thread[10];
        for (int i = 0; i < otherThreads.length; ++i) {
            otherThreads[i] = ThreadTestUtils.fork((Runnable)runnable);
        }
        for (Thread otherThread : otherThreads) {
            ThreadTestUtils.awaitThreadState((Thread)otherThread, (long)MILLISECONDS_TO_WAIT, (Thread.State)Thread.State.TIMED_WAITING, (Thread.State[])new Thread.State[0]);
        }
        Assert.assertThat((Object)((Object)this.channelCommandQueue.take()), (Matcher)Matchers.is((Object)((Object)ChannelCommand.dummy)));
        Assert.assertThat((Object)((Object)this.channelCommandQueue.take()), (Matcher)Matchers.is((Object)((Object)ChannelCommand.emptyBufferIntoChannelAndClearIt)));
        Assert.assertThat((Object)((Object)this.channelCommandQueue.take()), (Matcher)Matchers.is((Object)((Object)ChannelCommand.force)));
        Assert.assertThat((Object)((Object)this.channelCommandQueue.take()), (Matcher)Matchers.is((Object)((Object)ChannelCommand.emptyBufferIntoChannelAndClearIt)));
        Assert.assertThat((Object)((Object)this.channelCommandQueue.take()), (Matcher)Matchers.is((Object)((Object)ChannelCommand.force)));
        future.get();
        for (Thread otherThread : otherThreads) {
            otherThread.join();
        }
        Assert.assertTrue((boolean)this.channelCommandQueue.isEmpty());
    }

    @Test
    public void shouldHaveAllConcurrentAppendersSeePanic() throws Throwable {
        ClassGuardedAdversary adversary = new ClassGuardedAdversary((Adversary)new CountingAdversary(1, true), new Predicate[]{this.failMethod(BatchingTransactionAppender.class, "force")});
        EphemeralFileSystemAbstraction efs = new EphemeralFileSystemAbstraction();
        File directory = new File("dir").getCanonicalFile();
        efs.mkdirs(directory);
        AdversarialFileSystemAbstraction fs = new AdversarialFileSystemAbstraction((Adversary)adversary, (FileSystemAbstraction)efs);
        this.life.add((Lifecycle)new FileSystemLifecycleAdapter((FileSystemAbstraction)fs));
        DatabaseHealth databaseHealth = new DatabaseHealth((DatabasePanicEventGenerator)Mockito.mock(DatabasePanicEventGenerator.class), (Log)NullLog.getInstance());
        LogFiles logFiles = LogFilesBuilder.builder((File)directory, (FileSystemAbstraction)fs).withLogVersionRepository((LogVersionRepository)this.logVersionRepository).withTransactionIdStore(this.transactionIdStore).build();
        this.life.add((Lifecycle)logFiles);
        BatchingTransactionAppender appender = (BatchingTransactionAppender)this.life.add((Lifecycle)new BatchingTransactionAppender(logFiles, this.logRotation, this.transactionMetadataCache, this.transactionIdStore, this.explicitIndexTransactionOrdering, databaseHealth));
        this.life.start();
        int numberOfAppenders = 10;
        final CountDownLatch trap = new CountDownLatch(numberOfAppenders);
        LogAppendEvent.Empty beforeForceTrappingEvent = new LogAppendEvent.Empty(){

            public LogForceWaitEvent beginLogForceWait() {
                trap.countDown();
                DoubleLatch.awaitLatch(trap);
                return super.beginLogForceWait();
            }
        };
        Race race = new Race();
        for (int i = 0; i < numberOfAppenders; ++i) {
            race.addContestant(() -> this.lambda$shouldHaveAllConcurrentAppendersSeePanic$0(appender, (LogAppendEvent)beforeForceTrappingEvent));
        }
        race.go();
    }

    protected TransactionToApply tx() {
        NodeRecord before = new NodeRecord(0L);
        NodeRecord after = new NodeRecord(0L);
        after.setInUse(true);
        Command.NodeCommand nodeCommand = new Command.NodeCommand(before, after);
        PhysicalTransactionRepresentation tx = new PhysicalTransactionRepresentation(Collections.singletonList(nodeCommand));
        tx.setHeader(new byte[0], 0, 0, 0L, 0L, 0L, 0);
        return new TransactionToApply((TransactionRepresentation)tx);
    }

    private Runnable createForceAfterAppendRunnable(BatchingTransactionAppender appender) {
        return () -> {
            try {
                appender.forceAfterAppend((LogForceEvents)this.logAppendEvent);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }

    private Predicate<StackTraceElement> failMethod(Class<?> klass, String methodName) {
        return element -> element.getClassName().equals(klass.getName()) && element.getMethodName().equals(methodName);
    }

    private BatchingTransactionAppender createTransactionAppender() {
        return new BatchingTransactionAppender(this.logFiles, this.logRotation, this.transactionMetadataCache, this.transactionIdStore, this.explicitIndexTransactionOrdering, this.databaseHealth);
    }

    private /* synthetic */ void lambda$shouldHaveAllConcurrentAppendersSeePanic$0(BatchingTransactionAppender appender, LogAppendEvent beforeForceTrappingEvent) {
        try {
            appender.append(this.tx(), beforeForceTrappingEvent);
            Assert.fail((String)"No transaction should be considered appended");
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    class CommandQueueChannel
    extends InMemoryClosableChannel
    implements Flushable {
        CommandQueueChannel() {
        }

        @Override
        public Flushable prepareForFlush() {
            try {
                BatchingTransactionAppenderConcurrencyTest.this.channelCommandQueue.put(ChannelCommand.emptyBufferIntoChannelAndClearIt);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return this;
        }

        @Override
        public void flush() throws IOException {
            try {
                BatchingTransactionAppenderConcurrencyTest.this.forceSemaphore.release();
                BatchingTransactionAppenderConcurrencyTest.this.channelCommandQueue.put(ChannelCommand.force);
            }
            catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
    }

    private static enum ChannelCommand {
        emptyBufferIntoChannelAndClearIt,
        force,
        dummy;

    }
}

