/*
 * Decompiled with CFR 0.152.
 */
package org.somda.sdc.dpws.soap.wseventing;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.google.inject.name.Named;
import jakarta.xml.bind.JAXBException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.LambdaMetafactory;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jspecify.annotations.Nullable;
import org.somda.sdc.common.logging.InstanceLogger;
import org.somda.sdc.common.util.ExecutorWrapperService;
import org.somda.sdc.dpws.CommunicationLogContext;
import org.somda.sdc.dpws.TransportBinding;
import org.somda.sdc.dpws.client.ClientEventObserver;
import org.somda.sdc.dpws.client.SubscriptionEvent;
import org.somda.sdc.dpws.factory.TransportBindingFactory;
import org.somda.sdc.dpws.guice.NetworkJobThreadPool;
import org.somda.sdc.dpws.http.HttpException;
import org.somda.sdc.dpws.http.HttpServerRegistry;
import org.somda.sdc.dpws.http.HttpUriBuilder;
import org.somda.sdc.dpws.soap.CommunicationContext;
import org.somda.sdc.dpws.soap.NotificationSink;
import org.somda.sdc.dpws.soap.RequestResponseClient;
import org.somda.sdc.dpws.soap.SoapConstants;
import org.somda.sdc.dpws.soap.SoapFaultHttpStatusCodeMapping;
import org.somda.sdc.dpws.soap.SoapMarshalling;
import org.somda.sdc.dpws.soap.SoapMessage;
import org.somda.sdc.dpws.soap.SoapUtil;
import org.somda.sdc.dpws.soap.exception.MalformedSoapMessageException;
import org.somda.sdc.dpws.soap.exception.SoapFaultException;
import org.somda.sdc.dpws.soap.factory.RequestResponseClientFactory;
import org.somda.sdc.dpws.soap.factory.SoapFaultFactory;
import org.somda.sdc.dpws.soap.model.Fault;
import org.somda.sdc.dpws.soap.wsaddressing.WsAddressingUtil;
import org.somda.sdc.dpws.soap.wsaddressing.model.EndpointReferenceType;
import org.somda.sdc.dpws.soap.wseventing.EventSink;
import org.somda.sdc.dpws.soap.wseventing.SinkSubscriptionManager;
import org.somda.sdc.dpws.soap.wseventing.SubscribeResult;
import org.somda.sdc.dpws.soap.wseventing.exception.SubscriptionNotFoundException;
import org.somda.sdc.dpws.soap.wseventing.exception.SubscriptionRequestResponseClientNotFoundException;
import org.somda.sdc.dpws.soap.wseventing.factory.SubscriptionManagerFactory;
import org.somda.sdc.dpws.soap.wseventing.model.DeliveryType;
import org.somda.sdc.dpws.soap.wseventing.model.FilterType;
import org.somda.sdc.dpws.soap.wseventing.model.GetStatus;
import org.somda.sdc.dpws.soap.wseventing.model.GetStatusResponse;
import org.somda.sdc.dpws.soap.wseventing.model.ObjectFactory;
import org.somda.sdc.dpws.soap.wseventing.model.Renew;
import org.somda.sdc.dpws.soap.wseventing.model.RenewResponse;
import org.somda.sdc.dpws.soap.wseventing.model.Subscribe;
import org.somda.sdc.dpws.soap.wseventing.model.SubscribeResponse;
import org.somda.sdc.dpws.soap.wseventing.model.SubscriptionEnd;
import org.somda.sdc.dpws.soap.wseventing.model.Unsubscribe;
import org.somda.sdc.dpws.soap.wseventing.model.WsEventingStatus;

public class EventSinkImpl
implements EventSink {
    private static final Logger LOG = LogManager.getLogger(EventSinkImpl.class);
    private static final String EVENT_SINK_CONTEXT_PREFIX = "/EventSink/";
    private static final String EVENT_SINK_NOTIFY_TO_CONTEXT_PREFIX = "/EventSink/NotifyTo/";
    private static final String EVENT_SINK_END_TO_CONTEXT_PREFIX = "/EventSink/EndTo/";
    private final RequestResponseClient requestResponseClient;
    private final TransportBindingFactory transportBindingFactory;
    private final RequestResponseClientFactory requestResponseClientFactory;
    private final String hostAddress;
    private final HttpServerRegistry httpServerRegistry;
    private final ObjectFactory wseFactory;
    private final WsAddressingUtil wsaUtil;
    private final SoapMarshalling marshalling;
    private final SoapUtil soapUtil;
    private final ExecutorWrapperService<ListeningExecutorService> executorService;
    private final SubscriptionManagerFactory subscriptionManagerFactory;
    private final Map<String, SubscriptionData> subscriptionData;
    private final Lock subscriptionsLock;
    private final Duration maxWaitForFutures;
    private final @Nullable CommunicationLogContext communicationLogContext;
    private final Logger instanceLogger;
    private final HttpUriBuilder httpUriBuilder;
    private final SoapFaultFactory soapFaultFactory;
    private final ClientEventObserver clientEventObserver;

    @AssistedInject
    EventSinkImpl(@Assisted RequestResponseClient requestResponseClient, @Assisted String hostAddress, @Assisted @Nullable CommunicationLogContext communicationLogContext, @Assisted ClientEventObserver clientEventObserver, @Named(value="Dpws.MaxWaitForFutures") Duration maxWaitForFutures, HttpServerRegistry httpServerRegistry, ObjectFactory wseFactory, WsAddressingUtil wsaUtil, SoapMarshalling marshalling, SoapUtil soapUtil, @NetworkJobThreadPool ExecutorWrapperService<ListeningExecutorService> executorService, SubscriptionManagerFactory subscriptionManagerFactory, @Named(value="Common.InstanceIdentifier") String frameworkIdentifier, TransportBindingFactory transportBindingFactory, RequestResponseClientFactory requestResponseClientFactory, SoapFaultFactory soapFaultFactory) {
        this.instanceLogger = InstanceLogger.wrapLogger((Logger)LOG, (String)frameworkIdentifier);
        this.requestResponseClient = requestResponseClient;
        this.transportBindingFactory = transportBindingFactory;
        this.requestResponseClientFactory = requestResponseClientFactory;
        this.hostAddress = hostAddress;
        this.communicationLogContext = communicationLogContext;
        this.maxWaitForFutures = maxWaitForFutures;
        this.httpServerRegistry = httpServerRegistry;
        this.wseFactory = wseFactory;
        this.wsaUtil = wsaUtil;
        this.marshalling = marshalling;
        this.soapUtil = soapUtil;
        this.executorService = executorService;
        this.subscriptionManagerFactory = subscriptionManagerFactory;
        this.soapFaultFactory = soapFaultFactory;
        this.clientEventObserver = clientEventObserver;
        this.subscriptionData = new ConcurrentHashMap<String, SubscriptionData>();
        this.subscriptionsLock = new ReentrantLock();
        this.httpUriBuilder = new HttpUriBuilder();
    }

    @Override
    public ListenableFuture<SubscribeResult> subscribe(String filterDialect, List<Object> filters, @Nullable Duration expires, NotificationSink notificationSink) {
        return ((ListeningExecutorService)this.executorService.get()).submit(() -> {
            String subscriptionId = UUID.randomUUID().toString();
            String endToContext = EVENT_SINK_END_TO_CONTEXT_PREFIX + subscriptionId;
            String endToUri = this.httpServerRegistry.registerContext(this.hostAddress, true, endToContext, null, this.communicationLogContext, (inStream, outStream, communicationContext) -> this.processIncomingEndTo(inStream, outStream, subscriptionId));
            String notifyToContext = EVENT_SINK_NOTIFY_TO_CONTEXT_PREFIX + subscriptionId;
            String notifyToUri = this.httpServerRegistry.registerContext(this.hostAddress, true, notifyToContext, null, this.communicationLogContext, (inStream, outStream, communicationContext) -> this.processIncomingNotification(notificationSink, inStream, outStream, communicationContext, subscriptionId));
            Subscribe subscribeBody = this.wseFactory.createSubscribe();
            DeliveryType deliveryType = this.wseFactory.createDeliveryType();
            deliveryType.setMode("http://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push");
            EndpointReferenceType notifyToEpr = this.wsaUtil.createEprWithAddress(notifyToUri);
            deliveryType.setContent(List.of(this.wseFactory.createNotifyTo(notifyToEpr)));
            subscribeBody.setDelivery(deliveryType);
            EndpointReferenceType endToEpr = this.wsaUtil.createEprWithAddress(endToUri);
            subscribeBody.setEndTo(endToEpr);
            FilterType filterType = this.wseFactory.createFilterType();
            filterType.setDialect(filterDialect);
            filterType.setContent(filters);
            subscribeBody.setExpires(expires);
            subscribeBody.setFilter(filterType);
            SoapMessage subscribeRequest = this.soapUtil.createMessage("http://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe", subscribeBody);
            SoapMessage soapResponse = this.requestResponseClient.sendRequestResponse(subscribeRequest);
            SubscribeResponse responseBody = this.soapUtil.getBody(soapResponse, SubscribeResponse.class).orElseThrow(() -> new MalformedSoapMessageException("Cannot read WS-Eventing Subscribe response"));
            SinkSubscriptionManager sinkSubMan = this.subscriptionManagerFactory.createSinkSubscriptionManager(responseBody.getSubscriptionManager(), responseBody.getExpires(), notifyToEpr, endToEpr, filters, filterDialect, subscriptionId);
            this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.Start(subscriptionId));
            TransportBinding tBinding = this.transportBindingFactory.createTransportBinding(responseBody.getSubscriptionManager().getAddress().getValue(), null);
            RequestResponseClient rrClient = this.requestResponseClientFactory.createRequestResponseClient(tBinding);
            this.subscriptionsLock.lock();
            try {
                this.subscriptionData.put(subscriptionId, new SubscriptionData(sinkSubMan, rrClient));
            }
            finally {
                this.subscriptionsLock.unlock();
            }
            return new SubscribeResult(subscriptionId, sinkSubMan.getExpires());
        });
    }

    @Override
    public ListenableFuture<Duration> renew(String subscriptionId, Duration expires) {
        return ((ListeningExecutorService)this.executorService.get()).submit(() -> {
            SinkSubscriptionManager subMan = this.getSubscriptionManagerProxy(subscriptionId);
            RequestResponseClient subscriptionRequestResponseClient = this.getSubscriptionRequestResponseClient(subscriptionId);
            Renew renew = this.wseFactory.createRenew();
            renew.setExpires(expires);
            String subManAddress = this.wsaUtil.getAddressUri(subMan.getSubscriptionManagerEpr()).orElseThrow(() -> new RuntimeException("No subscription manager EPR found"));
            SoapMessage renewMsg = this.soapUtil.createMessage("http://schemas.xmlsoap.org/ws/2004/08/eventing/Renew", subManAddress, renew, subMan.getSubscriptionManagerEpr().getReferenceParameters());
            SoapMessage renewResMsg = subscriptionRequestResponseClient.sendRequestResponse(renewMsg);
            RenewResponse renewResponse = this.soapUtil.getBody(renewResMsg, RenewResponse.class).orElseThrow(() -> new MalformedSoapMessageException("WS-Eventing RenewResponse message is malformed"));
            Duration newExpires = renewResponse.getExpires();
            subMan.renew(newExpires);
            return newExpires;
        });
    }

    @Override
    public ListenableFuture<Duration> getStatus(String subscriptionId) {
        return ((ListeningExecutorService)this.executorService.get()).submit(() -> {
            SinkSubscriptionManager subMan = this.getSubscriptionManagerProxy(subscriptionId);
            RequestResponseClient subscriptionRequestResponseClient = this.getSubscriptionRequestResponseClient(subscriptionId);
            GetStatus getStatus = this.wseFactory.createGetStatus();
            String subManAddress = this.wsaUtil.getAddressUri(subMan.getSubscriptionManagerEpr()).orElseThrow(() -> new RuntimeException("No subscription manager EPR found"));
            SoapMessage getStatusMsg = this.soapUtil.createMessage("http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus", subManAddress, getStatus, subMan.getSubscriptionManagerEpr().getReferenceParameters());
            SoapMessage getStatusResMsg = subscriptionRequestResponseClient.sendRequestResponse(getStatusMsg);
            GetStatusResponse getStatusResponse = this.soapUtil.getBody(getStatusResMsg, GetStatusResponse.class).orElseThrow(() -> new MalformedSoapMessageException("WS-Eventing GetStatusResponse message is malformed"));
            return getStatusResponse.getExpires();
        });
    }

    private SubscriptionData removeSubscriptionData(String subscriptionId) {
        SubscriptionData subscription = this.removeSubscription(subscriptionId);
        if (subscription == null) {
            throw new SubscriptionNotFoundException("Subscription with id " + subscriptionId + " not found");
        }
        return subscription;
    }

    @Override
    public ListenableFuture<?> unsubscribe(final String subscriptionId) {
        LOG.debug("Unsubscribing for subscription {}", (Object)subscriptionId);
        SubscriptionData removedData = this.removeSubscriptionData(subscriptionId);
        final SinkSubscriptionManager subMan = removedData.subscriptionManager;
        RequestResponseClient subscriptionRequestResponseClient = removedData.requestResponseClient;
        ListenableFuture unsubscribeFuture = ((ListeningExecutorService)this.executorService.get()).submit(() -> {
            Unsubscribe unsubscribe = this.wseFactory.createUnsubscribe();
            String subManAddress = this.wsaUtil.getAddressUri(subMan.getSubscriptionManagerEpr()).orElseThrow(() -> new RuntimeException("No subscription manager EPR found"));
            SoapMessage unsubscribeMsg = this.soapUtil.createMessage("http://schemas.xmlsoap.org/ws/2004/08/eventing/Unsubscribe", subManAddress, unsubscribe, subMan.getSubscriptionManagerEpr().getReferenceParameters());
            subscriptionRequestResponseClient.sendRequestResponse(unsubscribeMsg);
            this.unregisterContextPaths(subMan);
            return new Object();
        });
        Futures.addCallback((ListenableFuture)unsubscribeFuture, (FutureCallback)new FutureCallback<Object>(){

            public void onSuccess(Object ignored) {
                EventSinkImpl.this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.End(subscriptionId, null));
            }

            public void onFailure(Throwable t) {
                EventSinkImpl.this.unregisterContextPaths(subMan);
                EventSinkImpl.this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.Failed(subscriptionId, "Error unsubscribing", t));
            }
        }, (Executor)MoreExecutors.directExecutor());
        return unsubscribeFuture;
    }

    @Override
    public void unsubscribeAll() {
        for (SubscriptionData subData : new ArrayList<SubscriptionData>(this.subscriptionData.values())) {
            String subscriptionId = subData.subscriptionManager.getSubscriptionId();
            ListenableFuture<?> future = this.unsubscribe(subscriptionId);
            try {
                future.get(this.maxWaitForFutures.toSeconds(), TimeUnit.SECONDS);
            }
            catch (InterruptedException | ExecutionException e) {
                this.instanceLogger.warn("Subscription {} could not be unsubscribed. Ignore.", (Object)subscriptionId);
                this.instanceLogger.trace("Subscription {} could not be unsubscribed", (Object)subscriptionId, (Object)e);
            }
            catch (TimeoutException e) {
                this.instanceLogger.warn("Subscription {} could not be unsubscribed, timeout after {}s. Ignore.", (Object)subscriptionId, (Object)this.maxWaitForFutures.toSeconds());
                this.instanceLogger.trace("Subscription {} could not be unsubscribed, timeout after {}s", (Object)subscriptionId, (Object)this.maxWaitForFutures.toSeconds(), (Object)e);
                future.cancel(true);
            }
        }
    }

    private void unregisterUri(URI fullUri) {
        String uriWithoutPath = this.httpUriBuilder.buildUri(fullUri.getScheme(), fullUri.getHost(), fullUri.getPort());
        this.httpServerRegistry.unregisterContext(uriWithoutPath, fullUri.getPath());
    }

    private void processIncomingEndTo(InputStream inputStream, OutputStream outputStream, String subscriptionId) throws HttpException {
        SubscriptionEnd subscriptionEnd;
        SoapMessage soapMsg;
        try {
            soapMsg = this.soapUtil.createMessage(this.marshalling.unmarshal(inputStream));
        }
        catch (JAXBException exception) {
            this.handleInboundException((Exception)((Object)exception), outputStream, "An error occurred during the processing of the inbound message", subscriptionId);
            return;
        }
        this.instanceLogger.debug("Received message to EndTo address: {}", (Object)soapMsg);
        try {
            subscriptionEnd = this.soapUtil.getBody(soapMsg, SubscriptionEnd.class).orElseThrow();
        }
        catch (NoSuchElementException exception) {
            this.handleInboundException(exception, outputStream, "Inbound message to EndTo endpoint did not represent a SubscriptionEnd message", subscriptionId);
            throw new HttpException(400, "Received message to EndTo address that is not a SubscriptionEnd");
        }
        this.processSubscriptionEnd(subscriptionEnd, subscriptionId, outputStream);
    }

    private void processSubscriptionEnd(SubscriptionEnd subscriptionEnd, String subscriptionId, OutputStream outputStream) throws HttpException {
        SubscriptionData subData;
        WsEventingStatus status;
        try {
            status = WsEventingStatus.fromString(subscriptionEnd.getStatus());
        }
        catch (IllegalArgumentException exception) {
            this.instanceLogger.error("Incoming SubscriptionEnd presented an unknown status: {}", (Object)subscriptionEnd.getStatus(), (Object)exception);
            this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.Failed(subscriptionId, "Processing incoming message on EndTo found an unknown status", exception));
            return;
        }
        try {
            subData = this.removeSubscriptionData(subscriptionId);
        }
        catch (SubscriptionNotFoundException exception) {
            this.instanceLogger.error("Error processing incoming message", (Throwable)exception);
            this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.Failed(subscriptionId, "Processing incoming message on EndTo led to unknown subscription", exception));
            return;
        }
        this.unregisterContextPaths(subData.subscriptionManager);
        this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.End(subscriptionId, status));
        this.closeOutputStream(outputStream, subscriptionId);
    }

    private void unregisterContextPaths(SinkSubscriptionManager subscriptionManager) {
        Optional<URI> endToUri = subscriptionManager.getEndTo().map(it -> it.getAddress().getValue()).map(URI::create);
        URI notifyToUri = URI.create(subscriptionManager.getNotifyTo().getAddress().getValue());
        endToUri.ifPresent(this::unregisterUri);
        this.unregisterUri(notifyToUri);
    }

    private void closeOutputStream(OutputStream outputStream, String subscriptionId) throws HttpException {
        try {
            outputStream.close();
        }
        catch (IOException exception) {
            this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.MessageProcessingFailed(subscriptionId, "Closing the output stream for the response triggered an IOException", exception));
            throw new HttpException(500, "Error closing output stream");
        }
    }

    private void processIncomingNotification(NotificationSink notificationSink, InputStream inputStream, OutputStream outputStream, CommunicationContext communicationContext, String subscriptionId) throws HttpException {
        SoapMessage soapMsg;
        try {
            soapMsg = this.soapUtil.createMessage(this.marshalling.unmarshal(inputStream));
        }
        catch (JAXBException exception) {
            this.handleInboundException((Exception)((Object)exception), outputStream, "An error occurred during the processing of the inbound message", subscriptionId);
            return;
        }
        this.instanceLogger.debug("Received incoming notification {}", (Object)soapMsg);
        try {
            notificationSink.receiveNotification(soapMsg, communicationContext);
        }
        catch (SoapFaultException e) {
            this.instanceLogger.error("Error processing incoming message", (Throwable)e);
            this.sendFaultResponse(e.getFaultMessage(), outputStream);
            this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.MessageProcessingFailed(subscriptionId, "Processing incoming message triggered SOAP fault", e));
            throw new HttpException(SoapFaultHttpStatusCodeMapping.get(e.getFault()));
        }
        catch (Exception e) {
            this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.MessageProcessingFailed(subscriptionId, "Processing incoming message triggered exception", e));
            throw new HttpException(500, this.extractExceptionMessage(e));
        }
        this.closeOutputStream(outputStream, subscriptionId);
    }

    private String extractExceptionMessage(Exception e) {
        return e.getMessage() != null ? e.getMessage() : e.toString();
    }

    private void handleInboundException(Exception exception, OutputStream outputStream, String rationale, String subscriptionId) throws HttpException {
        LOG.error("Error processing incoming message for subscription {}", (Object)subscriptionId, (Object)exception);
        this.clientEventObserver.onSubscriptionEvent(new SubscriptionEvent.MessageProcessingFailed(subscriptionId, rationale, exception));
        SoapMessage faultMessageToSent = this.soapFaultFactory.createFault("http://www.w3.org/2005/08/addressing/fault", SoapConstants.SENDER, SoapConstants.DEFAULT_SUBCODE, rationale);
        this.sendFaultResponse(faultMessageToSent, outputStream);
        Fault fault = this.soapUtil.getBody(faultMessageToSent, Fault.class).orElseThrow();
        throw new HttpException(SoapFaultHttpStatusCodeMapping.get(fault));
    }

    private void sendFaultResponse(SoapMessage soapFault, OutputStream outputStream) throws HttpException {
        try {
            this.marshalling.marshal(soapFault.getEnvelopeWithMappedHeaders(), outputStream);
        }
        catch (JAXBException exception) {
            throw new HttpException(500, this.extractExceptionMessage((Exception)((Object)exception)));
        }
        finally {
            try {
                outputStream.close();
            }
            catch (IOException e) {
                this.instanceLogger.error("Error closing output stream while sending SOAP fault", (Throwable)e);
            }
        }
    }

    private SinkSubscriptionManager getSubscriptionManagerProxy(String subscriptionId) {
        this.subscriptionsLock.lock();
        try {
            SinkSubscriptionManager sinkSubscriptionManager = Optional.ofNullable(this.subscriptionData.get((Object)subscriptionId)).orElseThrow((Supplier<SubscriptionNotFoundException>)LambdaMetafactory.metafactory(null, null, null, ()Ljava/lang/Object;, <init>(), ()Lorg/somda/sdc/dpws/soap/wseventing/exception/SubscriptionNotFoundException;)()).subscriptionManager;
            return sinkSubscriptionManager;
        }
        finally {
            this.subscriptionsLock.unlock();
        }
    }

    private RequestResponseClient getSubscriptionRequestResponseClient(String subscriptionId) {
        this.subscriptionsLock.lock();
        try {
            RequestResponseClient requestResponseClient = Optional.ofNullable(this.subscriptionData.get((Object)subscriptionId)).orElseThrow((Supplier<SubscriptionRequestResponseClientNotFoundException>)LambdaMetafactory.metafactory(null, null, null, ()Ljava/lang/Object;, <init>(), ()Lorg/somda/sdc/dpws/soap/wseventing/exception/SubscriptionRequestResponseClientNotFoundException;)()).requestResponseClient;
            return requestResponseClient;
        }
        finally {
            this.subscriptionsLock.unlock();
        }
    }

    private @Nullable SubscriptionData removeSubscription(String subscriptionId) {
        this.subscriptionsLock.lock();
        try {
            SubscriptionData subscriptionData = this.subscriptionData.remove(subscriptionId);
            return subscriptionData;
        }
        finally {
            this.subscriptionsLock.unlock();
        }
    }

    private record SubscriptionData(SinkSubscriptionManager subscriptionManager, RequestResponseClient requestResponseClient) {
    }
}

