/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.worker;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Joiner;
import java.util.List;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CompressionProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.IndexingServiceCondition;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestRealtimeTask;
import org.apache.druid.indexing.common.TestTasks;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
import org.apache.druid.indexing.worker.WorkerTaskMonitor;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9Factory;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.zookeeper.data.Stat;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class WorkerTaskMonitorTest {
    private static final Joiner JOINER = Joiner.on((String)"/");
    private static final String BASE_PATH = "/test/druid";
    private static final String TASKS_PATH = StringUtils.format((String)"%s/indexer/tasks/worker", (Object[])new Object[]{"/test/druid"});
    private static final String STATUS_PATH = StringUtils.format((String)"%s/indexer/status/worker", (Object[])new Object[]{"/test/druid"});
    private static final DruidNode DUMMY_NODE = new DruidNode("dummy", "dummy", false, Integer.valueOf(9000), null, true, false);
    private TestingCluster testingCluster;
    private CuratorFramework cf;
    private WorkerCuratorCoordinator workerCuratorCoordinator;
    private WorkerTaskMonitor workerTaskMonitor;
    private Task task;
    private Worker worker;
    private final TestUtils testUtils = new TestUtils();
    private ObjectMapper jsonMapper = this.testUtils.getTestObjectMapper();
    private IndexMergerV9Factory indexMergerV9Factory = this.testUtils.getIndexMergerV9Factory();
    private IndexIO indexIO = this.testUtils.getTestIndexIO();

    @Before
    public void setUp() throws Exception {
        this.testingCluster = new TestingCluster(1);
        this.testingCluster.start();
        this.cf = CuratorFrameworkFactory.builder().connectString(this.testingCluster.getConnectString()).retryPolicy((RetryPolicy)new ExponentialBackoffRetry(1, 10)).compressionProvider((CompressionProvider)new PotentiallyGzippedCompressionProvider(false)).build();
        this.cf.start();
        this.cf.blockUntilConnected();
        this.cf.create().creatingParentsIfNeeded().forPath(BASE_PATH);
        this.worker = new Worker("http", "worker", "localhost", 3, "0", "_default_worker_category");
        this.workerCuratorCoordinator = new WorkerCuratorCoordinator(this.jsonMapper, new IndexerZkConfig(new ZkPathsConfig(){

            public String getBase() {
                return WorkerTaskMonitorTest.BASE_PATH;
            }
        }, null, null, null, null), (RemoteTaskRunnerConfig)new TestRemoteTaskRunnerConfig(new Period((Object)"PT1S")), this.cf, this.worker);
        this.workerCuratorCoordinator.start();
        this.workerTaskMonitor = this.createTaskMonitor();
        TestTasks.registerSubtypes(this.jsonMapper);
        this.jsonMapper.registerSubtypes(new NamedType[]{new NamedType(TestRealtimeTask.class, "test_realtime")});
        this.workerTaskMonitor.start();
        this.task = TestTasks.immediateSuccess("test");
    }

    private WorkerTaskMonitor createTaskMonitor() {
        TaskConfig taskConfig = new TaskConfig(FileUtils.createTempDir().toString(), null, null, Integer.valueOf(0), null, false, null, null, null, false, false, TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), null, false);
        TaskActionClientFactory taskActionClientFactory = (TaskActionClientFactory)EasyMock.createNiceMock(TaskActionClientFactory.class);
        TaskActionClient taskActionClient = (TaskActionClient)EasyMock.createNiceMock(TaskActionClient.class);
        EasyMock.expect((Object)taskActionClientFactory.create((Task)EasyMock.anyObject())).andReturn((Object)taskActionClient).anyTimes();
        SegmentHandoffNotifierFactory notifierFactory = (SegmentHandoffNotifierFactory)EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
        EasyMock.replay((Object[])new Object[]{taskActionClientFactory, taskActionClient, notifierFactory});
        return new WorkerTaskMonitor(this.jsonMapper, (TaskRunner)new SingleTaskBackgroundRunner(new TaskToolboxFactory(taskConfig, null, taskActionClientFactory, null, null, null, null, null, null, null, notifierFactory, null, null, (JoinableFactory)NoopJoinableFactory.INSTANCE, null, new SegmentCacheManagerFactory(this.jsonMapper), this.jsonMapper, this.indexIO, null, null, null, this.indexMergerV9Factory, null, null, null, null, (TaskReportFileWriter)new NoopTestTaskReportFileWriter(), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, (ChatHandlerProvider)new NoopChatHandlerProvider(), this.testUtils.getRowIngestionMetersFactory(), (AppenderatorsManager)new TestAppenderatorsManager(), (OverlordClient)new NoopOverlordClient(), null, null, null, null, "1"), taskConfig, (ServiceEmitter)new NoopServiceEmitter(), DUMMY_NODE, new ServerConfig()), taskConfig, this.cf, this.workerCuratorCoordinator, (DruidLeaderClient)EasyMock.createNiceMock(DruidLeaderClient.class));
    }

    @After
    public void tearDown() throws Exception {
        this.workerCuratorCoordinator.stop();
        this.workerTaskMonitor.stop();
        this.cf.close();
        this.testingCluster.stop();
    }

    @Test(timeout=60000L)
    public void testRunTask() throws Exception {
        Assert.assertTrue((boolean)TestUtils.conditionValid(new IndexingServiceCondition(){

            @Override
            public boolean isValid() {
                try {
                    return WorkerTaskMonitorTest.this.cf.checkExists().forPath(JOINER.join((Object)TASKS_PATH, (Object)WorkerTaskMonitorTest.this.task.getId(), new Object[0])) == null;
                }
                catch (Exception e) {
                    return false;
                }
            }
        }));
        this.cf.create().creatingParentsIfNeeded().forPath(JOINER.join((Object)TASKS_PATH, (Object)this.task.getId(), new Object[0]), this.jsonMapper.writeValueAsBytes((Object)this.task));
        Assert.assertTrue((boolean)TestUtils.conditionValid(new IndexingServiceCondition(){

            @Override
            public boolean isValid() {
                try {
                    byte[] bytes = (byte[])WorkerTaskMonitorTest.this.cf.getData().forPath(JOINER.join((Object)STATUS_PATH, (Object)WorkerTaskMonitorTest.this.task.getId(), new Object[0]));
                    TaskAnnouncement announcement = (TaskAnnouncement)WorkerTaskMonitorTest.this.jsonMapper.readValue(bytes, TaskAnnouncement.class);
                    return announcement.getTaskStatus().isComplete();
                }
                catch (Exception e) {
                    return false;
                }
            }
        }));
        TaskAnnouncement taskAnnouncement = (TaskAnnouncement)this.jsonMapper.readValue((byte[])this.cf.getData().forPath(JOINER.join((Object)STATUS_PATH, (Object)this.task.getId(), new Object[0])), TaskAnnouncement.class);
        Assert.assertEquals((Object)this.task.getId(), (Object)taskAnnouncement.getTaskStatus().getId());
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)taskAnnouncement.getTaskStatus().getStatusCode());
    }

    @Test(timeout=60000L)
    public void testGetAnnouncements() throws Exception {
        this.cf.create().creatingParentsIfNeeded().forPath(JOINER.join((Object)TASKS_PATH, (Object)this.task.getId(), new Object[0]), this.jsonMapper.writeValueAsBytes((Object)this.task));
        Assert.assertTrue((boolean)TestUtils.conditionValid(new IndexingServiceCondition(){

            @Override
            public boolean isValid() {
                try {
                    byte[] bytes = (byte[])WorkerTaskMonitorTest.this.cf.getData().forPath(JOINER.join((Object)STATUS_PATH, (Object)WorkerTaskMonitorTest.this.task.getId(), new Object[0]));
                    TaskAnnouncement announcement = (TaskAnnouncement)WorkerTaskMonitorTest.this.jsonMapper.readValue(bytes, TaskAnnouncement.class);
                    return announcement.getTaskStatus().isComplete();
                }
                catch (Exception e) {
                    return false;
                }
            }
        }));
        List announcements = this.workerCuratorCoordinator.getAnnouncements();
        Assert.assertEquals((long)1L, (long)announcements.size());
        Assert.assertEquals((Object)this.task.getId(), (Object)((TaskAnnouncement)announcements.get(0)).getTaskStatus().getId());
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)((TaskAnnouncement)announcements.get(0)).getTaskStatus().getStatusCode());
        Assert.assertEquals((Object)DUMMY_NODE.getHost(), (Object)((TaskAnnouncement)announcements.get(0)).getTaskLocation().getHost());
        Assert.assertEquals((long)DUMMY_NODE.getPlaintextPort(), (long)((TaskAnnouncement)announcements.get(0)).getTaskLocation().getPort());
    }

    @Test(timeout=60000L)
    public void testRestartCleansOldStatus() throws Exception {
        this.task = TestTasks.unending("test");
        this.cf.create().creatingParentsIfNeeded().forPath(JOINER.join((Object)TASKS_PATH, (Object)this.task.getId(), new Object[0]), this.jsonMapper.writeValueAsBytes((Object)this.task));
        Assert.assertTrue((boolean)TestUtils.conditionValid(new IndexingServiceCondition(){

            @Override
            public boolean isValid() {
                try {
                    return WorkerTaskMonitorTest.this.cf.checkExists().forPath(JOINER.join((Object)STATUS_PATH, (Object)WorkerTaskMonitorTest.this.task.getId(), new Object[0])) != null;
                }
                catch (Exception e) {
                    return false;
                }
            }
        }));
        this.workerTaskMonitor.stop();
        this.workerTaskMonitor = this.createTaskMonitor();
        this.workerTaskMonitor.start();
        List announcements = this.workerCuratorCoordinator.getAnnouncements();
        Assert.assertEquals((long)1L, (long)announcements.size());
        Assert.assertEquals((Object)this.task.getId(), (Object)((TaskAnnouncement)announcements.get(0)).getTaskStatus().getId());
        Assert.assertEquals((Object)TaskState.FAILED, (Object)((TaskAnnouncement)announcements.get(0)).getTaskStatus().getStatusCode());
        Assert.assertEquals((Object)"Canceled as unknown task. See middleManager or indexer logs for more details.", (Object)((TaskAnnouncement)announcements.get(0)).getTaskStatus().getErrorMsg());
    }

    @Test(timeout=60000L)
    public void testStatusAnnouncementsArePersistent() throws Exception {
        this.cf.create().creatingParentsIfNeeded().forPath(JOINER.join((Object)TASKS_PATH, (Object)this.task.getId(), new Object[0]), this.jsonMapper.writeValueAsBytes((Object)this.task));
        Assert.assertTrue((boolean)TestUtils.conditionValid(new IndexingServiceCondition(){

            @Override
            public boolean isValid() {
                try {
                    return WorkerTaskMonitorTest.this.cf.checkExists().forPath(JOINER.join((Object)STATUS_PATH, (Object)WorkerTaskMonitorTest.this.task.getId(), new Object[0])) != null;
                }
                catch (Exception e) {
                    return false;
                }
            }
        }));
        Assert.assertEquals((long)0L, (long)((Stat)this.cf.checkExists().forPath(JOINER.join((Object)STATUS_PATH, (Object)this.task.getId(), new Object[0]))).getEphemeralOwner());
    }
}

