markAsProcessed {Proc4} | R Documentation |
A collection of message objects can serve as a queue: they can be
sorted by their timestamp
and then processed one at a
time. The function markAsProcessed
sets the processed flag on
the message and then saves it back to the database. The function
processed
returns the processed flag.
The function markAsError
attaches an error to the message and
saves it. The function processingError
returns the error (if
it exists).
markAsProcessed(mess, col) markAsError(mess, col, e) processed(x) processingError(x)
mess |
An object of class |
col |
A |
e |
An object indicating the error occurred. Note this could be either a string giving the error message of an object of an error class. In either case, it is converted to a string before saving. |
x |
A message object to be queried. |
A mongo
collection of messages can serve as a
queue. As messages are added into the queue, the processed
flag is set to false. The handler then fetches them one at a time
(sorting by the timestamp). It then does whatever action is required
to handle the message. Then the function markAsProcessed
is
called to set the processed
flag to true and update the entry
in the database.
A typical query (this example is taken from the
EIEvent-package
) is
getOneRec(buildJQuery(app=app, processed=FALSE),
eventdb(), parseEvent, sort = c(timestamp = 1))
. Here the
buildJQuery
call searches for unprocessed events
corresponding to a particular app
. The sort
argument ensures that the records will be sorted in ascending order
according to timestamp
. In this example
eventdb()
in an internal method which returns the event
collection, and parseEvent
create event objects
(which are a subclass of P4Message
.
Some thought needs to be given as to how to handle errors. The
function markAsError
attaches an error object to the message
and then updates it in the collection. The error object is turned
into a string (using toString
) before saving, so
it can be any type of R object (in particular, it could be either the
error message or the actual error object thrown by the function).
The functions markAsProcessed
and markAsError
both
return the modified message.
The function processed
returns a logical value indicating
whether or not the message has been processed.
The function processingError
returns the error object attached
to the message, or NULL
if no error object is returned. Note
that the error object could be of any type.
The functions markAsProcessed
and markAsError
do not
save the complete record, they just update the processed or error
field.
There was a bug in early version of this function, which caused the error to be put into a list when it was saved. This needs to be carefully checked.
Russell Almond
P4Message
, getOneRec
,
buildJQuery
, timestamp
col <- mongo("TestMessages") col$remove('{}') # Clear out anything else in queue. mess1 <- P4Message("One","Adder","Tester","Add me",app="adder", details=list(x=1,y=1)) mess2 <- P4Message("Two","Adder","Tester","Add me",app="adder", details=list(x="two",y=2)) mess1 <- saveRec(mess1,col,FALSE) mess2 <- saveRec(mess2,col,FALSE) mess <- getOneRec(buildJQuery(app="adder", processed=FALSE), col, parseMessage, sort = c(timestamp = 1)) while (!is.null(mess)) { print(details(mess)) out <- try(print(details(mess)$x+details(mess)$y)) if (is(out,'try-error')) mess <- markAsError(mess,col,out) mess <- markAsProcessed(mess,col) mess <- getOneRec(buildJQuery(app="adder", processed=FALSE), col, parseMessage, sort = c(timestamp = 1)) } mess1a <- getOneRec(buildJQuery(app="adder",uid="One"),col,parseMessage) mess2a <- getOneRec(buildJQuery(app="adder",uid="Two"),col,parseMessage) stopifnot(processed(mess1a),processed(mess2a), is.null(processingError(mess1a)), grepl("Error",processingError(mess2a)))