/*
 * Decompiled with CFR 0.152.
 */
package io.eventuate.tram.sagas.orchestration;

import io.eventuate.tram.commands.common.Command;
import io.eventuate.tram.commands.common.CommandMessageHeaders;
import io.eventuate.tram.commands.common.CommandReplyOutcome;
import io.eventuate.tram.commands.common.Failure;
import io.eventuate.tram.commands.common.Success;
import io.eventuate.tram.commands.producer.CommandProducer;
import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageBuilder;
import io.eventuate.tram.sagas.common.LockTarget;
import io.eventuate.tram.sagas.common.SagaLockManager;
import io.eventuate.tram.sagas.common.SagaReplyHeaders;
import io.eventuate.tram.sagas.common.SagaUnlockCommand;
import io.eventuate.tram.sagas.orchestration.DestinationAndResource;
import io.eventuate.tram.sagas.orchestration.Saga;
import io.eventuate.tram.sagas.orchestration.SagaActions;
import io.eventuate.tram.sagas.orchestration.SagaCommandProducer;
import io.eventuate.tram.sagas.orchestration.SagaDataSerde;
import io.eventuate.tram.sagas.orchestration.SagaDefinition;
import io.eventuate.tram.sagas.orchestration.SagaInstance;
import io.eventuate.tram.sagas.orchestration.SagaInstanceRepository;
import io.eventuate.tram.sagas.orchestration.SagaManager;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Optional;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SagaManager} implementation that creates saga instances, sends the
 * commands produced by the saga definition, and advances a saga each time a
 * reply arrives on the saga's reply channel.
 *
 * <p>Collaborators other than the saga itself can be replaced after
 * construction through the setters.
 *
 * @param <Data> the saga data type handled by the wrapped {@link Saga}
 */
public class SagaManagerImpl<Data>
implements SagaManager<Data> {
    /** Command header carrying the saga id on unlock commands. */
    private static final String SAGA_ID_COMMAND_HEADER = "command_saga_id";
    /** Command header carrying the saga type on unlock commands. */
    private static final String SAGA_TYPE_COMMAND_HEADER = "command_saga_type";
    /** Reply header whose presence makes this manager record a destination/resource pair. */
    private static final String LOCKED_TARGET_HEADER = "saga-locked-target";
    /** Command header name that is mapped to its in-reply form to look up the reply's destination. */
    private static final String COMMAND_DESTINATION_HEADER = "command__destination";
    /** Header set to the {@link CommandReplyOutcome} name on synthetic replies. */
    private static final String REPLY_OUTCOME_HEADER = "reply_outcome-type";
    /** Header set to the reply class name on synthetic replies. */
    private static final String REPLY_TYPE_HEADER = "reply_type";
    /** Payload of the synthetic replies generated for local steps. */
    private static final String EMPTY_JSON_PAYLOAD = "{}";

    // Instance-level on purpose: the logger is named after the runtime class.
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final Saga<Data> saga;
    private SagaInstanceRepository sagaInstanceRepository;
    private CommandProducer commandProducer;
    private MessageConsumer messageConsumer;
    private SagaLockManager sagaLockManager;
    private SagaCommandProducer sagaCommandProducer;

    /**
     * Creates a manager for the given saga.
     *
     * @param saga                   the saga whose definition drives this manager
     * @param sagaInstanceRepository used to save, find and update saga instances
     * @param commandProducer        used to send unlock commands when a saga ends
     * @param messageConsumer        used to subscribe to the saga's reply channel
     * @param sagaLockManager        used to claim the optional resource lock on creation
     * @param sagaCommandProducer    used to send the commands produced by each step
     */
    public SagaManagerImpl(Saga<Data> saga, SagaInstanceRepository sagaInstanceRepository, CommandProducer commandProducer, MessageConsumer messageConsumer, SagaLockManager sagaLockManager, SagaCommandProducer sagaCommandProducer) {
        this.saga = saga;
        this.sagaInstanceRepository = sagaInstanceRepository;
        this.commandProducer = commandProducer;
        this.messageConsumer = messageConsumer;
        this.sagaLockManager = sagaLockManager;
        this.sagaCommandProducer = sagaCommandProducer;
    }

    /** Replaces the producer used to send saga commands. */
    public void setSagaCommandProducer(SagaCommandProducer sagaCommandProducer) {
        this.sagaCommandProducer = sagaCommandProducer;
    }

    /** Replaces the repository used to persist saga instances. */
    public void setSagaInstanceRepository(SagaInstanceRepository sagaInstanceRepository) {
        this.sagaInstanceRepository = sagaInstanceRepository;
    }

    /** Replaces the producer used to send unlock commands. */
    public void setCommandProducer(CommandProducer commandProducer) {
        this.commandProducer = commandProducer;
    }

    /** Replaces the consumer used to subscribe to the reply channel. */
    public void setMessageConsumer(MessageConsumer messageConsumer) {
        this.messageConsumer = messageConsumer;
    }

    /** Replaces the lock manager used to claim resources on creation. */
    public void setSagaLockManager(SagaLockManager sagaLockManager) {
        this.sagaLockManager = sagaLockManager;
    }

    /**
     * Creates and starts a saga instance without claiming any resource lock.
     *
     * @param sagaData the initial saga data
     * @return the saved saga instance
     */
    @Override
    public SagaInstance create(Data sagaData) {
        return create(sagaData, Optional.empty());
    }

    /**
     * Creates and starts a saga instance, claiming a lock on the resource
     * derived from {@code targetClass} and {@code targetId} via {@link LockTarget}.
     *
     * @param data        the initial saga data
     * @param targetClass the class of the resource to lock
     * @param targetId    the id of the resource to lock
     * @return the saved saga instance
     */
    @Override
    public SagaInstance create(Data data, Class targetClass, Object targetId) {
        String lockedResource = new LockTarget(targetClass, targetId).getTarget();
        return create(data, Optional.of(lockedResource));
    }

    /**
     * Saves a new saga instance, notifies the saga that it is starting,
     * optionally claims a lock on {@code resource}, and then runs the first
     * step of the saga definition.
     *
     * @param sagaData the initial saga data
     * @param resource the resource to lock for this saga, if any
     * @return the saved saga instance
     * @throws RuntimeException if the lock cannot be claimed, or if the first
     *                          step reports a local exception (rethrown as-is)
     */
    @Override
    public SagaInstance create(Data sagaData, Optional<String> resource) {
        // "????" is the placeholder state name used until the first step sets one.
        SagaInstance sagaInstance = new SagaInstance(getSagaType(), null, "????", null, SagaDataSerde.serializeSagaData(sagaData), new HashSet<DestinationAndResource>());
        sagaInstanceRepository.save(sagaInstance);
        String sagaId = sagaInstance.getId();

        saga.onStarting(sagaId, sagaData);

        resource.ifPresent(r -> {
            if (!sagaLockManager.claimLock(getSagaType(), sagaId, r)) {
                throw new RuntimeException("Cannot claim lock for resource");
            }
        });

        SagaActions<Data> actions = getStateDefinition().start(sagaData);
        // A failure in the very first step is surfaced to the caller directly.
        actions.getLocalException().ifPresent(e -> {
            throw e;
        });

        processActions(getSagaType(), sagaId, sagaInstance, sagaData, actions);
        return sagaInstance;
    }

    /**
     * Sends an unlock command for every destination/resource recorded on the
     * instance, then invokes the saga's end-of-life callbacks.
     *
     * @param sagaId       the id of the finished saga
     * @param sagaInstance the instance whose recorded resources are unlocked
     * @param compensating whether the saga ended while compensating
     * @param failed       whether the saga ended as failed
     * @param sagaData     the saga data handed to the callbacks
     */
    private void performEndStateActions(String sagaId, SagaInstance sagaInstance, boolean compensating, boolean failed, Data sagaData) {
        for (DestinationAndResource dr : sagaInstance.getDestinationsAndResources()) {
            // A fresh map per command so no header map is shared between sends.
            HashMap<String, String> headers = new HashMap<String, String>();
            headers.put(SAGA_ID_COMMAND_HEADER, sagaId);
            headers.put(SAGA_TYPE_COMMAND_HEADER, getSagaType());
            commandProducer.send(dr.getDestination(), dr.getResource(), (Command) new SagaUnlockCommand(), makeSagaReplyChannel(), headers);
        }

        // onSagaFailed is reported in addition to, not instead of, the
        // rolled-back / completed callback below.
        if (failed) {
            saga.onSagaFailed(sagaId, sagaData);
        }
        if (compensating) {
            saga.onSagaRolledBack(sagaId, sagaData);
        } else {
            saga.onSagaCompletedSuccessfully(sagaId, sagaData);
        }
    }

    /**
     * Returns the saga's definition.
     *
     * @throws RuntimeException if the saga returns a {@code null} definition
     */
    private SagaDefinition<Data> getStateDefinition() {
        SagaDefinition<Data> definition = saga.getSagaDefinition();
        if (definition == null) {
            throw new RuntimeException("state machine cannot be null");
        }
        return definition;
    }

    /** Returns the saga type reported by the wrapped saga. */
    private String getSagaType() {
        return saga.getSagaType();
    }

    /**
     * Subscribes {@link #handleMessage(Message)} to this saga's reply channel
     * under the subscriber id {@code <sagaType>-consumer}.
     */
    @Override
    @PostConstruct
    public void subscribeToReplyChannel() {
        messageConsumer.subscribe(getSagaType() + "-consumer", Collections.singleton(makeSagaReplyChannel()), this::handleMessage);
    }

    /** Returns the reply channel name, {@code <sagaType>-reply}. */
    private String makeSagaReplyChannel() {
        return getSagaType() + "-reply";
    }

    /**
     * Entry point for messages received on the reply channel. Messages that
     * carry a reply saga id header are handled as replies; anything else is
     * logged at warn level and otherwise ignored.
     *
     * @param message the received message
     */
    public void handleMessage(Message message) {
        logger.debug("handle message invoked {}", message);
        if (message.hasHeader(SagaReplyHeaders.REPLY_SAGA_ID)) {
            handleReply(message);
        } else {
            logger.warn("Handle message doesn't know what to do with: {} ", message);
        }
    }

    /**
     * Applies a reply to the saga instance it belongs to. Replies addressed to
     * a different saga type are ignored.
     *
     * @param message the reply message
     */
    private void handleReply(Message message) {
        if (!isReplyForThisSagaType(message)) {
            return;
        }
        logger.debug("Handle reply: {}", message);

        String sagaId = message.getRequiredHeader(SagaReplyHeaders.REPLY_SAGA_ID);
        String sagaType = message.getRequiredHeader(SagaReplyHeaders.REPLY_SAGA_TYPE);
        SagaInstance sagaInstance = sagaInstanceRepository.find(sagaType, sagaId);

        // The serialized form does not carry the static type; the stored data
        // is expected to be this saga's Data type, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        Data sagaData = (Data) SagaDataSerde.deserializeSagaData(sagaInstance.getSerializedSagaData());

        // Record the destination/resource named by the reply so that an unlock
        // command can be sent to it when the saga reaches an end state.
        message.getHeader(LOCKED_TARGET_HEADER).ifPresent(lockedTarget -> {
            String destination = message.getRequiredHeader(CommandMessageHeaders.inReply(COMMAND_DESTINATION_HEADER));
            sagaInstance.addDestinationsAndResources(Collections.singleton(new DestinationAndResource(destination, (String) lockedTarget)));
        });

        String currentState = sagaInstance.getStateName();
        logger.info("Current state={}", currentState);

        SagaActions<Data> actions = getStateDefinition().handleReply(sagaType, sagaId, currentState, sagaData, message);
        logger.info("Handled reply. Sending commands {}", actions.getCommands());

        processActions(sagaType, sagaId, sagaInstance, sagaData, actions);
    }

    /**
     * Applies the given actions to the saga instance and keeps stepping while
     * the saga definition returns local actions.
     *
     * <p>Each pass without a local exception sends the actions' commands,
     * updates and persists the instance, and runs the end-state actions if an
     * end state was reached. If the actions are local, a synthetic success
     * reply is fed back into the definition and the loop continues; otherwise
     * the loop ends. When the actions carry a local exception, nothing is sent
     * or persisted for them and a synthetic failure reply is fed back instead.
     *
     * @param sagaType     the saga type passed to the saga definition
     * @param sagaId       the saga id
     * @param sagaInstance the instance being updated
     * @param sagaData     the saga data used when the actions carry no updated data
     * @param actions      the first set of actions to apply
     */
    private void processActions(String sagaType, String sagaId, SagaInstance sagaInstance, Data sagaData, SagaActions<Data> actions) {
        while (true) {
            if (actions.getLocalException().isPresent()) {
                actions = getStateDefinition().handleReply(sagaType, sagaId, actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), makeLocalStepReply(CommandReplyOutcome.FAILURE, Failure.class));
            } else {
                String lastRequestId = sagaCommandProducer.sendCommands(getSagaType(), sagaId, actions.getCommands(), makeSagaReplyChannel());
                sagaInstance.setLastRequestId(lastRequestId);
                updateState(sagaInstance, actions);
                sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(actions.getUpdatedSagaData().orElse(sagaData)));

                if (actions.isEndState()) {
                    // NOTE(review): the callbacks receive the sagaData passed into this
                    // method, not actions.getUpdatedSagaData() — confirm this is intended.
                    performEndStateActions(sagaId, sagaInstance, actions.isCompensating(), actions.isFailed(), sagaData);
                }

                sagaInstanceRepository.update(sagaInstance);

                if (!actions.isLocal()) {
                    break;
                }
                actions = getStateDefinition().handleReply(sagaType, sagaId, actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), makeLocalStepReply(CommandReplyOutcome.SUCCESS, Success.class));
            }
        }
    }

    /**
     * Builds the synthetic reply fed back into the saga definition after a
     * local step.
     *
     * @param outcome   the outcome whose name is stored in the outcome header
     * @param replyType the reply class whose name is stored in the type header
     * @return a message with an empty JSON object payload and the two headers set
     */
    private Message makeLocalStepReply(CommandReplyOutcome outcome, Class<?> replyType) {
        return MessageBuilder.withPayload(EMPTY_JSON_PAYLOAD)
                .withHeader(REPLY_OUTCOME_HEADER, outcome.name())
                .withHeader(REPLY_TYPE_HEADER, replyType.getName())
                .build();
    }

    /**
     * Copies the state name and the end-state, compensating and failed flags
     * from the actions onto the instance. Does nothing when the actions carry
     * no updated state.
     */
    private void updateState(SagaInstance sagaInstance, SagaActions<Data> actions) {
        actions.getUpdatedState().ifPresent(stateName -> {
            sagaInstance.setStateName(stateName);
            sagaInstance.setEndState(actions.isEndState());
            sagaInstance.setCompensating(actions.isCompensating());
            sagaInstance.setFailed(actions.isFailed());
        });
    }

    /**
     * Returns {@code true} only when the message has a reply saga type header
     * equal to this manager's saga type.
     */
    private boolean isReplyForThisSagaType(Message message) {
        return message.getHeader(SagaReplyHeaders.REPLY_SAGA_TYPE).map(x -> x.equals(getSagaType())).orElse(false);
    }
}

