Unified File Connector
The unified file connector provides a simple way to read files across different sources. Using this connector, you can read files from the local filesystem, HDFS and cloud storage systems such as Amazon S3, Google Cloud Storage or Azure Blob Storage. This connector supports various data formats, including text files, CSV, JSON, and Avro.
This connector can be used only as a source. To use files as a sink, you can use the Jet API with the legacy file connector. You cannot yet use files as a sink in SQL.
Installing the Connector
To read from a local or shared filesystem, you do not need to install any additional modules as these are included in the full and slim distributions of Hazelcast.
To access Hadoop or any of the cloud-based file systems, you may need to add one of the downloadable modules listed in Supported File Systems to your members' classpaths.
Depending on the format of your files, you may also need to add other modules to the classpath.
Permissions
If you use Hazelcast Enterprise, you can set up permissions to restrict clients' access to your files. For details, see Securing Jobs.
Supported File Systems
Hazelcast supports reading from the following file systems.
If you use the slim distribution of Hazelcast, be sure to add the respective modules to your members' classpaths.
Storage System | Module | Example path |
---|---|---|
Member Filesystems (both shared and local) | Included in the full and slim distributions | path/to/directory |
Hadoop Distributed File System (HDFS) | | hdfs://path/to/directory |
Amazon S3 | | s3a://bucket/path/to/directory |
Google Cloud Storage | | gs://bucket/path/to/directory |
Windows Azure Blob Storage | | wasbs://container@account.blob.core.windows.net/path/to/directory |
Azure Data Lake Generation 1 | | adl://account.azuredatalakestore.net/path/to/directory |
Azure Data Lake Generation 2 | | abfs://container@account.dfs.core.windows.net/path/to/directory |
Although these are the officially supported sources, you can also read from any file system that’s compatible with Hadoop.
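For example, once the corresponding module is on the classpath, you can point the batch source described below at one of these paths. A minimal sketch for Amazon S3, where the bucket name is a placeholder:
// Minimal sketch: read UTF-8 text lines from files in an S3 path.
// "my-bucket" is a placeholder bucket name; the S3 module must be on the classpath.
BatchSource<String> source = FileSources.files("s3a://my-bucket/path/to/directory")
.build();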
Configuration Options
Use these options to configure the file connector.
path
The absolute path to a single directory.
This parameter must not contain any wildcard characters. The given directory is not read recursively.
Hazelcast supports the following path schemes:
- hdfs: HDFS
- s3a: Amazon S3
- wasbs: Windows Azure Blob Storage
- adl: Azure Data Lake Generation 1
- abfs: Azure Data Lake Generation 2
- gs: Google Cloud Storage
For example, a path to a file in a Hadoop cluster would start with hdfs://path/to/directory/.
Any path that does not start with a scheme is considered local (files on Hazelcast members).
The main entrypoint to the file connector is FileSources.files, which takes a path as a String parameter and returns a FileSourceBuilder object.
If you don’t specify a format, this API tries to read a .txt file line by line.
BatchSource<String> source = FileSources.files("/path/to/my/directory")
.build();
Create a mapping to a directory, using the path option. You must also specify a format.
CREATE MAPPING my_files
TYPE File
OPTIONS (
'path' = '/path/to/directory',
'format' = 'json-flat'
)
If you omit a column list from the CREATE MAPPING command:
- Hazelcast will read a sample file from the directory and try to determine column names and types from it.
- The path directory must be available at the time you execute the CREATE MAPPING statement and it must not be empty.
- Every cluster member must have at least one file in the path directory.
format
The file format.
The file connector defaults to UTF-8 encoded text with the file read line by line. You can specify the file format using the format(FileFormat) method. See the available formats in the FileFormat interface.
The Jet API supports all available file formats.
glob
A filename pattern that uses wildcard characters such as *
or
?
to filter the files in the path
directory.
Default: *
(match all files)
For example, if a directory contains JSON files named using the YYYY-MM-DD.json pattern, you can read all the files from January 2020 with the following glob argument:
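// Minimal sketch using the builder API shown above; the default format
// reads each matching file line by line.
BatchSource<String> source = FileSources.files("/orders")
.glob("2020-01-*.json")
.build();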
ignoreFileNotFound
Return zero results instead of throwing an error when files in the path directory are not found.
Default: false
BatchSource<String> source = FileSources.files("/orders/")
.glob("2020-01-*.json")
.ignoreFileNotFound(true)
.build();
If you set this option to true, you must specify the column list.
This option is not valid for file table functions, because they always need at least one record from which to derive the column list.
CREATE MAPPING my_orders
TYPE File
OPTIONS (
'path' = '/orders',
'format' = 'json-flat',
'glob' = '2020-01-*.json',
'ignoreFileNotFound' = 'true'
)
option
Options to pass to the file system such as authentication options.
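For example, a minimal sketch that passes the Amazon S3 credential options described in the Authentication section below; the bucket name and the values are placeholders:
// Minimal sketch: pass file system options, here S3 authentication options.
// The bucket name and the credential values are placeholders.
BatchSource<String> source = FileSources.files("s3a://my-bucket/path/to/directory")
.option("fs.s3a.access.key", "<access-key-id>")
.option("fs.s3a.secret.key", "<secret-key>")
.build();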
sharedFileSystem
Read shared files only once instead of reading each copy on every member.
Default: false
If the path directory is shared among cluster members, such as on a network-mounted filesystem, set the sharedFileSystem option to true. The files will then be assigned among the members, so that each file is not read multiple times (once on each member).
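For example, a minimal sketch, assuming the option is set through the builder like the other options in this section and that /shared-data is a directory mounted on every member:
// Minimal sketch: /shared-data is mounted on every member, so each file
// is assigned to one member and read only once.
BatchSource<String> source = FileSources.files("/shared-data")
.sharedFileSystem(true)
.build();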
useHadoopForLocalFiles
Use the Hadoop connector module to read files from a local filesystem.
Default: false
This option might be beneficial when you need to parallelize reading from a single large file, or read only a subset of columns when using the Parquet format. The hazelcast-jet-hadoop module must be on your members' classpaths.
BatchSource<String> source = FileSources.files("/data")
.glob("wikipedia.txt")
.useHadoopForLocalFiles(true)
.build();
You can provide additional options to Hadoop. For example, to read all files in a directory recursively:
BatchSource<String> source = FileSources.files("/data")
.glob("wikipedia.txt")
.useHadoopForLocalFiles(true)
.option("mapreduce.input.fileinputformat.input.dir.recursive", "true")
.build();
This option is not supported by SQL.
Supported File Formats
Hazelcast supports reading from the following file formats.
Avro
The Avro format allows you to read data from files in the Avro Object Container File format. To use the Avro format, you must have the hazelcast-jet-avro module on your members' classpaths.
Suppose you have a class User generated from the Avro schema. You can read the data from an Avro file in the following way. Notice that you don’t need to provide the User class to the builder, but you do need to satisfy the Java type system:
BatchSource<User> source = FileSources.files("/data")
.glob("users.avro")
.format(FileFormat.<User>avro())
.build();
This example will use Avro’s SpecificDatumReader under the hood.
If you don’t have a class generated from the Avro schema, but the structure of your class matches the data, you can use Java reflection to read the data:
BatchSource<User> source = FileSources.files("/data")
.glob("users.avro")
.format(FileFormat.avro(User.class))
.build();
This example will use Avro’s ReflectDatumReader under the hood.
CREATE MAPPING users
TYPE File
OPTIONS (
'path' = '/users',
'format' = 'avro',
'glob' = '*.avro'
)
Avro Type | SQL Type |
---|---|
BOOLEAN | BOOLEAN |
INT | INT |
LONG | BIGINT |
FLOAT | REAL |
DOUBLE | DOUBLE |
STRING | VARCHAR |
All other types | OBJECT |
CSV
CSV files must have a header. Columns that do not match any fields are ignored, and fields that do not have corresponding columns have null values.
The header columns must match the class fields that you want to deserialize them into.
Create the file source in the following way to read from the file users.csv and deserialize into a User class.
BatchSource<User> source = FileSources.files("/data")
.glob("users.csv")
.format(FileFormat.csv(User.class))
.build();
If you omit the column list from the mapping declaration, Hazelcast will try to infer the column names from the file header. All columns have the VARCHAR type.
CREATE MAPPING my_files
TYPE File
OPTIONS (
'path' = '/path/to/directory',
'format' = 'csv'
)
JSON
JSON files must be in the JSON Lines format.
JSON fields must match the class fields that you want to deserialize them into.
Create the file source in the following way to read from the file users.jsonl and deserialize into a User class.
BatchSource<User> source = FileSources.files("/data")
.glob("users.jsonl")
.format(FileFormat.json(User.class))
.build();
JSON files are expected to contain one valid JSON document per line and be UTF-8 encoded. If you omit any mapping columns from the declaration, Hazelcast infers names and types based on a sample.
CREATE MAPPING my_files
TYPE File
OPTIONS (
'path' = '/path/to/directory',
'format' = 'json-flat'
)
JSON type | SQL Type |
---|---|
BOOLEAN | BOOLEAN |
NUMBER | DOUBLE |
STRING | VARCHAR |
All other types | OBJECT |
Text
Read data from .txt files.
Create the file source in the following way to read a file as text; the whole file is read as a single String:
BatchSource<String> source = FileSources.files("/data")
.glob("file.txt")
.format(FileFormat.text())
.build();
When reading from the local filesystem, you can specify the character encoding. This is not supported by the Hadoop-based modules; if provided, the option is ignored.
BatchSource<String> source = FileSources.files("/data")
.glob("file.txt")
.format(FileFormat.text(Charset.forName("Cp1250")))
.build();
You can read a file line by line in the following way. This is the default, so you can omit the .format(FileFormat.lines()) part.
BatchSource<String> source = FileSources.files("/data")
.glob("file.txt")
.format(FileFormat.lines())
.build();
This file format is not supported by SQL.
Parquet
Apache Parquet is a columnar storage format. It describes how the data is stored on disk, but it doesn’t specify how the data is deserialized; other libraries are used for that. Hazelcast uses Apache Avro for deserialization.
Parquet has a dependency on Hadoop, so it can be used only with one of the Hadoop-based modules.
Create the file source in the following way to read data from a parquet file:
BatchSource<SpecificUser> source = FileSources.files("/data")
.glob("users.parquet")
.format(FileFormat.<SpecificUser>parquet())
.build();
To read a Parquet file from the local filesystem, use the useHadoopForLocalFiles option.
CREATE MAPPING my_files
TYPE File
OPTIONS (
'path' = 'hdfs://path/to/directory',
'format' = 'parquet'
/* more Hadoop options ... */
)
Authentication
To allow Hazelcast to access files in remote systems, you must pass authentication credentials to the file connector.
For performance reasons, authentication credentials are cached, which may cause issues when submitting jobs with different credentials, or even the same jobs with new credentials, such as after a credentials rotation.
To turn off authentication caching, set the fs.<prefix>.impl.disable.cache option to true, where <prefix> is a path scheme.
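For example, a minimal sketch that disables credential caching for the s3a scheme, used here as the example prefix; the bucket name is a placeholder:
// Minimal sketch: turn off credential caching for the s3a path scheme.
// The bucket name is a placeholder.
BatchSource<String> source = FileSources.files("s3a://my-bucket/path/to/directory")
.option("fs.s3a.impl.disable.cache", "true")
.build();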
Amazon S3
Provide your AWS access key ID and secret key with required access via the fs.s3a.access.key and fs.s3a.secret.key options.
For additional ways to authenticate, see the Hadoop-AWS documentation and the Amazon S3 documentation.
Google Cloud Storage
Provide the location of the keyfile via the google.cloud.auth.service.account.json.keyfile source option.
The file must be available on all the cluster members.
For additional ways to authenticate, see the Google Cloud Hadoop connector documentation.
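For example, a minimal sketch that passes the keyfile location through the builder; the bucket name and the keyfile path are placeholders:
// Minimal sketch: authenticate to Google Cloud Storage with a service account keyfile.
// The bucket name and keyfile path are placeholders; the keyfile must exist on every member.
BatchSource<String> source = FileSources.files("gs://my-bucket/path/to/directory")
.option("google.cloud.auth.service.account.json.keyfile", "/path/to/keyfile.json")
.build();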
Windows Azure Blob Storage
Provide an account key via the fs.azure.account.key.<your account name>.blob.core.windows.net source option.
For additional ways to authenticate, see Hadoop Azure Blob Storage support.
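For example, a minimal sketch that passes the account key through the builder; the container name, account name, path, and key value are placeholders, and the wasbs path shape shown is the usual Hadoop form:
// Minimal sketch: authenticate to Azure Blob Storage with an account key.
// Container, account, path, and key value are placeholders.
BatchSource<String> source = FileSources.files("wasbs://my-container@my-account.blob.core.windows.net/path/to/directory")
.option("fs.azure.account.key.my-account.blob.core.windows.net", "<account-key>")
.build();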
Azure Data Lake Generation 1
Provide the following options:
- fs.adl.oauth2.access.token.provider.type
- fs.adl.oauth2.refresh.url
- fs.adl.oauth2.client.id
- fs.adl.oauth2.credential
For additional ways to authenticate, see Hadoop Azure Data Lake Support.
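For example, a minimal sketch that passes these options through the builder; the account name, path, and all values are placeholders, and the adl path shape shown is the usual Hadoop form:
// Minimal sketch: pass the OAuth2 options required for Data Lake Generation 1.
// The account name, path, and all values are placeholders.
BatchSource<String> source = FileSources.files("adl://my-account.azuredatalakestore.net/path/to/directory")
.option("fs.adl.oauth2.access.token.provider.type", "<provider-type>")
.option("fs.adl.oauth2.refresh.url", "<refresh-url>")
.option("fs.adl.oauth2.client.id", "<client-id>")
.option("fs.adl.oauth2.credential", "<credential>")
.build();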
Azure Data Lake Generation 2
For additional ways to authenticate, see Hadoop Azure Data Lake Storage Gen2.
Hadoop with Custom Classpath
When submitting jobs that use Hadoop, avoid sending Hadoop JARs with the job and use the Hadoop classpath instead. Hadoop JARs contain some JVM hooks and can keep lingering references inside the JVM long after the job has ended, causing memory leaks.
To obtain the Hadoop classpath, use the hadoop classpath command and append the output to the CLASSPATH environment variable before starting Hazelcast.
export CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
Hadoop Native Libraries
The underlying Hadoop infrastructure can make use of native libraries for compression/decompression and CRC checksums. When the native libraries are not configured, you will see the following message in the logs:
[o.a.h.u.NativeCodeLoader]: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Configure the native libraries by adding their location to the LD_LIBRARY_PATH environment variable:
export LD_LIBRARY_PATH=<path to hadoop>/lib/native:$LD_LIBRARY_PATH
To verify that the Hadoop native libraries were successfully configured, you should no longer see the message above, and if you enable logging for org.apache.hadoop, you should see the following log message:
[o.a.h.u.NativeCodeLoader]: Loaded the native-hadoop library
For more detail, see the Hadoop Native Libraries Guide.