/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.listener;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.StoreCommitter;
import org.apache.paimon.flink.sink.listener.CommitListener;
import org.apache.paimon.flink.sink.listener.CommitListenerFactory;
import org.apache.paimon.flink.sink.listener.ListenerTestUtils;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.TableCommit;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class CustomCommitListenerTest {
    @TempDir
    java.nio.file.Path tempDir;
    private static final Map<String, Set<String>> commitListenerResult = new ConcurrentHashMap<String, Set<String>>();

    @Test
    public void testCustomCommitListener() throws Exception {
        Path tablePath = new Path(this.tempDir.toString());
        SchemaManager schemaManager = new SchemaManager((FileIO)LocalFileIO.create(), tablePath);
        String testId = UUID.randomUUID().toString();
        Schema schema = Schema.newBuilder().column("a", (DataType)DataTypes.INT()).column("pt", (DataType)DataTypes.STRING()).partitionKeys(new String[]{"pt"}).option(FlinkConnectorOptions.COMMIT_CUSTOM_LISTENERS.key(), "partition-collector").option("test-listener-id", testId).build();
        schemaManager.createTable(schema);
        FileStoreTable table = FileStoreTableFactory.create((FileIO)LocalFileIO.create(), (Path)tablePath);
        String commitUser = UUID.randomUUID().toString();
        TableWriteImpl write = table.newWrite(commitUser);
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{1, BinaryString.fromString((String)"20250101")}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{1, BinaryString.fromString((String)"20250102")}));
        List commitMessages = write.prepareCommit(false, 1L);
        write.close();
        StoreCommitter committer = new StoreCommitter(table, (TableCommit)table.newCommit(commitUser), ListenerTestUtils.createMockContext(true, false));
        ManifestCommittable committable = new ManifestCommittable(1L, null);
        commitMessages.forEach(arg_0 -> ((ManifestCommittable)committable).addFileCommittable(arg_0));
        committer.commit(Collections.singletonList(committable));
        committer.close();
        Assertions.assertThat((Collection)commitListenerResult.get(testId)).containsExactly((Object[])new String[]{"20250101", "20250102"});
    }

    public static class TestPartitionCollector
    implements CommitListener {
        private final String testId;

        public TestPartitionCollector(String testId) {
            this.testId = testId;
        }

        public void notifyCommittable(List<ManifestCommittable> committables) {
            commitListenerResult.computeIfAbsent(this.testId, k -> new HashSet()).addAll(committables.stream().flatMap(c -> c.fileCommittables().stream()).map(CommitMessage::partition).map(p -> p.getString(0).toString()).collect(Collectors.toSet()));
        }

        public void snapshotState() throws Exception {
        }

        public void close() throws IOException {
        }

        public static class Factory
        implements CommitListenerFactory {
            public String identifier() {
                return "partition-collector";
            }

            public Optional<CommitListener> create(Committer.Context context, FileStoreTable table) throws Exception {
                return Optional.of(new TestPartitionCollector((String)table.options().get("test-listener-id")));
            }
        }
    }
}

