/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.StoreCompactOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.SinkRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class StoreCompactOperatorTest
extends TableTestBase {
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testCompactExactlyOnce(boolean streamingMode) throws Exception {
        this.createTableDefault();
        CompactRememberStoreWrite compactRememberStoreWrite = new CompactRememberStoreWrite(streamingMode);
        StoreCompactOperator storeCompactOperator = new StoreCompactOperator(this.getTableDefault(), (StoreSinkWrite.Provider & Serializable)(table, commitUser, state, ioManager, memoryPool, metricGroup) -> compactRememberStoreWrite, "10086");
        storeCompactOperator.open();
        StateInitializationContextImpl context = new StateInitializationContextImpl(null, (OperatorStateStore)new MockOperatorStateStore(){

            public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
                return this.getListState(stateDescriptor);
            }
        }, null, null, null);
        storeCompactOperator.initStateAndWriter((StateInitializationContext)context, (a, b, c) -> true, (IOManager)new IOManagerAsync(), "123");
        storeCompactOperator.processElement(new StreamRecord((Object)this.data(0)));
        storeCompactOperator.processElement(new StreamRecord((Object)this.data(0)));
        storeCompactOperator.processElement(new StreamRecord((Object)this.data(1)));
        storeCompactOperator.processElement(new StreamRecord((Object)this.data(1)));
        storeCompactOperator.processElement(new StreamRecord((Object)this.data(2)));
        storeCompactOperator.prepareCommit(true, 1L);
        Assertions.assertThat((int)compactRememberStoreWrite.compactTime).isEqualTo(3);
    }

    private RowData data(int bucket) {
        GenericRow genericRow = GenericRow.of((Object[])new Object[]{0L, BinaryRow.EMPTY_ROW.toBytes(), bucket, new byte[]{0, 0, 0, 0}});
        return new FlinkRowData((InternalRow)genericRow);
    }

    private static class CompactRememberStoreWrite
    implements StoreSinkWrite {
        private final boolean streamingMode;
        private int compactTime = 0;

        public CompactRememberStoreWrite(boolean streamingMode) {
            this.streamingMode = streamingMode;
        }

        public SinkRecord write(InternalRow rowData) {
            return null;
        }

        public SinkRecord write(InternalRow rowData, int bucket) {
            return null;
        }

        public SinkRecord toLogRecord(SinkRecord record) {
            return null;
        }

        public void compact(BinaryRow partition, int bucket, boolean fullCompaction) {
            ++this.compactTime;
        }

        public void notifyNewFiles(long snapshotId, BinaryRow partition, int bucket, List<DataFileMeta> files) {
        }

        public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId) {
            return null;
        }

        public void snapshotState() {
        }

        public boolean streamingMode() {
            return this.streamingMode;
        }

        public void close() {
        }

        public void replace(FileStoreTable newTable) {
        }
    }
}

