/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.actions;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.actions.SegmentAllocateActionBuilder;
import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
import org.apache.druid.indexing.common.actions.SegmentAllocationQueue;
import org.apache.druid.indexing.common.actions.TaskActionTestKit;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class SegmentAllocationQueueTest {
    @Rule
    public TaskActionTestKit taskActionTestKit = new TaskActionTestKit();
    private static final String DS_WIKI = "wiki";
    private static final String DS_KOALA = "koala";
    private SegmentAllocationQueue allocationQueue;
    private StubServiceEmitter emitter;
    private BlockingExecutorService executor;

    @Before
    public void setUp() {
        this.executor = new BlockingExecutorService("alloc-test-exec");
        this.emitter = new StubServiceEmitter("overlord", "alloc-test");
        TaskLockConfig lockConfig = new TaskLockConfig(){

            public boolean isBatchSegmentAllocation() {
                return true;
            }

            public long getBatchAllocationWaitTime() {
                return 0L;
            }
        };
        this.allocationQueue = new SegmentAllocationQueue(this.taskActionTestKit.getTaskLockbox(), lockConfig, this.taskActionTestKit.getMetadataStorageCoordinator(), (ServiceEmitter)this.emitter, (corePoolSize, nameFormat) -> new WrappingScheduledExecutorService(nameFormat, (ExecutorService)this.executor, false));
        this.allocationQueue.start();
        this.allocationQueue.becomeLeader();
    }

    @After
    public void tearDown() {
        if (this.allocationQueue != null) {
            this.allocationQueue.stop();
        }
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
        this.emitter.flush();
    }

    @Test
    public void testBatchWithMultipleTimestamps() {
        this.verifyAllocationWithBatching(this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).forTimestamp("2022-01-01T01:00:00").withSegmentGranularity(Granularities.DAY).withQueryGranularity(Granularities.SECOND).withLockGranularity(LockGranularity.TIME_CHUNK).withSequenceName("seq_1").build(), this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).forTimestamp("2022-01-01T02:00:00").withSegmentGranularity(Granularities.DAY).withQueryGranularity(Granularities.SECOND).withLockGranularity(LockGranularity.TIME_CHUNK).withSequenceName("seq_2").build(), true);
    }

    @Test
    public void testBatchWithExclusiveLocks() {
        this.verifyAllocationWithBatching(this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).withTaskLockType(TaskLockType.EXCLUSIVE).build(), this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).withTaskLockType(TaskLockType.EXCLUSIVE).build(), true);
    }

    @Test
    public void testBatchWithSharedLocks() {
        this.verifyAllocationWithBatching(this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).withTaskLockType(TaskLockType.SHARED).build(), this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).withTaskLockType(TaskLockType.SHARED).build(), true);
    }

    @Test
    public void testBatchWithMultipleQueryGranularities() {
        this.verifyAllocationWithBatching(this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).withQueryGranularity(Granularities.SECOND).build(), this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).withQueryGranularity(Granularities.MINUTE).build(), true);
    }

    @Test
    public void testMultipleDatasourcesCannotBatch() {
        this.verifyAllocationWithBatching(this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).build(), this.allocateRequest().forTask(this.createTask(DS_KOALA, "group_1")).build(), false);
    }

    @Test
    public void testMultipleGroupIdsCannotBatch() {
        this.verifyAllocationWithBatching(this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_2")).build(), this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_3")).build(), false);
    }

    @Test
    public void testMultipleLockGranularitiesCannotBatch() {
        this.verifyAllocationWithBatching(this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).withLockGranularity(LockGranularity.TIME_CHUNK).build(), this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).withLockGranularity(LockGranularity.SEGMENT).build(), false);
    }

    @Test
    public void testMultipleAllocateIntervalsCannotBatch() {
        this.verifyAllocationWithBatching(this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).forTimestamp("2022-01-01").withSegmentGranularity(Granularities.DAY).build(), this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).forTimestamp("2022-01-02").withSegmentGranularity(Granularities.DAY).build(), false);
    }

    @Test
    public void testConflictingPendingSegment() {
        SegmentAllocateRequest hourSegmentRequest = this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).withSegmentGranularity(Granularities.HOUR).build();
        Future hourSegmentFuture = this.allocationQueue.add(hourSegmentRequest);
        SegmentAllocateRequest halfHourSegmentRequest = this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).withSegmentGranularity(Granularities.THIRTY_MINUTE).build();
        Future halfHourSegmentFuture = this.allocationQueue.add(halfHourSegmentRequest);
        this.executor.finishNextPendingTask();
        Assert.assertNotNull((Object)this.getSegmentId(hourSegmentFuture));
        Throwable t = Assert.assertThrows(ISE.class, () -> this.getSegmentId(halfHourSegmentFuture));
        Assert.assertEquals((Object)"Storage coordinator could not allocate segment.", (Object)t.getMessage());
    }

    @Test
    public void testFullAllocationQueue() {
        for (int i = 0; i < 2000; ++i) {
            SegmentAllocateRequest request = this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_" + i)).build();
            this.allocationQueue.add(request);
        }
        SegmentAllocateRequest request = this.allocateRequest().forTask(this.createTask(DS_WIKI, "next_group")).build();
        Future future = this.allocationQueue.add(request);
        Throwable t = Assert.assertThrows(ISE.class, () -> this.getSegmentId(future));
        Assert.assertEquals((Object)"Segment allocation queue is full. Check the metric `task/action/batch/runTime` to determine if metadata operations are slow.", (Object)t.getMessage());
    }

    @Test
    public void testMaxBatchSize() {
        for (int i = 0; i < 500; ++i) {
            SegmentAllocateRequest request = this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).build();
            this.allocationQueue.add(request);
        }
        Assert.assertEquals((long)1L, (long)this.allocationQueue.size());
        SegmentAllocateRequest request = this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_1")).build();
        this.allocationQueue.add(request);
        Assert.assertEquals((long)2L, (long)this.allocationQueue.size());
    }

    @Test
    public void testMultipleRequestsForSameSegment() {
        ArrayList<Future> segmentFutures = new ArrayList<Future>();
        for (int i = 0; i < 10; ++i) {
            SegmentAllocateRequest request = this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_" + i)).withSequenceName("sequence_1").withPreviousSegmentId("segment_1").build();
            segmentFutures.add(this.allocationQueue.add(request));
        }
        this.executor.finishNextPendingTask();
        SegmentIdWithShardSpec segmentId1 = this.getSegmentId((Future)segmentFutures.get(0));
        for (Future future : segmentFutures) {
            Assert.assertEquals((Object)this.getSegmentId(future), (Object)segmentId1);
        }
    }

    @Test
    public void testMaxWaitTime() {
    }

    @Test
    public void testRequestsFailOnLeaderChange() {
        ArrayList<Future> segmentFutures = new ArrayList<Future>();
        for (int i = 0; i < 10; ++i) {
            SegmentAllocateRequest request = this.allocateRequest().forTask(this.createTask(DS_WIKI, "group_" + i)).build();
            segmentFutures.add(this.allocationQueue.add(request));
        }
        this.allocationQueue.stopBeingLeader();
        this.executor.finishNextPendingTask();
        for (Future future : segmentFutures) {
            Throwable t = Assert.assertThrows(ISE.class, () -> this.getSegmentId(future));
            Assert.assertEquals((Object)"Cannot allocate segment if not leader", (Object)t.getMessage());
        }
    }

    private void verifyAllocationWithBatching(SegmentAllocateRequest a, SegmentAllocateRequest b, boolean canBatch) {
        Assert.assertEquals((long)0L, (long)this.allocationQueue.size());
        Future futureA = this.allocationQueue.add(a);
        Future futureB = this.allocationQueue.add(b);
        int expectedCount = canBatch ? 1 : 2;
        Assert.assertEquals((long)expectedCount, (long)this.allocationQueue.size());
        this.executor.finishNextPendingTask();
        this.emitter.verifyEmitted("task/action/batch/size", expectedCount);
        Assert.assertNotNull((Object)this.getSegmentId(futureA));
        Assert.assertNotNull((Object)this.getSegmentId(futureB));
    }

    private SegmentIdWithShardSpec getSegmentId(Future<SegmentIdWithShardSpec> future) {
        try {
            return future.get(5L, TimeUnit.SECONDS);
        }
        catch (ExecutionException e) {
            throw new ISE(e.getCause().getMessage(), new Object[0]);
        }
        catch (InterruptedException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    private SegmentAllocateActionBuilder allocateRequest() {
        return new SegmentAllocateActionBuilder().forDatasource(DS_WIKI).forTimestamp("2022-01-01").withLockGranularity(LockGranularity.TIME_CHUNK).withTaskLockType(TaskLockType.SHARED).withQueryGranularity(Granularities.SECOND).withSegmentGranularity(Granularities.HOUR);
    }

    private Task createTask(String datasource, String groupId) {
        NoopTask task = new NoopTask(null, groupId, datasource, 0L, 0L, null, null, null);
        this.taskActionTestKit.getTaskLockbox().add((Task)task);
        return task;
    }
}

