/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.eventhubs.checkpoint;

import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.spring.eventhubs.checkpoint.EventCheckpointManager;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/**
 * Checkpoint manager that counts the events passed to {@link #checkpoint(EventContext)}
 * separately for each partition id and requests an asynchronous checkpoint once a
 * partition's counter reaches the count configured in the {@link CheckpointConfig}.
 */
class PartitionCountCheckpointManager extends EventCheckpointManager {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionCountCheckpointManager.class);

    /** Per-partition event counters, keyed by partition id. */
    private final ConcurrentHashMap<String, AtomicInteger> countByPartition = new ConcurrentHashMap<>();

    /**
     * Creates the manager and verifies that the supplied configuration uses
     * {@link CheckpointMode#PARTITION_COUNT}.
     *
     * @param checkpointConfig the checkpoint configuration, handed to the superclass
     * @throws IllegalArgumentException if the configured mode is not
     *         {@code PARTITION_COUNT} (raised by {@link Assert#isTrue})
     */
    PartitionCountCheckpointManager(CheckpointConfig checkpointConfig) {
        super(checkpointConfig);
        Assert.isTrue(this.checkpointConfig.getMode() == CheckpointMode.PARTITION_COUNT,
            () -> "PartitionCountCheckpointManager should have checkpointMode partition_count");
    }

    /**
     * Returns the logger of this class.
     *
     * @return the class-level logger
     */
    @Override
    protected Logger getLogger() {
        return LOG;
    }

    /**
     * Increments the counter of the partition the given event belongs to and, when the
     * counter has reached the configured count, triggers an asynchronous checkpoint.
     *
     * @param context the event context whose partition counter is advanced and which is
     *        used to perform the checkpoint
     */
    @Override
    public void checkpoint(EventContext context) {
        String partitionId = context.getPartitionContext().getPartitionId();
        // computeIfAbsent returns the existing or newly created counter, so a single
        // map operation is enough to obtain it.
        AtomicInteger count = this.countByPartition.computeIfAbsent(partitionId, k -> new AtomicInteger(0));
        if (count.incrementAndGet() >= this.checkpointConfig.getCount()) {
            context.updateCheckpointAsync()
                .doOnError(t -> this.logCheckpointFail(context, context.getEventData(), t))
                .doOnSuccess(v -> {
                    this.logCheckpointSuccess(context, context.getEventData());
                    // The counter is reset only after a successful checkpoint. On failure it
                    // stays at or above the threshold, so the next event for this partition
                    // attempts another checkpoint.
                    // NOTE(review): increments made while the checkpoint is still in flight
                    // are discarded by this reset - confirm that is intended.
                    count.set(0);
                })
                .subscribe();
        }
    }
}

