/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.integration.eventhub.checkpoint;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.models.EventBatchContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.spring.integration.core.api.CheckpointConfig;
import com.azure.spring.integration.core.api.CheckpointMode;
import com.azure.spring.integration.eventhub.checkpoint.CheckpointManager;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/**
 * {@link CheckpointManager} used when the checkpoint mode is {@link CheckpointMode#BATCH}.
 *
 * <p>For every partition the manager keeps one {@link EventData} in
 * {@link #lastEventByPartition}: the event most recently passed to
 * {@link #onMessage(EventContext, EventData)}, or the last event of the batch most recently
 * checkpointed successfully by {@link #onMessages(EventBatchContext)}. That event is only used
 * for log output.
 */
public class BatchCheckpointManager extends CheckpointManager {
    private static final Logger LOG = LoggerFactory.getLogger(BatchCheckpointManager.class);
    // Placeholders, in order: consumer group, offset, message, partition, last checkpointed message.
    private static final String CHECKPOINT_FAIL_MSG = "Consumer group '%s' failed to checkpoint offset %s of message %s on partition %s, last checkpointed message is %s";
    // Placeholders, in order: consumer group, offset, message, partition, checkpoint mode.
    private static final String CHECKPOINT_SUCCESS_MSG = "Consumer group '%s' checkpointed offset %s of message %s on partition %s in %s mode";
    /** Most recent event recorded per partition id; see the class documentation. */
    private final ConcurrentHashMap<String, EventData> lastEventByPartition = new ConcurrentHashMap<>();

    /**
     * Creates the manager and verifies that it is configured for batch checkpointing.
     *
     * @param checkpointConfig checkpoint configuration; its mode must be {@link CheckpointMode#BATCH}
     * @throws IllegalArgumentException if the configured checkpoint mode is not {@code BATCH}
     */
    BatchCheckpointManager(CheckpointConfig checkpointConfig) {
        super(checkpointConfig);
        Assert.isTrue(this.checkpointConfig.getCheckpointMode() == CheckpointMode.BATCH,
            () -> "BatchCheckpointManager should have checkpointMode batch");
    }

    /**
     * Records {@code eventData} as the latest event seen on the partition of {@code context}.
     * No checkpoint is written here.
     *
     * @param context   context of the received event; supplies the partition id
     * @param eventData the received event
     */
    @Override
    public void onMessage(EventContext context, EventData eventData) {
        this.lastEventByPartition.put(context.getPartitionContext().getPartitionId(), eventData);
    }

    /**
     * Triggers an asynchronous checkpoint through {@code context} and logs the outcome together
     * with the event last recorded for the partition.
     *
     * @param context context whose checkpoint is updated
     */
    @Override
    public void completeBatch(EventContext context) {
        // May be null when onMessage/onMessages never recorded an event for this partition.
        EventData eventData = this.lastEventByPartition.get(context.getPartitionContext().getPartitionId());
        context.updateCheckpointAsync()
            .doOnError(t -> this.logCheckpointFail(context, eventData, t))
            .doOnSuccess(v -> this.logCheckpointSuccess(context, eventData))
            .subscribe();
    }

    /**
     * Checkpoints a batch of events asynchronously. On success the last event of the batch is
     * remembered as the partition's last checkpointed event; on failure a warning is logged that
     * includes the previously remembered event.
     *
     * <p>A batch without events is ignored: there is no last event whose offset could be
     * reported, and indexing into the empty list would throw.
     * NOTE(review): empty batches are presumably delivered when the processor is configured
     * with a maximum wait time - confirm against the Event Hubs SDK documentation.
     *
     * @param context the batch context delivered by the event processor
     */
    public void onMessages(EventBatchContext context) {
        List<EventData> events = context.getEvents();
        if (events == null || events.isEmpty()) {
            return;
        }
        EventData lastEvent = this.getLastEnqueuedEvent(context);
        Long offset = lastEvent.getOffset();
        String partitionId = context.getPartitionContext().getPartitionId();
        context.updateCheckpointAsync()
            .doOnError(t -> this.logCheckpointFail(context, offset, lastEvent,
                this.lastEventByPartition.get(partitionId), t))
            .doOnSuccess(v -> {
                this.lastEventByPartition.put(partitionId, lastEvent);
                this.logCheckpointSuccess(context, offset, lastEvent);
            })
            .subscribe();
    }

    /**
     * Returns the logger of this class.
     *
     * @return the class logger
     */
    @Override
    protected Logger getLogger() {
        return LOG;
    }

    /**
     * Logs a failed batch checkpoint at WARN level, if WARN is enabled.
     *
     * @param context               batch context the checkpoint was attempted on
     * @param offset                offset of the event that should have been checkpointed
     * @param lastEnqueuedEvent     last event of the batch
     * @param lastCheckpointedEvent event remembered from the previous successful checkpoint, may be null
     * @param t                     the failure cause, attached to the log record
     */
    void logCheckpointFail(EventBatchContext context, Long offset, EventData lastEnqueuedEvent, EventData lastCheckpointedEvent, Throwable t) {
        if (this.getLogger().isWarnEnabled()) {
            // Argument order must follow the placeholders of CHECKPOINT_FAIL_MSG:
            // the partition id comes before the last checkpointed message.
            this.getLogger().warn(String.format(CHECKPOINT_FAIL_MSG,
                context.getPartitionContext().getConsumerGroup(),
                offset,
                lastEnqueuedEvent,
                context.getPartitionContext().getPartitionId(),
                lastCheckpointedEvent), t);
        }
    }

    /**
     * Logs a successful batch checkpoint at DEBUG level, if DEBUG is enabled.
     *
     * @param context           batch context the checkpoint was written on
     * @param offset            offset of the checkpointed event
     * @param lastEnqueuedEvent last event of the batch
     */
    void logCheckpointSuccess(EventBatchContext context, Long offset, EventData lastEnqueuedEvent) {
        if (this.getLogger().isDebugEnabled()) {
            this.getLogger().debug(String.format(CHECKPOINT_SUCCESS_MSG,
                context.getPartitionContext().getConsumerGroup(),
                offset,
                lastEnqueuedEvent,
                context.getPartitionContext().getPartitionId(),
                this.checkpointConfig.getCheckpointMode()));
        }
    }

    /**
     * Returns the last element of the batch's event list.
     *
     * @param context the batch context
     * @return the last event in {@code context.getEvents()}
     * @throws IndexOutOfBoundsException if the batch contains no events
     */
    EventData getLastEnqueuedEvent(EventBatchContext context) {
        List<EventData> events = context.getEvents();
        return events.get(events.size() - 1);
    }
}

