Messaging

Asynchronous Communication

We have already seen synchronous communication, for example sockets or a telephone connection. There is also the asynchronous communication model, which places a Message Oriented Middleware (MOM) between the client and the server. Sending an email is an example of this, with the mail server acting as the MOM. Asynchronous communication allows applications to be loosely coupled, as they only need to agree on the message format and not on an API. It also means that the sender and receiver don't have to be active at the same time.
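
The point that sender and receiver don't have to be active at the same time can be sketched with a plain in-memory buffer. This is only an illustration: `InMemoryMom` and its methods are hypothetical names, and a real MOM would be a separate process reached over the network.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a MOM: it only buffers text messages in memory.
class InMemoryMom {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

    // The sender returns immediately; no receiver has to be listening.
    void send(String message) {
        buffer.add(message);
    }

    // The receiver blocks until a message is available.
    String receive() {
        try {
            return buffer.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a message", e);
        }
    }

    // Number of messages that are stored but not yet received.
    int pending() {
        return buffer.size();
    }

    public static void main(String[] args) throws InterruptedException {
        InMemoryMom mom = new InMemoryMom();

        // The sender runs first and finishes before any receiver exists.
        mom.send("order-created");
        mom.send("order-paid");
        System.out.println("Pending messages: " + mom.pending());

        // The receiver starts later and still gets both messages in order.
        Thread receiver = new Thread(() -> {
            System.out.println("Received: " + mom.receive());
            System.out.println("Received: " + mom.receive());
        });
        receiver.start();
        receiver.join();
    }
}
```

The only thing both sides share is the message format (here a plain string), which is what makes the coupling loose.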

When it comes to asynchronous communication there is either JMS (Java Message Service / Jakarta Messaging), which is a Java API rather than a wire protocol, or protocols that run on top of TCP such as:

  • AMQP: Advanced Message Queuing Protocol, a binary protocol with four exchange types (direct, topic, fanout and headers).
  • STOMP: Simple (or Streaming) Text Oriented Messaging Protocol, a text-based protocol similar to HTTP.
  • MQTT: MQ Telemetry Transport, a lightweight publish and subscribe protocol with a small footprint, intended for IoT environments. It uses topics instead of queues, supports three delivery guarantees (at most once, at least once, exactly once) and a Last Will message that the broker publishes when a client disconnects unexpectedly.
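
To make the "text-based, similar to HTTP" description of STOMP more concrete, here is a minimal sketch that assembles a single frame by hand: a command line, header lines, a blank line, the body and a terminating NUL character. The class name and the destination `/queue/demo` are made up for illustration, and the sketch ignores header escaping and the `content-length` header.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class StompFrameSketch {
    // Builds a frame as: COMMAND, one "name:value" line per header,
    // a blank line, the body, and a NUL character.
    static String buildFrame(String command, Map<String, String> headers, String body) {
        StringBuilder frame = new StringBuilder();
        frame.append(command).append('\n');
        for (Map.Entry<String, String> header : headers.entrySet()) {
            frame.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        frame.append('\n'); // blank line separates the headers from the body
        frame.append(body);
        frame.append('\0'); // NUL terminates the frame
        return frame.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", "/queue/demo"); // hypothetical destination
        headers.put("content-type", "text/plain");
        String frame = buildFrame("SEND", headers, "Hello World");
        // Show the NUL terminator as "^@" so the frame is printable.
        System.out.println(frame.replace("\0", "^@"));
    }
}
```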

AMQP - Advanced Message Queuing Protocol

As mentioned, AMQP is a binary protocol. Its model contains the following three key components:

  • Exchanges: Endpoints of the broker that receive messages from publishers.
  • Queues: Endpoints that store messages from exchanges and are used by subscribers to retrieve messages.
  • Bindings/Routings: Rules that bind/route exchanges to queues.

These components are programmable, meaning clients can create, modify and delete them at runtime. AMQP also allows multiple channels inside a single TCP connection, which saves the overhead of opening multiple connections. When a message is sent, the sender needs to specify the exchange, the routing key and the payload.
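
How exchanges, queues and bindings fit together can be sketched with a toy in-memory model of a single exchange that routes on exact key equality. `ToyBroker` and all of its methods are hypothetical names for illustration and are not part of any AMQP client library.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy model of a broker with one exchange that compares keys for equality.
class ToyBroker {
    // queue name -> stored messages
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void declareQueue(String queueName) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    void bind(String queueName, String bindingKey) {
        if (!queues.containsKey(queueName)) {
            throw new IllegalArgumentException("Unknown queue: " + queueName);
        }
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Copies the payload into every queue whose binding key equals the routing key.
    // Returns how many queues received the message (0 means it was dropped).
    int publish(String routingKey, String payload) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queueName : targets) {
            queues.get(queueName).add(payload);
        }
        return targets.size();
    }

    // Returns the oldest message of the queue, or null if there is none.
    String poll(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.declareQueue("billing");
        broker.declareQueue("audit");
        broker.bind("billing", "order.paid");
        broker.bind("audit", "order.paid");
        broker.bind("audit", "order.cancelled");

        System.out.println(broker.publish("order.paid", "order 42 paid"));           // 2
        System.out.println(broker.publish("order.cancelled", "order 43 cancelled")); // 1
        System.out.println(broker.publish("order.unknown", "nobody listens"));       // 0
        System.out.println(broker.poll("audit"));                                    // order 42 paid
    }
}
```

Note that the publisher only ever names a routing key, never a queue, so queues can be added or removed without changing the publisher.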

[Figure: AMQP components]

There are four exchange types that determine how a message is routed:

  • Direct Exchange: Queues that are bound to the exchange with a binding key equal to the routing key of the published message will receive the message.
  • Fanout Exchange: Broadcasts the message to all queues that are bound to it (the binding key is ignored), which is suitable for the publish and subscribe pattern.
  • Topic Exchange: Routes messages to all queues whose binding key pattern matches the routing key. Keys consist of words separated by dots, where "*" in the pattern matches exactly one word and "#" matches zero or more words. This is suitable for routing messages to different queues based on the type of message.
  • Headers Exchange: Messages are routed based on custom message headers instead of the routing key.
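
The matching done by a topic exchange can be sketched as a small recursive function over the dot-separated words, assuming the usual AMQP wildcards ("*" for exactly one word, "#" for zero or more words). `TopicMatcher` is a hypothetical helper written for this note; it is not how any particular broker implements the lookup.

```java
class TopicMatcher {
    // Returns true if the routing key matches the binding pattern.
    static boolean matches(String bindingPattern, String routingKey) {
        String[] pattern = bindingPattern.isEmpty() ? new String[0] : bindingPattern.split("\\.", -1);
        String[] key = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(pattern, 0, key, 0);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            // Pattern exhausted: only a match if the key is exhausted too.
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // "#" may consume zero, one or several words: try every split point.
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return matches(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("stock.*", "stock.nyse"));     // true
        System.out.println(matches("stock.*", "stock.nyse.ibm")); // false
        System.out.println(matches("stock.#", "stock.nyse.ibm")); // true
        System.out.println(matches("#.ibm", "stock.nyse.ibm"));   // true
        System.out.println(matches("*.ibm", "ibm"));              // false
    }
}
```

With this view, a direct exchange is the special case of a pattern without wildcards, and a fanout exchange behaves like every queue being bound with "#".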

[Figure: AMQP exchange types]

RabbitMQ

RabbitMQ is an open-source message broker written in Erlang. It implements AMQP (version 0-9-1) and can be extended with plugins to also support MQTT, STOMP and HTTP.

First you need to connect to a broker:

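import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("username"); // Default: guest
factory.setPassword("password"); // Default: guest
factory.setVirtualHost("myRabbit"); // Default: /
factory.setHost("69.69.69.69"); // Default: localhost
factory.setPort(5672); // Default: 5672
// newConnection() throws IOException and TimeoutException
Connection connection = factory.newConnection();
// Channels are multiplexed over the single TCP connection
Channel channel = connection.createChannel();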

The next step is to declare a queue. Durable queues survive broker restarts, exclusive queues can only be used by the connection that declared them, and auto-delete means the server deletes the queue once it is no longer used.

channel.queueDeclare(QUEUE_NAME,
    /* durable:    */ false,
    /* exclusive:  */ false,
    /* autoDelete: */ false,
    /* arguments:  */ null);

To then publish a message we can use the default exchange:

String message = "Hello World at " + LocalDateTime.now();
channel.basicPublish(
    /* exchange:    */ "",
    /* routing key: */ QUEUE_NAME,
    /* props:       */ null,
    /* body:        */ message.getBytes(StandardCharsets.UTF_8));

Finally, to receive messages, callbacks can be registered:

DeliverCallback deliverCallback = (consumerTag, message) -> {
    // Decode the payload with the same charset the publisher used
    String text = new String(message.getBody(), StandardCharsets.UTF_8);
    System.out.println("Received: " + text);
};
CancelCallback cancelCallback = consumerTag -> {
    System.out.println("Cancelled by the server");
};
channel.basicConsume(QUEUE_NAME,
    /* autoAck: */ true, deliverCallback, cancelCallback);

Echo Example

channel.queueDeclare(RPC_QUEUE_NAME,
    /* durable:    */ false,
    /* exclusive:  */ false,
    /* autoDelete: */ false,
    /* arguments:  */ null);
 
final String corrId = UUID.randomUUID().toString();
 
String replyQueueName = channel.queueDeclare().getQueue();
System.out.println(replyQueueName);
 
AMQP.BasicProperties props = new AMQP.BasicProperties
    .Builder()
    .correlationId(corrId)
    .replyTo(replyQueueName)
    .build();
 
String message = String.format("Hello World from %s at %s",
    System.getProperty("user.name"),
    LocalDateTime.now());
System.out.println(message);
 
channel.basicPublish(
    /* exchange:    */ "",   // The empty string is the "default exchange", which is a direct exchange.
    /* routing key: */ RPC_QUEUE_NAME,
    /* props:       */ props,
    /* body:        */ message.getBytes(StandardCharsets.UTF_8));
 
 
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
 
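String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
    // Only accept the reply that belongs to our request (null-safe comparison)
    if (corrId.equals(delivery.getProperties().getCorrelationId())) {
        response.offer(new String(delivery.getBody(), StandardCharsets.UTF_8));
    }
}, consumerTag -> {
    // Nothing to do if the consumer is cancelled
});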
 
String result = response.take();
channel.basicCancel(ctag);
 
System.out.println(result);
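
The correlation id check in the client can be illustrated without a broker. The following sketch assumes an echo server that copies the correlation id of the request onto its reply, and shows why the client filters on it: replies that belong to another request are ignored. `CorrelationDemo`, `Message`, `echo` and `awaitReply` are hypothetical names, and a plain in-memory queue stands in for the reply queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.UUID;

class CorrelationDemo {
    // Hypothetical message type: a body plus the correlation id property.
    static final class Message {
        final String body;
        final String correlationId;

        Message(String body, String correlationId) {
            this.body = body;
            this.correlationId = correlationId;
        }
    }

    // Echo server: copies the correlation id of the request onto the reply.
    static Message echo(Message request) {
        return new Message("Echo: " + request.body, request.correlationId);
    }

    // Client: drains the reply queue and returns the body of the first reply
    // with the expected id, or null if no such reply is present.
    static String awaitReply(Queue<Message> replyQueue, String expectedId) {
        Message reply;
        while ((reply = replyQueue.poll()) != null) {
            if (expectedId.equals(reply.correlationId)) {
                return reply.body;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Queue<Message> replyQueue = new ArrayDeque<>();
        // A stale reply from some earlier request is still in the queue.
        replyQueue.add(new Message("Echo: old request", "some-older-id"));

        String corrId = UUID.randomUUID().toString();
        replyQueue.add(echo(new Message("Hello World", corrId)));

        System.out.println(awaitReply(replyQueue, corrId)); // Echo: Hello World
    }
}
```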

Publish and Subscribe

// PUBLISHER
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "Current Date: " + LocalDateTime.now();
channel.basicPublish(EXCHANGE_NAME,
    /* routing key: */ "", // Ignored by fanout exchanges
    /* properties:  */ null,
    /* body:        */ message.getBytes(StandardCharsets.UTF_8));
 
// SUBSCRIBER
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    System.out.println("Received '" + message + "'");
};
channel.basicConsume(queueName,
    /* autoAck: */ true, deliverCallback, consumerTag -> {});