package org.apache.spark.streaming.kafka;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.KafkaStream;
import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.spark.Logging;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import scala.Function0;
import scala.Predef$;
import scala.collection.Map;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag;
import scala.reflect.Manifest;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaInputDStream.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=e!B\u0001\u0003\u0001\u0011a!!D&bM.\f'+Z2fSZ,'O\u0003\u0002\u0004\t\u0005)1.\u00194lC*\u0011QAB\u0001\ngR\u0014X-Y7j]\u001eT!a\u0002\u0005\u0002\u000bM\u0004\u0018M]6\u000b\u0005%Q\u0011AB1qC\u000eDWMC\u0001\f\u0003\ry'oZ\u000b\u0006\u001b\u001d\u000b\u0016l[\n\u0004\u00019Q\u0002cA\b\u0013)5\t\u0001C\u0003\u0002\u0012\t\u0005A!/Z2fSZ,'/\u0003\u0002\u0014!\tA!+Z2fSZ,'\u000f\u0005\u0002\u001615\taCC\u0001\u0018\u0003\u0015\u00198-\u00197b\u0013\tIbCA\u0002B]f\u0004\"a\u0007\u000f\u000e\u0003\u0019I!!\b\u0004\u0003\u000f1{wmZ5oO\"Aq\u0004\u0001B\u0001B\u0003%\u0011%A\u0006lC\u001a\\\u0017\rU1sC6\u001c8\u0001\u0001\t\u0005E\u0015:s%D\u0001$\u0015\t!c#\u0001\u0006d_2dWm\u0019;j_:L!AJ\u0012\u0003\u00075\u000b\u0007\u000f\u0005\u0002)W9\u0011Q#K\u0005\u0003UY\ta\u0001\u0015:fI\u00164\u0017B\u0001\u0017.\u0005\u0019\u0019FO]5oO*\u0011!F\u0006\u0005\t_\u0001\u0011\t\u0011)A\u0005a\u00051Ao\u001c9jGN\u0004BAI\u0013(cA\u0011QCM\u0005\u0003gY\u00111!\u00138u\u0011%)\u0004A!A!\u0002\u00131D(\u0001\u0007ti>\u0014\u0018mZ3MKZ,G\u000e\u0005\u00028u5\t\u0001H\u0003\u0002:\r\u000591\u000f^8sC\u001e,\u0017BA\u001e9\u00051\u0019Fo\u001c:bO\u0016dUM^3m\u0013\t)$\u0003\u0003\u0005?\u0001\t\r\t\u0015a\u0003@\u0003))g/\u001b3f]\u000e,G%\u000e\t\u0004\u0001\u000e+U\"A!\u000b\u0005\t3\u0012a\u0002:fM2,7\r^\u0005\u0003\t\u0006\u0013\u0001b\u00117bgN$\u0016m\u001a\t\u0003\r\u001ec\u0001\u0001B\u0003I\u0001\t\u0007\u0011JA\u0001L#\tQE\u0003\u0005\u0002\u0016\u0017&\u0011AJ\u0006\u0002\b\u001d>$\b.\u001b8h\u0011!q\u0005AaA!\u0002\u0017y\u0015AC3wS\u0012,gnY3%mA\u0019\u0001i\u0011)\u0011\u0005\u0019\u000bF!\u0002*\u0001\u0005\u0004I%!\u0001,\t\u0011Q\u0003!1!Q\u0001\fU\u000b!\"\u001a<jI\u0016t7-\u001a\u00138!\rAc\u000bW\u0005\u0003/6\u0012\u0001\"T1oS\u001a,7\u000f\u001e\t\u0003\rf#QA\u0017\u0001C\u0002m\u0013\u0011!V\t\u0003\u0015r\u0003$!X3\u0011\u0007y\u0013G-D\u0001`\u0015\t\u0001\u0017-\u0001\u0006tKJL\u0017\r\\5{KJT\u0011aA\u0005\u0003G~\u0013q\u0001R3d_\u0012,'\u000f\u0005\u0002GK\u0012Ia-WA\u0001\u0002\u0003\u0015\t!\u0013\u0002\u0004?\u0012\u001a\u0004\u0002\u00035\u0001\u0005\u0007\u0005\u000b1B5\u0002\u0015\u00154\u0018\u000eZ3oG\u0016$\u0003\bE\u0002)-*\u0004\"AR6\u0005\u000b1\u0004!\u0019A7\u0003\u0003Q\u000b\"A\u001381\u0005=\f\bc\u00010caB\u0011a)\u001d\u0003\ne.\f\t\u0011!A\u0003\u0002%\u00131a\u0018\u00135\u0011\u0015!\b\u0001\"\u0001v\u0003\u0019a\u0014N\\5u}Q!a/ @��)\u00159\u0018P_>}!\u0019A\b!\u0012)YU6\t!\u0001C\u0003?g\u0002\u000fq\bC\u0003Og\u0002\u000fq\nC\u0003Ug\u0002\u000fQ\u000bC\u0003ig\u0002\u000f\u0011\u000eC\u0003 g\u0002\u0007\u0011\u0005C\u00030g\u0002\u0007\u0001\u0007C\u00036g\u0002\u0007a\u0007C\u0005\u0002\u0004\u0001\u0001\r\u0011\"\u0001\u0002\u0006\u0005\t2m\u001c8tk6,'oQ8o]\u0016\u001cGo\u001c:\u0016\u0005\u0005\u001d\u0001\u0003BA\u0005\u0003\u001fi!!a\u0003\u000b\u0007\u00055\u0011-\u0001\u0005d_:\u001cX/\\3s\u0013\u0011\t\t\"a\u0003\u0003#\r{gn];nKJ\u001cuN\u001c8fGR|'\u000fC\u0005\u0002\u0016\u0001\u0001\r\u0011\"\u0001\u0002\u0018\u0005)2m\u001c8tk6,'oQ8o]\u0016\u001cGo\u001c:`I\u0015\fH\u0003BA\r\u0003?\u00012!FA\u000e\u0013\r\tiB\u0006\u0002\u0005+:LG\u000f\u0003\u0006\u0002\"\u0005M\u0011\u0011!a\u0001\u0003\u000f\t1\u0001\u001f\u00132\u0011!\t)\u0003\u0001Q!\n\u0005\u001d\u0011AE2p]N,X.\u001a:D_:tWm\u0019;pe\u0002Bq!!\u000b\u0001\t\u0003\tY#\u0001\u0004p]N#x\u000e\u001d\u000b\u0003\u00033Aq!a\f\u0001\t\u0003\tY#A\u0004p]N#\u0018M\u001d;\u0007\r\u0005M\u0002\u0001BA\u001b\u00059iUm]:bO\u0016D\u0015M\u001c3mKJ,b!a\u000e\u0002\\\u0005}3CBA\u0019\u0003s\tI\u0005\u0005\u0003\u0002<\u0005\u0015SBAA\u001f\u0015\u0011\ty$!\u0011\u0002\t1\fgn\u001a\u0006\u0003\u0003\u0007\nAA[1wC&!\u0011qIA\u001f\u0005\u0019y%M[3diB!\u00111HA&\u0013\u0011\ti%!\u0010\u0003\u0011I+hN\\1cY\u0016D1\"!\u0015\u00022\t\u0005\t\u0015!\u0003\u0002T\u000511\u000f\u001e:fC6\u0004\u0002\"!\u0003\u0002V\u0005e\u0013QL\u0005\u0005\u0003/\nYAA\u0006LC\u001a\\\u0017m\u0015;sK\u0006l\u0007c\u0001$\u0002\\\u00111\u0001*!\rC\u0002%\u00032ARA0\t\u0019\u0011\u0016\u0011\u0007b\u0001\u0013\"Y\u00111MA\u0019\u0005\u0007\u0005\u000b1BA3\u0003))g/\u001b3f]\u000e,G%\u000f\t\u0005\u0001\u000e\u000bI\u0006C\u0006\u0002j\u0005E\"1!Q\u0001\f\u0005-\u0014aC3wS\u0012,gnY3%cA\u0002B\u0001Q\"\u0002^!9A/!\r\u0005\u0002\u0005=D\u0003BA9\u0003w\"b!a\u001d\u0002x\u0005e\u0004\u0003CA;\u0003c\tI&!\u0018\u000e\u0003\u0001A\u0001\"a\u0019\u0002n\u0001\u000f\u0011Q\r\u0005\t\u0003S\ni\u0007q\u0001\u0002l!A\u0011\u0011KA7\u0001\u0004\t\u0019\u0006\u0003\u0005\u0002��\u0005EB\u0011AA\u0016\u0003\r\u0011XO\u001c\u0005\b\u0003\u0007\u0003A\u0011BAC\u0003\u0001\"(/\u001f.p_.,W\r]3s\u0007>t7/^7fe\u001e\u0013x.\u001e9DY\u0016\fg.\u001e9\u0015\r\u0005e\u0011qQAF\u0011\u001d\tI)!!A\u0002\u001d\nQA_6Ve2Dq!!$\u0002\u0002\u0002\u0007q%A\u0004he>,\b/\u00133")
/* loaded from: input_file:org/apache/spark/streaming/kafka/KafkaReceiver.class */
public class KafkaReceiver<K, V, U extends Decoder<?>, T extends Decoder<?>> extends Receiver<Object> implements Logging {
    public final Map<String, String> org$apache$spark$streaming$kafka$KafkaReceiver$$kafkaParams;
    private final Map<String, Object> topics;
    public final ClassTag<K> org$apache$spark$streaming$kafka$KafkaReceiver$$evidence$5;
    public final ClassTag<V> org$apache$spark$streaming$kafka$KafkaReceiver$$evidence$6;
    private final Manifest<U> evidence$7;
    private final Manifest<T> evidence$8;
    private ConsumerConnector consumerConnector;
    private transient Logger org$apache$spark$Logging$$log_;

    /* compiled from: KafkaInputDStream.scala */
    /* loaded from: input_file:org/apache/spark/streaming/kafka/KafkaReceiver$MessageHandler.class */
    public class MessageHandler<K, V> implements Runnable {
        private final KafkaStream<K, V> stream;
        public final /* synthetic */ KafkaReceiver $outer;

        @Override // java.lang.Runnable
        public void run() {
            org$apache$spark$streaming$kafka$KafkaReceiver$MessageHandler$$$outer().logInfo(new KafkaReceiver$MessageHandler$$anonfun$run$1(this));
            this.stream.foreach(new KafkaReceiver$MessageHandler$$anonfun$run$2(this));
        }

        public /* synthetic */ KafkaReceiver org$apache$spark$streaming$kafka$KafkaReceiver$MessageHandler$$$outer() {
            return this.$outer;
        }

        public MessageHandler(KafkaReceiver<K, V, U, T> kafkaReceiver, KafkaStream<K, V> kafkaStream, ClassTag<K> classTag, ClassTag<V> classTag2) {
            this.stream = kafkaStream;
            if (kafkaReceiver == null) {
                throw new NullPointerException();
            }
            this.$outer = kafkaReceiver;
        }
    }

    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    public void org$apache$spark$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$Logging$$log_ = logger;
    }

    public Logger log() {
        return Logging.class.log(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    public ConsumerConnector consumerConnector() {
        return this.consumerConnector;
    }

    public void consumerConnector_$eq(ConsumerConnector consumerConnector) {
        this.consumerConnector = consumerConnector;
    }

    public void onStop() {
    }

    public void onStart() {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(BoxesRunTime.unboxToInt(this.topics.values().reduce(new KafkaReceiver$$anonfun$1(this))));
        logInfo(new KafkaReceiver$$anonfun$onStart$1(this));
        Properties properties = new Properties();
        this.org$apache$spark$streaming$kafka$KafkaReceiver$$kafkaParams.foreach(new KafkaReceiver$$anonfun$onStart$2(this, properties));
        logInfo(new KafkaReceiver$$anonfun$onStart$3(this));
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector_$eq(Consumer$.MODULE$.create(consumerConfig));
        logInfo(new KafkaReceiver$$anonfun$onStart$4(this));
        if (this.org$apache$spark$streaming$kafka$KafkaReceiver$$kafkaParams.contains("auto.offset.reset")) {
            tryZookeeperConsumerGroupCleanup((String) this.org$apache$spark$streaming$kafka$KafkaReceiver$$kafkaParams.apply("zookeeper.connect"), (String) this.org$apache$spark$streaming$kafka$KafkaReceiver$$kafkaParams.apply("group.id"));
        }
        consumerConnector().createMessageStreams(this.topics, (Decoder) Predef$.MODULE$.manifest(this.evidence$7).runtimeClass().getConstructor(VerifiableProperties.class).newInstance(consumerConfig.props()), (Decoder) Predef$.MODULE$.manifest(this.evidence$8).runtimeClass().getConstructor(VerifiableProperties.class).newInstance(consumerConfig.props())).values().foreach(new KafkaReceiver$$anonfun$onStart$5(this, newFixedThreadPool));
    }

    private void tryZookeeperConsumerGroupCleanup(String str, String str2) {
        try {
            String stringBuilder = new StringBuilder().append("/consumers/").append(str2).toString();
            logInfo(new KafkaReceiver$$anonfun$tryZookeeperConsumerGroupCleanup$1(this, stringBuilder));
            ZkClient zkClient = new ZkClient(str, 30000, 30000, ZKStringSerializer$.MODULE$);
            zkClient.deleteRecursive(stringBuilder);
            zkClient.close();
        } catch (Throwable unused) {
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public KafkaReceiver(Map<String, String> map, Map<String, Object> map2, StorageLevel storageLevel, ClassTag<K> classTag, ClassTag<V> classTag2, Manifest<U> manifest, Manifest<T> manifest2) {
        super(storageLevel);
        this.org$apache$spark$streaming$kafka$KafkaReceiver$$kafkaParams = map;
        this.topics = map2;
        this.org$apache$spark$streaming$kafka$KafkaReceiver$$evidence$5 = classTag;
        this.org$apache$spark$streaming$kafka$KafkaReceiver$$evidence$6 = classTag2;
        this.evidence$7 = manifest;
        this.evidence$8 = manifest2;
        Logging.class.$init$(this);
        this.consumerConnector = null;
    }
}
