/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.spring.integration.eventhub.support;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import com.microsoft.azure.spring.integration.core.api.PartitionSupplier;
import com.microsoft.azure.spring.integration.core.api.StartPosition;
import com.microsoft.azure.spring.integration.eventhub.api.EventHubClientFactory;
import com.microsoft.azure.spring.integration.eventhub.impl.EventHubProcessor;
import com.microsoft.azure.spring.integration.eventhub.impl.EventHubTemplate;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.springframework.messaging.Message;

/**
 * In-memory variant of {@link EventHubTemplate} intended for tests.
 *
 * <p>Sent messages are converted to {@link EventData}, recorded per event hub name and
 * handed synchronously to every {@link EventHubProcessor} currently registered for that
 * name. Nothing in this class contacts a real event hub.
 *
 * <p>NOTE(review): {@code eventHubsByName} is a plain {@code ArrayListMultimap}; it is
 * written by {@link #sendAsync} without holding the lock that {@link #register} takes.
 * Confirm whether concurrent send/register is expected by callers.
 */
public class EventHubTestOperation extends EventHubTemplate {
    /** Every event sent so far, keyed by event hub name, in send order. */
    private final Multimap<String, EventData> eventHubsByName = ArrayListMultimap.create();
    /** Registered processors: event hub name -> (consumer group -> processor). */
    private final Map<String, Map<String, EventHubProcessor>> processorsByNameAndGroup = new ConcurrentHashMap<>();
    /** Produces the partition context passed to processors on each dispatch. */
    private final Supplier<PartitionContext> partitionContextSupplier;

    /**
     * @param clientFactory            forwarded unchanged to the {@link EventHubTemplate} constructor
     * @param partitionContextSupplier called once per processor invocation to obtain the
     *                                 {@link PartitionContext} handed to {@code onEvents}
     */
    public EventHubTestOperation(EventHubClientFactory clientFactory, Supplier<PartitionContext> partitionContextSupplier) {
        super(clientFactory);
        this.partitionContextSupplier = partitionContextSupplier;
    }

    /**
     * Records the message under {@code name} and delivers it, on the calling thread, to
     * every processor registered for that name.
     *
     * @param name              event hub name the message is stored and dispatched under
     * @param message           message to convert to {@link EventData} via the template's converter
     * @param partitionSupplier not used by this implementation
     * @return an already-completed future; it completes normally even when a processor
     *         throws, because processor failures are only printed
     */
    public <U> CompletableFuture<Void> sendAsync(String name, Message<U> message, PartitionSupplier partitionSupplier) {
        EventData azureMessage = (EventData) this.getMessageConverter().fromMessage(message, EventData.class);
        this.eventHubsByName.put(name, azureMessage);
        // Keep the original side effect of creating the per-name processor map on first send.
        this.processorsFor(name).values()
                .forEach(processor -> this.dispatch(processor, Collections.singleton(azureMessage)));
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Registers {@code eventProcessor} for the given name and consumer group. If a processor
     * is already registered for that group, the existing one is kept and the new one ignored.
     *
     * <p>When the start position is {@link StartPosition#EARLIEST}, all events recorded so
     * far for {@code name} are replayed.
     */
    @Override
    protected synchronized void register(String name, String group, EventHubProcessor eventProcessor) {
        Map<String, EventHubProcessor> processors = this.processorsFor(name);
        processors.putIfAbsent(group, eventProcessor);
        if (this.getStartPosition() == StartPosition.EARLIEST) {
            // NOTE(review): the replay goes to every processor registered under this name,
            // not only the one just added, so earlier groups see the backlog again.
            processors.values().forEach(processor -> this.dispatch(processor, this.eventHubsByName.get(name)));
        }
    }

    /**
     * Removes the processor registered for {@code consumerGroup} under {@code name}, if any.
     *
     * @return always {@code true}; unsubscribing an unknown name or group is a no-op
     */
    @Override
    public boolean unsubscribe(String name, String consumerGroup) {
        Map<String, EventHubProcessor> processors = this.processorsByNameAndGroup.get(name);
        // A name that was never sent to or registered has no map; guard instead of throwing NPE.
        if (processors != null) {
            processors.remove(consumerGroup);
        }
        return true;
    }

    /** Returns the consumer-group map for {@code name}, creating it atomically when absent. */
    private Map<String, EventHubProcessor> processorsFor(String name) {
        return this.processorsByNameAndGroup.computeIfAbsent(name, key -> new ConcurrentHashMap<>());
    }

    /** Hands {@code events} to one processor; a failure is printed so remaining processors still run. */
    private void dispatch(EventHubProcessor processor, Iterable<EventData> events) {
        try {
            processor.onEvents(this.partitionContextSupplier.get(), events);
        }
        catch (Exception e) {
            // Deliberate best-effort: one failing processor must not abort delivery to the others.
            e.printStackTrace();
        }
    }
}

