Legacy File Connector

The legacy file connector allows you to read and write to files, using the Jet API. This connector is still maintained, but all new development goes into the unified file connector.

File sources generally involve reading a set of files from either a disk or a distributed file system such as Amazon S3 or Hadoop. Most file sources and sinks are batch oriented, but the sinks that support rolling capability can also be used as sinks in streaming jobs.

This connector does not support SQL. To read from files in SQL, see the Unified File Connector.

Installing the Connector

To read from a local or shared filesystem, you do not need to install any additional modules as these are included in the full and slim distributions of Hazelcast.

To access Hadoop or any of the cloud-based file systems, add one of the downloadable modules listed in Supported Storage Systems to your members' classpaths.

Permissions

If you use Hazelcast Enterprise, you can set up permissions to restrict clients' access to your files. For details, see Securing Jobs.

Supported File Systems

Hazelcast supports reading from the following file systems.

If you use the slim distribution of Hazelcast, be sure to add the respective modules to your members' classpaths.

Storage System Module

Member Filesystems (both shared and local)

Included in both full and slim distributions of Hazelcast.

Apache Avro

hazelcast-jet-avro

Hadoop Distributed File System (HDFS)

hazelcast-jet-hadoop

Amazon S3

hazelcast-jet-files-s3

Although these are the officially supported sources, you can also read from any file system that’s compatible with Hadoop.

Reading from Local and Remote File Systems

The simplest file source, Sources.files(), allows you to work with both local and shared file systems. This source is text-oriented, reads files line by line, and emits one record per line.

Pipeline p = Pipeline.create();
p.readFrom(Sources.files("/home/data/web-logs"))
 .map(line -> LogParser.parse(line))
 .filter(log -> log.level().equals("ERROR"))
 .writeTo(Sinks.logger());

Working with JSON Files

For JSON files, the source expects to be formatted as streaming JSON, where each JSON string is separated by a new-line. The JSON string can span on multiple lines. The source converts each JSON string to an object of a given type or to a map if no type is given:

Pipeline p = Pipeline.create();
p.readFrom(Sources.json("/home/data/people", Person.class))
 .filter(person -> person.location().equals("NYC"))
 .writeTo(Sinks.logger());

Hazelcast uses the lightweight JSON library jackson-jr to parse the given input or to convert the given objects to JSON string. As a result, you can use Jackson Annotations by adding the jackson-annotations library to your members' classpaths. For example:

public class Person {

  private long personId;
  private String name;

  @JsonGetter("id")
  public long getPersonId() {
    return this.personId;
  }

  @JsonSetter("id")
  public void setPersonId(long personId) {
    this.personId = personId;
  }

  public String getName() {
      return name;
  }

  public void setName(String name) {
    this.name = name;
  }
}

Working with CSV Files

For CSV files or for parsing files in other custom formats, use the filesBuilder source:

Pipeline p = Pipeline.create();
p.readFrom(Sources.filesBuilder(sourceDir).glob("*.csv").build(path ->
    Files.lines(path).skip(1).map(SalesRecordLine::parse))
).writeTo(Sinks.logger());
When reading from a shared file system, set the FilesBuilder.sharedFileSystem() option to make sure that shared files are read only once instead of reading each copy on every member.

File Sink

The file sink, like the source works with text and creates a line of output for each record. When the rolling option is used it will roll the filename to a new one once the criteria is met. It supports rolling by size or date. The following will roll to a new file every hour:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(100))
 .withoutTimestamps()
 .writeTo(Sinks.filesBuilder("out")
 .rollByDate("YYYY-MM-dd.HH")
 .build());

To write JSON files, you can use Sinks.json or Sinks.filesBuilder with JsonUtil.toJson() as toStringFn(). Sink converts each item to JSON string and writes it as a new line to the file:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(100))
 .withoutTimestamps()
 .writeTo(Sinks.json("out"));

Each member will write to a unique file with a numerical index. You can achieve the effect of a distributed sink if you manually collect all the output files on all members and combine their contents.

The sink also supports exactly-once processing and can work transactionally.

File Watcher

File watcher is a streaming file source, where only the new files or appended lines are emitted. If the files are modified in more complex ways, the behavior is undefined.

Pipeline p = Pipeline.create();
p.readFrom(Sources.fileWatcher("/home/data"))
 .withoutTimestamps()
 .writeTo(Sinks.logger());

You can create streaming file source for JSON files too:

Pipeline p = Pipeline.create();
p.readFrom(Sources.jsonWatcher("/home/data", Person.class))
 .withoutTimestamps()
 .writeTo(Sinks.logger());

Apache Avro

Apache Avro is a binary data storage format which is schema based. The connectors are similar to the local file connectors, but work with binary files stored in Avro Object Container File format.

To use the Avro connector, make sure the hazelcast-jet-avro module is present in the lib directory and add the following dependency to your application:

  • Gradle

  • Maven

compile 'com.hazelcast.jet:hazelcast-jet-avro:5.0'
<dependency>
  <groupId>com.hazelcast.jet</groupId>
  <artifactId>hazelcast-jet-avro</artifactId>
  <version>5.0</version>
</dependency>

With Avro sources, you can use either the SpecificReader or DatumReader depending on the data type:

Pipeline p = Pipeline.create();
p.readFrom(AvroSources.files("/home/data", Person.class))
 .filter(person -> person.age() > 30)
 .writeTo(Sinks.logger());

The sink expects a schema and the type to be written:

p.writeTo(AvroSinks.files(DIRECTORY_NAME, Person.getClassSchema()), Person.class))

Hadoop InputFormat/OutputFormat

You can use Hadoop connector to read/write files from/to Hadoop Distributed File System (HDFS), local file system, or any other system which has Hadoop connectors, including various cloud storages. Hazelcast was tested with:

  • Amazon S3

  • Google Cloud Storage

  • Azure Cloud Storage

  • Azure Data Lake

The Hadoop source and sink require a configuration object of type Configuration which supplies the input and output paths and formats. They don’t actually create a MapReduce job, this config is simply used to describe the required inputs and outputs. You can share the same Configuration instance between several source/sink instances.

For example, to do a canonical word count on a Hadoop data source, we can use the following pipeline:

Job job = Job.getInstance();
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat.addInputPath(job, new Path("input-path"));
TextOutputFormat.setOutputPath(job, new Path("output-path"));
Configuration configuration = job.getConfiguration();

Pipeline p = Pipeline.create();
p.readFrom(HadoopSources.inputFormat(configuration, (k, v) -> v.toString()))
 .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
 .groupingKey(word -> word)
 .aggregate(AggregateOperations.counting())
 .writeTo(HadoopSinks.outputFormat(configuration));

The Hadoop source and sink will use either the new or the old MapReduce API based on the input format configuration.

Each processor will write to a different file in the output directory identified by the unique processor id. The files will be in a temporary state until the job is completed and will be committed when the job is complete. For streaming jobs, they will be committed when the job is cancelled. We have plans to introduce a rolling sink for Hadoop in the future to have better streaming support.

Data Locality

Hazelcast distributes the input data across cluster members, with each processor instance reading only a part of the input. If Hazelcast members are co-located with the Hadoop data nodes, then Hazelcast can make use of data locality by reading the blocks locally where possible. This can bring a significant increase in read throughput.

Serialization and Writables

Hadoop types implement their own serialization mechanism through the use of Writable types. Jet provides an adapter to register a Writable for Hazelcast serialization without having to write additional serialization code. To use this adapter, you can register your own Writable types by extending WritableSerializerHook and registering the hook.

Hadoop Classpath

When submitting jobs that use Hadoop, sending Hadoop JARs should be avoided and instead the Hadoop classpath should be used. Hadoop JARs contain some JVM hooks and can keep lingering references inside the JVM long after the job has ended, causing memory leaks.

To obtain the hadoop classpath, use the hadoop classpath command and append the output to the CLASSPATH environment variable before starting Hazelcast.

Amazon S3

The Amazon S3 connectors are text-based connectors that can read and write files to Amazon S3 storage.

The connectors expect the user to provide either an S3Client instance or credentials (or using the default ones) to create the client. The source and sink assume the data is in the form of plain text and emit/receive data items which represent individual lines of text.

AwsBasicCredentials credentials = AwsBasicCredentials.create("accessKeyId", "accessKeySecret");
S3Client s3 = S3Client.builder()
  .credentialsProvider(StaticCredentialsProvider.create(credentials))
  .build();

Pipeline p = Pipeline.create();
p.readFrom(S3Sources.s3(singletonList("input-bucket"), "prefix",
() -> S3Client.builder().credentialsProvider(StaticCredentialsProvider.create(credentials)).build())
 .filter(line -> line.contains("ERROR"))
 .writeTo(Sinks.logger());
))

The S3 sink works similar to the local file sink, writing a line to the output for each input item:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("the", "brown", "fox"))
 .writeTo(S3Sinks.s3("output-bucket", () -> S3Client.create()));

The sink creates an object in the bucket for each processor instance. Name of the file will include a user provided prefix (if defined), followed by the processor’s global index. For example the processor having the index 2 with prefix my-object- will create the object my-object-2.

S3 sink uses the multi-part upload feature of S3 SDK. The sink buffers the items to parts and uploads them after buffer reaches to the threshold. The multi-part upload is completed when the job completes and makes the objects available on the S3. Since a streaming jobs never complete, S3 sink is not currently applicable to streaming jobs.