Message Queue: Exploring Queues

Terminology

  • Visibility delay -- the amount of time in seconds that the queue should wait before making a message visible to workers/consumers.
  • Visibility interval -- the amount of time in seconds that the queue should wait, each time a message is retrieved without being deleted, before allowing that message to reappear in the queue.
  • Queue tags -- a collection of individual tag names that may be helpful in classifying and grouping queues.
  • Message fields -- a structured key-value collection that can be added to individual messages.
  • Expiration -- the amount of time in seconds that the queue should wait before automatically deleting a message that has not already been explicitly deleted.
  • Initial entry time -- the timestamp at which a message was first published to a queue, represented as a standard UNIX timestamp (seconds elapsed since 1970-01-01 00:00:00 UTC).
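Because both expiration and initial entry time are plain counts of seconds, working out when a message would be removed is simple addition. The sketch below is illustrative only: expires_at and is_expired are hypothetical helpers, not part of the client library, the timestamp is an arbitrary example, and whether the service deletes a message exactly at the boundary second is an assumption.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: UNIX time at which a message would be auto-deleted. */
function expires_at(int $initial_entry_time, int $expiration): int
{
    return $initial_entry_time + $expiration;
}

/** True once the expiration window has fully elapsed at time $now (assumed inclusive). */
function is_expired(int $initial_entry_time, int $expiration, int $now): bool
{
    return $now >= expires_at($initial_entry_time, $expiration);
}

$initial_entry_time = 1700000000; // arbitrary example timestamp
$expiration = 7 * 24 * 60 * 60;   // one week, i.e. 604800 seconds

echo 'Published: ', gmdate('Y-m-d H:i:s', $initial_entry_time), ' UTC', PHP_EOL;
echo 'Expires:   ', gmdate('Y-m-d H:i:s', expires_at($initial_entry_time, $expiration)), ' UTC', PHP_EOL;
echo is_expired($initial_entry_time, $expiration, time()) ? 'expired' : 'still live', PHP_EOL;
```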

Creating and Updating a Queue

In Message Queue: Getting Started we acquired our API key and bootstrapped our example application. Now that we've successfully authenticated, we can create and modify a queue, and begin publishing and consuming messages.

$messaging = new SoftLayer_Messaging();
if (!$messaging->authenticate(QUEUE_ACCOUNT, QUEUE_USERNAME, QUEUE_API_KEY)) {
    echo "Unable to authenticate!" . PHP_EOL;
    exit;
}
 
$my_first_queue = $messaging->queue('my_first_queue')->create();

As seen here, the only parameter required to create a queue is its name. Once created, various properties of the queue can be modified. We can add tags, change the message visibility interval, or update the age at which messages will expire.

We already have a handle on the queue itself, so attributes can be updated by simply setting their values and updating the queue.

$my_first_queue->setVisibilityInterval(60);
$my_first_queue->setExpiration(604800); // 1 week
$my_first_queue->setTags(array('tag1'));
$my_first_queue->update();

Publishing Messages to a Queue

Once we've set up a queue we can begin publishing messages to it. In our PHP client library, this is as simple as the following:

$my_first_queue->message()
    ->setBody('body text')
    ->create();

The body can really be anything you want--a JSON- or XML-encoded data structure, binary data, a sentence, the name of your favorite Kylie Minogue song, and so on.

Messages may optionally include fields, which are simple key-value pairs stored in a dictionary with the message. This is handy if you wish to include additional structured data with your message, and is required if you wish to use them in variable substitutions for topic notifications (we'll discuss this more in Message Queue: Exploring Topics).

// Creates a message with four fields.
$my_first_queue->message()
    ->setBody('body text')
    ->addField('hosting_provider', 'SoftLayer')
    ->addField('hosting_rating', 'awesome')
    ->addField('snack_of_the_century', 'legumes')
    ->addField('is_sorry_for_party_rockin', 'no')
    ->create();
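To picture how fields could feed a variable substitution, here is a small, self-contained sketch. The {name} placeholder syntax and the substitute_fields helper are invented for illustration only; the substitution syntax topics actually use is covered in Message Queue: Exploring Topics.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical illustration: fill "{name}" placeholders in a template from a
 * message's fields. The {name} syntax is an assumption for this sketch only.
 */
function substitute_fields(string $template, array $fields): string
{
    $pairs = [];
    foreach ($fields as $key => $value) {
        $pairs['{' . $key . '}'] = (string) $value;
    }
    // strtr() replaces all placeholders in one pass; unknown ones are left as-is.
    return strtr($template, $pairs);
}

$fields = [
    'hosting_provider' => 'SoftLayer',
    'hosting_rating'   => 'awesome',
];

echo substitute_fields('{hosting_provider} is {hosting_rating}!', $fields), PHP_EOL; // SoftLayer is awesome!
```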

Other options you can set on a message include the initial visibility delay and the ongoing visibility interval.

Visibility delay simply ensures that, once we initially publish a message, it won't be visible to any consumers for a specific amount of time.

The visibility interval defines the same behavior, except that it is triggered each time the message is retrieved from a queue: once a consumer retrieves the message, it won't be visible to any consumer again for the specified amount of time. This is ideal when you want to give a consumer enough time to process a message before it is explicitly deleted, so that two consumers don't attempt to process the same message simultaneously. And if a consumer crashes or mysteriously vanishes into thin air while processing the message, leaving it unprocessed, another worker may pick it up and process it once the visibility interval has elapsed and the message becomes visible again.

Both values are defined in seconds.

// Creates a message with a specific visibility delay and interval,
// as well as a field.
$my_first_queue->message()
    ->setVisibilityDelay(10)
    ->setVisibilityInterval(30)
    ->setBody('making toast')
    ->addField('update_type', 'tweet')
    ->create();
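To make the two timers concrete, here is a toy, in-memory model of how a queue might apply them. It is a sketch under stated assumptions, not the SoftLayer implementation: ToyQueue and its methods are hypothetical, and time is passed in as plain integer seconds so the behavior can be stepped through deterministically.

```php
<?php
declare(strict_types=1);

/** Hypothetical in-memory model of visibility delay and visibility interval. */
final class ToyQueue
{
    /** @var array<string, array{body: string, visible_at: int, interval: int}> */
    private array $messages = [];
    private int $next_id = 1;

    public function publish(string $body, int $now, int $delay, int $interval): string
    {
        $id = 'msg' . $this->next_id++;
        // Visibility delay: hidden from consumers until $now + $delay.
        $this->messages[$id] = ['body' => $body, 'visible_at' => $now + $delay, 'interval' => $interval];
        return $id;
    }

    /** Hand out up to $limit visible message IDs, hiding each for its interval. */
    public function retrieve(int $now, int $limit = 1): array
    {
        $picked = [];
        foreach ($this->messages as $id => &$msg) {
            if (count($picked) >= $limit) {
                break;
            }
            if ($msg['visible_at'] <= $now) {
                // Visibility interval: hidden again until $now + interval.
                $msg['visible_at'] = $now + $msg['interval'];
                $picked[] = $id;
            }
        }
        unset($msg); // break the reference left over from the foreach
        return $picked;
    }

    public function delete(string $id): void
    {
        unset($this->messages[$id]);
    }
}

$queue = new ToyQueue();
$queue->publish('making toast', 0, 10, 30); // delay 10 s, interval 30 s

echo count($queue->retrieve(5)), PHP_EOL;  // 0: still inside the 10 s delay
echo count($queue->retrieve(10)), PHP_EOL; // 1: delay elapsed, message handed out
echo count($queue->retrieve(20)), PHP_EOL; // 0: hidden by the 30 s interval
echo count($queue->retrieve(40)), PHP_EOL; // 1: never deleted, so it reappears
```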

Retrieving Messages

Once you've published messages to a queue, you'll eventually want to retrieve them. You can do this one message at a time, or in batches of up to 100 messages:

// Retrieve one message
$messages = $my_first_queue->messages(1);
 
// Retrieve up to 50 messages at once
$messages = $my_first_queue->messages(50);
foreach ($messages as $message) {
    echo "Picked up message " . $message->getId() . PHP_EOL;
}

Note that, for the sake of consistency, you'll receive an array of message objects in either case. In the event no messages are in the queue, you'll simply get an empty array.
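Because an empty array signals that nothing is currently available, a loop can keep fetching batches until one comes back empty. This is a sketch with hypothetical names (MAX_BATCH, clamp_batch_size, drain); $fetch stands in for a call such as $my_first_queue->messages($n), and a fake in-memory source replaces the real queue. Against a real queue, retrieved messages that are not deleted stay hidden only for their visibility interval, so each one should be processed and deleted promptly.

```php
<?php
declare(strict_types=1);

const MAX_BATCH = 100; // upper limit on a single retrieval, per the text above

/** Clamp a requested batch size into the allowed 1..100 range. */
function clamp_batch_size(int $requested): int
{
    return max(1, min(MAX_BATCH, $requested));
}

/**
 * Keep fetching batches until an empty array comes back.
 * $fetch is assumed to behave like $queue->messages($n).
 */
function drain(callable $fetch, int $batch_size): array
{
    $all = [];
    $n = clamp_batch_size($batch_size);
    while (($batch = $fetch($n)) !== []) {
        foreach ($batch as $message) {
            $all[] = $message;
        }
    }
    return $all;
}

// Fake source of 250 messages, consumed front to back.
$pending = range(1, 250);
$fetch = function (int $n) use (&$pending): array {
    return array_splice($pending, 0, $n);
};

echo count(drain($fetch, 500)), PHP_EOL; // 250, fetched as 100 + 100 + 50
```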

Deleting Messages

After you successfully process your messages, you'll likely want to delete them so you don't perform the same work all over again. To do so, keep the message object around and call its delete method, or keep track of the message ID:

// If we still have the message object sitting around:
$message->delete();
 
// Alternatively, if we decide only to keep track of the message ID:
$message_id = 'a7bcf689887bf3e2ea9b4';
$my_first_queue->message()
    ->delete($message_id);

In most cases you'll want to delete a message directly after processing it. For instance, assuming we've written a processing function, reticulate_splines, that returns true if it processed the message successfully and false otherwise, our retrieval, processing, and deletion might look like this:

$messages = $my_first_queue->messages(1);
 
foreach ($messages as $message) {
    echo "Message " . $message->getId() . PHP_EOL;
    if (reticulate_splines($message)) {
        echo " - Processed, deleting." . PHP_EOL;
        $message->delete();
    }
    else {
        echo " - Could not process!" . PHP_EOL;
    }
}
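If reticulate_splines could throw an exception, the loop above would stop the whole script. One possible variation, sketched with hypothetical names (process_batch, plus closures standing in for reticulate_splines and $message->delete()), treats exceptions as ordinary failures. A failed message is simply not deleted, so it becomes visible again after its visibility interval and another worker can retry it.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical variation on the loop above: run $process on each message,
 * delete only the ones it reports as done, and treat exceptions as failures.
 */
function process_batch(array $messages, callable $process, callable $delete): array
{
    $deleted = 0;
    $left_for_retry = 0;
    foreach ($messages as $message) {
        try {
            $ok = $process($message) === true;
        } catch (Throwable $e) {
            // Log $e in real code; the message simply stays undeleted.
            $ok = false;
        }
        if ($ok) {
            $delete($message);
            $deleted++;
        } else {
            // Not deleted, so it reappears after its visibility interval.
            $left_for_retry++;
        }
    }
    return ['deleted' => $deleted, 'left_for_retry' => $left_for_retry];
}

$done_ids = [];
$result = process_batch(
    ['job-1', 'job-2', 'job-3'],
    function (string $m): bool {
        if ($m === 'job-3') {
            throw new RuntimeException('spline reticulation failed');
        }
        return $m === 'job-1';
    },
    function (string $m) use (&$done_ids): void { $done_ids[] = $m; }
);
print_r($result); // deleted => 1, left_for_retry => 2
```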

Tagging Scenarios

Tagging allows us to create interesting workflows where we may not know (or care to persist elsewhere) the name of each queue or its role. We can simply gather up all queues tagged with image_resizing, for example, to help us in resizing an original image into several smaller sizes.

While it may not be immediately clear why we would do that, breaking a workflow into small, simple components allows us to build massively parallel systems that scale much more easily than larger, tightly coupled components. Imagine we have a 1920x1080 lossless 10MB image that may take 200ms to resample to 1280x720, and 50ms to resample to a 100x100 cropped lossy thumbnail.

(1x worker, 250ms total)
[1920x1080] ---> [1280x720] ---> [100x100]

In this scenario, why let one worker resample into two different resolutions, one after the other? This could take much longer than allowing multiple workers to handle differing resolutions.

For instance, assuming we have two different types of workers to match the two different sizes to which we resample images, we can launch 4 workers to handle the large 1280x720 images, and 1 worker to handle the small 100x100 thumbnails.

We can then perform 4 high-resolution resamples in the amount of time it normally takes to perform 1, and in that same 200ms span of time we can also perform 4 thumbnail resamples with our low-resolution worker, since each one takes 50ms.

In the end, we'll have a complete set of high-res and low-res images for a batch of 4 separate source images. By connecting 4 additional concurrent workers to our queues, we've completed four times the amount of work in slightly less time than a single worker spends on one image (200ms versus 250ms). For a busy site that resizes images on a regular basis, this scales up a simple, moderately CPU-intensive component of the site's operation that might otherwise fall behind during unexpected spikes in traffic.

(4x high-res workers, 200ms total)
    [1920x1080] ---> [1280x720]
    [1920x1080] ---> [1280x720]
    [1920x1080] ---> [1280x720]
    [1920x1080] ---> [1280x720]
(1x low-res worker, 200ms total)
    [1920x1080] ---> [100x100], [1920x1080] ---> [100x100], [1920x1080] ---> [100x100], [1920x1080] ---> [100x100]
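The timing argument above can be checked with a few lines of arithmetic. This sketch uses the hypothetical 200ms and 50ms figures from the example and assumes jobs split evenly across workers that run fully in parallel, with no queue or network overhead; the function names are illustrative.

```php
<?php
declare(strict_types=1);

// Per-image timings from the example above, in milliseconds.
const HIGHRES_MS = 200; // 1920x1080 -> 1280x720
const THUMB_MS = 50;    // 1920x1080 -> 100x100

/** One worker doing both resamples for every image, one after the other. */
function sequential_time_ms(int $images): int
{
    return $images * (HIGHRES_MS + THUMB_MS);
}

/** Ceiling division: rounds a pool of $workers needs to finish $jobs jobs. */
function rounds(int $jobs, int $workers): int
{
    return intdiv($jobs + $workers - 1, $workers);
}

/** Separate high-res and thumbnail pools; the batch ends when the slower pool does. */
function parallel_time_ms(int $images, int $highres_workers, int $thumb_workers): int
{
    return max(
        rounds($images, $highres_workers) * HIGHRES_MS,
        rounds($images, $thumb_workers) * THUMB_MS
    );
}

echo sequential_time_ms(4), ' ms with 1 worker', PHP_EOL;          // 1000 ms
echo parallel_time_ms(4, 4, 1), ' ms with 4 + 1 workers', PHP_EOL; // 200 ms
```

Doubling the batch to 8 images with the same 4 + 1 workers would, under these assumptions, take 400ms, so throughput stays at 4 images per 200ms.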

Publisher Script

To publish these jobs, let's assume we've created a resampler_highres queue for high-res resampling and a separate resampler_lowres queue for low-res resampling, and tagged both of them with resamplers. When submitting jobs to resize an image into all of its resolutions, we don't care which specific queues want the job, only what type of queue does; that is exactly what the resamplers tag expresses.

Our next step in submitting jobs to each queue is obtaining the list of queues which have this tag, and pushing a message to each of them:

$resamplers = $messaging->queues(array('resamplers'));
 
foreach ($resamplers as $resampler) {
    $resampler->message()
        ->setBody('my_original_image.png')
        ->create();
}
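The fan-out idea does not depend on the client library, so here it is as a self-contained, in-memory sketch. The $queue_tags registry and the queues_with_tags and fan_out helpers are hypothetical stand-ins for $messaging->queues(...) and $resampler->message()->create(); in particular, this sketch assumes a queue must carry every requested tag to match, which may not be how the service interprets a list of several tags.

```php
<?php
declare(strict_types=1);

// Hypothetical registry: queue name => tags on that queue.
$queue_tags = [
    'resampler_highres' => ['resamplers'],
    'resampler_lowres'  => ['resamplers'],
    'my_first_queue'    => ['tag1'],
];

/** Names of queues carrying every tag in $tags (assumed matching rule). */
function queues_with_tags(array $queue_tags, array $tags): array
{
    $matches = [];
    foreach ($queue_tags as $name => $tag_list) {
        if (array_diff($tags, $tag_list) === []) {
            $matches[] = $name;
        }
    }
    return $matches;
}

/** "Publish" the same body to each matching queue by appending to an outbox. */
function fan_out(array $queue_tags, array $tags, string $body, array &$outbox): void
{
    foreach (queues_with_tags($queue_tags, $tags) as $name) {
        $outbox[$name][] = $body;
    }
}

$outbox = [];
fan_out($queue_tags, ['resamplers'], 'my_original_image.png', $outbox);
print_r($outbox);
```

Adding a new size then only means registering another queue with the resamplers tag; the publisher code does not change.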

Worker/Consumer Scripts

Contrary to the way our producer behaves, our workers know specifically which queue they need to watch in order to pick up new resample jobs. They have no concept (nor should they) of tags. We have already distributed the messages--these workers should be completely single-minded.

Our high-res worker script may look like this:

while (true) {
    // Grab one message out of the high-res queue
    $messages = $messaging->queue('resampler_highres')->messages(1);
    foreach ($messages as $message) {
        $image_name = $message->getBody();
        $result = resample_image($image_name, ('1280x720-' . $image_name), 1280, 720);
        if ($result === true) {
            $message->delete();
        }
    }
    sleep(1);
}

Similarly, our low-res worker script may look like this:

while (true) {
    // Grab one message out of the low-res thumbnail queue
    $messages = $messaging->queue('resampler_lowres')->messages(1);
    foreach ($messages as $message) {
        $image_name = $message->getBody();
        $result = resample_image($image_name, ('100x100-' . $image_name), 100, 100);
        if ($result === true) {
            $message->delete();
        }
    }
    sleep(1);
}
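The two worker scripts differ only in the queue they watch and the target size, so one option is to share the loop. The run_worker function below is a hypothetical sketch, not part of the client library: it takes callables for fetching, handling, and deleting, sleeps only when a poll comes back empty (so a busy worker doesn't pause between messages), and accepts an optional iteration limit so it can be exercised without running forever.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical shared worker loop. $fetch returns an array of messages,
 * $handle returns true on success, $delete removes a message.
 * Pass null for $max_iterations to loop forever, like the scripts above.
 */
function run_worker(
    callable $fetch,
    callable $handle,
    callable $delete,
    ?int $max_iterations = null,
    int $idle_sleep_seconds = 1
): int {
    $deleted = 0;
    for ($i = 0; $max_iterations === null || $i < $max_iterations; $i++) {
        $messages = $fetch();
        if ($messages === []) {
            // Only pause when the queue had nothing for us.
            sleep($idle_sleep_seconds);
            continue;
        }
        foreach ($messages as $message) {
            if ($handle($message) === true) {
                $delete($message);
                $deleted++;
            }
        }
    }
    return $deleted;
}

// Demo with a fake queue of three jobs; every job succeeds.
$jobs = ['a.png', 'b.png', 'c.png'];
$count = run_worker(
    function () use (&$jobs): array { return array_splice($jobs, 0, 1); },
    function (string $m): bool { return true; },
    function (string $m): void {},
    4,
    0
);
echo $count, PHP_EOL; // 3
```

Wired to the client, the high-res worker might pass fn() => $messaging->queue('resampler_highres')->messages(1) as $fetch and fn($m) => $m->delete() as $delete, assuming resample_image returns true on success (arrow functions need PHP 7.4 or later).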

And finally, our resample function, accepting source image name, destination image name, and width/height parameters (same for both workers, naturally):

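function resample_image($image_name, $new_name, $new_width, $new_height)
{
    $image = ExampleImageLib::createImageFromFile($image_name);
    // Compare aspect ratios before resampling; afterwards they would always match.
    $source_ratio = $image->getWidth() / $image->getHeight();
    $target_ratio = $new_width / $new_height;
    if ($source_ratio > $target_ratio) {
        // Wider than the target: match the height, then crop the sides.
        $image->resample((int) round($new_height * $source_ratio), $new_height);
        $image->cropFromCenter($new_width, $new_height);
    } elseif ($source_ratio < $target_ratio) {
        // Taller than the target: match the width, then crop top and bottom.
        $image->resample($new_width, (int) round($new_width / $source_ratio));
        $image->cropFromCenter($new_width, $new_height);
    } else {
        $image->resample($new_width, $new_height);
    }
    $image->saveToFile($new_name);
    // The workers delete the message only when this returns true.
    return true;
}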
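To see what cropping from the center involves, this sketch computes the geometry of a "cover, then crop" resize: scale the source just enough to cover the target box, then take a centered crop. cover_crop_plan is a hypothetical helper that only does the arithmetic and makes no assumptions about ExampleImageLib.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: intermediate size and top-left crop offset for a
 * "cover, then crop from center" resize.
 */
function cover_crop_plan(int $src_w, int $src_h, int $dst_w, int $dst_h): array
{
    // Scale by the larger factor so both target dimensions are covered.
    $scale = max($dst_w / $src_w, $dst_h / $src_h);
    $scaled_w = (int) round($src_w * $scale);
    $scaled_h = (int) round($src_h * $scale);
    return [
        'scaled'      => [$scaled_w, $scaled_h],
        'crop_offset' => [intdiv($scaled_w - $dst_w, 2), intdiv($scaled_h - $dst_h, 2)],
    ];
}

print_r(cover_crop_plan(1920, 1080, 100, 100));  // scaled 178x100, crop offset (39, 0)
print_r(cover_crop_plan(1920, 1080, 1280, 720)); // scaled 1280x720, crop offset (0, 0)
```

Same-ratio targets such as 1280x720 need no crop at all, which is why the crop step only matters for the square thumbnail.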

Conclusion

Since we've tagged our resampling queues, any message publisher that needs an image resized into a number of different sizes can simply publish the same message to all queues matching a specific tag. This is only one use case for tags, however; the possibilities are endless.

Additionally, as long as we have workers watching each of those queues who each resample into a specific size, adding new image sizes simply involves adding a queue and firing up another worker to process that queue's messages. (At this rate, you could probably use some object storage for your growing arsenal of images, or a small army of cloud servers to do your bidding!)

As you'll see in Message Queue: Exploring Topics, publishing to a number of queues can be made even simpler through topics and subscriptions.
