/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.StaticFileStoreSplitEnumerator;
import org.apache.paimon.flink.source.StaticFileStoreSplitEnumeratorTestBase;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class FairAssignModeTest
extends StaticFileStoreSplitEnumeratorTestBase {
    @Test
    public void testSplitAllocation() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = this.getSplitEnumeratorContext(2);
        ArrayList<FileStoreSourceSplit> splits = new ArrayList<FileStoreSourceSplit>();
        for (int i = 1; i <= 4; ++i) {
            splits.add(this.createSnapshotSplit(i, 0, Collections.emptyList()));
        }
        StaticFileStoreSplitEnumerator enumerator = this.getSplitEnumerator((SplitEnumeratorContext<FileStoreSourceSplit>)context, (List<FileStoreSourceSplit>)splits);
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(1, "test-host");
        Map assignments = context.getSplitAssignments();
        Assertions.assertThat((Map)assignments).containsOnlyKeys((Object[])new Integer[]{0, 1});
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(0)).getAssignedSplits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)splits.get(0), (FileStoreSourceSplit)splits.get(2)});
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(1)).getAssignedSplits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)splits.get(1), (FileStoreSourceSplit)splits.get(3)});
        enumerator.addSplitsBack(((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(0)).getAssignedSplits(), 0);
        context.getSplitAssignments().clear();
        Assertions.assertThat((Map)context.getSplitAssignments()).isEmpty();
        enumerator.handleSplitRequest(0, "test-host");
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(0)).getAssignedSplits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)splits.get(0), (FileStoreSourceSplit)splits.get(2)});
    }

    @Test
    public void testSplitBatch() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = this.getSplitEnumeratorContext(2);
        ArrayList<FileStoreSourceSplit> splits = new ArrayList<FileStoreSourceSplit>();
        for (int i = 1; i <= 28; ++i) {
            splits.add(this.createSnapshotSplit(i, 0, Collections.emptyList()));
        }
        StaticFileStoreSplitEnumerator enumerator = this.getSplitEnumerator((SplitEnumeratorContext<FileStoreSourceSplit>)context, (List<FileStoreSourceSplit>)splits);
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(1, "test-host");
        Map assignments = context.getSplitAssignments();
        Assertions.assertThat((Map)assignments).containsOnlyKeys((Object[])new Integer[]{0, 1});
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(0)).getAssignedSplits()).hasSize(10);
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(1)).getAssignedSplits()).hasSize(10);
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(1, "test-host");
        Assertions.assertThat((Map)assignments).containsOnlyKeys((Object[])new Integer[]{0, 1});
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(0)).getAssignedSplits()).hasSize(14);
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(1)).getAssignedSplits()).hasSize(14);
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(1, "test-host");
        Assertions.assertThat((Map)assignments).containsOnlyKeys((Object[])new Integer[]{0, 1});
        Assertions.assertThat((boolean)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(0)).hasReceivedNoMoreSplitsSignal()).isEqualTo(true);
        Assertions.assertThat((boolean)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(1)).hasReceivedNoMoreSplitsSignal()).isEqualTo(true);
    }

    @Test
    public void testSplitAllocationNotEvenly() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = this.getSplitEnumeratorContext(2);
        ArrayList<FileStoreSourceSplit> splits = new ArrayList<FileStoreSourceSplit>();
        for (int i = 1; i <= 3; ++i) {
            splits.add(this.createSnapshotSplit(i, 0, Collections.emptyList()));
        }
        StaticFileStoreSplitEnumerator enumerator = this.getSplitEnumerator((SplitEnumeratorContext<FileStoreSourceSplit>)context, (List<FileStoreSourceSplit>)splits);
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(1, "test-host");
        Map assignments = context.getSplitAssignments();
        Assertions.assertThat((Map)assignments).containsOnlyKeys((Object[])new Integer[]{0, 1});
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(1)).getAssignedSplits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)splits.get(0), (FileStoreSourceSplit)splits.get(2)});
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(0)).getAssignedSplits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)splits.get(1)});
    }

    @Test
    public void testSplitAllocationSomeEmpty() {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = this.getSplitEnumeratorContext(3);
        ArrayList<FileStoreSourceSplit> splits = new ArrayList<FileStoreSourceSplit>();
        for (int i = 1; i <= 2; ++i) {
            splits.add(this.createSnapshotSplit(i, 0, Collections.emptyList()));
        }
        StaticFileStoreSplitEnumerator enumerator = this.getSplitEnumerator((SplitEnumeratorContext<FileStoreSourceSplit>)context, (List<FileStoreSourceSplit>)splits);
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(1, "test-host");
        enumerator.handleSplitRequest(2, "test-host");
        Map assignments = context.getSplitAssignments();
        Assertions.assertThat((Map)assignments).containsOnlyKeys((Object[])new Integer[]{0, 1, 2});
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(0)).getAssignedSplits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)splits.get(0)});
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(1)).getAssignedSplits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)splits.get(1)});
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(2)).getAssignedSplits()).isEmpty();
    }

    @Override
    protected FlinkConnectorOptions.SplitAssignMode splitAssignMode() {
        return FlinkConnectorOptions.SplitAssignMode.FAIR;
    }
}

