python - How to dispose of an observable on completion of another observable?
I have a source observable to which I subscribe a logger observer for logging purposes. I also subscribe to source so that I can perform computations. When my computations are completed, I'm done with source, and I want to dispose of the logger:
                  +-------------------+
                  |                   |
        +---------+ source observable +--------+
        |         |                   |        |
        |         +-------------------+        |
        |                                      |
        |                                      |
     +--v---------------+         +------------v--------+
     |                  |         |                     |
     |      logger      |         |    computations     |
     |    (observer)    |         |    (observable)     |
     +-------^----------+         +-----------+---------+
             |                                |
             |         dispose logger         |
             +--------------------------------+
                 when computations completed
However, the logger doesn't get disposed at quite the right time; 1 or 2 extra ticks still occur.
MWE:
    # needed on Python 2 so that print can be passed as a callback
    from __future__ import print_function

    from rx import Observable

    # source
    source = Observable.interval(1)

    # create a logger for source
    logged = []
    logger = source.subscribe(logged.append)

    # do stuff/computations with source
    calculated = source.map(lambda x: x**2).take_while(lambda x: x < 20)

    # output the computed values, and stop logging when we're done with our computation
    calculated.subscribe(print, print, logger.dispose)

    # I expect only the values that passed through our computation to have been logged.
    # The last value should be 5, because 5**2 = 25 is larger than 20,
    # which in turn causes our computation to terminate.
    assert logged == [0, 1, 2, 3, 4, 5], logged
But I get:
    Traceback (most recent call last):
      File "C:\Program Files (x86)\Python27\lib\site-packages\IPython\core\interactiveshell.py", line 3035, in run_code
        exec(code_obj, self.user_global_ns, self.user_ns)
      File "<ipython-input-54-e8cb1fb583bf>", line 1, in <module>
        assert logged == [0, 1, 2, 3, 4, 5], logged
    AssertionError: [0, 1, 2, 3, 4, 5, 6, 7]
How did 7 get logged? Our computation should terminate after source emits 5, at which point the logger gets disposed.
What am I doing wrong?
This is a thread synchronization problem. The interval() operator launches new threads that invoke on_next() at the specified intervals. Once you dispose of the subscription, it takes some time until the other threads detect the signal and stop working, and 1 millisecond is pretty close to the time that takes.
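The timing argument can be pictured with a toy model. This is only a sketch and does not use RxPY or real threads: `simulate`, `stop_latency_ticks` and `limit` are hypothetical names, and the "latency" stands in for however long the logger's timer thread takes to notice that it has been disposed.

```python
def simulate(stop_latency_ticks, limit=20):
    """Toy model of the race (not RxPY code).

    Returns the values the logger records when the dispose signal needs
    `stop_latency_ticks` extra ticks to reach the logger's timer.
    """
    logged = []
    dispose_at = None  # tick at which the logger really stops
    tick = 0
    while dispose_at is None or tick <= dispose_at:
        logged.append(tick)
        # The computation completes on the first tick whose square
        # reaches the limit; only then is dispose requested.
        if dispose_at is None and tick ** 2 >= limit:
            dispose_at = tick + stop_latency_ticks
        tick += 1
    return logged


print(simulate(0))  # dispose takes effect immediately
print(simulate(2))  # dispose takes effect two ticks late
```

With zero latency the model logs 0 through 5, which is what the question expects. With a latency of two ticks it logs 0 through 7, which matches the list in the AssertionError above.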
To log the messages passing through the reactive chain more reliably, you can insert a logging function right into the chain:
    logged = []

    def logger(x):
        logged.append(x)
        return x

    calculated = source \
        .map(logger) \
        .map(lambda x: x**2) \
        .take_while(lambda x: x < 20) \
        .subscribe(print, print)
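Why the in-chain logger cannot run ahead can be checked without RxPY. The sketch below is an analogy using plain Python iterators, not the RxPY API: `run_pipeline` is a hypothetical helper, and `itertools.count()` stands in for the interval source. Because logging happens in the same pipeline as the computation, a value is logged only when the computation actually pulls it.

```python
import itertools


def run_pipeline(limit=20):
    """Iterator analogy of the in-chain logger (not RxPY code)."""
    logged = []

    def logger(x):
        logged.append(x)
        return x

    source = itertools.count()                # stand-in for the interval source
    tapped = (logger(x) for x in source)      # logging step inside the chain
    squared = (x ** 2 for x in tapped)
    results = list(itertools.takewhile(lambda x: x < limit, squared))
    return results, logged


results, logged = run_pipeline()
print(results)  # [0, 1, 4, 9, 16]
print(logged)   # [0, 1, 2, 3, 4, 5]
```

The value 5 is still logged, because it has to pass through the logging step before its square (25) can fail the predicate, but nothing after it is ever requested.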