/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.accumulo.io;

import com.google.common.collect.ImmutableList;
import io.airlift.concurrent.MoreFutures;
import io.airlift.slice.Slice;
import io.trino.plugin.accumulo.AccumuloErrorCode;
import io.trino.plugin.accumulo.Types;
import io.trino.plugin.accumulo.index.Indexer;
import io.trino.plugin.accumulo.metadata.AccumuloTable;
import io.trino.plugin.accumulo.model.AccumuloColumnHandle;
import io.trino.plugin.accumulo.model.Field;
import io.trino.plugin.accumulo.model.Row;
import io.trino.plugin.accumulo.serializers.AccumuloRowSerializer;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.Page;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.DateType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.SmallintType;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TinyintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeUtils;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;

public class AccumuloPageSink
implements ConnectorPageSink {
    public static final Text ROW_ID_COLUMN = new Text("___ROW___");
    private final AccumuloRowSerializer serializer;
    private final BatchWriter writer;
    private final Optional<Indexer> indexer;
    private final List<AccumuloColumnHandle> columns;
    private final int rowIdOrdinal;
    private long numRows;

    public AccumuloPageSink(AccumuloClient client, AccumuloTable table, String username) {
        Objects.requireNonNull(table, "table is null");
        this.columns = table.getColumns();
        this.rowIdOrdinal = this.columns.stream().filter(columnHandle -> columnHandle.getName().equals(table.getRowId())).map(AccumuloColumnHandle::getOrdinal).findAny().orElseThrow(() -> new TrinoException((ErrorCodeSupplier)StandardErrorCode.FUNCTION_IMPLEMENTATION_ERROR, "Row ID ordinal not found"));
        this.serializer = table.getSerializerInstance();
        try {
            BatchWriterConfig conf = new BatchWriterConfig();
            this.writer = client.createBatchWriter(table.getFullTableName(), conf);
            this.indexer = table.isIndexed() ? Optional.of(new Indexer(client, client.securityOperations().getUserAuthorizations(username), table, conf)) : Optional.empty();
        }
        catch (AccumuloException | AccumuloSecurityException e) {
            throw new TrinoException((ErrorCodeSupplier)AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR, "Accumulo error when creating BatchWriter and/or Indexer", e);
        }
        catch (TableNotFoundException e) {
            throw new TrinoException((ErrorCodeSupplier)AccumuloErrorCode.ACCUMULO_TABLE_DNE, "Accumulo error when creating BatchWriter and/or Indexer, table does not exist", (Throwable)e);
        }
    }

    public static Mutation toMutation(Row row, int rowIdOrdinal, List<AccumuloColumnHandle> columns, AccumuloRowSerializer serializer) {
        Text value = new Text();
        Field rowField = row.getField(rowIdOrdinal);
        if (rowField.isNull()) {
            throw new TrinoException((ErrorCodeSupplier)StandardErrorCode.INVALID_FUNCTION_ARGUMENT, "Column mapped as the Accumulo row ID cannot be null");
        }
        AccumuloPageSink.setText(rowField, value, serializer);
        Mutation mutation = new Mutation(value);
        mutation.put(ROW_ID_COLUMN, ROW_ID_COLUMN, new Value(value.copyBytes()));
        for (AccumuloColumnHandle columnHandle : columns) {
            if (columnHandle.getOrdinal() == rowIdOrdinal || row.getField(columnHandle.getOrdinal()).isNull()) continue;
            AccumuloPageSink.setText(row.getField(columnHandle.getOrdinal()), value, serializer);
            mutation.put((CharSequence)columnHandle.getFamily().get(), (CharSequence)columnHandle.getQualifier().get(), new Value(value.copyBytes()));
        }
        return mutation;
    }

    private static void setText(Field field, Text value, AccumuloRowSerializer serializer) {
        Type type = field.getType();
        if (Types.isArrayType(type)) {
            serializer.setArray(value, type, field.getArray());
        } else if (Types.isMapType(type)) {
            serializer.setMap(value, type, field.getMap());
        } else if (type.equals((Object)BigintType.BIGINT)) {
            serializer.setLong(value, field.getLong());
        } else if (type.equals((Object)BooleanType.BOOLEAN)) {
            serializer.setBoolean(value, field.getBoolean());
        } else if (type.equals((Object)DateType.DATE)) {
            serializer.setDate(value, field.getDate());
        } else if (type.equals((Object)DoubleType.DOUBLE)) {
            serializer.setDouble(value, field.getDouble());
        } else if (type.equals((Object)IntegerType.INTEGER)) {
            serializer.setInt(value, field.getInt());
        } else if (type.equals((Object)RealType.REAL)) {
            serializer.setFloat(value, field.getFloat());
        } else if (type.equals((Object)SmallintType.SMALLINT)) {
            serializer.setShort(value, field.getShort());
        } else if (type.equals((Object)TimeType.TIME_MILLIS)) {
            serializer.setTime(value, field.getTime());
        } else if (type.equals((Object)TinyintType.TINYINT)) {
            serializer.setByte(value, field.getByte());
        } else if (type.equals((Object)TimestampType.TIMESTAMP_MILLIS)) {
            serializer.setTimestamp(value, field.getTimestamp());
        } else if (type.equals((Object)VarbinaryType.VARBINARY)) {
            serializer.setVarbinary(value, field.getVarbinary());
        } else if (type instanceof VarcharType) {
            serializer.setVarchar(value, field.getVarchar());
        } else {
            throw new UnsupportedOperationException("Unsupported type " + String.valueOf(type));
        }
    }

    public CompletableFuture<?> appendPage(Page page) {
        for (int position = 0; position < page.getPositionCount(); ++position) {
            Row row = new Row();
            for (int channel = 0; channel < page.getChannelCount(); ++channel) {
                Type type = this.columns.get(channel).getType();
                row.addField(TypeUtils.readNativeValue((Type)type, (Block)page.getBlock(channel), (int)position), type);
            }
            try {
                Mutation mutation = AccumuloPageSink.toMutation(row, this.rowIdOrdinal, this.columns, this.serializer);
                this.writer.addMutation(mutation);
                if (this.indexer.isPresent()) {
                    this.indexer.get().index(mutation);
                }
                ++this.numRows;
            }
            catch (MutationsRejectedException e) {
                throw new TrinoException((ErrorCodeSupplier)AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR, "Mutation rejected by server", (Throwable)e);
            }
            if (this.numRows % 100000L != 0L) continue;
            this.flush();
        }
        return NOT_BLOCKED;
    }

    public CompletableFuture<Collection<Slice>> finish() {
        try {
            this.writer.flush();
            this.writer.close();
            if (this.indexer.isPresent()) {
                this.indexer.get().close();
            }
        }
        catch (MutationsRejectedException e) {
            throw new TrinoException((ErrorCodeSupplier)AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR, "Mutation rejected by server on flush", (Throwable)e);
        }
        return CompletableFuture.completedFuture(ImmutableList.of());
    }

    public void abort() {
        MoreFutures.getFutureValue(this.finish());
    }

    private void flush() {
        try {
            if (this.indexer.isPresent()) {
                this.indexer.get().flush();
            }
            this.writer.flush();
        }
        catch (MutationsRejectedException e) {
            throw new TrinoException((ErrorCodeSupplier)AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR, "Mutation rejected by server on flush", (Throwable)e);
        }
    }
}

