/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch.reader;

import com.redis.spring.batch.common.Utils;
import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.reader.StreamReaderOptions;
import io.lettuce.core.Consumer;
import io.lettuce.core.RedisBusyException;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.sync.RedisStreamCommands;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.util.Assert;

public class StreamItemReader<K, V>
implements PollableItemReader<StreamMessage<K, V>> {
    public static final Duration DEFAULT_POLL_DURATION = Duration.ofSeconds(1L);
    public static final String START_OFFSET = "0-0";
    private final GenericObjectPool<StatefulConnection<K, V>> pool;
    private final K stream;
    private final Consumer<K> consumer;
    private final StreamReaderOptions options;
    private Iterator<StreamMessage<K, V>> iterator = Collections.emptyIterator();
    private boolean open;
    private MessageReader<K, V> reader;
    private String lastId;

    public StreamItemReader(GenericObjectPool<StatefulConnection<K, V>> pool, K stream, Consumer<K> consumer, StreamReaderOptions options) {
        Assert.notNull(pool, (String)"A connection pool is required");
        this.pool = pool;
        this.stream = stream;
        this.consumer = consumer;
        this.options = options;
    }

    private XReadArgs args(long blockMillis) {
        return XReadArgs.Builder.count((long)this.options.getCount()).block(blockMillis);
    }

    private RedisStreamCommands<K, V> commands(StatefulConnection<K, V> connection) {
        return (RedisStreamCommands)Utils.sync(connection);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void open(ExecutionContext executionContext) {
        GenericObjectPool<StatefulConnection<K, V>> genericObjectPool = this.pool;
        synchronized (genericObjectPool) {
            try (StatefulConnection connection = (StatefulConnection)this.pool.borrowObject();){
                RedisStreamCommands commands = (RedisStreamCommands)Utils.sync(connection);
                this.createConsumerGroup(commands);
                this.lastId = this.options.getOffset();
                this.reader = this.options.getAckPolicy() == StreamReaderOptions.AckPolicy.MANUAL ? new ExplicitAckPendingMessageReader() : new AutoAckPendingMessageReader();
                this.open = true;
            }
            catch (Exception e) {
                throw new ItemStreamException("Failed to initialize the reader", (Throwable)e);
            }
        }
    }

    private void createConsumerGroup(RedisStreamCommands<K, V> commands) {
        try {
            commands.xgroupCreate(XReadArgs.StreamOffset.from(this.stream, (String)this.options.getOffset()), this.consumer.getGroup(), XGroupCreateArgs.Builder.mkstream((boolean)true));
        }
        catch (RedisBusyException redisBusyException) {
            // empty catch block
        }
    }

    public boolean isOpen() {
        return this.open;
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
    }

    public StreamMessage<K, V> read() throws Exception {
        return this.poll(DEFAULT_POLL_DURATION.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public StreamMessage<K, V> poll(long timeout, TimeUnit unit) throws Exception {
        if (!this.iterator.hasNext()) {
            List<StreamMessage<K, V>> messages = this.reader.read(unit.toMillis(timeout));
            if (messages == null || messages.isEmpty()) {
                return null;
            }
            this.iterator = messages.iterator();
        }
        return this.iterator.next();
    }

    public List<StreamMessage<K, V>> readMessages() throws Exception {
        return this.reader.read(this.options.getBlock().toMillis());
    }

    public long ack(Iterable<? extends StreamMessage<K, V>> messages) throws Exception {
        if (messages == null) {
            return 0L;
        }
        ArrayList ids = new ArrayList();
        messages.forEach(m -> ids.add(m.getId()));
        return this.ack((String[])ids.toArray(String[]::new));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long ack(String ... ids) throws Exception {
        if (ids.length > 0) {
            GenericObjectPool<StatefulConnection<K, V>> genericObjectPool = this.pool;
            synchronized (genericObjectPool) {
                try (StatefulConnection connection = (StatefulConnection)this.pool.borrowObject();){
                    this.ack((RedisStreamCommands)Utils.sync(connection), ids);
                }
                this.lastId = ids[ids.length - 1];
            }
        }
        return ids.length;
    }

    private void ack(RedisStreamCommands<K, V> commands, Iterable<StreamMessage<K, V>> messages) {
        ArrayList<String> ids = new ArrayList<String>();
        for (StreamMessage<K, V> message : messages) {
            ids.add(message.getId());
        }
        this.ack(commands, (String[])ids.toArray(String[]::new));
    }

    private void ack(RedisStreamCommands<K, V> commands, String ... ids) {
        if (ids.length == 0) {
            return;
        }
        commands.xack(this.stream, this.consumer.getGroup(), ids);
    }

    public void close() throws ItemStreamException {
        this.open = false;
    }

    private class AutoAckMessageReader
    extends ExplicitAckMessageReader {
        private AutoAckMessageReader() {
        }

        @Override
        public List<StreamMessage<K, V>> read(long blockMillis) throws Exception {
            List messages = super.read(blockMillis);
            StreamItemReader.this.ack(messages);
            return messages;
        }
    }

    private class AutoAckPendingMessageReader
    extends ExplicitAckPendingMessageReader {
        private AutoAckPendingMessageReader() {
        }

        @Override
        protected MessageReader<K, V> messageReader() {
            return new AutoAckMessageReader();
        }

        @Override
        protected List<StreamMessage<K, V>> recover(RedisStreamCommands<K, V> commands, List<StreamMessage<K, V>> messages) {
            StreamItemReader.this.ack(commands, messages);
            return Collections.emptyList();
        }
    }

    private class ExplicitAckMessageReader
    implements MessageReader<K, V> {
        private ExplicitAckMessageReader() {
        }

        @Override
        public List<StreamMessage<K, V>> read(long blockMillis) throws Exception {
            try (StatefulConnection connection = (StatefulConnection)StreamItemReader.this.pool.borrowObject();){
                List list = StreamItemReader.this.commands(connection).xreadgroup(StreamItemReader.this.consumer, StreamItemReader.this.args(blockMillis), new XReadArgs.StreamOffset[]{XReadArgs.StreamOffset.lastConsumed((Object)StreamItemReader.this.stream)});
                return list;
            }
        }
    }

    private class ExplicitAckPendingMessageReader
    implements MessageReader<K, V> {
        private ExplicitAckPendingMessageReader() {
        }

        protected List<StreamMessage<K, V>> readMessages(RedisStreamCommands<K, V> commands, XReadArgs args) {
            return this.recover(commands, commands.xreadgroup(StreamItemReader.this.consumer, args, new XReadArgs.StreamOffset[]{XReadArgs.StreamOffset.from((Object)StreamItemReader.this.stream, (String)StreamItemReader.START_OFFSET)}));
        }

        protected List<StreamMessage<K, V>> recover(RedisStreamCommands<K, V> commands, List<StreamMessage<K, V>> messages) {
            if (messages.isEmpty()) {
                return messages;
            }
            ArrayList recoveredMessages = new ArrayList();
            ArrayList messagesToAck = new ArrayList();
            StreamId recoveryId = StreamId.parse(StreamItemReader.this.lastId);
            for (StreamMessage message : messages) {
                StreamId messageId = StreamId.parse(message.getId());
                if (messageId.compareTo(recoveryId) > 0) {
                    recoveredMessages.add(message);
                    StreamItemReader.this.lastId = message.getId();
                    continue;
                }
                messagesToAck.add(message);
            }
            StreamItemReader.this.ack(commands, messagesToAck);
            return recoveredMessages;
        }

        protected MessageReader<K, V> messageReader() {
            return new ExplicitAckMessageReader();
        }

        @Override
        public List<StreamMessage<K, V>> read(long blockMillis) throws Exception {
            List messages;
            try (StatefulConnection connection = (StatefulConnection)StreamItemReader.this.pool.borrowObject();){
                messages = this.readMessages(StreamItemReader.this.commands(connection), StreamItemReader.this.args(blockMillis));
            }
            if (messages.isEmpty()) {
                StreamItemReader.this.reader = this.messageReader();
                return StreamItemReader.this.reader.read(blockMillis);
            }
            return messages;
        }
    }

    private static interface MessageReader<K, V> {
        public List<StreamMessage<K, V>> read(long var1) throws Exception;
    }

    public static class StreamId
    implements Comparable<StreamId> {
        public static final StreamId ZERO = StreamId.of(0L, 0L);
        private final long millis;
        private final long sequence;

        public StreamId(long millis, long sequence) {
            this.millis = millis;
            this.sequence = sequence;
        }

        private static void checkPositive(String id, long number) {
            if (number < 0L) {
                throw new IllegalArgumentException(String.format("not an id: %s", id));
            }
        }

        public static StreamId parse(String id) {
            int off = id.indexOf("-");
            if (off == -1) {
                long millis = Long.parseLong(id);
                StreamId.checkPositive(id, millis);
                return StreamId.of(millis, 0L);
            }
            long millis = Long.parseLong(id.substring(0, off));
            StreamId.checkPositive(id, millis);
            long sequence = Long.parseLong(id.substring(off + 1));
            StreamId.checkPositive(id, sequence);
            return StreamId.of(millis, sequence);
        }

        public static StreamId of(long millis, long sequence) {
            return new StreamId(millis, sequence);
        }

        public String toStreamId() {
            return this.millis + "-" + this.sequence;
        }

        public String toString() {
            return this.toStreamId();
        }

        @Override
        public int compareTo(StreamId o) {
            long diff = this.millis - o.millis;
            if (diff != 0L) {
                return Long.signum(diff);
            }
            return Long.signum(this.sequence - o.sequence);
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof StreamId)) {
                return false;
            }
            StreamId o = (StreamId)obj;
            return o.millis == this.millis && o.sequence == this.sequence;
        }

        public int hashCode() {
            long val = this.millis * 31L * this.sequence;
            return (int)(val ^ val >> 32);
        }
    }
}

