/*
 * Decompiled with CFR 0.152.
 */
package com.zendesk.maxwell.producer;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors;
import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.monitoring.Metrics;
import com.zendesk.maxwell.producer.AbstractAsyncProducer;
import com.zendesk.maxwell.producer.AppendContext;
import com.zendesk.maxwell.producer.BigQueryCallback;
import com.zendesk.maxwell.row.RowMap;
import com.zendesk.maxwell.schema.BqToBqStorageSchemaConverter;
import com.zendesk.maxwell.util.StoppableTask;
import com.zendesk.maxwell.util.StoppableTaskState;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker that takes {@link RowMap}s off a shared queue and appends them, one JSON
 * record per row, to a BigQuery table through a {@link JsonStreamWriter}.
 *
 * <p>Lifecycle as visible in this class: construct, call {@link #initialize(TableName)}
 * to create the stream writer, then execute {@link #run()} on a dedicated thread.
 * {@link #requestStop()} / {@link #awaitStop(Long)} shut the worker down.
 *
 * <p>Threading: {@code error} is guarded by {@code lock}. The lock object is exposed
 * through {@link #getLock()} so that collaborators (presumably the append callback --
 * confirm against {@code BigQueryCallback}) can perform compound check-then-set
 * operations atomically.
 */
class MaxwellBigQueryProducerWorker
extends AbstractAsyncProducer
implements Runnable,
StoppableTask {
    static final Logger LOGGER = LoggerFactory.getLogger(MaxwellBigQueryProducerWorker.class);

    /** Source of rows to publish; consumed by {@link #run()}. */
    private final ArrayBlockingQueue<RowMap> queue;
    private final StoppableTaskState taskState;
    /** Thread executing {@link #run()}; written there and read by {@link #awaitStop(Long)}. */
    private volatile Thread thread;
    private final Object lock = new Object();
    /** First recorded failure, rethrown by {@link #sendAsync} and {@link #requestStop()}. */
    @GuardedBy("lock")
    private RuntimeException error = null;
    /** Created by {@link #initialize(TableName)}; {@code null} until then. */
    private JsonStreamWriter streamWriter;

    /**
     * Creates a worker bound to the given queue.
     *
     * @param context            Maxwell context passed to the superclass
     * @param queue              queue the worker takes rows from
     * @param bigQueryProjectId  not used by this constructor; the target table is supplied
     *                           to {@link #initialize(TableName)} instead
     * @param bigQueryDataset    not used by this constructor (see above)
     * @param bigQueryTable      not used by this constructor (see above)
     * @throws IOException declared for interface compatibility with existing callers
     */
    public MaxwellBigQueryProducerWorker(MaxwellContext context, ArrayBlockingQueue<RowMap> queue, String bigQueryProjectId, String bigQueryDataset, String bigQueryTable) throws IOException {
        super(context);
        this.queue = queue;
        this.taskState = new StoppableTaskState("MaxwellBigQueryProducerWorker");
    }

    /**
     * @return the monitor guarding the error state, for callers that need to combine
     *         {@link #getError()} and {@link #setError(RuntimeException)} atomically
     */
    public Object getLock() {
        return this.lock;
    }

    /**
     * @return the recorded failure, or {@code null} if none has been recorded
     */
    public RuntimeException getError() {
        // The monitor is reentrant, so callers already holding getLock() are unaffected.
        synchronized (this.lock) {
            return this.error;
        }
    }

    /**
     * Records a failure that later calls to {@link #sendAsync} and {@link #requestStop()}
     * will rethrow.
     *
     * @param error the failure to record; {@code null} clears the recorded failure
     */
    public void setError(RuntimeException error) {
        synchronized (this.lock) {
            this.error = error;
        }
    }

    /**
     * Replaces the structured {@code primary_key}, {@code data} and {@code old} values of
     * {@code record} with their string form. Fields that are absent are left absent.
     *
     * @param record the JSON record to rewrite in place
     */
    private void convertJSONObjectFieldsToString(JSONObject record) {
        if (this.context.getConfig().outputConfig.includesPrimaryKeys) {
            stringifyField(record, "primary_key");
        }
        stringifyField(record, "data");
        stringifyField(record, "old");
    }

    /** Overwrites {@code record[key]} with its {@code toString()} form when the key is present. */
    private static void stringifyField(JSONObject record, String key) {
        // JSONObject.get throws on a missing key, so check first.
        if (record.has(key)) {
            record.put(key, (Object) record.get(key).toString());
        }
    }

    /**
     * Looks up the destination table's schema and builds the stream writer used by
     * {@link #sendAsync}.
     *
     * @param tName fully-qualified destination table
     * @throws IllegalStateException if the table cannot be found
     * @throws Descriptors.DescriptorValidationException if the writer rejects the converted schema
     * @throws IOException if the writer cannot be created
     * @throws InterruptedException if interrupted while the writer is being created
     */
    public void initialize(TableName tName) throws Descriptors.DescriptorValidationException, IOException, InterruptedException {
        BigQuery bigquery = BigQueryOptions.newBuilder()
                .setProjectId(tName.getProject())
                .build()
                .getService();
        Table table = bigquery.getTable(tName.getDataset(), tName.getTable());
        if (table == null) {
            // getTable returns null for a missing table; fail with context instead of an NPE below.
            throw new IllegalStateException("BigQuery table not found: " + tName);
        }
        Schema schema = table.getDefinition().getSchema();
        TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);
        this.streamWriter = JsonStreamWriter.newBuilder(tName.toString(), tableSchema).build();
    }

    /**
     * Flags the task as stopping, closes the stream writer if one was created, and then
     * rethrows any recorded failure.
     *
     * @throws Exception the recorded failure, if any
     */
    @Override
    public void requestStop() throws Exception {
        this.taskState.requestStop();
        // initialize() may never have run (or may have failed); nothing to close then.
        JsonStreamWriter writer = this.streamWriter;
        if (writer != null) {
            writer.close();
        }
        synchronized (this.lock) {
            if (this.error != null) {
                throw this.error;
            }
        }
    }

    /**
     * Waits for the worker thread to stop.
     *
     * @param timeout passed through to the task state
     * @throws TimeoutException if the task state reports a timeout
     */
    @Override
    public void awaitStop(Long timeout) throws TimeoutException {
        this.taskState.awaitStop(this.thread, timeout);
    }

    /**
     * Worker loop: blocks on the queue and pushes each row until the task state reports
     * it is no longer running. Any exception marks the task stopped and terminates the
     * Maxwell context.
     */
    @Override
    public void run() {
        this.thread = Thread.currentThread();
        try {
            while (true) {
                RowMap row = this.queue.take();
                // The stop flag is only observed after a row arrives; that row is not pushed.
                if (!this.taskState.isRunning()) {
                    this.taskState.stopped();
                    return;
                }
                this.push(row);
            }
        }
        catch (Exception e) {
            this.taskState.stopped();
            this.context.terminate(e);
            if (e instanceof InterruptedException) {
                // Restore the interrupt status that catching the exception cleared.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Serialises {@code r} to a single-element JSON array and appends it to the stream
     * writer, registering a {@link BigQueryCallback} to handle the outcome.
     *
     * @param r  row to publish
     * @param cc completer handed to the callback
     * @throws IllegalStateException if {@link #initialize(TableName)} has not been called
     * @throws Exception the recorded failure if one exists, or any error raised while
     *                   serialising or appending
     */
    @Override
    public void sendAsync(RowMap r, AbstractAsyncProducer.CallbackCompleter cc) throws Exception {
        // Fail fast once a failure has been recorded.
        synchronized (this.lock) {
            if (this.error != null) {
                throw this.error;
            }
        }
        JsonStreamWriter writer = this.streamWriter;
        if (writer == null) {
            throw new IllegalStateException("initialize(TableName) must be called before sendAsync");
        }
        JSONObject record = new JSONObject(r.toJSON(this.outputConfig));
        this.convertJSONObjectFieldsToString(record);
        JSONArray jsonArr = new JSONArray();
        jsonArr.put((Object) record);
        AppendContext appendContext = new AppendContext(jsonArr, 0, r);
        BigQueryCallback callback = new BigQueryCallback(this, appendContext, cc, r.getNextPosition(), this.succeededMessageCount, this.failedMessageCount, this.succeededMessageMeter, this.failedMessageMeter, this.context);
        // directExecutor: the callback runs on whichever thread completes the append future.
        ApiFutures.addCallback(writer.append(appendContext.data), callback, MoreExecutors.directExecutor());
    }
}

