WARNING: This text is deprecated and refers to an old version of ØMQ. It remains here for historical interest. DO NOT USE THIS TO LEARN ØMQ.
There are several new features in ØMQ/0.5. Most importantly, from this version onwards ØMQ becomes a multi-protocol messaging system. Instead relying on single wire-level protocol, user is free to choose the protocol to use. Other new features include auto-reconnect, flow control and queue limits. .NET extension is part of the new release as well.
We have already discussed some of the concepts and algorithms in design documents for previous versions of ØMQ:
- The ØMQ message wire format;
- The analysis of how to pass messages efficiently between threads;
- The analysis of how to achieve high message throughput over the network;
- How to get flexible handling of many concurrent message flows with possibly distinct wire formats and polling mechanisms while preserving optimal scaling on multicore systems is discussed;
- How AMQP-style exchange/queue semantics are implemented in ØMQ;
- Discussion of ØMQ directory service - how objects are located on the network;
- Discussion of how congestion control is handled in ØMQ.
- How large amount of simultaneous connections is managed in efficient way.
With previous version of ØMQ when connection broke between two components, both were notified about the fact using callback function (error_handler) but no attempt to reestablish the connection was made. The responsibility for reestablishment was shifted to applications on the top of ØMQ.
Starting with ØMQ/0.5 broken connections will be reestablished automatically. This was made the default behaviour.
What follows is a description of how you can hook-up into the auto-reconnect mechanism:
error_handler functionality still works as it used to work. When connection is broken, error handler you've supplied is invoked immediately. You can do whatever work needed at that point, however, keep in mind that error handler is invoked in asynchronous manner and thus you need to synchronise it manually with your application threads. Error handler can return either false in which case application is crashed immediately or true in which case disconnection is ignored silently and auto-reconnect functionality is invoked.
Moreover, there's a new notification mechanism that allows you to spot the place in the queue where reconnection happened and where thus some messages may be missing. This kind of functionality is important when messages are dependent on preceding messages. Imagine a video feed being passed via ØMQ where full frame is send once a second while intermediate frames are sent only in a form of diff from a previous frame. When reconnection happens, you want to be notified that there are messages missing at a particular point so that you can start dropping diffs until you get a full frame and resume normal behaviour.
To get this functionality, you have to switch "gap messages" on first (they are disabled by default to keep backward compatibility):
Now you can receive messages in a standard manner:
message_t message; api->receive (&message);
However, the message received can actually be a gap message. Gap message is not an actual message, rather it's a notification about messages possibly missing at the particular point of the queue. You can test whether message is a gap message this way:
if (message.type () == message_gap) ...
Caution: For auto reconnect to work correctly, it is essential to use fixed port numbers. If auto assigned port numbers are used, they will change as applications fail and get restarted. This can cause application to reconnect to the non-existent port or even port that's used by a different application. Auto assigned port numbers (e.g. "eth0") are still supported to guarantee backward compatibility, however, you should use fixed port numbers if you are relying on auto-reconnect feature (e.g. "eth0:5555").
Flow control & queue limits
The main motivation for this work was a need to distribute queue storage capacity across the network. There are nodes that have a lot of memory and there are nodes with so little memory is barely enough to run the application. Storage on some nodes is highly reliable, while other nodes are prone to failures and data loses. To reflect this fact, messaging infrastructure administrator has to decide how much data to store on each node and what to do when the storage limit on a particular node is exceeded.
From version 0.5 on all the internal queues (those not directly visible to the user) in ØMQ are limited, i.e. the number of messages in the internal queue cannot exceed some preset limit. The only unbound queues are the queues created by user (via create_queue method). These are unlimited by default but there's an option to set a limit when required.
The diagram above shows how queue limits work. Each queue is actually a collection of independent pipes (one pipe per queue writer) so the high watermark and low watermark you set when creating a queue isn't applied to the queue as a whole, rather it is used on per-writer basis. The messages in the pipe are numbered in a sequential manner. The thread that reads messages from the pipe knows the sequence number of the last message retrieved from the pipe (head), the thread that writes messages to the pipe knows the sequence number of the last message written to the pipe (tail). Once in a while reading thread sends a notification to the writer thread containing the actual head sequence number. Writer thread remembers it as last-head value.
The algorithm for enforcing pipe limits is pretty easy. If tail - last-head > high-watermark then no more messages can be written to the pipe. If it's application thread that attempts to write the message, it gets blocked. If it's protocol engine trying to write message received from the network, underlying network connection is suspended and no more data are read from it.
Once the size of a suspended pipe drops below low-watermark (tail - last-head < low-watermark) writing of messages is unblocked - either application thread resumes execution or more data can be read from the socket.
Now, with a little thinking it's clear that tail position notifications from the reader thread to the writer thread have to be sent at least each high-watermark - low-watermark messages to make queue limits work in a precise way. The consequence is that if high-watermark and low-watermark are close each to another, system has to pass a lot of notifications between the two threads. That can hurt performance in a serious manner. If high-watermark and low-watermark are just 1 message apart, passing messages through the pipe is performed in a lockstep fashion. Try to keep your watermarks hundreds or thousands messages apart to get the best performance.
At this point, let's discuss what actually happens when a queue is full. The intention of the following text is to make it clear that applying queue limits without caution can be a dangerous practice.
When queue is filled up no more messages can be written to it. If the sender is a business application, it gets blocked and stops processing more data till there's empty space in the queue to write the messages to. If the application happens to work in receive-request/process-it/send-response loop (which is often the case) it means that no more messages are received (application thread is blocked in send method, it cannot invoke receive method). Consequently, if there's a queue limit set on the queue, it ultimately hits its limit as no messages are read from it.
When the sender is a protocol engine (component handling specific network data flow) it stops reading data from the network. If the network protocol itself applies flow control mechanism (that's the case with TCP or SCTP) network buffers get filled and finally no more data can be sent through the connection - sending application will block.
It now becomes clear that if queue limits are set at every node in the network, failure to move the processing ahead at a specific point (because of network failure, application hang-up etc.) propagates upstream until it reaches ultimate source of the data.
Let's consider distribution of data in a tree like network topology:
If the leftmost application hangs up, message jam propagates up the pyramid as already explained (dotted arrows represent congestion propagation path). At the same time if a node gets stuck because it has blocked child node, it cannot propagate messages even to the healthy subnodes - siblings of the blocked node. Congestion propagates downstream. Once the topmost node gets stuck, no more messages can enter the system and what you get is global standstill.
So, how should the problem be handled?
Crucial point is not to mindlessly set the same queue limits everywhere. You should take into account which nodes are able to accumulate messages in congestion periods and which nodes have to block pretty quickly not to run out of memory. That way you can engineer "data dams" to hold the messages in the peak periods and pass them on once the processing resources are available.
Following diagram shows such "data dam" architecture. There are boxes labeled "app" which have a lot of CPU power but just a moderate amount of storage. There are different boxes labeled "data dam" that have plenty of storage space and are specifically designed to hold messages in the periods when the system is congested. "Apps" have very tight queue limits to ensure that they won't run out of memory. "Data dams" on the other hand have either no queue limits or limits are set very high. The dotted arrows show the paths of congestion propagation. Obviously, congestion cannot propagate more than one step upstream because in the front of each application there is a "data dam" that is able to store excess messages:
Note: If you are considering using "data dam" architecture, you may want to have a look at swap-to-file functionality implemented in 0.3.3 branch (check swap_size parameter to create_queue function). It's an optional feature that stores messages on the disk once the queue limit is reached. It makes it possible to store hundreds of gigabytes of data in the "data dam" when congestion hits while keeping the performance high when the "dam" is empty.
Auto selection of network interface
This is a small enhancement, but it makes life easier in early phases of application development. It allows you to create a global object (queue or exchange) without specifying the network interface to use. Use * (asterisk sign) optionally with a port number (e.g. *:5555) instead of exact network interface name (e.g. eth0).
What happens in the background is that the object binds to all the available network interfaces and registers itself in the directory service (zmq_server) using host name rather than IP address. Applications binding to such a global object get the host name from the directory service and resolve it to get IP address to connect to.
While it is a convenient feature while you are developing the application, we discourage you to use it in production. To work, it requires the host name resolving service to be accessible and unavailability of DNS service will seriously damage your system. If you still want to use it, we suggest to modify your local list of know hosts (/etc/hosts on Linux) so that you don't have to rely on DNS to resolve host names.
.NET extension for ØMQ is a library called libdnzmq.dll containing a single object called Dnzmq. Here is an example of how it can be used from C#:
// Create .NET object encapsulating 0MQ functionality. Dnzmq transport = new Dnzmq ("server001.brouhaha-technologies.com:5682"); // Set up the wiring. int eid = transport.create_exchange ("E", Dnzmq.SCOPE_GLOBAL, "192.168.0.1:5555"); int qid = transport.create_queue ("Q", Dnzmq.SCOPE_LOCAL, ""); transport.bind ("XYZ", "Q"); // Send a message. byte  out_msg = new byte ; out_msg  = 100; out_msg  = 101; transport.send (eid, out_msg); // Receive a message. byte  in_msg = transport.receive ();
Here's the same piece of code translated into Visual Basic:
' Create .NET object encapsulating 0MQ functionality. Dim Transport As New Dnzmq("server001.brouhaha-technologies.com:5682") ' Set up the wiring. Dim ExchangeId As Integer = Transport.create_exchange("E", _ Dnzmq.SCOPE_GLOBAL, "192.168.0.1:5555") Dim QueueId As Integer = Transport.create_queue("Q", Dnzmq.SCOPE_LOCAL, "") Transport.bind("XYZ", "Q") ' Send a message. Dim OutMsg(1) As Byte OutMsg(0) = 100 OutMsg(1) = 101 Transport.send(ExchangeId, OutMsg) ' Receive a message Dim InMsg() As Byte = Transport.receive()
Quick C# and Visual Basic tests performed on two untuned Core2 Duo desktops with Windows XP and 1Gb Ethernet connecting them both show end-to-end latency of 90 microseconds and throughput of 1.6 million messages per second (for messages 1 byte long).
.NET extension is available on Windows platform. It is written in managed C++ and thus it cannot be compiled with Mono.
ØMQ/0.5 allows for different transport-level and MOM-level protocols. To keep backward compatibility original ØMQ backend protocol over TCP was made default. You can specify the global object location in the old way, say "eth0:5555". However, this is now considered only an abbreviated version of full object location, i.e. "bp/tcp://eth0:5555".
In other words, when you are creating a queue at "bp/tcp://192.168.0.1:5555" what you are saying is: "This queue is accessible via ØMQ backend protocol over TCP on IP address 192.168.0.1 and port 5555."
Aside of bp/tcp protocol you can use other protocols as well. PGM ("bp/pgm://"), AMQP ("amqp://") and SCTP ("sctp://") were added as experimental extensions to ØMQ/0.5. So, when specifying object location, you may choose to use "sctp://eth0:5555" instead of "bp/tcp://eth0:5555" and ØMQ will automatically transfer messages via SCTP instead of TCP.
Following features are part of ØMQ/0.5, however, they are meant as a technology preview rather than production-quality features.
PGM reliable multicast support
Pragmatic General Multicast (PGM) is a reliable multicast transport protocol for applications that require ordered or unordered, duplicate-free, multicast data delivery from multiple sources to multiple receivers. PGM guarantees that a receiver in the group either receives all data packets from transmissions and repairs, or is able to detect unrecoverable data packet loss. PGM is specifically intended as a workable solution for multicast applications with basic reliability requirements. Its central design goal is simplicity of operation with due regard for scalability and network efficiency.
The goal of using multicast is to minimise network traffic. If there are 10 applications receiving same data you can cut your network traffic by 90% by using multicast. With multicast, each packet is passed though the network once only to be delivered to all the consumers. Multicast can have a positive effect on the latency as well. If you are sending a message to 10 consumers via TCP, sender application has to traverse networking stack 10 times to send the message to each consumer. With multicast it traverses networking stack once only, making the average latency better.
There are several open source as well as proprietary implementations of PGM protocol. Moreover, Cisco routers are optimised for handling PGM traffic (NAK and RDATA suppression).
To achieve high throughput, ØMQ batches messages into PGM packets same way as it does with TCP (see here). Thus, if traffic is high, many ØMQ messages can be transferred in a single PGM data packet. Large messages exceeding MTU size of your network, on the other hand, can be split between several PGM packets. The consequence is that ØMQ messages aren't aligned with PGM packet boundary.
This can become a problem when there are late joiners entering the message feed. "Late joiner" is a message consumer that starts to consume messages once the feed is running rather that joining the feed in advance.
Late joiner can get a first PGM packet that has a trailing part of the message started in the previous packet at its beginning. As ØMQ messages are meant to be atomic (you cannot get half of a message) we have to discard the trailing part of the message and start parsing the feed at the beginning of the next message. To achieve this behaviour we had to modify the wire format to include an information about where the first message in the packet begins:
As can be seen in the diagram, each PGM packet contains offset field (16 bit integer) immediately following PGM header. Offset specifies where the first ØMQ message in the packet begins. If there's no beginning of a message in the packet (we are in the middle of a large message), offset field is set to 0xffff.
To use PGM within the context of ØMQ, create global exchange that will act as multicast message source. Parmeters required are network interface to send multicast data to, multicast group and port number:
api->create_exchange ("E", scope_global, "bp/pgm://eth0;184.108.40.206:5555", io)
Comment: For IPv4, multicast groups range from 220.127.116.11 to 18.104.22.168.
On the receiver side, create a local queue and bind to the global exchange created by the sender application. Still, you'll need to supply one more parameter on the receiver side - address of the network interface that should be used for receiving the multicast packets. Pass this argument to the bind method:
api->create_queue ("Q", scope_local); api->bind ("E", "Q", io, NULL, "eth0", NULL);
At this point you should start to receive messages from the sender.
Comment: On Linux, PGM-enabled applications have to be run with root privileges. The cause is that there is no PGM implementation in the kernel and you have to send/receive PGM packets from the user space. Manipulation of raw IP sockets from user space requires root privileges. If you want to use multicast without root privileges you have to use PGM encapsulated within UDP (use say "bp/pgm://udp:eth0;22.214.171.124:5555" instead of "bp/pgm://eth0;126.96.36.199:5555"). Note that this is not the real PGM, i.e. it won't intercommunicate with other PGM implementations. Also, you have to set these two environment variables:
$ export PGM_TIMER="TSC" $ export PGM_SLEEP="TSC"
PGM support for ØMQ is available on Linux platform only (using OpenPGM implementation of the protocol). In the future we are going to implement PGM support on Windows platform using native Windows PGM implementation.
AMQP is an open Internet Protocol for Business Messaging. AMQP's scope covers messaging within and between firms, with applicability to both business and infrastructure messaging. AMQP is developed by a working group consisting of Cisco Systems, Credit Suisse, Deutsche Börse Systems, Envoy Technologies, Inc., Goldman Sachs, IONA Technologies PLC, iMatix Corporation sprl., JPMorgan Chase Bank Inc. N.A, Microsoft Corporation, Novell, Rabbit Technologies Ltd., Red Hat, Inc., TWIST Process Innovations Ltd, WS02, Inc. and 29West Inc.
AMQP model defines two basic roles. AMQP broker is the hub of the system, speaking to all clients and dispatching messages between them. AMQP clients connect to the AMQP broker. There is no way for clients to speak each to another directly.
This architecture is inherently different from ØMQ's distributed peer-to-peer symmetric model. Thus, AMQP cannot be used to communicate between different ØMQ applications. Instead, AMQP can be used to connect ØMQ universe of applications to an AMQP broker. Via the broker these applications are able to speak to all the AMQP applications (clients) connected to the broker. At the moment there's no way for AMQP applications to speak to ØMQ applications directly without a broker in the middle.
The simplest possible scenario is to use ØMQ as an AMQP client. ØMQ universe in this case consist of a single application, the rest of the system is handled by the AMQP broker:
Slightly more complex scenario would be to use AMQP broker as a simple node within a ØMQ universe. The nice thing about this architecture is that you can use AMQP broker to provide the functionality that is not yet provided by ØMQ itself - say AMQP broker can be used as a persistent store for messages:
Following scenario features ØMQ as a way to bridge disparate AMQP brokers. Some AMQP brokers like OpenAMQ do have this bridging functionality built-in (federation), others do not. ØMQ builds on this idea and provides much more flexible way of managing such distributed clusters:
As for now, it should be noted that you can plug only a single AMQP broker to ØMQ universe. It's not caused by any deficiency of ØMQ system, it's just an attempt to keep zmq_server command line simple:
$ ./zmq_server 5682 amqp.bigenterprise.com:5672
In the future the configuration will be stored in a file or - optionally - in a directory service. At that point adding several AMQP brokers into the universe will be pretty simple.
At the moment, however, there is at most one AMQP broker, represented as a pair of global objects. You can bind your application to the global queue called AMQP or to the global exchange called AMQP depending on whether you want to send or receive messages. Use the name of AMQP queue you want to send/receive messages from as a binding parameter:
// Send message to queue MyQueue located at AMQP broker. int eid = api->create_exchange ("E"); api->bind ("E", "AMQP", NULL, io, NULL, "MyQueue"); message_t msg (100); api->send (eid, msg);
// Receive message from queue MyQueue located at AMQP broker. api->create_queue ("Q"); api->bind ("AMQP", "Q", io, NULL, "MyQueue", NULL); message_t msg; api->receive (&msg);
ØMQ supports 0-9-1 version of AMQP. This version is currently implemented by OpenAMQ message broker. RabbitMQ team is also committed to implementing version 0-9-1 of AMQP. You should contact RabbitMQ by the mailing list for current details.
ØMQ support for AMQP is available on all platforms.
The Stream Control Transmission Protocol (SCTP) is a new IP transport protocol, existing at an equivalent level with UDP and TCP, which provide transport layer functions to many Internet applications. Like TCP, SCTP provides a reliable transport service, ensuring that data is transported across the network without error and in sequence. Unlike TCP, SCTP provides a number of functions that are critical for telephony signaling transport, and at the same time can potentially benefit other applications needing transport with additional performance and reliability.
SCTP can be used as a substitute for TCP in ØMQ-based system. Just change the way you declare your global objects from:
api->create_queue ("Q", scope_global, "bp/tcp://192.168.0.12:5555", io, 1, &io);
api->create_queue ("Q", scope_global, "sctp://192.168.0.12:5555", io, 1, &io);
To allow interoperability with native SCTP applications and leverage specific SCTP capabilities (e.g. possibility of on-wire interception & analysis of messages) design decision was made to map ØMQ messages to SCTP messages on one-to-one basis. The consequence is that we cannot batch several ØMQ messages into a single SCTP message to improve performance. Thus, you should be aware that SCTP transport isn't as fast as TCP transport.
Another problem with SCTP transport is that Linux kernel implementation of SCTP doesn't support explicit EOR mode at the moment meaning that the maximal size of the message you can send is limited by the size of SCTP buffer. At ØMQ level, we make sure that the size of message doesn't exceed 4kB. If you want to send messages larger than that, modify max_sctp_message_size constant in config.hpp source file.
SCTP support is available out of the box on Linux, Solaris and FreeBSD platforms. If you want to run ØMQ over SCTP on a platform where SCTP is not an integral part of the operating system, it should be trivial to tweak ØMQ to work with your implementation of SCTP.
ØMQ/0.5 delivers several new features that can prove to be of great value in real-world scenarios. Auto-reconnect improves overall robustness of the system and makes it tolerate even network outages and application failures. Flow control and queue limits allow you to explicitly control the system in congestion conditions. .NET extension provides access to the universe of C# and Visual Basic business applications while multi-protocol support allows you to leverage advantages of different networking protocols.
If you found this page useful, please rate it up so others will find it.
- 0MQ Termination
- 0MQ/3.0 pubsub
- Background to AMQP
- Broker vs. Brokerless
- High-speed message matching
- Internal Architecture of libzmq
- Market Analysis
- Measuring jitter
- Measuring messaging performance
- Messaging enabled network
- Multithreading Magic
- ØMQ Lightweight Messaging Kernel (v0.1)
- ØMQ Lightweight Messaging Kernel (v0.2)
- ØMQ Lightweight Messaging Kernel (v0.3)
- ØMQ Lightweight Messaging Kernel (v0.4)
- ØMQ Lightweight Messaging Kernel (v0.6)
- Routing (early experience)
- Switch or Broker
- Traffic Monitoring
- Whalesharks - an Open Source Data Plant
Who's watching this page?Dominic Luciano
Fernando J Quintero
... and more