Session Window Feature Engineering
Turn a raw stream of messaging events into user sessions using the same sessionization logic that underpins clickstream analytics and user-behavior modeling.
Problem
Given a JSON-lines file of chat events:
```json
{"event_name": "send_message", "timestamp": "2016-11-08T14:09:57Z", "user_id": "1", "channel_id": "1"}
{"event_name": "send_message", "timestamp": "2016-11-08T14:10:01Z", "user_id": "1", "channel_id": "1"}
...
```
Group events into sessions per user. A session boundary occurs when the gap between consecutive events for the same user exceeds 30 seconds. For each session, output:
- Session start/end timestamps
- User ID
- Total messages sent
- Channel with the most messages, and that channel's message count
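The boundary rule is easiest to see for a single user in isolation. The sketch below assumes the timestamps are already parsed and sorted; `split_sessions` is a hypothetical helper for illustration, not part of the implementation further down. Note that the rule says the gap must *exceed* 30 seconds, so two events exactly 30 seconds apart stay in the same session.

```python
from datetime import datetime, timedelta


def split_sessions(timestamps, gap=timedelta(seconds=30)):
    """Split one user's time-ordered timestamps into (start, end) sessions (hypothetical helper)."""
    sessions = []
    start = prev = None
    for ts in timestamps:
        if prev is None:
            start = ts                      # the first event opens the first session
        elif ts - prev > gap:               # strictly greater: a gap of exactly 30 s does not split
            sessions.append((start, prev))  # close the finished session at its last event
            start = ts
        prev = ts
    if prev is not None:                    # close the session that is still open
        sessions.append((start, prev))
    return sessions


t0 = datetime(2016, 11, 8, 14, 9, 57)
stamps = [t0, t0 + timedelta(seconds=4), t0 + timedelta(seconds=34), t0 + timedelta(seconds=70)]
for start, end in split_sessions(stamps):
    print(start.strftime("%H:%M:%S"), "->", end.strftime("%H:%M:%S"))
# 14:09:57 -> 14:10:31
# 14:11:07 -> 14:11:07
```

The 30-second gap between 14:10:01 and 14:10:31 keeps those events together, while the 36-second gap before 14:11:07 opens a second, single-event session.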
Approach
- Stream the file line by line so the whole input never has to be held in memory, and parse each line as JSON.
- Track per-user state in three maps: `last_event_per_user` holds the last seen timestamp (used for gap detection), `session_start_per_user` holds the timestamp of the current session's first event, and `channel_to_user_messages` holds per-channel message counts for the current session.
- Gap detection: if `current_timestamp - last_timestamp > 30s`, flush the current session (compute its top channel and total messages) and reset that user's counters.
- After the last line, flush every user's still-open session, then write all sessions to a JSON output file.
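The parsing and gap-detection steps can be isolated as follows. This is a minimal sketch: `parse_event` and `starts_new_session` are hypothetical helper names chosen for illustration. The `"Z"` replacement is there because `datetime.fromisoformat()` only accepts a trailing `Z` from Python 3.11 onward; rewriting it as `+00:00` also yields a timezone-aware UTC datetime on older versions.

```python
import json
from datetime import datetime


def parse_event(line):
    """Parse one JSON-lines record into (user_id, channel_id, aware UTC datetime)."""
    event = json.loads(line)
    # Normalize "Z" to an explicit offset so fromisoformat() works before Python 3.11
    ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
    return event["user_id"], event["channel_id"], ts


def starts_new_session(prev_ts, ts, gap_seconds=30):
    """True for a user's first event, or when the gap to the previous event exceeds the limit."""
    return prev_ts is None or (ts - prev_ts).total_seconds() > gap_seconds


lines = [
    '{"event_name": "send_message", "timestamp": "2016-11-08T14:09:57Z", "user_id": "1", "channel_id": "1"}',
    '{"event_name": "send_message", "timestamp": "2016-11-08T14:10:01Z", "user_id": "1", "channel_id": "1"}',
]
last_event_per_user = {}
for line in lines:
    user_id, channel_id, ts = parse_event(line)
    print(user_id, ts.isoformat(), starts_new_session(last_event_per_user.get(user_id), ts))
    last_event_per_user[user_id] = ts
# 1 2016-11-08T14:09:57+00:00 True
# 1 2016-11-08T14:10:01+00:00 False
```

Keying the previous timestamp by `user_id` is what lets events from different users interleave freely in the file without splitting each other's sessions.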
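This is the classic gap-based session window (not a tumbling window, whose boundaries are fixed-size and independent of user activity), the same pattern Spark Structured Streaming (`session_window`) and Flink (session windows) use for sessionization, implemented here by hand.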
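Tumbling and session windows are easy to conflate, so it helps to push the same events through both. In the sketch below, `tumbling_window_start` assigns each event to a fixed 30-second bucket aligned to the Unix epoch (an alignment chosen for illustration), and `session_ids` applies the gap rule; both names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

GAP = timedelta(seconds=30)


def tumbling_window_start(ts, size=GAP):
    """Start of the fixed-size, epoch-aligned bucket that contains ts."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return ts - (ts - epoch) % size  # timedelta % timedelta gives the offset into the bucket


def session_ids(timestamps, gap=GAP):
    """Session index per event; it increments when the gap to the previous event exceeds `gap`."""
    ids, current, prev = [], 0, None
    for ts in timestamps:
        if prev is not None and ts - prev > gap:
            current += 1
        ids.append(current)
        prev = ts
    return ids


base = datetime(2016, 11, 8, 14, 9, 50, tzinfo=timezone.utc)
events = [base + timedelta(seconds=s) for s in (7, 11, 20)]  # 14:09:57, 14:10:01, 14:10:10
print([tumbling_window_start(ts).strftime("%H:%M:%S") for ts in events])
# ['14:09:30', '14:10:00', '14:10:00']
print(session_ids(events))
# [0, 0, 0]
```

The tumbling buckets split events only 4 seconds apart because a clock boundary happens to fall between them, while the gap rule keeps all three in one session.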
Implementation
```python
import json
from datetime import datetime

SESSION_GAP_SECONDS = 30


def _build_session(user_id, start, end, channel_counts):
    """Summarize one finished session for a user."""
    # max() over the dict keys returns the first channel with the highest count,
    # so ties go to the channel that appeared first in the session.
    top_channel = max(channel_counts, key=channel_counts.get)
    return {
        "session_start": start.isoformat(),
        "session_end": end.isoformat(),
        "user_id": user_id,
        "total_messages": sum(channel_counts.values()),
        "top_channel_id": top_channel,
        "top_channel_messages": channel_counts[top_channel],
    }


def event_sessionize(input_file, output_file):
    # Assumes each user's events appear in non-decreasing timestamp order.
    last_event_per_user = {}       # user_id -> timestamp of the previous event
    session_start_per_user = {}    # user_id -> first timestamp of the current session
    channel_to_user_messages = {}  # user_id -> {channel_id: count} for the current session
    sessions = []

    with open(input_file, 'r') as file:
        for line in file:
            if not line.strip():
                continue  # skip blank lines
            event = json.loads(line)
            user_id = event['user_id']
            channel_id = event['channel_id']
            # fromisoformat() rejects a trailing "Z" before Python 3.11
            timestamp = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00'))

            last_ts = last_event_per_user.get(user_id)
            gap_exceeded = (
                last_ts is not None
                and (timestamp - last_ts).total_seconds() > SESSION_GAP_SECONDS
            )
            if gap_exceeded:
                # Session boundary: flush this user's finished session
                sessions.append(_build_session(
                    user_id, session_start_per_user[user_id], last_ts,
                    channel_to_user_messages[user_id],
                ))
            if last_ts is None or gap_exceeded:
                # First event for this user, or first event after a boundary
                session_start_per_user[user_id] = timestamp
                channel_to_user_messages[user_id] = {}

            counts = channel_to_user_messages[user_id]
            counts[channel_id] = counts.get(channel_id, 0) + 1
            last_event_per_user[user_id] = timestamp

    # Flush the session that is still open for every user
    for user_id, counts in channel_to_user_messages.items():
        if counts:
            sessions.append(_build_session(
                user_id, session_start_per_user[user_id],
                last_event_per_user[user_id], counts,
            ))

    with open(output_file, 'w') as file_out:
        json.dump(sessions, file_out, indent=4)


if __name__ == "__main__":
    event_sessionize('messages.json', 'session_data.json')
```
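The flush step could also be written with `collections.Counter`, which keeps the per-channel tally and the "top channel" lookup in one object. This is a sketch of an alternative, not what the implementation above does; `summarize_session` is a hypothetical name. `Counter.most_common()` orders equal counts by first insertion, so ties resolve the same way as `max(d, key=d.get)` over a plain dict.

```python
from collections import Counter


def summarize_session(channel_counts):
    """Total messages and top channel for one session's Counter of channel_id -> count."""
    top_channel, top_count = channel_counts.most_common(1)[0]  # ties: first-seen channel wins
    return {
        "total_messages": sum(channel_counts.values()),
        "top_channel_id": top_channel,
        "top_channel_messages": top_count,
    }


counts = Counter()
for channel_id in ["1", "2", "2", "1", "3"]:
    counts[channel_id] += 1
print(summarize_session(counts))
# {'total_messages': 5, 'top_channel_id': '1', 'top_channel_messages': 2}
```

Channels "1" and "2" both have two messages; "1" wins because it was counted first.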
Complexity
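- Time: O(n), a single pass over all events. Each flush's `max` and `sum` touch at most as many channels as the session has events, so the total stays linear.
- Space: O(u × c + s), where u = unique users, c = distinct channels in a user's open session, and s = number of sessions, because finished sessions are accumulated in a list before being written out.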
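These bounds assume each user's events arrive in timestamp order. A late event with an earlier timestamp would yield a negative gap and be attached to the wrong session. If ordering is not guaranteed and the file fits in memory, one simple fallback (an assumption, not part of the original approach) is to sort by `(user_id, timestamp)` first, at the cost of O(n) space and O(n log n) time. `load_sorted_events` is a hypothetical helper.

```python
import json
from datetime import datetime
from itertools import groupby


def load_sorted_events(lines):
    """Parse JSON-lines records and return them ordered by (user_id, timestamp)."""
    events = []
    for line in lines:
        if line.strip():
            event = json.loads(line)
            # Same "Z" normalization as the streaming version
            event["ts"] = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
            events.append(event)
    events.sort(key=lambda e: (e["user_id"], e["ts"]))  # holds every event in memory
    return events


raw = [
    '{"event_name": "send_message", "timestamp": "2016-11-08T14:10:01Z", "user_id": "1", "channel_id": "1"}',
    '{"event_name": "send_message", "timestamp": "2016-11-08T14:09:57Z", "user_id": "1", "channel_id": "2"}',
    '{"event_name": "send_message", "timestamp": "2016-11-08T14:09:59Z", "user_id": "2", "channel_id": "1"}',
]
for user_id, user_events in groupby(load_sorted_events(raw), key=lambda e: e["user_id"]):
    print(user_id, [e["ts"].strftime("%H:%M:%S") for e in user_events])
# 1 ['14:09:57', '14:10:01']
# 2 ['14:09:59']
```

Once sorted, each user's events are contiguous and time-ordered, so the same per-user gap check applies unchanged.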
Takeaway
Sessionization is a core feature engineering step in recommendation systems, churn prediction, and user engagement models. The gap-based session window is simple but effective, and understanding it from scratch makes it much easier to debug when Spark's session_window function does not behave the way you expect.