/*
 * Decompiled with CFR 0.152.
 */
package com.bloxbean.cardano.yaci.helper.reactive;

import com.bloxbean.cardano.yaci.core.common.Constants;
import com.bloxbean.cardano.yaci.core.common.NetworkType;
import com.bloxbean.cardano.yaci.core.model.Block;
import com.bloxbean.cardano.yaci.core.model.BlockHeader;
import com.bloxbean.cardano.yaci.core.network.TCPNodeClient;
import com.bloxbean.cardano.yaci.core.protocol.Agent;
import com.bloxbean.cardano.yaci.core.protocol.AgentListener;
import com.bloxbean.cardano.yaci.core.protocol.State;
import com.bloxbean.cardano.yaci.core.protocol.blockfetch.BlockfetchAgent;
import com.bloxbean.cardano.yaci.core.protocol.blockfetch.BlockfetchAgentListener;
import com.bloxbean.cardano.yaci.core.protocol.chainsync.messages.Point;
import com.bloxbean.cardano.yaci.core.protocol.chainsync.messages.Tip;
import com.bloxbean.cardano.yaci.core.protocol.chainsync.n2n.ChainSyncAgentListener;
import com.bloxbean.cardano.yaci.core.protocol.chainsync.n2n.ChainsyncAgent;
import com.bloxbean.cardano.yaci.core.protocol.handshake.HandshakeAgent;
import com.bloxbean.cardano.yaci.core.protocol.handshake.HandshakeAgentListener;
import com.bloxbean.cardano.yaci.core.protocol.handshake.messages.VersionTable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * Exposes blocks fetched from a Cardano node as a Reactor {@link Flux}.
 *
 * <p>Instances are obtained through the static factories: {@code fromLatest} and
 * {@code fromPoint} follow the chain from a starting point, while {@code forRange}
 * fetches a bounded range of blocks and then completes the flux.
 *
 * <p>The underlying {@link TCPNodeClient} is created eagerly but only started when the
 * flux returned by {@link #stream()} is subscribed to and the client is not yet running.
 * Every subscription registers an additional listener on the same block-fetch agent, so
 * all subscribers of one streamer share a single connection.
 */
public class BlockStreamer {
    private static final Logger log = LoggerFactory.getLogger(BlockStreamer.class);

    // Relay endpoints used by the NetworkType based factories.
    private static final String MAINNET_HOST = "relays-new.cardano-mainnet.iohk.io";
    private static final int MAINNET_PORT = 3001;
    private static final String LEGACY_TESTNET_HOST = "relays-new.cardano-testnet.iohkdev.io";
    private static final int LEGACY_TESTNET_PORT = 3001;
    private static final String PREPROD_HOST = "preprod-node.world.dev.cardano.org";
    private static final int PREPROD_PORT = 30000;
    private static final String PREVIEW_HOST = "preview-node.world.dev.cardano.org";
    private static final int PREVIEW_PORT = 30002;

    private TCPNodeClient n2nClient;
    private Flux<Block> blockFlux;

    private BlockStreamer() {
    }

    /**
     * Creates a streamer that starts at the current tip of the given network.
     *
     * @param networkType network whose relay, well-known point and version table are used
     * @return the streamer, or {@code null} when {@code networkType} has no relay mapping here
     */
    public static BlockStreamer fromLatest(NetworkType networkType) {
        switch (networkType) {
            case MAINNET:
                return fromLatest(MAINNET_HOST, MAINNET_PORT, Constants.WELL_KNOWN_MAINNET_POINT, networkType.getN2NVersionTable());
            case LEGACY_TESTNET:
                return fromLatest(LEGACY_TESTNET_HOST, LEGACY_TESTNET_PORT, Constants.WELL_KNOWN_TESTNET_POINT, networkType.getN2NVersionTable());
            case PREPROD:
                return fromLatest(PREPROD_HOST, PREPROD_PORT, Constants.WELL_KNOWN_PREPROD_POINT, networkType.getN2NVersionTable());
            case PREVIEW:
                return fromLatest(PREVIEW_HOST, PREVIEW_PORT, Constants.WELL_KNOWN_PREVIEW_POINT, networkType.getN2NVersionTable());
            default:
                return null;
        }
    }

    /**
     * Creates a streamer that starts at {@code point} on the given network.
     *
     * @param networkType network whose relay and version table are used
     * @param point       point handed to the chain-sync agent as the known point
     * @return the streamer, or {@code null} when {@code networkType} has no relay mapping here
     */
    public static BlockStreamer fromPoint(NetworkType networkType, Point point) {
        switch (networkType) {
            case MAINNET:
                return fromPoint(MAINNET_HOST, MAINNET_PORT, point, networkType.getN2NVersionTable());
            case LEGACY_TESTNET:
                return fromPoint(LEGACY_TESTNET_HOST, LEGACY_TESTNET_PORT, point, networkType.getN2NVersionTable());
            case PREPROD:
                return fromPoint(PREPROD_HOST, PREPROD_PORT, point, networkType.getN2NVersionTable());
            case PREVIEW:
                return fromPoint(PREVIEW_HOST, PREVIEW_PORT, point, networkType.getN2NVersionTable());
            default:
                return null;
        }
    }

    /**
     * Creates a streamer against an explicit node that jumps to the node's tip once an
     * intersection with {@code wellKnownPoint} has been found.
     *
     * @param host           node host name
     * @param port           node port
     * @param wellKnownPoint point used to find the initial intersection
     * @param versionTable   version table used for the handshake
     * @return a new streamer; the connection is not started until subscription
     */
    public static BlockStreamer fromLatest(String host, int port, Point wellKnownPoint, VersionTable versionTable) {
        BlockStreamer streamer = new BlockStreamer();
        streamer.initBlockFluxFromPoint(host, port, wellKnownPoint, versionTable, true);
        return streamer;
    }

    /**
     * Creates a streamer against an explicit node that follows the chain from {@code point}.
     *
     * @param host         node host name
     * @param port         node port
     * @param point        point handed to the chain-sync agent as the known point
     * @param versionTable version table used for the handshake
     * @return a new streamer; the connection is not started until subscription
     */
    public static BlockStreamer fromPoint(String host, int port, Point point, VersionTable versionTable) {
        BlockStreamer streamer = new BlockStreamer();
        streamer.initBlockFluxFromPoint(host, port, point, versionTable, false);
        return streamer;
    }

    /**
     * Creates a streamer that fetches the blocks between two points on the given network.
     *
     * @param networkType network whose relay and version table are used
     * @param fromPoint   first point passed to the block-fetch agent
     * @param toPoint     second point passed to the block-fetch agent
     * @return the streamer, or {@code null} when {@code networkType} has no relay mapping here
     */
    public static BlockStreamer forRange(NetworkType networkType, Point fromPoint, Point toPoint) {
        switch (networkType) {
            case MAINNET:
                return forRange(MAINNET_HOST, MAINNET_PORT, fromPoint, toPoint, networkType.getN2NVersionTable());
            case LEGACY_TESTNET:
                return forRange(LEGACY_TESTNET_HOST, LEGACY_TESTNET_PORT, fromPoint, toPoint, networkType.getN2NVersionTable());
            case PREPROD:
                return forRange(PREPROD_HOST, PREPROD_PORT, fromPoint, toPoint, networkType.getN2NVersionTable());
            case PREVIEW:
                return forRange(PREVIEW_HOST, PREVIEW_PORT, fromPoint, toPoint, networkType.getN2NVersionTable());
            default:
                return null;
        }
    }

    /**
     * Creates a streamer against an explicit node that fetches the blocks between two points.
     * The flux completes when the block-fetch agent reports the batch as done.
     *
     * @param host         node host name
     * @param port         node port
     * @param fromPoint    first point passed to the block-fetch agent
     * @param toPoint      second point passed to the block-fetch agent
     * @param versionTable version table used for the handshake
     * @return a new streamer; the connection is not started until subscription
     */
    public static BlockStreamer forRange(String host, int port, Point fromPoint, Point toPoint, VersionTable versionTable) {
        BlockStreamer streamer = new BlockStreamer();
        streamer.initBlockFluxForRange(host, port, fromPoint, toPoint, versionTable);
        return streamer;
    }

    /**
     * Shuts down the node client backing this streamer, if one has been created.
     *
     * <p>Range streamers additionally shut their client down on their own when the last
     * active subscription is disposed.
     */
    public void shutdown() {
        if (this.n2nClient != null) {
            this.n2nClient.shutdown();
        }
    }

    /**
     * Returns the flux of blocks. Subscribing starts the node client when it is not running.
     *
     * @return the block flux configured by the factory that created this streamer
     */
    public Flux<Block> stream() {
        return this.blockFlux;
    }

    /**
     * Wires handshake, chain-sync and block-fetch agents so that every header announced by
     * chain-sync is fetched as a full block and pushed to the flux.
     *
     * @param startFromTip when {@code true}, the chain-sync agent is reset once to the node's
     *                     tip after the first intersection that is not already the tip
     */
    private void initBlockFluxFromPoint(String host, int port, Point wellKnownPoint, VersionTable versionTable, final boolean startFromTip) {
        final AtomicBoolean tipFound = new AtomicBoolean(false);
        final ChainsyncAgent chainSyncAgent = new ChainsyncAgent(new Point[]{wellKnownPoint});
        final BlockfetchAgent blockFetch = new BlockfetchAgent();

        HandshakeAgent handshakeAgent = new HandshakeAgent(versionTable);
        handshakeAgent.addListener(new HandshakeAgentListener() {
            public void handshakeOk() {
                // Handshake done: kick off chain-sync.
                chainSyncAgent.sendNextMessage();
            }
        });

        final TCPNodeClient client = new TCPNodeClient(host, port, handshakeAgent, new Agent[]{chainSyncAgent, blockFetch});
        this.n2nClient = client;

        this.blockFlux = Flux.<Block>create(sink -> {
            // No per-subscription cleanup: the connection is only closed through shutdown().
            sink.onDispose(() -> {});
            blockFetch.addListener(new BlockfetchAgentListener() {
                public void blockFound(Block block) {
                    log.trace("Block found {}", block);
                    sink.next(block);
                    // Ask chain-sync for the next header only after the block was delivered.
                    chainSyncAgent.sendNextMessage();
                }

                public void batchDone() {
                    log.trace("batchDone");
                }
            });
        }).doOnSubscribe(subscription -> {
            if (!client.isRunning()) {
                log.debug("Subscription started");
                client.start();
            }
        });

        chainSyncAgent.addListener(new ChainSyncAgentListener() {
            public void intersactFound(Tip tip, Point point) {
                log.debug("Intersact found {}", point);
                // Jump to the tip at most once, and only if we are not already there.
                if (startFromTip && !tip.getPoint().equals(point) && !tipFound.get()) {
                    chainSyncAgent.reset(tip.getPoint());
                    tipFound.set(true);
                }
                chainSyncAgent.sendNextMessage();
            }

            public void intersactNotFound(Tip tip) {
                log.error("IntersactNotFound: {}", tip);
            }

            public void rollforward(Tip tip, BlockHeader blockHeader) {
                long slot = blockHeader.getHeaderBody().getSlot();
                String hash = blockHeader.getHeaderBody().getBlockHash();
                // Fetch exactly the announced block: range start and end are the same point.
                blockFetch.resetPoints(new Point(slot, hash), new Point(slot, hash));
                if (log.isDebugEnabled()) {
                    log.debug("Trying to fetch block for {}", new Point(slot, hash));
                }
                blockFetch.sendNextMessage();
            }

            public void rollbackward(Tip tip, Point toPoint) {
                log.debug("Rolling backward {}", toPoint);
                chainSyncAgent.sendNextMessage();
            }

            public void onStateUpdate(State oldState, State newState) {
            }
        });
    }

    /**
     * Wires handshake and block-fetch agents so that the blocks between {@code fromPoint}
     * and {@code toPoint} are pushed to the flux, which completes on {@code batchDone}.
     * The client is shut down when the number of active subscriptions drops to zero.
     */
    private void initBlockFluxForRange(String host, int port, Point fromPoint, Point toPoint, VersionTable versionTable) {
        final BlockfetchAgent blockFetch = new BlockfetchAgent();
        blockFetch.resetPoints(fromPoint, toPoint);

        HandshakeAgent handshakeAgent = new HandshakeAgent(versionTable);
        handshakeAgent.addListener(new HandshakeAgentListener() {
            public void handshakeOk() {
                // Handshake done: request the configured range.
                blockFetch.sendNextMessage();
            }
        });

        final TCPNodeClient client = new TCPNodeClient(host, port, handshakeAgent, new Agent[]{blockFetch});
        // Keep a reference on the instance so that shutdown() also works for range streamers.
        this.n2nClient = client;

        final AtomicInteger subscriberCount = new AtomicInteger(0);
        this.blockFlux = Flux.<Block>create(sink -> {
            sink.onDispose(() -> {
                // Last subscriber gone: release the connection.
                if (subscriberCount.decrementAndGet() == 0) {
                    client.shutdown();
                }
            });
            blockFetch.addListener(new BlockfetchAgentListener() {
                public void blockFound(Block block) {
                    log.trace("Block found {}", block);
                    sink.next(block);
                }

                public void batchDone() {
                    log.trace("batchDone");
                    sink.complete();
                }
            });
        }).doOnSubscribe(subscription -> {
            subscriberCount.incrementAndGet();
            if (!client.isRunning()) {
                log.debug("Subscription started");
                client.start();
            }
        });
    }
}

