/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StandbyContextImpl;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandbyTask
extends AbstractTask {
    private static final Logger log = LoggerFactory.getLogger(StandbyTask.class);
    private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<TopicPartition, Long>();

    StandbyTask(TaskId id, String applicationId, Collection<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[], byte[]> consumer, ChangelogReader changelogReader, StreamsConfig config, StreamsMetrics metrics, StateDirectory stateDirectory) {
        super(id, applicationId, partitions, topology, consumer, changelogReader, true, stateDirectory, config);
        this.processorContext = new StandbyContextImpl(id, applicationId, config, this.stateMgr, metrics);
    }

    @Override
    public boolean initializeStateStores() {
        log.trace("Initializing state stores");
        this.initStateStores();
        this.checkpointedOffsets = Collections.unmodifiableMap(this.stateMgr.checkpointed());
        this.processorContext.initialized();
        this.taskInitialized = true;
        return true;
    }

    @Override
    public void initializeTopology() {
    }

    @Override
    public void resume() {
        log.debug("{} Resuming", (Object)this.logPrefix);
        this.updateOffsetLimits();
    }

    @Override
    public void commit() {
        log.trace("{} Committing", (Object)this.logPrefix);
        this.flushAndCheckpointState();
        this.updateOffsetLimits();
    }

    @Override
    public void suspend() {
        log.debug("{} Suspending", (Object)this.logPrefix);
        this.flushAndCheckpointState();
    }

    private void flushAndCheckpointState() {
        this.stateMgr.flush();
        this.stateMgr.checkpoint(Collections.emptyMap());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close(boolean clean, boolean isZombie) {
        if (!this.taskInitialized) {
            return;
        }
        log.debug("{} Closing", (Object)this.logPrefix);
        boolean committedSuccessfully = false;
        try {
            this.commit();
            committedSuccessfully = true;
        }
        finally {
            this.closeStateManager(committedSuccessfully);
        }
    }

    public List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
        log.trace("{} Updating standby replicas of its state store for partition [{}]", (Object)this.logPrefix, (Object)partition);
        return this.stateMgr.updateStandbyStates(partition, records);
    }

    Map<TopicPartition, Long> checkpointedOffsets() {
        return this.checkpointedOffsets;
    }

    public boolean initialize() {
        log.debug("{} Initializing", (Object)this.logPrefix);
        this.initializeStateStores();
        this.checkpointedOffsets = Collections.unmodifiableMap(this.stateMgr.checkpointed());
        this.processorContext.initialized();
        this.taskInitialized = true;
        return true;
    }

    @Override
    boolean process() {
        throw new UnsupportedOperationException("process not supported by standby task");
    }

    @Override
    boolean maybePunctuate() {
        throw new UnsupportedOperationException("maybePunctuate not supported by standby task");
    }

    @Override
    boolean commitNeeded() {
        throw new UnsupportedOperationException("commitNeeded not supported by standby task");
    }
}

