/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.align;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.flink.source.FileSplitEnumeratorTestBase;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
import org.apache.paimon.flink.source.align.AlignedContinuousFileSplitEnumerator;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class AlignedContinuousFileSplitEnumeratorTest
extends FileSplitEnumeratorTestBase {
    private static final RowType ROW_TYPE = RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()}, (String[])new String[]{"pt", "a", "b"});
    private static final String CONSUMER_ID = "consumer";
    @TempDir
    private java.nio.file.Path tempDir;
    private FileStoreTable table;

    @BeforeEach
    public void before() throws Exception {
        Path tablePath = new Path("traceable://" + this.tempDir.toString());
        FileIO fileIO = FileIOFinder.find((Path)tablePath);
        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
        TableSchema tableSchema = schemaManager.createTable(new Schema(ROW_TYPE.getFields(), Collections.singletonList("pt"), Arrays.asList("pt", "a"), Collections.singletonMap(CoreOptions.CONSUMER_ID.key(), CONSUMER_ID), ""));
        this.table = FileStoreTableFactory.create((FileIO)LocalFileIO.create(), (Path)tablePath, (TableSchema)tableSchema);
    }

    @Test
    public void testSplitsAssignedBySnapshot() throws Exception {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = this.getSplitEnumeratorContext(2);
        ArrayList<FileStoreSourceSplit> initialSplits = new ArrayList<FileStoreSourceSplit>();
        for (int i = 1; i <= 2; ++i) {
            initialSplits.add(this.createSnapshotSplit(i, 0, Collections.emptyList()));
        }
        initialSplits.add(this.createSnapshotSplit(2, 1, Collections.emptyList()));
        ArrayList expectedSplits = new ArrayList(initialSplits);
        AlignedContinuousFileSplitEnumerator enumerator = new Builder().setSplitEnumeratorContext((SplitEnumeratorContext<FileStoreSourceSplit>)context).setInitialSplits(initialSplits).setDiscoveryInterval(3L).build();
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(1, "test-host");
        Map assignments = context.getSplitAssignments();
        Assertions.assertThat((Map)assignments).containsOnlyKeys((Object[])new Integer[]{0});
        List assignedSplits = ((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(0)).getAssignedSplits();
        Assertions.assertThat((List)assignedSplits).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)expectedSplits.get(0)});
        context.getSplitAssignments().clear();
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(1, "test-host");
        Assertions.assertThat((Map)context.getSplitAssignments()).isEmpty();
        enumerator.snapshotState(1L);
        Assertions.assertThat((Map)context.getSplitAssignments()).isEmpty();
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.handleSplitRequest(1, "test-host");
        assignments = context.getSplitAssignments();
        Assertions.assertThat((Map)assignments).containsOnlyKeys((Object[])new Integer[]{0, 1});
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(0)).getAssignedSplits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)expectedSplits.get(1)});
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(1)).getAssignedSplits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)expectedSplits.get(2)});
    }

    @Test
    public void testEnumeratorSnapshotState() throws Exception {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = this.getSplitEnumeratorContext(1);
        AlignedContinuousFileSplitEnumerator enumerator = new Builder().setSplitEnumeratorContext((SplitEnumeratorContext<FileStoreSourceSplit>)context).setInitialSplits(Collections.emptyList()).setDiscoveryInterval(3L).setAlignedTimeout(10L).build();
        Assertions.assertThatThrownBy(() -> enumerator.snapshotState(1L)).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches((String)"Timeout while waiting for snapshot from paimon source.")});
        ArrayList<FileStoreSourceSplit> splits = new ArrayList<FileStoreSourceSplit>();
        for (int i = 1; i <= 2; ++i) {
            splits.add(this.createSnapshotSplit(i, 0, Collections.emptyList()));
        }
        enumerator.addSplits(splits);
        enumerator.handleSplitRequest(0, "test-host");
        Map assignments = context.getSplitAssignments();
        Assertions.assertThat((Map)assignments).containsOnlyKeys((Object[])new Integer[]{0});
        Assertions.assertThat((List)((TestingSplitEnumeratorContext.SplitAssignmentState)assignments.get(0)).getAssignedSplits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)splits.get(0)});
        PendingSplitsCheckpoint checkpoint = enumerator.snapshotState(1L);
        Assertions.assertThat((Collection)checkpoint.splits()).containsExactly((Object[])new FileStoreSourceSplit[]{(FileStoreSourceSplit)splits.get(1)});
    }

    @Test
    public void testScanWithConsumerId() throws Exception {
        TestingSplitEnumeratorContext<FileStoreSourceSplit> context = this.getSplitEnumeratorContext(1);
        AlignedContinuousFileSplitEnumerator enumerator = new Builder().setSplitEnumeratorContext((SplitEnumeratorContext<FileStoreSourceSplit>)context).setInitialSplits(Collections.emptyList()).setScan((StreamTableScan)this.table.newStreamScan()).build();
        ArrayList<FileStoreSourceSplit> splits = new ArrayList<FileStoreSourceSplit>();
        for (int i = 1; i <= 2; ++i) {
            splits.add(this.createSnapshotSplit(i, 0, Collections.emptyList()));
        }
        enumerator.addSplits(splits);
        ConsumerManager consumerManager = new ConsumerManager(this.table.fileIO(), this.table.location());
        Assertions.assertThat((Optional)consumerManager.consumer(CONSUMER_ID)).isEmpty();
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.snapshotState(1L);
        enumerator.notifyCheckpointComplete(1L);
        Assertions.assertThat((Optional)consumerManager.consumer(CONSUMER_ID)).hasValueSatisfying(new Condition(consumer -> consumer.nextSnapshot() == 2L, "condition", new Object[0]));
        enumerator.handleSplitRequest(0, "test-host");
        enumerator.snapshotState(2L);
        enumerator.notifyCheckpointComplete(2L);
        Assertions.assertThat((Optional)consumerManager.consumer(CONSUMER_ID)).hasValueSatisfying(new Condition(consumer -> consumer.nextSnapshot() == 3L, "condition", new Object[0]));
    }

    private static class Builder {
        private SplitEnumeratorContext<FileStoreSourceSplit> context;
        private Collection<FileStoreSourceSplit> initialSplits = Collections.emptyList();
        private long discoveryInterval = Long.MAX_VALUE;
        private StreamTableScan scan;
        private BucketMode bucketMode = BucketMode.FIXED;
        private long timeout = 30000L;

        private Builder() {
        }

        public Builder setSplitEnumeratorContext(SplitEnumeratorContext<FileStoreSourceSplit> context) {
            this.context = context;
            return this;
        }

        public Builder setInitialSplits(Collection<FileStoreSourceSplit> initialSplits) {
            this.initialSplits = initialSplits;
            return this;
        }

        public Builder setDiscoveryInterval(long discoveryInterval) {
            this.discoveryInterval = discoveryInterval;
            return this;
        }

        public Builder setScan(StreamTableScan scan) {
            this.scan = scan;
            return this;
        }

        public Builder withBucketMode(BucketMode bucketMode) {
            this.bucketMode = bucketMode;
            return this;
        }

        public Builder setAlignedTimeout(long timeout) {
            this.timeout = timeout;
            return this;
        }

        public AlignedContinuousFileSplitEnumerator build() {
            return new AlignedContinuousFileSplitEnumerator(this.context, this.initialSplits, null, this.discoveryInterval, this.scan, this.bucketMode, this.timeout, 10);
        }
    }
}

