Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
3be0467
feat: implement timeseries expressions job for full timeseries comput…
dennisgsmith Feb 20, 2026
58f365f
chore: update completion docs
dennisgsmith Feb 20, 2026
6040683
chore: use displayLabel instead of label for completion content
dennisgsmith Feb 20, 2026
e63c871
feat: implement timeseries CEL expression evaluation for point and wi…
dennisgsmith Feb 24, 2026
c75eda7
chore: update tests
dennisgsmith Feb 24, 2026
601c5a7
Merge branch 'develop' into feat/timeseries-expressions
dennisgsmith Feb 25, 2026
1d13f16
Merge branch 'develop' into feat/timeseries-expressions
dennisgsmith Feb 25, 2026
788bbb7
chore: wip partial update from queue batch pooler
dennisgsmith Feb 25, 2026
425c459
chore: implement incremental updates after source/dependency timeseri…
dennisgsmith Feb 25, 2026
92933e4
fix: bad error handling in batch worker
dennisgsmith Feb 27, 2026
1ffd1a3
fix: add generated sqlc code
dennisgsmith Feb 27, 2026
5c22966
fix: incorrect logic in test looking at upstream expression dependenc…
dennisgsmith Feb 27, 2026
5de54c9
fix: show new expressions as "stored" timeseries since they are not c…
dennisgsmith Mar 5, 2026
d53df9b
Merge branch 'develop' into feat/timeseries-expressions
dennisgsmith Mar 11, 2026
ca3aba3
chore: go work sync
dennisgsmith Mar 11, 2026
0cbc2fa
fix: various logic bugs with uploader
dennisgsmith Mar 11, 2026
69797bd
chore: add validation response to uploader, update tests
dennisgsmith Mar 13, 2026
afb26a0
chore: fix openapi type annotations for uploader response codes
dennisgsmith Mar 16, 2026
4144d1b
chore: add missing "prefer_day_first" option for uploader config
dennisgsmith Mar 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 66 additions & 34 deletions api/cmd/midas-task/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ import (

"github.com/USACE/instrumentation-api/api/v4/internal/cloud"
"github.com/USACE/instrumentation-api/api/v4/internal/config"
"github.com/USACE/instrumentation-api/api/v4/internal/eval"
"github.com/USACE/instrumentation-api/api/v4/internal/logger"
"github.com/USACE/instrumentation-api/api/v4/internal/pgqueue"
"github.com/USACE/instrumentation-api/api/v4/internal/service"
"github.com/USACE/instrumentation-api/api/v4/internal/worker"
"github.com/google/uuid"
"github.com/riverqueue/river"
)

var riverTimeout = 15 * time.Second

func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()
Expand Down Expand Up @@ -50,56 +52,86 @@ func run(ctx context.Context, cfg *config.TaskConfig, l *logger.Logger) (err err
err = errors.Join(err, taskServices.Shutdown(ctx))
}()

workers := river.NewWorkers()
periodicJobs := make([]*river.PeriodicJob, 0)

alertEventWorker := worker.NewAlertEventWorker(dbservice, taskServices)
river.AddWorker(workers, alertEventWorker)

emailEventWorker := worker.NewEamilEventWorker(dbservice, taskServices)
river.AddWorker(workers, emailEventWorker)

river.AddWorker(workers, worker.NewAlertScheduleWorkerV1(dbservice))
periodicJobs = append(periodicJobs, worker.AlertScheduleV1JobOptions)

tlgxWorker, err := worker.NewFetchThinglogixScheduleWorker(ctx, dbservice, &cfg.ThinglogixConfig, l)
// NewEnv loads custom Measurement type from protobuf
baseEnv, err := eval.NewEnv()
if err != nil {
return fmt.Errorf("failed to create program baseEnv: %w", err)
}
// set up lru program cache
cache, err := eval.NewProgramCache(1024)
if err != nil {
l.Error(ctx, "failed to initialize thinglogix; skipping worker", "error", err)
} else {
river.AddWorker(workers, tlgxWorker)
periodicJobs = append(periodicJobs, worker.FetchThinglogixJobOptions)
return fmt.Errorf("failed to create program cache: %w", err)
}

rivercfg := &river.Config{
ID: "midas-task__" + uuid.New().String() + "__" + time.Now().Format("2006_01_02T15_04_05_000000"),
Logger: l.Slogger(),
// Need references for in-process bus used for batching
alertEventW := worker.NewAlertEventWorker(dbservice)
exprTsComputeFullEventW := worker.NewEvaluationTimeseriesComputeFullEventWorker(dbservice, baseEnv, cache)
exprTsComputePartialEventW := worker.NewEvaluationTimeseriesComputePartialEventWorker(dbservice, baseEnv, cache)

pgq, err := pgqueue.NewWorkerClient(ctx, dbpool, &pgqueue.Options{
ClientName: "midas-task",
Logger: l.Slogger(),
Schema: cfg.RiverQueueSchema,
DefaultMaxAttempts: 1,

Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: cfg.MaxWorkers},
},
Schema: cfg.RiverQueueSchema,
Workers: workers,
PeriodicJobs: periodicJobs,
ErrorHandler: pgqueue.NewErrorHandler(l),
MaxAttempts: 1,
}

pgq, err := pgqueue.New(ctx, dbpool, rivercfg)
RegisterWorkers: func(ws *river.Workers) error {
// Workers that publish into the in-process bus will be injected after creation
// Used for tasks with high throughput
river.AddWorker(ws, alertEventW)
river.AddWorker(ws, exprTsComputePartialEventW)

// Other event workers
river.AddWorker(ws, worker.NewEmailEventWorker(dbservice, taskServices))
river.AddWorker(ws, exprTsComputeFullEventW)

// Periodic workers
river.AddWorker(ws, worker.NewAlertScheduleWorkerV1(dbservice))
if tlgx, err := worker.NewFetchThinglogixScheduleWorker(ctx, dbservice, &cfg.ThinglogixConfig, l); err != nil {
l.Error(ctx, "failed to initialize thinglogix; skipping worker", "error", err)
} else {
river.AddWorker(ws, tlgx)
}
return nil
},
Periodic: func(p *pgqueue.Periodics) error {
worker.RegisterAlertScheduleV1(p)
worker.RegisterFetchThinglogixPeriodic(p)
return nil
},
})
if err != nil {
return err
}
// TODO: This isn't a great pattern, if we forget to inject pgq into dbservice.PGQueue we will panic at runtime for certain tasks.
dbservice.PGQueue = pgq
alertEventW.PGQueue = pgq
exprTsComputeFullEventW.PGQueue = pgq
exprTsComputePartialEventW.PGQueue = pgq

defer func() {
stopCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err = errors.Join(err, pgq.Stop(stopCtx))
err = errors.Join(err, pgq.Stop(ctx, riverTimeout))
}()

l.Info(ctx, "Starting worker pool (background)...")
if err := pgq.Start(ctx); err != nil {
return fmt.Errorf("error starting riverqueue client: %w", err)
}

l.Info(ctx, "Starting ListenPoolProcess for AlertEventWorker")
return alertEventWorker.ListenPoolProcess(ctx, cfg, l)
// Build in-process routes and run the aggregator (blocks until ctx done or error)
routes := pgqueue.Routes{}
if alertEventW != nil {
k, r := alertEventW.Route()
routes[k] = r
}
if exprTsComputePartialEventW != nil {
k, r := exprTsComputePartialEventW.Route()
routes[k] = r
}

l.Info(ctx, "Starting in-process batch dispatcher...")
bw := pgqueue.NewBatchWorker(cfg, l, pgq.WorkerBus.Sub, routes)
return bw.Start(ctx)
}
11 changes: 10 additions & 1 deletion api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,35 @@ require (
github.com/danielgtaylor/huma/v2 v2.34.1
github.com/gofrs/uuid v4.4.0+incompatible
github.com/golang-jwt/jwt/v5 v5.3.0
github.com/google/cel-go v0.27.0
github.com/google/uuid v1.6.0
github.com/hashicorp/go-version v1.7.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jackc/pgx/v5 v5.7.4
github.com/labstack/echo-contrib v0.17.4
github.com/labstack/echo/v4 v4.13.3
github.com/ncruces/go-strftime v1.0.0
github.com/riverqueue/river v0.22.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.22.0
github.com/riverqueue/river/rivertype v0.22.0
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.11.1
github.com/tidwall/btree v1.7.0
github.com/twpayne/go-geom v1.6.1
github.com/twpayne/pgx-geom v1.0.0
github.com/xeipuuv/gojsonschema v1.2.0
github.com/xnacly/go-iso8601-duration v1.1.0
github.com/xuri/excelize/v2 v2.9.0
gocloud.dev v0.45.0
golang.org/x/crypto v0.45.0
golang.org/x/term v0.37.0
golang.org/x/text v0.31.0
google.golang.org/protobuf v1.36.10
)

require (
cel.dev/expr v0.25.1 // indirect
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.19.2 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.14 // indirect
Expand Down Expand Up @@ -92,15 +100,16 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect
go.opentelemetry.io/otel/trace v1.40.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect
golang.org/x/image v0.27.0 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/sync v0.18.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/time v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/api v0.256.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20251124214823-79d6a2a48846 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251124214823-79d6a2a48846 // indirect
google.golang.org/grpc v1.77.0 // indirect
google.golang.org/protobuf v1.36.10 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
14 changes: 14 additions & 0 deletions api/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZ
github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k=
github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc=
github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhPwqqXc4/vE0f7GvRjuAsbW+HOIe8KnA=
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw=
github.com/aws/aws-lambda-go v1.48.0 h1:1aZUYsrJu0yo5fC4z+Rba1KhNImXcJcvHu763BxoyIo=
Expand Down Expand Up @@ -129,6 +131,8 @@ github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArs
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/cel-go v0.27.0 h1:e7ih85+4qVrBuqQWTW4FKSqZYokVuc3HnhH5keboFTo=
github.com/google/cel-go v0.27.0/go.mod h1:tTJ11FWqnhw5KKpnWpvW9CJC3Y9GK4EIS0WXnBbebzw=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/go-replayers/grpcreplay v1.3.0 h1:1Keyy0m1sIpqstQmgz307zhiJ1pV4uIlFds5weTmxbo=
Expand All @@ -149,6 +153,8 @@ github.com/googleapis/gax-go/v2 v2.15.0 h1:SyjDc1mGgZU5LncH8gimWo9lW1DtIfPibOG81
github.com/googleapis/gax-go/v2 v2.15.0/go.mod h1:zVVkkxAQHa1RQpg9z2AUCMnKhi0Qld9rcmyfL1OZhoc=
github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=
github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
Expand Down Expand Up @@ -186,6 +192,8 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8=
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
Expand Down Expand Up @@ -250,6 +258,8 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/xnacly/go-iso8601-duration v1.1.0 h1:5FQeoPSs0lxzCGp82TLR8F6Rm3N6idawnNBf+fsJSZs=
github.com/xnacly/go-iso8601-duration v1.1.0/go.mod h1:S1+1mC/X7HsHlc0DZNB+ljkZoopmYjzJnEXTHkq/nKE=
github.com/xuri/efp v0.0.1 h1:fws5Rv3myXyYni8uwj2qKjVaRP30PdjeYe2Y6FDsCL8=
github.com/xuri/efp v0.0.1/go.mod h1:ybY/Jr0T0GTCnYjKqmdwxyxn2BQf2RcQIIvex5QldPI=
github.com/xuri/excelize/v2 v2.9.0 h1:1tgOaEq92IOEumR1/JfYS/eR0KHOCsRv/rYXXh6YJQE=
Expand All @@ -276,13 +286,17 @@ go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZY
go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
gocloud.dev v0.45.0 h1:WknIK8IbRdmynDvara3Q7G6wQhmEiOGwpgJufbM39sY=
gocloud.dev v0.45.0/go.mod h1:0kXKmkCLG6d31N7NyLZWzt7jDSQura9zD/mWgiB6THI=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 h1:kx6Ds3MlpiUHKj7syVnbp57++8WpuKPcR5yjLBjvLEA=
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ=
golang.org/x/image v0.27.0 h1:C8gA4oWU/tKkdCfYT6T2u4faJu3MeNS5O8UPWlPF61w=
golang.org/x/image v0.27.0/go.mod h1:xbdrClrAUway1MUTEZDq9mz/UpRwYAkFFNUslZtcB+g=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down
27 changes: 2 additions & 25 deletions api/internal/cloud/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ type APIServices struct {
MediaBucket *blob.Bucket
// ReportTaskPub publishes report jobs to a queue that are sent to an async PDF report renderer
ReportTaskPub *pubsub.Topic
// AlertEventBatcherPub publishes alert event messages to an in-memory queue
AlertEventBatcherPub *pubsub.Topic
// AlertEventBatcherSub subscribes to the AlertEventBatcherPub topic to batch messages for PGQueue
AlertEventBatcherSub *pubsub.Subscription
}

// NewAPIServices creates a new ApiServices instance
Expand All @@ -36,22 +32,9 @@ func NewAPIServices(ctx context.Context, cfg *config.APIConfig) (*APIServices, e
return nil, err
}

mq := "mem://task-batch-send"
alertEventBatcherPub, err := pubsub.OpenTopic(ctx, mq)
if err != nil {
return nil, err
}

alertEventBatcherSub, err := pubsub.OpenSubscription(ctx, mq)
if err != nil {
return nil, err
}

return &APIServices{
MediaBucket: mediaBucket,
ReportTaskPub: reportTaskPub,
AlertEventBatcherPub: alertEventBatcherPub,
AlertEventBatcherSub: alertEventBatcherSub,
MediaBucket: mediaBucket,
ReportTaskPub: reportTaskPub,
}, nil
}

Expand All @@ -64,11 +47,5 @@ func (s *APIServices) Shutdown(ctx context.Context) error {
if a := s.ReportTaskPub; a != nil {
errs = append(errs, a.Shutdown(ctx))
}
if a := s.AlertEventBatcherPub; a != nil {
errs = append(errs, a.Shutdown(ctx))
}
if a := s.AlertEventBatcherSub; a != nil {
errs = append(errs, a.Shutdown(ctx))
}
return errors.Join(errs...)
}
Loading
Loading