/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.pooled;

import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.IntStream;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.AbstractEventProcessor;
import org.axonframework.eventhandling.ErrorHandler;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventTrackerStatus;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.StreamingEventProcessor;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackerStatus;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.pooled.Coordinator;
import org.axonframework.eventhandling.pooled.WorkPackage;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.lifecycle.ShutdownHandler;
import org.axonframework.lifecycle.StartHandler;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.monitoring.MessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PooledStreamingEventProcessor
extends AbstractEventProcessor
implements StreamingEventProcessor {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final String name;
    private final StreamableMessageSource<TrackedEventMessage<?>> messageSource;
    private final TokenStore tokenStore;
    private final TransactionManager transactionManager;
    private final ScheduledExecutorService workerExecutor;
    private final Coordinator coordinator;
    private final int initialSegmentCount;
    private final Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken;
    private final long tokenClaimInterval;
    private final int maxClaimedSegments;
    private final long claimExtensionThreshold;
    private final int batchSize;
    private final Clock clock;
    private final AtomicReference<String> tokenStoreIdentifier = new AtomicReference();
    private final Map<Integer, TrackerStatus> processingStatus = new ConcurrentHashMap<Integer, TrackerStatus>();

    public static Builder builder() {
        return new Builder();
    }

    protected PooledStreamingEventProcessor(Builder builder) {
        super(builder);
        this.name = builder.name();
        this.messageSource = builder.messageSource;
        this.tokenStore = builder.tokenStore;
        this.transactionManager = builder.transactionManager;
        this.workerExecutor = (ScheduledExecutorService)builder.workerExecutorBuilder.apply(this.name);
        this.initialSegmentCount = builder.initialSegmentCount;
        this.initialToken = builder.initialToken;
        this.tokenClaimInterval = builder.tokenClaimInterval;
        this.maxClaimedSegments = builder.maxClaimedSegments;
        this.claimExtensionThreshold = builder.claimExtensionThreshold;
        this.batchSize = builder.batchSize;
        this.clock = builder.clock;
        this.coordinator = Coordinator.builder().name(this.name).messageSource(this.messageSource).tokenStore(this.tokenStore).transactionManager(this.transactionManager).executorService((ScheduledExecutorService)builder.coordinatorExecutorBuilder.apply(this.name)).workPackageFactory(this::spawnWorker).eventFilter(event -> this.canHandleType(event.getPayloadType())).onMessageIgnored(this::reportIgnored).processingStatusUpdater(this::statusUpdater).tokenClaimInterval(this.tokenClaimInterval).clock(this.clock).maxClaimedSegments(this.maxClaimedSegments).build();
    }

    @Override
    @StartHandler(phase=0x3FFFFFFF)
    public void start() {
        logger.info("Starting PooledStreamingEventProcessor [{}].", (Object)this.name);
        this.initializeTokenStore();
        this.coordinator.start();
    }

    private void initializeTokenStore() {
        this.transactionManager.executeInTransaction(() -> {
            int[] segments = this.tokenStore.fetchSegments(this.name);
            try {
                if (segments == null || segments.length == 0) {
                    logger.info("Initializing segments for processor [{}] ({} segments)", (Object)this.name, (Object)this.initialSegmentCount);
                    this.tokenStore.initializeTokenSegments(this.name, this.initialSegmentCount, this.initialToken.apply(this.messageSource));
                }
            }
            catch (Exception e) {
                logger.info("Error while initializing the Token Store. This may simply indicate concurrent attempts to initialize.", (Throwable)e);
            }
        });
    }

    @Override
    public void shutDown() {
        this.shutdownAsync().join();
    }

    @Override
    @ShutdownHandler(phase=0x3FFFFFFF)
    public CompletableFuture<Void> shutdownAsync() {
        logger.info("Stopping processor [{}]", (Object)this.name);
        return this.coordinator.stop();
    }

    @Override
    public boolean isRunning() {
        return this.coordinator.isRunning();
    }

    @Override
    public boolean isError() {
        return this.coordinator.isError();
    }

    @Override
    public String getTokenStoreIdentifier() {
        return this.tokenStoreIdentifier.updateAndGet(i -> i != null ? i : this.calculateIdentifier());
    }

    private String calculateIdentifier() {
        return this.transactionManager.fetchInTransaction(() -> this.tokenStore.retrieveStorageIdentifier().orElse("--unknown--"));
    }

    @Override
    public void releaseSegment(int segmentId) {
        this.releaseSegment(segmentId, this.tokenClaimInterval * 2L, TimeUnit.MILLISECONDS);
    }

    @Override
    public void releaseSegment(int segmentId, long releaseDuration, TimeUnit unit) {
        this.coordinator.releaseUntil(segmentId, GenericEventMessage.clock.instant().plusMillis(unit.toMillis(releaseDuration)));
    }

    @Override
    public CompletableFuture<Boolean> splitSegment(int segmentId) {
        if (!this.tokenStore.requiresExplicitSegmentInitialization()) {
            CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
            result.completeExceptionally(new UnsupportedOperationException("TokenStore must require explicit initialization to safely split tokens."));
            return result;
        }
        return this.coordinator.splitSegment(segmentId);
    }

    @Override
    public CompletableFuture<Boolean> mergeSegment(int segmentId) {
        if (!this.tokenStore.requiresExplicitSegmentInitialization()) {
            CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
            result.completeExceptionally(new UnsupportedOperationException("TokenStore must require explicit initialization to safely merge tokens."));
            return result;
        }
        return this.coordinator.mergeSegment(segmentId);
    }

    @Override
    public boolean supportsReset() {
        return this.eventHandlerInvoker().supportsReset();
    }

    @Override
    public void resetTokens() {
        this.resetTokens(this.initialToken);
    }

    @Override
    public <R> void resetTokens(R resetContext) {
        this.resetTokens(this.initialToken, resetContext);
    }

    @Override
    public void resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialTrackingTokenSupplier) {
        this.resetTokens(initialTrackingTokenSupplier.apply(this.messageSource));
    }

    @Override
    public <R> void resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialTrackingTokenSupplier, R resetContext) {
        this.resetTokens(initialTrackingTokenSupplier.apply(this.messageSource), resetContext);
    }

    @Override
    public void resetTokens(TrackingToken startPosition) {
        this.resetTokens(startPosition, null);
    }

    @Override
    public <R> void resetTokens(TrackingToken startPosition, R resetContext) {
        Assert.state(this.supportsReset(), () -> "The handlers assigned to this Processor do not support a reset.");
        Assert.state(!this.isRunning(), () -> "The Processor must be shut down before triggering a reset.");
        this.transactionManager.executeInTransaction(() -> {
            int[] segments = this.tokenStore.fetchSegments(this.getName());
            TrackingToken[] tokens = (TrackingToken[])Arrays.stream(segments).mapToObj(segment -> this.tokenStore.fetchToken(this.getName(), segment)).toArray(TrackingToken[]::new);
            this.eventHandlerInvoker().performReset(resetContext);
            IntStream.range(0, tokens.length).forEach(i -> this.tokenStore.storeToken(ReplayToken.createReplayToken(tokens[i], startPosition), this.getName(), segments[i]));
        });
    }

    @Override
    public int maxCapacity() {
        return this.maxClaimedSegments;
    }

    @Override
    public Map<Integer, EventTrackerStatus> processingStatus() {
        return Collections.unmodifiableMap(this.processingStatus);
    }

    private WorkPackage spawnWorker(Segment segment, TrackingToken initialToken) {
        return WorkPackage.builder().name(this.name).tokenStore(this.tokenStore).transactionManager(this.transactionManager).executorService(this.workerExecutor).eventFilter(this::canHandle).batchProcessor(this::processInUnitOfWork).segment(segment).initialToken(initialToken).batchSize(this.batchSize).claimExtensionThreshold(this.claimExtensionThreshold).segmentStatusUpdater(this.singleStatusUpdater(segment.getSegmentId(), new TrackerStatus(segment, initialToken))).clock(this.clock).build();
    }

    private Consumer<UnaryOperator<TrackerStatus>> singleStatusUpdater(int segmentId, TrackerStatus initialStatus) {
        return statusUpdater -> this.processingStatus.compute(segmentId, (s, status) -> (TrackerStatus)statusUpdater.apply(status == null ? initialStatus : status));
    }

    private void statusUpdater(int segmentId, UnaryOperator<TrackerStatus> segmentUpdater) {
        this.processingStatus.computeIfPresent(segmentId, (s, ts) -> (TrackerStatus)segmentUpdater.apply((TrackerStatus)ts));
    }

    public static class Builder
    extends AbstractEventProcessor.Builder {
        private StreamableMessageSource<TrackedEventMessage<?>> messageSource;
        private TokenStore tokenStore;
        private TransactionManager transactionManager;
        private Function<String, ScheduledExecutorService> coordinatorExecutorBuilder = n -> Executors.newScheduledThreadPool(1, new AxonThreadFactory("Coordinator[" + n + "]"));
        private Function<String, ScheduledExecutorService> workerExecutorBuilder = n -> Executors.newScheduledThreadPool(1, new AxonThreadFactory("WorkPackage[" + n + "]"));
        private int initialSegmentCount = 16;
        private Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken = StreamableMessageSource::createTailToken;
        private long tokenClaimInterval = 5000L;
        private int maxClaimedSegments = Short.MAX_VALUE;
        private long claimExtensionThreshold = 5000L;
        private int batchSize = 1;
        private Clock clock = GenericEventMessage.clock;

        protected Builder() {
            this.rollbackConfiguration(RollbackConfigurationType.ANY_THROWABLE);
        }

        @Override
        public Builder name(String name) {
            super.name(name);
            return this;
        }

        @Override
        public Builder eventHandlerInvoker(EventHandlerInvoker eventHandlerInvoker) {
            super.eventHandlerInvoker(eventHandlerInvoker);
            return this;
        }

        @Override
        public Builder rollbackConfiguration(RollbackConfiguration rollbackConfiguration) {
            super.rollbackConfiguration(rollbackConfiguration);
            return this;
        }

        @Override
        public Builder errorHandler(ErrorHandler errorHandler) {
            super.errorHandler(errorHandler);
            return this;
        }

        @Override
        public Builder messageMonitor(MessageMonitor<? super EventMessage<?>> messageMonitor) {
            super.messageMonitor(messageMonitor);
            return this;
        }

        public Builder messageSource(StreamableMessageSource<TrackedEventMessage<?>> messageSource) {
            BuilderUtils.assertNonNull(messageSource, "StreamableMessageSource may not be null");
            this.messageSource = messageSource;
            return this;
        }

        public Builder tokenStore(TokenStore tokenStore) {
            BuilderUtils.assertNonNull(tokenStore, "TokenStore may not be null");
            this.tokenStore = tokenStore;
            return this;
        }

        public Builder transactionManager(TransactionManager transactionManager) {
            BuilderUtils.assertNonNull(transactionManager, "TransactionManager may not be null");
            this.transactionManager = transactionManager;
            return this;
        }

        public Builder coordinatorExecutor(ScheduledExecutorService coordinatorExecutor) {
            BuilderUtils.assertNonNull(coordinatorExecutor, "The Coordinator's ScheduledExecutorService may not be null");
            this.coordinatorExecutorBuilder = ignored -> coordinatorExecutor;
            return this;
        }

        public Builder workerExecutorService(ScheduledExecutorService workerExecutor) {
            BuilderUtils.assertNonNull(workerExecutor, "The Worker's ScheduledExecutorService may not be null");
            this.workerExecutorBuilder = ignored -> workerExecutor;
            return this;
        }

        public Builder initialSegmentCount(int initialSegmentCount) {
            BuilderUtils.assertStrictPositive(initialSegmentCount, "The initial segment count should be a higher valuer than zero");
            this.initialSegmentCount = initialSegmentCount;
            return this;
        }

        public Builder initialToken(Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken) {
            BuilderUtils.assertNonNull(initialToken, "The initial token builder Function may not be null");
            this.initialToken = initialToken;
            return this;
        }

        public Builder tokenClaimInterval(long tokenClaimInterval) {
            BuilderUtils.assertStrictPositive(tokenClaimInterval, "Token claim interval should be a higher valuer than zero");
            this.tokenClaimInterval = tokenClaimInterval;
            return this;
        }

        public Builder maxClaimedSegments(int maxClaimedSegments) {
            BuilderUtils.assertStrictPositive(maxClaimedSegments, "Max claimed segments should be a higher valuer than zero");
            this.maxClaimedSegments = maxClaimedSegments;
            return this;
        }

        public Builder claimExtensionThreshold(long claimExtensionThreshold) {
            BuilderUtils.assertStrictPositive(claimExtensionThreshold, "The claim extension threshold should be a higher valuer than zero");
            this.claimExtensionThreshold = claimExtensionThreshold;
            return this;
        }

        public Builder batchSize(int batchSize) {
            BuilderUtils.assertStrictPositive(batchSize, "The batch size should be a higher valuer than zero");
            this.batchSize = batchSize;
            return this;
        }

        public Builder clock(Clock clock) {
            BuilderUtils.assertNonNull(clock, "Clock may not be null");
            this.clock = clock;
            return this;
        }

        public PooledStreamingEventProcessor build() {
            return new PooledStreamingEventProcessor(this);
        }

        @Override
        protected void validate() throws AxonConfigurationException {
            super.validate();
            BuilderUtils.assertNonNull(this.messageSource, "The StreamableMessageSource is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.tokenStore, "The TokenStore is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.transactionManager, "The TransactionManager is a hard requirement and should be provided");
        }

        public String name() {
            return this.name;
        }
    }
}

