python - How to dispose of an observable on completion of another observable?


I have a source observable to which I subscribe a logger observer for logging purposes.

I also subscribe to the source so that I can perform some computations. When the computations are completed, I'm done with the source and want to dispose of the logger:

             +-------------------+
             |                   |
   +---------+ source observable +--------+
   |         |                   |        |
   |         +-------------------+        |
   |                                      |
   |                                      |
+--v---------------+         +------------v--------+
|                  |         |                     |
|     logger       |         |    computations     |
|    (observer)    |         |    (observable)     |
+-------^----------+         +-----------+---------+
        |                                |
        |                                |
        |        dispose logger          |
        +--------------------------------+
            when computations completed

However, the logger doesn't quite get disposed at the right time; one or two extra ticks occur:

MWE

    from __future__ import print_function  # print is passed as a callback; needed on Python 2

    from rx import Observable

    # Source
    source = Observable.interval(1)

    # Create a logger for the source
    logged = []
    logger = source.subscribe(logged.append)

    # Do stuff/computations with the source
    calculated = source.map(lambda x: x**2).take_while(lambda x: x < 20)

    # Output computed values and stop logging when we're done with our computation
    calculated.subscribe(print, print, logger.dispose)

    # I expect only values that passed through our computation to have been logged.
    # The last value should be 5, because 5**2 = 25 is larger than 20,
    # which in turn causes our computation to terminate
    assert logged == [0, 1, 2, 3, 4, 5], logged

But I get:

    Traceback (most recent call last):
      File "c:\program files (x86)\python27\lib\site-packages\ipython\core\interactiveshell.py", line 3035, in run_code
        exec(code_obj, self.user_global_ns, self.user_ns)
      File "<ipython-input-54-e8cb1fb583bf>", line 1, in <module>
        assert logged == [0, 1, 2, 3, 4, 5], logged
    AssertionError: [0, 1, 2, 3, 4, 5, 6, 7]

How did 7 get logged? Our computation should terminate after the source emits 5, at which point the logger gets disposed.

What am I doing wrong?

This is a thread synchronization problem. The interval() operator launches new threads to invoke on_next() at the specified intervals. Once you dispose of the subscription, it takes some time until the other threads detect that signal and stop working, and 1 millisecond is pretty close to the time that takes.
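The timing issue can be imitated with nothing but the standard library. The following is only a sketch, not RxPY code: it assumes each subscription is driven by its own timer thread, and `Ticker` and `run_once` are hypothetical names made up for the illustration. The logger's thread only notices the dispose flag between ticks, so how many values it has logged by then depends on how the two threads happen to interleave.

```python
import threading
import time


class Ticker(object):
    """Hypothetical stand-in for one interval subscription: a thread that
    calls on_next(0), on_next(1), ... until it notices dispose()."""

    def __init__(self, period, on_next):
        self._period = period
        self._on_next = on_next
        self._disposed = threading.Event()
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True

    def start(self):
        self._thread.start()
        return self

    def _run(self):
        n = 0
        # The flag is only checked between ticks, on this thread's schedule.
        while not self._disposed.is_set():
            self._on_next(n)
            n += 1
            time.sleep(self._period)

    def dispose(self):
        # Only sets a flag; a tick already in progress is not cancelled.
        self._disposed.set()

    def join(self):
        self._thread.join()


def run_once():
    logged = []
    logger = Ticker(0.001, logged.append)

    def compute(x):
        # Mirrors take_while(lambda x: x < 20) on the squared values.
        if x ** 2 >= 20:
            logger.dispose()       # called from the computation's thread
            computation.dispose()

    computation = Ticker(0.001, compute)
    logger.start()
    computation.start()
    computation.join()
    logger.join()
    return logged


if __name__ == "__main__":
    print(run_once())
```

Calling `run_once()` several times may return lists of different lengths, because nothing ties the logger's ticks to the computation's ticks.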

To log the messages passing through the reactive chain more reliably, insert the logging function right into the chain:

    logged = []

    def logger(x):
        logged.append(x)
        return x

    calculated = source \
        .map(logger) \
        .map(lambda x: x**2) \
        .take_while(lambda x: x < 20) \
        .subscribe(print, print)
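Why this works can be seen in a small synchronous analogy built on the standard library instead of RxPY. It is a sketch only: `itertools.count()` stands in for the interval source and generator expressions stand in for `map`. Because the logging step sits on the same path as the computation, it sees exactly the values the computation pulled, including the 5 that ends it.

```python
from itertools import count, takewhile

logged = []


def logger(x):
    # Record the value, then pass it on unchanged.
    logged.append(x)
    return x


source = count()                        # 0, 1, 2, ... (stand-in for the interval source)
tapped = (logger(x) for x in source)    # logging step inside the chain
squared = (x ** 2 for x in tapped)      # the computation
computed = list(takewhile(lambda x: x < 20, squared))

print(computed)  # [0, 1, 4, 9, 16]
print(logged)    # [0, 1, 2, 3, 4, 5]
```

There is no second subscription to dispose of here, so there is no window in which extra values can be logged.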
