Legacy File Connector
The legacy file connector allows you to read from and write to files using the Jet API. This connector is still maintained, but all new development goes into the unified file connector.
File sources generally involve reading a set of files from either a disk or a distributed file system such as Amazon S3 or Hadoop. Most file sources and sinks are batch oriented, but sinks that support the rolling capability can also be used in streaming jobs.
This connector does not support SQL. To read from files in SQL, see the Unified File Connector.
Installing the Connector
To read from a local or shared filesystem, you do not need to install any additional modules as these are included in the full and slim distributions of Hazelcast.
To access Hadoop or any of the cloud-based file systems, add one of the downloadable modules listed in Supported Storage Systems to your members' classpaths.
Permissions
Enterprise
If security is enabled, you can set up permissions to restrict clients' access to your files. For details, see Securing Jobs.
Supported File Systems
Hazelcast supports reading from the following file systems.
If you use the slim distribution of Hazelcast, be sure to add the respective modules to your members' classpaths.
Storage System | Module |
---|---|
Member Filesystems (both shared and local) | Included in the full and slim distributions of Hazelcast |
Apache Avro | hazelcast-jet-avro |
Hadoop Distributed File System (HDFS) | See Supported Storage Systems |
Amazon S3 | See Supported Storage Systems |
Although these are the officially supported sources, you can also read from any file system that’s compatible with Hadoop.
Reading from Local and Remote File Systems
The simplest file source, Sources.files()
, allows you to work with both local and shared
file systems. This source is text-oriented, reads files line by
line, and emits one record per line.
Pipeline p = Pipeline.create();
p.readFrom(Sources.files("/home/data/web-logs"))
.map(line -> LogParser.parse(line))
.filter(log -> log.level().equals("ERROR"))
.writeTo(Sinks.logger());
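LogParser is not a Hazelcast class; it stands in for whatever parsing your log format needs. A minimal sketch, assuming each line starts with the log level followed by the message, could look like this:
// Hypothetical helper used by the example above; not part of Hazelcast.
// Assumes lines such as "ERROR Connection refused".
public final class LogParser {

    public static final class LogLine {
        private final String level;
        private final String message;

        LogLine(String level, String message) {
            this.level = level;
            this.message = message;
        }

        public String level() {
            return level;
        }

        public String message() {
            return message;
        }
    }

    private LogParser() {
    }

    public static LogLine parse(String line) {
        int space = line.indexOf(' ');
        return space < 0
                ? new LogLine(line.trim(), "")
                : new LogLine(line.substring(0, space), line.substring(space + 1));
    }
}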
Working with JSON Files
For JSON files, the source expects the file to be formatted as streaming JSON, where each JSON string is separated by a new line. A JSON string can span multiple lines. The source converts each JSON string to an object of the given type, or to a map if no type is given:
Pipeline p = Pipeline.create();
p.readFrom(Sources.json("/home/data/people", Person.class))
.filter(person -> person.location().equals("NYC"))
.writeTo(Sinks.logger());
Hazelcast uses the lightweight JSON library jackson-jr to parse the given input or to convert the given objects to JSON strings. As a result, you can use Jackson Annotations by adding the jackson-annotations library to your members' classpaths. For example:
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonSetter;

public class Person {
private long personId;
private String name;
@JsonGetter("id")
public long getPersonId() {
return this.personId;
}
@JsonSetter("id")
public void setPersonId(long personId) {
this.personId = personId;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
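To see the annotations at work, you can serialize a Person with JsonUtil.toJson(), the same jackson-jr based helper mentioned for the sink below; the personId field appears as id in the output. A small sketch:
import com.hazelcast.jet.json.JsonUtil;

public class PersonJsonCheck {
    public static void main(String[] args) throws Exception {
        Person person = new Person();
        person.setPersonId(42);
        person.setName("Joe");
        // Prints something like {"id":42,"name":"Joe"} thanks to @JsonGetter("id")
        System.out.println(JsonUtil.toJson(person));
    }
}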
Working with CSV Files
For CSV files or for parsing files in other custom formats, use the filesBuilder
source:
Pipeline p = Pipeline.create();
p.readFrom(Sources.filesBuilder(sourceDir).glob("*.csv").build(path ->
Files.lines(path).skip(1).map(SalesRecordLine::parse))
).writeTo(Sinks.logger());
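SalesRecordLine is likewise a hypothetical class; a compact sketch, assuming comma-separated rows of date, product, quantity, and price under a header row (hence the skip(1)), might be:
// Hypothetical CSV record used by the example above; not part of Hazelcast.
// Assumes rows such as "2024-01-31,widget,3,9.99".
public final class SalesRecordLine {
    private final String date;
    private final String product;
    private final int quantity;
    private final double price;

    private SalesRecordLine(String date, String product, int quantity, double price) {
        this.date = date;
        this.product = product;
        this.quantity = quantity;
        this.price = price;
    }

    public static SalesRecordLine parse(String line) {
        String[] parts = line.split(",");
        return new SalesRecordLine(parts[0], parts[1],
                Integer.parseInt(parts[2]), Double.parseDouble(parts[3]));
    }

    @Override
    public String toString() {
        return product + " x" + quantity + " @ " + price + " on " + date;
    }
}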
When reading from a shared file system, set the FilesBuilder.sharedFileSystem() option to make sure that shared files are read only once instead of reading each copy on every member.
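For example, the same CSV source reading from a network mount visible to all members could be built as follows (the directory path is illustrative):
Pipeline p = Pipeline.create();
p.readFrom(Sources.filesBuilder("/mnt/shared/sales")      // directory mounted on every member
        .glob("*.csv")
        .sharedFileSystem(true)                            // read each file once across the cluster
        .build(path -> Files.lines(path).skip(1).map(SalesRecordLine::parse)))
 .writeTo(Sinks.logger());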
File Sink
The file sink, like the source, works with text and creates a line of output for each record. When the rolling option is used, the sink rolls to a new file once the criteria are met. It supports rolling by size or date. The following example rolls to a new file every hour:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(100))
.withoutTimestamps()
.writeTo(Sinks.filesBuilder("out")
.rollByDate("YYYY-MM-dd.HH")
.build());
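Rolling by size works the same way. The sketch below assumes the rollByFileSize(long) option on the files builder (check your version's FileSinkBuilder Javadoc for the exact name) and starts a new file roughly every 100 MB:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(100))
 .withoutTimestamps()
 .writeTo(Sinks.filesBuilder("out")
         .rollByFileSize(100 * 1024 * 1024L)   // roll once the current file reaches ~100 MB
         .build());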
To write JSON files, you can use Sinks.json or Sinks.filesBuilder with JsonUtil.toJson() as the toStringFn(). The sink converts each item to a JSON string and writes it as a new line to the file:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(100))
.withoutTimestamps()
.writeTo(Sinks.json("out"));
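The equivalent with the files builder and an explicit toStringFn(), as described above, would look roughly like this:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(100))
 .withoutTimestamps()
 .writeTo(Sinks.filesBuilder("out")
         .toStringFn(JsonUtil::toJson)   // convert each item to a JSON string, one per line
         .build());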
Each member will write to a unique file with a numerical index. You can achieve the effect of a distributed sink if you manually collect all the output files on all members and combine their contents.
The sink also supports exactly-once processing and can work transactionally.
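A sketch of a transactional setup, assuming the builder's exactlyOnce(boolean) option and an exactly-once processing guarantee configured on the job (hz stands for an existing HazelcastInstance):
Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(100))
 .withoutTimestamps()
 .writeTo(Sinks.filesBuilder("out")
         .exactlyOnce(true)   // commit writes together with the job's snapshots
         .build());

JobConfig config = new JobConfig()
        .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
        .setSnapshotIntervalMillis(10_000);
hz.getJet().newJob(p, config);   // hz is an existing HazelcastInstance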
File Watcher
The file watcher is a streaming file source that emits only new files or lines appended to existing files. If files are modified in more complex ways, the behavior is undefined.
Pipeline p = Pipeline.create();
p.readFrom(Sources.fileWatcher("/home/data"))
.withoutTimestamps()
.writeTo(Sinks.logger());
You can create a streaming file source for JSON files too:
Pipeline p = Pipeline.create();
p.readFrom(Sources.jsonWatcher("/home/data", Person.class))
.withoutTimestamps()
.writeTo(Sinks.logger());
Apache Avro
Apache Avro is a schema-based binary data storage format. The connectors are similar to the local file connectors, but work with binary files stored in the Avro Object Container File format.
To use the Avro connector, make sure the hazelcast-jet-avro
module is present in the lib
directory and add the following
dependency to your application:
Gradle:
compile 'com.hazelcast.jet:hazelcast-jet-avro:5.2.5'

Maven:
<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet-avro</artifactId>
    <version>5.2.5</version>
</dependency>
With Avro sources, you can use either the SpecificReader
or
DatumReader
depending on the data type:
Pipeline p = Pipeline.create();
p.readFrom(AvroSources.files("/home/data", Person.class))
.filter(person -> person.age() > 30)
.writeTo(Sinks.logger());
The sink expects a schema and the type to be written:
p.writeTo(AvroSinks.files(DIRECTORY_NAME, Person.getClassSchema(), Person.class));
Hadoop InputFormat/OutputFormat
You can use the Hadoop connector to read and write files from and to the Hadoop Distributed File System (HDFS), the local file system, or any other system that has a Hadoop connector, including various cloud storage systems. Hazelcast has been tested with:
- Amazon S3
- Google Cloud Storage
- Azure Cloud Storage
- Azure Data Lake
The Hadoop source and sink require a configuration object of type
Configuration
which supplies the input and output paths and formats. They don't actually create a MapReduce job; the configuration is simply used to describe the required inputs and outputs. You can share the same Configuration instance between several source/sink instances.
For example, to do a canonical word count on a Hadoop data source, we can use the following pipeline:
Job job = Job.getInstance();
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat.addInputPath(job, new Path("input-path"));
TextOutputFormat.setOutputPath(job, new Path("output-path"));
Configuration configuration = job.getConfiguration();
Pipeline p = Pipeline.create();
p.readFrom(HadoopSources.inputFormat(configuration, (k, v) -> v.toString()))
.flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
.groupingKey(word -> word)
.aggregate(AggregateOperations.counting())
.writeTo(HadoopSinks.outputFormat(configuration));
The Hadoop source and sink will use either the new or the old MapReduce API based on the input format configuration.
Each processor writes to a different file in the output directory, identified by the unique processor ID. The files remain in a temporary state until the job completes, at which point they are committed. For streaming jobs, they are committed when the job is cancelled. We plan to introduce a rolling sink for Hadoop in the future for better streaming support.
Data Locality
Hazelcast distributes the input data across cluster members, with each processor instance reading only a part of the input. If Hazelcast members are co-located with the Hadoop data nodes, then Hazelcast can make use of data locality by reading the blocks locally where possible. This can bring a significant increase in read throughput.
Serialization and Writables
Hadoop types implement their own serialization mechanism through the use
of Writable
types. Jet provides an adapter to register a Writable
for Hazelcast serialization without having to write
additional serialization code. To use this adapter, extend WritableSerializerHook for your own Writable types and register the hook.
Hadoop Classpath
When submitting jobs that use Hadoop, avoid sending the Hadoop JARs with the job and use the Hadoop classpath instead. Hadoop JARs contain some JVM hooks and can keep lingering references inside the JVM long after the job has ended, causing memory leaks.
To obtain the hadoop classpath, use the hadoop classpath
command and
append the output to the CLASSPATH
environment variable before
starting Hazelcast.
Amazon S3
The Amazon S3 connectors are text-based connectors that can read and write files to Amazon S3 storage.
The connectors expect you to provide either an S3Client instance or credentials (or to use the default ones) to create the client. The source and sink assume the data is plain text and emit/receive data items that represent individual lines of text.
AwsBasicCredentials credentials = AwsBasicCredentials.create("accessKeyId", "accessKeySecret");

Pipeline p = Pipeline.create();
p.readFrom(S3Sources.s3(singletonList("input-bucket"), "prefix",
        () -> S3Client.builder()
                .credentialsProvider(StaticCredentialsProvider.create(credentials))
                .build()))
 .filter(line -> line.contains("ERROR"))
 .writeTo(Sinks.logger());
The S3 sink works similarly to the local file sink, writing a line to the output for each input item:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("the", "brown", "fox"))
.writeTo(S3Sinks.s3("output-bucket", () -> S3Client.create()));
The sink creates an object in the bucket for each processor instance. The object name includes a user-provided prefix (if defined), followed by the processor's global index. For example, the processor with index 2 and prefix my-object- creates the object my-object-2.
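To set a prefix such as my-object-, you can use the overload that also takes a prefix, a charset, and a toString function; this is a sketch, and the parameter order is assumed, so check the S3Sinks Javadoc for your version:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("the", "brown", "fox"))
 .writeTo(S3Sinks.s3("output-bucket", "my-object-", StandardCharsets.UTF_8,
         () -> S3Client.create(),
         Object::toString));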
The S3 sink uses the multipart upload feature of the S3 SDK. The sink buffers items into parts and uploads a part once the buffer reaches the threshold. The multipart upload is completed when the job completes, which makes the objects available in S3. Since streaming jobs never complete, the S3 sink is not currently applicable to streaming jobs.