Introduction
pyro-mysql is a high-performance MySQL driver for Python, backed by Rust.
pip install pyro-mysql
Quick Start
from pyro_mysql.sync import Conn
conn = Conn("mysql://user:password@localhost/mydb")
# Simple query
rows = conn.query("SELECT id, name FROM users")
# Parameterized query
user = conn.exec_first("SELECT * FROM users WHERE id = ?", (42,))
# Transaction
with conn.start_transaction() as tx:
    conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Alice",))
    conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Bob",))
    tx.commit()
Features
- High Performance: Minimal allocations and copies
- Sync and Async: The library provides both sync and async APIs
- Binary Protocol: Prepared statements with automatic caching
- MariaDB Bulk Execution: Single round-trip bulk operations
Limitations
- No Streaming: All results are fetched into memory
- Limited Performance Gain in Async API: Because of the Python GIL, the async module pays a significant cost switching between the Python and Rust threads. Async performance could become much faster with Python 3.14 free-threaded builds.
Connection
A connection can be made with a URL string or an Opts object.
The URL format is:
mysql://[user[:password]@]host[:port][/database][?tls=true&compress=true]
The URL mysql://{USER}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}?tls=true&compress=true is equivalent to:
Opts()
    .user('USER')
    .password('PASSWORD')
    .host('HOST')
    .port(PORT)
    .db('DATABASE')
For the full list of options, see the type stub.
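As a sanity check on the grammar above, Python's standard urllib.parse splits such a URL into the same components. This is only a stdlib illustration of the URL shape, not how pyro-mysql itself parses it:

```python
from urllib.parse import urlsplit, parse_qs

url = "mysql://test:1234@localhost:3306/test_db?tls=true&compress=true"
parts = urlsplit(url)

assert parts.username == "test"             # user
assert parts.password == "1234"             # password
assert parts.hostname == "localhost"        # host
assert parts.port == 3306                   # port
assert parts.path.lstrip("/") == "test_db"  # database
assert parse_qs(parts.query) == {"tls": ["true"], "compress": ["true"]}
```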
Example: basic
from pyro_mysql.sync import Conn
from pyro_mysql import Opts
# url
conn1 = Conn("mysql://test:1234@localhost:3306/test_db")
# url + Opts
conn2 = Conn(Opts("mysql://test@localhost").tcp_nodelay(True))
# Opts
conn3 = Conn(
    Opts()
    .socket("/tmp/mysql.sock")
    .user("root")
    .db("test_db")
)
Example: async
from pyro_mysql.async_ import Conn
from pyro_mysql import Opts
conn = await Conn.new("mysql://test:1234@localhost:3306/test_db")
Example: unix socket
from pyro_mysql import Opts
from pyro_mysql.sync import Conn
# hostname 'localhost' is ignored when socket is set
conn = Conn(Opts().socket("/var/run/mysqld/mysqld.sock").db("test"))
Advanced: Upgrade to Unix Socket
By default, upgrade_to_unix_socket is True.
If the connection is made via TCP to localhost, the driver queries SELECT @@socket to get the Unix socket path, then reconnects using the socket for better performance.
conn = Conn("mysql://test:1234@localhost")
# If localhost, conn may be a Unix socket connection
For production, disable this flag and use TCP or manually specify the socket address:
conn = Conn(Opts("mysql://test:1234@localhost").upgrade_to_unix_socket(False))
Query
There are two query APIs: the Text Protocol and the Binary Protocol.
Text Protocol
Text protocol is simple and supports multiple statements separated by ;, but does not support parameter binding.
class Conn:
    def query(self, sql: str, *, as_dict: bool = False) -> list[tuple] | list[dict]: ...
    def query_first(self, sql: str, *, as_dict: bool = False) -> tuple | dict | None: ...
    def query_drop(self, sql: str) -> None: ...
- query: executes sql and returns the list of rows
- query_first: executes sql and returns the first row (or None)
- query_drop: executes sql and discards the result
Example
rows = conn.query("SELECT field1, field2 FROM users")
row = conn.query_first("SELECT * FROM users WHERE id = 1")
conn.query_drop("INSERT INTO users (name) VALUES ('Alice')")
Binary Protocol
Binary protocol uses prepared statements with parameter binding. Use ? as the placeholder.
class Conn:
    def exec(self, query: str, params = (), *, as_dict: bool = False) -> list[tuple] | list[dict]: ...
    def exec_first(self, query: str, params = (), *, as_dict: bool = False) -> tuple | dict | None: ...
    def exec_drop(self, query: str, params = ()) -> None: ...
    def exec_batch(self, query: str, params_list = []) -> None: ...
    def exec_bulk_insert_or_update(self, query: str, params_list = [], *, as_dict: bool = False) -> list[tuple] | list[dict]: ...
- exec: execute a prepared statement and return the list of rows
- exec_first: execute a prepared statement and return the first row (or None)
- exec_drop: execute a prepared statement and discard the result
- exec_batch: execute a prepared statement multiple times with different parameters
- exec_bulk_insert_or_update: execute a prepared statement with bulk parameters (MariaDB only, single round trip)
Example: basic
# One-off query with parameters
row = conn.exec_first("SELECT * FROM users WHERE id = ?", (300,))
# Multiple queries with the same prepared statement (automatically cached)
for user_id in [100, 200, 300]:
    row = conn.exec_first("SELECT * FROM users WHERE id = ?", (user_id,))
Example: batch execution
For executing many similar statements (e.g., bulk INSERT):
conn.exec_batch("INSERT INTO users (age, name) VALUES (?, ?)", [
    (20, "Alice"),
    (21, "Bob"),
    (22, "Charlie"),
])
Example: bulk execution (MariaDB)
MariaDB supports bulk execution which sends all parameters in a single packet:
conn.exec_bulk_insert_or_update("INSERT INTO users (age, name) VALUES (?, ?)", [
    (20, "Alice"),
    (21, "Bob"),
    (22, "Charlie"),
])
Statement Caching
Prepared statements are automatically cached per connection. The first exec* call with a query string prepares the statement, and subsequent calls reuse it.
# First call: prepares and executes
conn.exec("SELECT * FROM users WHERE id = ?", (1,))
# Second call: reuses prepared statement
conn.exec("SELECT * FROM users WHERE id = ?", (2,))
Result Format
By default, rows are returned as tuples. Use as_dict=True to get dictionaries with column names as keys:
# As tuples (default)
rows = conn.query("SELECT id, name FROM users")
# [(1, 'Alice'), (2, 'Bob')]
# As dictionaries
rows = conn.query("SELECT id, name FROM users", as_dict=True)
# [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}]
Async
For async connections, use await:
rows = await conn.query("SELECT * FROM users")
row = await conn.exec_first("SELECT * FROM users WHERE id = ?", (1,))
await conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Alice",))
Type Conversion
Python -> MySQL
| Python Type | MySQL Binary Protocol Encoding |
|---|---|
| None | NULL |
| bool | Int64 |
| int | Int64 |
| float | Double(Float64) |
| str / bytes / bytearray | Bytes |
| tuple / list / set / frozenset / dict | JSON-encoded string as Bytes |
| datetime.datetime | Date(year, month, day, hour, minute, second, microsecond) |
| datetime.date | Date(year, month, day, 0, 0, 0, 0) |
| datetime.time | Time(false, 0, hour, minute, second, microsecond) |
| datetime.timedelta | Time(is_negative, days, hours, minutes, seconds, microseconds) |
| time.struct_time | Date(year, month, day, hour, minute, second, 0) |
| decimal.Decimal | Bytes(str(Decimal)) |
| uuid.UUID | Bytes(UUID.hex) |
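The composite-type rows above can be mirrored with the standard library. The sketch below shows the string forms the table describes; it is an illustration of the mapping, not the driver's actual encoder:

```python
import json
import uuid
from decimal import Decimal

# dict (and tuple/list/set/frozenset) -> JSON-encoded string, sent as Bytes
doc = {"name": "Alice", "tags": ["a", "b"]}
json_form = json.dumps(doc)

# decimal.Decimal -> Bytes(str(Decimal))
price_form = str(Decimal("19.99"))

# uuid.UUID -> Bytes(UUID.hex), i.e. 32 hex chars without dashes
uid = uuid.UUID("12345678-1234-5678-1234-567812345678")
uid_form = uid.hex

print(json_form)   # {"name": "Alice", "tags": ["a", "b"]}
print(price_form)  # 19.99
print(uid_form)    # 12345678123456781234567812345678
```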
MySQL -> Python
| MySQL Column | Python |
|---|---|
| NULL | None |
| INT / TINYINT / SMALLINT / MEDIUMINT / BIGINT / YEAR | int |
| FLOAT / DOUBLE | float |
| DECIMAL / NUMERIC | decimal.Decimal |
| DATE | datetime.date or None (0000-00-00) |
| DATETIME / TIMESTAMP | datetime.datetime or None (0000-00-00 00:00:00) |
| TIME | datetime.timedelta |
| CHAR / VARCHAR / TEXT / TINYTEXT / MEDIUMTEXT / LONGTEXT | str |
| BINARY / VARBINARY / BLOB / TINYBLOB / MEDIUMBLOB / LONGBLOB | bytes |
| JSON | str or the result of json.loads() |
| ENUM / SET | str |
| BIT | bytes |
| GEOMETRY | bytes |
Transaction
Transactions ensure a group of operations either all succeed (commit) or all fail (rollback).
class Conn:
    def start_transaction(
        self,
        consistent_snapshot: bool = False,
        isolation_level: IsolationLevel | None = None,
        readonly: bool | None = None,
    ) -> Transaction: ...

class Transaction:
    def commit(self) -> None: ...
    def rollback(self) -> None: ...
Transactions should be used as context managers.
You must call commit() explicitly; if neither commit() nor rollback() is called, the transaction rolls back on exit.
with conn.start_transaction() as tx:
    conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Alice",))
    conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Bob",))
    tx.commit()
# committed

with conn.start_transaction() as tx:
    conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Alice",))
    raise ValueError("oops")
# rolled back, no data inserted
Explicit Commit / Rollback
You can call commit() or rollback() explicitly inside the context manager.
After the call, the transaction object cannot be used anymore.
with conn.start_transaction() as tx:
    conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Alice",))
    if some_condition:
        tx.commit()
    else:
        tx.rollback()
Isolation Level
from pyro_mysql import IsolationLevel
with conn.start_transaction(isolation_level=IsolationLevel.Serializable) as tx:
    ...
    tx.commit()
| Level | Description |
|---|---|
| ReadUncommitted | Allows dirty reads |
| ReadCommitted | Only sees committed data |
| RepeatableRead | Snapshot at transaction start (InnoDB default) |
| Serializable | Full serializability |
You can also create isolation levels from strings:
level = IsolationLevel("READ COMMITTED")
level = IsolationLevel("repeatable_read")
level = IsolationLevel("sErIaLiZaBle")
assert level.as_str() == "SERIALIZABLE"
Read-Only Transactions
Set readonly=True for read-only transactions. This can improve performance.
with conn.start_transaction(readonly=True) as tx:
    rows = conn.query("SELECT * FROM users")
    tx.commit()
Consistent Snapshot
For InnoDB, you can request a consistent snapshot at transaction start:
with conn.start_transaction(consistent_snapshot=True) as tx:
    rows = conn.query("SELECT * FROM users")
    tx.commit()
This executes START TRANSACTION WITH CONSISTENT SNAPSHOT.
Async
For async connections, use async with and await:
async with conn.start_transaction() as tx:
    await conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Alice",))
    await tx.commit()

# explicit rollback
async with conn.start_transaction() as tx:
    await conn.exec_drop("INSERT INTO users (name) VALUES (?)", ("Alice",))
    await tx.rollback()
Logging
pyro_mysql uses Python’s standard logging module. The logs from Rust code are automatically bridged to Python logging when the module is imported.
The logger name is pyro_mysql.
Basic Setup
import logging
from pyro_mysql.sync import Conn
logging.basicConfig(level=logging.DEBUG)
conn = Conn("mysql://test:1234@localhost:3306/test_db") # logs will appear
import logging
logger = logging.getLogger("pyro_mysql")
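Because the logger name is fixed, you can tune the driver's verbosity independently of the rest of your application. For example, keep the application at DEBUG while showing only warnings from pyro_mysql:

```python
import logging

logging.basicConfig(level=logging.DEBUG)

# Raise the threshold for driver logs only; other loggers are unaffected
driver_logger = logging.getLogger("pyro_mysql")
driver_logger.setLevel(logging.WARNING)

assert driver_logger.isEnabledFor(logging.WARNING)
assert not driver_logger.isEnabledFor(logging.DEBUG)
```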
Performance Tips
- Prefer MariaDB to MySQL.
- Prefer Unix socket to TCP.
- Use BufferPool to reuse allocations between connections.
- Use Conn.exec_bulk_insert_or_update to group 2~1000 INSERTs or UPDATEs.
- The async API is fast, but still far from optimal because of the GIL. Wait for Python 3.14 and mature free-threaded builds for faster asyncio performance.
- The sync API is optimized for single-thread usage. The library does not actively release the GIL during operations. When free-threaded Python becomes mature, the optimal API will be reconsidered.
API Reference
Module Structure
pyro_mysql # Opts, IsolationLevel, Row
├── sync # Conn, Pool, PooledConn, Transaction
├── async_ # Conn, Pool, PooledConn, Transaction (async)
├── dbapi # PEP-249 DBAPI for SQLAlchemy
├── dbapi_async # Async DBAPI
└── error # Exception classes
Type Stubs
- pyro_mysql - Core types and options
- pyro_mysql.sync - Synchronous API
- pyro_mysql.async_ - Asynchronous API
- pyro_mysql.dbapi - PEP-249 DBAPI
- pyro_mysql.dbapi_async - Async DBAPI
- pyro_mysql.error - Exceptions