Writing queries
If you’re more of a theory person, maybe go straight to the concepts then come back here.
Setup
Firstly, let’s set up our data types and add a couple of helper functions:
import stepping as st
class Product(st.Data):
name: str
price: float
class LineItem(st.Data):
basket_id: int
product_name: str
qty: int
def pick_price(p: st.Pair[Product, LineItem]) -> float:
return p.left.price * p.right.qty
def to_receipt_item(p: st.Pair[float, int]) -> str:
return f"Basket id: {p.right} total: ${p.left}"
st.Datais a subclass ofpydantic.BaseModelwith some other methods attached.st.Pairis just a dataclass with a.leftand a.right. These are used heavily when joining/grouping data, for example if you were tost.join(A, B)the return type is aPair[A, B].
Query
cache = st.Cache[str]()
def query(
products: st.ZSet[Product],
line_items: st.ZSet[LineItem],
) -> st.ZSet[st.Pair[Product, LineItem]]:
joined = st.join(
products,
line_items,
on_left=st.Index.pick(Product, lambda p: p.name),
on_right=st.Index.pick(LineItem, lambda l: l.product_name),
)
grouped = st.group_reduce_flatten(
joined,
by=st.Index.pick(st.Pair[Product, LineItem], lambda p: p.right.basket_id),
zero=float,
pick_value=pick_price,
)
receipt_items = st.map(grouped, f=to_receipt_item)
_ = cache[receipt_items](lambda z: st.integrate(z))
return joined
st.Cache is somewhere we persist data. In this example, it’s not particularly exciting as we’re doing everything in memory with Python. But on the following page, we’ll wire it up to a database.
query(...) is a function equivalent to a SELECT ... query in SQL. Due to the way this gets compiled later, only a subset of Python is supported within this lexical block.
st.ZSet is nearly equivalent to a table in SQL. In Python, we implement it (kinda) as a dict[Row, int], where the int is the count of the number of values (including negative counts to represent removed rows). For more details, look at the concepts page.
st.join(...) is equivalent to a LEFT INNER JOIN, there is also st.outer_join(...).
st.Index.pick(...) picks fields from a type. These indexes are used by the Store later to ensure that querying past data is efficient.
st.group_reduce_flatten(...) is equivalent to SELECT sum(...) FROM ... GROUP BY basket_id. Unlike in SQL, the group, reduce and flatten can be decomposed, see the definition.
st.map(...) just maps a function over each value in the ZSet.
st.integrate(...) sums its arguments over time. In this case, we build up a big ZSet of all the receipt_items over time.
Compile
graph = st.compile(query)
store = st.StorePython.from_graph(graph)
st.compile(...) is probably the most complex bit of stepping, it takes the code from query and parses it into a Graph that we execute later. The exact type of this graph is:
st.Graph[
st.A2[
st.ZSet[Product],
st.ZSet[LineItem]
],
st.A1[
st.ZSet[st.Pair[Product, LineItem]]
],
]
Aθ is just a collection of arguments with length θ.
This corresponds to the type of the function we compiled:
(st.ZSet[Product], st.ZSet[LineItem]) -> st.ZSet[st.Pair[Product, LineItem]]
The store is where we put data that we need persisting between execution iterations. Right now, this is a st.StorePython, but on the next page, we will wire up a st.StorePostgres.
Execute
(product_action, line_item_action) = st.actions(store, graph)
product_action.insert(Product(name="tv", price=3.0))
product_action.insert(Product(name="radio", price=5.0))
line_item_action.insert(
LineItem(basket_id=1, product_name="radio", qty=4),
LineItem(basket_id=1, product_name="tv", qty=1),
LineItem(basket_id=2, product_name="tv", qty=2),
)
product_action.replace(
Product(name="tv", price=3.0),
Product(name="tv", price=4.0),
)
Retrieve
Now let’s retrieve some data from the cache:
output = cache.zset(store)
print(output)
cache.zset(...) should return a ZSet[str], in this case:
╒═══════════╤═══════════════════════════╕
│ _count_ │ _value_ │
╞═══════════╪═══════════════════════════╡
│ 1 │ Basket id: 2 total: $8.0 │
├───────────┼───────────────────────────┤
│ 1 │ Basket id: 1 total: $24.0 │
╘═══════════╧═══════════════════════════╛
It’s possible to cache with indexes:
_ = cache[receipt_items](
lambda z: st.integrate_indexed(z, indexes=(index_a, ...))
)
Then we can iterate over those values with:
for key, value, count in cache.zset(store).iter_by_index(
index_a,
frozenset((optional, match, values, ...)),
):
...
You may have noticed when looking in your IDE, that product_action.insert(...) has the type:
(*Product) -> tuple[st.ZSet[st.Pair[Product, LineItem]]]
Every time we call Action.insert(...)/Action.remove(...)/Action.replace(...), we are returned the ZSet of changes from running a single iteration. This is useful in the case where we want to do something with the data other than store it in a stepping cache (putting it on a queue for example).
This also demonstrates how removing/updating data is implemented with ZSets (notice the -1 counts):
iteration_output = product_action.replace(
Product(name="tv", price=4.0),
Product(name="tv", price=5.0),
)
print(iteration_output)
╒═══════════╤═══════════════════════════════╤═════════════════════════════════════════════════╕
│ _count_ │ left │ right │
╞═══════════╪═══════════════════════════════╪═════════════════════════════════════════════════╡
│ -1 │ Product(name='tv', price=4.0) │ LineItem(basket_id=2, product_name='tv', qty=2) │
├───────────┼───────────────────────────────┼─────────────────────────────────────────────────┤
│ 1 │ Product(name='tv', price=5.0) │ LineItem(basket_id=2, product_name='tv', qty=2) │
├───────────┼───────────────────────────────┼─────────────────────────────────────────────────┤
│ -1 │ Product(name='tv', price=4.0) │ LineItem(basket_id=1, product_name='tv', qty=1) │
├───────────┼───────────────────────────────┼─────────────────────────────────────────────────┤
│ 1 │ Product(name='tv', price=5.0) │ LineItem(basket_id=1, product_name='tv', qty=1) │
╘═══════════╧═══════════════════════════════╧═════════════════════════════════════════════════╛