Message Queue will be discontinued on March 1, 2017. IBM Bluemix (Formerly SoftLayer) will discontinue support on January 31, 2017.

Fila de mensagens: explorando filas

Terminologia

  • Atraso de visibilidade -- a quantia de tempo em segundos que a fila deve esperar antes de tornar uma mensagem visível para trabalhadores/consumidores.
  • Intervalo de visibilidade -- a quantia de tempo em segundos que a fila deve esperar antes de permitir que uma mensagem apareça novamente na fila toda vez que for recuperada de uma fila sem antes ter sido excluída.
  • Tags de filas -- uma coleção de nomes de tags individuais que podem ser úteis na classificação e no agrupamento de filas.
  • Campos de mensagens -- uma coleção estruturada de valor-chave que pode ser incluída em mensagens individuais.
  • Expiração -- a quantia de tempo em segundos que a fila deve esperar antes de excluir automaticamente uma mensagem que não foi explicitamente excluída de outra forma.
  • Horário de entrada inicial -- o registro de data e hora específico que indica o horário exato no qual uma mensagem foi publicada pela primeira vez em uma fila. Representado como um registro de data e hora UNIX padrão (segundos decorridos desde 01-01-1970 00:00:00 UTC).

Criando e atualizando uma fila

Em Fila de mensagens: introdução, adquirimos nossa chave da API e autoinicializamos nosso aplicativo de exemplo. Agora que autenticamos com sucesso, podemos criar e modificar uma fila e iniciar a publicação e o consumo de mensagens.

$messaging = new SoftLayer_Messaging();
if (!$messaging->authenticate(QUEUE_ACCOUNT, QUEUE_USERNAME, QUEUE_API_KEY)) {
    echo "Unable to authenticate!" . PHP_EOL;
    exit;
}
 
$my_first_queue = $messaging->queue('my_first_queue')->create();

Conforme visto aqui, o único parâmetro necessário para criar uma fila é seu nome. Uma vez criada, várias propriedades da fila podem ser modificadas. Podemos incluir tags, mudar o intervalo de visibilidade da mensagem ou atualizar a idade com a qual as mensagens expirarão.

Já temos um identificador na própria fila, portanto, atributos podem ser atualizados simplesmente configurando seus valores e atualizando a fila.

$my_first_queue->setVisibilityInterval(60);
$my_first_queue->setExpiration(604800); // 1 week
$my_first_queue->setTags(array('tag1'));
$my_first_queue->update();

Publicando mensagens em uma fila

Quando tivermos configurado uma fila, poderemos começar a publicar mensagens na mesma. Em nossa biblioteca do cliente PHP, isso é tão simples quanto o seguinte:

$my_first_queue->message()
    ->setBody('body text')
    ->create();

O corpo pode ser realmente qualquer coisa que você desejar -- uma estrutura de dados codificada por JSON ou XML, dados binários, uma sentença, o nome de sua música favorita da Kylie Minogue, etc.

As mensagens podem, como opção, incluir campos, que são pares simples de valor-chave armazenados em um dicionário com a mensagem. Isso é útil se desejar incluir dados estruturados adicionais com sua mensagem e é necessário se desejar usá-los em substituições de variáveis para notificações de tópicos (vamos discutir isso mais em Fila de mensagens: explorando tópicos).

// Creates a message with two fields.
$my_first_queue->message()
    ->setBody('body text')
    ->addField('hosting_provider', 'SoftLayer')
    ->addField('hosting_rating', 'awesome')
    ->addField('snack_of_the_century', 'legumes')
    ->addField('is_sorry_for_party_rockin', 'no')
    ->create();

Outras opções que podem ser configuradas em mensagens incluem o atraso de visibilidade inicial e o intervalo de visibilidade contínuo.

O atraso de visibilidade simplesmente permite assegurar que, após a publicação inicial de uma mensagem, ela não estará visível para nenhum consumidor por um período de tempo específico.

O intervalo de visibilidade permite definir o mesmo comportamento, exceto que é acionado somente cada vez que a mensagem for recuperada de uma fila; quando um consumidor recuperar a mensagem, ela não estará visível para nenhum consumidor novamente pelo período de tempo especificado. Isso é ideal se você deseja assegurar que um consumidor tenha tempo o suficiente para processar uma mensagem antes que você a exclua explicitamente, de forma que dois consumidores não tentem processar a mesma coisa simultaneamente. E,
caso um cliente trave ou misteriosamente desapareça enquanto estiver processando a mensagem, deixando-a sem processamento, outro trabalhador poderá selecioná-la e processá-la após o intervalo de visibilidade ter decorrido e a mensagem
se tornar novamente visível.

Ambos os valores são definidos em segundos.

// Creates a message with a specific visibility delay and interval,
// as well as a field.
$my_first_queue->message()
    ->setVisibilityDelay(10)
    ->setVisibilityInterval(30)
    ->setBody('making toast')
    ->addField('update_type', 'tweet')
    ->create()

Recuperando mensagens

Após ter publicado em uma fila, poderá desejar recuperá-las eventualmente. É possível executar uma mensagem específica por vez ou em lotes maiores de até 100 mensagens:

// Retrieve one message
$messages = $my_first_queue->messages(1);
 
// Retrieve up to 50 messages at once
$messages = $my_first_queue->messages(50);
foreach ($messages as $message) {
    echo "Picked up message " . $message->getId() . PHP_EOL;
}

Observe que, por questão de consistência, você receberá uma matriz de objetos de mensagens em qualquer um dos casos. Caso nenhuma mensagem esteja na fila, simplesmente obterá uma matriz vazia.

Excluindo mensagens

Após processar as mensagens com sucesso, provavelmente, irá querer excluí-las para não executar o mesmo trabalho novamente. Para fazer isso, você precisará apenas certificar-se de que uma cópia do objeto de mensagem seja mantida, em seguida, chame seu método de exclusão:

// If we still have the message object sitting around:
$message->delete();
 
// Alternatively, if we decide only to keep track of the message ID:
$message_id = 'a7bcf689887bf3e2ea9b4';
$my_first_queue->message()
    ->delete($message_id);

Na maioria dos casos, você irá querer executar isso diretamente após o processamento de uma mensagem. Por exemplo, supondo que criamos uma função de processamento reticulate_splines que retorna true se for processada com sucesso e, caso contrário, false; nossa recuperação, processamento e exclusão podem aparecer da seguinte forma:

$messages = $my_first_queue->messages(1);
 
foreach ($messages as $message) {
    echo "Message " . $message->getId() . PHP_EOL;
    if (reticulate_splines($message)) {
        echo " - Processed, deleting." . PHP_EOL;
        $message->delete();
    }
    else {
        echo " - Could not process!" . PHP_EOL;
    }
}

Cenários de identificação

A identificação permite criar fluxos de trabalho interessantes nos quais podemos não saber (ou se importar em persistir em outro local) o nome de cada fila ou sua função. Podemos simplesmente reunir todas as filas identificadas com image_resizing, por exemplo, para ajudar no redimensionamento de uma imagem original em diversos tamanhos menores.

Embora possa não estar imediatamente claro por que faríamos isso, reduzir um fluxo de trabalho a componentes pequenos e simples permite criar sistemas massivamente paralelos que podem escalar de forma muito mais simples do que componentes maiores fortemente acoplados. Imagine que temos uma imagem de 10 MB sem perdas de 1920x1080 que pode levar 200 ms para testar novamente para 1280x720 e 50 ms para testar novamente da mesma imagem para uma miniatura com perdas cortada para 100x100.

(1x worker, 250ms total)
[1920x1080] ---> [1280x720] ---> [100x100]

Neste cenário, por que permitir que um trabalhador teste novamente em duas resoluções diferentes, uma após a outra? Isso poderia levar muito mais tempo do que permitir que diversos trabalhadores manipulem resoluções diferentes.

Por exemplo, supondo que temos dois tipos diferentes de trabalhadores para corresponder os dois diferentes tamanhos para os quais testamos novamente imagens, podemos ativar quatro trabalhadores para manipularem as imagens grandes de 1280x720 e um trabalhador para manipular as miniaturas pequenas de 100x100.

Poderemos executar, então, quatro novos testes de alta resolução no mesmo tempo que normalmente leva para executar um e, nesse mesmo período de 200 ms, também podemos executar quatro novos testes de miniaturas com nosso trabalhador de baixa resolução, já que cada um leva 50 ms.

Ao término, teremos um conjunto completo de imagens de alta resolução e de baixa resolução para um lote de quatro imagens de origem separadas. Ao conectar quatro trabalhadores simultâneos adicionais à nossa fila, concluímos quatro vezes a quantia de trabalho em praticamente o mesmo período de tempo. Para um site ocupado que redimensiona imagens regularmente, isso aumenta a capacidade de um componente simples com intensidade moderada de CPU da operação do site que, caso contrário, poderia ficar para trás durante picos inesperados no tráfego.

(4x high-res workers, 200ms total)
    [1920x1080] ---> [1280x720]
    [1920x1080] ---> [1280x720]
    [1920x1080] ---> [1280x720]
    [1920x1080] ---> [1280x720]
(1x low-res worker, 200ms total)
    [1920x1080] ---> [100x100], [1920x1080] ---> [100x100], [1920x1080] ---> [100x100], [1920x1080] ---> [100x100]

Script do publicador

Para publicar essas tarefas, vamos supor que criamos uma fila resampler_highres para novos testes de alta resolução e uma fila resampler_lowres separada para novos testes de baixa resolução, identificando ambas com resamplers. Ao enviar tarefas para redimensionar uma imagem em todas as suas várias resoluções, não nos importamos especificamente com quais filas desejam essas informações, apenas que tipo de fila deseja. Sendo assim, identificamos nossas filas com resamplers, neste caso.

Nossa próxima etapa em enviar tarefas a cada fila é obter a lista de filas que têm essa tag e enviar por push uma mensagem a cada uma delas:

$resamplers = $messaging->queues(array('resamplers'));
 
foreach ($resamplers as $resampler) {
    $resampler->message()
        ->setBody('my_original_image.png')
        ->create();
}

Scripts de trabalhador/consumidor

De forma contrária à maneira como nosso produtor se comporta, nossos trabalhadores sabem especificamente qual fila precisam observar para captar novas tarefas de testar novamente. Eles não têm nenhum conceito (nem deveriam ter) de tags. Já distribuímos as mensagens -- esses trabalhadores devem ter somente um único objetivo.

Nosso script de trabalhador é alta resolução pode ser semelhante a este:

while (true) {
    // Grab one message out of the high-res queue
    $messages = $messaging->queue('resampler_highres')->messages(1);
    foreach ($messages as $message) {
        $image_name = $message->getBody();
        $result = resample_image($image_name, ('1280x720-' . $image_name), 1280, 720);
        if ($result === true) {
            $message->delete();
        }
    }
    sleep(1);
}

De forma similar, nosso script de trabalhador se baixa resolução pode ser semelhante a este:

while (true) {
    // Grab one message out of the low-res thumbnail queue
    $messages = $messaging->queue('resampler_thumbnail')->messages(1);
    foreach ($messages as $message) {
        $image_name = $message->getBody();
        $result = resample_image($image_name, ('100x100-' . $image_name), 100, 100);
        if ($result === true) {
            $message->delete();
        }
    }
    sleep(1);
}

E, por fim, nossa função de testar novamente, aceitando o nome da imagem de origem, o nome da imagem de destino e os parâmetros de largura/altura (iguais para ambos os trabalhadores, naturalmente):

function resample_image($image_name, $new_name, $new_width, $new_height)
{
    $image = ExampleImageLib::createImageFromFile($image_name);
    $image->resample($new_width, $new_height);
    $source_ratio = $image->getWidth() / $image->getHeight();
    $target_ratio = $new_width / $new_height;
    if ($target_ratio != $source_ratio) {
        $image->cropFromCenter($new_width, $new_height);
    }
    $image->saveToFile($new_name);
}

Conclusão

Como identificamos nossas filas de testar novamente, qualquer publicador de mensagem que precise de uma imagem redimensionada em diversos tamanhos diferentes pode simplesmente publicar a mesma mensagem em todas as filas que correspondem a uma tag específica. Esse é apenas um caso de uso para tags, no entanto; as possibilidades são infindáveis.

Além disso, desde que tenhamos trabalhadores observando cada uma dessas filas, sendo que cada uma será testada novamente para um tamanho específico, a inclusão de novos tamanhos de imagens simplesmente envolve incluir uma fila e disparar outro trabalhador para processar as mensagens dessa fila. (Nesse ritmo, você provavelmente poderia usar algum armazenamento de objeto para seu arsenal de imagens crescente ou um pequeno exército de servidores em nuvem para realizarem suas tarefas!)

Como verá em Fila de mensagens: explorando tópicos, a publicação em diversas filas pode ser ainda mais simplificada por meio de tópicos e assinaturas.