/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SimpleTimeZone;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.StoreChangeLogger;
import org.apache.kafka.streams.state.internals.WindowStoreUtils;

public class RocksDBWindowStore<K, V>
implements WindowStore<K, V> {
    public static final long MIN_SEGMENT_INTERVAL = 60000L;
    private volatile boolean open = false;
    private final String name;
    private final int numSegments;
    private final long segmentInterval;
    private final boolean retainDuplicates;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private final SimpleDateFormat formatter;
    private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap();
    private ProcessorContext context;
    private int seqnum = 0;
    private long currentSegmentId = -1L;
    private StateSerdes<K, V> serdes;
    private boolean loggingEnabled = false;
    private StoreChangeLogger<Bytes, byte[]> changeLogger = null;

    public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde) {
        this.name = name;
        this.numSegments = numSegments;
        this.segmentInterval = Math.max(retentionPeriod / (long)(numSegments - 1), 60000L);
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.retainDuplicates = retainDuplicates;
        this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
        this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
    }

    public RocksDBWindowStore<K, V> enableLogging() {
        this.loggingEnabled = true;
        return this;
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    public void init(ProcessorContext context, StateStore root) {
        this.context = context;
        this.serdes = new StateSerdes(this.name, (Serde<?>)(this.keySerde == null ? context.keySerde() : this.keySerde), (Serde<?>)(this.valueSerde == null ? context.valueSerde() : this.valueSerde));
        this.openExistingSegments();
        this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<Bytes, byte[]>(this.name, context, WindowStoreUtils.INNER_SERDES) : null;
        context.register(root, this.loggingEnabled, new StateRestoreCallback(){

            @Override
            public void restore(byte[] key, byte[] value) {
                if (value != null) {
                    RocksDBWindowStore.this.putInternal(key, value);
                }
            }
        });
        this.flush();
        this.open = true;
    }

    private void openExistingSegments() {
        try {
            File dir = new File(this.context.stateDir(), this.name);
            if (dir.exists()) {
                String[] list = dir.list();
                if (list != null) {
                    long[] segmentIds = new long[list.length];
                    for (int i = 0; i < list.length; ++i) {
                        segmentIds[i] = this.segmentIdFromSegmentName(list[i]);
                    }
                    Arrays.sort(segmentIds);
                    for (long segmentId : segmentIds) {
                        if (segmentId < 0L) continue;
                        this.currentSegmentId = segmentId;
                        this.getOrCreateSegment(segmentId);
                    }
                }
            } else {
                dir.mkdir();
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public boolean persistent() {
        return true;
    }

    @Override
    public boolean isOpen() {
        return this.open;
    }

    @Override
    public void flush() {
        for (KeyValueStore keyValueStore : this.segments.values()) {
            if (keyValueStore == null) continue;
            keyValueStore.flush();
        }
    }

    @Override
    public void close() {
        this.open = false;
        this.flush();
        for (KeyValueStore keyValueStore : this.segments.values()) {
            if (keyValueStore == null) continue;
            keyValueStore.close();
        }
    }

    @Override
    public void put(K key, V value) {
        this.put(key, value, this.context.timestamp());
    }

    @Override
    public void put(K key, V value, long timestamp) {
        byte[] rawValue = this.serdes.rawValue(value);
        byte[] rawKey = this.putAndReturnInternalKey(key, rawValue, timestamp);
        if (rawKey != null && this.loggingEnabled) {
            this.changeLogger.logChange(Bytes.wrap((byte[])rawKey), rawValue);
        }
    }

    private byte[] putAndReturnInternalKey(K key, byte[] value, long timestamp) {
        Segment segment;
        long segmentId = this.segmentId(timestamp);
        if (segmentId > this.currentSegmentId) {
            this.currentSegmentId = segmentId;
            this.cleanup();
        }
        if ((segment = this.getOrCreateSegment(segmentId)) != null) {
            if (this.retainDuplicates) {
                this.seqnum = this.seqnum + 1 & Integer.MAX_VALUE;
            }
            byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, this.seqnum, this.serdes);
            segment.put(Bytes.wrap((byte[])binaryKey), value);
            return binaryKey;
        }
        return null;
    }

    private void putInternal(byte[] binaryKey, byte[] binaryValue) {
        Segment segment;
        long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey);
        long segmentId = this.segmentId(timestamp);
        if (segmentId > this.currentSegmentId) {
            this.currentSegmentId = segmentId;
            this.cleanup();
        }
        if ((segment = this.getOrCreateSegment(segmentId)) != null) {
            segment.writeToStore(Bytes.wrap((byte[])binaryKey), binaryValue);
        }
    }

    private byte[] getInternal(byte[] binaryKey) {
        long segmentId = this.segmentId(WindowStoreUtils.timestampFromBinaryKey(binaryKey));
        Segment segment = this.getSegment(segmentId);
        if (segment != null) {
            return (byte[])segment.get(Bytes.wrap((byte[])binaryKey));
        }
        return null;
    }

    @Override
    public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
        if (!this.isOpen()) {
            throw new InvalidStateStoreException("Store " + this.name + " is currently not isOpen");
        }
        long segFrom = this.segmentId(timeFrom);
        long segTo = this.segmentId(Math.max(0L, timeTo));
        byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, this.serdes);
        byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, Integer.MAX_VALUE, this.serdes);
        ArrayList<Segment> segments = new ArrayList<Segment>();
        for (long segmentId = segFrom; segmentId <= segTo; ++segmentId) {
            Segment segment = this.getSegment(segmentId);
            if (segment == null || !segment.isOpen()) continue;
            try {
                segments.add(segment);
                continue;
            }
            catch (InvalidStateStoreException invalidStateStoreException) {
                // empty catch block
            }
        }
        if (!segments.isEmpty()) {
            return new RocksDBWindowStoreIterator<V>(this.serdes, Bytes.wrap((byte[])binaryFrom), Bytes.wrap((byte[])binaryTo), segments.iterator());
        }
        return new RocksDBWindowStoreIterator<V>(this.serdes);
    }

    private Segment getSegment(long segmentId) {
        Segment segment = this.segments.get(segmentId % (long)this.numSegments);
        if (!this.isSegment(segment, segmentId)) {
            return null;
        }
        return segment;
    }

    private boolean isSegment(Segment store, long segmentId) {
        return store != null && store.id == segmentId;
    }

    private Segment getOrCreateSegment(long segmentId) {
        if (segmentId <= this.currentSegmentId && segmentId > this.currentSegmentId - (long)this.numSegments) {
            long key = segmentId % (long)this.numSegments;
            Segment segment = this.segments.get(key);
            if (!this.isSegment(segment, segmentId)) {
                this.cleanup();
            }
            if (!this.segments.containsKey(key)) {
                Segment newSegment = new Segment(this.segmentName(segmentId), this.name, segmentId);
                newSegment.openDB(this.context);
                this.segments.put(key, newSegment);
            }
            return this.segments.get(key);
        }
        return null;
    }

    private void cleanup() {
        for (Map.Entry<Long, Segment> segmentEntry : this.segments.entrySet()) {
            Segment segment = segmentEntry.getValue();
            if (segment == null || segment.id > this.currentSegmentId - (long)this.numSegments) continue;
            this.segments.remove(segmentEntry.getKey());
            segment.close();
            segment.destroy();
        }
    }

    private long segmentId(long timestamp) {
        return timestamp / this.segmentInterval;
    }

    public String segmentName(long segmentId) {
        return this.name + "-" + this.formatter.format(new Date(segmentId * this.segmentInterval));
    }

    public long segmentIdFromSegmentName(String segmentName) {
        try {
            Date date = this.formatter.parse(segmentName.substring(this.name.length() + 1));
            return date.getTime() / this.segmentInterval;
        }
        catch (Exception ex) {
            return -1L;
        }
    }

    public Set<Long> segmentIds() {
        HashSet<Long> segmentIds = new HashSet<Long>();
        for (Segment segment : this.segments.values()) {
            if (segment == null) continue;
            segmentIds.add(segment.id);
        }
        return segmentIds;
    }

    private static class RocksDBWindowStoreIterator<V>
    implements WindowStoreIterator<V> {
        private final StateSerdes<?, V> serdes;
        private final Iterator<Segment> segments;
        private final Bytes from;
        private final Bytes to;
        private KeyValueIterator<Bytes, byte[]> currentIterator;
        private KeyValueStore<Bytes, byte[]> currentSegment;

        RocksDBWindowStoreIterator(StateSerdes<?, V> serdes) {
            this(serdes, null, null, Collections.emptyIterator());
        }

        RocksDBWindowStoreIterator(StateSerdes<?, V> serdes, Bytes from, Bytes to, Iterator<Segment> segments) {
            this.serdes = serdes;
            this.from = from;
            this.to = to;
            this.segments = segments;
        }

        @Override
        public boolean hasNext() {
            while (!(this.currentIterator != null && this.currentIterator.hasNext() && this.currentSegment.isOpen() || !this.segments.hasNext())) {
                this.close();
                this.currentSegment = this.segments.next();
                try {
                    this.currentIterator = this.currentSegment.range(this.from, this.to);
                }
                catch (InvalidStateStoreException invalidStateStoreException) {}
            }
            return this.currentIterator != null && this.currentIterator.hasNext();
        }

        @Override
        public KeyValue<Long, V> next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            KeyValue kv = (KeyValue)this.currentIterator.next();
            return new KeyValue<Long, V>(WindowStoreUtils.timestampFromBinaryKey(((Bytes)kv.key).get()), this.serdes.valueFrom((byte[])kv.value));
        }

        @Override
        public void remove() {
        }

        @Override
        public void close() {
            if (this.currentIterator != null) {
                this.currentIterator.close();
                this.currentIterator = null;
            }
        }
    }

    private static class Segment
    extends RocksDBStore<Bytes, byte[]> {
        public final long id;

        Segment(String segmentName, String windowName, long id) {
            super(segmentName, windowName, WindowStoreUtils.INNER_KEY_SERDE, WindowStoreUtils.INNER_VALUE_SERDE);
            this.id = id;
        }

        public void destroy() {
            Utils.delete((File)this.dbDir);
        }

        @Override
        public void openDB(ProcessorContext context) {
            super.openDB(context);
            this.open = true;
        }
    }
}

