Submitting Jobs
To execute a data pipeline, you must submit it to the cluster as a job. Once the job is submitted, it’s distributed automatically to all cluster members and executed on each of them.
To run a job, a Hazelcast member must have access to all classes that are used in the job. Depending on how you submit a job, you may also need to upload those classes to your cluster.
Hazelcast cannot check client permissions in code that’s uploaded with a job. Uploaded code can access data and features without permission checks.
Submitting a Job from the CLI
The submit command of the CLI automatically uploads all classes that are used in your job.
bin/hz-cli submit <jar file>
To do so, your pipeline code must acquire a JetService using Hazelcast.bootstrappedInstance().
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(TestSources.items("the", "quick", "brown", "fox"))
        .map(item -> item.toUpperCase())
        .writeTo(Sinks.logger());

// When run via `hz-cli submit`, bootstrappedInstance() connects to the
// target cluster and the submitted JAR is uploaded along with the job.
HazelcastInstance hz = Hazelcast.bootstrappedInstance();
JetService jet = hz.getJet();
try {
    jet.newJob(pipeline);
} finally {
    hz.shutdown();
}
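The newJob() call returns a Job handle. If the submitting code should wait for a batch job to finish before shutting the instance down, it can join the job first. A minimal sketch, reusing the jet and pipeline variables from the example above:

Job job = jet.newJob(pipeline);
job.join(); // for batch jobs, blocks until the job completes and rethrows any job failure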
The bootstrapped instance disables auto discovery, so we recommend not using this instance in embedded mode. |
By default, the submit command executes the main class of the provided JAR file and submits the job to a member at dev@localhost.
To execute a different class, use the --class option of the submit command.
bin/hz-cli submit --class MainClass <jar file> [<arguments>...]
Or, to submit the job to a member at a different address, use one of the following options of the hz-cli command:
- -t: Comma-separated list of member names and addresses to which you want to connect.
- -f: Path to a client config file, which can be used instead of the -t option.
The submit command has the following additional options:
- -v: Verbose mode, which shows the connection logs and exception stack traces, if any.
- -n: Job name to use. This option overrides the one set in the JobConfig object (see the sketch after the example below).
- -s: Name of the initial snapshot to start the job from.
Example:
bin/hz-cli \
-t dev@192.168.1.100:5701,192.168.1.101:5701 \
submit \
-c <MainClass> \
<jar file> \
[<arguments>...]
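For reference, the job name that the -n option overrides is the one set programmatically on JobConfig. A minimal sketch, reusing the jet and pipeline variables from the earlier bootstrapped example:

JobConfig config = new JobConfig();
config.setName("hello-world"); // overridden by hz-cli submit -n, if that option is given
jet.newJob(pipeline, config);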
Submitting a Job using a Java Client or Embedded Mode
If you are using the Java client as part of an application and need to submit jobs within that application, you can use the Hazelcast client to directly submit jobs:
HazelcastInstance hz = HazelcastClient.newHazelcastClient();
// ...
hz.getJet().newJob(pipeline);
Or, if you’re using Hazelcast in embedded mode, you can use the Hazelcast instance to directly submit jobs:
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
// ...
hz.getJet().newJob(pipeline);
Uploading Classes to Members
When submitting a job using the Hazelcast client, no additional classes are sent with the job. Instead, you must upload those classes using one of the following options:
- Use the API (a combined sketch follows this list):
  HazelcastInstance hz = HazelcastClient.newHazelcastClient();
  hz.getJet().newJob(pipeline, new JobConfig().addClass(AppClass.class));
  When adding classes this way, nested (inner and anonymous) classes are automatically added to the members' classpaths as well.
  It’s also possible to add all the classes in a given package (and recursively in all its subpackages) using JobConfig.addPackage(), or even a whole JAR file with JobConfig.addJar().
. -
Add JAR files to the
<HAZELCAST_HOME>/lib
directory.Hazelcast automatically picks up any dependencies placed in this directory during startup.
After making changes to the lib
directory, you must restart the member before it will recognize the new files. -
- Add the path to your JAR files, using the CLASSPATH=<path_to_jar> environment variable when starting a Hazelcast member.
You cannot upload certain classes using the Jet API or the CLI; these classes must be available on the member’s classpath when the member starts.
Job Placement Control
To activate job placement, your license key must include Advanced Compute.
You can define which members an individual Jet job runs on. This is known as job placement.
This approach is particularly useful in the following situations:
- When you want to run the job on a lite member to isolate computation from storage.
  Before isolating your computation, ensure that the processing element of the job is substantially more resource-intensive than the data retrieval element. Although isolating computation from storage can mean that your storage components benefit from the reduced resource workload, serving the data still has some impact, and you must ensure that isolated jobs provide the right balance for your needs.
- When you want to run the job on an edge node to take advantage of edge computing.
Job placement supports the following:
- Auto-scaling
- AT_LEAST_ONCE and EXACTLY_ONCE fault tolerance
- Split-brain protection
- Metrics
You can use the Hazelcast Java client to submit your job to specific members as follows:
HazelcastInstance hz = HazelcastClient.newHazelcastClient();
// ...
IMap<String, String> map = hz.getMap(MAP_NAME); // example key/value types

Pipeline p = Pipeline.create()
        .readFrom(Sources.map(map))
        .map(Entry::getValue)
        .writeTo(sink)
        .getPipeline();

Job job = hz.getJet()
        .newJobBuilder(p)
        .withMemberSelector(JetMemberSelector.ALL_LITE_MEMBERS)
        .start();
For further information on job placement control, see Jet Job Placement Control.
Submitting a Job using SQL
To submit a job to the cluster with SQL, use the CREATE JOB statement.
For an example of how to use this statement, see Get Started with SQL Over Kafka.
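As a quick illustration, a CREATE JOB statement can also be executed from the Java client through the SQL service. This is only a sketch; the trades and trades_topic mappings are hypothetical and must already exist:

HazelcastInstance hz = HazelcastClient.newHazelcastClient();
// Creates a streaming job that copies rows from the source mapping to the sink mapping.
hz.getSql().execute(
        "CREATE JOB ingest_trades AS "
      + "SINK INTO trades "
      + "SELECT * FROM trades_topic");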
Uploading Classes to Members
You cannot use SQL to add classes to a member’s classpath.
If your job uses classes that aren’t already on your members' classpaths, you can use one of the following options:
- Add JAR files to the <HAZELCAST_HOME>/lib directory. Hazelcast automatically picks up any dependencies placed in this directory during startup.
  After making changes to the lib directory, you must restart the member before it will recognize the new files.
Add the path to your JAR files, using the
CLASSPATH=<path_to_jar>
environment variable when starting a Hazelcast member.
Submitting a Job from a Dockerfile
You can also create your own Docker image using Dockerfiles to start Hazelcast and submit jobs.
Create a Dockerfile as follows:
FROM hazelcast/hazelcast:5.5.2
ADD examples/hello-world.jar /examples/
ENV HAZELCAST_MEMBER_ADDRESS 172.17.0.2
CMD ["sh", "-c", "hz-cli -t $HAZELCAST_MEMBER_ADDRESS submit /examples/hello-world.jar"]
The Hazelcast address is exposed through the HAZELCAST_MEMBER_ADDRESS environment variable, with the default value of 172.17.0.2. This makes it easy to pass a different address with docker run -e HAZELCAST_MEMBER_ADDRESS=<another.one>.
Then, you create your own Docker image using the following command, giving it the name hazelcast-hello-world:
docker build . -t hazelcast-hello-world
You will see output similar to the following:
Sending build context to Docker daemon 77.35MB
...
Successfully built 6bc0f527b69c
Successfully tagged hazelcast-hello-world:latest
Finally, you submit the job as follows:
docker run -it hazelcast-hello-world
Options for Packaging Dependencies
A pipeline is built from several transforms, which typically consist of lambda expressions. During job submission, the pipeline is serialized and sent to the cluster, which must be able to execute these expressions on each member. Imagine the simple mapping pipeline below:
class MyJob {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.readFrom(TestSources.items(1, 2, 3, 4))
         .map(x -> x * x)
         .writeTo(Sinks.logger());

        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        hz.getJet().newJob(p).join();
    }
}
The lambda x -> x * x is compiled by Java into an anonymous class with a name like MyJob$$Lambda$30/0x00000008000a1840. This class, and any other classes that these functions depend on, must be present on the members that will execute the job. Hazelcast supports several ways to make these classes available on the members.
Uber JAR
The easiest way to get additional dependencies to the cluster is to bundle them into a so-called uber JAR, which contains all the required dependencies inside. To build an uber JAR, you can use a build-tool plugin such as the Maven Shade plugin or the Gradle Shadow plugin.
Adding to Member Classpaths
Some dependencies may be large, or may need to be present on the classpath during application startup. The convention is to add these dependencies to the $HZ_HOME/lib directory. Hazelcast automatically picks up any dependencies placed in this directory during startup. Several out-of-the-box modules (such as the connectors for Kafka and Hadoop) are already available in the lib directory and can simply be used. Any changes to the lib directory require the member to be restarted to take effect.
Alternatively, you can use the CLASSPATH environment variable to add additional classes:
CLASSPATH=<path_to_jar> bin/hz-start
You cannot upload certain classes using the API or the CLI; these classes must be available on the member’s classpath when the member starts.