Perl

Perl

Github https://github.com/zeromq/perlzmq
cpan https://metacpan.org/pod/ZMQ::FFI

Install

cpanm -v ZMQ::FFI

Examples

send/recv

use 5.012;
use ZMQ::FFI qw(ZMQ_REQ ZMQ_REP);

my $endpoint = "ipc://zmq-ffi-$$";
my $ctx      = ZMQ::FFI->new();

my $s1 = $ctx->socket(ZMQ_REQ);
$s1->connect($endpoint);

my $s2 = $ctx->socket(ZMQ_REP);
$s2->bind($endpoint);

$s1->send('ohhai');

say $s2->recv();
# ohhai

pub/sub

use 5.012;
use ZMQ::FFI qw(ZMQ_PUB ZMQ_SUB);
use Time::HiRes q(usleep);

my $endpoint = "ipc://zmq-ffi-$$";
my $ctx      = ZMQ::FFI->new();

my $s = $ctx->socket(ZMQ_SUB);
my $p = $ctx->socket(ZMQ_PUB);

$s->connect($endpoint);
$p->bind($endpoint);

# all topics
{
    $s->subscribe('');

    until ($s->has_pollin) {
        # compensate for slow subscriber
        usleep 100_000;
        $p->send('ohhai');
    }

    say $s->recv();
    # ohhai

    $s->unsubscribe('');
}

# specific topics
{
    $s->subscribe('topic1');
    $s->subscribe('topic2');

    until ($s->has_pollin) {
        usleep 100_000;
        $p->send('topic1 ohhai');
        $p->send('topic2 ohhai');
    }

    while ($s->has_pollin) {
        say join ' ', $s->recv();
        # topic1 ohhai
        # topic2 ohhai
    }
}

multipart

use 5.012;
use ZMQ::FFI qw(ZMQ_DEALER ZMQ_ROUTER);

my $endpoint = "ipc://zmq-ffi-$$";
my $ctx      = ZMQ::FFI->new();

my $d = $ctx->socket(ZMQ_DEALER);
$d->set_identity('dealer');

my $r = $ctx->socket(ZMQ_ROUTER);

$d->connect($endpoint);
$r->bind($endpoint);

$d->send_multipart([qw(ABC DEF GHI)]);

say join ' ', $r->recv_multipart;
# dealer ABC DEF GHI

nonblocking

use 5.012;
use ZMQ::FFI qw(ZMQ_PUSH ZMQ_PULL);
use AnyEvent;
use EV;

my $endpoint = "ipc://zmq-ffi-$$";
my $ctx      = ZMQ::FFI->new();
my @messages = qw(foo bar baz);


my $pull = $ctx->socket(ZMQ_PULL);
$pull->bind($endpoint);

my $fd = $pull->get_fd();

my $recv = 0;
my $w = AE::io $fd, 0, sub {
    while ( $pull->has_pollin ) {
        say $pull->recv();
        # foo, bar, baz

        $recv++;
        if ($recv == 3) {
            EV::break();
        }
    }
};


my $push = $ctx->socket(ZMQ_PUSH);
$push->connect($endpoint);

my $sent = 0;
my $t;
$t = AE::timer 0, .1, sub {
    $push->send($messages[$sent]);

    $sent++;
    if ($sent == 3) {
        undef $t;
    }
};

EV::run();