/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.assigner;

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.assigner.GetSplitResult;
import org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.assigner.SplitAssignerTestBase;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.reader.ReaderUtil;
import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializationUtil;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for the {@link SplitAssigner} produced by {@link OrderedSplitAssignerFactory} when it is
 * configured with {@link SplitComparators#watermark}, using a {@link ColumnStatsWatermarkExtractor}
 * over the {@code timestamp_column} field of {@link #SCHEMA}.
 *
 * <p>Splits are backed by real Parquet files written under {@code TEMPORARY_FOLDER} through
 * {@link ReaderUtil#createCombinedScanTask}.
 */
public class TestWatermarkBasedSplitAssigner extends SplitAssignerTestBase {
    /** Single-column schema: a required timestamp (without zone) field with id 1. */
    public static final Schema SCHEMA =
        new Schema(
            Types.NestedField.required(1, "timestamp_column", Types.TimestampType.withoutZone()));

    /** Appender factory used to write the data files that back the test splits. */
    private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);

    /**
     * Creates the assigner under test: an ordered assigner whose comparator is the watermark
     * comparator built from a {@link ColumnStatsWatermarkExtractor} on {@code timestamp_column}.
     * The extractor's third argument (a time unit, judging by {@link #testSerializable()}) is
     * passed as {@code null}.
     *
     * @return a fresh assigner instance
     */
    @Override
    protected SplitAssigner splitAssigner() {
        ColumnStatsWatermarkExtractor extractor =
            new ColumnStatsWatermarkExtractor(SCHEMA, "timestamp_column", null);
        return new OrderedSplitAssignerFactory(SplitComparators.watermark(extractor)).createAssigner();
    }

    /**
     * Four files grouped two per split give two splits, so the assigner is expected to report
     * {@code AVAILABLE} twice and then {@code UNAVAILABLE}.
     */
    @Test
    public void testMultipleFilesInAnIcebergSplit() {
        SplitAssigner assigner = splitAssigner();
        assigner.onDiscoveredSplits(createSplits(4, 2, "2"));

        assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
        assertGetNext(assigner, GetSplitResult.Status.AVAILABLE);
        assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
    }

    /**
     * Builds five single-record splits whose timestamps are one minute apart, hands them to the
     * assigner out of order, and expects them back in ascending timestamp order.
     */
    @Test
    public void testSplitSort() {
        SplitAssigner assigner = splitAssigner();

        Instant now = Instant.now();
        List<IcebergSourceSplit> splits =
            IntStream.range(0, 5)
                .mapToObj(i -> splitFromInstant(now.plus(i, ChronoUnit.MINUTES)))
                .collect(Collectors.toList());

        // Discover the splits in a shuffled order: [3, 4], then [0], then [1, 2].
        assigner.onDiscoveredSplits(splits.subList(3, 5));
        assigner.onDiscoveredSplits(splits.subList(0, 1));
        assigner.onDiscoveredSplits(splits.subList(1, 3));

        assertGetNext(assigner, splits.get(0));
        assertGetNext(assigner, splits.get(1));
        assertGetNext(assigner, splits.get(2));
        assertGetNext(assigner, splits.get(3));
        assertGetNext(assigner, splits.get(4));

        assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE);
    }

    /**
     * Round-trips a watermark comparator through {@link SerializationUtil} and checks that a
     * non-null comparator is produced.
     */
    @Test
    public void testSerializable() {
        byte[] bytes =
            SerializationUtil.serializeToBytes(
                SplitComparators.watermark(
                    new ColumnStatsWatermarkExtractor(
                        TestFixtures.SCHEMA, "id", TimeUnit.MILLISECONDS)));
        SerializableComparator<IcebergSourceSplit> comparator =
            SerializationUtil.deserializeFromBytes(bytes);
        Assert.assertNotNull(comparator);
    }

    /**
     * Pulls the next split from the assigner (with a {@code null} hostname) and asserts that it
     * equals the expected split.
     *
     * @param assigner the assigner to poll
     * @param split the split expected to be returned next
     */
    private void assertGetNext(SplitAssigner assigner, IcebergSourceSplit split) {
        GetSplitResult result = assigner.getNext(null);
        // JUnit's contract is (expected, actual); keeping that order makes failure messages accurate.
        Assert.assertEquals(split, result.split());
    }

    /**
     * Creates {@code fileCount / filesPerSplit} splits, each made of {@code filesPerSplit} files
     * that hold two randomly generated records. The random seed for a file is its global index
     * ({@code splitNum * filesPerSplit + fileNum}).
     *
     * @param fileCount total number of files to spread over the splits
     * @param filesPerSplit number of files in each split
     * @param version not used by this implementation
     * @return the generated splits
     */
    @Override
    protected List<IcebergSourceSplit> createSplits(int fileCount, int filesPerSplit, String version) {
        return IntStream.range(0, fileCount / filesPerSplit)
            .mapToObj(
                splitNum -> {
                    List<List<Record>> filesInSplit =
                        IntStream.range(0, filesPerSplit)
                            .mapToObj(
                                fileNum ->
                                    RandomGenericData.generate(
                                        SCHEMA, 2, splitNum * filesPerSplit + fileNum))
                            .collect(Collectors.toList());
                    return splitFromRecords(filesInSplit);
                })
            .collect(Collectors.toList());
    }

    /**
     * Creates a split backed by one file that contains a single record whose timestamp column is
     * the given instant converted to a UTC {@link LocalDateTime}.
     *
     * @param instant the timestamp to store in the record
     * @return a split over the single-record file
     */
    private IcebergSourceSplit splitFromInstant(Instant instant) {
        Record record = GenericRecord.create(SCHEMA);
        record.set(0, LocalDateTime.ofInstant(instant, ZoneOffset.UTC));
        return splitFromRecords(ImmutableList.of(ImmutableList.of(record)));
    }

    /**
     * Writes the given records as Parquet files under {@code TEMPORARY_FOLDER} and wraps the
     * resulting combined scan task in an {@link IcebergSourceSplit}.
     *
     * @param records one inner list per data file
     * @return a split built from the combined scan task over the written files
     * @throws RuntimeException wrapping any {@link IOException} raised while creating the files
     */
    private IcebergSourceSplit splitFromRecords(List<List<Record>> records) {
        try {
            return IcebergSourceSplit.fromCombinedScanTask(
                ReaderUtil.createCombinedScanTask(
                    records, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
        } catch (IOException e) {
            throw new RuntimeException("Split creation exception", e);
        }
    }
}

