/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.StoreCompactOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.operation.WriteRestore;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SerializationUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Tests for {@link StoreCompactOperator}.
 *
 * <p>The operator is driven through a Flink {@link OneInputStreamOperatorTestHarness} and wired
 * to a stub {@link StoreSinkWrite} that only counts how often compaction is requested.
 */
public class StoreCompactOperatorTest extends TableTestBase {

    /** Bucket ids fed to the operator; buckets 0 and 1 are deliberately sent twice. */
    private static final int[] INPUT_BUCKETS = {0, 0, 1, 1, 2};

    /**
     * Checks that repeated inputs for the same (partition, bucket) pair are collapsed into a
     * single pending entry and that each pending entry is compacted exactly once when
     * {@code prepareCommit} is invoked.
     *
     * @param streamingMode value reported by the stub writer's {@code streamingMode()}; its
     *     negation is passed as the last argument of the operator factory
     * @throws Exception if the harness or the operator fails
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testCompactExactlyOnce(boolean streamingMode) throws Exception {
        createTableDefault();

        CompactRememberStoreWrite compactRememberStoreWrite =
                new CompactRememberStoreWrite(streamingMode);
        // The provider ignores all of its arguments and always hands out the counting stub.
        // NOTE(review): "10086" is presumably the commit user -- confirm against the factory.
        StoreCompactOperator.Factory operatorFactory =
                new StoreCompactOperator.Factory(
                        getTableDefault(),
                        (StoreSinkWrite.Provider & Serializable)
                                (table,
                                        commitUser,
                                        state,
                                        ioManager,
                                        memoryPoolFactory,
                                        metricGroup) -> compactRememberStoreWrite,
                        "10086",
                        !streamingMode);

        TypeSerializer<Committable> serializer =
                new CommittableTypeInfo().createSerializer(new ExecutionConfig());

        // try-with-resources so the operator and the harness environment are always released,
        // even when one of the assertions below fails.
        try (OneInputStreamOperatorTestHarness<RowData, Committable> harness =
                new OneInputStreamOperatorTestHarness<>(operatorFactory)) {
            harness.setup(serializer);
            harness.initializeEmptyState();
            harness.open();

            for (int bucket : INPUT_BUCKETS) {
                harness.processElement(new StreamRecord<>(data(bucket)));
            }

            StoreCompactOperator operator = (StoreCompactOperator) harness.getOperator();

            // Five records were processed, but only three distinct buckets should be pending,
            // and nothing may have been compacted yet.
            Assertions.assertThat(operator.compactionWaitingSet())
                    .containsExactlyInAnyOrder(
                            Pair.of(BinaryRow.EMPTY_ROW, 0),
                            Pair.of(BinaryRow.EMPTY_ROW, 1),
                            Pair.of(BinaryRow.EMPTY_ROW, 2));
            Assertions.assertThat(compactRememberStoreWrite.compactTime).isEqualTo(0);

            operator.prepareCommit(true, 1L);

            // After prepareCommit the pending set is drained: one compaction per bucket.
            Assertions.assertThat(operator.compactionWaitingSet()).isEmpty();
            Assertions.assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
        }
    }

    /**
     * Builds one input record for the operator targeting the empty partition and the given
     * bucket.
     *
     * <p>The row holds four fields: {@code 0L}, the serialized {@link BinaryRow#EMPTY_ROW}, the
     * bucket id and four zero bytes.
     * NOTE(review): these presumably map to snapshot id, partition, bucket and an empty
     * serialized file list -- confirm against the compaction source schema.
     *
     * @param bucket bucket id stored in the third field
     * @return the row wrapped as Flink {@link RowData}
     */
    private RowData data(int bucket) {
        GenericRow row =
                GenericRow.of(
                        0L,
                        SerializationUtils.serializeBinaryRow(BinaryRow.EMPTY_ROW),
                        bucket,
                        new byte[] {0, 0, 0, 0});
        return new FlinkRowData(row);
    }

    /**
     * Stub {@link StoreSinkWrite} that counts calls to {@link #compact} and does nothing else.
     */
    private static class CompactRememberStoreWrite implements StoreSinkWrite {

        private final boolean streamingMode;

        /** Number of times {@link #compact} has been called. */
        private int compactTime = 0;

        public CompactRememberStoreWrite(boolean streamingMode) {
            this.streamingMode = streamingMode;
        }

        @Override
        public void setWriteRestore(WriteRestore writeRestore) {
            // not needed by the test
        }

        @Override
        public SinkRecord write(InternalRow rowData) {
            return null;
        }

        @Override
        public SinkRecord write(InternalRow rowData, int bucket) {
            return null;
        }

        @Override
        public SinkRecord toLogRecord(SinkRecord record) {
            return null;
        }

        /** Records the request; the arguments are ignored. */
        @Override
        public void compact(BinaryRow partition, int bucket, boolean fullCompaction) {
            compactTime++;
        }

        @Override
        public void notifyNewFiles(
                long snapshotId, BinaryRow partition, int bucket, List<DataFileMeta> files) {
            // not needed by the test
        }

        /**
         * Produces no committables. An empty list is returned instead of {@code null} so that
         * callers iterating over the result do not fail.
         */
        @Override
        public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId) {
            return Collections.emptyList();
        }

        @Override
        public void snapshotState() {
            // not needed by the test
        }

        @Override
        public boolean streamingMode() {
            return streamingMode;
        }

        @Override
        public void close() {
            // nothing to release
        }

        @Override
        public void replace(FileStoreTable newTable) {
            // not needed by the test
        }
    }
}

