/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.partition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.partition.PartitionMarkDone;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileTestUtils;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class PartitionMarkDoneTest
extends TableTestBase {
    PartitionMarkDoneTest() {
    }

    @Test
    public void testTriggerByCompaction() throws Exception {
        this.innerTest(true);
    }

    @Test
    public void testNotTriggerByCompaction() throws Exception {
        this.innerTest(false);
    }

    private void innerTest(boolean deletionVectors) throws Exception {
        Identifier identifier = this.identifier("T");
        Schema schema = Schema.newBuilder().column("a", (DataType)DataTypes.INT()).column("b", (DataType)DataTypes.INT()).column("c", (DataType)DataTypes.INT()).partitionKeys(new String[]{"a"}).primaryKey(new String[]{"a", "b"}).option(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT.key(), "true").option(CoreOptions.PARTITION_MARK_DONE_ACTION.key(), "success-file").option(CoreOptions.DELETION_VECTORS_ENABLED.key(), Boolean.valueOf(deletionVectors).toString()).build();
        this.catalog.createTable(identifier, schema, true);
        FileStoreTable table = (FileStoreTable)this.catalog.getTable(identifier);
        Path location = table.location();
        Path successFile = new Path(location, "a=0/_SUCCESS");
        PartitionMarkDone markDone = (PartitionMarkDone)PartitionMarkDone.create((boolean)false, (boolean)false, (OperatorStateStore)new MockOperatorStateStore(), (FileStoreTable)table).get();
        this.notifyCommits(markDone, true);
        Assertions.assertThat((boolean)table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
        if (!deletionVectors) {
            this.notifyCommits(markDone, false);
            Assertions.assertThat((boolean)table.fileIO().exists(successFile)).isEqualTo(true);
        }
    }

    private void notifyCommits(PartitionMarkDone markDone, boolean isCompact) {
        ManifestCommittable committable = new ManifestCommittable(Long.MAX_VALUE);
        DataFileMeta file = DataFileTestUtils.newFile();
        CommitMessageImpl compactMessage = isCompact ? new CommitMessageImpl(BinaryRow.singleColumn((Integer)0), 0, new DataIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(Collections.singletonList(file), Collections.emptyList(), Collections.emptyList()), new IndexIncrement(Collections.emptyList())) : new CommitMessageImpl(BinaryRow.singleColumn((Integer)0), 0, new DataIncrement(Collections.singletonList(file), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new IndexIncrement(Collections.emptyList()));
        committable.addFileCommittable((CommitMessage)compactMessage);
        markDone.notifyCommittable(Collections.singletonList(committable));
    }

    private static class MockListState<T>
    implements ListState<T> {
        private final List<T> backingList = new ArrayList<T>();

        public void update(List<T> values) {
            this.backingList.clear();
            this.addAll(values);
        }

        public void addAll(List<T> values) {
            this.backingList.addAll(values);
        }

        public Iterable<T> get() {
            return new Iterable<T>(){

                @Override
                @Nonnull
                public Iterator<T> iterator() {
                    return backingList.iterator();
                }
            };
        }

        public void add(T value) {
            this.backingList.add(value);
        }

        public void clear() {
            this.backingList.clear();
        }
    }

    private static class MockOperatorStateStore
    implements OperatorStateStore {
        private MockOperatorStateStore() {
        }

        public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
            throw new UnsupportedOperationException();
        }

        public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) {
            return new MockListState();
        }

        public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) {
            throw new UnsupportedOperationException();
        }

        public Set<String> getRegisteredStateNames() {
            throw new UnsupportedOperationException();
        }

        public Set<String> getRegisteredBroadcastStateNames() {
            throw new UnsupportedOperationException();
        }
    }
}

