/*
 * Decompiled with CFR 0.152.
 */
package io.aiven.kafka.connect.common.grouper;

import io.aiven.kafka.connect.common.config.FilenameTemplateVariable;
import io.aiven.kafka.connect.common.config.TimestampSource;
import io.aiven.kafka.connect.common.grouper.RecordGrouper;
import io.aiven.kafka.connect.common.templating.Template;
import io.aiven.kafka.connect.common.templating.VariableTemplatePart;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

public final class TopicPartitionRecordGrouper
implements RecordGrouper {
    private final Template filenameTemplate;
    private final Integer maxRecordsPerFile;
    private final Map<TopicPartition, SinkRecord> currentHeadRecords = new HashMap<TopicPartition, SinkRecord>();
    private final Map<String, List<SinkRecord>> fileBuffers = new HashMap<String, List<SinkRecord>>();
    private final Function<VariableTemplatePart.Parameter, String> setTimestamp;

    public TopicPartitionRecordGrouper(Template filenameTemplate, Integer maxRecordsPerFile, final TimestampSource tsSource) {
        Objects.requireNonNull(filenameTemplate, "filenameTemplate cannot be null");
        Objects.requireNonNull(tsSource, "tsSource cannot be null");
        this.filenameTemplate = filenameTemplate;
        this.maxRecordsPerFile = maxRecordsPerFile;
        this.setTimestamp = new Function<VariableTemplatePart.Parameter, String>(){
            private final Map<String, DateTimeFormatter> timestampFormatters = Map.of("yyyy", DateTimeFormatter.ofPattern("yyyy"), "MM", DateTimeFormatter.ofPattern("MM"), "dd", DateTimeFormatter.ofPattern("dd"), "HH", DateTimeFormatter.ofPattern("HH"));

            @Override
            public String apply(VariableTemplatePart.Parameter parameter) {
                return tsSource.time().format(this.timestampFormatters.get(parameter.value()));
            }
        };
    }

    @Override
    public void put(SinkRecord record) {
        Objects.requireNonNull(record, "record cannot be null");
        TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition().intValue());
        SinkRecord currentHeadRecord = this.currentHeadRecords.computeIfAbsent(tp, ignored -> record);
        String recordKey = this.generateRecordKey(tp, currentHeadRecord);
        if (this.shouldCreateNewFile(recordKey)) {
            this.currentHeadRecords.put(tp, record);
            String newRecordKey = this.generateRecordKey(tp, record);
            this.fileBuffers.computeIfAbsent(newRecordKey, ignored -> new ArrayList()).add(record);
        } else {
            this.fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList()).add(record);
        }
    }

    private String generateRecordKey(TopicPartition tp, SinkRecord headRecord) {
        Function<VariableTemplatePart.Parameter, String> setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean() != false ? String.format("%020d", headRecord.kafkaOffset()) : Long.toString(headRecord.kafkaOffset());
        return this.filenameTemplate.instance().bindVariable(FilenameTemplateVariable.TOPIC.name, () -> ((TopicPartition)tp).topic()).bindVariable(FilenameTemplateVariable.PARTITION.name, () -> Integer.toString(tp.partition())).bindVariable(FilenameTemplateVariable.START_OFFSET.name, setKafkaOffset).bindVariable(FilenameTemplateVariable.TIMESTAMP.name, this.setTimestamp).render();
    }

    private boolean shouldCreateNewFile(String recordKey) {
        boolean unlimited;
        boolean bl = unlimited = this.maxRecordsPerFile == null;
        if (unlimited) {
            return false;
        }
        List<SinkRecord> buffer = this.fileBuffers.get(recordKey);
        return buffer == null || buffer.size() >= this.maxRecordsPerFile;
    }

    @Override
    public void clear() {
        this.currentHeadRecords.clear();
        this.fileBuffers.clear();
    }

    @Override
    public Map<String, List<SinkRecord>> records() {
        return Collections.unmodifiableMap(this.fileBuffers);
    }
}

