Scala - Spark Streaming cannot use the toDF function


I am trying to make a Spark Streaming application that connects to Flume.

I managed to save the data while it is still an RDD, but if I try to convert it to a DataFrame using the toDF function it throws an error. I am working in the shell, so I can't see what the error is.

This is the code I am using:

// importing relevant libraries
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
import org.apache.spark.storage.StorageLevel

// creating the Spark Streaming configuration
val ssc = new StreamingContext(sc, Seconds(5))
val stream = FlumeUtils.createStream(ssc, "0.0.0.0", 44444, StorageLevel.MEMORY_ONLY_SER_2)

// starting the streaming job
val textStream = stream.map(e => new String(e.event.getBody.array))
val numLines = textStream.count()
numLines.print()

textStream.foreachRDD { rdd =>
  // some stuff that needs to be created
  import java.util.Date
  val d = new Date

  // delimiter of '&'
  val rdd_s = rdd.map(line => line.split("&"))
  val rdd_split = rdd_s.map(line => (d.getTime.toString, line(2), line(3).toInt))

  // only saves the data if toDF is commented out
  rdd_split.saveAsTextFile("/flume/text/final/")

  // creating the data frame - if this part is commented out, the data is saved to file
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd_split.toDF("moment", "id", "amount")
  df.saveAsTextFile("/idan/streaming/flume/text/final/withtime")
}

ssc.start()

I found an answer to this: I needed to create a lazily instantiated singleton of the SQLContext in order to create the DataFrame. Here is the final singleton code:

// creating a case class for the data frame
case class Record(moment: String, name: String, id: String, amount: Int)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
  @transient private var instance: SQLContext = null

  // instantiate SQLContext on demand
  def getInstance(sc: SparkContext): SQLContext = synchronized {
    if (instance == null) {
      instance = new SQLContext(sc)
    }
    instance
  }
}

And to create the DataFrame I needed to get the singleton instance and use it:

val sqlContext = SQLContextSingleton.getInstance(rdd_s.sparkContext)
import sqlContext.implicits._
val df = sqlContext.createDataFrame(rdd_s.map(line => (d.getTime, line(0), line(1), line(2), line(3))))
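For context, here is a minimal, untested sketch of how the singleton and the DataFrame creation could fit together inside foreachRDD, in the same spirit as the snippets above. The column names come from the original toDF call; the output path and the emptiness check are illustrative assumptions, and since DataFrame has no saveAsTextFile, the sketch saves the underlying RDD instead:

textStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    import java.util.Date
    val d = new Date

    // split each line on '&', the same delimiter as in the original job
    val rdd_split = rdd.map(_.split("&"))
                       .map(line => (d.getTime.toString, line(2), line(3).toInt))

    // get (or lazily create) the singleton SQLContext from the RDD's SparkContext
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
    import sqlContext.implicits._

    // build the DataFrame with the column names used earlier
    val df = rdd_split.toDF("moment", "id", "amount")

    // DataFrame has no saveAsTextFile, so save its underlying RDD
    // (the path below is a placeholder)
    df.rdd.saveAsTextFile("/flume/text/final/withtime")
  }
}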

EDIT: it still gives me this error:

16/08/09 13:16:05 ERROR scheduler.JobScheduler: Error running job streaming job 1470748565000 ms.1
java.lang.NullPointerException
    at org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
    at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:554)
    at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:553)
    at org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:540)
    at org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:539)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:539)
    at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:252)
    at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:239)
    at org.apache.spark.sql.hive.HiveContext$$anon$2.<init>(HiveContext.scala:459)
    at org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:459)
    at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:458)
    at org.apache.spark.sql.hive.HiveContext$$anon$3.<init>(HiveContext.scala:475)
    at org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:475)
    at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:474)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:417)
    at org.apache.spark.sql.SQLImplicits.rddToDataFrameHolder(SQLImplicits.scala:155)
    at $line46.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:58)
    at $line46.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:48)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/08/09 13:16:08 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver

Did anyone happen to have this kind of problem and know how to help? :)

