janis-commerce/data-lake

Data Lake


Package to send data from Janis microservices to the Janis Data Lake service.

What it does

The package orchestrates initial and incremental syncs of entity data: a scheduled Lambda enqueues load messages per client; an SQS consumer reads those messages, queries the service model by date range, and uploads the results to S3 as compressed NDJSON (.ndjson.gz). The Data Lake then ingests from that raw bucket.

Flow: EventBridge Scheduler → DataLakeLoad Lambda → SQS → DataLakeSync Consumer → S3 (raw bucket).

Installation

npm install @janiscommerce/data-lake

Warning: The consumer uses returnType: 'cursor' on the model get(). Your service must use @janiscommerce/mongodb version 3.14.0 or higher, where cursor support was added.
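Cursor support matters because the consumer streams documents instead of materializing the whole result set in memory. The batching pattern can be sketched like this (a hypothetical helper, assuming the cursor is an async iterable; this is not the package's actual code):

```javascript
'use strict';

// Illustrative only: consume a cursor (any async iterable) in fixed-size
// batches so large result sets never sit fully in memory at once.
async function* batchCursor(cursor, batchSize = 1000) {

	let batch = [];

	for await(const document of cursor) {

		batch.push(document);

		if(batch.length === batchSize) {
			yield batch;
			batch = [];
		}
	}

	// Flush the final partial batch, if any
	if(batch.length)
		yield batch;
}

module.exports = { batchCursor };
```

This is why the MongoDB package version matters: without `returnType: 'cursor'`, `get()` would return a fully loaded array and the streaming pattern above would not apply.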

Usage in a Janis service

To enable Data Lake sync in a microservice, follow these steps.


1. Add the hooks

Register the Data Lake hooks in your Serverless config so the Lambda, SQS queue, consumer, schedules, and IAM roles are created.

You need to pass SQSHelper from sls-helper-plugin-janis into the hooks.

File: serverless.js (or wherever you define the helper)

'use strict';

const { helper } = require('sls-helper');
const { SQSHelper } = require('sls-helper-plugin-janis');
const { dataLakeServerlessHelperHooks } = require('@janiscommerce/data-lake');

module.exports = helper({
	hooks: [
		// ... other hooks (api, functions, etc.)
		...dataLakeServerlessHelperHooks(SQSHelper)
	]
});

What this adds:

  • Lambda DataLakeLoad (handler: src/lambda/DataLakeLoad/index.js)
  • SQS queue and consumer Lambda for dataLakeSync (handler: src/sqs-consumer/data-lake-sync-consumer.js)
  • EventBridge Schedule Group and one Schedule per entity (from Settings), invoking the Lambda with incremental: true and entity in kebab-case
  • IAM role for the scheduler to invoke the Lambda
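For illustration, deriving one schedule per entity from the Settings list might look roughly like this sketch (the helper name and the schedule shape are assumptions, not the hooks' actual output):

```javascript
'use strict';

// Illustrative sketch: one EventBridge schedule per entity from Settings.
// The frequency defaults to 60 minutes, and the schedule input carries
// the incremental flag plus the entity name in kebab-case.
function buildSchedules(entities) {
	return entities.map(({ name, frequency = 60 }) => ({
		name: `data-lake-sync-${name}`,
		scheduleExpression: `rate(${frequency} minutes)`,
		input: { incremental: true, entity: name }
	}));
}

module.exports = { buildSchedules };
```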

2. Add the DataLakeLoad Lambda

Create the Lambda handler that extends the package class. It must live at the path expected by the hooks: src/lambda/DataLakeLoad/index.js.

File: src/lambda/DataLakeLoad/index.js

'use strict';

const { Handler } = require('@janiscommerce/lambda');
const { DataLakeLoadFunction } = require('@janiscommerce/data-lake');

module.exports.handler = (...args) => Handler.handle(DataLakeLoadFunction, ...args);

Payload: The Lambda receives a payload with entity, incremental, and optionally from, to, limit, maxSizeMB. The schedules send only { incremental: true, entity: "<entity-kebab-case>" }. For an initial load you must invoke the Lambda manually with entity, incremental: false, and from (and optionally to, limit, maxSizeMB). Sync messages sent to SQS can also carry additionalFilters and filenamePrefix when you override prepareIncrementalMessages (see below).

Performance: The Lambda reuses a single SqsEmitter instance across all clients in a single invocation, avoiding file descriptor exhaustion (EMFILE errors) when processing hundreds or thousands of clients. Initial load messages are prepared once and reused for all clients.

Partitioning incremental loads (optional)

Some entities need compound query filters (for example, to match a MongoDB index) while still using the same incremental date range (dateModifiedFrom / dateModifiedTo). In that case the microservice can split one incremental run into several SQS messages by overriding prepareIncrementalMessages on DataLakeLoadFunction.

Default behavior: prepareIncrementalMessages(clientCode, content) returns [{ content }] — one queue message per client per scheduled incremental run.

Override: Return an array of objects shaped like { content: { ... } }, each content being a full sync payload. The Data Lake Sync consumer merges optional additionalFilters into the model get() filters together with the date-range filters.

content shape (incremental) — usually you spread the incoming content and add fields per message:

| Field | Type | Description |
| --- | --- | --- |
| entity | string | Entity in kebab-case (same as settings). |
| incremental | boolean | true for incremental. |
| from | Date or string | Start of range (provided by the base class; serializes to ISO when sent to SQS). |
| to | Date or string | End of range. |
| additionalFilters | object | Optional. Merged into the consumer’s filters (e.g. { warehouse: id }). Keys must match what your model’s get() expects. |
| filenamePrefix | string | Optional. Prepended to the S3 object file name (with a trailing -) so partitioned runs for the same window do not collide (e.g. warehouse-123). |
| limit / maxSizeMB | number | Optional. Passed through like manual invokes. |

Consumer: See DataLakeSyncConsumer — it validates additionalFilters as an optional object and applies it alongside dateModifiedFrom / dateModifiedTo (incremental) or dateCreatedFrom / dateCreatedTo (initial). Optional filenamePrefix is applied to uploaded .ndjson.gz keys: …/<prefix>-<from>-<pushedAt>-<part>.ndjson.gz (omit filenamePrefix for the default …/<from>-<pushedAt>-<part>.ndjson.gz).
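The filter merging and key naming described above can be sketched as pure functions (names are illustrative; the real logic lives inside the package's consumer):

```javascript
'use strict';

// Illustrative sketch of how the consumer's query filters could be
// assembled from a sync message: date-range filters depend on whether
// the run is incremental, and additionalFilters are merged on top.
function buildFilters({ incremental, from, to, additionalFilters }) {

	const dateFilters = incremental
		? { dateModifiedFrom: from, dateModifiedTo: to }
		: { dateCreatedFrom: from, dateCreatedTo: to };

	return { ...dateFilters, ...additionalFilters };
}

// Illustrative sketch of the uploaded object file name: an optional
// prefix (with trailing dash), then from, pushedAt and part.
function buildObjectKey({ filenamePrefix, from, pushedAt, part }) {
	const prefix = filenamePrefix ? `${filenamePrefix}-` : '';
	return `${prefix}${from}-${pushedAt}-${part}.ndjson.gz`;
}

module.exports = { buildFilters, buildObjectKey };
```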

Client watermark: After a successful publish, the base class updates settings.<entity>.lastIncrementalLoadDate to to for that client. If you emit multiple messages from prepareIncrementalMessages, they are published in one publishEvents batch for that client; the watermark advances only when that batch succeeds (no partial update on failure).
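The watermark rule can be sketched as follows (a hypothetical helper mirroring the all-or-nothing behavior described above, not the base class's actual code):

```javascript
'use strict';

// Illustrative sketch of the per-client watermark update: the setting
// advances to `to` only when the whole batch of messages for that
// client was published successfully; otherwise settings are untouched.
function advanceWatermark(settings, entity, to, batchSucceeded) {

	if(!batchSucceeded)
		return settings; // no partial update on failure

	return {
		...settings,
		[entity]: {
			...settings[entity],
			lastIncrementalLoadDate: to
		}
	};
}

module.exports = { advanceWatermark };
```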

Initial load: Partitioning is only documented here for incremental flows. Initial load still uses the built-in per-day messages from sendInitialLoadMessages unless you extend that path separately.

Example: one incremental message per warehouse for entity stock:

'use strict';

const { Handler } = require('@janiscommerce/lambda');
const { DataLakeLoadFunction } = require('@janiscommerce/data-lake');
const { ApiSession } = require('@janiscommerce/api-session');

const WarehouseModel = require('../../models/warehouse');

class WMSDataLakeLoadFunction extends DataLakeLoadFunction {

	async prepareIncrementalMessages(clientCode, content) {

		if(content.entity !== 'stock')
			return [{ content }];

		const session = new ApiSession({ clientCode });

		const warehouseModel = session.getSessionInstance(WarehouseModel);

		const messages = [];

		await warehouseModel.getPaged({
			fields: ['id']
		}, warehouses => {
			warehouses.forEach(({ id }) => {
				messages.push({
					content: {
						...content,
						additionalFilters: { warehouse: id },
						filenamePrefix: `warehouse-${id}`
					}
				});
			});
		});

		return messages;
	}
}

module.exports.handler = (...args) => Handler.handle(WMSDataLakeLoadFunction, ...args);

3. Add the Data Lake Sync consumer

Create the SQS consumer file at the path expected by the hooks: src/sqs-consumer/data-lake-sync-consumer.js.

File: src/sqs-consumer/data-lake-sync-consumer.js

'use strict';

const { SQSHandler } = require('@janiscommerce/sqs-consumer');
const { DataLakeSyncConsumer } = require('@janiscommerce/data-lake');

module.exports.handler = event => SQSHandler.handle(DataLakeSyncConsumer, event);

This re-exports the package consumer handler. The consumer reads messages from the Data Lake sync queue, loads data from your model (by entity and date range, plus optional additionalFilters), and uploads NDJSON.gz to S3. Optional filenamePrefix on the message body changes the uploaded object key so parallel or split runs stay distinguishable in the raw bucket.

Options: The consumer uses IterativeSQSConsumer, which processes one record per invocation when batchSize is 1.


4. Add settings for entities to sync

Configure which entities are synced and how often (in minutes). Settings are read from your service config via @janiscommerce/settings (e.g. src/environments/{environment}/.janiscommercerc.json).

File: src/environments/{environment}/.janiscommercerc.json

{
	"dataLake": {
		"entities": [{
			"name": "order",
			"frequency": 60,
			"initialLoadDate": "2025-01-01 00:00:00"
		}, {
			"name": "product",
			"frequency": 120,
			"initialLoadDate": "2025-01-01 00:00:00"
		}]
	}
}

Entity options:

| Property | Type | Required | Description | Since |
| --- | --- | --- | --- | --- |
| name | string | Yes | Entity name in kebab-case. Must match the model path: the package loads the model from models/<name>.js (e.g. order → models/order.js). This same value is sent in the payload as entity. | 1.0.0 |
| frequency | number | No | How often (in minutes) to run the incremental sync. Default: 60. Used in the Schedule expression: rate(<frequency> minutes). | 1.0.0 |
| initialLoadDate | string | Yes | Valid date string (e.g. YYYY-MM-DD HH:mm:ss or ISO). Used when the client has no settings.<entity>.lastIncrementalLoadDate for this entity (e.g. first run or new clients). Required so incremental sync can compute the date range. | 1.0.0 |
| fields | array | No | If set, only these fields are requested from the model in the consumer, reducing payload size and controlling what goes to the Data Lake. | 1.0.0 |
| excludeFields | array | No | If set, these fields are excluded from the model query in the consumer, reducing payload size and controlling what goes to the Data Lake. | 1.1.0 |

Example with fields and initialLoadDate:

{
	"dataLake": {
		"entities": [
			{
				"name": "order",
				"frequency": 30,
				"initialLoadDate": "2025-01-01 00:00:00",
				"fields": ["id", "commerceId", "total", "status"]
			}
		]
	}
}
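A sketch of how fields / excludeFields narrow a document (illustrative only; the real projection is applied by the consumer when querying the model):

```javascript
'use strict';

// Illustrative sketch of the fields / excludeFields options: keep only
// the listed fields, or drop the excluded ones; with neither option,
// the document passes through unchanged.
function projectDocument(document, { fields, excludeFields } = {}) {

	if(fields) {
		return Object.fromEntries(
			fields.filter(field => field in document).map(field => [field, document[field]])
		);
	}

	if(excludeFields) {
		return Object.fromEntries(
			Object.entries(document).filter(([key]) => !excludeFields.includes(key))
		);
	}

	return document;
}

module.exports = { projectDocument };
```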

Important: dataLake.entities is required. If it is missing, the hooks will throw at load time: dataLake.entities is required in Settings file. Each entity must have a model at models/<name>.js and the model must support get() with filters dateCreatedFrom/dateCreatedTo (initial) or dateModifiedFrom/dateModifiedTo (incremental) and returnType: 'cursor'.
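To sanity-check that contract without a database, an in-memory stand-in might look like this (illustrative; real service models extend @janiscommerce/mongodb, and this sketch only mirrors the date-range filter semantics):

```javascript
'use strict';

// Illustrative in-memory stand-in for the model contract the consumer
// expects: get() must honor date-range filters. Only the incremental
// (dateModified) filters are shown; initial load uses dateCreated*.
class InMemoryOrderModel {

	constructor(documents) {
		this.documents = documents;
	}

	async get({ filters = {} } = {}) {
		return this.documents.filter(doc => {

			if(filters.dateModifiedFrom && doc.dateModified < filters.dateModifiedFrom)
				return false;

			if(filters.dateModifiedTo && doc.dateModified > filters.dateModifiedTo)
				return false;

			return true;
		});
	}
}

module.exports = { InMemoryOrderModel };
```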


Manual execution: initial load

To run an initial load (full export by date range) you must invoke the DataLakeLoad Lambda manually. The payload must include the entity and the start date; end date and client are optional (default: all active clients, up to today).

Payload:

| Field | Type | Required | Description | Since |
| --- | --- | --- | --- | --- |
| entity | string | Yes | Entity name in kebab-case (e.g. order, product). | 1.0.0 |
| incremental | boolean | Yes | Use false for initial load. | 1.0.0 |
| from | string | Yes | Start date (valid date string, e.g. YYYY-MM-DD HH:mm:ss or ISO). | 1.0.0 |
| to | string | No | End date. Default: today end of day. | 1.0.0 |
| additionalFilters | object | No | Additional filters merged into the consumer’s filters (e.g. { warehouse: id }). Keys must match what your model’s get() expects. | 1.1.0 |
| filenamePrefix | string | No | Prefix for the S3 file name segment (before from-pushedAt-part). Use when several exports share the same date window. | 1.2.0 |
| clientCode | string | No | If set, only this client is synced; otherwise all active clients. | 1.0.0 |
| limit | number | No | Optional limit passed to the consumer. | 1.0.0 |
| maxSizeMB | number | No | Optional max size per file (MB) in the consumer. | 1.0.0 |

Example: all clients, from a given date to today

{
  "body": {
    "entity": "order",
    "incremental": false,
    "from": "2025-01-01 00:00:00"
  }
}

Example: with end date and a single client

{
  "body": {
    "entity": "product",
    "incremental": false,
    "from": "2025-01-01 00:00:00",
    "to": "2025-06-30 23:59:59",
    "clientCode": "my-client-code"
  }
}
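When scripting initial loads, a small helper that applies the documented defaults helps avoid malformed payloads (illustrative, not part of the package; field names follow the payload table above):

```javascript
'use strict';

// Illustrative helper that builds an initial-load payload with the
// documented defaults: incremental is forced to false and `to` falls
// back to end of today. Invoke the DataLakeLoad Lambda with the result.
function buildInitialLoadPayload({ entity, from, to, clientCode, limit, maxSizeMB }) {

	if(!entity || !from)
		throw new Error('entity and from are required for an initial load');

	const endOfToday = new Date();
	endOfToday.setHours(23, 59, 59, 999);

	return {
		body: {
			entity,
			incremental: false,
			from,
			to: to || endOfToday.toISOString(),
			...clientCode && { clientCode },
			...limit && { limit },
			...maxSizeMB && { maxSizeMB }
		}
	};
}

module.exports = { buildInitialLoadPayload };
```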

Environment variables

  • DATA_LAKE_SYNC_SQS_QUEUE_URL – Set by the hooks (SQS queue URL for the sync queue). The DataLakeLoad Lambda publishes messages here.
  • S3_DATA_LAKE_RAW_BUCKET – Set by the hooks on the consumer Lambda (e.g. janis-data-lake-service-raw-${stage}). The consumer uploads NDJSON.gz files here.
