This is a prerelease version.

View latest

Submitting Jobs

To execute a data pipeline it needs to be submitted to the cluster as a job. Once the job is submitted, it’s distributed automatically to all the cluster members and executed on all members.

To run a job, a Hazelcast member must have access to all classes that are used in the job. Depending on how you submit a job, you may also need to upload those classes to your cluster.

Hazelcast cannot check client permissions in code that’s uploaded with a job. In uploaded code, clients can access data or features without permission.

Submitting a Job from the CLI

The submit command of the CLI allows you to upload all classes that are used in your job automatically.

bin/hz-cli submit <jar file>

To do so, your pipeline code must acquire a JetService using Hazelcast.bootstrappedInstance().

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(TestSources.items("the", "quick", "brown", "fox"))
 .map(item -> item.toUpperCase())
 .writeTo(Sinks.logger());

HazelcastInstance hz = Hazelcast.bootstrappedInstance();
JetService jet = hz.getJet();

try {
   jet.newJob(pipeline);
} finally {
   hz.shutdown();
}
The bootstrapped instance disables auto discovery, so we recommend not using this instance in embedded mode.

By default, the submit command executes the Main class of the provided JAR file and submits the job to a member at dev@localhost.

To execute a different class use the --class option of the submit command.

bin/hz-cli submit --class MainClass <jar file> [<arguments>...]

Or to submit the job to a member at a different address, use one of the following options of the hz-cli command:

  • -t: Comma-separated list of member names and addresses to which you want to connect.

  • -f: Path to a client config file, which can be used instead of the -t option.

The submit command has the following additional options:

  • -v: Verbose mode, which will show the connection logs and exception stack traces, if any.

  • -n: Job name to use. This option overrides the one set in the JobConfig object.

  • -s: Name of the initial snapshot to start the job from.

Example:

bin/hz-cli \
  -t dev@192.168.1.100:5701,192.168.1.101:5701 \
  submit \
  -c <MainClass> \
  <jar file> \
  [<arguments>...]

Submitting a Job using a Java Client or Embedded Mode

If you are using the Java client as part of an application and need to submit jobs within that application, you can use the Hazelcast client to directly submit jobs:

HazelcastInstance hz = HazelcastClient.newHazelcastClient();
..
hz.getJet().newJob(pipeline);

Or, if you’re using Hazelcast in embedded mode, you can use the Hazelcast instance to directly submit jobs.

HazelcastInstance hz = Hazelcast.newHazelcastInstance();
..
hz.getJet().newJob(pipeline);

Uploading Classes to Members

When submitting a job using the Hazelcast client, no additional classes are sent with the job. Instead, you must upload those classes, using one of the following options:

  • Use the API

    HazelcastInstance hz = HazelcastClient.newHazelcastClient();
    hz.getJet().newJob(pipeline, new JobConfig().addClass(AppClass.class))

    When adding classes this way, nested (inner and anonymous) classes are automatically added to the members' classpaths as well.

    It’s also possible to add all the classes in a given package (and recursively in all its subpackages) using JobConfig.addPackage() or even a whole JAR file with JobConfig.addJar().

  • Add JAR files to the <HAZELCAST_HOME>/lib directory.

    Hazelcast automatically picks up any dependencies placed in this directory during startup.

    After making changes to the lib directory, you must restart the member before it will recognize the new files.
  • Add the path to your JAR files, using the CLASSPATH=<path_to_jar> environment variable when starting a Hazelcast member.

You cannot upload the following classes using the Jet API or the CLI. These classes must be uploaded at the same time as the member starts.

Job Placement Control

To activate job placement, your license key must include Advanced Compute.

You can define which members an individual Jet job runs on. This is known as job placement.

This approach is particularly useful in the following situations:

  • When you want to run the job on a lite member to isolate computation from storage

    Before isolating your computation, ensure that the processing element of the job is substantially more resource-intensive than the data retrieval element. Although isolating computation from storage can mean that your storage components benefit from the reduced resource workload, serving the data still has some impact and you must ensure that isolated jobs provide the right balance for your needs.
  • When you want to run the job on an edge node to take advantage of edge computing

Job placement supports the following:

  • Auto-scaling

  • AT_LEAST_ONCE and EXACTLY_ONCE Fault-tolerance

  • Split-brain protection

  • Metrics

You can use the Hazelcast Java client to submit your job to specific members as follows:

HazelcastInstance hz = HazelcastClient.newHazelcastClient();
// ...
Map map = hz.getMap(MAP_NAME);
Pipeline p = Pipeline.create()
                .readFrom(Sources.map(map))
                .map(Entry::getValue)
                .writeTo(sink)
                .getPipeline();
Job job = hz.getJet()
        .newJobBuilder(p)
        .withMemberSelector(JetMemberSelector.ALL_LITE_MEMBERS)
        .start();

For further information on job placement control, see Jet Job Placement Control.

Submitting a Job using SQL

To submit a job to the cluster with SQL, use the CREATE JOB statement.

For an example of how to use this statement, see Get Started with SQL Over Kafka.

Uploading Classes to Members

You cannot use SQL to add classes to a member’s classpath.

If your job uses classes that aren’t already in your members classpath, you can use one of the following options:

  • Add JAR files to the <HAZELCAST_HOME>/lib directory.

    Hazelcast automatically picks up any dependencies placed in this directory during startup.

    After making changes to the lib directory, you must restart the member before it will recognize the new files.
  • Add the path to your JAR files, using the CLASSPATH=<path_to_jar> environment variable when starting a Hazelcast member.

Submitting a Job from a Dockerfile

You can also create your own Docker image using Dockerfiles to start Hazelcast and submit jobs.

Create a Dockerfile as follows:

FROM hazelcast/hazelcast:5.5.0
ADD examples/hello-world.jar /examples/
ENV HAZELCAST_MEMBER_ADDRESS 172.17.0.2
CMD ["sh", "-c", "hz-cli -t $HAZELCAST_MEMBER_ADDRESS submit /examples/hello-world.jar"]

The Hazelcast address is exposed through the HAZELCAST_MEMBER_ADDRESS environment variable, with the default value of 172.17.0.2. This makes it easy to pass a different address with docker run -e HAZELCAST_MEMBER_ADDRESS=<another.one>.

Then, you create your own Docker image using the following command, giving it the name hazelcast-hello-world:

docker build . -t hazelcast-hello-world

You will see an output similar to the following:

Sending build context to Docker daemon  77.35MB
...
Successfully built 6bc0f527b69c
Successfully tagged hazelcast-hello-world:latest

Finally, you submit the job as follows:

docker run -it hazelcast-hello-world

Options for Packaging Dependencies

A pipeline is built with several transform which typically consist of lambda expressions. During the job submission, the pipeline is serialized and sent to the cluster, which must be able to execute these expressions on each member. Imagine the simple mapping pipeline below:

class MyJob {

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.readFrom(TestSources.items(1, 2, 3, 4))
     .map(x -> x * x)
     .writeTo(Sinks.logger());

     HazelcastInstance hz = Hazelcast.bootstrappedInstance();
     hz.getJet().newJob(p).join();
  }
}

The lambda x → x * x will get compiled by Java into an anonymous class with a name like MyJob$$Lambda$30/0x00000008000a1840. These and other classes which may depend on these functions need to be present on the members that will be executing the job. Hazelcast supports several ways to make these classes available on the members.

Uber JAR

The easiest way to get additional dependencies to the cluster is to bundle it as a so-called uber JAR, which contains all the required dependencies inside.

To build an uber JAR, there are several options:

Adding to Member Classpaths

Some dependencies may either be large or may be required to be present on classpath during application startup.

The convention is to add these dependencies to $HZ_HOME/lib directory. Hazelcast automatically picks up any dependencies placed on this directory during startup. Several out-of-the-box modules (such as connectors for Kafka, Hadoop) are already available in the lib directory and can simply be used. Any changes to lib directory requires the node to be restarted to take effect.

Alternatively, you can use the CLASSPATH environment variable to add additional classes:

CLASSPATH=<path_to_jar> bin/hz-start

You cannot upload the following classes using the API or the CLI. These classes must be uploaded at the same time as the member starts.