/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.loading;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.DataSegmentChangeCallback;
import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.DataSegmentChangeResponse;
import org.apache.druid.server.coordination.SegmentChangeStatus;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig;
import org.apache.druid.server.coordinator.loading.HttpLoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadPeonCallback;
import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link HttpLoadQueuePeon}.
 *
 * <p>The peon under test is wired to a {@link TestHttpClient} whose two
 * {@link BlockingExecutorService} instances are only advanced when a test asks
 * for it, so every test controls exactly when queued work is executed.
 */
public class HttpLoadQueuePeonTest {
    private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();

    /**
     * Segments of datasource "test" built for one DAY interval starting at
     * 2022-01-01 with 4 partitions; the tests index positions 0 to 3.
     */
    private final List<DataSegment> segments = CreateDataSegments.ofDatasource("test")
            .forIntervals(1, Granularities.DAY)
            .startingAt("2022-01-01")
            .withNumPartitions(4)
            .eachOfSizeInMb(100L);

    private TestHttpClient httpClient;
    private HttpLoadQueuePeon httpLoadQueuePeon;

    /**
     * Creates a fresh stub client and a started peon that uses the client's
     * executors for processing and for callbacks.
     */
    @Before
    public void setUp() {
        httpClient = new TestHttpClient();
        httpLoadQueuePeon = new HttpLoadQueuePeon(
                "http://dummy:4000",
                MAPPER,
                httpClient,
                new HttpLoadQueuePeonConfig(null, null, 10),
                new WrappingScheduledExecutorService(
                        "HttpLoadQueuePeonTest-%s",
                        httpClient.processingExecutor,
                        true),
                httpClient.callbackExecutor);
        httpLoadQueuePeon.start();
    }

    /** Stops the peon created in {@link #setUp()}. */
    @After
    public void tearDown() {
        httpLoadQueuePeon.stop();
    }

    /**
     * Queues one drop and three loads (LOAD, REPLICATE, MOVE_TO) and verifies
     * that all four segments reach the stub server and, once callbacks are
     * executed, are all reported as processed, in the order of {@link #segments}.
     */
    @Test
    public void testSimple() {
        final DataSegment toDrop = segments.get(0);
        final DataSegment toLoad = segments.get(1);
        final DataSegment toReplicate = segments.get(2);
        final DataSegment toMove = segments.get(3);

        httpLoadQueuePeon.dropSegment(toDrop, markSegmentProcessed(toDrop));
        httpLoadQueuePeon.loadSegment(toLoad, SegmentAction.LOAD, markSegmentProcessed(toLoad));
        httpLoadQueuePeon.loadSegment(toReplicate, SegmentAction.REPLICATE, markSegmentProcessed(toReplicate));
        httpLoadQueuePeon.loadSegment(toMove, SegmentAction.MOVE_TO, markSegmentProcessed(toMove));

        httpClient.sendRequestToServerAndHandleResponse();
        Assert.assertEquals(segments, httpClient.segmentsSentToServer);

        // Callbacks only fire once the callback executor is drained.
        httpClient.executeCallbacks();
        Assert.assertEquals(segments, httpClient.processedSegments);
    }

    /**
     * Verifies that a drop and a load submitted after {@code stop()} both
     * invoke their callback with {@code success == false}, without any
     * executor being advanced.
     */
    @Test
    public void testLoadDropAfterStop() {
        httpLoadQueuePeon.stop();

        final HashSet<DataSegment> failedSegments = new HashSet<>();

        final DataSegment segment1 = segments.get(0);
        httpLoadQueuePeon.dropSegment(segment1, success -> {
            if (!success) {
                failedSegments.add(segment1);
            }
        });

        final DataSegment segment2 = segments.get(1);
        httpLoadQueuePeon.loadSegment(segment2, SegmentAction.MOVE_TO, success -> {
            if (!success) {
                failedSegments.add(segment2);
            }
        });

        Assert.assertTrue(failedSegments.contains(segment1));
        Assert.assertTrue(failedSegments.contains(segment2));
    }

    /**
     * Submits one action of each kind in random order and verifies that the
     * segments are sent to the server in the order drop, LOAD, REPLICATE,
     * MOVE_TO regardless of submission order.
     */
    @Test
    public void testPriorityOfSegmentAction() {
        final List<DataSegment> segmentsDay1 = new ArrayList<>(segments);
        Collections.shuffle(segmentsDay1);

        // Position i of segmentsDay1 is bound to the i-th action kind, so the
        // list itself doubles as the expected send order.
        final List<QueueAction> actions = Arrays.asList(
                QueueAction.of(segmentsDay1.get(0), s -> httpLoadQueuePeon.dropSegment(s, null)),
                QueueAction.of(segmentsDay1.get(1), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.LOAD, null)),
                QueueAction.of(segmentsDay1.get(2), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.REPLICATE, null)),
                QueueAction.of(segmentsDay1.get(3), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.MOVE_TO, null)));

        // Submit in random order; only the send order is asserted.
        Collections.shuffle(actions);
        actions.forEach(QueueAction::invoke);

        httpClient.sendRequestToServerAndHandleResponse();
        Assert.assertEquals(segmentsDay1, httpClient.segmentsSentToServer);
    }

    /**
     * Submits the same four action kinds for two consecutive days in random
     * order and verifies that, within each action kind, the segment of the
     * later day (2022-01-02) is sent before the segment of the earlier day.
     */
    @Test
    public void testPriorityOfSegmentInterval() {
        final List<DataSegment> segmentsDay1 = new ArrayList<>(segments);
        Collections.shuffle(segmentsDay1);

        final List<DataSegment> segmentsDay2 = new ArrayList<>(
                CreateDataSegments.ofDatasource("test")
                        .forIntervals(1, Granularities.DAY)
                        .startingAt("2022-01-02")
                        .withNumPartitions(4)
                        .eachOfSizeInMb(100L));
        Collections.shuffle(segmentsDay2);

        // Declared in the expected send order: action kind first, then day 2 before day 1.
        final List<QueueAction> actions = Arrays.asList(
                QueueAction.of(segmentsDay2.get(0), s -> httpLoadQueuePeon.dropSegment(s, null)),
                QueueAction.of(segmentsDay1.get(0), s -> httpLoadQueuePeon.dropSegment(s, null)),
                QueueAction.of(segmentsDay2.get(1), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.LOAD, null)),
                QueueAction.of(segmentsDay1.get(1), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.LOAD, null)),
                QueueAction.of(segmentsDay2.get(2), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.REPLICATE, null)),
                QueueAction.of(segmentsDay1.get(2), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.REPLICATE, null)),
                QueueAction.of(segmentsDay2.get(3), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.MOVE_TO, null)),
                QueueAction.of(segmentsDay1.get(3), s -> httpLoadQueuePeon.loadSegment(s, SegmentAction.MOVE_TO, null)));

        // Capture the expected order before the submission order is randomized.
        final List<DataSegment> expectedSegmentOrder =
                actions.stream().map(a -> a.segment).collect(Collectors.toList());

        Collections.shuffle(actions);
        actions.forEach(QueueAction::invoke);

        httpClient.sendRequestToServerAndHandleResponse();
        Assert.assertEquals(expectedSegmentOrder, httpClient.segmentsSentToServer);
    }

    /**
     * Verifies that a queued load which has not been sent yet can be cancelled,
     * is removed from the load queue and never reaches the processed list.
     */
    @Test
    public void testCancelLoad() {
        final DataSegment segment = segments.get(0);
        httpLoadQueuePeon.loadSegment(segment, SegmentAction.REPLICATE, markSegmentProcessed(segment));
        Assert.assertEquals(1, httpLoadQueuePeon.getSegmentsToLoad().size());

        Assert.assertTrue(httpLoadQueuePeon.cancelOperation(segment));

        Assert.assertEquals(0, httpLoadQueuePeon.getSegmentsToLoad().size());
        Assert.assertTrue(httpClient.processedSegments.isEmpty());
    }

    /**
     * Verifies that a queued drop which has not been sent yet can be cancelled,
     * is removed from the drop queue and never reaches the processed list.
     */
    @Test
    public void testCancelDrop() {
        final DataSegment segment = segments.get(0);
        httpLoadQueuePeon.dropSegment(segment, markSegmentProcessed(segment));
        Assert.assertEquals(1, httpLoadQueuePeon.getSegmentsToDrop().size());

        Assert.assertTrue(httpLoadQueuePeon.cancelOperation(segment));

        Assert.assertTrue(httpLoadQueuePeon.getSegmentsToDrop().isEmpty());
        Assert.assertTrue(httpClient.processedSegments.isEmpty());
    }

    /**
     * Verifies that cancellation is refused both while a request is in flight
     * and after its response has been handled, and that the callback still
     * runs once the callback executor is drained.
     */
    @Test
    public void testCannotCancelRequestSentToServer() {
        final DataSegment segment = segments.get(0);
        httpLoadQueuePeon.loadSegment(segment, SegmentAction.REPLICATE, markSegmentProcessed(segment));
        Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().contains(segment));

        // Request is sent but the response is not handled yet: still queued, not cancellable.
        httpClient.sendRequestToServer();
        Assert.assertTrue(httpClient.segmentsSentToServer.contains(segment));
        Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().contains(segment));
        Assert.assertFalse(httpLoadQueuePeon.cancelOperation(segment));

        // Response handled: nothing left to cancel.
        httpClient.handleResponseFromServer();
        Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().isEmpty());
        Assert.assertFalse(httpLoadQueuePeon.cancelOperation(segment));

        httpClient.executeCallbacks();
        Assert.assertTrue(httpClient.processedSegments.contains(segment));
    }

    /** Verifies that a second cancellation of the same operation returns {@code false}. */
    @Test
    public void testCannotCancelOperationMultipleTimes() {
        final DataSegment segment = segments.get(0);
        httpLoadQueuePeon.loadSegment(segment, SegmentAction.REPLICATE, markSegmentProcessed(segment));
        Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().contains(segment));

        Assert.assertTrue(httpLoadQueuePeon.cancelOperation(segment));
        Assert.assertFalse(httpLoadQueuePeon.cancelOperation(segment));
    }

    /**
     * Verifies that the reported load rate is 0 while the only load request
     * has been sent but its response has not been handled.
     */
    @Test
    public void testLoadRateIsZeroWhenNoLoadHasFinishedYet() {
        httpLoadQueuePeon.loadSegment(segments.get(0), SegmentAction.LOAD, null);
        httpClient.sendRequestToServer();

        Assert.assertEquals(1, httpLoadQueuePeon.getSegmentsToLoad().size());
        Assert.assertEquals(0L, httpLoadQueuePeon.getLoadRateKbps());
    }

    /**
     * Verifies that completing a drop leaves the reported load rate at 0.
     *
     * @throws InterruptedException if the simulated processing delay is interrupted
     */
    @Test
    public void testLoadRateIsUnchangedByDrops() throws InterruptedException {
        final long millisTakenToDropSegment = 10L;

        httpLoadQueuePeon.dropSegment(segments.get(0), null);
        httpClient.sendRequestToServer();
        // Let some wall-clock time pass between sending the request and handling the response.
        Thread.sleep(millisTakenToDropSegment);
        httpClient.handleResponseFromServer();

        Assert.assertEquals(0L, httpLoadQueuePeon.getLoadRateKbps());
    }

    /**
     * Verifies that after a successful load the reported rate lies in
     * {@code (expected / 2, expected]}, where {@code expected} is derived from
     * the segment size and the simulated load time.
     *
     * @throws InterruptedException if the simulated processing delay is interrupted
     */
    @Test
    public void testLoadRateIsChangedWhenLoadSucceeds() throws InterruptedException {
        final long millisTakenToLoadSegment = 10L;

        httpLoadQueuePeon.loadSegment(segments.get(0), SegmentAction.LOAD, null);
        httpClient.sendRequestToServer();
        Thread.sleep(millisTakenToLoadSegment);
        httpClient.handleResponseFromServer();

        // 8 * size / millis; presumably getSize() is in bytes, making this bits per
        // millisecond, i.e. kbps - TODO confirm the unit against DataSegment.
        final long expectedRateKbps = 8L * segments.get(0).getSize() / millisTakenToLoadSegment;
        final long observedRateKbps = httpLoadQueuePeon.getLoadRateKbps();

        // The sleep is a lower bound on the elapsed time, so the observed rate can only
        // be at or below the expected one; the lower bound tolerates up to 2x the delay.
        Assert.assertTrue(
                observedRateKbps > expectedRateKbps / 2L && observedRateKbps <= expectedRateKbps);
    }

    /**
     * Builds a callback that appends {@code segment} to
     * {@code httpClient.processedSegments} when invoked, whatever the
     * success flag is.
     *
     * @param segment segment to record
     * @return callback recording the segment
     */
    private LoadPeonCallback markSegmentProcessed(DataSegment segment) {
        return success -> httpClient.processedSegments.add(segment);
    }

    /** Pairs a segment with the queueing operation to apply to it. */
    private static class QueueAction {
        final DataSegment segment;
        final Consumer<DataSegment> action;

        /**
         * Creates an action for the given segment.
         *
         * @param segment segment the action operates on
         * @param action  operation invoked with {@code segment}
         * @return the new action
         */
        static QueueAction of(DataSegment segment, Consumer<DataSegment> action) {
            return new QueueAction(segment, action);
        }

        QueueAction(DataSegment segment, Consumer<DataSegment> action) {
            this.segment = segment;
            this.action = action;
        }

        /** Applies the stored operation to the stored segment. */
        void invoke() {
            action.accept(segment);
        }
    }

    /**
     * Stub {@link HttpClient} that answers every change request with
     * {@code SUCCESS} and, acting as the {@link DataSegmentChangeHandler} for
     * those requests, records each segment it is asked to add or remove.
     */
    private static class TestHttpClient
            implements HttpClient, DataSegmentChangeHandler {
        /** Executor handed to the peon for processing; advanced manually by tests. */
        final BlockingExecutorService processingExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-%s");
        /** Executor handed to the peon for callbacks; advanced manually by tests. */
        final BlockingExecutorService callbackExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-cb");
        /** Segments whose callback (see {@code markSegmentProcessed}) has run. */
        final List<DataSegment> processedSegments = new ArrayList<>();
        /** Segments received through {@link #addSegment} or {@link #removeSegment}, in arrival order. */
        final List<DataSegment> segmentsSentToServer = new ArrayList<>();

        private TestHttpClient() {
        }

        /**
         * Not supported by this stub.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public <Intermediate, Final> ListenableFuture<Final> go(
                Request request,
                HttpResponseHandler<Intermediate, Final> httpResponseHandler) {
            throw new UnsupportedOperationException("Not Implemented.");
        }

        /**
         * Simulates a server round trip: notifies the handler of an empty
         * {@code 200 OK} response, deserializes the request body into change
         * requests, applies each to this object and returns an already
         * completed future holding the serialized {@code SUCCESS} responses.
         *
         * @param request             request whose content is a JSON list of change requests
         * @param httpResponseHandler handler notified of the {@code 200 OK} response
         * @param duration            ignored
         * @return completed future wrapping a {@link ByteArrayInputStream} of the responses
         * @throws RE if reading the request or writing the response fails
         */
        @Override
        public <Intermediate, Final> ListenableFuture<Final> go(
                Request request,
                HttpResponseHandler<Intermediate, Final> httpResponseHandler,
                Duration duration) {
            final HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            httpResponse.setContent(ChannelBuffers.buffer(0));
            // The handler's return value is deliberately unused; only the notification matters.
            httpResponseHandler.handleResponse(httpResponse, null);

            try {
                final List<DataSegmentChangeRequest> changeRequests = MAPPER.readValue(
                        request.getContent().array(),
                        new TypeReference<List<DataSegmentChangeRequest>>() {
                        });

                final List<DataSegmentChangeResponse> statuses = new ArrayList<>(changeRequests.size());
                for (DataSegmentChangeRequest changeRequest : changeRequests) {
                    // Dispatches to addSegment/removeSegment on this stub.
                    changeRequest.go(this, null);
                    statuses.add(new DataSegmentChangeResponse(changeRequest, SegmentChangeStatus.SUCCESS));
                }

                final byte[] responseBytes = MAPPER
                        .writerWithType(HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF)
                        .writeValueAsBytes(statuses);

                // Unchecked: the caller-chosen Final type cannot be verified here. This stub
                // always hands back an InputStream and assumes that is what the caller expects.
                @SuppressWarnings("unchecked")
                final ListenableFuture<Final> result = (ListenableFuture<Final>) (ListenableFuture<?>)
                        Futures.immediateFuture(new ByteArrayInputStream(responseBytes));
                return result;
            }
            catch (Exception ex) {
                throw new RE(ex, "Unexpected exception.");
            }
        }

        /** Records {@code segment} as sent to the server; {@code callback} is ignored. */
        @Override
        public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) {
            segmentsSentToServer.add(segment);
        }

        /** Records {@code segment} as sent to the server; {@code callback} is ignored. */
        @Override
        public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) {
            segmentsSentToServer.add(segment);
        }

        /** Runs {@link #sendRequestToServer()} followed by {@link #handleResponseFromServer()}. */
        void sendRequestToServerAndHandleResponse() {
            sendRequestToServer();
            handleResponseFromServer();
        }

        /** Finishes only the next pending task of the processing executor. */
        void sendRequestToServer() {
            processingExecutor.finishNextPendingTask();
        }

        /** Finishes every task still pending on the processing executor. */
        void handleResponseFromServer() {
            processingExecutor.finishAllPendingTasks();
        }

        /** Finishes every task pending on the callback executor. */
        void executeCallbacks() {
            callbackExecutor.finishAllPendingTasks();
        }
    }
}

