/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.processors.streaming.token.store.jpa;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.persistence.EntityManager;
import jakarta.persistence.LockModeType;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.DateTimeUtils;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.eventhandling.processors.streaming.segmenting.Segment;
import org.axonframework.eventhandling.processors.streaming.token.TrackingToken;
import org.axonframework.eventhandling.processors.streaming.token.store.ConfigToken;
import org.axonframework.eventhandling.processors.streaming.token.store.TokenStore;
import org.axonframework.eventhandling.processors.streaming.token.store.UnableToClaimTokenException;
import org.axonframework.eventhandling.processors.streaming.token.store.UnableToInitializeTokenException;
import org.axonframework.eventhandling.processors.streaming.token.store.UnableToRetrieveIdentifierException;
import org.axonframework.eventhandling.processors.streaming.token.store.jpa.TokenEntry;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.SerializedType;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JpaTokenStore
implements TokenStore {
    private static final Logger logger = LoggerFactory.getLogger(JpaTokenStore.class);
    private static final String CONFIG_TOKEN_ID = "__config";
    private static final int CONFIG_SEGMENT = 0;
    private static final String OWNER_PARAM = "owner";
    private static final String PROCESSOR_NAME_PARAM = "processorName";
    private static final String SEGMENT_PARAM = "segment";
    private final EntityManagerProvider entityManagerProvider;
    private final Serializer serializer;
    private final TemporalAmount claimTimeout;
    private final String nodeId;
    private final LockModeType loadingLockMode;

    protected JpaTokenStore(Builder builder) {
        builder.validate();
        this.entityManagerProvider = builder.entityManagerProvider;
        this.serializer = builder.serializer;
        this.claimTimeout = builder.claimTimeout;
        this.nodeId = builder.nodeId;
        this.loadingLockMode = builder.loadingLockMode;
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override
    @Nonnull
    public CompletableFuture<Void> initializeTokenSegments(@Nonnull String processorName, int segmentCount, @Nullable TrackingToken initialToken, @Nullable ProcessingContext context) throws UnableToClaimTokenException {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        if (FutureUtils.joinAndUnwrap(this.fetchSegments(processorName, context)).length > 0) {
            throw new UnableToClaimTokenException("Could not initialize segments. Some segments were already present.");
        }
        for (int segment = 0; segment < segmentCount; ++segment) {
            TokenEntry token = new TokenEntry(processorName, segment, initialToken, this.serializer);
            entityManager.persist((Object)token);
        }
        entityManager.flush();
        return FutureUtils.emptyCompletedFuture();
    }

    @Override
    @Nonnull
    public CompletableFuture<Void> storeToken(@Nullable TrackingToken token, @Nonnull String processorName, int segment, @Nullable ProcessingContext context) {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        TokenEntry tokenToStore = new TokenEntry(processorName, segment, token, this.serializer);
        byte[] tokenDataToStore = ObjectUtils.getOrDefault(tokenToStore.getSerializedToken(), SerializedObject::getData, null);
        String tokenTypeToStore = ObjectUtils.getOrDefault(tokenToStore.getTokenType(), SerializedType::getName, null);
        int updatedTokens = entityManager.createQuery("UPDATE TokenEntry te SET te.token = :token, te.tokenType = :tokenType, te.timestamp = :timestamp WHERE te.owner = :owner AND te.processorName = :processorName AND te.segment = :segment").setParameter("token", (Object)tokenDataToStore).setParameter("tokenType", (Object)tokenTypeToStore).setParameter("timestamp", (Object)tokenToStore.timestampAsString()).setParameter(OWNER_PARAM, (Object)this.nodeId).setParameter(PROCESSOR_NAME_PARAM, (Object)processorName).setParameter(SEGMENT_PARAM, (Object)segment).executeUpdate();
        if (updatedTokens == 0) {
            logger.debug("Could not update token [{}] for processor [{}] and segment [{}]. Trying load-then-save approach instead.", new Object[]{token, processorName, segment});
            TokenEntry tokenEntry = this.loadToken(processorName, segment, entityManager);
            tokenEntry.updateToken(token, this.serializer);
        }
        return FutureUtils.emptyCompletedFuture();
    }

    @Override
    @Nonnull
    public CompletableFuture<Void> releaseClaim(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        entityManager.createQuery("UPDATE TokenEntry te SET te.owner = null WHERE te.owner = :owner AND te.processorName = :processorName AND te.segment = :segment").setParameter(PROCESSOR_NAME_PARAM, (Object)processorName).setParameter(SEGMENT_PARAM, (Object)segment).setParameter(OWNER_PARAM, (Object)this.nodeId).executeUpdate();
        return FutureUtils.emptyCompletedFuture();
    }

    @Override
    @Nonnull
    public CompletableFuture<Void> initializeSegment(@Nullable TrackingToken token, @Nonnull String processorName, int segment, @Nullable ProcessingContext context) throws UnableToInitializeTokenException {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        TokenEntry entry = new TokenEntry(processorName, segment, token, this.serializer);
        entityManager.persist((Object)entry);
        entityManager.flush();
        return FutureUtils.emptyCompletedFuture();
    }

    @Override
    @Nonnull
    public CompletableFuture<Void> deleteToken(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) throws UnableToClaimTokenException {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        int updates = entityManager.createQuery("DELETE FROM TokenEntry te WHERE te.owner = :owner AND te.processorName = :processorName AND te.segment = :segment").setParameter(PROCESSOR_NAME_PARAM, (Object)processorName).setParameter(SEGMENT_PARAM, (Object)segment).setParameter(OWNER_PARAM, (Object)this.nodeId).executeUpdate();
        if (updates == 0) {
            throw new UnableToClaimTokenException("Unable to remove token. It is not owned by " + this.nodeId);
        }
        return FutureUtils.emptyCompletedFuture();
    }

    @Override
    @Nonnull
    public CompletableFuture<TrackingToken> fetchToken(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        return CompletableFuture.completedFuture(this.loadToken(processorName, segment, entityManager).getToken(this.serializer));
    }

    @Override
    @Nonnull
    public CompletableFuture<TrackingToken> fetchToken(@Nonnull String processorName, @Nonnull Segment segment, @Nullable ProcessingContext context) throws UnableToClaimTokenException {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        return CompletableFuture.completedFuture(this.loadToken(processorName, segment, entityManager).getToken(this.serializer));
    }

    @Override
    @Nonnull
    public CompletableFuture<Void> extendClaim(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) throws UnableToClaimTokenException {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        int updates = entityManager.createQuery("UPDATE TokenEntry te SET te.timestamp = :timestamp WHERE te.processorName = :processorName AND te.segment = :segment AND te.owner = :owner").setParameter(PROCESSOR_NAME_PARAM, (Object)processorName).setParameter(SEGMENT_PARAM, (Object)segment).setParameter(OWNER_PARAM, (Object)this.nodeId).setParameter("timestamp", (Object)DateTimeUtils.formatInstant(TokenEntry.clock.instant())).executeUpdate();
        if (updates == 0) {
            throw new UnableToClaimTokenException("Unable to extend the claim on token for processor '" + processorName + "[" + segment + "]'. It is either claimed by another process, or there is no such token.");
        }
        return FutureUtils.emptyCompletedFuture();
    }

    @Override
    @Nonnull
    public CompletableFuture<int[]> fetchSegments(@Nonnull String processorName, @Nullable ProcessingContext context) {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        List resultList = entityManager.createQuery("SELECT te.segment FROM TokenEntry te WHERE te.processorName = :processorName ORDER BY te.segment ASC", Integer.class).setParameter(PROCESSOR_NAME_PARAM, (Object)processorName).getResultList();
        return CompletableFuture.completedFuture(resultList.stream().mapToInt(i -> i).toArray());
    }

    @Override
    @Nonnull
    public CompletableFuture<List<Segment>> fetchAvailableSegments(@Nonnull String processorName, @Nullable ProcessingContext context) {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        List resultList = entityManager.createQuery("SELECT te FROM TokenEntry te WHERE te.processorName = :processorName ORDER BY te.segment ASC", TokenEntry.class).setParameter(PROCESSOR_NAME_PARAM, (Object)processorName).getResultList();
        int[] allSegments = resultList.stream().mapToInt(TokenEntry::getSegment).toArray();
        return CompletableFuture.completedFuture(resultList.stream().filter(tokenEntry -> tokenEntry.mayClaim(this.nodeId, this.claimTimeout)).map(tokenEntry -> Segment.computeSegment(tokenEntry.getSegment(), allSegments)).collect(Collectors.toList()));
    }

    protected TokenEntry loadToken(String processorName, int segment, EntityManager entityManager) {
        TokenEntry token = (TokenEntry)entityManager.find(TokenEntry.class, (Object)new TokenEntry.PK(processorName, segment), this.loadingLockMode);
        if (token == null) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has not been initialized yet", processorName, segment));
        }
        if (!token.claim(this.nodeId, this.claimTimeout)) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It is owned by '%s'", processorName, segment, token.getOwner()));
        }
        return token;
    }

    protected TokenEntry loadToken(String processorName, Segment segment, EntityManager entityManager) {
        TokenEntry token = this.loadToken(processorName, segment.getSegmentId(), entityManager);
        try {
            this.validateSegment(processorName, segment, entityManager);
        }
        catch (UnableToClaimTokenException e) {
            token.releaseClaim(this.nodeId);
            throw e;
        }
        return token;
    }

    private void validateSegment(String processorName, Segment segment, EntityManager entityManager) {
        TokenEntry mergeableSegment = (TokenEntry)entityManager.find(TokenEntry.class, (Object)new TokenEntry.PK(processorName, segment.mergeableSegmentId()), this.loadingLockMode);
        if (mergeableSegment == null) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has been merged with another segment", processorName, segment.getSegmentId()));
        }
        TokenEntry splitSegment = (TokenEntry)entityManager.find(TokenEntry.class, (Object)new TokenEntry.PK(processorName, segment.splitSegmentId()), this.loadingLockMode);
        if (splitSegment != null) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has been split into two segments", processorName, segment.getSegmentId()));
        }
    }

    @Override
    @Nonnull
    public CompletableFuture<Optional<String>> retrieveStorageIdentifier(@Nullable ProcessingContext context) {
        try {
            return CompletableFuture.completedFuture(Optional.of(this.getConfig()).map(i -> i.get("id")));
        }
        catch (Exception e) {
            throw new UnableToRetrieveIdentifierException("Exception occurred while trying to establish storage identifier", e);
        }
    }

    private ConfigToken getConfig() {
        EntityManager em = this.entityManagerProvider.getEntityManager();
        TokenEntry token = (TokenEntry)em.find(TokenEntry.class, (Object)new TokenEntry.PK(CONFIG_TOKEN_ID, 0), LockModeType.NONE);
        if (token == null) {
            token = new TokenEntry(CONFIG_TOKEN_ID, 0, new ConfigToken(Collections.singletonMap("id", UUID.randomUUID().toString())), this.serializer);
            em.persist((Object)token);
            em.flush();
        }
        return (ConfigToken)token.getToken(this.serializer);
    }

    public Serializer serializer() {
        return this.serializer;
    }

    public static class Builder {
        private LockModeType loadingLockMode = LockModeType.PESSIMISTIC_WRITE;
        private EntityManagerProvider entityManagerProvider;
        private Serializer serializer;
        private TemporalAmount claimTimeout = Duration.ofSeconds(10L);
        private String nodeId = ManagementFactory.getRuntimeMXBean().getName();

        public Builder entityManagerProvider(EntityManagerProvider entityManagerProvider) {
            BuilderUtils.assertNonNull(entityManagerProvider, "EntityManagerProvider may not be null");
            this.entityManagerProvider = entityManagerProvider;
            return this;
        }

        public Builder serializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "Serializer may not be null");
            this.serializer = serializer;
            return this;
        }

        public Builder claimTimeout(TemporalAmount claimTimeout) {
            BuilderUtils.assertNonNull(claimTimeout, "The claim timeout may not be null");
            this.claimTimeout = claimTimeout;
            return this;
        }

        public Builder nodeId(String nodeId) {
            this.assertNodeId(nodeId, "The nodeId may not be null or empty");
            this.nodeId = nodeId;
            return this;
        }

        public Builder loadingLockMode(LockModeType loadingLockMode) {
            this.loadingLockMode = loadingLockMode;
            return this;
        }

        public JpaTokenStore build() {
            return new JpaTokenStore(this);
        }

        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(this.entityManagerProvider, "The EntityManagerProvider is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.serializer, "The Serializer is a hard requirement and should be provided");
            this.assertNodeId(this.nodeId, "The nodeId is a hard requirement and should be provided");
        }

        private void assertNodeId(String nodeId, String exceptionMessage) {
            BuilderUtils.assertThat(nodeId, name -> Objects.nonNull(name) && !"".equals(name), exceptionMessage);
        }
    }
}

