/*
 * Decompiled with CFR 0.152.
 */
package ai.eloquent.raft;

import ai.eloquent.data.UDPBroadcastProtos;
import ai.eloquent.data.UDPTransport;
import ai.eloquent.raft.EloquentRaftProto;
import ai.eloquent.raft.RaftAlgorithm;
import ai.eloquent.raft.RaftGrpc;
import ai.eloquent.raft.RaftLifecycle;
import ai.eloquent.raft.RaftTransport;
import ai.eloquent.util.ConcurrencyUtils;
import ai.eloquent.util.IdentityHashSet;
import ai.eloquent.util.SafeTimerTask;
import ai.eloquent.util.Span;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.BindableService;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NetRaftTransport
implements RaftTransport {
    private static final Logger log = LoggerFactory.getLogger(NetRaftTransport.class);
    private static final int DEFAULT_RPC_LISTEN_PORT = 42889;
    public final String serverName;
    public final boolean thread;
    private final Set<RaftAlgorithm> boundAlgorithms = new IdentityHashSet<RaftAlgorithm>();
    private final int rpcListenPort;
    private final Map<String, ManagedChannel> channel = new HashMap<String, ManagedChannel>();

    public NetRaftTransport(String string, int n, final boolean bl) throws IOException {
        this.serverName = string;
        this.rpcListenPort = n;
        this.thread = bl;
        if (!string.matches("[12]?[0-9]?[0-9]\\.[12]?[0-9]?[0-9]\\.[12]?[0-9]?[0-9]\\.[12]?[0-9]?[0-9](_.*)?")) {
            throw new IllegalArgumentException("Invalid server name \"" + string + "\". Server name must start with an IPv4 address, followed by an optional underscore and custom descriptor. For example, \"127.0.0.1_foobar\".");
        }
        ServerBuilder.forPort((int)n).addService((BindableService)new RaftGrpc.RaftImplBase(){

            @Override
            public void rpc(EloquentRaftProto.RaftMessage raftMessage, StreamObserver<EloquentRaftProto.RaftMessage> streamObserver) {
                log.trace("Got an RPC request");
                try {
                    UDPTransport.DEFAULT.get().doAction(bl, "handle inbound RPC", () -> {
                        for (RaftAlgorithm raftAlgorithm : NetRaftTransport.this.boundAlgorithms) {
                            raftAlgorithm.receiveRPC(raftMessage, NetRaftTransport.this.now()).whenComplete((raftMessage, throwable) -> {
                                if (throwable != null || raftMessage == null) {
                                    streamObserver.onError(throwable == null ? new RuntimeException() : throwable);
                                } else {
                                    streamObserver.onNext(raftMessage);
                                    streamObserver.onCompleted();
                                }
                            });
                        }
                    });
                }
                catch (Throwable throwable) {
                    streamObserver.onError(throwable);
                }
            }
        }).build().start();
    }

    public NetRaftTransport(String string) throws IOException {
        this(string, 42889, false);
    }

    @Override
    public void scheduleAtFixedRate(SafeTimerTask safeTimerTask, long l) {
        RaftLifecycle.global.timer.get().scheduleAtFixedRate(safeTimerTask, 0L, l);
    }

    @Override
    public void schedule(SafeTimerTask safeTimerTask, long l) {
        RaftLifecycle.global.timer.get().schedule(safeTimerTask, l);
    }

    @Override
    public synchronized void bind(RaftAlgorithm raftAlgorithm) {
        if (!this.boundAlgorithms.contains(raftAlgorithm)) {
            UDPTransport.DEFAULT.get().bind(UDPBroadcastProtos.MessageType.RAFT, byArray -> {
                log.trace("Received a UDP message");
                try {
                    EloquentRaftProto.RaftMessage raftMessage = EloquentRaftProto.RaftMessage.parseFrom(byArray);
                    if (!raftMessage.getSender().equals(this.serverName)) {
                        assert (ConcurrencyUtils.ensureNoLocksHeld());
                        raftAlgorithm.receiveMessage(raftMessage, raftMessage2 -> this.sendTransport(raftAlgorithm.serverName(), raftMessage.getSender(), (EloquentRaftProto.RaftMessage)raftMessage2), this.now());
                    }
                }
                catch (InvalidProtocolBufferException invalidProtocolBufferException) {
                    log.warn("Not a Raft message: ", (Throwable)invalidProtocolBufferException);
                }
            });
            this.boundAlgorithms.add(raftAlgorithm);
        }
    }

    @Override
    public Collection<RaftAlgorithm> boundAlgorithms() {
        return this.boundAlgorithms;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void rpcTransport(String string, String string3, EloquentRaftProto.RaftMessage raftMessage, Consumer<EloquentRaftProto.RaftMessage> consumer, Runnable runnable, long l) {
        ManagedChannel managedChannel;
        assert (ConcurrencyUtils.ensureNoLocksHeld());
        int n = string3.indexOf("_");
        final String string4 = n > 0 ? string3.substring(0, n) : string3;
        Object object = this;
        synchronized (object) {
            for (Map.Entry<String, ManagedChannel> entry : new HashSet<Map.Entry<String, ManagedChannel>>(this.channel.entrySet())) {
                if (!entry.getValue().isTerminated() && !entry.getValue().isShutdown()) continue;
                entry.getValue().shutdown();
                this.channel.remove(entry.getKey());
            }
            managedChannel = this.channel.computeIfAbsent(string3, string2 -> ManagedChannelBuilder.forAddress((String)string4, (int)this.rpcListenPort).usePlaintext().build());
        }
        object = new AtomicBoolean(true);
        log.trace("Sending RPC request to {} @ ip {}", (Object)string3, (Object)string4);
        ((RaftGrpc.RaftStub)RaftGrpc.newStub((Channel)managedChannel).withDeadlineAfter(l, TimeUnit.MILLISECONDS)).rpc(raftMessage, new StreamObserver<EloquentRaftProto.RaftMessage>((AtomicBoolean)object, consumer, runnable){
            final /* synthetic */ AtomicBoolean val$awaitingResponse;
            final /* synthetic */ Consumer val$onResponseReceived;
            final /* synthetic */ Runnable val$onTimeout;
            {
                this.val$awaitingResponse = atomicBoolean;
                this.val$onResponseReceived = consumer;
                this.val$onTimeout = runnable;
            }

            public void onNext(EloquentRaftProto.RaftMessage raftMessage) {
                log.trace("Got an RPC response from {}", (Object)string4);
                if (this.val$awaitingResponse.getAndSet(false)) {
                    assert (ConcurrencyUtils.ensureNoLocksHeld());
                    this.val$onResponseReceived.accept(raftMessage);
                }
            }

            public void onError(Throwable throwable) {
                if (this.val$awaitingResponse.getAndSet(false)) {
                    assert (ConcurrencyUtils.ensureNoLocksHeld());
                    this.val$onTimeout.run();
                }
            }

            public void onCompleted() {
                if (this.val$awaitingResponse.getAndSet(false)) {
                    assert (ConcurrencyUtils.ensureNoLocksHeld());
                    this.val$onTimeout.run();
                }
            }
        });
    }

    @Override
    public void sendTransport(String string, String string2, EloquentRaftProto.RaftMessage raftMessage) {
        log.trace("Sending a UDP message to {}", (Object)string2);
        assert (ConcurrencyUtils.ensureNoLocksHeld());
        int n = string2.indexOf("_");
        String string3 = n > 0 ? string2.substring(0, n) : string2;
        UDPTransport.DEFAULT.get().sendTransport(string3, UDPBroadcastProtos.MessageType.RAFT, raftMessage.toByteArray());
    }

    @Override
    public void broadcastTransport(String string, EloquentRaftProto.RaftMessage raftMessage) {
        log.trace("Broadcasting a UDP message");
        assert (ConcurrencyUtils.ensureNoLocksHeld());
        UDPTransport.DEFAULT.get().broadcastTransport(UDPBroadcastProtos.MessageType.RAFT, raftMessage.toByteArray());
    }

    @Override
    public Span expectedNetworkDelay() {
        return new Span(10L, 100L);
    }

    public String toString() {
        return "Raft:" + UDPTransport.DEFAULT.get().toString();
    }
}

