package com.cloudera.oryx.lambda;

import com.cloudera.oryx.common.lang.ClassUtils;
import com.cloudera.oryx.common.random.RandomManager;
import com.cloudera.oryx.common.settings.ConfigUtils;
import com.cloudera.oryx.kafka.util.KafkaUtils;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
import java.io.Closeable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;

/* loaded from: input_file:com/cloudera/oryx/lambda/AbstractSparkLayer.class */
public abstract class AbstractSparkLayer<K, M> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(AbstractSparkLayer.class);
    private final Config config;
    private final String id;
    private final String streamingMaster;
    private final String inputTopic;
    private final String inputTopicLockMaster;
    private final String inputBroker;
    private final String updateTopic;
    private final String updateTopicLockMaster;
    private final Class<K> keyClass;
    private final Class<M> messageClass;
    private final Class<? extends Decoder<K>> keyDecoderClass;
    private final Class<? extends Decoder<M>> messageDecoderClass;
    private final int generationIntervalSec;
    private final Map<String, Object> extraSparkConfig;

    /* loaded from: input_file:com/cloudera/oryx/lambda/AbstractSparkLayer$MMDToTuple2Fn.class */
    public static final class MMDToTuple2Fn<K, M> implements PairFunction<MessageAndMetadata<K, M>, K, M> {
        public Tuple2<K, M> call(MessageAndMetadata<K, M> messageAndMetadata) {
            return new Tuple2<>(messageAndMetadata.key(), messageAndMetadata.message());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public AbstractSparkLayer(Config config) {
        Objects.requireNonNull(config);
        log.info("Configuration:\n{}", ConfigUtils.prettyPrint(config));
        String configGroup = getConfigGroup();
        this.config = config;
        String optionalString = ConfigUtils.getOptionalString(config, "oryx.id");
        this.id = optionalString == null ? generateRandomID() : optionalString;
        this.streamingMaster = config.getString("oryx." + configGroup + ".streaming.master");
        this.inputTopic = config.getString("oryx.input-topic.message.topic");
        this.inputTopicLockMaster = config.getString("oryx.input-topic.lock.master");
        this.inputBroker = config.getString("oryx.input-topic.broker");
        this.updateTopic = ConfigUtils.getOptionalString(config, "oryx.update-topic.message.topic");
        this.updateTopicLockMaster = ConfigUtils.getOptionalString(config, "oryx.update-topic.lock.master");
        this.keyClass = ClassUtils.loadClass(config.getString("oryx.input-topic.message.key-class"));
        this.messageClass = ClassUtils.loadClass(config.getString("oryx.input-topic.message.message-class"));
        this.keyDecoderClass = ClassUtils.loadClass(config.getString("oryx.input-topic.message.key-decoder-class"), Decoder.class);
        this.messageDecoderClass = ClassUtils.loadClass(config.getString("oryx.input-topic.message.message-decoder-class"), Decoder.class);
        this.generationIntervalSec = config.getInt("oryx." + configGroup + ".streaming.generation-interval-sec");
        this.extraSparkConfig = new HashMap();
        for (Map.Entry entry : config.getConfig("oryx." + configGroup + ".streaming.config").entrySet()) {
            this.extraSparkConfig.put(entry.getKey(), ((ConfigValue) entry.getValue()).unwrapped());
        }
        Preconditions.checkArgument(this.generationIntervalSec > 0);
    }

    private static String generateRandomID() {
        return Integer.toString(RandomManager.getRandom().nextInt() & Integer.MAX_VALUE);
    }

    protected abstract String getConfigGroup();

    protected abstract String getLayerName();

    /* JADX INFO: Access modifiers changed from: protected */
    public final Config getConfig() {
        return this.config;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final String getID() {
        return this.id;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final String getGroupID() {
        return "OryxGroup-" + getLayerName() + "-" + getID();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final String getInputTopicLockMaster() {
        return this.inputTopicLockMaster;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Class<K> getKeyClass() {
        return this.keyClass;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Class<M> getMessageClass() {
        return this.messageClass;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final JavaStreamingContext buildStreamingContext() {
        log.info("Starting SparkContext with interval {} seconds", Integer.valueOf(this.generationIntervalSec));
        SparkConf sparkConf = new SparkConf();
        if (sparkConf.getOption("spark.master").isEmpty()) {
            log.info("Overriding master to {} for tests", this.streamingMaster);
            sparkConf.setMaster(this.streamingMaster);
        }
        if (sparkConf.getOption("spark.app.name").isEmpty()) {
            String str = "Oryx" + getLayerName();
            if (this.id != null) {
                str = str + "-" + this.id;
            }
            log.info("Overriding app name to {} for tests", str);
            sparkConf.setAppName(str);
        }
        for (Map.Entry<String, Object> entry : this.extraSparkConfig.entrySet()) {
            sparkConf.setIfMissing(entry.getKey(), entry.getValue().toString());
        }
        sparkConf.setIfMissing("spark.streaming.gracefulStopTimeout", Long.toString(TimeUnit.MILLISECONDS.convert(this.generationIntervalSec, TimeUnit.SECONDS)));
        sparkConf.setIfMissing("spark.cleaner.ttl", Integer.toString(20 * this.generationIntervalSec));
        return new JavaStreamingContext(JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(sparkConf)), new Duration(TimeUnit.MILLISECONDS.convert(this.generationIntervalSec, TimeUnit.SECONDS)));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final JavaInputDStream<MessageAndMetadata<K, M>> buildInputDStream(JavaStreamingContext javaStreamingContext) {
        Preconditions.checkArgument(KafkaUtils.topicExists(this.inputTopicLockMaster, this.inputTopic), "Topic %s does not exist; did you create it?", new Object[]{this.inputTopic});
        if (this.updateTopic != null && this.updateTopicLockMaster != null) {
            Preconditions.checkArgument(KafkaUtils.topicExists(this.updateTopicLockMaster, this.updateTopic), "Topic %s does not exist; did you create it?", new Object[]{this.updateTopic});
        }
        HashMap hashMap = new HashMap();
        String groupID = getGroupID();
        hashMap.put("group.id", groupID);
        hashMap.put("auto.offset.reset", "largest");
        hashMap.put("metadata.broker.list", this.inputBroker);
        hashMap.put("bootstrap.servers", this.inputBroker);
        Map offsets = KafkaUtils.getOffsets(this.inputTopicLockMaster, groupID, this.inputTopic);
        fillInLatestOffsets(offsets, hashMap);
        log.info("Initial offsets: {}", offsets);
        return org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(javaStreamingContext, this.keyClass, this.messageClass, this.keyDecoderClass, this.messageDecoderClass, MessageAndMetadata.class, hashMap, offsets, Functions.identity());
    }

    private static void fillInLatestOffsets(Map<TopicAndPartition, Long> map, Map<String, String> map2) {
        KafkaCluster kafkaCluster = new KafkaCluster(Map$.MODULE$.apply(JavaConversions.mapAsScalaMap(map2).toSeq()));
        for (Map.Entry<TopicAndPartition, ?> entry : getLeaderOffsets(kafkaCluster, map, true, false).entrySet()) {
            TopicAndPartition key = entry.getKey();
            long readOffset = readOffset(entry.getValue());
            log.info("No initial offsets for {}; using latest offset {} from topic", key, Long.valueOf(readOffset));
            map.put(key, Long.valueOf(readOffset));
        }
        for (Map.Entry<TopicAndPartition, ?> entry2 : getLeaderOffsets(kafkaCluster, map, false, true).entrySet()) {
            TopicAndPartition key2 = entry2.getKey();
            long readOffset2 = readOffset(entry2.getValue());
            long longValue = map.get(key2).longValue();
            if (longValue < readOffset2) {
                log.warn("Initial offset {} for {} before earliest offset {} from topic! using topic offset", new Object[]{Long.valueOf(longValue), key2, Long.valueOf(readOffset2)});
                map.put(key2, Long.valueOf(readOffset2));
            }
        }
        for (Map.Entry<TopicAndPartition, ?> entry3 : getLeaderOffsets(kafkaCluster, map, false, false).entrySet()) {
            TopicAndPartition key3 = entry3.getKey();
            long readOffset3 = readOffset(entry3.getValue());
            long longValue2 = map.get(key3).longValue();
            if (longValue2 > readOffset3) {
                log.warn("Initial offset {} for {} after latest offset {} from topic! using topic offset", new Object[]{Long.valueOf(longValue2), key3, Long.valueOf(readOffset3)});
                map.put(key3, Long.valueOf(readOffset3));
            }
        }
    }

    private static Map<TopicAndPartition, ?> getLeaderOffsets(KafkaCluster kafkaCluster, Map<TopicAndPartition, Long> map, boolean z, boolean z2) {
        HashSet hashSet = new HashSet();
        for (Map.Entry<TopicAndPartition, Long> entry : map.entrySet()) {
            if ((entry.getValue() == null) == z) {
                hashSet.add(entry.getKey());
            }
        }
        if (hashSet.isEmpty()) {
            return Collections.emptyMap();
        }
        Set apply = Set$.MODULE$.apply(JavaConversions.asScalaSet(hashSet).toSeq());
        return JavaConversions.mapAsJavaMap((scala.collection.Map) (z2 ? kafkaCluster.getEarliestLeaderOffsets(apply) : kafkaCluster.getLatestLeaderOffsets(apply)).right().get());
    }

    private static long readOffset(Object obj) {
        Matcher matcher = Pattern.compile("LeaderOffset\\([^,]+,[^,]+,([^)]+)\\)").matcher(obj.toString());
        Preconditions.checkState(matcher.matches());
        return Long.parseLong(matcher.group(1));
    }
}
