/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.util.Objects;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKey;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapper;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.ValueAndTimestamp;

/**
 * Supplies processors that answer foreign-key-join subscriptions: for each incoming
 * subscription record the processor looks up the current foreign value by the foreign-key
 * half of the {@link CombinedKey} and, depending on the subscription's instruction, forwards
 * a {@link SubscriptionResponseWrapper} keyed by the primary-key half.
 *
 * @param <K>  type of the primary key carried in the combined key
 * @param <KO> type of the foreign key carried in the combined key
 * @param <VO> type of the foreign value returned by the value getter
 */
public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
implements ProcessorSupplier<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> {
    private final KTableValueGetterSupplier<KO, VO> foreignValueGetterSupplier;

    /**
     * @param foreignValueGetterSupplier source of the getter used to look up foreign values
     */
    public SubscriptionJoinForeignProcessorSupplier(final KTableValueGetterSupplier<KO, VO> foreignValueGetterSupplier) {
        this.foreignValueGetterSupplier = foreignValueGetterSupplier;
    }

    /**
     * Creates a new processor instance; each instance obtains its own value getter in {@code init}.
     *
     * @return a processor handling subscription records
     */
    @Override
    public Processor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> get() {
        return new AbstractProcessor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>>() {
            private KTableValueGetter<KO, VO> foreignValues;

            @Override
            public void init(final ProcessorContext context) {
                super.init(context);
                foreignValues = foreignValueGetterSupplier.get();
                foreignValues.init(context);
            }

            /**
             * Handles one subscription record.
             *
             * @param combinedKey foreign key plus primary key; must not be null
             * @param change      change whose {@code newValue} holds the subscription; neither
             *                    the change nor its {@code newValue} may be null
             * @throws NullPointerException        if any of the above is null
             * @throws UnsupportedVersionException if the subscription's version is not 0
             * @throws IllegalStateException       if the instruction is not one of the handled ones
             */
            @Override
            public void process(final CombinedKey<KO, K> combinedKey,
                                final Change<ValueAndTimestamp<SubscriptionWrapper<K>>> change) {
                Objects.requireNonNull(combinedKey, "This processor should never see a null key.");
                Objects.requireNonNull(change, "This processor should never see a null value.");
                final ValueAndTimestamp<SubscriptionWrapper<K>> valueAndTimestamp = change.newValue;
                Objects.requireNonNull(valueAndTimestamp, "This processor should never see a null newValue.");
                final SubscriptionWrapper<K> value = valueAndTimestamp.value();

                // NOTE(review): 0 is presumably the inlined "current version" constant of
                // SubscriptionWrapper -- confirm against that class before changing it.
                if (value.getVersion() != 0) {
                    throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
                }

                final ValueAndTimestamp<VO> foreignValueAndTime = foreignValues.get(combinedKey.getForeignKey());

                // The response carries the later of the subscription's and the foreign value's
                // timestamps; with no foreign value only the subscription's timestamp is available.
                final long resultTimestamp = foreignValueAndTime == null
                    ? valueAndTimestamp.timestamp()
                    : Math.max(valueAndTimestamp.timestamp(), foreignValueAndTime.timestamp());

                final K primaryKey = combinedKey.getPrimaryKey();
                switch (value.getInstruction()) {
                    case DELETE_KEY_AND_PROPAGATE:
                        // Always respond with a null foreign value, regardless of the lookup result.
                        forwardResponse(primaryKey, value, null, resultTimestamp);
                        break;
                    case PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE:
                        // Respond with the foreign value, or null when the lookup found nothing.
                        forwardResponse(
                            primaryKey,
                            value,
                            foreignValueAndTime == null ? null : foreignValueAndTime.value(),
                            resultTimestamp);
                        break;
                    case PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE:
                        // Respond only when the lookup found a foreign value.
                        if (foreignValueAndTime != null) {
                            forwardResponse(primaryKey, value, foreignValueAndTime.value(), resultTimestamp);
                        }
                        break;
                    case DELETE_KEY_NO_PROPAGATE:
                        // Nothing is forwarded for this instruction.
                        break;
                    default:
                        throw new IllegalStateException("Unhandled instruction: " + value.getInstruction());
                }
            }

            /** Forwards a response echoing the subscription's hash with the given foreign value and timestamp. */
            private void forwardResponse(final K primaryKey,
                                         final SubscriptionWrapper<K> subscription,
                                         final VO foreignValue,
                                         final long timestamp) {
                context().forward(
                    primaryKey,
                    new SubscriptionResponseWrapper<VO>(subscription.getHash(), foreignValue),
                    To.all().withTimestamp(timestamp));
            }
        };
    }
}

