/*
 * Decompiled with CFR 0.152.
 */
package com.emc.mongoose.storage.driver.kafka;

import com.emc.mongoose.base.Exceptions;
import com.emc.mongoose.base.config.IllegalConfigurationException;
import com.emc.mongoose.base.data.DataInput;
import com.emc.mongoose.base.item.DataItem;
import com.emc.mongoose.base.item.Item;
import com.emc.mongoose.base.item.ItemFactory;
import com.emc.mongoose.base.item.PathItem;
import com.emc.mongoose.base.item.op.OpType;
import com.emc.mongoose.base.item.op.Operation;
import com.emc.mongoose.base.item.op.data.DataOperation;
import com.emc.mongoose.base.item.op.path.PathOperation;
import com.emc.mongoose.base.logging.LogUtil;
import com.emc.mongoose.base.storage.Credential;
import com.emc.mongoose.storage.driver.kafka.cache.AdminClientCreateFunctionImpl;
import com.emc.mongoose.storage.driver.kafka.cache.ProducerCreateFunctionImpl;
import com.emc.mongoose.storage.driver.kafka.cache.TopicCreateFunction;
import com.emc.mongoose.storage.driver.kafka.cache.TopicCreateFunctionImpl;
import com.emc.mongoose.storage.driver.preempt.PreemptStorageDriverBase;
import com.github.akurilov.commons.concurrent.ContextAwareThreadFactory;
import com.github.akurilov.confuse.Config;
import java.io.EOFException;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.ThreadContext;

public class KafkaStorageDriver<I extends Item, O extends Operation<I>>
extends PreemptStorageDriverBase<I, O> {
    private final String[] endpointAddrs;
    private final boolean useKey;
    private final int requestSizeLimit;
    private final int batchSize;
    private final int sndBuf;
    private final int rcvBuf;
    private final int linger;
    private final long buffer;
    private final String compression;
    private final Duration recordOpTimeout;
    private final Semaphore concurrencyThrottle;
    private final AtomicInteger rrc = new AtomicInteger(0);
    private final Map<String, Properties> configCache = new ConcurrentHashMap<String, Properties>();
    private final Map<Properties, AdminClientCreateFunctionImpl> adminClientCreateFuncCache = new ConcurrentHashMap<Properties, AdminClientCreateFunctionImpl>();
    private final Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<String, AdminClient>();
    private final Map<Properties, ProducerCreateFunctionImpl> producerCreateFuncCache = new ConcurrentHashMap<Properties, ProducerCreateFunctionImpl>();
    private final ThreadLocal<Map<String, KafkaProducer>> threadLocalProducerCache = ThreadLocal.withInitial(ConcurrentHashMap::new);
    private final Map<AdminClient, TopicCreateFunctionImpl> topicCreateFuncCache = new ConcurrentHashMap<AdminClient, TopicCreateFunctionImpl>();
    private final Map<String, NewTopic> topicCache = new ConcurrentHashMap<String, NewTopic>();
    protected final Map<String, String> sharedHeaders = new HashMap<String, String>();
    protected final Map<String, String> dynamicHeaders = new HashMap<String, String>();
    private volatile boolean listWasCalled = false;

    public KafkaStorageDriver(String testStepId, DataInput dataInput, Config storageConfig, boolean verifyFlag, int batchSize) throws IllegalConfigurationException {
        super(testStepId, dataInput, storageConfig, verifyFlag, batchSize);
        Config driverConfig = storageConfig.configVal("driver");
        Config recordConfig = driverConfig.configVal("record");
        Map headersMap = recordConfig.mapVal("headers");
        if (!headersMap.isEmpty()) {
            for (Map.Entry header : headersMap.entrySet()) {
                String headerKey = (String)header.getKey();
                String headerValue = (String)header.getValue();
                if (headerKey.contains("#{") || headerKey.contains("${") || headerKey.contains("%{") || headerValue.contains("#{") || headerValue.contains("${") || headerValue.contains("%{")) {
                    this.dynamicHeaders.put(headerKey, headerValue);
                    continue;
                }
                this.sharedHeaders.put(headerKey, headerValue);
            }
        }
        this.useKey = recordConfig.boolVal("key-enabled");
        this.requestSizeLimit = driverConfig.intVal("request-size");
        this.batchSize = batchSize;
        Config netConfig = storageConfig.configVal("net");
        this.buffer = driverConfig.longVal("buffer-memory");
        this.compression = driverConfig.stringVal("compression-type");
        Config nodeConfig = netConfig.configVal("node");
        int nodePort = nodeConfig.intVal("port");
        List endpointAddrList = nodeConfig.listVal("addrs");
        this.endpointAddrs = endpointAddrList.toArray(new String[endpointAddrList.size()]);
        for (int i = 0; i < this.endpointAddrs.length; ++i) {
            if (this.endpointAddrs[i].contains(":")) continue;
            this.endpointAddrs[i] = this.endpointAddrs[i] + ":" + nodePort;
        }
        this.sndBuf = netConfig.intVal("sndBuf");
        this.rcvBuf = netConfig.intVal("rcvBuf");
        this.linger = netConfig.intVal("linger");
        long recordOpTimeoutMillis = recordConfig.longVal("timeoutMillis");
        this.recordOpTimeout = recordOpTimeoutMillis > 0L ? Duration.ofMillis(recordOpTimeoutMillis) : Duration.ofDays(Long.MAX_VALUE);
        this.concurrencyThrottle = new Semaphore(this.concurrencyLimit > 0 ? this.concurrencyLimit : Integer.MAX_VALUE);
        this.requestAuthTokenFunc = null;
        this.requestNewPathFunc = null;
    }

    protected ThreadFactory ioWorkerThreadFactory() {
        return new IoWorkerThreadFactory();
    }

    protected final boolean isBatch(List<O> ops, int from, int to) {
        return true;
    }

    protected final void execute(O op) {
        block15: {
            OpType opType;
            block14: {
                opType = op.type();
                if (!(op instanceof DataOperation)) break block14;
                switch (opType) {
                    case NOOP: {
                        this.noop(List.of(op));
                        break block15;
                    }
                    case CREATE: {
                        this.produceRecords(List.of(op));
                        break block15;
                    }
                    case READ: {
                        this.consumeRecords(List.of(op));
                        break block15;
                    }
                    default: {
                        throw new AssertionError((Object)("Unsupported records operation type: " + opType));
                    }
                }
            }
            if (op instanceof PathOperation) {
                switch (opType) {
                    case NOOP: {
                        this.noop(List.of(op));
                        break;
                    }
                    case CREATE: {
                        this.createTopics(List.of(op));
                        break;
                    }
                    case READ: {
                        this.readTopics(List.of(op));
                        break;
                    }
                    case DELETE: {
                        this.deleteTopics(List.of(op));
                        break;
                    }
                    case LIST: {
                        throw new AssertionError((Object)("Unsupported topics operation type: " + opType));
                    }
                }
            } else {
                throw new AssertionError((Object)("Unsupported operation class: " + op.getClass()));
            }
        }
    }

    protected final void execute(List<O> ops) throws IllegalStateException {
        block15: {
            OpType opType;
            Operation op;
            block14: {
                op = (Operation)ops.get(0);
                opType = op.type();
                if (!(op instanceof DataOperation)) break block14;
                switch (opType) {
                    case NOOP: {
                        this.noop(ops);
                        break block15;
                    }
                    case CREATE: {
                        this.produceRecords(ops);
                        break block15;
                    }
                    case READ: {
                        this.consumeRecords(ops);
                        break block15;
                    }
                    default: {
                        throw new AssertionError((Object)("Unsupported record operation type: " + opType));
                    }
                }
            }
            if (op instanceof PathOperation) {
                switch (opType) {
                    case NOOP: {
                        this.noop(List.of(op));
                        break;
                    }
                    case CREATE: {
                        this.createTopics(ops);
                        break;
                    }
                    case READ: {
                        this.readTopics(ops);
                        break;
                    }
                    case DELETE: {
                        this.deleteTopics(ops);
                        break;
                    }
                    case LIST: {
                        throw new AssertionError((Object)("Unsupported topics operation type: " + opType));
                    }
                }
            } else {
                throw new AssertionError((Object)("Unsupported operation class: " + op.getClass()));
            }
        }
    }

    void noop(List<O> ops) {
        try {
            this.concurrencyThrottle.acquire();
        }
        catch (InterruptedException e) {
            com.github.akurilov.commons.lang.Exceptions.throwUnchecked((Throwable)e);
        }
        Operation op = (Operation)ops.get(0);
        if (op instanceof DataOperation) {
            for (int i = 0; i < ops.size(); ++i) {
                op = (Operation)ops.get(i);
                op.startRequest();
                op.finishRequest();
                op.startResponse();
                op.finishResponse();
                try {
                    DataOperation dataOp = (DataOperation)op;
                    dataOp.countBytesDone(dataOp.item().size());
                    continue;
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            }
        } else {
            for (int i = 0; i < ops.size(); ++i) {
                op = (Operation)ops.get(i);
                op.startRequest();
                op.finishRequest();
                op.startResponse();
                op.finishResponse();
            }
        }
        this.concurrencyThrottle.release();
        this.completeOperations(ops, Operation.Status.SUCC);
    }

    void completeOperations(List<? extends O> ops, Operation.Status status) {
        Operation op = null;
        for (int i = 0; i < ops.size(); ++i) {
            op = (Operation)ops.get(i);
            op.status(status);
            this.handleCompleted(op);
        }
    }

    void produceRecords(List<O> recOps) {
        String nodeAddr = ((Operation)recOps.get(0)).nodeAddr();
        try {
            Properties config = this.configCache.computeIfAbsent(nodeAddr, this::createConfig);
            ProducerCreateFunctionImpl producerConfig = this.producerCreateFuncCache.computeIfAbsent(config, ProducerCreateFunctionImpl::new);
            Map<String, KafkaProducer> producerCache = this.threadLocalProducerCache.get();
            KafkaProducer producer = producerCache.computeIfAbsent(nodeAddr, producerConfig);
            AdminClientCreateFunctionImpl adminConfig = this.adminClientCreateFuncCache.computeIfAbsent(config, AdminClientCreateFunctionImpl::new);
            AdminClient adminClient = this.adminClientCache.computeIfAbsent(nodeAddr, adminConfig);
            String topicName = null;
            TopicCreateFunction topicCreateFunc = null;
            for (int i = 0; i < recOps.size(); ++i) {
                DataOperation recOp = (DataOperation)recOps.get(i);
                topicName = recOp.dstPath().replaceAll("/", "");
                topicCreateFunc = this.topicCreateFuncCache.computeIfAbsent(adminClient, TopicCreateFunctionImpl::new);
                this.topicCache.computeIfAbsent(topicName, topicCreateFunc);
                DataItem recItem = recOp.item();
                String producerKey = this.useKey ? recItem.name() : null;
                ProducerRecord<String, DataItem> producerRecord = new ProducerRecord<String, DataItem>(topicName, producerKey, recItem);
                long recSize = recItem.size();
                this.concurrencyThrottle.acquire();
                recOp.startRequest();
                producer.send(producerRecord, (md, e) -> this.handleRecordProduce(recOp, recSize, e));
                try {
                    recOp.finishRequest();
                    continue;
                }
                catch (IllegalStateException illegalStateException) {
                    // empty catch block
                }
            }
            producer.flush();
        }
        catch (Throwable e2) {
            Exceptions.throwUncheckedIfInterrupted((Throwable)e2);
            LogUtil.exception((Level)Level.DEBUG, (Throwable)e2, (String)"Producing records failure", (Object[])new Object[0]);
            this.completeOperations(recOps, Operation.Status.FAIL_UNKNOWN);
        }
    }

    void handleRecordProduce(DataOperation recOp, long recSize, Throwable e) {
        recOp.startResponse();
        recOp.finishResponse();
        this.concurrencyThrottle.release();
        if (null == e) {
            recOp.countBytesDone(recSize);
            this.completeOperation(recOp, Operation.Status.SUCC);
        } else {
            this.completeFailedOperation(recOp, e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void consumeRecords(List<O> recOps) {
        int opCount;
        String nodeAddr = ((Operation)recOps.get(0)).nodeAddr();
        int remainingOpCount = opCount = recOps.size();
        while (remainingOpCount > 0) {
            Properties consumerConfig = this.createConsumerConfig(nodeAddr);
            consumerConfig.put("max.poll.records", (Object)remainingOpCount);
            DataOperation recOp = null;
            try (KafkaConsumer consumer = new KafkaConsumer(consumerConfig);){
                try {
                    this.concurrencyThrottle.acquire();
                    HashSet<String> topics = new HashSet<String>();
                    for (int i = opCount - remainingOpCount; i < opCount; ++i) {
                        recOp = (DataOperation)recOps.get(i);
                        recOp.startRequest();
                        recOp.finishRequest();
                        String topicName = recOp.srcPath().replaceAll("/", "");
                        topics.add(topicName);
                    }
                    consumer.subscribe(topics);
                    ConsumerRecords pollResult = consumer.poll(this.recordOpTimeout);
                    Iterator recIter = pollResult.iterator();
                    ConsumerRecord rec = null;
                    while (recIter.hasNext()) {
                        rec = recIter.next();
                        recOp = (DataOperation)recOps.get(opCount - remainingOpCount);
                        recOp.startResponse();
                        recOp.finishResponse();
                        recOp.countBytesDone((long)rec.serializedValueSize());
                        this.completeOperation(recOp, Operation.Status.SUCC);
                        --remainingOpCount;
                    }
                }
                finally {
                    this.concurrencyThrottle.release();
                }
            }
            catch (Throwable e) {
                this.completeFailedOperation(recOp, e);
                Exceptions.throwUncheckedIfInterrupted((Throwable)e);
            }
        }
    }

    void createTopics(List<O> topicOps) {
        String nodeAddr = ((Operation)topicOps.get(0)).nodeAddr();
        try {
            Properties config = this.configCache.computeIfAbsent(nodeAddr, this::createAdminClientConfig);
            AdminClientCreateFunctionImpl adminClientCreateFunc = this.adminClientCreateFuncCache.computeIfAbsent(config, AdminClientCreateFunctionImpl::new);
            AdminClient adminClient = this.adminClientCache.computeIfAbsent(nodeAddr, adminClientCreateFunc);
            ArrayList<NewTopic> topicCollection = new ArrayList<NewTopic>();
            for (int i = 0; i < topicOps.size(); ++i) {
                Operation topicOp = (Operation)topicOps.get(i);
                String topicName = topicOp.item().name();
                NewTopic newTopic = new NewTopic(topicName, 1, 1);
                topicCollection.add(newTopic);
            }
            this.concurrencyThrottle.acquire(topicOps.size());
            Map<String, KafkaFuture<Void>> createTopicsResultMap = adminClient.createTopics(topicCollection).values();
            for (int i = 0; i < topicOps.size(); ++i) {
                this.hanldeTopicCreateOperation((Operation)topicOps.get(i), createTopicsResultMap);
            }
        }
        catch (Throwable thrown) {
            Exceptions.throwUncheckedIfInterrupted((Throwable)thrown);
            for (int i = 0; i < topicOps.size(); ++i) {
                Operation topicOp = (Operation)topicOps.get(i);
                this.completeFailedOperation(topicOp, thrown);
            }
            LogUtil.exception((Level)Level.DEBUG, (Throwable)thrown, (String)"{}: unexpected failure while trying to create {} topics", (Object[])new Object[]{this.stepId, topicOps.size()});
        }
    }

    KafkaFuture.BiConsumer<Void, Throwable> handleTopicCreateFuture(PathOperation topicOp) {
        String topicName = topicOp.item().name();
        KafkaFuture.BiConsumer<Void, Throwable> action = (aVoid, throwable) -> {
            if (throwable == null) {
                topicOp.startResponse();
                topicOp.finishResponse();
                this.completeOperation(topicOp, Operation.Status.SUCC);
            } else {
                LogUtil.exception((Level)Level.DEBUG, (Throwable)throwable, (String)"{}: Failed to create topic \"{}\"", (Object[])new Object[]{this.stepId, topicName});
                if (throwable instanceof TopicExistsException) {
                    LogUtil.exception((Level)Level.DEBUG, (Throwable)throwable, (String)"{}: Topic \"{}\" already exists", (Object[])new Object[]{this.stepId, topicName});
                }
                this.completeOperation(topicOp, Operation.Status.RESP_FAIL_UNKNOWN);
            }
            this.concurrencyThrottle.release();
        };
        return action;
    }

    void hanldeTopicCreateOperation(O topicOp, Map<String, KafkaFuture<Void>> resultMap) {
        String topicName = topicOp.item().name();
        topicOp.startRequest();
        KafkaFuture<Void> oneTopicResult = resultMap.get(topicName);
        oneTopicResult.whenComplete(this.handleTopicCreateFuture((PathOperation)topicOp));
        topicOp.finishRequest();
    }

    void readTopics(List<O> topicOps) {
        String nodeAddr = ((Operation)topicOps.get(0)).nodeAddr();
        try {
            this.concurrencyThrottle.acquire();
            for (int i = 0; i < topicOps.size(); ++i) {
                PathOperation topicOp = (PathOperation)topicOps.get(i);
                PathItem topicItem = topicOp.item();
                String topicName = topicItem.name().replaceAll("/", "");
                try {
                    Properties consumerConfig = this.createConsumerConfig(nodeAddr);
                    consumerConfig.put("max.poll.records", (Object)Integer.MAX_VALUE);
                    topicOp.startRequest();
                    try (KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerConfig);){
                        kafkaConsumer.subscribe(Arrays.asList(topicName));
                        topicOp.finishRequest();
                        topicOp.startResponse();
                        int sizeOfReadData = 0;
                        ConsumerRecords records = null;
                        while (!(records = kafkaConsumer.poll(this.recordOpTimeout)).isEmpty()) {
                            for (ConsumerRecord record : records) {
                                sizeOfReadData += ((byte[])record.value()).length;
                            }
                        }
                        topicOp.finishResponse();
                        topicOp.countBytesDone((long)sizeOfReadData);
                    }
                    this.completeOperation(topicOp, Operation.Status.SUCC);
                }
                catch (RuntimeException e) {
                    this.completeOperation(topicOp, Operation.Status.RESP_FAIL_UNKNOWN);
                }
                this.concurrencyThrottle.release();
            }
        }
        catch (Throwable thrown) {
            Exceptions.throwUncheckedIfInterrupted((Throwable)thrown);
            for (int i = 0; i < topicOps.size(); ++i) {
                Operation topicOp = (Operation)topicOps.get(i);
                this.completeFailedOperation(topicOp, thrown);
            }
            LogUtil.exception((Level)Level.DEBUG, (Throwable)thrown, (String)"{}: unexpected failure while trying to read {} topics", (Object[])new Object[]{this.stepId, topicOps.size()});
        }
    }

    void deleteTopics(List<O> topicOps) {
    }

    void completeOperation(O op, Operation.Status status) {
        op.status(status);
        this.handleCompleted((Operation)op);
    }

    void completeFailedOperation(O op, Throwable e) {
        LogUtil.exception((Level)Level.DEBUG, (Throwable)e, (String)"{}: operation failed: {}", (Object[])new Object[]{this.stepId, op});
        this.completeOperation(op, Operation.Status.FAIL_UNKNOWN);
    }

    Properties createConfig(String nodeAddr) {
        Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", nodeAddr);
        producerConfig.put("batch.size", (Object)this.batchSize);
        producerConfig.put("max.request.size", (Object)this.requestSizeLimit);
        producerConfig.put("buffer.memory", (Object)this.buffer);
        producerConfig.put("compression.type", this.compression);
        producerConfig.put("send.buffer.bytes", (Object)this.sndBuf);
        producerConfig.put("linger.ms", (Object)this.linger);
        producerConfig.put("receive.buffer.bytes", (Object)this.rcvBuf);
        try {
            producerConfig.put("key.serializer", Class.forName("org.apache.kafka.common.serialization.StringSerializer"));
            producerConfig.put("value.serializer", Class.forName("com.emc.mongoose.storage.driver.kafka.io.DataItemSerializer"));
        }
        catch (ClassNotFoundException e) {
            LogUtil.exception((Level)Level.DEBUG, (Throwable)e, (String)"{}: operation failed", (Object[])new Object[]{this.stepId});
        }
        return producerConfig;
    }

    Properties createConsumerConfig(String nodeAddr) {
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("client.id", String.valueOf(System.currentTimeMillis()));
        consumerConfig.put("bootstrap.servers", nodeAddr);
        consumerConfig.put("send.buffer.bytes", (Object)this.sndBuf);
        consumerConfig.put("receive.buffer.bytes", (Object)this.rcvBuf);
        consumerConfig.put("group.id", String.valueOf(System.currentTimeMillis()));
        consumerConfig.put("auto.offset.reset", "earliest");
        try {
            consumerConfig.put("key.deserializer", Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));
            consumerConfig.put("value.deserializer", Class.forName("org.apache.kafka.common.serialization.ByteArrayDeserializer"));
        }
        catch (ClassNotFoundException e) {
            LogUtil.exception((Level)Level.DEBUG, (Throwable)e, (String)"{}: operation failed", (Object[])new Object[]{this.stepId});
        }
        return consumerConfig;
    }

    Properties createAdminClientConfig(String nodeAddr) {
        Properties adminClientConfig = new Properties();
        adminClientConfig.put("bootstrap.servers", nodeAddr);
        return adminClientConfig;
    }

    String nextEndpointAddr() {
        return this.endpointAddrs[this.rrc.getAndIncrement() % this.endpointAddrs.length];
    }

    protected boolean prepare(O operation) {
        super.prepare(operation);
        String endpointAddr = operation.nodeAddr();
        if (endpointAddr == null) {
            operation.nodeAddr(this.nextEndpointAddr());
        }
        return true;
    }

    protected String requestNewPath(String path) {
        throw new AssertionError((Object)"Should not be invoked");
    }

    protected void doClose() throws IOException, IllegalStateException {
        super.doClose();
        this.configCache.clear();
        this.adminClientCreateFuncCache.clear();
        this.adminClientCache.values().parallelStream().forEach(adminClient -> adminClient.close(Duration.ofSeconds(10L)));
        this.adminClientCache.clear();
        this.producerCreateFuncCache.clear();
        this.topicCreateFuncCache.clear();
        this.topicCache.clear();
    }

    protected String requestNewAuthToken(Credential credential) {
        throw new AssertionError((Object)"Should not be invoked");
    }

    public List<I> list(ItemFactory<I> itemFactory, String path, String prefix, int idRadix, I lastPrevItem, int count) throws IOException {
        if (this.listWasCalled) {
            throw new EOFException();
        }
        ArrayList<Item> buff = new ArrayList<Item>(1);
        buff.add(itemFactory.getItem(path + prefix, 0L, 0L));
        this.listWasCalled = true;
        return buff;
    }

    public void adjustIoBuffers(long avgTransferSize, OpType opType) {
    }

    final class IoWorkerThreadFactory
    extends ContextAwareThreadFactory {
        public IoWorkerThreadFactory() {
            super("io_worker_" + KafkaStorageDriver.this.stepId, true, ThreadContext.getContext());
        }

        public final Thread newThread(Runnable task) {
            return new IoWorkerThread(task, this.threadNamePrefix + "#" + this.threadNumber.incrementAndGet(), this.daemonFlag, exceptionHandler, this.threadContext);
        }

        final class IoWorkerThread
        extends ContextAwareThreadFactory.ContextAwareThread {
            public IoWorkerThread(Runnable task, String name, boolean daemonFlag, Thread.UncaughtExceptionHandler exceptionHandler, Map<String, String> threadContext) {
                super(task, name, daemonFlag, exceptionHandler, threadContext);
            }

            public final void interrupt() {
                Map<String, KafkaProducer> producerCache = KafkaStorageDriver.this.threadLocalProducerCache.get();
                producerCache.values().parallelStream().forEach(producer -> producer.close(Duration.ofSeconds(10L)));
                producerCache.clear();
                super.interrupt();
            }
        }
    }
}

