Writing queries
If you prefer theory first, read the concepts page, then come back here.
Setup
Firstly, let’s set up our data types and add a couple of helper functions:
import stepping as st

class Product(st.Data):
    name: str
    price: float

class LineItem(st.Data):
    basket_id: int
    product_name: str
    qty: int

def pick_price(p: st.Pair[Product, LineItem]) -> float:
    return p.left.price * p.right.qty

def to_receipt_item(p: st.Pair[float, int]) -> str:
    return f"Basket id: {p.right} total: ${p.left}"
st.Data is a subclass of pydantic.BaseModel with some other methods attached. st.Pair is just a dataclass with a .left and a .right. These are used heavily when joining/grouping data, for example, if you were to st.join(A, B), the return type is a Pair[A, B].
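To make the shape of a Pair concrete, here is a minimal stand-in written with a plain dataclass. It is only an illustration: the Pair class below is hypothetical and is not stepping's own st.Pair, whose definition may differ, and the rows are plain tuples rather than st.Data models.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

L = TypeVar("L")
R = TypeVar("R")


@dataclass(frozen=True)
class Pair(Generic[L, R]):
    """Hypothetical stand-in for st.Pair: two values, .left and .right."""

    left: L
    right: R


# A joined row carries the matching value from each side.
# left is (name, price), right is (basket_id, product_name, qty).
row = Pair(left=("tv", 3.0), right=(2, "tv", 2))

# Same arithmetic as pick_price: price from the left, quantity from the right.
line_total = row.left[1] * row.right[2]
print(line_total)
```

Freezing the dataclass makes it hashable, which matters if rows are going to be used as dictionary keys.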
Query
cache = st.Cache[str]()

def query(
    products: st.ZSet[Product],
    line_items: st.ZSet[LineItem],
) -> st.ZSet[st.Pair[Product, LineItem]]:
    joined = st.join(
        products,
        line_items,
        on_left=st.Index.pick(Product, lambda p: p.name),
        on_right=st.Index.pick(LineItem, lambda l: l.product_name),
    )
    grouped = st.group_reduce_flatten(
        joined,
        by=st.Index.pick(st.Pair[Product, LineItem], lambda p: p.right.basket_id),
        zero=float,
        pick_value=pick_price,
    )
    receipt_items = st.map(grouped, f=to_receipt_item)
    _ = cache[receipt_items](lambda z: st.integrate(z))
    return joined
st.Cache is somewhere we persist data. In this example, it’s not particularly exciting as we’re doing everything in memory with Python. But on the following page, we’ll wire it up to a database.
query(...) is a function equivalent to a SELECT ... query in SQL. Due to the way this gets compiled later, only a subset of Python is supported within this lexical block.
st.ZSet is nearly equivalent to a table in SQL. In Python, we implement it (kinda) as a dict[Row, int], where the int is the number of times the row occurs (negative counts represent removed rows). For more details, look at the concepts page.
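The dict[Row, int] picture can be tried out directly. The sketch below uses plain dicts rather than st.ZSet, and zset_add is a hypothetical helper (not part of stepping) that adds two such dicts by summing counts.

```python
from collections import defaultdict


def zset_add(a, b):
    """Add two dict-based Z-sets by summing counts per row.

    Rows whose count reaches zero are dropped, so a +1 and a -1
    for the same row cancel out.
    """
    out = defaultdict(int)
    for zset in (a, b):
        for row, count in zset.items():
            out[row] += count
    return {row: count for row, count in out.items() if count != 0}


table = {"tv": 1, "radio": 1}
removal = {"tv": -1}  # a negative count represents a removed row

result = zset_add(table, removal)
print(result)
```

Representing a removal as data with a negative count is what lets changes flow through a query in the same way as inserts.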
st.join(...) is equivalent to an INNER JOIN; there is also st.outer_join(...).
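As a rough picture of what joining two ZSets means, here is a batch version over plain dicts. The inner_join function and its nested-loop strategy are assumptions made for illustration, not stepping's implementation; multiplying the counts of matching rows follows the usual Z-set formulation.

```python
def inner_join(left, right, on_left, on_right):
    """Join two dict-based Z-sets on matching keys.

    Each output row is a (left_row, right_row) tuple whose count is the
    product of the two input counts. Rows without a match are dropped.
    """
    out = {}
    for l_row, l_count in left.items():
        for r_row, r_count in right.items():
            if on_left(l_row) == on_right(r_row):
                pair = (l_row, r_row)
                out[pair] = out.get(pair, 0) + l_count * r_count
    return out


# Products are (name, price); line items are (basket_id, product_name, qty).
products = {("tv", 3.0): 1, ("radio", 5.0): 1, ("phone", 9.0): 1}
line_items = {(1, "radio", 4): 1, (2, "tv", 2): 1}

joined = inner_join(
    products,
    line_items,
    on_left=lambda p: p[0],
    on_right=lambda l: l[1],
)
print(joined)
```

The "phone" product has no line item, so it does not appear in the result, which is the inner-join behaviour.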
st.Index.pick(...) picks fields from a type. These indexes are used by the Store later to ensure that querying past data is efficient.
st.group_reduce_flatten(...) is equivalent to SELECT sum(...) FROM ... GROUP BY basket_id. Unlike in SQL, the group, reduce and flatten can be decomposed; see the definition.
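The three steps can be sketched in one batch function over a plain dict. This is a simplified, non-incremental stand-in: the function below is hypothetical, it borrows the parameter names used in the query (by, zero, pick_value), and weighting each value by its row count is an assumption about how sums over a ZSet behave.

```python
def group_reduce_flatten(zset, by, zero, pick_value):
    """Group rows by key, sum the picked values, emit one row per group.

    Output rows are (total, key) tuples, mirroring the Pair[float, int]
    that to_receipt_item expects (total on the left, key on the right).
    """
    totals = {}
    for row, count in zset.items():
        key = by(row)  # group
        totals[key] = totals.get(key, zero()) + pick_value(row) * count  # reduce
    return {(total, key): 1 for key, total in totals.items()}  # flatten


# Rows here are (basket_id, price, qty), each present once.
rows = {(1, 5.0, 4): 1, (1, 3.0, 1): 1, (2, 3.0, 2): 1}

grouped = group_reduce_flatten(
    rows,
    by=lambda r: r[0],
    zero=float,
    pick_value=lambda r: r[1] * r[2],
)
print(grouped)
```

Passing zero=float works because float() returns 0.0, the starting value for each group's sum.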
st.map(...) just maps a function over each value in the ZSet.
st.integrate(...) sums its arguments over time. In this case, we build up a big ZSet of all the receipt_items over time.
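"Summing over time" can be pictured as a running total of the changes produced at each iteration. The generator below is a hypothetical sketch over plain dicts, not stepping's st.integrate; the receipt strings are made up to match the format of to_receipt_item.

```python
def integrate(deltas):
    """Yield the accumulated Z-set after each delta is added in."""
    state = {}
    for delta in deltas:
        for row, count in delta.items():
            new_count = state.get(row, 0) + count
            if new_count:
                state[row] = new_count
            else:
                state.pop(row, None)  # counts that cancel leave no row behind
        yield dict(state)


# Two iterations: the first inserts a receipt, the second replaces it.
steps = [
    {"Basket id: 1 total: $23.0": 1},
    {"Basket id: 1 total: $23.0": -1, "Basket id: 1 total: $24.0": 1},
]

snapshots = list(integrate(steps))
print(snapshots[-1])
```

After the second step only the updated receipt remains, because the -1 cancels the earlier +1.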
Compile
graph = st.compile(query)
store = st.StorePython.from_graph(graph)
st.compile(...) is probably the most complex bit of stepping: it takes the code from query and parses it into a Graph that we execute later. The exact type of this graph is:
st.Graph[
    st.A2[
        st.ZSet[Product],
        st.ZSet[LineItem]
    ],
    st.A1[
        st.ZSet[st.Pair[Product, LineItem]]
    ],
]
Aθ is just a collection of arguments with length θ, so st.A2 holds the two inputs and st.A1 holds the single output.
This corresponds to the type of the function we compiled:
(st.ZSet[Product], st.ZSet[LineItem]) -> st.ZSet[st.Pair[Product, LineItem]]
The store is where we put data that needs to persist between execution iterations. Right now, this is a st.StorePython, but on the next page, we will wire up a st.StorePostgres.
Execute
(product_action, line_item_action) = st.actions(store, graph)

product_action.insert(Product(name="tv", price=3.0))
product_action.insert(Product(name="radio", price=5.0))
line_item_action.insert(
    LineItem(basket_id=1, product_name="radio", qty=4),
    LineItem(basket_id=1, product_name="tv", qty=1),
    LineItem(basket_id=2, product_name="tv", qty=2),
)
product_action.replace(
    Product(name="tv", price=3.0),
    Product(name="tv", price=4.0),
)
Retrieve
Now let’s retrieve some data from the cache:
output = cache.zset(store)
print(output)
cache.zset(...) should return a ZSet[str], in this case:
╒═══════════╤═══════════════════════════╕
│ _count_ │ _value_ │
╞═══════════╪═══════════════════════════╡
│ 1 │ Basket id: 2 total: $8.0 │
├───────────┼───────────────────────────┤
│ 1 │ Basket id: 1 total: $24.0 │
╘═══════════╧═══════════════════════════╛
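To see where $24.0 and $8.0 come from, the arithmetic can be redone by hand in plain Python, without stepping. The basket_totals helper is hypothetical; the prices and line items are the ones inserted in the Execute step.

```python
def basket_totals(prices, line_items):
    """Sum price * qty per basket for (basket_id, product_name, qty) rows."""
    totals = {}
    for basket_id, product_name, qty in line_items:
        totals[basket_id] = totals.get(basket_id, 0.0) + prices[product_name] * qty
    return totals


prices = {"tv": 3.0, "radio": 5.0}
line_items = [(1, "radio", 4), (1, "tv", 1), (2, "tv", 2)]

# Before the replace: basket 1 = 5.0*4 + 3.0*1, basket 2 = 3.0*2.
before = basket_totals(prices, line_items)

# The replace changes the tv price from 3.0 to 4.0.
prices["tv"] = 4.0
after = basket_totals(prices, line_items)

receipts = [f"Basket id: {b} total: ${t}" for b, t in sorted(after.items())]
print(before, after, receipts)
```

Only the totals computed after the price change survive in the cache, since the earlier receipts for both baskets are retracted when the tv price is replaced.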
It’s possible to cache with indexes:
_ = cache[receipt_items](
    lambda z: st.integrate_indexed(z, indexes=(index_a, ...))
)
Then we can iterate over those values with:
for key, value, count in cache.zset(store).iter_by_index(
    index_a,
    frozenset((optional, match, values, ...)),
):
    ...
You may have noticed in your IDE that product_action.insert(...) has the type:
(*Product) -> tuple[st.ZSet[st.Pair[Product, LineItem]]]
Every time we call Action.insert(...)/Action.remove(...)/Action.replace(...), we get back the ZSet of changes from running a single iteration. This is useful when we want to do something with the data other than store it in a stepping cache (putting it on a queue, for example).
This also demonstrates how removing/updating data is implemented with ZSets (notice the -1 counts):
iteration_output = product_action.replace(
    Product(name="tv", price=4.0),
    Product(name="tv", price=5.0),
)
print(iteration_output)
╒═══════════╤═══════════════════════════════╤═════════════════════════════════════════════════╕
│ _count_ │ left │ right │
╞═══════════╪═══════════════════════════════╪═════════════════════════════════════════════════╡
│ -1 │ Product(name='tv', price=4.0) │ LineItem(basket_id=2, product_name='tv', qty=2) │
├───────────┼───────────────────────────────┼─────────────────────────────────────────────────┤
│ 1 │ Product(name='tv', price=5.0) │ LineItem(basket_id=2, product_name='tv', qty=2) │
├───────────┼───────────────────────────────┼─────────────────────────────────────────────────┤
│ -1 │ Product(name='tv', price=4.0) │ LineItem(basket_id=1, product_name='tv', qty=1) │
├───────────┼───────────────────────────────┼─────────────────────────────────────────────────┤
│ 1 │ Product(name='tv', price=5.0) │ LineItem(basket_id=1, product_name='tv', qty=1) │
╘═══════════╧═══════════════════════════════╧═════════════════════════════════════════════════╛
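The pattern in the table, one -1 row for each old value and one +1 row for each new value, can be reproduced with plain dicts. Both helpers below are hypothetical and only illustrate the idea; they are not how stepping implements Action.replace(...).

```python
def replace_delta(old_row, new_row):
    """Express an update as a Z-set: retract the old row, insert the new one."""
    if old_row == new_row:
        return {}  # nothing changed, so there is nothing to emit
    return {old_row: -1, new_row: 1}


def apply_delta(state, delta):
    """Add a delta to a dict-based Z-set, dropping rows whose count hits zero."""
    out = dict(state)
    for row, count in delta.items():
        new_count = out.get(row, 0) + count
        if new_count:
            out[row] = new_count
        else:
            out.pop(row, None)
    return out


# Products as (name, price) tuples.
state = {("tv", 4.0): 1, ("radio", 5.0): 1}
delta = replace_delta(("tv", 4.0), ("tv", 5.0))
updated = apply_delta(state, delta)

print(delta)
print(updated)
```

Because the update is itself a ZSet, it can be pushed through a join like any other input, which is why each affected joined row shows up once with -1 and once with 1.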