/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.smallrye.reactivemessaging.kafka;

import io.quarkus.hibernate.orm.PersistenceUnit;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntity;
import io.quarkus.smallrye.reactivemessaging.kafka.CheckpointEntityId;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;
import io.vertx.mutiny.core.Vertx;
import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.apache.kafka.common.TopicPartition;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;

/**
 * {@link CheckpointStateStore} that reads and writes checkpoint state through a Hibernate
 * {@link SessionFactory}.
 *
 * <p>Each topic-partition is stored as one {@link CheckpointEntity} (of the configured state type)
 * keyed by a {@link CheckpointEntityId} built from the consumer group id and the topic-partition.
 * All database work is dispatched with {@code Vertx.currentContext().executeBlocking(...)}, so both
 * operations require a current Vert.x context at subscription time.
 */
public class HibernateOrmStateStore
implements CheckpointStateStore {
    /** Identifier under which the {@link Factory} bean is registered. */
    public static final String QUARKUS_HIBERNATE_ORM = "quarkus-hibernate-orm";
    private final String consumerGroupId;
    private final SessionFactory sf;
    private final Class<? extends CheckpointEntity> stateType;

    /**
     * Creates a state store bound to one consumer group.
     *
     * @param consumerGroupId consumer group id used as part of every entity id
     * @param sf session factory used to open a session per operation
     * @param stateType concrete entity class looked up when fetching state
     */
    public HibernateOrmStateStore(String consumerGroupId, SessionFactory sf, Class<? extends CheckpointEntity> stateType) {
        this.consumerGroupId = consumerGroupId;
        this.sf = sf;
        this.stateType = stateType;
    }

    /**
     * Loads the stored processing state of the given partitions.
     *
     * <p>Partitions without a stored entity, and entities for which
     * {@link CheckpointEntity#topicPartition} returns {@code null}, are absent from the result.
     *
     * @param partitions partitions to look up
     * @return a lazily evaluated {@link Uni} emitting the found states keyed by topic-partition
     */
    public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(Collection<TopicPartition> partitions) {
        return Uni.createFrom().deferred(() -> {
            // Ids are computed on the subscribing thread; only the lookups run as blocking work.
            Object[] ids = partitions.stream()
                    .map(tp -> new CheckpointEntityId(this.consumerGroupId, tp))
                    .toArray(Object[]::new);
            Uni<Map<TopicPartition, ProcessingState<?>>> blockingFetch = Uni.createFrom().item(() -> this.loadStates(ids));
            return Vertx.currentContext().executeBlocking(blockingFetch);
        });
    }

    /**
     * Persists the given processing states, merging one entity per non-empty state inside a
     * single transaction.
     *
     * @param state states to store, keyed by topic-partition; empty or {@code null} states are skipped
     * @return a lazily evaluated {@link Uni} that completes once the transaction is committed, or
     *         fails with the error raised while persisting
     */
    public Uni<Void> persistProcessingState(Map<TopicPartition, ProcessingState<?>> state) {
        return Uni.createFrom().deferred(() -> {
            Object[] entities = state.entrySet().stream()
                    .filter(entry -> !ProcessingState.isEmptyOrNull(entry.getValue()))
                    .map(this::toEntity)
                    .toArray();
            Uni<Void> blockingPersist = Uni.createFrom().emitter(emitter -> {
                try (Session session = this.sf.openSession()) {
                    Transaction tx = session.beginTransaction();
                    try {
                        for (Object entity : entities) {
                            session.merge(entity);
                        }
                        session.flush();
                        tx.commit();
                    } catch (Throwable failure) {
                        // Roll back while the session is still open; a catch attached to the
                        // try-with-resources itself would only run after the session was closed.
                        rollbackQuietly(tx, failure);
                        throw failure;
                    }
                    emitter.complete(null);
                } catch (Throwable failure) {
                    emitter.fail(failure);
                }
            });
            return Vertx.currentContext().executeBlocking(blockingPersist);
        });
    }

    /** Looks up every id in one session and maps the entities found to processing states. */
    private Map<TopicPartition, ProcessingState<?>> loadStates(Object[] ids) {
        Collection<CheckpointEntity> fetched = new ArrayList<>(ids.length);
        try (Session session = this.sf.openSession()) {
            for (Object id : ids) {
                CheckpointEntity entity = session.find(this.stateType, id);
                if (entity != null) {
                    fetched.add(entity);
                }
            }
        }
        return fetched.stream()
                .filter(entity -> CheckpointEntity.topicPartition(entity) != null)
                .collect(Collectors.<CheckpointEntity, TopicPartition, ProcessingState<?>>toMap(
                        CheckpointEntity::topicPartition,
                        entity -> new ProcessingState<CheckpointEntity>(entity, entity.offset.longValue())));
    }

    /** Converts one map entry into the entity to merge, keyed by this store's consumer group. */
    @SuppressWarnings("unchecked") // NOTE(review): relies on the stored state being a CheckpointEntity, as enforced by Factory.create
    private Object toEntity(Map.Entry<TopicPartition, ProcessingState<?>> entry) {
        ProcessingState<? extends CheckpointEntity> processingState = (ProcessingState<? extends CheckpointEntity>) entry.getValue();
        return CheckpointEntity.from(processingState, new CheckpointEntityId(this.consumerGroupId, entry.getKey()));
    }

    /** Rolls back a still-active transaction, recording a rollback error on {@code cause} instead of replacing it. */
    private static void rollbackQuietly(Transaction tx, Throwable cause) {
        try {
            if (tx.isActive()) {
                tx.rollback();
            }
        } catch (RuntimeException rollbackFailure) {
            cause.addSuppressed(rollbackFailure);
        }
    }

    /**
     * CDI factory creating {@link HibernateOrmStateStore} instances, registered under the
     * {@value HibernateOrmStateStore#QUARKUS_HIBERNATE_ORM} identifier.
     */
    @ApplicationScoped
    @Identifier(value=HibernateOrmStateStore.QUARKUS_HIBERNATE_ORM)
    public static class Factory
    implements CheckpointStateStore.Factory {
        @Inject
        @Any
        Instance<SessionFactory> sessionFactories;

        /**
         * Creates a state store for the given consumer.
         *
         * <p>The session factory is selected by the optional
         * {@code checkpoint.quarkus-hibernate-orm.persistence-unit} channel attribute; when the
         * attribute is absent the unqualified {@link Instance#get()} is used.
         *
         * @param config channel configuration, read for the persistence unit name
         * @param vertx not used by this implementation
         * @param consumer consumer whose {@code group.id} becomes part of every entity id
         * @param stateType entity class holding the state; must extend {@link CheckpointEntity}
         * @return the new state store
         * @throws IllegalArgumentException if {@code stateType} is {@code null} or does not extend
         *         {@link CheckpointEntity}
         */
        public CheckpointStateStore create(KafkaConnectorIncomingConfiguration config, Vertx vertx, KafkaConsumer<?, ?> consumer, Class<?> stateType) {
            String consumerGroupId = (String)consumer.configuration().get("group.id");
            // A null state type is reported the same way as a wrong one instead of surfacing as a bare NPE.
            if (stateType == null || !CheckpointEntity.class.isAssignableFrom(stateType)) {
                throw new IllegalArgumentException("State type needs to extend `CheckpointEntity`");
            }
            String persistenceUnit = config.config()
                    .getOptionalValue("checkpoint." + HibernateOrmStateStore.QUARKUS_HIBERNATE_ORM + ".persistence-unit", String.class)
                    .orElse(null);
            SessionFactory sf = persistenceUnit != null
                    ? this.sessionFactories.select(new PersistenceUnit.PersistenceUnitLiteral(persistenceUnit)).get()
                    : this.sessionFactories.get();
            // asSubclass narrows Class<?> to the bounded type the constructor requires, without an unchecked cast.
            return new HibernateOrmStateStore(consumerGroupId, sf, stateType.asSubclass(CheckpointEntity.class));
        }
    }
}

