rowsmyth - usage guide
Full API reference and usage patterns. For a quick overview see the README; for internal design decisions see design.md.
Install
- Local / CI: `pip install "rowsmyth[spark]"` or `uv add "rowsmyth[spark]"` to install PySpark 4.0+ alongside rowsmyth.
- Databricks / managed Spark: `pip install rowsmyth` (no extra) and use the cluster's PySpark. rowsmyth requires PySpark 4.0+ at import time but does not pin it as a core dependency.
Contents
- Defining a table
- Variants
- Factory API
- Base.dataset() context manager
- RowCtx reference
- Foreign keys and referential integrity
- Create output
- Databricks Lakeflow and Unity Catalog
- Generating test fixtures
- Error reference
- Gotchas
Defining a table
Create a declarative base with declarative_base(), then subclass that base. Declare schema metadata as class attributes; implement generator() to return one row as a plain dict.
```python
from pyspark.sql.types import LongType, StringType, StructField, StructType
from rowsmyth import declarative_base

Base = declarative_base()


class Role(Base):
    __table_name__ = "roles"
    __primary_key__ = ("id",)
    __definition__ = StructType([
        StructField("id", LongType(), False),
        StructField("name", StringType(), False),
    ])

    def generator(self, ctx):
        return {
            "id": ctx.sequence(),
            "name": ctx.random.choice(["admin", "user", "guest"]),
        }
```
Class attributes
| Attribute | Required | Type | Purpose |
|---|---|---|---|
| `__table_name__` | yes | `str` | Registry key and temp-view name |
| `__definition__` | yes | `StructType` | Column types, nullability and UC column metadata |
| `__primary_key__` | yes | `tuple[str, ...]` | One or more PK column names |
| `__catalog__` | no | `str \| None` | Unity Catalog catalog name; used by `Model.fqn()` |
| `__schema__` | no | `str \| None` | Schema name; used by `Model.fqn()` |
| `__comment__` | no | `str \| None` | Table comment for Unity Catalog / Lakeflow |
| `__table_tags__` | no | `dict[str, str]` | UC table tags |
| `__expectations__` | no | `dict[str, str]` | `{name: sql}` pairs for Lakeflow data quality expectations |
`Model.fqn()` joins the configured parts with dots, in catalog, schema, table order. A model with both `__catalog__` and `__schema__` returns `catalog.schema.table_name`. A partial configuration returns the available prefix plus `__table_name__`, and a model with neither returns the bare `__table_name__`.
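The rule comes down to joining the non-empty parts with dots. Below is a minimal sketch of that behaviour. It is not rowsmyth's implementation: the standalone `fqn()` function and its keyword parameters are hypothetical stand-ins for the class attributes.

```python
from typing import Optional


def fqn(table_name: str, catalog: Optional[str] = None, schema: Optional[str] = None) -> str:
    """Join catalog, schema and table name in that order, skipping unset parts."""
    parts = (catalog, schema, table_name)
    return ".".join(part for part in parts if part)


print(fqn("customers", catalog="main", schema="commerce"))
```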
Registry
Every subclass that declares `__table_name__` is registered automatically in its declarative base's registry. Omit `__table_name__` on a mixin or abstract intermediate base; concrete children still register normally.

```python
Base.registry["roles"]  # -> Role class
```
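This guide does not say how rowsmyth performs the registration. One common Python mechanism for this kind of behaviour is the `__init_subclass__` hook, which can register only those subclasses that declare `__table_name__` in their own class body. The sketch below is hypothetical: `declarative_base_sketch` is an illustrative name, not a rowsmyth function.

```python
def declarative_base_sketch():
    """Return a fresh base class with its own registry of concrete subclasses."""

    class _Base:
        registry: dict = {}  # one dict per call, so bases do not share registries

        def __init_subclass__(cls, **kwargs):
            super().__init_subclass__(**kwargs)
            # Only register classes that set __table_name__ in their own body;
            # mixins and abstract intermediates without it are skipped.
            name = cls.__dict__.get("__table_name__")
            if name is not None:
                _Base.registry[name] = cls

    return _Base


Base = declarative_base_sketch()


class TimestampMixin(Base):  # no __table_name__: not registered
    pass


class Role(TimestampMixin):
    __table_name__ = "roles"
```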
Column metadata
Attach Unity Catalog column comments and column tags directly to each `StructField` through its `metadata` dict. `Model.uc_tag_sql()` can generate SQL for table comments, table tags, column comments and column tags (see Apply Unity Catalog metadata after pipeline run).

```python
StructField("email", StringType(), False, metadata={
    "comment": "Customer email, PII",
    "tags": {"pii": "true", "classification": "restricted"},
})
```
Multi-column relationships in generator()
Because each row is a single dict, columns that depend on each other use ordinary local variables - no DSL needed:
```python
def generator(self, ctx):
    first = ctx.faker.first_name()
    last = ctx.faker.last_name()
    return {
        "id": ctx.sequence(),
        "full_name": f"{first} {last}",
        "email": f"{first.lower()}.{last.lower()}@example.com",
    }
```
Variants
A @variant method returns a partial dict that is merged into the row produced by generator(). It lets you describe named states (churn, suspension, premium tier) without duplicating the full row.
```python
from pyspark.sql.types import LongType, StringType, StructField, StructType
from rowsmyth import declarative_base, variant

Base = declarative_base()


class User(Base):
    __table_name__ = "users"
    __primary_key__ = ("id",)
    __definition__ = StructType([
        StructField("id", LongType(), False),
        StructField("email", StringType(), False),
        StructField("status", StringType(), False),
    ])

    def generator(self, ctx):
        return {
            "id": ctx.sequence(),
            "email": ctx.faker.unique.ascii_email(),
            "status": "active",
        }

    @variant
    def churned(self, ctx):
        return {"status": "inactive"}

    @variant
    def suspended(self, ctx):
        return {"status": "suspended"}
```
Activate with .variant("churned") in the factory chain. Passing an unknown name raises UnknownVariantError.
Merge order (later wins):
1. generator()
2. @variant return dict
3. .where() overrides
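This precedence works like dict unpacking, where later keys overwrite earlier ones. Below is a minimal sketch of the rule. `merge_row` is a hypothetical helper. It treats override values as plain scalars, whereas rowsmyth also resolves callables, `Factory` instances and pool choices.

```python
def merge_row(generated: dict, variant_patch: dict, overrides: dict) -> dict:
    # On key collisions the later mapping wins: generator() < @variant < .where()
    return {**generated, **variant_patch, **overrides}


row = merge_row(
    {"id": 1, "email": "a@example.com", "status": "active"},  # generator()
    {"status": "inactive"},                                    # churned variant
    {"email": "b@example.com"},                                # .where(email=...)
)
```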
Factory API
`Model.factory()` returns a `Factory` instance. Each builder method (`count`, `variant`, `where`, `has`) returns the factory itself for chaining; call `.create()` to materialise rows and return root model instances.
```python
users = (
    User.factory()
    .count(10)
    .variant("churned")
    .where(status="inactive", role_id=Role.factory())
    .has(Post.factory().count(3), via="author_id")
    .create()
)
```
Methods
| Method | Description |
|---|---|
| `count(n)` | Number of rows to generate (default: 1); 0 is an explicit no-op, negative/non-integer values raise `FactoryError` |
| `variant(name)` | Apply a named `@variant`; raises `UnknownVariantError` if unknown |
| `where(**kwargs)` | Column overrides; merged last, wins over `generator()` and `@variant`. Values may be scalars, `Factory` instances, deferred pool choices, or `callable(ctx) -> value` |
| `has(child_factory, via=None)` | For each parent row, generate child rows and inject this parent. `via` names the injection slot (usually the FK column name) |
| `create()` | Materialise all touched tables; register temp views; return a list of concrete root model objects |
.create() must run inside a Base.dataset(...) block for the same declarative base, or it raises DatasetContextError. Using a model from another base raises WrongDeclarativeBaseError.
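The chaining style and the `count()` rules in the table follow the usual fluent-builder pattern: every setter mutates the builder and returns `self`. Below is a hypothetical sketch of that pattern. `FactorySketch` is illustrative only, it raises `ValueError` where rowsmyth raises `FactoryError`, and its `create()` just returns placeholder dicts instead of committing rows.

```python
class FactorySketch:
    """Hypothetical fluent builder: each setter updates state and returns self."""

    def __init__(self):
        self._count = 1  # default: one row
        self._variant = None

    def count(self, n):
        # Reject negative and non-integer values (rowsmyth raises FactoryError).
        if not isinstance(n, int) or n < 0:
            raise ValueError("count() requires a non-negative integer")
        self._count = n
        return self

    def variant(self, name):
        self._variant = name
        return self

    def create(self):
        # Placeholder rows only; count(0) therefore yields an empty list.
        return [{"variant": self._variant} for _ in range(self._count)]


rows = FactorySketch().count(3).variant("churned").create()
```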
Model.create() for static rows
Use Model.create(**cols) for reference data that should exist before generated rows sample from ctx.pool(...):
```python
with Base.dataset(spark, seed=42) as dataset:
    admin = Role.create(name="admin")
    user = Role.create(name="user")
    # Assumes User.generator() sets "role_id": ctx.pool("roles", "id").choice()
    users = User.factory().count(20).create()
    role_ids = {admin.id, user.id}
    assert all(created_user.role_id in role_ids for created_user in users)
```
Model.create() returns one concrete model object and immediately updates dataset.dataframes[table_name] plus the matching temp view. Explicit columns override defaults from generator(ctx), so Role.create(name="admin") can still use ctx.sequence() from generator() for the primary key.
.where() with callables
Pass a callable that receives RowCtx for deferred/dependent values:
```python
.where(
    score=lambda ctx: round(ctx.random.gauss(0.5, 0.15), 4),
    grade=lambda ctx: "pass" if ctx.row["score"] >= 0.4 else "fail",
)
```
Callables are resolved after generator() and @variant, so ctx.row contains the current in-progress attributes when the callable runs.
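The sketch below shows how callable overrides can read earlier values through `ctx.row`. It assumes overrides are applied in keyword order, so `grade` sees the `score` written just before it. That ordering is an assumption of this sketch, not something this guide states. `resolve_overrides` is hypothetical, and a `SimpleNamespace` stands in for `RowCtx`.

```python
import random
from types import SimpleNamespace


def resolve_overrides(row: dict, overrides: dict, rng: random.Random) -> dict:
    """Apply .where()-style overrides in keyword order; call any callables with ctx."""
    ctx = SimpleNamespace(random=rng, row=row)  # minimal RowCtx stand-in
    for column, value in overrides.items():
        # ctx.row is the same dict being filled, so later callables see earlier results.
        row[column] = value(ctx) if callable(value) else value
    return row


overrides = {
    "score": lambda ctx: round(ctx.random.gauss(0.5, 0.15), 4),
    "grade": lambda ctx: "pass" if ctx.row["score"] >= 0.4 else "fail",
}
row = resolve_overrides({"id": 1}, overrides, random.Random(42))
```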
.has() - parent/child relationships
Generate child rows for each parent:
```python
with Base.dataset(spark) as dataset:
    users = (
        User.factory()
        .count(5)
        .has(Post.factory().count(3), via="author_id")
        .create()
    )
    # users - 5 User instances
    # dataset.dataframe("posts") - 15 rows (3 per user), each with the correct author_id
```
via names the slot used to inject the parent. In Post.generator(), reference it as:

```python
"author_id": User.factory()  # slot = "author_id" (column name)
# or:
"author_id": ctx.parent(User, role="author_id").pk
```
Base.dataset() context manager
All `.create()` calls must run inside `Base.dataset(...)` for the declarative base that owns the models being created. It sets a `ContextVar` for the duration of the `with` block, so table creation, factories and FK resolution can find the active SparkSession and generators without threading them through every call.
```python
with Base.dataset(spark, seed=42) as dataset:
    Role.create(name="admin")
    Role.create(name="user")
    users = (
        User.factory()
        .count(10)
        .has(Post.factory().count(3).where(published=True), via="author_id")
        .create()
    )
    users_df = dataset.dataframe("users")
    posts_df = dataset.dataframe("posts")
    # dataset.spark, dataset.faker, dataset.random, dataset.seed - shared state
```
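rowsmyth's internals are not shown in this guide. As an illustration, the standard-library pattern for "a `ContextVar` that is active only inside a `with` block" looks roughly like the sketch below. `dataset`, `current_dataset` and `NoActiveDataset` are hypothetical names; `NoActiveDataset` stands in for `DatasetContextError`.

```python
from contextlib import contextmanager
from contextvars import ContextVar

# Hypothetical slot for the active dataset; no default, so get() raises when unset.
_active_dataset: ContextVar = ContextVar("active_dataset")


class NoActiveDataset(RuntimeError):
    """Stand-in for rowsmyth's DatasetContextError."""


@contextmanager
def dataset(state):
    token = _active_dataset.set(state)
    try:
        yield state
    finally:
        # Restore the previous value (or the unset state), even if the block raises.
        _active_dataset.reset(token)


def current_dataset():
    try:
        return _active_dataset.get()
    except LookupError:
        raise NoActiveDataset("must be used inside dataset(...)") from None
```

Code deep in the call stack calls `current_dataset()` instead of receiving the session as an argument. Nested blocks restore the outer dataset on exit.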
Seeding
When seed is provided, rowsmyth seeds only the active dataset state:
- `ctx.random` / `dataset.random` is a dedicated `random.Random(seed)` instance.
- `ctx.faker` / `dataset.faker` is a dedicated Faker instance seeded with `seed`.
Rowsmyth does not call global random.seed() or Faker.seed(). Use ctx.random and ctx.faker inside generator() for deterministic output. Avoid unseeded sources (uuid4, wall-clock timestamps) unless non-determinism is intentional.
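A dedicated `random.Random(seed)` object keeps its own state. Two runs with the same seed therefore produce the same values regardless of what else touches the module-level `random` functions, and seeding it leaves the global generator alone. A stdlib-only illustration follows; `sample_scores` is hypothetical. (Faker offers `seed_instance()` for the same per-instance seeding.)

```python
import random


def sample_scores(seed: int, n: int = 5) -> list:
    # Private generator: independent of random.random() and random.seed().
    rng = random.Random(seed)
    return [rng.uniform(0, 1) for _ in range(n)]


first = sample_scores(42)
random.random()  # disturb the global generator; sample_scores is unaffected
second = sample_scores(42)
```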
Dataset object
The object yielded by with Base.dataset(...) as dataset:
| Attribute | Type | Description |
|---|---|---|
| `dataset.spark` | `SparkSession` | Active session |
| `dataset.base` | `type[Model]` | Declarative base bound to this dataset |
| `dataset.registry` | `dict[str, type[Model]]` | Registry for the bound declarative base |
| `dataset.dataframes` | `dict[str, DataFrame]` | DataFrames created in this dataset |
| `dataset.dataframe(name)` | `DataFrame` | Checked lookup by table name |
| `dataset.next_seq(name)` | `int` | Next value for a named sequence counter |
| `dataset.pool(view, col)` | `Pool` | Distinct values from a Spark temp view column |
| `dataset.faker` | `Faker` | Shared Faker instance |
| `dataset.random` | `random.Random` | Seeded RNG |
| `dataset.seed` | `int \| None` | Seed passed to `Base.dataset()` |
RowCtx reference
RowCtx is passed to generator() and every @variant method. It gives access to generators, sequence counters, parent rows and pool sampling.
| Member | Description |
|---|---|
| `ctx.faker` | Shared Faker instance |
| `ctx.random` | Seeded `random.Random` |
| `ctx.seed` | Seed from `Base.dataset()`, or `None` |
| `ctx.spark` | Active `SparkSession` |
| `ctx.index` | 0-based row index for the current factory |
| `ctx.row` | In-progress attribute dict (populated during resolution; useful in callables) |
| `ctx.sequence(name=None)` | Monotonic counter; default name is `__table_name__` |
| `ctx.parent(table, role=None)` | Resolve or create a parent model object |
| `ctx.pool(view, col)` | Spark-backed pool of distinct values from a temp view |
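Several of these members are plain per-dataset state. `ctx.sequence(name=None)`, for example, behaves like a set of independent counters keyed by name. The stand-in below is hypothetical: the `Sequences` class and its `next_seq()` method are illustrative, and starting at 1 is an assumption, since this guide does not state rowsmyth's first value.

```python
from collections import defaultdict
from itertools import count


class Sequences:
    """Hypothetical per-dataset counters; each name advances on its own."""

    def __init__(self, start: int = 1):
        # start=1 is an assumption made for this sketch.
        self._counters = defaultdict(lambda: count(start))

    def next_seq(self, name: str) -> int:
        return next(self._counters[name])


seqs = Sequences()
user_ids = [seqs.next_seq("users") for _ in range(3)]
```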
ctx.sequence()
Returns a monotonically increasing integer; each named counter advances independently. Useful for surrogate keys:

```python
"id": ctx.sequence()                    # counter keyed to __table_name__
"order_num": ctx.sequence("order_num")  # named counter, shared across tables
```
ctx.faker and ctx.random

```python
"email": ctx.faker.unique.ascii_email()
"name": ctx.faker.name()
"score": ctx.random.uniform(0, 1)
"status": ctx.random.choices(["active", "inactive"], weights=[9, 1])[0]
```
ctx.faker.unique resets at the start of each Base.dataset() block.
Foreign keys and referential integrity
Pattern 1 - Factory as column value (single-column FK)
Return a Factory as a column value. Rowsmyth resolves it to the parent's primary key, creating a new parent row per child row if none has been injected via .has():
```python
class OrderItem(Base):
    ...

    def generator(self, ctx):
        return {
            "id": ctx.sequence(),
            "order_id": Order.factory(),  # creates one Order per item unless injected
            "qty": ctx.random.randint(1, 5),
        }
```
Use .has() to share the same parent across a batch of children, or ctx.pool() for small reference tables (roles, statuses) seeded once at the start:
```python
# Reference tables: create first, sample with pool()
with Base.dataset(spark):
    Role.create(name="admin")
    Role.create(name="user")
    User.factory().count(20).create()  # User.generator() uses ctx.pool("roles", "id").choice()

# Parent/child: use .has() to wire the relationship
with Base.dataset(spark):
    orders = Order.factory().count(10).has(OrderItem.factory().count(3), via="order_id").create()
```
The column name is the injection slot. Requires a single-column primary key on the parent; raises CompoundPrimaryKeyError for compound PKs (use ctx.parent() instead).
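Below is a sketch of the inject-or-create decision for a single slot: reuse the parent injected by `.has()` when there is one, otherwise create a fresh parent. This is a plain-Python illustration, not rowsmyth's code. `create_order`, `resolve_fk` and `created_orders` are hypothetical names, and parents are plain dicts with a single-column `id` key.

```python
from itertools import count

_next_order_id = count(1)
created_orders = []  # stands in for the parent rows a factory would commit


def create_order() -> dict:
    order = {"id": next(_next_order_id)}
    created_orders.append(order)
    return order


def resolve_fk(slot: str, injected: dict) -> int:
    """Return the parent PK for a slot: reuse an injected parent, else create one."""
    parent = injected.get(slot)
    if parent is None:
        parent = create_order()  # no .has() parent: one new parent per child row
    return parent["id"]


shared = create_order()
batch = [resolve_fk("order_id", {"order_id": shared}) for _ in range(3)]
```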
Pattern 2 - ctx.parent() (compound or named FKs)
Use when you need multiple FK columns from the same parent, or when the parent has a compound PK:
```python
def generator(self, ctx):
    order = ctx.parent(Order)
    return {
        "order_id": order.key["order_id"],
        "order_region": order.key["region"],
        "qty": ctx.random.randint(1, 5),
    }
```
- `order.key`: dict of PK columns
- `order.pk`: scalar value (single-column PK only; raises `CompoundPrimaryKeyError` otherwise)
- `order.attrs`: full row dict
Slot defaults to table.__table_name__; pass role="slot_name" to disambiguate multiple parents of the same type.
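Below is a sketch of how `key`, `pk` and `attrs` relate for a compound-key parent. It is hypothetical: `ParentRef` is an illustrative stand-in for whatever object `ctx.parent()` returns, and `CompoundKeyError` stands in for `CompoundPrimaryKeyError`.

```python
from dataclasses import dataclass
from typing import Any, Dict, Tuple


class CompoundKeyError(Exception):
    """Stand-in for rowsmyth's CompoundPrimaryKeyError."""


@dataclass(frozen=True)
class ParentRef:
    table: str
    primary_key: Tuple[str, ...]
    attrs: Dict[str, Any]  # full row dict

    @property
    def key(self) -> Dict[str, Any]:
        # Only the primary-key columns, in declaration order.
        return {col: self.attrs[col] for col in self.primary_key}

    @property
    def pk(self) -> Any:
        if len(self.primary_key) != 1:
            raise CompoundKeyError(f"{self.table}: pk requires a single-column primary key")
        return self.attrs[self.primary_key[0]]


order = ParentRef("orders", ("order_id", "region"), {"order_id": 7, "region": "eu", "total": 12.5})
```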
Pattern 3 - ctx.pool() (sample from an existing temp view)
Read distinct values from a temp view already registered in the session. Does not create rows:
```python
with Base.dataset(spark):
    Role.create(name="admin")  # must create first
    Role.create(name="user")
    users = User.factory().count(20).create()  # pool() safe to use now

# In User.generator():
"role_id": ctx.pool("roles", "id").choice()
```

- `pool.choice()`: a deferred value resolved against Spark during commit
- `pool.sample(k)`: immediate `k` distinct values without replacement
Null source values are ignored. Raises EmptyPoolError if the view has no
non-null values, PoolSampleError if k cannot be sampled without replacement,
and UnresolvedPoolError if a deferred choice cannot resolve to a concrete
value during commit.
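Below is an in-memory sketch of the pool rules: distinct values, nulls dropped, sampling without replacement. It is not rowsmyth's `Pool`. The real one reads a Spark temp view and defers `choice()` until commit, which this sketch omits. `PoolSketch`, `EmptyPool` and `PoolTooSmall` are hypothetical names standing in for the documented errors. The sketch also checks for emptiness eagerly at construction, which may differ from when rowsmyth raises `EmptyPoolError`.

```python
import random
from typing import Any, Iterable, List


class EmptyPool(Exception):
    """Stand-in for EmptyPoolError."""


class PoolTooSmall(Exception):
    """Stand-in for PoolSampleError."""


class PoolSketch:
    """Distinct, non-null source values with sampling without replacement."""

    def __init__(self, values: Iterable[Any], rng: random.Random):
        # dict.fromkeys de-duplicates while keeping first-seen order.
        self.values: List[Any] = list(dict.fromkeys(v for v in values if v is not None))
        if not self.values:
            raise EmptyPool("no non-null values")
        self.rng = rng

    def sample(self, k: int) -> List[Any]:
        if k > len(self.values):
            raise PoolTooSmall(f"cannot sample {k} values from {len(self.values)} available values")
        return self.rng.sample(self.values, k)


pool = PoolSketch([1, 2, None, 2, 3], random.Random(0))
```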
Disambiguation with via
When a child has multiple FKs to the same parent table, use via to name the slot:
```python
User.factory().has(Post.factory().count(3), via="author_id")
```

Then in Post.generator():

```python
"author_id": User.factory()  # slot = "author_id" (column name)
# or:
author = ctx.parent(User, role="author_id")
"author_id": author.pk
```
Create output
Factory.create() returns concrete model objects for the root table. DataFrames for every table touched in the factory tree (parents created for FKs, children from .has() and the root table itself) are stored on the active dataset.
For each table:
- `createDataFrame(rows, Model.__definition__)`: schema is always explicit; inference is never used
- `createOrReplaceTempView(__table_name__)`: bare name, not the fully qualified name
- DataFrame stored in `dataset.dataframes[__table_name__]`
```python
with Base.dataset(spark, seed=42) as dataset:
    users = (
        User.factory()
        .count(10)
        .has(Post.factory().count(3), via="author_id")
        .create()
    )
    users                       # list[User], 10 users
    dataset.dataframe("users")  # DataFrame, 10 rows
    dataset.dataframe("posts")  # DataFrame, 30 rows (3 per user)
```
Persist to Unity Catalog yourself:
```python
dataset.dataframe("users").write.mode("overwrite").saveAsTable(User.fqn())
```
Databricks Lakeflow and Unity Catalog
A single Model subclass serves as the source of truth for your pipeline declaration, Unity Catalog metadata and test fixtures.
Full table definition with Lakeflow metadata
```python
from pyspark.sql.types import LongType, StringType, StructField, StructType
from rowsmyth import declarative_base, variant

Base = declarative_base()


class Customer(Base):
    __table_name__ = "customers"
    __catalog__ = "main"
    __schema__ = "commerce"
    __comment__ = "One row per customer account"
    __primary_key__ = ("id",)
    __table_tags__ = {"layer": "silver", "pii": "true"}
    __expectations__ = {
        "id_not_null": "id IS NOT NULL",
        "email_not_null": "email IS NOT NULL",
        "valid_tier": "tier IN ('standard', 'premium')",
    }
    __definition__ = StructType([
        StructField("id", LongType(), False),
        StructField("email", StringType(), False, metadata={
            "comment": "Customer email, PII",
            "tags": {"pii": "true", "classification": "restricted"},
        }),
        StructField("tier", StringType(), False),
    ])

    def generator(self, ctx):
        return {
            "id": ctx.sequence(),
            "email": ctx.faker.unique.ascii_email(),
            "tier": ctx.random.choices(["standard", "premium"], weights=[7, 3])[0],
        }

    @variant
    def premium(self, ctx):
        return {"tier": "premium"}
```
Lakeflow pipeline declaration
Pass class attributes directly to the pipeline decorators - no duplication:
```python
from pyspark import pipelines as dp
from tables.customer import Customer


@dp.table(
    name=Customer.__table_name__,
    comment=Customer.__comment__,
    schema=Customer.__definition__,
)
@dp.expect_all_or_fail(Customer.__expectations__)
def customers():
    return spark.read.table("main.bronze.raw_customers")
```
__expectations__ is a dict[str, str] - keys are constraint names, values are SQL expressions - which maps directly to expect_all_or_fail.
Apply Unity Catalog metadata after pipeline run
rowsmyth stores metadata on the class but does not write to the catalog itself. Generate comment and tag SQL in a notebook or job that runs after the pipeline:
```python
for statement in Customer.uc_tag_sql():
    spark.sql(statement)
```
Model.fqn()
Returns the table name with any configured catalog/schema prefix:
```python
Customer.fqn()  # → "main.commerce.customers"
```
Generating test fixtures
Use rowsmyth to create deterministic seed data for integration tests against Lakeflow pipelines.
Write to a Unity Catalog volume
The pipeline reads from a volume path; write fixture parquet there:
```python
from pyspark.sql import SparkSession
from tables.base import Base
from tables.customer import Customer

spark = SparkSession.builder.getOrCreate()

with Base.dataset(spark, seed=42) as dataset:
    Customer.factory().count(100).create()
    customers_df = dataset.dataframe("customers")
    customers_df.write.mode("overwrite").parquet(
        "/Volumes/main/bronze/ingest/raw_customers/"
    )
```
Write to a persistent bronze table
The pipeline reads from a Unity Catalog table; populate it directly:
```python
with Base.dataset(spark, seed=42) as dataset:
    Customer.factory().count(100).create()
    customers_df = dataset.dataframe("customers")
    customers_df.write.mode("overwrite").saveAsTable("main.bronze.raw_customers")
```
Multi-table fixture with related data
Create all tables the pipeline depends on in one session to maintain referential integrity:
```python
from tables.order import Order
from tables.order_item import OrderItem
from tables.customer import Customer

with Base.dataset(spark, seed=42) as dataset:
    Customer.factory().count(50).create()
    Order.factory().count(200).has(OrderItem.factory().count(3), via="order_id").create()

    customers_df = dataset.dataframe("customers")
    orders_df = dataset.dataframe("orders")
    order_items_df = dataset.dataframe("order_items")

    customers_df.write.mode("overwrite").saveAsTable(
        "main.bronze.raw_customers"
    )
    orders_df.write.mode("overwrite").saveAsTable("main.bronze.raw_orders")
    order_items_df.write.mode("overwrite").saveAsTable(
        "main.bronze.raw_order_items"
    )
```
Fixture with variants
Use variants to generate a realistic mix of row states:
```python
with Base.dataset(spark, seed=42) as dataset:
    # 70 customers with generator-weighted tiers (roughly 7:3 standard:premium),
    # plus 30 customers forced to premium by the variant
    Customer.factory().count(70).create()
    Customer.factory().count(30).variant("premium").create()
    all_customers = dataset.dataframe("customers")
    all_customers.write.mode("overwrite").saveAsTable("main.bronze.raw_customers")
```
Error reference
| Situation | Exception | Message |
|---|---|---|
| `.create()` called outside `Base.dataset()` | `DatasetContextError` | `rowsmyth factories must be used inside Base.dataset(spark, ...)` |
| Model from another declarative base used in active dataset | `WrongDeclarativeBaseError` | `{model} belongs to a different declarative base than the active dataset` |
| Model does not extend `declarative_base()` | `InvalidDeclarativeBaseError` | `{model} must extend a rowsmyth declarative base created by declarative_base()` |
| Model declares reserved `__rowsmyth_*` columns | `ReservedColumnError` | `{table}: reserved rowsmyth columns: {cols}` |
| `Model.create()` or model constructor with unknown columns | `UnknownColumnError` | `{table}: unknown columns: {cols}` |
| `Dataset.dataframe(name)` before that table is created | `DataframeNotFoundError` | `{name!r} has not been created in this dataset` |
| Unknown `.variant(name)` | `UnknownVariantError` | `{table} has no variant {name!r}` |
| Model primary key references columns absent from `__definition__` | `InvalidModelDefinitionError` | `{table}: missing primary key columns: {cols}` |
| NOT NULL column missing or `None` in any generated row | `MissingRequiredColumnError` | `{table}: NOT NULL columns without a value: {cols}` |
| `Factory.count()` with a negative or non-integer value | `FactoryError` | `Factory.count() requires a non-negative integer` |
| `ctx.pool()` on empty temp view or all-null source column | `EmptyPoolError` | `pool({view!r}, {col!r}): no non-null values in temp view` |
| `pool.sample(k)` cannot sample without replacement | `PoolSampleError` | `pool({view!r}, {col!r}): cannot sample {k} values from {n} available values` |
| `pool.choice()` cannot resolve a concrete value | `UnresolvedPoolError` | `pool choice for {table}.{column} could not be resolved` |
| `Factory()` as value for compound-PK parent | `CompoundPrimaryKeyError` | `{table}: Factory() as column value requires single-column PK; use ctx.parent()` |
| `.pk` used on a compound-PK model | `CompoundPrimaryKeyError` | `{table}: pk requires a single-column primary key` |
| `generator()` not implemented | `NotImplementedError` | `{table} must implement generator()` |
Rowsmyth validates every generated row before commit: unknown columns fail,
non-nullable columns must be present, and non-nullable values cannot be None.
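The three row checks above can be sketched in plain Python against a list of `(name, nullable)` pairs that mirrors each `StructField`. The sketch is hypothetical: `validate_row` is illustrative, and `RowValidationError` stands in for both `UnknownColumnError` and `MissingRequiredColumnError`. The messages reuse the formats from the table.

```python
from typing import Dict, List, Tuple


class RowValidationError(Exception):
    """Stand-in for UnknownColumnError / MissingRequiredColumnError."""


def validate_row(table: str, columns: List[Tuple[str, bool]], row: Dict[str, object]) -> None:
    """columns is [(name, nullable), ...], mirroring StructField name/nullable."""
    known = {name for name, _ in columns}
    unknown = sorted(set(row) - known)
    if unknown:
        raise RowValidationError(f"{table}: unknown columns: {unknown}")
    # A NOT NULL column fails whether it is absent or explicitly None.
    missing = [name for name, nullable in columns if not nullable and row.get(name) is None]
    if missing:
        raise RowValidationError(f"{table}: NOT NULL columns without a value: {missing}")


columns = [("id", False), ("email", False), ("nickname", True)]
validate_row("users", columns, {"id": 1, "email": "a@example.com"})  # nullable column may be omitted
```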
Gotchas
Scale. Row generation runs in the Spark driver, row by row. Deferred pool choices are resolved with Spark joins, but rowsmyth still returns model objects to the driver. This suits dev, test and seed volumes. For large-scale synthetic data prefer vectorised tools such as dbldatagen.
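Schema. Always pass `__definition__` to `createDataFrame`. Schema inference cannot type a column whose values are all `None`, marks every column nullable, and infers every Python `int` as `LongType` even where the table declares `IntegerType`.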
Determinism. seed fixes generated values, but Spark does not guarantee row order across partitions. Sort the DataFrame if stable ordering matters.
Uniqueness. Use ctx.faker.unique.* or ctx.sequence() for columns with unique constraints. ctx.faker.unique resets at the start of each Base.dataset() block.
FK cycles. Inject-or-create recurses forever on cyclic Factory() graphs. Break cycles by creating one side with .create() first, then referencing it with ctx.pool() for the back-reference.
Create order. There is no automatic topological sort. Call .create() in dependency order when using ctx.pool(). The inject-or-create FK path does not require explicit ordering.
Temp view names. createOrReplaceTempView uses the bare __table_name__, not the fully-qualified name. Queries against temp views must use the bare name.
No catalog writes. rowsmyth only creates temp views. Writing to Unity Catalog tables, volumes, comments, tags or grants is always your responsibility.
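For the create-order gotcha, rowsmyth leaves the ordering to you, so one option is to compute it with the standard library's `graphlib` (Python 3.9+) and call `.create()` in that sequence. The dependency map below is hypothetical (table mapped to the tables it samples with `ctx.pool()`). `static_order()` also raises `CycleError` on a cycle, which flags the FK-cycle case before any rows are generated.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical map: table -> tables it reads via ctx.pool() (must exist first).
depends_on = {
    "users": {"roles"},
    "orders": {"customers"},
    "order_items": {"orders"},
    "roles": set(),
    "customers": set(),
}

# Dependencies come before the tables that need them.
create_order = list(TopologicalSorter(depends_on).static_order())
```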