jml's notebook

Maintaining a user sessions table

Heads up: I have no idea what I'm doing. I'm a plain old programmer figuring out how to do a data engineer's job. This post does not contain any answers, and if it does, it's unlikely that they are good answers.

Further, I'm convinced that this problem has been solved hundreds of times over already, and I just don't know the right places to look to find the prior art. I feel like a bozo for thinking so much about this.


At work, we're moving from Redshift to BigQuery. We have a bunch of tables in Redshift that are transformations from our raw data into something analysts can use. There are a bunch of jobs in Airflow that maintain these tables. I have to port these jobs from Redshift to BigQuery.

I'm currently stuck on our user_sessions job. This takes the raw event data from Segment and turns it into user sessions. The logic for doing this is relatively straightforward and well-known. There's a great post on the Segment blog and another good post I found by searching.

The problem isn't figuring out how to group a stream of event data into arbitrarily defined sessions. The problem is figuring out how to keep a session table in BigQuery up to date.

Some context:

When you create a BigQuery destination for Segment, they create a bunch of partitioned tables. This partitioning matters a lot because of BigQuery's pricing model. Basically, if you query a column, you pay for reading the entire column unless:

  1. the table is partitioned
  2. your query explicitly filters by partition

So it's great that Segment makes partitioned tables.

The Segment tables are notionally ingestion-time partitioned, but our experiments have shown that this effectively means they are partitioned by the received_at timestamp.
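
In query terms, that means the daily job wants a filter like the one in the sketch below. The table name is made up, and I'm filtering on the ingestion-time pseudo-column, which (per the experiments above) lines up with the received_at day.

```sql
-- A sketch, not the real job: read one day's worth of raw Segment events.
-- `my-project.segment.tracks` is a made-up table name.
SELECT user_id, event, timestamp, received_at
FROM `my-project.segment.tracks`
WHERE _PARTITIONDATE = DATE '2019-11-01'  -- in practice, the received_at day
```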

Each Segment event has five different timestamps. received_at is the obvious choice for this use case, as it's the time that Segment received the event, according to whatever clock they use for such things.

Unfortunately, for sessions, it makes way more sense to use the timestamp field, as that is Segment's best guess as to what the time really was on the user's device when they triggered the event.

timestamp and received_at usually differ by less than 30 seconds, as that's roughly how long Segment buffers events before sending them. However, they can differ by up to several months. Imagine a user is on the app on a trip to Antarctica, and the events only get sent when they arrive back in Hobart.
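
If you want to see that skew for yourself, one rough way is to histogram the gap between the two fields over a single partition. A sketch, using the same made-up table name as above:

```sql
-- How late do events arrive? Count events by (received_at day - timestamp day).
SELECT
  DATE_DIFF(DATE(received_at), DATE(timestamp), DAY) AS days_late,
  COUNT(*) AS num_events
FROM `my-project.segment.tracks`
WHERE _PARTITIONDATE = DATE '2019-11-01'
GROUP BY days_late
ORDER BY days_late
```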

So why is all this a problem?

I want to run a job each day that looks at the events we've received, and updates the session table accordingly.

When I say "events we've received", what I really mean is a single partition of our raw BigQuery events tables, because that's basically the unit of operation.

Say we're looking at the partition for 2019-11-01. Most of the events will have a timestamp on 2019-11-01, or in the last thirty seconds of 2019-10-31. However, a large number (up to 5%) will have timestamps from earlier in the year. Some might even have timestamps from the future, but I'm happy to ignore those as rubbish.

There are two problems as I see it:

  1. Chunking by day wrongly breaks sessions in half
  2. Old events need to be integrated into existing sessions

Chunking by day breaks sessions in half

For now, let's pretend that received_at and timestamp are always identical.

A pretty common way of defining a session boundary is to say that 30 minutes of inactivity means the session has ended.
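
For reference, the query for that is the well-known "gaps and islands" shape from the posts mentioned earlier. A sketch, with made-up table and column names: use LAG to find each user's gap since their previous event, flag gaps of 30 minutes or more as session starts, and take a running sum of the flags to number the sessions.

```sql
-- A sketch of the standard sessionization query, not our production job.
-- Assumes a hypothetical events table with user_id and timestamp columns.
WITH gaps AS (
  SELECT
    user_id,
    timestamp,
    TIMESTAMP_DIFF(
      timestamp,
      LAG(timestamp) OVER (PARTITION BY user_id ORDER BY timestamp),
      MINUTE
    ) AS minutes_since_previous_event
  FROM `my-project.analytics.events`
),
numbered AS (
  SELECT
    user_id,
    timestamp,
    -- A session starts at a user's first event, or after a 30 minute gap.
    SUM(IF(minutes_since_previous_event IS NULL
           OR minutes_since_previous_event >= 30, 1, 0)) OVER (
      PARTITION BY user_id ORDER BY timestamp
    ) AS session_number
  FROM gaps
)
SELECT
  user_id,
  session_number,
  MIN(timestamp) AS start_time,
  MAX(timestamp) AS end_time,
  COUNT(*) AS num_events
FROM numbered
GROUP BY user_id, session_number
```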

So, say you are doing some late-night Halloween shopping and have one event at 2019-10-31 23:59 and the next event at 2019-11-01 00:01. Those two events should be part of the same session.

But if our job looks at events in day-sized batches, then it won't even get a chance to consider whether the 2019-10-31 23:59 event is in the same session as the 2019-11-01 00:01 event. Instead, you'll end up with two sessions, one at the end of 2019-10-31 and one at the beginning of 2019-11-01.

The obvious response to this is to expand the batch size. If you get these problems by looking at just one day at a time, why not look at two days at a time?

Let's walk this through.

When we process the [2019-10-31, 2019-11-01] batch, we'll correctly create a session that includes both your pre-midnight and post-midnight events. We'll store this session in the 2019-10-31 partition of our sessions table, because we partition by start time.

The next day, when we process the [2019-11-01, 2019-11-02] batch, we'll wrongly identify that you started a session just after midnight on 2019-11-01. We'll store this session in the 2019-11-01 partition of our sessions table, thus giving us a second, false session.

We get this same problem in reverse for a session that starts late on 2019-11-01 and finishes early on 2019-11-02. The [2019-10-31, 2019-11-01] batch will create a wrong, short session and the [2019-11-01, 2019-11-02] batch will create a correct, longer session.

However, we could correct this with a second pass. It would be relatively straightforward to identify sessions which were wholly contained in other sessions, and to then delete them. If we make (and enforce!) some assumptions about maximum session length, then we can even do it without scanning the whole sessions table. At a minimum, to make sure 2019-11-01 is correct, we have to scan 2019-10-31 and 2019-11-02.

I don't think you can get away without a second, mutatey pass though. I'd love to be wrong about this.
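
For what it's worth, that second pass could look something like the sketch below: made-up names, and missing the partition filters you'd want in real life to keep the scan cheap.

```sql
-- Sketch of the clean-up pass: delete any session that is wholly contained
-- within a longer session for the same user.
DELETE FROM `my-project.analytics.sessions` contained
WHERE EXISTS (
  SELECT 1
  FROM `my-project.analytics.sessions` container
  WHERE container.user_id = contained.user_id
    AND container.start_time <= contained.start_time
    AND container.end_time >= contained.end_time
    -- strictly longer, so a session can't match itself
    AND (container.start_time < contained.start_time
         OR container.end_time > contained.end_time)
)
```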

Integrating old events into existing sessions

This is trickier.

Let's pretend the chunking-by-day problem doesn't exist, since we already have something that kind of works, even if it's ugly.

We're looking at the batch of events we received on 2019-11-01. Most of them are for 2019-11-01, which is great. However, about 5% of them aren't. They'll be for arbitrary days in the past, perhaps even as far back as 2018.

Let's say we've discovered an event with a timestamp on 2019-10-23.

Our session table looks like this:

| sessions       |
|----------------|
| user_id        |
| session_number |
| start_time     |
| end_time       |
| num_events     |

And for a particular user on 2019-10-23, you might see data that looks like this:

| session_number | start_time | end_time | num_events |
|----------------|-----------:|---------:|-----------:|
| 1              |      09:21 |    09:30 |         10 |
| 2              |      10:05 |    10:23 |         15 |
| 3              |      13:25 |    14:10 |         20 |

Case 1: merging sessions

Say the event for this user was for 09:45, and it was a single event.

The right thing to do would be to realise that sessions 1 and 2 should be merged, like so:

| session_number | start_time | end_time | num_events |
|----------------|-----------:|---------:|-----------:|
| 1              |      09:21 |    10:23 |         26 |
| 2              |      13:25 |    14:10 |         20 |

Case 2: falling within sessions

If the event was for 09:23, then we don't want to make any changes to the session boundaries, just bump the number of events in the session it lands in.

| session_number | start_time | end_time | num_events |
|----------------|-----------:|---------:|-----------:|
| 1              |      09:21 |    09:30 |         11 |
| 2              |      10:05 |    10:23 |         15 |
| 3              |      13:25 |    14:10 |         20 |

Case 3: new session

If the event was for 11:15, then we make a new session and bump the session numbers.

| session_number | start_time | end_time | num_events |
|----------------|-----------:|---------:|-----------:|
| 1              |      09:21 |    09:30 |         10 |
| 2              |      10:05 |    10:23 |         15 |
| 3              |      11:15 |    11:15 |          1 |
| 4              |      13:25 |    14:10 |         20 |

Case 4: beginning of day

If the new event is for 00:01, and there was a session ending 23:59 the previous day, it should be merged into that session, not start a new one.

Case 5: end of day

Likewise, if the new event is for 23:59 and there's a session beginning the next day at 00:01, that session should be dragged back to start at 2019-10-23 23:59.
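
Handling each of these five cases with separate code sounds painful. Re-deriving the sessions for just the affected users and days covers all of them at once, and that's roughly where the plan in the next section ends up. A very rough sketch, with made-up names: `affected_users` and `rebuilt_sessions` are hypothetical intermediate tables, the latter being the output of the sessionization query sketched earlier, run over the affected users and date range.

```sql
-- Sketch: rebuild sessions for the affected day, plus a day either side to
-- cover cases 4 and 5, for the affected users only.
DELETE FROM `my-project.analytics.sessions`
WHERE user_id IN (SELECT user_id FROM `my-project.analytics.affected_users`)
  AND DATE(start_time) BETWEEN DATE '2019-10-22' AND DATE '2019-10-24';

INSERT INTO `my-project.analytics.sessions`
SELECT user_id, session_number, start_time, end_time, num_events
FROM `my-project.analytics.rebuilt_sessions`;
```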

Bringing it together

Our source data is in a table partitioned by received_at.

Each day, we want to load the relevant columns from that table into a different table partitioned by timestamp.
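
That load step could be as simple as the sketch below. Table names and schema are illustrative; the important part is that the destination table is partitioned by DATE(timestamp).

```sql
-- One-off: a copy of the raw events, partitioned by the event timestamp
-- instead of ingestion time. Names and schema are illustrative.
CREATE TABLE IF NOT EXISTS `my-project.analytics.events_by_timestamp`
(
  user_id STRING,
  event STRING,
  timestamp TIMESTAMP,
  received_at TIMESTAMP
)
PARTITION BY DATE(timestamp);

-- Daily: copy one received_at partition across, dropping events that claim
-- to be from the future.
INSERT INTO `my-project.analytics.events_by_timestamp`
SELECT user_id, event, timestamp, received_at
FROM `my-project.segment.tracks`
WHERE _PARTITIONDATE = DATE '2019-11-01'
  AND timestamp <= received_at;
```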

Separately, we want to maintain a mapping from received_at days to timestamp days. That is, each time we process a partition by received_at, we want to record which timestamp partitions it might affect.
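
That mapping could be maintained by a query like the one below (a sketch; `partition_mapping` is a hypothetical bookkeeping table).

```sql
-- Sketch: record which timestamp days the 2019-11-01 received_at partition
-- actually touches.
INSERT INTO `my-project.analytics.partition_mapping` (received_at_day, timestamp_day)
SELECT DISTINCT
  DATE '2019-11-01' AS received_at_day,
  DATE(timestamp) AS timestamp_day
FROM `my-project.segment.tracks`
WHERE _PARTITIONDATE = DATE '2019-11-01'
  AND timestamp <= received_at
```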

Then, for each of those timestamp partitions, plus the day immediately before and the day immediately after each one, we rebuild the sessions table, itself partitioned by start_timestamp.

At this point, we'll have handled cases 1-5 above, but we'll still have the spurious truncated-sessions problem. So, for all the dates we rebuilt and for all the dates immediately following or preceding them, we run a query to identify nested sessions and remove them.

This smells a little janky, especially that last step. Still, it's the best I've got.

Other notes

Process

This would have been a lot easier to figure out if I could have just written unit tests.

You can sort of tell from the writing above that I've been trying to think about this using concrete examples. Being able to write those down in code and execute them would have helped me a great deal.

I could probably have done this with BigQuery, except that a lot of the time I had to think about this I was offline.

All of this is complex enough that I really ought to write tests when I write the code for real.

Complexity

This is way too complex for my liking. This is not uncommon with solutions to problems that I come up with in isolation. I would really value talking to someone else about this.

Originality

I honestly cannot be the first person to have these problems. Where are the other solutions?

Purity

Conceptually, this whole thing could be done more simply as a pure transformation from the source event data to the session data as in the linked blog posts. If I wanted to, I could trash our whole sessions table every day and rebuild it from scratch. That would be much simpler.

Unfortunately, BigQuery charges $5 per terabyte scanned, and we have enough terabytes of data to make daily reads uncomfortable. Hence, this more complicated, mutatey approach.

Pragmatic solutions

A much simpler "solution" would be to always rebuild the last 60 days of data, or similar. We'd have problems with data before the 60 day boundary, and it would cost around sixty times more than strictly necessary, but it would be better than what we have today.

I don't know enough about how this data is used to know whether it's a worthwhile trade-off. This has been a perennial problem with my brief career as a data engineer.

Build systems

The more I think about this sort of thing, the more I think that data engineering has a lot to learn from build systems. I keep meaning to re-read Build Systems a la Carte knowing what I know now about Airflow and data pipelines.

Perhaps some day I'll have the time and energy to do so.

Keywords: ETL, Airflow, BigQuery, sessions, Segment