Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,21 @@ Create, render, and publish data visualizations from notebooks or the in-browser

Combine visualizations into **drag-and-drop dashboards** with resizable panels, lock/unlock layout, and persistent configuration. Each visualization also has a full **in-browser editor** with Monaco, live preview for JSON backends, template insertion, and data/config tabs. See the [Visualizations Guide](docs/VISUALIZATIONS.md) for SDK usage.

#### Individual Visualization Editor
<p align="center">
  <img src="docs/screenshots/oms-screenshot3.png" alt="OpenModelStudio individual visualization editor" width="100%" />
</p>

<p align="center">
  <img src="docs/screenshots/oms-screenshot4.png" alt="OpenModelStudio individual visualization editor — additional view" width="100%" />
</p>

#### Dashboard

<p align="center">
  <img src="docs/screenshots/oms-screenshot5.png" alt="OpenModelStudio drag-and-drop dashboard" width="100%" />
</p>

### Model Registry

Browse, install, and manage models from the [Open Model Registry](https://github.com/GACWR/open-model-registry) -- a public GitHub repo that acts as a decentralized model package manager.
Expand Down
28 changes: 25 additions & 3 deletions api/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ password-hash = "0.5"
sha2 = "0.10"
hex = "0.4"
rustls = { version = "0.23", default-features = false, features = ["ring"] }
csv = "1"

[dev-dependencies]
tower = { version = "0.5", features = ["util"] }
Expand Down
2 changes: 1 addition & 1 deletion api/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub fn create_access_token(
email: email.to_string(),
role,
iat: now.timestamp(),
exp: (now + Duration::minutes(15)).timestamp(),
exp: (now + Duration::hours(24)).timestamp(),
token_type: "access".into(),
};
encode(
Expand Down
10 changes: 8 additions & 2 deletions api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ async fn main() {
let llm = Arc::new(LlmService::new(&config));

let k8s = match K8sService::new(&config).await {
Ok(svc) => Some(Arc::new(svc)),
Ok(svc) => {
tracing::info!("K8s service initialized successfully (namespace: {})", config.k8s_namespace);
Some(Arc::new(svc))
}
Err(e) => {
tracing::warn!("K8s client not available: {e}. Running without K8s integration.");
tracing::error!("K8s service initialization FAILED: {e}");
tracing::error!("Training jobs and workspace pods will NOT work until K8s is properly configured");
None
}
};
Expand Down Expand Up @@ -105,6 +109,7 @@ async fn main() {
.route("/models/{id}/code", put(routes::models::update_code))
.route("/models/{id}/run", post(routes::models::run_model))
.route("/models/{id}/versions", get(routes::models::list_versions))
.route("/models/{id}/experiment-runs", get(routes::models::experiment_runs))
// Training
.route("/training/jobs", get(routes::training::list_all_jobs))
.route("/training/start", post(routes::training::start))
Expand All @@ -130,6 +135,7 @@ async fn main() {
.route("/experiments/{id}/compare", get(routes::experiments::compare))
// Artifacts
.route("/jobs/{job_id}/artifacts", get(routes::artifacts::list))
.route("/models/{model_id}/artifacts", get(routes::artifacts::list_for_model))
.route("/artifacts", post(routes::artifacts::create))
.route("/artifacts/{id}", get(routes::artifacts::get))
.route("/artifacts/{id}", delete(routes::artifacts::delete))
Expand Down
3 changes: 2 additions & 1 deletion api/src/models/artifact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct Artifact {
pub id: Uuid,
pub job_id: Uuid,
pub job_id: Option<Uuid>,
pub workspace_id: Option<Uuid>,
pub name: String,
pub artifact_type: String,
pub s3_key: String,
Expand Down
3 changes: 2 additions & 1 deletion api/src/models/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ pub struct Dataset {
pub row_count: Option<i64>,
pub version: i32,
pub created_by: Uuid,
pub snapshots: i32,
pub schema: Option<serde_json::Value>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub snapshots: i32,
}

#[derive(Debug, Deserialize)]
Expand Down
6 changes: 4 additions & 2 deletions api/src/models/experiment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ pub struct Experiment {
pub struct ExperimentRun {
pub id: Uuid,
pub experiment_id: Uuid,
pub job_id: Uuid,
pub job_id: Option<Uuid>,
pub model_id: Option<Uuid>,
pub parameters: Option<serde_json::Value>,
pub metrics: Option<serde_json::Value>,
pub created_at: DateTime<Utc>,
Expand All @@ -34,7 +35,8 @@ pub struct CreateExperimentRequest {

#[derive(Debug, Deserialize)]
pub struct AddRunRequest {
pub job_id: Uuid,
pub job_id: Option<Uuid>,
pub model_id: Option<Uuid>,
pub parameters: Option<serde_json::Value>,
pub metrics: Option<serde_json::Value>,
}
18 changes: 18 additions & 0 deletions api/src/routes/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,24 @@ pub async fn create(
Ok(Json(artifact))
}

/// List all artifacts for a model (via its jobs)
///
/// Artifacts are not linked to models directly: each artifact row carries a
/// `job_id`, and jobs reference a model, so an inner join through `jobs`
/// surfaces every artifact produced by any of the model's jobs, newest first.
///
/// NOTE(review): `Artifact.job_id` is nullable — artifacts attached only to a
/// workspace (no job) are silently excluded by the inner join here. Confirm
/// that is the intended scope for this endpoint.
pub async fn list_for_model(
State(state): State<AppState>,
AuthUser(_claims): AuthUser,
Path(model_id): Path<Uuid>,
) -> AppResult<Json<Vec<Artifact>>> {
let artifacts: Vec<Artifact> = sqlx::query_as(
"SELECT a.* FROM artifacts a
JOIN jobs j ON a.job_id = j.id
WHERE j.model_id = $1
ORDER BY a.created_at DESC"
)
.bind(model_id)
.fetch_all(&state.db)
.await?;
Ok(Json(artifacts))
}

pub async fn download(
State(state): State<AppState>,
AuthUser(_claims): AuthUser,
Expand Down
131 changes: 124 additions & 7 deletions api/src/routes/datasets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,119 @@ pub async fn get(
AuthUser(_claims): AuthUser,
Path(id): Path<Uuid>,
) -> AppResult<Json<Dataset>> {
let dataset: Dataset = sqlx::query_as("SELECT * FROM datasets WHERE id = $1")
let mut dataset: Dataset = sqlx::query_as("SELECT * FROM datasets WHERE id = $1")
.bind(id)
.fetch_one(&state.db)
.await?;

// Lazy backfill: if schema is missing but we have a stored CSV file, extract it now
if dataset.schema.is_none() && dataset.format.eq_ignore_ascii_case("csv") {
if let Some(ref key) = dataset.s3_key {
let path = key.strip_prefix("local:").unwrap_or(key);
if let Ok(bytes) = std::fs::read(path) {
if let Some((schema, row_count)) = extract_csv_schema(&bytes) {
let _ = sqlx::query(
"UPDATE datasets SET schema = $1, row_count = COALESCE(row_count, $2), updated_at = NOW() WHERE id = $3"
)
.bind(&schema)
.bind(row_count)
.bind(dataset.id)
.execute(&state.db)
.await;
dataset.schema = Some(schema);
if dataset.row_count.is_none() {
dataset.row_count = Some(row_count);
}
}
}
}
}

Ok(Json(dataset))
}

/// Infer the type of a CSV cell value by attempting numeric/bool parsing.
///
/// Returns one of `"int64"`, `"float64"`, `"boolean"`, or `"string"`.
/// An empty cell reports `"string"`; integer parsing is tried before float
/// so whole numbers are not prematurely widened.
fn infer_cell_type(val: &str) -> &'static str {
    match val {
        "" => "string",
        v if v.parse::<i64>().is_ok() => "int64",
        v if v.parse::<f64>().is_ok() => "float64",
        v if v.eq_ignore_ascii_case("true") || v.eq_ignore_ascii_case("false") => "boolean",
        _ => "string",
    }
}

/// Parse a CSV byte slice and return (schema JSON, row_count).
///
/// The first record is treated as the header row. Column types are inferred
/// by sampling up to the first 100 data rows; `row_count` counts every data
/// row (header excluded, malformed records skipped). Returns `None` when the
/// CSV has no readable/non-empty header.
///
/// The schema is a JSON array of `{ "name", "type", "nullable" }` objects.
fn extract_csv_schema(bytes: &[u8]) -> Option<(serde_json::Value, i64)> {
    let mut rdr = csv::ReaderBuilder::new()
        .has_headers(true)
        .from_reader(bytes);

    let headers = rdr.headers().ok()?.clone();
    if headers.is_empty() {
        return None;
    }

    let num_cols = headers.len();
    // Track best type per column: start with unknown, refine by sampling rows.
    let mut col_types: Vec<Option<&'static str>> = vec![None; num_cols];
    let mut row_count: i64 = 0;
    let sample_limit = 100; // sample first 100 rows for type inference

    for result in rdr.records() {
        // Skip malformed records but keep counting the well-formed ones.
        let record = match result {
            Ok(r) => r,
            Err(_) => continue,
        };
        row_count += 1;

        if row_count <= sample_limit {
            for (i, field) in record.iter().enumerate() {
                if i >= num_cols {
                    break; // ignore extra fields beyond the header width
                }
                let trimmed = field.trim();
                // Fix: empty cells are nulls — they carry no type information.
                // Previously they were fed to infer_cell_type (-> "string"),
                // demoting e.g. an int64 column with blanks to string even
                // though every column is already reported as nullable.
                if trimmed.is_empty() {
                    continue;
                }
                let cell_type = infer_cell_type(trimmed);
                col_types[i] = Some(match col_types[i] {
                    None => cell_type,
                    Some(prev) => {
                        if prev == cell_type {
                            prev
                        } else if (prev == "int64" && cell_type == "float64")
                            || (prev == "float64" && cell_type == "int64")
                        {
                            "float64" // promote int ↔ float
                        } else {
                            "string" // fall back to string on conflict
                        }
                    }
                });
            }
        }
    }

    let columns: Vec<serde_json::Value> = headers
        .iter()
        .enumerate()
        .map(|(i, name)| {
            serde_json::json!({
                "name": name,
                // Columns that were all-empty (or never sampled) default to string.
                "type": col_types.get(i).and_then(|t| *t).unwrap_or("string"),
                "nullable": true
            })
        })
        .collect();

    Some((serde_json::Value::Array(columns), row_count))
}

pub async fn create(
State(state): State<AppState>,
AuthUser(claims): AuthUser,
Expand All @@ -64,7 +170,7 @@ pub async fn create(
let dataset_id = Uuid::new_v4();

// If file data is provided (base64), store it to local PVC
let (s3_key, size_bytes) = if let Some(ref data_b64) = req.data {
let (s3_key, size_bytes, inferred_schema, inferred_row_count) = if let Some(ref data_b64) = req.data {
use base64::Engine;
let bytes = base64::engine::general_purpose::STANDARD
.decode(data_b64)
Expand All @@ -80,14 +186,24 @@ pub async fn create(
std::fs::write(&file_path, &bytes)
.map_err(|e| AppError::Internal(format!("Failed to write file: {e}")))?;

(Some(format!("local:{}", file_path)), Some(size))
// Extract schema from CSV files
let (schema, row_count) = if ext == "csv" {
extract_csv_schema(&bytes).unwrap_or((serde_json::Value::Null, 0))
} else {
(serde_json::Value::Null, 0)
};

let schema_opt = if schema.is_null() { None } else { Some(schema) };
let row_count_opt = if row_count > 0 { Some(row_count) } else { req.row_count };

(Some(format!("local:{}", file_path)), Some(size), schema_opt, row_count_opt)
} else {
(None, None)
(None, None, None, req.row_count)
};

let dataset: Dataset = sqlx::query_as(
"INSERT INTO datasets (id, project_id, name, description, format, s3_key, size_bytes, row_count, version, created_by, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 1, $9, NOW(), NOW()) RETURNING *"
"INSERT INTO datasets (id, project_id, name, description, format, s3_key, size_bytes, row_count, version, created_by, schema, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 1, $9, $10, NOW(), NOW()) RETURNING *"
)
.bind(dataset_id)
.bind(req.project_id)
Expand All @@ -96,8 +212,9 @@ pub async fn create(
.bind(&req.format)
.bind(&s3_key)
.bind(size_bytes)
.bind(req.row_count)
.bind(inferred_row_count)
.bind(claims.sub)
.bind(&inferred_schema)
.fetch_one(&state.db)
.await?;
notify(&state.db, claims.sub, "Dataset Created", &format!("Dataset '{}' ({}) uploaded", dataset.name, dataset.format), NotifyType::Success, Some(&format!("/datasets/{}", dataset.id))).await;
Expand Down
5 changes: 3 additions & 2 deletions api/src/routes/experiments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ pub async fn add_run(
Json(req): Json<AddRunRequest>,
) -> AppResult<Json<ExperimentRun>> {
let run: ExperimentRun = sqlx::query_as(
"INSERT INTO experiment_runs (id, experiment_id, job_id, parameters, metrics, created_at)
VALUES ($1, $2, $3, $4, $5, NOW()) RETURNING *"
"INSERT INTO experiment_runs (id, experiment_id, job_id, model_id, parameters, metrics, created_at)
VALUES ($1, $2, $3, $4, $5, $6, NOW()) RETURNING *"
)
.bind(Uuid::new_v4())
.bind(experiment_id)
.bind(req.job_id)
.bind(req.model_id)
.bind(&req.parameters)
.bind(&req.metrics)
.fetch_one(&state.db)
Expand Down
Loading
Loading