Session Window Feature Engineering

Turn a raw stream of messaging events into user sessions -- the same sessionization logic used in clickstream analytics and user behavior modeling.

Problem

Given a JSON-lines file of chat events:

{"event_name": "send_message", "timestamp": "2016-11-08T14:09:57Z", "user_id": "1", "channel_id": "1"}
{"event_name": "send_message", "timestamp": "2016-11-08T14:10:01Z", "user_id": "1", "channel_id": "1"}
...

Group events into sessions per user. A session boundary occurs when the gap between consecutive events for the same user exceeds 30 seconds. For each session, output:

  • Session start/end timestamps
  • User ID
  • Total messages sent
  • Channel with the most messages (and that channel's message count)

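The boundary rule is easy to check on the sample timestamps. The sketch below uses a hypothetical helper, `starts_new_session`, that parses the timestamp format shown above and applies the strict "exceeds 30 seconds" rule. A gap of exactly 30 seconds therefore stays in the same session.

```python
from datetime import datetime, timedelta

SESSION_GAP = timedelta(seconds=30)  # boundary threshold from the problem statement


def parse_ts(raw: str) -> datetime:
    # Replace the trailing 'Z' so datetime.fromisoformat accepts it on Python < 3.11
    return datetime.fromisoformat(raw.replace("Z", "+00:00"))


def starts_new_session(prev_raw: str, curr_raw: str) -> bool:
    # Strictly greater than the gap: exactly 30 s still belongs to the same session
    return parse_ts(curr_raw) - parse_ts(prev_raw) > SESSION_GAP


print(starts_new_session("2016-11-08T14:09:57Z", "2016-11-08T14:10:01Z"))  # False (4 s gap)
print(starts_new_session("2016-11-08T14:09:57Z", "2016-11-08T14:10:27Z"))  # False (30 s gap)
print(starts_new_session("2016-11-08T14:09:57Z", "2016-11-08T14:10:28Z"))  # True (31 s gap)
```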
Approach

  1. Stream line by line -- no need to load the entire file into memory. Parse each line as JSON.
  2. Track per-user state with three maps:
    • last_event_per_user -- last seen timestamp, used for session-gap detection
    • session_start_per_user -- timestamp of the event that opened the current session
    • channel_to_user_messages -- per-channel message counts within the current session
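  3. Gap detection: if current_timestamp - last_timestamp > 30s, flush the current session (compute the top channel and total messages) and reset that user's counters. This assumes events arrive in timestamp order for each user; an out-of-order event yields a negative gap and is silently merged into the current session.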
  4. After the last line, flush every user's still-open session, then write all session records to a JSON output file.
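The steps above can also be sketched with one state record per user instead of parallel dictionaries. This is a minimal, hypothetical variant (the names `UserSession` and `sessionize` are illustrative, not from the original), assuming an in-memory iterable of event dicts that is ordered by timestamp per user. It uses `collections.Counter` for the channel counts; `most_common(1)` breaks ties by first occurrence, like `max` over a dict.

```python
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, Iterable, List, Optional

SESSION_GAP = timedelta(seconds=30)


@dataclass
class UserSession:
    # State of one user's currently open session
    start: datetime
    last: datetime
    channel_counts: Counter = field(default_factory=Counter)

    def to_record(self, user_id: str) -> dict:
        # Build the same output record shape as the article's implementation
        top_channel, top_count = self.channel_counts.most_common(1)[0]
        return {
            "session_start": self.start.isoformat(),
            "session_end": self.last.isoformat(),
            "user_id": user_id,
            "total_messages": sum(self.channel_counts.values()),
            "top_channel_id": top_channel,
            "top_channel_messages": top_count,
        }


def sessionize(events: Iterable[dict]) -> List[dict]:
    open_sessions: Dict[str, UserSession] = {}
    closed: List[dict] = []
    for event in events:
        user_id = event["user_id"]
        ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
        state: Optional[UserSession] = open_sessions.get(user_id)
        if state is not None and ts - state.last > SESSION_GAP:
            # Gap exceeded: close the previous session, then start a fresh one
            closed.append(state.to_record(user_id))
            state = None
        if state is None:
            state = UserSession(start=ts, last=ts)
            open_sessions[user_id] = state
        state.channel_counts[event["channel_id"]] += 1
        state.last = ts
    # Flush sessions still open at end of input
    closed.extend(s.to_record(uid) for uid, s in open_sessions.items())
    return closed
```

Keeping all of a user's state in one object makes the "reset on boundary" step a single assignment, so the three pieces of state cannot drift out of sync.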

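This is a gap-based session window, not a tumbling window: a tumbling window cuts time into fixed, aligned intervals regardless of activity, while a session window stretches for as long as events keep arriving within the gap. It is the same pattern Spark Structured Streaming (session_window) and Flink (session windows) use for sessionization, just implemented manually.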
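The difference shows up on the two sample events, which are only 4 seconds apart. The sketch below assumes 30-second tumbling windows aligned to the Unix epoch (an illustrative choice) and compares that bucketing with the gap-based rule; the helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(seconds=30)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_bucket(ts: datetime) -> int:
    # Fixed, epoch-aligned 30 s buckets: index = floor((ts - epoch) / 30 s)
    return (ts - EPOCH) // WINDOW


def session_count(timestamps) -> int:
    # Gap-based: a new session starts only when consecutive events are > 30 s apart
    count = 0
    prev = None
    for ts in sorted(timestamps):
        if prev is None or ts - prev > WINDOW:
            count += 1
        prev = ts
    return count


a = datetime(2016, 11, 8, 14, 9, 57, tzinfo=timezone.utc)
b = datetime(2016, 11, 8, 14, 10, 1, tzinfo=timezone.utc)
print(tumbling_bucket(a) != tumbling_bucket(b))  # True: split across the 14:10:00 edge
print(session_count([a, b]))  # 1: one session
```

A tumbling window splits this burst of activity at an arbitrary clock boundary; the session window keeps it together.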

Implementation

import json
from datetime import datetime

SESSION_GAP_SECONDS = 30


def _session_record(user_id, start, end, channel_counts):
    # Build one output record from a finished session's state
    top_channel = max(channel_counts, key=channel_counts.get)
    return {
        "session_start": start.isoformat(),
        "session_end": end.isoformat(),
        "user_id": user_id,
        "total_messages": sum(channel_counts.values()),
        "top_channel_id": top_channel,
        "top_channel_messages": channel_counts[top_channel],
    }


def event_sessionize(input_file, output_file):
    last_event_per_user = {}       # user_id -> timestamp of the user's latest event
    session_start_per_user = {}    # user_id -> timestamp that opened the current session
    channel_to_user_messages = {}  # user_id -> {channel_id: count in current session}
    sessions = []

    # Assumes events are ordered by timestamp (at least per user)
    with open(input_file, 'r') as file:
        for line in file:
            if not line.strip():
                continue  # skip blank lines
            event = json.loads(line)

            user_id = event['user_id']
            channel_id = event['channel_id']
            # 'Z' -> '+00:00' so fromisoformat accepts UTC timestamps on Python < 3.11
            timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00'))

            last_ts = last_event_per_user.get(user_id)
            new_session = (
                last_ts is None
                or (timestamp - last_ts).total_seconds() > SESSION_GAP_SECONDS
            )

            if new_session:
                if last_ts is not None:
                    # Session boundary: flush the session that ended at last_ts
                    sessions.append(_session_record(
                        user_id, session_start_per_user[user_id], last_ts,
                        channel_to_user_messages[user_id],
                    ))
                session_start_per_user[user_id] = timestamp
                channel_to_user_messages[user_id] = {}

            counts = channel_to_user_messages[user_id]
            counts[channel_id] = counts.get(channel_id, 0) + 1
            last_event_per_user[user_id] = timestamp

    # Flush the sessions still open at end of input
    for user_id, counts in channel_to_user_messages.items():
        sessions.append(_session_record(
            user_id, session_start_per_user[user_id],
            last_event_per_user[user_id], counts,
        ))

    with open(output_file, 'w') as file_out:
        json.dump(sessions, file_out, indent=4)

    return sessions


if __name__ == "__main__":
    event_sessionize('messages.json', 'session_data.json')
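The streaming version relies on per-user timestamp order. If that cannot be guaranteed, one option (a different technique from the article's single streaming pass) is to sort by user and time first and then split each user's run on the gap. This is a minimal sketch under that assumption; `sessionize_unordered` and `_to_record` are hypothetical names, and the trade-off is that every event is held in memory for the sort.

```python
from collections import Counter
from datetime import datetime, timedelta
from itertools import groupby
from operator import itemgetter
from typing import Iterable, List, Tuple

SESSION_GAP = timedelta(seconds=30)


def _to_record(user_id: str, rows: List[Tuple[datetime, str]]) -> dict:
    # rows: (timestamp, channel_id) pairs of one finished session, in time order
    counts = Counter(channel_id for _, channel_id in rows)
    top_channel, top_count = counts.most_common(1)[0]
    return {
        "session_start": rows[0][0].isoformat(),
        "session_end": rows[-1][0].isoformat(),
        "user_id": user_id,
        "total_messages": len(rows),
        "top_channel_id": top_channel,
        "top_channel_messages": top_count,
    }


def sessionize_unordered(events: Iterable[dict]) -> List[dict]:
    # Parse once, then sort by (user_id, timestamp) so each user's events are
    # contiguous and chronological; this holds every event in memory.
    parsed = sorted(
        (e["user_id"],
         datetime.fromisoformat(e["timestamp"].replace("Z", "+00:00")),
         e["channel_id"])
        for e in events
    )
    sessions = []
    for user_id, rows in groupby(parsed, key=itemgetter(0)):
        current: List[Tuple[datetime, str]] = []  # open session for this user
        for _, ts, channel_id in rows:
            if current and ts - current[-1][0] > SESSION_GAP:
                sessions.append(_to_record(user_id, current))
                current = []
            current.append((ts, channel_id))
        sessions.append(_to_record(user_id, current))  # flush the last session
    return sessions
```

Sorting costs O(n log n) time and O(n) memory, so this fits batch files that fit in memory; the streaming version stays preferable when input order is guaranteed.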

Complexity

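  • Time: O(n) -- single pass over all events; each flush scans at most as many channels as the session has messages, so flushing adds only O(n) in total
  • Space: O(u * c) for the open per-user state, where u = unique users and c = unique channels per user session, plus O(s) for the s session records held in memory until the final write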

Takeaway

Sessionization is a core feature engineering step in recommendation systems, churn prediction, and user engagement models. The gap-based session window is simple but effective -- and understanding it from scratch makes it much easier to debug when Spark's session_window function does not behave the way you expect.
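To make the feature-engineering connection concrete, session records in the output format above can be rolled up into per-user features. The sketch below is illustrative only: the function `user_session_features` and the three feature names are hypothetical choices, not a standard feature set.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean
from typing import Dict, List


def user_session_features(sessions: List[dict]) -> Dict[str, dict]:
    # Group session records by user
    per_user = defaultdict(list)
    for s in sessions:
        per_user[s["user_id"]].append(s)

    features = {}
    for user_id, user_sessions in per_user.items():
        # Session length in seconds; single-event sessions have length 0
        durations = [
            (datetime.fromisoformat(s["session_end"])
             - datetime.fromisoformat(s["session_start"])).total_seconds()
            for s in user_sessions
        ]
        features[user_id] = {
            "session_count": len(user_sessions),
            "mean_messages_per_session": mean(s["total_messages"] for s in user_sessions),
            "mean_session_seconds": mean(durations),
        }
    return features
```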

