/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.kafka.ingest;

import io.deephaven.base.Pair;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.util.unboxer.ChunkUnboxer;
import io.deephaven.kafka.StreamPublisherBase;
import io.deephaven.kafka.ingest.ConsumerRecordToStreamPublisherAdapter;
import io.deephaven.kafka.ingest.KeyOrValueProcessor;
import io.deephaven.stream.StreamChunkUtils;
import io.deephaven.time.DateTimeUtils;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.record.TimestampType;
import org.jetbrains.annotations.NotNull;

public class KafkaStreamPublisher
extends StreamPublisherBase
implements ConsumerRecordToStreamPublisherAdapter {
    public static final int NULL_COLUMN_INDEX = -1;
    private final Runnable shutdownCallback;
    private final int kafkaPartitionColumnIndex;
    private final int offsetColumnIndex;
    private final int timestampColumnIndex;
    private final int simpleKeyColumnIndex;
    private final int simpleValueColumnIndex;
    private final int receiveTimeColumnIndex;
    private final int keyBytesColumnIndex;
    private final int valueBytesColumnIndex;
    private final boolean keyIsSimpleObject;
    private final boolean valueIsSimpleObject;
    private final KeyOrValueProcessor keyProcessor;
    private final KeyOrValueProcessor valueProcessor;
    private final Function<Object, Object> keyToChunkObjectMapper;
    private final Function<Object, Object> valueToChunkObjectMapper;

    private KafkaStreamPublisher(@NotNull TableDefinition tableDefinition, @NotNull Runnable shutdownCallback, int kafkaPartitionColumnIndex, int offsetColumnIndex, int timestampColumnIndex, KeyOrValueProcessor keyProcessor, KeyOrValueProcessor valueProcessor, int simpleKeyColumnIndex, int simpleValueColumnIndex, Function<Object, Object> keyToChunkObjectMapper, Function<Object, Object> valueToChunkObjectMapper, int receiveTimeColumnIndex, int keyBytesColumnIndex, int valueBytesColumnIndex) {
        super(tableDefinition);
        this.shutdownCallback = shutdownCallback;
        this.kafkaPartitionColumnIndex = kafkaPartitionColumnIndex;
        this.offsetColumnIndex = offsetColumnIndex;
        this.timestampColumnIndex = timestampColumnIndex;
        this.simpleKeyColumnIndex = simpleKeyColumnIndex;
        this.simpleValueColumnIndex = simpleValueColumnIndex;
        this.keyProcessor = keyProcessor;
        this.valueProcessor = valueProcessor;
        this.keyToChunkObjectMapper = keyToChunkObjectMapper;
        this.valueToChunkObjectMapper = valueToChunkObjectMapper;
        this.receiveTimeColumnIndex = receiveTimeColumnIndex;
        this.keyBytesColumnIndex = keyBytesColumnIndex;
        this.valueBytesColumnIndex = valueBytesColumnIndex;
        boolean bl = this.keyIsSimpleObject = this.simpleKeyColumnIndex >= 0;
        if (this.keyIsSimpleObject && keyProcessor != null) {
            throw new IllegalArgumentException("Simple Key Column Index can not be set when a keyProcessor is set");
        }
        boolean bl2 = this.valueIsSimpleObject = this.simpleValueColumnIndex >= 0;
        if (this.valueIsSimpleObject && valueProcessor != null) {
            throw new IllegalArgumentException("Simple Value Column Index can not be set when a valueProcessor is set");
        }
    }

    public static ConsumerRecordToStreamPublisherAdapter make(@NotNull Parameters parameters, @NotNull Runnable shutdownCallback) {
        int simpleValueColumnIndex;
        KeyOrValueProcessor valueProcessor;
        int simpleKeyColumnIndex;
        KeyOrValueProcessor keyProcessor;
        if (parameters.getSimpleKeyColumnIndex() == -1) {
            keyProcessor = parameters.getKeyProcessor();
            simpleKeyColumnIndex = -1;
        } else {
            Pair<KeyOrValueProcessor, Integer> keyPair = KafkaStreamPublisher.getProcessorAndSimpleIndex(parameters.getSimpleKeyColumnIndex(), StreamChunkUtils.chunkTypeForColumnIndex((TableDefinition)parameters.getTableDefinition(), (int)parameters.getSimpleKeyColumnIndex()));
            keyProcessor = (KeyOrValueProcessor)keyPair.first;
            simpleKeyColumnIndex = (Integer)keyPair.second;
        }
        if (parameters.getSimpleValueColumnIndex() == -1) {
            valueProcessor = parameters.getValueProcessor();
            simpleValueColumnIndex = -1;
        } else {
            Pair<KeyOrValueProcessor, Integer> valuePair = KafkaStreamPublisher.getProcessorAndSimpleIndex(parameters.getSimpleValueColumnIndex(), StreamChunkUtils.chunkTypeForColumnIndex((TableDefinition)parameters.getTableDefinition(), (int)parameters.getSimpleValueColumnIndex()));
            valueProcessor = (KeyOrValueProcessor)valuePair.first;
            simpleValueColumnIndex = (Integer)valuePair.second;
        }
        return new KafkaStreamPublisher(parameters.getTableDefinition(), shutdownCallback, parameters.getKafkaPartitionColumnIndex(), parameters.getOffsetColumnIndex(), parameters.getTimestampColumnIndex(), keyProcessor, valueProcessor, simpleKeyColumnIndex, simpleValueColumnIndex, parameters.getKeyToChunkObjectMapper(), parameters.getValueToChunkObjectMapper(), parameters.getReceiveTimeColumnIndex(), parameters.getKeyBytesColumnIndex(), parameters.getValueBytesColumnIndex());
    }

    @NotNull
    private static Pair<KeyOrValueProcessor, Integer> getProcessorAndSimpleIndex(int columnIndex, ChunkType chunkType) {
        int simpleIndex;
        SimpleKeyOrValueProcessor processor;
        boolean isSimpleObject;
        boolean bl = isSimpleObject = chunkType == ChunkType.Object;
        if (!isSimpleObject) {
            processor = new SimpleKeyOrValueProcessor(columnIndex, ChunkUnboxer.getEmptyUnboxer((ChunkType)chunkType));
            simpleIndex = -1;
        } else {
            processor = null;
            simpleIndex = columnIndex;
        }
        return new Pair((Object)processor, (Object)simpleIndex);
    }

    private boolean haveKey() {
        return !this.keyIsSimpleObject && this.keyProcessor != null;
    }

    private boolean haveValue() {
        return !this.valueIsSimpleObject && this.valueProcessor != null;
    }

    @Override
    public void propagateFailure(@NotNull Throwable cause) {
        this.consumer.acceptFailure(cause);
    }

    @Override
    public synchronized long consumeRecords(long receiveTime, @NotNull List<? extends ConsumerRecord<?, ?>> records) {
        WritableChunk<Values>[] chunks = this.getChunksToFill();
        this.checkChunkSizes(chunks);
        int remaining = chunks[0].capacity() - chunks[0].size();
        int chunkSize = Math.min(records.size(), chunks[0].capacity());
        long bytesProcessed = 0L;
        try (WritableObjectChunk keyChunkCloseable = this.haveKey() ? WritableObjectChunk.makeWritableChunk((int)chunkSize) : null;
             WritableObjectChunk valueChunkCloseable = this.haveValue() ? WritableObjectChunk.makeWritableChunk((int)chunkSize) : null;){
            Object valueChunk;
            Object keyChunk;
            if (keyChunkCloseable != null) {
                keyChunkCloseable.setSize(0);
                keyChunk = keyChunkCloseable;
            } else {
                keyChunk = this.keyIsSimpleObject ? chunks[this.simpleKeyColumnIndex].asWritableObjectChunk() : null;
            }
            if (valueChunkCloseable != null) {
                valueChunkCloseable.setSize(0);
                valueChunk = valueChunkCloseable;
            } else {
                valueChunk = this.valueIsSimpleObject ? chunks[this.simpleValueColumnIndex].asWritableObjectChunk() : null;
            }
            Object partitionChunk = this.kafkaPartitionColumnIndex >= 0 ? chunks[this.kafkaPartitionColumnIndex].asWritableIntChunk() : null;
            Object offsetChunk = this.offsetColumnIndex >= 0 ? chunks[this.offsetColumnIndex].asWritableLongChunk() : null;
            Object timestampChunk = this.timestampColumnIndex >= 0 ? chunks[this.timestampColumnIndex].asWritableLongChunk() : null;
            Object receiveTimeChunk = this.receiveTimeColumnIndex >= 0 ? chunks[this.receiveTimeColumnIndex].asWritableLongChunk() : null;
            Object keyBytesChunk = this.keyBytesColumnIndex >= 0 ? chunks[this.keyBytesColumnIndex].asWritableIntChunk() : null;
            Object valueBytesChunk = this.valueBytesColumnIndex >= 0 ? chunks[this.valueBytesColumnIndex].asWritableIntChunk() : null;
            for (ConsumerRecord<?, ?> record : records) {
                if (remaining == 0) {
                    if (keyChunk != null) {
                        this.flushKeyChunk((WritableObjectChunk<Object, Values>)keyChunk, chunks);
                    }
                    if (valueChunk != null) {
                        this.flushValueChunk((WritableObjectChunk<Object, Values>)valueChunk, chunks);
                    }
                    this.checkChunkSizes(chunks);
                    this.flush();
                    chunks = this.getChunksToFill();
                    this.checkChunkSizes(chunks);
                    remaining = chunks[0].capacity() - chunks[0].size();
                    Assert.gtZero((int)remaining, (String)"remaining");
                    partitionChunk = this.kafkaPartitionColumnIndex >= 0 ? chunks[this.kafkaPartitionColumnIndex].asWritableIntChunk() : null;
                    offsetChunk = this.offsetColumnIndex >= 0 ? chunks[this.offsetColumnIndex].asWritableLongChunk() : null;
                    timestampChunk = this.timestampColumnIndex >= 0 ? chunks[this.timestampColumnIndex].asWritableLongChunk() : null;
                    receiveTimeChunk = this.receiveTimeColumnIndex >= 0 ? chunks[this.receiveTimeColumnIndex].asWritableLongChunk() : null;
                    keyBytesChunk = this.keyBytesColumnIndex >= 0 ? chunks[this.keyBytesColumnIndex].asWritableIntChunk() : null;
                    valueBytesChunk = this.valueBytesColumnIndex >= 0 ? chunks[this.valueBytesColumnIndex].asWritableIntChunk() : null;
                    if (this.keyIsSimpleObject) {
                        keyChunk = chunks[this.simpleKeyColumnIndex].asWritableObjectChunk();
                    }
                    if (this.valueIsSimpleObject) {
                        valueChunk = chunks[this.simpleValueColumnIndex].asWritableObjectChunk();
                    }
                }
                if (partitionChunk != null) {
                    partitionChunk.add(record.partition());
                }
                if (offsetChunk != null) {
                    offsetChunk.add(record.offset());
                }
                if (timestampChunk != null) {
                    long timestamp = record.timestamp();
                    if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) {
                        timestampChunk.add(Long.MIN_VALUE);
                    } else {
                        timestampChunk.add(DateTimeUtils.millisToNanos((long)timestamp));
                    }
                }
                if (receiveTimeChunk != null) {
                    receiveTimeChunk.add(receiveTime);
                }
                int keyBytes = record.serializedKeySize();
                if (keyBytesChunk != null) {
                    keyBytesChunk.add(keyBytes >= 0 ? keyBytes : Integer.MIN_VALUE);
                }
                int valueBytes = record.serializedValueSize();
                if (valueBytesChunk != null) {
                    valueBytesChunk.add(valueBytes >= 0 ? valueBytes : Integer.MIN_VALUE);
                }
                if (keyChunk != null) {
                    keyChunk.add(this.keyToChunkObjectMapper.apply(record.key()));
                    if (keyBytes > 0) {
                        bytesProcessed += (long)keyBytes;
                    }
                }
                if (valueChunk != null) {
                    valueChunk.add(this.valueToChunkObjectMapper.apply(record.value()));
                    if (valueBytes > 0) {
                        bytesProcessed += (long)valueBytes;
                    }
                }
                --remaining;
            }
            if (keyChunk != null) {
                this.flushKeyChunk((WritableObjectChunk<Object, Values>)keyChunk, chunks);
            }
            if (valueChunk != null) {
                this.flushValueChunk((WritableObjectChunk<Object, Values>)valueChunk, chunks);
            }
            this.checkChunkSizes(chunks);
        }
        return bytesProcessed;
    }

    private void checkChunkSizes(WritableChunk<Values>[] chunks) {
        for (int cc = 1; cc < chunks.length; ++cc) {
            if (chunks[cc].size() == chunks[0].size()) continue;
            throw new IllegalStateException("Publisher chunks have size mismatch: " + Arrays.stream(chunks).map(c -> Integer.toString(c.size())).collect(Collectors.joining(", ")));
        }
    }

    void flushKeyChunk(WritableObjectChunk<Object, Values> objectChunk, WritableChunk<Values>[] publisherChunks) {
        if (this.keyIsSimpleObject) {
            return;
        }
        this.keyProcessor.handleChunk((ObjectChunk<Object, Values>)objectChunk, publisherChunks);
        objectChunk.setSize(0);
    }

    void flushValueChunk(WritableObjectChunk<Object, Values> objectChunk, WritableChunk<Values>[] publisherChunks) {
        if (this.valueIsSimpleObject) {
            return;
        }
        this.valueProcessor.handleChunk((ObjectChunk<Object, Values>)objectChunk, publisherChunks);
        objectChunk.setSize(0);
    }

    public void shutdown() {
        this.shutdownCallback.run();
    }

    public static class Parameters {
        @NotNull
        private final TableDefinition tableDefinition;
        private final int kafkaPartitionColumnIndex;
        private final int offsetColumnIndex;
        private final int timestampColumnIndex;
        private final int receiveTimeColumnIndex;
        private final int keyBytesColumnIndex;
        private final int valueBytesColumnIndex;
        private final KeyOrValueProcessor keyProcessor;
        private final KeyOrValueProcessor valueProcessor;
        private final int simpleKeyColumnIndex;
        private final int simpleValueColumnIndex;
        private final Function<Object, Object> keyToChunkObjectMapper;
        private final Function<Object, Object> valueToChunkObjectMapper;

        private Parameters(@NotNull TableDefinition tableDefinition, int kafkaPartitionColumnIndex, int offsetColumnIndex, int timestampColumnIndex, KeyOrValueProcessor keyProcessor, KeyOrValueProcessor valueProcessor, int simpleKeyColumnIndex, int simpleValueColumnIndex, Function<Object, Object> keyToChunkObjectMapper, Function<Object, Object> valueToChunkObjectMapper, int receiveTimeColumnIndex, int keyBytesColumnIndex, int valueBytesColumnIndex) {
            this.tableDefinition = tableDefinition;
            this.kafkaPartitionColumnIndex = kafkaPartitionColumnIndex;
            this.offsetColumnIndex = offsetColumnIndex;
            this.timestampColumnIndex = timestampColumnIndex;
            this.keyProcessor = keyProcessor;
            this.valueProcessor = valueProcessor;
            this.simpleKeyColumnIndex = simpleKeyColumnIndex;
            this.simpleValueColumnIndex = simpleValueColumnIndex;
            this.keyToChunkObjectMapper = keyToChunkObjectMapper;
            this.valueToChunkObjectMapper = valueToChunkObjectMapper;
            this.receiveTimeColumnIndex = receiveTimeColumnIndex;
            this.keyBytesColumnIndex = keyBytesColumnIndex;
            this.valueBytesColumnIndex = valueBytesColumnIndex;
        }

        @NotNull
        public TableDefinition getTableDefinition() {
            return this.tableDefinition;
        }

        public int getKafkaPartitionColumnIndex() {
            return this.kafkaPartitionColumnIndex;
        }

        public int getOffsetColumnIndex() {
            return this.offsetColumnIndex;
        }

        public int getTimestampColumnIndex() {
            return this.timestampColumnIndex;
        }

        public KeyOrValueProcessor getKeyProcessor() {
            return this.keyProcessor;
        }

        public KeyOrValueProcessor getValueProcessor() {
            return this.valueProcessor;
        }

        public int getSimpleKeyColumnIndex() {
            return this.simpleKeyColumnIndex;
        }

        public int getSimpleValueColumnIndex() {
            return this.simpleValueColumnIndex;
        }

        public int getReceiveTimeColumnIndex() {
            return this.receiveTimeColumnIndex;
        }

        public int getKeyBytesColumnIndex() {
            return this.keyBytesColumnIndex;
        }

        public int getValueBytesColumnIndex() {
            return this.valueBytesColumnIndex;
        }

        public Function<Object, Object> getKeyToChunkObjectMapper() {
            return this.keyToChunkObjectMapper;
        }

        public Function<Object, Object> getValueToChunkObjectMapper() {
            return this.valueToChunkObjectMapper;
        }

        public static Builder builder() {
            return new Builder();
        }

        public static class Builder {
            private TableDefinition tableDefinition;
            private int kafkaPartitionColumnIndex = -1;
            private int offsetColumnIndex = -1;
            private int timestampColumnIndex = -1;
            private int receiveTimeColumnIndex = -1;
            private int keyBytesColumnIndex = -1;
            private int valueBytesColumnIndex = -1;
            private KeyOrValueProcessor keyProcessor;
            private KeyOrValueProcessor valueProcessor;
            private int simpleKeyColumnIndex = -1;
            private int simpleValueColumnIndex = -1;
            private Function<Object, Object> keyToChunkObjectMapper = Function.identity();
            private Function<Object, Object> valueToChunkObjectMapper = Function.identity();

            private Builder() {
            }

            public Builder setTableDefinition(@NotNull TableDefinition tableDefinition) {
                this.tableDefinition = tableDefinition;
                return this;
            }

            public Builder setKafkaPartitionColumnIndex(int kafkaPartitionColumnIndex) {
                this.kafkaPartitionColumnIndex = kafkaPartitionColumnIndex;
                return this;
            }

            public Builder setOffsetColumnIndex(int offsetColumnIndex) {
                this.offsetColumnIndex = offsetColumnIndex;
                return this;
            }

            public Builder setTimestampColumnIndex(int timestampColumnIndex) {
                this.timestampColumnIndex = timestampColumnIndex;
                return this;
            }

            public Builder setReceiveTimeColumnIndex(int receiveTimeColumnIndex) {
                this.receiveTimeColumnIndex = receiveTimeColumnIndex;
                return this;
            }

            public Builder setKeyBytesColumnIndex(int keyBytesColumnIndex) {
                this.keyBytesColumnIndex = keyBytesColumnIndex;
                return this;
            }

            public Builder setValueBytesColumnIndex(int valueBytesColumnIndex) {
                this.valueBytesColumnIndex = valueBytesColumnIndex;
                return this;
            }

            public Builder setKeyProcessor(KeyOrValueProcessor keyProcessor) {
                this.keyProcessor = keyProcessor;
                return this;
            }

            public Builder setValueProcessor(KeyOrValueProcessor valueProcessor) {
                this.valueProcessor = valueProcessor;
                return this;
            }

            public Builder setSimpleKeyColumnIndex(int simpleKeyColumnIndex) {
                this.simpleKeyColumnIndex = simpleKeyColumnIndex;
                return this;
            }

            public Builder setSimpleValueColumnIndex(int simpleValueColumnIndex) {
                this.simpleValueColumnIndex = simpleValueColumnIndex;
                return this;
            }

            public Builder setKeyToChunkObjectMapper(@NotNull Function<Object, Object> keyToChunkObjectMapper) {
                this.keyToChunkObjectMapper = keyToChunkObjectMapper;
                return this;
            }

            public Builder setValueToChunkObjectMapper(@NotNull Function<Object, Object> valueToChunkObjectMapper) {
                this.valueToChunkObjectMapper = valueToChunkObjectMapper;
                return this;
            }

            public Parameters build() {
                if (this.keyProcessor != null && this.simpleKeyColumnIndex >= 0) {
                    throw new IllegalArgumentException("Only one of keyProcessor or simpleKeyColumnIndex may be set");
                }
                if (this.valueProcessor != null && this.simpleValueColumnIndex >= 0) {
                    throw new IllegalArgumentException("Only one of valueProcessor or simpleValueColumnIndex may be set");
                }
                return new Parameters(this.tableDefinition, this.kafkaPartitionColumnIndex, this.offsetColumnIndex, this.timestampColumnIndex, this.keyProcessor, this.valueProcessor, this.simpleKeyColumnIndex, this.simpleValueColumnIndex, this.keyToChunkObjectMapper, this.valueToChunkObjectMapper, this.receiveTimeColumnIndex, this.keyBytesColumnIndex, this.valueBytesColumnIndex);
            }
        }
    }

    static class SimpleKeyOrValueProcessor
    implements KeyOrValueProcessor {
        final int offset;
        final ChunkUnboxer.UnboxerKernel unboxer;

        SimpleKeyOrValueProcessor(int offset, ChunkUnboxer.UnboxerKernel unboxer) {
            this.offset = offset;
            this.unboxer = unboxer;
        }

        @Override
        public void handleChunk(ObjectChunk<Object, Values> inputChunk, WritableChunk<Values>[] publisherChunks) {
            WritableChunk<Values> publisherChunk = publisherChunks[this.offset];
            int existingSize = publisherChunk.size();
            publisherChunk.setSize(existingSize + inputChunk.size());
            this.unboxer.unboxTo(inputChunk, publisherChunk, 0, existingSize);
        }
    }
}

