/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.cdc.base.source.reader;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IncrementalSourceRecordEmitter<T>
implements RecordEmitter<SourceRecords, T, SourceSplitStateBase> {
    private static final Logger log = LoggerFactory.getLogger(IncrementalSourceRecordEmitter.class);
    protected final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
    protected final OutputCollector<T> outputCollector;
    protected final OffsetFactory offsetFactory;

    public IncrementalSourceRecordEmitter(DebeziumDeserializationSchema<T> debeziumDeserializationSchema, OffsetFactory offsetFactory) {
        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
        this.outputCollector = new OutputCollector();
        this.offsetFactory = offsetFactory;
    }

    @Override
    public void emitRecord(SourceRecords sourceRecords, Collector<T> collector, SourceSplitStateBase splitState) throws Exception {
        Iterator<SourceRecord> elementIterator = sourceRecords.iterator();
        while (elementIterator.hasNext()) {
            this.processElement(elementIterator.next(), collector, splitState);
        }
    }

    protected void processElement(SourceRecord element, Collector<T> output, SourceSplitStateBase splitState) throws Exception {
        if (WatermarkEvent.isWatermarkEvent(element)) {
            Offset watermark = this.getWatermark(element);
            if (WatermarkEvent.isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
                splitState.asSnapshotSplitState().setHighWatermark(watermark);
            }
        } else if (!SourceRecordUtils.isSchemaChangeEvent(element) || !splitState.isIncrementalSplitState()) {
            if (SourceRecordUtils.isDataChangeRecord(element)) {
                if (splitState.isIncrementalSplitState()) {
                    Offset position = this.getOffsetPosition(element);
                    splitState.asIncrementalSplitState().setStartupOffset(position);
                }
                this.emitElement(element, output);
            } else {
                log.info("Meet unknown element {}, just skip.", (Object)element);
            }
        }
    }

    private Offset getWatermark(SourceRecord watermarkEvent) {
        return this.getOffsetPosition(watermarkEvent.sourceOffset());
    }

    public Offset getOffsetPosition(SourceRecord dataRecord) {
        return this.getOffsetPosition(dataRecord.sourceOffset());
    }

    public Offset getOffsetPosition(Map<String, ?> offset) {
        HashMap<String, String> offsetStrMap = new HashMap<String, String>();
        for (Map.Entry<String, ?> entry : offset.entrySet()) {
            offsetStrMap.put(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
        }
        return this.offsetFactory.specific(offsetStrMap);
    }

    protected void emitElement(SourceRecord element, Collector<T> output) throws Exception {
        ((OutputCollector)this.outputCollector).output = output;
        this.debeziumDeserializationSchema.deserialize(element, this.outputCollector);
    }

    private static class OutputCollector<T>
    implements Collector<T> {
        private Collector<T> output;

        private OutputCollector() {
        }

        public void collect(T record) {
            this.output.collect(record);
        }

        public Object getCheckpointLock() {
            return null;
        }
    }
}

