/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.kafka;

import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.DataAccessor;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.kafka.ElementSerializer;
import cz.o2.proxima.direct.kafka.KafkaConsumerFactory;
import cz.o2.proxima.direct.kafka.KafkaLogReader;
import cz.o2.proxima.direct.kafka.KafkaStreamElement;
import cz.o2.proxima.direct.kafka.KafkaWatermarkConfiguration;
import cz.o2.proxima.direct.kafka.KafkaWriter;
import cz.o2.proxima.direct.kafka.Utils;
import cz.o2.proxima.direct.view.CachedView;
import cz.o2.proxima.direct.view.LocalCachedPartitionedView;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.base.Strings;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Iterables;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.Config;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.ConfigEntry;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.admin.DescribeConfigsResult;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.config.ConfigResource;
import cz.o2.proxima.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.AbstractStorage;
import cz.o2.proxima.storage.commitlog.KeyPartitioner;
import cz.o2.proxima.storage.commitlog.Partitioner;
import cz.o2.proxima.util.Classpath;
import cz.o2.proxima.util.ExceptionUtils;
import java.io.Serializable;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaAccessor
extends AbstractStorage
implements DataAccessor {
    private static final Logger log = LoggerFactory.getLogger(KafkaAccessor.class);
    private static final long serialVersionUID = 1L;
    public static final String POLL_INTERVAL_CFG = "poll.interval";
    public static final String PARTITIONER_CLASS = "partitioner";
    public static final String SERIALIZER_CLASS = "serializer-class";
    public static final String MAX_BYTES_PER_SEC = "bytes-per-sec-max";
    public static final String TIMESTAMP_SKEW = "timestamp-skew";
    public static final String MAX_POLL_RECORDS = "kafka.max.poll.records";
    public static final String AUTO_COMMIT_INTERVAL_MS = "commit.auto-interval-ms";
    public static final String LOG_STALE_COMMIT_INTERVAL_MS = "commit.log-stale-interval-ms";
    public static final String ASSIGNMENT_TIMEOUT_MS = "assignment-timeout-ms";
    public static final String EMPTY_POLL_TIME = "poll.allowed-empty-before-watermark-move";
    public static final String WRITER_CONFIG_PREFIX = "kafka.";
    private static final int PRODUCE_CONFIG_PREFIX_LENGTH = "kafka.".length();
    private final String topic;
    private final Map<String, Object> cfg;
    private Partitioner partitioner = new KeyPartitioner();
    private long consumerPollInterval = 100L;
    private long maxBytesPerSec = Long.MAX_VALUE;
    private long timestampSkew = 100L;
    private int maxPollRecords = 500;
    private long autoCommitIntervalNs = Long.MAX_VALUE;
    private long logStaleCommitIntervalNs = Long.MAX_VALUE;
    private long assignmentTimeoutMillis = 10000L;
    private KafkaWatermarkConfiguration watermarkConfiguration;
    Class<ElementSerializer<?, ?>> serializerClass;

    public KafkaAccessor(EntityDescriptor entity, URI uri, Map<String, Object> cfg) {
        super(entity, uri);
        if (uri.getPath().length() <= 1) {
            throw new IllegalArgumentException("Specify topic by path in URI");
        }
        if (Strings.isNullOrEmpty((String)uri.getAuthority())) {
            throw new IllegalArgumentException("Specify brokers by authority in URI");
        }
        this.cfg = cfg;
        this.topic = Utils.topic(uri);
        this.configure(cfg);
    }

    private void configure(Map<String, Object> cfg) {
        this.consumerPollInterval = Optional.ofNullable(cfg.get(POLL_INTERVAL_CFG)).map(v -> Long.valueOf(v.toString())).orElse(this.consumerPollInterval);
        this.partitioner = Optional.ofNullable((String)cfg.get(PARTITIONER_CLASS)).map(cls -> (Partitioner)Classpath.newInstance((String)cls, Partitioner.class)).orElse(this.partitioner);
        this.maxBytesPerSec = Optional.ofNullable(cfg.get(MAX_BYTES_PER_SEC)).map(v -> Long.valueOf(v.toString())).orElse(this.maxBytesPerSec);
        this.timestampSkew = Optional.ofNullable(cfg.get(TIMESTAMP_SKEW)).map(v -> Long.valueOf(v.toString())).orElse(this.timestampSkew);
        this.maxPollRecords = Optional.ofNullable(cfg.get(MAX_POLL_RECORDS)).map(v -> Integer.valueOf(v.toString())).orElse(this.maxPollRecords);
        this.autoCommitIntervalNs = Optional.ofNullable(cfg.get(AUTO_COMMIT_INTERVAL_MS)).map(v -> Long.valueOf(v.toString()) * 1000000L).orElse(this.autoCommitIntervalNs);
        this.logStaleCommitIntervalNs = Optional.ofNullable(cfg.get(LOG_STALE_COMMIT_INTERVAL_MS)).map(v -> Long.valueOf(v.toString()) * 1000000L).orElse(this.logStaleCommitIntervalNs);
        this.assignmentTimeoutMillis = Optional.ofNullable(cfg.get(ASSIGNMENT_TIMEOUT_MS)).map(v -> Long.parseLong(v.toString())).orElse(this.assignmentTimeoutMillis);
        Class<KafkaStreamElement.KafkaStreamElementSerializer> serializer = Optional.ofNullable(cfg.get(SERIALIZER_CLASS)).map(Object::toString).map(c -> Classpath.findClass((String)c, ElementSerializer.class)).orElse(KafkaStreamElement.KafkaStreamElementSerializer.class);
        this.serializerClass = serializer;
        this.watermarkConfiguration = new KafkaWatermarkConfiguration(cfg);
        log.info("Configured accessor with consumerPollInterval {},partitionerClass {}, maxBytesPerSec {}, timestampSkew {}, maxPollRecords {}, autoCommitIntervalNs {}, logStaleCommitIntervalNs {}, serializerClass {},for URI {}", this.consumerPollInterval, this.partitioner.getClass(), this.maxBytesPerSec, this.timestampSkew, this.maxPollRecords, this.autoCommitIntervalNs, this.logStaleCommitIntervalNs, this.serializerClass, this.getUri());
    }

    Properties createProps() {
        Properties props = new Properties();
        for (Map.Entry<String, Object> e : this.cfg.entrySet()) {
            if (!e.getKey().startsWith(WRITER_CONFIG_PREFIX)) continue;
            props.put(e.getKey().substring(PRODUCE_CONFIG_PREFIX_LENGTH), e.getValue().toString());
        }
        props.put(MAX_POLL_RECORDS, (Object)this.maxPollRecords);
        return props;
    }

    @VisibleForTesting
    AdminClient createAdmin() {
        Properties props = this.createProps();
        props.put("bootstrap.servers", this.getUri().getAuthority());
        return AdminClient.create(props);
    }

    public <K, V> KafkaConsumerFactory<K, V> createConsumerFactory() {
        ElementSerializer<K, V> serializer = this.getSerializer();
        return new KafkaConsumerFactory<K, V>(this.getUri(), this.createProps(), serializer.keySerde(), serializer.valueSerde());
    }

    public boolean isAcceptable(AttributeFamilyDescriptor familyDescriptor) {
        if (familyDescriptor.getAccess().isStateCommitLog()) {
            try (AdminClient adminClient = this.createAdmin();){
                DescribeConfigsResult configsResult = adminClient.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, this.topic)));
                Config config = (Config)ExceptionUtils.uncheckedFactory((ExceptionUtils.ThrowingFactory & Serializable)() -> (Config)Iterables.getOnlyElement(configsResult.all().get().values()));
                ConfigEntry cleanupPolicy = config.get("cleanup.policy");
                boolean bl = this.verifyCleanupPolicy(cleanupPolicy);
                return bl;
            }
        }
        return true;
    }

    @VisibleForTesting
    public boolean verifyCleanupPolicy(ConfigEntry cleanupPolicy) {
        if (cleanupPolicy != null && cleanupPolicy.value().contains("compact")) {
            return true;
        }
        log.warn("Missing option [cleanup.policy=compact] of kafka topic [{}] with access type [state-commit-log].", (Object)this.topic);
        return false;
    }

    public Optional<AttributeWriterBase> getWriter(Context context) {
        return Optional.of(this.newWriter());
    }

    public Optional<CommitLogReader> getCommitLogReader(Context context) {
        return Optional.of(this.newReader(context));
    }

    public Optional<CachedView> getCachedView(Context context) {
        return Optional.of(new LocalCachedPartitionedView(this.getEntityDescriptor(), (CommitLogReader)this.newReader(context), (OnlineAttributeWriter)this.newWriter()));
    }

    KafkaWriter newWriter() {
        return new KafkaWriter(this);
    }

    KafkaLogReader newReader(Context context) {
        return new KafkaLogReader(this, context);
    }

    public <K, V> ElementSerializer<K, V> getSerializer() {
        ElementSerializer res = (ElementSerializer)Classpath.newInstance(this.serializerClass);
        res.setup(this.getEntityDescriptor());
        return res;
    }

    public String getTopic() {
        return this.topic;
    }

    Map<String, Object> getCfg() {
        return this.cfg;
    }

    Partitioner getPartitioner() {
        return this.partitioner;
    }

    long getConsumerPollInterval() {
        return this.consumerPollInterval;
    }

    long getMaxBytesPerSec() {
        return this.maxBytesPerSec;
    }

    long getTimestampSkew() {
        return this.timestampSkew;
    }

    int getMaxPollRecords() {
        return this.maxPollRecords;
    }

    long getAutoCommitIntervalNs() {
        return this.autoCommitIntervalNs;
    }

    long getLogStaleCommitIntervalNs() {
        return this.logStaleCommitIntervalNs;
    }

    long getAssignmentTimeoutMillis() {
        return this.assignmentTimeoutMillis;
    }

    KafkaWatermarkConfiguration getWatermarkConfiguration() {
        return this.watermarkConfiguration;
    }
}

