/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.SerializableRowData;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.FailingFileIO;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
public class FlinkEndInputPartitionExpireITCase
extends CatalogITCaseBase {
    private static final RowType TABLE_TYPE = new RowType(Arrays.asList(new RowType.RowField("v", (LogicalType)new IntType()), new RowType.RowField("p", (LogicalType)new VarCharType(10)), new RowType.RowField("_k", (LogicalType)new IntType())));
    private static final List<RowData> SOURCE_DATA = Arrays.asList(FlinkEndInputPartitionExpireITCase.wrap((RowData)GenericRowData.of((Object[])new Object[]{0, StringData.fromString((String)"20240101"), 1})), FlinkEndInputPartitionExpireITCase.wrap((RowData)GenericRowData.of((Object[])new Object[]{0, StringData.fromString((String)"20240101"), 2})), FlinkEndInputPartitionExpireITCase.wrap((RowData)GenericRowData.of((Object[])new Object[]{0, StringData.fromString((String)"20240103"), 1})), FlinkEndInputPartitionExpireITCase.wrap((RowData)GenericRowData.of((Object[])new Object[]{5, StringData.fromString((String)"20240103"), 1})), FlinkEndInputPartitionExpireITCase.wrap((RowData)GenericRowData.of((Object[])new Object[]{6, StringData.fromString((String)"20240105"), 1})));
    private final StreamExecutionEnvironment env = this.streamExecutionEnvironmentBuilder().batchMode().parallelism(2).build();

    private static SerializableRowData wrap(RowData row) {
        return new SerializableRowData(row, (TypeSerializer<RowData>)InternalSerializers.create((RowType)TABLE_TYPE));
    }

    @Parameters(name="isBatch-{0}")
    public static List<Boolean> getVarSeg() {
        return Arrays.asList(true, false);
    }

    @TestTemplate
    public void testEndInputPartitionExpire() throws Exception {
        FileStoreTable table = this.buildFileStoreTable(new int[]{1}, new int[]{1, 2});
        DataStreamSource source = this.env.fromCollection(SOURCE_DATA, (TypeInformation)InternalTypeInfo.of((RowType)TABLE_TYPE));
        SingleOutputStreamOperator input = source.map((MapFunction & Serializable)r -> Row.of((Object[])new Object[]{r.getInt(0), r.getString(1).toString(), r.getInt(2)})).setParallelism(source.getParallelism());
        DataType inputType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"v", (DataType)DataTypes.INT()), DataTypes.FIELD((String)"p", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"_k", (DataType)DataTypes.INT())});
        new FlinkSinkBuilder((Table)table).forRow((DataStream)input, inputType).build();
        this.env.execute();
        Assertions.assertEquals((long)2L, (long)table.snapshotManager().snapshotCount());
        Assertions.assertEquals((Object)Snapshot.CommitKind.OVERWRITE, (Object)table.snapshotManager().snapshot(2L).commitKind());
    }

    private FileStoreTable buildFileStoreTable(int[] partitions, int[] primaryKey) throws Exception {
        Options options = new Options();
        options.set(CoreOptions.BUCKET, (Object)3);
        options.set(CoreOptions.PATH, (Object)this.getTempDirPath());
        options.set(CoreOptions.FILE_FORMAT, (Object)"avro");
        options.set(CoreOptions.PARTITION_EXPIRATION_TIME, (Object)Duration.ofDays(2L));
        options.set(CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL, (Object)Duration.ofHours(1L));
        options.set(CoreOptions.PARTITION_TIMESTAMP_FORMATTER, (Object)"yyyyMMdd");
        options.set(CoreOptions.END_INPUT_CHECK_PARTITION_EXPIRE, (Object)true);
        Path tablePath = new CoreOptions(options.toMap()).path();
        if (primaryKey.length == 0) {
            options.set(CoreOptions.BUCKET_KEY, (Object)"_k");
        }
        Schema schema = new Schema(LogicalTypeConversion.toDataType((RowType)TABLE_TYPE).getFields(), Arrays.stream(partitions).mapToObj(i -> (String)TABLE_TYPE.getFieldNames().get(i)).collect(Collectors.toList()), Arrays.stream(primaryKey).mapToObj(i -> (String)TABLE_TYPE.getFieldNames().get(i)).collect(Collectors.toList()), options.toMap(), "");
        return (FileStoreTable)FailingFileIO.retryArtificialException(() -> {
            new SchemaManager((FileIO)LocalFileIO.create(), tablePath).createTable(schema);
            return FileStoreTableFactory.create((FileIO)LocalFileIO.create(), (Options)options);
        });
    }
}

