/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.extensions.multitenancy.configuration;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.config.Configuration;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.config.EventProcessingModule;
import org.axonframework.eventhandling.DirectEventProcessingStrategy;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessingStrategy;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.pooled.PooledStreamingEventProcessor;
import org.axonframework.extensions.multitenancy.TenantWrappedTransactionManager;
import org.axonframework.extensions.multitenancy.components.TenantDescriptor;
import org.axonframework.extensions.multitenancy.components.TenantProvider;
import org.axonframework.extensions.multitenancy.components.deadletterqueue.MultiTenantDeadLetterProcessor;
import org.axonframework.extensions.multitenancy.components.deadletterqueue.MultiTenantDeadLetterQueue;
import org.axonframework.extensions.multitenancy.components.deadletterqueue.MultiTenantDeadLetterQueueFactory;
import org.axonframework.extensions.multitenancy.components.eventhandeling.MultiTenantEventProcessor;
import org.axonframework.extensions.multitenancy.components.eventstore.MultiTenantEventStore;
import org.axonframework.extensions.multitenancy.components.eventstore.MultiTenantSubscribableMessageSource;
import org.axonframework.extensions.multitenancy.configuration.MultiTenantEventProcessorPredicate;
import org.axonframework.extensions.multitenancy.configuration.MultiTenantStreamableMessageSourceProvider;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.deadletter.SequencedDeadLetterProcessor;
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue;

public class MultiTenantEventProcessingModule
extends EventProcessingModule {
    private final TenantProvider tenantProvider;
    private final MultiTenantStreamableMessageSourceProvider multiTenantStreamableMessageSourceProvider;
    private final MultiTenantEventProcessorPredicate multiTenantEventProcessorPredicate;
    protected final MultiTenantDeadLetterQueueFactory<EventMessage<?>> multiTenantDeadLetterQueueFactory;

    public MultiTenantEventProcessingModule(TenantProvider tenantProvider) {
        this.tenantProvider = tenantProvider;
        this.multiTenantDeadLetterQueueFactory = null;
        this.multiTenantStreamableMessageSourceProvider = (defaultSource, processorName, tenantDescriptor, configuration) -> defaultSource;
        this.multiTenantEventProcessorPredicate = MultiTenantEventProcessorPredicate.enableMultiTenancy();
    }

    public MultiTenantEventProcessingModule(TenantProvider tenantProvider, MultiTenantDeadLetterQueueFactory<EventMessage<?>> multiTenantDeadLetterQueueFactory) {
        this.tenantProvider = tenantProvider;
        this.multiTenantDeadLetterQueueFactory = multiTenantDeadLetterQueueFactory;
        this.multiTenantStreamableMessageSourceProvider = (defaultSource, processorName, tenantDescriptor, configuration) -> defaultSource;
        this.multiTenantEventProcessorPredicate = MultiTenantEventProcessorPredicate.enableMultiTenancy();
    }

    public MultiTenantEventProcessingModule(TenantProvider tenantProvider, MultiTenantStreamableMessageSourceProvider multiTenantStreamableMessageSourceProvider, MultiTenantDeadLetterQueueFactory<EventMessage<?>> multiTenantDeadLetterQueueFactory, MultiTenantEventProcessorPredicate multiTenantEventProcessorPredicate) {
        this.tenantProvider = tenantProvider;
        this.multiTenantEventProcessorPredicate = multiTenantEventProcessorPredicate;
        this.multiTenantDeadLetterQueueFactory = multiTenantDeadLetterQueueFactory;
        this.multiTenantStreamableMessageSourceProvider = multiTenantStreamableMessageSourceProvider;
    }

    private static String getName(String name, TenantDescriptor tenantDescriptor) {
        return name + "@" + tenantDescriptor.tenantId();
    }

    public Optional<EventProcessor> eventProcessor(String name, TenantDescriptor tenantDescriptor) {
        return Optional.ofNullable(this.eventProcessors().get(MultiTenantEventProcessingModule.getName(name, tenantDescriptor)));
    }

    public Map<String, EventProcessor> eventProcessors() {
        Map original = super.eventProcessors();
        Map<String, EventProcessor> allProcessors = original.entrySet().stream().filter(entry -> ((EventProcessor)entry.getValue()).getClass().isAssignableFrom(MultiTenantEventProcessor.class)).flatMap(entry -> ((MultiTenantEventProcessor)entry.getValue()).tenantEventProcessors().stream()).collect(Collectors.toMap(EventProcessor::getName, processor -> processor));
        allProcessors.putAll(original);
        return allProcessors;
    }

    public EventProcessor subscribingEventProcessor(String name, EventHandlerInvoker eventHandlerInvoker, SubscribableMessageSource<? extends EventMessage<?>> messageSource) {
        if (!this.multiTenantEventProcessorPredicate.test(name)) {
            return this.buildSep(name, eventHandlerInvoker, messageSource);
        }
        MultiTenantEventProcessor eventProcessor = MultiTenantEventProcessor.builder().name(name).tenantSegmentFactory(tenantDescriptor -> {
            SubscribableMessageSource<? extends EventMessage<?>> tenantSource = MultiTenantEventProcessingModule.tenantSource(messageSource, tenantDescriptor);
            return this.buildSep((TenantDescriptor)tenantDescriptor, name, eventHandlerInvoker, tenantSource);
        }).build();
        this.tenantProvider.subscribe(eventProcessor);
        return eventProcessor;
    }

    private static SubscribableMessageSource<? extends EventMessage<?>> tenantSource(SubscribableMessageSource<? extends EventMessage<?>> messageSource, TenantDescriptor tenantDescriptor) {
        return messageSource instanceof MultiTenantSubscribableMessageSource ? (SubscribableMessageSource)((MultiTenantSubscribableMessageSource)messageSource).tenantSegments().get(tenantDescriptor) : messageSource;
    }

    private SubscribingEventProcessor buildSep(String name, EventHandlerInvoker eventHandlerInvoker, SubscribableMessageSource<? extends EventMessage<?>> source, TransactionManager transactionManager) {
        return SubscribingEventProcessor.builder().name(name).eventHandlerInvoker(eventHandlerInvoker).rollbackConfiguration(super.rollbackConfiguration(name)).errorHandler(super.errorHandler(name)).messageMonitor(super.messageMonitor(SubscribingEventProcessor.class, name)).messageSource(source).processingStrategy((EventProcessingStrategy)DirectEventProcessingStrategy.INSTANCE).transactionManager(transactionManager).build();
    }

    private SubscribingEventProcessor buildSep(TenantDescriptor tenantDescriptor, String name, EventHandlerInvoker eventHandlerInvoker, SubscribableMessageSource<? extends EventMessage<?>> source) {
        TenantWrappedTransactionManager transactionManager = new TenantWrappedTransactionManager(super.transactionManager(name), tenantDescriptor);
        return this.buildSep(MultiTenantEventProcessingModule.getName(name, tenantDescriptor), eventHandlerInvoker, source, transactionManager);
    }

    private SubscribingEventProcessor buildSep(String name, EventHandlerInvoker eventHandlerInvoker, SubscribableMessageSource<? extends EventMessage<?>> source) {
        return this.buildSep(name, eventHandlerInvoker, source, super.transactionManager(name));
    }

    public EventProcessor trackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvoker, TrackingEventProcessorConfiguration config, StreamableMessageSource<TrackedEventMessage<?>> source) {
        if (!this.multiTenantEventProcessorPredicate.test(name)) {
            return this.buildTep(name, eventHandlerInvoker, source, config);
        }
        MultiTenantEventProcessor eventProcessor = MultiTenantEventProcessor.builder().name(name).tenantSegmentFactory(tenantDescriptor -> {
            StreamableMessageSource<TrackedEventMessage<?>> tenantSource = this.multiTenantStreamableMessageSourceProvider.build(MultiTenantEventProcessingModule.defaultSource(source, tenantDescriptor), name, (TenantDescriptor)tenantDescriptor, this.configuration);
            return this.buildTep((TenantDescriptor)tenantDescriptor, name, eventHandlerInvoker, tenantSource, config);
        }).build();
        this.tenantProvider.subscribe(eventProcessor);
        return eventProcessor;
    }

    private TrackingEventProcessor buildTep(String name, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> source, TransactionManager transactionManager, TrackingEventProcessorConfiguration config) {
        return TrackingEventProcessor.builder().name(name).eventHandlerInvoker(eventHandlerInvoker).rollbackConfiguration(super.rollbackConfiguration(name)).errorHandler(super.errorHandler(name)).messageMonitor(super.messageMonitor(TrackingEventProcessor.class, name)).messageSource(source).tokenStore(super.tokenStore(name)).transactionManager(transactionManager).trackingEventProcessorConfiguration(config).build();
    }

    private TrackingEventProcessor buildTep(TenantDescriptor tenantDescriptor, String name, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> source, TrackingEventProcessorConfiguration config) {
        TenantWrappedTransactionManager transactionManager = new TenantWrappedTransactionManager(super.transactionManager(name), tenantDescriptor);
        return this.buildTep(MultiTenantEventProcessingModule.getName(name, tenantDescriptor), eventHandlerInvoker, source, transactionManager, config);
    }

    private TrackingEventProcessor buildTep(String name, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> source, TrackingEventProcessorConfiguration config) {
        return this.buildTep(name, eventHandlerInvoker, source, super.transactionManager(name), config);
    }

    public EventProcessor pooledStreamingEventProcessor(String name, EventHandlerInvoker eventHandlerInvoker, Configuration config, StreamableMessageSource<TrackedEventMessage<?>> source, EventProcessingConfigurer.PooledStreamingProcessorConfiguration processorConfiguration) {
        if (!this.multiTenantEventProcessorPredicate.test(name)) {
            return this.psepBuilder(name, eventHandlerInvoker, source, config).build();
        }
        MultiTenantEventProcessor eventProcessor = MultiTenantEventProcessor.builder().name(name).tenantSegmentFactory(tenantDescriptor -> {
            StreamableMessageSource<TrackedEventMessage<?>> tenantSource = MultiTenantEventProcessingModule.defaultSource(source, tenantDescriptor);
            tenantSource = this.multiTenantStreamableMessageSourceProvider.build(tenantSource, name, (TenantDescriptor)tenantDescriptor, this.configuration);
            PooledStreamingEventProcessor.Builder builder = this.psepBuilder((TenantDescriptor)tenantDescriptor, name, eventHandlerInvoker, tenantSource, config);
            return ((PooledStreamingEventProcessor.Builder)this.psepConfigs.getOrDefault("___DEFAULT_PSEP_CONFIG", EventProcessingConfigurer.PooledStreamingProcessorConfiguration.noOp()).andThen(this.psepConfigs.getOrDefault(name, EventProcessingConfigurer.PooledStreamingProcessorConfiguration.noOp())).andThen(processorConfiguration).apply((Object)config, (Object)builder)).build();
        }).build();
        this.tenantProvider.subscribe(eventProcessor);
        return eventProcessor;
    }

    private static StreamableMessageSource<TrackedEventMessage<?>> defaultSource(StreamableMessageSource<TrackedEventMessage<?>> source, TenantDescriptor tenantDescriptor) {
        return source instanceof MultiTenantEventStore ? (StreamableMessageSource)((MultiTenantEventStore)source).tenantSegments().get(tenantDescriptor) : source;
    }

    private PooledStreamingEventProcessor.Builder psepBuilder(String name, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> source, TransactionManager transactionManager, Configuration config) {
        return PooledStreamingEventProcessor.builder().name(name).eventHandlerInvoker(eventHandlerInvoker).rollbackConfiguration(super.rollbackConfiguration(name)).errorHandler(super.errorHandler(name)).messageMonitor(super.messageMonitor(PooledStreamingEventProcessor.class, name)).messageSource(source).tokenStore(super.tokenStore(name)).transactionManager(transactionManager).coordinatorExecutor(processorName -> {
            ScheduledExecutorService coordinatorExecutor = this.defaultExecutor("Coordinator[" + processorName + "]");
            config.onShutdown(coordinatorExecutor::shutdown);
            return coordinatorExecutor;
        }).workerExecutor(processorName -> {
            ScheduledExecutorService workerExecutor = this.defaultExecutor("WorkPackage[" + processorName + "]");
            config.onShutdown(workerExecutor::shutdown);
            return workerExecutor;
        });
    }

    private PooledStreamingEventProcessor.Builder psepBuilder(TenantDescriptor tenantDescriptor, String name, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> source, Configuration config) {
        TenantWrappedTransactionManager transactionManager = new TenantWrappedTransactionManager(super.transactionManager(name), tenantDescriptor);
        return this.psepBuilder(MultiTenantEventProcessingModule.getName(name, tenantDescriptor), eventHandlerInvoker, source, transactionManager, config);
    }

    private PooledStreamingEventProcessor.Builder psepBuilder(String name, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> source, Configuration config) {
        return this.psepBuilder(name, eventHandlerInvoker, source, super.transactionManager(name), config);
    }

    public EventProcessingConfigurer registerDeadLetterQueue(@Nonnull String processingGroup, @Nonnull Function<Configuration, SequencedDeadLetterQueue<EventMessage<?>>> queueBuilder) {
        if (this.multiTenantDeadLetterQueueFactory == null) {
            throw new AxonConfigurationException("Cannot register a DeadLetterQueue without a MultiTenantDeadLetterQueueFactory");
        }
        MultiTenantDeadLetterQueue<EventMessage<?>> deadLetterQueue = this.multiTenantDeadLetterQueueFactory.getDeadLetterQueue(processingGroup);
        deadLetterQueue.registerDeadLetterQueueSupplier(() -> (SequencedDeadLetterQueue)queueBuilder.apply(this.configuration));
        return super.registerDeadLetterQueue(processingGroup, configuration -> deadLetterQueue);
    }

    public Optional<SequencedDeadLetterProcessor<EventMessage<?>>> sequencedDeadLetterProcessor(@Nonnull String processingGroup) {
        return super.sequencedDeadLetterProcessor(processingGroup).map(MultiTenantDeadLetterProcessor::new);
    }

    private ScheduledExecutorService defaultExecutor(String factoryName) {
        return Executors.newScheduledThreadPool(1, (ThreadFactory)new AxonThreadFactory(factoryName));
    }
}

