/*
 * Decompiled with CFR 0.152.
 */
package com.obsidiandynamics.jgroups;

import com.obsidiandynamics.func.ExceptionHandler;
import com.obsidiandynamics.func.LogLine;
import com.obsidiandynamics.jgroups.GroupMessageHandler;
import com.obsidiandynamics.jgroups.HostMessageHandler;
import com.obsidiandynamics.jgroups.ResponseSync;
import com.obsidiandynamics.jgroups.SyncPacket;
import java.io.PrintStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;

public final class Group
implements AutoCloseable {
    private final JChannel channel;
    private final Set<HostMessageHandler> generalHandlers = new CopyOnWriteArraySet<HostMessageHandler>();
    private final ConcurrentMap<Serializable, Set<HostMessageHandler>> idHandlers = new ConcurrentHashMap<Serializable, Set<HostMessageHandler>>();
    private LogLine debug = LogLine.nop();
    private ExceptionHandler errorHandler = ExceptionHandler.forPrintStream((PrintStream)System.err);

    public Group(final JChannel channel) {
        this.channel = channel;
        channel.setDiscardOwnMessages(true);
        channel.setReceiver((Receiver)new ReceiverAdapter(){

            public void receive(Message msg) {
                Group.this.debug.printf("Received %s", new Object[]{msg});
                try {
                    for (HostMessageHandler onMessage : Group.this.generalHandlers) {
                        onMessage.handle(channel, msg);
                    }
                    Object payload = msg.getObject();
                    if (payload instanceof SyncPacket) {
                        SyncPacket syncMessage = (SyncPacket)payload;
                        Set handlers = (Set)Group.this.idHandlers.get(syncMessage.getId());
                        if (handlers != null) {
                            for (HostMessageHandler handler : handlers) {
                                handler.handle(channel, msg);
                            }
                        }
                    }
                }
                catch (Throwable e) {
                    Group.this.errorHandler.onException(String.format("Exception processing message %s", msg), e);
                }
            }
        });
    }

    public Group withDebug(LogLine debug) {
        this.debug = debug;
        return this;
    }

    public Group withErrorHandler(ExceptionHandler errorHandler) {
        this.errorHandler = errorHandler;
        return this;
    }

    public Group withMessageHandler(HostMessageHandler handler) {
        this.generalHandlers.add(handler);
        return this;
    }

    public int numMessageHandlers() {
        return this.generalHandlers.size();
    }

    public void removeMessageHandler(HostMessageHandler handler) {
        this.generalHandlers.remove(handler);
    }

    public Group withMessageHandler(Serializable id, HostMessageHandler handler) {
        this.idHandlers.computeIfAbsent(id, key -> new CopyOnWriteArraySet()).add(handler);
        return this;
    }

    public void send(Message message) throws Exception {
        this.channel.send(message);
    }

    public int numMessageHandlers(Serializable id) {
        return this.idHandlers.getOrDefault(id, Collections.emptySet()).size();
    }

    public void removeMessageHandler(Serializable id, HostMessageHandler handler) {
        this.idHandlers.computeIfPresent(id, (key, handlers) -> {
            handlers.remove(handler);
            return handlers.isEmpty() ? null : handlers;
        });
    }

    public CompletableFuture<Message> request(Address address, SyncPacket syncMessage, Message.Flag ... flags) throws Exception {
        CompletableFuture<Message> f = new CompletableFuture<Message>();
        ResponseSync rs = this.request(address, syncMessage, (JChannel channel, Message message) -> f.complete(message), flags);
        f.whenComplete((message, throwable) -> {
            if (f.isCancelled()) {
                rs.cancel();
            }
        });
        return f;
    }

    public ResponseSync request(Address address, SyncPacket syncMessage, final HostMessageHandler handler, Message.Flag ... flags) throws Exception {
        final Serializable id = syncMessage.getId();
        HostMessageHandler idHandler = new HostMessageHandler(){

            @Override
            public void handle(JChannel channel, Message resp) throws Exception {
                Group.this.removeMessageHandler(id, this);
                handler.handle(channel, resp);
            }
        };
        this.withMessageHandler(id, idHandler);
        this.channel.send(new Message(address, (Object)syncMessage).setFlag(flags));
        return new ResponseSync(this, id, idHandler);
    }

    public CompletableFuture<Map<Address, Message>> gather(SyncPacket syncMessage, Message.Flag ... flags) throws Exception {
        return this.gather(this.channel.getView().size() - 1, syncMessage, flags);
    }

    public CompletableFuture<Map<Address, Message>> gather(int respondents, SyncPacket syncMessage, Message.Flag ... flags) throws Exception {
        CompletableFuture<Map<Address, Message>> f = new CompletableFuture<Map<Address, Message>>();
        ResponseSync rs = this.gather(respondents, syncMessage, (JChannel __, Map<Address, Message> messages) -> f.complete(messages), flags);
        f.whenComplete((message, throwable) -> {
            if (f.isCancelled()) {
                rs.cancel();
            }
        });
        return f;
    }

    public ResponseSync gather(SyncPacket syncMessage, GroupMessageHandler handler, Message.Flag ... flags) throws Exception {
        return this.gather(this.channel.getView().size() - 1, syncMessage, handler, flags);
    }

    public ResponseSync gather(final int respondents, SyncPacket syncMessage, final GroupMessageHandler handler, Message.Flag ... flags) throws Exception {
        final ConcurrentHashMap responses = new ConcurrentHashMap();
        final Serializable id = syncMessage.getId();
        HostMessageHandler idHandler = new HostMessageHandler(){

            @Override
            public void handle(JChannel channel, Message resp) {
                responses.put(resp.getSrc(), resp);
                if (responses.size() == respondents) {
                    Group.this.removeMessageHandler(id, this);
                    handler.handle(channel, Collections.unmodifiableMap(responses));
                }
            }
        };
        this.withMessageHandler(id, idHandler);
        this.channel.send(new Message(null, (Object)syncMessage).setFlag(flags));
        return new ResponseSync(this, id, idHandler);
    }

    public Group connect(String clusterName) throws Exception {
        this.channel.connect(clusterName);
        return this;
    }

    public JChannel channel() {
        return this.channel;
    }

    public View view() {
        return this.channel.view();
    }

    public Set<Address> peers() {
        Address current = this.channel.getAddress();
        HashSet<Address> addresses = new HashSet<Address>(this.channel.view().getMembers());
        addresses.remove(current);
        return addresses;
    }

    public Address peer() {
        return this.peers().iterator().next();
    }

    @Override
    public void close() {
        this.channel.close();
    }
}

