/*
 * Decompiled with CFR 0.152.
 */
package com.scylladb.cdc.debezium.connector;

import com.scylladb.cdc.debezium.connector.ScyllaCollectionSchema;
import com.scylladb.cdc.debezium.connector.ScyllaConnectorConfig;
import com.scylladb.cdc.debezium.connector.ScyllaPartition;
import com.scylladb.cdc.debezium.connector.ScyllaSchema;
import com.scylladb.cdc.model.worker.ChangeSchema;
import com.scylladb.cdc.model.worker.RawChange;
import com.scylladb.cdc.model.worker.cql.Cell;
import com.scylladb.cdc.model.worker.cql.CqlDate;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.data.Envelope;
import io.debezium.pipeline.AbstractChangeRecordEmitter;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.util.Clock;
import java.time.Instant;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

public class ScyllaChangeRecordEmitter
extends AbstractChangeRecordEmitter<ScyllaPartition, ScyllaCollectionSchema> {
    private final RawChange change;
    private final ScyllaSchema schema;
    private final RawChange preImage;

    public ScyllaChangeRecordEmitter(ScyllaPartition partition, RawChange preImage, RawChange change, OffsetContext offsetContext, ScyllaSchema schema, Clock clock, ScyllaConnectorConfig connectorConfig) {
        super((Partition)partition, offsetContext, clock, (CommonConnectorConfig)connectorConfig);
        this.change = change;
        this.schema = schema;
        this.preImage = preImage;
    }

    public RawChange getChange() {
        return this.change;
    }

    public ScyllaSchema getSchema() {
        return this.schema;
    }

    public Envelope.Operation getOperation() {
        RawChange.OperationType operationType = this.change.getOperationType();
        switch (operationType) {
            case ROW_UPDATE: {
                return Envelope.Operation.UPDATE;
            }
            case ROW_INSERT: {
                return Envelope.Operation.CREATE;
            }
            case PARTITION_DELETE: 
            case ROW_DELETE: {
                return Envelope.Operation.DELETE;
            }
        }
        throw new RuntimeException(String.format("Unsupported operation type: %s.", operationType));
    }

    protected void emitReadRecord(ChangeRecordEmitter.Receiver receiver, ScyllaCollectionSchema scyllaCollectionSchema) throws InterruptedException {
        throw new UnsupportedOperationException();
    }

    protected void emitCreateRecord(ChangeRecordEmitter.Receiver<ScyllaPartition> receiver, ScyllaCollectionSchema scyllaCollectionSchema) throws InterruptedException {
        Struct envelope;
        scyllaCollectionSchema = this.schema.updateChangeSchema(scyllaCollectionSchema.id(), this.change.getSchema());
        Struct keyStruct = new Struct(scyllaCollectionSchema.keySchema());
        Struct afterStruct = new Struct(scyllaCollectionSchema.afterSchema());
        this.fillStructWithChange(scyllaCollectionSchema, keyStruct, afterStruct, this.change);
        if (this.preImage != null) {
            Struct beforeStruct = new Struct(scyllaCollectionSchema.beforeSchema());
            this.fillStructWithChange(scyllaCollectionSchema, null, beforeStruct, this.preImage);
            envelope = this.generalizedEnvelope(scyllaCollectionSchema.getEnvelopeSchema().schema(), beforeStruct, afterStruct, this.getOffset().getSourceInfo(), this.getClock().currentTimeAsInstant(), Envelope.Operation.CREATE);
        } else {
            envelope = scyllaCollectionSchema.getEnvelopeSchema().create((Object)afterStruct, this.getOffset().getSourceInfo(), this.getClock().currentTimeAsInstant());
        }
        receiver.changeRecord((Partition)((ScyllaPartition)this.getPartition()), (DataCollectionSchema)scyllaCollectionSchema, this.getOperation(), (Object)keyStruct, envelope, this.getOffset(), null);
    }

    protected void emitUpdateRecord(ChangeRecordEmitter.Receiver<ScyllaPartition> receiver, ScyllaCollectionSchema scyllaCollectionSchema) throws InterruptedException {
        Struct envelope;
        scyllaCollectionSchema = this.schema.updateChangeSchema(scyllaCollectionSchema.id(), this.change.getSchema());
        Struct keyStruct = new Struct(scyllaCollectionSchema.keySchema());
        Struct afterStruct = new Struct(scyllaCollectionSchema.afterSchema());
        this.fillStructWithChange(scyllaCollectionSchema, keyStruct, afterStruct, this.change);
        if (this.preImage != null) {
            Struct beforeStruct = new Struct(scyllaCollectionSchema.beforeSchema());
            this.fillStructWithChange(scyllaCollectionSchema, null, beforeStruct, this.preImage);
            envelope = this.generalizedEnvelope(scyllaCollectionSchema.getEnvelopeSchema().schema(), beforeStruct, afterStruct, this.getOffset().getSourceInfo(), this.getClock().currentTimeAsInstant(), Envelope.Operation.UPDATE);
        } else {
            envelope = scyllaCollectionSchema.getEnvelopeSchema().update(null, afterStruct, this.getOffset().getSourceInfo(), this.getClock().currentTimeAsInstant());
        }
        receiver.changeRecord((Partition)((ScyllaPartition)this.getPartition()), (DataCollectionSchema)scyllaCollectionSchema, this.getOperation(), (Object)keyStruct, envelope, this.getOffset(), null);
    }

    protected void emitDeleteRecord(ChangeRecordEmitter.Receiver<ScyllaPartition> receiver, ScyllaCollectionSchema scyllaCollectionSchema) throws InterruptedException {
        scyllaCollectionSchema = this.schema.updateChangeSchema(scyllaCollectionSchema.id(), this.change.getSchema());
        Struct keyStruct = new Struct(scyllaCollectionSchema.keySchema());
        Struct beforeStruct = new Struct(scyllaCollectionSchema.beforeSchema());
        if (this.preImage != null) {
            this.fillStructWithChange(scyllaCollectionSchema, keyStruct, beforeStruct, this.preImage);
        } else {
            this.fillStructWithChange(scyllaCollectionSchema, keyStruct, beforeStruct, this.change);
        }
        Struct envelope = scyllaCollectionSchema.getEnvelopeSchema().delete((Object)beforeStruct, this.getOffset().getSourceInfo(), this.getClock().currentTimeAsInstant());
        receiver.changeRecord((Partition)((ScyllaPartition)this.getPartition()), (DataCollectionSchema)scyllaCollectionSchema, this.getOperation(), (Object)keyStruct, envelope, this.getOffset(), null);
    }

    private void fillStructWithChange(ScyllaCollectionSchema schema, Struct keyStruct, Struct valueStruct, RawChange change) {
        for (ChangeSchema.ColumnDefinition cdef : change.getSchema().getNonCdcColumnDefinitions()) {
            if (!ScyllaSchema.isSupportedColumnSchema(cdef)) continue;
            Object value = this.translateCellToKafka(change.getCell(cdef.getColumnName()));
            if (cdef.getBaseTableColumnType() == ChangeSchema.ColumnType.PARTITION_KEY || cdef.getBaseTableColumnType() == ChangeSchema.ColumnType.CLUSTERING_KEY) {
                valueStruct.put(cdef.getColumnName(), value);
                if (keyStruct == null) continue;
                keyStruct.put(cdef.getColumnName(), value);
                continue;
            }
            Boolean isDeleted = this.change.getCell("cdc$deleted_" + cdef.getColumnName()).getBoolean();
            if (value == null && (isDeleted == null || !isDeleted.booleanValue())) continue;
            Struct cell = new Struct(schema.cellSchema(cdef.getColumnName()));
            cell.put("value", value);
            valueStruct.put(cdef.getColumnName(), (Object)cell);
        }
    }

    private Struct generalizedEnvelope(Schema schema, Object before, Object after, Struct source, Instant timestamp, Envelope.Operation operationType) {
        Struct struct = new Struct(schema);
        struct.put("op", (Object)operationType.code());
        if (before != null) {
            struct.put("before", before);
        }
        if (after != null) {
            struct.put("after", after);
        }
        if (source != null) {
            struct.put("source", (Object)source);
        }
        if (timestamp != null) {
            struct.put("ts_ms", (Object)timestamp.toEpochMilli());
        }
        return struct;
    }

    private Object translateCellToKafka(Cell cell) {
        ChangeSchema.DataType dataType = cell.getColumnDefinition().getCdcLogDataType();
        if (cell.getAsObject() == null) {
            return null;
        }
        if (dataType.getCqlType() == ChangeSchema.CqlType.DECIMAL) {
            return cell.getDecimal().toString();
        }
        if (dataType.getCqlType() == ChangeSchema.CqlType.UUID) {
            return cell.getUUID().toString();
        }
        if (dataType.getCqlType() == ChangeSchema.CqlType.TIMEUUID) {
            return cell.getUUID().toString();
        }
        if (dataType.getCqlType() == ChangeSchema.CqlType.VARINT) {
            return cell.getVarint().toString();
        }
        if (dataType.getCqlType() == ChangeSchema.CqlType.INET) {
            return cell.getInet().getHostAddress();
        }
        if (dataType.getCqlType() == ChangeSchema.CqlType.DATE) {
            CqlDate cqlDate = cell.getDate();
            Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
            calendar.clear();
            calendar.set(cqlDate.getYear(), cqlDate.getMonth() - 1, cqlDate.getDay());
            return Date.from(calendar.toInstant());
        }
        if (dataType.getCqlType() == ChangeSchema.CqlType.DURATION) {
            return cell.getDuration().toString();
        }
        return cell.getAsObject();
    }
}

