/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.distributedlog.BKAsyncLogWriter;
import org.apache.distributedlog.BKDistributedLogManager;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.exceptions.WriteCancelledException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.util.Utils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@code BKAsyncLogWriter.writeBulk}: partial failures inside a batch,
 * total failures injected through a transmit failpoint, and batches large enough
 * to span several entries.
 */
public class TestAsyncBulkWrite extends TestDistributedLogBase {
    static final Logger LOG = LoggerFactory.getLogger(TestAsyncBulkWrite.class);

    /** Output buffer size, in bytes, applied to every per-test configuration. */
    private static final int OUTPUT_BUFFER_SIZE = 1024;

    /**
     * Payload size used for the record that must be rejected with
     * {@link LogRecordTooLongException}.
     * NOTE(review): presumably the maximum log record size plus one byte - confirm
     * against the limit declared by {@code LogRecord}.
     */
    private static final int OVERSIZED_RECORD_BYTES = 1040385;

    /**
     * Number of bytes used to size batches that should span several transmissions.
     * NOTE(review): presumably the maximum record-set size - confirm against
     * {@code LogRecord}.
     */
    private static final int RECORD_SET_BYTES = 1044480;

    /** Timeout, in seconds, applied when waiting on a write future. */
    private static final long FUTURE_TIMEOUT_SECONDS = 10L;

    @Rule
    public TestName runtime = new TestName();
    protected final DistributedLogConfiguration testConf = new DistributedLogConfiguration();

    public TestAsyncBulkWrite() {
        testConf.addConfiguration(conf);
        testConf.setReaderIdleErrorThresholdMillis(1200000);
    }

    /**
     * Bulk-writes ten good records, one oversized record and ten more good records.
     * The first ten must succeed, the oversized one must fail with
     * {@link LogRecordTooLongException} and every record after it must fail with
     * {@link WriteCancelledException}.
     */
    @Test(timeout = 60000L)
    public void testAsyncBulkWritePartialFailureBufferFailure() throws Exception {
        String name = "distrlog-testAsyncBulkWritePartialFailure";
        BKDistributedLogManager dlm = createNewDLM(newLocalConf(), name);
        BKAsyncLogWriter writer = openWriter(dlm);

        int goodRecs = 10;
        int totalRecs = 2 * goodRecs + 1;

        // Layout: goodRecs valid records, one oversized record, goodRecs valid records.
        List<LogRecord> records = DLMTestUtil.getLargeLogRecordInstanceList(1L, goodRecs);
        records.add(DLMTestUtil.getLogRecordInstance(goodRecs, OVERSIZED_RECORD_BYTES));
        records.addAll(DLMTestUtil.getLargeLogRecordInstanceList(1L, goodRecs));

        CompletableFuture<List<CompletableFuture<DLSN>>> futureResults = writer.writeBulk(records);
        List<CompletableFuture<DLSN>> results =
                DLMTestUtil.validateFutureSucceededAndGetResult(futureResults);

        // One future is returned per submitted record.
        Assert.assertEquals(totalRecs, results.size());

        // Everything before the oversized record succeeds.
        for (int i = 0; i < goodRecs; i++) {
            DLMTestUtil.validateFutureSucceededAndGetResult(results.get(i));
        }

        // The oversized record itself is rejected.
        DLMTestUtil.validateFutureFailed(results.get(goodRecs), LogRecordTooLongException.class);

        // Everything after it is cancelled.
        for (int i = goodRecs + 1; i < totalRecs; i++) {
            DLMTestUtil.validateFutureFailed(results.get(i), WriteCancelledException.class);
        }

        writer.closeAndComplete();
        dlm.close();
    }

    /**
     * With the {@code FP_TransmitComplete} failpoint armed, a bulk write is accepted
     * but every per-record future must fail with an {@link IOException}.
     */
    @Test(timeout = 60000L)
    public void testAsyncBulkWriteTotalFailureTransmitFailure() throws Exception {
        String name = "distrlog-testAsyncBulkWriteTotalFailureDueToTransmitFailure";
        BKDistributedLogManager dlm = createNewDLM(newLocalConf(), name);
        BKAsyncLogWriter writer = openWriter(dlm);

        int batchSize = 100;
        int recSize = 1024;
        checkAllFailOnTransmit(writer, batchSize, recSize, 1L);

        writer.abort();
        dlm.close();
    }

    /**
     * Configures a 1024-byte maximum log segment size with interval-based rolling
     * set to 0, then checks that a bulk write containing an oversized record still
     * lands its first record in log segment 1, and that the following bulk write
     * fails as a whole with {@link WriteException}.
     */
    @Test(timeout = 60000L)
    public void testAsyncBulkWriteNoLedgerRollWithPartialFailures() throws Exception {
        String name = "distrlog-testAsyncBulkWriteNoLedgerRollWithPartialFailures";
        DistributedLogConfiguration confLocal = newLocalConf();
        confLocal.setMaxLogSegmentBytes(1024L);
        confLocal.setLogSegmentRollingIntervalMinutes(0);
        BKDistributedLogManager dlm = createNewDLM(confLocal, name);
        BKAsyncLogWriter writer = openWriter(dlm);

        int recSize = 2048;
        int txid = 1;

        // A single record larger than the configured segment size is written first.
        LogRecord firstRecord = DLMTestUtil.getLogRecordInstance(txid++, recSize);
        DLSN dlsn = DLMTestUtil.validateFutureSucceededAndGetResult(writer.write(firstRecord));
        Assert.assertEquals(1L, dlsn.getLogSegmentSequenceNo());

        // Bulk write whose second record is oversized; the first record must still
        // be written to log segment 1.
        List<LogRecord> partialBatch = new ArrayList<>(2);
        partialBatch.add(DLMTestUtil.getLogRecordInstance(txid++, recSize));
        partialBatch.add(DLMTestUtil.getLogRecordInstance(txid++, OVERSIZED_RECORD_BYTES));
        List<CompletableFuture<DLSN>> partialResults =
                DLMTestUtil.validateFutureSucceededAndGetResult(writer.writeBulk(partialBatch));
        dlsn = DLMTestUtil.validateFutureSucceededAndGetResult(partialResults.get(0));
        Assert.assertEquals(1L, dlsn.getLogSegmentSequenceNo());

        // The next bulk write must fail at the batch level.
        List<LogRecord> rejectedBatch = new ArrayList<>(1);
        rejectedBatch.add(DLMTestUtil.getLogRecordInstance(txid++, recSize));
        DLMTestUtil.validateFutureFailed(writer.writeBulk(rejectedBatch), WriteException.class);

        writer.closeAndComplete();
        dlm.close();
    }

    /**
     * Writes two batches through one writer and a third batch through a freshly
     * opened writer, expecting the third batch in log segment 2.
     */
    @Test(timeout = 60000L)
    public void testSimpleAsyncBulkWriteSpanningEntryAndLedger() throws Exception {
        String name = "distrlog-testSimpleAsyncBulkWriteSpanningEntryAndLedger";
        BKDistributedLogManager dlm = createNewDLM(newLocalConf(), name);
        BKAsyncLogWriter writer = openWriter(dlm);

        int batchSize = 100;
        int recSize = 1024;
        long ledgerIndex = 1L;
        long entryIndex = 0L;
        long txIndex = 1L;

        // First batch: first entry of the first log segment.
        checkAllSucceeded(writer, batchSize, recSize, ledgerIndex, entryIndex, 0L, txIndex);

        // Second batch: same log segment, next entry.
        entryIndex++;
        txIndex += batchSize;
        checkAllSucceeded(writer, batchSize, recSize, ledgerIndex, entryIndex, 0L, txIndex);

        // Completing the writer and opening a new one starts a new log segment.
        writer.closeAndComplete();
        writer = openWriter(dlm);
        ledgerIndex++;
        entryIndex = 0L;
        txIndex += batchSize;
        checkAllSucceeded(writer, batchSize, recSize, ledgerIndex, entryIndex, 0L, txIndex);

        writer.closeAndComplete();
        dlm.close();
    }

    /**
     * Writes a batch sized just past {@code numTransmissions} record sets and checks
     * that the last record ends up in entry {@code numTransmissions} of log segment 1.
     */
    @Test(timeout = 60000L)
    public void testAsyncBulkWriteSpanningPackets() throws Exception {
        String name = "distrlog-testAsyncBulkWriteSpanningPackets";
        BKDistributedLogManager dlm = createNewDLM(newLocalConf(), name);
        BKAsyncLogWriter writer = openWriter(dlm);

        int numTransmissions = 4;
        int recSize = 10240;
        int batchSize = spanningBatchSize(numTransmissions, recSize);

        DLSN dlsn = checkAllSucceeded(writer, batchSize, recSize, 1L, 0L, 0L, 1L);
        Assert.assertEquals(numTransmissions, dlsn.getEntryId());
        Assert.assertEquals(1L, dlsn.getLogSegmentSequenceNo());

        writer.closeAndComplete();
        dlm.close();
    }

    /**
     * Same as {@link #testAsyncBulkWriteSpanningPackets()}, followed by a second
     * batch written while the {@code FP_TransmitComplete} failpoint is armed; every
     * record of that second batch must fail with an {@link IOException}.
     */
    @Test(timeout = 60000L)
    public void testAsyncBulkWriteSpanningPacketsWithTransmitFailure() throws Exception {
        String name = "distrlog-testAsyncBulkWriteSpanningPacketsWithTransmitFailure";
        BKDistributedLogManager dlm = createNewDLM(newLocalConf(), name);
        BKAsyncLogWriter writer = openWriter(dlm);

        int numTransmissions = 4;
        int recSize = 10240;
        int batchSize = spanningBatchSize(numTransmissions, recSize);

        DLSN dlsn = checkAllSucceeded(writer, batchSize, recSize, 1L, 0L, 0L, 1L);
        Assert.assertEquals(numTransmissions, dlsn.getEntryId());
        Assert.assertEquals(1L, dlsn.getLogSegmentSequenceNo());

        checkAllFailOnTransmit(writer, batchSize, recSize, 1L);

        writer.abort();
        dlm.close();
    }

    /** Builds a configuration derived from {@link #testConf} with the small output buffer. */
    private DistributedLogConfiguration newLocalConf() {
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setOutputBufferSize(OUTPUT_BUFFER_SIZE);
        return confLocal;
    }

    /** Starts a new non-partitioned log segment and returns its writer. */
    private static BKAsyncLogWriter openWriter(BKDistributedLogManager dlm) throws Exception {
        return (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
    }

    /** Number of {@code recSize}-byte records needed to exceed {@code numTransmissions} record sets by one byte. */
    private static int spanningBatchSize(int numTransmissions, int recSize) {
        return (numTransmissions * RECORD_SET_BYTES + 1) / recSize;
    }

    /**
     * Runs {@link #checkAllSubmittedButFailed} with the {@code FP_TransmitComplete}
     * failpoint armed, always removing the failpoint afterwards so it cannot leak
     * into later tests.
     */
    private void checkAllFailOnTransmit(BKAsyncLogWriter writer, int batchSize, int recSize, long txIndex)
            throws Exception {
        FailpointUtils.setFailpoint(
                FailpointUtils.FailPointName.FP_TransmitComplete,
                FailpointUtils.FailPointActions.FailPointAction_Default);
        try {
            checkAllSubmittedButFailed(writer, batchSize, recSize, txIndex);
        } finally {
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_TransmitComplete);
        }
    }

    /**
     * Bulk-writes {@code batchSize} records of {@code recSize} bytes and verifies
     * that every one of them succeeds in the expected log segment and slot.
     *
     * @param writer      writer under test
     * @param batchSize   number of records to write
     * @param recSize     size passed to the record generator for each record
     * @param ledgerIndex expected log segment sequence number of every record
     * @param entryIndex  expected starting entry id; currently not verified by this
     *                    helper and kept so call sites state the position they expect
     * @param slotIndex   expected slot id of the first record
     * @param txIndex     transaction id passed to the record generator
     * @return the DLSN of the last record written, or {@code null} for an empty batch
     */
    private DLSN checkAllSucceeded(BKAsyncLogWriter writer, int batchSize, int recSize, long ledgerIndex, long entryIndex, long slotIndex, long txIndex) throws Exception {
        List<LogRecord> records = DLMTestUtil.getLogRecordInstanceList(txIndex, batchSize, recSize);
        CompletableFuture<List<CompletableFuture<DLSN>>> futureResults = writer.writeBulk(records);
        Assert.assertNotNull(futureResults);
        List<CompletableFuture<DLSN>> results =
                Utils.ioResult(futureResults, FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assert.assertNotNull(results);
        Assert.assertEquals(records.size(), results.size());

        long expectedSlot = slotIndex;
        long prevEntryId = 0L;
        DLSN lastDlsn = null;
        for (CompletableFuture<DLSN> result : results) {
            DLSN dlsn = Utils.ioResult(result, FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            lastDlsn = dlsn;

            // Moving to a later entry restarts slot numbering at zero.
            if (dlsn.getEntryId() > prevEntryId) {
                expectedSlot = 0L;
            }
            Assert.assertEquals(ledgerIndex, dlsn.getLogSegmentSequenceNo());
            Assert.assertEquals(expectedSlot, dlsn.getSlotId());
            expectedSlot++;
            prevEntryId = dlsn.getEntryId();
        }
        return lastDlsn;
    }

    /**
     * Bulk-writes {@code batchSize} records of {@code recSize} bytes and verifies
     * that the batch is accepted (one future per record) while every individual
     * record future fails with an {@link IOException}.
     *
     * @param writer    writer under test
     * @param batchSize number of records to write
     * @param recSize   size passed to the record generator for each record
     * @param txIndex   transaction id passed to the record generator
     */
    private void checkAllSubmittedButFailed(BKAsyncLogWriter writer, int batchSize, int recSize, long txIndex) throws Exception {
        List<LogRecord> records = DLMTestUtil.getLogRecordInstanceList(txIndex, batchSize, recSize);
        CompletableFuture<List<CompletableFuture<DLSN>>> futureResults = writer.writeBulk(records);
        Assert.assertNotNull(futureResults);
        List<CompletableFuture<DLSN>> results =
                Utils.ioResult(futureResults, FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assert.assertNotNull(results);
        Assert.assertEquals(records.size(), results.size());
        for (CompletableFuture<DLSN> result : results) {
            DLMTestUtil.validateFutureFailed(result, IOException.class);
        }
    }
}

