/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.base;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.Sizeable;
import io.debezium.util.LoggingContext;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Concurrency smoke test for {@link ChangeEventQueue}: a configurable number of writer threads
 * enqueue the same event repeatedly while a configurable number of reader threads poll the queue,
 * and the test asserts that the number of records polled equals the number enqueued.
 *
 * <p>The test is parameterized over every combination of the writer and reader counts listed in
 * {@link #data()}.
 */
@RunWith(Parameterized.class)
public class ChangeEventQueueTest {
    /** The single event instance that every writer enqueues. */
    private static final DataChangeEvent EVENT = new DataChangeEvent(new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "dummy", Schema.STRING_SCHEMA, "Change Data Capture Even via Debezium"));

    private final int noOfWriters;
    private final int noOfReaders;
    private final int noOfEventsPerWriter;
    /** {@code noOfWriters * noOfEventsPerWriter}, computed in {@code long} to avoid int overflow. */
    private final long totalNoOfEvents;
    private final Thread[] writers;
    private final Thread[] readers;
    /** Running count of records returned by {@code poll()}, shared by all reader threads. */
    private final AtomicLong recordsRead;

    /**
     * @param noOfWriters number of writer threads to create
     * @param noOfReaders number of reader threads to create
     * @param noOfEventsPerWriter number of events each writer thread enqueues
     */
    public ChangeEventQueueTest(int noOfWriters, int noOfReaders, int noOfEventsPerWriter) {
        this.noOfWriters = noOfWriters;
        this.noOfReaders = noOfReaders;
        this.noOfEventsPerWriter = noOfEventsPerWriter;
        this.totalNoOfEvents = (long) noOfWriters * (long) noOfEventsPerWriter;
        this.writers = new Thread[noOfWriters];
        this.readers = new Thread[noOfReaders];
        this.recordsRead = new AtomicLong();
    }

    /**
     * Builds the parameter matrix: one {@code {writers, readers, eventsPerWriter}} triple for each
     * pairing of the writer and reader counts below.
     *
     * @return the constructor arguments for every test run
     */
    @Parameterized.Parameters(name="{index}: testQueue({0} writers, {1} readers, {2} events)")
    public static Collection<Object[]> data() {
        int[] writerCounts = {1, 2, 4, 8, 16};
        int[] readerCounts = {1, 2, 4, 8, 16};
        // The constructor treats this value as a per-writer count, so the overall number of
        // events in a run is writers * eventsPerWriter.
        int eventsPerWriter = 1000000;
        Object[][] params = new Object[writerCounts.length * readerCounts.length][];
        int index = 0;
        for (int writerCount : writerCounts) {
            for (int readerCount : readerCounts) {
                params[index++] = new Object[]{writerCount, readerCount, eventsPerWriter};
            }
        }
        return Arrays.asList(params);
    }

    /** Creates one queue shared by all workers and prepares (but does not start) the threads. */
    @Before
    public void setup() {
        ChangeEventQueue<DataChangeEvent> queue = new ChangeEventQueue.Builder<DataChangeEvent>()
                .maxBatchSize(8192)
                .maxQueueSize(16384)
                .loggingContextSupplier(() -> LoggingContext.forConnector("a", "b", "c"))
                .pollInterval(Duration.ofMillis(500L))
                .build();
        for (int i = 0; i < this.noOfWriters; ++i) {
            this.writers[i] = ChangeEventQueueTest.getWriter(queue, this.noOfEventsPerWriter);
        }
        for (int i = 0; i < this.noOfReaders; ++i) {
            this.readers[i] = ChangeEventQueueTest.getReader(queue, this.totalNoOfEvents, this.recordsRead);
        }
    }

    /**
     * Starts all writers and readers, waits for each of them for up to ten seconds, and then
     * verifies that exactly {@code totalNoOfEvents} records were polled.
     *
     * @throws InterruptedException if the test thread is interrupted while joining a worker
     */
    @Test
    public void shouldQueueAndPollMessages() throws InterruptedException {
        for (Thread writer : this.writers) {
            writer.start();
        }
        for (Thread reader : this.readers) {
            reader.start();
        }
        // The timeout applies to each join individually; a worker that is still running after
        // its timeout is left for teardown() to interrupt, and the assertion below will fail.
        long maxWaitTimeout = TimeUnit.SECONDS.toMillis(10L);
        for (Thread writer : this.writers) {
            writer.join(maxWaitTimeout);
        }
        for (Thread reader : this.readers) {
            reader.join(maxWaitTimeout);
        }
        Assert.assertEquals(this.totalNoOfEvents, this.recordsRead.get());
    }

    /** Interrupts every worker so that threads still running after the test terminate. */
    @After
    public void teardown() {
        for (Thread writer : this.writers) {
            writer.interrupt();
        }
        for (Thread reader : this.readers) {
            reader.interrupt();
        }
    }

    /**
     * Creates (without starting) a thread that enqueues {@link #EVENT} {@code noOfEvents} times.
     *
     * @param queue the queue to write to
     * @param noOfEvents how many events the thread enqueues
     * @return the unstarted writer thread
     */
    private static Thread getWriter(ChangeEventQueue<DataChangeEvent> queue, int noOfEvents) {
        return new Thread(() -> {
            try {
                for (int i = 0; i < noOfEvents; ++i) {
                    queue.doEnqueue(EVENT);
                }
            }
            catch (InterruptedException e) {
                // Interruption is the stop signal sent by teardown(): stop producing instead of
                // swallowing it and carrying on, and keep the flag set for the thread's owner.
                Thread.currentThread().interrupt();
            }
        });
    }

    /**
     * Creates (without starting) a thread that polls the queue and adds the size of every polled
     * batch to {@code recordsRead} until the shared counter reaches {@code totalNoOfEvents}.
     *
     * @param queue the queue to read from
     * @param totalNoOfEvents the overall number of records all readers together should consume
     * @param recordsRead counter shared between all reader threads
     * @return the unstarted reader thread
     */
    private static Thread getReader(ChangeEventQueue<DataChangeEvent> queue, long totalNoOfEvents, AtomicLong recordsRead) {
        return new Thread(() -> {
            try {
                while (recordsRead.get() < totalNoOfEvents) {
                    recordsRead.addAndGet(queue.poll().size());
                }
            }
            catch (InterruptedException e) {
                // Without this exit, a reader whose target count is never reached would keep
                // looping forever after teardown() interrupts it.
                Thread.currentThread().interrupt();
            }
        });
    }
}

