Session Window Feature Engineering¶
Turn a raw stream of messaging events into user sessions -- the same sessionization logic used in clickstream analytics and user behavior modeling.
Problem¶
Given a JSON-lines file of chat events:
{"event_name": "send_message", "timestamp": "2016-11-08T14:09:57Z", "user_id": "1", "channel_id": "1"}
{"event_name": "send_message", "timestamp": "2016-11-08T14:10:01Z", "user_id": "1", "channel_id": "1"}
...
Group events into sessions per user. A session boundary occurs when the gap between consecutive events for the same user exceeds 30 seconds. For each session, output:
- Session start/end timestamps
- User ID
- Total messages sent
- Channel with the most messages
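For the two example events above (same user, same channel, four seconds apart), the expected result is a single session record. An illustrative record (field names match the outputs listed above; the timestamp format assumes UTC-normalized ISO 8601):

```json
{
    "session_start": "2016-11-08T14:09:57+00:00",
    "session_end": "2016-11-08T14:10:01+00:00",
    "user_id": "1",
    "total_messages": 2,
    "top_channel_id": "1",
    "top_channel_messages": 2
}
```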
Approach¶
- Stream line by line -- no need to load the entire file into memory. Parse each line as JSON.
- Track per-user state with two maps:
  - last_event_per_user -- last seen timestamp per user, for session-gap detection
  - channel_to_user_messages -- per-user message counts by channel within the current session
- Gap detection: if current_timestamp - last_timestamp > 30s, flush the current session (compute top channel and total messages) and reset that user's counters.
- Write results to a JSON output file.
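The gap check itself is a one-liner once timestamps are parsed. A minimal sketch of just that piece (is_session_boundary is a hypothetical helper name introduced here for illustration):

```python
from datetime import datetime

SESSION_GAP_SECONDS = 30

def is_session_boundary(last_ts, current_ts, gap_s=SESSION_GAP_SECONDS):
    """True when the pause since the user's last event exceeds the gap."""
    if last_ts is None:
        return False  # first event for a user opens a session; no boundary yet
    return (current_ts - last_ts).total_seconds() > gap_s

a = datetime.fromisoformat("2016-11-08T14:09:57+00:00")
b = datetime.fromisoformat("2016-11-08T14:10:01+00:00")  # 4 s after a: same session
c = datetime.fromisoformat("2016-11-08T14:11:00+00:00")  # 59 s after b: new session
```

Keeping the threshold in one named constant makes it easy to tune the gap later without hunting through the code.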
This is a classic gap-based session window (not a fixed-size tumbling window -- boundaries are driven by pauses in activity, so sessions vary in length). It is the same pattern Spark Structured Streaming and Flink use for sessionization, just implemented manually.
Implementation¶
import json
from datetime import datetime

SESSION_GAP_SECONDS = 30

def event_sessionize(input_file, output_file):
    last_event_per_user = {}       # user_id -> timestamp of the user's last event
    session_start_per_user = {}    # user_id -> start timestamp of the current session
    channel_to_user_messages = {}  # user_id -> {channel_id: message count}
    sessions = []

    with open(input_file, 'r') as file:
        for line in file:
            if not line.strip():
                continue
            event = json.loads(line)
            user_id = event['user_id']
            channel_id = event['channel_id']
            # fromisoformat cannot parse a trailing 'Z' before Python 3.11
            timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00'))

            if user_id not in channel_to_user_messages:
                channel_to_user_messages[user_id] = {}

            last_ts = last_event_per_user.get(user_id)
            is_boundary = (
                last_ts is not None
                and (timestamp - last_ts).total_seconds() > SESSION_GAP_SECONDS
            )

            if is_boundary:
                # Session boundary -- flush the finished session
                counts = channel_to_user_messages[user_id]
                top_channel = max(counts, key=counts.get)
                sessions.append({
                    "session_start": session_start_per_user[user_id].isoformat(),
                    "session_end": last_ts.isoformat(),
                    "user_id": user_id,
                    "total_messages": sum(counts.values()),
                    "top_channel_id": top_channel,
                    "top_channel_messages": counts[top_channel],
                })
                channel_to_user_messages[user_id] = {}

            # First event ever for this user, or first event of a new session
            if user_id not in session_start_per_user or is_boundary:
                session_start_per_user[user_id] = timestamp

            channel_to_user_messages[user_id][channel_id] = (
                channel_to_user_messages[user_id].get(channel_id, 0) + 1
            )
            last_event_per_user[user_id] = timestamp

    # Flush each user's final, still-open session
    for user_id, counts in channel_to_user_messages.items():
        if not counts:
            continue
        top_channel = max(counts, key=counts.get)
        sessions.append({
            "session_start": session_start_per_user[user_id].isoformat(),
            "session_end": last_event_per_user[user_id].isoformat(),
            "user_id": user_id,
            "total_messages": sum(counts.values()),
            "top_channel_id": top_channel,
            "top_channel_messages": counts[top_channel],
        })

    with open(output_file, 'w') as file_out:
        json.dump(sessions, file_out, indent=4)

if __name__ == "__main__":
    event_sessionize('messages.json', 'session_data.json')
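One subtlety in the flush step: max(counts, key=counts.get) returns the first maximal key in the dict's iteration order, so a tie between channels is broken by whichever channel first appeared in the session (dicts preserve insertion order since Python 3.7). A small self-contained check with made-up channel names:

```python
# Two channels tie at 5 messages; "random" was inserted before "dev", so it wins.
counts = {"general": 3, "random": 5, "dev": 5}
top_channel = max(counts, key=counts.get)
```

The tie-break is deterministic for a given event order, but if you need a different policy (e.g. lexicographic), pass a compound key such as lambda c: (counts[c], c).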
Complexity¶
- Time: O(n) -- a single pass over all events
- Space: O(u * c) for per-user state, where u = unique users and c = channels per user, plus O(s) for the s session records buffered in memory until the final dump
Takeaway¶
Sessionization is a core feature engineering step in recommendation systems, churn prediction, and user engagement models. The gap-based session window is simple but effective -- and understanding it from scratch makes it much easier to debug when Spark's session_window function does not behave the way you expect.