/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.kinesisvideo.parser.examples.lambda;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.kinesisvideo.parser.examples.GetMediaForFragmentListWorker;
import com.amazonaws.kinesisvideo.parser.examples.StreamOps;
import com.amazonaws.kinesisvideo.parser.examples.lambda.DDBBasedFragmentCheckpointManager;
import com.amazonaws.kinesisvideo.parser.examples.lambda.FragmentCheckpoint;
import com.amazonaws.kinesisvideo.parser.examples.lambda.FragmentCheckpointManager;
import com.amazonaws.kinesisvideo.parser.examples.lambda.H264FrameProcessor;
import com.amazonaws.kinesisvideo.parser.kinesis.KinesisDataStreamsWorker;
import com.amazonaws.kinesisvideo.parser.rekognition.pojo.DetectedFace;
import com.amazonaws.kinesisvideo.parser.rekognition.pojo.FaceSearchResponse;
import com.amazonaws.kinesisvideo.parser.rekognition.pojo.MatchedFace;
import com.amazonaws.kinesisvideo.parser.rekognition.pojo.RekognitionOutput;
import com.amazonaws.kinesisvideo.parser.rekognition.pojo.RekognizedFragmentsIndex;
import com.amazonaws.kinesisvideo.parser.rekognition.pojo.RekognizedOutput;
import com.amazonaws.kinesisvideo.parser.utilities.FrameVisitor;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class KinesisVideoRekognitionLambdaExample
implements RequestHandler<KinesisEvent, Context> {
    private static final Logger log = LoggerFactory.getLogger(KinesisVideoRekognitionLambdaExample.class);
    private static final int NUM_RETRIES = 10;
    private static final int KCL_INIT_DELAY_MILLIS = 10000;
    private final ExecutorService kdsWorkers = Executors.newFixedThreadPool(100);
    private final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
    private final RekognizedFragmentsIndex rekognizedFragmentsIndex = new RekognizedFragmentsIndex();
    private String inputKvsStreamName;
    private String outputKvsStreamName;
    private StreamOps kvsClient;
    private FragmentCheckpointManager fragmentCheckpointManager;
    private H264FrameProcessor h264FrameProcessor;

    public static void main(String[] args) throws Exception {
        KinesisVideoRekognitionLambdaExample KinesisVideoRekognitionLambdaExample2 = new KinesisVideoRekognitionLambdaExample();
        KinesisVideoRekognitionLambdaExample2.initialize(System.getProperty("KVSStreamName"), Regions.fromName((String)System.getenv("AWS_REGION")));
        KinesisVideoRekognitionLambdaExample2.startKDSWorker(System.getProperty("KDSStreamName"));
        Thread.sleep(10000L);
        while (true) {
            KinesisVideoRekognitionLambdaExample2.processRekognizedOutputs();
        }
    }

    private void initialize(String kvsStreamName, Regions regionName) {
        this.inputKvsStreamName = kvsStreamName;
        this.outputKvsStreamName = kvsStreamName + "-Rekognized";
        this.kvsClient = new StreamOps(regionName, kvsStreamName, this.credentialsProvider);
        this.h264FrameProcessor = H264FrameProcessor.create(this.credentialsProvider, this.outputKvsStreamName, regionName);
        this.fragmentCheckpointManager = new DDBBasedFragmentCheckpointManager(this.kvsClient.getRegion(), this.credentialsProvider);
        log.info("Initialized with input KVS stream: {}, output {}, region : {}", new Object[]{this.inputKvsStreamName, this.outputKvsStreamName, regionName});
    }

    private void processRekognizedOutputs() throws InterruptedException {
        Optional<FragmentCheckpoint> lastFragmentNumber = this.fragmentCheckpointManager.getLastProcessedItem(this.inputKvsStreamName);
        String fragmentNumber = null;
        while (!this.rekognizedFragmentsIndex.isEmpty()) {
            RekognizedFragmentsIndex.RekognizedFragment rekognizedFragment = this.rekognizedFragmentsIndex.poll();
            fragmentNumber = rekognizedFragment.getFragmentNumber();
            List<RekognizedOutput> rekognizedOutputList = rekognizedFragment.getRekognizedOutputs();
            if (lastFragmentNumber.isPresent() && (fragmentNumber.equals(lastFragmentNumber.get().getFragmentNumber()) || rekognizedFragment.getServerTime() <= lastFragmentNumber.get().getServerTime())) {
                log.info("Current fragment number : {} is already processed or older than last processed fragment. So skipping..", (Object)fragmentNumber);
                continue;
            }
            try {
                FrameVisitor frameVisitor = FrameVisitor.create(this.h264FrameProcessor);
                GetMediaForFragmentListWorker worker = GetMediaForFragmentListWorker.create(this.kvsClient.getStreamName(), Collections.singletonList(fragmentNumber), this.kvsClient.getCredentialsProvider(), this.kvsClient.getRegion(), this.kvsClient.getAmazonKinesisVideo(), frameVisitor);
                this.h264FrameProcessor.setRekognizedOutputs(rekognizedOutputList);
                worker.run();
                this.h264FrameProcessor.resetEncoder();
                log.info("Fragment {} processed successfully ...", (Object)fragmentNumber);
                this.fragmentCheckpointManager.saveCheckPoint(this.inputKvsStreamName, fragmentNumber, rekognizedFragment.getProducerTime(), rekognizedFragment.getServerTime());
            }
            catch (Exception e) {
                log.error("Error while processing fragment number: {}", (Object)fragmentNumber, (Object)e);
            }
        }
    }

    public void startKDSWorker(String kdsStreamName) {
        KinesisDataStreamsWorker kinesisDataStreamsWorker = KinesisDataStreamsWorker.create(Regions.US_WEST_2, this.credentialsProvider, kdsStreamName, this.rekognizedFragmentsIndex);
        this.kdsWorkers.submit(kinesisDataStreamsWorker);
    }

    public Context handleRequest(KinesisEvent kinesisEvent, Context context) {
        try {
            this.initialize(System.getProperty("KVSStreamName"), Regions.fromName((String)System.getenv("AWS_REGION")));
            this.loadProducerJNI(context);
            List<Record> records = kinesisEvent.getRecords().stream().map(KinesisEvent.KinesisEventRecord::getKinesis).collect(Collectors.toList());
            this.processRecordsWithRetries(records);
            this.processRekognizedOutputs();
        }
        catch (Exception e) {
            log.error("Unable to process lambda request !. Exiting... ", (Throwable)e);
        }
        return context;
    }

    private void loadProducerJNI(Context context) throws IOException {
        log.info("Context : {}", (Object)context);
        log.info("Working Directory = {}", (Object)System.getProperty("user.dir"));
        log.info("Java library path = {}", (Object)System.getProperty("java.library.path"));
        log.info("Class path %s", (Object)this.getClass().getProtectionDomain().getCodeSource().getLocation());
        log.info("Loading JNI .so file..");
        ClassLoader classLoader = this.getClass().getClassLoader();
        File cityFile = new File(classLoader.getResource("libKinesisVideoProducerJNI.so").getFile());
        System.load(cityFile.getAbsolutePath());
        log.info("Loaded JNI from {}", (Object)cityFile.getAbsolutePath());
    }

    private void processRecordsWithRetries(List<Record> records) {
        for (Record record : records) {
            boolean processedSuccessfully = false;
            for (int i = 0; i < 10; ++i) {
                try {
                    log.info("Processing single record...");
                    this.processSingleRecord(record);
                    processedSuccessfully = true;
                    break;
                }
                catch (Throwable t) {
                    log.error("Caught throwable while processing record {}", (Object)record, (Object)t);
                    continue;
                }
            }
            if (processedSuccessfully) continue;
            log.warn("Couldn't processRekognizedOutputs record {}. Skipping the record.", (Object)record);
        }
        log.info("Processed all {} KDS records.", (Object)records.size());
    }

    private void processSingleRecord(Record record) {
        String data = null;
        ObjectMapper mapper = new ObjectMapper();
        try {
            ByteBuffer buffer = record.getData();
            data = new String(buffer.array(), "UTF-8");
            RekognitionOutput output = (RekognitionOutput)mapper.readValue(data, RekognitionOutput.class);
            String fragmentNumber = output.getInputInformation().getKinesisVideo().getFragmentNumber();
            Double frameOffsetInSeconds = output.getInputInformation().getKinesisVideo().getFrameOffsetInSeconds();
            Double serverTimestamp = output.getInputInformation().getKinesisVideo().getServerTimestamp();
            Double producerTimestamp = output.getInputInformation().getKinesisVideo().getProducerTimestamp();
            double detectedTime = output.getInputInformation().getKinesisVideo().getServerTimestamp() + output.getInputInformation().getKinesisVideo().getFrameOffsetInSeconds() * 1000.0;
            RekognizedOutput rekognizedOutput = RekognizedOutput.builder().fragmentNumber(fragmentNumber).serverTimestamp(serverTimestamp).producerTimestamp(producerTimestamp).frameOffsetInSeconds(frameOffsetInSeconds).detectedTime(detectedTime).build();
            List<FaceSearchResponse> responses = output.getFaceSearchResponse();
            responses.forEach(response -> {
                DetectedFace detectedFace = response.getDetectedFace();
                List<MatchedFace> matchedFaces = response.getMatchedFaces();
                RekognizedOutput.FaceSearchOutput faceSearchOutput = RekognizedOutput.FaceSearchOutput.builder().detectedFace(detectedFace).matchedFaceList(matchedFaces).build();
                rekognizedOutput.addFaceSearchOutput(faceSearchOutput);
            });
            log.info("Found Rekognized results for fragment number : {}", (Object)fragmentNumber);
            this.rekognizedFragmentsIndex.add(fragmentNumber, producerTimestamp.longValue(), serverTimestamp.longValue(), rekognizedOutput);
        }
        catch (NumberFormatException e) {
            log.warn("Record does not match sample record format. Ignoring record with data : {}", data, (Object)e);
        }
        catch (Exception e) {
            log.error("Unable to process record !", (Throwable)e);
        }
    }
}

