python - SQLAlchemy ORM Event hook for attribute persisted -
i working on finding way in sqlalchemy events call external api upon attribute gets updated , persisted database. here context:
an
user
model attribute namedbirthday
. when instance ofuser
model gets updated , saved, want call external api update user's birthday accordingly.
i've tried attribute events, however, generates many hits , there no way guarantee set
/remove
attribute event persisted (auto commit set false , transaction gets rolled when errors occurred.)
session events not work either because requires session/sessionfactory parameter , there many places in code based sessions have been used.
i have been looking @ possible sqlalchemy orm event hooks in official documentation couldn't find 1 of them satisfy requirement.
i wonder if else has insight how implement kind of combination event trigger in sqlalchemy. thanks.
you can combining multiple events. specific events need use depend on particular application, basic idea this:
- [
instanceevents.load
] when instance loaded, note down fact loaded , not added session later (we want save initial state if instance loaded) - [
attributeevents.set/append/remove
] when attribute changes, note down fact changed, and, if necessary, changed (these first 2 steps optional if don't need initial state) - [
sessionevents.before_flush
] when flush happens, note down instances being saved - [
sessionevents.before_commit
] before commit completes, note down current state of instance (because may not have access anymore after commit) - [
sessionevents.after_commit
] after commit completes, fire off custom event handler , clear instances saved
an interesting challenge ordering of events. if session.commit()
without doing session.flush()
, you'll notice before_commit
event fires before before_flush
event, different scenario session.flush()
before session.commit()
. solution call session.flush()
in before_commit
call force ordering. not 100% kosher, works me in production.
here's (simple) diagram of ordering of events:
begin load (save initial state) set attribute ... flush set attribute ... flush ... (save modified state) commit (fire off "object saved , changed" event)
complete example
from itertools import chain weakref import weakkeydictionary, weakset sqlalchemy import column, string, integer, create_engine sqlalchemy import event sqlalchemy.orm import sessionmaker, object_session sqlalchemy.ext.declarative import declarative_base base = declarative_base() engine = create_engine("sqlite://") session = sessionmaker(bind=engine) class user(base): __tablename__ = "users" id = column(integer, primary_key=true) birthday = column(string) @event.listens_for(user.birthday, "set", active_history=true) def _record_initial_state(target, value, old, initiator): session = object_session(target) if session none: return if target not in session.info.get("loaded_instances", set()): return initial_state = session.info.setdefault("initial_state", weakkeydictionary()) # save entire object's state, not birthday attribute initial_state.setdefault(target, old) @event.listens_for(user, "load") def _record_loaded_instances_on_load(target, context): session = object_session(target) loaded_instances = session.info.setdefault("loaded_instances", weakset()) loaded_instances.add(target) @event.listens_for(session, "before_flush") def track_instances_before_flush(session, context, instances): modified_instances = session.info.setdefault("modified_instances", weakset()) obj in chain(session.new, session.dirty): if session.is_modified(obj) , isinstance(obj, user): modified_instances.add(obj) @event.listens_for(session, "before_commit") def set_pending_changes_before_commit(session): session.flush() # important initial_state = session.info.get("initial_state", {}) modified_instances = session.info.get("modified_instances", set()) del session.info["modified_instances"] pending_changes = session.info["pending_changes"] = [] obj in modified_instances: initial = initial_state.get(obj) current = obj.birthday pending_changes.append({ "initial": initial, "current": current, }) initial_state[obj] = current @event.listens_for(session, "after_commit") def after_commit(session): pending_changes = session.info.get("pending_changes", {}) del session.info["pending_changes"] changes in pending_changes: print(changes) # fire custom event loaded_instances = session.info["loaded_instances"] = weakset() v in session.identity_map.values(): if isinstance(v, user): loaded_instances.add(v) def main(): engine = create_engine("sqlite://", echo=false) base.metadata.create_all(bind=engine) session = session(bind=engine) user = user(birthday="foo") session.add(user) user.birthday = "bar" session.flush() user.birthday = "baz" session.commit() # prints: {"initial": none, "current": "baz"} user.birthday = "foobar" session.commit() # prints: {"initial": "baz", "current": "foobar"} session.close() if __name__ == "__main__": main()
as can see, it's little complicated , not ergonomic. nicer if integrated orm, understand there may reasons not doing so.
Comments
Post a Comment