/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.cdc.CdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordStoreMultiWriteOperator;
import org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.TraceableFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

public class CdcRecordStoreMultiWriteOperatorTest {
    // Per-test scratch directory injected by JUnit; before() roots the warehouse path in it and
    // after() uses it to filter the traced streams.
    @TempDir
    java.nio.file.Path tempDir;
    // Commit user passed to the operator factory; set to a random UUID in before().
    private String commitUser;
    // Warehouse location, built in before() as "traceable://" + tempDir.
    private Path warehouse;
    // Name of the database that holds every table used by the tests.
    private String databaseName;
    // Identifier of the table created in before() with columns (pt, k, v).
    private Identifier firstTable;
    // Catalog loaded from catalogLoader in before() and closed in after().
    private Catalog catalog;
    // Identifier of the table created in before() with columns (k, v1, v2, v3, v4).
    private Identifier secondTable;
    // Loader handed to every test harness; built by createCatalogLoader().
    private CatalogLoader catalogLoader;
    // Schema of firstTable; reused by tests that create a table while the operator is running.
    private Schema firstTableSchema;

    /**
     * Prepares a fresh catalog for each test.
     *
     * <p>Points the warehouse at a {@code traceable://} path under the JUnit temp directory,
     * creates the test database, and creates two tables: {@code test_table1} with columns
     * {@code (pt, k, v)}, partitioned by {@code pt} and keyed by {@code (pt, k)}, and
     * {@code test_table2} with columns {@code (k, v1, v2, v3, v4)}, unpartitioned and keyed by
     * {@code k}. Both tables use a single bucket and a 10 ms retry sleep time.
     *
     * @throws Exception if the catalog, database or tables cannot be created
     */
    @BeforeEach
    public void before() throws Exception {
        this.warehouse = new Path("traceable://" + this.tempDir.toString());
        this.commitUser = UUID.randomUUID().toString();
        this.databaseName = "test_db";
        this.firstTable = Identifier.create(this.databaseName, "test_table1");
        this.secondTable = Identifier.create(this.databaseName, "test_table2");
        this.catalogLoader = this.createCatalogLoader();
        this.catalog = this.catalogLoader.load();
        this.catalog.createDatabase(this.databaseName, true);

        // Table options shared by both tables. The values are passed with their real types so
        // that the typed option setter can infer its type parameter.
        Options conf = new Options();
        conf.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME, Duration.ofMillis(10L));
        conf.set(CoreOptions.BUCKET, 1);

        RowType rowType1 = RowType.of(
                new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()},
                new String[]{"pt", "k", "v"});
        RowType rowType2 = RowType.of(
                new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.FLOAT(), DataTypes.VARCHAR(5), DataTypes.VARBINARY(5)},
                new String[]{"k", "v1", "v2", "v3", "v4"});

        this.firstTableSchema = new Schema(
                rowType1.getFields(),
                Collections.singletonList("pt"),
                Arrays.asList("pt", "k"),
                conf.toMap(),
                "");
        Schema secondTableSchema = new Schema(
                rowType2.getFields(),
                Collections.emptyList(),
                Collections.singletonList("k"),
                conf.toMap(),
                "");

        this.createTestTables(
                this.catalog,
                Tuple2.of(this.firstTable, this.firstTableSchema),
                Tuple2.of(this.secondTable, secondTableSchema));
    }

    /**
     * Creates every given table in the catalog, in argument order.
     *
     * @param catalog catalog in which the tables are created
     * @param tableSpecs pairs of (table identifier, table schema)
     * @throws Exception if a table cannot be created; creation is requested with
     *     {@code ignoreIfExists = false}
     */
    private void createTestTables(Catalog catalog, Tuple2<Identifier, Schema> ... tableSpecs) throws Exception {
        for (int i = 0; i < tableSpecs.length; i++) {
            Identifier identifier = tableSpecs[i].f0;
            Schema schema = tableSpecs[i].f1;
            catalog.createTable(identifier, schema, false);
        }
    }

    /**
     * Checks that the test left no traced input or output stream open under the temp directory,
     * then closes the catalog.
     *
     * <p>The catalog is closed in a {@code finally} block so that it is released even when one of
     * the leak assertions fails.
     *
     * @throws Exception if closing the catalog fails
     */
    @AfterEach
    public void after() throws Exception {
        try {
            // Only streams whose path lies under this test's temp directory are considered.
            Predicate<Path> pathPredicate = path -> path.toString().contains(this.tempDir.toString());
            Assertions.assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
            Assertions.assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
        } finally {
            if (this.catalog != null) {
                this.catalog.close();
            }
        }
    }

    /**
     * Checks that a record addressed to a table that does not exist yet is not emitted as
     * processed until the table is created, and that a later record for the same table is
     * processed afterwards.
     *
     * @throws Exception if the harness or catalog fails
     */
    @Test
    @Timeout(value=30L)
    public void testAsyncTableCreate() throws Exception {
        Identifier tableId = Identifier.create(this.databaseName, "async_new_table");
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> harness = this.createTestHarness(this.catalogLoader);
        harness.open();
        Runner runner = new Runner(harness);
        Thread t = new Thread(runner);
        t.start();

        // The table has not been created: the record must not show up as processed within 1 s.
        HashMap<String, String> data = new HashMap<>();
        data.put("pt", "0");
        data.put("k", "1");
        data.put("v", "10");
        CdcMultiplexRecord expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        CdcMultiplexRecord actual = runner.poll(1L);
        Assertions.assertThat(actual).isNull();

        // Once the table exists, the pending record is expected to come through.
        this.catalog.createTable(tableId, this.firstTableSchema, true);
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        // A second record for the now-existing table.
        data = new HashMap<>();
        data.put("pt", "0");
        data.put("k", "3");
        data.put("v", "30");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        runner.stop();
        t.join();
        harness.close();
    }

    /**
     * Checks that the operator tracks no table or write before the target table exists, tracks
     * exactly one of each after the first record is processed, and that a harness restored from
     * a snapshot reports the commit user of the snapshot rather than the newly generated one.
     *
     * @throws Exception if the harness or catalog fails
     */
    @Test
    @Timeout(value=30L)
    public void testInitializeState() throws Exception {
        Identifier tableId = Identifier.create(this.databaseName, "async_new_table");
        long timestamp = 1L;
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> harness = this.createTestHarness(this.catalogLoader);
        harness.open();
        Runner runner = new Runner(harness);
        Thread t = new Thread(runner);
        t.start();

        // The table has not been created: the record must not show up as processed within 1 s.
        HashMap<String, String> data = new HashMap<>();
        data.put("pt", "0");
        data.put("k", "1");
        data.put("v", "10");
        CdcMultiplexRecord expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        CdcMultiplexRecord actual = runner.poll(1L);
        Assertions.assertThat(actual).isNull();

        // Nothing is registered in the operator while the table is missing.
        CdcRecordStoreMultiWriteOperator operator = (CdcRecordStoreMultiWriteOperator) harness.getOperator();
        Assertions.assertThat(operator.tables().size()).isEqualTo(0);
        Assertions.assertThat(operator.writes().size()).isEqualTo(0);

        // After the table is created the record goes through and one table/write is registered.
        this.catalog.createTable(tableId, this.firstTableSchema, true);
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);
        Assertions.assertThat(operator.tables().size()).isEqualTo(1);
        Assertions.assertThat(operator.writes().size()).isEqualTo(1);

        // A second record for the now-existing table.
        data = new HashMap<>();
        data.put("pt", "0");
        data.put("k", "3");
        data.put("v", "30");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        // Snapshot, then build a second harness with a different commit user and restore into it.
        OperatorSubtaskState snapshot = harness.snapshot(0L, timestamp);
        String prevCommitUser = this.commitUser;
        this.commitUser = UUID.randomUUID().toString();
        harness.close();
        harness = this.createTestHarness(this.catalogLoader);
        harness.initializeState(snapshot);
        operator = (CdcRecordStoreMultiWriteOperator) harness.getOperator();
        Assertions.assertThat(operator.commitUser()).isEqualTo(prevCommitUser);

        runner.stop();
        t.join();
        harness.close();
    }

    /**
     * Checks that records matching the current schema of {@code firstTable} (including one that
     * omits column {@code v}) are processed, and that a record carrying an extra column
     * {@code v2} is not emitted as processed until that column is added to the table schema.
     *
     * @throws Exception if the harness, catalog or schema change fails
     */
    @Test
    @Timeout(value=30L)
    public void testSingleTableAddColumn() throws Exception {
        Identifier tableId = this.firstTable;
        FileStoreTable table = (FileStoreTable) this.catalog.getTable(tableId);
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> harness = this.createTestHarness(this.catalogLoader);
        harness.open();
        Runner runner = new Runner(harness);
        Thread t = new Thread(runner);
        t.start();

        // Record with all three existing columns.
        HashMap<String, String> data = new HashMap<>();
        data.put("pt", "0");
        data.put("k", "1");
        data.put("v", "10");
        CdcMultiplexRecord expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        CdcMultiplexRecord actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        // Record that leaves out column "v".
        data = new HashMap<>();
        data.put("pt", "0");
        data.put("k", "2");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        // Record with a column "v2" that the table does not have yet: must not be processed
        // within 1 s.
        data = new HashMap<>();
        data.put("pt", "0");
        data.put("k", "3");
        data.put("v", "30");
        data.put("v2", "300");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat(actual).isNull();

        // Adding the column lets the pending record through.
        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.addColumn("v2", DataTypes.INT())});
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        runner.stop();
        t.join();
        harness.close();
    }

    /**
     * Builds a serializable catalog loader bound to the test warehouse.
     *
     * @return a loader that creates a catalog from the options of
     *     {@link #createCatalogOptions(Path)} each time it is invoked
     */
    private CatalogLoader createCatalogLoader() {
        final Options loaderOptions = this.createCatalogOptions(this.warehouse);
        CatalogLoader loader = (CatalogLoader & Serializable)() -> {
            CatalogContext context = CatalogContext.create(loaderOptions);
            return CatalogFactory.createCatalog(context);
        };
        return loader;
    }

    /**
     * Builds the catalog options for the given warehouse.
     *
     * @param warehouse warehouse root path; stored as its string form
     * @return options with {@code WAREHOUSE} set to the warehouse path and {@code URI} set to
     *     the empty string
     */
    private Options createCatalogOptions(Path warehouse) {
        Options conf = new Options();
        // Values are passed as Strings (not Object) so the typed option setter can infer its
        // type parameter.
        conf.set(CatalogOptions.WAREHOUSE, warehouse.toString());
        conf.set(CatalogOptions.URI, "");
        return conf;
    }

    /**
     * Walks {@code secondTable} through four column type changes. For each of {@code v1} to
     * {@code v4}, a record is sent that is asserted not to be processed within one second under
     * the current schema, the column type is then updated, and the record is expected to be
     * processed.
     *
     * @throws Exception if the harness, catalog or a schema change fails
     */
    @Test
    @Timeout(value=30L)
    public void testSingleTableUpdateColumnType() throws Exception {
        Identifier tableId = this.secondTable;
        FileStoreTable table = (FileStoreTable) this.catalog.getTable(tableId);
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> harness = this.createTestHarness(this.catalogLoader);
        harness.open();
        Runner runner = new Runner(harness);
        Thread t = new Thread(runner);
        t.start();

        // Baseline record that is processed under the initial schema.
        HashMap<String, String> data = new HashMap<>();
        data.put("k", "1");
        data.put("v1", "10");
        data.put("v2", "0.625");
        data.put("v3", "one");
        data.put("v4", "b_one");
        CdcMultiplexRecord expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        CdcMultiplexRecord actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        // v1: INT -> BIGINT. The value is larger than Integer.MAX_VALUE.
        data = new HashMap<>();
        data.put("k", "2");
        data.put("v1", "12345678987654321");
        data.put("v2", "0.25");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat(actual).isNull();
        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v1", DataTypes.BIGINT())});
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        // v2: FLOAT -> DOUBLE.
        data = new HashMap<>();
        data.put("k", "3");
        data.put("v1", "100");
        data.put("v2", "1.0000000000009095");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat(actual).isNull();
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v2", DataTypes.DOUBLE())});
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        // v3: VARCHAR(5) -> VARCHAR(10). The value is nine characters long.
        data = new HashMap<>();
        data.put("k", "4");
        data.put("v1", "40");
        data.put("v3", "long four");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat(actual).isNull();
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v3", DataTypes.VARCHAR(10))});
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        // v4: VARBINARY(5) -> VARBINARY(10). The value is ten characters long.
        data = new HashMap<>();
        data.put("k", "5");
        data.put("v1", "50");
        data.put("v4", "long five~");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat(actual).isNull();
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v4", DataTypes.VARBINARY(10))});
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        runner.stop();
        t.join();
        harness.close();
    }

    /**
     * Interleaves records for {@code firstTable} and {@code secondTable} through one operator.
     * A column is added to the first table and four column types are updated on the second; each
     * record that needs a schema change is asserted not to be processed within one second before
     * the change and is expected to be processed after it.
     *
     * @throws Exception if the harness, catalog or a schema change fails
     */
    @Test
    @Timeout(value=30L)
    public void testMultiTableUpdateColumnType() throws Exception {
        FileStoreTable table1 = (FileStoreTable) this.catalog.getTable(this.firstTable);
        FileStoreTable table2 = (FileStoreTable) this.catalog.getTable(this.secondTable);
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> harness = this.createTestHarness(this.catalogLoader);
        harness.open();
        Runner runner = new Runner(harness);
        Thread t = new Thread(runner);
        t.start();

        // Baseline record for the first table.
        HashMap<String, String> data = new HashMap<>();
        data.put("pt", "0");
        data.put("k", "1");
        data.put("v", "10");
        CdcMultiplexRecord expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.firstTable.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        CdcMultiplexRecord actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        // Baseline record for the second table.
        data = new HashMap<>();
        data.put("k", "1");
        data.put("v1", "10");
        data.put("v2", "0.625");
        data.put("v3", "one");
        data.put("v4", "b_one");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.secondTable.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        // First table: record with a new column "v2", then add that column as STRING.
        data = new HashMap<>();
        data.put("pt", "1");
        data.put("k", "2");
        data.put("v", "varchar");
        data.put("v2", "hello");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.firstTable.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat(actual).isNull();
        SchemaManager schemaManager = new SchemaManager(table1.fileIO(), table1.location());
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.addColumn("v2", DataTypes.STRING())});
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        // Second table, v1: INT -> BIGINT. The value is larger than Integer.MAX_VALUE.
        data = new HashMap<>();
        data.put("k", "2");
        data.put("v1", "12345678987654321");
        data.put("v2", "0.25");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.secondTable.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat(actual).isNull();
        schemaManager = new SchemaManager(table2.fileIO(), table2.location());
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v1", DataTypes.BIGINT())});
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        // Second table, v2: FLOAT -> DOUBLE.
        data = new HashMap<>();
        data.put("k", "3");
        data.put("v1", "100");
        data.put("v2", "1.0000000000009095");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.secondTable.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat(actual).isNull();
        schemaManager = new SchemaManager(table2.fileIO(), table2.location());
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v2", DataTypes.DOUBLE())});
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        // Second table, v3: VARCHAR(5) -> VARCHAR(10). The value is nine characters long.
        data = new HashMap<>();
        data.put("k", "4");
        data.put("v1", "40");
        data.put("v3", "long four");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.secondTable.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat(actual).isNull();
        schemaManager = new SchemaManager(table2.fileIO(), table2.location());
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v3", DataTypes.VARCHAR(10))});
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        // Second table, v4: VARBINARY(5) -> VARBINARY(10). The value is ten characters long.
        // The schema manager from the previous step (second table) is reused here.
        data = new HashMap<>();
        data.put("k", "5");
        data.put("v1", "50");
        data.put("v4", "long five~");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.secondTable.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);
        actual = runner.poll(1L);
        Assertions.assertThat(actual).isNull();
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v4", DataTypes.VARBINARY(10))});
        actual = runner.take();
        Assertions.assertThat(actual).isEqualTo(expected);

        runner.stop();
        t.join();
        harness.close();
    }

    /**
     * Checks that the writes created for two different tables share one compact executor, that
     * closing an individual write leaves that executor running, and that closing the operator
     * shuts it down.
     *
     * @throws Exception if the harness fails or the two writes do not appear within 5 seconds
     */
    @Test
    @Timeout(value=30L)
    public void testUsingTheSameCompactExecutor() throws Exception {
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> harness = this.createTestHarness(this.catalogLoader);
        harness.open();
        Runner runner = new Runner(harness);
        Thread t = new Thread(runner);
        t.start();

        // One record for the first table.
        HashMap<String, String> data = new HashMap<>();
        data.put("pt", "0");
        data.put("k", "1");
        data.put("v", "10");
        CdcMultiplexRecord expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.firstTable.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);

        // One record for the second table.
        data = new HashMap<>();
        data.put("k", "1");
        data.put("v1", "10");
        data.put("v2", "0.625");
        data.put("v3", "one");
        data.put("v4", "b_one");
        expected = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.secondTable.getObjectName(), new CdcRecord(RowKind.INSERT, data));
        runner.offer(expected);

        // Wait until the operator holds a write for each of the two tables.
        CdcRecordStoreMultiWriteOperator operator = (CdcRecordStoreMultiWriteOperator) harness.getOperator();
        CommonTestUtils.waitUtil(() -> operator.writes().size() == 2, Duration.ofSeconds(5L), Duration.ofMillis(100L));

        // Collect the compact executor behind each write.
        List<StoreSinkWrite> storeSinkWrites = new ArrayList<>(operator.writes().values());
        List<ExecutorService> compactExecutors = new ArrayList<>();
        for (StoreSinkWrite storeSinkWrite : storeSinkWrites) {
            StoreSinkWriteImpl storeSinkWriteImpl = (StoreSinkWriteImpl) storeSinkWrite;
            compactExecutors.add(((AbstractFileStoreWrite) storeSinkWriteImpl.getWrite().getWrite()).getCompactExecutor());
        }

        // Both writes must use the very same executor instance.
        Assertions.assertThat(compactExecutors.get(0)).isSameAs(compactExecutors.get(1));

        // Closing a single write must not shut the shared executor down.
        ExecutorService compactExecutor = compactExecutors.get(0);
        for (StoreSinkWrite storeSinkWrite : storeSinkWrites) {
            storeSinkWrite.close();
            Assertions.assertThat(compactExecutor.isShutdown()).isFalse();
        }

        // Closing the operator shuts the executor down.
        operator.close();
        Assertions.assertThat(compactExecutor.isShutdown()).isTrue();

        runner.stop();
        t.join();
        harness.close();
    }

    /**
     * Builds a test harness around a new {@link CdcRecordStoreMultiWriteOperator}.
     *
     * <p>The operator factory receives the given catalog loader, a provider that creates
     * {@link StoreSinkWriteImpl} instances, the current {@code commitUser} of this test and empty
     * options. The harness is set up with the committable serializer but is not opened.
     *
     * @param catalogLoader loader the operator uses to obtain its catalog
     * @return a harness that has been set up but not opened
     * @throws Exception if the harness cannot be created or set up
     */
    private OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> createTestHarness(CatalogLoader catalogLoader) throws Exception {
        CdcRecordStoreMultiWriteOperator.Factory operatorFactory = new CdcRecordStoreMultiWriteOperator.Factory(
                catalogLoader,
                (StoreSinkWrite.Provider & Serializable)(t, commitUser, state, ioManager, memoryPoolFactory, metricGroup) ->
                        new StoreSinkWriteImpl(t, commitUser, state, ioManager, false, false, true, memoryPoolFactory, metricGroup),
                this.commitUser,
                Options.fromMap(new HashMap<>()));
        TypeSerializer<CdcMultiplexRecord> inputSerializer = new JavaSerializer<>();
        TypeSerializer<MultiTableCommittable> outputSerializer = new MultiTableCommittableTypeInfo().createSerializer(new ExecutionConfig());
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> harness =
                new OneInputStreamOperatorTestHarness<>(operatorFactory, inputSerializer);
        harness.setup(outputSerializer);
        return harness;
    }

    /**
     * Feeds records into a test harness on a separate thread.
     *
     * <p>The test thread hands records over with {@link #offer(CdcMultiplexRecord)}; the runner
     * thread passes each one to {@code harness.processElement} and, once that call returns,
     * publishes the record on the processed queue, where the test thread reads it with
     * {@link #take()} or {@link #poll(long)}.
     */
    private static class Runner
    implements Runnable {
        private final OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> harness;
        // Records waiting to be handed to the harness.
        private final BlockingQueue<CdcMultiplexRecord> toProcess = new LinkedBlockingQueue<CdcMultiplexRecord>();
        // Records for which processElement has returned.
        private final BlockingQueue<CdcMultiplexRecord> processed = new LinkedBlockingQueue<CdcMultiplexRecord>();
        // Cleared by stop() to end the loop in run().
        private final AtomicBoolean running = new AtomicBoolean(true);

        private Runner(OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> harness) {
            this.harness = harness;
        }

        /** Queues a record for processing by the runner thread. */
        private void offer(CdcMultiplexRecord record) {
            this.toProcess.offer(record);
        }

        /** Blocks until a processed record is available and returns it. */
        private CdcMultiplexRecord take() throws Exception {
            return this.processed.take();
        }

        /**
         * Waits up to the given number of seconds for a processed record.
         *
         * @return the record, or {@code null} if none became available in time
         */
        private CdcMultiplexRecord poll(long seconds) throws Exception {
            return this.processed.poll(seconds, TimeUnit.SECONDS);
        }

        /** Asks the loop in {@link #run()} to exit; does not interrupt a call in progress. */
        private void stop() {
            this.running.set(false);
        }

        /**
         * Processes queued records one at a time until {@link #stop()} is called, numbering them
         * with timestamps 1, 2, 3, ...
         *
         * @throws RuntimeException wrapping any exception thrown while waiting or processing
         */
        @Override
        public void run() {
            long timestamp = 0L;
            try {
                while (this.running.get()) {
                    // Wait briefly for work so the running flag is re-checked about every 10 ms.
                    CdcMultiplexRecord record = this.toProcess.poll(10L, TimeUnit.MILLISECONDS);
                    if (record == null) {
                        continue;
                    }
                    this.harness.processElement(record, ++timestamp);
                    this.processed.offer(record);
                }
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to whoever observes this thread afterwards.
                    Thread.currentThread().interrupt();
                }
                throw new RuntimeException(e);
            }
        }
    }
}

