package com.cloudera.oryx.lambda.speed;

import com.cloudera.oryx.api.KeyMessage;
import com.cloudera.oryx.api.KeyMessageImpl;
import com.cloudera.oryx.api.speed.ScalaSpeedModelManager;
import com.cloudera.oryx.api.speed.SpeedModelManager;
import com.cloudera.oryx.common.lang.ClassUtils;
import com.cloudera.oryx.common.lang.LoggingRunnable;
import com.cloudera.oryx.common.settings.ConfigUtils;
import com.cloudera.oryx.lambda.AbstractSparkLayer;
import com.cloudera.oryx.lambda.UpdateOffsetsFn;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.typesafe.config.Config;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/oryx/lambda/speed/SpeedLayer.class */
public final class SpeedLayer<K, M, U> extends AbstractSparkLayer<K, M> {
    private static final Logger log = LoggerFactory.getLogger(SpeedLayer.class);
    private final String updateBroker;
    private final String updateTopic;
    private final int maxMessageSize;
    private final String updateTopicLockMaster;
    private final String modelManagerClassName;
    private final Class<? extends Decoder<U>> updateDecoderClass;
    private JavaStreamingContext streamingContext;
    private ConsumerConnector consumer;
    private SpeedModelManager<K, M, U> modelManager;

    public SpeedLayer(Config config) {
        super(config);
        this.updateBroker = config.getString("oryx.update-topic.broker");
        this.updateTopic = config.getString("oryx.update-topic.message.topic");
        this.maxMessageSize = config.getInt("oryx.update-topic.message.max-size");
        this.updateTopicLockMaster = config.getString("oryx.update-topic.lock.master");
        this.modelManagerClassName = config.getString("oryx.speed.model-manager-class");
        this.updateDecoderClass = ClassUtils.loadClass(config.getString("oryx.update-topic.message.decoder-class"), Decoder.class);
        Preconditions.checkArgument(this.maxMessageSize > 0);
    }

    @Override // com.cloudera.oryx.lambda.AbstractSparkLayer
    protected String getConfigGroup() {
        return "speed";
    }

    @Override // com.cloudera.oryx.lambda.AbstractSparkLayer
    protected String getLayerName() {
        return "SpeedLayer";
    }

    public synchronized void start() {
        String id = getID();
        if (id != null) {
            log.info("Starting Speed Layer {}", id);
        }
        this.streamingContext = buildStreamingContext();
        log.info("Creating message stream from topic");
        JavaInputDStream<MessageAndMetadata<K, M>> buildInputDStream = buildInputDStream(this.streamingContext);
        JavaPairDStream mapToPair = buildInputDStream.mapToPair(new AbstractSparkLayer.MMDToTuple2Fn());
        this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(ConfigUtils.keyValueToProperties(new Object[]{"group.id", "OryxGroup-" + getLayerName() + "-" + UUID.randomUUID(), "zookeeper.connect", this.updateTopicLockMaster, "fetch.message.max.bytes", Integer.valueOf(this.maxMessageSize), "auto.offset.reset", "smallest"})));
        final Iterator transform = Iterators.transform(((KafkaStream) ((List) this.consumer.createMessageStreams(Collections.singletonMap(this.updateTopic, 1), new StringDecoder((VerifiableProperties) null), loadDecoderInstance()).get(this.updateTopic)).get(0)).iterator(), new Function<MessageAndMetadata<String, U>, KeyMessage<String, U>>() { // from class: com.cloudera.oryx.lambda.speed.SpeedLayer.1
            public KeyMessage<String, U> apply(MessageAndMetadata<String, U> messageAndMetadata) {
                return new KeyMessageImpl(messageAndMetadata.key(), messageAndMetadata.message());
            }
        });
        this.modelManager = loadManagerInstance();
        new Thread((Runnable) new LoggingRunnable() { // from class: com.cloudera.oryx.lambda.speed.SpeedLayer.2
            public void doRun() {
                try {
                    SpeedLayer.this.modelManager.consume(transform, SpeedLayer.this.streamingContext.sparkContext().hadoopConfiguration());
                } catch (Throwable th) {
                    SpeedLayer.log.error("Error while consuming updates", th);
                    SpeedLayer.this.close();
                }
            }
        }, "OryxSpeedLayerUpdateConsumerThread").start();
        mapToPair.foreachRDD(new SpeedLayerUpdate(this.modelManager, this.updateBroker, this.updateTopic));
        buildInputDStream.foreachRDD(new UpdateOffsetsFn(getGroupID(), getInputTopicLockMaster()));
        log.info("Starting Spark Streaming");
        this.streamingContext.start();
    }

    public void await() {
        Preconditions.checkState(this.streamingContext != null);
        log.info("Spark Streaming is running");
        this.streamingContext.awaitTermination();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.modelManager != null) {
            log.info("Shutting down model manager");
            this.modelManager.close();
            this.modelManager = null;
        }
        if (this.consumer != null) {
            log.info("Shutting down consumer");
            this.consumer.commitOffsets();
            this.consumer.shutdown();
            this.consumer = null;
        }
        if (this.streamingContext != null) {
            log.info("Shutting down Spark Streaming; this may take some time");
            this.streamingContext.stop(true, true);
            this.streamingContext = null;
        }
    }

    private SpeedModelManager<K, M, U> loadManagerInstance() {
        Class loadClass = ClassUtils.loadClass(this.modelManagerClassName);
        if (SpeedModelManager.class.isAssignableFrom(loadClass)) {
            try {
                return (SpeedModelManager) ClassUtils.loadInstanceOf(this.modelManagerClassName, SpeedModelManager.class, new Class[]{Config.class}, new Object[]{getConfig()});
            } catch (IllegalArgumentException e) {
                return (SpeedModelManager) ClassUtils.loadInstanceOf(this.modelManagerClassName, SpeedModelManager.class);
            }
        }
        if (!ScalaSpeedModelManager.class.isAssignableFrom(loadClass)) {
            throw new IllegalArgumentException("Bad manager class: " + loadClass);
        }
        try {
            return new ScalaSpeedModelManagerAdapter((ScalaSpeedModelManager) ClassUtils.loadInstanceOf(this.modelManagerClassName, ScalaSpeedModelManager.class, new Class[]{Config.class}, new Object[]{getConfig()}));
        } catch (IllegalArgumentException e2) {
            return new ScalaSpeedModelManagerAdapter((ScalaSpeedModelManager) ClassUtils.loadInstanceOf(this.modelManagerClassName, ScalaSpeedModelManager.class));
        }
    }

    private Decoder<U> loadDecoderInstance() {
        try {
            return (Decoder) ClassUtils.loadInstanceOf(this.updateDecoderClass);
        } catch (IllegalArgumentException e) {
            return (Decoder) ClassUtils.loadInstanceOf(this.updateDecoderClass.getName(), this.updateDecoderClass, new Class[]{VerifiableProperties.class}, new Object[]{null});
        }
    }
}
