/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.Collections;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.FixedBucketSink;
import org.apache.paimon.flink.sink.RowDataStoreWriteOperator;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.operation.KeyValueFileStoreWrite;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class FlinkSinkTest {
    @TempDir
    java.nio.file.Path tempPath;
    protected static final RowType ROW_TYPE = RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.INT()}, (String[])new String[]{"pk", "pt0"});

    @Test
    public void testOptimizeKeyValueWriterForBatch() throws Exception {
        FileStoreTable fileStoreTable = this.createFileStoreTable();
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        Assertions.assertThat((boolean)this.testSpillable(streamExecutionEnvironment, fileStoreTable)).isTrue();
        streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        Assertions.assertThat((boolean)this.testSpillable(streamExecutionEnvironment, fileStoreTable)).isFalse();
    }

    private boolean testSpillable(StreamExecutionEnvironment streamExecutionEnvironment, FileStoreTable fileStoreTable) throws Exception {
        DataStreamSource source = streamExecutionEnvironment.fromCollection(Collections.singletonList(GenericRow.of((Object[])new Object[]{1, 1})));
        FixedBucketSink flinkSink = new FixedBucketSink(fileStoreTable, null, null);
        DataStream written = flinkSink.doWrite((DataStream)source, "123", Integer.valueOf(1));
        RowDataStoreWriteOperator operator = (RowDataStoreWriteOperator)((SimpleOperatorFactory)((OneInputTransformation)written.getTransformation()).getOperatorFactory()).getOperator();
        StateInitializationContextImpl context = new StateInitializationContextImpl(null, (OperatorStateStore)new MockOperatorStateStore(){

            public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
                return this.getListState(stateDescriptor);
            }
        }, null, null, null);
        operator.initStateAndWriter((StateInitializationContext)context, (a, b, c) -> true, (IOManager)new IOManagerAsync(), "123");
        return ((KeyValueFileStoreWrite)((StoreSinkWriteImpl)operator.write).write.getWrite()).bufferSpillable();
    }

    private FileStoreTable createFileStoreTable() throws Exception {
        Path tablePath = new Path(this.tempPath.toString());
        Options options = new Options();
        options.set(CoreOptions.PATH, (Object)tablePath.toString());
        options.set(CoreOptions.BUCKET, (Object)1);
        TableSchema tableSchema = SchemaUtils.forceCommit((SchemaManager)new SchemaManager((FileIO)LocalFileIO.create(), tablePath), (Schema)new Schema(ROW_TYPE.getFields(), Collections.emptyList(), Collections.singletonList("pk"), options.toMap(), ""));
        return FileStoreTableFactory.create((FileIO)FileIOFinder.find((Path)tablePath), (Path)tablePath, (TableSchema)tableSchema, (Options)options, (CatalogEnvironment)new CatalogEnvironment(Lock.emptyFactory(), null, null));
    }
}

