package org.apache.spark.streaming.kafka;

import java.io.Serializable;
import java.util.HashMap;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;

/* loaded from: input_file:org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.class */
public class JavaKafkaRDDSuite implements Serializable {
    private transient JavaSparkContext sc = null;
    private transient KafkaTestUtils kafkaTestUtils = null;

    @Before
    public void setUp() {
        this.kafkaTestUtils = new KafkaTestUtils();
        this.kafkaTestUtils.setup();
        this.sc = new JavaSparkContext(new SparkConf().setMaster("local[4]").setAppName(getClass().getSimpleName()));
    }

    @After
    public void tearDown() {
        if (this.sc != null) {
            this.sc.stop();
            this.sc = null;
        }
        if (this.kafkaTestUtils != null) {
            this.kafkaTestUtils.teardown();
            this.kafkaTestUtils = null;
        }
    }

    @Test
    public void testKafkaRDD() throws InterruptedException {
        createTopicAndSendData("topic1");
        createTopicAndSendData("topic2");
        HashMap hashMap = new HashMap();
        hashMap.put("metadata.broker.list", this.kafkaTestUtils.brokerAddress());
        OffsetRange[] offsetRangeArr = {OffsetRange.create("topic1", 0, 0L, 1L), OffsetRange.create("topic2", 0, 0L, 1L)};
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        String[] split = this.kafkaTestUtils.brokerAddress().split(":");
        Broker create = Broker.create(split[0], Integer.parseInt(split[1]));
        hashMap3.put(new TopicAndPartition("topic1", 0), create);
        hashMap3.put(new TopicAndPartition("topic2", 0), create);
        JavaRDD map = KafkaUtils.createRDD(this.sc, String.class, String.class, StringDecoder.class, StringDecoder.class, hashMap, offsetRangeArr).map(new Function<Tuple2<String, String>, String>() { // from class: org.apache.spark.streaming.kafka.JavaKafkaRDDSuite.1
            public String call(Tuple2<String, String> tuple2) throws Exception {
                return (String) tuple2._2();
            }
        });
        JavaRDD createRDD = KafkaUtils.createRDD(this.sc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, hashMap, offsetRangeArr, hashMap2, new Function<MessageAndMetadata<String, String>, String>() { // from class: org.apache.spark.streaming.kafka.JavaKafkaRDDSuite.2
            public String call(MessageAndMetadata<String, String> messageAndMetadata) throws Exception {
                return (String) messageAndMetadata.message();
            }
        });
        JavaRDD createRDD2 = KafkaUtils.createRDD(this.sc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, hashMap, offsetRangeArr, hashMap3, new Function<MessageAndMetadata<String, String>, String>() { // from class: org.apache.spark.streaming.kafka.JavaKafkaRDDSuite.3
            public String call(MessageAndMetadata<String, String> messageAndMetadata) throws Exception {
                return (String) messageAndMetadata.message();
            }
        });
        long count = map.count();
        long count2 = createRDD.count();
        long count3 = createRDD2.count();
        Assert.assertTrue(count > 0);
        Assert.assertEquals(count, count2);
        Assert.assertEquals(count, count3);
    }

    private String[] createTopicAndSendData(String str) {
        String[] strArr = {str + "-1", str + "-2", str + "-3"};
        this.kafkaTestUtils.createTopic(str);
        this.kafkaTestUtils.sendMessages(str, strArr);
        return strArr;
    }
}
