/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.transport.stream.api.ClientStream;
import io.camunda.zeebe.transport.stream.api.ClientStreamConsumer;
import io.camunda.zeebe.transport.stream.api.ClientStreamId;
import io.camunda.zeebe.transport.stream.api.ClientStreamMetrics;
import io.camunda.zeebe.transport.stream.api.ClientStreamService;
import io.camunda.zeebe.transport.stream.api.ClientStreamer;
import io.camunda.zeebe.transport.stream.impl.ClientStreamManager;
import io.camunda.zeebe.transport.stream.impl.ClientStreamRegistry;
import io.camunda.zeebe.transport.stream.impl.ClientStreamRequestManager;
import io.camunda.zeebe.transport.stream.impl.messages.MessageUtil;
import io.camunda.zeebe.transport.stream.impl.messages.PushStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.StreamTopics;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.agrona.DirectBuffer;

/**
 * Actor-backed implementation of {@link ClientStreamer} and {@link ClientStreamService}.
 *
 * <p>Every public operation is forwarded to the {@link ClientStreamManager} or the {@link
 * ClientStreamRegistry} through this actor ({@code actor.run} / {@code actor.call}), and the
 * request manager is constructed with this same actor as its concurrency control.
 *
 * @param <M> type of the metadata attached to a client stream
 */
public final class ClientStreamServiceImpl<M extends BufferWriter>
        extends Actor
        implements ClientStreamer<M>, ClientStreamService<M> {
    /** Empty payload sent back as the reply to successfully handled requests. */
    private static final byte[] SUCCESS_RESPONSE = new byte[0];

    private final ClientStreamManager<M> clientStreamManager;
    private final ClusterCommunicationService communicationService;
    private final ClientStreamRegistry<M> registry;

    /**
     * Creates the service together with its registry and stream manager.
     *
     * @param communicationService used to receive push/restart requests and handed to the request
     *     manager
     * @param metrics metrics passed to both the registry and the stream manager
     */
    public ClientStreamServiceImpl(
            final ClusterCommunicationService communicationService,
            final ClientStreamMetrics metrics) {
        this.communicationService = communicationService;
        registry = new ClientStreamRegistry<>(metrics);
        // The request manager is given this actor so that its work is scheduled on the same actor
        // as the stream manager's.
        clientStreamManager =
                new ClientStreamManager<>(
                        registry,
                        new ClientStreamRequestManager(communicationService, actor),
                        metrics);
    }

    /**
     * Registers the reply handlers for the {@link StreamTopics#PUSH} and {@link
     * StreamTopics#RESTART_STREAMS} topics.
     */
    @Override
    protected void onActorStarted() {
        communicationService.replyTo(
                StreamTopics.PUSH.topic(),
                MessageUtil::parsePushRequest,
                this::handlePushRequest,
                ignore -> SUCCESS_RESPONSE);

        // A restart request is handled as the sender being removed and immediately joining again.
        // The request payload is ignored; the handler is executed on this actor.
        communicationService.replyTo(
                StreamTopics.RESTART_STREAMS.topic(),
                Function.identity(),
                (memberId, ignore) -> {
                    final MemberId server = MemberId.from(memberId.id());
                    clientStreamManager.onServerRemoved(server);
                    clientStreamManager.onServerJoined(server);
                    return SUCCESS_RESPONSE;
                },
                Function.identity(),
                actor::run);
    }

    /** Closes the stream manager when the actor is asked to close. */
    @Override
    protected void onActorCloseRequested() {
        clientStreamManager.close();
    }

    /**
     * Registers a new client stream.
     *
     * @param streamType type of the stream
     * @param metadata metadata associated with the stream
     * @param clientStreamConsumer consumer associated with the new stream
     * @return a future completed with the result of {@link ClientStreamManager#add}
     */
    @Override
    public ActorFuture<ClientStreamId> add(
            final DirectBuffer streamType,
            final M metadata,
            final ClientStreamConsumer clientStreamConsumer) {
        return actor.call(() -> clientStreamManager.add(streamType, metadata, clientStreamConsumer));
    }

    /**
     * Removes a previously added client stream.
     *
     * @param streamId id of the stream to remove
     * @return a future completed once the removal has been executed on the actor
     */
    @Override
    public ActorFuture<Void> remove(final ClientStreamId streamId) {
        return actor.call(() -> clientStreamManager.remove(streamId));
    }

    /**
     * Submits this actor to the given scheduling service.
     *
     * @param schedulingService scheduler the actor is submitted to
     * @return the future returned by {@link ActorSchedulingService#submitActor}
     */
    @Override
    public ActorFuture<Void> start(final ActorSchedulingService schedulingService) {
        return schedulingService.submitActor(this);
    }

    /** Forwards the join notification to the stream manager on the actor. */
    @Override
    public void onServerJoined(final MemberId memberId) {
        actor.run(() -> clientStreamManager.onServerJoined(memberId));
    }

    /** Forwards the removal notification to the stream manager on the actor. */
    @Override
    public void onServerRemoved(final MemberId memberId) {
        actor.run(() -> clientStreamManager.onServerRemoved(memberId));
    }

    /** Returns this instance, which is itself the {@link ClientStreamer}. */
    @Override
    public ClientStreamer<M> streamer() {
        return this;
    }

    /**
     * Looks up a single client stream in the registry.
     *
     * @param id id of the stream to look up
     * @return a future completed with the registry's lookup result for {@code id}
     */
    @Override
    public ActorFuture<Optional<ClientStream<M>>> streamFor(final ClientStreamId id) {
        // NOTE(review): the identity mapping presumably widens the registry's concrete stream type
        // to the ClientStream interface via target typing - confirm against ClientStreamRegistry.
        return actor.call(() -> registry.getClient(id).map(stream -> stream));
    }

    /**
     * Lists all client streams by flattening every entry of the registry.
     *
     * @return a future completed with an unmodifiable collection of the streams
     */
    @Override
    public ActorFuture<Collection<ClientStream<M>>> streams() {
        return actor.call(
                () ->
                        registry.list().stream()
                                .flatMap(aggregated -> aggregated.list().stream())
                                // Explicit upcast: unlike in streamFor there is no target type at
                                // this point of the chain, so an identity lambda would not widen
                                // the element type to the interface.
                                .map(stream -> (ClientStream<M>) stream)
                                .toList());
    }

    /**
     * Hands a push request to the stream manager on the actor and bridges the manager's actor
     * future to the {@link CompletableFuture} expected by the communication service.
     *
     * @param request the decoded push request
     * @return a future completed with {@code null} once the manager reports success, or completed
     *     exceptionally with the reported (or synchronously thrown) error
     */
    private CompletableFuture<Void> handlePushRequest(final PushStreamRequest request) {
        final CompletableFuture<Void> responseFuture = new CompletableFuture<>();
        actor.run(
                () -> {
                    try {
                        final CompletableActorFuture<Void> payloadPushed =
                                new CompletableActorFuture<>();
                        clientStreamManager.onPayloadReceived(request, payloadPushed);
                        payloadPushed.onComplete(
                                (ok, error) -> {
                                    if (error == null) {
                                        responseFuture.complete(null);
                                    } else {
                                        responseFuture.completeExceptionally(error);
                                    }
                                });
                    } catch (final Exception e) {
                        // Make sure the remote caller gets an answer even if the manager throws.
                        responseFuture.completeExceptionally(e);
                    }
                });
        return responseFuture;
    }
}

