/*
 * Decompiled with CFR 0.152.
 */
package com.aerospike.client.reactor;

import com.aerospike.client.AbortStatus;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.BatchRead;
import com.aerospike.client.BatchRecord;
import com.aerospike.client.BatchResults;
import com.aerospike.client.Bin;
import com.aerospike.client.CommitStatus;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Txn;
import com.aerospike.client.Value;
import com.aerospike.client.async.AsyncIndexTask;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.listener.AbortListener;
import com.aerospike.client.listener.BatchListListener;
import com.aerospike.client.listener.BatchOperateListListener;
import com.aerospike.client.listener.BatchRecordArrayListener;
import com.aerospike.client.listener.BatchSequenceListener;
import com.aerospike.client.listener.CommitListener;
import com.aerospike.client.listener.DeleteListener;
import com.aerospike.client.listener.ExecuteListener;
import com.aerospike.client.listener.ExistsArrayListener;
import com.aerospike.client.listener.ExistsListener;
import com.aerospike.client.listener.ExistsSequenceListener;
import com.aerospike.client.listener.IndexListener;
import com.aerospike.client.listener.InfoListener;
import com.aerospike.client.listener.RecordArrayListener;
import com.aerospike.client.listener.RecordListener;
import com.aerospike.client.listener.RecordSequenceListener;
import com.aerospike.client.listener.TaskStatusListener;
import com.aerospike.client.listener.WriteListener;
import com.aerospike.client.policy.BatchDeletePolicy;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.BatchWritePolicy;
import com.aerospike.client.policy.InfoPolicy;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.policy.TxnRollPolicy;
import com.aerospike.client.policy.TxnVerifyPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.IndexCollectionType;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.query.KeyRecord;
import com.aerospike.client.query.Statement;
import com.aerospike.client.reactor.IAerospikeReactorClient;
import com.aerospike.client.reactor.dto.KeyExists;
import com.aerospike.client.reactor.dto.KeyObject;
import com.aerospike.client.reactor.dto.KeysExists;
import com.aerospike.client.reactor.dto.KeysRecords;
import com.aerospike.client.reactor.listeners.ReactorAbortListener;
import com.aerospike.client.reactor.listeners.ReactorBatchListListener;
import com.aerospike.client.reactor.listeners.ReactorBatchOperateListListener;
import com.aerospike.client.reactor.listeners.ReactorBatchRecordArrayListener;
import com.aerospike.client.reactor.listeners.ReactorBatchSequenceListener;
import com.aerospike.client.reactor.listeners.ReactorCommitListener;
import com.aerospike.client.reactor.listeners.ReactorDeleteListener;
import com.aerospike.client.reactor.listeners.ReactorExecuteListener;
import com.aerospike.client.reactor.listeners.ReactorExistsArrayListener;
import com.aerospike.client.reactor.listeners.ReactorExistsListener;
import com.aerospike.client.reactor.listeners.ReactorExistsSequenceListener;
import com.aerospike.client.reactor.listeners.ReactorIndexListener;
import com.aerospike.client.reactor.listeners.ReactorInfoListener;
import com.aerospike.client.reactor.listeners.ReactorRecordArrayListener;
import com.aerospike.client.reactor.listeners.ReactorRecordListener;
import com.aerospike.client.reactor.listeners.ReactorRecordSequenceListener;
import com.aerospike.client.reactor.listeners.ReactorTaskStatusListener;
import com.aerospike.client.reactor.listeners.ReactorWriteListener;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

public class AerospikeReactorClient
implements IAerospikeReactorClient {
    private final IAerospikeClient aerospikeClient;

    public AerospikeReactorClient(IAerospikeClient aerospikeClient) {
        this.aerospikeClient = aerospikeClient;
    }

    @Override
    public void close() {
        this.aerospikeClient.close();
    }

    @Override
    public final Mono<KeyRecord> get(Key key) throws AerospikeException {
        return this.get(null, key);
    }

    @Override
    public final Mono<KeyRecord> get(Policy policy, Key key) throws AerospikeException {
        return this.get(policy, key, null);
    }

    @Override
    public final Mono<KeyRecord> get(Policy policy, Key key, String[] binNames) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.get(null, (RecordListener)new ReactorRecordListener((MonoSink<KeyRecord>)sink), policy, key, binNames));
    }

    @Override
    public final Mono<KeysRecords> get(Key[] keys) throws AerospikeException {
        return this.get(null, keys);
    }

    @Override
    public final Mono<KeysRecords> get(BatchPolicy policy, Key[] keys) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.get(null, (RecordArrayListener)new ReactorRecordArrayListener((MonoSink<KeysRecords>)sink), policy, keys));
    }

    @Override
    public final Mono<List<BatchRead>> get(List<BatchRead> records) throws AerospikeException {
        return this.get(null, records);
    }

    @Override
    public final Mono<List<BatchRead>> get(BatchPolicy policy, List<BatchRead> records) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.get(null, (BatchListListener)new ReactorBatchListListener((MonoSink<List<BatchRead>>)sink), policy, records));
    }

    @Override
    public final Mono<KeysRecords> get(Key[] keys, Operation ... operations) throws AerospikeException {
        return this.get(null, keys, operations);
    }

    @Override
    public final Mono<KeysRecords> get(BatchPolicy policy, Key[] keys, Operation ... operations) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.get(null, (RecordArrayListener)new ReactorRecordArrayListener((MonoSink<KeysRecords>)sink), policy, keys, operations));
    }

    @Override
    public final Flux<BatchRead> getFlux(List<BatchRead> records) throws AerospikeException {
        return this.getFlux(null, records);
    }

    @Override
    public final Flux<BatchRead> getFlux(BatchPolicy policy, List<BatchRead> records) throws AerospikeException {
        return Flux.create(sink -> this.aerospikeClient.get(null, (BatchSequenceListener)new ReactorBatchSequenceListener((FluxSink<BatchRead>)sink), policy, records));
    }

    @Override
    public final Flux<KeyRecord> getFlux(Key[] keys) throws AerospikeException {
        return this.getFlux(null, keys);
    }

    @Override
    public final Flux<KeyRecord> getFlux(BatchPolicy policy, Key[] keys) throws AerospikeException {
        return Flux.create(sink -> this.aerospikeClient.get(null, (RecordSequenceListener)new ReactorRecordSequenceListener((FluxSink<KeyRecord>)sink), policy, keys));
    }

    @Override
    public final Flux<KeyRecord> getFlux(Key[] keys, Operation ... operations) throws AerospikeException {
        return this.getFlux(null, keys, operations);
    }

    @Override
    public final Flux<KeyRecord> getFlux(BatchPolicy policy, Key[] keys, Operation ... operations) throws AerospikeException {
        return Flux.create(sink -> this.aerospikeClient.get(null, (RecordSequenceListener)new ReactorRecordSequenceListener((FluxSink<KeyRecord>)sink), policy, keys, operations));
    }

    @Override
    public final Mono<KeyRecord> getHeader(Key key) throws AerospikeException {
        return this.getHeader(null, key);
    }

    @Override
    public final Mono<KeyRecord> getHeader(Policy policy, Key key) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.getHeader(null, (RecordListener)new ReactorRecordListener((MonoSink<KeyRecord>)sink), policy, key));
    }

    @Override
    public final Mono<KeysRecords> getHeaders(Key[] keys) throws AerospikeException {
        return this.getHeaders(null, keys);
    }

    @Override
    public final Mono<KeysRecords> getHeaders(BatchPolicy policy, Key[] keys) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.getHeader(null, (RecordArrayListener)new ReactorRecordArrayListener((MonoSink<KeysRecords>)sink), policy, keys));
    }

    @Override
    public final Mono<Key> touch(Key key) throws AerospikeException {
        return this.touch(null, key);
    }

    @Override
    public final Mono<Key> touch(WritePolicy policy, Key key) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.touch(null, (WriteListener)new ReactorWriteListener((MonoSink<Key>)sink), policy, key));
    }

    @Override
    public final Mono<Key> exists(Key key) throws AerospikeException {
        return this.exists(null, key);
    }

    @Override
    public final Mono<Key> exists(Policy policy, Key key) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.exists(null, (ExistsListener)new ReactorExistsListener((MonoSink<Key>)sink), policy, key));
    }

    @Override
    public final Mono<KeysExists> exists(Key[] keys) throws AerospikeException {
        return this.exists(null, keys);
    }

    @Override
    public final Mono<KeysExists> exists(BatchPolicy policy, Key[] keys) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.exists(null, (ExistsArrayListener)new ReactorExistsArrayListener((MonoSink<KeysExists>)sink), policy, keys));
    }

    @Override
    public final Flux<KeyExists> existsFlux(Key[] keys) throws AerospikeException {
        return this.existsFlux(null, keys);
    }

    @Override
    public final Flux<KeyExists> existsFlux(BatchPolicy policy, Key[] keys) throws AerospikeException {
        return Flux.create(sink -> this.aerospikeClient.exists(null, (ExistsSequenceListener)new ReactorExistsSequenceListener((FluxSink<KeyExists>)sink), policy, keys));
    }

    @Override
    public final Mono<Key> put(Key key, Bin ... bins) throws AerospikeException {
        return this.put(null, key, bins);
    }

    @Override
    public final Mono<Key> put(WritePolicy policy, Key key, Bin ... bins) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.put(null, (WriteListener)new ReactorWriteListener((MonoSink<Key>)sink), policy, key, bins));
    }

    @Override
    public final Mono<Key> append(Key key, Bin ... bins) throws AerospikeException {
        return this.append(null, key, bins);
    }

    @Override
    public final Mono<Key> append(WritePolicy policy, Key key, Bin ... bins) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.append(null, (WriteListener)new ReactorWriteListener((MonoSink<Key>)sink), policy, key, bins));
    }

    @Override
    public final Mono<Key> prepend(Key key, Bin ... bins) throws AerospikeException {
        return this.prepend(null, key, bins);
    }

    @Override
    public final Mono<Key> prepend(WritePolicy policy, Key key, Bin ... bins) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.prepend(null, (WriteListener)new ReactorWriteListener((MonoSink<Key>)sink), policy, key, bins));
    }

    @Override
    public final Mono<Key> add(Key key, Bin ... bins) throws AerospikeException {
        return this.add(null, key, bins);
    }

    @Override
    public final Mono<Key> add(WritePolicy policy, Key key, Bin ... bins) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.add(null, (WriteListener)new ReactorWriteListener((MonoSink<Key>)sink), policy, key, bins));
    }

    @Override
    public final Mono<Key> delete(Key key) throws AerospikeException {
        return this.delete(null, key);
    }

    @Override
    public final Mono<Key> delete(WritePolicy policy, Key key) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.delete(null, (DeleteListener)new ReactorDeleteListener((MonoSink<Key>)sink), policy, key));
    }

    @Override
    public Mono<BatchResults> delete(BatchPolicy batchPolicy, BatchDeletePolicy deletePolicy, Key[] keys) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.delete(null, (BatchRecordArrayListener)new ReactorBatchRecordArrayListener((MonoSink<BatchResults>)sink), batchPolicy, deletePolicy, keys));
    }

    @Override
    public final Mono<KeyRecord> operate(Key key, Operation ... operations) throws AerospikeException {
        return this.operate(null, key, operations);
    }

    @Override
    public final Mono<KeyRecord> operate(WritePolicy policy, Key key, Operation ... operations) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.operate(null, (RecordListener)new ReactorRecordListener((MonoSink<KeyRecord>)sink), policy, key, operations));
    }

    @Override
    public Mono<BatchResults> operate(BatchPolicy batchPolicy, BatchWritePolicy writePolicy, Key[] keys, Operation ... ops) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.operate(null, (BatchRecordArrayListener)new ReactorBatchRecordArrayListener((MonoSink<BatchResults>)sink), batchPolicy, writePolicy, keys, ops));
    }

    @Override
    public Mono<Boolean> operate(BatchPolicy policy, List<BatchRecord> records) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.operate(null, (BatchOperateListListener)new ReactorBatchOperateListListener((MonoSink<Boolean>)sink), policy, records));
    }

    @Override
    public Mono<CommitStatus> commit(Txn txn) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.commit(null, (CommitListener)new ReactorCommitListener((MonoSink<CommitStatus>)sink), txn));
    }

    @Override
    public Mono<AbortStatus> abort(Txn txn) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.abort(null, (AbortListener)new ReactorAbortListener((MonoSink<AbortStatus>)sink), txn));
    }

    @Override
    public final Flux<KeyRecord> query(Statement statement) throws AerospikeException {
        return this.query(null, statement);
    }

    @Override
    public final Flux<KeyRecord> query(QueryPolicy policy, Statement statement) throws AerospikeException {
        return Flux.create(sink -> this.aerospikeClient.query(null, (RecordSequenceListener)new ReactorRecordSequenceListener((FluxSink<KeyRecord>)sink), policy, statement));
    }

    @Override
    public final Flux<KeyRecord> scanAll(String namespace, String setName, String ... binNames) throws AerospikeException {
        return this.scanAll((ScanPolicy)null, namespace, setName, binNames);
    }

    @Override
    public final Flux<KeyRecord> scanAll(ScanPolicy policy, String namespace, String setName, String ... binNames) throws AerospikeException {
        return Flux.create(sink -> this.aerospikeClient.scanAll(null, (RecordSequenceListener)new ReactorRecordSequenceListener((FluxSink<KeyRecord>)sink), policy, namespace, setName, binNames));
    }

    @Override
    public final Mono<KeyObject> execute(Key key, String packageName, String functionName, Value ... functionArgs) throws AerospikeException {
        return this.execute(null, key, packageName, functionName, functionArgs);
    }

    @Override
    public final Mono<KeyObject> execute(WritePolicy policy, Key key, String packageName, String functionName, Value ... functionArgs) throws AerospikeException {
        return Mono.create(sink -> this.aerospikeClient.execute(null, (ExecuteListener)new ReactorExecuteListener((MonoSink<KeyObject>)sink), policy, key, packageName, functionName, functionArgs));
    }

    @Override
    public Mono<String> info(InfoPolicy infoPolicy, Node node, String command) {
        return this.info(infoPolicy, node, Collections.singletonList(command)).flatMap(resultMap -> {
            if (resultMap.containsKey(command)) {
                String result = (String)resultMap.get(command);
                return Mono.just((Object)(result != null ? result : ""));
            }
            return Mono.error((Throwable)new AerospikeException(String.format("Unknown info command: [%s]", command)));
        });
    }

    @Override
    public Mono<Map<String, String>> info(InfoPolicy infoPolicy, Node node, List<String> commands) {
        return Mono.create(sink -> this.aerospikeClient.info(null, (InfoListener)new ReactorInfoListener((MonoSink<Map<String, String>>)sink), infoPolicy, node, commands.toArray(new String[0])));
    }

    @Override
    public Mono<Void> createIndex(Policy policy, String namespace, String setName, String indexName, String binName, IndexType indexType, IndexCollectionType indexCollectionType, CTX ... ctx) {
        return this.waitTillComplete(this.createIndexImpl(policy, namespace, setName, indexName, binName, indexType, indexCollectionType, ctx), policy != null ? new InfoPolicy(policy) : this.aerospikeClient.getInfoPolicyDefault());
    }

    @Override
    public Mono<Void> dropIndex(Policy policy, String namespace, String setName, String indexName) {
        return this.waitTillComplete(this.dropIndexImpl(policy, namespace, setName, indexName), policy != null ? new InfoPolicy(policy) : this.aerospikeClient.getInfoPolicyDefault());
    }

    @Override
    public IAerospikeClient getAerospikeClient() {
        return this.aerospikeClient;
    }

    private Mono<AsyncIndexTask> createIndexImpl(Policy policy, String namespace, String setName, String indexName, String binName, IndexType indexType, IndexCollectionType indexCollectionType, CTX ... ctx) {
        return Mono.create(sink -> this.aerospikeClient.createIndex(null, (IndexListener)new ReactorIndexListener((MonoSink<AsyncIndexTask>)sink), policy, namespace, setName, indexName, binName, indexType, indexCollectionType, ctx));
    }

    private Mono<AsyncIndexTask> dropIndexImpl(Policy policy, String namespace, String setName, String indexName) {
        return Mono.create(sink -> this.aerospikeClient.dropIndex(null, (IndexListener)new ReactorIndexListener((MonoSink<AsyncIndexTask>)sink), policy, namespace, setName, indexName));
    }

    private Mono<Void> waitTillComplete(Mono<AsyncIndexTask> asyncIndexTaskMono, InfoPolicy infoPolicy) {
        return asyncIndexTaskMono.flatMapMany(indexTask -> Flux.fromArray((Object[])this.aerospikeClient.getNodes()).filter(Node::isActive).flatMap(node -> this.queryIndexStatus(infoPolicy, (AsyncIndexTask)indexTask, (Node)node).delayElement(Duration.ofMillis(1000L)).repeat().takeWhile(status -> status == 1))).then();
    }

    private Mono<Integer> queryIndexStatus(InfoPolicy infoPolicy, AsyncIndexTask indexTask, Node node) {
        return Mono.create(sink -> indexTask.queryStatus(null, infoPolicy, node, (TaskStatusListener)new ReactorTaskStatusListener((MonoSink<Integer>)sink)));
    }

    @Override
    public Policy getReadPolicyDefault() {
        return this.aerospikeClient.getReadPolicyDefault();
    }

    @Override
    public WritePolicy getWritePolicyDefault() {
        return this.aerospikeClient.getWritePolicyDefault();
    }

    @Override
    public ScanPolicy getScanPolicyDefault() {
        return this.aerospikeClient.getScanPolicyDefault();
    }

    @Override
    public QueryPolicy getQueryPolicyDefault() {
        return this.aerospikeClient.getQueryPolicyDefault();
    }

    @Override
    public BatchPolicy getBatchPolicyDefault() {
        return this.aerospikeClient.getBatchPolicyDefault();
    }

    @Override
    public InfoPolicy getInfoPolicyDefault() {
        return this.aerospikeClient.getInfoPolicyDefault();
    }

    @Override
    public TxnVerifyPolicy getTxnVerifyPolicyDefault() {
        return this.aerospikeClient.getTxnVerifyPolicyDefault();
    }

    @Override
    public TxnRollPolicy getTxnRollPolicyDefault() {
        return this.aerospikeClient.getTxnRollPolicyDefault();
    }
}

