/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.HasNextCondition;
import org.apache.kafka.streams.state.internals.Segment;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.Segments;

class SessionKeySchema
implements SegmentedBytesStore.KeySchema {
    private String topic;

    SessionKeySchema() {
    }

    @Override
    public void init(String topic) {
        this.topic = topic;
    }

    @Override
    public Bytes upperRange(Bytes key, long to) {
        Windowed<Bytes> sessionKey = new Windowed<Bytes>(key, new SessionWindow(to, Long.MAX_VALUE));
        return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), this.topic);
    }

    @Override
    public Bytes lowerRange(Bytes key, long from) {
        Windowed<Bytes> sessionKey = new Windowed<Bytes>(key, new SessionWindow(0L, Math.max(0L, from)));
        return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer(), this.topic);
    }

    @Override
    public long segmentTimestamp(Bytes key) {
        return SessionKeySerde.extractEnd(key.get());
    }

    @Override
    public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, final long to) {
        return new HasNextCondition(){

            @Override
            public boolean hasNext(KeyValueIterator<Bytes, ?> iterator) {
                while (iterator.hasNext()) {
                    Bytes bytes = iterator.peekNextKey();
                    Windowed<Bytes> windowedKey = SessionKeySerde.fromBytes(bytes);
                    if (windowedKey.key().equals((Object)binaryKey) && windowedKey.window().end() >= from && windowedKey.window().start() <= to) {
                        return true;
                    }
                    iterator.next();
                }
                return false;
            }
        };
    }

    @Override
    public List<Segment> segmentsToSearch(Segments segments, long from, long to) {
        return segments.segments(from, Long.MAX_VALUE);
    }
}

