
Session Window Feature Engineering

Turn a raw stream of messaging events into user sessions, using the same sessionization logic found in clickstream analytics and user behavior modeling.

Problem

Given a JSON-lines file of chat events:

{"event_name": "send_message", "timestamp": "2016-11-08T14:09:57Z", "user_id": "1", "channel_id": "1"}
{"event_name": "send_message", "timestamp": "2016-11-08T14:10:01Z", "user_id": "1", "channel_id": "1"}
...

Group events into sessions per user. A session boundary occurs when the gap between consecutive events for the same user exceeds 30 seconds. For each session, output:

  • Session start/end timestamps
  • User ID
  • Total messages sent
  • Channel with the most messages
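To make the boundary rule concrete, the sketch below parses the two sample events shown above and checks their gap against the 30-second threshold. The names `GAP_SECONDS` and `parse_ts` are illustrative and do not come from the original code.

```python
from datetime import datetime

GAP_SECONDS = 30  # session gap from the problem statement


def parse_ts(raw):
    """Parse an ISO 8601 timestamp with a trailing 'Z' into an aware datetime."""
    return datetime.fromisoformat(raw.replace("Z", "+00:00"))


first = parse_ts("2016-11-08T14:09:57Z")
second = parse_ts("2016-11-08T14:10:01Z")

gap = (second - first).total_seconds()
# "Exceeds 30 seconds" means a gap of exactly 30 stays in the same session.
same_session = gap <= GAP_SECONDS
print(gap, same_session)
```

The two events are 4 seconds apart, so they belong to the same session.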

Approach

  1. Stream the file line by line and parse each line as JSON; the raw events never need to be held in memory all at once.
  2. Track per-user state with three maps: last_event_per_user for the last seen timestamp (used in gap detection), session_start_per_user for the start of the open session, and channel_to_user_messages for message counts per channel within the current session.
  3. Gap detection: if current_timestamp - last_timestamp > 30s, flush the current session (compute top channel and total messages) and reset the per-user counters. This assumes each user's events arrive in timestamp order.
  4. After the last line, flush every session that is still open, then write all sessions to a JSON output file.
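The gap-detection step can be isolated into a small sketch for a single user. This is a simplified illustration, not the full implementation: `split_by_gap` is a hypothetical helper, it ignores channels, and it assumes the timestamps are already sorted.

```python
from datetime import datetime, timedelta


def split_by_gap(timestamps, gap=timedelta(seconds=30)):
    """Split one user's time-ordered timestamps into (start, end, count) sessions."""
    sessions = []
    start = prev = None
    count = 0
    for ts in timestamps:
        if prev is not None and ts - prev > gap:
            # Gap exceeded: close the open session and start a new one.
            sessions.append((start, prev, count))
            start, count = ts, 0
        if start is None:
            start = ts  # very first event opens the first session
        prev = ts
        count += 1
    if count:
        sessions.append((start, prev, count))  # flush the last open session
    return sessions


base = datetime(2016, 11, 8, 14, 9, 57)
# Made-up offsets: 0 s, 4 s, and 40 s after the first event.
events = [base, base + timedelta(seconds=4), base + timedelta(seconds=40)]
result = split_by_gap(events)
```

The third event arrives 36 seconds after the second, so it opens a second session containing a single message.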

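This is the classic gap-based session window, the same pattern Spark Structured Streaming and Flink use for sessionization, just implemented manually. Unlike a tumbling window, which has a fixed length, a session window grows for as long as events keep arriving within the gap.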

Implementation

import json
from datetime import datetime


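def event_sessionize(input_file, output_file):
    """Group chat events into per-user sessions split on gaps over 30 seconds.

    Assumes each user's events appear in timestamp order in the input file.
    """
    last_event_per_user = {}       # user_id -> timestamp of the latest event
    session_start_per_user = {}    # user_id -> start of the open session
    channel_to_user_messages = {}  # user_id -> {channel_id: message count}
    sessions = []

    with open(input_file, 'r') as file:
        for line in file:
            if not line.strip():
                continue  # skip blank lines
            event = json.loads(line)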

            user_id = event['user_id']
            channel_id = event['channel_id']
            timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00'))

            if user_id not in channel_to_user_messages:
                channel_to_user_messages[user_id] = {}

            last_ts = last_event_per_user.get(user_id)
            if last_ts and (timestamp - last_ts).total_seconds() > 30:
                # Session boundary, flush
                top_channel = max(
                    channel_to_user_messages[user_id],
                    key=channel_to_user_messages[user_id].get
                )
                sessions.append({
                    "session_start": session_start_per_user[user_id].isoformat(),
                    "session_end": last_ts.isoformat(),
                    "user_id": user_id,
                    "total_messages": sum(channel_to_user_messages[user_id].values()),
                    "top_channel_id": top_channel,
                    "top_channel_messages": channel_to_user_messages[user_id][top_channel]
                })
                channel_to_user_messages[user_id] = {}

            if user_id not in session_start_per_user or (
                last_ts and (timestamp - last_ts).total_seconds() > 30
            ):
                session_start_per_user[user_id] = timestamp

            channel_to_user_messages[user_id][channel_id] = (
                channel_to_user_messages[user_id].get(channel_id, 0) + 1
            )

            last_event_per_user[user_id] = timestamp

    # Flush remaining sessions
    for user_id in channel_to_user_messages:
        if channel_to_user_messages[user_id]:
            top_channel = max(
                channel_to_user_messages[user_id],
                key=channel_to_user_messages[user_id].get
            )
            sessions.append({
                "session_start": session_start_per_user[user_id].isoformat(),
                "session_end": last_event_per_user[user_id].isoformat(),
                "user_id": user_id,
                "total_messages": sum(channel_to_user_messages[user_id].values()),
                "top_channel_id": top_channel,
                "top_channel_messages": channel_to_user_messages[user_id][top_channel]
            })

    with open(output_file, 'w') as file_out:
        json.dump(sessions, file_out, indent=4)


if __name__ == "__main__":
    event_sessionize('messages.json', 'session_data.json')
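The flush logic appears twice in the function above, once at a session boundary and once at the end of the input. One possible way to remove the duplication is a small helper. `build_session_record` is a hypothetical name, and the sketch assumes the same output fields as the dictionaries above.

```python
from datetime import datetime, timezone


def build_session_record(user_id, start, end, channel_counts):
    """Summarize one finished session from its per-channel message counts."""
    # On a tie, max() returns the first such channel in dict insertion order.
    top_channel = max(channel_counts, key=channel_counts.get)
    return {
        "session_start": start.isoformat(),
        "session_end": end.isoformat(),
        "user_id": user_id,
        "total_messages": sum(channel_counts.values()),
        "top_channel_id": top_channel,
        "top_channel_messages": channel_counts[top_channel],
    }


# Illustrative inputs: two messages in channel "1", one in channel "7".
start = datetime(2016, 11, 8, 14, 9, 57, tzinfo=timezone.utc)
end = datetime(2016, 11, 8, 14, 10, 1, tzinfo=timezone.utc)
record = build_session_record("1", start, end, {"1": 2, "7": 1})
```

Both flush sites could then call the helper with the relevant end timestamp: last_ts at a boundary, and last_event_per_user[user_id] at the end of the input.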

Complexity

  • Time: O(n), single pass over all events
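  • Space: O(u * c) for the per-user state, where u = unique users and c = unique channels per user within a session, plus O(s) for the sessions list, where s = number of sessions collected before writing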
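The function above collects every session in a list before writing it out. If that list becomes the dominant memory cost, one option is to emit each session as soon as it closes. The generator below is a sketch of that idea under the same ordering assumption; `iter_sessions` is a hypothetical name, and the third sample event is made up to force a boundary.

```python
import json
from datetime import datetime


def iter_sessions(lines, gap_seconds=30):
    """Yield (user_id, start, end, channel_counts) as each session closes."""
    state = {}  # user_id -> [session_start, last_timestamp, channel_counts]
    for line in lines:
        if not line.strip():
            continue
        event = json.loads(line)
        user_id = event["user_id"]
        ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
        current = state.get(user_id)
        if current and (ts - current[1]).total_seconds() > gap_seconds:
            # Gap exceeded: hand the finished session to the caller.
            yield user_id, current[0], current[1], current[2]
            current = None
        if current is None:
            current = state[user_id] = [ts, ts, {}]
        current[1] = ts
        channel = event["channel_id"]
        current[2][channel] = current[2].get(channel, 0) + 1
    # End of input: each user seen still has exactly one open session.
    for user_id, (start, end, counts) in state.items():
        yield user_id, start, end, counts


sample = [
    '{"event_name": "send_message", "timestamp": "2016-11-08T14:09:57Z", "user_id": "1", "channel_id": "1"}',
    '{"event_name": "send_message", "timestamp": "2016-11-08T14:10:01Z", "user_id": "1", "channel_id": "1"}',
    '{"event_name": "send_message", "timestamp": "2016-11-08T14:11:00Z", "user_id": "1", "channel_id": "2"}',
]
closed = list(iter_sessions(sample))
```

A caller could write each yielded session to the output as one JSON line, so only the per-user state stays in memory.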

Takeaway

Sessionization is a core feature engineering step in recommendation systems, churn prediction, and user engagement models. The gap-based session window is simple but effective, and understanding it from scratch makes it much easier to debug when Spark's session_window function does not behave the way you expect.

