/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.listener;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.StoreCommitter;
import org.apache.paimon.flink.sink.listener.ListenerTestUtils;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.partition.file.SuccessFile;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.TableCommit;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class WatermarkPartitionMarkDoneTest
extends TableTestBase {
    @Test
    public void testWaterMarkPartitionMarkDone() throws Exception {
        Identifier identifier = this.identifier("T");
        Schema schema = Schema.newBuilder().column("a", (DataType)DataTypes.STRING()).column("b", (DataType)DataTypes.INT()).column("c", (DataType)DataTypes.INT()).partitionKeys(new String[]{"a"}).primaryKey(new String[]{"a", "b"}).option(CoreOptions.PARTITION_MARK_DONE_ACTION.key(), "success-file").option(CoreOptions.FILE_FORMAT.key(), "avro").option(FlinkConnectorOptions.PARTITION_MARK_DONE_MODE.key(), "watermark").option(FlinkConnectorOptions.PARTITION_TIME_INTERVAL.key(), "1 h").option(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE.key(), "15 min").option(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyy-MM-dd HH").option(CoreOptions.BUCKET.key(), "1").build();
        this.catalog.createTable(identifier, schema, true);
        FileStoreTable table = (FileStoreTable)this.catalog.getTable(identifier);
        TableWriteImpl write = table.newWrite("user1");
        TableCommitImpl commit = table.newCommit("user1");
        StoreCommitter committer = new StoreCommitter(table, (TableCommit)commit, ListenerTestUtils.createMockContext(true, false));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025-03-01 12"), 1, 1}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025-03-01 13"), 1, 1}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025-03-01 14"), 1, 1}));
        ManifestCommittable committable1 = new ManifestCommittable(1L, Long.valueOf(LocalDateTime.parse("2025-03-01T12:50:30.00").atZone(ZoneId.of("UTC")).toInstant().toEpochMilli()));
        write.prepareCommit(true, 1L).forEach(arg_0 -> ((ManifestCommittable)committable1).addFileCommittable(arg_0));
        committer.commit(Collections.singletonList(committable1));
        this.validatePartitions(table, Collections.emptyList(), Arrays.asList("2025-03-01 12", "2025-03-01 13", "2025-03-01 14"));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025-03-01 15"), 1, 1}));
        ManifestCommittable committable2 = new ManifestCommittable(2L, Long.valueOf(LocalDateTime.parse("2025-03-01T14:30:30.00").atZone(ZoneId.of("UTC")).toInstant().toEpochMilli()));
        write.prepareCommit(true, 2L).forEach(arg_0 -> ((ManifestCommittable)committable2).addFileCommittable(arg_0));
        committer.commit(Collections.singletonList(committable2));
        this.validatePartitions(table, Arrays.asList("2025-03-01 12", "2025-03-01 13"), Arrays.asList("2025-03-01 14", "2025-03-01 15"));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{BinaryString.fromString((String)"2025-03-01 16"), 2, 1}));
        ManifestCommittable committable3 = new ManifestCommittable(3L, Long.valueOf(LocalDateTime.parse("2025-03-01T16:20:30.00").atZone(ZoneId.of("UTC")).toInstant().toEpochMilli()));
        write.prepareCommit(true, 3L).forEach(arg_0 -> ((ManifestCommittable)committable3).addFileCommittable(arg_0));
        committer.commit(Collections.singletonList(committable3));
        this.validatePartitions(table, Arrays.asList("2025-03-01 12", "2025-03-01 13", "2025-03-01 14", "2025-03-01 15"), Collections.singletonList("2025-03-01 16"));
        committer.close();
    }

    private void validatePartitions(FileStoreTable table, List<String> donePartitions, List<String> pendingPartitions) throws Exception {
        LocalFileIO fileIO;
        for (String partition : donePartitions) {
            fileIO = new LocalFileIO();
            Path successPath = new Path(table.location(), String.format("a=%s/_SUCCESS", partition));
            SuccessFile successFile = SuccessFile.safelyFromPath((FileIO)fileIO, (Path)successPath);
            Assertions.assertThat((Object)successFile).isNotNull();
        }
        for (String partition : pendingPartitions) {
            fileIO = new LocalFileIO();
            Path dir = new Path(table.location(), String.format("a=%s", partition));
            Assertions.assertThat((boolean)fileIO.exists(dir)).isTrue();
            Path successPath = new Path(table.location(), String.format("a=%s/_SUCCESS", partition));
            SuccessFile successFile = SuccessFile.safelyFromPath((FileIO)fileIO, (Path)successPath);
            Assertions.assertThat((Object)successFile).isNull();
        }
    }
}

