/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.reaktor.internal;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.broadcast.BroadcastReceiver;
import org.agrona.concurrent.broadcast.CopyBroadcastReceiver;
import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.reaktivity.nukleus.Controller;
import org.reaktivity.nukleus.ControllerBuilder;
import org.reaktivity.nukleus.ControllerSpi;
import org.reaktivity.nukleus.function.MessageFunction;
import org.reaktivity.reaktor.ReaktorConfiguration;
import org.reaktivity.reaktor.internal.layouts.ControlLayout;
import org.reaktivity.reaktor.internal.types.control.CommandFW;
import org.reaktivity.reaktor.internal.types.control.ResolvedFW;
import org.reaktivity.reaktor.internal.types.control.ResponseFW;
import org.reaktivity.reaktor.internal.types.control.RoutedFW;

public final class ControllerBuilderImpl<T extends Controller>
implements ControllerBuilder<T> {
    private final ReaktorConfiguration config;
    private final Class<T> kind;
    private Function<ControllerSpi, T> factory;

    public ControllerBuilderImpl(ReaktorConfiguration config, Class<T> kind) {
        this.config = config;
        this.kind = kind;
    }

    @Override
    public Class<T> kind() {
        return this.kind;
    }

    @Override
    public ControllerBuilder<T> setFactory(Function<ControllerSpi, T> factory) {
        this.factory = factory;
        return this;
    }

    @Override
    public T build() {
        Objects.requireNonNull(this.factory, "factory");
        ControllerSpiImpl controllerSpi = new ControllerSpiImpl(this.config);
        return (T)((Controller)this.factory.apply(controllerSpi));
    }

    private final class ControllerSpiImpl
    implements ControllerSpi {
        private final ControlLayout.Builder controlRW = new ControlLayout.Builder();
        private final CommandFW commandRO = new CommandFW();
        private final ResponseFW responseRO = new ResponseFW();
        private final RoutedFW routedRO = new RoutedFW();
        private final ResolvedFW resolvedRO = new ResolvedFW();
        private final RingBuffer conductorCommands;
        private final CopyBroadcastReceiver conductorResponses;
        private final ConcurrentMap<Long, PendingCommand<?>> commandsByCorrelationId;
        private final MessageHandler responseHandler;
        private final ControlLayout control;

        private ControllerSpiImpl(ReaktorConfiguration config) {
            this.control = this.controlRW.controlPath(config.directory().resolve("control")).commandBufferCapacity(config.commandBufferCapacity()).responseBufferCapacity(config.responseBufferCapacity()).readonly(true).build();
            this.conductorCommands = new ManyToOneRingBuffer(this.control.commandBuffer());
            this.conductorResponses = new CopyBroadcastReceiver(new BroadcastReceiver(this.control.responseBuffer()));
            this.commandsByCorrelationId = new ConcurrentHashMap();
            this.responseHandler = this::handleResponse;
        }

        @Override
        public long nextCorrelationId() {
            return this.conductorCommands.nextCorrelationId();
        }

        @Override
        public int doProcess() {
            return this.conductorResponses.receive(this.responseHandler);
        }

        @Override
        public void doClose() {
            CloseHelper.quietClose((AutoCloseable)this.control);
        }

        @Override
        public CompletableFuture<Long> doResolve(int msgTypeId, DirectBuffer buffer, int index, int length) {
            assert (msgTypeId == 17);
            return this.doCommand(msgTypeId, buffer, index, length, (t, b, i, l) -> this.resolvedRO.wrap(b, i, i + l).authorization());
        }

        @Override
        public CompletableFuture<Long> doRoute(int msgTypeId, DirectBuffer buffer, int index, int length) {
            assert (msgTypeId == 1);
            return this.doCommand(msgTypeId, buffer, index, length, (t, b, i, l) -> this.routedRO.wrap(b, i, i + l).correlationId());
        }

        @Override
        public CompletableFuture<Void> doUnresolve(int msgTypeId, DirectBuffer buffer, int index, int length) {
            assert (msgTypeId == 18);
            return this.doCommand(msgTypeId, buffer, index, length);
        }

        @Override
        public CompletableFuture<Void> doUnroute(int msgTypeId, DirectBuffer buffer, int index, int length) {
            assert (msgTypeId == 2);
            return this.doCommand(msgTypeId, buffer, index, length);
        }

        @Override
        public CompletableFuture<Void> doFreeze(int msgTypeId, DirectBuffer buffer, int index, int length) {
            assert (msgTypeId == 3);
            return this.doCommand(msgTypeId, buffer, index, length);
        }

        @Override
        public CompletableFuture<Void> doCommand(int msgTypeId, DirectBuffer buffer, int index, int length) {
            return this.doCommand(msgTypeId, buffer, index, length, (t, b, i, l) -> null);
        }

        @Override
        public <R> CompletableFuture<R> doCommand(int msgTypeId, DirectBuffer buffer, int index, int length, MessageFunction<R> mapper) {
            CompletableFuture promise = new CompletableFuture();
            CommandFW command = this.commandRO.wrap(buffer, index, index + length);
            long correlationId = command.correlationId();
            PendingCommand pending = new PendingCommand(mapper, promise);
            this.commandSent(correlationId, pending);
            if (!this.conductorCommands.write(msgTypeId, buffer, index, length)) {
                this.commandSendFailed(correlationId);
            }
            return promise;
        }

        private int handleResponse(int msgTypeId, DirectBuffer buffer, int index, int length) {
            ResponseFW response = this.responseRO.wrap(buffer, index, length);
            long correlationId = response.correlationId();
            PendingCommand command = (PendingCommand)this.commandsByCorrelationId.remove(correlationId);
            switch (msgTypeId) {
                case 0x40000000: {
                    this.commandFailed(command, "command failed");
                    break;
                }
                default: {
                    this.commandSucceeded(command, msgTypeId, buffer, index, length);
                }
            }
            return 1;
        }

        private <R> void commandSent(long correlationId, PendingCommand<R> command) {
            this.commandsByCorrelationId.put(correlationId, command);
        }

        private <R> boolean commandSucceeded(PendingCommand<R> command, int msgTypeId, DirectBuffer buffer, int index, int length) {
            return command != null && ((PendingCommand)command).succeeded(msgTypeId, buffer, index, length);
        }

        private boolean commandSendFailed(long correlationId) {
            PendingCommand command = (PendingCommand)this.commandsByCorrelationId.remove(correlationId);
            return this.commandFailed(command, "unable to offer command");
        }

        private boolean commandFailed(PendingCommand<?> command, String message) {
            return command != null && ((PendingCommand)command).failed(message);
        }
    }

    private static final class PendingCommand<R> {
        final MessageFunction<R> mapper;
        final CompletableFuture<R> promise;

        private PendingCommand(MessageFunction<R> mapper, CompletableFuture<R> promise) {
            this.mapper = mapper;
            this.promise = promise;
        }

        private boolean succeeded(int msgTypeId, DirectBuffer buffer, int index, int length) {
            return this.promise.complete(this.mapper.apply(msgTypeId, buffer, index, length));
        }

        private boolean failed(String message) {
            return this.promise.completeExceptionally(new IllegalStateException(message));
        }
    }
}

