/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.system.partitions.impl;

import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.utils.serializer.Serializer;
import io.atomix.utils.serializer.serializers.DefaultSerializers;
import io.camunda.zeebe.broker.system.partitions.PartitionMessagingService;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class AtomixPartitionMessagingService
implements PartitionMessagingService {
    private final ClusterCommunicationService communicationService;
    private final ClusterMembershipService clusterMembershipService;
    private final Supplier<Collection<MemberId>> partitionMembers;
    private final MemberId localMember;

    public AtomixPartitionMessagingService(ClusterCommunicationService communicationService, ClusterMembershipService clusterMembershipService, Supplier<Collection<MemberId>> partitionMembers) {
        this.localMember = clusterMembershipService.getLocalMember().id();
        this.communicationService = communicationService;
        this.clusterMembershipService = clusterMembershipService;
        this.partitionMembers = partitionMembers;
    }

    @Override
    public void subscribe(String subject, Consumer<ByteBuffer> consumer, Executor executor) {
        this.communicationService.consume(subject, arg_0 -> ((Serializer)DefaultSerializers.BASIC).decode(arg_0), consumer, executor);
    }

    @Override
    public void broadcast(String subject, ByteBuffer payload) {
        Set reachableMembers = this.partitionMembers.get().stream().filter(memberId -> !memberId.equals((Object)this.localMember)).filter(this::isReachable).collect(Collectors.toUnmodifiableSet());
        this.communicationService.multicast(subject, (Object)payload, arg_0 -> ((Serializer)DefaultSerializers.BASIC).encode(arg_0), reachableMembers, true);
    }

    @Override
    public void unsubscribe(String subject) {
        this.communicationService.unsubscribe(subject);
    }

    private boolean isReachable(MemberId memberId) {
        return Optional.ofNullable(this.clusterMembershipService.getMember(memberId)).map(Member::isReachable).orElse(false);
    }
}

