Introduction

pyro-mysql is a high-performance MySQL driver for Python, backed by Rust.

pip install pyro-mysql

Quick Start

from pyro_mysql.sync import Conn

conn = Conn("mysql://user:password@localhost/mydb")

# Simple query
rows = conn.query("SELECT id, name FROM users")

# Parameterized query
user = conn.exec_first("SELECT * FROM users WHERE id = ?", (42,))

# Transaction
with conn.start_transaction() as tx:
    conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Alice",))
    conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Bob",))
    tx.commit()

Features

  • High Performance: Minimal allocations and copies
  • Sync and Async: The library provides both sync and async APIs
  • Binary Protocol: Prepared statements with automatic caching
  • MariaDB Bulk Execution: Single round-trip bulk operations

Limitations

  • No Streaming: All results are fetched into memory
  • Limited Performance Gain in Async API: Because of the Python GIL, the async module pays a significant cost when switching between the Python thread and the Rust thread. Async performance could become much faster with Python 3.14 free-threaded builds.

Connection

A connection can be made with a URL string or an Opts object.

The URL format is:

mysql://[user[:password]@]host[:port][/database][?tls=true&compress=true]

The URL mysql://{USER}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}?tls=true&compress=true is equivalent to:

Opts()
  .user('USER')
  .password('PASSWORD')
  .host('HOST')
  .port(PORT)
  .db('DATABASE')

For the full list of options, see the type stub.
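To make the URL layout above concrete, the following sketch splits a connection URL into its parts using only the standard library. The helper split_mysql_url is hypothetical and not part of pyro-mysql; the driver's own parser may handle edge cases (escaping, defaults, extra query parameters) differently.

```python
from urllib.parse import parse_qs, urlsplit


def split_mysql_url(url: str) -> dict:
    """Split a mysql:// URL into the fields named in the URL format (illustrative only)."""
    parts = urlsplit(url)
    query = parse_qs(parts.query)
    return {
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,  # None when the URL has no explicit port
        "db": parts.path.lstrip("/") or None,
        # Assumption: a flag counts as enabled only when it is literally "true".
        "tls": query.get("tls", ["false"])[0] == "true",
        "compress": query.get("compress", ["false"])[0] == "true",
    }


fields = split_mysql_url("mysql://test:1234@localhost:3306/test_db?tls=true")
print(fields)
```

Optional parts that are missing from the URL come back as None, which mirrors the square brackets in the format string.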

Example: basic

from pyro_mysql.sync import Conn
from pyro_mysql import Opts

# url
conn1 = Conn("mysql://test:1234@localhost:3306/test_db")

# url + Opts
conn2 = Conn(Opts("mysql://test@localhost").tcp_nodelay(True))

# Opts
conn3 = Conn(
    Opts()
        .socket("/tmp/mysql.sock")
        .user("root")
        .db("test_db")
)

Example: async

from pyro_mysql.async_ import Conn
from pyro_mysql import Opts

conn = await Conn.new("mysql://test:1234@localhost:3306/test_db")

Example: unix socket

from pyro_mysql.sync import Conn
from pyro_mysql import Opts

# hostname 'localhost' is ignored when socket is set
conn = Conn(Opts().socket("/var/run/mysqld/mysqld.sock").db("test"))

Advanced: Upgrade to Unix Socket

By default, upgrade_to_unix_socket is True.

If the connection is made via TCP to localhost, the driver queries SELECT @@socket to get the Unix socket path, then reconnects using the socket for better performance.

conn = Conn("mysql://test:1234@localhost")
# If localhost, conn may be a Unix socket connection

For production, disable this flag and use TCP or manually specify the socket address:

conn = Conn(Opts("mysql://test:1234@localhost").upgrade_to_unix_socket(False))

Query

There are two sets of query APIs: Text Protocol and Binary Protocol.

Text Protocol

Text protocol is simple and supports multiple statements separated by ;, but does not support parameter binding.

class Conn:
  def query(self, sql: str, *, as_dict: bool = False) -> list[tuple] | list[dict]: ...
  def query_first(self, sql: str, *, as_dict: bool = False) -> tuple | dict | None: ...
  def query_drop(self, sql: str) -> None: ...

  • query: executes sql and returns the list of rows
  • query_first: executes sql and returns the first row (or None)
  • query_drop: executes sql and discards the result

Example

rows = conn.query("SELECT field1, field2 FROM users")
row = conn.query_first("SELECT * FROM users WHERE id = 1")
conn.query_drop("INSERT INTO users (name) VALUES ('Alice')")

Binary Protocol

Binary protocol uses prepared statements with parameter binding. Use ? as the placeholder.

class Conn:
  def exec(self, query: str, params = (), *, as_dict: bool = False) -> list[tuple] | list[dict]: ...
  def exec_first(self, query: str, params = (), *, as_dict: bool = False) -> tuple | dict | None: ...
  def exec_drop(self, query: str, params = ()) -> None: ...
  def exec_batch(self, query: str, params_list = []) -> None: ...
  def exec_bulk_insert_or_update(self, query: str, params_list = [], *, as_dict: bool = False) -> list[tuple] | list[dict]: ...

  • exec: execute a prepared statement and return the list of rows
  • exec_first: execute a prepared statement and return the first row (or None)
  • exec_drop: execute a prepared statement and discard the result
  • exec_batch: execute a prepared statement multiple times with different parameters
  • exec_bulk_insert_or_update: execute a prepared statement with bulk parameters (MariaDB only, single round trip)

Example: basic

# One-off query with parameters
row = conn.exec_first("SELECT * FROM users WHERE id = ?", (300,))

# Multiple queries with the same prepared statement (automatically cached)
for user_id in [100, 200, 300]:
    row = conn.exec_first("SELECT * FROM users WHERE id = ?", (user_id,))

Example: batch execution

For executing many similar statements (e.g., bulk INSERT):

conn.exec_batch("INSERT INTO users (age, name) VALUES (?, ?)", [
    (20, "Alice"),
    (21, "Bob"),
    (22, "Charlie"),
])

Example: bulk execution (MariaDB)

MariaDB supports bulk execution, which sends all parameters in a single packet:

conn.exec_bulk_insert_or_update("INSERT INTO users (age, name) VALUES (?, ?)", [
    (20, "Alice"),
    (21, "Bob"),
    (22, "Charlie"),
])

Statement Caching

Prepared statements are automatically cached per connection. The first exec* call with a query string prepares the statement, and subsequent calls reuse it.

# First call: prepares and executes
conn.exec("SELECT * FROM users WHERE id = ?", (1,))

# Second call: reuses prepared statement
conn.exec("SELECT * FROM users WHERE id = ?", (2,))
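The caching behavior described above can be pictured as a per-connection dictionary keyed by the query string. The class below is a toy model with hypothetical names (StatementCache, prepare_calls); it is not the driver's implementation, which lives in Rust and may evict or key entries differently.

```python
class StatementCache:
    """Toy per-connection cache: one prepared statement per distinct SQL string."""

    def __init__(self, prepare):
        self._prepare = prepare      # callable that "prepares" a statement
        self._statements = {}        # sql text -> prepared statement handle
        self.prepare_calls = 0       # counts how often preparation really happened

    def get(self, sql: str):
        statement = self._statements.get(sql)
        if statement is None:
            # First use of this SQL text: prepare it and remember the handle.
            statement = self._prepare(sql)
            self._statements[sql] = statement
            self.prepare_calls += 1
        return statement


cache = StatementCache(prepare=lambda sql: object())
first = cache.get("SELECT * FROM users WHERE id = ?")
second = cache.get("SELECT * FROM users WHERE id = ?")
print(first is second, cache.prepare_calls)
```

In this model the cache key is the exact query text, so two strings that differ only in whitespace count as different statements. Whether the driver normalizes query text is not stated here.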

Result Format

By default, rows are returned as tuples. Use as_dict=True to get dictionaries with column names as keys:

# As tuples (default)
rows = conn.query("SELECT id, name FROM users")
# [(1, 'Alice'), (2, 'Bob')]

# As dictionaries
rows = conn.query("SELECT id, name FROM users", as_dict=True)
# [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]
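The relationship between the two formats can be sketched in plain Python: each dictionary pairs the column names with the values of one tuple. The helper rows_as_dicts is hypothetical and only illustrates the shape of the data; with as_dict=True the driver builds the dictionaries itself.

```python
def rows_as_dicts(columns, rows):
    """Pair each row tuple with the column names, in order."""
    return [dict(zip(columns, row)) for row in rows]


tuples = [(1, "Alice"), (2, "Bob")]
dicts = rows_as_dicts(["id", "name"], tuples)
print(dicts)
```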

Async

For async connections, use await:

rows = await conn.query("SELECT * FROM users")
row = await conn.exec_first("SELECT * FROM users WHERE id = ?", (1,))
await conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Alice",))

Type Conversion

Python -> MySQL

Python Type                             MySQL Binary Protocol Encoding
None                                    NULL
bool                                    Int64
int                                     Int64
float                                   Double (Float64)
str | bytes | bytearray                 Bytes
tuple | list | set | frozenset | dict   JSON-encoded string as Bytes
datetime.datetime                       Date(year, month, day, hour, minute, second, microsecond)
datetime.date                           Date(year, month, day, 0, 0, 0, 0)
datetime.time                           Time(false, 0, hour, minute, second, microsecond)
datetime.timedelta                      Time(is_negative, days, hours, minutes, seconds, microseconds)
time.struct_time                        Date(year, month, day, hour, minute, second, 0)
decimal.Decimal                         Bytes(str(Decimal))
uuid.UUID                               Bytes(UUID.hex)
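A few of these rows can be reproduced with the standard library to see what the encoded values look like. This is a sketch under assumptions: the helper names are hypothetical, the text encoding (ASCII or UTF-8) and the exact JSON formatting are guesses, and json.dumps as used here does not accept set or frozenset, so the driver must handle those differently.

```python
import json
import uuid
from datetime import timedelta
from decimal import Decimal


def decimal_bytes(value: Decimal) -> bytes:
    # Mirrors the row "Bytes(str(Decimal))".
    return str(value).encode("ascii")


def uuid_bytes(value: uuid.UUID) -> bytes:
    # Mirrors the row "Bytes(UUID.hex)": 32 hex digits, no dashes.
    return value.hex.encode("ascii")


def json_bytes(value) -> bytes:
    # Assumption: default json.dumps formatting; the driver's output may differ.
    return json.dumps(value).encode("utf-8")


def timedelta_fields(td: timedelta) -> tuple:
    """Split a timedelta into (is_negative, days, hours, minutes, seconds, microseconds)."""
    is_negative = td < timedelta(0)
    magnitude = abs(td)
    hours, remainder = divmod(magnitude.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return (is_negative, magnitude.days, hours, minutes, seconds, magnitude.microseconds)


print(decimal_bytes(Decimal("12.50")))
print(uuid_bytes(uuid.UUID("12345678-1234-5678-1234-567812345678")))
print(json_bytes([1, "a"]))
print(timedelta_fields(timedelta(hours=-26, minutes=-30)))
```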

MySQL -> Python

MySQL Column                                                  Python
NULL                                                          None
INT / TINYINT / SMALLINT / MEDIUMINT / BIGINT / YEAR          int
FLOAT / DOUBLE                                                float
DECIMAL / NUMERIC                                             decimal.Decimal
DATE                                                          datetime.date or None (0000-00-00)
DATETIME / TIMESTAMP                                          datetime.datetime or None (0000-00-00 00:00:00)
TIME                                                          datetime.timedelta
CHAR / VARCHAR / TEXT / TINYTEXT / MEDIUMTEXT / LONGTEXT      str
BINARY / VARBINARY / BLOB / TINYBLOB / MEDIUMBLOB / LONGBLOB  bytes
JSON                                                          str or the result of json.loads()
ENUM / SET                                                    str
BIT                                                           bytes
GEOMETRY                                                      bytes
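TIME maps to datetime.timedelta rather than datetime.time because a MySQL TIME value can be negative or exceed 24 hours. If you need the familiar HH:MM:SS text back, a small helper such as the one below can format it. The function format_mysql_time is a hypothetical convenience and not part of pyro-mysql.

```python
from datetime import timedelta


def format_mysql_time(td: timedelta) -> str:
    """Render a timedelta in MySQL TIME style, e.g. '-26:30:00'."""
    sign = "-" if td < timedelta(0) else ""
    magnitude = abs(td)
    total_seconds = magnitude.days * 86400 + magnitude.seconds
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    text = f"{sign}{hours:02d}:{minutes:02d}:{seconds:02d}"
    if magnitude.microseconds:
        # Fractional seconds are appended only when present.
        text += f".{magnitude.microseconds:06d}"
    return text


print(format_mysql_time(timedelta(hours=26, minutes=30)))
```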

Transaction

Transactions ensure a group of operations either all succeed (commit) or all fail (rollback).

class Conn:
  def start_transaction(
    self,
    consistent_snapshot: bool = False,
    isolation_level: IsolationLevel | None = None,
    readonly: bool | None = None,
  ) -> Transaction: ...

class Transaction:
  def commit(self) -> None: ...
  def rollback(self) -> None: ...

Transactions should be used as context managers. You must call commit() explicitly. If neither commit() nor rollback() is called, the transaction rolls back on exit.

with conn.start_transaction() as tx:
    conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Alice",))
    conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Bob",))
    tx.commit()
# committed

with conn.start_transaction() as tx:
    conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Alice",))
    raise ValueError("oops")
# rolled back, no data inserted
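The rule "roll back on exit unless committed" can be modeled with a small context manager. The class SketchTransaction below is a stand-in written for illustration only; it records statements in a list instead of talking to a server and is not how pyro-mysql implements Transaction.

```python
class SketchTransaction:
    """Toy model: rolls back on exit unless commit() or rollback() was called."""

    def __init__(self, log):
        self._log = log          # list that records the statements "sent"
        self._finished = False

    def __enter__(self):
        self._log.append("START TRANSACTION")
        return self

    def commit(self):
        self._finish("COMMIT")

    def rollback(self):
        self._finish("ROLLBACK")

    def _finish(self, statement):
        if self._finished:
            raise RuntimeError("transaction already finished")
        self._log.append(statement)
        self._finished = True

    def __exit__(self, exc_type, exc, tb):
        if not self._finished:
            # Neither commit() nor rollback() was called: undo the work.
            self._finish("ROLLBACK")
        return False  # never swallow the exception


log = []
try:
    with SketchTransaction(log) as tx:
        raise ValueError("oops")
except ValueError:
    pass
print(log)
```

Returning False from __exit__ lets the original exception propagate after the rollback, which matches the second example above, where the ValueError still reaches the caller.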

Explicit Commit / Rollback

You can call commit() or rollback() explicitly inside the context manager. After either call, the transaction object can no longer be used.

with conn.start_transaction() as tx:
    conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Alice",))
    if some_condition:
        tx.commit()
    else:
        tx.rollback()

Isolation Level

from pyro_mysql import IsolationLevel

with conn.start_transaction(isolation_level=IsolationLevel.Serializable) as tx:
    ...
    tx.commit()

Level              Description
ReadUncommitted    Allows dirty reads
ReadCommitted      Only sees committed data
RepeatableRead     Snapshot at transaction start (InnoDB default)
Serializable       Full serializability

You can also create isolation levels from strings:

level = IsolationLevel("READ COMMITTED")
level = IsolationLevel("repeatable_read")
level = IsolationLevel("sErIaLiZaBle")

assert level.as_str() == "SERIALIZABLE"
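The examples above suggest that parsing ignores case and accepts underscores in place of spaces. One way such normalization could work is sketched below. The function normalize_isolation_level is hypothetical, and the exact rules IsolationLevel applies are an assumption based only on the three strings shown.

```python
_LEVELS = {
    "READ UNCOMMITTED",
    "READ COMMITTED",
    "REPEATABLE READ",
    "SERIALIZABLE",
}


def normalize_isolation_level(name: str) -> str:
    """Map spellings like 'repeatable_read' to the SQL keyword form."""
    # Underscores become spaces, case is folded, repeated spaces collapse.
    normalized = " ".join(name.replace("_", " ").upper().split())
    if normalized not in _LEVELS:
        raise ValueError(f"unknown isolation level: {name!r}")
    return normalized


print(normalize_isolation_level("repeatable_read"))
```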

Read-Only Transactions

Set readonly=True for read-only transactions. This can improve performance, because the server knows in advance that the transaction will not write.

with conn.start_transaction(readonly=True) as tx:
    rows = conn.query("SELECT * FROM users")
    tx.commit()

Consistent Snapshot

For InnoDB, you can request a consistent snapshot at transaction start:

with conn.start_transaction(consistent_snapshot=True) as tx:
    rows = conn.query("SELECT * FROM users")
    tx.commit()

This executes START TRANSACTION WITH CONSISTENT SNAPSHOT.
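To show how the three start_transaction options relate to SQL, the sketch below builds the statements one could send for a given combination, using standard MySQL syntax. The function transaction_statements is hypothetical. Only the consistent-snapshot statement is confirmed above, so the handling of isolation_level and readonly here is an assumption about what the driver sends.

```python
def transaction_statements(consistent_snapshot=False, isolation_level=None, readonly=None):
    """Return the SQL statements that would open a transaction with these options."""
    statements = []
    if isolation_level is not None:
        # In MySQL this form applies to the next transaction only.
        statements.append(f"SET TRANSACTION ISOLATION LEVEL {isolation_level}")

    characteristics = []
    if consistent_snapshot:
        characteristics.append("WITH CONSISTENT SNAPSHOT")
    if readonly is True:
        characteristics.append("READ ONLY")
    elif readonly is False:
        characteristics.append("READ WRITE")

    start = "START TRANSACTION"
    if characteristics:
        start += " " + ", ".join(characteristics)
    statements.append(start)
    return statements


print(transaction_statements(consistent_snapshot=True, readonly=True))
```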

Async

For async connections, use async with and await:

async with conn.start_transaction() as tx:
    await conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Alice",))
    await tx.commit()

# explicit rollback
async with conn.start_transaction() as tx:
    await conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Alice",))
    await tx.rollback()

Logging

pyro_mysql uses Python’s standard logging module. The logs from Rust code are automatically bridged to Python logging when the module is imported.

The logger name is pyro_mysql.

Basic Setup

import logging
from pyro_mysql.sync import Conn

logging.basicConfig(level=logging.DEBUG)
conn = Conn("mysql://test:1234@localhost:3306/test_db")  # logs will appear

import logging

logger = logging.getLogger("pyro_mysql")
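Because the driver logs under a single logger name, its verbosity and output format can be tuned without touching the rest of the application. The following sketch uses only the standard logging module; the chosen level and format string are illustrative.

```python
import logging

# Configure only the driver's logger, leaving the root logger untouched.
logger = logging.getLogger("pyro_mysql")
logger.setLevel(logging.INFO)

handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)
logger.addHandler(handler)

# Avoid duplicate lines if the root logger also has handlers.
logger.propagate = False
```

With this setup, DEBUG records from the driver are dropped while INFO and above are written to stderr.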

Performance Tips

  • Prefer MariaDB to MySQL.
  • Prefer Unix socket to TCP.
  • Use BufferPool to reuse allocations between connections.
  • Use Conn.exec_bulk_insert_or_update to group 2 to 1000 INSERTs or UPDATEs.
  • The async API is fast, but still far from optimal because of the GIL. Wait for Python 3.14 and mature free-threaded builds for faster asyncio performance.
  • The sync API is optimized for single-thread usage. The library does not actively release the GIL during operations. When free-threaded Python becomes mature, the optimal API will be reconsidered.
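To keep bulk calls within the suggested group size, a large parameter list can be split into chunks before it is handed to the driver. The helper chunked below is a hypothetical utility built on itertools; the commented-out loop shows where exec_bulk_insert_or_update would be called.

```python
from itertools import islice


def chunked(rows, size=1000):
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


params = [(20 + i, f"user{i}") for i in range(2500)]
print([len(chunk) for chunk in chunked(params)])

# Usage with a connection (requires a running MariaDB server):
# for chunk in chunked(params):
#     conn.exec_bulk_insert_or_update(
#         "INSERT INTO users (age, name) VALUES (?, ?)", chunk
#     )
```

The last chunk may be smaller than the others, and can even hold a single row, which is still valid input but gains nothing from bulk execution.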

API Reference

Module Structure

pyro_mysql              # Opts, IsolationLevel, Row
├── sync                # Conn, Pool, PooledConn, Transaction
├── async_              # Conn, Pool, PooledConn, Transaction (async)
├── dbapi               # PEP-249 DBAPI for SQLAlchemy
├── dbapi_async         # Async DBAPI
└── error               # Exception classes

Type Stubs