spring - SpringFramework .NET - Rollback/Commit for NMS Messaging (ActiveMQ) Transactions -
i'm working on converting existing java/springboot/activemq console app (which consumes messages named activemq message queue , processes messages) analogous c# .net console app. i've got things running correctly success case, i'm having trouble replicating java app's behavior cases in message handler fails process message successfully.
the failure case behavior i'm attempting replicate rollback & re-queueing of message (back named queue received) when message handling fails.
in java/springboot, via adding transactional configuration appropriate classes/methods wherein message processing occurs. failure cases, throw runtimeexception (unchecked) , allow such thrown exceptions handled in spring transactional framework (using springboot defaults such exception handling) and, based on transaction rollback processing framework, message re-queued. i'm not explicitly doing rollback processing in java application code logic, but, rather, allowing framework default flow handle this.
i'm less familiar spring .net analog achieving auto-rollback/re-queue behavior in c# application. and, of yet, i've not been able replicate this. according section 31.5.5 of spring .net documentation:
invoking message listener within transaction requires reconfiguration of listener container. local message transactions can activated setting property sessionacknowledgemode nms of enum type acknowledgementmode, acknowledgementmode.transactional. each message listener invocation operate within active messaging transaction, message reception rolled in case of listener execution failure.
i have followed prescription above in main method of c# console app:
static void main(string[] args) { var ctx = contextregistry.getcontext(); var msglistenercontainer = ctx.getobject<simplemessagelistenercontainer>("messagelistenercontainer"); msglistenercontainer.sessionacknowledgemode = acknowledgementmode.transactional; console.readline(); }
however, not clear me how trigger listener execution failure in c# console application. i've tried throwing various types of exceptions (applicationexception, exception, nmsexception) , see no re-queuing of messages.
any enlightenment appreciated.
fyi, here additional germane code logic , configuration:
message handling method:
public void handlemessage(hashtable message) { logger.debug("entered handlemessage"); var ticketuuid = message["ti"] string; if (ticketuuid == null) throw new argumentnullexception("ticketuuid"); var gametitle = message["gt"] string; if (gametitle == null) throw new argumentnullexception("gametitle"); var recalldata = message["grd"] string; if (recalldata == null) throw new argumentnullexception("recalldata"); logger.debug(string.format("handlemessage - ticketuuid={0} gametitle={1} recalldata={2}", ticketuuid, gametitle, recalldata)); videorecordingservice.recordvideo(ticketuuid, gametitle, recalldata); }
here version of recordvideo method irrelevant code logic elided clarity:
[transaction] public async task recordvideo(string ticketid, string gametitle, string gamerecalldata) { <elided code> // start video recording app recorder.poweron(); // start air game exe separate process runairgameproc(gametitle); // testing failed message processing throw new applicationexception("forced exception"); var videobytes = file.readallbytes(gamesharevideofilename); msgproducer.updatejobstatus(ticketid, ticketstatusenum.recorded, videobytes); msgproducer.uploadvideo(ticketid, videobytes); } catch (exception ex) { logger.warnformat("exception caught: {0}" + environment.newline + "{1}", ex.message, ex.stacktrace); throw new nmsexception(ex.message); } { // clean files <elided> } }
and, here relevant spring .net configuration console app:
<spring> <context> <resource uri="config://spring/objects" /> </context> <objects xmlns="http://www.springframework.net"> <description>game share video recording service spring ioc configuration</description> <object name="msgproducer" type="gameshare.videorecorder.messageproducer, gameshare.videorecorder"> <property name="nmstemplate" ref="nmstemplate" /> <property name="jobinfodestination"> <object type="apache.nms.activemq.commands.activemqqueue, apache.nms.activemq"> <constructor-arg value="gamesharejobinfo" /> </object> </property> <property name="videouploaddestination"> <object type="apache.nms.activemq.commands.activemqqueue, apache.nms.activemq"> <constructor-arg value="gamesharevideoupload" /> </object> </property> </object> <object name="connectionfactory" type="spring.messaging.nms.connections.cachingconnectionfactory, spring.messaging.nms"> <property name="sessioncachesize" value="10" /> <property name="targetconnectionfactory"> <object type="apache.nms.activemq.connectionfactory, apache.nms.activemq"> <constructor-arg index="0" value="tcp://localhost:61616" /> </object> </property> </object> <object name="messagehandler" type="gameshare.videorecorder.messagehandler, gameshare.videorecorder" autowire="autodetect" /> <object name="messagelisteneradapter" type="spring.messaging.nms.listener.adapter.messagelisteneradapter, spring.messaging.nms"> <property name="handlerobject" ref="messagehandler" /> </object> <object name="messagelistenercontainer" type="spring.messaging.nms.listener.simplemessagelistenercontainer, spring.messaging.nms"> <property name="connectionfactory" ref="connectionfactory" /> <property name="destinationname" value="gamesharevideorecording" /> <property name="messagelistener" ref="messagelisteneradapter" /> </object> <object name="nmstemplate" type="spring.messaging.nms.core.nmstemplate, spring.messaging.nms"> <property name="connectionfactory" ref="connectionfactory" /> </object> </objects> </spring>
update: after re-reading chapter 31 of spring .net docs (and tinkering console app bit more), i've arrived @ solution/answer. default listener-container (spring.messaging.nms.listener.simplemessagelistenercontainer) not provide behavior trying emulate java/spring/activemq model (which, btw, is default behavior message handling in context of transaction using java/springboot framework). specifically, noted in section 31.5.2 asynchronous reception:
exceptions thrown during message processing can passed implementation of iexceptionhandler , registered container via property exceptionlistener. registered iexceptionhandler invoked if exception of type nmsexception (or equivalent root exception type other providers). simplemessagelistenercontainer log exception @ error level and not propagate exception provider. handling of acknowledgement and/or transactions done listener container. can override method handlelistenerexception change behavior.
so, in order desired behavior, had provide own implementations of iexceptionhandler , ierrorhandler propagate exceptions provider. provider (activemq in case) see exception , re-queue message.
a simple implementation these interfaces follows:
public class videorecorderapp { static void main(string[] args) { var ctx = contextregistry.getcontext(); var msglistenercontainer = ctx.getobject<simplemessagelistenercontainer>("messagelistenercontainer"); msglistenercontainer.sessionacknowledgemode = acknowledgementmode.transactional; msglistenercontainer.errorhandler = new myerrorhandler(); msglistenercontainer.exceptionlistener = new myexceptionlistener(); console.readline(); } } internal class myerrorhandler : ierrorhandler { /// <summary> /// logger /// </summary> private static readonly ilog logger = logmanager.getlogger<myerrorhandler>(); /// <summary> /// handles error. /// </summary> /// <param name="exception">the exception.</param> public void handleerror(exception exception) { logger.warnformat("handleerror: {0}", exception.message); throw exception; } } internal class myexceptionlistener : iexceptionlistener { /// <summary> /// logger /// </summary> private static readonly ilog logger = logmanager.getlogger<myexceptionlistener>(); /// <summary> /// called when there exception in message processing. /// </summary> /// <param name="exception">the exception.</param> public void onexception(exception exception) { logger.warnformat("onexception: {0}", exception.message); throw exception; } }
and, if 1 wants/needs distinguish between specific exception types (e.g., of should trigger re-queue , others of wish log/swallow), 1 can add code logic handlers so.
Comments
Post a Comment