/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.kafka;

import com.facebook.presto.kafka.KafkaErrorCode;
import com.facebook.presto.kafka.KafkaPageSink;
import com.facebook.presto.kafka.KafkaTableHandle;
import com.facebook.presto.kafka.PlainTextKafkaProducerFactory;
import com.facebook.presto.kafka.encoder.DispatchingRowEncoderFactory;
import com.facebook.presto.kafka.encoder.EncoderColumnHandle;
import com.facebook.presto.kafka.encoder.RowEncoder;
import com.facebook.presto.kafka.server.KafkaClusterMetadataSupplier;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PageSinkContext;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import javax.inject.Inject;

public class KafkaPageSinkProvider
implements ConnectorPageSinkProvider {
    private final DispatchingRowEncoderFactory encoderFactory;
    private final PlainTextKafkaProducerFactory producerFactory;
    private final KafkaClusterMetadataSupplier kafkaClusterMetadataSupplier;

    @Inject
    public KafkaPageSinkProvider(DispatchingRowEncoderFactory encoderFactory, PlainTextKafkaProducerFactory producerFactory, KafkaClusterMetadataSupplier kafkaClusterMetadataSupplier) {
        this.encoderFactory = Objects.requireNonNull(encoderFactory, "encoderFactory is null");
        this.producerFactory = Objects.requireNonNull(producerFactory, "producerFactory is null");
        this.kafkaClusterMetadataSupplier = Objects.requireNonNull(kafkaClusterMetadataSupplier, "kafkaClusterMetadataSupplier is null");
    }

    public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, PageSinkContext pageSinkContext) {
        throw new UnsupportedOperationException("Table creation is not supported by the kafka connector");
    }

    public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle, PageSinkContext pageSinkContext) {
        Objects.requireNonNull(insertTableHandle, "tableHandle is null");
        KafkaTableHandle handle = (KafkaTableHandle)insertTableHandle;
        ImmutableList.Builder keyColumns = ImmutableList.builder();
        ImmutableList.Builder messageColumns = ImmutableList.builder();
        handle.getColumns().forEach(col -> {
            if (col.isInternal()) {
                throw new IllegalArgumentException(String.format("unexpected internal column '%s'", col.getName()));
            }
            if (col.isKeyCodec()) {
                keyColumns.add(col);
            } else {
                messageColumns.add(col);
            }
        });
        RowEncoder keyEncoder = this.encoderFactory.create(session, handle.getKeyDataFormat(), this.getDataSchema(handle.getKeyDataSchemaLocation()), (List<EncoderColumnHandle>)keyColumns.build());
        RowEncoder messageEncoder = this.encoderFactory.create(session, handle.getMessageDataFormat(), this.getDataSchema(handle.getMessageDataSchemaLocation()), (List<EncoderColumnHandle>)messageColumns.build());
        return new KafkaPageSink(handle.getSchemaName(), handle.getTopicName(), handle.getColumns(), keyEncoder, messageEncoder, this.producerFactory, this.kafkaClusterMetadataSupplier);
    }

    private Optional<String> getDataSchema(Optional<String> dataSchemaLocation) {
        return dataSchemaLocation.map(location -> {
            try {
                return new String(Files.readAllBytes(Paths.get(location, new String[0])));
            }
            catch (IOException e) {
                throw new PrestoException((ErrorCodeSupplier)KafkaErrorCode.KAFKA_SCHEMA_ERROR, String.format("Unable to read data schema at '%s'", dataSchemaLocation.get()), (Throwable)e);
            }
        });
    }
}

