Ingest-Transform-Serve Pipeline

A feed processor that ingests XML files on a schedule, converts them to JSON, stores denormalized records in SQLite, and serves them through a REST API. This is a miniature version of the ingest-transform-serve pattern common in ML feature pipelines.

Problem

XML product feeds land in a file store periodically. The system needs to:

  1. Detect and ingest new files on a configurable schedule.
  2. Parse XML into JSON and persist denormalized records.
  3. Expose the data through a RESTful API for search, CRUD, and listing.
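The feed layout assumed throughout is a catalog of products. The sketch below illustrates that shape with the standard library's ElementTree (the pipeline itself uses xmltodict); the tag names and sample values are illustrative assumptions, not taken from a real feed:

```python
import xml.etree.ElementTree as ET

# Hypothetical feed snippet following the catalog -> products -> product layout.
SAMPLE_FEED = """
<catalog>
  <products>
    <product>
      <upc>012345678905</upc>
      <name>Widget</name>
      <category>hardware</category>
      <image_url>http://example.com/widget.png</image_url>
    </product>
  </products>
</catalog>
"""

def feed_to_records(xml_text):
    """Flatten each <product> element into a plain dict of tag -> text."""
    root = ET.fromstring(xml_text)
    return [
        {child.tag: child.text for child in product}
        for product in root.iter("product")
    ]

records = feed_to_records(SAMPLE_FEED)
```

Each record is then ready to be persisted as a flat row, which is exactly what the ingestion job does with the xmltodict output.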

Approach

Three decoupled subsystems, each independently scalable:

  1. Ingestion scheduler — APScheduler runs a background job at a fixed interval, iterating over XML files via a generator pattern. Each file is parsed with xmltodict and saved to the database. The iterator/generator approach keeps memory flat regardless of file count.

  2. Database layer — SQLAlchemy ORM over SQLite. A Product model stores the denormalized records along with metadata; Marshmallow handles serialization.

  3. REST API — Flask-RESTful with endpoints for search, list, get-by-id, create, update, and delete. Each operation maps to a dedicated Resource class.

Scalability path: The subsystems are designed to scale independently. The scheduler can move to a separate process behind a message queue (Kafka, SQS) for high-volume ingestion. The API layer can sit behind a load balancer. Ingested data can be cached out-of-process (Redis, Memcached).
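Out-of-process caching would follow the cache-aside pattern. A sketch using a plain dict as a stand-in for Redis (the function and key names are illustrative; swapping in redis-py's get/set plus a TTL is mechanical):

```python
import json

cache = {}  # stand-in for an out-of-process store such as Redis

def get_product_cached(product_id, load_from_db):
    """Cache-aside: try the cache first, fall back to the DB, then populate."""
    key = f"product:{product_id}"
    hit = cache.get(key)
    if hit is not None:
        return json.loads(hit)          # cache hit: no DB round trip
    record = load_from_db(product_id)   # cache miss: load from the database
    if record is not None:
        cache[key] = json.dumps(record) # with Redis: SET with an expiry
    return record
```

The serving layer only ever calls get_product_cached, so the cache backend can change without touching the API resources.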

Implementation

Data model and schema:

import json

from marshmallow import Schema, fields
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker

# settings is the project's configuration module
engine = create_engine(
    f"sqlite:///{settings.DATABASE_CONNECTION_STRING}",
    connect_args={"check_same_thread": False},
)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()
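The orchestrator later calls an init_db() that the excerpt omits; given the declarative Base, it is plausibly just Base.metadata.create_all. A self-contained sketch, where the in-memory engine, Base, and model stand in for the objects defined above:

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base

engine = create_engine("sqlite:///:memory:")
Base = declarative_base()

class Product(Base):
    __tablename__ = "products"
    id = Column(Integer, primary_key=True)
    upc = Column(String(25))

def init_db():
    # Create any tables declared on Base that do not exist yet; a no-op
    # for tables already present, so it is safe to call at every startup.
    Base.metadata.create_all(bind=engine)

init_db()
```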

class ProductSchema(Schema):
    id = fields.Integer()
    upc = fields.Str()
    name = fields.Str()
    category = fields.Str()
    image_url = fields.Str()

class Product(Base):
    __tablename__ = "products"
    id = Column(Integer, primary_key=True)
    upc = Column(String(25))
    name = Column(String(100))
    category = Column(String(25))
    image_url = Column(String(255))

    def __init__(self, data=None):
        if data:
            self.id = data.get("id")
            self.upc = data.get("upc")
            self.name = data.get("name")
            self.category = data.get("category")
            self.image_url = data.get("image_url")

    def get_dict(self):
        # marshmallow 3's dump() already returns a plain dict,
        # so there is no need to serialize to JSON and parse it back.
        return ProductSchema().dump(self)

    def save(self):
        db_session.add(self)
        db_session.commit()

    @classmethod
    def get_by_upc(cls, upc):
        return cls.query.filter(cls.upc == upc).all()

    @classmethod
    def search_by_key(cls, key, value):
        # Generic single-field search backing the /search endpoint.
        return cls.query.filter(getattr(cls, key) == value).all()

REST API resources:

class SearchProduct(Resource):
    def post(self):
        json_data = request.get_json(force=True)
        products = Product.search_by_key(json_data.get("key"), json_data.get("value"))
        return {"result": [item.get_dict() for item in products], "status": "success"}, 200

class GetProductList(Resource):
    def get(self):
        products = [item.get_dict() for item in Product.query.all()]
        return {"result": products, "status": "success"}, 200

class GetProduct(Resource):
    def get(self, id):
        product = Product.query.get(id)
        if product:
            return {"result": product.get_dict(), "status": "success"}, 200
        return {"message": "Product not found", "status": "error"}, 404

class HttpApi:
    def __init__(self):
        self.app = Flask("HttpApi")
        self.api = Api(self.app)
        self.api.add_resource(SearchProduct, "/api/products/search/")
        self.api.add_resource(CreateProduct, "/api/products/")
        self.api.add_resource(GetProductList, "/api/products")
        self.api.add_resource(GetProduct, "/api/products/<int:id>")
        self.api.add_resource(DeleteProduct, "/api/products/<int:id>")
        self.api.add_resource(UpdateProduct, "/api/products/<int:id>")

Ingestion scheduler — background job using APScheduler with generator-based file iteration:

def xml_to_product(xml_file):
    with open(xml_file, 'r') as file:
        json_data = xmltodict.parse(file.read())

    items = json_data[consts.CATALOG][consts.PRODUCTS][consts.PRODUCT]
    # xmltodict yields a dict, not a list, when the feed holds a single
    # product; normalize so iteration works for both cases.
    if isinstance(items, dict):
        items = [items]

    for item in items:
        upc = item[consts.UPC]
        if not Product.get_by_upc(upc):
            product = Product()
            product.name = item[consts.NAME]
            product.upc = upc
            product.category = item[consts.CATEGORY]
            product.image_url = item.get(consts.IMAGE_URL, "")
            product.save()

def ingest_files():
    xml_files = glob.iglob(os.path.join(settings.FILE_STORE, '*.xml'))
    for xml_file in xml_files:
        xml_to_product(xml_file)

class Scheduler:
    def setup_ingestion_scheduler(self):
        jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///db_scheduler.sqlite')}
        executors = {'default': ThreadPoolExecutor(settings.INGESTION_WORKERS)}
        self.scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors)
        self.scheduler.remove_all_jobs()
        self.scheduler.add_job(ingest_files, 'interval', seconds=settings.INGESTION_INTERVAL)

    def start(self):
        self.scheduler.start()

Subsystem orchestrator:

class SubSystem:
    def start_sub_systems(self):
        init_db()

        if settings.START_INGESTION:
            self.scheduler = Scheduler()
            self.scheduler.setup_ingestion_scheduler()
            self.scheduler.start()

        self.api = HttpApi()
        self.api.app.run(host=settings.HOST, port=settings.PORT, debug=settings.DEBUG)

Takeaway

This is the ingest-transform-serve pattern at small scale. Swap the XML file drop for Kafka events and SQLite for Postgres, and you have the skeleton of most production data pipelines.

