/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.ConsumerMetadata;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitState;
import org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
import org.apache.seatunnel.format.compatible.kafka.connect.json.NativeKafkaConnectDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaRecordEmitter
implements RecordEmitter<ConsumerRecord<byte[], byte[]>, SeaTunnelRow, KafkaSourceSplitState> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaRecordEmitter.class);
    private final Map<TablePath, ConsumerMetadata> mapMetadata;
    private final OutputCollector<SeaTunnelRow> outputCollector;
    private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;

    public KafkaRecordEmitter(Map<TablePath, ConsumerMetadata> mapMetadata, MessageFormatErrorHandleWay messageFormatErrorHandleWay) {
        this.mapMetadata = mapMetadata;
        this.messageFormatErrorHandleWay = messageFormatErrorHandleWay;
        this.outputCollector = new OutputCollector();
    }

    @Override
    public void emitRecord(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<SeaTunnelRow> collector, KafkaSourceSplitState splitState) throws Exception {
        ((OutputCollector)this.outputCollector).output = collector;
        DeserializationSchema<SeaTunnelRow> deserializationSchema = this.mapMetadata.get(splitState.getTablePath()).getDeserializationSchema();
        try {
            if (deserializationSchema instanceof CompatibleKafkaConnectDeserializationSchema) {
                ((CompatibleKafkaConnectDeserializationSchema)deserializationSchema).deserialize(consumerRecord, this.outputCollector);
            } else if (deserializationSchema instanceof NativeKafkaConnectDeserializationSchema) {
                ((NativeKafkaConnectDeserializationSchema)deserializationSchema).deserialize(consumerRecord, this.outputCollector);
            } else {
                deserializationSchema.deserialize(consumerRecord.value(), this.outputCollector);
            }
        }
        catch (Exception e) {
            if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) {
                logger.warn("Deserialize message failed, skip this message, message: {}", (Object)new String(consumerRecord.value()));
            }
            throw e;
        }
        splitState.setCurrentOffset(consumerRecord.offset() + 1L);
    }

    private static class OutputCollector<T>
    implements Collector<T> {
        private Collector<T> output;

        private OutputCollector() {
        }

        public void collect(T record) {
            this.output.collect(record);
        }

        public void collect(SchemaChangeEvent event) {
            this.output.collect(event);
        }

        public void markSchemaChangeBeforeCheckpoint() {
            this.output.markSchemaChangeBeforeCheckpoint();
        }

        public void markSchemaChangeAfterCheckpoint() {
            this.output.markSchemaChangeAfterCheckpoint();
        }

        public Object getCheckpointLock() {
            return this.output.getCheckpointLock();
        }
    }
}

