Tesey DeltaIngester ingests data from JDBC sources into HDFS and object stores such as S3, GCS, etc.
- Clone the repository and package it with:

```shell
mvn clean install
```
- Describe endpoint configs in `endpoints.json`, similar to the following:

```json
{
  "endpoints": [
    {
      "name": "test-db",
      "options": [
        {"name": "host", "value": "testdb"},
        {"name": "port", "value": "1521"},
        {"name": "dbName", "value": "test"},
        {"name": "dbType", "value": "oracle"},
        {"name": "driver", "value": "oracle.jdbc.driver.OracleDriver"},
        {"name": "user", "value": "root"},
        {"name": "credentialProviderPath", "value": "jceks://hdfs/user/hadoop/test-root-key.jceks"},
        {"name": "passwordAlias", "value": "oracle.password.alias"}
      ]
    },
    {
      "name": "test-parquet",
      "options": [
        {"name": "location", "value": "hdfs:///test/parquet"},
        {"name": "format", "value": "parquet"}
      ]
    }
  ]
}
```
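The `host`, `port`, `dbName`, and `dbType` options together identify the source database. As a rough illustration (how DeltaIngester actually builds its connection URL is not shown here, so the template below is an assumption based on the standard Oracle thin-driver form), the options could map to a JDBC URL like this:

```python
# Sketch: assemble a JDBC URL from an endpoint's name/value options list.
# The Oracle URL template is an assumption; DeltaIngester's internals may differ.

def options_to_dict(options):
    """Flatten the [{"name": ..., "value": ...}] list into a plain dict."""
    return {o["name"]: o["value"] for o in options}

def jdbc_url(options):
    opts = options_to_dict(options)
    if opts.get("dbType") == "oracle":
        # Standard thin-driver service-name form.
        return f'jdbc:oracle:thin:@//{opts["host"]}:{opts["port"]}/{opts["dbName"]}'
    raise ValueError(f'unsupported dbType: {opts.get("dbType")}')

endpoint_options = [
    {"name": "host", "value": "testdb"},
    {"name": "port", "value": "1521"},
    {"name": "dbName", "value": "test"},
    {"name": "dbType", "value": "oracle"},
]
print(jdbc_url(endpoint_options))
# jdbc:oracle:thin:@//testdb:1521/test
```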
- Prepare Avro schemas corresponding to the schemas of the tables that should be ingested.
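For example, a minimal `test.avsc` for the `test` table used below might look like the following sketch (the `id` field is hypothetical; only `load_date` is referenced by the table configs):

```json
{
  "type": "record",
  "name": "test",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "load_date", "type": "string"}
  ]
}
```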
- Describe tables in `tables.json`, similar to the following:

```json
{
  "tables": [
    {
      "name": "test_daily",
      "options": [
        {"name": "tableName", "value": "test"},
        {"name": "schema", "value": "test.avsc"},
        {"name": "mode", "value": "daily"},
        {"name": "checkColumn", "value": "load_date"},
        {"name": "partitionKeys", "value": "load_date"}
      ]
    },
    {
      "name": "test",
      "options": [
        {"name": "tableName", "value": "test"},
        {"name": "schema", "value": "test.avsc"},
        {"name": "mode", "value": "completely"},
        {"name": "partitionKeys", "value": "load_date"},
        {"name": "partitionColumn", "value": "load_date"},
        {"name": "lowerBound", "value": "2020-06-28"},
        {"name": "upperBound", "value": "2020-06-30"},
        {"name": "numPartitions", "value": "3"}
      ]
    }
  ]
}
```
- Submit the Spark application like the following:

```shell
spark-submit \
  --class org.tesey.ingester.spark.DeltaIngester \
  --name DeltaIngester \
  --master yarn \
  --num-executors 2 \
  --driver-memory 512m \
  --executor-memory 512m \
  ./target/tesey-delta-ingester-1.0-SNAPSHOT.jar \
  --endpointsConfigPath hdfs:///configs/endpoints.json \
  --tablesConfigPath hdfs:///configs/tables.json \
  --schemasPath hdfs:///schemas \
  --sourceName test-db \
  --sinkName test-parquet \
  --mode daily
```

Endpoint options:

| Field | Description |
|---|---|
| `url` (string) | Database connection URL |
| `host` (string) | The host name of the source database server |
| `port` (string) | The port of the source database server |
| `dbName` (string) | The name of the source database |
| `dbType` (string) | The type of the source RDBMS; currently supported: `oracle` |
| `driver` (string) | The JDBC driver class |
| `user` (string) | The name of the user to connect to the source database as |
| `credentialProviderPath` (string) | The path to the credential provider store used to retrieve the password of the source database user |
| `passwordAlias` (string) | The credential alias used to retrieve the password of the source database user |
| `batchSize` (string) | The JDBC batch size, which determines how many rows to ingest per round trip |
| `location` (string) | The path to write the ingested data to |
| `format` (string) | The format to save the ingested data in (e.g. `parquet`) |
Table options:

| Field | Description |
|---|---|
| `tableName` (string) | The name of the table to ingest from the source database |
| `schema` (string) | The path to the Avro schema that corresponds to the structure of the ingested rows |
| `mode` (string) | The ingestion mode: `completely`, `incrementally`, or `daily` |
| `checkColumn` (string) | The check column used to identify rows to ingest in the `incrementally` and `daily` modes |
| `lastValue` (string) | The maximum value of the check column in the previous ingestion, used to identify rows to ingest in the `incrementally` mode |
| `partitionKeys` (string) | A comma-separated list of fields to partition the output dataset on |
| `partitionColumn` (string) | The column on which to partition reads from the JDBC source |
| `lowerBound` (string) | The minimum value of `partitionColumn` to read |
| `upperBound` (string) | The maximum value of `partitionColumn` to read |
| `numPartitions` (string) | The maximum number of partitions used for parallelism when reading the table |
| `batchSize` (string) | The JDBC batch size, which determines how many rows to ingest per round trip |
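The `partitionColumn`, `lowerBound`, `upperBound`, and `numPartitions` options map onto Spark's partitioned JDBC read, which splits the column's value range into roughly equal strides, one per parallel reader. A simplified numeric illustration of that splitting (Spark's exact predicate generation differs, and date bounds are converted internally):

```python
# Sketch: split a [lower, upper) range into numPartitions strides,
# in the spirit of Spark's JDBC partitioned read.
# This is an illustration, not Spark's exact algorithm.

def strides(lower, upper, num_partitions):
    stride = (upper - lower) // num_partitions
    bounds = []
    start = lower
    for i in range(num_partitions):
        # The last stride absorbs any remainder so the range is fully covered.
        end = upper if i == num_partitions - 1 else start + stride
        bounds.append((start, end))
        start = end
    return bounds

print(strides(0, 30, 3))
# [(0, 10), (10, 20), (20, 30)]
```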
Application arguments:

| Field | Description |
|---|---|
| `endpointsConfigPath` (string) | The path to the endpoints config file |
| `tablesConfigPath` (string) | The path to the tables config file |
| `schemasPath` (string) | The path to the Avro schemas |
| `sourceName` (string) | The name of the endpoint used as the data source |
| `sinkName` (string) | The name of the endpoint used as the data sink |
| `mode` (string) | The ingestion mode (`completely`/`incrementally`/`daily`) |
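Since the table options are plain name/value pairs, a small sanity check over a `tables.json` can catch a common mistake, such as a `daily` or `incrementally` table missing its `checkColumn`. The required-option sets below are inferred from the option descriptions above, not from the ingester's own validation logic:

```python
# Sketch: flatten each table's options and check mode-specific keys.
# The REQUIRED_BY_MODE mapping is an assumption based on the option reference.

REQUIRED_BY_MODE = {
    "daily": {"checkColumn"},
    "incrementally": {"checkColumn"},
    "completely": set(),
}

def check_table(table):
    """Return the sorted list of required options missing for the table's mode."""
    opts = {o["name"]: o["value"] for o in table["options"]}
    mode = opts.get("mode")
    missing = REQUIRED_BY_MODE.get(mode, set()) - opts.keys()
    return sorted(missing)

tables = [
    {"name": "test_daily", "options": [
        {"name": "mode", "value": "daily"},
        {"name": "checkColumn", "value": "load_date"},
    ]},
    {"name": "broken", "options": [
        {"name": "mode", "value": "daily"},
    ]},
]
for t in tables:
    print(t["name"], check_table(t))
# test_daily []
# broken ['checkColumn']
```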