I'm trying to build a Spark Streaming application that connects to Flume.
I managed to save the data while it is still an RDD, but when I try to convert it to a DataFrame using the toDF function I get an error. I'm working in the spark-shell and I can't see what the error is.
This is the code I'm running:
    // importing relevant libraries
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.flume._
    import org.apache.spark.util.IntParam
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.sql.SQLContext

    // creating the Spark Streaming configuration
    val ssc = new StreamingContext(sc, Seconds(5))
    val stream = FlumeUtils.createStream(ssc, "0.0.0.0", 44444, StorageLevel.MEMORY_ONLY_SER_2)

    // starting the streaming job
    val textStream = stream.map(e => new String(e.event.getBody.array))
    val numLines = textStream.count()
    numLines.print()

    textStream.foreachRDD { rdd =>
      // some things that need to be created per batch
      import java.util.Date
      val d = new Date

      // delimiter of '&'
      val rdd_s = rdd.map(line => line.split("&"))
      val rdd_split = rdd_s.map(line => (d.getTime.toString, line(2), line(3).toInt))

      // only saves the data if toDF is commented out
      rdd_split.saveAsTextFile("/flume/text/final/")

      // creating the DataFrame - if this part is commented out, the data is saved to file
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._
      val df = rdd_split.toDF("moment", "id", "amount")
      df.saveAsTextFile("/idan/streaming/flume/text/final/withtime")
    }

    ssc.start()
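As a side note, a Spark 1.x DataFrame does not expose saveAsTextFile; once `df` exists it is usually persisted through its DataFrameWriter, or through its underlying RDD. A minimal sketch under that assumption (the output paths below are only placeholders):

    // Hypothetical write-out once `df` exists in the foreachRDD body above.
    // df.write is the standard DataFrameWriter API; the paths are placeholders.
    df.write.json("/flume/text/final/withtime_json")

    // Or fall back to the underlying RDD if plain text files are required.
    df.rdd.map(_.mkString("&")).saveAsTextFile("/flume/text/final/withtime_text")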
I found the answer to this: I needed to create a lazy singleton of SQLContext in order to create the DataFrame. Here is the final version of the singleton code:
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    // case class for the DataFrame
    case class Record(moment: String, name: String, id: String, amount: Int)

    /** Lazily instantiated singleton instance of SQLContext */
    object SQLContextSingleton {

      @transient private var instance: SQLContext = null

      // instantiate SQLContext on demand
      def getInstance(sc: SparkContext): SQLContext = synchronized {
        if (instance == null) {
          instance = new SQLContext(sc)
        }
        instance
      }
    }
And to create the DataFrame I needed to get the singleton and use it:
    val sqlContext = SQLContextSingleton.getInstance(rdd_s.sparkContext)
    import sqlContext.implicits._
    val df = sqlContext.createDataFrame(rdd_s.map(line => (d.getTime, line(0), line(1), line(2), line(3))))
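For completeness, here is a minimal sketch of how the singleton and the Record case class could be wired together inside foreachRDD; the '&' delimiter and the field positions are assumptions carried over from the snippets above, not part of the original answer:

    textStream.foreachRDD { rdd =>
      import java.util.Date
      val d = new Date

      // Split each '&'-delimited line and keep only lines with enough fields
      // (field positions here are assumptions, adjust to the real payload).
      val records = rdd.map(_.split("&"))
                       .filter(_.length >= 4)
                       .map(f => Record(d.getTime.toString, f(1), f(2), f(3).toInt))

      // Reuse a single SQLContext per JVM instead of building one per batch.
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // With an RDD of case classes, toDF() picks up the column names from Record.
      val df = records.toDF()
      df.show()
    }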
Edit: the createDataFrame call above gives me the following error:
    16/08/09 13:16:05 ERROR scheduler.JobScheduler: Error running job streaming job 1470748565000 ms.1
    java.lang.NullPointerException
        at org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
        at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:554)
        at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:553)
        at org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:540)
        at org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:539)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:539)
        at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:252)
        at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:239)
        at org.apache.spark.sql.hive.HiveContext$$anon$2.<init>(HiveContext.scala:459)
        at org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:459)
        at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:458)
        at org.apache.spark.sql.hive.HiveContext$$anon$3.<init>(HiveContext.scala:475)
        at org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:475)
        at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:474)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
        at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
        at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:417)
        at org.apache.spark.sql.SQLImplicits.rddToDataFrameHolder(SQLImplicits.scala:155)
        at $line46.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:58)
        at $line46.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:48)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    16/08/09 13:16:08 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
Did anyone else run into this kind of problem and know how to help? :)