Pause Spark Streaming Job
I have a simple Spark Streaming application that reads data from Kafka and, after a transformation, sends it to an HTTP endpoint (or to another Kafka topic; for this question let's consider HTTP). I am submitting jobs using job-server.
I am currently starting consumption from the source Kafka topic with "auto.offset.reset"="smallest" and interval=3s. In the happy case everything looks good. Here's an excerpt:
kafkaInputDStream.foreachRDD(rdd => {
  rdd.foreach(item => {
    // this will throw an exception if the HTTP endpoint isn't reachable
    HttpProcessor.process(item._1, item._2)
  })
})
Since "auto.offset.reset"="smallest", this processes about 200k messages in one job. If I stop the HTTP server mid-job (simulating an issue in posting) and HttpProcessor.process throws an exception, that job fails and whatever was unprocessed is lost. I see that it keeps on polling every 3 seconds after that.
So my question is:
- Is my assumption right that if, in the next 3-second job, it got X messages and only Y could be processed before hitting an error, the remaining X-Y will not be processed?
- Is there a way to pause the stream/consumption from Kafka? For instance, in case there's an intermittent network issue, all messages consumed in that time are most likely lost. I'd like something that keeps on retrying (maybe with exponential backoff) and starts consuming again whenever the HTTP endpoint is up.
Thanks
Yes, your assumption is correct: if a partition fails, the remaining events are not processed for the moment.
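The retry with exponential backoff mentioned in the question can be approximated inside the task itself, so that a short outage does not immediately fail the partition. The following is only a sketch: the Retry object, its parameter names, and the delay values are made up for illustration and are not part of Spark or job-server.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical helper: retries `op`, doubling the delay after each failure.
object Retry {
  @tailrec
  def withBackoff[T](
      maxAttempts: Int,
      delayMs: Long,
      maxDelayMs: Long = 30000L,
      sleep: Long => Unit = (ms: Long) => Thread.sleep(ms) // injectable for tests
  )(op: => T): T =
    Try(op) match {
      case Success(value) => value
      // Out of attempts: rethrow so the Spark task still fails visibly.
      case Failure(e) if maxAttempts <= 1 => throw e
      case Failure(_) =>
        sleep(delayMs)
        withBackoff(maxAttempts - 1, math.min(delayMs * 2, maxDelayMs), maxDelayMs, sleep)(op)
    }
}
```

Inside rdd.foreach it could be used as Retry.withBackoff(5, 500L) { HttpProcessor.process(item._1, item._2) }. Note that this blocks the executor thread while it waits, and an item may be posted more than once if the endpoint received it but the response was lost.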
However, there are quite a few parameters you can tune to get the desired behavior (if you use DirectKafkaInputDStream).
Let's start with "auto.offset.reset"="smallest": this parameter tells Kafka to begin from the beginning when there is no stored commit for the current group. Since you mentioned that your RDD contains a lot of messages after starting, I assume you do not commit your messages properly. If you expect exactly-once semantics, you should consider keeping track of your offsets yourself, because DirectKafkaInputDStream explicitly does not keep track of them:
"Starting offsets are specified in advance, and this DStream is not responsible for committing offsets, so that you can control exactly-once semantics."
(Comment in DirectKafkaInputDStream, branch 1.6)
That said, your messages will be reprocessed every time you restart your streaming job.
If you commit your processed offsets and pass them to the InputDStream on startup, the listener will continue from the last committed offset.
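A rough sketch of that offset bookkeeping, assuming the Spark 1.6 spark-streaming-kafka module (Kafka 0.8 API). OffsetStore, OffsetTracking, and the process parameter are hypothetical names introduced here; where the offsets are actually persisted (ZooKeeper, a database, a file) is left open.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Hypothetical persistence interface; back it with whatever store you trust.
trait OffsetStore {
  def load(): Map[TopicAndPartition, Long]
  def save(offsets: Map[TopicAndPartition, Long]): Unit
}

object OffsetTracking {
  // The next batch should start where the processed ranges ended.
  def nextStartOffsets(ranges: Array[OffsetRange]): Map[TopicAndPartition, Long] =
    ranges.map(r => TopicAndPartition(r.topic, r.partition) -> r.untilOffset).toMap

  def buildStream(
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      store: OffsetStore,
      process: (String, String) => Unit // must be serializable, it runs on executors
  ): InputDStream[(String, String)] = {
    // An empty map means nothing is consumed, so seed the store on the first run.
    val fromOffsets = store.load()
    val handler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, handler)

    stream.foreachRDD { rdd =>
      // Offset ranges are only available on the RDD produced directly by the stream.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreach { case (key, value) => process(key, value) }
      // Reached only if every partition succeeded, so a failed batch is re-read.
      store.save(nextStartOffsets(ranges))
    }
    stream
  }
}
```

Because the offsets are saved only after the whole batch succeeds, this gives at-least-once delivery: a batch that fails halfway is read again from its starting offsets after a restart, so the HTTP endpoint should tolerate duplicates.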
Regarding backpressure, DirectKafkaInputDStream already uses a RateController, which estimates how many events should be processed in one batch.
To use it, you have to enable backpressure:
"spark.streaming.backpressure.enabled": true
You can also set "spark.streaming.kafka.maxRatePerPartition" to add an upper bound on the batch size.
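If the job builds its own SparkConf, both settings could be applied as in the fragment below. The rate of 1000 and the partition count in the comment are illustrative assumptions, not recommendations.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  // Messages per second per Kafka partition (illustrative value).
  // With the 3 s interval from the question and, say, 4 partitions,
  // a batch is capped at 1000 * 3 * 4 = 12000 messages.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
```

The explicit cap matters most for the first batch after startup, where the rate estimator has no completed batches to learn from yet.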
If you want to control the backpressure on your own (and perhaps stop the consumer completely for a while), you may want to implement the methods of StreamingListener and use it in your job. For example, your StreamingListener can decide after each completed batch whether or not to stop your streaming job.
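A minimal sketch of such a listener is shown below. PauseOnUnhealthy, endpointHealthy, and stop are hypothetical names; the health check itself (for example a ping against the HTTP endpoint) is left to the caller. Spark Streaming has no built-in pause, so the sketch stops the streaming context and leaves the restart to the surrounding application.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical listener: triggers `stop` once when the endpoint looks unhealthy.
class PauseOnUnhealthy(endpointHealthy: () => Boolean, stop: () => Unit)
    extends StreamingListener {

  private val stopping = new AtomicBoolean(false)

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    if (!endpointHealthy() && stopping.compareAndSet(false, true)) {
      // Stop from a separate thread so the listener thread is not blocked
      // while the context shuts down.
      val stopper = new Thread(new Runnable { def run(): Unit = stop() }, "stop-streaming")
      stopper.setDaemon(true)
      stopper.start()
    }
  }
}

object PauseOnUnhealthy {
  // Registers the listener; keeps the SparkContext alive so the job can be restarted.
  def register(ssc: StreamingContext, endpointHealthy: () => Boolean): Unit =
    ssc.addStreamingListener(new PauseOnUnhealthy(
      endpointHealthy,
      () => ssc.stop(stopSparkContext = false, stopGracefully = true)))
}
```

A stopped StreamingContext cannot be started again, so resuming means creating a new context on the same SparkContext, ideally starting from the last offsets you committed.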