
Spark Streaming to Redis with serialization error




Hi All,

I just started the Spark course. I'm trying to build a Spark Streaming project that reads data (log lines) from Kafka and then pushes it into Redis and into a different Kafka topic.

The first part (reading from Kafka) works fine, but when I try to send the data to Redis I get a serialization error: "Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext".

Please find the code below.

Thanks in advance.

package com.halim.wifi

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.redislabs.provider.redis._

class RedisActions(sc: SparkContext) extends Serializable {
  var stringHashRDD: RDD[(String, String)] = _

  def parallelize() {
    stringHashRDD = sc.parallelize(Seq(("myKey1", "MyValue1")))
  }

  def setHash() = {
    sc.toRedisHASH(stringHashRDD, "MyValueName", ("localhost", 6379))
  }
}

object SparkMyProgram {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("AppTest").setMaster("local[*]")
      .set("redis.host", "localhost")
      .set("redis.port", "6379")
    val sc = new SparkContext(sparkConf)
    //val ssc = new StreamingContext(sparkConf, Seconds(2))
    val ssc = new StreamingContext(sc, Seconds(2))
    //val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("AppTest"))

    // Receiver-based Kafka stream: yields (key, message) pairs from "topicname"
    val lines = KafkaUtils.createStream(ssc, "Spark-machine:2181", "spark-consumer-group", Map("topicname" -> 5))

    lines.foreachRDD { rdd =>
      rdd.foreach { msg =>
        println("Message: " + msg)
        // msg is a (key, value) pair; the log line is the value
        val ttt: String = msg._2
        val i = ttt.indexOf("Client Mac")
        if (i > 0) {
          // strip spaces and brackets from the "Client Mac" section
          var tt = ttt.substring(i)
          tt = tt.replaceAll(" ", "")
          tt = tt.replaceAll("[\\]\\(\\)]", "")
          tt = tt.replaceAll("\\[", " ")
          println(tt)
          // push the cleaned line to Redis
          val redisActions = new RedisActions(sc)
          redisActions.parallelize()
          redisActions.setHash()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

 


3 Answer(s)



Hi Halim,

Could you please share the complete error? That will make it easier to debug.

Thanks.



Hi Abhijit,

Please find the complete log below.

Thanks a lot for your help.

spark-submit  --class "com.halim.wifi.SparkMyProgram" target/MonitorRuckus-1.0-SNAPSHOT-jar-with-dependencies.jar
17/10/11 18:44:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/10/11 18:44:50 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
17/10/11 18:44:50 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
17/10/11 18:44:52 ERROR JobScheduler: Error running job streaming job 1507743892000 ms.0
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
	at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
	at com.halim.wifi.SparkMyProgram$$anonfun$main$1.apply(SparkMyProgram.scala:62)
	at com.halim.wifi.SparkMyProgram$$anonfun$main$1.apply(SparkMyProgram.scala:61)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:192)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
	- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@6a802af9)
	- field (class: bn.com.telbru.wifi.SparkKMeansProgram$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
	- object (class bn.com.telbru.wifi.SparkKMeansProgram$$anonfun$main$1, <function1>)
	- field (class: bn.com.telbru.wifi.SparkKMeansProgram$$anonfun$main$1$$anonfun$apply$1, name: $outer, type: class bn.com.telbru.wifi.SparkKMeansProgram$$anonfun$main$1)
	- object (class bn.com.telbru.wifi.SparkKMeansProgram$$anonfun$main$1$$anonfun$apply$1, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
	... 30 more

 



Hi Halim,

You are trying to use a non-serializable object inside the RDD iteration: the closure passed to rdd.foreach is shipped to the executors, but it captures the SparkContext (through RedisActions), and the SparkContext only lives on the driver and cannot be serialized. Please refer to the following link for reference:

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
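For example, here is a minimal sketch of one way to restructure your foreachRDD block so that no driver-only object is captured: each partition opens its own plain Redis connection on the executor and writes directly, instead of building a RedisActions around the SparkContext. The Jedis client and the hash/field names below are illustrative assumptions, not something your build already uses.

import redis.clients.jedis.Jedis

lines.foreachRDD { rdd =>
  rdd.foreachPartition { messages =>
    // The connection is created inside the partition closure, on the
    // executor, so the closure captures nothing non-serializable.
    val jedis = new Jedis("localhost", 6379) // assumed Redis host/port
    try {
      messages.foreach { msg =>
        val line = msg._2
        val i = line.indexOf("Client Mac")
        if (i > 0) {
          val cleaned = line.substring(i)
            .replaceAll(" ", "")
            .replaceAll("[\\]\\(\\)]", "")
            .replaceAll("\\[", " ")
          // Illustrative hash and field names; adapt to your schema.
          jedis.hset("MyValueName", "myKey1", cleaned)
        }
      }
    } finally {
      jedis.close()
    }
  }
}

Alternatively, if you want to keep spark-redis, do the parsing in a transformation and call sc.toRedisHASH from the outer foreachRDD closure, which runs on the driver where sc is valid; just never reference sc inside rdd.foreach.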

Hope this helps.

Thanks.

 
