/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.cluster.model.outbound;

import io.vlingo.actors.Actor;
import io.vlingo.cluster.model.message.ApplicationSays;
import io.vlingo.cluster.model.message.Directory;
import io.vlingo.cluster.model.message.MessageConverters;
import io.vlingo.cluster.model.message.OperationalMessageCache;
import io.vlingo.cluster.model.message.Split;
import io.vlingo.cluster.model.outbound.OperationalOutboundStream;
import io.vlingo.common.pool.ResourcePool;
import io.vlingo.wire.fdx.outbound.ManagedOutboundChannelProvider;
import io.vlingo.wire.fdx.outbound.Outbound;
import io.vlingo.wire.message.ConsumerByteBuffer;
import io.vlingo.wire.message.Converters;
import io.vlingo.wire.message.RawMessage;
import io.vlingo.wire.node.Id;
import io.vlingo.wire.node.Node;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Actor implementation of {@link OperationalOutboundStream} that forwards cluster
 * operational messages through an {@link Outbound}.
 *
 * <p>Messages that carry no per-call payload (ELECT, JOIN, LEADER, LEAVE, PING, PULSE, VOTE)
 * are obtained from an {@link OperationalMessageCache} created for the local node. Messages
 * with a payload (application, directory, split) are serialized into a buffer lent by the
 * outbound before being handed to it.
 */
public class OperationalOutboundStreamActor extends Actor implements OperationalOutboundStream {
    private static final Logger logger = LoggerFactory.getLogger(OperationalOutboundStreamActor.class);

    private final OperationalMessageCache cache;
    private final Node node;
    private final Outbound outbound;

    /**
     * Creates the actor for the given local node.
     *
     * @param node the local node; its id is stamped on outgoing raw messages
     * @param provider the channel provider handed to the underlying {@link Outbound}
     * @param byteBufferPool the buffer pool handed to the underlying {@link Outbound}
     */
    public OperationalOutboundStreamActor(Node node, ManagedOutboundChannelProvider provider, ResourcePool<ConsumerByteBuffer, String> byteBufferPool) {
        this.node = node;
        this.outbound = new Outbound(provider, byteBufferPool);
        this.cache = new OperationalMessageCache(node);
    }

    /**
     * Asks the outbound to close whatever it holds for the given node id.
     *
     * @param id the id passed to {@code Outbound.close(Id)}
     */
    @Override
    public void close(Id id) {
        logger.debug("Closing Id: {}", id);
        this.outbound.close(id);
    }

    /**
     * Serializes {@code says} and broadcasts it to the given nodes.
     *
     * @param says the application message to send
     * @param unconfirmedNodes the nodes the broadcast is addressed to
     */
    @Override
    public void application(ApplicationSays says, Collection<Node> unconfirmedNodes) {
        ConsumerByteBuffer buffer = this.outbound.lendByteBuffer();
        MessageConverters.messageToBytes(says, buffer.asByteBuffer());
        RawMessage message = this.rawMessageFrom(buffer);
        logger.debug("Broadcasting ApplicationSays {} to {}", says.saysId, this.debug(unconfirmedNodes));
        this.outbound.broadcast(unconfirmedNodes, this.outbound.bytesFrom(message, buffer));
    }

    /**
     * Renders a collection as {@code [a, b, c]} for debug log output.
     *
     * <p>Returns an empty string when debug logging is disabled so that callers, which
     * evaluate this eagerly as a log argument, do not pay for the stream/join in that case.
     *
     * @param collection the elements to render via {@code toString()}
     * @param <E> the element type
     * @return the bracketed, comma-separated rendering, or {@code ""} if debug is disabled
     */
    private <E> String debug(Collection<E> collection) {
        // The guard was previously inverted: it returned "" exactly when debug WAS enabled.
        if (!logger.isDebugEnabled()) {
            return "";
        }
        return String.format("[%s]", collection.stream().map(Object::toString).collect(Collectors.joining(", ")));
    }

    /**
     * Builds a {@link Directory} message for the local node and broadcasts it.
     *
     * @param allLiveNodes the nodes placed into the directory message
     */
    @Override
    public void directory(Set<Node> allLiveNodes) {
        Directory dir = new Directory(this.node.id(), this.node.name(), allLiveNodes);
        ConsumerByteBuffer buffer = this.outbound.lendByteBuffer();
        MessageConverters.messageToBytes(dir, buffer.asByteBuffer());
        RawMessage message = this.rawMessageFrom(buffer);
        logger.debug("Broadcasting directory {}", this.debug(allLiveNodes));
        this.outbound.broadcast(this.outbound.bytesFrom(message, buffer));
    }

    /**
     * Broadcasts the cached ELECT message to the given nodes.
     *
     * @param allGreaterNodes the nodes the broadcast is addressed to
     */
    @Override
    public void elect(Collection<Node> allGreaterNodes) {
        logger.debug("Broadcasting elect {}", this.debug(allGreaterNodes));
        this.outbound.broadcast(allGreaterNodes, this.cache.cachedRawMessage("ELECT"));
    }

    /** Broadcasts the cached JOIN message. */
    @Override
    public void join() {
        logger.debug("Broadcasting join");
        this.outbound.broadcast(this.cache.cachedRawMessage("JOIN"));
    }

    /** Broadcasts the cached LEADER message. */
    @Override
    public void leader() {
        logger.debug("Broadcasting leader");
        this.outbound.broadcast(this.cache.cachedRawMessage("LEADER"));
    }

    /**
     * Sends the cached LEADER message to a single node.
     *
     * @param id the id of the node to send to
     */
    @Override
    public void leader(Id id) {
        logger.debug("Broadcasting leader Id: {}", id);
        this.outbound.sendTo(this.cache.cachedRawMessage("LEADER"), id);
    }

    /** Broadcasts the cached LEAVE message. */
    @Override
    public void leave() {
        logger.debug("Broadcasting leave");
        this.outbound.broadcast(this.cache.cachedRawMessage("LEAVE"));
    }

    /**
     * Asks the outbound to open for the given node id.
     *
     * @param id the id passed to {@code Outbound.open(Id)}
     */
    @Override
    public void open(Id id) {
        logger.debug("open Id: {}", id);
        this.outbound.open(id);
    }

    /**
     * Sends the cached PING message to a single node.
     *
     * @param targetNodeId the id of the node to send to
     */
    @Override
    public void ping(Id targetNodeId) {
        logger.debug("Sending ping to: {}", targetNodeId);
        this.outbound.sendTo(this.cache.cachedRawMessage("PING"), targetNodeId);
    }

    /**
     * Sends the cached PULSE message to a single node.
     *
     * @param targetNodeId the id of the node to send to
     */
    @Override
    public void pulse(Id targetNodeId) {
        logger.debug("Sending pulse to: {}", targetNodeId);
        this.outbound.sendTo(this.cache.cachedRawMessage("PULSE"), targetNodeId);
    }

    /** Broadcasts the cached PULSE message. */
    @Override
    public void pulse() {
        logger.debug("Broadcasting pulse");
        this.outbound.broadcast(this.cache.cachedRawMessage("PULSE"));
    }

    /**
     * Builds a {@link Split} message from the current leader id and sends it to one node.
     *
     * @param targetNodeId the id of the node the message is sent to
     * @param currentLeaderId the leader id carried inside the {@link Split} message
     */
    @Override
    public void split(Id targetNodeId, Id currentLeaderId) {
        Split split = new Split(currentLeaderId);
        ConsumerByteBuffer buffer = this.outbound.lendByteBuffer();
        MessageConverters.messageToBytes(split, buffer.asByteBuffer());
        RawMessage message = this.rawMessageFrom(buffer);
        // Log the actual recipient; the leader id is part of the Split message itself.
        logger.debug("Sending split: {} to: {}", split, targetNodeId);
        this.outbound.sendTo(this.outbound.bytesFrom(message, buffer), targetNodeId);
    }

    /**
     * Sends the cached VOTE message to a single node.
     *
     * @param targetNodeId the id of the node to send to
     */
    @Override
    public void vote(Id targetNodeId) {
        logger.debug("Sending vote to: {}", targetNodeId);
        this.outbound.sendTo(this.cache.cachedRawMessage("VOTE"), targetNodeId);
    }

    /** Closes the outbound and then delegates to the superclass {@code stop()}. */
    @Override
    public void stop() {
        logger.debug("Stopping...");
        this.outbound.close();
        super.stop();
    }

    /**
     * Converts the already-filled buffer into a {@link RawMessage} tagged with the local node id.
     *
     * @param buffer the lent buffer that the message bytes were written into
     * @return the raw message produced by {@code Converters.toRawMessage}
     */
    private RawMessage rawMessageFrom(ConsumerByteBuffer buffer) {
        return Converters.toRawMessage((short)this.node.id().value(), buffer.asByteBuffer());
    }
}

