tesey-io/DeltaIngester
# Tesey DeltaIngester

Tesey DeltaIngester ingests data from JDBC sources into HDFS and object stores such as Amazon S3 and Google Cloud Storage.

## Usage

1. Clone the repository and build the package:

   ```sh
   mvn clean install
   ```
2. Describe endpoint configurations in `endpoints.json`, similar to the following:

   ```json
   {
       "endpoints": [
           {
               "name": "test-db",
               "options": [
                   {"name": "host", "value": "testdb"},
                   {"name": "port", "value": "1521"},
                   {"name": "dbName", "value": "test"},
                   {"name": "dbType", "value": "oracle"},
                   {"name": "driver", "value": "oracle.jdbc.driver.OracleDriver"},
                   {"name": "user", "value": "root"},
                   {"name": "credentialProviderPath", "value": "jceks://hdfs/user/hadoop/test-root-key.jceks"},
                   {"name": "passwordAlias", "value": "oracle.password.alias"}
               ]
           },
           {
               "name": "test-parquet",
               "options": [
                   {"name": "location", "value": "hdfs:///test/parquet"},
                   {"name": "format", "value": "parquet"}
               ]
           }
       ]
   }
   ```
3. Prepare Avro schemas corresponding to the schemas of the tables to be ingested.

4. Describe tables in `tables.json`, similar to the following:

   ```json
   {
       "tables": [
           {
               "name" : "test_daily",
               "options" : [
                   {"name": "tableName", "value": "test"},
                   {"name": "schema", "value": "test.avsc"},
                   {"name": "mode", "value": "daily"},
                   {"name": "checkColumn", "value": "load_date"},
                   {"name": "partitionKeys", "value": "load_date"}
               ]
           },
           {
               "name" : "test",
               "options" : [
                   {"name": "tableName", "value": "test"},
                   {"name": "schema", "value": "test.avsc"},
                   {"name": "mode", "value": "completely"},
                   {"name": "partitionKeys", "value": "load_date"},
                   {"name": "partitionColumn", "value": "load_date"},
                   {"name": "lowerBound", "value": "2020-06-28"},
                   {"name": "upperBound", "value": "2020-06-30"},
                   {"name": "numPartitions", "value": "3"}
               ]
           }
       ]
   }
   ```
5. Submit the Spark application, for example:

   ```sh
   spark-submit \
     --class org.tesey.ingester.spark.DeltaIngester \
     --name DeltaIngester \
     --master yarn \
     --num-executors 2 \
     --driver-memory 512m \
     --executor-memory 512m \
     ./target/tesey-delta-ingester-1.0-SNAPSHOT.jar \
     --endpointsConfigPath hdfs:///configs/endpoints.json \
     --tablesConfigPath hdfs:///configs/tables.json \
     --schemasPath hdfs:///schemas \
     --sourceName test-db \
     --sinkName test-parquet \
     --mode daily
   ```
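Step 3 above requires one Avro schema per ingested table. A minimal `test.avsc` matching the `test` table from the examples might look like the following; the field names and types here are illustrative assumptions and must match the actual structure of your source table:

```json
{
    "type": "record",
    "name": "test",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "load_date", "type": "string"}
    ]
}
```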

## Endpoints specification

| Field | Type | Description |
| --- | --- | --- |
| `url` | string | Database connection URL |
| `host` | string | The host name of the source database server |
| `port` | string | The port of the source database server |
| `dbName` | string | The name of the source database |
| `dbType` | string | The type of RDBMS. Currently supported: `oracle` |
| `driver` | string | The JDBC driver class |
| `user` | string | The user name used to connect to the source database |
| `credentialProviderPath` | string | The path to the credential provider used to retrieve the password for the source database user |
| `passwordAlias` | string | The credential alias used to retrieve the password for the source database user |
| `batchSize` | string | The JDBC batch size, which determines how many rows to ingest per round trip |
| `location` | string | The path to write the ingested data to |
| `format` | string | The format in which to save the ingested data. Currently supported: `avro`, `parquet`, `orc` |
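As an illustration of how the connection options fit together, the sketch below flattens an endpoint's option list and assembles an Oracle thin-driver JDBC URL from `host`, `port`, and `dbName`. This is an assumption about how the options combine, not code from the project; an explicit `url` option, where present, would simply be used as-is.

```python
def jdbc_url(options):
    """Assemble a JDBC connection URL from an endpoint's option list
    (hypothetical sketch; the project may build its URL differently)."""
    opts = {o["name"]: o["value"] for o in options}
    if "url" in opts:
        return opts["url"]  # an explicit URL takes precedence
    if opts.get("dbType") == "oracle":
        # Oracle thin-driver service-name syntax
        return f"jdbc:oracle:thin:@//{opts['host']}:{opts['port']}/{opts['dbName']}"
    raise ValueError(f"unsupported dbType: {opts.get('dbType')}")

options = [
    {"name": "host", "value": "testdb"},
    {"name": "port", "value": "1521"},
    {"name": "dbName", "value": "test"},
    {"name": "dbType", "value": "oracle"},
]
print(jdbc_url(options))  # jdbc:oracle:thin:@//testdb:1521/test
```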

## Tables specification

| Field | Type | Description |
| --- | --- | --- |
| `tableName` | string | The name of the table to ingest in the source database |
| `schema` | string | The path to the Avro schema corresponding to the structure of the ingested rows |
| `mode` | string | The ingestion mode: `completely` (ingest all rows from the source table), `incrementally` (ingest rows whose check column value is greater than `lastValue`), or `daily` (ingest rows inserted on the previous day) |
| `checkColumn` | string | The check column used to identify rows to ingest in the `incrementally` and `daily` modes |
| `lastValue` | string | The maximum value of the check column from the previous ingestion, used to identify rows to ingest in `incrementally` mode |
| `partitionKeys` | string | A comma-separated list of fields to partition the output dataset by |
| `partitionColumn` | string | The column to partition reads from the JDBC source on |
| `lowerBound` | string | The minimum value of `partitionColumn` to read |
| `upperBound` | string | The maximum value of `partitionColumn` to read |
| `numPartitions` | string | The maximum number of partitions used for parallel table reads |
| `batchSize` | string | The JDBC batch size, which determines how many rows to ingest per round trip |
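The `mode`/`checkColumn`/`lastValue` options and the `partitionColumn`/`lowerBound`/`upperBound`/`numPartitions` options ultimately translate into SQL predicates on the source table. The sketch below illustrates that translation; it is inferred from the descriptions above and from how Spark's JDBC reader typically splits a range, not taken from the DeltaIngester source, and the stride logic is simplified.

```python
from datetime import date, timedelta

def mode_predicate(mode, check_column=None, last_value=None, today=None):
    """WHERE clause implied by each ingestion mode (sketch)."""
    if mode == "completely":
        return None  # full table scan, no filter
    if mode == "incrementally":
        return f"{check_column} > '{last_value}'"
    if mode == "daily":
        yesterday = (today or date.today()) - timedelta(days=1)
        return f"{check_column} = '{yesterday.isoformat()}'"
    raise ValueError(f"unknown mode: {mode}")

def split_predicates(column, lower, upper, num_partitions):
    """Split [lowerBound, upperBound] into per-partition WHERE clauses,
    one per parallel JDBC read (simplified; the real reader handles
    more edge cases, e.g. NULLs in the partition column)."""
    span_days = (upper - lower).days
    preds = []
    for i in range(num_partitions):
        lo = lower + timedelta(days=span_days * i // num_partitions)
        hi = lower + timedelta(days=span_days * (i + 1) // num_partitions)
        if i == 0:
            preds.append(f"{column} < '{hi}'")
        elif i == num_partitions - 1:
            preds.append(f"{column} >= '{lo}'")
        else:
            preds.append(f"{column} >= '{lo}' AND {column} < '{hi}'")
    return preds

print(mode_predicate("incrementally", "load_date", "2020-06-29"))
print(split_predicates("load_date", date(2020, 6, 28), date(2020, 6, 30), 2))
```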

## Spark Application arguments

| Field | Type | Description |
| --- | --- | --- |
| `endpointsConfigPath` | string | The path to the endpoints config file |
| `tablesConfigPath` | string | The path to the tables config file |
| `schemasPath` | string | The path to the Avro schemas |
| `sourceName` | string | The name of the endpoint used as the data source |
| `sinkName` | string | The name of the endpoint used as the data sink |
| `mode` | string | The ingestion mode (`completely`/`incrementally`/`daily`) |
