Spark and Cassandra parallel processing -


i have following task ahead of me.

user provides set of ip addresses config file while executing spark submit command.

lets array looks :

val ips = array(1,2,3,4,5)

there can 100.000 values in array..

for elements in array, should read data cassandra, perform computation , insert data cassandra.

if do:

ips.foreach(ip =>{ - read data casandra specific "ip" // each ip there different amount of data read (within functions determine start , end date each ip) - process - save cassandra}) 

this works fine.

i believe process runs sequentially; don't exploit parallelism.

on other hand if do:

val iprdd = sc.parallelize(array(1,2,3,4,5)) iprdd.foreach(ip => { - read data cassandra // need use spark context make query -process save cassandra}) 

i serialization exception, because spark trying serialize spark context, not serializable.

how make work, still exploit parallelism.

thanks

edited

this execption get:

exception in thread "main" org.apache.spark.sparkexception: task not serializable @ org.apache.spark.util.closurecleaner$.ensureserializable(closurecleaner.scala:304) @ org.apache.spark.util.closurecleaner$.org$apache$spark$util$closurecleaner$$clean(closurecleaner.scala:294) @ org.apache.spark.util.closurecleaner$.clean(closurecleaner.scala:122) @ org.apache.spark.sparkcontext.clean(sparkcontext.scala:2055) @ org.apache.spark.rdd.rdd$$anonfun$foreachpartition$1.apply(rdd.scala:919) @ org.apache.spark.rdd.rdd$$anonfun$foreachpartition$1.apply(rdd.scala:918) @ org.apache.spark.rdd.rddoperationscope$.withscope(rddoperationscope.scala:150) @ org.apache.spark.rdd.rddoperationscope$.withscope(rddoperationscope.scala:111) @ org.apache.spark.rdd.rdd.withscope(rdd.scala:316) @ org.apache.spark.rdd.rdd.foreachpartition(rdd.scala:918) @ com.enerbyte.spark.jobs.wibeeebatch.wibeeebatchjob$$anonfun$main$1.apply(wibeeebatchjob.scala:59) @ com.enerbyte.spark.jobs.wibeeebatch.wibeeebatchjob$$anonfun$main$1.apply(wibeeebatchjob.scala:54) @ scala.collection.indexedseqoptimized$class.foreach(indexedseqoptimized.scala:33) @ scala.collection.mutable.arrayops$ofref.foreach(arrayops.scala:108) @ com.enerbyte.spark.jobs.wibeeebatch.wibeeebatchjob$.main(wibeeebatchjob.scala:54) @ com.enerbyte.spark.jobs.wibeeebatch.wibeeebatchjob.main(wibeeebatchjob.scala) @ sun.reflect.nativemethodaccessorimpl.invoke0(native method) @ sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl.java:62) @ sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl.java:43) @ java.lang.reflect.method.invoke(method.java:498) @ org.apache.spark.deploy.sparksubmit$.org$apache$spark$deploy$sparksubmit$$runmain(sparksubmit.scala:731) @ org.apache.spark.deploy.sparksubmit$.dorunmain$1(sparksubmit.scala:181) @ org.apache.spark.deploy.sparksubmit$.submit(sparksubmit.scala:206) @ org.apache.spark.deploy.sparksubmit$.main(sparksubmit.scala:121) @ org.apache.spark.deploy.sparksubmit.main(sparksubmit.scala) caused by: java.io.notserializableexception: org.apache.spark.sparkcontext serialization stack: - object not serializable (class: org.apache.spark.sparkcontext, value: org.apache.spark.sparkcontext@311ff287) - field (class: com.enerbyte.spark.jobs.wibeeebatch.wibeeebatchjob$$anonfun$main$1, name: sc$1, type: class org.apache.spark.sparkcontext) - object (class com.enerbyte.spark.jobs.wibeeebatch.wibeeebatchjob$$anonfun$main$1, ) - field (class: com.enerbyte.spark.jobs.wibeeebatch.wibeeebatchjob$$anonfun$main$1$$anonfun$apply$1, name: $outer, type: class com.enerbyte.spark.jobs.wibeeebatch.wibeeebatchjob$$anonfun$main$1) - object (class com.enerbyte.spark.jobs.wibeeebatch.wibeeebatchjob$$anonfun$main$1$$anonfun$apply$1, ) @ org.apache.spark.serializer.serializationdebugger$.improveexception(serializationdebugger.scala:40) @ org.apache.spark.serializer.javaserializationstream.writeobject(javaserializer.scala:47) @ org.apache.spark.serializer.javaserializerinstance.serialize(javaserializer.scala:101) @ org.apache.spark.util.closurecleaner$.ensureserializable(closurecleaner.scala:301)

easiest thing use spark cassandra connector can handle connection pooling , serialization.

with like

sc.parallelize(inputdata, numtasks)   .mappartitions {  =>     val con = cassandraconnection(yourconf)     con.withsessiondo{ session =>       //use session     }     //do other processing   }.savetocassandra("ks","table" 

this manual operation of cassandra connection. sessions automatically pooled , cached , if prepare statement cached on executor well.

if use more built in methods, there exists joinwithcassandratable may work in situation.

sc.parallelize(inputdata, numtasks)   .joinwithcassandratable("ks","table") //retrieves records input data primary key   .map( //manipulate returned results if needed )   .savetocassandra("ks","table") 

Comments

Post a Comment

Popular posts from this blog

php - Passing multiple values in a url using checkbox -

sql - Postgresql tables exists, but getting "relation does not exist" when querying -

java - nested exception is org.hibernate.exception.SQLGrammarException: could not extract ResultSet Hibernate+SpringMVC -