/*
 * Decompiled with CFR 0.152.
 */
package com.datasqrl.flinkrunner.connector.kafka;

import com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerOptions;
import com.datasqrl.flinkrunner.connector.kafka.DeserFailureHandlerType;
import com.datasqrl.flinkrunner.connector.kafka.DeserFailureProducer;
import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Applies the configured failure-handling strategy when deserializing a Kafka record throws an
 * {@link IOException}.
 *
 * <p>The strategy is selected by a {@link DeserFailureHandlerType}:
 *
 * <ul>
 *   <li>{@code NONE} - the {@link IOException} is rethrown to the caller;
 *   <li>{@code LOG} - a warning with the record's topic, partition and offset is logged;
 *   <li>{@code KAFKA} - a warning is logged and the record is handed to the
 *       {@link DeserFailureProducer}.
 * </ul>
 */
public class DeserFailureHandler
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(DeserFailureHandler.class);
    private static final long serialVersionUID = 1L;

    /** Strategy applied when deserialization fails. */
    private final DeserFailureHandlerType handlerType;

    /** Producer used by the {@code KAFKA} strategy; may be {@code null} for the other strategies. */
    @Nullable
    private final DeserFailureProducer producer;

    /**
     * Creates a handler for the given strategy.
     *
     * @param handlerType strategy to apply on deserialization failures
     * @param producer producer receiving failed records; must be non-null when {@code handlerType}
     *     is {@code KAFKA}
     * @throws IllegalArgumentException if {@code handlerType} is {@code KAFKA} and {@code producer}
     *     is {@code null}
     */
    DeserFailureHandler(DeserFailureHandlerType handlerType, @Nullable DeserFailureProducer producer) {
        // Fail fast: without this check the missing producer would only surface as a
        // NullPointerException while handling a failure, hiding the original IOException.
        if (handlerType == DeserFailureHandlerType.KAFKA && producer == null) {
            throw new IllegalArgumentException("A DeserFailureProducer is required when the deserialization failure handler type is KAFKA");
        }
        this.handlerType = handlerType;
        this.producer = producer;
    }

    /**
     * Builds a handler from the table options.
     *
     * <p>The handler type is read from {@link DeserFailureHandlerOptions#SCAN_DESER_FAILURE_HANDLER}.
     * A {@link DeserFailureProducer} is created only for the {@code KAFKA} type, using the topic
     * read from {@link DeserFailureHandlerOptions#SCAN_DESER_FAILURE_TOPIC} and the given properties.
     *
     * @param tableOptions options the handler type and failure topic are read from
     * @param consumerProps properties passed to the {@link DeserFailureProducer} constructor
     * @return the configured handler
     */
    public static DeserFailureHandler of(ReadableConfig tableOptions, Properties consumerProps) {
        DeserFailureHandlerType handlerType = (DeserFailureHandlerType) tableOptions.get(DeserFailureHandlerOptions.SCAN_DESER_FAILURE_HANDLER);
        DeserFailureProducer producer = null;
        if (handlerType == DeserFailureHandlerType.KAFKA) {
            String failureTopic = (String) tableOptions.get(DeserFailureHandlerOptions.SCAN_DESER_FAILURE_TOPIC);
            producer = new DeserFailureProducer(failureTopic, consumerProps);
        }
        return new DeserFailureHandler(handlerType, producer);
    }

    /**
     * Invokes {@code deser} and applies the configured strategy if it throws an {@link IOException}.
     *
     * <p>Only {@link IOException} is intercepted; any other exception thrown by {@code deser}
     * propagates unchanged. When the failure is not rethrown, the cause is logged at debug level and
     * the record at trace level. A handler type other than {@code NONE}, {@code LOG} or
     * {@code KAFKA} results in that debug/trace logging only.
     *
     * @param record the record being deserialized; used for logging and, for the {@code KAFKA}
     *     strategy, passed to the producer
     * @param deser the deserialization action to run
     * @throws Exception the {@link IOException} from {@code deser} when the handler type is
     *     {@code NONE}, or any non-{@link IOException} exception thrown by {@code deser}
     */
    public void deserWithFailureHandling(ConsumerRecord<byte[], byte[]> record, DeserializationCaller deser) throws Exception {
        try {
            deser.call();
        }
        catch (IOException e) {
            if (DeserFailureHandlerType.NONE == this.handlerType) {
                throw e;
            }
            if (DeserFailureHandlerType.LOG == this.handlerType) {
                LOG.warn("Deserialization failure occurred. Topic: {}, Partition: {}, Offset: {}", record.topic(), record.partition(), record.offset());
            } else if (DeserFailureHandlerType.KAFKA == this.handlerType) {
                // Parameterized logging keeps this message consistent with the LOG branch and
                // avoids the eager, locale-dependent formatting of String.format.
                LOG.warn("Deserialization failure occurred, sending the record to the configured topic ({}). Topic: {}, Partition: {}, Offset: {}", this.producer.getTopic(), record.topic(), record.partition(), record.offset());
                this.producer.send(record);
            }
            LOG.debug("Failure cause", e);
            LOG.trace("Failed record: {}", record);
        }
    }

    /** A serializable deserialization action that may throw. */
    @FunctionalInterface
    public static interface DeserializationCaller
    extends Serializable {
        /**
         * Runs the deserialization.
         *
         * @throws Exception if the deserialization fails
         */
        public void call() throws Exception;
    }
}

