c# - Rabbit MQ unack message not back to queue for consumer to process again -
i use rabbitmq queue message server, use .net c# client. when there error in processing message queue, message not ackknowleage , still stuck in queue not processed again document understand.
i don't know if miss configurations or block of codes.
my idea auto manual ack message if error , manual push message queue again.
i hope have better solution.
thank much.
my code
public void subscribe(string queuename) { while (!cancelled) { try { if (subscription == null) { try { //try open connection connection = connectionfactory.createconnection(); } catch (brokerunreachableexception ex) { //you want log error , cancel after n tries, //otherwise start loop on try connect again after second or so. log.error(ex); continue; } //crate chanel channel = connection.createmodel(); // instructs channel not prefetch more 1 message channel.basicqos(0, 1, false); // create new, durable exchange channel.exchangedeclare(exchangename, exchangetype.direct, true, false, null); // create new, durable queue channel.queuedeclare(queuename, true, false, false, null); // bind queue exchange channel.queuebind(queuename, exchangename, queuename); //create subscription subscription = new subscription(channel, queuename, false); } basicdelivereventargs eventargs; var gotmessage = subscription.next(250, out eventargs);//250 millisecond if (gotmessage) { if (eventargs == null) { //this means connection closed. disposeallconnectionobjects(); continue;//move new iterate } //process message channel.basicack(eventargs.deliverytag, false); } } catch (operationinterruptedexception ex) { log.error(ex); disposeallconnectionobjects(); } } disposeallconnectionobjects(); } private void disposeallconnectionobjects() { //dispose subscription if (subscription != null) { //idisposable implemented explicitly reason. ((idisposable)subscription).dispose(); subscription = null; } //dipose channel if (channel != null) { channel.dispose(); channel = null; } //check if connection not null , dispose if (connection != null) { try { connection.dispose(); } catch (endofstreamexception ex) { log.error(ex); } catch (operationinterruptedexception ex)//handle error dispose connection { log.error(ex); } catch (exception ex) { log.error(ex); } connection = null; } }
i think may have misunderstood rabbitmq documentation. if message not ack'ed consumer rabbit broker requeue message onto queue consumption. dont believe suggested method ack'ing , requeuing message idea, , make problem more complex.
if want explicitly "reject" message because consumer had problem processing it, use nack feature of rabbit.
for example, within catch exception blocks, use:
subscription.model.basicnack(eventargs.deliverytag, false, true);
the above inform rabbit broker requeue message. pass delivery tag, false not multiple messages, , true requeue message. if want reject message , not requeue, change true false.
additionally, have created subscription think should perform ack's directly on this. not through channel.
change:
channel.basicack(eventargs.deliverytag, false);
to:
subscription.ack();
this method of ack'ing cleaner since keeping subscription related on subscription object, rather messing around channel you've subscribed to.
if helps please mark answer, lots of people seem forget important part of community.
Comments
Post a Comment