/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.rules;

import java.util.List;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.coordinator.rules.SegmentActionHandler;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class BroadcastDistributionRuleTest {
    private int serverId = 0;
    private static final String TIER_1 = "tier1";
    private static final String TIER_2 = "tier2";
    private final DataSegment wikiSegment = CreateDataSegments.ofDatasource("wiki").eachOfSizeInMb(100L).get(0);

    @Before
    public void setUp() {
        this.serverId = 0;
    }

    @Test
    public void testSegmentIsBroadcastToAllTiers() {
        ServerHolder serverT11 = this.create10gbHistorical(TIER_1, new DataSegment[0]);
        ServerHolder serverT21 = this.create10gbHistorical(TIER_2, new DataSegment[0]);
        DruidCluster cluster = DruidCluster.builder().add(serverT11).add(serverT21).build();
        DruidCoordinatorRuntimeParams params = this.makeParamsWithUsedSegments(cluster, this.wikiSegment);
        ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
        CoordinatorRunStats stats = this.runRuleOnSegment((Rule)rule, this.wikiSegment, params);
        Assert.assertEquals((long)1L, (long)stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, "wiki"));
        Assert.assertTrue((boolean)serverT11.isLoadingSegment(this.wikiSegment));
        Assert.assertEquals((long)1L, (long)stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_2, "wiki"));
        Assert.assertTrue((boolean)serverT21.isLoadingSegment(this.wikiSegment));
    }

    @Test
    public void testSegmentIsNotBroadcastToServerIfAlreadyLoaded() {
        ServerHolder serverT11 = this.create10gbHistorical(TIER_1, this.wikiSegment);
        ServerHolder serverT12 = this.create10gbHistorical(TIER_1, new DataSegment[0]);
        DruidCluster cluster = DruidCluster.builder().add(serverT11).add(serverT12).build();
        DruidCoordinatorRuntimeParams params = this.makeParamsWithUsedSegments(cluster, this.wikiSegment);
        ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
        CoordinatorRunStats stats = this.runRuleOnSegment((Rule)rule, this.wikiSegment, params);
        Assert.assertEquals((long)1L, (long)stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, "wiki"));
        Assert.assertFalse((boolean)serverT11.isLoadingSegment(this.wikiSegment));
        Assert.assertTrue((boolean)serverT11.isServingSegment(this.wikiSegment));
        Assert.assertTrue((boolean)serverT12.isLoadingSegment(this.wikiSegment));
    }

    @Test
    public void testSegmentIsNotBroadcastToDecommissioningServer() {
        ServerHolder activeServer = this.create10gbHistorical(TIER_1, new DataSegment[0]);
        ServerHolder decommissioningServer = this.createDecommissioningHistorical(TIER_1, new DataSegment[0]);
        DruidCluster cluster = DruidCluster.builder().add(activeServer).add(decommissioningServer).build();
        DruidCoordinatorRuntimeParams params = this.makeParamsWithUsedSegments(cluster, this.wikiSegment);
        ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
        CoordinatorRunStats stats = this.runRuleOnSegment((Rule)rule, this.wikiSegment, params);
        Assert.assertEquals((long)1L, (long)stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, "wiki"));
        Assert.assertTrue((boolean)activeServer.isLoadingSegment(this.wikiSegment));
        Assert.assertTrue((boolean)decommissioningServer.getLoadingSegments().isEmpty());
    }

    @Test
    public void testBroadcastSegmentIsDroppedFromDecommissioningServer() {
        ServerHolder activeServer = this.create10gbHistorical(TIER_1, this.wikiSegment);
        ServerHolder decommissioningServer = this.createDecommissioningHistorical(TIER_1, this.wikiSegment);
        DruidCluster cluster = DruidCluster.builder().add(activeServer).add(decommissioningServer).build();
        DruidCoordinatorRuntimeParams params = this.makeParamsWithUsedSegments(cluster, this.wikiSegment);
        ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
        CoordinatorRunStats stats = this.runRuleOnSegment((Rule)rule, this.wikiSegment, params);
        Assert.assertEquals((long)1L, (long)stats.getSegmentStat(Stats.Segments.DROPPED, TIER_1, "wiki"));
        Assert.assertTrue((boolean)activeServer.getPeon().getSegmentsToDrop().isEmpty());
        Assert.assertTrue((boolean)decommissioningServer.getPeon().getSegmentsToDrop().contains(this.wikiSegment));
    }

    @Test
    public void testSegmentIsBroadcastToAllServerTypes() {
        ServerHolder broker = new ServerHolder(this.create10gbServer(ServerType.BROKER, "broker_tier").toImmutableDruidServer(), (LoadQueuePeon)new TestLoadQueuePeon());
        ServerHolder indexer = new ServerHolder(this.create10gbServer(ServerType.INDEXER_EXECUTOR, TIER_2).toImmutableDruidServer(), (LoadQueuePeon)new TestLoadQueuePeon());
        ServerHolder historical = this.create10gbHistorical(TIER_1, new DataSegment[0]);
        DruidCluster cluster = DruidCluster.builder().add(broker).add(indexer).add(historical).build();
        DruidCoordinatorRuntimeParams params = this.makeParamsWithUsedSegments(cluster, this.wikiSegment);
        ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
        CoordinatorRunStats stats = this.runRuleOnSegment((Rule)rule, this.wikiSegment, params);
        Assert.assertEquals((long)1L, (long)stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, "wiki"));
        Assert.assertEquals((long)1L, (long)stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_2, "wiki"));
        Assert.assertEquals((long)1L, (long)stats.getSegmentStat(Stats.Segments.ASSIGNED, broker.getServer().getTier(), "wiki"));
        Assert.assertTrue((boolean)historical.isLoadingSegment(this.wikiSegment));
        Assert.assertTrue((boolean)indexer.isLoadingSegment(this.wikiSegment));
        Assert.assertTrue((boolean)broker.isLoadingSegment(this.wikiSegment));
    }

    @Test
    public void testReasonForBroadcastFailure() {
        ServerHolder eligibleServer = this.create10gbHistorical(TIER_1, new DataSegment[0]);
        ServerHolder serverWithNoDiskSpace = new ServerHolder(new DruidServer("server1", "server1", null, 0L, ServerType.HISTORICAL, TIER_1, 0).toImmutableDruidServer(), (LoadQueuePeon)new TestLoadQueuePeon());
        int maxSegmentsInLoadQueue = 5;
        ServerHolder serverWithFullQueue = new ServerHolder(this.create10gbServer(ServerType.HISTORICAL, TIER_1).toImmutableDruidServer(), (LoadQueuePeon)new TestLoadQueuePeon(), false, 5, 100);
        List<DataSegment> segmentsInQueue = CreateDataSegments.ofDatasource("koala").forIntervals(5, Granularities.MONTH).withNumPartitions(1).eachOfSizeInMb(10L);
        segmentsInQueue.forEach(s -> serverWithFullQueue.startOperation(SegmentAction.LOAD, s));
        Assert.assertTrue((boolean)serverWithFullQueue.isLoadQueueFull());
        DruidCluster cluster = DruidCluster.builder().add(eligibleServer).add(serverWithNoDiskSpace).add(serverWithFullQueue).build();
        DruidCoordinatorRuntimeParams params = this.makeParamsWithUsedSegments(cluster, this.wikiSegment);
        ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
        CoordinatorRunStats stats = this.runRuleOnSegment((Rule)rule, this.wikiSegment, params);
        Assert.assertEquals((long)1L, (long)stats.getSegmentStat(Stats.Segments.ASSIGNED, TIER_1, "wiki"));
        RowKey metricKey = RowKey.with((Dimension)Dimension.DATASOURCE, (String)"wiki").with(Dimension.TIER, TIER_1).and(Dimension.DESCRIPTION, "Not enough disk space");
        Assert.assertEquals((long)1L, (long)stats.get(Stats.Segments.ASSIGN_SKIPPED, metricKey));
        metricKey = RowKey.with((Dimension)Dimension.DATASOURCE, (String)"wiki").with(Dimension.TIER, TIER_1).and(Dimension.DESCRIPTION, "Load queue is full");
        Assert.assertEquals((long)1L, (long)stats.get(Stats.Segments.ASSIGN_SKIPPED, metricKey));
    }

    private CoordinatorRunStats runRuleOnSegment(Rule rule, DataSegment segment, DruidCoordinatorRuntimeParams params) {
        StrategicSegmentAssigner segmentAssigner = params.getSegmentAssigner();
        rule.run(segment, (SegmentActionHandler)segmentAssigner);
        return params.getCoordinatorStats();
    }

    private DruidCoordinatorRuntimeParams makeParamsWithUsedSegments(DruidCluster druidCluster, DataSegment ... usedSegments) {
        return DruidCoordinatorRuntimeParams.builder().withDruidCluster(druidCluster).withUsedSegments(usedSegments).withBalancerStrategy((BalancerStrategy)new RandomBalancerStrategy()).withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null)).build();
    }

    private ServerHolder create10gbHistorical(String tier, DataSegment ... segments) {
        DruidServer server = this.create10gbServer(ServerType.HISTORICAL, tier);
        for (DataSegment segment : segments) {
            server.addDataSegment(segment);
        }
        return new ServerHolder(server.toImmutableDruidServer(), (LoadQueuePeon)new TestLoadQueuePeon());
    }

    private ServerHolder createDecommissioningHistorical(String tier, DataSegment ... segments) {
        DruidServer server = this.create10gbServer(ServerType.HISTORICAL, tier);
        for (DataSegment segment : segments) {
            server.addDataSegment(segment);
        }
        return new ServerHolder(server.toImmutableDruidServer(), (LoadQueuePeon)new TestLoadQueuePeon(), true);
    }

    private DruidServer create10gbServer(ServerType type, String tier) {
        String name = "server_" + this.serverId++;
        return new DruidServer(name, name, null, 0x280000000L, type, tier, 0);
    }
}

