/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch.reader;

import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.util.ConnectionUtils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.Consumer;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisBusyException;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.sync.RedisStreamCommands;
import io.lettuce.core.codec.RedisCodec;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.util.ClassUtils;

/**
 * Item reader that consumes {@link StreamMessage}s from a single Redis stream on behalf of
 * a consumer-group {@link Consumer}.
 *
 * <p>Opening the reader obtains a connection and issues an {@code xgroupCreate} for the
 * consumer's group. Reads are delegated to an internal message reader chosen from the
 * configured {@link StreamAckPolicy}; that reader first drains the consumer's pending
 * entries and then switches to newly delivered messages.
 *
 * <p>{@code open}, {@code close} and {@code poll} are {@code synchronized}; the other
 * methods are not.
 *
 * @param <K> stream key type handled by the codec
 * @param <V> stream value type handled by the codec
 */
public class StreamItemReader<K, V>
extends AbstractItemStreamItemReader<StreamMessage<K, V>>
implements PollableItemReader<StreamMessage<K, V>> {
    /** Timeout used by {@link #read()} when polling for the next message. */
    public static final Duration DEFAULT_POLL_DURATION = Duration.ofSeconds(1L);
    /** Initial value of the offset; also the offset used when reading pending entries. */
    public static final String DEFAULT_OFFSET = "0-0";
    /** Initial value of the block duration used by {@link #readMessages()}. */
    public static final Duration DEFAULT_BLOCK = Duration.ofMillis(100L);
    /** Initial value of the per-read message count. */
    public static final long DEFAULT_COUNT = 50L;
    /** Initial acknowledgement policy. */
    public static final StreamAckPolicy DEFAULT_ACK_POLICY = StreamAckPolicy.AUTO;

    // Collaborators fixed at construction time.
    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    private final K stream;
    private final Consumer<K> consumer;

    // Configuration, adjustable through the setters.
    private String offset = DEFAULT_OFFSET;
    private Duration block = DEFAULT_BLOCK;
    private long count = DEFAULT_COUNT;
    private StreamAckPolicy ackPolicy = DEFAULT_ACK_POLICY;
    private ReadFrom readFrom;

    // Runtime state, populated while the reader is open.
    private StatefulConnection<K, V> connection;
    private RedisStreamCommands<K, V> commands;
    private MessageReader<K, V> messageReader;
    private String lastId;
    /** Buffer of messages from the last read that have not been handed out yet. */
    private Iterator<StreamMessage<K, V>> iterator = Collections.emptyIterator();

    /**
     * Creates a reader bound to one stream and one consumer.
     *
     * @param client   Redis client used to obtain the connection when the reader is opened
     * @param codec    codec for stream keys and values
     * @param stream   key of the stream to read
     * @param consumer consumer (and its group) on whose behalf messages are read
     */
    public StreamItemReader(AbstractRedisClient client, RedisCodec<K, V> codec, K stream, Consumer<K> consumer) {
        this.client = client;
        this.codec = codec;
        this.stream = stream;
        this.consumer = consumer;
        // The component name defaults to the short class name.
        setName(ClassUtils.getShortName(getClass()));
    }

    /** @return the {@link ReadFrom} setting passed to the connection supplier, possibly {@code null} */
    public ReadFrom getReadFrom() {
        return readFrom;
    }

    /**
     * Sets the {@link ReadFrom} setting used when the connection is obtained on open.
     *
     * @param readFrom the setting to use
     */
    public void setReadFrom(ReadFrom readFrom) {
        this.readFrom = readFrom;
    }

    /** @return the stream offset used when the consumer group is created */
    public String getOffset() {
        return offset;
    }

    /**
     * Sets the stream offset used when the consumer group is created; it also seeds the
     * last-seen ID when the reader is opened.
     *
     * @param offset stream offset
     */
    public void setOffset(String offset) {
        this.offset = offset;
    }

    /** @return the block duration applied by {@link #readMessages()} */
    public Duration getBlock() {
        return block;
    }

    /**
     * Sets the block duration applied by {@link #readMessages()}.
     *
     * @param block block duration
     */
    public void setBlock(Duration block) {
        this.block = block;
    }

    /** @return the message count requested on each stream read */
    public long getCount() {
        return count;
    }

    /**
     * Sets the message count requested on each stream read.
     *
     * @param count message count
     */
    public void setCount(long count) {
        this.count = count;
    }

    /** @return the acknowledgement policy used to pick the message reader on open */
    public StreamAckPolicy getAckPolicy() {
        return ackPolicy;
    }

    /**
     * Sets the acknowledgement policy. The policy is consulted when the reader is opened.
     *
     * @param policy acknowledgement policy
     */
    public void setAckPolicy(StreamAckPolicy policy) {
        ackPolicy = policy;
    }

    /**
     * Builds the read arguments for one stream read.
     *
     * @param blockMillis block timeout in milliseconds
     * @return arguments carrying the configured count and the given block timeout
     */
    private XReadArgs args(long blockMillis) {
        XReadArgs readArgs = XReadArgs.Builder.count(count);
        return readArgs.block(blockMillis);
    }

    /**
     * Opens the reader. Calling it on an already open reader only delegates to the
     * superclass.
     *
     * @param executionContext execution context forwarded to the superclass
     */
    @Override
    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        if (isOpen()) {
            return;
        }
        doOpen();
    }

    /**
     * Obtains the connection, makes sure the consumer group exists and installs the
     * message reader.
     *
     * <p>If anything fails after the connection has been obtained, the connection is closed
     * before the failure is rethrown. Without this the connection would leak, because the
     * reader never reaches the open state and {@link #close()} would not release it.
     */
    @SuppressWarnings("unchecked")
    private void doOpen() {
        StatefulConnection<K, V> newConnection = ConnectionUtils.supplier(client, codec, readFrom).get();
        try {
            RedisStreamCommands<K, V> streamCommands = (RedisStreamCommands<K, V>) ConnectionUtils.sync(newConnection);
            XReadArgs.StreamOffset<K> streamOffset = XReadArgs.StreamOffset.from(stream, offset);
            XGroupCreateArgs createArgs = XGroupCreateArgs.Builder.mkstream(true);
            try {
                streamCommands.xgroupCreate(streamOffset, consumer.getGroup(), createArgs);
            } catch (RedisBusyException ignored) {
                // Redis answers XGROUP CREATE with a BUSYGROUP error when the group already
                // exists; that is the expected outcome on every open after the first.
            }
            connection = newConnection;
            commands = streamCommands;
        } catch (RuntimeException | Error e) {
            try {
                newConnection.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        lastId = offset;
        // Assigned last: a non-null message reader is what marks the reader as open.
        messageReader = reader();
    }

    /** @return {@code true} while a message reader is installed, i.e. between open and close */
    public boolean isOpen() {
        return messageReader != null;
    }

    /** Releases the reader's resources if it is open, then delegates to the superclass. */
    @Override
    public synchronized void close() {
        boolean wasOpen = isOpen();
        if (wasOpen) {
            doClose();
        }
        super.close();
    }

    /** Drops the message reader and last-seen ID, closes the connection and clears its handles. */
    private void doClose() {
        // Clearing the message reader first marks the reader as closed.
        messageReader = null;
        lastId = null;
        StatefulConnection<K, V> closing = connection;
        closing.close();
        connection = null;
        commands = null;
    }

    /**
     * Picks the initial (pending-entries) message reader for the configured policy.
     *
     * @return the explicit-ack variant for {@link StreamAckPolicy#MANUAL}, otherwise the
     *         auto-ack variant
     */
    private MessageReader<K, V> reader() {
        return ackPolicy == StreamAckPolicy.MANUAL
                ? new ExplicitAckPendingMessageReader()
                : new AutoAckPendingMessageReader();
    }

    /**
     * Does nothing: this reader stores no state in the execution context.
     *
     * @param executionContext ignored
     */
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // intentionally empty
    }

    /**
     * Reads the next message, polling for at most {@link #DEFAULT_POLL_DURATION}.
     *
     * @return the next message, or {@code null} if none was available
     */
    @Override
    public StreamMessage<K, V> read() {
        long timeoutMillis = DEFAULT_POLL_DURATION.toMillis();
        return poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the next buffered message, refilling the buffer from the message reader when
     * it is exhausted.
     *
     * @param timeout maximum time to block on the refill
     * @param unit    unit of {@code timeout}
     * @return the next message, or {@code null} if the refill produced no messages
     */
    @Override
    public synchronized StreamMessage<K, V> poll(long timeout, TimeUnit unit) {
        if (iterator.hasNext()) {
            return iterator.next();
        }
        List<StreamMessage<K, V>> batch = messageReader.read(unit.toMillis(timeout));
        if (batch == null || batch.isEmpty()) {
            return null;
        }
        iterator = batch.iterator();
        return iterator.next();
    }

    /**
     * Reads one batch of messages directly from the message reader, bypassing the buffer
     * used by {@link #poll(long, TimeUnit)}.
     *
     * @return the messages returned by the message reader for the configured block duration
     */
    public List<StreamMessage<K, V>> readMessages() {
        long blockMillis = block.toMillis();
        return messageReader.read(blockMillis);
    }

    /**
     * Acknowledges the given messages for the consumer's group. Unlike
     * {@link #ack(String...)}, this overload leaves the last-seen ID untouched.
     *
     * @param messages messages to acknowledge; {@code null} is treated as empty
     * @return the value returned by {@code xack}, or 0 when there is nothing to acknowledge
     */
    public Long ack(Iterable<? extends StreamMessage<K, V>> messages) {
        if (messages == null) {
            return 0L;
        }
        List<String> ids = new ArrayList<>();
        for (StreamMessage<K, V> message : messages) {
            ids.add(message.getId());
        }
        return doAck(ids.toArray(new String[0]));
    }

    /**
     * Acknowledges the given message IDs for the consumer's group and records the last of
     * them as the last-seen ID.
     *
     * @param ids message IDs to acknowledge; {@code null} is treated as empty, matching
     *            {@link #ack(Iterable)}
     * @return the value returned by {@code xack}, or 0 when there is nothing to acknowledge
     */
    public Long ack(String... ids) {
        if (ids == null || ids.length == 0) {
            return 0L;
        }
        lastId = ids[ids.length - 1];
        return doAck(ids);
    }

    /**
     * Issues the {@code xack} for the given IDs without touching the last-seen ID.
     *
     * @param ids message IDs to acknowledge
     * @return the value returned by {@code xack}, or 0 when {@code ids} is empty
     */
    private Long doAck(String... ids) {
        if (ids.length > 0) {
            return commands.xack(stream, consumer.getGroup(), ids);
        }
        return 0L;
    }

    /** @return the length reported by {@code xlen} for the stream */
    public long streamLength() {
        Long length = commands.xlen(stream);
        return length;
    }

    private static interface MessageReader<K, V> {
        public List<StreamMessage<K, V>> read(long var1);
    }

    /** Decides whether the reader acknowledges the messages it reads. */
    public enum StreamAckPolicy {
        /** The reader acknowledges messages itself as part of reading them. */
        AUTO,
        /** Newly read messages are left for the caller to acknowledge through {@code ack}. */
        MANUAL
    }

    /**
     * Reads the consumer's pending entries (offset {@link #DEFAULT_OFFSET}) and, once a read
     * yields nothing to return, replaces itself with the reader for new messages.
     */
    private class ExplicitAckPendingMessageReader
    implements MessageReader<K, V> {

        /**
         * Reads pending entries for the consumer and runs them through
         * {@link #recover(List)}.
         *
         * @param args read arguments (count and block timeout)
         * @return the messages kept by {@code recover}
         */
        protected List<StreamMessage<K, V>> readMessages(XReadArgs args) {
            XReadArgs.StreamOffset<K> pendingOffset = XReadArgs.StreamOffset.from(stream, DEFAULT_OFFSET);
            return recover(commands.xreadgroup(consumer, args, pendingOffset));
        }

        /**
         * Splits pending messages around the last-seen ID captured on entry: messages with a
         * greater ID are returned (and advance the last-seen ID), the rest are acknowledged.
         *
         * @param messages pending messages as read from the stream
         * @return the messages whose ID is greater than the last-seen ID on entry
         */
        protected List<StreamMessage<K, V>> recover(List<StreamMessage<K, V>> messages) {
            if (messages.isEmpty()) {
                return messages;
            }
            List<StreamMessage<K, V>> fresh = new ArrayList<>();
            List<StreamMessage<K, V>> stale = new ArrayList<>();
            // Every message is compared against the ID seen before this batch.
            StreamId threshold = StreamId.parse(lastId);
            for (StreamMessage<K, V> message : messages) {
                String id = message.getId();
                if (StreamId.parse(id).compareTo(threshold) <= 0) {
                    stale.add(message);
                } else {
                    fresh.add(message);
                    lastId = id;
                }
            }
            ack(stale);
            return fresh;
        }

        /** @return the reader to switch to once pending entries are exhausted */
        protected MessageReader<K, V> messageReader() {
            return new ExplicitAckMessageReader();
        }

        /**
         * Returns recovered pending messages if there are any; otherwise installs the
         * follow-up reader on the enclosing reader and delegates this read to it.
         *
         * @param blockMillis block timeout in milliseconds
         * @return the messages that were read
         */
        @Override
        public List<StreamMessage<K, V>> read(long blockMillis) {
            List<StreamMessage<K, V>> pending = readMessages(args(blockMillis));
            if (!pending.isEmpty()) {
                return pending;
            }
            MessageReader<K, V> next = messageReader();
            StreamItemReader.this.messageReader = next;
            return next.read(blockMillis);
        }
    }

    /**
     * Pending-entries reader for the auto-ack policy: every pending message is acknowledged
     * and none is returned, so the first read falls through to the auto-ack reader.
     */
    private class AutoAckPendingMessageReader
    extends ExplicitAckPendingMessageReader {

        /**
         * Acknowledges all pending messages.
         *
         * @param messages pending messages as read from the stream
         * @return always an empty list
         */
        @Override
        protected List<StreamMessage<K, V>> recover(List<StreamMessage<K, V>> messages) {
            ack(messages);
            return Collections.emptyList();
        }

        /** @return the auto-acknowledging reader for new messages */
        @Override
        protected MessageReader<K, V> messageReader() {
            return new AutoAckMessageReader();
        }
    }

    /** Reads new messages and acknowledges them before handing them to the caller. */
    private class AutoAckMessageReader
    extends ExplicitAckMessageReader {

        /**
         * Reads a batch via the superclass and acknowledges it.
         *
         * @param blockMillis block timeout in milliseconds
         * @return the messages that were read (already acknowledged)
         */
        @Override
        public List<StreamMessage<K, V>> read(long blockMillis) {
            List<StreamMessage<K, V>> batch = super.read(blockMillis);
            ack(batch);
            return batch;
        }
    }

    /** Reads messages from the last-consumed offset without acknowledging them. */
    private class ExplicitAckMessageReader
    implements MessageReader<K, V> {

        /**
         * Issues an {@code xreadgroup} for the consumer at the last-consumed offset.
         *
         * @param blockMillis block timeout in milliseconds
         * @return the messages returned by {@code xreadgroup}
         */
        @Override
        public List<StreamMessage<K, V>> read(long blockMillis) {
            XReadArgs readArgs = args(blockMillis);
            XReadArgs.StreamOffset<K> lastConsumed = XReadArgs.StreamOffset.lastConsumed(stream);
            return commands.xreadgroup(consumer, readArgs, lastConsumed);
        }
    }

    /**
     * Immutable stream entry ID of the form {@code <millis>-<sequence>}, ordered first by
     * the millisecond part and then by the sequence part.
     */
    public static class StreamId
    implements Comparable<StreamId> {
        /** The ID {@code 0-0}. */
        public static final StreamId ZERO = StreamId.of(0L, 0L);
        private final long millis;
        private final long sequence;

        /**
         * @param millis   millisecond part of the ID
         * @param sequence sequence part of the ID
         */
        public StreamId(long millis, long sequence) {
            this.millis = millis;
            this.sequence = sequence;
        }

        /** Rejects negative ID components, reporting the full offending ID. */
        private static void checkPositive(String id, long number) {
            if (number < 0L) {
                throw new IllegalArgumentException(String.format("not an id: %s", id));
            }
        }

        /**
         * Parses {@code "<millis>-<sequence>"} or a bare {@code "<millis>"} (sequence 0).
         *
         * @param id textual ID
         * @return the parsed ID
         * @throws IllegalArgumentException if a component is negative or not a number
         *         ({@link NumberFormatException} is a subclass)
         */
        public static StreamId parse(String id) {
            int dash = id.indexOf('-');
            if (dash == -1) {
                long millis = Long.parseLong(id);
                checkPositive(id, millis);
                return of(millis, 0L);
            }
            long millis = Long.parseLong(id.substring(0, dash));
            checkPositive(id, millis);
            long sequence = Long.parseLong(id.substring(dash + 1));
            checkPositive(id, sequence);
            return of(millis, sequence);
        }

        /**
         * @param millis   millisecond part of the ID
         * @param sequence sequence part of the ID
         * @return a new ID with the given parts
         */
        public static StreamId of(long millis, long sequence) {
            return new StreamId(millis, sequence);
        }

        /** @return the ID formatted as {@code <millis>-<sequence>} */
        public String toStreamId() {
            return this.millis + "-" + this.sequence;
        }

        @Override
        public String toString() {
            return toStreamId();
        }

        /**
         * Orders by millis, then by sequence. Uses {@link Long#compare(long, long)} rather
         * than subtraction, which overflows for operands of opposite sign and large
         * magnitude and then reports the wrong order.
         */
        @Override
        public int compareTo(StreamId o) {
            int byMillis = Long.compare(this.millis, o.millis);
            if (byMillis != 0) {
                return byMillis;
            }
            return Long.compare(this.sequence, o.sequence);
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof StreamId)) {
                return false;
            }
            StreamId other = (StreamId) obj;
            return other.millis == this.millis && other.sequence == this.sequence;
        }

        /**
         * Combines both parts additively. A product of the parts would be 0 whenever either
         * part is 0, so every ID with sequence 0 would share one hash bucket.
         */
        @Override
        public int hashCode() {
            return 31 * Long.hashCode(this.millis) + Long.hashCode(this.sequence);
        }
    }
}

