Design & Architecture
Rowsmyth generates relational test and seed datasets row by row and materialises them as Spark DataFrames, keeping referential integrity between tables. Row generation runs in the driver; the library registers temp views in the active dataset session. Writing to Unity Catalog (or elsewhere) is your responsibility.
Influences
| Idea | Source |
|---|---|
| Create a declarative base, subclass it, auto-register in a catalog | SQLAlchemy declarative bases |
| Faker, weights, column ergonomics | dbldatagen (but rowsmyth is not vectorised and supports cross-table FKs) |
dbldatagen optimises for throughput on a single DataFrame. Rowsmyth trades that for per-row control and real parent/child relationships.
Public API
Import from rowsmyth:
| Symbol | Role |
|---|---|
| `declarative_base` | Creates a scoped declarative base with its own registry |
| `Model` | Abstract model machinery inherited by declarative bases |
| `variant` | Decorator for named partial row overrides |
| `Factory` | Fluent builder (`Model.factory()` is the usual entry) |
| `RowCtx` | Per-row context in `generator()` and variants |
| `Dataset` | Object yielded by `Base.dataset()` (`spark`, `dataframes`, `faker`, `random`, `seed`) |
| `Pool` | Spark-backed distinct values from a temp view (`choice`, `sample`) |
| `RowsmythError` and subclasses | Domain errors for invalid bases, schemas, factories, variants, pools and dataset lookups |
Model.create() and Factory.create() must run inside Base.dataset(...) for the same declarative base. Calling either outside raises DatasetContextError; using a model from another base raises WrongDeclarativeBaseError.
Package layout
src/rowsmyth/
    model.py       Model, declarative_base(), variant, registry, fqn(), metadata helpers
    factory.py     Factory - fluent API and create()
    dataset.py     Dataset, RowCtx, active dataset context
    resolution.py  FK resolution, row value resolution, validation
    pool.py        Pool and deferred pool tokens
    errors.py      Rowsmyth exception hierarchy
Defining a table
Create a base with declarative_base(), subclass it, declare schema metadata on the class and implement generator() for one row.
from pyspark.sql.types import LongType, StringType, StructField, StructType

from rowsmyth import declarative_base, variant

Base = declarative_base()


class Role(Base):
    __table_name__ = "roles"
    __primary_key__ = ("id",)
    __definition__ = StructType([
        StructField("id", LongType(), False),
        StructField("name", StringType(), False),
    ])

    def generator(self, ctx) -> dict:
        return {
            "id": ctx.sequence(),
            "name": ctx.random.choice(["admin", "user", "guest"]),
        }


class User(Base):
    __table_name__ = "users"
    __catalog__ = "main"
    __schema__ = "app"
    __comment__ = "Application users"
    __primary_key__ = ("id",)
    __table_tags__ = {"layer": "silver", "pii": "true"}
    __definition__ = StructType([
        StructField("id", LongType(), False),
        StructField("role_id", LongType(), False, metadata={"comment": "FK -> roles.id"}),
        StructField("full_name", StringType(), False),
        StructField("email", StringType(), False),
        StructField("status", StringType(), False),
    ])

    def generator(self, ctx) -> dict:
        first = ctx.faker.first_name()
        last = ctx.faker.last_name()
        return {
            "id": ctx.sequence(),
            "role_id": Role.factory(),
            "full_name": f"{first} {last}",
            "email": ctx.faker.unique.ascii_email(),
            "status": ctx.random.choices(["active", "inactive"], weights=[9, 1])[0],
        }

    @variant
    def churned(self, ctx) -> dict:
        return {"status": "inactive"}
Because each row is a single dict, columns that depend on each other use normal local variables (first / last above). There is no column-ordering DSL.
Class attributes
| Attribute | Required | Purpose |
|---|---|---|
| `__table_name__` | yes | Registry key and temp-view name |
| `__definition__` | yes | `StructType` - types, nullability, UC column metadata |
| `__primary_key__` | yes | Tuple of PK column names (one or more) |
| `__catalog__`, `__schema__` | no | Used by `Model.fqn()` |
| `__comment__` | no | Table comment for UC / Lakeflow |
| `__table_tags__` | no | `{key: value}` for UC table tags |
| `__expectations__` | no | `{name: sql}` dict for Lakeflow data quality expectations (`expect_all_or_fail`) |
Registry and abstract bases
On subclass, if __table_name__ is set, rowsmyth validates that primary-key columns exist in the Spark schema, registers the class in Base.registry[__table_name__] and collects inherited plus locally declared @variant methods into _variants.
If __table_name__ is omitted (mixin or abstract intermediate base), the class is not registered. Concrete children still register normally.
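The registration rule can be sketched in plain Python with `__init_subclass__`. This is a minimal illustration under stated assumptions, not rowsmyth's implementation: `make_base` is a hypothetical stand-in for `declarative_base()`, `__columns__` is a hypothetical tuple standing in for the `StructType` in `__definition__`, and `ValueError` stands in for the library's own error type. The sketch also assumes that only a `__table_name__` declared in the class body triggers registration.

```python
def make_base():
    """Hypothetical stand-in for declarative_base(): each call gets its own registry."""

    class Base:
        registry = {}  # table name -> model class, scoped to this base

        def __init_subclass__(cls, **kwargs):
            super().__init_subclass__(**kwargs)
            # Assumption: only a __table_name__ declared on the class itself registers;
            # mixins and abstract intermediates are skipped.
            name = cls.__dict__.get("__table_name__")
            if name is None:
                return
            columns = set(getattr(cls, "__columns__", ()))  # stand-in for __definition__
            missing = [c for c in getattr(cls, "__primary_key__", ()) if c not in columns]
            if missing:
                raise ValueError(f"{name}: primary key columns not in schema: {missing}")
            Base.registry[name] = cls

    return Base


Base = make_base()


class Timestamped(Base):  # no __table_name__, so it is not registered
    pass


class Role(Timestamped):  # concrete child of an abstract base registers normally
    __table_name__ = "roles"
    __primary_key__ = ("id",)
    __columns__ = ("id", "name")


assert Base.registry == {"roles": Role}
assert make_base().registry == {}  # a second base has an independent registry
```

Keeping the registry on the object returned by the factory function is what makes each base scoped: two bases can define a `roles` table without colliding.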
Factory API
users = (
    User.factory()
    .count(10)
    .variant("churned")      # raises UnknownVariantError if the variant is unknown
    .where(status="active")  # scalars, Factory FKs, or callables
    .has(Post.factory().count(3), via="author_id")
    .create()                # returns list[User]; registers session DataFrames/temp views
)
| Method | Behaviour |
|---|---|
| `count(n)` | Rows to generate for this table; 0 is a no-op, negative/non-integer values raise `FactoryError` |
| `variant(name)` | Merge partial dict from `@variant` method |
| `where(**kwargs)` | Overrides; merged after variant |
| `has(child, via=None)` | For each parent row, generate child rows with parent injected |
| `create()` | Materialise all touched tables; register `createOrReplaceTempView(__table_name__)`; return root-table instances |
.create() returns concrete model objects for the root factory table. DataFrames for every touched table (parents created for FKs, children from .has() and the root table itself) are stored on the active Dataset as dataset.dataframes and are available via dataset.dataframe(name).
Static row creation
Use Model.create(**cols) for reference data that should already exist before factories sample from a pool:
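with Base.dataset(spark, seed=42) as dataset:
    admin = Role.create(name="admin")
    user = Role.create(name="user")

    # Assumes User.generator draws role_id from ctx.pool("roles", "id").choice();
    # with "role_id": Role.factory(), inject-or-create would add new role rows instead.
    users = User.factory().count(10).create()

    role_ids = {admin.id, user.id}
    assert all(created_user.role_id in role_ids for created_user in users)
    users_df = dataset.dataframe("users")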
Model.create() builds one row through the same generator(ctx), sequence, callable, FK resolution and validation pipeline as factories. Explicit columns override generator defaults, so Role.create(name="admin") can rely on generator() for the id sequence.
Row materialisation pipeline
For each row, the factory materialisation pipeline runs:
1. `attrs = table().generator(ctx)`
2. If a variant is selected: `attrs.update(variant_method(table_instance, ctx))`
3. `attrs.update(factory._where)` - `.where()` wins over generator and variant
4. `ctx.row = attrs` (may still contain `Factory` instances or callables)
5. Resolve each value in `attrs`:
   - `Factory` → FK resolution (see below); slot = column name
   - `callable` → `value(ctx)`; siblings visible on `ctx.row`
6. Validate the full row against the Spark schema: no unknown columns, every NOT NULL column present, and no `None` values for NOT NULL fields
Variants are bound methods: def churned(self, ctx) -> dict.
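The merge order in the pipeline can be shown with a plain-Python sketch. It is an illustration only: `Ctx` and `materialise_row` are hypothetical names, variants are passed as plain functions of `ctx` rather than bound methods, and FK resolution and schema validation are left out.

```python
import random


class Ctx:
    """Hypothetical, trimmed-down stand-in for RowCtx."""

    def __init__(self, seed=None):
        self.random = random.Random(seed)
        self.row = {}


def materialise_row(generator, variant=None, where=None, ctx=None):
    ctx = ctx or Ctx()
    attrs = dict(generator(ctx))        # generator defaults
    if variant is not None:
        attrs.update(variant(ctx))      # variant overrides the generator
    attrs.update(where or {})           # .where() overrides both
    ctx.row = attrs                     # callables can now read sibling columns
    for column, value in list(attrs.items()):
        if callable(value):
            attrs[column] = value(ctx)  # resolve deferred values in place
    return attrs


row = materialise_row(
    generator=lambda ctx: {"id": 1, "status": "active", "label": None},
    variant=lambda ctx: {"status": "inactive"},
    where={"label": lambda ctx: f"user-{ctx.row['id']}-{ctx.row['status']}"},
)
assert row == {"id": 1, "status": "inactive", "label": "user-1-inactive"}
```

Because the callable runs after the merge, it sees the variant's `status` rather than the generator's default.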
Foreign keys and referential integrity
Factory as column value (single-column FK)
Return a parent Factory as the column value. Resolution uses slot = column name (or injected slot from .has(..., via=...)):
- If a model object exists in `ctx._parents` for that slot → use its primary key
- Otherwise → create one parent row (recursive), cache it, use its PK
"role_id": Role.factory()
Restriction: the parent table must have a single-column primary key. For compound keys, resolve_fk raises CompoundPrimaryKeyError with a message to use ctx.parent() instead.
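The inject-or-create rule can be sketched without Spark. The names below are hypothetical: `create_parent` stands in for recursive parent materialisation, the `parents` dict plays the role of `ctx._parents`, and rows are plain dicts with a single-column primary key `id`.

```python
from itertools import count

_ids = count(1)  # stand-in for ctx.sequence()


def create_parent():
    """Hypothetical parent-row constructor: one dict per row, PK column "id"."""
    return {"id": next(_ids)}


def resolve_fk(slot, parents, created):
    """Return the parent PK for `slot`, creating and caching a parent if none was injected."""
    parent = parents.get(slot)
    if parent is None:
        parent = create_parent()
        parents[slot] = parent  # cache so the slot resolves to the same parent again
        created.append(parent)  # the new parent row must be materialised too
    return parent["id"]


created = []

# Injected parent (as .has(..., via="author_id") would provide): no new row is created.
injected = {"author_id": {"id": 99}}
assert resolve_fk("author_id", injected, created) == 99
assert created == []

# Empty slot: one parent row is created, cached and reused.
parents = {}
first = resolve_fk("role_id", parents, created)
assert resolve_fk("role_id", parents, created) == first
assert len(created) == 1
```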
ctx.parent(table, role=None)
Resolve the parent once per row (inject-or-create, cached). Slot = role or table.__table_name__.
def generator(self, ctx):
    order = ctx.parent(Order)
    return {
        "order_id": order.key["order_id"],
        "order_region": order.key["region"],
        "qty": ctx.random.randint(1, 5),
    }
Use order.key (dict of PK columns), order.pk (scalar, single-column PK only; raises CompoundPrimaryKeyError for compound keys), or order.attrs (full row dict).
ctx.pool(view, col)
Not an FK mode: read distinct non-null values from an existing temp view in
the session. choice() returns a deferred token that is resolved in Spark during
commit using hidden rowsmyth columns, which are dropped before public DataFrames
and temp views are registered. Raises EmptyPoolError if the view has no
non-null values, PoolSampleError when sample(k) cannot be satisfied and
UnresolvedPoolError if a deferred choice cannot resolve to a concrete value.
"role_id": ctx.pool("roles", "id").choice()
Create parents with Model.create() or Factory.create() before dependents that use pool. The inject-or-create FK path does not require ordering.
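To make the pool semantics concrete, here is an eager, in-memory analogue. It is not how rowsmyth works internally (the real `Pool` is Spark-backed and `choice()` is deferred until commit); `ListPool`, `EmptyPool` and `SampleTooLarge` are hypothetical names that mirror `Pool`, `EmptyPoolError` and `PoolSampleError`.

```python
import random


class EmptyPool(Exception):
    """Stand-in for rowsmyth's EmptyPoolError."""


class SampleTooLarge(Exception):
    """Stand-in for rowsmyth's PoolSampleError."""


class ListPool:
    """Eager analogue of Pool over a list of row dicts."""

    def __init__(self, rows, col, rng):
        # Distinct non-null values, sorted so a seeded rng gives repeatable picks.
        self.values = sorted({r[col] for r in rows if r.get(col) is not None})
        if not self.values:
            raise EmptyPool(col)
        self.rng = rng

    def choice(self):
        return self.rng.choice(self.values)

    def sample(self, k):
        # Sampling is without replacement, so k cannot exceed the distinct count.
        if k > len(self.values):
            raise SampleTooLarge(f"asked for {k}, only {len(self.values)} distinct values")
        return self.rng.sample(self.values, k)


roles = [{"id": 1}, {"id": 2}, {"id": 2}, {"id": None}]
pool = ListPool(roles, "id", random.Random(42))
assert pool.values == [1, 2]
assert pool.choice() in {1, 2}
assert sorted(pool.sample(2)) == [1, 2]
```

The sketch also shows why ordering matters for pools: the source rows must already exist when the pool is built, whereas inject-or-create can make its own parents.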
Disambiguation with via
When a child has multiple FKs to the same parent, via names the injection slot (usually the child FK column name):
User.factory().has(Post.factory().count(3), via="author_id")
In Post.generator, either:
"author_id": User.factory()
or:
ctx.parent(User, role="author_id")
Base.dataset(spark, seed=None)
Activates a dataset via a ContextVar. Factories and FK logic use the active session without threading spark through every call. The dataset is bound to the declarative base, so model creation can reject models from other bases before commit.
When seed is set, rowsmyth seeds only session-owned objects: ctx.random / dataset.random and ctx.faker / dataset.faker. It does not call global random.seed() or Faker.seed().
Use ctx.random and ctx.faker for deterministic custom logic. Avoid unseeded sources (uuid4, wall clock) unless intentional.
with Base.dataset(spark, seed=42) as dataset:
    Role.create(name="admin")
    Role.create(name="user")

    users = (
        User.factory()
        .count(10)
        .has(Post.factory().count(3).where(published=True), via="author_id")
        .create()
    )

    users_df = dataset.dataframe("users")
    posts_df = dataset.dataframe("posts")
    # temp views "users", "posts"; dataset.spark is the active SparkSession

    # Persist yourself, e.g.:
    # users_df.write.mode("overwrite").saveAsTable(User.fqn())
RowCtx
| Member | Description |
|---|---|
| `ctx.faker` | Shared Faker instance |
| `ctx.random` | Seeded `random.Random` |
| `ctx.seed` | Seed passed to `Base.dataset()`, or `None` |
| `ctx.spark` | Active SparkSession |
| `ctx.index` | 0-based row index for the current factory |
| `ctx.row` | In-progress attribute dict (for callables and FK resolution) |
| `ctx.sequence(name=None)` | Monotonic counter; default name = current `__table_name__` |
| `ctx.parent(table, role=None)` | Resolve parent model object |
| `ctx.pool(view, col)` | Spark-backed pool from a temp view |
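As an illustration of the named counters behind `ctx.sequence(name=None)`, the sketch below keeps one monotonic counter per name. `Sequences` is a hypothetical class, and the starting value of 1 is an assumption; the source does not state where rowsmyth's sequences begin.

```python
from collections import defaultdict
from itertools import count


class Sequences:
    """Hypothetical per-dataset counters keyed by name."""

    def __init__(self, start=1):  # the starting value is an assumption
        self._counters = defaultdict(lambda: count(start))

    def next(self, name):
        return next(self._counters[name])


seq = Sequences()
assert [seq.next("roles") for _ in range(3)] == [1, 2, 3]
assert seq.next("users") == 1  # each name has an independent counter
```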
Dataset
Yielded by Base.dataset(). Exposes spark, base, registry, dataframes, dataframe(name), next_seq(name), pool(view, col), faker, random and seed. Internal rows, counters, deferred pool tokens and validation state are not part of the public contract.
Create output
For each table name touched by Model.create() or Factory.create():
1. `createDataFrame(rows, dataset.registry[name].__definition__)` - schema is always explicit; never inferred
2. `createOrReplaceTempView(__table_name__)`
3. Store the DataFrame in `dataset.dataframes[name]`
Temp views use the bare __table_name__ (not fqn()). Factories return instances, not DataFrames.
Errors
| Situation | Exception |
|---|---|
| `create()` outside `Base.dataset()` | `DatasetContextError` |
| Model from another declarative base used in active dataset | `WrongDeclarativeBaseError` |
| Model does not extend `declarative_base()` | `InvalidDeclarativeBaseError` |
| Model primary key references columns absent from `__definition__` | `InvalidModelDefinitionError` |
| Model declares reserved `__rowsmyth_*` columns | `ReservedColumnError` |
| Unknown columns passed to `Model.create()` or constructor | `UnknownColumnError` |
| `Dataset.dataframe(name)` before that table is created | `DataframeNotFoundError` |
| Unknown `.variant(name)` | `UnknownVariantError` |
| NOT NULL column missing or `None` in any generated row | `MissingRequiredColumnError` |
| `Factory.count()` with a negative or non-integer value | `FactoryError` |
| `ctx.pool` on empty view or all-null source column | `EmptyPoolError` |
| `Pool.sample(k)` cannot sample without replacement | `PoolSampleError` |
| Deferred `Pool.choice()` cannot resolve a concrete value | `UnresolvedPoolError` |
| `Factory()` as column value for compound-PK parent | `CompoundPrimaryKeyError` |
| `.pk` used on a compound-PK model | `CompoundPrimaryKeyError` |
| `generator()` not implemented | `NotImplementedError` |
Validation runs on every generated row before commit.
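The per-row checks can be sketched as follows. This is an assumption-laden illustration: `validate_row` is a hypothetical helper, the schema is a plain dict of column name to nullable flag instead of a `StructType`, and built-in exceptions stand in for `UnknownColumnError` and `MissingRequiredColumnError`.

```python
def validate_row(row, schema):
    """`schema` maps column name -> nullable flag, a stand-in for a StructType."""
    unknown = sorted(set(row) - set(schema))
    if unknown:
        raise KeyError(f"unknown columns: {unknown}")  # cf. UnknownColumnError
    # A NOT NULL column fails if it is absent or explicitly None.
    missing = sorted(
        name for name, nullable in schema.items()
        if not nullable and row.get(name) is None
    )
    if missing:
        raise ValueError(f"NOT NULL columns missing or None: {missing}")  # cf. MissingRequiredColumnError
    return row


schema = {"id": False, "name": False, "nickname": True}

# Nullable columns may be omitted.
assert validate_row({"id": 1, "name": "admin"}, schema) == {"id": 1, "name": "admin"}

# A missing or None value in a NOT NULL column is rejected.
for bad in ({"id": 1}, {"id": 1, "name": None}):
    try:
        validate_row(bad, schema)
    except ValueError:
        pass
    else:
        raise AssertionError("expected a validation error")
```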
Unity Catalog integration
StructField.metadata holds UC column metadata. Use metadata={"comment": "...", "tags": {...}} for column comments and tags. __comment__, __table_tags__, column_comments(), column_tags() and uc_tag_sql() are available on the Model class - rowsmyth generates comment/tag SQL strings but does not execute catalog writes.
for statement in User.uc_tag_sql():
    spark.sql(statement)

# Same Model class for Lakeflow / UC declaration:
# @dp.table(name=User.fqn(), comment=User.__comment__, schema=User.__definition__)
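For orientation, the sketch below shows the kind of string such helpers might produce. It is not the output of `uc_tag_sql()`, which the source does not specify: `fqn` and `table_tag_sql` here are hypothetical functions, the three-part name format is an assumption, and tag values are assumed to contain no quote characters. The statement follows the Databricks `ALTER TABLE ... SET TAGS` form.

```python
def fqn(catalog, schema, table):
    """Join the parts that are set into a dotted name (assumed format)."""
    return ".".join(part for part in (catalog, schema, table) if part)


def table_tag_sql(table_fqn, tags):
    """Build one ALTER TABLE ... SET TAGS statement; values must not contain quotes."""
    pairs = ", ".join(f"'{k}' = '{v}'" for k, v in sorted(tags.items()))
    return f"ALTER TABLE {table_fqn} SET TAGS ({pairs})"


statement = table_tag_sql(fqn("main", "app", "users"), {"layer": "silver", "pii": "true"})
assert statement == "ALTER TABLE main.app.users SET TAGS ('layer' = 'silver', 'pii' = 'true')"
```

Generating strings and leaving execution to the caller keeps catalog writes explicit and under the caller's permissions.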
Gotchas and non-goals
Scale. Driver-side row generation suits dev, test and seed volumes (thousands to low millions). For large-scale synthetic data, prefer vectorised tools.
Schema. Always pass __definition__ to createDataFrame. Inference mishandles None and Python int vs Spark LongType.
Determinism. seed fixes generated values. Spark may not preserve row order across partitions - sort if you need stable ordering.
Uniqueness. Use ctx.faker.unique.* or ctx.sequence() for unique columns; Faker’s .unique state resets for each Base.dataset() block.
FK cycles. Inject-or-create recurses forever on cyclic Factory() graphs. Break cycles by seeding one table and using ctx.pool() for the back-reference.
Ordering. No topological sort. Call .create() in dependency order when using pool(); FK inject/create does not require it.
Catalog. Rowsmyth registers temp views only; it never runs saveAsTable or applies comments, tags, or grants.
Out of scope. Production-scale throughput, automatic cycle breaking, distributed generation and schema inference.
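Since rowsmyth does no topological sort, callers who rely on `ctx.pool()` can work out a creation order themselves. One option, sketched here with the standard library's `graphlib` (Python 3.9+), is to describe which tables read from which views and sort that graph. The `depends_on` mapping is a hypothetical example and not something rowsmyth derives for you.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical dependency map: table -> tables whose temp views it reads via pool().
depends_on = {
    "roles": set(),
    "users": {"roles"},
    "posts": {"users"},
}

# static_order() yields every table after the tables it depends on.
order = list(TopologicalSorter(depends_on).static_order())
assert order == ["roles", "users", "posts"]

# A cycle is reported instead of looping forever.
try:
    list(TopologicalSorter({"a": {"b"}, "b": {"a"}}).static_order())
except CycleError:
    cyclic = True
else:
    cyclic = False
assert cyclic
```

Calling `.create()` for each table in `order` satisfies the pool ordering requirement, and the `CycleError` case flags graphs where one back-reference has to be seeded manually.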
Testing (library development)
Integration tests use a session-scoped local SparkSession (Java 17+). DataFrame assertions use chispa. Unit tests mock Spark where a JVM is not required. See tests/ and CONTRIBUTING.md. Local development installs PySpark via the spark uv dependency group / rowsmyth[spark] extra; end users on managed Spark typically install rowsmyth without that extra.