/*
 * Decompiled with CFR 0.152.
 */
package com.bloxbean.cardano.yaci.helper;

import com.bloxbean.cardano.yaci.core.protocol.AgentListener;
import com.bloxbean.cardano.yaci.core.protocol.localtxmonitor.LocalTxMonitorAgent;
import com.bloxbean.cardano.yaci.core.protocol.localtxmonitor.LocalTxMonitorListener;
import com.bloxbean.cardano.yaci.core.protocol.localtxmonitor.messages.MsgAcquired;
import com.bloxbean.cardano.yaci.core.protocol.localtxmonitor.messages.MsgAwaitAcquire;
import com.bloxbean.cardano.yaci.core.protocol.localtxmonitor.messages.MsgGetSizes;
import com.bloxbean.cardano.yaci.core.protocol.localtxmonitor.messages.MsgHasTx;
import com.bloxbean.cardano.yaci.core.protocol.localtxmonitor.messages.MsgNextTx;
import com.bloxbean.cardano.yaci.core.protocol.localtxmonitor.messages.MsgReplyGetSizes;
import com.bloxbean.cardano.yaci.core.protocol.localtxmonitor.messages.MsgReplyHasTx;
import com.bloxbean.cardano.yaci.core.protocol.localtxmonitor.messages.MsgReplyNextTx;
import com.bloxbean.cardano.yaci.helper.api.QueryClient;
import com.bloxbean.cardano.yaci.helper.model.MempoolStatus;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

public class LocalTxMonitorClient
extends QueryClient {
    private static final Logger log = LoggerFactory.getLogger(LocalTxMonitorClient.class);
    private LocalTxMonitorAgent localTxMonitorAgent;

    public LocalTxMonitorClient(LocalTxMonitorAgent localTxMonitorAgent) {
        this.localTxMonitorAgent = localTxMonitorAgent;
        this.init();
    }

    private void init() {
        this.localTxMonitorAgent.addListener((AgentListener)new LocalTxMonitorListener(){

            public void acquiredAt(MsgAwaitAcquire request, MsgAcquired msgAcquired) {
                if (log.isDebugEnabled()) {
                    log.debug("TxMonitor acquired at : " + msgAcquired.getSlotNo());
                }
                LocalTxMonitorClient.this.applyMonoSuccess(request, msgAcquired.getSlotNo());
            }

            public void onReplyHashTx(MsgHasTx request, MsgReplyHasTx reply) {
                LocalTxMonitorClient.this.applyMonoSuccess(request, reply);
            }

            public void onReplyNextTx(MsgNextTx request, MsgReplyNextTx reply) {
                byte[] transaction = reply.getTransaction();
                if (transaction == null) {
                    transaction = new byte[]{};
                }
                LocalTxMonitorClient.this.applyMonoSuccess(request, transaction);
            }

            public void onReplyGetSizes(MsgGetSizes request, MsgReplyGetSizes reply) {
                MempoolStatus mempoolStatus = MempoolStatus.builder().capacityInBytes(reply.getCapacityInBytes()).sizeInBytes(reply.getSizeInBytes()).numberOfTxs(reply.getNumberOfTxs()).build();
                LocalTxMonitorClient.this.applyMonoSuccess(request, mempoolStatus);
            }

            public void onDisconnect() {
                LocalTxMonitorClient.this.applyError("Connection Error !!!");
            }
        });
    }

    public Mono<Long> acquire() {
        return Mono.create(monoSink -> {
            if (log.isDebugEnabled()) {
                log.debug("Try to acquire for LocalTxMonitor");
            }
            MsgAwaitAcquire msgAwaitAcquire = this.localTxMonitorAgent.awaitAcquire();
            this.storeMonoSinkReference(msgAwaitAcquire, (MonoSink)monoSink);
            this.localTxMonitorAgent.sendNextMessage();
        });
    }

    public Mono<MempoolStatus> getMempoolSizeAndCapacity() {
        return Mono.create(monoSink -> {
            MsgGetSizes sizeRequest = this.localTxMonitorAgent.getSizeAndCapacity();
            this.storeMonoSinkReference(sizeRequest, (MonoSink)monoSink);
            this.localTxMonitorAgent.sendNextMessage();
        });
    }

    public Mono<MempoolStatus> acquireAndGetMempoolSizeAndCapacity() {
        return Mono.create(monoSink -> this.acquire().doOnNext(aLong -> {
            if (log.isDebugEnabled()) {
                log.debug("Try to acquire for LocalTxMonitor");
            }
            MsgGetSizes sizeRequest = this.localTxMonitorAgent.getSizeAndCapacity();
            this.storeMonoSinkReference(sizeRequest, (MonoSink)monoSink);
            this.localTxMonitorAgent.sendNextMessage();
        }).doOnError(throwable -> {
            if (log.isDebugEnabled()) {
                log.error("Error >> ", throwable);
            }
            monoSink.error(throwable);
        }).subscribe());
    }

    public Mono<List<byte[]>> getCurrentMempoolTransactionsAsMono() {
        return this.getCurrentMempoolTransactions().collectList();
    }

    public Flux<byte[]> getCurrentMempoolTransactions() {
        return Flux.create(fluxSink -> this._getMempoolTransactions((FluxSink)fluxSink, false));
    }

    public Flux<byte[]> acquireAndGetMempoolTransactions() {
        return Flux.create(fluxSink -> this.acquire().doOnError(throwable -> fluxSink.error(throwable)).doOnNext(slot -> this._getMempoolTransactions((FluxSink)fluxSink, false)).subscribe());
    }

    public Mono<List<byte[]>> acquireAndGetMempoolTransactionsAsMono() {
        return this.acquireAndGetMempoolTransactions().collectList();
    }

    public Flux<byte[]> streamMempoolTransactions() {
        return Flux.create(fluxSink -> this.acquire().doOnNext(slot -> this._getMempoolTransactions((FluxSink)fluxSink, true)).doOnError(throwable -> fluxSink.error(throwable)).subscribe());
    }

    private void _getMempoolTransactions(FluxSink fluxSink, boolean stream) {
        this.getNextTx().doOnError(throwable -> fluxSink.error(throwable)).doOnNext(bytes -> {
            if (bytes == null || ((byte[])bytes).length == 0) {
                if (!stream) {
                    if (log.isDebugEnabled()) {
                        log.debug("FluxSink.complete()");
                    }
                    fluxSink.complete();
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("Streaming mode. Re-acquire and wait for next set of transactions");
                    }
                    this.acquire().doOnError(throwable -> fluxSink.error(throwable)).doOnNext(slot -> this._getMempoolTransactions(fluxSink, stream)).subscribe();
                }
            } else {
                fluxSink.next(bytes);
                this._getMempoolTransactions(fluxSink, stream);
            }
        }).subscribe();
    }

    public Mono<byte[]> getNextTx() {
        return Mono.create(monoSink -> {
            MsgNextTx nextTxRequest = this.localTxMonitorAgent.nextTx();
            this.storeMonoSinkReference(nextTxRequest, (MonoSink)monoSink);
            this.localTxMonitorAgent.sendNextMessage();
        });
    }
}

