Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ae554e7
datadeps: Fix KeyError on key ainfo in AliasedObjectCache
jpsamaroo Feb 4, 2026
ce75198
fixup! datadeps: span_end must be inclusive
jpsamaroo Feb 12, 2026
bec6280
Add linear algebra functions for matrix inversion 'inv' and triangula…
AkhilAkkapelli Aug 10, 2025
9a6b097
TEMP: Add demo/trace stuff
jpsamaroo Aug 14, 2025
5479ae5
TEMP: Attempt to implement apply_ipiv_rows
jpsamaroo Aug 19, 2025
e398e83
DaggerMPI: Initial implementation
Jan 10, 2025
2f72b1e
linalg: Make norm2 work for non-matrices
jpsamaroo Sep 4, 2025
4b1b185
TEMP DArray: Add GMRES
jpsamaroo Sep 4, 2025
e34a363
sparse array in-place send/recv
yanzin00 Jun 17, 2025
353335d
fixup! sparse array in-place send/recv
jpsamaroo Oct 4, 2025
2ac8cb3
DArray: Restrict copyto! scope to destination
yanzin00 Sep 29, 2025
77003cf
Add opcounter debugging tool
yanzin00 Sep 29, 2025
f2ff6f2
Add largest value tracker tool
yanzin00 Oct 4, 2025
5ffeb88
MPI: Make check_uniform more useful
yanzin00 Sep 29, 2025
85e0b80
MPI: Optimizations and fix some uniformity issues
yanzin00 Sep 29, 2025
e8e7eb8
MPI: de-hashing the tags based on uniformity
fda-tome Oct 30, 2025
fc1ae09
MPI: final re-work of tags
fda-tome Nov 10, 2025
4f97dc2
MPI: recv pool+queue, malloc send buffers, MemPool completion queue, …
Feb 27, 2026
cc700c6
Revert to fc1ae09c, keep Sch.jl changes
Mar 4, 2026
f8f5756
WIP: MPI works
Mar 5, 2026
bc2f7d7
MPI: Fixing non-uniformity in dict-key iteration
fda-tome Mar 5, 2026
d0b0c71
MPI benchmarks and matmul correctness: Float32, 10k, per-block check
fda-tome Mar 12, 2026
989349d
WIP: MPI inference
fda-tome Mar 12, 2026
0207022
Removing a faulty log file
fda-tome Mar 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions LocalPreferences.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# When using system MPI, run once in the environment where you run MPI jobs (with MPI module loaded):
# julia --project=Dagger.jl -e 'using MPIPreferences; MPIPreferences.use_system_binary()'
# That populates abi, libmpi, mpiexec and avoids "Unknown MPI ABI nothing".
[MPIPreferences]
_format = "1.0"
abi = "MPICH"
binary = "system"
libmpi = "libmpi"
mpiexec = "mpiexec"
preloads = []
2 changes: 2 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Graphs = "86223c79-3864-5bf0-83f7-82e725a168b6"
KernelAbstractions = "63c18a36-062a-441e-b654-da1e3ab1ce7c"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"
MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94"
NextLA = "d37ed344-79c4-486d-9307-6d11355a15a3"
OnlineStats = "a15396b6-48d5-5d58-9928-6d29437db91e"
Expand Down Expand Up @@ -77,6 +78,7 @@ Graphs = "1"
JSON3 = "1"
KernelAbstractions = "0.9"
MacroTools = "0.5"
MPI = "0.20.22"
MemPool = "0.4.12"
Metal = "1.1"
NextLA = "0.2.2"
Expand Down
111 changes: 111 additions & 0 deletions benchmarks/check_comm_asymmetry.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#!/usr/bin/env julia
# Parse MPI+Dagger logs and report communication decision asymmetry per tag.
# Asymmetry: for the same tag, one rank decides to send (local+bcast, sender+communicated, etc.)
# and another rank decides to infer (inferred, uninvolved) and never recv → deadlock.
#
# Usage: julia check_comm_asymmetry.jl < logfile
# Or: mpiexec -n 10 julia ... run_matmul.jl 2>&1 | tee matmul.log; julia check_comm_asymmetry.jl < matmul.log

# Known decision strings, grouped by role (send / recv / infer).
# NOTE(review): these constants are not referenced by parse_line/main below —
# main builds its own local key sets with slightly different contents
# (e.g. no "aliasing"); consider unifying to avoid divergence.
const SEND_DECISIONS = Set([
    "local+bcast", "sender+communicated", "sender+inferred", "receiver+bcast",
    "aliasing", # when followed by local+bcast we already capture local+bcast
])
const RECV_DECISIONS = Set([
    "communicated", "receiver", "sender+communicated", # received data
])
const INFER_DECISIONS = Set([
    "inferred", "uninvolved", # did not recv (uses inferred type)
])

"""
    parse_line(line) -> (rank, tag, category, decision)

Extract the `[rank X]`, `[tag Y]`, category marker, and decision token from a
single MPI+Dagger log line. Each element is `nothing` when its marker is
absent from the line.
"""
function parse_line(line)
    rank = nothing
    tag = nothing
    decision = nothing
    category = nothing # aliasing, execute!, remotecall_endpoint
    for m in eachmatch(r"\[rank\s+(\d+)\]", line)
        rank = parse(Int, m.captures[1])
    end
    for m in eachmatch(r"\[tag\s+(\d+)\]", line)
        tag = parse(Int, m.captures[1])
    end
    for m in eachmatch(r"\[(execute!|aliasing|remotecall_endpoint)\]", line)
        category = m.captures[1]
    end
    # Decision is usually in the last [...] that looks like [word] or [word+word].
    # BUGFIX: compound tokens ("sender+...", "...+bcast") must be checked BEFORE
    # the bare substrings. Previously "sender+inferred" matched the
    # occursin("inferred", ...) branch and "sender+communicated" matched the
    # occursin("communicated", ...) branch, so the sender+ classifications were
    # unreachable and sending ranks were miscounted as inferrers/receivers.
    for m in eachmatch(r"\]\[([^\]]+)\]", line)
        candidate = m.captures[1]
        if occursin("sender+", candidate)
            decision = occursin("inferred", candidate) ? "sender+inferred" : "sender+communicated"
            break
        elseif occursin("local+bcast", candidate)
            decision = "local+bcast"
            break
        elseif candidate == "receiver+bcast"
            decision = "receiver+bcast"
            break
        elseif occursin("inferred", candidate) && !occursin("communicated", candidate)
            decision = "inferred"
            break
        elseif occursin("communicated", candidate)
            decision = "communicated"
            break
        elseif candidate == "receiver"
            decision = "receiver"
            break
        elseif candidate == "inplace_move"
            decision = "inplace_move"
            break
        end
    end
    return rank, tag, category, decision
end

"""
    main()

Read an MPI+Dagger log from stdin and print every tag for which at least one
rank decided to send while another decided to infer (a recipe for deadlock,
since the inferring rank never posts a recv).
"""
function main()
    # Per-tag record of what each rank decided: tag => (rank => decision).
    decisions_by_tag = Dict{Int, Dict{Int, String}}()
    for line in eachline(stdin)
        rank, tag, _category, decision = parse_line(line)
        (isnothing(rank) || isnothing(tag) || isnothing(decision)) && continue
        ranks = get!(() -> Dict{Int, String}(), decisions_by_tag, tag)
        ranks[rank] = decision
    end

    # Role key sets used to bucket each rank's decision for a tag.
    send_keys = Set(["local+bcast", "sender+communicated", "sender+inferred", "receiver+bcast"])
    infer_keys = Set(["inferred", "sender+inferred"]) # sender+inferred means sender didn't need to recv
    recv_keys = Set(["communicated", "receiver", "sender+communicated"])

    asymmetries = []
    for (tag, ranks) in sort(collect(decisions_by_tag), by = first)
        senders = [r for (r, d) in ranks if d in send_keys]
        inferrers = [r for (r, d) in ranks if d in infer_keys || d == "uninvolved"]
        receivers = [r for (r, d) in ranks if d in recv_keys]
        # A bcast sender will send to ALL other ranks; any rank that chose to
        # infer will never recv — flag the combination.
        if !(isempty(senders) || isempty(inferrers))
            push!(asymmetries, (tag, senders, inferrers, receivers, ranks))
        end
    end

    if isempty(asymmetries)
        println("No communication decision asymmetry found (no tag has both sender and inferrer).")
        return
    end

    println("=== Communication decision asymmetry (can cause deadlock) ===\n")
    for (tag, senders, inferrers, receivers, ranks) in asymmetries
        println("Tag $tag:")
        println(" Senders (will bcast to all others): $senders")
        println(" Inferrers (did not recv): $inferrers")
        println(" Receivers: $receivers")
        println(" All decisions: $ranks")
        println()
    end
end

# Script entry point: parse the log piped to stdin and report asymmetries.
main()
97 changes: 97 additions & 0 deletions benchmarks/check_comm_asymmetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#!/usr/bin/env python3
"""
Parse MPI+Dagger logs and report communication decision asymmetry per tag.
Asymmetry: for the same tag, one rank decides to send (local+bcast, etc.)
and another decides to infer (inferred) and never recv → deadlock.

Usage:
# Capture full log (all ranks' Core.println from mpi.jl go to stdout):
mpiexec -n 10 julia --project=/path/to/Dagger.jl benchmarks/run_matmul.jl 2>&1 | tee matmul.log
# Then look for asymmetry (same tag: one rank sends, another infers → deadlock):
python3 check_comm_asymmetry.py < matmul.log
"""

import re
import sys
from collections import defaultdict

# Known decision strings, grouped by role (send / recv / infer).
# NOTE(review): these module-level sets are not referenced by parse_line or
# main below — main builds its own local key sets; consider unifying.
SEND_DECISIONS = {"local+bcast", "sender+communicated", "sender+inferred", "receiver+bcast"}
RECV_DECISIONS = {"communicated", "receiver", "sender+communicated"}
INFER_DECISIONS = {"inferred", "uninvolved", "sender+inferred"}


def parse_line(line: str):
    """Extract (rank, tag, category, decision) from one MPI+Dagger log line.

    Each element is None when the corresponding marker is absent.
    """
    rank = tag = category = decision = None
    m = re.search(r"\[rank\s+(\d+)\]", line)
    if m:
        rank = int(m.group(1))
    m = re.search(r"\[tag\s+(\d+)\]", line)
    if m:
        tag = int(m.group(1))
    m = re.search(r"\[(execute!|aliasing|remotecall_endpoint)\]", line)
    if m:
        category = m.group(1)
    # Capture decision from [...] blocks.
    # BUGFIX: compound tokens ("sender+...", "...+bcast") must be checked
    # BEFORE the bare substrings. Previously "sender+inferred" hit the
    # `"inferred" in candidate` branch and "sender+communicated" hit the
    # `"communicated" in candidate` branch, so sending ranks were
    # misclassified as plain inferrers/receivers.
    for m in re.finditer(r"\]\[([^\]]+)\]", line):
        candidate = m.group(1)
        if candidate.startswith("sender+"):
            decision = "sender+inferred" if "inferred" in candidate else "sender+communicated"
            break
        if "local+bcast" in candidate:
            decision = "local+bcast"
            break
        if candidate == "receiver+bcast":
            decision = "receiver+bcast"
            break
        if "inferred" in candidate and "communicated" not in candidate:
            decision = "inferred"
            break
        if "communicated" in candidate:
            decision = "communicated"
            break
        if candidate == "receiver":
            decision = "receiver"
            break
        if candidate == "inplace_move":
            decision = "inplace_move"
            break
    return rank, tag, category, decision


def main():
    """Read an MPI+Dagger log from stdin and print per-tag send/infer asymmetry.

    A tag is flagged when at least one rank decided to send (bcast) while
    another decided to infer and therefore never posts a recv.
    """
    # Per-tag record of each rank's decision: tag -> {rank: decision}.
    by_tag = defaultdict(dict)
    for raw in sys.stdin:
        rank, tag, _category, decision = parse_line(raw)
        if rank is None or tag is None or decision is None:
            continue
        by_tag[tag][rank] = decision

    # Role key sets used to bucket each rank's decision for a tag.
    send_keys = {"local+bcast", "sender+communicated", "sender+inferred", "receiver+bcast"}
    infer_keys = {"inferred", "sender+inferred", "uninvolved"}
    recv_keys = {"communicated", "receiver", "sender+communicated"}

    asymmetries = []
    for tag in sorted(by_tag):
        ranks = by_tag[tag]
        senders = [r for r, d in ranks.items() if d in send_keys]
        inferrers = [r for r, d in ranks.items() if d in infer_keys]
        receivers = [r for r, d in ranks.items() if d in recv_keys]
        if senders and inferrers:
            asymmetries.append((tag, senders, inferrers, receivers, ranks))

    if not asymmetries:
        print("No communication decision asymmetry found (no tag has both sender and inferrer).")
        return

    print("=== Communication decision asymmetry (can cause deadlock) ===\n")
    for tag, senders, inferrers, receivers, ranks in asymmetries:
        print(f"Tag {tag}:")
        print(f" Senders (will bcast to all others): {senders}")
        print(f" Inferrers (did not recv): {inferrers}")
        print(f" Receivers: {receivers}")
        print(f" All decisions: {dict(ranks)}")
        print()


# Script entry point: parse the log piped to stdin and report asymmetries.
if __name__ == "__main__":
    main()
42 changes: 42 additions & 0 deletions benchmarks/run_distribute_fetch.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env julia
# Build a matrix with a fixed, reproducible pattern, distribute it across an
# MPI procgrid, then have every rank fetch and println only the chunk(s) it
# owns.
# Usage (from repo root, use full path to Dagger.jl):
#   mpiexec -n 4 julia --project=/path/to/Dagger.jl benchmarks/run_distribute_fetch.jl

using MPI
using Dagger

if !isdefined(Dagger, :accelerate!)
    error("Dagger.accelerate! not found. Run with the local Dagger project: julia --project=/path/to/Dagger.jl ...")
end
Dagger.accelerate!(:mpi)

const comm = MPI.COMM_WORLD
const rank = MPI.Comm_rank(comm)
const nranks = MPI.Comm_size(comm)

# Fixed reproducible pattern: 6×6 matrix with M[i,j] = 10*i + j, identical on
# every rank.
const N = 6
const BLOCK = 2
pattern = [10 * i + j for i in 1:N, j in 1:N]

# Round-robin Dagger's compatible processors over the block grid so the
# procgrid passes validation.
procs = collect(Dagger.compatible_processors())
grid_dims = (cld(N, BLOCK), cld(N, BLOCK))
procgrid = reshape(
    [procs[mod1(k, length(procs))] for k in 1:prod(grid_dims)],
    grid_dims,
)

# Distribute so chunk (i,j) is computed on procgrid[i,j].
DA = distribute(pattern, Blocks(BLOCK, BLOCK), procgrid)
DA_fetched = fetch(DA)

# Each rank prints only the chunk(s) whose MPIRef lives on this rank.
for (idx, chunk) in enumerate(DA_fetched.chunks)
    if chunk isa Dagger.Chunk && chunk.handle isa Dagger.MPIRef && chunk.handle.rank == rank
        println("rank $rank chunk $idx: ", fetch(chunk))
    end
end
105 changes: 105 additions & 0 deletions benchmarks/run_matmul.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#!/usr/bin/env julia
# N×N matmul benchmark (Float32); block size scales with number of ranks.
# Usage (use the full path to Dagger.jl, not "..."):
# mpiexec -n 10 julia --project=/home/felipetome/dagger-dev/mpi/Dagger.jl benchmarks/run_matmul.jl
# Set CHECK_CORRECTNESS=true to collect and compare against GPU baseline:
# CHECK_CORRECTNESS=true mpiexec -n 10 julia --project=/home/felipetome/dagger-dev/mpi/Dagger.jl benchmarks/run_matmul.jl

using MPI
using Dagger
using LinearAlgebra

if !isdefined(Dagger, :accelerate!)
error("Dagger.accelerate! not found. Run with the local Dagger project: julia --project=/path/to/Dagger.jl ...")
end
Dagger.accelerate!(:mpi)

const N = 2_000
const comm = MPI.COMM_WORLD
const rank = MPI.Comm_rank(comm)
const nranks = MPI.Comm_size(comm)
# Block size proportional to ranks: ~nranks blocks in 2D => side blocks ≈ √nranks
const BLOCK = max(1, ceil(Int, N / ceil(Int, sqrt(nranks))))

const CHECK_CORRECTNESS = parse(Bool, get(ENV, "CHECK_CORRECTNESS", "false"))

if rank == 0
println("Benchmark: ", nranks, " ranks, N=", N, ", block size ", BLOCK, "×", BLOCK, " (matmul)")
end

# Allocate and fill matrices in blocks (Float32)
A = rand(Blocks(BLOCK, BLOCK), Float32, N, N)
B = rand(Blocks(BLOCK, BLOCK), Float32, N, N)

# Matrix multiply C = A * B
t_matmul = @elapsed begin
C = A * B
end

if rank == 0
println("Matmul time: ", round(t_matmul; digits=4), " s")
end

# Optional: collect via datadeps (root=0). All ranks participate in the datadeps region.
if CHECK_CORRECTNESS
t_collect = @elapsed begin
A_full = Dagger.collect_datadeps(A; root=0)
B_full = Dagger.collect_datadeps(B; root=0)
C_dagger = Dagger.collect_datadeps(C; root=0)
end
if rank == 0
println("Collecting result and computing baseline for correctness check (GPU)...")
using CUDA
CUDA.functional() || error("CUDA not functional; cannot compute GPU baseline. Check CUDA driver and device.")
t_upload = @elapsed begin
A_g = CUDA.cu(A_full)
B_g = CUDA.cu(B_full)
end
println("Collect + upload time: ", round(t_collect + t_upload; digits=4), " s")

t_baseline = @elapsed begin
C_ref_g = A_g * B_g
end
println("Baseline (GPU/CUDA) time: ", round(t_baseline; digits=4), " s")

# Require all elements within 100× machine epsilon relative error (componentwise)
C_dagger_cpu = C_dagger
C_ref_cpu = Array(C_ref_g)
eps_f = eps(Float32)
rtol = 50.0f0 * eps_f
diff = C_dagger_cpu .- C_ref_cpu
# rel_ij = |diff|/|C_ref|, denominator at least eps to avoid div by zero
denom = max.(abs.(C_ref_cpu), eps_f)
rel_err = abs.(diff) ./ denom
max_rel_err = Float32(maximum(rel_err))
ok = max_rel_err <= rtol
if ok
println("Correctness: OK (max rel_err = ", max_rel_err, " <= 100×eps = ", rtol, ")")
else
println("Correctness: FAIL (max rel_err = ", max_rel_err, " > 100×eps = ", rtol, ")")
end

# Per-block: which blocks have any element with rel_err > 100×eps
n_bi = ceil(Int, N / BLOCK)
n_bj = ceil(Int, N / BLOCK)
bad_blocks = Tuple{Int,Int,Float32}[]
for bi in 1:n_bi, bj in 1:n_bj
ri = (bi - 1) * BLOCK + 1 : min(bi * BLOCK, N)
rj = (bj - 1) * BLOCK + 1 : min(bj * BLOCK, N)
block_rel = Float32(maximum(@view(rel_err[ri, rj])))
if block_rel > rtol
push!(bad_blocks, (bi, bj, block_rel))
end
end
if isempty(bad_blocks)
println("Per-block: all ", n_bi * n_bj, " blocks within 100×eps rel_err.")
else
println("Per-block: ", length(bad_blocks), " block(s) exceed 100×eps rel_err (block size ", BLOCK, "×", BLOCK, "):")
sort!(bad_blocks; by = x -> -x[3])
for (bi, bj, block_rel) in bad_blocks
println(" block [", bi, ",", bj, "] rows ", (bi - 1) * BLOCK + 1, ":", min(bi * BLOCK, N),
", cols ", (bj - 1) * BLOCK + 1, ":", min(bj * BLOCK, N), " max rel_err = ", block_rel)
end
end
end
end
Loading
Loading