/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.internals.GlobalStateMaintainer;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordDeserializer;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer;

/**
 * {@link GlobalStateMaintainer} that routes each consumed record to the source node
 * registered for the record's topic and remembers, per topic-partition, the offset
 * following the last record it processed. Those offsets are handed to the
 * {@link GlobalStateManager} on {@link #flushState()} and {@link #close()}.
 */
public class GlobalStateUpdateTask implements GlobalStateMaintainer {
    private final ProcessorTopology topology;
    private final InternalProcessorContext processorContext;
    /** Offset of the last processed record plus one, keyed by topic-partition. */
    private final Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
    /** Source node and record deserializer to use for each topic name; filled by {@link #initialize()}. */
    private final Map<String, SourceNodeAndDeserializer> deserializers = new HashMap<String, SourceNodeAndDeserializer>();
    private final GlobalStateManager stateMgr;

    /**
     * @param topology         topology providing the source nodes, processors and
     *                         store-to-changelog-topic mapping
     * @param processorContext context handed to the state manager and to every processor node
     * @param stateMgr         state manager that is initialized, flushed, checkpointed and closed by this task
     */
    public GlobalStateUpdateTask(ProcessorTopology topology, InternalProcessorContext processorContext, GlobalStateManager stateMgr) {
        this.topology = topology;
        this.stateMgr = stateMgr;
        this.processorContext = processorContext;
    }

    /**
     * Initializes the state manager, registers a source node and deserializer for the
     * changelog topic of every store name the state manager reports, initializes all
     * processor nodes of the topology and marks the processor context as initialized.
     *
     * @return the map returned by {@link GlobalStateManager#checkpointed()}
     */
    @Override
    public Map<TopicPartition, Long> initialize() {
        final Set<String> storeNames = this.stateMgr.initialize(this.processorContext);
        final Map<String, String> storeNameToTopic = this.topology.storeToChangelogTopic();
        for (String storeName : storeNames) {
            final String sourceTopic = storeNameToTopic.get(storeName);
            final SourceNode source = this.topology.source(sourceTopic);
            this.deserializers.put(sourceTopic, new SourceNodeAndDeserializer(source, new SourceNodeRecordDeserializer(source)));
        }
        this.initTopology();
        this.processorContext.initialized();
        return this.stateMgr.checkpointed();
    }

    /**
     * Deserializes the record, exposes its metadata through the processor context,
     * passes key and value to the source node registered for the record's topic and
     * records the next offset for the record's topic-partition.
     *
     * @param record raw record whose topic must have been registered by {@link #initialize()}
     * @throws IllegalStateException if no source node and deserializer are registered
     *                               for the record's topic
     */
    @Override
    public void update(ConsumerRecord<byte[], byte[]> record) {
        final SourceNodeAndDeserializer sourceNodeAndDeserializer = this.deserializers.get(record.topic());
        if (sourceNodeAndDeserializer == null) {
            // Fail with the offending topic instead of an anonymous NullPointerException below.
            throw new IllegalStateException("No source node and deserializer registered for topic " + record.topic());
        }
        final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.deserializer.deserialize(record);
        final ProcessorRecordContext recordContext = new ProcessorRecordContext(deserialized.timestamp(), deserialized.offset(), deserialized.partition(), deserialized.topic());
        this.processorContext.setRecordContext(recordContext);
        this.processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode);
        sourceNodeAndDeserializer.sourceNode.process(deserialized.key(), deserialized.value());
        // Stored only after process() returned, so a failing record does not advance the offset.
        this.offsets.put(new TopicPartition(record.topic(), record.partition()), deserialized.offset() + 1L);
    }

    /**
     * Flushes the state manager and then checkpoints the offsets tracked by {@link #update}.
     */
    @Override
    public void flushState() {
        this.stateMgr.flush();
        this.stateMgr.checkpoint(this.offsets);
    }

    /**
     * Closes the state manager, passing it the offsets tracked by {@link #update}.
     *
     * @throws IOException if closing the state manager fails with an I/O error
     */
    @Override
    public void close() throws IOException {
        this.stateMgr.close(this.offsets);
    }

    /**
     * Initializes every processor node of the topology. The node being initialized is
     * set as the context's current node for the duration of its {@code init} call and
     * cleared afterwards, also when {@code init} throws.
     */
    private void initTopology() {
        for (ProcessorNode node : this.topology.processors()) {
            this.processorContext.setCurrentNode(node);
            try {
                node.init(this.processorContext);
            }
            finally {
                this.processorContext.setCurrentNode(null);
            }
        }
    }

    /** Pairs a source node with the deserializer created for it. */
    private static class SourceNodeAndDeserializer {
        private final SourceNode sourceNode;
        private final RecordDeserializer deserializer;

        SourceNodeAndDeserializer(SourceNode sourceNode, RecordDeserializer deserializer) {
            this.sourceNode = sourceNode;
            this.deserializer = deserializer;
        }
    }
}

