Spark and Cassandra parallel processing
I have the following task ahead of me.
The user provides a set of IP addresses in a config file when executing the spark-submit command.
Let's say the array looks like this:
val ips = Array(1, 2, 3, 4, 5)
There can be up to 100,000 values in the array.
For each element of the array I should read data from Cassandra, perform some computation, and insert the result back into Cassandra.
If I do:
ips.foreach(ip => {
  // read data from Cassandra for this specific "ip"
  // (each ip has a different amount of data to read; within the functions I determine the start and end date for each ip)
  // process
  // save to Cassandra
})
This works fine.
However, I believe this process runs sequentially; I don't exploit any parallelism.
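For reference, my current sequential version looks roughly like this (a simplified sketch only; computeDateRange and process stand in for my real functions, and the keyspace/table names are placeholders):

import com.datastax.spark.connector._

ips.foreach { ip =>
  // determine the start and end date for this particular ip (stand-in for my real logic)
  val (start, end) = computeDateRange(ip)

  // each ip has a different amount of data to read
  val rows = sc.cassandraTable("ks", "measurements")
    .where("ip = ? AND ts >= ? AND ts <= ?", ip, start, end)

  // stand-in for the computation and the write-back
  process(rows).saveToCassandra("ks", "results")
}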
On the other hand, if I do:
val ipRdd = sc.parallelize(Array(1, 2, 3, 4, 5))
ipRdd.foreach(ip => {
  // read data from Cassandra -- here I would need to use the SparkContext to make the query
  // process
  // save to Cassandra
})
I get a serialization exception, because Spark is trying to serialize the SparkContext, which is not serializable.
How can I make this work and still exploit parallelism?
Thanks
EDIT:
This is the exception I get:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1.apply(WibeeeBatchJob.scala:59)
    at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1.apply(WibeeeBatchJob.scala:54)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$.main(WibeeeBatchJob.scala:54)
    at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob.main(WibeeeBatchJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@311ff287)
    - field (class: com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1, )
    - field (class: com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1$$anonfun$apply$1, name: $outer, type: class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1)
    - object (class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1$$anonfun$apply$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
The easiest thing to do is to use the Spark Cassandra Connector, which can handle connection pooling and serialization.
With something like:
sc.parallelize(inputData, numTasks)
  .mapPartitions { partition =>
    val connector = CassandraConnector(yourConf)
    connector.withSessionDo { session =>
      // use the session
    }
    // do other processing and return an iterator of rows to write
    partition
  }
  .saveToCassandra("ks", "table")
This is a manual operation of the Cassandra connection. The sessions are automatically pooled and cached, and if you prepare a statement it is cached on the executor as well.
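For example, a prepared statement inside withSessionDo could look roughly like this (a sketch only; the keyspace, table and column names are placeholders, and inputData/yourConf are the same as above):

import com.datastax.spark.connector.cql.CassandraConnector

sc.parallelize(inputData, numTasks)
  .foreachPartition { partition =>
    CassandraConnector(yourConf).withSessionDo { session =>
      // prepared once per partition; the connector caches it on the executor
      val select = session.prepare("SELECT value FROM ks.table WHERE ip = ?")
      partition.foreach { ip =>
        val rows = session.execute(select.bind(Int.box(ip)))
        // process the rows and write them back, e.g. with another prepared INSERT
      }
    }
  }

Here all the reading and writing happens on the executors, so nothing in the closure references the SparkContext.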
If you would like to use more built-in methods, there also exists joinWithCassandraTable, which may work in your situation.
sc.parallelize(inputData, numTasks)
  .joinWithCassandraTable("ks", "table") // retrieves all records for which the input data is the primary key
  .map( ... )                            // manipulate the returned results if needed
  .saveToCassandra("ks", "table")
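Note that joinWithCassandraTable expects the elements of the RDD to map onto the table's primary key, for example via a tuple or a case class. A rough sketch of the full pipeline (the keyspace, table and column names are placeholders):

import com.datastax.spark.connector._

case class IpKey(ip: Int)

sc.parallelize(inputData.map(IpKey(_)), numTasks)
  .joinWithCassandraTable("ks", "table")                       // yields (IpKey, CassandraRow) pairs
  .map { case (key, row) => (key.ip, row.getLong("value")) }   // manipulate the returned results
  .saveToCassandra("ks", "results", SomeColumns("ip", "value"))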