Generators, Coroutines & Concurrency

6.S050

Jack Feser

Created: 2023-05-11 Thu 11:06

Generators

def powers_of_2():
  x = 1
  while True:
    yield x
    x *= 2

for (i, n) in zip(range(5), powers_of_2()):
  print(f'2 ** {i} == {n}')

Generators

Removing some syntactic sugar…

def powers_of_2():
    x = 1
    while True:
        yield x
        x *= 2

gen = powers_of_2()
for i in range(5):
    n = next(gen)
    print(f'2 ** {i} == {n}')

Generators

  • Allow execution to be suspended and resumed
  • yield "returns" a value to the caller
  • When next() is called, execution resumes after yield
  • Use cases:
    • Lexing & other streams
    • Traversing data structures
    • Concurrency
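A small illustration of the data-structure use case: an in-order traversal of a binary tree written as a generator. The nested-tuple tree representation is just for this sketch.

```python
def inorder(tree):
  # tree is None or a (left, value, right) tuple
  if tree is None:
    return
  left, value, right = tree
  for v in inorder(left):
    yield v
  yield value
  for v in inorder(right):
    yield v

tree = ((None, 1, None), 2, (None, 3, None))
print(list(inorder(tree)))  # prints [1, 2, 3]
```

The caller pulls values one at a time; the traversal's control state (the recursion stack) is suspended and resumed for us.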

Semantics of Generators

  • Two key pieces of state:
    • Return continuation (where to go when the generator yields)
    • Resume continuation (where to pick up again when next is called)

Semantics of Generators

Resume continuation:

while True:
    □
    x *= 2

Return continuation:

for i in range(5):
    n = □
    print(f'2 ** {i} == {n}')

Example

def gen():
  yield 4
  yield 2

i = gen()
print([next(i), next(i)])  # prints [4, 2]

Generators

  • Easily implemented in languages with first-class control (i.e. shift and reset)
  • Widely used, easy to reason about
  • Limitations:
    • Can only yield from the body of the generator
    • Information only flows out of the generator
  • Instance of more general concept: coroutines

Coroutines

  • Have yield, like generators
  • Can send values to coroutine
  • Can yield from functions called inside the coroutine

Coroutine constructs

  • create: create a coroutine and return a handle
    • implicit in python, performed by calling the coroutine function
  • resume coroutine value: resume the coroutine; value becomes the result of the yield expression at its suspension point
    • python calls this coroutine.send(value)
  • yield value: transfer control to the resume point of current coroutine, returning value
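A minimal sketch of these three constructs in Python, using a running-average coroutine (the averager example is an assumption for illustration, not from the slides):

```python
def averager():
  total, count = 0, 0
  avg = None
  while True:
    x = yield avg        # yield avg to the resumer; x is the value sent back in
    total += x
    count += 1
    avg = total / count

co = averager()    # create: calling the function returns a handle
next(co)           # advance to the first yield point
print(co.send(10)) # prints 10.0
print(co.send(30)) # prints 20.0
```

Note that a fresh coroutine must first be advanced to its initial yield (with `next` or `send(None)`) before a non-None value can be sent in.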

Coroutine states

  • Suspended, running, terminated
  • Don't resume a terminated coroutine
  • Don't resume a running coroutine (only relevant in a multithreaded program)
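In Python the terminated state is visible as StopIteration: resuming a coroutine whose body has finished raises it. A tiny sketch:

```python
def once():
  yield 1

c = once()
print(next(c))  # prints 1; c is now suspended at the yield
try:
  next(c)       # the body runs off the end, so c terminates
except StopIteration:
  print("terminated")
```

Any further `next` or `send` on `c` keeps raising StopIteration.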

Goal-oriented programming

  • Key idea: frame problem as goal to be solved
  • Goal may be:
    • Primitive
    • Disjunction of alternative goals, one of which must be satisfied
    • Conjunction of subgoals that must be satisfied in order

Example: Sandwich making

  • Make sandwich. Needs all of:
    • Get bread. Needs one of:
      • Have bread in pantry. Primitive.
      • Make bread. Primitive.
    • Apply topping. Needs one of:
      • ….

Example: String matching

  • Test whether string s matches pattern ("abc"|"de")."x"
  • Read this pattern as: "abc" or "de" followed by "x"

Source: "Revisiting Coroutines", De Moura & Ierusalimschy 2009

Example: String matching

  • Primitive goals: does literal string match at position?
  • Composite goals: sequencing and disjunction

Example: String matching

Proposal: Represent goals as coroutines that yield when they succeed.

Example: String matching

def prim(lit):
  def match(s, pos):
    if s[pos:].startswith(lit):
      yield (pos + len(lit))
  return match

Example: String matching

def alt(pat1, pat2):
  def match(s, pos):
    yield from pat1(s, pos)
    yield from pat2(s, pos)
  return match

What's up with yield from?
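`yield from g` delegates to the inner generator `g`: it re-yields every value `g` produces, as if the loop `for v in g: yield v` were written out (it also forwards sent values and returns `g`'s return value). A small demonstration, with hypothetical names:

```python
def inner():
  yield 1
  yield 2

def outer():
  yield 0
  yield from inner()  # delegates: yields 1, then 2

print(list(outer()))  # prints [0, 1, 2]
```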

Example: String matching

def seq(pat1, pat2):
  def match(s, pos):
    for mid_pos in pat1(s, pos):
      yield from pat2(s, mid_pos)
  return match
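Putting the three combinators together to match the pattern ("abc"|"de")."x" from earlier. The `matches` driver is an assumption: it declares success if the goal yields an end position equal to the string's length.

```python
def prim(lit):
  def match(s, pos):
    if s[pos:].startswith(lit):
      yield pos + len(lit)
  return match

def alt(pat1, pat2):
  def match(s, pos):
    yield from pat1(s, pos)   # try the first alternative...
    yield from pat2(s, pos)   # ...then the second
  return match

def seq(pat1, pat2):
  def match(s, pos):
    for mid_pos in pat1(s, pos):
      yield from pat2(s, mid_pos)
  return match

# ("abc"|"de")."x"
pat = seq(alt(prim("abc"), prim("de")), prim("x"))

def matches(pat, s):
  return any(end == len(s) for end in pat(s, 0))

print(matches(pat, "abcx"))  # prints True
print(matches(pat, "dex"))   # prints True
print(matches(pat, "abx"))   # prints False
```

Backtracking comes for free: if the first alternative leads to a dead end, iterating the generator simply resumes it to try the next one.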

Concurrency

  • Allow parts of our programs to be executed out of order or interleaved
    • E.g. when one part is waiting for network, another part continues executing
  • Concurrency is not parallelism!
    • Concurrency is useful (and used) in single-threaded programs

Cooperative concurrency

  • Tasks decide when to yield control to other tasks
    • (Often yields are hidden inside calls to IO functions)
  • async/await in JavaScript/Python are cooperative
  • Contrast with preemption, where tasks are periodically interrupted

Concurrency and coroutines

def do_stuff():
  while True:
    result = wait_for_results(4)
    print(f'Did stuff: {result}')

def do_more_stuff():
  while True:
    result = wait_for_results(1)
    print(f'Did more stuff: {result}')

run([do_stuff, do_more_stuff])

Can we make progress on do_more_stuff while we wait in do_stuff?

Concurrency and coroutines

Coroutines give us exactly what we need:

  • Ability to yield control from inside a task
  • Ability to send values back into the task

Concurrency and coroutines

def do_stuff():
  while True:
    result = yield from wait_for_results(4)
    print(f'Did stuff: {result}')

Running concurrent tasks

def run(tasks):
  running = ... # running tasks
  waiting = ... # waiting tasks
  while True:
    for task in running:
      # run task to next yield

    for task in waiting:
      # if task is done waiting, put back on running queue

Running concurrent tasks

def run(tasks):
  # keep track of what to return to task
  running = [(t, None) for t in tasks]
  waiting = ... # waiting tasks
  while True:
    for task in running:
      # run task to next yield

    for task in waiting:
      # if task is done waiting, put back on running queue

Running concurrent tasks

def run(tasks):
  running = [(t, None) for t in tasks]
  waiting = []
  while True:
    # run each task to its next yield
    for (t, v) in running:
      match t.send(v):
        # task yielded a "request" to call wait
        case ("wait", n):
          waiting.append((t, n))
        case _:
          pass
    running = []

    for task in waiting:
      # if task is done waiting, put back on running queue

Running concurrent tasks

def run(tasks):
  running = [(t, None) for t in tasks]
  waiting = []
  while True:
    for task in running:
      # run task to next yield, add to wait queue if needed

    # decrement waiting time
    waiting = [(t, n - 1) for (t, n) in waiting]
    for task in waiting:
      # if task is done waiting, put back on running queue

Running concurrent tasks

def run(tasks):
  running = [(t, None) for t in tasks]
  waiting = []
  while True:
    for task in running:
      # run task to next yield, add to wait queue if needed

    # decrement waiting time
    waiting = [(t, n - 1) for (t, n) in waiting]
    # if task is done waiting, put back on running queue
    for (t, n) in waiting:
      if n == 0:
        running.append((t, 'done waiting!'))
    # remove tasks from waiting queue if done
    waiting = [(t, n) for (t, n) in waiting if n > 0]

Running concurrent tasks


def wait_for_results(n):
  x = yield ("wait", n)
  return x

def task(n):
  while True:
    x = yield from wait_for_results(n)
    print(f"Task {n}: {x}")

run([task(1), task(10), task(3)])
Task 1: done waiting!
Task 1: done waiting!
Task 3: done waiting!
Task 1: done waiting!
Task 1: done waiting!
Task 1: done waiting!
Task 3: done waiting!
Task 1: done waiting!
Task 1: done waiting!
Task 1: done waiting!
Task 3: done waiting!
Task 1: done waiting!