/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zeppelin.interpreter.remote;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Appender;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.zeppelin.interpreter.remote.AppendOutputRunner;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link AppendOutputRunner}: events handed to
 * {@code appendBuffer} must reach the {@link RemoteInterpreterProcessListener}
 * through {@code onOutputAppend}, merged per note/paragraph as asserted by the
 * individual tests below.
 */
public class AppendOutputRunnerTest {
    /** Number of events pushed by {@link BombardEvents}. */
    private static final int NUM_EVENTS = 10000;
    /** Upper bound on listener calls tolerated by {@link #testClubbedData()}. */
    private static final int NUM_CLUBBED_EVENTS = 100;
    /** Maximum time to wait for the expected number of listener invocations. */
    private static final long EVENT_WAIT_TIMEOUT_MS = 2000L;
    /** Maximum time to wait for the expected WARN log entries to show up. */
    private static final long LOG_WAIT_TIMEOUT_MS = 5000L;
    /** Pause between two polls so waiting does not burn a full core. */
    private static final long POLL_INTERVAL_MS = 10L;

    private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
    private static ScheduledFuture<?> future = null;
    // Read by the test thread and written by the thread that invokes the mocked
    // listener. The "+= 1" is not atomic; that is only safe while a single thread
    // performs the increments (presumably the single scheduler thread of
    // "service" -- confirm against AppendOutputRunner).
    private static volatile int numInvocations = 0;

    /** Cancels the runner scheduled by the previous test, if any. */
    @AfterEach
    public void afterEach() {
        if (future != null) {
            future.cancel(true);
            // Drop the reference so the next test never re-cancels a stale future.
            future = null;
        }
    }

    /** A single buffered event is forwarded to the listener exactly once, unchanged. */
    @Test
    public void testSingleEvent() throws InterruptedException {
        RemoteInterpreterProcessListener listener = Mockito.mock(RemoteInterpreterProcessListener.class);
        String[][] buffer = new String[][]{{"note", "para", "data\n"}};
        loopForCompletingEvents(listener, 1, buffer);
        verifyAnyAppend(listener, Mockito.times(1));
        Mockito.verify(listener, Mockito.times(1)).onOutputAppend("note", "para", 0, "data\n");
    }

    /** Three events of one paragraph reach the listener as one call with concatenated data. */
    @Test
    public void testMultipleEventsOfSameParagraph() throws InterruptedException {
        RemoteInterpreterProcessListener listener = Mockito.mock(RemoteInterpreterProcessListener.class);
        String note1 = "note1";
        String para1 = "para1";
        String[][] buffer = new String[][]{
            {note1, para1, "data1\n"},
            {note1, para1, "data2\n"},
            {note1, para1, "data3\n"}};
        loopForCompletingEvents(listener, 1, buffer);
        verifyAnyAppend(listener, Mockito.times(1));
        Mockito.verify(listener, Mockito.times(1)).onOutputAppend(note1, para1, 0, "data1\ndata2\ndata3\n");
    }

    /** Events of four distinct note/paragraph pairs produce four separate listener calls. */
    @Test
    public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException {
        RemoteInterpreterProcessListener listener = Mockito.mock(RemoteInterpreterProcessListener.class);
        String note1 = "note1";
        String note2 = "note2";
        String para1 = "para1";
        String para2 = "para2";
        String[][] buffer = new String[][]{
            {note1, para1, "data1\n"},
            {note1, para2, "data2\n"},
            {note2, para1, "data3\n"},
            {note2, para2, "data4\n"}};
        loopForCompletingEvents(listener, 4, buffer);
        verifyAnyAppend(listener, Mockito.times(4));
        Mockito.verify(listener, Mockito.times(1)).onOutputAppend(note1, para1, 0, "data1\n");
        Mockito.verify(listener, Mockito.times(1)).onOutputAppend(note1, para2, 0, "data2\n");
        Mockito.verify(listener, Mockito.times(1)).onOutputAppend(note2, para1, 0, "data3\n");
        Mockito.verify(listener, Mockito.times(1)).onOutputAppend(note2, para2, 0, "data4\n");
    }

    /**
     * Pushes {@link #NUM_EVENTS} events from a separate thread while the runner is
     * scheduled, then checks that the listener was called at most
     * {@link #NUM_CLUBBED_EVENTS} times.
     */
    @Test
    public void testClubbedData() throws InterruptedException {
        RemoteInterpreterProcessListener listener = Mockito.mock(RemoteInterpreterProcessListener.class);
        AppendOutputRunner runner = new AppendOutputRunner(listener);
        future = service.scheduleWithFixedDelay(runner, 0L, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
        Thread thread = new Thread(new BombardEvents(runner));
        thread.start();
        thread.join();
        // Give the scheduled runner time to hand over whatever is still buffered.
        Thread.sleep(1000L);
        verifyAnyAppend(listener, Mockito.atMost(NUM_CLUBBED_EVENTS));
    }

    /**
     * Buffers 100000 events, runs the runner once on the test thread and expects
     * the last WARN entry captured from the root logger to report the processed size.
     *
     * <p>The wait for the log entries is bounded: if exactly two WARN entries do not
     * show up within {@link #LOG_WAIT_TIMEOUT_MS} the test fails instead of hanging.
     * The capturing appender is always detached again so it cannot leak into other tests.
     */
    @Test
    public void testWarnLoggerForLargeData() throws InterruptedException {
        RemoteInterpreterProcessListener listener = Mockito.mock(RemoteInterpreterProcessListener.class);
        AppendOutputRunner runner = new AppendOutputRunner(listener);
        String data = "data\n";
        int numEvents = 100000;
        for (int i = 0; i < numEvents; ++i) {
            runner.appendBuffer("noteId", "paraId", 0, data);
        }

        TestAppender appender = new TestAppender();
        Logger logger = Logger.getRootLogger();
        logger.addAppender(appender);
        try {
            runner.run();

            // NOTE(review): exactly two WARN entries are expected, of which the last
            // must be the size warning; the other one presumably reports processing
            // time -- confirm against AppendOutputRunner.
            LoggingEvent sizeWarnLogEntry;
            long deadlineMs = System.currentTimeMillis() + LOG_WAIT_TIMEOUT_MS;
            while (true) {
                int warnLogCounter = 0;
                sizeWarnLogEntry = null;
                for (LoggingEvent logEntry : appender.getLog()) {
                    if (Level.WARN.equals(logEntry.getLevel())) {
                        sizeWarnLogEntry = logEntry;
                        ++warnLogCounter;
                    }
                }
                if (warnLogCounter == 2) {
                    break;
                }
                if (System.currentTimeMillis() > deadlineMs) {
                    Assertions.fail("Expected exactly 2 WARN log entries but found " + warnLogCounter
                        + " after waiting " + LOG_WAIT_TIMEOUT_MS + " ms");
                }
                Thread.sleep(POLL_INTERVAL_MS);
            }

            String loggerString = "Processing size for buffered append-output is high: " + data.length() * numEvents + " characters.";
            Assertions.assertEquals(loggerString, sizeWarnLogEntry.getMessage());
        } finally {
            logger.removeAppender(appender);
        }
    }

    /** Verifies the total number of {@code onOutputAppend} calls, whatever their arguments. */
    private static void verifyAnyAppend(RemoteInterpreterProcessListener listener, VerificationMode mode) {
        Mockito.verify(listener, mode).onOutputAppend(
            ArgumentMatchers.any(String.class),
            ArgumentMatchers.any(String.class),
            ArgumentMatchers.anyInt(),
            ArgumentMatchers.any(String.class));
    }

    /** Stubs {@code onOutputAppend} so that every call bumps {@link #numInvocations}. */
    private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) {
        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                numInvocations += 1;
                return null;
            }
        }).when(listener).onOutputAppend(
            ArgumentMatchers.any(String.class),
            ArgumentMatchers.any(String.class),
            ArgumentMatchers.anyInt(),
            ArgumentMatchers.any(String.class));
    }

    /**
     * Feeds {@code buffer} into a fresh runner, schedules it on the shared executor
     * and waits until the listener has been invoked {@code numTimes} times.
     *
     * @param listener mocked listener receiving the output
     * @param numTimes number of listener invocations to wait for
     * @param buffer   rows of {@code {noteId, paragraphId, data}}
     * @throws InterruptedException if the waiting test thread is interrupted
     */
    private void loopForCompletingEvents(RemoteInterpreterProcessListener listener, int numTimes, String[][] buffer)
            throws InterruptedException {
        numInvocations = 0;
        prepareInvocationCounts(listener);
        AppendOutputRunner runner = new AppendOutputRunner(listener);
        for (String[] bufferElement : buffer) {
            runner.appendBuffer(bufferElement[0], bufferElement[1], 0, bufferElement[2]);
        }
        future = service.scheduleWithFixedDelay(runner, 0L, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);

        long startTimeMs = System.currentTimeMillis();
        while (numInvocations != numTimes) {
            if (System.currentTimeMillis() - startTimeMs > EVENT_WAIT_TIMEOUT_MS) {
                Assertions.fail("Buffered events were not sent for 2 seconds");
            }
            // Sleep briefly instead of spinning at full speed while waiting.
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /** log4j appender that simply records every event it receives. */
    private static final class TestAppender
    extends AppenderSkeleton {
        private final List<LoggingEvent> log = new ArrayList<LoggingEvent>();

        private TestAppender() {
        }

        @Override
        public boolean requiresLayout() {
            return false;
        }

        @Override
        protected void append(LoggingEvent loggingEvent) {
            this.log.add(loggingEvent);
        }

        @Override
        public void close() {
        }

        /**
         * Returns a snapshot of the events captured so far.
         *
         * <p>Synchronized on the appender, the same monitor log4j's
         * {@code AppenderSkeleton.doAppend} holds while calling {@link #append},
         * so the copy never races with a concurrent append.
         */
        public synchronized List<LoggingEvent> getLog() {
            return new ArrayList<LoggingEvent>(this.log);
        }
    }

    /** Appends {@link #NUM_EVENTS} identical events for one paragraph to the given runner. */
    private static final class BombardEvents
    implements Runnable {
        private final AppendOutputRunner runner;

        private BombardEvents(AppendOutputRunner runner) {
            this.runner = runner;
        }

        @Override
        public void run() {
            String noteId = "noteId";
            String paraId = "paraId";
            for (int i = 0; i < NUM_EVENTS; ++i) {
                this.runner.appendBuffer(noteId, paraId, 0, "data\n");
            }
        }
    }
}

