/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateMaintainer;
import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GlobalStreamThread
extends Thread {
    private static final Logger log = LoggerFactory.getLogger(GlobalStreamThread.class);
    private final StreamsConfig config;
    private final Consumer<byte[], byte[]> consumer;
    private final StateDirectory stateDirectory;
    private final Time time;
    private final ThreadCache cache;
    private final StreamsMetrics streamsMetrics;
    private final ProcessorTopology topology;
    private volatile StreamsException startupException;
    private volatile State state = State.CREATED;
    private final Object stateLock = new Object();
    private StreamThread.StateListener stateListener = null;
    private final String logPrefix;

    public void setStateListener(StreamThread.StateListener listener) {
        this.stateListener = listener;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public State state() {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void setState(State newState, boolean ignoreWhenShuttingDownOrDead) {
        State oldState;
        Object object = this.stateLock;
        synchronized (object) {
            oldState = this.state;
            if (ignoreWhenShuttingDownOrDead && (this.state == State.PENDING_SHUTDOWN || this.state == State.DEAD)) {
                return;
            }
            if (!this.state.isValidTransition(newState)) {
                log.warn("{} Unexpected state transition from {} to {}.", new Object[]{this.logPrefix, oldState, newState});
                throw new StreamsException(this.logPrefix + " Unexpected state transition from " + oldState + " to " + newState);
            }
            log.info("{} State transition from {} to {}.", new Object[]{this.logPrefix, oldState, newState});
            this.state = newState;
        }
        if (this.stateListener != null) {
            this.stateListener.onChange(this, this.state, oldState);
        }
    }

    public GlobalStreamThread(ProcessorTopology topology, StreamsConfig config, Consumer<byte[], byte[]> globalConsumer, StateDirectory stateDirectory, Metrics metrics, Time time, String clientId) {
        super("GlobalStreamThread");
        this.topology = topology;
        this.config = config;
        this.consumer = globalConsumer;
        this.stateDirectory = stateDirectory;
        this.time = time;
        long cacheSizeBytes = Math.max(0L, config.getLong("cache.max.bytes.buffering") / (long)(config.getInt("num.stream.threads") + 1));
        String threadClientId = clientId + "-" + this.getName();
        this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId));
        this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.streamsMetrics);
        this.logPrefix = String.format("global-stream-thread [%s]", threadClientId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        StateConsumer stateConsumer = this.initialize();
        if (stateConsumer == null) {
            return;
        }
        this.setState(State.RUNNING, true);
        try {
            while (this.stillRunning()) {
                stateConsumer.pollAndUpdate();
            }
            log.debug("Shutting down GlobalStreamThread at user request");
        }
        finally {
            try {
                this.setState(State.PENDING_SHUTDOWN, true);
                stateConsumer.close();
                this.setState(State.DEAD, false);
            }
            catch (IOException e) {
                log.error("Failed to cleanly shutdown GlobalStreamThread", (Throwable)e);
            }
        }
    }

    private StateConsumer initialize() {
        try {
            GlobalStateManagerImpl stateMgr = new GlobalStateManagerImpl(this.topology, this.consumer, this.stateDirectory);
            StateConsumer stateConsumer = new StateConsumer(this.consumer, new GlobalStateUpdateTask(this.topology, new GlobalProcessorContextImpl(this.config, stateMgr, this.streamsMetrics, this.cache), stateMgr), this.time, this.config.getLong("poll.ms"), this.config.getLong("commit.interval.ms"));
            stateConsumer.initialize();
            return stateConsumer;
        }
        catch (StreamsException e) {
            this.startupException = e;
        }
        catch (Exception e) {
            this.startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", e);
        }
        return null;
    }

    @Override
    public synchronized void start() {
        super.start();
        while (!this.stillRunning()) {
            Utils.sleep((long)1L);
            if (this.startupException == null) continue;
            throw this.startupException;
        }
    }

    public void close() {
        this.setState(State.PENDING_SHUTDOWN, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean stillRunning() {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.isRunning();
        }
    }

    static class StateConsumer {
        private final Consumer<byte[], byte[]> consumer;
        private final GlobalStateMaintainer stateMaintainer;
        private final Time time;
        private final long pollMs;
        private final long flushInterval;
        private long lastFlush;

        StateConsumer(Consumer<byte[], byte[]> consumer, GlobalStateMaintainer stateMaintainer, Time time, long pollMs, long flushInterval) {
            this.consumer = consumer;
            this.stateMaintainer = stateMaintainer;
            this.time = time;
            this.pollMs = pollMs;
            this.flushInterval = flushInterval;
        }

        void initialize() {
            Map<TopicPartition, Long> partitionOffsets = this.stateMaintainer.initialize();
            this.consumer.assign(partitionOffsets.keySet());
            for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
                this.consumer.seek(entry.getKey(), entry.getValue().longValue());
            }
            this.lastFlush = this.time.milliseconds();
        }

        void pollAndUpdate() {
            ConsumerRecords received = this.consumer.poll(this.pollMs);
            for (ConsumerRecord record : received) {
                this.stateMaintainer.update((ConsumerRecord<byte[], byte[]>)record);
            }
            long now = this.time.milliseconds();
            if (this.flushInterval >= 0L && now >= this.lastFlush + this.flushInterval) {
                this.stateMaintainer.flushState();
                this.lastFlush = now;
            }
        }

        public void close() throws IOException {
            try {
                this.consumer.close();
            }
            catch (Exception e) {
                log.error("Failed to cleanly close GlobalStreamThread consumer", (Throwable)e);
            }
            this.stateMaintainer.close();
        }
    }

    public static enum State implements ThreadStateTransitionValidator
    {
        CREATED(1, 2),
        RUNNING(2),
        PENDING_SHUTDOWN(3),
        DEAD(new Integer[0]);

        private final Set<Integer> validTransitions = new HashSet<Integer>();

        private State(Integer ... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        public boolean isRunning() {
            return !this.equals(PENDING_SHUTDOWN) && !this.equals(CREATED) && !this.equals(DEAD);
        }

        @Override
        public boolean isValidTransition(ThreadStateTransitionValidator newState) {
            State tmpState = (State)newState;
            return this.validTransitions.contains(tmpState.ordinal());
        }
    }
}

