/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.kafka;

import cz.o2.proxima.direct.kafka.ElementSerializer;
import cz.o2.proxima.internal.shaded.com.google.common.base.MoreObjects;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serde;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serdes;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.Pair;
import java.util.Optional;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaStreamElement
extends StreamElement {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamElement.class);
    private static final long serialVersionUID = 1L;
    private final int partition;
    private final long offset;

    KafkaStreamElement(EntityDescriptor entityDesc, AttributeDescriptor<?> attributeDesc, String uuid, String key, String attribute, long stamp, byte[] value, int partition, long offset) {
        super(entityDesc, attributeDesc, uuid, key, attribute, stamp, false, value);
        this.partition = partition;
        this.offset = offset;
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)((Object)this)).add("entityDesc", (Object)this.getEntityDescriptor()).add("attributeDesc", (Object)this.getAttributeDescriptor()).add("uuid", (Object)this.getUuid()).add("key", (Object)this.getKey()).add("attribute", (Object)this.getAttribute()).add("stamp", this.getStamp()).add("value.length", this.getValue() == null ? -1 : this.getValue().length).add("partition", this.partition).add("offset", this.offset).toString();
    }

    public int getPartition() {
        return this.partition;
    }

    public long getOffset() {
        return this.offset;
    }

    public static class KafkaStreamElementSerializer
    implements ElementSerializer<String, byte[]> {
        private static final long serialVersionUID = 1L;

        @Override
        @Nullable
        public StreamElement read(ConsumerRecord<String, byte[]> record, EntityDescriptor entityDesc) {
            String key = record.key();
            byte[] value = record.value();
            int hashPos = key.lastIndexOf(35);
            if (hashPos < 0 || hashPos >= key.length()) {
                log.error("Invalid key in kafka topic: {}", (Object)key);
            } else {
                String entityKey = key.substring(0, hashPos);
                String attribute = key.substring(hashPos + 1);
                Optional attr = entityDesc.findAttribute(attribute, true);
                if (!attr.isPresent()) {
                    log.error("Invalid attribute {} in kafka key {}", (Object)attribute, (Object)key);
                } else {
                    return new KafkaStreamElement(entityDesc, (AttributeDescriptor)attr.get(), record.topic() + "#" + record.partition() + "#" + record.offset(), entityKey, attribute, record.timestamp(), value, record.partition(), record.offset());
                }
            }
            return null;
        }

        @Override
        public Pair<String, byte[]> write(StreamElement data) {
            return Pair.of((Object)(data.getKey() + "#" + data.getAttribute()), (Object)data.getValue());
        }

        @Override
        public Serde<String> keySerde() {
            return Serdes.String();
        }

        @Override
        public Serde<byte[]> valueSerde() {
            return Serdes.ByteArray();
        }
    }
}

