/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.listener;

import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.listener.ListenerTestUtils;
import org.apache.paimon.flink.sink.listener.MockCustomPartitionMarkDoneAction;
import org.apache.paimon.flink.sink.listener.PartitionMarkDoneListener;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class CustomPartitionMarkDoneActionTest
extends TableTestBase {
    @Test
    public void testCustomPartitionMarkDoneAction() throws Exception {
        Identifier identifier = this.identifier("T");
        Schema schema = Schema.newBuilder().column("a", (DataType)DataTypes.INT()).column("b", (DataType)DataTypes.INT()).column("c", (DataType)DataTypes.INT()).partitionKeys(new String[]{"a"}).primaryKey(new String[]{"a", "b"}).option(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT.key(), "true").option(CoreOptions.PARTITION_MARK_DONE_ACTION.key(), "success-file,custom").build();
        this.catalog.createTable(identifier, schema, true);
        FileStoreTable table = (FileStoreTable)this.catalog.getTable(identifier);
        Path location = table.location();
        Path successFile = new Path(location, "a=0/_SUCCESS");
        Assertions.assertThatThrownBy(() -> PartitionMarkDoneListener.create((ClassLoader)((Object)((Object)this)).getClass().getClassLoader(), (boolean)false, (boolean)false, (OperatorStateStore)new MockOperatorStateStore(), (FileStoreTable)table)).hasMessageContaining(String.format("You need to set [%s] when you add [%s] mark done action in your property [%s].", CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS.key(), CoreOptions.PartitionMarkDoneAction.CUSTOM, CoreOptions.PARTITION_MARK_DONE_ACTION.key()));
        this.catalog.alterTable(identifier, SchemaChange.setOption((String)CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS.key(), (String)MockCustomPartitionMarkDoneAction.class.getName()), true);
        FileStoreTable table2 = (FileStoreTable)this.catalog.getTable(identifier);
        PartitionMarkDoneListener markDone = (PartitionMarkDoneListener)PartitionMarkDoneListener.create((ClassLoader)((Object)((Object)this)).getClass().getClassLoader(), (boolean)false, (boolean)false, (OperatorStateStore)new MockOperatorStateStore(), (FileStoreTable)table2).get();
        ListenerTestUtils.notifyCommits(markDone, false);
        Assertions.assertThat((boolean)table2.fileIO().exists(successFile)).isEqualTo(true);
        Assertions.assertThat((String)MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().iterator().next()).isEqualTo("table=default.T,partition=a=0/");
    }
}

