Introduction
pyro-postgres is a high-performance PostgreSQL driver for Python, backed by Rust. It provides both synchronous and asynchronous APIs with a focus on speed and ergonomics.
```
pip install pyro-postgres
```
Quick Start
```python
from pyro_postgres.sync import Conn

conn = Conn("pg://user:password@localhost/mydb")

# Simple query
rows = conn.query("SELECT id, name FROM users")

# Parameterized query
user = conn.exec_first("SELECT * FROM users WHERE id = $1", (42,))

# Transaction
with conn.tx():
    conn.exec_drop("INSERT INTO users (name) VALUES ($1)", ("Alice",))
    conn.exec_drop("INSERT INTO users (name) VALUES ($1)", ("Bob",))
```
Features
- High Performance: Minimal allocations and copies
- Sync and Async: The library provides both sync and async APIs
- Pipelining: Batch multiple queries in a single round trip
- Streaming: Process large result sets without loading everything into memory
Comparisons
pyro-postgres is built on zero-postgres, providing significant performance benefits over pure Python implementations.
| Query | pyro-postgres | psycopg | Speedup |
|---|---|---|---|
| SELECT 1 row | 39 us | 104 us | 2.7x faster |
| SELECT 10 rows | 46 us | 113 us | 2.4x faster |
| SELECT 100 rows | 112 us | 178 us | 1.6x faster |
| SELECT 1000 rows | 570 us | 810 us | 1.4x faster |
| INSERT | 6 us | 20 us | 3.0x faster |
Limitations
- Python 3.10+: Requires Python 3.10 or later
- PostgreSQL 18+: Supports PostgreSQL 18 or later
- Limited Type Coverage: Not all PostgreSQL types are supported yet
- Limited Performance Gain in Async API: Due to the overhead of Python 3.14 free-threading, the async module pays a significant cost switching between the Python thread and the Rust thread. Upon receiving a network packet, the Rust thread must attach to the GIL to construct objects like PyList/PyInt/PyString, then detach soon after. In one benchmark, more than 30% of the time was spent destroying the GILState struct. To avoid this, the library accumulates received row data in a Rust buffer and converts it to Python objects in one pass. Async performance has the potential to improve considerably as the single-threaded overhead of Python free-threading comes down.
Connection
A connection can be made with a URL string or Opts.
A URL can start with one of the following schemes:
- `pg://`
- `postgres://`
- `postgresql://`
The URL pg://{USER}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}?sslmode=require is equivalent to
```python
Opts()
    .user('USER')
    .password('PASSWORD')
    .host('HOST')
    .port(PORT)
    .db('DATABASE')
    .ssl_mode('require')
```
For the full list of options, see the type stub.
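To illustrate how the URL components map onto the options, here is a sketch using only the standard library. `parse_pg_url` is a hypothetical helper for illustration, not part of pyro-postgres:

```python
from urllib.parse import urlparse, parse_qs

def parse_pg_url(url: str) -> dict:
    """Decompose a pg:// style URL into option fields (illustrative only)."""
    u = urlparse(url)
    query = parse_qs(u.query)
    return {
        "user": u.username,
        "password": u.password,
        "host": u.hostname,
        "port": u.port or 5432,  # PostgreSQL default port
        "db": (u.path or "/").lstrip("/"),
        "ssl_mode": query.get("sslmode", [None])[0],
    }

opts = parse_pg_url("pg://test:1234@localhost:5432/test_db?sslmode=require")
```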
Example: basic
```python
from pyro_postgres.sync import Conn
from pyro_postgres import Opts

# URL
conn1 = Conn("pg://test:1234@localhost:5432/test_db?sslmode=require")

# URL + Opts
conn2 = Conn(Opts("pg://test@localhost").ssl_mode("require"))

# Opts only
conn3 = Conn(
    Opts()
    .socket("/tmp/pg/.s.PGSQL.5432")
    .db("test_db")
)
```
Example: async
```python
from pyro_postgres.async_ import Conn, Opts

conn = await Conn.new("pg://test:1234@localhost:5432/test_db")
```
Example: unix socket
```python
from pyro_postgres.sync import Conn

# hostname 'localhost' is ignored
conn = Conn("pg://localhost/test?socket=/tmp/pg/.s.PGSQL.5432")
```
Advanced: Upgrade to Unix Socket
By default, Opts.upgrade_to_unix_socket is True.
If upgrade_to_unix_socket is True and the tcp peer IP is local, the library sends SHOW unix_socket_directories to get the unix socket path, and then tries to reconnect to {unix_socket_directories}/.s.PGSQL.{opts.port}.
This upgrade happens transparently at connection time. If it succeeds, the constructor returns the new unix socket connection; if it fails, it returns the original TCP connection.
```python
# The `socket` parameter is not provided, but `conn` may be either a TCP or a unix socket connection.
conn = Conn("pg://test:1234@localhost")
```
This feature is useful if your local socket address is located at a dynamic location like /run/user/1000/devenv-8c67ae1/postgres/.s.PGSQL.5432.
For production, disable this flag and use the TCP connection or manually specify the socket address.
Query
There are two sets of query API: Simple Query and Extended Query.
Simple Query
A simple query is sent as plain SQL text and does not support parameters.
```python
class Conn:
    def query(self, sql: str, *, as_dict: bool = False) -> list[tuple] | list[dict]: ...
    def query_first(self, sql: str, *, as_dict: bool = False) -> tuple | dict | None: ...
    def query_drop(self, sql: str) -> int: ...
```
- `query`: executes `sql` and returns the list of rows.
- `query_first`: executes `sql` and returns the first row (or `None`).
- `query_drop`: executes `sql` and drops the result. Returns the number of affected rows. Useful for `INSERT`/`UPDATE`.
Example
```python
rows: list = conn.query("SELECT field1, field2 FROM table")
row = conn.query_first("SELECT ..")  # keep only the first row and discard the rest
conn.query_drop("INSERT ..")         # use `query_drop` when not interested in the result
```
Extended Query
```python
Statement = PreparedStatement | str

class Conn:
    def prepare(self, sql: str) -> PreparedStatement: ...
    def prepare_batch(self, sqls: list[str]) -> list[PreparedStatement]: ...
    def exec(self, stmt: Statement, params = (), *, as_dict: bool = False) -> list[tuple] | list[dict]: ...
    def exec_first(self, stmt: Statement, params = (), *, as_dict: bool = False) -> tuple | dict | None: ...
    def exec_drop(self, stmt: Statement, params = ()) -> int: ...
    def exec_batch(self, stmt: Statement, params_list = []) -> None: ...
    def exec_iter(self, stmt: Statement, params, callback: Callable[[UnnamedPortal], T]) -> T: ...

class Transaction:
    def exec_portal(self, query: str, params = ()) -> NamedPortal: ...
```
- `exec`: executes a statement and returns the list of rows.
- `exec_first`: executes a statement and returns the first row (or `None`).
- `exec_drop`: executes a statement and returns the number of affected rows. Useful for `INSERT` or `UPDATE`.
- `exec_batch`: executes a statement many times with parameters in a single round trip. Useful for bulk `INSERT` or `UPDATE`.
- `exec_iter`: executes a statement and processes rows on demand via a callback. Useful for reading result sets larger than memory.
- `exec_portal`: creates and returns a portal which can read rows on demand. Use `exec_iter` for a single row stream, and `exec_portal` to interleave multiple row streams.
Example: basic
```python
# One-off query
row = conn.exec_first("SELECT field1 WHERE id = $1", (300,))

# Repeated query - parse once, execute many times
stmt = conn.prepare("SELECT field1 WHERE id = $1")
for i in [100, 200, 300]:
    conn.exec_first(stmt, (i,))
```
Example: executing many homogeneous queries
```python
conn.exec_batch("INSERT INTO users (age, name) VALUES ($1, $2)", [
    (20, "Alice"),
    (21, "Bob"),
    (22, "Charlie"),
])
```
Example: fetching many rows larger than RAM
```python
def process(portal):
    total = 0
    while True:
        rows, has_more = portal.fetch(1000)
        total += sum(row[0] for row in rows)
        if not has_more:
            break
    return total

result = conn.exec_iter("SELECT value FROM large_table", (), process)
```
Example: interleaving two row streams
```python
with conn.tx() as tx:
    # Create portals within a transaction
    portal1 = tx.exec_portal("SELECT * FROM table1")
    portal2 = tx.exec_portal("SELECT * FROM table2")

    # Interleave execution
    while True:
        rows1 = portal1.exec_collect(100)
        rows2 = portal2.exec_collect(100)
        process(rows1, rows2)
        if portal1.is_complete() and portal2.is_complete():
            break

    # Cleanup
    portal1.close()
    portal2.close()
```
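The interleaving pattern can be simulated without a database. `FakePortal` below is a stand-in written for this sketch, not part of pyro-postgres; it only mirrors the `exec_collect`/`is_complete` loop shape so the control flow can be seen in isolation:

```python
class FakePortal:
    """Stand-in for a NamedPortal: yields rows in fixed-size chunks."""
    def __init__(self, rows):
        self._rows = list(rows)
        self._pos = 0

    def exec_collect(self, n):
        chunk = self._rows[self._pos:self._pos + n]
        self._pos += len(chunk)
        return chunk

    def is_complete(self):
        return self._pos >= len(self._rows)

portal1 = FakePortal(range(250))
portal2 = FakePortal(range(100))
merged = []
while True:
    merged.extend(portal1.exec_collect(100))
    merged.extend(portal2.exec_collect(100))
    if portal1.is_complete() and portal2.is_complete():
        break
```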
Transaction
Transactions ensure a group of operations either all succeed (commit) or all fail (rollback).
```python
class Conn:
    def tx(
        self,
        isolation_level: IsolationLevel | None = None,
        readonly: bool | None = None,
    ) -> Transaction: ...

class Transaction:
    def commit(self) -> None: ...
    def rollback(self) -> None: ...
```
A transaction should be entered as a context manager. On successful exit, the transaction commits; on exception, it rolls back.
```python
with conn.tx():
    conn.query_drop("INSERT INTO users (name) VALUES ('Alice')")
    conn.query_drop("INSERT INTO users (name) VALUES ('Bob')")
# auto-committed here

with conn.tx():
    conn.query_drop("INSERT INTO users (name) VALUES ('Alice')")
    raise ValueError("oops")
# auto-rolled back, no data inserted
```
Explicit Commit / Rollback
You can also call commit() or rollback() explicitly inside the context manager.
After the call, the transaction object cannot be used anymore.
```python
with conn.tx() as tx:
    conn.query_drop("INSERT ...")
    if some_condition:
        tx.commit()
    else:
        tx.rollback()
```
Isolation Level
```python
from pyro_postgres import IsolationLevel

# BEGIN ISOLATION LEVEL SERIALIZABLE
with conn.tx(isolation_level=IsolationLevel.Serializable):
    ...
```
| Level | Description |
|---|---|
| ReadUncommitted | Allows dirty reads (PostgreSQL treats it as ReadCommitted) |
| ReadCommitted | Default. Only sees committed data |
| RepeatableRead | Snapshot taken at transaction start |
| Serializable | Full serializability |
You can also create isolation levels from strings:
```python
level = IsolationLevel("READ COMMITTED")
level = IsolationLevel("repeatable_read")
level = IsolationLevel("sErIaLiZaBle")
assert level.as_str() == "SERIALIZABLE"
```
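Assuming the constructor simply normalizes case and separators before matching, its accepted spellings can be mimicked with a small pure-Python helper. This is an illustrative sketch, not the library's actual parsing code:

```python
def normalize_isolation_level(s: str) -> str:
    # Uppercase the input and treat '_' as a space,
    # so "repeatable_read" and "sErIaLiZaBle" both resolve
    level = s.upper().replace("_", " ")
    allowed = {"READ UNCOMMITTED", "READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE"}
    if level not in allowed:
        raise ValueError(f"unknown isolation level: {s!r}")
    return level
```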
Read-Only Transactions
Set readonly=True for read-only transactions. This can improve performance and is required for read replicas.
```python
with conn.tx(readonly=True):  # BEGIN READ ONLY
    rows = conn.query("SELECT * FROM users")
```
Async
For async connections, use async with and await:
```python
async with conn.tx() as tx:
    await conn.query_drop("INSERT ...")

# explicit commit/rollback
async with conn.tx() as tx:
    await conn.query_drop("INSERT ...")
    await tx.commit()
```
Pipelining
Pipelining is an advanced feature that reduces client-side waiting and the number of network round trips.
Use conn.exec_batch to execute many homogeneous queries,
and conn.pipeline to execute many heterogeneous queries.
```python
with conn.pipeline() as p:
    ticket1 = p.execute("SELECT ...")  # no network packet is sent
    ticket2 = p.execute("INSERT ...")  # no network packet is sent
    ticket3 = p.execute("INSERT ...")  # no network packet is sent

    # The buffered commands are sent with SYNC when the next response is requested.
    rows = p.claim(ticket1)
    p.claim_drop(ticket2)  # read the second response
    p.claim_drop(ticket3)

    ticket4 = p.execute("UPDATE ...")
    ticket5 = p.execute("INSERT ...")
    ticket6 = p.execute("SELECT ...")
# Pipeline.__exit__ sends the pending queries with SYNC.
# The responses for those queries are read and dropped.
```
It is recommended to prepare a set of statements before entering the pipeline.
```python
stmt1, stmt2, stmt3 = conn.prepare_batch([
    "SELECT ...",
    "INSERT ...",
    "INSERT ...",
])

with conn.pipeline() as p:
    t1 = p.execute(stmt1, params)
    t2 = p.execute(stmt2, params)
    t3 = p.execute(stmt3, params)
    p.sync()  # you can explicitly call p.sync() or p.flush()
    rows = p.claim(t1)
    p.claim_drop(t2)
    p.claim_drop(t3)
```
Logging
pyro_postgres uses Python’s standard logging module. The logs from Rust code are automatically bridged to Python logging when the module is imported.
The logger names are pyro_postgres and zero_postgres.
Basic Setup
```python
import logging
from pyro_postgres.sync import Conn

logging.basicConfig(level=logging.DEBUG)
conn = Conn("pg://localhost/test")  # logs will appear
```
```python
import logging

logger = logging.getLogger("pyro_postgres")  # Python layer logger
logger = logging.getLogger("zero_postgres")  # Rust layer logger
```
API Reference
Module Structure
```
pyro_postgres        # Opts, IsolationLevel, PreparedStatement, Ticket, init()
├── sync             # Conn, Transaction, Pipeline, Portal
├── async_           # Conn, Transaction, Pipeline, Portal (async)
└── error            # Exception classes
```
Type Stubs
- `pyro_postgres` - Core types and initialization
- `pyro_postgres.sync` - Synchronous API
- `pyro_postgres.async_` - Asynchronous API
- `pyro_postgres.error` - Exceptions