Meter reads

Problem

We’re an energy supplier: users have meters, meters have reads, and we’d like to sum up each meter’s daily usage.

With stepping, we’re able to declaratively describe a query joining and grouping data. When we insert new data, it knows only to recompute what is necessary to update the output. When we’re done inserting, we can easily retrieve data back out from an indexed cache.

Setup

We’re going to skip over most of the setup – full details in test_meter_reads.py.

For context, it’s hopefully enough to know that we have:

# A customer; joined against Meter on user_id in the query below.
class User(st.Data):
    user_id: int
    name: str

# A meter belonging to one user; joined against HalfHourlyMeterRead on meter_id.
class Meter(st.Data):
    meter_id: UUID
    user_id: int  # references User.user_id

# A single half-hourly usage reading from one meter.
class HalfHourlyMeterRead(st.Data):
    meter_id: UUID
    timestamp: datetime  # the date part is later extracted for daily grouping
    value: float

# Flattened result of joining a User with one of their Meters.
class UserMeter(User, Meter):
    ...

# A read joined with its user and meter, plus the read's calendar date
# (added by with_date from the read's timestamp).
class UserMeterRead(User, Meter, HalfHourlyMeterRead):
    date: date

# Output row: total usage for one (user, meter, date) group.
class DailyUsage(st.Data):
    user_id: int
    meter_id: UUID
    date: date
    value: float  # sum of the half-hourly read values for that day

# Flatten a (User, Meter) join pair into a single UserMeter row.
def make_user_meter(p: st.Pair[User, Meter]) -> UserMeter: ...
# Merge a (UserMeter, HalfHourlyMeterRead) pair, attaching the read's date.
def with_date(p: st.Pair[UserMeter, HalfHourlyMeterRead]) -> UserMeterRead: ...
# Reshape a (summed value, (user_id, meter_id, date)) group into a DailyUsage.
def to_daily(p: st.Pair[float, tuple[int, UUID, date]]) -> DailyUsage: ...
# Extract the value to be summed from a joined read.
def pick_value(u: UserMeterRead) -> float: ...

Here’s our test data:

# Test fixtures: one user, one registered meter, and six half-hourly reads.
user_1 = User(user_id=1, name="oli")
meter_id_1 = UUID("00000001-b3fb-47ef-b22f-a45ba5b7b645")
meter_id_2 = UUID("00000002-b3fb-47ef-b22f-a45ba5b7b645")
meter_1 = Meter(meter_id=meter_id_1, user_id=1)
# (meter_id, timestamp, value) triples, expanded into HalfHourlyMeterRead
# rows below. meter_1 has reads on three dates; the final read belongs to
# meter_id_2, for which no Meter row is ever inserted.
_read_rows = [
    (meter_id_1, datetime(2023, 1, 1, 2), 24.0),
    (meter_id_1, datetime(2023, 1, 1, 3), 0.0),
    (meter_id_1, datetime(2023, 1, 2, 4), 12.5),
    (meter_id_1, datetime(2023, 1, 3, 5), 8.0),
    (meter_id_1, datetime(2023, 1, 2, 6), 6.0),
    (meter_id_2, datetime(2023, 1, 2, 6), 100.0),
]
half_hourly_reads_1 = [
    HalfHourlyMeterRead(meter_id=mid, timestamp=ts, value=val)
    for mid, ts, val in _read_rows
]

Query

Now let’s write our query:

# Index DailyUsage rows by their date so days can be looked up directly.
index_daily = st.Index.pick(DailyUsage, lambda d: d.date)
# Cache holding the query's DailyUsage output; populated inside query().
daily_cache = st.Cache[DailyUsage]()


def query(
    users: st.ZSet[User],
    meters: st.ZSet[Meter],
    reads: st.ZSet[HalfHourlyMeterRead],
) -> st.ZSet[DailyUsage]:
    """Incrementally compute per-day usage totals.

    Joins users to meters (on user_id), then to half-hourly reads (on
    meter_id), tags each joined row with its calendar date, and reduces
    read values grouped by (user_id, meter_id, date) into DailyUsage rows.

    Side effect: the output is integrated into daily_cache, indexed by
    date, so results can be read back without re-running the query.
    """
    # users joined to meters on user_id.
    join_meters = st.join(
        users,
        meters,
        on_left=st.Index.pick(User, lambda u: u.user_id),
        on_right=st.Index.pick(Meter, lambda m: m.user_id),
    )
    # Flatten each (User, Meter) pair into a single UserMeter row.
    join_meters_flat = st.map(join_meters, f=make_user_meter)
    # (user, meter) rows joined to reads on meter_id; reads whose meter
    # was never inserted simply don't match.
    join_reads = st.join(
        join_meters_flat,
        reads,
        on_left=st.Index.pick(UserMeter, lambda p: p.meter_id),
        on_right=st.Index.pick(HalfHourlyMeterRead, lambda m: m.meter_id),
    )
    # Attach each read's date (derived from its timestamp in with_date).
    merged = st.map(join_reads, f=with_date)
    # Reduce each group's values, keyed by (user_id, meter_id, date).
    grouped = st.group_reduce_flatten(
        merged,
        by=st.Index.pick(UserMeterRead, lambda f: (f.user_id, f.meter_id, f.date)),
        zero=float,  # NOTE(review): float() == 0.0 as the reduction identity — confirm
        pick_value=pick_value
    )
    # Reshape each (sum, group-key) pair into a DailyUsage row.
    as_daily = st.map(grouped, f=to_daily)

    # Side effect: keep daily_cache up to date, indexed by date.
    _ = daily_cache[as_daily](lambda a: st.integrate_indexed(a, indexes=(index_daily,)))

    return as_daily

Before querying, we set up a cache of the DailyUsage, indexed by date.

In the query itself, we join users to meters to reads.

Then we add the date of each read (from its .timestamp).

Then we group by user_id, meter_id, date, and convert to DailyUsage. This gets stored in the cache.

Insert

Now let’s insert some data, then assert that the values we retrieve from the cache make sense:

# Compile the query into a dataflow graph and bind it to Postgres-backed
# storage; create_tables=True builds the backing tables on first use.
graph = st.compile(query)
store = st.StorePostgres.from_graph(postgres_conn, graph, create_tables=True)
# One insert action per query parameter, in parameter order.
i_users, i_meters, i_reads = st.actions(store, graph)
i_users.insert(user_1)
i_meters.insert(meter_1)
i_reads.insert(*half_hourly_reads_1)

# Read cached DailyUsage rows back for just these two dates.
actual = list(
    daily_cache.zset(store).iter_by_index(
        index_daily, frozenset((date(2023, 1, 2), date(2023, 1, 3)))
    )
)
# 2023-01-02 totals 12.5 + 6.0 = 18.5; 2023-01-03 has the single 8.0 read.
# The meter_id_2 read never joins (no matching Meter was inserted), so it
# does not appear. Each entry is presumably (index key, row, zset count) —
# NOTE(review): confirm the tuple shape against iter_by_index docs.
expected = [
    (
        date(2023, 1, 2),
        DailyUsage(
            user_id=1,
            meter_id=UUID("00000001-b3fb-47ef-b22f-a45ba5b7b645"),
            date=date(2023, 1, 2),
            value=18.5,
        ),
        1,
    ),
    (
        date(2023, 1, 3),
        DailyUsage(
            user_id=1,
            meter_id=UUID("00000001-b3fb-47ef-b22f-a45ba5b7b645"),
            date=date(2023, 1, 3),
            value=8.0,
        ),
        1,
    ),
]
assert actual == expected

Remove

Remember, we can set up a store any time from any process by doing:

# Same compile as before, but create_tables=False: attach to the existing tables.
graph = st.compile(query)
store = st.StorePostgres.from_graph(postgres_conn, graph, create_tables=False)

Now let’s remove a read, and make sure that the daily value for 2023-01-02 has gone down.

# Removing the 12.5 read should incrementally drop 2023-01-02's total
# from 18.5 to 6.0, while 2023-01-03 stays untouched.
i_reads.remove(half_hourly_reads_1[2])  # remove a read from 2023-01-02
actual = list(
    daily_cache.zset(store).iter_by_index(
        index_daily, frozenset((date(2023, 1, 2), date(2023, 1, 3)))
    )
)
expected = [
    (
        date(2023, 1, 2),
        DailyUsage(
            user_id=1,
            meter_id=UUID("00000001-b3fb-47ef-b22f-a45ba5b7b645"),
            date=date(2023, 1, 2),
            value=6.0,  # 18.5 - 12.5 after the removal
        ),
        1,
    ),
    (
        date(2023, 1, 3),
        DailyUsage(
            user_id=1,
            meter_id=UUID("00000001-b3fb-47ef-b22f-a45ba5b7b645"),
            date=date(2023, 1, 3),
            value=8.0,
        ),
        1,
    ),
]
assert actual == expected