In this post, we’ll explore how asynchronous programming concepts in PyUVM and cocotb transform the way we build hardware verification environments, with a focus on practical implementation patterns that experienced verification engineers can immediately apply. If this is your first time in these waters, be sure to check out my previous posts about PyUVM, especially this and this.
TL;DR Table for this post
Concept | SystemVerilog UVM | Python PyUVM/cocotb | Notes |
---|---|---|---|
Function Definition | task run_phase(); or virtual task run_phase(uvm_phase phase); | async def run_phase(self): | Python requires async keyword to define coroutines |
Waiting for Time | #10; | await Timer(10, units="ns") | Python explicitly awaits time triggers |
Waiting for Signal Edge | @(posedge clk); | await RisingEdge(clk) | Python uses awaitable trigger objects |
Parallel Execution | fork A(); B(); join | task_a = start_soon(A()); task_b = start_soon(B()); await Combine( Join(task_a), Join(task_b)) | Python explicitly starts and joins tasks |
Timeout Handling | fork begin #1000; $display("Timeout"); end begin wait(done); end join_any disable fork; | result = await First(Timer(1000, units="ns"), cocotb.start_soon(wait_done())) | Python’s approach is more composable and explicit; First() takes triggers or tasks, so the coroutine is started first |
Where Allowed | Inside tasks, not allowed in functions | Only inside functions defined with async def | Python strictly separates synchronous and asynchronous code |
What Can Be Awaited | Implicit (time, events) | Explicit (only awaitable objects/coroutines) | Python requires explicit awaitable triggers |
1. Async/Await Fundamentals: Replacing Tasks with Coroutines
In traditional SystemVerilog UVM, concurrent operations are typically handled using tasks, semaphores, and events. PyUVM replaces these with Python’s coroutines, which provide a more structured and readable approach to concurrency.
SV/UVM Approach:
task run_phase(uvm_phase phase);
  phase.raise_objection(this);
  // Drive signals, wait for responses
  @(posedge clk);
  vif.valid <= 1;
  @(posedge clk);
  vif.valid <= 0;
  phase.drop_objection(this);
endtask
PyUVM Approach:
async def run_phase(self):
    self.raise_objection()
    # Drive signals, wait for responses
    await self.bfm.clock_posedge()
    self.bfm.valid.value = 1
    await self.bfm.clock_posedge()
    self.bfm.valid.value = 0
    self.drop_objection()
Some takeaways:
- Python’s `async def` defines a coroutine (similar to a SystemVerilog task)
- The `await` keyword explicitly indicates where execution may pause
- More natural flow control with Python’s standard constructs
IMPORTANT! You can only use `await` inside functions defined with `async def` (just like you would only use `@(posedge clk)` inside a SystemVerilog task, not a function). It works with any awaitable object, including cocotb triggers, tasks, and other coroutines.
Note that in PyUVM, `run_phase` (the only time-consuming phase) is implemented as a coroutine, so it can use `await` to pause execution without blocking the entire simulation; the other phases, such as `build_phase` and `connect_phase`, are ordinary functions. This approach makes the code more readable and easier to reason about, especially when dealing with complex timing scenarios.
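If "pausing without blocking" still feels abstract, here is a minimal sketch of what an `await` does to control flow. It uses plain `asyncio` from the standard library purely as a stand-in so it runs without a simulator; `asyncio.sleep(0)` only plays the role of a clock edge, and the `driver` and `monitor` names are made up for illustration.

```python
import asyncio

log = []

async def driver():
    log.append("driver: valid=1")
    await asyncio.sleep(0)   # suspension point (stand-in for a clock edge)
    log.append("driver: valid=0")

async def monitor():
    log.append("monitor: waiting")
    await asyncio.sleep(0)   # hands control back so the driver can continue
    log.append("monitor: sampled")

async def main():
    # Run both coroutines concurrently; every await is a place
    # where the other coroutine gets a chance to run.
    await asyncio.gather(driver(), monitor())

asyncio.run(main())
print(log)
```

The log shows the two coroutines interleaving at each `await`, which is the same mental model you use for two SystemVerilog tasks that both block on a clock edge.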
2. Timer Management: Precise Control Over Simulation Time
Cocotb provides elegant ways to control timing in the simulation through various timer mechanisms. These are crucial for scheduling events, implementing timeouts, and modeling real-world timing constraints.
from cocotb.triggers import ClockCycles, First, RisingEdge, Timer

# Wait for a specific amount of simulation time
await Timer(10, units="ns")
# Wait for multiple clock cycles
await ClockCycles(clock, 5)
# Wait for a timeout or an event, whichever comes first
result = await First(
    timeout_trigger := Timer(100, units="ns"),
    done_trigger := RisingEdge(self.bfm.done)
)
if result is timeout_trigger:
    self.logger.error("Operation timed out")
else:
    self.logger.info("Operation completed")
This approach replaces SystemVerilog’s `#delay` notation with `Timer()` and `fork-join_any` constructs with `First()`; both live in `cocotb.triggers`. The ability to name triggers and compare them directly simplifies timeout handling compared to the complex event-based approach in traditional UVM.
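For intuition about the "race two things, then clean up the loser" pattern, here is a rough analogy in plain `asyncio`, chosen only so it runs without a simulator. It is not cocotb code: `race`, `wait_done`, and `timeout` are hypothetical helpers, and wall-clock seconds stand in for simulation time.

```python
import asyncio

async def wait_done(done: asyncio.Event) -> str:
    await done.wait()
    return "done"

async def timeout(seconds: float) -> str:
    await asyncio.sleep(seconds)
    return "timeout"

async def race(done_after: float, limit: float) -> str:
    done = asyncio.Event()
    # Pretend the DUT raises "done" after done_after seconds
    asyncio.get_running_loop().call_later(done_after, done.set)
    tasks = [
        asyncio.create_task(wait_done(done)),
        asyncio.create_task(timeout(limit)),
    ]
    finished, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()  # roughly what "disable fork" does for the loser
    return finished.pop().result()

print(asyncio.run(race(done_after=0.01, limit=1.0)))
print(asyncio.run(race(done_after=1.0, limit=0.01)))
```

The shape is the same as the `First()` example: start both waits, see which one finished, and decide from the winner whether the operation timed out.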

3. Edge and Event Handling: Responding to Hardware Signals
Responding to hardware signal changes is fundamental to verification. Cocotb provides a rich set of triggers that make this straightforward.
# Wait for rising edge on a signal
await cocotb.triggers.RisingEdge(self.bfm.clk)
# Wait for falling edge on a signal
await cocotb.triggers.FallingEdge(self.bfm.reset_n)
# Wait for any edge (rising or falling)
await cocotb.triggers.Edge(self.bfm.data_valid)
# Wait for any change in value (ValueChange needs cocotb 2.0+; it does not wait for a specific value)
await cocotb.triggers.ValueChange(self.bfm.result)
Unlike SystemVerilog, where you would use `@(posedge clk)` or `wait(condition)`, cocotb’s triggers can be composed and combined in powerful ways:
# Wait for either condition to occur
await cocotb.triggers.First(
    cocotb.triggers.RisingEdge(self.bfm.done),
    cocotb.triggers.Timer(1000, units="ns")
)
# Wait for all conditions to occur (in any order)
await cocotb.triggers.Combine(
    cocotb.triggers.RisingEdge(self.bfm.valid),
    cocotb.triggers.ValueChange(self.bfm.data)
)
How’s that for modelling complex hardware?
4. Coroutine Management: Parallelism with start_soon/Join
One of the most powerful aspects of cocotb’s asynchronous model is the ability to easily manage multiple concurrent operations. This is especially useful for modeling parallel hardware processes and implementing sophisticated test scenarios.
In SystemVerilog UVM, you might use `fork-join` or `fork-join_none` to launch parallel processes:
task test_parallel_operations();
  fork
    drive_data();
    monitor_responses();
  join
endtask
In PyUVM, this is handled with cocotb’s `start_soon()` function and `Join` trigger:
import cocotb
from cocotb.triggers import Combine, Join

async def test_parallel_operations(self):
    # Start multiple coroutines concurrently
    data_task = cocotb.start_soon(self.drive_data())
    monitor_task = cocotb.start_soon(self.monitor_responses())
    # Wait for both to complete
    await Combine(Join(data_task), Join(monitor_task))
This bit is one of the more confusing ones, so here is how I would break it down:
Conclusion: Putting It All Together
- `await` is the basic mechanism to pause and wait for any single awaitable
- `Combine()` waits for multiple triggers to all complete, in any order
- `Combine(Join(), Join())` specifically waits for multiple concurrently running tasks to complete. So you need to start the tasks using `start_soon()`, then pass the handles to the `Join`s inside the `Combine`.
The difference between just doing multiple sequential `await` statements and using `Combine()` is that with sequential awaits, you’re waiting for things in a specific order:
# This waits for valid, THEN waits for 100ns
await RisingEdge(valid)
await Timer(100, "ns")
Versus with `Combine()`, you’re waiting for all events to happen in any order:
# This waits for both valid AND 100ns, in any order
await Combine(RisingEdge(valid), Timer(100, "ns"))
Using `Combine(Join(), Join())` for tasks is how you implement a parallel execution pattern that waits for all concurrent operations to finish: the Python equivalent of SystemVerilog’s `fork-join`.
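To see the sequential-versus-combined difference in numbers, here is a small sketch in plain `asyncio`, used only as a simulator-free analogy. `asyncio.gather` stands in for `Combine()`, wall-clock sleeps stand in for triggers, and `sequential`, `combined`, and `elapsed` are hypothetical helper names.

```python
import asyncio
import time

async def sequential(a: float, b: float) -> None:
    # Wait for the first thing, THEN start waiting for the second
    await asyncio.sleep(a)
    await asyncio.sleep(b)

async def combined(a: float, b: float) -> None:
    # Wait for both at once; finishes when the slower one is done
    await asyncio.gather(asyncio.sleep(a), asyncio.sleep(b))

def elapsed(coro) -> float:
    """Run a coroutine to completion and return the wall-clock time it took."""
    start = time.perf_counter()
    asyncio.run(coro)
    return time.perf_counter() - start

seq_time = elapsed(sequential(0.1, 0.1))
par_time = elapsed(combined(0.1, 0.1))
print(f"sequential: {seq_time:.2f}s, combined: {par_time:.2f}s")
```

Sequential awaits cost roughly the sum of the two waits, while the combined wait costs roughly the longer of the two, which is exactly the `fork-join` behaviour you are used to.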
Advanced material – what’s going on under the hood?
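`Combine(Join(), Join())`, `First(Join(), Join())`
Neither is built on Python’s asyncio; cocotb drives coroutines with its own scheduler. Internally, both maintain state to track which triggers have fired and clean up their internal waiters once they are done. This is not quite SystemVerilog’s `disable fork`, though: as far as I know, tasks you launched yourself with `start_soon()` keep running after `First()` returns unless you stop them explicitly. Pretty neat, huh?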
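To take some of the magic out of "a trigger fires and the coroutine resumes", here is a deliberately tiny scheduler in pure Python. It is a toy under my own assumptions, not cocotb’s implementation: `Timer` and `ToyScheduler` are made-up classes, time is just an integer, and only time-based triggers are supported.

```python
import heapq
from itertools import count

class Timer:
    """Toy time trigger: awaiting it hands the trigger to the scheduler."""
    def __init__(self, delay):
        self.delay = delay

    def __await__(self):
        yield self  # suspend the coroutine; the scheduler receives this object

class ToyScheduler:
    def __init__(self):
        self.now = 0
        self._queue = []        # heap of (wake_time, tie_breaker, coroutine)
        self._order = count()   # keeps heap entries comparable and FIFO

    def start_soon(self, coro):
        heapq.heappush(self._queue, (self.now, next(self._order), coro))

    def run(self):
        while self._queue:
            self.now, _, coro = heapq.heappop(self._queue)
            try:
                trigger = coro.send(None)   # resume until the next await
            except StopIteration:
                continue                    # coroutine finished
            wake = self.now + trigger.delay
            heapq.heappush(self._queue, (wake, next(self._order), coro))

sim = ToyScheduler()
log = []

async def worker(name, delay, repeats):
    for _ in range(repeats):
        await Timer(delay)
        log.append((sim.now, name))

sim.start_soon(worker("fast", 10, 3))
sim.start_soon(worker("slow", 25, 1))
sim.run()
print(log)
```

The key point is that `await` simply yields a trigger object to whoever is driving the coroutine, and that driver decides when to call `send()` again. Real schedulers add signal edges, task joining, and cleanup on top of the same idea.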
Now about using or not using Join inside the Combine and First clauses:
Why the difference?
- Tasks started with `start_soon()` return a `Task` object, which is wrapped with `Join()` to create a trigger that fires when the task completes. (Recent cocotb versions also accept the `Task` directly, and cocotb 2.0 deprecates `Join` in favor of awaiting the task itself.)
- Direct triggers (like `Timer`, `RisingEdge`, etc.) are already awaitable objects specifically designed for simulation events.
Easier now? Let me know in the comments 🙂