/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.connectors.flink.util;

import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.connectors.flink.EventTimeOrderingOperator;
import io.pravega.connectors.flink.FlinkPravegaWriter;
import io.pravega.connectors.flink.serialization.WrappingSerializer;
import io.pravega.connectors.flink.util.PravegaEventRouterKeySelector;
import io.pravega.shaded.org.apache.commons.lang3.RandomStringUtils;
import io.pravega.shaded.org.apache.commons.lang3.StringUtils;
import java.nio.ByteBuffer;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.util.Preconditions;

public class FlinkPravegaUtils {
    private FlinkPravegaUtils() {
    }

    public static <T> DataStreamSink<T> writeToPravegaInEventTimeOrder(DataStream<T> stream, FlinkPravegaWriter<T> writer, int parallelism) {
        Preconditions.checkNotNull(writer.getEventRouter(), (String)"Event router should not be null");
        return stream.keyBy(new PravegaEventRouterKeySelector<T>(writer.getEventRouter())).transform("reorder", stream.getType(), new EventTimeOrderingOperator()).setParallelism(parallelism).forward().addSink(writer).setParallelism(parallelism);
    }

    public static String getReaderName(String taskName, int index, int total) {
        String shortTaskName = "";
        if (taskName.length() >= 200) {
            shortTaskName = taskName.substring(0, 200);
        }
        String readerName = "flink-task-" + shortTaskName + "-" + index + "-" + total;
        readerName = StringUtils.removePattern(readerName, "[^\\p{Alnum}\\.\\-]");
        return readerName;
    }

    public static String generateRandomReaderGroupName() {
        return "flink" + RandomStringUtils.randomAlphanumeric(20).toLowerCase();
    }

    public static <T> EventStreamReader<T> createPravegaReader(String readerId, String readerGroupName, DeserializationSchema<T> deserializationSchema, ReaderConfig readerConfig, EventStreamClientFactory eventStreamClientFactory) {
        Serializer<T> deserializer = deserializationSchema instanceof WrappingSerializer ? ((WrappingSerializer)deserializationSchema).getWrappedSerializer() : new FlinkDeserializer<T>(deserializationSchema);
        return eventStreamClientFactory.createReader(readerId, readerGroupName, deserializer, readerConfig);
    }

    public static final class FlinkDeserializer<T>
    implements Serializer<T> {
        private final DeserializationSchema<T> deserializationSchema;

        public FlinkDeserializer(DeserializationSchema<T> deserializationSchema) {
            this.deserializationSchema = deserializationSchema;
        }

        @Override
        public ByteBuffer serialize(T value) {
            throw new IllegalStateException("serialize() called within a deserializer");
        }

        @Override
        public T deserialize(ByteBuffer buffer) {
            byte[] array;
            if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.position() == 0 && buffer.limit() == buffer.capacity()) {
                array = buffer.array();
            } else {
                array = new byte[buffer.remaining()];
                buffer.get(array);
            }
            return (T)this.deserializationSchema.deserialize(array);
        }
    }
}

