Unified File Connector

The unified file connector provides a simple way to read files across different sources. Using this connector, you can read files from the local filesystem, HDFS and cloud storage systems such as Amazon S3, Google Cloud Storage or Azure Blob Storage. This connector supports various data formats, including text files, CSV, JSON, and Avro.

This connector can be used only as a source. To use files as a sink, you can use the Jet API with the legacy file connector. You cannot yet use files as a sink in SQL.
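For example, a minimal sketch of such a Jet sink pipeline, writing the entries of a map to files with the legacy file sink. The map name my_map and the output directory are placeholders, and the Pipeline, Sources, and Sinks classes come from the com.hazelcast.jet.pipeline package:

// Read entries from a map and write each entry to files in the given directory.
Pipeline p = Pipeline.create();
p.readFrom(Sources.<Integer, String>map("my_map"))
 .writeTo(Sinks.files("/path/to/output/directory"));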

Installing the Connector

To read from a local or shared filesystem, you do not need to install any additional modules as these are included in the full and slim distributions of Hazelcast.

To access Hadoop or any of the cloud-based file systems, you may need to add one of the downloadable modules listed in Supported File Systems to your members' classpaths.

Depending on the format of your files, you may also need to add other modules to the classpath.

Permissions

Enterprise Edition

If security is enabled, you can set up permissions to restrict clients' access to your files. For details, see Securing Jobs.

Supported File Systems

Hazelcast supports reading from the following file systems.

If you use the slim distribution of Hazelcast, be sure to add the respective modules to your members' classpaths.

Storage System | Module | Example path
Member Filesystems (both shared and local) | Included in both full and slim distributions of Hazelcast | path/to/a/directory
Hadoop Distributed File System (HDFS) | hazelcast-jet-hadoop-all | hdfs://path/to/a/directory
Amazon S3 | hazelcast-jet-files-s3 | s3a://example-bucket/path/in/the/bucket
Google Cloud Storage | hazelcast-jet-files-gcs | gs://example-bucket/path/in/the/bucket
Windows Azure Blob Storage | hazelcast-jet-files-azure | wasbs://example-container@examplestorageaccount.blob.core.windows.net/path/in/the/container
Azure Data Lake Generation 1 | hazelcast-jet-files-azure | adl://exampledatalake.azuredatalakestore.net/path/in/the/container
Azure Data Lake Generation 2 | hazelcast-jet-files-azure | abfs://example-container@exampledatalakeaccount.dfs.core.windows.net/path/in/the/container

Although these are the officially supported sources, you can also read from any file system that’s compatible with Hadoop.

Configuration Options

Use these options to configure the file connector.

path

The absolute path to a single directory.

This parameter must not contain any wildcard characters. The given directory is not read recursively.

Hazelcast supports the following path schemes:

  • hdfs: HDFS

  • s3a: Amazon S3

  • wasbs: Azure Blob Storage

  • adl: Azure Data Lake Generation 1

  • abfs: Azure Data Lake Generation 2

  • gs: Google Cloud Storage

For example, a path to a directory in a Hadoop cluster starts with the hdfs:// scheme, such as hdfs://path/to/directory/.

Any path that does not start with a scheme is considered local (files on Hazelcast members).

Default: none; this option is required.

  • Jet

  • SQL

The main entry point to the file connector is FileSources.files, which takes a path as a String parameter and returns a FileSourceBuilder object.

If you don't specify a format, the connector reads the files as UTF-8 encoded text, line by line.

BatchSource<String> source = FileSources.files("/path/to/my/directory")
  .build();

Create a mapping to a directory, using the path option.

You must also specify a format.

CREATE MAPPING my_files
TYPE File
OPTIONS (
    'path' = '/path/to/directory',
    'format' = 'json-flat'
)

If you omit a column list from the CREATE MAPPING command:

  • Hazelcast will read a sample file from the directory and try to determine column names and types from it.

  • The path directory must be available at the time you execute the CREATE MAPPING statement and it must not be empty.

  • Every cluster member must have at least one file in the path directory.

format

The file format.

  • Jet

  • SQL

The file connector defaults to UTF-8 encoded text, with each file read line by line. You can specify the file format using the format(FileFormat) method. See the static factory methods on the FileFormat interface for the available formats.

The Jet API supports all available file formats.
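For example, a minimal sketch that selects the CSV format, assuming a User class whose fields match the file columns (any of the formats described in Supported File Formats can be passed in the same way):

BatchSource<User> source = FileSources.files("/path/to/my/directory")
  .format(FileFormat.csv(User.class))
  .build();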

You must provide a file format to create a mapping.

SQL file formats must be structured. As a result, SQL supports only the following file formats:

  • avro

  • csv

  • json-flat

  • parquet

glob

A filename pattern that uses wildcard characters such as * or ? to filter the files in the path directory.

Default: * (match all files)

For example, if a directory contains JSON files named using the YYYY-MM-DD.json pattern, you can read all the files from January 2020 with the following glob argument:

  • Jet

  • SQL

BatchSource<String> source = FileSources.files("/orders/")
  .glob("2020-01-*.json")
  .build();
CREATE MAPPING my_orders
TYPE File
OPTIONS (
    'path' = '/orders',
    'format' = 'json-flat',
    'glob' = '2020-01-*.json'
)

ignoreFileNotFound

Return zero results instead of throwing an error when no files are found in the path directory.

Default: false

  • Jet

  • SQL

BatchSource<String> source = FileSources.files("/orders/")
  .glob("2020-01-*.json")
  .ignoreFileNotFound(true)
  .build();

If you set this option to true, you must specify the column list.

This option is not valid for file table functions, because they always need at least one record from which to derive the column list.

CREATE MAPPING my_orders
TYPE File
OPTIONS (
    'path' = '/orders',
    'format' = 'json-flat',
    'glob' = '2020-01-*.json',
    'ignoreFileNotFound' = 'true'
)

option

Options to pass to the file system such as authentication options.

  • Jet

  • SQL

BatchSource<String> source = FileSources.files("/orders/")
  .glob("2020-01-*.json")
  .option("fs.s3a.impl.disable.cache", "true")
  .build();
CREATE MAPPING my_orders
TYPE File
OPTIONS (
    'path' = '/orders',
    'format' = 'json-flat',
    'glob' = '2020-01-*.json',
    'fs.s3a.impl.disable.cache' = 'true'
)

sharedFileSystem

Read shared files only once instead of reading each copy on every member.

Default: false

If the path directory is shared among cluster members, such as on a network-mounted filesystem, set the sharedFileSystem option to true. The files are then assigned among the members, so each file is read only once rather than once on every member.
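For example, a minimal sketch that reads from a directory mounted on every member (the mount path is a placeholder):

BatchSource<String> source = FileSources.files("/mnt/shared/data")
  .sharedFileSystem(true)
  .build();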

useHadoopForLocalFiles

Use the Hadoop connector module to read files from a local filesystem.

Default: false

This option might be beneficial when you need to parallelize reading from a single large file, or read only a subset of columns when using the Parquet format. The hazelcast-jet-hadoop module must be on your members' classpaths.

  • Jet

  • SQL

BatchSource<String> source = FileSources.files("/data")
  .glob("wikipedia.txt")
  .useHadoopForLocalFiles(true)
  .build();

You can provide additional options to Hadoop. For example, to read all files in a directory recursively:

BatchSource<String> source = FileSources.files("/data")
  .glob("wikipedia.txt")
  .useHadoopForLocalFiles(true)
  .option("mapreduce.input.fileinputformat.input.dir.recursive", "true")
  .build();

This option is not supported by SQL.

Supported File Formats

Hazelcast supports reading from the following file formats.

Avro

The Avro format allows you to read data from files in the Avro Object Container File format. To use the Avro format, you must have the hazelcast-jet-avro module on your members' classpaths.

  • Jet

  • SQL

Suppose you have a class User generated from the Avro schema. You can then read the data from an Avro file in the following way. Notice that you don't need to provide the User class to the builder, but you do need to satisfy the Java type system:

BatchSource<User> source = FileSources.files("/data")
  .glob("users.avro")
  .format(FileFormat.<User>avro())
  .build();

This example will use Avro’s SpecificDatumReader under the hood.

If you don’t have a class generated from the Avro schema, but the structure of your class matches the data, you can use Java reflection to read the data:

BatchSource<User> source = FileSources.files("/data")
  .glob("users.avro")
  .format(FileFormat.avro(User.class))
  .build();

This example will use Avro’s ReflectDatumReader under the hood.

CREATE MAPPING users
TYPE File
OPTIONS (
    'path' = '/users',
    'format' = 'avro',
    'glob' = '*.avro'
)

Avro Type | SQL Type
BOOLEAN | BOOLEAN
INT | INT
LONG | BIGINT
FLOAT | REAL
DOUBLE | DOUBLE
STRING | VARCHAR
All other types | OBJECT

CSV

CSV files must have a header. Columns that do not match any fields are ignored, and fields that do not have corresponding columns have null values.

  • Jet

  • SQL

The header columns must match the fields of the class that you want to deserialize the data into.

Create the file source in the following way to read from the file users.csv and deserialize the rows into a User class:

BatchSource<User> source = FileSources.files("/data")
  .glob("users.csv")
  .format(FileFormat.csv(User.class))
  .build();

If you omit the column list from the mapping declaration, Hazelcast will try to infer the column names from the file header. All columns have the VARCHAR type.

CREATE MAPPING my_files
TYPE File
OPTIONS (
    'path' = '/path/to/directory',
    'format' = 'csv'
)

JSON

JSON files must be in the JSON Lines format.

  • Jet

  • SQL

The JSON fields must match the fields of the class that you want to deserialize the data into.

Create the file source in the following way to read from the file users.jsonl and deserialize each line into a User class:

BatchSource<User> source = FileSources.files("/data")
  .glob("users.jsonl")
  .format(FileFormat.json(User.class))
  .build();

JSON files are expected to contain one valid JSON document per line and to be UTF-8 encoded. If you omit the column list from the mapping declaration, Hazelcast infers column names and types based on a sample file.

CREATE MAPPING my_files
TYPE File
OPTIONS (
    'path' = '/path/to/directory',
    'format' = 'json-flat'
)

JSON Type | SQL Type
BOOLEAN | BOOLEAN
NUMBER | DOUBLE
STRING | VARCHAR
All other types | OBJECT

Text

Read data from .txt files.

  • Jet

  • SQL

Create the file source in the following way to read a file as text, where each whole file is read as a single String:

BatchSource<String> source = FileSources.files("/data")
  .glob("file.txt")
  .format(FileFormat.text())
  .build();

When reading from the local filesystem, you can specify the character encoding. This is not supported by the Hadoop-based modules; if you provide the option there, it is ignored.

BatchSource<String> source = FileSources.files("/data")
  .glob("file.txt")
  .format(FileFormat.text(Charset.forName("Cp1250")))
  .build();

You can read a file line by line in the following way. This is the default format, so you can omit the .format(FileFormat.lines()) call:

BatchSource<String> source = FileSources.files("/data")
  .glob("file.txt")
  .format(FileFormat.lines())
  .build();

This file format is not supported by SQL.

Parquet

Apache Parquet is a columnar storage format. It describes how data is stored on disk but does not specify how the data is deserialized; other libraries are used for that. Hazelcast uses Apache Avro for deserialization.

Parquet has a dependency on Hadoop, so it can be used only with one of the Hadoop-based modules.

  • Jet

  • SQL

Create the file source in the following way to read data from a Parquet file:

BatchSource<SpecificUser> source = FileSources.files("/data")
  .glob("users.parquet")
  .format(FileFormat.<SpecificUser>parquet())
  .build();

To read a Parquet file from the local filesystem, use the useHadoopForLocalFiles option.
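For example, a sketch that combines the two options, where SpecificUser stands in for a class generated from your Avro schema:

BatchSource<SpecificUser> source = FileSources.files("/data")
  .glob("users.parquet")
  .format(FileFormat.<SpecificUser>parquet())
  .useHadoopForLocalFiles(true)
  .build();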

CREATE MAPPING my_files
TYPE File
OPTIONS (
    'path' = 'hdfs://path/to/directory',
    'format' = 'parquet'
    /* more Hadoop options ... */
)

Raw Binary

Read binary files such as images.

  • Jet

  • SQL

BatchSource<byte[]> source = FileSources.files("/data")
  .glob("file.txt")
  .format(FileFormat.bytes())
  .build();

This file format is not supported by SQL.

Fvecs and ivecs

Enterprise Edition

Fvecs and ivecs files are binary files containing, respectively, float and integer vectors with IDs.

  • Jet

  • SQL

BatchSource<Map.Entry<Integer, VectorValues>> vectors = FileSources.files("/data")
  .glob("embeddings.fvecs")
  .format(VectorSources.fvecsFormat())
  .build();

BatchSource<Map.Entry<Integer, int[]>> groundTruth = FileSources.files("/data")
  .glob("groundtruth.ivecs")
  .format(VectorSources.ivecsFormat())
  .build();

This file format is not supported by SQL.

Authentication

To allow Hazelcast to access files in remote systems, you must pass authentication credentials to the file connector.

For performance reasons, authentication credentials are cached. This may cause issues when you submit jobs with different credentials, or even the same job with new credentials, such as after a credentials rotation.

To turn off authentication caching, set the fs.<prefix>.impl.disable.cache option to true, where <prefix> is the path scheme (for example, s3a).

Amazon S3

Provide an AWS access key ID and secret key that have the required access via the fs.s3a.access.key and fs.s3a.secret.key options.
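For example, a sketch that passes static credentials directly to the source; the bucket path and key values are placeholders:

BatchSource<String> source = FileSources.files("s3a://example-bucket/path/in/the/bucket")
  .option("fs.s3a.access.key", "<your-access-key-id>")
  .option("fs.s3a.secret.key", "<your-secret-key>")
  .build();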

For additional ways to authenticate, see the Hadoop-AWS documentation and the Amazon S3 documentation.

Google Cloud Storage

Provide the location of the JSON keyfile via the google.cloud.auth.service.account.json.keyfile source option.

The file must be available on all the cluster members.
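For example, a sketch that points the source at a service account keyfile; the bucket path and keyfile location are placeholders:

BatchSource<String> source = FileSources.files("gs://example-bucket/path/in/the/bucket")
  .option("google.cloud.auth.service.account.json.keyfile", "/path/to/keyfile.json")
  .build();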

For additional ways to authenticate see Google Cloud Hadoop connector.

Windows Azure Blob Storage

Provide an account key via the fs.azure.account.key.<your account name>.blob.core.windows.net source option.

For additional ways to authenticate see Hadoop Azure Blob Storage support.

Azure Data Lake Generation 1

Provide the following options:

fs.adl.oauth2.access.token.provider.type
fs.adl.oauth2.refresh.url
fs.adl.oauth2.client.id
fs.adl.oauth2.credential

For additional ways to authenticate, see Hadoop Azure Data Lake Support.

Azure Data Lake Generation 2

For additional ways to authenticate, see Hadoop Azure Data Lake Storage Gen2.

Hadoop with Custom Classpath

When submitting jobs that use Hadoop, avoid sending Hadoop JARs with the job; use the Hadoop classpath instead. Hadoop JARs contain JVM hooks and can keep lingering references inside the JVM long after the job has ended, causing memory leaks.

To obtain the Hadoop classpath, use the hadoop classpath command and append the output to the CLASSPATH environment variable before starting Hazelcast.

export CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)

Hadoop Native Libraries

The underlying Hadoop infrastructure can make use of native libraries for compression, decompression, and CRC checksums. When the native libraries are not configured, you will see the following message in the logs:

[o.a.h.u.NativeCodeLoader]: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Configure the native libraries by adding their location to the LD_LIBRARY_PATH environment variable:

export LD_LIBRARY_PATH=<path to hadoop>/lib/native:$LD_LIBRARY_PATH

To verify that the Hadoop native libraries are configured correctly, check that the message above no longer appears. If you enable logging for org.apache.hadoop, you should also see the following log message:

[o.a.h.u.NativeCodeLoader]: Loaded the native-hadoop library

For more detail, see the Hadoop Native Libraries Guide.