/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.broker.jobstream;

import io.camunda.zeebe.broker.jobstream.JobStreamErrorHandler;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.stream.job.ActivatedJob;
import io.camunda.zeebe.stream.api.scheduling.ScheduledCommandCache;
import io.camunda.zeebe.stream.api.scheduling.TaskResult;
import io.camunda.zeebe.stream.api.scheduling.TaskResultBuilder;
import io.camunda.zeebe.stream.impl.BufferedTaskResultBuilder;
import io.camunda.zeebe.transport.stream.api.RemoteStreamErrorHandler;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.logging.ThrottledLogger;
import java.time.Duration;
import org.agrona.collections.Int2ObjectHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class RemoteJobStreamErrorHandler
implements RemoteStreamErrorHandler<ActivatedJob> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteStreamErrorHandler.class);
    private static final Logger NO_WRITER_LOGGER = new ThrottledLogger(LOGGER, Duration.ofSeconds(1L));
    private static final Logger FAILED_WRITER_LOGGER = new ThrottledLogger(LOGGER, Duration.ofSeconds(1L));
    private final JobStreamErrorHandler errorHandler;
    private final Int2ObjectHashMap<LogStreamWriter> partitionWriters = new Int2ObjectHashMap();

    RemoteJobStreamErrorHandler(JobStreamErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    public void handleError(Throwable error, ActivatedJob job) {
        int partitionId = Protocol.decodePartitionId((long)job.jobKey());
        LogStreamWriter writer = (LogStreamWriter)this.partitionWriters.get(partitionId);
        if (writer == null) {
            NO_WRITER_LOGGER.warn("Cannot handle failed job push on partition {} there is no writer registered;\nthis can occur during an election", (Object)partitionId);
            return;
        }
        BufferedTaskResultBuilder resultBuilder = new BufferedTaskResultBuilder((arg_0, arg_1) -> ((LogStreamWriter)writer).canWriteEvents(arg_0, arg_1), (ScheduledCommandCache.StagedScheduledCommandCache)new ScheduledCommandCache.NoopScheduledCommandCache());
        this.errorHandler.handleError(job, error, (TaskResultBuilder)resultBuilder);
        TaskResult result = resultBuilder.build();
        this.writeEntries(partitionId, job, writer, result);
    }

    void addWriter(int partitionId, LogStreamWriter writer) {
        this.partitionWriters.put(partitionId, (Object)writer);
    }

    void removeWriter(int partitionId) {
        this.partitionWriters.remove(partitionId);
    }

    private void writeEntries(int partitionId, ActivatedJob job, LogStreamWriter writer, TaskResult result) {
        Either writeResult = writer.tryWrite(result.getRecordBatch().entries());
        if (writeResult.isLeft()) {
            FAILED_WRITER_LOGGER.warn("Failed to handle failed job push {} on partition {}. Write to logstream failed with {};\njob will remain activated until it times out.", new Object[]{job.jobKey(), partitionId, writeResult.getLeft()});
        }
    }
}

