Ingest-Transform-Serve Pipeline
A feed processor that ingests XML files on a schedule, converts them to JSON, stores de-normalized records in SQLite, and serves them via a REST API. It is a miniature version of the ingest-transform-serve pattern common in ML feature pipelines.
Problem
XML product feeds land in a file store periodically. The system needs to:
- Detect and ingest new files on a configurable schedule.
- Parse XML to JSON and persist de-normalized records.
- Expose the data through a RESTful API for search, CRUD, and listing.
Approach
Three decoupled subsystems, each independently scalable:
- Ingestion scheduler — APScheduler runs a background job at a fixed interval, iterating over XML files via a generator pattern. Each file is parsed with xmltodict and saved to the database. The iterator/generator approach keeps memory flat regardless of file count.
- Database layer — SQLAlchemy ORM over SQLite. A data model stores the de-normalized records along with metadata; Marshmallow handles serialization.
- REST API — Flask-RESTful with endpoints for search, list, get-by-id, create, update, and delete. Each operation maps to a dedicated Resource class.
Scalability path: The subsystems are designed to scale independently. The scheduler can move to a separate process behind a message queue (Kafka, SQS) for high-volume ingestion. The API layer can sit behind a load balancer. Ingested data can be cached out-of-process (Redis, Memcached).
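The queue hand-off can be sketched in-process with the standard library — queue.Queue standing in for Kafka or SQS, threads standing in for separate processes. The names here (feed_queue, run_demo) are illustrative, not from the codebase:

```python
import queue
import threading

# Toy stand-in for the message-queue hand-off: the "scheduler" side
# publishes file names, worker threads consume and "ingest" them.
feed_queue = queue.Queue()
ingested = []
lock = threading.Lock()

def worker():
    while True:
        xml_file = feed_queue.get()
        if xml_file is None:  # sentinel: shut this worker down
            feed_queue.task_done()
            break
        with lock:
            ingested.append(xml_file)  # real code would parse + persist here
        feed_queue.task_done()

def run_demo(files, workers=2):
    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for f in files:          # producer side: enqueue work
        feed_queue.put(f)
    for _ in threads:        # one sentinel per worker
        feed_queue.put(None)
    feed_queue.join()        # block until every item is processed
    for t in threads:
        t.join()
    return sorted(ingested)
```

The point of the decoupling is that the producer never waits on parsing or database writes; swapping queue.Queue for a broker client changes the transport, not the shape of the code.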
Implementation
Data model and schema:
engine = create_engine(
f"sqlite:///{settings.DATABASE_CONNECTION_STRING}",
connect_args={"check_same_thread": False},
)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()
class ProductSchema(Schema):
id = fields.Integer()
upc = fields.Str()
name = fields.Str()
category = fields.Str()
image_url = fields.Str()
class Product(Base):
__tablename__ = "products"
id = Column(Integer, primary_key=True)
upc = Column(String(25))
name = Column(String(100))
category = Column(String(25))
image_url = Column(String(255))
    def __init__(self, data=None):
        if data:
            self.id = data.get("id")
            self.upc = data.get("upc")
            self.name = data.get("name")
            self.category = data.get("category")
            self.image_url = data.get("image_url")
def get_dict(self):
return json.loads(ProductSchema().dumps(self))
def save(self):
db_session.add(self)
db_session.commit()
    @classmethod
    def get_by_upc(cls, upc):
        return cls.query.filter(cls.upc == upc).all()

    @classmethod
    def search_by_key(cls, key, value):
        # Generic single-column lookup used by the search endpoint.
        return cls.query.filter(getattr(cls, key) == value).all()
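The model layer can be exercised end-to-end against an in-memory SQLite database. This is a trimmed, self-contained replica (fewer columns, no Marshmallow, no autocommit flag) rather than the project's actual module:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker

# In-memory SQLite so the sketch runs standalone.
engine = create_engine(
    "sqlite:///:memory:", connect_args={"check_same_thread": False}
)
db_session = scoped_session(sessionmaker(autoflush=False, bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()  # enables Product.query.filter(...)

class Product(Base):
    __tablename__ = "products"
    id = Column(Integer, primary_key=True)
    upc = Column(String(25))
    name = Column(String(100))

    def save(self):
        db_session.add(self)
        db_session.commit()

    @classmethod
    def get_by_upc(cls, upc):
        return cls.query.filter(cls.upc == upc).all()

Base.metadata.create_all(bind=engine)

p = Product()
p.upc, p.name = "0123456789", "Widget"
p.save()
```

The query_property hook is what lets every model class query itself without threading a session through the call sites — the same trick the real module relies on.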
REST API resources:
class SearchProduct(Resource):
def post(self):
json_data = request.get_json(force=True)
products = Product.search_by_key(json_data.get("key"), json_data.get("value"))
return {"result": [item.get_dict() for item in products], "status": "success"}, 200
class GetProductList(Resource):
def get(self):
products = [item.get_dict() for item in Product.query.all()]
return {"result": products, "status": "success"}, 200
class GetProduct(Resource):
def get(self, id):
product = Product.query.get(id)
if product:
return {"result": product.get_dict(), "status": "success"}, 200
return {"message": "Product not found", "status": "error"}, 404
class HttpApi:
def __init__(self):
self.app = Flask("HttpApi")
self.api = Api(self.app)
self.api.add_resource(SearchProduct, "/api/products/search/")
self.api.add_resource(CreateProduct, "/api/products/")
self.api.add_resource(GetProductList, "/api/products")
self.api.add_resource(GetProduct, "/api/products/<int:id>")
self.api.add_resource(DeleteProduct, "/api/products/<int:id>")
self.api.add_resource(UpdateProduct, "/api/products/<int:id>")
Ingestion scheduler — background job using APScheduler with generator-based file iteration:
def xml_to_product(xml_file):
with open(xml_file, 'r') as file:
xml_content = file.read()
json_data = xmltodict.parse(xml_content)
for item in json_data[consts.CATALOG][consts.PRODUCTS][consts.PRODUCT]:
upc = item[consts.UPC]
if not Product.get_by_upc(upc):
product = Product()
product.name = item[consts.NAME]
product.upc = upc
product.category = item[consts.CATEGORY]
product.image_url = item.get(consts.IMAGE_URL, "")
product.save()
def ingest_files():
xml_files = glob.iglob(os.path.join(settings.FILE_STORE, '*.xml'))
for xml_file in xml_files:
xml_to_product(xml_file)
class Scheduler:
def setup_ingestion_scheduler(self):
jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///db_scheduler.sqlite')}
executors = {'default': ThreadPoolExecutor(settings.INGESTION_WORKERS)}
self.scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors)
self.scheduler.remove_all_jobs()
self.scheduler.add_job(ingest_files, 'interval', seconds=settings.INGESTION_INTERVAL)
def start(self):
self.scheduler.start()
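The xmltodict traversal in xml_to_product assumes a catalog → products → product nesting. The same walk can be sketched dependency-free with the stdlib's ElementTree (the sample document and tag names are illustrative):

```python
import xml.etree.ElementTree as ET

# Illustrative feed matching the catalog/products/product layout.
SAMPLE = """<catalog><products>
  <product><upc>0123456789</upc><name>Widget</name><category>tools</category></product>
  <product><upc>9876543210</upc><name>Gadget</name><category>toys</category></product>
</products></catalog>"""

def parse_products(xml_content):
    # Equivalent to walking json_data[CATALOG][PRODUCTS][PRODUCT] above,
    # yielding one flat dict per <product> element.
    root = ET.fromstring(xml_content)
    for item in root.iter("product"):
        yield {child.tag: child.text for child in item}

records = list(parse_products(SAMPLE))
```

xmltodict trades this explicit traversal for a nested-dict view of the whole document; either way the output is one flat record per product, ready for the de-dup check against get_by_upc.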
Subsystem orchestrator:
class SubSystem:
def start_sub_systems(self):
init_db()
if settings.START_INGESTION:
self.scheduler = Scheduler()
self.scheduler.setup_ingestion_scheduler()
self.scheduler.start()
self.api = HttpApi()
self.api.app.run(host=settings.HOST, port=settings.PORT, debug=settings.DEBUG)
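init_db(), called by the orchestrator above, is not shown. A minimal sketch, assuming it does no more than create every table mapped on the declarative Base — the engine and model here are stand-ins, and the project's version presumably closes over its module-level engine rather than taking one as a parameter:

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Product(Base):
    __tablename__ = "products"
    id = Column(Integer, primary_key=True)
    upc = Column(String(25))

def init_db(engine):
    # Create every table mapped against Base; a no-op for tables
    # that already exist, so it is safe to call on every startup.
    Base.metadata.create_all(bind=engine)

engine = create_engine("sqlite:///:memory:")
init_db(engine)
```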
Takeaway
This is the ingest-transform-serve pattern at small scale. Swap XML for Kafka events and SQLite for Postgres and you have most production data pipelines.