/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.JoinWindowsInternal;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamJoinWindow;
import org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin;
import org.apache.kafka.streams.kstream.internals.NamedInternal;
import org.apache.kafka.streams.kstream.internals.PassThrough;
import org.apache.kafka.streams.kstream.internals.StreamJoinedInternal;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.KeyAndJoinSide;
import org.apache.kafka.streams.state.internals.KeyAndJoinSideSerde;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.TimeOrderedWindowStoreBuilder;

class KStreamImplJoin {
    private final InternalStreamsBuilder builder;
    private final boolean leftOuter;
    private final boolean rightOuter;

    KStreamImplJoin(InternalStreamsBuilder builder, boolean leftOuter, boolean rightOuter) {
        this.builder = builder;
        this.leftOuter = leftOuter;
        this.rightOuter = rightOuter;
    }

    public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs, KStream<K1, V2> other, ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends R> joiner, JoinWindows windows, StreamJoined<K1, V1, V2> streamJoined) {
        StoreBuilder otherWindowStore;
        StoreBuilder thisWindowStore;
        StreamJoinedInternal<K1, V1, V2> streamJoinedInternal = new StreamJoinedInternal<K1, V1, V2>(streamJoined);
        NamedInternal renamed = new NamedInternal(streamJoinedInternal.name());
        String joinThisSuffix = this.rightOuter ? "-outer-this-join" : "-this-join";
        String joinOtherSuffix = this.leftOuter ? "-outer-other-join" : "-other-join";
        String thisWindowStreamProcessorName = renamed.suffixWithOrElseGet("-this-windowed", this.builder, "KSTREAM-WINDOWED-");
        String otherWindowStreamProcessorName = renamed.suffixWithOrElseGet("-other-windowed", this.builder, "KSTREAM-WINDOWED-");
        String joinThisGeneratedName = this.rightOuter ? this.builder.newProcessorName("KSTREAM-OUTERTHIS-") : this.builder.newProcessorName("KSTREAM-JOINTHIS-");
        String joinOtherGeneratedName = this.leftOuter ? this.builder.newProcessorName("KSTREAM-OUTEROTHER-") : this.builder.newProcessorName("KSTREAM-JOINOTHER-");
        String joinThisName = renamed.suffixWithOrElseGet(joinThisSuffix, joinThisGeneratedName);
        String joinOtherName = renamed.suffixWithOrElseGet(joinOtherSuffix, joinOtherGeneratedName);
        String joinMergeName = renamed.suffixWithOrElseGet("-merge", this.builder, "KSTREAM-MERGE-");
        GraphNode thisGraphNode = ((AbstractStream)((Object)lhs)).graphNode;
        GraphNode otherGraphNode = ((AbstractStream)((Object)other)).graphNode;
        String userProvidedBaseStoreName = streamJoinedInternal.storeName();
        WindowBytesStoreSupplier thisStoreSupplier = streamJoinedInternal.thisStoreSupplier();
        WindowBytesStoreSupplier otherStoreSupplier = streamJoinedInternal.otherStoreSupplier();
        this.assertUniqueStoreNames(thisStoreSupplier, otherStoreSupplier);
        if (thisStoreSupplier == null) {
            String thisJoinStoreName = userProvidedBaseStoreName == null ? joinThisGeneratedName : userProvidedBaseStoreName + joinThisSuffix;
            thisWindowStore = KStreamImplJoin.joinWindowStoreBuilder(thisJoinStoreName, windows, streamJoinedInternal.keySerde(), streamJoinedInternal.valueSerde(), streamJoinedInternal.loggingEnabled(), streamJoinedInternal.logConfig());
        } else {
            this.assertWindowSettings(thisStoreSupplier, windows);
            thisWindowStore = KStreamImplJoin.joinWindowStoreBuilderFromSupplier(thisStoreSupplier, streamJoinedInternal.keySerde(), streamJoinedInternal.valueSerde());
        }
        if (otherStoreSupplier == null) {
            String otherJoinStoreName = userProvidedBaseStoreName == null ? joinOtherGeneratedName : userProvidedBaseStoreName + joinOtherSuffix;
            otherWindowStore = KStreamImplJoin.joinWindowStoreBuilder(otherJoinStoreName, windows, streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde(), streamJoinedInternal.loggingEnabled(), streamJoinedInternal.logConfig());
        } else {
            this.assertWindowSettings(otherStoreSupplier, windows);
            otherWindowStore = KStreamImplJoin.joinWindowStoreBuilderFromSupplier(otherStoreSupplier, streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde());
        }
        KStreamJoinWindow thisWindowedStream = new KStreamJoinWindow(thisWindowStore.name());
        ProcessorParameters thisWindowStreamProcessorParams = new ProcessorParameters(thisWindowedStream, thisWindowStreamProcessorName);
        ProcessorGraphNode thisWindowedStreamsNode = new ProcessorGraphNode(thisWindowStreamProcessorName, thisWindowStreamProcessorParams);
        this.builder.addGraphNode(thisGraphNode, thisWindowedStreamsNode);
        KStreamJoinWindow otherWindowedStream = new KStreamJoinWindow(otherWindowStore.name());
        ProcessorParameters otherWindowStreamProcessorParams = new ProcessorParameters(otherWindowedStream, otherWindowStreamProcessorName);
        ProcessorGraphNode otherWindowedStreamsNode = new ProcessorGraphNode(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
        this.builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
        Optional<Object> outerJoinWindowStore = Optional.empty();
        if (this.leftOuter) {
            outerJoinWindowStore = Optional.of(this.sharedOuterJoinWindowStoreBuilder(windows, streamJoinedInternal, joinThisGeneratedName));
        }
        TimeTracker sharedTimeTracker = new TimeTracker();
        JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
        KStreamKStreamJoin<? super K1, ? extends R, ? super V1, ? super V2> joinThis = new KStreamKStreamJoin<K1, R, V1, V2>(true, otherWindowStore.name(), internalWindows, joiner, this.leftOuter, outerJoinWindowStore.map(StoreBuilder::name), sharedTimeTracker);
        KStreamKStreamJoin<? super K1, ? extends R, ? super V2, ? super V1> joinOther = new KStreamKStreamJoin<K1, R, V2, V1>(false, thisWindowStore.name(), internalWindows, AbstractStream.reverseJoinerWithKey(joiner), this.rightOuter, outerJoinWindowStore.map(StoreBuilder::name), sharedTimeTracker);
        PassThrough joinMerge = new PassThrough();
        StreamStreamJoinNode.StreamStreamJoinNodeBuilder joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
        ProcessorParameters joinThisProcessorParams = new ProcessorParameters(joinThis, joinThisName);
        ProcessorParameters joinOtherProcessorParams = new ProcessorParameters(joinOther, joinOtherName);
        ProcessorParameters joinMergeProcessorParams = new ProcessorParameters(joinMerge, joinMergeName);
        joinBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParams).withJoinThisProcessorParameters(joinThisProcessorParams).withJoinOtherProcessorParameters(joinOtherProcessorParams).withThisWindowStoreBuilder(thisWindowStore).withOtherWindowStoreBuilder(otherWindowStore).withThisWindowedStreamProcessorParameters(thisWindowStreamProcessorParams).withOtherWindowedStreamProcessorParameters(otherWindowStreamProcessorParams).withOuterJoinWindowStoreBuilder(outerJoinWindowStore).withValueJoiner(joiner).withNodeName(joinMergeName);
        if (internalWindows.spuriousResultFixEnabled()) {
            joinBuilder.withSpuriousResultFixEnabled();
        }
        StreamStreamJoinNode joinGraphNode = joinBuilder.build();
        this.builder.addGraphNode(Arrays.asList(thisGraphNode, otherGraphNode), joinGraphNode);
        HashSet<String> allSourceNodes = new HashSet<String>(((KStreamImpl)lhs).subTopologySourceNodes);
        allSourceNodes.addAll(((KStreamImpl)other).subTopologySourceNodes);
        return new KStreamImpl(joinMergeName, streamJoinedInternal.keySerde(), null, allSourceNodes, false, joinGraphNode, this.builder);
    }

    private void assertWindowSettings(WindowBytesStoreSupplier supplier, JoinWindows joinWindows) {
        boolean allMatch;
        if (!supplier.retainDuplicates()) {
            throw new StreamsException("The StoreSupplier must set retainDuplicates=true, found retainDuplicates=false");
        }
        boolean bl = allMatch = supplier.retentionPeriod() == joinWindows.size() + joinWindows.gracePeriodMs() && supplier.windowSize() == joinWindows.size();
        if (!allMatch) {
            throw new StreamsException(String.format("Window settings mismatch. WindowBytesStoreSupplier settings %s must match JoinWindows settings %s for the window size and retention period", supplier, joinWindows));
        }
    }

    private void assertUniqueStoreNames(WindowBytesStoreSupplier supplier, WindowBytesStoreSupplier otherSupplier) {
        if (supplier != null && otherSupplier != null && supplier.name().equals(otherSupplier.name())) {
            throw new StreamsException("Both StoreSuppliers have the same name.  StoreSuppliers must provide unique names");
        }
    }

    private static <K, V> StoreBuilder<WindowStore<K, V>> joinWindowStoreBuilder(String storeName, JoinWindows windows, Serde<K> keySerde, Serde<V> valueSerde, boolean loggingEnabled, Map<String, String> logConfig) {
        StoreBuilder<WindowStore<K, V>> builder = Stores.windowStoreBuilder(Stores.persistentWindowStore(storeName + "-store", Duration.ofMillis(windows.size() + windows.gracePeriodMs()), Duration.ofMillis(windows.size()), true), keySerde, valueSerde);
        if (loggingEnabled) {
            builder.withLoggingEnabled(logConfig);
        } else {
            builder.withLoggingDisabled();
        }
        return builder;
    }

    private <K, V1, V2> String buildOuterJoinWindowStoreName(StreamJoinedInternal<K, V1, V2> streamJoinedInternal, String joinThisGeneratedName) {
        String outerJoinSuffix;
        String string = outerJoinSuffix = this.rightOuter ? "-outer-shared-join" : "-left-shared-join";
        if (streamJoinedInternal.thisStoreSupplier() != null && !streamJoinedInternal.thisStoreSupplier().name().isEmpty()) {
            return streamJoinedInternal.thisStoreSupplier().name() + outerJoinSuffix;
        }
        if (streamJoinedInternal.storeName() != null) {
            return streamJoinedInternal.storeName() + outerJoinSuffix;
        }
        return "KSTREAM-OUTERSHARED-" + joinThisGeneratedName.substring(this.rightOuter ? "KSTREAM-OUTERTHIS-".length() : "KSTREAM-JOINTHIS-".length());
    }

    private <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> sharedOuterJoinWindowStoreBuilder(JoinWindows windows, StreamJoinedInternal<K, V1, V2> streamJoinedInternal, String joinThisGeneratedName) {
        boolean persistent = streamJoinedInternal.thisStoreSupplier() == null || ((WindowStore)streamJoinedInternal.thisStoreSupplier().get()).persistent();
        String storeName = this.buildOuterJoinWindowStoreName(streamJoinedInternal, joinThisGeneratedName);
        KeyAndJoinSideSerde<K> keyAndJoinSideSerde = new KeyAndJoinSideSerde<K>(streamJoinedInternal.keySerde());
        LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new LeftOrRightValueSerde<V1, V2>(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde());
        StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder = persistent ? new TimeOrderedWindowStoreBuilder(KStreamImplJoin.persistentTimeOrderedWindowStore(storeName + "-store", Duration.ofMillis(windows.size() + windows.gracePeriodMs()), Duration.ofMillis(windows.size())), keyAndJoinSideSerde, leftOrRightValueSerde, Time.SYSTEM) : Stores.windowStoreBuilder(Stores.inMemoryWindowStore(storeName + "-store", Duration.ofMillis(windows.size() + windows.gracePeriodMs()), Duration.ofMillis(windows.size()), false), keyAndJoinSideSerde, leftOrRightValueSerde);
        if (streamJoinedInternal.loggingEnabled()) {
            builder.withLoggingEnabled(streamJoinedInternal.logConfig());
        } else {
            builder.withLoggingDisabled();
        }
        return builder;
    }

    private static WindowBytesStoreSupplier persistentTimeOrderedWindowStore(String storeName, Duration retentionPeriod, Duration windowSize) {
        Objects.requireNonNull(storeName, "name cannot be null");
        String rpMsgPrefix = ApiUtils.prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
        long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
        String wsMsgPrefix = ApiUtils.prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
        long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
        long segmentInterval = Math.max(retentionMs / 2L, 60000L);
        if (retentionMs < 0L) {
            throw new IllegalArgumentException("retentionPeriod cannot be negative");
        }
        if (windowSizeMs < 0L) {
            throw new IllegalArgumentException("windowSize cannot be negative");
        }
        if (segmentInterval < 1L) {
            throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
        }
        if (windowSizeMs > retentionMs) {
            throw new IllegalArgumentException("The retention period of the window store " + storeName + " must be no smaller than its window size. Got size=[" + windowSizeMs + "], retention=[" + retentionMs + "]");
        }
        return new RocksDbWindowBytesStoreSupplier(storeName, retentionMs, segmentInterval, windowSizeMs, true, RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIME_ORDERED_WINDOW_STORE);
    }

    private static <K, V> StoreBuilder<WindowStore<K, V>> joinWindowStoreBuilderFromSupplier(WindowBytesStoreSupplier storeSupplier, Serde<K> keySerde, Serde<V> valueSerde) {
        return Stores.windowStoreBuilder(storeSupplier, keySerde, valueSerde);
    }

    static class TimeTracker {
        private long emitIntervalMs = 1000L;
        long streamTime = -1L;
        long minTime = Long.MAX_VALUE;
        long nextTimeToEmit;

        TimeTracker() {
        }

        public void setEmitInterval(long emitIntervalMs) {
            this.emitIntervalMs = emitIntervalMs;
        }

        public void advanceStreamTime(long recordTimestamp) {
            this.streamTime = Math.max(recordTimestamp, this.streamTime);
        }

        public void updatedMinTime(long recordTimestamp) {
            this.minTime = Math.min(recordTimestamp, this.minTime);
        }

        public void advanceNextTimeToEmit() {
            this.nextTimeToEmit += this.emitIntervalMs;
        }
    }
}

