Onyx models include a built-in simulate() method that handles multi-step prediction with automatic state integration. This tutorial covers when and how to use simulation.
Simulating vs. Direct Inference
If none of your model inputs has parent and relation attributes set, you do not need to use simulate(). Instead, you can call the underlying PyTorch model directly:
import torch
from onyxengine import Onyx
onyx = Onyx()
model = onyx.load_model('example_model')
# Make a single prediction by directly calling the PyTorch model
batch_size = 1
sequence_length = model.config.sequence_length
num_inputs = len(model.config.inputs)
x = torch.randn(batch_size, sequence_length, num_inputs)
with torch.no_grad():
    output = model(x)
print("\nModel Prediction:\n", output)
What is convenient about this case is that all of your predictions can be made in parallel by passing in a batch of inputs, rather than rolling out trajectories step by step.
Often, for hardware dynamics modeling, you will have model inputs that must be derived from the model's own predicted outputs during deployment (e.g. acceleration_output -> velocity_input). In these cases, you should set parent and relation accordingly and use the simulate() method.
Using simulate()
To use the simulate() method, you simply need to pass in the following:
- x0: initial conditions for the model inputs that have a parent and relation
- external_inputs: full trajectories of the inputs that do not have a parent and relation
- sim_steps: number of simulation steps to roll out (can be inferred from external_inputs)
import torch
from onyxengine import Onyx
# Initialize the client
onyx = Onyx()
# Load a trained model
model = onyx.load_model('example_model')
# Get model parameters
batch_size = 1
sequence_length = model.config.sequence_length
sim_steps = 100
# Create initial state tensors (batch_size, sequence_length, 1)
velocity = torch.zeros(batch_size, sequence_length, 1)
position = torch.zeros(batch_size, sequence_length, 1)
# Create external input tensor (batch_size, sequence_length + sim_steps, 1)
control_input = torch.ones(batch_size, sequence_length + sim_steps, 1)
# Run simulation
result = model.simulate(
    x0={'velocity': velocity, 'position': position},
    external_inputs={'control_input': control_input},
    sim_steps=sim_steps
)
# Access results
print("Velocity:", result.inputs['velocity'].shape)
print("Position:", result.inputs['position'].shape)
print("Acceleration:", result.outputs['acceleration'].shape)
# View the model outputs
print("Acceleration output:", result.outputs['acceleration'])
If it is not possible to provide the full trajectory of external inputs in deployment (e.g. if you are using a controller that computes a control_input at each step), you can call the simulate() method in a loop with sim_steps=1 to still benefit from the parent-relation state management.
# Control loop pseudocode
for step in range(sim_steps):
    control_input = my_controller(velocity, position)
    result = model.simulate(
        x0={'velocity': velocity, 'position': position},
        external_inputs={'control_input': control_input},
        sim_steps=1
    )
    # Carry the updated states forward into the next iteration
    velocity = result.inputs['velocity'][:, -sequence_length:, :]
    position = result.inputs['position'][:, -sequence_length:, :]
Understanding the simulate() API
Parameters
| Parameter | Type | Description |
|---|---|---|
| x0 | Dict[str, Tensor] | Initial state values for derived inputs |
| external_inputs | Dict[str, Tensor] | External inputs for the full trajectory |
| sim_steps | int | Number of simulation steps (optional if inferable) |
| device | torch.device | Target device (default: input tensor device) |
| dtype | torch.dtype | Target dtype (default: input tensor dtype) |
Tensor Shapes
All tensors must be 3D with shape (batch_size, time_steps, 1):
- x0 tensors:
(batch_size, sequence_length, 1)
- external_inputs tensors:
(batch_size, sequence_length + sim_steps, 1)
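Shape mismatches are a common source of errors here. The rules above can be checked up front with a small helper; this is a sketch in plain Python (check_sim_shapes is a hypothetical helper, not part of the Onyx API) that works with anything exposing a .shape tuple:

```python
import numpy as np

def check_sim_shapes(x0, external_inputs, sequence_length, sim_steps):
    """Return the common batch size if every tensor matches the shapes
    simulate() expects; raise ValueError otherwise."""
    batch_sizes = set()
    for name, t in x0.items():
        b, steps, feat = t.shape
        if steps != sequence_length or feat != 1:
            raise ValueError(
                f"x0[{name!r}] must be (batch, {sequence_length}, 1), got {tuple(t.shape)}")
        batch_sizes.add(b)
    for name, t in external_inputs.items():
        b, steps, feat = t.shape
        if steps != sequence_length + sim_steps or feat != 1:
            raise ValueError(
                f"external_inputs[{name!r}] must be "
                f"(batch, {sequence_length + sim_steps}, 1), got {tuple(t.shape)}")
        batch_sizes.add(b)
    if len(batch_sizes) != 1:
        raise ValueError(f"inconsistent batch sizes: {sorted(batch_sizes)}")
    return batch_sizes.pop()

# Valid shapes for sequence_length=10, sim_steps=100
x0 = {'velocity': np.zeros((1, 10, 1)), 'position': np.zeros((1, 10, 1))}
ext = {'control_input': np.ones((1, 110, 1))}
print(check_sim_shapes(x0, ext, sequence_length=10, sim_steps=100))  # → 1
```

Since the check only reads .shape, the same helper works for torch tensors before they are passed to simulate().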
Return Value
simulate() returns a SimulationResult object with two FeatureTrajectory attributes:
result = model.simulate(...)
# Access input trajectories (states + external inputs)
result.inputs['velocity'] # Shape: (batch_size, sequence_length + sim_steps, 1)
result.inputs['position'] # Shape: (batch_size, sequence_length + sim_steps, 1)
result.inputs['control_input'] # Shape: (batch_size, sequence_length + sim_steps, 1)
# Access output trajectories
result.outputs['acceleration'] # Shape: (batch_size, sim_steps, 1)
# Get full tensors
full_input_tensor = result.inputs.tensor # All inputs stacked
full_output_tensor = result.outputs.tensor # All outputs stacked
State Integration
During simulation, states update automatically based on their relation:
Derivative Relation
For Input(name='velocity', parent='acceleration', relation='derivative'):
velocity[t+1] = velocity[t] + acceleration[t] * dt
Delta Relation
For Input(name='position', parent='displacement', relation='delta'):
position[t+1] = position[t] + displacement[t]
Equal Relation
For Input(name='state', parent='output', relation='equal'):
state[t+1] = output[t]
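The three update rules above can be sketched in a few lines of plain Python. This is a toy Euler-style step, not the Onyx internals; step_state and its dt default are assumptions for illustration:

```python
def step_state(state, parent_value, relation, dt=0.01):
    """Advance one derived input by a single step, following the
    parent/relation semantics described above."""
    if relation == 'derivative':
        return state + parent_value * dt   # integrate the parent over dt
    if relation == 'delta':
        return state + parent_value        # accumulate parent increments
    if relation == 'equal':
        return parent_value                # copy the parent directly
    raise ValueError(f"unknown relation: {relation}")

# Roll velocity out from rest under a constant acceleration of 2.0
velocity = 0.0
for _ in range(5):
    velocity = step_state(velocity, 2.0, 'derivative', dt=0.5)
print(velocity)  # → 5.0
```

In the real simulate() loop the parent value at each step is the model's predicted output, and dt comes from the model configuration.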
Batch Simulation
Simulate multiple trajectories at once by leveraging the batch dimension of your inputs:
# Simulate 100 trajectories at once
batch_size = 100
sequence_length = model.config.sequence_length
sim_steps = 100
# Batch of different initial conditions
velocity_batch = torch.randn(batch_size, sequence_length, 1)
position_batch = torch.randn(batch_size, sequence_length, 1)
control_batch = torch.ones(batch_size, sequence_length + sim_steps, 1)
result = model.simulate(
x0={'velocity': velocity_batch, 'position': position_batch},
external_inputs={'control_input': control_batch},
sim_steps=sim_steps
)
# Result shapes: (batch_size, sim_steps, 1)
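To see why batching pays off, here is a toy vectorized rollout in NumPy (a stand-in double integrator, not the learned model): every trajectory in the batch advances in one array operation per step, so 100 trajectories cost roughly the same number of Python iterations as one.

```python
import numpy as np

def rollout_batch(v0, p0, accel, dt=0.1):
    """Euler-integrate a batch of trajectories at once.
    v0, p0: (batch,) initial velocities/positions
    accel:  (batch, steps) acceleration at each step
    Returns the position history, shape (batch, steps)."""
    batch, steps = accel.shape
    v, p = v0.copy(), p0.copy()
    positions = np.empty((batch, steps))
    for t in range(steps):
        v = v + accel[:, t] * dt   # 'derivative' relation for velocity
        p = p + v * dt             # 'derivative' relation for position
        positions[:, t] = p
    return positions

pos = rollout_batch(np.zeros(100), np.zeros(100), np.ones((100, 50)))
print(pos.shape)  # → (100, 50)
```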
GPU Acceleration
If your batch size is large, you can move computation to GPU for faster simulation by specifying the device parameter:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Tensors on GPU
velocity = torch.zeros(batch_size, sequence_length, 1, device=device)
position = torch.zeros(batch_size, sequence_length, 1, device=device)
control = torch.ones(batch_size, sequence_length + sim_steps, 1, device=device)
# Model moves to GPU automatically
result = model.simulate(
x0={'velocity': velocity, 'position': position},
external_inputs={'control_input': control},
sim_steps=sim_steps,
device=device
)
It is worth measuring the execution time of simulation on both CPU and GPU to determine which is faster; the GPU does not always win, particularly at small batch sizes.
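A simple way to compare is to time a few repeated calls on each device. Here is a sketch of a generic timing helper in plain Python; time_call and the commented usage lines are assumptions, not Onyx API:

```python
import time

def time_call(fn, repeats=10, warmup=2):
    """Return the mean wall-clock seconds per call of fn().
    A few warmup calls run first so one-time setup costs
    (allocation, kernel compilation) don't skew the result."""
    for _ in range(warmup):
        fn()
    start = time.perf_counter()
    for _ in range(repeats):
        fn()
    return (time.perf_counter() - start) / repeats

# Usage sketch (assumes model and tensors from the examples above):
# cpu_time = time_call(lambda: model.simulate(
#     x0={'velocity': velocity, 'position': position},
#     external_inputs={'control_input': control},
#     sim_steps=sim_steps))
# gpu_time = time_call(lambda: model.simulate(..., device=device))
```

Note that CUDA execution is asynchronous, so a careful GPU benchmark should call torch.cuda.synchronize() before reading the clock; otherwise the measured time can undercount the actual work.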
Working with Results
Convert to NumPy
import numpy as np
velocity_np = result.inputs['velocity'].cpu().numpy()
position_np = result.inputs['position'].cpu().numpy()
Plotting Trajectories
import matplotlib.pyplot as plt
# Get trajectory data
time = np.arange(sim_steps) * model.config.dt
velocity = result.inputs['velocity'][0, sequence_length:, 0].cpu().numpy()
position = result.inputs['position'][0, sequence_length:, 0].cpu().numpy()
fig, axes = plt.subplots(2, 1, figsize=(10, 6))
axes[0].plot(time, velocity)
axes[0].set_ylabel('Velocity')
axes[1].plot(time, position)
axes[1].set_ylabel('Position')
axes[1].set_xlabel('Time (s)')
plt.tight_layout()
plt.show()