/*
 * Decompiled with CFR 0.152.
 */
package org.hibernate.search.mapper.orm.outboxpolling.event.impl;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.hibernate.search.engine.reporting.FailureContext;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.mapper.orm.outboxpolling.cluster.impl.Agent;
import org.hibernate.search.mapper.orm.outboxpolling.cluster.impl.AgentPersister;
import org.hibernate.search.mapper.orm.outboxpolling.cluster.impl.AgentState;
import org.hibernate.search.mapper.orm.outboxpolling.cluster.impl.AgentType;
import org.hibernate.search.mapper.orm.outboxpolling.cluster.impl.ClusterDescriptor;
import org.hibernate.search.mapper.orm.outboxpolling.cluster.impl.ShardAssignmentDescriptor;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.AbstractAgentClusterLink;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.ClusterTarget;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.OutboxPollingEventProcessingInstructions;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.ShardAssignment;
import org.hibernate.search.mapper.orm.outboxpolling.logging.impl.OutboxPollingEventsLog;
import org.hibernate.search.util.common.AssertionFailure;
import org.hibernate.search.util.common.SearchException;
import org.hibernate.search.util.common.spi.ToStringTreeAppender;

/**
 * Cluster link of an event-processing agent.
 * <p>
 * On each pulse, {@link #doPulse(List, Agent)} looks at every registered agent and returns a write action
 * that marks this agent as suspended, waiting or running, together with the instructions telling the
 * caller when to pulse again and, when running, which event finder to use.
 * <p>
 * The shard assignment is either static (a descriptor was passed to the constructor) or dynamic
 * (derived from the cluster members on each pulse).
 */
public final class OutboxPollingEventProcessorClusterLink
        extends AbstractAgentClusterLink<OutboxPollingEventProcessingInstructions> {

    /** Turns a shard assignment descriptor into a usable {@link ShardAssignment}. */
    private final ShardAssignment.Provider shardAssignmentProvider;

    /** {@code true} when a static shard assignment was passed to the constructor. Package-private. */
    final boolean shardAssignmentIsStatic;

    /**
     * The assignment most recently created by this link; {@code null} until the first dynamic
     * assignment is created. Package-private.
     */
    ShardAssignment lastShardAssignment;

    /**
     * @param agentName name handed to the {@link AgentPersister}
     * @param failureHandler handler notified when the cluster target cannot be computed
     * @param clock clock forwarded to the superclass and to the produced instructions
     * @param shardAssignmentProvider factory for {@link ShardAssignment} instances
     * @param pollingInterval forwarded to the superclass; used as retry delay while waiting
     * @param pulseInterval forwarded to the superclass; used as retry delay while suspended or running
     * @param pulseExpiration forwarded to the superclass
     * @param staticShardAssignment the fixed assignment, or {@code null} for dynamic sharding
     */
    public OutboxPollingEventProcessorClusterLink(String agentName, FailureHandler failureHandler, Clock clock,
            ShardAssignment.Provider shardAssignmentProvider, Duration pollingInterval, Duration pulseInterval,
            Duration pulseExpiration, ShardAssignmentDescriptor staticShardAssignment) {
        super(new AgentPersister(agentTypeFor(staticShardAssignment), agentName, staticShardAssignment),
                failureHandler, clock, pollingInterval, pulseInterval, pulseExpiration);
        this.shardAssignmentProvider = shardAssignmentProvider;
        this.shardAssignmentIsStatic = staticShardAssignment != null;
        this.lastShardAssignment = staticShardAssignment != null
                ? shardAssignmentProvider.create(staticShardAssignment)
                : null;
    }

    /** Picks the agent type matching the presence or absence of a static shard assignment. */
    private static AgentType agentTypeFor(ShardAssignmentDescriptor staticShardAssignment) {
        return staticShardAssignment == null
                ? AgentType.EVENT_PROCESSING_DYNAMIC_SHARDING
                : AgentType.EVENT_PROCESSING_STATIC_SHARDING;
    }

    /** Tells whether the given agent is currently in the {@link AgentState#SUSPENDED} state. */
    private static boolean isSuspended(Agent agent) {
        return agent.getState() == AgentState.SUSPENDED;
    }

    @Override
    public void appendTo(ToStringTreeAppender appender) {
        super.appendTo(appender);
        appender.attribute("shardAssignmentProvider", (Object) this.shardAssignmentProvider)
                .attribute("shardAssignmentIsStatic", Boolean.valueOf(this.shardAssignmentIsStatic));
    }

    /**
     * Decides what this agent should do for the current pulse.
     * <p>
     * The checks are applied in this order, the first one that matches wins:
     * <ol>
     * <li>a mass-indexing agent is registered: suspend, retry after the pulse interval;</li>
     * <li>the cluster target cannot be computed: report the failure, suspend, retry after the pulse interval;</li>
     * <li>this agent has no shard in the target cluster: suspend, retry after the pulse interval;</li>
     * <li>the target cluster has a missing ({@code null}) member: suspend, retry after the polling interval;</li>
     * <li>the persisted assignment differs from the target one: wait, retry after the polling interval;</li>
     * <li>excluded agents are not all suspended, or cluster members are not all in their expected
     * state/shard: wait, retry after the polling interval;</li>
     * <li>otherwise: refresh {@link #lastShardAssignment} if needed, then run.</li>
     * </ol>
     *
     * @param allAgentsInIdOrder every registered agent (ordered by id, judging by the name)
     * @param currentSelf the agent entry representing this link
     * @return the write action to apply
     * @throws AssertionFailure if the assignment is static but differs from the target assignment
     */
    @Override
    protected AbstractAgentClusterLink.WriteAction<OutboxPollingEventProcessingInstructions> doPulse(
            List<Agent> allAgentsInIdOrder, Agent currentSelf) {
        for (Agent other : allAgentsInIdOrder) {
            if (AgentType.MASS_INDEXING.equals(other.getType())) {
                // Log at a lower level when we were already suspended.
                if (isSuspended(currentSelf)) {
                    OutboxPollingEventsLog.INSTANCE.agentOtherAgentIsIndexingTrace(this.selfReference(), other);
                } else {
                    OutboxPollingEventsLog.INSTANCE.agentOtherAgentIsIndexingInfo(this.selfReference(), other);
                }
                return suspendAndRetryAfterPulseInterval();
            }
        }

        ClusterTarget target;
        try {
            target = ClusterTarget.create(allAgentsInIdOrder);
        } catch (SearchException e) {
            FailureContext.Builder failure = FailureContext.builder();
            Throwable pulseFailure = OutboxPollingEventsLog.INSTANCE.outboxEventProcessorPulseFailed(
                    this.selfReference(), e.getMessage(), allAgentsInIdOrder, (RuntimeException) e);
            failure.throwable(pulseFailure);
            Object failingOperation = OutboxPollingEventsLog.INSTANCE.outboxEventProcessorPulse(this.selfReference());
            failure.failingOperation(failingOperation);
            this.failureHandler.handle(failure.build());
            return suspendAndRetryAfterPulseInterval();
        }

        Optional<ShardAssignmentDescriptor> maybeAssignment = ShardAssignmentDescriptor.fromClusterMemberList(
                target.descriptor.memberIdsInShardOrder, this.selfReference().id);
        if (!maybeAssignment.isPresent()) {
            // No shard for this agent in the target cluster.
            if (isSuspended(currentSelf)) {
                OutboxPollingEventsLog.INSTANCE.agentSuperfluousTrace(this.selfReference(), target.descriptor);
            } else {
                OutboxPollingEventsLog.INSTANCE.agentSuperfluousInfo(this.selfReference(), target.descriptor);
            }
            return suspendAndRetryAfterPulseInterval();
        }
        ShardAssignmentDescriptor wantedAssignment = maybeAssignment.get();

        if (target.descriptor.memberIdsInShardOrder.contains(null)) {
            // At least one cluster member is missing.
            if (isSuspended(currentSelf)) {
                OutboxPollingEventsLog.INSTANCE.agentClusterMembersMissingTrace(this.selfReference(), target.descriptor);
            } else {
                OutboxPollingEventsLog.INSTANCE.agentClusterMembersMissingInfo(this.selfReference(), target.descriptor);
            }
            return suspendAndRetryAfterPollingInterval();
        }

        ShardAssignmentDescriptor storedAssignment = currentSelf.getShardAssignment();
        if (!wantedAssignment.equals(storedAssignment)) {
            OutboxPollingEventsLog.INSTANCE.agentAssignmentDoesNotMatchTarget(this.selfReference(), storedAssignment,
                    wantedAssignment, target.descriptor);
            return waitAndRetryAfterPollingInterval(target, wantedAssignment);
        }

        // Short-circuit on purpose: cluster members are only inspected once excluded agents are out.
        boolean clusterSettled = this.excludedAgentsAreOutOfCluster(target.excluded)
                && this.clusterMembersAreInCluster(target.membersInShardOrder, target.descriptor);
        if (!clusterSettled) {
            return waitAndRetryAfterPollingInterval(target, wantedAssignment);
        }

        boolean assignmentUpToDate = this.lastShardAssignment != null
                && wantedAssignment.equals(this.lastShardAssignment.descriptor);
        if (!assignmentUpToDate) {
            if (this.shardAssignmentIsStatic) {
                throw new AssertionFailure("Agent '" + this.selfReference()
                        + "' has a static shard assignment, but the target shard assignment (" + wantedAssignment
                        + ") does not match the static shard assignment (" + this.lastShardAssignment + ")");
            }
            OutboxPollingEventsLog.INSTANCE.agentAssignment(this.selfReference(), wantedAssignment);
            this.lastShardAssignment = this.shardAssignmentProvider.create(wantedAssignment);
        }

        return (now, self, persister) -> {
            persister.setRunning(self, target.descriptor);
            return this.instructProceedWithEventProcessing(now);
        };
    }

    /** Write action: mark self as suspended, then retry the pulse after the pulse interval. */
    private AbstractAgentClusterLink.WriteAction<OutboxPollingEventProcessingInstructions> suspendAndRetryAfterPulseInterval() {
        return (now, self, persister) -> {
            persister.setSuspended(self);
            return this.instructCommitAndRetryPulseAfterDelay(now, this.pulseInterval);
        };
    }

    /** Write action: mark self as suspended, then retry the pulse after the polling interval. */
    private AbstractAgentClusterLink.WriteAction<OutboxPollingEventProcessingInstructions> suspendAndRetryAfterPollingInterval() {
        return (now, self, persister) -> {
            persister.setSuspended(self);
            return this.instructCommitAndRetryPulseAfterDelay(now, this.pollingInterval);
        };
    }

    /**
     * Write action: mark self as waiting with the given target cluster and assignment, then retry the
     * pulse after the polling interval.
     */
    private AbstractAgentClusterLink.WriteAction<OutboxPollingEventProcessingInstructions> waitAndRetryAfterPollingInterval(
            ClusterTarget target, ShardAssignmentDescriptor assignment) {
        return (now, self, persister) -> {
            persister.setWaiting(self, target.descriptor, assignment);
            return this.instructCommitAndRetryPulseAfterDelay(now, this.pollingInterval);
        };
    }

    /**
     * Checks that every excluded agent is suspended.
     *
     * @param excludedAgents the agents excluded from the target cluster
     * @return {@code true} if the list is empty or all its agents are {@link AgentState#SUSPENDED};
     * {@code false} as soon as one agent in another state is found
     */
    private boolean excludedAgentsAreOutOfCluster(List<Agent> excludedAgents) {
        if (!excludedAgents.isEmpty()) {
            for (Agent excluded : excludedAgents) {
                if (!AgentState.SUSPENDED.equals(excluded.getState())) {
                    OutboxPollingEventsLog.INSTANCE.agentWaitingAgentReachState(this.selfReference(),
                            excluded.getReference(), AgentState.SUSPENDED);
                    return false;
                }
            }
            OutboxPollingEventsLog.INSTANCE.agentExcluded(this.selfReference(), AgentState.SUSPENDED);
        }
        return true;
    }

    /**
     * Checks that every cluster member is waiting or running, reports a total shard count equal to the
     * number of members, and holds the shard index matching its position in the list.
     *
     * @param clusterMembersInShardOrder the members of the target cluster, in shard order
     * @param clusterDescriptor the target cluster descriptor, only used for logging
     * @return {@code true} if all members pass all checks, {@code false} at the first mismatch
     */
    private boolean clusterMembersAreInCluster(List<Agent> clusterMembersInShardOrder, ClusterDescriptor clusterDescriptor) {
        int wantedShardCount = clusterMembersInShardOrder.size();
        Set<AgentState> acceptableStates = AgentState.WAITING_OR_RUNNING;
        int wantedShardIndex = 0;
        for (Agent member : clusterMembersInShardOrder) {
            AgentState memberState = member.getState();
            if (!acceptableStates.contains(member.getState())) {
                OutboxPollingEventsLog.INSTANCE.clusterMembersAreInClusterWaitingForState(this.selfReference(),
                        member.getReference(), memberState, acceptableStates);
                return false;
            }

            Integer memberShardCount = member.getTotalShardCount();
            if (memberShardCount == null || memberShardCount.intValue() != wantedShardCount) {
                OutboxPollingEventsLog.INSTANCE.clusterMembersAreInClusterShardCountExpectation(this.selfReference(),
                        member.getReference(), memberShardCount, wantedShardCount);
                return false;
            }

            Integer memberShardIndex = member.getAssignedShardIndex();
            if (memberShardIndex == null || memberShardIndex.intValue() != wantedShardIndex) {
                OutboxPollingEventsLog.INSTANCE.clusterMembersAreInClusterSharIndexExpectation(this.selfReference(),
                        member.getReference(), memberShardIndex, wantedShardIndex);
                return false;
            }

            wantedShardIndex++;
        }
        OutboxPollingEventsLog.INSTANCE.clusterMembersAreInClusterReachedExpectedStates(this.selfReference(),
                acceptableStates, clusterDescriptor);
        return true;
    }

    /**
     * Builds instructions that carry no event finder and expire {@code delay} after {@code now}.
     *
     * @param now the reference instant
     * @param delay how long after {@code now} the instructions expire
     */
    @Override
    protected OutboxPollingEventProcessingInstructions instructCommitAndRetryPulseAfterDelay(Instant now, Duration delay) {
        Instant retryAt = now.plus(delay);
        OutboxPollingEventsLog.INSTANCE.instructCommitAndRetryPulseAfterDelay(this.selfReference(), delay, retryAt);
        return new OutboxPollingEventProcessingInstructions(this.clock, retryAt, Optional.empty());
    }

    /**
     * Builds instructions that carry the event finder of {@link #lastShardAssignment} and expire one
     * pulse interval after {@code now}.
     */
    private OutboxPollingEventProcessingInstructions instructProceedWithEventProcessing(Instant now) {
        Instant nextPulseAt = now.plus(this.pulseInterval);
        OutboxPollingEventsLog.INSTANCE.instructProceedWithEventProcessing(this.selfReference(), this.pulseInterval,
                nextPulseAt);
        return new OutboxPollingEventProcessingInstructions(this.clock, nextPulseAt,
                Optional.of(this.lastShardAssignment.eventFinder));
    }
}

