/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.TestIcebergSourceFailover;
import org.awaitility.Awaitility;

/**
 * Variant of {@link TestIcebergSourceFailover} that builds the source with {@code ts} as the
 * watermark column and uses {@link TestFixtures#TS_SCHEMA} for both projection and data
 * generation.
 *
 * <p>Every generated record gets a distinct, monotonically increasing {@code ts}: the shared
 * counter jumps by {@link #RECORD_BATCH_TS_INCREMENT_MILLI} at the start of each batch and by
 * {@link #RECORD_TS_INCREMENT_MILLI} for each record inside the batch.
 */
public class TestIcebergSourceFailoverWithWatermarkExtractor
extends TestIcebergSourceFailover {
    /** Gap added to the timestamp counter once per generated batch (15 minutes). */
    private static final long RECORD_BATCH_TS_INCREMENT_MILLI = TimeUnit.MINUTES.toMillis(15L);
    /** Gap added to the timestamp counter for every generated record (1 second). */
    private static final long RECORD_TS_INCREMENT_MILLI = TimeUnit.SECONDS.toMillis(1L);
    /** Epoch-millisecond counter backing the {@code ts} field; seeded with the wall clock. */
    private final AtomicLong tsMilli = new AtomicLong(System.currentTimeMillis());

    /**
     * Builds the source under test with {@code ts} as the watermark column.
     *
     * @return a builder wired to the source table loader and projected to
     *     {@link TestFixtures#TS_SCHEMA}
     */
    @Override
    protected IcebergSource.Builder<RowData> sourceBuilder() {
        // Explicit type witness: without it the chained calls cannot be typed as Builder<RowData>.
        return IcebergSource.<RowData>builder()
            .tableLoader(this.sourceTableResource.tableLoader())
            .watermarkColumn("ts")
            .project(TestFixtures.TS_SCHEMA);
    }

    /**
     * @return the schema used by this test, {@link TestFixtures#TS_SCHEMA}
     */
    @Override
    protected Schema schema() {
        return TestFixtures.TS_SCHEMA;
    }

    /**
     * Generates random records and overwrites their {@code ts} field with increasing timestamps.
     *
     * @param numRecords number of records to generate
     * @param seed seed forwarded to {@link RandomGenericData#generate}
     * @return the generated records, in generation order, each carrying a {@link LocalDateTime}
     *     in {@code ts}
     */
    @Override
    protected List<Record> generateRecords(int numRecords, long seed) {
        // Advance once per batch so that consecutive batches are separated by a large gap.
        this.tsMilli.addAndGet(RECORD_BATCH_TS_INCREMENT_MILLI);
        return RandomGenericData.generate(this.schema(), numRecords, seed).stream()
            .map(this::stampNextTimestamp)
            .collect(Collectors.toList());
    }

    /**
     * Waits until the table content matches the expected records, comparing {@code ts} as epoch
     * milliseconds.
     *
     * @param table table whose records are read on every poll
     * @param expectedRecords records expected in the table; they are not modified
     * @param timeout maximum time to wait for the assertion to pass
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void assertRecords(Table table, List<Record> expectedRecords, Duration timeout) throws Exception {
        List<Record> expectedNormalized = this.convertLocalDateTimeToMilli(expectedRecords);
        // A preceding equalsRecords(...) call whose result was discarded has been dropped: it
        // asserted nothing and only caused a second table read on every poll.
        Awaitility.await("expected list of records should be produced")
            .atMost(timeout)
            .untilAsserted(() -> SimpleDataUtil.assertRecordsEqual(
                expectedNormalized,
                this.convertLocalDateTimeToMilli(SimpleDataUtil.tableRecords(table)),
                table.schema()));
    }

    /** Sets {@code ts} on the given record to the next counter value (as UTC) and returns it. */
    private Record stampNextTimestamp(Record record) {
        long nextMilli = this.tsMilli.addAndGet(RECORD_TS_INCREMENT_MILLI);
        record.setField("ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(nextMilli), ZoneOffset.UTC));
        return record;
    }

    /**
     * Returns copies of the given records with {@code ts} converted from {@link LocalDateTime}
     * (interpreted as UTC) to epoch milliseconds.
     *
     * <p>The input records are left untouched, so the same list can be converted repeatedly
     * without the {@code LocalDateTime} cast failing on an already-converted value.
     */
    private List<Record> convertLocalDateTimeToMilli(List<Record> records) {
        return records.stream().map(original -> {
            LocalDateTime localDateTime = (LocalDateTime) original.getField("ts");
            Record converted = original.copy();
            converted.setField("ts", localDateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli());
            return converted;
        }).collect(Collectors.toList());
    }
}

