/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

public class CdcRecordStoreWriteOperatorTest {
    @TempDir
    java.nio.file.Path tempDir;
    private Path tablePath;
    private String commitUser;

    @BeforeEach
    public void before() {
        this.tablePath = new Path("traceable://" + this.tempDir.toString());
        this.commitUser = UUID.randomUUID().toString();
    }

    @AfterEach
    public void after() {
        Predicate<Path> pathPredicate = path -> path.toString().contains(this.tempDir.toString());
        Assertions.assertThat((List)TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
        Assertions.assertThat((List)TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
    }

    @Test
    @Timeout(value=30L)
    public void testAddColumn() throws Exception {
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING()}, (String[])new String[]{"pt", "k", "v"});
        FileStoreTable table = this.createFileStoreTable(rowType, Collections.singletonList("pt"), Arrays.asList("pt", "k"));
        OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness = this.createTestHarness(table);
        harness.open();
        Runner runner = new Runner(harness);
        Thread t = new Thread(runner);
        t.start();
        HashMap<String, String> data = new HashMap<String, String>();
        data.put("pt", "0");
        data.put("k", "1");
        data.put("v", "10");
        CdcRecord expected = new CdcRecord(RowKind.INSERT, data);
        runner.offer(expected);
        CdcRecord actual = runner.take();
        Assertions.assertThat((Object)actual).isEqualTo((Object)expected);
        data = new HashMap();
        data.put("pt", "0");
        data.put("k", "2");
        expected = new CdcRecord(RowKind.INSERT, data);
        runner.offer(expected);
        actual = runner.take();
        Assertions.assertThat((Object)actual).isEqualTo((Object)expected);
        data = new HashMap();
        data.put("pt", "0");
        data.put("k", "3");
        data.put("v", "30");
        data.put("v2", "300");
        expected = new CdcRecord(RowKind.INSERT, data);
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat((Object)actual).isNull();
        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.addColumn((String)"v2", (DataType)DataTypes.INT())});
        actual = runner.take();
        Assertions.assertThat((Object)actual).isEqualTo((Object)expected);
        runner.stop();
        t.join();
        harness.close();
    }

    @Test
    @Timeout(value=30L)
    public void testUpdateColumnType() throws Exception {
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.FLOAT(), DataTypes.VARCHAR((int)5), DataTypes.VARBINARY((int)5)}, (String[])new String[]{"k", "v1", "v2", "v3", "v4"});
        FileStoreTable table = this.createFileStoreTable(rowType, Collections.emptyList(), Collections.singletonList("k"));
        OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness = this.createTestHarness(table);
        harness.open();
        Runner runner = new Runner(harness);
        Thread t = new Thread(runner);
        t.start();
        HashMap<String, String> data = new HashMap<String, String>();
        data.put("k", "1");
        data.put("v1", "10");
        data.put("v2", "0.625");
        data.put("v3", "one");
        data.put("v4", "b_one");
        CdcRecord expected = new CdcRecord(RowKind.INSERT, data);
        runner.offer(expected);
        CdcRecord actual = runner.take();
        Assertions.assertThat((Object)actual).isEqualTo((Object)expected);
        data = new HashMap();
        data.put("k", "2");
        data.put("v1", "12345678987654321");
        data.put("v2", "0.25");
        expected = new CdcRecord(RowKind.INSERT, data);
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat((Object)actual).isNull();
        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType((String)"v1", (DataType)DataTypes.BIGINT())});
        actual = runner.take();
        Assertions.assertThat((Object)actual).isEqualTo((Object)expected);
        data = new HashMap();
        data.put("k", "3");
        data.put("v1", "100");
        data.put("v2", "1.0000000000009095");
        expected = new CdcRecord(RowKind.INSERT, data);
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat((Object)actual).isNull();
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType((String)"v2", (DataType)DataTypes.DOUBLE())});
        actual = runner.take();
        Assertions.assertThat((Object)actual).isEqualTo((Object)expected);
        data = new HashMap();
        data.put("k", "4");
        data.put("v1", "40");
        data.put("v3", "long four");
        expected = new CdcRecord(RowKind.INSERT, data);
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat((Object)actual).isNull();
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType((String)"v3", (DataType)DataTypes.VARCHAR((int)10))});
        actual = runner.take();
        Assertions.assertThat((Object)actual).isEqualTo((Object)expected);
        data = new HashMap();
        data.put("k", "5");
        data.put("v1", "50");
        data.put("v4", "long five~");
        expected = new CdcRecord(RowKind.INSERT, data);
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat((Object)actual).isNull();
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType((String)"v4", (DataType)DataTypes.VARBINARY((int)10))});
        actual = runner.take();
        Assertions.assertThat((Object)actual).isEqualTo((Object)expected);
        runner.stop();
        t.join();
        harness.close();
    }

    private OneInputStreamOperatorTestHarness<CdcRecord, Committable> createTestHarness(FileStoreTable table) throws Exception {
        CdcRecordStoreWriteOperator.Factory operatorFactory = new CdcRecordStoreWriteOperator.Factory(table, (StoreSinkWrite.Provider & Serializable)(t, commitUser, state, ioManager, memoryPoolFactory, metricGroup) -> new StoreSinkWriteImpl(t, commitUser, state, ioManager, false, false, true, memoryPoolFactory, metricGroup), this.commitUser);
        JavaSerializer inputSerializer = new JavaSerializer();
        TypeSerializer outputSerializer = new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)operatorFactory, (TypeSerializer)inputSerializer);
        harness.setup(outputSerializer);
        return harness;
    }

    private FileStoreTable createFileStoreTable(RowType rowType, List<String> partitions, List<String> primaryKeys) throws Exception {
        Options conf = new Options();
        conf.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME, (Object)Duration.ofMillis(10L));
        conf.set(CoreOptions.BUCKET, (Object)1);
        TableSchema tableSchema = SchemaUtils.forceCommit((SchemaManager)new SchemaManager((FileIO)LocalFileIO.create(), this.tablePath), (Schema)new Schema(rowType.getFields(), partitions, primaryKeys, conf.toMap(), ""));
        return FileStoreTableFactory.create((FileIO)LocalFileIO.create(), (Path)this.tablePath, (TableSchema)tableSchema);
    }

    private static class Runner
    implements Runnable {
        private final OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness;
        private final BlockingQueue<CdcRecord> toProcess = new LinkedBlockingQueue<CdcRecord>();
        private final BlockingQueue<CdcRecord> processed = new LinkedBlockingQueue<CdcRecord>();
        private final AtomicBoolean running = new AtomicBoolean(true);

        private Runner(OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness) {
            this.harness = harness;
        }

        private void offer(CdcRecord record) {
            this.toProcess.offer(record);
        }

        private CdcRecord take() throws Exception {
            return this.processed.take();
        }

        private CdcRecord poll(long seconds) throws Exception {
            return this.processed.poll(seconds, TimeUnit.SECONDS);
        }

        private void stop() {
            this.running.set(false);
        }

        @Override
        public void run() {
            long timestamp = 0L;
            try {
                while (this.running.get()) {
                    if (this.toProcess.isEmpty()) {
                        Thread.sleep(10L);
                        continue;
                    }
                    CdcRecord record = (CdcRecord)this.toProcess.poll();
                    this.harness.processElement((Object)record, ++timestamp);
                    this.processed.offer(record);
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

