Introduction

pyro-postgres is a high-performance PostgreSQL driver for Python, backed by Rust. It provides both synchronous and asynchronous APIs with a focus on speed and ergonomics.

pip install pyro-postgres

Quick Start

from pyro_postgres.sync import Conn

conn = Conn("pg://user:password@localhost/mydb")

# Simple query
rows = conn.query("SELECT id, name FROM users")

# Parameterized query
user = conn.exec_first("SELECT * FROM users WHERE id = $1", (42,))

# Transaction
with conn.tx():
    conn.exec_drop("INSERT INTO users (name) VALUES ($1)", ("Alice",))
    conn.exec_drop("INSERT INTO users (name) VALUES ($1)", ("Bob",))

Features

  • High Performance: Minimal allocations and copies
  • Sync and Async: The library provides both sync and async APIs
  • Pipelining: Batch multiple queries in a single round trip
  • Streaming: Process large result sets without loading everything into memory

Comparisons

pyro-postgres is built on zero-postgres, providing significant performance benefits over pure Python implementations.

Query               pyro-postgres   psycopg   Speedup
SELECT 1 row        39 µs           104 µs    2.7x faster
SELECT 10 rows      46 µs           113 µs    2.4x faster
SELECT 100 rows     112 µs          178 µs    1.6x faster
SELECT 1000 rows    570 µs          810 µs    1.4x faster
INSERT              6 µs            20 µs     3.0x faster

Limitations

  • Python 3.10+: Requires Python 3.10 or later
  • PostgreSQL 18+: Requires PostgreSQL 18 or later
  • Limited Type Coverage: Not all PostgreSQL types are supported yet
  • Limited Performance Gain in Async API: Due to the overhead of Python 3.14 free-threading, the async module pays a significant cost switching between the Python thread and the Rust thread. On receiving a network packet, the Rust thread must attach to the GIL state to construct objects such as PyList/PyInt/PyString, then detach soon after. In one benchmark, more than 30% of the time was spent destroying the GILState C struct. To mitigate this, the library accumulates the received row data in a Rust buffer and converts it to Python objects in one pass. Async performance has the potential to become much faster as the single-threaded overhead of Python free-threading improves.

Connection

A connection can be made with a URL string or Opts.

A URL can start with one of:

  • pg://
  • postgres://
  • postgresql://

The URL pg://{USER}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}?sslmode=require is equivalent to

Opts()
  .user('USER')
  .password('PASSWORD')
  .host('HOST')
  .port(PORT)
  .db('DATABASE')
  .ssl_mode('require')

For the full list of options, see the type stub.

Example: basic

from pyro_postgres.sync import Conn
from pyro_postgres import Opts

# url
conn1 = Conn("pg://test:1234@localhost:5432/test_db?sslmode=require")

# url + Opts
conn2 = Conn(Opts("pg://test@localhost").ssl_mode("require"))

# Opts
conn3 = Conn(
    Opts()
        .socket("/tmp/pg/.s.PGSQL.5432")
        .db("test_db")
)

Example: async

from pyro_postgres.async_ import Conn, Opts

conn = await Conn.new("pg://test:1234@localhost:5432/test_db")

Example: unix socket

from pyro_postgres.sync import Conn

# hostname 'localhost' is ignored
conn = Conn("pg://localhost/test?socket=/tmp/pg/.s.PGSQL.5432")

Advanced: Upgrade to Unix Socket

By default, Opts.upgrade_to_unix_socket is True.

If upgrade_to_unix_socket is True and the TCP peer IP is local, the library sends SHOW unix_socket_directories to get the Unix socket path, and then tries to reconnect to {unix_socket_directories}/.s.PGSQL.{opts.port}. This upgrade happens transparently at connection time. If it succeeds, the constructor returns the new Unix socket connection; if it fails, it returns the original TCP connection.

conn = Conn("pg://test:1234@localhost")  # `socket` parameter is not provided, but `conn` may be either a TCP or a Unix socket connection.

This feature is useful if your local socket address is located at a dynamic location like /run/user/1000/devenv-8c67ae1/postgres/.s.PGSQL.5432. For production, disable this flag and use the TCP connection or manually specify the socket address.
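For production, the explicit setup might be sketched as follows. Note that the builder-style `upgrade_to_unix_socket(...)` setter is an assumption inferred from the option name above, not confirmed API; check the type stub for the exact spelling.

```python
from pyro_postgres import Opts
from pyro_postgres.sync import Conn

# Hypothetical production setup: disable the transparent upgrade and
# keep the plain TCP connection. The `upgrade_to_unix_socket` setter
# name is an assumption based on the option documented above.
conn = Conn(
    Opts()
        .host("db.internal")
        .port(5432)
        .db("prod_db")
        .upgrade_to_unix_socket(False)
)
```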

Query

There are two sets of query APIs: Simple Query and Extended Query.

Simple Query

A simple query is sent as plain SQL text and does not support parameters.

class Conn:
    def query(self, sql: str, *, as_dict: bool = False) -> list[tuple] | list[dict]: ...
    def query_first(self, sql: str, *, as_dict: bool = False) -> tuple | dict | None: ...
    def query_drop(self, sql: str) -> int: ...

  • query: executes sql and returns the list of rows.
  • query_first: executes sql and returns the first row (or None).
  • query_drop: executes sql and drops the result. Returns the number of affected rows. Useful for INSERT/UPDATE.

Example

rows: list = conn.query("SELECT field1, field2 FROM table")
row = conn.query_first("SELECT ..")  # keep only the first row and discard the rest.
conn.query_drop("INSERT ..")  # use `query_drop` when not interested in the result.

Extended Query

Statement = PreparedStatement | str

class Conn:
    def prepare(self, sql: str) -> PreparedStatement: ...
    def prepare_batch(self, sqls: list[str]) -> list[PreparedStatement]: ...
    def exec(self, stmt: Statement, params = (), *, as_dict: bool = False) -> list[tuple] | list[dict]: ...
    def exec_first(self, stmt: Statement, params = (), *, as_dict: bool = False) -> tuple | dict | None: ...
    def exec_drop(self, stmt: Statement, params = ()) -> int: ...
    def exec_batch(self, stmt: Statement, params_list = []) -> None: ...
    def exec_iter(self, stmt: Statement, params, callback: Callable[[UnnamedPortal], T]) -> T: ...

class Transaction:
    def exec_portal(self, query: str, params = ()) -> NamedPortal: ...

  • exec: executes a statement and returns the list of rows.
  • exec_first: executes a statement and returns the first row (or None).
  • exec_drop: executes a statement and returns the number of affected rows. Useful for INSERT or UPDATE.
  • exec_batch: executes a statement many times with the given parameters in a single round trip. Useful for bulk INSERT or UPDATE.
  • exec_iter: executes a statement and processes rows on demand via a callback. Useful for reading result sets larger than memory.
  • exec_portal: creates and returns a portal that can read rows on demand. Use exec_iter for a single row stream, and exec_portal to interleave multiple row streams.

Example: basic

# One-off query
row = conn.exec_first("SELECT field1 FROM table WHERE id = $1", (300,))

# Repeat query - parse once, execute many times
stmt = conn.prepare("SELECT field1 FROM table WHERE id = $1")
for i in [100, 200, 300]:
    conn.exec_first(stmt, (i,))

Example: executing many homogeneous queries

conn.exec_batch("INSERT INTO users (age, name) VALUES ($1, $2)", [
    (20, "Alice"),
    (21, "Bob"),
    (22, "Charlie"),
])

Example: fetching many rows larger than RAM

def process(portal):
    total = 0
    while True:
        rows, has_more = portal.fetch(1000)
        total += sum(row[0] for row in rows)
        if not has_more:
            break
    return total

result = conn.exec_iter("SELECT value FROM large_table", (), process)

Example: interleaving two row streams

with conn.tx() as tx:
    # Create portals within a transaction
    portal1 = tx.exec_portal("SELECT * FROM table1")
    portal2 = tx.exec_portal("SELECT * FROM table2")

    # Interleave execution
    while True:
        rows1 = portal1.exec_collect(100)
        rows2 = portal2.exec_collect(100)
        process(rows1, rows2)
        if portal1.is_complete() and portal2.is_complete():
            break

    # Cleanup
    portal1.close()
    portal2.close()

Transaction

Transactions ensure a group of operations either all succeed (commit) or all fail (rollback).

class Conn:
  def tx(
    self,
    isolation_level: IsolationLevel | None = None,
    readonly: bool | None = None,
  ) -> Transaction: ...

class Transaction:
  def commit(self) -> None: ...
  def rollback(self) -> None: ...

A transaction should be entered as a context manager. On successful exit, the transaction commits. On exception, it rolls back.

with conn.tx():
    conn.query_drop("INSERT INTO users (name) VALUES ('Alice')")
    conn.query_drop("INSERT INTO users (name) VALUES ('Bob')")
# auto-committed here

with conn.tx():
    conn.query_drop("INSERT INTO users (name) VALUES ('Alice')")
    raise ValueError("oops")
# auto-rolled back, no data inserted

Explicit Commit / Rollback

You can also call commit() or rollback() explicitly inside the context manager. After the call, the transaction object cannot be used anymore.

with conn.tx() as tx:
    conn.query_drop("INSERT ...")
    if some_condition:
        tx.commit()
    else:
        tx.rollback()

Isolation Level

from pyro_postgres import IsolationLevel

# BEGIN ISOLATION LEVEL SERIALIZABLE
with conn.tx(isolation_level=IsolationLevel.Serializable):
    ...

Level            Description
ReadUncommitted  Allows dirty reads (PostgreSQL treats it as ReadCommitted)
ReadCommitted    Default. Only sees committed data
RepeatableRead   Snapshot at transaction start
Serializable     Full serializability

You can also create isolation levels from strings:

level = IsolationLevel("READ COMMITTED")
level = IsolationLevel("repeatable_read")
level = IsolationLevel("sErIaLiZaBle")

assert level.as_str() == "SERIALIZABLE"

Read-Only Transactions

Set readonly=True for read-only transactions. This can improve performance and is required for read replicas.

with conn.tx(readonly=True):  # BEGIN READ ONLY
    rows = conn.query("SELECT * FROM users")

Async

For async connections, use async with and await:

async with conn.tx() as tx:
    await conn.query_drop("INSERT ...")

# explicit commit/rollback
async with conn.tx() as tx:
    await conn.query_drop("INSERT ...")
    await tx.commit()

Pipelining

Pipelining is an advanced feature that reduces client-side waiting and the number of network round trips.

Use conn.exec_batch to execute many homogeneous queries,
and conn.pipeline to execute many heterogeneous queries.

with conn.pipeline() as p:
    ticket1 = p.execute("SELECT ...")  # no network packet is sent
    ticket2 = p.execute("INSERT ...")  # no network packet is sent
    ticket3 = p.execute("INSERT ...")  # no network packet is sent

    # The buffered commands are sent with SYNC when the next response is requested.
    rows = p.claim(ticket1)
    p.claim_drop(ticket2)  # read the second response
    p.claim_drop(ticket3)

    ticket4 = p.execute("UPDATE ...")
    ticket5 = p.execute("INSERT ...")
    ticket6 = p.execute("SELECT ...")

    # Pipeline.__exit__ sends the pending queries with SYNC.
    # The responses for those queries are read and dropped.

It is recommended to prepare a set of statements before entering the pipeline.

stmt1, stmt2, stmt3 = conn.prepare_batch([
    "SELECT ...",
    "INSERT ...",
    "INSERT ...",
])

with conn.pipeline() as p:
    t1 = p.execute(stmt1, params)
    t2 = p.execute(stmt2, params)
    t3 = p.execute(stmt3, params)

    p.sync()  # you can explicitly call p.sync() or p.flush()

    rows = p.claim(t1)
    p.claim_drop(t2)
    p.claim_drop(t3)

Logging

pyro_postgres uses Python’s standard logging module. The logs from Rust code are automatically bridged to Python logging when the module is imported.

The logger names are pyro_postgres and zero_postgres.

Basic Setup

import logging
from pyro_postgres.sync import Conn

logging.basicConfig(level=logging.DEBUG)
conn = Conn("pg://localhost/test")  # logs will appear

To get the loggers directly:

import logging

logger = logging.getLogger("pyro_postgres")  # Python layer logger
logger = logging.getLogger("zero_postgres")  # Rust layer logger
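Because the two loggers are independent, their levels can be tuned separately, for example to quiet the Rust layer while debugging the Python layer. This sketch uses only the standard logging module; logging.getLogger works even before pyro_postgres is imported, since it simply creates or looks up the named logger.

```python
import logging

logging.basicConfig(level=logging.INFO)

# Verbose logs from the Python layer, warnings only from the Rust layer.
logging.getLogger("pyro_postgres").setLevel(logging.DEBUG)
logging.getLogger("zero_postgres").setLevel(logging.WARNING)
```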

API Reference

Module Structure

pyro_postgres           # Opts, IsolationLevel, PreparedStatement, Ticket, init()
├── sync                # Conn, Transaction, Pipeline, Portal
├── async_              # Conn, Transaction, Pipeline, Portal (async)
└── error               # Exception classes

Type Stubs