Package to send data from Janis microservices to the Janis Data Lake service.
The package orchestrates initial and incremental syncs of entity data: a scheduled Lambda enqueues load messages per client; an SQS consumer reads those messages, queries the service model by date range, and uploads the results to S3 as compressed NDJSON (.ndjson.gz). The Data Lake then ingests from that raw bucket.
Flow: EventBridge Scheduler → DataLakeLoad Lambda → SQS → DataLakeSync Consumer → S3 (raw bucket).
```sh
npm install @janiscommerce/data-lake
```

**Warning:** The consumer uses `returnType: 'cursor'` on the model `get()`. Your service must use `@janiscommerce/mongodb` version 3.14.0 or higher, where cursor support was added.
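The cursor matters because results are streamed and serialized one document at a time instead of being materialized in memory. A minimal sketch of the idea, with the real cursor simulated by a generator (the actual cursor comes from the model's `get()` in `@janiscommerce/mongodb`):

```javascript
// Sketch of why the consumer streams with a cursor instead of loading the
// whole result set: each document is serialized to one NDJSON line as it
// arrives. The generator below stands in for a real MongoDB cursor.
function* fakeCursor() {
	yield { id: 1, status: 'pending' };
	yield { id: 2, status: 'delivered' };
}

const toNdjson = cursor => {
	let output = '';
	for(const doc of cursor)
		output += `${JSON.stringify(doc)}\n`; // one JSON document per line
	return output;
};

console.log(toNdjson(fakeCursor()));
```

In the real consumer the resulting NDJSON stream is gzip-compressed and uploaded to S3 in parts, rather than built as a single string.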
To enable Data Lake sync in a microservice, follow these steps.
Register the Data Lake hooks in your Serverless config so the Lambda, SQS queue, consumer, schedules, and IAM roles are created.
You need to pass SQSHelper from sls-helper-plugin-janis into the hooks.
File: serverless.js (or where you define helper)
```js
'use strict';

const { helper } = require('sls-helper');
const { SQSHelper } = require('sls-helper-plugin-janis');
const { dataLakeServerlessHelperHooks } = require('@janiscommerce/data-lake');

module.exports = helper({
	hooks: [
		// ... other hooks (api, functions, etc.)
		...dataLakeServerlessHelperHooks(SQSHelper)
	]
});
```

What this adds:
- Lambda DataLakeLoad (handler: `src/lambda/DataLakeLoad/index.js`)
- SQS queue and consumer Lambda for dataLakeSync (handler: `src/sqs-consumer/data-lake-sync-consumer.js`)
- EventBridge Schedule Group and one Schedule per entity (from Settings), invoking the Lambda with `incremental: true` and `entity` in kebab-case
- IAM role for the scheduler to invoke the Lambda
Create the Lambda handler that extends the package class. It must live at the path expected by the hooks: src/lambda/DataLakeLoad/index.js.
File: src/lambda/DataLakeLoad/index.js
```js
'use strict';

const { Handler } = require('@janiscommerce/lambda');
const { DataLakeLoadFunction } = require('@janiscommerce/data-lake');

module.exports.handler = (...args) => Handler.handle(DataLakeLoadFunction, ...args);
```

Payload: The Lambda receives a payload with `entity`, `incremental`, and optionally `from`, `to`, `limit`, `maxSizeMB`. The schedules send only `{ incremental: true, entity: "<entity-kebab-case>" }`. For an initial load you must invoke the Lambda manually with `entity`, `incremental: false`, and `from` (and optionally `to`, `limit`, `maxSizeMB`). Sync messages sent to SQS can also carry `additionalFilters` and `filenamePrefix` when you override `prepareIncrementalMessages` (see below).
Performance: The Lambda reuses a single SqsEmitter instance across all clients in a single invocation, avoiding file descriptor exhaustion (EMFILE errors) when processing hundreds or thousands of clients. Initial load messages are prepared once and reused for all clients.
Some entities need compound query filters (for example, to match a MongoDB index) while still using the same incremental date range (dateModifiedFrom / dateModifiedTo). In that case the microservice can split one incremental run into several SQS messages by overriding prepareIncrementalMessages on DataLakeLoadFunction.
Default behavior: prepareIncrementalMessages(clientCode, content) returns [{ content }] — one queue message per client per scheduled incremental run.
Override: Return an array of objects shaped like { content: { ... } }, each content being a full sync payload. The Data Lake Sync consumer merges optional additionalFilters into the model get() filters together with the date-range filters.
content shape (incremental) — usually you spread the incoming content and add fields per message:
| Field | Type | Description |
|---|---|---|
| `entity` | string | Entity in kebab-case (same as settings). |
| `incremental` | boolean | `true` for incremental. |
| `from` | Date or string | Start of range (provided by the base class; serializes to ISO when sent to SQS). |
| `to` | Date or string | End of range. |
| `additionalFilters` | object | Optional. Merged into the consumer's filters (e.g. `{ warehouse: id }`). Keys must match what your model's `get()` expects. |
| `filenamePrefix` | string | Optional. Prepended to the S3 object file name (with a trailing `-`) so partitioned runs for the same window do not collide (e.g. `warehouse-123`). |
| `limit` / `maxSizeMB` | number | Optional. Passed through like manual invokes. |
Consumer: See DataLakeSyncConsumer — it validates additionalFilters as an optional object and applies it alongside dateModifiedFrom / dateModifiedTo (incremental) or dateCreatedFrom / dateCreatedTo (initial). Optional filenamePrefix is applied to uploaded .ndjson.gz keys: …/<prefix>-<from>-<pushedAt>-<part>.ndjson.gz (omit filenamePrefix for the default …/<from>-<pushedAt>-<part>.ndjson.gz).
Client watermark: After a successful publish, the base class updates settings.<entity>.lastIncrementalLoadDate to to for that client. If you emit multiple messages from prepareIncrementalMessages, they are published in one publishEvents batch for that client; the watermark advances only when that batch succeeds (no partial update on failure).
Initial load: Partitioning is only documented here for incremental flows. Initial load still uses the built-in per-day messages from sendInitialLoadMessages unless you extend that path separately.
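The filter merge described above can be sketched as follows. This is an illustrative helper, not the package's internal code; it only shows how `additionalFilters` are spread together with the date-range filters before the model query:

```javascript
// Illustrative sketch (not the package's internals) of how the consumer
// combines additionalFilters with the date-range filters before calling the
// model's get(). Incremental runs filter by modification date; initial loads
// filter by creation date.
const buildFilters = ({ incremental, from, to, additionalFilters = {} }) => ({
	...additionalFilters,
	...(incremental
		? { dateModifiedFrom: from, dateModifiedTo: to }
		: { dateCreatedFrom: from, dateCreatedTo: to })
});

const filters = buildFilters({
	incremental: true,
	from: '2025-01-01T00:00:00.000Z',
	to: '2025-01-01T01:00:00.000Z',
	additionalFilters: { warehouse: 'warehouse-123' }
});
// filters now holds warehouse plus dateModifiedFrom / dateModifiedTo
```

Because the date-range keys are spread last, an `additionalFilters` entry cannot accidentally override the watermark-driven date range.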
Example: one incremental message per warehouse for entity stock:
```js
'use strict';

const { Handler } = require('@janiscommerce/lambda');
const { DataLakeLoadFunction } = require('@janiscommerce/data-lake');
const { ApiSession } = require('@janiscommerce/api-session');

const WarehouseModel = require('../../models/warehouse');

class WMSDataLakeLoadFunction extends DataLakeLoadFunction {

	async prepareIncrementalMessages(clientCode, content) {

		if(content.entity !== 'stock')
			return [{ content }];

		const session = new ApiSession({ clientCode });
		const warehouseModel = session.getSessionInstance(WarehouseModel);

		const messages = [];

		await warehouseModel.getPaged({
			fields: ['id']
		}, warehouses => {
			warehouses.forEach(({ id }) => {
				messages.push({
					content: {
						...content,
						additionalFilters: { warehouse: id },
						filenamePrefix: `warehouse-${id}`
					}
				});
			});
		});

		return messages;
	}
}

module.exports.handler = (...args) => Handler.handle(WMSDataLakeLoadFunction, ...args);
```

Create the SQS consumer file at the path expected by the hooks: `src/sqs-consumer/data-lake-sync-consumer.js`.
File: src/sqs-consumer/data-lake-sync-consumer.js
```js
'use strict';

const { SQSHandler } = require('@janiscommerce/sqs-consumer');
const { DataLakeSyncConsumer } = require('@janiscommerce/data-lake');

module.exports.handler = event => SQSHandler.handle(DataLakeSyncConsumer, event);
```

This re-exports the package consumer handler. The consumer reads messages from the Data Lake sync queue, loads data from your model (by entity and date range, plus optional `additionalFilters`), and uploads NDJSON.gz files to S3. Optional `filenamePrefix` on the message body changes the uploaded object key so parallel or split runs stay distinguishable in the raw bucket.
Options: The consumer uses IterativeSQSConsumer (one record per invocation when batchSize: 1).
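The file-name convention described above can be sketched like this. It is a simplified illustration only: path segments before the file name (bucket, entity, client, and similar prefixes) are left out, and the helper name is hypothetical:

```javascript
// Simplified sketch of the uploaded file-name convention:
//   <filenamePrefix>-<from>-<pushedAt>-<part>.ndjson.gz
// with the prefix segment omitted when no filenamePrefix was sent.
// Not the package's internal code.
const buildFileName = ({ from, pushedAt, part, filenamePrefix }) => {
	const prefix = filenamePrefix ? `${filenamePrefix}-` : '';
	return `${prefix}${from}-${pushedAt}-${part}.ndjson.gz`;
};

buildFileName({
	from: '2025-01-01',
	pushedAt: '2025-01-01T01:00:00Z',
	part: 1,
	filenamePrefix: 'warehouse-123'
});
```

With the prefix, two partitioned runs over the same `from`/`pushedAt` window produce distinct keys; without it, they would fall back to the shared default name and could collide.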
Configure which entities are synced and how often (in minutes). Settings are read from your service config (e.g. environments/{environment}/.janiscommercerc.json by @janiscommerce/settings).
File: src/environments/{environment}/.janiscommercerc.json
```json
{
	"dataLake": {
		"entities": [{
			"name": "order",
			"frequency": 60,
			"initialLoadDate": "2025-01-01 00:00:00"
		}, {
			"name": "product",
			"frequency": 120,
			"initialLoadDate": "2025-01-01 00:00:00"
		}]
	}
}
```

Entity options:
| Property | Type | Required | Description | Since |
|---|---|---|---|---|
| `name` | string | Yes | Entity name in kebab-case. Must match the model path: the package loads the model from `models/<name>.js` (e.g. `order` → `models/order.js`). This same value is sent in the payload as `entity`. | 1.0.0 |
| `frequency` | number | No | How often (in minutes) to run the incremental sync. Default: `60`. Used in the Schedule expression: `rate(<frequency> minutes)`. | 1.0.0 |
| `initialLoadDate` | string | Yes | Valid date string (e.g. `YYYY-MM-DD HH:mm:ss` or ISO). Used when the client has no `settings.<entity>.lastIncrementalLoadDate` for this entity (e.g. first run or new clients). Required so incremental sync can compute the date range. | 1.0.0 |
| `fields` | array | No | If set, only these fields are requested from the model in the consumer (reduces payload size and controls what goes to the Data Lake). | 1.0.0 |
| `excludeFields` | array | No | If set, these fields are excluded from the model results in the consumer (reduces payload size and controls what goes to the Data Lake). | 1.1.0 |
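How the hooks turn each entity entry into a schedule can be sketched as follows (an illustrative mapping, not the package's internal code):

```javascript
// Illustrative sketch (not the package's internals) of how each settings
// entry maps to an EventBridge Schedule: the frequency becomes a rate()
// expression and the entity name travels in the incremental payload.
const toSchedule = ({ name, frequency = 60 }) => ({
	scheduleExpression: `rate(${frequency} minutes)`,
	input: { incremental: true, entity: name }
});

const schedule = toSchedule({ name: 'order', frequency: 30 });
// schedule.scheduleExpression === 'rate(30 minutes)'
```

The default of 60 minutes mirrors the `frequency` default in the table above, so an entry that omits `frequency` still gets an hourly incremental run.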
Example with fields, excludeFields and initialLoadDate:
```json
{
	"dataLake": {
		"entities": [
			{
				"name": "order",
				"frequency": 30,
				"initialLoadDate": "2025-01-01 00:00:00",
				"fields": ["id", "commerceId", "total", "status"]
			}
		]
	}
}
```

**Important:** `dataLake.entities` is required. If it is missing, the hooks will throw at load time: `dataLake.entities is required in Settings file`. Each entity must have a model at `models/<name>.js`, and the model must support `get()` with filters `dateCreatedFrom`/`dateCreatedTo` (initial) or `dateModifiedFrom`/`dateModifiedTo` (incremental) and `returnType: 'cursor'`.
To run an initial load (full export by date range) you must invoke the DataLakeLoad Lambda manually. The payload must include the entity and the start date; end date and client are optional (default: all active clients, up to today).
Payload:
| Field | Type | Required | Description | Since |
|---|---|---|---|---|
| `entity` | string | Yes | Entity name in kebab-case (e.g. `order`, `product`). | 1.0.0 |
| `incremental` | boolean | Yes | Use `false` for initial load. | 1.0.0 |
| `from` | string | Yes | Start date (valid date string, e.g. `YYYY-MM-DD HH:mm:ss` or ISO). | 1.0.0 |
| `to` | string | No | End date. Default: today, end of day. | 1.0.0 |
| `additionalFilters` | object | No | Additional filters merged into the consumer's filters (e.g. `{ warehouse: id }`). Keys must match what your model's `get()` expects. | 1.1.0 |
| `filenamePrefix` | string | No | Prefix for the S3 file name segment (before `from-pushedAt-part`). Use when several exports share the same date window. | 1.2.0 |
| `clientCode` | string | No | If set, only this client is synced; otherwise all active clients. | 1.0.0 |
| `limit` | number | No | Optional limit passed to the consumer. | 1.0.0 |
| `maxSizeMB` | number | No | Optional max size per file (MB) in the consumer. | 1.0.0 |
Example: all clients, from a given date to today
```json
{
	"body": {
		"entity": "order",
		"incremental": false,
		"from": "2025-01-01 00:00:00"
	}
}
```

Example: with end date and a single client

```json
{
	"body": {
		"entity": "product",
		"incremental": false,
		"from": "2025-01-01 00:00:00",
		"to": "2025-06-30 23:59:59",
		"clientCode": "my-client-code"
	}
}
```

Environment variables set by the hooks:

- `DATA_LAKE_SYNC_SQS_QUEUE_URL` – SQS queue URL for the sync queue. The DataLakeLoad Lambda publishes messages here.
- `S3_DATA_LAKE_RAW_BUCKET` – Set on the consumer Lambda (e.g. `janis-data-lake-service-raw-${stage}`). The consumer uploads NDJSON.gz files here.