/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.enumerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorPosition;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorPositionSerializer;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
 * Versioned binary serializer for {@link IcebergEnumeratorState}.
 *
 * <p>New state is always written in the V2 layout; both V1 and V2 payloads can be read back.
 *
 * <p>V1 layout: enumerator position section, then pending splits section. V2 layout: the V1
 * layout followed by the enumeration split count history section.
 *
 * <ul>
 *   <li>Position section: a boolean presence flag; when {@code true} it is followed by the
 *       position serializer version (int), the byte length (int) and the position bytes.
 *   <li>Pending splits section: the split serializer version (int), the split count (int), then
 *       for every split its byte length (int), its bytes and the status enum name (UTF).
 *   <li>History section: the array length (int) followed by that many ints.
 * </ul>
 */
@Internal
public class IcebergEnumeratorStateSerializer
        implements SimpleVersionedSerializer<IcebergEnumeratorState> {
    /** Version of the layout produced by {@link #serialize(IcebergEnumeratorState)}. */
    private static final int VERSION = 2;

    /** Per-thread reusable output buffer; it must be left empty after every use. */
    private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(1024));

    private final IcebergEnumeratorPositionSerializer positionSerializer = IcebergEnumeratorPositionSerializer.INSTANCE;
    private final IcebergSourceSplitSerializer splitSerializer;

    /**
     * @param caseSensitive forwarded to the {@link IcebergSourceSplitSerializer} used for the
     *     pending splits
     */
    public IcebergEnumeratorStateSerializer(boolean caseSensitive) {
        this.splitSerializer = new IcebergSourceSplitSerializer(caseSensitive);
    }

    /** Returns the version of the layout written by {@link #serialize(IcebergEnumeratorState)}. */
    @Override
    public int getVersion() {
        return VERSION;
    }

    /**
     * Serializes the given state using the current (V2) layout.
     *
     * @param enumState state to serialize
     * @return the serialized bytes
     * @throws IOException if writing the state or one of its nested parts fails
     */
    @Override
    public byte[] serialize(IcebergEnumeratorState enumState) throws IOException {
        return this.serializeV2(enumState);
    }

    /**
     * Deserializes a payload written with the given layout version.
     *
     * @param version layout version the payload was written with (1 or 2)
     * @param serialized the payload bytes
     * @return the restored state
     * @throws IOException if the version is unknown or the payload cannot be read
     */
    @Override
    public IcebergEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
        switch (version) {
            case 1: {
                return this.deserializeV1(serialized);
            }
            case 2: {
                return this.deserializeV2(serialized);
            }
        }
        throw new IOException("Unknown version: " + version);
    }

    /** Writes the V1 layout: enumerator position followed by pending splits. */
    @VisibleForTesting
    byte[] serializeV1(IcebergEnumeratorState enumState) throws IOException {
        DataOutputSerializer out = SERIALIZER_CACHE.get();
        try {
            serializeEnumeratorPosition(out, enumState.lastEnumeratedPosition(), this.positionSerializer);
            serializePendingSplits(out, enumState.pendingSplits(), this.splitSerializer);
            return out.getCopyOfBuffer();
        } finally {
            // Always reset the shared per-thread buffer, even when serialization fails midway;
            // otherwise the next call on this thread would emit the stale partial bytes first.
            out.clear();
        }
    }

    /** Reads the V1 layout: enumerator position followed by pending splits. */
    @VisibleForTesting
    IcebergEnumeratorState deserializeV1(byte[] serialized) throws IOException {
        DataInputDeserializer in = new DataInputDeserializer(serialized);
        IcebergEnumeratorPosition enumeratorPosition = deserializeEnumeratorPosition(in, this.positionSerializer);
        Collection<IcebergSourceSplitState> pendingSplits = deserializePendingSplits(in, this.splitSerializer);
        return new IcebergEnumeratorState(enumeratorPosition, pendingSplits);
    }

    /** Writes the V2 layout: the V1 content followed by the enumeration split count history. */
    @VisibleForTesting
    byte[] serializeV2(IcebergEnumeratorState enumState) throws IOException {
        DataOutputSerializer out = SERIALIZER_CACHE.get();
        try {
            serializeEnumeratorPosition(out, enumState.lastEnumeratedPosition(), this.positionSerializer);
            serializePendingSplits(out, enumState.pendingSplits(), this.splitSerializer);
            serializeEnumerationSplitCountHistory(out, enumState.enumerationSplitCountHistory());
            return out.getCopyOfBuffer();
        } finally {
            // Always reset the shared per-thread buffer, even when serialization fails midway;
            // otherwise the next call on this thread would emit the stale partial bytes first.
            out.clear();
        }
    }

    /** Reads the V2 layout: the V1 content followed by the enumeration split count history. */
    @VisibleForTesting
    IcebergEnumeratorState deserializeV2(byte[] serialized) throws IOException {
        DataInputDeserializer in = new DataInputDeserializer(serialized);
        IcebergEnumeratorPosition enumeratorPosition = deserializeEnumeratorPosition(in, this.positionSerializer);
        Collection<IcebergSourceSplitState> pendingSplits = deserializePendingSplits(in, this.splitSerializer);
        int[] enumerationSplitCountHistory = deserializeEnumerationSplitCountHistory(in);
        return new IcebergEnumeratorState(enumeratorPosition, pendingSplits, enumerationSplitCountHistory);
    }

    /**
     * Writes a presence flag and, when the position is non-null, the position serializer
     * version, the byte length and the serialized position.
     */
    private static void serializeEnumeratorPosition(DataOutputSerializer out, IcebergEnumeratorPosition enumeratorPosition, IcebergEnumeratorPositionSerializer positionSerializer) throws IOException {
        out.writeBoolean(enumeratorPosition != null);
        if (enumeratorPosition != null) {
            out.writeInt(positionSerializer.getVersion());
            byte[] positionBytes = positionSerializer.serialize(enumeratorPosition);
            out.writeInt(positionBytes.length);
            out.write(positionBytes);
        }
    }

    /**
     * Reads the position section.
     *
     * @return the restored position, or {@code null} when the presence flag is {@code false}
     */
    private static IcebergEnumeratorPosition deserializeEnumeratorPosition(DataInputDeserializer in, IcebergEnumeratorPositionSerializer positionSerializer) throws IOException {
        if (!in.readBoolean()) {
            return null;
        }
        int version = in.readInt();
        byte[] positionBytes = readLengthPrefixedBytes(in);
        return positionSerializer.deserialize(version, positionBytes);
    }

    /**
     * Writes the split serializer version, the number of splits and then, per split, its byte
     * length, its serialized bytes and the name of its status.
     */
    private static void serializePendingSplits(DataOutputSerializer out, Collection<IcebergSourceSplitState> pendingSplits, IcebergSourceSplitSerializer splitSerializer) throws IOException {
        out.writeInt(splitSerializer.getVersion());
        out.writeInt(pendingSplits.size());
        for (IcebergSourceSplitState splitState : pendingSplits) {
            byte[] splitBytes = splitSerializer.serialize(splitState.split());
            out.writeInt(splitBytes.length);
            out.write(splitBytes);
            out.writeUTF(splitState.status().name());
        }
    }

    /** Reads the pending splits section written by {@code serializePendingSplits}. */
    private static Collection<IcebergSourceSplitState> deserializePendingSplits(DataInputDeserializer in, IcebergSourceSplitSerializer splitSerializer) throws IOException {
        int splitSerializerVersion = in.readInt();
        int splitCount = in.readInt();
        ArrayList<IcebergSourceSplitState> pendingSplits = Lists.newArrayListWithCapacity(splitCount);
        for (int i = 0; i < splitCount; ++i) {
            byte[] splitBytes = readLengthPrefixedBytes(in);
            IcebergSourceSplit split = splitSerializer.deserialize(splitSerializerVersion, splitBytes);
            String statusName = in.readUTF();
            pendingSplits.add(new IcebergSourceSplitState(split, IcebergSourceSplitStatus.valueOf(statusName)));
        }
        return pendingSplits;
    }

    /** Writes the history length followed by every history entry. */
    private static void serializeEnumerationSplitCountHistory(DataOutputSerializer out, int[] enumerationSplitCountHistory) throws IOException {
        out.writeInt(enumerationSplitCountHistory.length);
        for (int enumerationSplitCount : enumerationSplitCountHistory) {
            out.writeInt(enumerationSplitCount);
        }
    }

    /** Reads the history length followed by that many history entries. */
    private static int[] deserializeEnumerationSplitCountHistory(DataInputDeserializer in) throws IOException {
        int historySize = in.readInt();
        int[] history = new int[historySize];
        for (int i = 0; i < historySize; ++i) {
            history[i] = in.readInt();
        }
        return history;
    }

    /**
     * Reads an int length followed by exactly that many bytes.
     *
     * @throws IOException if the length is negative, or (as {@code EOFException}) if fewer
     *     bytes than announced remain in the input
     */
    private static byte[] readLengthPrefixedBytes(DataInputDeserializer in) throws IOException {
        int length = in.readInt();
        if (length < 0) {
            throw new IOException("Invalid negative byte length: " + length);
        }
        byte[] bytes = new byte[length];
        // readFully (unlike read) fails on a short read instead of leaving the array partly filled.
        in.readFully(bytes);
        return bytes;
    }
}

