/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableTransformValues;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.NoOpValueTransformerWithKeySupplier;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.verification.VerificationMode;

@ExtendWith(value={MockitoExtension.class})
@MockitoSettings(strictness=Strictness.STRICT_STUBS)
public class KTableTransformValuesTest {
    private static final String QUERYABLE_NAME = "queryable-store";
    private static final String INPUT_TOPIC = "inputTopic";
    private static final String STORE_NAME = "someStore";
    private static final String OTHER_STORE_NAME = "otherStore";
    private static final Consumed<String, String> CONSUMED = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String());
    private TopologyTestDriver driver;
    private MockProcessorSupplier<String, String, Void, Void> capture;
    private StreamsBuilder builder;
    @Mock
    private KTableImpl<String, String, String> parent;
    @Mock
    private InternalProcessorContext<String, Change<String>> context;
    @Mock
    private KTableValueGetterSupplier<String, String> parentGetterSupplier;
    @Mock
    private KTableValueGetter<String, String> parentGetter;
    @Mock
    private TimestampedKeyValueStore<String, String> stateStore;
    @Mock
    private ValueTransformerWithKeySupplier<String, String, String> mockSupplier;
    @Mock
    private ValueTransformerWithKey<String, String, String> transformer;

    @AfterEach
    public void cleanup() {
        if (this.driver != null) {
            this.driver.close();
            this.driver = null;
        }
    }

    @BeforeEach
    public void setUp() {
        this.capture = new MockProcessorSupplier();
        this.builder = new StreamsBuilder();
    }

    @Test
    public void shouldThrowOnGetIfSupplierReturnsNull() {
        KTableTransformValues transformer = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new NullSupplier(), QUERYABLE_NAME);
        try {
            transformer.get();
            Assertions.fail((String)"NPE expected");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
    }

    @Test
    public void shouldThrowOnViewGetIfSupplierReturnsNull() {
        KTableValueGetterSupplier view = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new NullSupplier(), null).view();
        try {
            view.get();
            Assertions.fail((String)"NPE expected");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
    }

    @Test
    public void shouldInitializeTransformerWithForwardDisabledProcessorContext() {
        NoOpValueTransformerWithKeySupplier transformer = new NoOpValueTransformerWithKeySupplier();
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, transformer, null);
        Processor processor = transformValues.get();
        processor.init(this.context);
        MatcherAssert.assertThat((Object)transformer.context, (Matcher)CoreMatchers.isA(ForwardingDisabledProcessorContext.class));
    }

    @Test
    public void shouldNotSendOldValuesByDefault() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(new String[0]), null);
        Processor processor = transformValues.get();
        processor.init(this.context);
        ((InternalProcessorContext)Mockito.doNothing().when(this.context)).forward(new Record((Object)"Key", (Object)new Change((Object)"Key->newValue!", null), 0L));
        processor.process(new Record((Object)"Key", (Object)new Change((Object)"newValue", (Object)"oldValue"), 0L));
    }

    @Test
    public void shouldSendOldValuesIfConfigured() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(new String[0]), null);
        Mockito.when((Object)this.parent.enableSendingOldValues(true)).thenReturn((Object)true);
        transformValues.enableSendingOldValues(true);
        Processor processor = transformValues.get();
        processor.init(this.context);
        ((InternalProcessorContext)Mockito.doNothing().when(this.context)).forward(new Record((Object)"Key", (Object)new Change((Object)"Key->newValue!", (Object)"Key->oldValue!"), 0L));
        processor.process(new Record((Object)"Key", (Object)new Change((Object)"newValue", (Object)"oldValue"), 0L));
    }

    @Test
    public void shouldNotSetSendOldValuesOnParentIfMaterialized() {
        new KTableTransformValues(this.parent, new NoOpValueTransformerWithKeySupplier(), QUERYABLE_NAME).enableSendingOldValues(true);
        ((KTableImpl)Mockito.verify(this.parent, (VerificationMode)Mockito.never())).enableSendingOldValues(ArgumentMatchers.anyBoolean());
    }

    @Test
    public void shouldSetSendOldValuesOnParentIfNotMaterialized() {
        Mockito.when((Object)this.parent.enableSendingOldValues(true)).thenReturn((Object)true);
        new KTableTransformValues(this.parent, new NoOpValueTransformerWithKeySupplier(), null).enableSendingOldValues(true);
    }

    @Test
    public void shouldTransformOnGetIfNotMaterialized() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(new String[0]), null);
        Mockito.when((Object)this.parent.valueGetterSupplier()).thenReturn(this.parentGetterSupplier);
        Mockito.when((Object)this.parentGetterSupplier.get()).thenReturn(this.parentGetter);
        Mockito.when((Object)this.parentGetter.get((Object)"Key")).thenReturn((Object)ValueAndTimestamp.make((Object)"Value", (long)73L));
        ProcessorRecordContext recordContext = new ProcessorRecordContext(42L, 23L, -1, "foo", (Headers)new RecordHeaders());
        Mockito.when((Object)this.context.recordContext()).thenReturn((Object)recordContext);
        ((InternalProcessorContext)Mockito.doNothing().when(this.context)).setRecordContext(new ProcessorRecordContext(73L, -1L, -1, null, (Headers)new RecordHeaders()));
        ((InternalProcessorContext)Mockito.doNothing().when(this.context)).setRecordContext(recordContext);
        KTableValueGetter getter = transformValues.view().get();
        getter.init(this.context);
        String result = (String)getter.get((Object)"Key").value();
        MatcherAssert.assertThat((Object)result, (Matcher)CoreMatchers.is((Object)"Key->Value!"));
    }

    @Test
    public void shouldGetFromStateStoreIfMaterialized() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(new String[0]), QUERYABLE_NAME);
        Mockito.when((Object)this.context.getStateStore(QUERYABLE_NAME)).thenReturn(this.stateStore);
        Mockito.when((Object)this.stateStore.get((Object)"Key")).thenReturn((Object)ValueAndTimestamp.make((Object)"something", (long)0L));
        KTableValueGetter getter = transformValues.view().get();
        getter.init(this.context);
        String result = (String)getter.get((Object)"Key").value();
        MatcherAssert.assertThat((Object)result, (Matcher)CoreMatchers.is((Object)"something"));
    }

    @Test
    public void shouldGetStoreNamesFromParentIfNotMaterialized() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(new String[0]), null);
        Mockito.when((Object)this.parent.valueGetterSupplier()).thenReturn(this.parentGetterSupplier);
        Mockito.when((Object)this.parentGetterSupplier.storeNames()).thenReturn((Object)new String[]{"store1", "store2"});
        String[] storeNames = transformValues.view().storeNames();
        MatcherAssert.assertThat((Object)storeNames, (Matcher)CoreMatchers.is((Object)new String[]{"store1", "store2"}));
    }

    @Test
    public void shouldGetQueryableStoreNameIfMaterialized() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, (ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(new String[0]), QUERYABLE_NAME);
        String[] storeNames = transformValues.view().storeNames();
        MatcherAssert.assertThat((Object)storeNames, (Matcher)CoreMatchers.is((Object)new String[]{QUERYABLE_NAME}));
    }

    @Test
    public void shouldCloseTransformerOnProcessorClose() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, this.mockSupplier, null);
        Mockito.when((Object)this.mockSupplier.get()).thenReturn(this.transformer);
        ((ValueTransformerWithKey)Mockito.doNothing().when(this.transformer)).close();
        Processor processor = transformValues.get();
        processor.close();
    }

    @Test
    public void shouldCloseTransformerOnGetterClose() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, this.mockSupplier, null);
        Mockito.when((Object)this.mockSupplier.get()).thenReturn(this.transformer);
        Mockito.when((Object)this.parentGetterSupplier.get()).thenReturn(this.parentGetter);
        Mockito.when((Object)this.parent.valueGetterSupplier()).thenReturn(this.parentGetterSupplier);
        ((ValueTransformerWithKey)Mockito.doNothing().when(this.transformer)).close();
        KTableValueGetter getter = transformValues.view().get();
        getter.close();
    }

    @Test
    public void shouldCloseParentGetterClose() {
        KTableTransformValues transformValues = new KTableTransformValues(this.parent, this.mockSupplier, null);
        Mockito.when((Object)this.parent.valueGetterSupplier()).thenReturn(this.parentGetterSupplier);
        Mockito.when((Object)this.mockSupplier.get()).thenReturn(this.transformer);
        Mockito.when((Object)this.parentGetterSupplier.get()).thenReturn(this.parentGetter);
        ((KTableValueGetter)Mockito.doNothing().when(this.parentGetter)).close();
        KTableValueGetter getter = transformValues.view().get();
        getter.close();
    }

    @Test
    public void shouldTransformValuesWithKey() {
        this.builder.addStateStore(KTableTransformValuesTest.storeBuilder(STORE_NAME)).addStateStore(KTableTransformValuesTest.storeBuilder(OTHER_STORE_NAME)).table(INPUT_TOPIC, CONSUMED).transformValues((ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(STORE_NAME, OTHER_STORE_NAME), new String[]{STORE_NAME, OTHER_STORE_NAME}).toStream().process(this.capture, new String[0]);
        this.driver = new TopologyTestDriver(this.builder.build(), KTableTransformValuesTest.props());
        TestInputTopic inputTopic = this.driver.createInputTopic(INPUT_TOPIC, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        inputTopic.pipeInput((Object)"A", (Object)"a", 5L);
        inputTopic.pipeInput((Object)"B", (Object)"b", 10L);
        inputTopic.pipeInput((Object)"D", null, 15L);
        MatcherAssert.assertThat(this.output(), (Matcher)CoreMatchers.hasItems((Object[])new KeyValueTimestamp[]{new KeyValueTimestamp<String, String>("A", "A->a!", 5L), new KeyValueTimestamp<String, String>("B", "B->b!", 10L), new KeyValueTimestamp<String, String>("D", "D->null!", 15L)}));
        Assertions.assertNull((Object)this.driver.getKeyValueStore(QUERYABLE_NAME), (String)"Store should not be materialized");
    }

    @Test
    public void shouldTransformValuesWithKeyAndMaterialize() {
        this.builder.addStateStore(KTableTransformValuesTest.storeBuilder(STORE_NAME)).table(INPUT_TOPIC, CONSUMED).transformValues((ValueTransformerWithKeySupplier)new ExclamationValueTransformerSupplier(STORE_NAME, QUERYABLE_NAME), Materialized.as((String)QUERYABLE_NAME).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()), new String[]{STORE_NAME}).toStream().process(this.capture, new String[0]);
        this.driver = new TopologyTestDriver(this.builder.build(), KTableTransformValuesTest.props());
        TestInputTopic inputTopic = this.driver.createInputTopic(INPUT_TOPIC, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        inputTopic.pipeInput((Object)"A", (Object)"a", 5L);
        inputTopic.pipeInput((Object)"B", (Object)"b", 10L);
        inputTopic.pipeInput((Object)"C", null, 15L);
        MatcherAssert.assertThat(this.output(), (Matcher)CoreMatchers.hasItems((Object[])new KeyValueTimestamp[]{new KeyValueTimestamp<String, String>("A", "A->a!", 5L), new KeyValueTimestamp<String, String>("B", "B->b!", 10L), new KeyValueTimestamp<String, String>("C", "C->null!", 15L)}));
        KeyValueStore keyValueStore = this.driver.getKeyValueStore(QUERYABLE_NAME);
        MatcherAssert.assertThat((Object)keyValueStore.get((Object)"A"), (Matcher)CoreMatchers.is((Object)"A->a!"));
        MatcherAssert.assertThat((Object)keyValueStore.get((Object)"B"), (Matcher)CoreMatchers.is((Object)"B->b!"));
        MatcherAssert.assertThat((Object)keyValueStore.get((Object)"C"), (Matcher)CoreMatchers.is((Object)"C->null!"));
        keyValueStore = this.driver.getTimestampedKeyValueStore(QUERYABLE_NAME);
        MatcherAssert.assertThat((Object)keyValueStore.get((Object)"A"), (Matcher)CoreMatchers.is((Object)ValueAndTimestamp.make((Object)"A->a!", (long)5L)));
        MatcherAssert.assertThat((Object)keyValueStore.get((Object)"B"), (Matcher)CoreMatchers.is((Object)ValueAndTimestamp.make((Object)"B->b!", (long)10L)));
        MatcherAssert.assertThat((Object)keyValueStore.get((Object)"C"), (Matcher)CoreMatchers.is((Object)ValueAndTimestamp.make((Object)"C->null!", (long)15L)));
    }

    @Test
    public void shouldCalculateCorrectOldValuesIfMaterializedEvenIfStateful() {
        this.builder.table(INPUT_TOPIC, CONSUMED).transformValues((ValueTransformerWithKeySupplier)new StatefulTransformerSupplier(), Materialized.as((String)QUERYABLE_NAME).withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer()), new String[0]).groupBy(KTableTransformValuesTest.toForceSendingOfOldValues(), Grouped.with((Serde)Serdes.String(), (Serde)Serdes.Integer())).reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR).mapValues(KTableTransformValuesTest.mapBackToStrings()).toStream().process(this.capture, new String[0]);
        this.driver = new TopologyTestDriver(this.builder.build(), KTableTransformValuesTest.props());
        TestInputTopic inputTopic = this.driver.createInputTopic(INPUT_TOPIC, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        inputTopic.pipeInput((Object)"A", (Object)"ignored", 5L);
        inputTopic.pipeInput((Object)"A", (Object)"ignored1", 15L);
        inputTopic.pipeInput((Object)"A", (Object)"ignored2", 10L);
        MatcherAssert.assertThat(this.output(), (Matcher)CoreMatchers.equalTo(Arrays.asList(new KeyValueTimestamp<String, String>("A", "1", 5L), new KeyValueTimestamp<String, String>("A", "2", 15L), new KeyValueTimestamp<String, String>("A", "3", 15L))));
        KeyValueStore keyValueStore = this.driver.getKeyValueStore(QUERYABLE_NAME);
        MatcherAssert.assertThat((Object)keyValueStore.get((Object)"A"), (Matcher)CoreMatchers.is((Object)3));
    }

    @Test
    public void shouldCalculateCorrectOldValuesIfNotStatefulEvenIfNotMaterialized() {
        this.builder.table(INPUT_TOPIC, CONSUMED).transformValues((ValueTransformerWithKeySupplier)new StatelessTransformerSupplier(), new String[0]).groupBy(KTableTransformValuesTest.toForceSendingOfOldValues(), Grouped.with((Serde)Serdes.String(), (Serde)Serdes.Integer())).reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR).mapValues(KTableTransformValuesTest.mapBackToStrings()).toStream().process(this.capture, new String[0]);
        this.driver = new TopologyTestDriver(this.builder.build(), KTableTransformValuesTest.props());
        TestInputTopic inputTopic = this.driver.createInputTopic(INPUT_TOPIC, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        inputTopic.pipeInput((Object)"A", (Object)"a", 5L);
        inputTopic.pipeInput((Object)"A", (Object)"aa", 15L);
        inputTopic.pipeInput((Object)"A", (Object)"aaa", 10L);
        MatcherAssert.assertThat(this.output(), (Matcher)CoreMatchers.equalTo(Arrays.asList(new KeyValueTimestamp<String, String>("A", "1", 5L), new KeyValueTimestamp<String, String>("A", "2", 15L), new KeyValueTimestamp<String, String>("A", "3", 15L))));
    }

    private ArrayList<KeyValueTimestamp<String, String>> output() {
        return this.capture.capturedProcessors(1).get(0).processed();
    }

    private static KeyValueMapper<String, Integer, KeyValue<String, Integer>> toForceSendingOfOldValues() {
        return KeyValue::new;
    }

    private static ValueMapper<Integer, String> mapBackToStrings() {
        return Object::toString;
    }

    private static StoreBuilder<KeyValueStore<Long, Long>> storeBuilder(String storeName) {
        return Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)storeName), (Serde)Serdes.Long(), (Serde)Serdes.Long());
    }

    public static Properties props() {
        Properties props = new Properties();
        props.setProperty("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        props.setProperty("default.key.serde", Serdes.Integer().getClass().getName());
        props.setProperty("default.value.serde", Serdes.Integer().getClass().getName());
        return props;
    }

    private static void throwIfStoresNotAvailable(ProcessorContext context, List<String> expectedStoredNames) {
        ArrayList<String> missing = new ArrayList<String>();
        for (String storedName : expectedStoredNames) {
            if (context.getStateStore(storedName) != null) continue;
            missing.add(storedName);
        }
        if (!missing.isEmpty()) {
            throw new AssertionError((Object)("State stores are not accessible: " + missing));
        }
    }

    private static class StatelessTransformer
    implements ValueTransformerWithKey<String, String, Integer> {
        private StatelessTransformer() {
        }

        public void init(ProcessorContext context) {
        }

        public Integer transform(String readOnlyKey, String value) {
            return value == null ? null : Integer.valueOf(value.length());
        }

        public void close() {
        }
    }

    private static class StatelessTransformerSupplier
    implements ValueTransformerWithKeySupplier<String, String, Integer> {
        private StatelessTransformerSupplier() {
        }

        public ValueTransformerWithKey<String, String, Integer> get() {
            return new StatelessTransformer();
        }
    }

    private static class StatefulTransformer
    implements ValueTransformerWithKey<String, String, Integer> {
        private int counter;

        private StatefulTransformer() {
        }

        public void init(ProcessorContext context) {
        }

        public Integer transform(String readOnlyKey, String value) {
            return ++this.counter;
        }

        public void close() {
        }
    }

    private static class StatefulTransformerSupplier
    implements ValueTransformerWithKeySupplier<String, String, Integer> {
        private StatefulTransformerSupplier() {
        }

        public ValueTransformerWithKey<String, String, Integer> get() {
            return new StatefulTransformer();
        }
    }

    private static class NullSupplier
    implements ValueTransformerWithKeySupplier<String, String, String> {
        private NullSupplier() {
        }

        public ValueTransformerWithKey<String, String, String> get() {
            return null;
        }
    }

    public static class ExclamationValueTransformer
    implements ValueTransformerWithKey<Object, String, String> {
        private final List<String> expectedStoredNames;

        ExclamationValueTransformer(List<String> expectedStoredNames) {
            this.expectedStoredNames = expectedStoredNames;
        }

        public void init(ProcessorContext context) {
            KTableTransformValuesTest.throwIfStoresNotAvailable(context, this.expectedStoredNames);
        }

        public String transform(Object readOnlyKey, String value) {
            return readOnlyKey.toString() + "->" + value + "!";
        }

        public void close() {
        }
    }

    public static class ExclamationValueTransformerSupplier
    implements ValueTransformerWithKeySupplier<Object, String, String> {
        private final List<String> expectedStoredNames;

        ExclamationValueTransformerSupplier(String ... expectedStoreNames) {
            this.expectedStoredNames = Arrays.asList(expectedStoreNames);
        }

        public ExclamationValueTransformer get() {
            return new ExclamationValueTransformer(this.expectedStoredNames);
        }
    }
}

