/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.seekablestream.supervisor;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Pair;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link SeekableStreamSupervisorStateManager}.
 *
 * <p>Each test drives a fresh state manager through a sequence of
 * {@code maybeSetState}, {@code recordThrowableEvent}, {@code recordCompletedTaskState} and
 * {@code markRunFinished} calls and asserts on the state reported by
 * {@code getSupervisorState()} and on the recorded exception events.
 */
public class SeekableStreamSupervisorStateManagerTest {
    private SeekableStreamSupervisorStateManager stateManager;
    private SupervisorStateManagerConfig config;
    private ObjectMapper defaultMapper;

    /**
     * Builds a fresh config, state manager and object mapper before every test so that no state
     * leaks between test methods.
     */
    @Before
    public void setupTest() {
        config = new SupervisorStateManagerConfig(10);
        stateManager = new SeekableStreamSupervisorStateManager(config, false);
        defaultMapper = new DefaultObjectMapper();
    }

    /**
     * Walks through the start-up states of the first run and asserts each one is reported as set,
     * with a basic state of RUNNING. After the first {@code markRunFinished()} the state is asserted
     * to be RUNNING, and every later {@code maybeSetState} call is asserted to leave it at RUNNING.
     */
    @Test
    public void testHappyPath() {
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.PENDING,
                stateManager.getSupervisorState().getBasicState());

        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);
        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM,
                stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                stateManager.getSupervisorState().getBasicState());

        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS);
        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS,
                stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                stateManager.getSupervisorState().getBasicState());

        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS);
        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS,
                stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                stateManager.getSupervisorState().getBasicState());

        stateManager.markRunFinished();
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                stateManager.getSupervisorState().getBasicState());

        // From here on, none of the start-up states is expected to replace RUNNING.
        stateManager.maybeSetState(SupervisorStateManager.BasicState.PENDING);
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                stateManager.getSupervisorState().getBasicState());

        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                stateManager.getSupervisorState().getBasicState());

        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS);
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                stateManager.getSupervisorState().getBasicState());

        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS);
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                stateManager.getSupervisorState().getBasicState());

        stateManager.markRunFinished();
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                stateManager.getSupervisorState().getBasicState());
    }

    /**
     * After one clean run, records a {@link StreamException} on each of
     * {@code getUnhealthinessThreshold()} consecutive runs and asserts the state becomes
     * LOST_CONTACT_WITH_STREAM, with every recorded event flagged as a stream exception.
     */
    @Test
    public void testStreamFailureLostContact() {
        stateManager.markRunFinished();
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());

        for (int run = 0; run < config.getUnhealthinessThreshold(); ++run) {
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
            stateManager.recordThrowableEvent(new StreamException(new IllegalStateException("DOH!")));
            stateManager.markRunFinished();
        }

        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.LOST_CONTACT_WITH_STREAM,
                stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(config.getUnhealthinessThreshold(), stateManager.getExceptionEvents().size());
        stateManager.getExceptionEvents().forEach(event -> {
            Assert.assertTrue(
                    ((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) event).isStreamException());
            Assert.assertEquals(IllegalStateException.class.getName(), event.getExceptionClass());
        });
    }

    /**
     * Same failure sequence as {@link #testStreamFailureLostContact()} but without a prior clean
     * run: the state starts at CONNECTING_TO_STREAM and is asserted to end at
     * UNABLE_TO_CONNECT_TO_STREAM.
     */
    @Test
    public void testStreamFailureUnableToConnect() {
        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);

        for (int run = 0; run < config.getUnhealthinessThreshold(); ++run) {
            Assert.assertEquals(
                    SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM,
                    stateManager.getSupervisorState());
            stateManager.recordThrowableEvent(new StreamException(new IllegalStateException("DOH!")));
            stateManager.markRunFinished();
        }

        Assert.assertEquals(
                SeekableStreamSupervisorStateManager.SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM,
                stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(config.getUnhealthinessThreshold(), stateManager.getExceptionEvents().size());
        stateManager.getExceptionEvents().forEach(event -> {
            Assert.assertTrue(
                    ((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) event).isStreamException());
            Assert.assertEquals(IllegalStateException.class.getName(), event.getExceptionClass());
        });
    }

    /**
     * Records a non-stream exception ({@link NullPointerException}) on each of
     * {@code getUnhealthinessThreshold()} consecutive runs and asserts the state becomes
     * UNHEALTHY_SUPERVISOR, with no event flagged as a stream exception.
     */
    @Test
    public void testNonStreamUnhealthiness() {
        stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS);

        for (int run = 0; run < config.getUnhealthinessThreshold(); ++run) {
            Assert.assertEquals(
                    SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS,
                    stateManager.getSupervisorState());
            stateManager.recordThrowableEvent(new NullPointerException("oof"));
            stateManager.markRunFinished();
        }

        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(config.getUnhealthinessThreshold(), stateManager.getExceptionEvents().size());
        stateManager.getExceptionEvents().forEach(event -> {
            Assert.assertFalse(
                    ((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) event).isStreamException());
            Assert.assertEquals(NullPointerException.class.getName(), event.getExceptionClass());
        });
    }

    /**
     * Twice over, records one fewer failing run than {@code getUnhealthinessThreshold()} followed
     * by a clean run, and asserts the state stays RUNNING throughout while the exception events
     * keep accumulating across rounds.
     */
    @Test
    public void testTransientUnhealthiness() {
        stateManager.markRunFinished();

        for (int round = 1; round < 3; ++round) {
            for (int run = 0; run < config.getUnhealthinessThreshold() - 1; ++run) {
                stateManager.recordThrowableEvent(new NullPointerException("oof"));
                stateManager.markRunFinished();
                Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
            }

            // A run with no recorded exception interrupts the failure streak.
            stateManager.markRunFinished();
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
            Assert.assertEquals(
                    SupervisorStateManager.BasicState.RUNNING,
                    stateManager.getSupervisorState().getBasicState());
            Assert.assertEquals(
                    round * (config.getUnhealthinessThreshold() - 1),
                    stateManager.getExceptionEvents().size());
        }
    }

    /**
     * Records a FAILED task on each of {@code getTaskUnhealthinessThreshold()} consecutive runs
     * and asserts the state becomes UNHEALTHY_TASKS without any exception event being recorded.
     */
    @Test
    public void testNonTransientTaskUnhealthiness() {
        stateManager.markRunFinished();

        for (int run = 0; run < config.getTaskUnhealthinessThreshold(); ++run) {
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
            stateManager.recordCompletedTaskState(TaskState.FAILED);
            stateManager.markRunFinished();
        }

        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_TASKS, stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.UNHEALTHY_TASKS,
                stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(0, stateManager.getExceptionEvents().size());
    }

    /**
     * Records a FAILED task followed by a SUCCESS task on every run, for more runs than
     * {@code getTaskUnhealthinessThreshold()}, and asserts the state stays RUNNING.
     */
    @Test
    public void testTransientTaskUnhealthiness() {
        stateManager.markRunFinished();

        for (int run = 0; run < config.getTaskUnhealthinessThreshold() + 3; ++run) {
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
            stateManager.recordCompletedTaskState(TaskState.FAILED);
            stateManager.recordCompletedTaskState(TaskState.SUCCESS);
            stateManager.markRunFinished();
        }

        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
        Assert.assertEquals(
                SupervisorStateManager.BasicState.RUNNING,
                stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(0, stateManager.getExceptionEvents().size());
    }

    /**
     * Drives the supervisor to UNHEALTHY_SUPERVISOR with {@code getUnhealthinessThreshold()}
     * failing runs, then asserts that {@code getHealthinessThreshold()} clean runs bring it to
     * RUNNING while the earlier exception events remain recorded.
     */
    @Test
    public void testSupervisorRecoveryWithHealthinessThreshold() {
        for (int run = 0; run < config.getUnhealthinessThreshold(); ++run) {
            Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, stateManager.getSupervisorState());
            stateManager.recordThrowableEvent(new Exception("Except the inevitable"));
            stateManager.markRunFinished();
        }
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState());

        for (int run = 0; run < config.getHealthinessThreshold(); ++run) {
            Assert.assertEquals(
                    SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR,
                    stateManager.getSupervisorState());
            stateManager.markRunFinished();
        }
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());

        Assert.assertEquals(config.getUnhealthinessThreshold(), stateManager.getExceptionEvents().size());
        stateManager.getExceptionEvents().forEach(event -> {
            Assert.assertFalse(
                    ((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) event).isStreamException());
            Assert.assertEquals(Exception.class.getName(), event.getExceptionClass());
        });
    }

    /**
     * Drives the supervisor to UNHEALTHY_TASKS with {@code getTaskUnhealthinessThreshold()} runs
     * that each record a FAILED task, then asserts that {@code getTaskHealthinessThreshold()} runs
     * that each record a SUCCESS task bring it to RUNNING.
     */
    @Test
    public void testTaskRecoveryWithHealthinessThreshold() {
        stateManager.markRunFinished();

        for (int run = 0; run < config.getTaskUnhealthinessThreshold(); ++run) {
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
            stateManager.recordCompletedTaskState(TaskState.FAILED);
            stateManager.markRunFinished();
        }
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_TASKS, stateManager.getSupervisorState());

        for (int run = 0; run < config.getTaskHealthinessThreshold(); ++run) {
            Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_TASKS, stateManager.getSupervisorState());
            stateManager.recordCompletedTaskState(TaskState.SUCCESS);
            stateManager.markRunFinished();
        }
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, stateManager.getSupervisorState());
    }

    /**
     * Records both an exception and a FAILED task on every run until both thresholds are reached,
     * and asserts that UNHEALTHY_SUPERVISOR is the state reported.
     */
    @Test
    public void testTwoUnhealthyStates() {
        stateManager.markRunFinished();

        int runs = Math.max(config.getTaskUnhealthinessThreshold(), config.getUnhealthinessThreshold());
        for (int run = 0; run < runs; ++run) {
            stateManager.recordThrowableEvent(new NullPointerException("somebody goofed"));
            stateManager.recordCompletedTaskState(TaskState.FAILED);
            stateManager.markRunFinished();
        }

        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState());
    }

    /**
     * Records four exceptions, two of which have a {@link StreamException} somewhere in their
     * chain, and asserts that the recorded events appear in the same order with the expected
     * exception class name and stream-exception flag.
     */
    @Test
    public void testGetThrowableEvents() {
        ImmutableList<Exception> exceptions = ImmutableList.<Exception>of(
                new StreamException(new UnsupportedOperationException("oof")),
                new NullPointerException("oof"),
                new RuntimeException(new StreamException(new Exception("oof"))),
                new RuntimeException(new IllegalArgumentException("oof")));

        for (Exception exception : exceptions) {
            stateManager.recordThrowableEvent(exception);
            stateManager.markRunFinished();
        }
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, stateManager.getSupervisorState());

        // lhs = expected exception class name, rhs = expected isStreamException() value.
        ImmutableList<Pair<String, Boolean>> expected = ImmutableList.of(
                Pair.of("java.lang.UnsupportedOperationException", true),
                Pair.of("java.lang.NullPointerException", false),
                Pair.of("java.lang.Exception", true),
                Pair.of("java.lang.IllegalArgumentException", false));

        Iterator<? extends SupervisorStateManager.ExceptionEvent> recorded =
                stateManager.getExceptionEvents().iterator();
        for (Pair<String, Boolean> expectation : expected) {
            SupervisorStateManager.ExceptionEvent event = recorded.next();
            Assert.assertNotNull(event.getMessage());
            Assert.assertEquals(expectation.lhs, event.getExceptionClass());
            Assert.assertEquals(
                    expectation.rhs,
                    ((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) event).isStreamException());
        }
        // No events beyond the expected four.
        Assert.assertFalse(recorded.hasNext());
    }

    /**
     * Serializes a plain {@link SupervisorStateManager.ExceptionEvent} to JSON, reads it back as a
     * string map, and checks the timestamp, exception class, message and stream-exception entries.
     *
     * @throws IOException if JSON serialization or deserialization fails
     */
    @Test
    public void testExceptionEventSerde() throws IOException {
        SupervisorStateManager.ExceptionEvent event =
                new SupervisorStateManager.ExceptionEvent(new NullPointerException("msg"), true);

        String serialized = defaultMapper.writeValueAsString(event);
        Map<String, String> deserialized =
                defaultMapper.readValue(serialized, new TypeReference<Map<String, String>>() {});

        Assert.assertNotNull(deserialized.get("timestamp"));
        Assert.assertEquals("java.lang.NullPointerException", deserialized.get("exceptionClass"));
        // Boolean.parseBoolean inspects the deserialized value itself (a null/absent entry parses
        // as false). Boolean.getBoolean would instead look up a JVM system property of that name,
        // making the assertion pass regardless of the serialized content.
        Assert.assertFalse(Boolean.parseBoolean(deserialized.get("streamException")));
        Assert.assertNotNull(deserialized.get("message"));
    }
}

