This is a prerelease version.

View latest

Group Messages from Apache Pulsar

Apache Pulsar is an open-source distributed pub-sub messaging system. By using Pulsar connector module, Hazelcast can consume messages from Pulsar topics.

Hazelcast allows us to compute different aggregation functions(sum, avg etc.) on specified window of stream items. It also provide a function to group them by some key and then aggregate over each group separately.

In this tutorial, you will build a pipeline that receives messages from Pulsar and then counts them by grouping them by their user keys. You will use the concept of a sliding window to process this continuous stream.

Before You Begin

To complete this tutorial, you need the following:

Prerequisites Useful resources

A Hazelcast cluster running in client/server mode

Step 1. Start Apache Pulsar

If you already have Pulsar and you skip below steps:

If you don’t have it already, you can download, and run Pulsar in standalone mode on your machine for testing purposes.

  1. Download Apache Pulsar

  2. Extract it into directory and go into ./bin directory.

  3. Run the local Pulsar broker by calling`./pulsar standalone`. It will start on localhost:6650 by default. For more details about standalone mode, view

For the code sample, we will assume that the Pulsar is running on localhost:6650.

Step 2. Create a New Java Project

Create a blank Java project named pulsar-example and copy the Gradle or Maven file into it:

  • Gradle

  • Maven

plugins {
    id 'java'
    id 'com.github.johnrengelman.shadow' version '5.2.0'
}

group 'org.example'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    implementation 'com.hazelcast:hazelcast:5.1-SNAPSHOT'
    implementation 'com.hazelcast.jet-contrib:pulsar:0.1'
    implementation 'org.apache.pulsar:pulsar-client:2.5.0'
}

jar {
    enabled = false
    dependsOn(shadowJar{archiveClassifier.set("")})
    manifest.attributes 'Main-Class': 'org.example.JetJob'
}

shadowJar {
    dependencies {
        exclude(dependency('com.hazelcast:hazelcast:5.1-SNAPSHOT'))
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>pulsar-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet</artifactId>
            <version>5.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.hazelcast.jet.contrib</groupId>
            <artifactId>pulsar</artifactId>
            <version>0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>2.5.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>org.example.JetJob</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.2</version>
                <configuration>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.hazelcast:hazelcast</exclude>
                                </excludes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Step 3. Publish Messages to a Pulsar Topic

The program below connects the previously started pulsar cluster located at pulsar://localhost:6650. And then, it iteratively picks a user uniformly at random, then creates event on behalf of this user, and sends this event as a message to the Pulsar topic named hz-jet-topic.

package org.example;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class MessagePublisher {

    public static class Event {
        public String user;
        public Long eventCount;
        public String message;

        public Event() {
        }

        public Event(String user, Long eventCount, String message) {
            this.user = user;
            this.eventCount = eventCount;
            this.message = message;
        }
        public String getUser() {
            return user;
        }

        public Long getEventCount() {
            return eventCount;
        }

        public String getMessage() {
            return message;
        }

    }

    public static void main(String[] args) throws Exception {
        String topicName = "hz-jet-topic";
        PulsarClient client = PulsarClient.builder()
                                          .serviceUrl("pulsar://localhost:6650")
                                          .build();
        String[] userArray = {"user1", "user2", "user3", "user4", "user5"};
        Producer<Event> producer = client
                .newProducer(Schema.JSON(Event.class))
                .topic(topicName)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .sendTimeout(10, TimeUnit.SECONDS)
                .blockIfQueueFull(true)
                .create();

        for (long eventCount = 0; ; eventCount++) {
            String message = String.format("message-%0,4d", eventCount);
            String user = getRandomUser(userArray);
            producer.send(new Event(user, eventCount, message));
            System.out.format("Published '%s' from '%s' to Pulsar topic '%s'%n", message, user, topicName);
            Thread.sleep(20);
        }
    }

    private static String getRandomUser(String[] userArray) {
        Random r = new Random();
        return userArray[r.nextInt(userArray.length)];
    }
}

Run it from your IDE. You should see this in the output:

Published 'message-0000' from 'user1' to Pulsar topic 'hz-jet-topic'
Published 'message-0001' from 'user3' to Pulsar topic 'hz-jet-topic'
Published 'message-0002' from 'user1' to Pulsar topic 'hz-jet-topic'
Published 'message-0003' from 'user2' to Pulsar topic 'hz-jet-topic'
Published 'message-0004' from 'user4' to Pulsar topic 'hz-jet-topic'
...

Step 4. Use Hazelcast to Count the Messages of Users

The code below is used to connect to the Pulsar topic and gets messages from it and then logs the count of messages by grouping them with their users.

package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.*;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.contrib.pulsar.PulsarSources;
import com.hazelcast.jet.pipeline.*;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.example.MessagePublisher.Event;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;

public class JetJob {
    static final DateTimeFormatter TIME_FORMATTER =
            DateTimeFormatter.ofPattern("HH:mm:ss:SSS");

    public static void main(String[] args) {
        String topicName = "hz-jet-topic";

        StreamSource<Event> source = PulsarSources.pulsarReaderBuilder(
                topicName,
                () -> PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(),
                () -> Schema.JSON(Event.class),
                Message::getValue).build();

        Pipeline p = Pipeline.create();
        p.readFrom(source)
         .withNativeTimestamps(0)
         .groupingKey(Event::getUser)
         .window(sliding(TimeUnit.SECONDS.toMillis(60), TimeUnit.SECONDS.toMillis(30)))
         .aggregate(counting())
         .writeTo(Sinks.logger(wr -> String.format(
                 "At %s Pulsar got %,d messages in the previous minute from %s.",
                 TIME_FORMATTER.format(LocalDateTime.ofInstant(
                         Instant.ofEpochMilli(wr.end()), ZoneId.systemDefault())),
                 wr.result(), wr.key())));

        JobConfig cfg = new JobConfig()
                .setName("pulsar-message-counter");
        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        hz.getJet().newJob(p, cfg);
    }
}

If you run this code from your IDE, it will create its own Hazelcast member and run the job on it. To run this on the previously started Hazelcast member, you need to create a runnable JAR including all dependencies required to run it. Then, submit it to the Hazelcast cluster. Since build.gradle/pom.xml files are configured to create a full JAR, we can do these steps easily as shown as below:

  • Gradle

  • Maven

gradle build
bin/hz-cli submit build/libs/pulsar-example-1.0-SNAPSHOT.jar
mvn package
bin/hz-cli submit target/pulsar-example-1.0-SNAPSHOT.jar

Now go to the window where you started Hazelcast. Its log output will contain the output from the pipeline.

If MessagePublisher was running while you were following these steps, you’ll now get a report on the whole history of the events and then a steady stream of real-time updates.

Sample output: 10:38:44.504 Between 10:32:00:00-10:32:30:000 Pulsar got 508 messages from user4.

10:38:44.504  Between 10:32:00:00-10:32:30:000 Pulsar got 538 messages from user1.
10:38:44.504  Between 10:32:00:00-10:32:30:000 Pulsar got 508 messages from user4.
10:38:44.597  Between 10:32:30:00-10:33:00:000 Pulsar got 584 messages from user2.
10:38:44.597  Between 10:32:30:00-10:33:00:000 Pulsar got 551 messages from user5.
10:38:44.597  Between 10:32:30:00-10:33:00:000 Pulsar got 540 messages from user3.

It is possible to restart a job without suspending and resuming in one atomic action. You can restart the job by calling:

bin/hz-cli restart pulsar-message-counter

After that, you’ll get all the history again. If you want to change this behaviour to continue where it left off, you can change the fault tolerance behaviour of the job to exactly-once by replacing the JobConfig section of the program with the following.

JobConfig cfg = new JobConfig()
                .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
                .setSnapshotIntervalMillis(SECONDS.toMillis(1))
                .setName("pulsar-message-counter");

Step 5. Clean Up

  1. Cancel the job.

    bin/hz-cli cancel pulsar-message-counter
  2. Shut down the Hazelcast cluster.

    bin/hz-stop