/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.transport.stream.api.ClientStreamConsumer;
import io.camunda.zeebe.transport.stream.api.ClientStreamId;
import io.camunda.zeebe.transport.stream.api.ClientStreamMetrics;
import io.camunda.zeebe.transport.stream.api.NoSuchStreamException;
import io.camunda.zeebe.transport.stream.impl.AggregatedClientStream;
import io.camunda.zeebe.transport.stream.impl.ClientStreamImpl;
import io.camunda.zeebe.transport.stream.impl.ClientStreamRegistry;
import io.camunda.zeebe.transport.stream.impl.ClientStreamRequestManager;
import io.camunda.zeebe.transport.stream.impl.messages.PushStreamRequest;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.agrona.DirectBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ClientStreamManager<M extends BufferWriter> {
    private static final Logger LOG = LoggerFactory.getLogger(ClientStreamManager.class);
    private final ClientStreamRegistry<M> registry;
    private final ClientStreamRequestManager<M> requestManager;
    private final ClientStreamMetrics metrics;
    private final Set<MemberId> servers = new HashSet<MemberId>();

    ClientStreamManager(ClientStreamRegistry<M> registry, ClientStreamRequestManager<M> requestManager, ClientStreamMetrics metrics) {
        this.registry = registry;
        this.requestManager = requestManager;
        this.metrics = metrics;
    }

    void onServerJoined(MemberId serverId) {
        this.servers.add(serverId);
        this.metrics.serverCount(this.servers.size());
        this.registry.list().forEach(c -> this.requestManager.add((AggregatedClientStream<M>)c, serverId));
    }

    void onServerRemoved(MemberId serverId) {
        this.servers.remove(serverId);
        this.metrics.serverCount(this.servers.size());
        this.requestManager.onServerRemoved(serverId);
    }

    ClientStreamId add(DirectBuffer streamType, M metadata, ClientStreamConsumer clientStreamConsumer) {
        ClientStreamImpl<M> clientStream = this.registry.addClient(streamType, metadata, clientStreamConsumer);
        LOG.debug("Added new client stream [{}]", (Object)clientStream.streamId());
        clientStream.serverStream().open(this.requestManager, this.servers);
        return clientStream.streamId();
    }

    void remove(ClientStreamId streamId) {
        LOG.debug("Removing client stream [{}]", (Object)streamId);
        Optional<AggregatedClientStream<M>> serverStream = this.registry.removeClient(streamId);
        serverStream.ifPresent(stream -> {
            LOG.debug("Removing aggregated stream [{}]", (Object)stream.getStreamId());
            stream.close();
            this.requestManager.remove((AggregatedClientStream<M>)stream, (Collection<MemberId>)this.servers);
        });
    }

    void close() {
        this.registry.clear();
        this.requestManager.removeAll(this.servers);
    }

    public void onPayloadReceived(PushStreamRequest pushStreamRequest, ActorFuture<Void> responseFuture) {
        UUID streamId = pushStreamRequest.streamId();
        DirectBuffer payload = pushStreamRequest.payload();
        responseFuture.onComplete((ok, error) -> {
            if (error != null) {
                this.metrics.pushFailed();
            } else {
                this.metrics.pushSucceeded();
            }
        });
        Optional<AggregatedClientStream<M>> clientStream = this.registry.get(streamId);
        clientStream.ifPresentOrElse(stream -> {
            try {
                stream.push(payload, responseFuture);
            }
            catch (Exception e) {
                responseFuture.completeExceptionally((Throwable)e);
            }
        }, () -> {
            this.requestManager.removeUnreliable(streamId, this.servers);
            LOG.warn("Expected to push payload to stream {}, but no stream found.", (Object)streamId);
            responseFuture.completeExceptionally((Throwable)new NoSuchStreamException("Cannot forward pushed payload as chosen client stream %s was already closed".formatted(streamId)));
        });
    }
}

