/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.nio.ByteBuffer;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKey;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchema;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapper;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO>
implements ProcessorSupplier<KO, Change<VO>> {
    private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionProcessorSupplier.class);
    private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
    private final CombinedKeySchema<KO, K> keySchema;

    public ForeignJoinSubscriptionProcessorSupplier(StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder, CombinedKeySchema<KO, K> keySchema) {
        this.storeBuilder = storeBuilder;
        this.keySchema = keySchema;
    }

    @Override
    public Processor<KO, Change<VO>> get() {
        return new KTableKTableJoinProcessor();
    }

    private final class KTableKTableJoinProcessor
    extends AbstractProcessor<KO, Change<VO>> {
        private Sensor droppedRecordsSensor;
        private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;

        private KTableKTableJoinProcessor() {
        }

        @Override
        public void init(ProcessorContext context) {
            super.init(context);
            InternalProcessorContext internalProcessorContext = (InternalProcessorContext)context;
            this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(Thread.currentThread().getName(), internalProcessorContext.taskId().toString(), internalProcessorContext.metrics());
            this.store = (TimestampedKeyValueStore)internalProcessorContext.getStateStore(ForeignJoinSubscriptionProcessorSupplier.this.storeBuilder);
        }

        @Override
        public void process(KO key, Change<VO> value) {
            if (key == null) {
                LOG.warn("Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]", value, this.context().topic(), this.context().partition(), this.context().offset());
                this.droppedRecordsSensor.record();
                return;
            }
            Bytes prefixBytes = ForeignJoinSubscriptionProcessorSupplier.this.keySchema.prefixBytes(key);
            try (KeyValueIterator prefixScanResults = this.store.range(prefixBytes, Bytes.increment(prefixBytes));){
                while (prefixScanResults.hasNext()) {
                    KeyValue next = (KeyValue)prefixScanResults.next();
                    if (!this.prefixEquals(((Bytes)next.key).get(), prefixBytes.get())) continue;
                    CombinedKey combinedKey = ForeignJoinSubscriptionProcessorSupplier.this.keySchema.fromBytes((Bytes)next.key);
                    this.context().forward(combinedKey.getPrimaryKey(), new SubscriptionResponseWrapper(((SubscriptionWrapper)((ValueAndTimestamp)next.value).value()).getHash(), value.newValue));
                }
            }
        }

        private boolean prefixEquals(byte[] x, byte[] y) {
            int min = Math.min(x.length, y.length);
            ByteBuffer xSlice = ByteBuffer.wrap(x, 0, min);
            ByteBuffer ySlice = ByteBuffer.wrap(y, 0, min);
            return xSlice.equals(ySlice);
        }
    }
}

