/*
 * Decompiled with CFR 0.152.
 */
package com.redis.spring.batch.writer;

import com.redis.spring.batch.common.Utils;
import com.redis.spring.batch.writer.PipelinedOperation;
import com.redis.spring.batch.writer.WaitForReplication;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.cluster.PipelinedRedisFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class WaitForReplicationOperation<K, V, T>
implements PipelinedOperation<K, V, T> {
    private final PipelinedOperation<K, V, T> delegate;
    private final WaitForReplication options;

    public WaitForReplicationOperation(PipelinedOperation<K, V, T> delegate, WaitForReplication options) {
        this.delegate = delegate;
        this.options = options;
    }

    @Override
    public Collection<RedisFuture<?>> execute(StatefulConnection<K, V> connection, List<? extends T> items) {
        ArrayList futures = new ArrayList();
        futures.addAll(this.delegate.execute(connection, items));
        BaseRedisAsyncCommands commands = (BaseRedisAsyncCommands)Utils.async(connection);
        PipelinedRedisFuture replicationFuture = new PipelinedRedisFuture(commands.waitForReplication(this.options.getReplicas(), this.options.getTimeout().toMillis()).thenAccept(r -> {
            if (r < (long)this.options.getReplicas()) {
                throw new RedisCommandExecutionException(String.format("Insufficient replication level - expected: %s, actual: %s", this.options.getReplicas(), r));
            }
        }));
        futures.add((RedisFuture<?>)replicationFuture);
        return futures;
    }
}

