/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.tools;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.tools.SchemaSourceConnector;
import org.apache.kafka.tools.ThroughputThrottler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchemaSourceTask
extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(SchemaSourceTask.class);
    public static final String NAME_CONFIG = "name";
    public static final String ID_CONFIG = "id";
    public static final String TOPIC_CONFIG = "topic";
    public static final String NUM_MSGS_CONFIG = "num.messages";
    public static final String THROUGHPUT_CONFIG = "throughput";
    public static final String MULTIPLE_SCHEMA_CONFIG = "multiple.schema";
    public static final String PARTITION_COUNT_CONFIG = "partition.count";
    private static final String ID_FIELD = "id";
    private static final String SEQNO_FIELD = "seqno";
    private ThroughputThrottler throttler;
    private String name;
    private int id;
    private String topic;
    private Map<String, Integer> partition;
    private long startingSeqno;
    private long seqno;
    private long count;
    private long maxNumMsgs;
    private boolean multipleSchema;
    private int partitionCount;
    private static Schema valueSchema = SchemaBuilder.struct().version(Integer.valueOf(1)).name("record").field("boolean", Schema.BOOLEAN_SCHEMA).field("int", Schema.INT32_SCHEMA).field("long", Schema.INT64_SCHEMA).field("float", Schema.FLOAT32_SCHEMA).field("double", Schema.FLOAT64_SCHEMA).field("partitioning", Schema.INT32_SCHEMA).field("id", Schema.INT32_SCHEMA).field("seqno", Schema.INT64_SCHEMA).build();
    private static Schema valueSchema2 = SchemaBuilder.struct().version(Integer.valueOf(2)).name("record").field("boolean", Schema.BOOLEAN_SCHEMA).field("int", Schema.INT32_SCHEMA).field("long", Schema.INT64_SCHEMA).field("float", Schema.FLOAT32_SCHEMA).field("double", Schema.FLOAT64_SCHEMA).field("partitioning", Schema.INT32_SCHEMA).field("string", SchemaBuilder.string().defaultValue((Object)"abc").build()).field("id", Schema.INT32_SCHEMA).field("seqno", Schema.INT64_SCHEMA).build();

    public String version() {
        return new SchemaSourceConnector().version();
    }

    public void start(Map<String, String> props) {
        long throughput;
        try {
            this.name = props.get(NAME_CONFIG);
            this.id = Integer.parseInt(props.get("id"));
            this.topic = props.get(TOPIC_CONFIG);
            this.maxNumMsgs = Long.parseLong(props.get(NUM_MSGS_CONFIG));
            this.multipleSchema = Boolean.parseBoolean(props.get(MULTIPLE_SCHEMA_CONFIG));
            this.partitionCount = Integer.parseInt(props.containsKey(PARTITION_COUNT_CONFIG) ? props.get(PARTITION_COUNT_CONFIG) : "1");
            throughput = Long.parseLong(props.get(THROUGHPUT_CONFIG));
        }
        catch (NumberFormatException e) {
            throw new ConnectException("Invalid SchemaSourceTask configuration", (Throwable)e);
        }
        this.throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
        this.partition = Collections.singletonMap("id", this.id);
        Map previousOffset = this.context.offsetStorageReader().offset(this.partition);
        this.seqno = previousOffset != null ? (Long)previousOffset.get(SEQNO_FIELD) + 1L : 0L;
        this.startingSeqno = this.seqno;
        this.count = 0L;
        log.info("Started SchemaSourceTask {}-{} producing to topic {} resuming from seqno {}", new Object[]{this.name, this.id, this.topic, this.startingSeqno});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<SourceRecord> poll() throws InterruptedException {
        if (this.count < this.maxNumMsgs) {
            SourceRecord srcRecord;
            long sendStartMs = System.currentTimeMillis();
            if (this.throttler.shouldThrottle(this.seqno - this.startingSeqno, sendStartMs)) {
                this.throttler.throttle();
            }
            Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, this.seqno);
            int partitionVal = (int)(this.seqno % (long)this.partitionCount);
            if (!this.multipleSchema || this.count % 2L == 0L) {
                Struct data = new Struct(valueSchema).put("boolean", (Object)true).put("int", (Object)12).put("long", (Object)12L).put("float", (Object)Float.valueOf(12.2f)).put("double", (Object)12.2).put("partitioning", (Object)partitionVal).put("id", (Object)this.id).put(SEQNO_FIELD, (Object)this.seqno);
                srcRecord = new SourceRecord(this.partition, ccOffset, this.topic, Integer.valueOf(this.id), Schema.STRING_SCHEMA, (Object)"key", valueSchema, (Object)data);
            } else {
                Struct data = new Struct(valueSchema2).put("boolean", (Object)true).put("int", (Object)12).put("long", (Object)12L).put("float", (Object)Float.valueOf(12.2f)).put("double", (Object)12.2).put("partitioning", (Object)partitionVal).put("string", (Object)"def").put("id", (Object)this.id).put(SEQNO_FIELD, (Object)this.seqno);
                srcRecord = new SourceRecord(this.partition, ccOffset, this.topic, Integer.valueOf(this.id), Schema.STRING_SCHEMA, (Object)"key", valueSchema2, (Object)data);
            }
            System.out.println("{\"task\": " + this.id + ", \"seqno\": " + this.seqno + "}");
            List<SourceRecord> result = Arrays.asList(srcRecord);
            ++this.seqno;
            ++this.count;
            return result;
        }
        SchemaSourceTask schemaSourceTask = this;
        synchronized (schemaSourceTask) {
            ((Object)((Object)this)).wait();
        }
        return new ArrayList<SourceRecord>();
    }

    public void stop() {
        this.throttler.wakeup();
    }
}

