/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.partition;

import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import org.apache.paimon.flink.sink.partition.PartitionMarkDoneTrigger;
import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class PartitionMarkDoneTriggerTest {
    private static final Duration timeInterval = Duration.ofDays(1L);
    private static final Duration idleTime = Duration.ofMinutes(15L);
    private List<String> pendingPartitions;
    private PartitionMarkDoneTrigger.State state;
    private PartitionTimeExtractor extractor;

    PartitionMarkDoneTriggerTest() {
    }

    @BeforeEach
    public void before() throws Exception {
        this.pendingPartitions = new ArrayList<String>();
        this.state = new PartitionMarkDoneTrigger.State(){

            public List<String> restore() {
                return new ArrayList<String>(PartitionMarkDoneTriggerTest.this.pendingPartitions);
            }

            public void update(List<String> partitions) {
                PartitionMarkDoneTriggerTest.this.pendingPartitions.clear();
                PartitionMarkDoneTriggerTest.this.pendingPartitions.addAll(partitions);
            }
        };
        this.extractor = new PartitionTimeExtractor("$dt", "yyyy-MM-dd");
    }

    @Test
    public void testWithoutEndInput() throws Exception {
        PartitionMarkDoneTrigger trigger = new PartitionMarkDoneTrigger(this.state, this.extractor, timeInterval, idleTime, this.toEpochMillis("2024-02-01"), false);
        trigger.notifyPartition("dt=2024-02-02", this.toEpochMillis("2024-02-01"));
        List partitions = trigger.donePartitions(false, this.toEpochMillis("2024-02-03"));
        Assertions.assertThat((List)partitions).isEmpty();
        Assertions.assertThat(this.pendingPartitions).isEmpty();
        trigger.snapshotState();
        Assertions.assertThat(this.pendingPartitions).containsOnly((Object[])new String[]{"dt=2024-02-02"});
        partitions = trigger.donePartitions(false, this.toEpochMillis("2024-02-03") + idleTime.toMillis());
        Assertions.assertThat((List)partitions).isEmpty();
        partitions = trigger.donePartitions(false, this.toEpochMillis("2024-02-03") + idleTime.toMillis() + 1L);
        Assertions.assertThat((List)partitions).containsOnly((Object[])new String[]{"dt=2024-02-02"});
        trigger.snapshotState();
        Assertions.assertThat(this.pendingPartitions).isEmpty();
        trigger.notifyPartition("dt=2024-02-03", this.toEpochMillis("2024-02-03"));
        trigger.notifyPartition("dt=2024-02-03", this.toEpochMillis("2024-02-04") + idleTime.toMillis());
        partitions = trigger.donePartitions(false, this.toEpochMillis("2024-02-04") + idleTime.toMillis() + 1L);
        Assertions.assertThat((List)partitions).isEmpty();
        partitions = trigger.donePartitions(false, this.toEpochMillis("2024-02-04") + 2L * idleTime.toMillis() + 1L);
        Assertions.assertThat((List)partitions).containsOnly((Object[])new String[]{"dt=2024-02-03"});
        this.pendingPartitions.add("dt=2024-02-04");
        trigger = new PartitionMarkDoneTrigger(this.state, this.extractor, timeInterval, idleTime, this.toEpochMillis("2024-02-06"), false);
        partitions = trigger.donePartitions(false, this.toEpochMillis("2024-02-06"));
        Assertions.assertThat((List)partitions).isEmpty();
        partitions = trigger.donePartitions(false, this.toEpochMillis("2024-02-06") + idleTime.toMillis() + 1L);
        Assertions.assertThat((List)partitions).containsOnly((Object[])new String[]{"dt=2024-02-04"});
    }

    @Test
    public void testWithEndInput() throws Exception {
        PartitionMarkDoneTrigger trigger = new PartitionMarkDoneTrigger(this.state, this.extractor, timeInterval, idleTime, this.toEpochMillis("2024-02-01"), true);
        trigger.notifyPartition("dt=2024-02-02", this.toEpochMillis("2024-02-01"));
        List partitions = trigger.donePartitions(true, this.toEpochMillis("2024-02-03"));
        Assertions.assertThat((List)partitions).containsOnly((Object[])new String[]{"dt=2024-02-02"});
    }

    @Test
    public void testParseNonDateFormattedPartition() throws Exception {
        PartitionMarkDoneTrigger trigger = new PartitionMarkDoneTrigger(this.state, this.extractor, timeInterval, idleTime, this.toEpochMillis("2024-02-01"), true);
        Assertions.assertThatThrownBy(() -> trigger.extractDateTime("unknown")).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(RuntimeException.class, (String)"Can't extract datetime from partition unknown")});
    }

    private long toEpochMillis(String dt) {
        return LocalDateTime.of(LocalDate.parse(dt), LocalTime.MIN).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
    }
}

