/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.transforms;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

/**
 * Transformation that adds fields to a record's key or value, taking their contents from the
 * record's metadata (topic, partition, offset, timestamp) and/or from a configured static string.
 *
 * <p>Which part of the record is modified is decided by the concrete subclass: {@link Key}
 * operates on the record key, {@link Value} on the record value.
 *
 * <p>Records whose operating value is {@code null} are passed through untouched. Records without
 * a schema are expected to carry a {@code Map}; records with a schema are expected to carry a
 * {@link Struct}.
 *
 * @param <R> the concrete record type being transformed
 */
public abstract class InsertField<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
    public static final String OVERVIEW_DOC = "Insert field(s) using attributes from the record metadata or a configured static value.<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";

    private static final String OPTIONALITY_DOC = "Suffix with <code>!</code> to make this a required field, or <code>?</code> to keep it optional (the default).";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(ConfigName.TOPIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                    "Field name for Kafka topic. " + OPTIONALITY_DOC)
            .define(ConfigName.PARTITION_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                    "Field name for Kafka partition. " + OPTIONALITY_DOC)
            .define(ConfigName.OFFSET_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                    "Field name for Kafka offset - only applicable to sink connectors.<br/>" + OPTIONALITY_DOC)
            .define(ConfigName.TIMESTAMP_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                    "Field name for record timestamp. " + OPTIONALITY_DOC)
            .define(ConfigName.STATIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                    "Field name for static data field. " + OPTIONALITY_DOC)
            .define(ConfigName.STATIC_VALUE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                    "Static field value, if field name configured.");

    /** Label handed to the {@link Requirements} checks for use in their error reporting. */
    private static final String PURPOSE = "field insertion";

    private static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().build();

    // Each spec is null when the corresponding "*.field" property is not configured.
    private InsertionSpec topicField;
    private InsertionSpec partitionField;
    private InsertionSpec offsetField;
    private InsertionSpec timestampField;
    private InsertionSpec staticField;
    private String staticValue;

    /** Maps an incoming struct schema to the schema extended with the inserted fields. */
    private Cache<Schema, Schema> schemaUpdateCache;

    /**
     * @return the version string reported by {@link AppInfoParser#getVersion()}
     */
    @Override
    public String version() {
        return AppInfoParser.getVersion();
    }

    /**
     * Reads the insertion settings from {@code props} and (re)creates the schema cache.
     *
     * @param props configuration properties, parsed against {@link #CONFIG_DEF}
     * @throws ConfigException if no insertion field is configured at all, or if
     *         {@code static.field} is set without a {@code static.value}
     */
    @Override
    public void configure(Map<String, ?> props) {
        SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
        this.topicField = InsertionSpec.parse(config.getString(ConfigName.TOPIC_FIELD));
        this.partitionField = InsertionSpec.parse(config.getString(ConfigName.PARTITION_FIELD));
        this.offsetField = InsertionSpec.parse(config.getString(ConfigName.OFFSET_FIELD));
        this.timestampField = InsertionSpec.parse(config.getString(ConfigName.TIMESTAMP_FIELD));
        this.staticField = InsertionSpec.parse(config.getString(ConfigName.STATIC_FIELD));
        this.staticValue = config.getString(ConfigName.STATIC_VALUE);

        if (this.topicField == null && this.partitionField == null && this.offsetField == null
                && this.timestampField == null && this.staticField == null) {
            throw new ConfigException("No field insertion configured");
        }
        if (this.staticField != null && this.staticValue == null) {
            // Report the field *name*: InsertionSpec has no toString(), so concatenating the spec
            // itself would print an opaque object identity instead of something actionable.
            throw new ConfigException(ConfigName.STATIC_VALUE, null,
                    "No value specified for static field: " + this.staticField.name);
        }

        this.schemaUpdateCache = new SynchronizedCache<Schema, Schema>(new LRUCache<Schema, Schema>(16));
    }

    /**
     * Applies the configured insertions to {@code record}.
     *
     * @param record the record to transform
     * @return {@code record} itself when its operating value is {@code null}; otherwise a new
     *         record carrying the additional fields
     */
    @Override
    public R apply(R record) {
        if (this.operatingValue(record) == null) {
            return record;
        }
        if (this.operatingSchema(record) == null) {
            return this.applySchemaless(record);
        }
        return this.applyWithSchema(record);
    }

    /**
     * Handles records without a schema: copies the map value and puts the configured entries
     * into the copy. Partition and timestamp are only added when the record carries them.
     * The timestamp is stored exactly as returned by {@code record.timestamp()}.
     */
    private R applySchemaless(R record) {
        Map<String, Object> value = Requirements.requireMap(this.operatingValue(record), PURPOSE);
        // Work on a copy so the incoming record's map is left unmodified.
        Map<String, Object> updatedValue = new HashMap<String, Object>(value);

        if (this.topicField != null) {
            updatedValue.put(this.topicField.name, record.topic());
        }
        if (this.partitionField != null && record.kafkaPartition() != null) {
            updatedValue.put(this.partitionField.name, record.kafkaPartition());
        }
        if (this.offsetField != null) {
            updatedValue.put(this.offsetField.name, Requirements.requireSinkRecord(record, PURPOSE).kafkaOffset());
        }
        if (this.timestampField != null && record.timestamp() != null) {
            updatedValue.put(this.timestampField.name, record.timestamp());
        }
        if (this.staticField != null && this.staticValue != null) {
            updatedValue.put(this.staticField.name, this.staticValue);
        }
        return this.newRecord(record, null, updatedValue);
    }

    /**
     * Handles records with a schema: builds (or fetches from the cache) the extended schema,
     * copies every existing field into a new {@link Struct} and then sets the inserted fields.
     * The timestamp is wrapped in a {@link Date} to match the {@link Timestamp} logical schema.
     */
    private R applyWithSchema(R record) {
        Struct value = Requirements.requireStruct(this.operatingValue(record), PURPOSE);
        Schema originalSchema = value.schema();

        Schema updatedSchema = this.schemaUpdateCache.get(originalSchema);
        if (updatedSchema == null) {
            updatedSchema = this.makeUpdatedSchema(originalSchema);
            this.schemaUpdateCache.put(originalSchema, updatedSchema);
        }

        Struct updatedValue = new Struct(updatedSchema);
        for (Field field : originalSchema.fields()) {
            updatedValue.put(field.name(), value.get(field));
        }

        if (this.topicField != null) {
            updatedValue.put(this.topicField.name, record.topic());
        }
        if (this.partitionField != null && record.kafkaPartition() != null) {
            updatedValue.put(this.partitionField.name, record.kafkaPartition());
        }
        if (this.offsetField != null) {
            updatedValue.put(this.offsetField.name, Requirements.requireSinkRecord(record, PURPOSE).kafkaOffset());
        }
        if (this.timestampField != null && record.timestamp() != null) {
            updatedValue.put(this.timestampField.name, new Date(record.timestamp()));
        }
        if (this.staticField != null && this.staticValue != null) {
            updatedValue.put(this.staticField.name, this.staticValue);
        }
        return this.newRecord(record, updatedSchema, updatedValue);
    }

    /**
     * Builds a struct schema containing every field of {@code schema} followed by one field per
     * configured insertion, each optional or required according to its {@link InsertionSpec}.
     */
    private Schema makeUpdatedSchema(Schema schema) {
        SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
        for (Field field : schema.fields()) {
            builder.field(field.name(), field.schema());
        }

        if (this.topicField != null) {
            builder.field(this.topicField.name,
                    this.topicField.optional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA);
        }
        if (this.partitionField != null) {
            builder.field(this.partitionField.name,
                    this.partitionField.optional ? Schema.OPTIONAL_INT32_SCHEMA : Schema.INT32_SCHEMA);
        }
        if (this.offsetField != null) {
            builder.field(this.offsetField.name,
                    this.offsetField.optional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA);
        }
        if (this.timestampField != null) {
            builder.field(this.timestampField.name,
                    this.timestampField.optional ? OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA);
        }
        if (this.staticField != null) {
            builder.field(this.staticField.name,
                    this.staticField.optional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA);
        }
        return builder.build();
    }

    /** Drops the reference to the schema cache; {@link #configure(Map)} creates a fresh one. */
    @Override
    public void close() {
        this.schemaUpdateCache = null;
    }

    /**
     * @return the shared {@link #CONFIG_DEF}
     */
    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    /** Returns the schema of the part of the record (key or value) this transformation works on. */
    protected abstract Schema operatingSchema(R record);

    /** Returns the part of the record (key or value) this transformation works on. */
    protected abstract Object operatingValue(R record);

    /**
     * Creates a copy of {@code record} in which the operating part is replaced.
     *
     * @param record the source record
     * @param updatedSchema schema for the replaced part; {@code null} for schemaless records
     * @param updatedValue the new key or value
     */
    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);

    /** Names of the configuration properties understood by this transformation. */
    private static interface ConfigName {
        public static final String TOPIC_FIELD = "topic.field";
        public static final String PARTITION_FIELD = "partition.field";
        public static final String OFFSET_FIELD = "offset.field";
        public static final String TIMESTAMP_FIELD = "timestamp.field";
        public static final String STATIC_FIELD = "static.field";
        public static final String STATIC_VALUE = "static.value";
    }

    /** Parsed form of a "*.field" setting: the field name plus whether the field is optional. */
    private static final class InsertionSpec {
        final String name;
        final boolean optional;

        private InsertionSpec(String name, boolean optional) {
            this.name = name;
            this.optional = optional;
        }

        /**
         * Parses a field setting. A trailing {@code ?} marks the field optional, a trailing
         * {@code !} marks it required; in both cases that single character is stripped from the
         * name. Without a suffix the field is optional.
         *
         * @param spec the raw configuration value, may be {@code null}
         * @return the parsed spec, or {@code null} when {@code spec} is {@code null}
         */
        public static InsertionSpec parse(String spec) {
            if (spec == null) {
                return null;
            }
            if (spec.endsWith("?")) {
                return new InsertionSpec(spec.substring(0, spec.length() - 1), true);
            }
            if (spec.endsWith("!")) {
                return new InsertionSpec(spec.substring(0, spec.length() - 1), false);
            }
            return new InsertionSpec(spec, true);
        }
    }

    /** Variant that inserts the fields into the record key. */
    public static class Key<R extends ConnectRecord<R>> extends InsertField<R> {
        @Override
        protected Schema operatingSchema(R record) {
            return record.keySchema();
        }

        @Override
        protected Object operatingValue(R record) {
            return record.key();
        }

        @Override
        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue,
                    record.valueSchema(), record.value(), record.timestamp());
        }
    }

    /** Variant that inserts the fields into the record value. */
    public static class Value<R extends ConnectRecord<R>> extends InsertField<R> {
        @Override
        protected Schema operatingSchema(R record) {
            return record.valueSchema();
        }

        @Override
        protected Object operatingValue(R record) {
            return record.value();
        }

        @Override
        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
                    updatedSchema, updatedValue, record.timestamp());
        }
    }
}

