/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.log4j.Level;
import org.apache.log4j.Priority;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(value=MockitoJUnitRunner.StrictStubs.class)
public class RecordCollectorTest {
    // ---- Shared fixture state; the mutable members are (re)assigned in setup() before every test. ----

    // Log context (prefix "test ") handed to both the StreamsProducer and the RecordCollectorImpl.
    private final LogContext logContext = new LogContext("test ");
    // Task identity passed to the collector; its toString() is also used as the task tag when looking up metrics.
    private final TaskId taskId = new TaskId(0, 0);
    // Exception handler passed to every collector built by the visible tests.
    private final ProductionExceptionHandler productionExceptionHandler = new DefaultProductionExceptionHandler();
    // Metrics registry the tests read produced/dropped counters from.
    private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(new Metrics());
    // Config with only application.id and bootstrap.servers set; used by setup() to build the StreamsProducer.
    private final StreamsConfig config = new StreamsConfig(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"appId"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"dummy:1234")}));
    // Same as config plus processing.guarantee=exactly_once_v2. Not referenced in this part of the file.
    private final StreamsConfig eosConfig = new StreamsConfig(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"appId"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"dummy:1234"), Utils.mkEntry((Object)"processing.guarantee", (Object)"exactly_once_v2")}));
    // Name of the single output topic.
    private final String topic = "topic";
    // Name of the sink node that writes to the output topic.
    private final String sinkNodeName = "output-node";
    // Cluster metadata describing three partitions (0, 1, 2) of "topic"; installed on the mock client supplier in setup().
    private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), Arrays.asList(new PartitionInfo("topic", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic", 2, Node.noNode(), new Node[0], new Node[0])), Collections.emptySet(), Collections.emptySet());
    // Serializers shared by the sink node and by the send(...) calls in the tests.
    private final StringSerializer stringSerializer = new StringSerializer();
    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
    // Random process id; setup() uses it both as the producer's process id and as the thread-name prefix.
    private final UUID processId = UUID.randomUUID();
    // Partitioner that parses the key as an integer and takes it modulo the partition count (keys must be numeric strings).
    private final StreamPartitioner<String, Object> streamPartitioner = (topic, key, value, numPartitions) -> Integer.parseInt(key) % numPartitions;
    // Producer taken from the mock client supplier in setup(); tests inspect its history().
    private MockProducer<byte[], byte[]> mockProducer;
    // Producer wrapper given to the collector.
    private StreamsProducer streamsProducer;
    // Topology containing a single sink for "topic"; several tests replace it to swap in another partitioner.
    private ProcessorTopology topology;
    // Processor context passed as the last-but-one/last argument of every send(...) call.
    private final InternalProcessorContext<Void, Void> context = new InternalMockProcessorContext();
    // Collector under test; closed in cleanup().
    private RecordCollectorImpl collector;

    /**
     * Rebuilds the fixture before each test: a {@link StreamsProducer} backed by a mock client
     * supplier, a topology holding one sink node for {@code "topic"}, and a fresh
     * {@link RecordCollectorImpl} wired to both.
     */
    @Before
    public void setup() {
        final MockClientSupplier mockClients = new MockClientSupplier();
        mockClients.setCluster(this.cluster);

        final String threadId = this.processId + "-StreamThread-1";
        this.streamsProducer = new StreamsProducer(this.config, threadId, (KafkaClientSupplier)mockClients, null, this.processId, this.logContext, Time.SYSTEM);
        // Keep a handle on the first producer recorded by the mock supplier (presumably the one
        // created for the StreamsProducer above) so tests can inspect what was sent.
        this.mockProducer = mockClients.producers.get(0);

        final SinkNode outputNode = new SinkNode("output-node", (TopicNameExtractor)new StaticTopicNameExtractor("topic"), (Serializer)this.stringSerializer, (Serializer)this.byteArraySerializer, this.streamPartitioner);
        this.topology = new ProcessorTopology(Collections.emptyList(), Collections.emptyMap(), Collections.singletonMap("topic", outputNode), Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), Collections.emptySet(), Collections.emptyMap());

        this.collector = new RecordCollectorImpl(this.logContext, this.taskId, this.streamsProducer, this.productionExceptionHandler, this.streamsMetrics, this.topology);
    }

    /**
     * Runs after each test and cleanly closes whichever collector is currently assigned to the
     * {@code collector} field.
     */
    @After
    public void cleanup() {
        collector.closeClean();
    }

    /**
     * Verifies the topic-level produce metrics: both start at zero, and every send is expected to
     * raise {@code records-produced-total} by one and {@code bytes-produced-total} by the size of
     * the record that reached the mock producer.
     *
     * <p>Five records are sent, alternating between a header-less record to partition 0 and a
     * record with headers to partition 1.
     */
    @Test
    public void shouldRecordRecordsAndBytesProduced() {
        RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        String threadId = Thread.currentThread().getName();
        Metric recordsProduced = (Metric)this.streamsMetrics.metrics().get(new MetricName("records-produced-total", "stream-topic-metrics", "The total number of records produced from this topic", this.streamsMetrics.topicLevelTagMap(threadId, this.taskId.toString(), this.sinkNodeName, this.topic)));
        Metric bytesProduced = (Metric)this.streamsMetrics.metrics().get(new MetricName("bytes-produced-total", "stream-topic-metrics", "The total number of bytes produced from this topic", this.streamsMetrics.topicLevelTagMap(threadId, this.taskId.toString(), this.sinkNodeName, this.topic)));

        double totalRecords = 0.0;
        double totalBytes = 0.0;
        MatcherAssert.assertThat((Object)recordsProduced.metricValue(), (Matcher)IsEqual.equalTo((Object)totalRecords));
        MatcherAssert.assertThat((Object)bytesProduced.metricValue(), (Matcher)IsEqual.equalTo((Object)totalBytes));

        final int sends = 5;
        for (int i = 0; i < sends; i++) {
            // Even iterations: no headers, partition 0. Odd iterations: headers, partition 1.
            boolean withHeaders = i % 2 == 1;
            Headers recordHeaders = withHeaders ? headers : null;
            Integer partition = withHeaders ? 1 : 0;
            this.collector.send(this.topic, (Object)"999", (Object)"0", recordHeaders, partition, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, this.sinkNodeName, this.context);

            totalRecords += 1.0;
            MatcherAssert.assertThat((Object)recordsProduced.metricValue(), (Matcher)IsEqual.equalTo((Object)totalRecords));

            // The i-th entry of the producer history is the record that was just sent.
            totalBytes += (double)ClientUtils.producerRecordSizeInBytes((ProducerRecord)this.mockProducer.history().get(i));
            MatcherAssert.assertThat((Object)bytesProduced.metricValue(), (Matcher)IsEqual.equalTo((Object)totalBytes));
        }
    }

    /**
     * Sends records with an explicit partition and checks the per-partition offsets reported by
     * the collector as well as the number of records that reached the mock producer.
     *
     * <p>After each batch the offset expected for a partition is one less than the number of
     * records sent to it so far.
     */
    @Test
    public void shouldSendToSpecificPartition() {
        final RecordHeaders recordHeaders = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        final TopicPartition partition0 = new TopicPartition("topic", 0);
        final TopicPartition partition1 = new TopicPartition("topic", 1);
        final TopicPartition partition2 = new TopicPartition("topic", 2);

        // First batch: three header-less records to partition 0 ...
        for (int i = 0; i < 3; i++) {
            this.collector.send("topic", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context);
        }
        // ... two records with headers to partition 1 ...
        for (int i = 0; i < 2; i++) {
            this.collector.send("topic", (Object)"999", (Object)"0", (Headers)recordHeaders, Integer.valueOf(1), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context);
        }
        // ... and one record with headers to partition 2.
        this.collector.send("topic", (Object)"999", (Object)"0", (Headers)recordHeaders, Integer.valueOf(2), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context);

        final Map offsetsAfterFirstBatch = this.collector.offsets();
        Assert.assertEquals(2L, (long)((Long)offsetsAfterFirstBatch.get(partition0)));
        Assert.assertEquals(1L, (long)((Long)offsetsAfterFirstBatch.get(partition1)));
        Assert.assertEquals(0L, (long)((Long)offsetsAfterFirstBatch.get(partition2)));
        Assert.assertEquals(6L, (long)this.mockProducer.history().size());

        // Second batch: one more record per partition; only the last one carries headers.
        this.collector.send("topic", (Object)"999", (Object)"0", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context);
        this.collector.send("topic", (Object)"999", (Object)"0", null, Integer.valueOf(1), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context);
        this.collector.send("topic", (Object)"999", (Object)"0", (Headers)recordHeaders, Integer.valueOf(2), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context);

        final Map offsetsAfterSecondBatch = this.collector.offsets();
        Assert.assertEquals(3L, (long)((Long)offsetsAfterSecondBatch.get(partition0)));
        Assert.assertEquals(2L, (long)((Long)offsetsAfterSecondBatch.get(partition1)));
        Assert.assertEquals(1L, (long)((Long)offsetsAfterSecondBatch.get(partition2)));
        Assert.assertEquals(9L, (long)this.mockProducer.history().size());
    }

    /**
     * Sends nine records without an explicit partition, routing them through the fixture's
     * {@code streamPartitioner} (numeric key modulo partition count), then checks the resulting
     * offsets, the producer history size, and that the returned offsets map rejects {@code put}.
     */
    @Test
    public void shouldSendWithPartitioner() {
        final RecordHeaders recordHeaders = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});

        // Keys divisible by 3 (partition 0 with three partitions), sent without headers.
        for (final String key : Arrays.asList("3", "9", "27", "81", "243")) {
            this.collector.send("topic", (Object)key, (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, this.streamPartitioner);
        }
        // Keys with remainder 1 (partition 1), sent with headers.
        for (final String key : Arrays.asList("28", "82", "244")) {
            this.collector.send("topic", (Object)key, (Object)"0", (Headers)recordHeaders, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, this.streamPartitioner);
        }
        // One key with remainder 2 (partition 2), sent without headers.
        this.collector.send("topic", (Object)"245", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, this.streamPartitioner);

        final Map collectedOffsets = this.collector.offsets();
        Assert.assertEquals(4L, (long)((Long)collectedOffsets.get(new TopicPartition("topic", 0))));
        Assert.assertEquals(2L, (long)((Long)collectedOffsets.get(new TopicPartition("topic", 1))));
        Assert.assertEquals(0L, (long)((Long)collectedOffsets.get(new TopicPartition("topic", 2))));
        Assert.assertEquals(9L, (long)this.mockProducer.history().size());

        // The map handed out by offsets() is expected to be read-only.
        final TopicPartition firstPartition = new TopicPartition("topic", 0);
        Assert.assertThrows(UnsupportedOperationException.class, () -> collectedOffsets.put(firstPartition, 50L));
    }

    /**
     * Replaces the sink's partitioner with one whose {@code partitions(...)} returns every
     * even-numbered partition, sends nine records, and expects each record to be written to both
     * partition 0 and partition 2 (18 records in total) while partition 1 receives nothing.
     * Also checks that the map returned by {@code offsets()} rejects {@code put}.
     */
    @Test
    public void shouldSendOnlyToEvenPartitions() {
        class EvenPartitioner implements StreamPartitioner<String, Object> {
            // @Override guards against signature drift: without it a mismatch would silently
            // leave the interface's own implementation in place.
            @Override
            @Deprecated
            public Integer partition(String topic, String key, Object value, int numPartitions) {
                return null;
            }

            @Override
            public Optional<Set<Integer>> partitions(String topic, String key, Object value, int numPartitions) {
                Set<Integer> evenPartitions = new HashSet<>();
                for (int i = 0; i < numPartitions; i += 2) {
                    evenPartitions.add(i);
                }
                return Optional.of(evenPartitions);
            }
        }

        EvenPartitioner evenPartitioner = new EvenPartitioner();
        SinkNode sinkNode = new SinkNode(this.sinkNodeName, (TopicNameExtractor)new StaticTopicNameExtractor(this.topic), (Serializer)this.stringSerializer, (Serializer)this.byteArraySerializer, (StreamPartitioner)evenPartitioner);
        this.topology = new ProcessorTopology(Collections.emptyList(), Collections.emptyMap(), Collections.singletonMap(this.topic, sinkNode), Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), Collections.emptySet(), Collections.emptyMap());
        this.collector = new RecordCollectorImpl(this.logContext, this.taskId, this.streamsProducer, this.productionExceptionHandler, this.streamsMetrics, this.topology);
        RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});

        // Nine sends in total: five without headers, three with headers, one more without.
        for (String key : Arrays.asList("3", "9", "27", "81", "243")) {
            this.collector.send(this.topic, (Object)key, (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, (StreamPartitioner)evenPartitioner);
        }
        for (String key : Arrays.asList("28", "82", "244")) {
            this.collector.send(this.topic, (Object)key, (Object)"0", (Headers)headers, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, (StreamPartitioner)evenPartitioner);
        }
        this.collector.send(this.topic, (Object)"245", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, (StreamPartitioner)evenPartitioner);

        Map offsets = this.collector.offsets();
        Assert.assertEquals((long)8L, (long)((Long)offsets.get(new TopicPartition(this.topic, 0))));
        Assert.assertFalse((boolean)offsets.containsKey(new TopicPartition(this.topic, 1)));
        Assert.assertEquals((long)8L, (long)((Long)offsets.get(new TopicPartition(this.topic, 2))));
        Assert.assertEquals((long)18L, (long)this.mockProducer.history().size());

        // The map handed out by offsets() is expected to be read-only.
        TopicPartition topicPartition = new TopicPartition(this.topic, 0);
        Assert.assertThrows(UnsupportedOperationException.class, () -> offsets.put(topicPartition, 50L));
    }

    /**
     * Replaces the sink's partitioner with one whose {@code partitions(...)} returns every
     * partition index, sends nine records, and expects each record to be written to all three
     * partitions (27 records in total, offset 8 on each partition).
     * Also checks that the map returned by {@code offsets()} rejects {@code put}.
     */
    @Test
    public void shouldBroadcastToAllPartitions() {
        class BroadcastingPartitioner implements StreamPartitioner<String, Object> {
            // @Override guards against signature drift: without it a mismatch would silently
            // leave the interface's own implementation in place.
            @Override
            @Deprecated
            public Integer partition(String topic, String key, Object value, int numPartitions) {
                return null;
            }

            @Override
            public Optional<Set<Integer>> partitions(String topic, String key, Object value, int numPartitions) {
                return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));
            }
        }

        BroadcastingPartitioner broadcastingPartitioner = new BroadcastingPartitioner();
        SinkNode sinkNode = new SinkNode(this.sinkNodeName, (TopicNameExtractor)new StaticTopicNameExtractor(this.topic), (Serializer)this.stringSerializer, (Serializer)this.byteArraySerializer, (StreamPartitioner)broadcastingPartitioner);
        this.topology = new ProcessorTopology(Collections.emptyList(), Collections.emptyMap(), Collections.singletonMap(this.topic, sinkNode), Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), Collections.emptySet(), Collections.emptyMap());
        this.collector = new RecordCollectorImpl(this.logContext, this.taskId, this.streamsProducer, this.productionExceptionHandler, this.streamsMetrics, this.topology);
        RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});

        // Nine sends in total: five without headers, three with headers, one more without.
        for (String key : Arrays.asList("3", "9", "27", "81", "243")) {
            this.collector.send(this.topic, (Object)key, (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, (StreamPartitioner)broadcastingPartitioner);
        }
        for (String key : Arrays.asList("28", "82", "244")) {
            this.collector.send(this.topic, (Object)key, (Object)"0", (Headers)headers, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, (StreamPartitioner)broadcastingPartitioner);
        }
        this.collector.send(this.topic, (Object)"245", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, (StreamPartitioner)broadcastingPartitioner);

        Map offsets = this.collector.offsets();
        Assert.assertEquals((long)8L, (long)((Long)offsets.get(new TopicPartition(this.topic, 0))));
        Assert.assertEquals((long)8L, (long)((Long)offsets.get(new TopicPartition(this.topic, 1))));
        Assert.assertEquals((long)8L, (long)((Long)offsets.get(new TopicPartition(this.topic, 2))));
        Assert.assertEquals((long)27L, (long)this.mockProducer.history().size());

        // The map handed out by offsets() is expected to be read-only.
        TopicPartition topicPartition = new TopicPartition(this.topic, 0);
        Assert.assertThrows(UnsupportedOperationException.class, () -> offsets.put(topicPartition, 50L));
    }

    /**
     * Replaces the sink's partitioner with one whose {@code partitions(...)} returns an empty set,
     * sends nine records, and expects none of them to reach the producer: the offsets map stays
     * empty, the producer history stays empty, and the task-level {@code dropped-records-total}
     * metric reads 9.
     * Also checks that the map returned by {@code offsets()} rejects {@code put}.
     */
    @Test
    public void shouldDropAllRecords() {
        class DroppingPartitioner implements StreamPartitioner<String, Object> {
            // @Override guards against signature drift: without it a mismatch would silently
            // leave the interface's own implementation in place.
            @Override
            @Deprecated
            public Integer partition(String topic, String key, Object value, int numPartitions) {
                return null;
            }

            @Override
            public Optional<Set<Integer>> partitions(String topic, String key, Object value, int numPartitions) {
                return Optional.of(Collections.emptySet());
            }
        }

        DroppingPartitioner droppingPartitioner = new DroppingPartitioner();
        SinkNode sinkNode = new SinkNode(this.sinkNodeName, (TopicNameExtractor)new StaticTopicNameExtractor(this.topic), (Serializer)this.stringSerializer, (Serializer)this.byteArraySerializer, (StreamPartitioner)droppingPartitioner);
        this.topology = new ProcessorTopology(Collections.emptyList(), Collections.emptyMap(), Collections.singletonMap(this.topic, sinkNode), Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), Collections.emptySet(), Collections.emptyMap());
        this.collector = new RecordCollectorImpl(this.logContext, this.taskId, this.streamsProducer, this.productionExceptionHandler, this.streamsMetrics, this.topology);
        // Task-level metric, tagged with the current thread name and the task id.
        Metric recordsDropped = (Metric)this.streamsMetrics.metrics().get(new MetricName("dropped-records-total", "stream-task-metrics", "The total number of dropped records", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)Thread.currentThread().getName()), Utils.mkEntry((Object)"task-id", (Object)this.taskId.toString())})));
        RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});

        // Nine sends in total: five without headers, three with headers, one more without.
        for (String key : Arrays.asList("3", "9", "27", "81", "243")) {
            this.collector.send(this.topic, (Object)key, (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, (StreamPartitioner)droppingPartitioner);
        }
        for (String key : Arrays.asList("28", "82", "244")) {
            this.collector.send(this.topic, (Object)key, (Object)"0", (Headers)headers, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, (StreamPartitioner)droppingPartitioner);
        }
        this.collector.send(this.topic, (Object)"245", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, (StreamPartitioner)droppingPartitioner);

        Map offsets = this.collector.offsets();
        Assert.assertTrue((boolean)offsets.isEmpty());
        Assert.assertEquals((long)0L, (long)this.mockProducer.history().size());
        MatcherAssert.assertThat((Object)recordsDropped.metricValue(), (Matcher)IsEqual.equalTo((Object)9.0));

        // The map handed out by offsets() is expected to be read-only.
        TopicPartition topicPartition = new TopicPartition(this.topic, 0);
        Assert.assertThrows(UnsupportedOperationException.class, () -> offsets.put(topicPartition, 50L));
    }

    /**
     * Replaces the sink's partitioner with one whose {@code partitions(...)} returns
     * {@code Optional.empty()}, sends nine records, and expects all nine to reach the producer
     * with offsets 3, 2 and 1 on partitions 0, 1 and 2 respectively.
     *
     * <p>NOTE(review): the test name suggests that an empty Optional makes the collector fall
     * back to default partitioning; that fallback is not visible in this test — confirm against
     * {@code RecordCollectorImpl}.
     */
    @Test
    public void shouldUseDefaultPartitionerViaPartitions() {
        class DefaultPartitioner implements StreamPartitioner<String, Object> {
            // @Override guards against signature drift: without it a mismatch would silently
            // leave the interface's own implementation in place.
            @Override
            @Deprecated
            public Integer partition(String topic, String key, Object value, int numPartitions) {
                return null;
            }

            @Override
            public Optional<Set<Integer>> partitions(String topic, String key, Object value, int numPartitions) {
                return Optional.empty();
            }
        }

        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
        SinkNode sinkNode = new SinkNode(this.sinkNodeName, (TopicNameExtractor)new StaticTopicNameExtractor(this.topic), (Serializer)this.stringSerializer, (Serializer)this.byteArraySerializer, (StreamPartitioner)defaultPartitioner);
        this.topology = new ProcessorTopology(Collections.emptyList(), Collections.emptyMap(), Collections.singletonMap(this.topic, sinkNode), Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), Collections.emptySet(), Collections.emptyMap());
        this.collector = new RecordCollectorImpl(this.logContext, this.taskId, this.streamsProducer, this.productionExceptionHandler, this.streamsMetrics, this.topology);
        RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});

        // Nine sends in total: five without headers, three with headers, one more without.
        for (String key : Arrays.asList("3", "9", "27", "81", "243")) {
            this.collector.send(this.topic, (Object)key, (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, (StreamPartitioner)defaultPartitioner);
        }
        for (String key : Arrays.asList("28", "82", "244")) {
            this.collector.send(this.topic, (Object)key, (Object)"0", (Headers)headers, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, (StreamPartitioner)defaultPartitioner);
        }
        this.collector.send(this.topic, (Object)"245", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, (StreamPartitioner)defaultPartitioner);

        Map offsets = this.collector.offsets();
        Assert.assertEquals((long)3L, (long)((Long)offsets.get(new TopicPartition(this.topic, 0))));
        Assert.assertEquals((long)2L, (long)((Long)offsets.get(new TopicPartition(this.topic, 1))));
        Assert.assertEquals((long)1L, (long)((Long)offsets.get(new TopicPartition(this.topic, 2))));
        Assert.assertEquals((long)9L, (long)this.mockProducer.history().size());
    }

    /**
     * Rebuilds the collector around a sink node whose {@code StreamPartitioner} always
     * returns {@code null}, sends nine records through that partitioner, and checks the
     * offsets recorded for partitions 0, 1 and 2 together with the producer history size.
     */
    @Test
    public void shouldUseDefaultPartitionerAsPartitionReturnsNull() {
        String outputTopic = "topic";
        StreamPartitioner nullPartitioner = (topic, key, value, numPartitions) -> null;
        SinkNode sink = new SinkNode("output-node", (TopicNameExtractor)new StaticTopicNameExtractor(outputTopic), (Serializer)this.stringSerializer, (Serializer)this.byteArraySerializer, nullPartitioner);
        this.topology = new ProcessorTopology(Collections.emptyList(), Collections.emptyMap(), Collections.singletonMap(outputTopic, sink), Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), Collections.emptySet(), Collections.emptyMap());
        this.collector = new RecordCollectorImpl(this.logContext, this.taskId, this.streamsProducer, this.productionExceptionHandler, this.streamsMetrics, this.topology);
        RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});

        // First five records carry no headers.
        for (String recordKey : new String[]{"3", "9", "27", "81", "243"}) {
            this.collector.send(outputTopic, (Object)recordKey, (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, nullPartitioner);
        }
        // Next three records carry the header set built above.
        for (String recordKey : new String[]{"28", "82", "244"}) {
            this.collector.send(outputTopic, (Object)recordKey, (Object)"0", (Headers)headers, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, nullPartitioner);
        }
        // Final record is again header-less.
        this.collector.send(outputTopic, (Object)"245", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, nullPartitioner);

        Map offsets = this.collector.offsets();
        long[] expectedOffsets = {3L, 2L, 1L};
        for (int partition = 0; partition < expectedOffsets.length; partition++) {
            Assert.assertEquals((long)expectedOffsets[partition], (long)((Long)offsets.get(new TopicPartition(outputTopic, partition))));
        }
        Assert.assertEquals((long)9L, (long)this.mockProducer.history().size());
    }

    /**
     * Rebuilds the collector around a sink node using the fixture's {@code streamPartitioner},
     * sends nine records while passing {@code null} as the partitioner argument, and checks
     * the offsets recorded for partitions 0, 1 and 2 together with the producer history size.
     */
    @Test
    public void shouldUseDefaultPartitionerAsStreamPartitionerIsNull() {
        String outputTopic = "topic";
        SinkNode sink = new SinkNode("output-node", (TopicNameExtractor)new StaticTopicNameExtractor(outputTopic), (Serializer)this.stringSerializer, (Serializer)this.byteArraySerializer, this.streamPartitioner);
        this.topology = new ProcessorTopology(Collections.emptyList(), Collections.emptyMap(), Collections.singletonMap(outputTopic, sink), Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), Collections.emptySet(), Collections.emptyMap());
        this.collector = new RecordCollectorImpl(this.logContext, this.taskId, this.streamsProducer, this.productionExceptionHandler, this.streamsMetrics, this.topology);
        RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});

        // First five records carry no headers.
        for (String recordKey : new String[]{"3", "9", "27", "81", "243"}) {
            this.collector.send(outputTopic, (Object)recordKey, (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, null);
        }
        // Next three records carry the header set built above.
        for (String recordKey : new String[]{"28", "82", "244"}) {
            this.collector.send(outputTopic, (Object)recordKey, (Object)"0", (Headers)headers, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, null);
        }
        // Final record is again header-less.
        this.collector.send(outputTopic, (Object)"245", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context, null);

        Map offsets = this.collector.offsets();
        long[] expectedOffsets = {3L, 2L, 1L};
        for (int partition = 0; partition < expectedOffsets.length; partition++) {
            Assert.assertEquals((long)expectedOffsets[partition], (long)((Long)offsets.get(new TopicPartition(outputTopic, partition))));
        }
        Assert.assertEquals((long)9L, (long)this.mockProducer.history().size());
    }

    /**
     * Sends nine records with headers but with {@code null} partition and timestamp,
     * then checks the offsets recorded for partitions 0, 1 and 2 and the producer history size.
     */
    @Test
    public void shouldSendWithNoPartition() {
        RecordHeaders headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        String[] recordKeys = {"3", "9", "27", "81", "243", "28", "82", "244", "245"};
        for (String recordKey : recordKeys) {
            this.collector.send("topic", (Object)recordKey, (Object)"0", (Headers)headers, null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context);
        }

        Map offsets = this.collector.offsets();
        long[] expectedOffsets = {3L, 2L, 1L};
        for (int partition = 0; partition < expectedOffsets.length; partition++) {
            Assert.assertEquals((long)expectedOffsets[partition], (long)((Long)offsets.get(new TopicPartition("topic", partition))));
        }
        Assert.assertEquals((long)9L, (long)this.mockProducer.history().size());
    }

    /**
     * Takes an offsets snapshot before sending, sends one record to each of partitions 0-2,
     * checks the earlier snapshot is still empty, and after {@code flush()} expects offset 0
     * for every one of those partitions.
     */
    @Test
    public void shouldUpdateOffsetsUponCompletion() {
        int partitionCount = 3;
        // Snapshot is deliberately taken before any record is sent.
        Map initialOffsets = this.collector.offsets();
        for (int partition = 0; partition < partitionCount; partition++) {
            this.collector.send("topic", (Object)"999", (Object)"0", null, Integer.valueOf(partition), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, this.context);
        }
        Assert.assertEquals(Collections.emptyMap(), (Object)initialOffsets);

        this.collector.flush();

        Map flushedOffsets = this.collector.offsets();
        for (int partition = 0; partition < partitionCount; partition++) {
            Assert.assertEquals((Object)0L, flushedOffsets.get(new TopicPartition("topic", partition)));
        }
    }

    /**
     * Sends one record with empty headers through two {@code CustomStringSerializer}
     * instances and expects every record in the producer history to carry exactly two
     * headers: {@code key -> "key"} and {@code value -> "value"}.
     *
     * <p>The history is required to be non-empty first; otherwise the per-record header
     * assertions inside the loop would never run and the test would pass vacuously.
     */
    @Test
    public void shouldPassThroughRecordHeaderToSerializer() {
        CustomStringSerializer keySerializer = new CustomStringSerializer();
        CustomStringSerializer valueSerializer = new CustomStringSerializer();
        // Only the key serializer is explicitly configured (isKey = true).
        keySerializer.configure(Collections.emptyMap(), true);
        this.collector.send("topic", (Object)"3", (Object)"0", (Headers)new RecordHeaders(), null, (Serializer)keySerializer, (Serializer)valueSerializer, null, this.context, this.streamPartitioner);
        List recordHistory = this.mockProducer.history();
        // Guard: without at least one produced record nothing below would be checked.
        Assert.assertFalse((boolean)recordHistory.isEmpty());
        for (Object entry : recordHistory) {
            // The list is raw here, so each element is cast explicitly.
            ProducerRecord sentRecord = (ProducerRecord)entry;
            Headers headers = sentRecord.headers();
            Assert.assertEquals((long)2L, (long)headers.toArray().length);
            Assert.assertEquals((Object)new RecordHeader("key", "key".getBytes()), (Object)headers.lastHeader("key"));
            Assert.assertEquals((Object)new RecordHeader("value", "value".getBytes()), (Object)headers.lastHeader("value"));
        }
    }

    /**
     * Builds a collector on a mocked {@code StreamsProducer} with EOS disabled, calls
     * {@code flush()} on the collector and verifies that the call reached the producer.
     */
    @Test
    public void shouldForwardFlushToStreamsProducer() {
        StreamsProducer streamsProducer = (StreamsProducer)Mockito.mock(StreamsProducer.class);
        Mockito.when((Object)streamsProducer.eosEnabled()).thenReturn((Object)false);
        ((StreamsProducer)Mockito.doNothing().when((Object)streamsProducer)).flush();
        Mockito.when((Object)streamsProducer.sendException()).thenReturn(new AtomicReference<Object>(null));
        ProcessorTopology topology = (ProcessorTopology)Mockito.mock(ProcessorTopology.class);
        Mockito.when((Object)topology.sinkTopics()).thenReturn(Collections.emptySet());
        RecordCollectorImpl collector = new RecordCollectorImpl(this.logContext, this.taskId, streamsProducer, this.productionExceptionHandler, this.streamsMetrics, topology);
        collector.flush();
        // Without this verification the test would pass even if flush() were never forwarded.
        ((StreamsProducer)Mockito.verify((Object)streamsProducer)).flush();
    }

    /**
     * Builds a collector on a mocked {@code StreamsProducer} with EOS enabled, calls
     * {@code flush()} on the collector and verifies that the call reached the producer.
     */
    @Test
    public void shouldForwardFlushToStreamsProducerEosEnabled() {
        StreamsProducer streamsProducer = (StreamsProducer)Mockito.mock(StreamsProducer.class);
        Mockito.when((Object)streamsProducer.eosEnabled()).thenReturn((Object)true);
        Mockito.when((Object)streamsProducer.sendException()).thenReturn(new AtomicReference<Object>(null));
        ((StreamsProducer)Mockito.doNothing().when((Object)streamsProducer)).flush();
        ProcessorTopology topology = (ProcessorTopology)Mockito.mock(ProcessorTopology.class);
        RecordCollectorImpl collector = new RecordCollectorImpl(this.logContext, this.taskId, streamsProducer, this.productionExceptionHandler, this.streamsMetrics, topology);
        collector.flush();
        // Without this verification the test would pass even if flush() were never forwarded.
        ((StreamsProducer)Mockito.verify((Object)streamsProducer)).flush();
    }

    /** Runs the shared clear-offsets-on-close scenario with the clean flag set. */
    @Test
    public void shouldClearOffsetsOnCloseClean() {
        boolean clean = true;
        shouldClearOffsetsOnClose(clean);
    }

    /** Runs the shared clear-offsets-on-close scenario with the clean flag unset. */
    @Test
    public void shouldClearOffsetsOnCloseDirty() {
        boolean clean = false;
        shouldClearOffsetsOnClose(clean);
    }

    /**
     * Shared scenario: a mocked producer completes every send immediately with fixed
     * metadata (partition {@code topic-0}, offset 1234), so the collector holds offsets
     * after one send; closing the collector must leave its offsets empty.
     *
     * @param clean {@code true} to close with {@code closeClean()}, {@code false} to close
     *              with {@code closeDirty()}
     */
    private void shouldClearOffsetsOnClose(boolean clean) {
        long offset = 1234L;
        RecordMetadata metadata = new RecordMetadata(new TopicPartition("topic", 0), offset, 0, 0L, 1, 1);

        StreamsProducer streamsProducer = (StreamsProducer)Mockito.mock(StreamsProducer.class);
        Mockito.when((Object)streamsProducer.eosEnabled()).thenReturn((Object)true);
        Mockito.when((Object)streamsProducer.sendException()).thenReturn(new AtomicReference<Object>(null));
        // Complete each send synchronously by invoking its callback with the fixed metadata.
        Mockito.when((Object)streamsProducer.send((ProducerRecord)ArgumentMatchers.any(), (Callback)ArgumentMatchers.any())).thenAnswer(invocation -> {
            Callback callback = (Callback)invocation.getArgument(1);
            callback.onCompletion(metadata, null);
            return null;
        });

        ProcessorTopology topology = (ProcessorTopology)Mockito.mock(ProcessorTopology.class);
        RecordCollectorImpl collector = new RecordCollectorImpl(this.logContext, this.taskId, streamsProducer, this.productionExceptionHandler, this.streamsMetrics, topology);
        collector.send("topic-changelog", (Object)"key", (Object)"value", (Headers)new RecordHeaders(), Integer.valueOf(0), Long.valueOf(0L), (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), null, null);
        Assert.assertFalse((boolean)collector.offsets().isEmpty());

        if (!clean) {
            collector.closeDirty();
        } else {
            collector.closeClean();
        }
        Assert.assertTrue((boolean)collector.offsets().isEmpty());
    }

    /**
     * Builds a collector on a mocked {@code StreamsProducer} with EOS enabled, closes it
     * cleanly and verifies that {@code abortTransaction()} was never called on the producer.
     */
    @Test
    public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
        StreamsProducer streamsProducer = (StreamsProducer)Mockito.mock(StreamsProducer.class);
        Mockito.when((Object)streamsProducer.eosEnabled()).thenReturn((Object)true);
        Mockito.when((Object)streamsProducer.sendException()).thenReturn(new AtomicReference<Object>(null));
        ProcessorTopology topology = (ProcessorTopology)Mockito.mock(ProcessorTopology.class);
        RecordCollectorImpl collector = new RecordCollectorImpl(this.logContext, this.taskId, streamsProducer, this.productionExceptionHandler, this.streamsMetrics, topology);
        collector.closeClean();
        // Without this verification the test would pass even if the transaction were aborted.
        ((StreamsProducer)Mockito.verify((Object)streamsProducer, Mockito.never())).abortTransaction();
    }

    /**
     * Builds a collector on a mocked {@code StreamsProducer} with EOS enabled, closes it
     * dirty and verifies that {@code abortTransaction()} was called on the producer.
     */
    @Test
    public void shouldAbortTxOnCloseDirtyIfEosEnabled() {
        StreamsProducer streamsProducer = (StreamsProducer)Mockito.mock(StreamsProducer.class);
        Mockito.when((Object)streamsProducer.eosEnabled()).thenReturn((Object)true);
        Mockito.when((Object)streamsProducer.sendException()).thenReturn(new AtomicReference<Object>(null));
        ((StreamsProducer)Mockito.doNothing().when((Object)streamsProducer)).abortTransaction();
        ProcessorTopology topology = (ProcessorTopology)Mockito.mock(ProcessorTopology.class);
        RecordCollectorImpl collector = new RecordCollectorImpl(this.logContext, this.taskId, streamsProducer, this.productionExceptionHandler, this.streamsMetrics, topology);
        collector.closeDirty();
        // Without this verification the test would pass even if no abort happened.
        ((StreamsProducer)Mockito.verify((Object)streamsProducer)).abortTransaction();
    }

    /**
     * Sends a String key with a {@code LongSerializer} as key serializer and expects a
     * {@code StreamsException} caused by a {@code ClassCastException} with the exact
     * explanatory message.
     */
    @Test
    public void shouldThrowInformativeStreamsExceptionOnKeyClassCastException() {
        String expectedMessage = "ClassCastException while producing data to topic topic. A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).";
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> {
            this.collector.send("topic", (Object)"key", (Object)"value", (Headers)new RecordHeaders(), Integer.valueOf(0), Long.valueOf(0L), (Serializer)new LongSerializer(), (Serializer)new StringSerializer(), null, null);
        });
        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)IsInstanceOf.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * Sends a String key with a {@code LongSerializer} and a {@code null} value, and expects
     * a {@code StreamsException} caused by a {@code ClassCastException} whose message reports
     * the value type as unknown.
     */
    @Test
    public void shouldThrowInformativeStreamsExceptionOnKeyAndNullValueClassCastException() {
        String expectedMessage = "ClassCastException while producing data to topic topic. A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: unknown because value is null). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).";
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> {
            this.collector.send("topic", (Object)"key", null, (Headers)new RecordHeaders(), Integer.valueOf(0), Long.valueOf(0L), (Serializer)new LongSerializer(), (Serializer)new StringSerializer(), null, null);
        });
        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)IsInstanceOf.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * Sends a String value with a {@code LongSerializer} as value serializer and expects a
     * {@code StreamsException} caused by a {@code ClassCastException} with the exact
     * explanatory message.
     */
    @Test
    public void shouldThrowInformativeStreamsExceptionOnValueClassCastException() {
        String expectedMessage = "ClassCastException while producing data to topic topic. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).";
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> {
            this.collector.send("topic", (Object)"key", (Object)"value", (Headers)new RecordHeaders(), Integer.valueOf(0), Long.valueOf(0L), (Serializer)new StringSerializer(), (Serializer)new LongSerializer(), null, null);
        });
        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)IsInstanceOf.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * Sends a {@code null} key and a String value with a {@code LongSerializer} as value
     * serializer, and expects a {@code StreamsException} caused by a
     * {@code ClassCastException} whose message reports the key type as unknown.
     */
    @Test
    public void shouldThrowInformativeStreamsExceptionOnValueAndNullKeyClassCastException() {
        String expectedMessage = "ClassCastException while producing data to topic topic. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).";
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> {
            this.collector.send("topic", null, (Object)"value", (Headers)new RecordHeaders(), Integer.valueOf(0), Long.valueOf(0L), (Serializer)new StringSerializer(), (Serializer)new LongSerializer(), null, null);
        });
        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)IsInstanceOf.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * Builds a collector whose producer comes from
     * {@code getExceptionalStreamProducerOnPartitionsFor} with a {@code KafkaException("Kaboom!")}
     * and expects the action under test to throw a {@code StreamsException} explaining that
     * the partition count for the topic could not be determined.
     */
    @Test
    public void shouldThrowInformativeStreamsExceptionOnKafkaExceptionFromStreamPartitioner() {
        KafkaException partitionLookupFailure = new KafkaException("Kaboom!");
        RecordCollectorImpl failingCollector = new RecordCollectorImpl(this.logContext, this.taskId, this.getExceptionalStreamProducerOnPartitionsFor((RuntimeException)partitionLookupFailure), this.productionExceptionHandler, this.streamsMetrics, this.topology);
        failingCollector.initialize();
        // NOTE(review): the action is a decompiler-emitted synthetic lambda method defined outside this block.
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> this.lambda$shouldThrowInformativeStreamsExceptionOnKafkaExceptionFromStreamPartitioner$11((RecordCollector)failingCollector));
        String expectedMessage = "Could not determine the number of partitions for topic 'topic' for task " + this.taskId + " due to org.apache.kafka.common.KafkaException: Kaboom!";
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /** Runs the forward-without-wrapping scenario with a Kafka {@code TimeoutException}. */
    @Test
    public void shouldForwardTimeoutExceptionFromStreamPartitionerWithoutWrappingIt() {
        TimeoutException timeout = new TimeoutException("Kaboom!");
        shouldForwardExceptionWithoutWrappingIt(timeout);
    }

    /** Runs the forward-without-wrapping scenario with a plain {@code RuntimeException}. */
    @Test
    public void shouldForwardRuntimeExceptionFromStreamPartitionerWithoutWrappingIt() {
        RuntimeException failure = new RuntimeException("Kaboom!");
        shouldForwardExceptionWithoutWrappingIt(failure);
    }

    /**
     * Shared scenario: builds a collector whose producer comes from
     * {@code getExceptionalStreamProducerOnPartitionsFor(runtimeException)} and expects the
     * action under test to throw an exception of the same class with message {@code "Kaboom!"}.
     *
     * @param runtimeException the exception handed to the exceptional producer factory
     * @param <E> concrete runtime exception type
     */
    private <E extends RuntimeException> void shouldForwardExceptionWithoutWrappingIt(E runtimeException) {
        RecordCollectorImpl failingCollector = new RecordCollectorImpl(this.logContext, this.taskId, this.getExceptionalStreamProducerOnPartitionsFor(runtimeException), this.productionExceptionHandler, this.streamsMetrics, this.topology);
        failingCollector.initialize();
        // NOTE(review): the action is a decompiler-emitted synthetic lambda method defined outside this block.
        RuntimeException thrown = (RuntimeException)Assert.assertThrows(runtimeException.getClass(), () -> this.lambda$shouldForwardExceptionWithoutWrappingIt$12((RecordCollector)failingCollector));
        String actualMessage = thrown.getMessage();
        MatcherAssert.assertThat((Object)actualMessage, (Matcher)IsEqual.equalTo((Object)"Kaboom!"));
    }

    /** Runs the subsequent-send scenario with a {@code ProducerFencedException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenProducerFencedInCallback() {
        RuntimeException fenced = new ProducerFencedException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentSend(fenced);
    }

    /** Runs the subsequent-send scenario with an {@code InvalidPidMappingException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenInvalidPidMappingInCallback() {
        RuntimeException invalidPidMapping = new InvalidPidMappingException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentSend(invalidPidMapping);
    }

    /** Runs the subsequent-send scenario with an {@code InvalidProducerEpochException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenInvalidEpochInCallback() {
        RuntimeException invalidEpoch = new InvalidProducerEpochException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentSend(invalidEpoch);
    }

    /**
     * Shared scenario: sends one record through a collector whose producer comes from
     * {@code getExceptionalStreamsProducerOnSend(exception)}, then expects the follow-up
     * action to throw a {@code TaskMigratedException} whose cause is that exception.
     *
     * @param exception the exception handed to the exceptional producer factory
     */
    private void testThrowTaskMigratedExceptionOnSubsequentSend(RuntimeException exception) {
        RecordCollectorImpl failingCollector = new RecordCollectorImpl(this.logContext, this.taskId, this.getExceptionalStreamsProducerOnSend(exception), this.productionExceptionHandler, this.streamsMetrics, this.topology);
        failingCollector.initialize();
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, null, this.streamPartitioner);
        // NOTE(review): the follow-up action is a decompiler-emitted synthetic lambda method defined outside this block.
        TaskMigratedException migrated = (TaskMigratedException)Assert.assertThrows(TaskMigratedException.class, () -> this.lambda$testThrowTaskMigratedExceptionOnSubsequentSend$13((RecordCollector)failingCollector));
        Throwable actualCause = migrated.getCause();
        Assert.assertEquals((Object)exception, (Object)actualCause);
    }

    /** Runs the subsequent-flush scenario with a {@code ProducerFencedException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenProducerFencedInCallback() {
        RuntimeException fenced = new ProducerFencedException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentFlush(fenced);
    }

    /** Runs the subsequent-flush scenario with an {@code InvalidPidMappingException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenInvalidPidMappingInCallback() {
        RuntimeException invalidPidMapping = new InvalidPidMappingException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentFlush(invalidPidMapping);
    }

    /** Runs the subsequent-flush scenario with an {@code InvalidProducerEpochException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenInvalidEpochInCallback() {
        RuntimeException invalidEpoch = new InvalidProducerEpochException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentFlush(invalidEpoch);
    }

    /**
     * Shared scenario: sends one record through a collector whose producer comes from
     * {@code getExceptionalStreamsProducerOnSend(exception)}, then expects {@code flush()}
     * to throw a {@code TaskMigratedException} whose cause is that exception.
     *
     * @param exception the exception handed to the exceptional producer factory
     */
    private void testThrowTaskMigratedExceptionOnSubsequentFlush(RuntimeException exception) {
        RecordCollectorImpl failingCollector = new RecordCollectorImpl(this.logContext, this.taskId, this.getExceptionalStreamsProducerOnSend(exception), this.productionExceptionHandler, this.streamsMetrics, this.topology);
        failingCollector.initialize();
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, null, this.streamPartitioner);
        TaskMigratedException migrated = (TaskMigratedException)Assert.assertThrows(TaskMigratedException.class, () -> {
            ((RecordCollector)failingCollector).flush();
        });
        Throwable actualCause = migrated.getCause();
        Assert.assertEquals((Object)exception, (Object)actualCause);
    }

    /** Runs the subsequent-close scenario with a {@code ProducerFencedException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenProducerFencedInCallback() {
        RuntimeException fenced = new ProducerFencedException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentClose(fenced);
    }

    /** Runs the subsequent-close scenario with an {@code InvalidPidMappingException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenInvalidPidMappingInCallback() {
        RuntimeException invalidPidMapping = new InvalidPidMappingException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentClose(invalidPidMapping);
    }

    /** Runs the subsequent-close scenario with an {@code InvalidProducerEpochException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenInvalidEpochInCallback() {
        RuntimeException invalidEpoch = new InvalidProducerEpochException("KABOOM!");
        testThrowTaskMigratedExceptionOnSubsequentClose(invalidEpoch);
    }

    /**
     * Shared scenario: sends one record through a collector whose producer comes from
     * {@code getExceptionalStreamsProducerOnSend(exception)}, then expects {@code closeClean()}
     * to throw a {@code TaskMigratedException} whose cause is that exception.
     *
     * @param exception the exception handed to the exceptional producer factory
     */
    private void testThrowTaskMigratedExceptionOnSubsequentClose(RuntimeException exception) {
        RecordCollectorImpl failingCollector = new RecordCollectorImpl(this.logContext, this.taskId, this.getExceptionalStreamsProducerOnSend(exception), this.productionExceptionHandler, this.streamsMetrics, this.topology);
        failingCollector.initialize();
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, null, this.streamPartitioner);
        TaskMigratedException migrated = (TaskMigratedException)Assert.assertThrows(TaskMigratedException.class, () -> {
            ((RecordCollector)failingCollector).closeClean();
        });
        Throwable actualCause = migrated.getCause();
        Assert.assertEquals((Object)exception, (Object)actualCause);
    }

    /**
     * With the fixture's production exception handler and a producer built by
     * {@code getExceptionalStreamsProducerOnSend(KafkaException)}, sends one record and
     * expects the follow-up action to throw a {@code StreamsException} carrying that
     * exception as cause and the exact "choose to FAIL" message.
     */
    @Test
    public void shouldThrowStreamsExceptionOnSubsequentSendIfASendFailsWithDefaultExceptionHandler() {
        String expectedMessage = "Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.KafkaException: KABOOM!\nException handler choose to FAIL the processing, no more records would be sent.";
        KafkaException sendFailure = new KafkaException("KABOOM!");
        RecordCollectorImpl failingCollector = new RecordCollectorImpl(this.logContext, this.taskId, this.getExceptionalStreamsProducerOnSend((Exception)sendFailure), this.productionExceptionHandler, this.streamsMetrics, this.topology);
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, null, this.streamPartitioner);
        // NOTE(review): the follow-up action is a decompiler-emitted synthetic lambda method defined outside this block.
        StreamsException failure = (StreamsException)Assert.assertThrows(StreamsException.class, () -> this.lambda$shouldThrowStreamsExceptionOnSubsequentSendIfASendFailsWithDefaultExceptionHandler$14((RecordCollector)failingCollector));
        Assert.assertEquals((Object)sendFailure, (Object)failure.getCause());
        MatcherAssert.assertThat((Object)failure.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * With the fixture's production exception handler and a producer built by
     * {@code getExceptionalStreamsProducerOnSend(KafkaException)}, sends one record and
     * expects {@code flush()} to throw a {@code StreamsException} carrying that exception
     * as cause and the exact "choose to FAIL" message.
     */
    @Test
    public void shouldThrowStreamsExceptionOnSubsequentFlushIfASendFailsWithDefaultExceptionHandler() {
        String expectedMessage = "Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.KafkaException: KABOOM!\nException handler choose to FAIL the processing, no more records would be sent.";
        KafkaException sendFailure = new KafkaException("KABOOM!");
        RecordCollectorImpl failingCollector = new RecordCollectorImpl(this.logContext, this.taskId, this.getExceptionalStreamsProducerOnSend((Exception)sendFailure), this.productionExceptionHandler, this.streamsMetrics, this.topology);
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, null, this.streamPartitioner);
        StreamsException failure = (StreamsException)Assert.assertThrows(StreamsException.class, () -> {
            ((RecordCollector)failingCollector).flush();
        });
        Assert.assertEquals((Object)sendFailure, (Object)failure.getCause());
        MatcherAssert.assertThat((Object)failure.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * With the fixture's production exception handler and a producer built by
     * {@code getExceptionalStreamsProducerOnSend(KafkaException)}, sends one record and
     * expects {@code closeClean()} to throw a {@code StreamsException} carrying that
     * exception as cause and the exact "choose to FAIL" message.
     */
    @Test
    public void shouldThrowStreamsExceptionOnSubsequentCloseIfASendFailsWithDefaultExceptionHandler() {
        String expectedMessage = "Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.KafkaException: KABOOM!\nException handler choose to FAIL the processing, no more records would be sent.";
        KafkaException sendFailure = new KafkaException("KABOOM!");
        RecordCollectorImpl failingCollector = new RecordCollectorImpl(this.logContext, this.taskId, this.getExceptionalStreamsProducerOnSend((Exception)sendFailure), this.productionExceptionHandler, this.streamsMetrics, this.topology);
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, null, this.streamPartitioner);
        StreamsException failure = (StreamsException)Assert.assertThrows(StreamsException.class, () -> {
            ((RecordCollector)failingCollector).closeClean();
        });
        Assert.assertEquals((Object)sendFailure, (Object)failure.getCause());
        MatcherAssert.assertThat((Object)failure.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * With an {@code AlwaysContinueProductionExceptionHandler} and a producer built by
     * {@code getExceptionalStreamsProducerOnSend(AuthenticationException)}, sends one record
     * and expects the follow-up action to throw a {@code StreamsException} carrying that
     * exception as cause and the exact "fatal error" message.
     */
    @Test
    public void shouldThrowStreamsExceptionOnSubsequentSendIfFatalEvenWithContinueExceptionHandler() {
        String expectedMessage = "Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
        AuthenticationException fatalFailure = new AuthenticationException("KABOOM!");
        RecordCollectorImpl failingCollector = new RecordCollectorImpl(this.logContext, this.taskId, this.getExceptionalStreamsProducerOnSend((Exception)fatalFailure), (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), this.streamsMetrics, this.topology);
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, null, this.streamPartitioner);
        // NOTE(review): the follow-up action is a decompiler-emitted synthetic lambda method defined outside this block.
        StreamsException failure = (StreamsException)Assert.assertThrows(StreamsException.class, () -> this.lambda$shouldThrowStreamsExceptionOnSubsequentSendIfFatalEvenWithContinueExceptionHandler$15((RecordCollector)failingCollector));
        Assert.assertEquals((Object)fatalFailure, (Object)failure.getCause());
        MatcherAssert.assertThat((Object)failure.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * With an {@code AlwaysContinueProductionExceptionHandler} and a producer built by
     * {@code getExceptionalStreamsProducerOnSend(AuthenticationException)}, sends one record
     * and expects {@code flush()} to throw a {@code StreamsException} carrying that
     * exception as cause and the exact "fatal error" message.
     */
    @Test
    public void shouldThrowStreamsExceptionOnSubsequentFlushIfFatalEvenWithContinueExceptionHandler() {
        String expectedMessage = "Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
        AuthenticationException fatalFailure = new AuthenticationException("KABOOM!");
        RecordCollectorImpl failingCollector = new RecordCollectorImpl(this.logContext, this.taskId, this.getExceptionalStreamsProducerOnSend((Exception)fatalFailure), (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), this.streamsMetrics, this.topology);
        failingCollector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, null, this.streamPartitioner);
        StreamsException failure = (StreamsException)Assert.assertThrows(StreamsException.class, () -> {
            ((RecordCollector)failingCollector).flush();
        });
        Assert.assertEquals((Object)fatalFailure, (Object)failure.getCause());
        MatcherAssert.assertThat((Object)failure.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * A fatal {@link AuthenticationException} delivered through the producer's send callback must be
     * rethrown as a {@link StreamsException} by a following {@code closeClean()}, even though the
     * collector is configured with {@code AlwaysContinueProductionExceptionHandler}.
     */
    @Test
    public void shouldThrowStreamsExceptionOnSubsequentCloseIfFatalEvenWithContinueExceptionHandler() {
        AuthenticationException fatalError = new AuthenticationException("KABOOM!");
        StreamsProducer failingProducer = this.getExceptionalStreamsProducerOnSend((Exception)fatalError);
        RecordCollectorImpl collector = new RecordCollectorImpl(this.logContext, this.taskId, failingProducer, (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), this.streamsMetrics, this.topology);

        // The stubbed producer completes the callback of this send with the fatal error.
        collector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, null, this.streamPartitioner);

        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> collector.closeClean());

        String expectedMessage = "Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
        Assert.assertEquals((Object)fatalError, (Object)thrown.getCause());
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * With {@code AlwaysContinueProductionExceptionHandler} a plain {@link Exception} reported by the
     * producer's send callback must not make later calls fail.
     *
     * <p>The test checks three things: the last captured log line ends with the CONTINUE notice, the
     * {@code dropped-records-total} task metric equals {@code 1.0} after the first send and flush,
     * and a further {@code send()}, {@code flush()} and {@code closeClean()} complete without throwing.
     */
    @Test
    public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler() {
        RecordCollectorImpl collector = new RecordCollectorImpl(this.logContext, this.taskId, this.getExceptionalStreamsProducerOnSend(new Exception()), (ProductionExceptionHandler)new AlwaysContinueProductionExceptionHandler(), this.streamsMetrics, this.topology);
        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(RecordCollectorImpl.class);){
            logCaptureAppender.setThreshold((Priority)Level.INFO);
            collector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, null, this.streamPartitioner);
            collector.flush();
            // Must be List<String>: iterating a raw List with a String loop variable does not compile.
            List<String> messages = logCaptureAppender.getMessages();
            // Collect everything that was logged so a failing assertion shows the full picture.
            StringBuilder errorMessage = new StringBuilder("Messages received:");
            for (String error : messages) {
                errorMessage.append("\n - ").append(error);
            }
            // Fail readably instead of with an index error when nothing was captured.
            Assert.assertFalse("Expected at least one captured log message", messages.isEmpty());
            String lastMessage = messages.get(messages.size() - 1);
            Assert.assertTrue((String)errorMessage.toString(), (boolean)lastMessage.endsWith("Exception handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded."));
        }
        Metric metric = (Metric)this.streamsMetrics.metrics().get(new MetricName("dropped-records-total", "stream-task-metrics", "The total number of dropped records", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)Thread.currentThread().getName()), Utils.mkEntry((Object)"task-id", (Object)this.taskId.toString())})));
        Assert.assertEquals((Object)1.0, (Object)metric.metricValue());
        // None of the following calls may throw.
        collector.send("topic", (Object)"3", (Object)"0", null, null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, null, null, this.streamPartitioner);
        collector.flush();
        collector.closeClean();
    }

    /**
     * Calling {@code closeDirty()} on a collector that never sent a record must not call
     * {@code abortTransaction()} on the underlying producer. The stubbed producer sets a flag when
     * {@code abortTransaction()} is called, and the flag has to stay {@code false}.
     */
    @Test
    public void shouldNotAbortTxnOnEOSCloseDirtyIfNothingSent() {
        final AtomicBoolean abortCalled = new AtomicBoolean(false);
        MockClientSupplier recordingSupplier = new MockClientSupplier(){

            @Override
            public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
                return new MockProducer<byte[], byte[]>(RecordCollectorTest.this.cluster, true, (Serializer)RecordCollectorTest.this.byteArraySerializer, (Serializer)RecordCollectorTest.this.byteArraySerializer){

                    public void abortTransaction() {
                        // Only record the call; nothing is delegated to MockProducer.
                        abortCalled.set(true);
                    }
                };
            }
        };
        StreamsProducer eosProducer = new StreamsProducer(this.eosConfig, "-StreamThread-1", (KafkaClientSupplier)recordingSupplier, this.taskId, this.processId, this.logContext, Time.SYSTEM);
        RecordCollectorImpl collector = new RecordCollectorImpl(this.logContext, this.taskId, eosProducer, this.productionExceptionHandler, this.streamsMetrics, this.topology);

        collector.closeDirty();

        Assert.assertFalse((boolean)abortCalled.get());
    }

    /**
     * When the producer reports no partitions for the topic, a {@code send()} that goes through the
     * stream partitioner must fail with a {@link StreamsException} carrying the
     * "Could not get partition information" message.
     */
    @Test
    public void shouldThrowIfTopicIsUnknownOnSendWithPartitioner() {
        MockClientSupplier noPartitionsSupplier = new MockClientSupplier(){

            @Override
            public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
                return new MockProducer<byte[], byte[]>(RecordCollectorTest.this.cluster, true, (Serializer)RecordCollectorTest.this.byteArraySerializer, (Serializer)RecordCollectorTest.this.byteArraySerializer){

                    public List<PartitionInfo> partitionsFor(String topic) {
                        // Pretend that no topic has any partition.
                        return Collections.emptyList();
                    }
                };
            }
        };
        StreamsProducer producer = new StreamsProducer(this.config, this.processId + "-StreamThread-1", (KafkaClientSupplier)noPartitionsSupplier, null, null, this.logContext, Time.SYSTEM);
        RecordCollectorImpl collector = new RecordCollectorImpl(this.logContext, this.taskId, producer, this.productionExceptionHandler, this.streamsMetrics, this.topology);
        collector.initialize();

        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> this.lambda$shouldThrowIfTopicIsUnknownOnSendWithPartitioner$16((RecordCollector)collector));

        String expectedMessage = "Could not get partition information for topic topic for task 0_0. This can happen if the topic does not exist.";
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)IsEqual.equalTo((Object)expectedMessage));
    }

    /**
     * Builds a collector on a {@code StreamsProducer} created from {@code eosConfig} whose client
     * supplier hands out the shared {@code mockProducer}, closes the collector cleanly, and then
     * flushes the test's {@code streamsProducer} field. There is no explicit assertion: the test
     * passes as long as neither call throws.
     */
    @Test
    public void shouldNotCloseInternalProducerForEOS() {
        MockClientSupplier sharedProducerSupplier = new MockClientSupplier(){

            @Override
            public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
                return RecordCollectorTest.this.mockProducer;
            }
        };
        StreamsProducer eosProducer = new StreamsProducer(this.eosConfig, this.processId + "-StreamThread-1", (KafkaClientSupplier)sharedProducerSupplier, this.taskId, this.processId, this.logContext, Time.SYSTEM);
        RecordCollectorImpl collector = new RecordCollectorImpl(this.logContext, this.taskId, eosProducer, this.productionExceptionHandler, this.streamsMetrics, this.topology);

        collector.closeClean();

        // Would throw if closing the collector had made the producer unusable.
        this.streamsProducer.flush();
    }

    /**
     * Closes the fixture's collector cleanly and then flushes the fixture's streams producer.
     * There is no explicit assertion: the test passes as long as neither call throws.
     */
    @Test
    public void shouldNotCloseInternalProducerForNonEOS() {
        collector.closeClean();
        // Would throw if closing the collector had made the producer unusable.
        streamsProducer.flush();
    }

    /**
     * With {@code DefaultProductionExceptionHandler}, a serializer that throws
     * {@link SerializationException} must make {@code send()} fail with a {@link StreamsException}
     * whose cause is that {@code SerializationException}.
     */
    @Test
    public void shouldThrowStreamsExceptionUsingDefaultExceptionHandler() {
        try (ErrorStringSerializer failingSerializer = new ErrorStringSerializer()) {
            RecordCollector collector = this.newRecordCollector((ProductionExceptionHandler)new DefaultProductionExceptionHandler());
            collector.initialize();

            StreamsException error = (StreamsException)Assert.assertThrows(StreamsException.class, () -> collector.send("topic", (Object)"key", (Object)"val", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)failingSerializer, "output-node", this.context));

            Throwable cause = error.getCause();
            MatcherAssert.assertThat((Object)cause, (Matcher)IsInstanceOf.instanceOf(SerializationException.class));
        }
    }

    /**
     * With {@code AlwaysContinueProductionExceptionHandler}, a serializer that throws
     * {@link SerializationException} must not make {@code send()} throw. Nothing may reach the mock
     * producer, and the {@code dropped-records-total} task metric must read {@code 1.0}.
     */
    @Test
    public void shouldDropRecordExceptionUsingAlwaysContinueExceptionHandler() {
        try (ErrorStringSerializer failingSerializer = new ErrorStringSerializer()) {
            RecordCollector collector = this.newRecordCollector(new AlwaysContinueProductionExceptionHandler());
            collector.initialize();

            collector.send("topic", (Object)"key", (Object)"val", null, Integer.valueOf(0), null, (Serializer)failingSerializer, (Serializer)this.stringSerializer, "output-node", this.context);

            // The record never made it to the producer ...
            MatcherAssert.assertThat((Object)this.mockProducer.history().isEmpty(), (Matcher)IsEqual.equalTo((Object)true));

            // ... and was counted as dropped for this thread and task.
            MetricName droppedRecordsTotal = new MetricName("dropped-records-total", "stream-task-metrics", "The total number of dropped records", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)Thread.currentThread().getName()), Utils.mkEntry((Object)"task-id", (Object)this.taskId.toString())}));
            Metric droppedRecords = (Metric)this.streamsMetrics.metrics().get(droppedRecordsTotal);
            MatcherAssert.assertThat((Object)droppedRecords.metricValue(), (Matcher)IsEqual.equalTo((Object)1.0));
        }
    }

    /**
     * Sending a {@code Boolean} key through a string-based serializer must fail with a
     * {@link StreamsException} caused by a {@link ClassCastException}, even though the collector is
     * configured with {@code AlwaysContinueProductionExceptionHandler}.
     */
    @Test
    public void shouldNotCallProductionExceptionHandlerOnClassCastException() {
        try (ErrorStringSerializer failingSerializer = new ErrorStringSerializer()) {
            RecordCollector collector = this.newRecordCollector(new AlwaysContinueProductionExceptionHandler());
            collector.initialize();

            // Precondition: nothing has been handed to the mock producer yet.
            MatcherAssert.assertThat((Object)this.mockProducer.history().isEmpty(), (Matcher)IsEqual.equalTo((Object)true));

            StreamsException error = (StreamsException)Assert.assertThrows(StreamsException.class, () -> collector.send("topic", (Object)true, (Object)"val", null, Integer.valueOf(0), null, (Serializer)failingSerializer, (Serializer)this.stringSerializer, "output-node", this.context));

            Throwable cause = error.getCause();
            MatcherAssert.assertThat((Object)cause, (Matcher)IsInstanceOf.instanceOf(ClassCastException.class));
        }
    }

    /**
     * Two collectors for tasks {@code 0_0} and {@code 0_1} share one mocked {@code StreamsProducer}.
     * The mock completes every send callback with a {@link ProducerFencedException} and always
     * returns the same {@link AtomicReference} from {@code sendException()}. After the first
     * collector has sent a record, a send on the second collector must throw a
     * {@link StreamsException}.
     */
    @Test
    public void shouldNotSendIfSendOfOtherTaskFailedInCallback() {
        StreamsProducer sharedProducer = (StreamsProducer)Mockito.mock(StreamsProducer.class);
        Mockito.when((Object)sharedProducer.eosEnabled()).thenReturn((Object)true);
        Mockito.when((Object)sharedProducer.sendException()).thenReturn(new AtomicReference<Object>(null));
        Mockito.when((Object)sharedProducer.send((ProducerRecord)ArgumentMatchers.any(), (Callback)ArgumentMatchers.any())).thenAnswer(invocation -> {
            // Fail the send right away by completing its callback with a fencing error.
            Callback sendCallback = (Callback)invocation.getArgument(1);
            sendCallback.onCompletion(null, (Exception)new ProducerFencedException("KABOOM!"));
            return null;
        });

        TaskId firstTask = new TaskId(0, 0);
        TaskId secondTask = new TaskId(0, 1);
        RecordCollectorImpl firstCollector = new RecordCollectorImpl(this.logContext, firstTask, sharedProducer, this.productionExceptionHandler, this.streamsMetrics, this.topology);
        firstCollector.initialize();
        RecordCollectorImpl secondCollector = new RecordCollectorImpl(this.logContext, secondTask, sharedProducer, this.productionExceptionHandler, this.streamsMetrics, this.topology);
        secondCollector.initialize();

        firstCollector.send("topic", (Object)"key", (Object)"val", null, Integer.valueOf(0), null, (Serializer)this.stringSerializer, (Serializer)this.stringSerializer, "output-node", this.context);

        Assert.assertThrows(StreamsException.class, () -> this.lambda$shouldNotSendIfSendOfOtherTaskFailedInCallback$20((RecordCollector)secondCollector));
    }

    /**
     * Creates a {@code RecordCollectorImpl} from the fixture's log context, task id, streams
     * producer, metrics and topology, using the supplied exception handler.
     *
     * @param productionExceptionHandler handler passed to the new collector
     * @return the newly created collector
     */
    private RecordCollector newRecordCollector(ProductionExceptionHandler productionExceptionHandler) {
        RecordCollectorImpl created = new RecordCollectorImpl(this.logContext, this.taskId, this.streamsProducer, productionExceptionHandler, this.streamsMetrics, this.topology);
        return created;
    }

    /**
     * Builds a {@code StreamsProducer} on a {@code MockProducer} whose {@code send()} never
     * records anything: it completes the callback at once with {@code exception} and returns
     * {@code null} in place of a future.
     *
     * @param exception the error handed to every send callback
     * @return a streams producer whose sends always fail through the callback
     */
    private StreamsProducer getExceptionalStreamsProducerOnSend(final Exception exception) {
        MockClientSupplier failingSupplier = new MockClientSupplier(){

            @Override
            public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
                return new MockProducer<byte[], byte[]>(RecordCollectorTest.this.cluster, true, (Serializer)RecordCollectorTest.this.byteArraySerializer, (Serializer)RecordCollectorTest.this.byteArraySerializer){

                    public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
                        // Report the failure through the callback and hand back no future.
                        callback.onCompletion(null, exception);
                        return null;
                    }
                };
            }
        };
        String threadLabel = this.processId + "-StreamThread-1";
        return new StreamsProducer(this.config, threadLabel, (KafkaClientSupplier)failingSupplier, null, null, this.logContext, Time.SYSTEM);
    }

    /**
     * Builds a {@code StreamsProducer} on a {@code MockProducer} whose {@code partitionsFor()}
     * always throws {@code exception}.
     *
     * @param exception the runtime exception thrown by every {@code partitionsFor()} call
     * @return a streams producer whose partition lookup always fails
     */
    private StreamsProducer getExceptionalStreamProducerOnPartitionsFor(final RuntimeException exception) {
        MockClientSupplier failingSupplier = new MockClientSupplier(){

            @Override
            public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
                return new MockProducer<byte[], byte[]>(RecordCollectorTest.this.cluster, true, (Serializer)RecordCollectorTest.this.byteArraySerializer, (Serializer)RecordCollectorTest.this.byteArraySerializer){

                    public synchronized List<PartitionInfo> partitionsFor(String topic) {
                        throw exception;
                    }
                };
            }
        };
        String threadLabel = this.processId + "-StreamThread-1";
        return new StreamsProducer(this.config, threadLabel, (KafkaClientSupplier)failingSupplier, null, null, this.logContext, Time.SYSTEM);
    }

    /**
     * Decompiler-extracted lambda body used by {@code shouldNotSendIfSendOfOtherTaskFailedInCallback}:
     * performs a single {@code send()} of key {@code "key"} and value {@code "val"} to
     * {@code "topic"} on the given collector.
     */
    private /* synthetic */ void lambda$shouldNotSendIfSendOfOtherTaskFailedInCallback$20(RecordCollector collector) throws Throwable {
        final Serializer serializer = (Serializer)this.stringSerializer;
        collector.send("topic", (Object)"key", (Object)"val", null, Integer.valueOf(1), null, serializer, serializer, "output-node", this.context);
    }

    /**
     * Decompiler-extracted lambda body used by {@code shouldThrowIfTopicIsUnknownOnSendWithPartitioner}:
     * performs a single {@code send()} of key {@code "3"} and value {@code "0"} to {@code "topic"}
     * on the given collector, passing the fixture's stream partitioner.
     */
    private /* synthetic */ void lambda$shouldThrowIfTopicIsUnknownOnSendWithPartitioner$16(RecordCollector target) throws Throwable {
        final Serializer serializer = (Serializer)this.stringSerializer;
        target.send("topic", (Object)"3", (Object)"0", null, null, serializer, serializer, null, null, this.streamPartitioner);
    }

    /**
     * Decompiler-extracted lambda body used by
     * {@code shouldThrowStreamsExceptionOnSubsequentSendIfFatalEvenWithContinueExceptionHandler}:
     * performs a single {@code send()} of key {@code "3"} and value {@code "0"} to {@code "topic"}
     * on the given collector, passing the fixture's stream partitioner.
     */
    private /* synthetic */ void lambda$shouldThrowStreamsExceptionOnSubsequentSendIfFatalEvenWithContinueExceptionHandler$15(RecordCollector target) throws Throwable {
        final Serializer serializer = (Serializer)this.stringSerializer;
        target.send("topic", (Object)"3", (Object)"0", null, null, serializer, serializer, null, null, this.streamPartitioner);
    }

    /**
     * Decompiler-extracted lambda body; judging by its synthetic name it belongs to the test
     * {@code shouldThrowStreamsExceptionOnSubsequentSendIfASendFailsWithDefaultExceptionHandler}
     * (not verified here). Performs a single {@code send()} of key {@code "3"} and value
     * {@code "0"} to {@code "topic"} on the given collector, passing the fixture's stream partitioner.
     */
    private /* synthetic */ void lambda$shouldThrowStreamsExceptionOnSubsequentSendIfASendFailsWithDefaultExceptionHandler$14(RecordCollector target) throws Throwable {
        final Serializer serializer = (Serializer)this.stringSerializer;
        target.send("topic", (Object)"3", (Object)"0", null, null, serializer, serializer, null, null, this.streamPartitioner);
    }

    /**
     * Decompiler-extracted lambda body; judging by its synthetic name it belongs to the test
     * {@code testThrowTaskMigratedExceptionOnSubsequentSend} (not verified here). Performs a single
     * {@code send()} of key {@code "3"} and value {@code "0"} to {@code "topic"} on the given
     * collector, passing the fixture's stream partitioner.
     */
    private /* synthetic */ void lambda$testThrowTaskMigratedExceptionOnSubsequentSend$13(RecordCollector target) throws Throwable {
        final Serializer serializer = (Serializer)this.stringSerializer;
        target.send("topic", (Object)"3", (Object)"0", null, null, serializer, serializer, null, null, this.streamPartitioner);
    }

    /**
     * Decompiler-extracted lambda body; judging by its synthetic name it belongs to the test
     * {@code shouldForwardExceptionWithoutWrappingIt} (not verified here). Performs a single
     * {@code send()} of key {@code "0"} and value {@code "0"} to {@code "topic"} on the given
     * collector, passing the fixture's stream partitioner.
     */
    private /* synthetic */ void lambda$shouldForwardExceptionWithoutWrappingIt$12(RecordCollector target) throws Throwable {
        final Serializer serializer = (Serializer)this.stringSerializer;
        target.send("topic", (Object)"0", (Object)"0", null, null, serializer, serializer, null, null, this.streamPartitioner);
    }

    /**
     * Decompiler-extracted lambda body; judging by its synthetic name it belongs to the test
     * {@code shouldThrowInformativeStreamsExceptionOnKafkaExceptionFromStreamPartitioner}
     * (not verified here). Performs a single {@code send()} of key {@code "0"} and value
     * {@code "0"} to {@code "topic"} on the given collector, passing the fixture's context and
     * stream partitioner.
     */
    private /* synthetic */ void lambda$shouldThrowInformativeStreamsExceptionOnKafkaExceptionFromStreamPartitioner$11(RecordCollector target) throws Throwable {
        final Serializer serializer = (Serializer)this.stringSerializer;
        target.send("topic", (Object)"0", (Object)"0", null, null, serializer, serializer, null, this.context, this.streamPartitioner);
    }

    /**
     * String serializer that tags the record headers while serializing: it adds a header named
     * {@code "key"} (value {@code "key"}) when configured as key serializer and a header named
     * {@code "value"} (value {@code "value"}) otherwise. The payload itself is produced by the
     * inherited two-argument {@code serialize}.
     */
    private static class CustomStringSerializer
    extends StringSerializer {
        /** Role passed to the last {@link #configure} call; {@code false} until configured. */
        private boolean isKey;

        private CustomStringSerializer() {
        }

        /** Remembers the key/value role and then delegates to the superclass configuration. */
        public void configure(Map<String, ?> configs, boolean isKey) {
            this.isKey = isKey;
            super.configure(configs, isKey);
        }

        /** Adds the role marker header and returns the bytes from {@code serialize(topic, data)}. */
        public byte[] serialize(String topic, Headers headers, String data) {
            // Header name and header payload are the same word: "key" or "value".
            String marker = this.isKey ? "key" : "value";
            headers.add((Header)new RecordHeader(marker, marker.getBytes()));
            return this.serialize(topic, data);
        }
    }

    /**
     * String serializer whose header-aware {@code serialize} always fails with a
     * {@link SerializationException}; used by the tests to provoke serialization errors.
     */
    private static class ErrorStringSerializer
    extends StringSerializer {
        private ErrorStringSerializer() {
        }

        /** Always throws {@code SerializationException("Not Supported")}; never returns. */
        public byte[] serialize(String topic, Headers headers, String data) {
            final SerializationException failure = new SerializationException("Not Supported");
            throw failure;
        }
    }
}

