/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.lattice.grid.application;

import io.vlingo.actors.Actor;
import io.vlingo.actors.ActorInstantiator;
import io.vlingo.actors.Address;
import io.vlingo.actors.Definition;
import io.vlingo.actors.Returns;
import io.vlingo.common.SerializableConsumer;
import io.vlingo.lattice.grid.application.GridActorControl;
import io.vlingo.lattice.grid.application.message.Answer;
import io.vlingo.lattice.grid.application.message.Deliver;
import io.vlingo.lattice.grid.application.message.Encoder;
import io.vlingo.lattice.grid.application.message.Forward;
import io.vlingo.lattice.grid.application.message.Message;
import io.vlingo.lattice.grid.application.message.Relocate;
import io.vlingo.lattice.grid.application.message.Start;
import io.vlingo.lattice.grid.application.message.serialization.FSTEncoder;
import io.vlingo.lattice.util.OutBuffers;
import io.vlingo.wire.fdx.outbound.ApplicationOutboundStream;
import io.vlingo.wire.message.RawMessage;
import io.vlingo.wire.node.Id;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OutboundGridActorControl
extends Actor
implements GridActorControl.Outbound {
    private static final Logger logger = LoggerFactory.getLogger(OutboundGridActorControl.class);
    private final Id localNodeId;
    private ApplicationOutboundStream stream;
    private final Encoder encoder;
    private final BiConsumer<UUID, Returns<?>> correlation;
    private final OutBuffers outBuffers;

    public OutboundGridActorControl(Id localNodeId, Encoder encoder, BiConsumer<UUID, Returns<?>> correlation, OutBuffers outBuffers) {
        this(localNodeId, null, encoder, correlation, outBuffers);
    }

    public OutboundGridActorControl(Id localNodeId, ApplicationOutboundStream stream, Encoder encoder, BiConsumer<UUID, Returns<?>> correlation, OutBuffers outBuffers) {
        this.localNodeId = localNodeId;
        this.stream = stream;
        this.encoder = encoder;
        this.correlation = correlation;
        this.outBuffers = outBuffers;
    }

    @Override
    public void disburse(Id id) {
        Runnable next;
        Queue<Runnable> buffer = this.outBuffers.queue(id);
        logger.debug("Disbursing buffered messages");
        do {
            if ((next = buffer.poll()) == null) continue;
            next.run();
        } while (next != null);
    }

    private void send(Id recipient, Message message) {
        logger.debug("Buffering message {} to {}", (Object)message, (Object)recipient);
        this.outBuffers.enqueue(recipient, () -> {
            logger.debug("Sending message {} to {}", (Object)message, (Object)recipient);
            byte[] payload = this.encoder.encode(message);
            RawMessage raw = RawMessage.from((int)this.localNodeId.value(), (int)-1, (int)payload.length);
            raw.putRemaining(ByteBuffer.wrap(payload));
            this.stream.sendTo(raw, recipient);
        });
    }

    @Override
    public <T> void start(Id recipient, Id sender, Class<T> protocol, Address address, Definition.SerializationProxy definitionProxy) {
        this.send(recipient, new Start<T>(protocol, address, definitionProxy));
    }

    @Override
    public <T> void deliver(Id recipient, Id sender, Returns<?> returns, Class<T> protocol, Address address, Definition.SerializationProxy definitionProxy, SerializableConsumer<T> consumer, String representation) {
        Deliver<T> deliver;
        if (returns == null) {
            deliver = new Deliver<T>(protocol, address, definitionProxy, consumer, representation);
        } else {
            UUID answerCorrelationId = UUID.randomUUID();
            deliver = new Deliver<T>(protocol, address, definitionProxy, consumer, answerCorrelationId, representation);
            this.correlation.accept(answerCorrelationId, returns);
        }
        this.send(recipient, deliver);
    }

    @Override
    public <T> void answer(Id receiver, Id sender, Answer<T> answer) {
        this.send(receiver, answer);
    }

    @Override
    public void forward(Id receiver, Id sender, Message message) {
        this.send(receiver, new Forward(sender, message));
    }

    @Override
    public void relocate(Id receiver, Id sender, Definition.SerializationProxy definitionProxy, Address address, Object snapshot, List<? extends io.vlingo.actors.Message> pending) {
        List<Deliver<?>> messages = pending.stream().map(Deliver.from(this.correlation)).collect(Collectors.toList());
        this.send(receiver, new Relocate(address, definitionProxy, snapshot, messages));
    }

    @Override
    public void useStream(ApplicationOutboundStream outbound) {
        this.stream = outbound;
    }

    public static class OutboundGridActorControlInstantiator
    implements ActorInstantiator<OutboundGridActorControl> {
        private static final long serialVersionUID = 8987209018742138417L;
        private final Id id;
        private final FSTEncoder fstEncoder;
        private final BiConsumer<UUID, Returns<?>> correlation;
        private final OutBuffers outBuffers;

        public OutboundGridActorControlInstantiator(Id id, FSTEncoder fstEncoder, BiConsumer<UUID, Returns<?>> correlation, OutBuffers outBuffers) {
            this.id = id;
            this.fstEncoder = fstEncoder;
            this.correlation = correlation;
            this.outBuffers = outBuffers;
        }

        public OutboundGridActorControl instantiate() {
            return new OutboundGridActorControl(this.id, this.fstEncoder, this.correlation, this.outBuffers);
        }
    }
}

