markAsProcessed {Proc4}R Documentation

Functions for manipulating entries in a message queue.

Description

A collection of message objects can serve as a queue: they can be sorted by their timestamp and then processed one at a time. The function markAsProcessed sets the processed flag on the message and then saves it back to the database. The function processed returns the processed flag.

The function markAsError attaches an error to the message and saves it. The function processingError returns the error (if it exists).

Usage

markAsProcessed(mess, col)
markAsError(mess, col, e)
processed(x)
processingError(x)

Arguments

mess

An object of class P4Message to be modified.

col

A mongo collection where the message queue is stored. This can also be NULL in which case the message will not be saved to the database.

e

An object indicating the error occurred. Note this could be either a string giving the error message of an object of an error class. In either case, it is converted to a string before saving.

x

A message object to be queried.

Details

A mongo collection of messages can serve as a queue. As messages are added into the queue, the processed flag is set to false. The handler then fetches them one at a time (sorting by the timestamp). It then does whatever action is required to handle the message. Then the function markAsProcessed is called to set the processed flag to true and update the entry in the database.

A typical query (this example is taken from the EIEvent-package) is getOneRec(buildJQuery(app=app, processed=FALSE), eventdb(), parseEvent, sort = c(timestamp = 1)). Here the buildJQuery call searches for unprocessed events corresponding to a particular app. The sort argument ensures that the records will be sorted in ascending order according to timestamp. In this example eventdb() in an internal method which returns the event collection, and parseEvent create event objects (which are a subclass of P4Message.

Some thought needs to be given as to how to handle errors. The function markAsError attaches an error object to the message and then updates it in the collection. The error object is turned into a string (using toString) before saving, so it can be any type of R object (in particular, it could be either the error message or the actual error object thrown by the function).

Value

The functions markAsProcessed and markAsError both return the modified message.

The function processed returns a logical value indicating whether or not the message has been processed.

The function processingError returns the error object attached to the message, or NULL if no error object is returned. Note that the error object could be of any type.

Note

The functions markAsProcessed and markAsError do not save the complete record, they just update the processed or error field.

There was a bug in early version of this function, which caused the error to be put into a list when it was saved. This needs to be carefully checked.

Author(s)

Russell Almond

See Also

P4Message, getOneRec, buildJQuery, timestamp

Examples


col <- mongo("TestMessages")
col$remove('{}')             # Clear out anything else in queue.
mess1 <- P4Message("One","Adder","Tester","Add me",app="adder",
                   details=list(x=1,y=1))
mess2 <- P4Message("Two","Adder","Tester","Add me",app="adder",
                   details=list(x="two",y=2))
mess1 <- saveRec(mess1,col,FALSE)
mess2 <- saveRec(mess2,col,FALSE)

mess <- getOneRec(buildJQuery(app="adder", processed=FALSE),
    col, parseMessage, sort = c(timestamp = 1))
iterations <- 0
while (!is.null(mess)) {
  if (iterations > 4L)
    stop ("Test not terminating, flag not being set?")
  iterations <- iterations + 1
  print(mess)
  print(details(mess))
  out <- try(print(details(mess)$x+details(mess)$y))
  if (is(out,'try-error'))
   mess <- markAsError(mess,col,out)
  mess <- markAsProcessed(mess,col)
  mess <- getOneRec(buildJQuery(app="adder", processed=FALSE),
    col, parseMessage, sort = c(timestamp = 1))

}

mess1a <- getOneRec(buildJQuery(app="adder",uid="One"),col,parseMessage)
mess2a <- getOneRec(buildJQuery(app="adder",uid="Two"),col,parseMessage)
stopifnot(processed(mess1a),processed(mess2a),
          is.null(processingError(mess1a)),
          grepl("Error",processingError(mess2a)))


[Package Proc4 version 0.4-7 Index]