/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.lattice.grid.application;

import io.vlingo.actors.Address;
import io.vlingo.actors.LocalMessage;
import io.vlingo.actors.Returns;
import io.vlingo.common.Completes;
import io.vlingo.common.Scheduler;
import io.vlingo.lattice.grid.application.ApplicationMessageHandler;
import io.vlingo.lattice.grid.application.GridActorControl;
import io.vlingo.lattice.grid.application.message.Answer;
import io.vlingo.lattice.grid.application.message.Decoder;
import io.vlingo.lattice.grid.application.message.Deliver;
import io.vlingo.lattice.grid.application.message.Message;
import io.vlingo.lattice.grid.application.message.Relocate;
import io.vlingo.lattice.grid.application.message.Start;
import io.vlingo.lattice.grid.application.message.Visitor;
import io.vlingo.lattice.grid.application.message.serialization.JavaObjectDecoder;
import io.vlingo.lattice.grid.hashring.HashRing;
import io.vlingo.lattice.util.HardRefHolder;
import io.vlingo.lattice.util.WeakQueue;
import io.vlingo.wire.message.RawMessage;
import io.vlingo.wire.node.Id;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class GridApplicationMessageHandler
implements ApplicationMessageHandler {
    private static final Logger logger = LoggerFactory.getLogger(GridApplicationMessageHandler.class);
    private final Id localNode;
    private final HashRing<Id> hashRing;
    private final GridActorControl.Inbound inbound;
    private final GridActorControl.Outbound outbound;
    private final Decoder decoder;
    private final Visitor visitor;
    private final Scheduler scheduler;
    private final HardRefHolder holder;
    private final Queue<Runnable> buffer = new WeakQueue<Runnable>();

    public GridApplicationMessageHandler(Id localNode, HashRing<Id> hashRing, GridActorControl.Inbound inbound, GridActorControl.Outbound outbound, HardRefHolder holder, Scheduler scheduler) {
        this(localNode, hashRing, inbound, outbound, new JavaObjectDecoder(), holder, scheduler);
    }

    public GridApplicationMessageHandler(Id localNode, HashRing<Id> hashRing, GridActorControl.Inbound inbound, GridActorControl.Outbound outbound, Decoder decoder, HardRefHolder holder, Scheduler scheduler) {
        this.localNode = localNode;
        this.hashRing = hashRing;
        this.inbound = inbound;
        this.outbound = outbound;
        this.decoder = decoder;
        this.holder = holder;
        this.scheduler = scheduler;
        this.visitor = new ControlMessageVisitor();
    }

    @Override
    public void handle(RawMessage raw) {
        try {
            Message message = this.decoder.decode(raw.asBinaryMessage());
            Id sender = Id.of((short)raw.header().nodeId());
            logger.debug("Buffering message {} from {}", (Object)message, (Object)sender);
            Runnable runnable = () -> {
                logger.debug("Handling message {} from {}", (Object)message, (Object)sender);
                message.accept(this.localNode, sender, this.visitor);
            };
            this.buffer.offer(runnable);
            if (Objects.nonNull(this.holder)) {
                this.holder.holdOnTo(runnable);
            }
        }
        catch (Exception e) {
            logger.error(String.format("Failed to process message %s", raw), (Throwable)e);
        }
    }

    @Override
    public void disburse(Id id) {
        Runnable next;
        if (!id.equals((Object)this.localNode)) {
            return;
        }
        logger.debug("Disbursing buffered messages");
        do {
            if ((next = this.buffer.poll()) == null) continue;
            next.run();
        } while (next != null);
    }

    final class ControlMessageVisitor
    implements Visitor {
        ControlMessageVisitor() {
        }

        public void visit(Id receiver, Id sender, Answer answer) {
            GridApplicationMessageHandler.this.inbound.answer(receiver, sender, answer);
        }

        @Override
        public <T> void visit(Id receiver, Id sender, Deliver<T> deliver) {
            Id recipient = this.receiver(receiver, deliver.address);
            if (recipient == receiver) {
                GridApplicationMessageHandler.this.inbound.deliver(receiver, sender, this.returnsAnswer(receiver, sender, deliver), deliver.protocol, deliver.address, deliver.definition, deliver.consumer, deliver.representation);
            } else {
                GridApplicationMessageHandler.this.outbound.forward(recipient, sender, deliver);
            }
        }

        @Override
        public <T> void visit(Id receiver, Id sender, Start<T> start) {
            Id recipient = this.receiver(receiver, start.address);
            if (recipient == receiver) {
                GridApplicationMessageHandler.this.inbound.start(receiver, sender, start.protocol, start.address, start.definition);
            } else {
                GridApplicationMessageHandler.this.outbound.forward(recipient, sender, start);
            }
        }

        private Id receiver(Id receiver, Address address) {
            Id recipient = (Id)GridApplicationMessageHandler.this.hashRing.nodeOf(address.idString());
            if (recipient == null || recipient.equals((Object)receiver)) {
                return receiver;
            }
            return recipient;
        }

        @Override
        public void visit(Id receiver, Id sender, Relocate relocate) {
            Id recipient = this.receiver(receiver, relocate.address);
            if (recipient == receiver) {
                List pending = relocate.pending.stream().map(deliver -> new LocalMessage(null, deliver.protocol, deliver.consumer, this.returnsAnswer(receiver, sender, (Deliver<?>)deliver), deliver.representation)).collect(Collectors.toCollection(ArrayList::new));
                GridApplicationMessageHandler.this.inbound.relocate(receiver, sender, relocate.definition, relocate.address, relocate.snapshot, pending);
            } else {
                GridApplicationMessageHandler.this.outbound.forward(recipient, sender, relocate);
            }
        }

        private Returns<?> returnsAnswer(Id receiver, Id sender, Deliver<?> deliver) {
            Returns returns = deliver.answerCorrelationId == null ? null : Returns.value((Completes)Completes.using((Scheduler)GridApplicationMessageHandler.this.scheduler).andThen(result -> new Answer<Object>(deliver.answerCorrelationId, result)).recoverFrom(error -> new Answer(deliver.answerCorrelationId, (Throwable)error)).otherwise(ignored -> new Answer(deliver.answerCorrelationId, new TimeoutException())).andThenConsume(4000L, answer -> GridApplicationMessageHandler.this.outbound.answer(sender, receiver, answer)).andFinally());
            return returns;
        }
    }
}

