Agent-Daemon Communication
This document provides a comprehensive guide to the communication architecture between nebulad (the control plane daemon) and nebula-agent (the compute agent running on GPU nodes).
Overview
Nebula uses a two-plane architecture where the control plane (nebulad) orchestrates compute agents (nebula-agent) distributed across GPU nodes.
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────┐
│ CONTROL PLANE (nebulad) │
│ │
│ ┌─────────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │ gRPC Server :9090 │ │ HTTP Gateway │ │ SQLite Store │ │
│ │ (PlatformService) │ │ :8080 │ │ (nebula.db) │ │
│ │ │ │ │ │ │ │
│ │ • Register │ │ • /v1/chat │ │ • nodes │ │
│ │ • Heartbeat │ │ • /v1/models │ │ • deployments │ │
│ │ • Deregister │ │ • /health │ │ │ │
│ └─────────┬───────────┘ └────────┬─────────┘ └──────────────────┘ │
│ │ │ │
│ ┌─────────┴───────────────────────┴─────────────────────────────────┐ │
│ │ Services Layer │ │
│ │ ┌────────────────┐ ┌─────────────────┐ ┌────────────────────┐ │ │
│ │ │ Registry Svc │ │ Deployment Svc │ │ Gateway Router │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ • Agent state │ │ • Deploy models │ │ • Route requests │ │ │
│ │ │ • Heartbeats │ │ • Token gen │ │ • Runtime adapters │ │ │
│ │ │ • Offline det. │ │ • Scheduling │ │ │ │ │
│ │ └────────────────┘ └─────────────────┘ └────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ External Integrations │ │
│ │ ┌────────────────┐ ┌─────────────────────────┐ │ │
│ │ │ Caddy Client │ │ Route53 Client │ │ │
│ │ │ (TLS + Proxy) │ │ (DNS management) │ │ │
│ │ └────────────────┘ └─────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└───────────────────────────────────┬─────────────────────────────────────┘
│
gRPC (bidirectional)
│
┌───────────────────────────────────┴─────────────────────────────────────┐
│ COMPUTE PLANE (nebula-agent) │
│ │
│ ┌─────────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │ gRPC Server :9091 │ │ Auth Proxy │ │ SQLite Store │ │
│ │ (AgentService) │ │ :8888 │ │ (agent.db) │ │
│ │ │ │ │ │ │ │
│ │ • GetCapabilities │ │ • Token valid. │ │ • deployments │ │
│ │ • StartRuntime │ │ • Request proxy │ │ • events │ │
│ │ • StopRuntime │ │ • Multi-deploy │ │ • port_alloc │ │
│ │ • GetStats │ │ │ │ │ │
│ │ • +12 more methods │ │ │ │ │ │
│ └─────────────────────┘ └────────┬─────────┘ └──────────────────┘ │
│ │ │
│ ┌─────────────────────────────────┴───────────────────────────────┐ │
│ │ Runtime Containers │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ vLLM │ │ TGI │ │ Ollama │ │ Triton │ │ │
│ │ │ :30000 │ │ :30001 │ │ :30002 │ │ :30003 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Agent Components │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌───────────────────────┐ │ │
│ │ │ Registrar │ │ Docker │ │ GPU Monitor │ │ │
│ │ │ (Platform │ │ Manager │ │ (NVML) │ │ │
│ │ │ client) │ │ │ │ │ │ │
│ │ └─────────────┘ └─────────────┘ └───────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
Port Summary
| Component | Port | Protocol | Purpose |
|---|---|---|---|
| nebulad gRPC | 9090 | gRPC | Agent registration, heartbeats |
| nebulad HTTP | 8080 | HTTP | OpenAI-compatible API gateway |
| nebula-agent gRPC | 9091 | gRPC | Deployment management |
| nebula-agent Auth Proxy | 8888 | HTTP | Token validation, request proxying |
| Runtime containers | 30000-31000 | HTTP | Model inference endpoints |
| Metrics (agent) | 9100 | HTTP | Prometheus metrics |
Communication Flow
Direction: Agent → Platform (Registration & Heartbeat)
┌──────────────┐ ┌──────────────┐
│ nebula-agent │ ─── Register ──────────── │ nebulad │
│ │ ─── Heartbeat (30s) ───── │ │
│ │ ─── Deregister ────────── │ │
└──────────────┘ └──────────────┘
Direction: Platform → Agent (Deployment Control)
┌──────────────┐ ┌──────────────┐
│ nebulad │ ─── StartRuntime ──────── │ nebula-agent │
│ │ ─── StopRuntime ───────── │ │
│ │ ─── GetStats ──────────── │ │
└──────────────┘ └──────────────┘
Direction: Client → Agent (via Gateway)
┌──────────────┐ ┌──────────────┐
│ Client │ ─── API Request ───────── │ Auth Proxy │
│ │ │ :8888 │
│ │ │ │ │
│ │ │ ▼ │
│ │ │ Runtime │
│ │ │ Container │
└──────────────┘ └──────────────┘
gRPC Protocols
Nebula defines two gRPC services for inter-component communication.
PlatformService (platform.proto)
Location: platform/api/grpc/platform.proto
This service is implemented by nebulad and called by nebula-agent for registration and heartbeat.
service PlatformService {
// Register a new agent or re-register an existing one
rpc Register(RegisterRequest) returns (RegisterResponse);
// Send periodic status updates and receive pending commands
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
// Gracefully remove an agent from the platform
rpc Deregister(DeregisterRequest) returns (DeregisterResponse);
}
RegisterRequest
message RegisterRequest {
string node_id = 1; // UUID, auto-generated if empty or "auto"
string node_name = 2; // Human-readable node name
string version = 3; // Agent version
string agent_address = 4; // host:port for platform to call back
NodeCapabilities capabilities = 5;
repeated AgentDeploymentStatus deployments = 6;
}
message NodeCapabilities {
repeated GPUInfo gpus = 1;
int64 total_memory_bytes = 2;
int64 available_memory_bytes = 3;
string os = 4; // "linux", "darwin"
string arch = 5; // "amd64", "arm64"
int32 cpu_cores = 6;
string cuda_version = 7;
string driver_version = 8;
repeated string supported_runtimes = 9; // ["vllm", "tgi", "ollama"]
}
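For reference, the host-level fields of NodeCapabilities map directly onto Go's runtime package. A minimal sketch of how an agent might populate them (the proto import path is an assumption; GPU, CUDA, driver, and memory fields come from NVML and the host OS via the GPU Monitor and are omitted here):
import (
	"runtime"

	proto "nebula/platform/api/grpc/proto" // assumed import path
)

// hostCapabilities fills in the non-GPU fields of NodeCapabilities.
// GPU, CUDA, driver, and memory information would be gathered by the
// NVML-based GPU Monitor component.
func hostCapabilities() *proto.NodeCapabilities {
	return &proto.NodeCapabilities{
		Os:                runtime.GOOS,   // "linux", "darwin"
		Arch:              runtime.GOARCH, // "amd64", "arm64"
		CpuCores:          int32(runtime.NumCPU()),
		SupportedRuntimes: []string{"vllm", "tgi", "ollama"},
	}
}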
RegisterResponse
message RegisterResponse {
bool success = 1;
string error = 2;
string assigned_node_id = 3; // Confirmed UUID
string gateway_subdomain = 4; // e.g., "abc123de.gateway.nebulactl.com"
int32 heartbeat_interval_seconds = 5; // Server-specified interval
PlatformConfig config = 6;
}
HeartbeatRequest
message HeartbeatRequest {
string node_id = 1;
AgentNodeStats stats = 2; // CPU, memory, GPU statistics
AgentHealthStatus health = 3; // healthy/degraded/unhealthy
repeated AgentDeploymentStatus deployments = 4;
int64 timestamp = 5; // Unix timestamp
}
message AgentNodeStats {
float cpu_usage_percent = 1;
int64 memory_used_bytes = 2;
int64 memory_total_bytes = 3;
repeated GPUStats gpu_stats = 4;
}
message GPUStats {
string uuid = 1;
string name = 2;
float utilization_percent = 3;
int64 memory_used_bytes = 4;
int64 memory_total_bytes = 5;
float temperature_celsius = 6;
float power_watts = 7;
}
HeartbeatResponse
message HeartbeatResponse {
bool acknowledged = 1;
PlatformConfig config = 2;
repeated PlatformCommand commands = 3; // Pending commands for agent
}
message PlatformCommand {
string id = 1;
string type = 2; // "start", "stop", "restart", "delete"
string target_id = 3; // Deployment ID
map<string, string> params = 4;
}
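The heartbeat loop shown later calls r.handleCommand for each returned command. A plausible dispatch sketch, keyed on the type field (the agent-side method names here are illustrative, not the actual API):
// handleCommand dispatches a pending platform command by type.
// The agent methods called here are illustrative.
func (r *Registrar) handleCommand(ctx context.Context, cmd *proto.PlatformCommand) {
	switch cmd.Type {
	case "start":
		r.agent.StartDeployment(ctx, cmd.TargetId, cmd.Params)
	case "stop":
		r.agent.StopDeployment(ctx, cmd.TargetId)
	case "restart":
		r.agent.RestartDeployment(ctx, cmd.TargetId)
	case "delete":
		r.agent.DeleteDeployment(ctx, cmd.TargetId)
	default:
		log.Printf("unknown platform command type %q (id=%s)", cmd.Type, cmd.Id)
	}
}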
AgentService (agent.proto)
Location: platform/api/grpc/agent.proto
This service is implemented by nebula-agent and called by nebulad or nebulactl for deployment management.
service AgentService {
// Node information
rpc GetCapabilities(GetCapabilitiesRequest) returns (GetCapabilitiesResponse);
rpc GetStats(StatsRequest) returns (StatsResponse);
rpc GetHealth(HealthRequest) returns (HealthResponse);
rpc GetSystemInfo(GetSystemInfoRequest) returns (GetSystemInfoResponse);
// Model preparation
rpc PrepareModel(PrepareModelRequest) returns (PrepareModelResponse);
rpc StreamOllamaPull(OllamaPullRequest) returns (stream OllamaPullProgress);
rpc StreamHFDownload(HFDownloadRequest) returns (stream HFDownloadProgress);
// Runtime management
rpc StartRuntime(StartRuntimeRequest) returns (StartRuntimeResponse);
rpc StopRuntime(StopRuntimeRequest) returns (StopRuntimeResponse);
rpc RestartDeployment(RestartDeploymentRequest) returns (RestartDeploymentResponse);
// Deployment queries
rpc ListDeployments(ListDeploymentsRequest) returns (ListDeploymentsResponse);
rpc GetDeployment(GetDeploymentRequest) returns (GetDeploymentResponse);
rpc DeleteDeployment(DeleteDeploymentRequest) returns (DeleteDeploymentResponse);
// Logs and events
rpc GetDeploymentLogs(GetDeploymentLogsRequest) returns (GetDeploymentLogsResponse);
rpc GetDeploymentEvents(GetDeploymentEventsRequest) returns (GetDeploymentEventsResponse);
// Container operations
rpc ExecCommand(ExecCommandRequest) returns (ExecCommandResponse);
}
StartRuntimeRequest (Key Message)
This is the most important message for deployment creation:
message StartRuntimeRequest {
string deployment_id = 1; // UUID for this deployment
string runtime = 2; // "vllm", "tgi", "ollama", "triton"
string image = 3; // Docker image
string model = 4; // Model identifier
string model_path = 5; // hf://, ollama://, s3://, fs://
repeated string args = 6; // Runtime arguments
map<string, string> env = 7; // Environment variables
repeated int32 gpu_devices = 8; // GPU indices to use
int32 port = 9; // Preferred port (0 = auto-assign)
string access_token = 10; // Authentication token (nbtok_...)
bool is_public = 11; // If true, no auth required
}
StartRuntimeResponse
message StartRuntimeResponse {
bool success = 1;
string error = 2;
string container_id = 3;
int32 port = 4; // Assigned port
string gateway_endpoint = 5; // https://node-xxx.gateway.nebulactl.com
}
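For illustration, a caller on the platform side might invoke StartRuntime like this (a sketch; the proto import path is assumed, and the plaintext transport mirrors the current setup noted under Security Considerations):
import (
	"context"
	"fmt"

	"github.com/google/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	proto "nebula/platform/api/grpc/proto" // assumed import path
)

func startRuntimeOnAgent(ctx context.Context, agentAddr, accessToken string) error {
	conn, err := grpc.Dial(agentAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	defer conn.Close()

	client := proto.NewAgentServiceClient(conn)
	resp, err := client.StartRuntime(ctx, &proto.StartRuntimeRequest{
		DeploymentId: uuid.New().String(),
		Runtime:      "ollama",
		Image:        "ollama/ollama:latest",
		Model:        "llama3.2:1b",
		ModelPath:    "ollama://llama3.2:1b",
		Port:         0, // 0 = auto-assign from the 30000-31000 range
		AccessToken:  accessToken,
	})
	if err != nil {
		return err
	}
	if !resp.Success {
		return fmt.Errorf("start runtime: %s", resp.Error)
	}
	fmt.Printf("container %s on port %d (%s)\n", resp.ContainerId, resp.Port, resp.GatewayEndpoint)
	return nil
}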
Registration Process
When nebula-agent starts, it registers with the control plane through the Registrar component.
Sequence Diagram
┌──────────────┐ ┌──────────────────┐ ┌──────────────┐
│ nebula-agent │ │ nebulad │ │ Caddy │
│ │ │ (Platform) │ │ │
└──────┬───────┘ └────────┬─────────┘ └──────┬───────┘
│ │ │
│ 1. RegisterRequest │ │
│ (NodeID, Capabilities, │ │
│ AgentAddress) │ │
│ ─────────────────────────►│ │
│ │ │
│ │ 2. Generate NodeID │
│ │ if empty/"auto" │
│ │ │
│ │ 3. Create gateway │
│ │ subdomain │
│ │ ─────────────────────────►│
│ │ │
│ │ 4. Configure reverse │
│ │ proxy route │
│ │ ◄─────────────────────────│
│ │ │
│ │ 5. Save node to SQLite │
│ │ │
│ 6. RegisterResponse │ │
│ (AssignedNodeID, │ │
│ GatewaySubdomain, │ │
│ HeartbeatInterval) │ │
│ ◄─────────────────────────│ │
│ │ │
│ 7. Start heartbeat loop │ │
│ │ │
▼ ▼ ▼
Implementation Details
Agent Side (Registrar)
File: compute/daemon/registrar.go
// Start initiates the registration and heartbeat loop
func (r *Registrar) Start(ctx context.Context) error {
// Connect to platform
if err := r.connect(ctx); err != nil {
return err
}
// Register with retry and exponential backoff
if err := r.registerWithRetry(ctx); err != nil {
return err
}
// Start heartbeat goroutine
go r.heartbeatLoop(ctx)
return nil
}
// register sends the registration request
func (r *Registrar) register(ctx context.Context) error {
capabilities, _ := r.agent.GetCapabilities(ctx)
deployments := r.agent.GetRunningDeployments()
req := &proto.RegisterRequest{
NodeId: r.config.NodeID,
NodeName: r.config.NodeName,
Version: r.agent.version,
AgentAddress: r.config.AgentAddress, // e.g., "192.168.1.100:9091"
Capabilities: convertToProtoCapabilities(capabilities),
Deployments: convertToProtoDeployments(deployments),
}
resp, err := r.client.Register(ctx, req)
if err != nil {
return err
}
// Store assigned values
r.nodeID = resp.AssignedNodeId
r.gatewaySubdomain = resp.GatewaySubdomain
r.heartbeatInterval = time.Duration(resp.HeartbeatIntervalSeconds) * time.Second
return nil
}
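The connect step referenced in Start is not shown above. A minimal sketch (the Registrar field names are illustrative, and the plaintext credentials match the current setup described under Security Considerations):
// connect dials the platform's gRPC server and builds the client stub.
func (r *Registrar) connect(ctx context.Context) error {
	conn, err := grpc.Dial(
		r.config.PlatformAddress, // e.g. "platform-host:9090"
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		return err
	}
	r.conn = conn
	r.client = proto.NewPlatformServiceClient(conn)
	return nil
}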
Platform Side (Registry Service)
File: platform/service/registry/service.go
// RegisterAgent processes a registration request from an agent
func (s *Service) RegisterAgent(ctx context.Context, req *proto.RegisterRequest) (*RegisterResult, error) {
// Generate NodeID if not provided
nodeID := req.NodeId
if nodeID == "" || nodeID == "auto" {
nodeID = uuid.New().String()
}
// Parse agent address
host, port := parseAddress(req.AgentAddress)
// Generate gateway subdomain (first 8 chars of UUID)
subdomain := route53.BuildSubdomain(nodeID, s.config.GatewayDomain)
// Example: "abc123de.gateway.nebulactl.com"
// Configure Caddy reverse proxy
if s.caddy != nil {
err := s.caddy.AddRoute(ctx, caddy.RouteConfig{
Subdomain: subdomain,
UpstreamHost: host,
UpstreamPort: 8888, // Auth proxy port on agent
EnableTLS: true,
})
if err != nil {
return nil, err
}
}
// Create DNS record (optional)
if s.route53 != nil && s.route53.IsEnabled() {
s.route53.CreateARecord(ctx, subdomain)
}
// Save to database
node := &domain.Node{
ID: nodeID,
Name: req.NodeName,
Host: host,
GRPCPort: port,
Status: domain.NodeStatusOnline,
GatewaySubdomain: subdomain,
}
s.store.CreateNode(ctx, node)
// Track in memory
	s.agents[nodeID] = &AgentState{
		NodeID:           nodeID,
		LastHeartbeat:    time.Now(),
		Status:           "online",
		AgentAddress:     req.AgentAddress,
		Capabilities:     req.Capabilities,
		GatewaySubdomain: subdomain, // used later to remove the Caddy route
	}
return &RegisterResult{
NodeID: nodeID,
GatewaySubdomain: subdomain,
HeartbeatInterval: s.config.HeartbeatInterval,
}, nil
}
Heartbeat Mechanism
Agents send periodic heartbeats to report their status and receive pending commands.
Configuration
| Parameter | Default | Description |
|---|---|---|
| Heartbeat interval | 30 seconds | How often agent sends heartbeat |
| Offline timeout | 90 seconds | Time without heartbeat before marking offline |
| Max consecutive failures | 3 | Failures before attempting reconnection |

With the defaults, the 90-second offline timeout corresponds to three missed 30-second heartbeats.
Heartbeat Content
Each heartbeat includes:
- Node Statistics
  - CPU usage percentage
  - Memory used/total bytes
  - Per-GPU statistics (utilization, memory, temperature, power)
- Health Status
  - Status: "healthy", "degraded", "unhealthy"
  - Issues: list of current problems
  - Uptime: seconds since the agent started
- Deployment Status
  - List of all deployments with their current state
  - Container ID, port, model, runtime
Implementation
Agent Side
File: compute/daemon/registrar.go
func (r *Registrar) heartbeatLoop(ctx context.Context) {
ticker := time.NewTicker(r.heartbeatInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := r.sendHeartbeat(ctx); err != nil {
r.consecutiveFailures++
if r.consecutiveFailures >= maxConsecutiveFailures {
r.attemptReconnect(ctx)
}
} else {
r.consecutiveFailures = 0
}
}
}
}
func (r *Registrar) sendHeartbeat(ctx context.Context) error {
stats, _ := r.agent.GetStats(ctx)
health, _ := r.agent.GetHealth(ctx)
deployments := r.agent.GetRunningDeployments()
req := &proto.HeartbeatRequest{
NodeId: r.nodeID,
Stats: convertToProtoStats(stats),
Health: convertToProtoHealth(health),
Deployments: convertToProtoDeployments(deployments),
Timestamp: time.Now().Unix(),
}
resp, err := r.client.Heartbeat(ctx, req)
if err != nil {
return err
}
// Process pending commands
for _, cmd := range resp.Commands {
r.handleCommand(ctx, cmd)
}
// Update heartbeat interval if changed
if resp.Config != nil && resp.Config.HeartbeatIntervalSeconds > 0 {
r.heartbeatInterval = time.Duration(resp.Config.HeartbeatIntervalSeconds) * time.Second
}
return nil
}
Platform Side
File: platform/service/registry/service.go
// ProcessHeartbeat records a heartbeat, refreshing the agent's liveness state
func (s *Service) ProcessHeartbeat(ctx context.Context, req *proto.HeartbeatRequest) error {
s.mu.Lock()
defer s.mu.Unlock()
agent, ok := s.agents[req.NodeId]
if !ok {
return fmt.Errorf("agent not registered: %s", req.NodeId)
}
agent.LastHeartbeat = time.Now()
agent.Status = "online"
// Update node status in database (async)
go s.store.UpdateNodeStatus(ctx, req.NodeId, domain.NodeStatusOnline)
return nil
}
// StartOfflineChecker runs background goroutine to detect offline agents
func (s *Service) StartOfflineChecker(ctx context.Context) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.checkOfflineAgents(ctx)
}
}
}
func (s *Service) checkOfflineAgents(ctx context.Context) {
s.mu.Lock()
defer s.mu.Unlock()
threshold := time.Now().Add(-s.config.HeartbeatTimeout)
for nodeID, agent := range s.agents {
if agent.LastHeartbeat.Before(threshold) && agent.Status == "online" {
agent.Status = "offline"
go s.store.UpdateNodeStatus(ctx, nodeID, domain.NodeStatusOffline)
// Remove Caddy route for offline agent
if s.caddy != nil {
s.caddy.RemoveRoute(ctx, agent.GatewaySubdomain)
}
}
}
}
Authentication & Tokens
Nebula uses bearer tokens to authenticate API requests to deployed models.
Token Generation
File: platform/service/deployment/service.go
// generateAccessToken creates a cryptographically secure token
func generateAccessToken() string {
bytes := make([]byte, 24) // 192 bits of entropy
if _, err := rand.Read(bytes); err != nil {
// Fallback to UUID if crypto/rand fails
return "nbtok_" + strings.ReplaceAll(uuid.New().String(), "-", "")
}
return "nbtok_" + hex.EncodeToString(bytes)
}
// Example output: nbtok_a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4
Token Characteristics:
- Prefix: nbtok_ (for identification)
- Entropy: 192 bits (24 bytes)
- Format: nbtok_ + 48 hex characters
- Source: crypto/rand.Read()
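When debugging token issues (see Troubleshooting), a quick shape check can rule out copy-paste damage. Note that the UUID fallback path above yields 32 hex characters rather than 48, so the sketch below accepts both forms:
import "regexp"

// tokenPattern matches the primary format (48 hex chars) and the
// UUID fallback format (32 hex chars).
var tokenPattern = regexp.MustCompile(`^nbtok_([0-9a-f]{48}|[0-9a-f]{32})$`)

func looksLikeAccessToken(s string) bool {
	return tokenPattern.MatchString(s)
}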
Token Flow
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Platform │ │ nebula-agent │ │ Auth Proxy │ │ Runtime │
│ (Deployment │ │ (gRPC) │ │ (:8888) │ │ Container │
│ Service) │ │ │ │ │ │ │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │ │
│ 1. Deploy() │ │ │
│ generateAccessToken │ │ │
│ token="nbtok_..." │ │ │
│ │ │ │
│ 2. StartRuntime │ │ │
│ (access_token, │ │ │
│ is_public) │ │ │
│ ───────────────────►│ │ │
│ │ │ │
│ │ 3. Register route │ │
│ │ with token │ │
│ │ ───────────────────►│ │
│ │ │ │
│ │ 4. Start container │ │
│ │ ────────────────────────────────────────►│
│ │ │ │
│ │ 5. Store in SQLite │ │
│ │ (deployments table) │ │
│ │ │ │
▼ ▼ ▼ ▼
LATER: API Request
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Client │ │ Auth Proxy │ │ Runtime │
│ │ │ (:8888) │ │ Container │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
│ 6. POST /v1/chat/completions │ │
│ Authorization: Bearer nbtok_... │ │
│ ─────────────────────────────────────────►│ │
│ │ │
│ 7. Validate │ │
│ token │ │
│ │ │
│ 8. If valid, │ │
│ proxy to │ │
│ container │ │
│ │ ───────────────────►│
│ │ │
│ │ ◄───────────────────│
│ ◄─────────────────────────────────────────│ │
│ │ │
▼ ▼ ▼
Token Storage
Agent SQLite Database (agent.db):
CREATE TABLE deployments (
id TEXT PRIMARY KEY,
model TEXT NOT NULL,
runtime TEXT NOT NULL,
status TEXT NOT NULL,
container_id TEXT,
port INTEGER NOT NULL,
access_token TEXT, -- Token stored here
is_public INTEGER DEFAULT 0, -- 0 = requires token, 1 = public
...
);
Token Validation
File: compute/daemon/api/proxy.go
// DeploymentRoute holds routing information for a deployment
type DeploymentRoute struct {
DeploymentID string
RuntimePort int // Container's localhost port
AccessToken string // Token for authentication
IsPublic bool // If true, skip auth
Runtime string // ollama, vllm, tgi
}
// handleRequest processes incoming API requests
func (p *AuthProxy) handleRequest(w http.ResponseWriter, r *http.Request) {
// 1. Extract deployment ID from header or path
deploymentID := r.Header.Get("X-Nebula-Deployment")
if deploymentID == "" {
deploymentID = extractFromPath(r.URL.Path)
}
// 2. Get route for deployment
route, ok := p.routes[deploymentID]
if !ok {
p.writeError(w, http.StatusNotFound, "deployment not found")
return
}
// 3. Validate token (unless public)
if !route.IsPublic {
if !p.validateToken(r, route.AccessToken) {
p.writeError(w, http.StatusUnauthorized, "invalid or missing access token")
return
}
}
// 4. Proxy to runtime container
proxy := p.proxies[deploymentID]
proxy.ServeHTTP(w, r)
}
// validateToken checks the Authorization header
func (p *AuthProxy) validateToken(r *http.Request, expectedToken string) bool {
if expectedToken == "" {
return true // No token required
}
authHeader := r.Header.Get("Authorization")
if authHeader == "" {
return false
}
// Support both "Bearer <token>" and raw token
token := authHeader
if strings.HasPrefix(authHeader, "Bearer ") {
token = strings.TrimPrefix(authHeader, "Bearer ")
}
return token == expectedToken
}
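As noted under Security Considerations, this is a plain string comparison. A constant-time variant using crypto/subtle would avoid leaking how much of the token matched through response timing; a sketch of the swap (not the current implementation):
import "crypto/subtle"

// constantTimeEqual reports whether two tokens match without
// short-circuiting on the first differing byte.
func constantTimeEqual(a, b string) bool {
	return subtle.ConstantTimeCompare([]byte(a), []byte(b)) == 1
}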
Using Tokens
Example API Request:
# With Bearer token
curl -X POST https://abc123de.gateway.nebulactl.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer nbtok_a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4" \
-d '{
"model": "llama3.2:1b",
"messages": [{"role": "user", "content": "Hello!"}]
}'
# For public deployments (no token needed)
curl -X POST https://abc123de.gateway.nebulactl.com/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "llama3.2:1b",
"messages": [{"role": "user", "content": "Hello!"}]
}'
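The same authenticated request from Go, using only the standard library (endpoint and token are the placeholder values from the curl example above):
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	body := strings.NewReader(`{
	  "model": "llama3.2:1b",
	  "messages": [{"role": "user", "content": "Hello!"}]
	}`)
	req, _ := http.NewRequest(http.MethodPost,
		"https://abc123de.gateway.nebulactl.com/v1/chat/completions", body)
	req.Header.Set("Content-Type", "application/json")
	// Omit the Authorization header for public deployments.
	req.Header.Set("Authorization",
		"Bearer nbtok_a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	out, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(out))
}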
Deployment Lifecycle
Full Deployment Flow
┌──────────────────────────────────────────────────────────────────────────────┐
│ DEPLOYMENT CREATION │
└──────────────────────────────────────────────────────────────────────────────┘
1. User Request
nebulactl deploy --model llama3.2:1b --runtime ollama --node gpu-server-1
2. Platform Processing (platform/service/deployment/service.go)
├─ Generate deployment ID: uuid.New().String()
├─ Auto-detect model source: ollama:// or hf://
├─ Select node (if not specified): scheduler.SelectNode()
├─ Generate access token: generateAccessToken() → "nbtok_..."
└─ Connect to agent via gRPC
3. Agent Processing (compute/daemon/agent.go)
├─ StartRuntime receives request with token
   ├─ Allocate port: portAllocator.Allocate() → 30000 (see sketch after this flow)
├─ Create state: stateManager.CreateDeployment()
│ └─ Saves to deployments table with access_token
├─ Pull image if needed: docker.PullImage()
├─ Create container: docker.CreateContainer()
│ ├─ Mount volumes (model cache)
│ ├─ Set GPU devices (NVIDIA_VISIBLE_DEVICES)
│ └─ Configure port binding (localhost:30000)
├─ Start container: docker.StartContainer()
├─ Wait for health: waitForHealth() with timeout
└─ Register with auth proxy: authProxy.RegisterDeployment()
4. Auth Proxy Registration (compute/daemon/api/proxy.go)
├─ Create reverse proxy: httputil.NewSingleHostReverseProxy()
├─ Store route with token
└─ Ready to accept requests
5. Return to User
├─ Container ID
├─ Port: 30000
├─ Gateway endpoint: https://abc123de.gateway.nebulactl.com
└─ Access token: nbtok_...
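Step 3's port allocation draws from the 30000-31000 range tracked in the port_allocations table (see Database Schema). A minimal allocator sketch; the store methods are illustrative:
import (
	"context"
	"fmt"
	"net"
)

const (
	portRangeStart = 30000
	portRangeEnd   = 31000
)

// Allocate returns the first port in the runtime range that is neither
// recorded in port_allocations nor currently bound on localhost.
func (a *PortAllocator) Allocate(ctx context.Context, deploymentID string) (int, error) {
	for port := portRangeStart; port <= portRangeEnd; port++ {
		if a.store.IsPortAllocated(ctx, port) { // illustrative store method
			continue
		}
		// Double-check the port is actually bindable before claiming it.
		ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
		if err != nil {
			continue // something else is already bound to it
		}
		ln.Close()
		if err := a.store.AllocatePort(ctx, port, deploymentID); err != nil {
			return 0, err
		}
		return port, nil
	}
	return 0, fmt.Errorf("no free port in %d-%d", portRangeStart, portRangeEnd)
}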
┌──────────────────────────────────────────────────────────────────────────────┐
│ DEPLOYMENT STATES │
└──────────────────────────────────────────────────────────────────────────────┘
┌───────────┐
│ starting │ ──── Initial state after StartRuntime called
└─────┬─────┘
│
▼
┌───────────┐
│ running │ ──── Container started and healthy
└─────┬─────┘
│
│ StopRuntime or error
▼
┌───────────┐ ┌───────────┐
│ stopped │ │ failed │
└───────────┘ └───────────┘
State Management
File: compute/daemon/state_manager.go
// CreateDeployment creates a new deployment record
func (sm *StateManager) CreateDeployment(ctx context.Context, config *runtime.RuntimeConfig,
deploymentID, accessToken string, isPublic bool) error {
deployment := &store.Deployment{
ID: deploymentID,
Model: config.Model,
Runtime: config.Runtime,
Status: "starting",
Image: config.Image,
Port: config.Port,
GPUDevices: config.GPUDevices,
ModelPath: config.ModelPath,
Args: config.Args,
Env: config.Env,
AccessToken: accessToken,
IsPublic: isPublic,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
return sm.store.CreateDeployment(ctx, deployment)
}
// UpdateDeploymentStarted marks deployment as running
func (sm *StateManager) UpdateDeploymentStarted(ctx context.Context,
deploymentID, containerID string) error {
	deployment, err := sm.store.GetDeployment(ctx, deploymentID)
	if err != nil {
		return err
	}
	now := time.Now()
	deployment.Status = "running"
	deployment.ContainerID = containerID
	deployment.StartedAt = &now
return sm.store.UpdateDeployment(ctx, deployment)
}
Database Schema
Platform Database (nebula.db)
Location: /var/lib/nebulad/nebula.db
File: platform/storage/sqlite/schema.sql
-- Nodes table: registered agents
CREATE TABLE IF NOT EXISTS nodes (
id TEXT PRIMARY KEY, -- UUID
name TEXT NOT NULL, -- Human-readable name
host TEXT NOT NULL UNIQUE, -- IP address
grpc_port INTEGER NOT NULL, -- Agent gRPC port (9091)
status TEXT NOT NULL, -- online, offline, provisioning, error
provider TEXT NOT NULL, -- ssh
connection_type TEXT NOT NULL DEFAULT 'direct',
ssh_user TEXT,
ssh_port INTEGER,
ssh_key_path TEXT,
gateway_subdomain TEXT, -- abc123de.gateway.nebulactl.com
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_nodes_status ON nodes(status);
CREATE INDEX IF NOT EXISTS idx_nodes_host ON nodes(host);
Agent Database (agent.db)
Location: {DataPath}/agent.db (e.g., /var/lib/nebula/agent.db)
File: compute/storage/agent_store.go
-- Deployments table: model deployments on this agent
CREATE TABLE IF NOT EXISTS deployments (
id TEXT PRIMARY KEY, -- UUID
model TEXT NOT NULL, -- meta-llama/Llama-2-7b
runtime TEXT NOT NULL, -- vllm, tgi, ollama
status TEXT NOT NULL, -- starting, running, stopping, stopped, failed
container_id TEXT, -- Docker container ID
image TEXT NOT NULL, -- Docker image
port INTEGER NOT NULL, -- Runtime port (30000-31000)
gpu_devices TEXT, -- JSON array: [0, 1]
model_path TEXT NOT NULL, -- hf://, ollama://, s3://, fs://
args TEXT, -- JSON array of arguments
env TEXT, -- JSON object of env vars
error_message TEXT,
access_token TEXT, -- nbtok_... (for auth)
is_public INTEGER DEFAULT 0, -- 0 = private, 1 = public
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
started_at INTEGER,
stopped_at INTEGER
);
-- Deployment events: history of state changes
CREATE TABLE IF NOT EXISTS deployment_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
deployment_id TEXT NOT NULL,
event_type TEXT NOT NULL, -- started, stopped, failed, restarted
message TEXT NOT NULL,
timestamp INTEGER NOT NULL,
FOREIGN KEY (deployment_id) REFERENCES deployments(id) ON DELETE CASCADE
);
-- Port allocations: track which ports are in use
CREATE TABLE IF NOT EXISTS port_allocations (
port INTEGER PRIMARY KEY,
deployment_id TEXT NOT NULL,
allocated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_deployments_status ON deployments(status);
CREATE INDEX IF NOT EXISTS idx_deployments_runtime ON deployments(runtime);
CREATE INDEX IF NOT EXISTS idx_events_deployment ON deployment_events(deployment_id);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON deployment_events(timestamp);
CREATE INDEX IF NOT EXISTS idx_port_allocations_deployment ON port_allocations(deployment_id);
Code References
Critical Files
| Component | File | Key Lines | Description |
|---|---|---|---|
| Agent entry | cmd/nebula-agent/main.go | 29-62 | CLI setup, main() |
| Agent run | cmd/nebula-agent/main.go | 65-263 | runAgent() function |
| Daemon entry | cmd/nebulad/main.go | 33-134 | CLI setup, main() |
| Daemon run | cmd/nebulad/main.go | 142-314 | runServer() function |
| Registrar | compute/daemon/registrar.go | 68-90 | Start() method |
| Register request | compute/daemon/registrar.go | 163-210 | register() method |
| Heartbeat loop | compute/daemon/registrar.go | 213-261 | heartbeatLoop() |
| Send heartbeat | compute/daemon/registrar.go | 264-319 | sendHeartbeat() |
| Registry service | platform/service/registry/service.go | 89-186 | RegisterAgent() |
| Process heartbeat | platform/service/registry/service.go | 189-210 | ProcessHeartbeat() |
| Offline checker | platform/service/registry/service.go | 261-310 | checkOfflineAgents() |
| Token generation | platform/service/deployment/service.go | 52-60 | generateAccessToken() |
| Deploy | platform/service/deployment/service.go | 62-188 | Deploy() |
| Auth proxy | compute/daemon/api/proxy.go | 36-46 | NewAuthProxy() |
| Handle request | compute/daemon/api/proxy.go | 142-188 | handleRequest() |
| Validate token | compute/daemon/api/proxy.go | 191-208 | validateToken() |
| State manager | compute/daemon/state_manager.go | 34-64 | CreateDeployment() |
| Agent gRPC server | platform/api/grpc/server/agent_server.go | 89-143 | StartRuntime() |
| Platform gRPC server | platform/api/grpc/server/platform_server.go | 30-64 | Register() |
Proto Files
| File | Description |
|---|---|
platform/api/grpc/platform.proto | PlatformService: Register, Heartbeat, Deregister |
platform/api/grpc/agent.proto | AgentService: 16+ deployment management methods |
Generated Code
| File | Description |
|---|---|
platform/api/grpc/proto/platform.pb.go | Generated from platform.proto |
platform/api/grpc/proto/platform_grpc.pb.go | gRPC service stubs |
platform/api/grpc/proto/agent.pb.go | Generated from agent.proto |
platform/api/grpc/proto/agent_grpc.pb.go | gRPC service stubs |
Example: Full Request Flow
Here's a complete example showing how a chat completion request flows through the system:
1. User sends request to platform gateway:
POST https://api.nebulactl.com/v1/chat/completions
{
"model": "my-llama-deployment",
"messages": [{"role": "user", "content": "Hello"}]
}
2. Gateway Router resolves model to deployment:
- Looks up "my-llama-deployment" in deployments
- Gets node info: node-abc123
- Gets route: {endpoint: "https://abc123de.gateway.nebulactl.com", token: "nbtok_..."}
3. Gateway forwards to agent's gateway subdomain:
POST https://abc123de.gateway.nebulactl.com/v1/chat/completions
Authorization: Bearer nbtok_...
4. Caddy receives request and proxies to agent:
→ http://192.168.1.100:8888/v1/chat/completions
5. Auth Proxy on agent validates token:
- Extracts token from Authorization header
- Looks up route for default deployment
- Compares token with stored access_token
- Token matches → proceed
6. Auth Proxy forwards to runtime container:
→ http://127.0.0.1:30000/v1/chat/completions
7. Ollama/vLLM/TGI processes the request and returns response
8. Response flows back through the chain:
Runtime → Auth Proxy → Caddy → Gateway → User
Security Considerations
Current Security Model
- Token Generation
  - Uses crypto/rand for cryptographic randomness
  - 192-bit entropy (considered secure)
  - Prefix nbtok_ for easy identification
- Token Transmission
  - Sent via gRPC (StartRuntimeRequest) over the internal network
  - Clients use HTTPS (via Caddy) for API requests
- Token Storage
  - Stored in SQLite on the agent (file system permissions)
  - Held in memory in the Auth Proxy routes map
- Token Validation
  - Simple string comparison
  - No rate limiting (future improvement)
  - No token expiration (future improvement)
Recommendations
- Enable TLS for internal gRPC - Currently uses insecure credentials
- Add token expiration - Tokens never expire
- Implement rate limiting - Protect against brute force
- Add token rotation - Allow refreshing tokens
- Use secure storage - Consider encryption at rest
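For the first recommendation, the change is confined to the gRPC dial options; a sketch, with a placeholder CA bundle path:
import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

// dialPlatformTLS verifies the platform's certificate against a CA
// bundle instead of using insecure.NewCredentials().
func dialPlatformTLS(addr string) (*grpc.ClientConn, error) {
	creds, err := credentials.NewClientTLSFromFile("/etc/nebula/ca.pem", "")
	if err != nil {
		return nil, err
	}
	return grpc.Dial(addr, grpc.WithTransportCredentials(creds))
}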
Troubleshooting
Agent Not Registering
- Check the platform address is reachable:
  nc -zv platform-host 9090
- Verify TLS settings match:
  # Agent config
  platform:
    use_tls: true  # Must match the platform
- Check agent logs:
  journalctl -u nebula-agent -f
Heartbeat Failures
- Check network connectivity
- Verify the node ID matches
- Look for clock skew issues
- Check the platform heartbeat timeout setting
Token Validation Failing
- Verify the token format: nbtok_ + 48 hex chars
- Check the is_public flag on the deployment
- Ensure the correct header format: Authorization: Bearer <token>
- Check the auth proxy logs for details
Deployment Not Starting
- Check container logs: docker logs <container-id>
- Verify GPU availability: nvidia-smi
- Check port availability: netstat -tlnp | grep 30000
- Review deployment events in the agent database (see the sketch below)
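For the last step, events can be read straight from agent.db. A sketch using database/sql with the mattn/go-sqlite3 driver (an assumption; any SQLite driver works):
import (
	"database/sql"
	"fmt"

	_ "github.com/mattn/go-sqlite3" // assumed driver
)

// printDeploymentEvents lists the recorded state changes for one deployment.
func printDeploymentEvents(dbPath, deploymentID string) error {
	db, err := sql.Open("sqlite3", dbPath) // e.g. /var/lib/nebula/agent.db
	if err != nil {
		return err
	}
	defer db.Close()

	rows, err := db.Query(
		`SELECT event_type, message, timestamp
		   FROM deployment_events
		  WHERE deployment_id = ?
		  ORDER BY timestamp`, deploymentID)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var eventType, message string
		var ts int64
		if err := rows.Scan(&eventType, &message, &ts); err != nil {
			return err
		}
		fmt.Printf("%d %-10s %s\n", ts, eventType, message)
	}
	return rows.Err()
}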