/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch.common;

import com.redis.spring.batch.common.BatchOperation;
import com.redis.spring.batch.util.ConnectionUtils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisCommandInterruptedException;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.internal.Exceptions;
import io.lettuce.core.support.ConnectionPoolSupport;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.util.ClassUtils;

/**
 * Item processor that runs a {@link BatchOperation} over a list of items using a
 * connection borrowed from an internal pool, and returns the collected results.
 *
 * <p>The pool is created in {@link #open(ExecutionContext)} and destroyed in
 * {@link #close()}; {@link #process(List)} must only be called in between.
 *
 * @param <K> key type handled by the codec
 * @param <V> value type handled by the codec
 * @param <I> type of the items passed to the batch operation
 * @param <O> type of the results produced by the batch operation
 */
public abstract class AbstractOperationExecutor<K, V, I, O>
extends ItemStreamSupport
implements ItemProcessor<List<I>, List<O>> {
    /** Maximum number of pooled connections used when {@link #setPoolSize(int)} is not called. */
    public static final int DEFAULT_POOL_SIZE = 8;
    private final Log log = LogFactory.getLog(getClass());
    protected final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    private ReadFrom readFrom;
    private int poolSize = DEFAULT_POOL_SIZE;
    /** Connection pool; {@code null} while this executor is not open. */
    private GenericObjectPool<StatefulConnection<K, V>> pool;
    private BatchOperation<K, V, I, O> batchOperation;
    /** Local copy of the component name, used in log messages. */
    private String name;

    /**
     * Creates an executor named after the short name of the concrete class.
     *
     * @param client client handed to the connection supplier when the pool is created
     * @param codec  codec handed to the connection supplier when the pool is created
     */
    protected AbstractOperationExecutor(AbstractRedisClient client, RedisCodec<K, V> codec) {
        // NOTE(review): setName is overridable, so a subclass override would run here
        // before that subclass's own fields are initialised.
        this.setName(ClassUtils.getShortName(getClass()));
        this.client = client;
        this.codec = codec;
    }

    /**
     * Sets the maximum total number of pooled connections. The value is read when
     * the pool is created in {@link #open(ExecutionContext)}.
     *
     * @param poolSize maximum number of connections in the pool
     */
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    /**
     * Sets the {@link ReadFrom} setting passed to the connection supplier when the
     * pool is created in {@link #open(ExecutionContext)}.
     *
     * @param readFrom read-from setting, may be {@code null}
     */
    public void setReadFrom(ReadFrom readFrom) {
        this.readFrom = readFrom;
    }

    /**
     * Sets the component name on the superclass and remembers it for log messages.
     *
     * @param name component name
     */
    public void setName(String name) {
        super.setName(name);
        this.name = name;
    }

    /**
     * Opens this executor: obtains the batch operation from {@link #batchOperation()}
     * and creates the connection pool. Does nothing beyond the superclass call if the
     * executor is already open.
     *
     * @param executionContext context forwarded to the superclass
     */
    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        if (this.isOpen()) {
            return;
        }
        this.log.debug(String.format("Opening %s", this.name));
        Supplier<StatefulConnection<K, V>> connectionSupplier = ConnectionUtils.supplier(this.client, this.codec, this.readFrom);
        GenericObjectPoolConfig<StatefulConnection<K, V>> config = new GenericObjectPoolConfig<>();
        config.setMaxTotal(this.poolSize);
        this.batchOperation = this.batchOperation();
        this.pool = ConnectionPoolSupport.createGenericObjectPool(connectionSupplier, config);
        this.log.debug(String.format("Opened %s", this.name));
    }

    /**
     * Tells whether the connection pool currently exists.
     *
     * @return {@code true} between a successful {@link #open(ExecutionContext)} and {@link #close()}
     */
    public synchronized boolean isOpen() {
        return this.pool != null;
    }

    /**
     * Supplies the operation that {@link #process(List)} executes. Called once per
     * effective {@link #open(ExecutionContext)}.
     *
     * @return the batch operation to run
     */
    protected abstract BatchOperation<K, V, I, O> batchOperation();

    /**
     * Closes the connection pool if it is open, then delegates to the superclass.
     */
    public synchronized void close() {
        if (this.isOpen()) {
            this.log.debug(String.format("Closing %s", this.name));
            this.pool.close();
            this.pool = null;
            this.log.debug(String.format("Closed %s", this.name));
        }
        super.close();
    }

    /**
     * Executes the batch operation for the given items on a pooled connection and
     * waits for all resulting futures, bounded by the connection's timeout.
     *
     * <p>Auto-flush is switched off while commands are queued so that they are sent
     * with a single explicit flush, and switched back on before the connection is
     * closed.
     *
     * @param items items handed to the batch operation
     * @return results of the futures, in the order the batch operation returned them
     * @throws RedisConnectionException if the executor is not open or no connection
     *         could be borrowed from the pool
     * @throws RedisCommandInterruptedException if the calling thread is interrupted
     *         while waiting for results (the interrupt flag is restored)
     * @throws RedisException for any other failure, as translated by
     *         {@link Exceptions#fromSynchronization(Throwable)}
     */
    public List<O> process(List<I> items) throws RedisException {
        // Read the field once so the null check and the borrow use the same pool instance.
        GenericObjectPool<StatefulConnection<K, V>> currentPool = this.pool;
        if (currentPool == null) {
            throw new RedisConnectionException(String.format("%s is not open: no connection pool available", this.name));
        }
        StatefulConnection<K, V> connection;
        try {
            connection = currentPool.borrowObject();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // borrowObject may block; keep the interruption visible to callers.
                Thread.currentThread().interrupt();
            }
            throw new RedisConnectionException("Could not get connection from pool", e);
        }
        try {
            connection.setAutoFlushCommands(false);
            @SuppressWarnings("unchecked")
            BaseRedisAsyncCommands<K, V> commands = (BaseRedisAsyncCommands<K, V>) ConnectionUtils.async(connection);
            List<RedisFuture<O>> futures = this.batchOperation.execute(commands, items);
            connection.flushCommands();
            return AbstractOperationExecutor.getAll(connection.getTimeout(), futures);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RedisCommandInterruptedException(e);
        }
        catch (Exception e) {
            throw Exceptions.fromSynchronization(e);
        }
        finally {
            try {
                connection.setAutoFlushCommands(true);
            }
            finally {
                // Always close the connection, even if restoring auto-flush failed.
                connection.close();
            }
        }
    }

    /**
     * Waits for every future in order and collects the results.
     *
     * <p>A negative {@code timeout} means each future is awaited without a time
     * limit. Otherwise {@code timeout} is a single budget shared by all futures: the
     * time spent waiting on one future is deducted from what is left for the next.
     *
     * @param timeout overall time budget, or a negative duration to wait indefinitely
     * @param futures futures to wait for
     * @param <T>     result type of the futures
     * @return results in the same order as {@code futures}
     * @throws TimeoutException     if the budget is used up before all futures completed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if a future completed exceptionally
     */
    public static <T> List<T> getAll(Duration timeout, List<RedisFuture<T>> futures) throws TimeoutException, InterruptedException, ExecutionException {
        List<T> results = new ArrayList<>(futures.size());
        if (timeout.isNegative()) {
            // Unbounded wait: no need to convert the duration or track elapsed time.
            for (RedisFuture<T> future : futures) {
                results.add(future.get());
            }
            return results;
        }
        long remainingNanos = timeout.toNanos();
        long lastTick = System.nanoTime();
        for (RedisFuture<T> future : futures) {
            if (remainingNanos < 0L) {
                throw new TimeoutException(String.format("Timed out after %s", timeout));
            }
            results.add(future.get(remainingNanos, TimeUnit.NANOSECONDS));
            long now = System.nanoTime();
            remainingNanos -= now - lastTick;
            lastTick = now;
        }
        return results;
    }
}

