/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.alluxio.sink;

import alluxio.AlluxioURI;
import alluxio.client.WriteType;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystem;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Configuration;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.WritePType;
import alluxio.util.FileSystemOptionsUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.Generated;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.alluxio.sink.AlluxioSinkConfig;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name="alluxio", type=IOType.SINK, help="The sink connector is used for moving records from Pulsar to Alluxio.", configClass=AlluxioSinkConfig.class)
public class AlluxioSink
implements Sink<GenericObject> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AlluxioSink.class);
    private FileSystem fileSystem;
    private FileOutStream fileOutStream;
    private long recordsNum;
    private String tmpFilePath;
    private String fileDirPath;
    private String tmpFileDirPath;
    private long lastRotationTime;
    private long rotationRecordsNum;
    private long rotationInterval;
    private AlluxioSinkConfig alluxioSinkConfig;
    private AlluxioState alluxioState;
    private InstancedConfiguration configuration = Configuration.modifiableGlobal();
    private ObjectMapper objectMapper = new ObjectMapper();
    private List<Record<GenericObject>> recordsToAck;

    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        AlluxioURI tmpAlluxioDirPath;
        this.alluxioSinkConfig = AlluxioSinkConfig.load(config);
        this.alluxioSinkConfig.validate();
        String alluxioMasterHost = this.alluxioSinkConfig.getAlluxioMasterHost();
        int alluxioMasterPort = this.alluxioSinkConfig.getAlluxioMasterPort();
        this.configuration.set(PropertyKey.MASTER_HOSTNAME, alluxioMasterHost);
        this.configuration.set(PropertyKey.MASTER_RPC_PORT, (Object)alluxioMasterPort);
        if (this.alluxioSinkConfig.getSecurityLoginUser() != null) {
            this.configuration.set(PropertyKey.SECURITY_LOGIN_USERNAME, this.alluxioSinkConfig.getSecurityLoginUser());
        }
        this.fileSystem = FileSystem.Factory.create((AlluxioConfiguration)this.configuration);
        String alluxioDir = this.alluxioSinkConfig.getAlluxioDir();
        this.fileDirPath = alluxioDir.startsWith("/") ? alluxioDir : "/" + alluxioDir;
        this.tmpFileDirPath = this.fileDirPath + "/tmp";
        AlluxioURI alluxioDirPath = new AlluxioURI(this.fileDirPath);
        if (!this.fileSystem.exists(alluxioDirPath)) {
            this.fileSystem.createDirectory(alluxioDirPath);
        }
        if (!this.fileSystem.exists(tmpAlluxioDirPath = new AlluxioURI(this.tmpFileDirPath))) {
            this.fileSystem.createDirectory(tmpAlluxioDirPath);
        }
        this.recordsNum = 0L;
        this.recordsToAck = Lists.newArrayList();
        this.tmpFilePath = "";
        this.alluxioState = AlluxioState.WRITE_STARTED;
        this.lastRotationTime = System.currentTimeMillis();
        this.rotationRecordsNum = this.alluxioSinkConfig.getRotationRecords();
        this.rotationInterval = this.alluxioSinkConfig.getRotationInterval();
    }

    public void write(Record<GenericObject> record) {
        long now = System.currentTimeMillis();
        switch (this.alluxioState) {
            case WRITE_STARTED: {
                try {
                    this.writeToAlluxio(record);
                    if (!this.shouldRotate(now)) break;
                    this.alluxioState = AlluxioState.FILE_ROTATED;
                }
                catch (AlluxioException | IOException e) {
                    log.error("Unable to write record to alluxio.", e);
                    record.fail();
                    break;
                }
            }
            case FILE_ROTATED: {
                try {
                    this.closeAndCommitTmpFile();
                    this.alluxioState = AlluxioState.FILE_COMMITTED;
                    this.ackRecords();
                }
                catch (AlluxioException | IOException e) {
                    log.error("Unable to flush records to alluxio.", e);
                    this.failRecords();
                    try {
                        this.deleteTmpFile();
                    }
                    catch (AlluxioException | IOException e1) {
                        log.error("Failed to delete tmp cache file.", e);
                    }
                    break;
                }
            }
            case FILE_COMMITTED: {
                this.alluxioState = AlluxioState.WRITE_STARTED;
                break;
            }
            default: {
                log.error("{} is not a valid state when writing record to alluxio temp dir {}.", (Object)this.alluxioState, (Object)this.tmpFileDirPath);
            }
        }
    }

    public void close() throws Exception {
        try {
            this.closeAndCommitTmpFile();
            this.ackRecords();
        }
        catch (AlluxioException | IOException e) {
            log.error("Unable to flush records to alluxio.", e);
            this.failRecords();
        }
        this.deleteTmpFile();
    }

    private void ackRecords() {
        this.recordsToAck.forEach(Record::ack);
        this.recordsToAck.clear();
    }

    private void failRecords() {
        this.recordsToAck.forEach(Record::fail);
        this.recordsToAck.clear();
    }

    private void writeToAlluxio(Record<GenericObject> record) throws AlluxioException, IOException {
        KeyValue<String, String> keyValue = this.extractKeyValue(record);
        if (this.fileOutStream == null) {
            this.createTmpFile();
        }
        this.fileOutStream.write(AlluxioSink.toBytes(keyValue.getValue()));
        if (this.alluxioSinkConfig.getLineSeparator() != '\u0000') {
            this.fileOutStream.write((int)this.alluxioSinkConfig.getLineSeparator());
        }
        ++this.recordsNum;
        this.recordsToAck.add(record);
    }

    private void createTmpFile() throws AlluxioException, IOException {
        CreateFilePOptions.Builder optionsBuilder = FileSystemOptionsUtils.createFileDefaults((AlluxioConfiguration)this.configuration).toBuilder();
        UUID id = UUID.randomUUID();
        String fileExtension = this.alluxioSinkConfig.getFileExtension();
        this.tmpFilePath = this.tmpFileDirPath + "/" + id.toString() + "_tmp" + fileExtension;
        if (this.alluxioSinkConfig.getWriteType() != null) {
            WritePType writePType;
            try {
                writePType = WritePType.valueOf((String)this.alluxioSinkConfig.getWriteType().toUpperCase());
            }
            catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("Illegal write type when creating Alluxio files, valid values are: " + String.valueOf(Arrays.asList(WriteType.values())));
            }
            optionsBuilder.setWriteType(writePType);
        }
        this.fileOutStream = this.fileSystem.createFile(new AlluxioURI(this.tmpFilePath), optionsBuilder.build());
    }

    private void closeAndCommitTmpFile() throws AlluxioException, IOException {
        if (this.fileOutStream != null) {
            this.fileOutStream.close();
        }
        String filePrefix = this.alluxioSinkConfig.getFilePrefix();
        String fileExtension = this.alluxioSinkConfig.getFileExtension();
        String newFile = filePrefix + "-" + System.currentTimeMillis() + fileExtension;
        String newFilePath = this.fileDirPath + "/" + newFile;
        this.fileSystem.rename(new AlluxioURI(this.tmpFilePath), new AlluxioURI(newFilePath));
        this.fileOutStream = null;
        this.tmpFilePath = "";
        this.recordsNum = 0L;
        this.lastRotationTime = System.currentTimeMillis();
    }

    private void deleteTmpFile() throws AlluxioException, IOException {
        if (!this.tmpFilePath.equals("")) {
            this.fileSystem.delete(new AlluxioURI(this.tmpFilePath));
        }
    }

    private boolean shouldRotate(long now) {
        boolean rotated = false;
        if (this.recordsNum >= this.rotationRecordsNum) {
            rotated = true;
        } else if (this.rotationInterval != -1L && now - this.lastRotationTime >= this.rotationInterval) {
            rotated = true;
        }
        return rotated;
    }

    private static byte[] toByteArray(Object obj) throws IOException {
        byte[] bytes = null;
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(baos);){
            oos.writeObject(obj);
            oos.flush();
            bytes = baos.toByteArray();
        }
        catch (IOException e) {
            log.error("Failed to serialize the object.", (Throwable)e);
            throw e;
        }
        return bytes;
    }

    private static byte[] toBytes(Object obj) throws IOException {
        byte[] bytes;
        if (obj instanceof String) {
            String s = (String)obj;
            bytes = s.getBytes(StandardCharsets.UTF_8);
        } else {
            bytes = obj instanceof byte[] ? (byte[])obj : AlluxioSink.toByteArray(obj);
        }
        return bytes;
    }

    public KeyValue<String, String> extractKeyValue(Record<GenericObject> record) throws JsonProcessingException {
        if (this.alluxioSinkConfig.isSchemaEnable()) {
            GenericObject recordValue = null;
            Schema valueSchema = null;
            if (record.getSchema() != null && record.getSchema() instanceof KeyValueSchema) {
                KeyValueSchema keyValueSchema = (KeyValueSchema)record.getSchema();
                valueSchema = keyValueSchema.getValueSchema();
                org.apache.pulsar.common.schema.KeyValue keyValue = (org.apache.pulsar.common.schema.KeyValue)((GenericObject)record.getValue()).getNativeObject();
                recordValue = (GenericObject)keyValue.getValue();
            } else {
                valueSchema = record.getSchema();
                recordValue = (GenericObject)record.getValue();
            }
            String value = null;
            if (recordValue != null) {
                value = valueSchema != null ? this.stringifyValue(valueSchema, recordValue) : (recordValue.getNativeObject() instanceof byte[] ? new String((byte[])recordValue.getNativeObject(), StandardCharsets.UTF_8) : recordValue.getNativeObject().toString());
            }
            return new KeyValue(null, value);
        }
        return new KeyValue(null, (Object)new String(((Message)record.getMessage().orElseThrow(() -> new IllegalArgumentException("Record does not carry message information"))).getData(), StandardCharsets.UTF_8));
    }

    public String stringifyValue(Schema<?> schema, Object val) throws JsonProcessingException {
        if (schema.getSchemaInfo().getType() == SchemaType.JSON) {
            JsonNode jsonNode = (JsonNode)((GenericRecord)val).getNativeObject();
            return this.objectMapper.writeValueAsString((Object)jsonNode);
        }
        throw new UnsupportedOperationException("Unsupported value schemaType=" + String.valueOf(schema.getSchemaInfo().getType()));
    }

    private static enum AlluxioState {
        WRITE_STARTED,
        FILE_ROTATED,
        FILE_COMMITTED;

    }
}

