/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import gnu.trove.map.hash.TIntLongHashMap;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.annotations.SimpleStyle;
import io.deephaven.annotations.SingletonStyle;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessManager;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.PartitionedTable;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.BlinkTableTools;
import io.deephaven.engine.table.impl.ConstituentDependency;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.TableUpdateImpl;
import io.deephaven.engine.table.impl.partitioned.PartitionedTableImpl;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.table.impl.sources.ring.RingTableTools;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateSourceCombiner;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.kafka.AvroImpl;
import io.deephaven.kafka.IgnoreImpl;
import io.deephaven.kafka.ImmutableAppend;
import io.deephaven.kafka.ImmutableBlink;
import io.deephaven.kafka.ImmutablePerPartition;
import io.deephaven.kafka.ImmutableRing;
import io.deephaven.kafka.ImmutableSingle;
import io.deephaven.kafka.JsonImpl;
import io.deephaven.kafka.KafkaPublishOptions;
import io.deephaven.kafka.KeyOrValueSpecObjectProcessorImpl;
import io.deephaven.kafka.ProtobufImpl;
import io.deephaven.kafka.RawImpl;
import io.deephaven.kafka.SimpleImpl;
import io.deephaven.kafka.ingest.ConsumerRecordToStreamPublisherAdapter;
import io.deephaven.kafka.ingest.KafkaIngester;
import io.deephaven.kafka.ingest.KafkaRecordConsumer;
import io.deephaven.kafka.ingest.KafkaStreamPublisher;
import io.deephaven.kafka.ingest.KeyOrValueProcessor;
import io.deephaven.kafka.protobuf.ProtobufConsumeOptions;
import io.deephaven.kafka.publish.KafkaPublisherException;
import io.deephaven.kafka.publish.KeyOrValueSerializer;
import io.deephaven.kafka.publish.PublishToKafka;
import io.deephaven.processor.NamedObjectProcessor;
import io.deephaven.processor.ObjectProcessor;
import io.deephaven.qst.column.header.ColumnHeader;
import io.deephaven.qst.type.Type;
import io.deephaven.stream.StreamPublisher;
import io.deephaven.stream.StreamToBlinkTableAdapter;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ReferentialIntegrity;
import io.deephaven.util.annotations.ScriptApi;
import io.deephaven.util.mutable.MutableInt;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.function.IntToLongFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.ByteBufferSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.ShortDeserializer;
import org.apache.kafka.common.serialization.ShortSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.immutables.value.Value;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class KafkaTools {
    // Names of the properties that override the common (per-record metadata) column names, paired with the
    // default column name. CommonColumn looks the property up in the consumer properties and otherwise falls
    // back to the default; a null default means the column is omitted unless the property is set.
    public static final String KAFKA_PARTITION_COLUMN_NAME_PROPERTY = "deephaven.partition.column.name";
    public static final String KAFKA_PARTITION_COLUMN_NAME_DEFAULT = "KafkaPartition";
    public static final String OFFSET_COLUMN_NAME_PROPERTY = "deephaven.offset.column.name";
    public static final String OFFSET_COLUMN_NAME_DEFAULT = "KafkaOffset";
    public static final String TIMESTAMP_COLUMN_NAME_PROPERTY = "deephaven.timestamp.column.name";
    public static final String TIMESTAMP_COLUMN_NAME_DEFAULT = "KafkaTimestamp";
    public static final String RECEIVE_TIME_COLUMN_NAME_PROPERTY = "deephaven.receivetime.column.name";
    public static final String RECEIVE_TIME_COLUMN_NAME_DEFAULT = null;
    public static final String KEY_BYTES_COLUMN_NAME_PROPERTY = "deephaven.keybytes.column.name";
    public static final String KEY_BYTES_COLUMN_NAME_DEFAULT = null;
    public static final String VALUE_BYTES_COLUMN_NAME_PROPERTY = "deephaven.valuebytes.column.name";
    public static final String VALUE_BYTES_COLUMN_NAME_DEFAULT = null;
    // Key/value column name and type properties. NOTE(review): not read in the visible part of this file;
    // presumably consumed by the key/value spec implementations - confirm.
    public static final String KEY_COLUMN_NAME_PROPERTY = "deephaven.key.column.name";
    public static final String KEY_COLUMN_NAME_DEFAULT = "KafkaKey";
    public static final String VALUE_COLUMN_NAME_PROPERTY = "deephaven.value.column.name";
    public static final String VALUE_COLUMN_NAME_DEFAULT = "KafkaValue";
    public static final String KEY_COLUMN_TYPE_PROPERTY = "deephaven.key.column.type";
    public static final String VALUE_COLUMN_TYPE_PROPERTY = "deephaven.value.column.type";
    // NOTE(review): presumably the property naming the schema registry location; not referenced in the visible code.
    public static final String SCHEMA_SERVER_PROPERTY = "schema.registry.url";
    // Fully-qualified class names of the Kafka (and Confluent Avro) deserializers.
    public static final String SHORT_DESERIALIZER = ShortDeserializer.class.getName();
    public static final String INT_DESERIALIZER = IntegerDeserializer.class.getName();
    public static final String LONG_DESERIALIZER = LongDeserializer.class.getName();
    public static final String FLOAT_DESERIALIZER = FloatDeserializer.class.getName();
    public static final String DOUBLE_DESERIALIZER = DoubleDeserializer.class.getName();
    public static final String BYTE_ARRAY_DESERIALIZER = ByteArrayDeserializer.class.getName();
    public static final String STRING_DESERIALIZER = StringDeserializer.class.getName();
    public static final String BYTE_BUFFER_DESERIALIZER = ByteBufferDeserializer.class.getName();
    public static final String AVRO_DESERIALIZER = KafkaAvroDeserializer.class.getName();
    // Alias of the byte-buffer deserializer; by its name, the one used for an ignored key or value.
    public static final String DESERIALIZER_FOR_IGNORE = BYTE_BUFFER_DESERIALIZER;
    // Fully-qualified class names of the Kafka (and Confluent Avro) serializers.
    public static final String SHORT_SERIALIZER = ShortSerializer.class.getName();
    public static final String INT_SERIALIZER = IntegerSerializer.class.getName();
    public static final String LONG_SERIALIZER = LongSerializer.class.getName();
    public static final String FLOAT_SERIALIZER = FloatSerializer.class.getName();
    public static final String DOUBLE_SERIALIZER = DoubleSerializer.class.getName();
    public static final String BYTE_ARRAY_SERIALIZER = ByteArraySerializer.class.getName();
    public static final String STRING_SERIALIZER = StringSerializer.class.getName();
    public static final String BYTE_BUFFER_SERIALIZER = ByteBufferSerializer.class.getName();
    public static final String AVRO_SERIALIZER = KafkaAvroSerializer.class.getName();
    // Alias of the byte-buffer serializer; by its name, the one used for an ignored key or value.
    public static final String SERIALIZER_FOR_IGNORE = BYTE_BUFFER_SERIALIZER;
    // Separators used by DIRECT_MAPPING: "." in a field name is replaced by "__" to form a column name.
    public static final String NESTED_FIELD_NAME_SEPARATOR = ".";
    public static final String NESTED_FIELD_COLUMN_NAME_SEPARATOR = "__";
    // NOTE(review): presumably the version token meaning "most recent registered schema" - confirm against AvroImpl.
    public static final String AVRO_LATEST_VERSION = "latest";
    private static final Logger log = LoggerFactory.getLogger(KafkaTools.class);
    // Negative sentinel values for initial offsets. DONT_SEEK's value (-2) is also the "no entry" value returned by
    // partitionToOffsetFromParallelArrays for partitions that were not listed.
    // NOTE(review): the seek behaviour itself is implemented outside this file (presumably KafkaIngester).
    public static final long SEEK_TO_BEGINNING = -1L;
    public static final long DONT_SEEK = -2L;
    public static final long SEEK_TO_END = -3L;
    // Re-exported KafkaIngester constants so callers can reference them through KafkaTools.
    public static final IntPredicate ALL_PARTITIONS = KafkaIngester.ALL_PARTITIONS;
    public static final IntToLongFunction ALL_PARTITIONS_SEEK_TO_BEGINNING = KafkaIngester.ALL_PARTITIONS_SEEK_TO_BEGINNING;
    public static final IntToLongFunction ALL_PARTITIONS_DONT_SEEK = KafkaIngester.ALL_PARTITIONS_DONT_SEEK;
    public static final IntToLongFunction ALL_PARTITIONS_SEEK_TO_END = KafkaIngester.ALL_PARTITIONS_SEEK_TO_END;
    // Field-name-to-column-name mapping that only swaps the nested-field separator for the column-name separator.
    public static final Function<String, String> DIRECT_MAPPING = fieldName -> fieldName.replace(NESTED_FIELD_NAME_SEPARATOR, NESTED_FIELD_COLUMN_NAME_SEPARATOR);
    // Alias of Consume.FROM_PROPERTIES.
    public static final Consume.KeyOrValueSpec FROM_PROPERTIES = Consume.FROM_PROPERTIES;

    /**
     * Parses an Avro schema from its JSON text using a fresh {@link Schema.Parser}.
     *
     * @param avroSchemaAsJsonString the schema definition as a JSON string
     * @return the parsed schema
     */
    public static Schema getAvroSchema(String avroSchemaAsJsonString) {
        final Schema.Parser parser = new Schema.Parser();
        return parser.parse(avroSchemaAsJsonString);
    }

    /**
     * Builds an Avro schema from a table's column definitions; a straight delegation to
     * {@code AvroImpl.columnDefinitionsToAvroSchema}, with every argument forwarded unchanged.
     *
     * @param t the table whose columns are described
     * @param schemaName forwarded to the implementation
     * @param namespace forwarded to the implementation
     * @param colProps forwarded to the implementation
     * @param includeOnly forwarded to the implementation
     * @param exclude forwarded to the implementation
     * @param colPropsOut forwarded to the implementation
     * @return the schema produced by the implementation
     */
    public static Schema columnDefinitionsToAvroSchema(
            Table t,
            String schemaName,
            String namespace,
            Properties colProps,
            Predicate<String> includeOnly,
            Predicate<String> exclude,
            MutableObject<Properties> colPropsOut) {
        final Schema schema = AvroImpl.columnDefinitionsToAvroSchema(
                t, schemaName, namespace, colProps, includeOnly, exclude, colPropsOut);
        return schema;
    }

    /**
     * Converts an Avro schema to column definitions, passing {@code false} for {@code useUTF8Strings} to the
     * five-argument overload.
     *
     * @param columnsOut list handed to the implementation to receive the column definitions
     * @param fieldPathToColumnNameOut map handed to the implementation to receive the field-path mapping
     * @param schema the Avro schema to convert
     * @param requestedFieldPathToColumnName mapping from field path to requested column name
     */
    public static void avroSchemaToColumnDefinitions(
            List<ColumnDefinition<?>> columnsOut,
            Map<String, String> fieldPathToColumnNameOut,
            Schema schema,
            Function<String, String> requestedFieldPathToColumnName) {
        final boolean useUTF8Strings = false;
        avroSchemaToColumnDefinitions(
                columnsOut, fieldPathToColumnNameOut, schema, requestedFieldPathToColumnName, useUTF8Strings);
    }

    /**
     * Converts an Avro schema to column definitions; a straight delegation to
     * {@code AvroImpl.avroSchemaToColumnDefinitions}, with every argument forwarded unchanged.
     *
     * @param columnsOut list handed to the implementation to receive the column definitions
     * @param fieldPathToColumnNameOut map handed to the implementation to receive the field-path mapping
     * @param schema the Avro schema to convert
     * @param requestedFieldPathToColumnName mapping from field path to requested column name
     * @param useUTF8Strings forwarded to the implementation
     */
    public static void avroSchemaToColumnDefinitions(
            List<ColumnDefinition<?>> columnsOut,
            Map<String, String> fieldPathToColumnNameOut,
            Schema schema,
            Function<String, String> requestedFieldPathToColumnName,
            boolean useUTF8Strings) {
        AvroImpl.avroSchemaToColumnDefinitions(
                columnsOut,
                fieldPathToColumnNameOut,
                schema,
                requestedFieldPathToColumnName,
                useUTF8Strings);
    }

    /**
     * Converts an Avro schema to column definitions without collecting the field-path mapping: the four-argument
     * overload is called with a {@code null} output map.
     *
     * @param columnsOut list handed to the implementation to receive the column definitions
     * @param schema the Avro schema to convert
     * @param requestedFieldPathToColumnName mapping from field path to requested column name
     */
    public static void avroSchemaToColumnDefinitions(
            List<ColumnDefinition<?>> columnsOut,
            Schema schema,
            Function<String, String> requestedFieldPathToColumnName) {
        final Map<String, String> noFieldPathOutput = null;
        avroSchemaToColumnDefinitions(columnsOut, noFieldPathOutput, schema, requestedFieldPathToColumnName);
    }

    /**
     * Converts an Avro schema to column definitions using {@link #DIRECT_MAPPING} as the field-path-to-column-name
     * mapping.
     *
     * @param columnsOut list handed to the implementation to receive the column definitions
     * @param schema the Avro schema to convert
     */
    public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, Schema schema) {
        final Function<String, String> fieldPathToColumnName = DIRECT_MAPPING;
        avroSchemaToColumnDefinitions(columnsOut, schema, fieldPathToColumnName);
    }

    /**
     * Maps a user-friendly type name to a {@link TableType}.
     *
     * <p>
     * Accepted forms are {@code "blink"} (or its synonym {@code "stream"}), {@code "append"}, and
     * {@code "ring:<capacity>"} where {@code <capacity>} is an integer. Whitespace around each component is
     * trimmed.
     *
     * @param typeName the friendly name to parse
     * @return the matching table type
     * @throws IllegalArgumentException if {@code typeName} is not one of the accepted forms, including inputs made
     *         up only of {@code ':'} separators
     */
    @ScriptApi
    public static TableType friendlyNameToTableType(@NotNull String typeName) {
        String[] split = typeName.split(":");
        // String.split drops trailing empty strings, so an input such as ":" or "::" yields an empty array;
        // report it as a malformed name rather than failing on split[0].
        if (split.length == 0) {
            throw KafkaTools.unexpectedType(typeName, null);
        }
        switch (split[0].trim()) {
            case "blink":
            case "stream": {
                if (split.length != 1) {
                    throw KafkaTools.unexpectedType(typeName, null);
                }
                return TableType.blink();
            }
            case "append": {
                if (split.length != 1) {
                    throw KafkaTools.unexpectedType(typeName, null);
                }
                return TableType.append();
            }
            case "ring": {
                // A ring table needs exactly one extra component: its capacity.
                if (split.length != 2) {
                    throw KafkaTools.unexpectedType(typeName, null);
                }
                try {
                    return TableType.ring(Integer.parseInt(split[1].trim()));
                }
                catch (NumberFormatException e) {
                    throw KafkaTools.unexpectedType(typeName, e);
                }
            }
        }
        throw KafkaTools.unexpectedType(typeName, null);
    }

    /**
     * Creates (but does not throw) the exception used to report a malformed friendly table-type name.
     *
     * @param typeName the offending input, quoted in the message
     * @param cause the underlying failure, or {@code null} if there is none
     * @return the exception for the caller to throw
     */
    private static IllegalArgumentException unexpectedType(@NotNull String typeName, @Nullable Exception cause) {
        final String message = "Unexpected type format \"" + typeName
                + "\", expected \"blink\", \"append\", or \"ring:<capacity>\"";
        return new IllegalArgumentException(message, cause);
    }

    /**
     * Consumes a Kafka topic into a single table.
     *
     * <p>
     * A single-consumer registrar is handed to {@link #consume}. When invoked, it re-opens the execution context
     * that was current when this method was called, builds a {@link StreamToBlinkTableAdapter} on that context's
     * update graph, converts the adapter's blink table according to {@code tableType}, and has the liveness manager
     * that was on top of the stack at call time manage the result.
     *
     * <p>
     * NOTE(review): the returned value is whatever the registrar stored; it is {@code null} if the registrar was
     * not invoked before {@code consume} returned - confirm the registrar always runs synchronously.
     *
     * @param kafkaProperties properties passed through to {@code consume}
     * @param topic the topic to consume
     * @param partitionFilter predicate passed through to {@code consume}; also part of the adapter name
     * @param partitionToInitialOffset per-partition initial offsets, adapted to an {@code InitialOffsetLookup}
     * @param keySpec how to handle record keys
     * @param valueSpec how to handle record values
     * @param tableType the kind of table to derive from the blink table
     * @return the table stored by the registrar
     */
    public static Table consumeToTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull Consume.KeyOrValueSpec keySpec, @NotNull Consume.KeyOrValueSpec valueSpec, @NotNull TableType tableType) {
        final MutableObject<Table> tableHolder = new MutableObject<>();
        final ExecutionContext callerContext = ExecutionContext.getContext();
        final LivenessManager callerLivenessManager = LivenessScopeStack.peek();
        final SingleConsumerRegistrar registrar = (definition, publisher) -> {
            try (SafeCloseable ignoredContext = callerContext.open();
                    SafeCloseable ignoredScope = LivenessScopeStack.open()) {
                final StreamToBlinkTableAdapter adapter = new StreamToBlinkTableAdapter(
                        definition,
                        publisher,
                        (UpdateSourceRegistrar) callerContext.getUpdateGraph(),
                        "Kafka-" + topic + "-" + partitionFilter);
                final Table derived = tableType.walk(new BlinkTableOperation(adapter.table()));
                // Hand ownership to the caller's liveness manager before the temporary scope closes.
                callerLivenessManager.manage((LivenessReferent) derived);
                tableHolder.setValue(derived);
            }
        };
        KafkaTools.consume(
                kafkaProperties,
                topic,
                partitionFilter,
                InitialOffsetLookup.adapt(partitionToInitialOffset),
                keySpec,
                valueSpec,
                StreamConsumerRegistrarProvider.single(registrar),
                null);
        return tableHolder.getValue();
    }

    /**
     * Consumes a Kafka topic into a partitioned table with one constituent table per registered topic partition.
     *
     * <p>
     * A per-partition registrar is handed to {@link #consume}. Each invocation lazily creates the shared
     * {@code StreamPartitionedQueryTable} (first invocation only), builds a {@link StreamToBlinkTableAdapter}
     * registered with that table's registrar, converts the adapter's blink table according to {@code tableType},
     * and enqueues the result under the partition number.
     *
     * <p>
     * NOTE(review): the final dereference of the holder assumes at least one partition was registered before
     * {@code consume} returned; otherwise it would fail with a {@link NullPointerException} - confirm.
     *
     * @param kafkaProperties properties passed through to {@code consume}
     * @param topic the topic to consume
     * @param partitionFilter predicate passed through to {@code consume}
     * @param partitionToInitialOffset per-partition initial offsets, adapted to an {@code InitialOffsetLookup}
     * @param keySpec how to handle record keys
     * @param valueSpec how to handle record values
     * @param tableType the kind of table to derive from each partition's blink table
     * @return the partitioned-table view of the shared result
     */
    public static PartitionedTable consumeToPartitionedTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull Consume.KeyOrValueSpec keySpec, @NotNull Consume.KeyOrValueSpec valueSpec, @NotNull TableType tableType) {
        AtomicReference resultHolder = new AtomicReference();
        // Capture the caller's context and liveness manager now; the registrar below uses these captured values.
        ExecutionContext enclosingExecutionContext = ExecutionContext.getContext();
        LivenessManager enclosingLivenessManager = LivenessScopeStack.peek();
        PerPartitionConsumerRegistrar registrar = (tableDefinition, topicPartition, streamPublisher) -> {
            try (SafeCloseable ignored1 = enclosingExecutionContext.open();
                 SafeCloseable ignored2 = LivenessScopeStack.open();){
                StreamPartitionedQueryTable result = (StreamPartitionedQueryTable)resultHolder.get();
                if (result == null) {
                    // Create the shared result at most once: re-check the holder while holding its monitor.
                    AtomicReference atomicReference = resultHolder;
                    synchronized (atomicReference) {
                        result = (StreamPartitionedQueryTable)resultHolder.get();
                        if (result == null) {
                            result = new StreamPartitionedQueryTable(tableDefinition);
                            // The caller's liveness manager manages the result; it is created inside a temporary scope.
                            enclosingLivenessManager.manage((LivenessReferent)result);
                            resultHolder.set(result);
                        }
                    }
                }
                // Each partition's adapter is registered with the shared result's registrar.
                StreamToBlinkTableAdapter streamToBlinkTableAdapter = new StreamToBlinkTableAdapter(tableDefinition, streamPublisher, result.getRegistrar(), "Kafka-" + topic + "-" + topicPartition.partition());
                Table blinkTable = streamToBlinkTableAdapter.table();
                Table derivedTable = tableType.walk(new BlinkTableOperation(blinkTable));
                result.enqueueAdd(topicPartition.partition(), derivedTable);
            }
        };
        KafkaTools.consume(kafkaProperties, topic, partitionFilter, InitialOffsetLookup.adapt(partitionToInitialOffset), keySpec, valueSpec, StreamConsumerRegistrarProvider.perPartition(registrar), null);
        return ((StreamPartitionedQueryTable)resultHolder.get()).toPartitionedTable();
    }

    /**
     * Sets up and starts a {@link KafkaIngester} for a topic, wiring deserializers, the output table definition and
     * the stream publisher parameters from the key and value specs.
     *
     * @param kafkaProperties consumer properties; also consulted for the common column names
     * @param topic the topic to consume
     * @param partitionFilter passed to the ingester
     * @param partitionToInitialOffset passed to the ingester
     * @param keySpec how to handle record keys
     * @param valueSpec how to handle record values
     * @param streamConsumerRegistrarProvider walked to obtain the per-partition record-consumer factory
     * @param consumerLoopCallback optional callback passed to the ingester; may be {@code null}
     * @throws IllegalArgumentException if both {@code keySpec} and {@code valueSpec} are ignore specs
     */
    public static void consume(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull InitialOffsetLookup partitionToInitialOffset, @NotNull Consume.KeyOrValueSpec keySpec, @NotNull Consume.KeyOrValueSpec valueSpec, @NotNull StreamConsumerRegistrarProvider streamConsumerRegistrarProvider, @Nullable ConsumerLoopCallback consumerLoopCallback) {
        if (Consume.isIgnore(keySpec) && Consume.isIgnore(valueSpec)) {
            throw new IllegalArgumentException("can't ignore both key and value: keySpec and valueSpec can't both be ignore specs");
        }
        Map<String, ?> configs = KafkaTools.asStringMap(kafkaProperties);
        // null when neither spec supplies a schema provider.
        SchemaRegistryClient schemaRegistryClient = KafkaTools.schemaRegistryClient(keySpec, valueSpec, configs).orElse(null);
        // The boolean passed to configure is Kafka's isKey flag.
        Deserializer<?> keyDeser = keySpec.getDeserializer(KeyOrValue.KEY, schemaRegistryClient, configs);
        keyDeser.configure(configs, true);
        Deserializer<?> valueDeser = valueSpec.getDeserializer(KeyOrValue.VALUE, schemaRegistryClient, configs);
        valueDeser.configure(configs, false);
        KafkaStreamPublisher.Parameters.Builder publisherParametersBuilder = KafkaStreamPublisher.Parameters.builder();
        // Column indices are handed out in order: enabled common columns first, then the key and value specs.
        MutableInt nextColumnIndex = new MutableInt(0);
        ArrayList columnDefinitions = new ArrayList();
        Arrays.stream(CommonColumn.values()).forEach(cc -> {
            ColumnDefinition<?> commonColumnDefinition = cc.getDefinition(kafkaProperties);
            // A null definition means this common column is disabled.
            if (commonColumnDefinition == null) {
                return;
            }
            columnDefinitions.add(commonColumnDefinition);
            cc.setColumnIndex.setColumnIndex(publisherParametersBuilder, nextColumnIndex.getAndIncrement());
        });
        // NOTE(review): the specs receive nextColumnIndex and columnDefinitions, presumably to append their own
        // columns; a null result skips the corresponding processor wiring below.
        KeyOrValueIngestData keyIngestData = keySpec.getIngestData(KeyOrValue.KEY, schemaRegistryClient, configs, nextColumnIndex, columnDefinitions);
        KeyOrValueIngestData valueIngestData = valueSpec.getIngestData(KeyOrValue.VALUE, schemaRegistryClient, configs, nextColumnIndex, columnDefinitions);
        TableDefinition tableDefinition = TableDefinition.of(columnDefinitions);
        publisherParametersBuilder.setTableDefinition(tableDefinition);
        if (keyIngestData != null) {
            publisherParametersBuilder.setKeyProcessor(keySpec.getProcessor(tableDefinition, keyIngestData)).setSimpleKeyColumnIndex(keyIngestData.simpleColumnIndex).setKeyToChunkObjectMapper(keyIngestData.toObjectChunkMapper);
        }
        if (valueIngestData != null) {
            publisherParametersBuilder.setValueProcessor(valueSpec.getProcessor(tableDefinition, valueIngestData)).setSimpleValueColumnIndex(valueIngestData.simpleColumnIndex).setValueToChunkObjectMapper(valueIngestData.toObjectChunkMapper);
        }
        KafkaStreamPublisher.Parameters publisherParameters = publisherParametersBuilder.build();
        // The consumer factory needs access to the ingester, which cannot be constructed until the factory exists;
        // the holder lets the supplier resolve the ingester lazily once it has been set below.
        MutableObject kafkaIngesterHolder = new MutableObject();
        Function<TopicPartition, KafkaRecordConsumer> kafkaRecordConsumerFactory = streamConsumerRegistrarProvider.walk(new KafkaRecordConsumerFactoryCreator(publisherParameters, () -> ((MutableObject)kafkaIngesterHolder).getValue()));
        KafkaIngester ingester = new KafkaIngester(log, kafkaProperties, topic, partitionFilter, kafkaRecordConsumerFactory, partitionToInitialOffset, keyDeser, valueDeser, consumerLoopCallback);
        kafkaIngesterHolder.setValue((Object)ingester);
        ingester.start();
    }

    /**
     * Creates a schema registry client for the schema providers required by the key and value specs.
     *
     * <p>
     * Providers are de-duplicated by schema type; when key and value supply a provider of the same type, the key's
     * provider is kept. Each retained provider is configured with {@code configs} before the client is built.
     *
     * @param key source of the key's optional schema provider
     * @param value source of the value's optional schema provider
     * @param configs configuration applied to the providers and used to build the client
     * @return the client, or empty if neither side supplies a schema provider
     */
    private static Optional<SchemaRegistryClient> schemaRegistryClient(SchemaProviderProvider key, SchemaProviderProvider value, Map<String, ?> configs) {
        // Typed map (previously a raw HashMap) so that iterating values() as SchemaProvider is type-correct.
        final Map<String, SchemaProvider> providers = new HashMap<>();
        key.getSchemaProvider().ifPresent(p -> providers.put(p.schemaType(), p));
        // putIfAbsent: the key's provider wins when both sides use the same schema type.
        value.getSchemaProvider().ifPresent(p -> providers.putIfAbsent(p.schemaType(), p));
        if (providers.isEmpty()) {
            return Optional.empty();
        }
        for (SchemaProvider schemaProvider : providers.values()) {
            schemaProvider.configure(configs);
        }
        return Optional.of(KafkaTools.newSchemaRegistryClient(configs, List.copyOf(providers.values())));
    }

    /**
     * Builds a {@link CachedSchemaRegistryClient} from the given configuration.
     *
     * <p>
     * The registry URLs, the per-subject schema limit and the request headers are read through an
     * {@link AbstractKafkaSchemaSerDeConfig} constructed from {@code configs}; the providers are passed as an
     * immutable copy.
     *
     * @param configs the configuration to read client settings from
     * @param providers the schema providers the client should use
     * @return a new caching schema registry client
     */
    static SchemaRegistryClient newSchemaRegistryClient(Map<String, ?> configs, List<SchemaProvider> providers) {
        final AbstractKafkaSchemaSerDeConfig serDeConfig =
                new AbstractKafkaSchemaSerDeConfig(AbstractKafkaSchemaSerDeConfig.baseConfigDef(), configs);
        return new CachedSchemaRegistryClient(
                serDeConfig.getSchemaRegistryUrls(),
                serDeConfig.getMaxSchemasPerSubject(),
                List.copyOf(providers),
                serDeConfig.originalsWithPrefix(""),
                serDeConfig.requestHeaders());
    }

    /**
     * Publishes a table to a Kafka topic; a convenience overload that assembles {@link KafkaPublishOptions} and
     * delegates to {@link #produceFromTable(KafkaPublishOptions)}.
     *
     * <p>
     * Last-by is only enabled when {@code lastByKeyColumns} is set and the key spec is not an ignore spec. The
     * options are always built with {@code publishInitial(true)}.
     *
     * @param table the table to publish
     * @param kafkaProperties producer configuration
     * @param topic the destination topic
     * @param keySpec how to produce record keys
     * @param valueSpec how to produce record values
     * @param lastByKeyColumns whether to request a last-by on the key columns
     * @return the callback returned by the options-based overload
     */
    public static Runnable produceFromTable(@NotNull Table table, @NotNull Properties kafkaProperties, @NotNull String topic, @NotNull Produce.KeyOrValueSpec keySpec, @NotNull Produce.KeyOrValueSpec valueSpec, boolean lastByKeyColumns) {
        final KafkaPublishOptions options = KafkaPublishOptions.builder()
                .table(table)
                .topic(topic)
                .config(kafkaProperties)
                .keySpec(keySpec)
                .valueSpec(valueSpec)
                .lastBy(lastByKeyColumns && !Produce.isIgnore(keySpec))
                .publishInitial(true)
                .build();
        return KafkaTools.produceFromTable(options);
    }

    /**
     * Publishes a table to Kafka as described by {@code options}.
     *
     * <p>
     * The publisher is created inside a dedicated {@link LivenessScope}; the returned callback releases that
     * scope.
     *
     * @param options the publish options (table, topic, config, key/value specs, and optional columns)
     * @return a callback that releases the publisher's liveness scope
     * @throws KafkaPublisherException if {@code QueryTable.checkInitiateOperation} rejects the table with an
     *         {@link IllegalStateException}
     */
    public static Runnable produceFromTable(KafkaPublishOptions options) {
        final Table source = options.table();
        try {
            QueryTable.checkInitiateOperation(source);
        } catch (IllegalStateException lockFailure) {
            throw new KafkaPublisherException("Calling thread must hold an exclusive or shared UpdateGraph lock to publish live sources", lockFailure);
        }

        final Map<String, ?> kafkaConfig = KafkaTools.asStringMap(options.config());
        final Produce.KeyOrValueSpec keySpec = options.keySpec();
        final Produce.KeyOrValueSpec valueSpec = options.valueSpec();
        final SchemaRegistryClient registryClient =
                KafkaTools.schemaRegistryClient(keySpec, valueSpec, kafkaConfig).orElse(null);
        final TableDefinition sourceDefinition = source.getDefinition();

        // The boolean passed to configure is Kafka's isKey flag.
        final Serializer<?> kafkaKeySerializer = keySpec.getSerializer(registryClient, sourceDefinition);
        kafkaKeySerializer.configure(kafkaConfig, true);
        final Serializer<?> kafkaValueSerializer = valueSpec.getSerializer(registryClient, sourceDefinition);
        kafkaValueSerializer.configure(kafkaConfig, false);

        final String[] keyColumns = keySpec.getColumnNames(source, registryClient);
        final String[] valueColumns = valueSpec.getColumnNames(source, registryClient);

        final LivenessScope publisherScope = new LivenessScope(true);
        try (SafeCloseable ignored = LivenessScopeStack.open(publisherScope, false)) {
            final Table effectiveTable;
            if (options.lastBy()) {
                effectiveTable = (Table) source.lastBy(keyColumns);
            } else {
                effectiveTable = source.coalesce();
            }
            final KeyOrValueSerializer<?> keySerializer = keySpec.getKeyOrValueSerializer(effectiveTable, keyColumns);
            final KeyOrValueSerializer<?> valueSerializer = valueSpec.getKeyOrValueSerializer(effectiveTable, valueColumns);
            // The publisher is not kept in a local; it is constructed while publisherScope is the open scope.
            new PublishToKafka(
                    options.config(),
                    effectiveTable,
                    options.topic(),
                    options.partition().isEmpty() ? null : Integer.valueOf(options.partition().getAsInt()),
                    keyColumns,
                    kafkaKeySerializer,
                    keySerializer,
                    valueColumns,
                    kafkaValueSerializer,
                    valueSerializer,
                    options.topicColumn().orElse(null),
                    options.partitionColumn().orElse(null),
                    options.timestampColumn().orElse(null),
                    options.publishInitial());
        }
        return publisherScope::release;
    }

    /**
     * Creates a partition filter that accepts exactly the partitions listed in the given array.
     *
     * <p>
     * The array is copied before sorting, so the caller's array is neither reordered nor aliased by the returned
     * predicate; later changes to {@code partitions} do not affect the filter.
     *
     * @param partitions the partition numbers to accept, in any order; may be empty
     * @return a predicate that is {@code true} only for the listed partitions
     */
    public static IntPredicate partitionFilterFromArray(int[] partitions) {
        // Defensive copy: binarySearch needs sorted input, but sorting in place would mutate the caller's data.
        final int[] sortedPartitions = partitions.clone();
        Arrays.sort(sortedPartitions);
        return p -> Arrays.binarySearch(sortedPartitions, p) >= 0;
    }

    /**
     * Creates a partition-to-offset function from two parallel arrays, where {@code offsets[i]} is the offset for
     * {@code partitions[i]}.
     *
     * <p>
     * Partitions that are not listed map to {@code -2}, the same value as {@link #DONT_SEEK}.
     *
     * @param partitions the partition numbers
     * @param offsets the offset for each partition, in the same order
     * @return a function backed by a lookup map built from the two arrays
     * @throws IllegalArgumentException if the arrays differ in length
     */
    public static IntToLongFunction partitionToOffsetFromParallelArrays(int[] partitions, long[] offsets) {
        final int count = partitions.length;
        if (count != offsets.length) {
            throw new IllegalArgumentException("lengths of array arguments do not match");
        }
        // Arguments: initial capacity, load factor, no-entry key, no-entry value.
        final TIntLongHashMap offsetByPartition = new TIntLongHashMap(count, 0.5f, 0, -2L);
        int index = 0;
        while (index < count) {
            offsetByPartition.put(partitions[index], offsets[index]);
            index++;
        }
        return offsetByPartition::get;
    }

    /**
     * Adapts a set to a membership predicate.
     *
     * @param set the set to test membership against; may be {@code null}
     * @return {@code null} if {@code set} is {@code null}, otherwise a predicate that is {@code true} for strings
     *         contained in the set
     */
    public static Predicate<String> predicateFromSet(Set<String> set) {
        if (set == null) {
            return null;
        }
        return candidate -> set.contains(candidate);
    }

    /**
     * Lists the names of the topics known to the Kafka cluster described by the given properties.
     *
     * <p>
     * A Kafka {@link Admin} client is created for the call and always closed afterwards.
     *
     * @param kafkaProperties properties used to create the admin client
     * @return the topic names
     * @throws RuntimeException if the listing is interrupted or fails; the original exception is the cause. On
     *         interruption the calling thread's interrupt flag is restored before throwing.
     */
    public static Set<String> topics(@NotNull Properties kafkaProperties) {
        try (Admin admin = Admin.create(kafkaProperties)) {
            final ListTopicsResult result = admin.listTopics();
            return result.names().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to list Kafka Topics for " + kafkaProperties, e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed to list Kafka Topics for " + kafkaProperties, e);
        }
    }

    /**
     * Lists the topic names as an array; an array-returning wrapper around {@link #topics(Properties)}.
     *
     * @param kafkaProperties properties used to create the admin client
     * @return a new array holding every topic name
     */
    public static String[] listTopics(@NotNull Properties kafkaProperties) {
        final Set<String> topicNames = KafkaTools.topics(kafkaProperties);
        return topicNames.toArray(new String[0]);
    }

    /**
     * Views a map as a {@code Map<String, ?>} after verifying that every key is a {@link String}.
     *
     * <p>
     * No copy is made: the returned map is the same instance as the argument.
     *
     * @param map the map to check
     * @return {@code map} itself, typed with string keys
     * @throws UncheckedDeephavenException if any key is not a string, including a {@code null} key
     */
    @SuppressWarnings("unchecked")
    static Map<String, ?> asStringMap(Map<?, ?> map) {
        for (Object key : map.keySet()) {
            if (key instanceof String) {
                continue;
            }
            // A null key is reported like any other non-string key instead of failing on key.getClass().
            final String keyClassName = key == null ? "null" : key.getClass().getName();
            throw new UncheckedDeephavenException(String.format("key must be a string, is key.getClass().getName()=%s, key.toString()=%s", keyClassName, key));
        }
        // Safe: every key was just verified to be a String.
        return (Map<String, ?>) map;
    }

    /**
     * Adapts a partition-number-to-offset function to the {@code InitialOffsetLookup} interface. The consumer
     * argument is ignored; only the partition number of the topic partition is used.
     */
    private static class IntToLongLookupAdapter
    implements InitialOffsetLookup {
        private final IntToLongFunction partitionToOffset;

        /**
         * @param function the partition-to-offset function to wrap; must not be {@code null}
         */
        IntToLongLookupAdapter(IntToLongFunction function) {
            partitionToOffset = Objects.requireNonNull(function);
        }

        @Override
        public long getInitialOffset(KafkaConsumer<?, ?> consumer, TopicPartition topicPartition) {
            final int partition = topicPartition.partition();
            return partitionToOffset.applyAsLong(partition);
        }
    }

    /**
     * {@code KafkaRecordConsumer} that forwards record batches to a
     * {@code ConsumerRecordToStreamPublisherAdapter} and routes any exception from the adapter to
     * {@link #acceptFailure(Throwable)}.
     */
    private static class SimpleKafkaRecordConsumer
    implements KafkaRecordConsumer {
        private final ConsumerRecordToStreamPublisherAdapter adapter;

        private SimpleKafkaRecordConsumer(@NotNull ConsumerRecordToStreamPublisherAdapter adapter) {
            this.adapter = adapter;
        }

        /**
         * Passes the records to the adapter.
         *
         * @return the adapter's result, or {@code 0} if the adapter threw and the failure was propagated
         */
        @Override
        public long consume(long receiveTime, @NotNull List<? extends ConsumerRecord<?, ?>> consumerRecords) {
            long result = 0L;
            try {
                result = adapter.consumeRecords(receiveTime, consumerRecords);
            } catch (Exception failure) {
                acceptFailure(failure);
            }
            return result;
        }

        /** Hands the failure to the adapter for propagation. */
        public void acceptFailure(@NotNull Throwable cause) {
            adapter.propagateFailure(cause);
        }
    }

    /**
     * The per-record metadata columns that can be added to a consumed table.
     *
     * <p>
     * Each constant carries the property that overrides its column name, the default column name (or
     * {@code null} when the column is off by default), a factory for its column definition, and the builder method
     * that records the column's index.
     */
    private static enum CommonColumn {
        KafkaPartition("deephaven.partition.column.name", "KafkaPartition", ColumnDefinition::ofInt, KafkaStreamPublisher.Parameters.Builder::setKafkaPartitionColumnIndex),
        Offset("deephaven.offset.column.name", "KafkaOffset", ColumnDefinition::ofLong, KafkaStreamPublisher.Parameters.Builder::setOffsetColumnIndex),
        Timestamp("deephaven.timestamp.column.name", "KafkaTimestamp", ColumnDefinition::ofTime, KafkaStreamPublisher.Parameters.Builder::setTimestampColumnIndex),
        ReceiveTime("deephaven.receivetime.column.name", RECEIVE_TIME_COLUMN_NAME_DEFAULT, ColumnDefinition::ofTime, KafkaStreamPublisher.Parameters.Builder::setReceiveTimeColumnIndex),
        KeyBytes("deephaven.keybytes.column.name", KEY_BYTES_COLUMN_NAME_DEFAULT, ColumnDefinition::ofInt, KafkaStreamPublisher.Parameters.Builder::setKeyBytesColumnIndex),
        ValueBytes("deephaven.valuebytes.column.name", VALUE_BYTES_COLUMN_NAME_DEFAULT, ColumnDefinition::ofInt, KafkaStreamPublisher.Parameters.Builder::setValueBytesColumnIndex);

        private final String nameProperty;
        private final String nameDefault;
        private final Function<String, ColumnDefinition<?>> definitionFactory;
        private final SetColumnIndex setColumnIndex;

        /**
         * @param nameProperty the property whose value overrides the column name
         * @param nameDefault the default column name; {@code null} means the column is omitted unless the property
         *        is set (ReceiveTime, KeyBytes and ValueBytes pass {@code null})
         * @param definitionFactory creates the column definition from the chosen name
         * @param setColumnIndex records the column's index on the publisher parameters builder
         */
        private CommonColumn(@NotNull String nameProperty, @Nullable String nameDefault, Function<String, ColumnDefinition<?>> definitionFactory, SetColumnIndex setColumnIndex) {
            this.nameProperty = nameProperty;
            this.nameDefault = nameDefault;
            this.definitionFactory = definitionFactory;
            this.setColumnIndex = setColumnIndex;
        }

        /**
         * Resolves this column's definition for the given consumer properties.
         *
         * @param consumerProperties the properties to consult for a column-name override
         * @return the column definition, or {@code null} if the column is disabled: either the property is present
         *         with a {@code null} or empty name, or the property is absent and there is no default
         */
        private ColumnDefinition<?> getDefinition(@NotNull Properties consumerProperties) {
            if (consumerProperties.containsKey(this.nameProperty)) {
                // An explicit setting always wins; a null or empty name switches the column off.
                final String configuredName = consumerProperties.getProperty(this.nameProperty);
                if (configuredName == null || configuredName.equals("")) {
                    return null;
                }
                return this.definitionFactory.apply(configuredName);
            }
            if (this.nameDefault == null) {
                return null;
            }
            return this.definitionFactory.apply(this.nameDefault);
        }
    }

    private static interface SetColumnIndex {
        public void setColumnIndex(KafkaStreamPublisher.Parameters.Builder var1, int var2);
    }

    /**
     * Mutable holder for the data a key or value spec produces while preparing ingestion. {@code consume} reads
     * {@link #simpleColumnIndex} and {@link #toObjectChunkMapper} from it and passes the whole object back to the
     * spec's {@code getProcessor}.
     */
    public static class KeyOrValueIngestData {
        // NOTE(review): by its name, maps a field path to the column it populates; not read in the visible code.
        public Map<String, String> fieldPathToColumnName;
        // Forwarded by consume to setSimpleKeyColumnIndex / setSimpleValueColumnIndex; defaults to -1.
        public int simpleColumnIndex = -1;
        // Forwarded by consume to setKeyToChunkObjectMapper / setValueToChunkObjectMapper; defaults to identity.
        public Function<Object, Object> toObjectChunkMapper = Function.identity();
        // NOTE(review): opaque extra payload; presumably spec-specific state - confirm against the spec implementations.
        public Object extra;
    }

    /**
     * Refreshing, flat {@link QueryTable} with two columns, {@code "Partition"} (int) and {@code "Table"}
     * ({@link Table}). Rows are written by {@link #enqueueAdd(int, Table)} and become part of the row set
     * the next time {@link #run()} executes.
     */
    private static final class StreamPartitionedQueryTable extends QueryTable implements Runnable {
        private static final String PARTITION_COLUMN_NAME = "Partition";
        private static final String CONSTITUENT_COLUMN_NAME = "Table";

        private final TableDefinition constituentDefinition;
        private final WritableColumnSource<Integer> partitionColumn;
        private final WritableColumnSource<Table> constituentColumn;
        @ReferentialIntegrity
        private final UpdateSourceCombiner refreshCombiner;

        /**
         * Row key of the most recently written row, or {@code -1} when nothing has been written. Written
         * inside the synchronized {@link #enqueueAdd(int, Table)} and read by {@link #run()}.
         */
        private volatile long lastAddedPartitionRowKey = -1L;

        /**
         * Builds an empty table and wires it to a new {@link UpdateSourceCombiner}.
         *
         * @param constituentDefinition definition passed on to the partitioned table created by
         *        {@link #toPartitionedTable()}
         */
        private StreamPartitionedQueryTable(@NotNull TableDefinition constituentDefinition) {
            super((TrackingRowSet) RowSetFactory.empty().toTracking(), makeSources());
            setFlat();
            setRefreshing(true);
            this.constituentDefinition = constituentDefinition;
            partitionColumn = (WritableColumnSource) getColumnSource(PARTITION_COLUMN_NAME, int.class);
            constituentColumn = (WritableColumnSource) getColumnSource(CONSTITUENT_COLUMN_NAME, Table.class);
            refreshCombiner = new UpdateSourceCombiner(getUpdateGraph());
            // The steps below are kept in this exact order: manage, add self as a source, install the
            // constituent dependency, then install the combiner.
            manage(refreshCombiner);
            refreshCombiner.addSource(this);
            ConstituentDependency.install(this, refreshCombiner);
            refreshCombiner.install();
        }

        /**
         * @return the combiner created by the constructor, exposed as an {@link UpdateSourceRegistrar}
         */
        public UpdateSourceRegistrar getRegistrar() {
            return refreshCombiner;
        }

        /**
         * Adds to the row set every row key written since the row set's current last key, and notifies
         * listeners of the addition. Does nothing when no new row has been written.
         */
        @Override
        public void run() {
            // Read the published key before inspecting the row set.
            final long publishedLastKey = lastAddedPartitionRowKey;
            final TrackingWritableRowSet currentRows = getRowSet().writableCast();
            final long visibleLastKey = currentRows.lastRowKey();
            if (publishedLastKey == visibleLastKey) {
                return;
            }
            final WritableRowSet newlyAdded = RowSetFactory.fromRange(visibleLastKey + 1L, publishedLastKey);
            currentRows.insert(newlyAdded);
            notifyListeners(new TableUpdateImpl(
                    newlyAdded,
                    RowSetFactory.empty(),
                    RowSetFactory.empty(),
                    RowSetShiftData.EMPTY,
                    ModifiedColumnSet.EMPTY));
        }

        /**
         * Manages {@code partitionTable} and writes a new row at the next row key.
         *
         * @param partition      value stored in the {@code "Partition"} column
         * @param partitionTable table stored in the {@code "Table"} column
         */
        private synchronized void enqueueAdd(int partition, @NotNull Table partitionTable) {
            manage(partitionTable);
            final long nextRowKey = lastAddedPartitionRowKey + 1L;
            final long requiredCapacity = nextRowKey + 1L;
            partitionColumn.ensureCapacity(requiredCapacity);
            partitionColumn.set(nextRowKey, partition);
            constituentColumn.ensureCapacity(requiredCapacity);
            constituentColumn.set(nextRowKey, partitionTable);
            // Publish the key only after both column writes above.
            lastAddedPartitionRowKey = nextRowKey;
        }

        /**
         * @return a {@link PartitionedTableImpl} over this table, keyed on {@code "Partition"} with
         *         constituents in {@code "Table"}
         */
        private PartitionedTable toPartitionedTable() {
            return new PartitionedTableImpl(
                    this,
                    Set.of(PARTITION_COLUMN_NAME),
                    true,
                    CONSTITUENT_COLUMN_NAME,
                    constituentDefinition,
                    true,
                    false);
        }

        /**
         * @return an insertion-ordered map holding the two memory-backed column sources of this table
         */
        private static Map<String, ColumnSource<?>> makeSources() {
            final Map<String, ColumnSource<?>> columnSources = new LinkedHashMap<>(2);
            columnSources.put(PARTITION_COLUMN_NAME, ArrayBackedColumnSource.getMemoryColumnSource(int.class, null));
            columnSources.put(CONSTITUENT_COLUMN_NAME, ArrayBackedColumnSource.getMemoryColumnSource(Table.class, null));
            return columnSources;
        }
    }

    /**
     * {@link TableType.Visitor} that derives a result table from a blink table according to the visited
     * table type.
     */
    private static class BlinkTableOperation implements TableType.Visitor<Table> {
        private final Table blinkTable;

        /**
         * @param blinkTable the source table; must not be {@code null}
         * @throws NullPointerException if {@code blinkTable} is {@code null}
         */
        public BlinkTableOperation(Table blinkTable) {
            this.blinkTable = Objects.requireNonNull(blinkTable);
        }

        /** Returns the source table unchanged. */
        @Override
        public Table visit(TableType.Blink blink) {
            return blinkTable;
        }

        /** Returns the result of {@code BlinkTableTools.blinkToAppendOnly} on the source table. */
        @Override
        public Table visit(TableType.Append append) {
            return BlinkTableTools.blinkToAppendOnly(blinkTable);
        }

        /** Returns the result of {@code RingTableTools.of} on the source table with the ring's capacity. */
        @Override
        public Table visit(TableType.Ring ring) {
            final int capacity = ring.capacity();
            return RingTableTools.of(blinkTable, capacity);
        }
    }

    /**
     * Visitor that turns a {@link StreamConsumerRegistrarProvider} into a function producing a
     * {@link KafkaRecordConsumer} for a given {@link TopicPartition}.
     */
    private static class KafkaRecordConsumerFactoryCreator
            implements StreamConsumerRegistrarProvider.Visitor<Function<TopicPartition, KafkaRecordConsumer>> {
        private final KafkaStreamPublisher.Parameters publisherParameters;
        private final Supplier<KafkaIngester> ingesterSupplier;

        /**
         * @param publisherParameters parameters passed to every {@code KafkaStreamPublisher.make} call
         * @param ingesterSupplier    supplies the ingester whose shutdown methods the publisher callbacks invoke
         */
        private KafkaRecordConsumerFactoryCreator(
                @NotNull KafkaStreamPublisher.Parameters publisherParameters,
                @NotNull Supplier<KafkaIngester> ingesterSupplier) {
            this.publisherParameters = publisherParameters;
            this.ingesterSupplier = ingesterSupplier;
        }

        /**
         * Creates one adapter up front, registers it once, and returns a function that wraps that same
         * adapter in a new {@link SimpleKafkaRecordConsumer} for every topic partition.
         */
        @Override
        public Function<TopicPartition, KafkaRecordConsumer> visit(@NotNull StreamConsumerRegistrarProvider.Single single) {
            final ConsumerRecordToStreamPublisherAdapter sharedAdapter =
                    KafkaStreamPublisher.make(publisherParameters, () -> ingesterSupplier.get().shutdown());
            final SingleConsumerRegistrar registrar = single.registrar();
            registrar.register(publisherParameters.getTableDefinition(), sharedAdapter);
            return ignoredTopicPartition -> new SimpleKafkaRecordConsumer(sharedAdapter);
        }

        /**
         * Returns a function that, on each call, creates a new adapter for the given topic partition,
         * registers it with that partition, and wraps it in a new {@link SimpleKafkaRecordConsumer}.
         */
        @Override
        public Function<TopicPartition, KafkaRecordConsumer> visit(@NotNull StreamConsumerRegistrarProvider.PerPartition perPartition) {
            return topicPartition -> {
                final ConsumerRecordToStreamPublisherAdapter partitionAdapter = KafkaStreamPublisher.make(
                        publisherParameters,
                        () -> ingesterSupplier.get().shutdownPartition(topicPartition.partition()));
                final PerPartitionConsumerRegistrar registrar = perPartition.registrar();
                registrar.register(publisherParameters.getTableDefinition(), topicPartition, partitionAdapter);
                return new SimpleKafkaRecordConsumer(partitionAdapter);
            };
        }
    }

    /**
     * Closed pair of registrar providers, {@link Single} and {@link PerPartition}, dispatched through
     * {@link #walk(Visitor)}.
     */
    public interface StreamConsumerRegistrarProvider {
        /**
         * @param registrar the registrar to wrap
         * @return a {@link Single} provider holding {@code registrar}
         */
        static Single single(@NotNull SingleConsumerRegistrar registrar) {
            return Single.of(registrar);
        }

        /**
         * @param registrar the registrar to wrap
         * @return a {@link PerPartition} provider holding {@code registrar}
         */
        static PerPartition perPartition(@NotNull PerPartitionConsumerRegistrar registrar) {
            return PerPartition.of(registrar);
        }

        /**
         * Dispatches to the {@code visit} overload of the visitor that matches this provider's type.
         *
         * @param var1 the visitor to dispatch to
         * @return whatever the visitor returns
         */
        <T> T walk(Visitor<T> var1);

        /** Provider holding a {@link PerPartitionConsumerRegistrar}. */
        @Value.Immutable
        @SimpleStyle
        public abstract static class PerPartition implements StreamConsumerRegistrarProvider {
            public static PerPartition of(@NotNull PerPartitionConsumerRegistrar registrar) {
                return ImmutablePerPartition.of(registrar);
            }

            @Value.Parameter
            public abstract PerPartitionConsumerRegistrar registrar();

            @Override
            public final <T> T walk(@NotNull Visitor<T> visitor) {
                return visitor.visit(this);
            }
        }

        /** Provider holding a {@link SingleConsumerRegistrar}. */
        @Value.Immutable
        @SimpleStyle
        public abstract static class Single implements StreamConsumerRegistrarProvider {
            public static Single of(@NotNull SingleConsumerRegistrar registrar) {
                return ImmutableSingle.of(registrar);
            }

            @Value.Parameter
            public abstract SingleConsumerRegistrar registrar();

            @Override
            public final <T> T walk(@NotNull Visitor<T> visitor) {
                return visitor.visit(this);
            }
        }

        /**
         * Visitor with one overload per provider type.
         *
         * @param <T> result type of the visit
         */
        interface Visitor<T> {
            T visit(@NotNull Single var1);

            T visit(@NotNull PerPartition var1);
        }
    }

    /**
     * Registration callback that receives a table definition, a topic partition and a stream publisher.
     */
    @FunctionalInterface
    public interface PerPartitionConsumerRegistrar {
        /**
         * @param var1 the table definition
         * @param var2 the topic partition being registered
         * @param var3 the stream publisher being registered
         */
        void register(@NotNull TableDefinition var1, @NotNull TopicPartition var2, @NotNull StreamPublisher var3);
    }

    /**
     * Registration callback that receives a table definition and a stream publisher.
     */
    @FunctionalInterface
    public interface SingleConsumerRegistrar {
        /**
         * @param var1 the table definition
         * @param var2 the stream publisher being registered
         */
        void register(@NotNull TableDefinition var1, @NotNull StreamPublisher var2);
    }

    /**
     * Closed set of table types, {@link Blink}, {@link Append} and {@link Ring}, dispatched through
     * {@link #walk(Visitor)}.
     */
    public interface TableType {
        /** @return the {@link Blink} table type */
        static Blink blink() {
            return Blink.of();
        }

        /** @return the {@link Append} table type */
        static Append append() {
            return Append.of();
        }

        /**
         * @param capacity the capacity carried by the returned type
         * @return a {@link Ring} table type with the given capacity
         */
        static Ring ring(int capacity) {
            return Ring.of(capacity);
        }

        /**
         * Dispatches to the {@code visit} overload of the visitor that matches this table type.
         *
         * @param var1 the visitor to dispatch to
         * @return whatever the visitor returns
         */
        <T> T walk(Visitor<T> var1);

        /** Table type carrying an {@code int} capacity. */
        @Value.Immutable
        @SimpleStyle
        public abstract static class Ring implements TableType {
            public static Ring of(int capacity) {
                return ImmutableRing.of(capacity);
            }

            @Value.Parameter
            public abstract int capacity();

            @Override
            public final <T> T walk(Visitor<T> visitor) {
                return visitor.visit(this);
            }
        }

        /** Parameterless append table type. */
        @Value.Immutable
        @SingletonStyle
        public abstract static class Append implements TableType {
            public static Append of() {
                return ImmutableAppend.of();
            }

            @Override
            public final <T> T walk(Visitor<T> visitor) {
                return visitor.visit(this);
            }
        }

        /** Parameterless blink table type. */
        @Value.Immutable
        @SingletonStyle
        public abstract static class Blink implements TableType {
            public static Blink of() {
                return ImmutableBlink.of();
            }

            @Override
            public final <T> T walk(Visitor<T> visitor) {
                return visitor.visit(this);
            }
        }

        /**
         * Visitor with one overload per table type.
         *
         * @param <T> result type of the visit
         */
        interface Visitor<T> {
            T visit(Blink var1);

            T visit(Append var1);

            T visit(Ring var1);
        }
    }

    /**
     * Factory methods for the {@link KeyOrValueSpec} variants of this class: ignore, simple, raw, JSON
     * and Avro.
     */
    public static class Produce {
        /** Shared instance returned by {@link #ignoreSpec()} and recognised by {@link #isIgnore(KeyOrValueSpec)}. */
        public static final KeyOrValueSpec IGNORE = new IgnoreImpl.IgnoreProduce();

        /** @return the shared {@link #IGNORE} instance */
        public static KeyOrValueSpec ignoreSpec() {
            return IGNORE;
        }

        /**
         * @param keyOrValueSpec the spec to test
         * @return {@code true} only when {@code keyOrValueSpec} is the very {@link #IGNORE} instance
         */
        static boolean isIgnore(KeyOrValueSpec keyOrValueSpec) {
            return keyOrValueSpec == IGNORE;
        }

        /**
         * @param columnName column name handed to the simple spec
         * @return a new simple spec
         */
        public static KeyOrValueSpec simpleSpec(String columnName) {
            return new SimpleImpl.SimpleProduce(columnName);
        }

        /**
         * @param columnName column name handed to the raw spec
         * @param serializer serializer class handed to the raw spec
         * @return a new raw spec
         */
        public static KeyOrValueSpec rawSpec(String columnName, Class<? extends Serializer<?>> serializer) {
            return new RawImpl.RawProduce(columnName, serializer);
        }

        /**
         * Creates a JSON spec. At most one of {@code includeColumns} and {@code excludeColumns} may be
         * non-null.
         *
         * @param includeColumns        columns to include, or {@code null}
         * @param excludeColumns        predicate over column names to exclude, or {@code null}
         * @param columnToFieldMapping  mapping handed to the JSON spec
         * @param nestedObjectDelimiter delimiter handed to the JSON spec
         * @param outputNulls           flag handed to the JSON spec
         * @param timestampFieldName    timestamp field name handed to the JSON spec
         * @return a new JSON spec
         * @throws IllegalArgumentException if both {@code includeColumns} and {@code excludeColumns} are non-null
         */
        public static KeyOrValueSpec jsonSpec(String[] includeColumns, Predicate<String> excludeColumns, Map<String, String> columnToFieldMapping, String nestedObjectDelimiter, boolean outputNulls, String timestampFieldName) {
            if (includeColumns != null && excludeColumns != null) {
                // Render the array's contents; concatenating the array itself would only print its
                // type and identity hash, which is useless for diagnosing the bad call.
                throw new IllegalArgumentException("Both includeColumns (=" + java.util.Arrays.toString(includeColumns) + ") and excludeColumns (=" + excludeColumns + ") are not null, at least one of them should be null.");
            }
            return new JsonImpl.JsonProduce(includeColumns, excludeColumns, columnToFieldMapping, nestedObjectDelimiter, outputNulls, timestampFieldName);
        }

        /**
         * Convenience overload of the six-argument {@code jsonSpec} that passes {@code null} for the
         * delimiter and timestamp field name and {@code false} for {@code outputNulls}.
         */
        public static KeyOrValueSpec jsonSpec(String[] includeColumns, Predicate<String> excludeColumns, Map<String, String> columnToFieldMapping) {
            return Produce.jsonSpec(includeColumns, excludeColumns, columnToFieldMapping, null, false, null);
        }

        /**
         * Creates an Avro spec from an explicit schema. The schema name, version, namespace and column
         * properties are passed as {@code null}, and schema publishing as {@code false}.
         */
        public static KeyOrValueSpec avroSpec(Schema schema, Map<String, String> fieldToColumnMapping, String timestampFieldName, Predicate<String> includeOnlyColumns, Predicate<String> excludeColumns) {
            return new AvroImpl.AvroProduce(schema, null, null, fieldToColumnMapping, timestampFieldName, includeOnlyColumns, excludeColumns, false, null, null);
        }

        /**
         * Creates an Avro spec identified by schema name and version. The explicit schema is passed as
         * {@code null}.
         */
        public static KeyOrValueSpec avroSpec(String schemaName, String schemaVersion, Map<String, String> fieldToColumnMapping, String timestampFieldName, Predicate<String> includeOnlyColumns, Predicate<String> excludeColumns, boolean publishSchema, String schemaNamespace, Properties columnProperties) {
            return new AvroImpl.AvroProduce(null, schemaName, schemaVersion, fieldToColumnMapping, timestampFieldName, includeOnlyColumns, excludeColumns, publishSchema, schemaNamespace, columnProperties);
        }

        /**
         * Base type of every spec created by the factory methods of {@link Produce}.
         */
        public static abstract class KeyOrValueSpec
        implements SchemaProviderProvider {
            abstract Serializer<?> getSerializer(SchemaRegistryClient var1, TableDefinition var2);

            abstract String[] getColumnNames(@NotNull Table var1, SchemaRegistryClient var2);

            abstract KeyOrValueSerializer<?> getKeyOrValueSerializer(@NotNull Table var1, @NotNull String[] var2);
        }
    }

    /**
     * Factory methods for the {@link KeyOrValueSpec} variants of this class: ignore, JSON, Avro,
     * protobuf, simple, raw and object-processor. Shorter overloads delegate to the most general
     * overload of the same family.
     */
    public static class Consume {
        private static final KeyOrValueSpec FROM_PROPERTIES = new SimpleImpl.SimpleConsume(null, null);

        /** Shared instance returned by {@link #ignoreSpec()}. */
        public static final KeyOrValueSpec IGNORE = new IgnoreImpl.IgnoreConsume();

        /** @return the shared {@link #IGNORE} instance */
        public static KeyOrValueSpec ignoreSpec() {
            return IGNORE;
        }

        /** @return {@code true} only when {@code keyOrValueSpec} is the very {@link #IGNORE} instance */
        private static boolean isIgnore(KeyOrValueSpec keyOrValueSpec) {
            return IGNORE == keyOrValueSpec;
        }

        // ---- JSON ---------------------------------------------------------------------------------

        /**
         * @param columnDefinitions column definitions handed to the JSON spec
         * @param fieldToColumnName optional field-to-column-name mapping
         * @param objectMapper      optional object mapper
         * @return a new JSON spec
         */
        public static KeyOrValueSpec jsonSpec(@NotNull ColumnDefinition<?>[] columnDefinitions, @Nullable Map<String, String> fieldToColumnName, @Nullable ObjectMapper objectMapper) {
            return new JsonImpl.JsonConsume(columnDefinitions, fieldToColumnName, objectMapper);
        }

        /** Same as the three-argument overload with a {@code null} object mapper. */
        public static KeyOrValueSpec jsonSpec(@NotNull ColumnDefinition<?>[] columnDefinitions, @Nullable Map<String, String> fieldToColumnName) {
            return jsonSpec(columnDefinitions, fieldToColumnName, null);
        }

        /** Same as the three-argument overload with a {@code null} field mapping. */
        public static KeyOrValueSpec jsonSpec(@NotNull ColumnDefinition<?>[] columnDefinitions, @Nullable ObjectMapper objectMapper) {
            return jsonSpec(columnDefinitions, null, objectMapper);
        }

        /** Same as the three-argument overload with a {@code null} field mapping and object mapper. */
        public static KeyOrValueSpec jsonSpec(@NotNull ColumnDefinition<?>[] columnDefinitions) {
            return jsonSpec(columnDefinitions, null, null);
        }

        // ---- Avro ---------------------------------------------------------------------------------

        /** Creates an Avro spec from an explicit schema and a field-name-to-column-name function. */
        public static KeyOrValueSpec avroSpec(Schema schema, Function<String, String> fieldNameToColumnName) {
            return new AvroImpl.AvroConsume(schema, fieldNameToColumnName);
        }

        /** Same as {@code avroSpec(schema, DIRECT_MAPPING)}. */
        public static KeyOrValueSpec avroSpec(Schema schema) {
            return avroSpec(schema, DIRECT_MAPPING);
        }

        /** Creates an Avro spec from a schema name, a schema version and a field-name-to-column-name function. */
        public static KeyOrValueSpec avroSpec(String schemaName, String schemaVersion, Function<String, String> fieldNameToColumnName) {
            return new AvroImpl.AvroConsume(schemaName, schemaVersion, fieldNameToColumnName);
        }

        /** As the three-argument overload, additionally passing {@code useUTF8Strings} to the spec. */
        public static KeyOrValueSpec avroSpec(String schemaName, String schemaVersion, Function<String, String> fieldNameToColumnName, boolean useUTF8Strings) {
            return new AvroImpl.AvroConsume(schemaName, schemaVersion, fieldNameToColumnName, useUTF8Strings);
        }

        /** Uses {@code KafkaTools.AVRO_LATEST_VERSION} as the schema version. */
        public static KeyOrValueSpec avroSpec(String schemaName, Function<String, String> fieldNameToColumnName) {
            return avroSpec(schemaName, KafkaTools.AVRO_LATEST_VERSION, fieldNameToColumnName);
        }

        /** Uses {@code DIRECT_MAPPING} as the field-name-to-column-name function. */
        public static KeyOrValueSpec avroSpec(String schemaName, String schemaVersion) {
            return avroSpec(schemaName, schemaVersion, DIRECT_MAPPING);
        }

        /** Uses {@code KafkaTools.AVRO_LATEST_VERSION} and {@code DIRECT_MAPPING}. */
        public static KeyOrValueSpec avroSpec(String schemaName) {
            return avroSpec(schemaName, KafkaTools.AVRO_LATEST_VERSION, DIRECT_MAPPING);
        }

        // ---- Protobuf / simple / raw --------------------------------------------------------------

        /** Creates a protobuf spec from the given options. */
        public static KeyOrValueSpec protobufSpec(ProtobufConsumeOptions options) {
            return new ProtobufImpl.ProtobufConsumeImpl(options);
        }

        /** Creates a simple spec for the given column name and data type. */
        public static KeyOrValueSpec simpleSpec(String columnName, Class<?> dataType) {
            return new SimpleImpl.SimpleConsume(columnName, dataType);
        }

        /** Same as the two-argument overload with a {@code null} data type. */
        public static KeyOrValueSpec simpleSpec(String columnName) {
            return simpleSpec(columnName, null);
        }

        /** Creates a raw spec from the column definition derived from {@code header} and a deserializer class. */
        public static KeyOrValueSpec rawSpec(ColumnHeader<?> header, Class<? extends Deserializer<?>> deserializer) {
            return new RawImpl.RawConsume(ColumnDefinition.from(header), deserializer);
        }

        // ---- Object processors --------------------------------------------------------------------

        /** Names the processor with {@code columnNames} and delegates to the deserializer/named-processor overload. */
        public static <T> KeyOrValueSpec objectProcessorSpec(Deserializer<? extends T> deserializer, ObjectProcessor<? super T> processor, List<String> columnNames) {
            return Consume.objectProcessorSpec(deserializer, NamedObjectProcessor.of(processor, columnNames));
        }

        /** Names the byte-array processor with {@code columnNames} and delegates to the named-processor overload. */
        public static KeyOrValueSpec objectProcessorSpec(ObjectProcessor<? super byte[]> processor, List<String> columnNames) {
            // The cast selects the NamedObjectProcessor overload rather than the Provider overload.
            return Consume.objectProcessorSpec((NamedObjectProcessor<? super byte[]>)NamedObjectProcessor.of(processor, columnNames));
        }

        /** Creates an object-processor spec from a deserializer and a named processor. */
        public static <T> KeyOrValueSpec objectProcessorSpec(Deserializer<? extends T> deserializer, NamedObjectProcessor<? super T> processor) {
            return new KeyOrValueSpecObjectProcessorImpl<T>(deserializer, processor);
        }

        /** Uses a new {@link ByteArrayDeserializer} as the deserializer. */
        public static KeyOrValueSpec objectProcessorSpec(NamedObjectProcessor<? super byte[]> processor) {
            return Consume.objectProcessorSpec(new ByteArrayDeserializer(), processor);
        }

        /** Asks the provider for a processor named for the byte-array type and delegates to the named-processor overload. */
        public static KeyOrValueSpec objectProcessorSpec(NamedObjectProcessor.Provider provider) {
            // The cast selects the NamedObjectProcessor overload rather than the Provider overload.
            return Consume.objectProcessorSpec((NamedObjectProcessor<? super byte[]>)provider.named((Type)Type.byteType().arrayType()));
        }

        /**
         * Base type of every spec created by the factory methods of {@link Consume}.
         */
        public abstract static class KeyOrValueSpec implements SchemaProviderProvider {
            protected abstract Deserializer<?> getDeserializer(KeyOrValue var1, SchemaRegistryClient var2, Map<String, ?> var3);

            protected abstract KeyOrValueIngestData getIngestData(KeyOrValue var1, SchemaRegistryClient var2, Map<String, ?> var3, MutableInt var4, List<ColumnDefinition<?>> var5);

            protected abstract KeyOrValueProcessor getProcessor(TableDefinition var1, KeyOrValueIngestData var2);
        }
    }

    /**
     * Pair of hooks that each receive a {@link KafkaConsumer}.
     * NOTE(review): presumably invoked by the consumer loop right before and right after each poll -
     * confirm against the caller.
     */
    public interface ConsumerLoopCallback {
        /**
         * @param var1 the consumer passed to the hook
         */
        void beforePoll(KafkaConsumer<?, ?> var1);

        /**
         * @param var1 the consumer passed to the hook
         * @param var2 boolean flag supplied by the caller; its meaning is not visible here - TODO confirm
         */
        void afterPoll(KafkaConsumer<?, ?> var1, boolean var2);
    }

    /**
     * Function from a consumer and a topic partition to a {@code long} initial offset.
     */
    @FunctionalInterface
    public interface InitialOffsetLookup {
        /**
         * Wraps an {@link IntToLongFunction} in an {@code IntToLongLookupAdapter}.
         *
         * @param intToLongFunction the function to adapt
         * @return the adapting lookup
         */
        static InitialOffsetLookup adapt(IntToLongFunction intToLongFunction) {
            return new IntToLongLookupAdapter(intToLongFunction);
        }

        /**
         * @param var1 the consumer
         * @param var2 the topic partition
         * @return the initial offset for the given topic partition
         */
        long getInitialOffset(KafkaConsumer<?, ?> var1, TopicPartition var2);
    }

    private static interface SchemaProviderProvider {
        public Optional<SchemaProvider> getSchemaProvider();
    }

    /**
     * Two-valued discriminator: {@link #KEY} or {@link #VALUE}.
     */
    public enum KeyOrValue {
        KEY, VALUE
    }
}

