/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.aerospike.core;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Value;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.KeyRecord;
import com.aerospike.client.reactor.AerospikeReactorClient;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.aerospike.convert.AerospikeWriteData;
import org.springframework.data.aerospike.convert.MappingAerospikeConverter;
import org.springframework.data.aerospike.core.AerospikeExceptionTranslator;
import org.springframework.data.aerospike.core.BaseAerospikeTemplate;
import org.springframework.data.aerospike.core.OperationUtils;
import org.springframework.data.aerospike.core.ReactiveAerospikeOperations;
import org.springframework.data.aerospike.core.WritePolicyBuilder;
import org.springframework.data.aerospike.mapping.AerospikeMappingContext;
import org.springframework.data.aerospike.mapping.AerospikePersistentEntity;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.domain.Sort;
import org.springframework.data.mapping.model.ConvertingPropertyAccessor;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactiveAerospikeTemplate
extends BaseAerospikeTemplate
implements ReactiveAerospikeOperations {
    private static final Logger log = LoggerFactory.getLogger(ReactiveAerospikeTemplate.class);
    private final AerospikeReactorClient reactorClient;

    public ReactiveAerospikeTemplate(AerospikeClient client, String namespace, MappingAerospikeConverter converter, AerospikeMappingContext mappingContext, AerospikeExceptionTranslator exceptionTranslator, AerospikeReactorClient reactorClient) {
        super(client, namespace, converter, mappingContext, exceptionTranslator);
        Assert.notNull((Object)reactorClient, (String)"Aerospike reactor client must not be null!");
        this.reactorClient = reactorClient;
    }

    @Override
    public <T> Mono<T> save(T document) {
        Assert.notNull(document, (String)"Object to save must not be null!");
        AerospikePersistentEntity entity = (AerospikePersistentEntity)this.mappingContext.getRequiredPersistentEntity(document.getClass());
        if (entity.hasVersionProperty()) {
            return this.doPersistWithCas(document, entity);
        }
        return this.doPersist(document, this.createWritePolicyBuilder(RecordExistsAction.REPLACE));
    }

    @Override
    public <T> Flux<T> insertAll(Collection<? extends T> documents) {
        return Flux.fromIterable(documents).flatMap(this::insert);
    }

    @Override
    public <T> Mono<T> insert(T document) {
        Assert.notNull(document, (String)"Document must not be null!");
        return this.doPersist(document, this.createWritePolicyBuilder(RecordExistsAction.CREATE_ONLY));
    }

    @Override
    public <T> Mono<T> update(T document) {
        Assert.notNull(document, (String)"Document must not be null!");
        return this.doPersist(document, this.createWritePolicyBuilder(RecordExistsAction.UPDATE_ONLY));
    }

    @Override
    public <T> Flux<T> findAll(Class<T> type) {
        Stream<T> results = this.findAllUsingQuery(type, null, null);
        return Flux.fromStream(results);
    }

    @Override
    public <T> Mono<T> add(T objectToAddTo, Map<String, Long> values) {
        Assert.notNull(objectToAddTo, (String)"Object to add to must not be null!");
        Assert.notNull(values, (String)"Values must not be null!");
        AerospikeWriteData data = this.writeData(objectToAddTo);
        Operation[] operations = new Operation[values.size() + 1];
        int x = 0;
        for (Map.Entry<String, Long> entry : values.entrySet()) {
            operations[x] = new Operation(Operation.Type.ADD, entry.getKey(), Value.get((Object)entry.getValue()));
            ++x;
        }
        operations[x] = Operation.get();
        WritePolicy writePolicy = new WritePolicy(this.client.writePolicyDefault);
        writePolicy.expiration = data.getExpiration();
        return this.executeOperationsOnValue(objectToAddTo, data, operations, writePolicy);
    }

    @Override
    public <T> Mono<T> add(T objectToAddTo, String binName, long value) {
        Assert.notNull(objectToAddTo, (String)"Object to add to must not be null!");
        Assert.notNull((Object)binName, (String)"Bin name must not be null!");
        AerospikeWriteData data = this.writeData(objectToAddTo);
        WritePolicy writePolicy = new WritePolicy(this.client.writePolicyDefault);
        writePolicy.expiration = data.getExpiration();
        Operation[] operations = new Operation[]{Operation.add((Bin)new Bin(binName, value)), Operation.get((String)binName)};
        return this.executeOperationsOnValue(objectToAddTo, data, operations, writePolicy);
    }

    @Override
    public <T> Mono<T> append(T objectToAppendTo, Map<String, String> values) {
        Assert.notNull(objectToAppendTo, (String)"Object to append to must not be null!");
        AerospikeWriteData data = this.writeData(objectToAppendTo);
        Operation[] operations = this.getOperations(values, Operation.Type.APPEND);
        return this.executeOperationsOnValue(objectToAppendTo, data, operations, null);
    }

    @Override
    public <T> Mono<T> append(T objectToAppendTo, String binName, String value) {
        Assert.notNull(objectToAppendTo, (String)"Object to append to must not be null!");
        AerospikeWriteData data = this.writeData(objectToAppendTo);
        Operation[] operations = new Operation[]{Operation.append((Bin)new Bin(binName, value)), Operation.get((String)binName)};
        return this.executeOperationsOnValue(objectToAppendTo, data, operations, null);
    }

    @Override
    public <T> Mono<T> prepend(T objectToPrependTo, Map<String, String> values) {
        Assert.notNull(objectToPrependTo, (String)"Object to prepend to must not be null!");
        AerospikeWriteData data = this.writeData(objectToPrependTo);
        Operation[] operations = this.getOperations(values, Operation.Type.PREPEND);
        return this.executeOperationsOnValue(objectToPrependTo, data, operations, null);
    }

    @Override
    public <T> Mono<T> prepend(T objectToPrependTo, String binName, String value) {
        Assert.notNull(objectToPrependTo, (String)"Object to prepend to must not be null!");
        AerospikeWriteData data = this.writeData(objectToPrependTo);
        Operation[] operations = new Operation[]{Operation.prepend((Bin)new Bin(binName, value)), Operation.get((String)binName)};
        return this.executeOperationsOnValue(objectToPrependTo, data, operations, null);
    }

    private Operation[] getOperations(Map<String, String> values, Operation.Type operationType) {
        Operation[] operations = new Operation[values.size() + 1];
        int x = 0;
        for (Map.Entry<String, String> entry : values.entrySet()) {
            operations[x] = new Operation(operationType, entry.getKey(), Value.get((String)entry.getValue()));
            ++x;
        }
        operations[x] = Operation.get();
        return operations;
    }

    private <T> Mono<T> executeOperationsOnValue(T value, AerospikeWriteData data, Operation[] operations, WritePolicy writePolicy) {
        return this.reactorClient.operate(writePolicy, data.getKey(), operations).filter(keyRecord -> Objects.nonNull(keyRecord.record)).map(keyRecord -> this.mapToEntity(keyRecord.key, value.getClass(), keyRecord.record)).onErrorMap(this::translateError);
    }

    @Override
    public <T> Mono<T> findById(Object id, Class<T> type) {
        Key key = this.getKey(id, type);
        AerospikePersistentEntity entity = (AerospikePersistentEntity)this.mappingContext.getRequiredPersistentEntity(type);
        if (entity.isTouchOnRead()) {
            Assert.state((!entity.hasExpirationProperty() ? 1 : 0) != 0, (String)"Touch on read is not supported for entity without expiration property");
            return this.getAndTouch(key, entity.getExpiration()).filter(keyRecord -> Objects.nonNull(keyRecord.record)).map(keyRecord -> this.mapToEntity(keyRecord.key, type, keyRecord.record)).onErrorResume(th -> th instanceof AerospikeException && ((AerospikeException)th).getResultCode() == 2, th -> Mono.empty()).onErrorMap(this::translateError);
        }
        return this.reactorClient.get(key).filter(keyRecord -> Objects.nonNull(keyRecord.record)).map(keyRecord -> this.mapToEntity(keyRecord.key, type, keyRecord.record)).onErrorMap(this::translateError);
    }

    @Override
    public <T> Flux<T> findByIds(Iterable<?> ids, Class<T> type) {
        Assert.notNull(ids, (String)"List of ids must not be null!");
        Assert.notNull(type, (String)"Type must not be null!");
        AerospikePersistentEntity entity = (AerospikePersistentEntity)this.mappingContext.getRequiredPersistentEntity(type);
        return Flux.fromIterable(ids).map(id -> this.getKey(id, entity)).flatMap(arg_0 -> ((AerospikeReactorClient)this.reactorClient).get(arg_0)).filter(keyRecord -> Objects.nonNull(keyRecord.record)).map(keyRecord -> this.mapToEntity(keyRecord.key, type, keyRecord.record));
    }

    @Override
    public <T> Flux<T> find(Query query, Class<T> type) {
        Assert.notNull((Object)query, (String)"Query must not be null!");
        Assert.notNull(type, (String)"Type must not be null!");
        Stream<T> results = this.findAllUsingQuery(type, query);
        return Flux.fromStream(results);
    }

    @Override
    public <T> Flux<T> findInRange(long offset, long limit, Sort sort, Class<T> type) {
        Assert.notNull(type, (String)"Type for count must not be null!");
        Stream<T> results = this.findAllUsingQuery(type, null, null).skip(offset).limit(limit);
        return Flux.fromStream(results);
    }

    @Override
    public <T> Mono<Long> count(Query query, Class<T> type) {
        Stream<KeyRecord> results = this.findAllRecordsUsingQuery(type, query);
        return Flux.fromStream(results).count();
    }

    @Override
    public <T> Mono<T> execute(Supplier<T> supplier) {
        Assert.notNull(supplier, (String)"Callback must not be null!");
        return Mono.fromSupplier(supplier).onErrorMap(this::translateError);
    }

    @Override
    public Mono<Boolean> exists(Object id, Class<?> type) {
        Key key = this.getKey(id, type);
        return this.reactorClient.exists(key).map(Objects::nonNull).defaultIfEmpty((Object)false).onErrorMap(this::translateError);
    }

    @Override
    public Mono<Boolean> delete(Object id, Class<?> type) {
        Assert.notNull((Object)id, (String)"Id must not be null!");
        Assert.notNull(type, (String)"Type must not be null!");
        AerospikePersistentEntity entity = (AerospikePersistentEntity)this.mappingContext.getRequiredPersistentEntity(type);
        return this.reactorClient.delete(null, this.getKey(id, entity)).map(k -> true).onErrorMap(this::translateError);
    }

    @Override
    public Mono<Boolean> delete(Object objectToDelete) {
        Assert.notNull((Object)objectToDelete, (String)"Object to delete must not be null!");
        AerospikeWriteData data = this.writeData(objectToDelete);
        return this.reactorClient.delete(null, data.getKey()).map(key -> true).onErrorMap(this::translateError);
    }

    private <T> Mono<T> doPersist(T document, WritePolicyBuilder policyBuilder) {
        AerospikeWriteData data = this.writeData(document);
        WritePolicy policy = policyBuilder.expiration(data.getExpiration()).build();
        return this.reactorClient.put(policy, data.getKey(), data.getBinsAsArray()).map(docKey -> document).onErrorMap(this::translateError);
    }

    private <T> Mono<T> doPersistWithCas(T document, AerospikePersistentEntity<?> entity) {
        AerospikeWriteData data = this.writeData(document);
        ConvertingPropertyAccessor accessor = this.getPropertyAccessor(entity, document);
        WritePolicy policy = this.getCasAwareWritePolicy(data, entity, accessor);
        Operation[] operations = OperationUtils.operations(data.getBinsAsArray(), Operation::put, Operation.getHeader());
        return this.reactorClient.operate(policy, data.getKey(), operations).map(newKeyRecord -> {
            accessor.setProperty(entity.getVersionProperty(), (Object)newKeyRecord.record.generation);
            return document;
        }).onErrorMap(AerospikeException.class, e -> {
            if (Arrays.asList(5, 3).contains(e.getResultCode())) {
                throw new OptimisticLockingFailureException("Save document with version value failed", (Throwable)e);
            }
            return this.translateError((Throwable)e);
        }).onErrorMap(this::translateError);
    }

    private Mono<KeyRecord> getAndTouch(Key key, int expiration) {
        WritePolicy policy = new WritePolicy(this.client.writePolicyDefault);
        policy.expiration = expiration;
        return this.reactorClient.operate(policy, key, new Operation[]{Operation.touch(), Operation.get()});
    }

    private WritePolicyBuilder createWritePolicyBuilder(RecordExistsAction recordExistsAction) {
        return WritePolicyBuilder.builder(this.client.writePolicyDefault).sendKey(true).recordExistsAction(recordExistsAction);
    }

    private <T> AerospikeWriteData writeData(T document) {
        AerospikeWriteData data = AerospikeWriteData.forWrite();
        this.converter.write(document, data);
        return data;
    }

    private Throwable translateError(Throwable e) {
        if (e instanceof AerospikeException) {
            DataAccessException translated = this.exceptionTranslator.translateExceptionIfPossible((RuntimeException)((AerospikeException)e));
            return translated == null ? e : translated;
        }
        return e;
    }
}

