add package: pyproject + src (kv/hash/ttl surface, fail-loud, blocking pool)
Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
parent
f1668befb6
commit
237e3ff765
15
pyproject.toml
Normal file
15
pyproject.toml
Normal file
@ -0,0 +1,15 @@
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "redis_store"
|
||||
version = "0.1.0"
|
||||
description = "async redis wrapper over redis-py asyncio with a raw escape hatch, fail-loud and config-free"
|
||||
requires-python = ">=3.10"
|
||||
dependencies = [
|
||||
"redis>=5",
|
||||
]
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["src/redis_store"]
|
||||
3
src/redis_store/__init__.py
Normal file
3
src/redis_store/__init__.py
Normal file
@ -0,0 +1,3 @@
|
||||
from .redis_store import RedisStore
|
||||
|
||||
__all__ = ["RedisStore"]
|
||||
233
src/redis_store/redis_store.py
Normal file
233
src/redis_store/redis_store.py
Normal file
@ -0,0 +1,233 @@
|
||||
"""
|
||||
async redis wrapper over redis-py's asyncio client
|
||||
|
||||
object pattern (one client per process), attach to the app:
|
||||
from redis_store import RedisStore
|
||||
app.kv = await RedisStore(host="localhost", port=6379, db=0).connect()
|
||||
await app.kv.set("user:1:name", "ada", ex=3600)
|
||||
name = await app.kv.get("user:1:name") # "ada"
|
||||
await app.kv.close() # on shutdown
|
||||
|
||||
context manager:
|
||||
async with RedisStore(host="localhost") as kv:
|
||||
await kv.incr("hits")
|
||||
|
||||
lifecycle:
|
||||
construction is sync and opens no socket (the pool connects lazily). connect()
|
||||
issues a ping() so a bad host/port/auth fails loud immediately rather than on the
|
||||
first real op, and returns self for the one-liner above. close() tears the client
|
||||
and pool down (await client.aclose() + pool.disconnect()).
|
||||
|
||||
type contract:
|
||||
decode_responses=True by default, so keys and string values come back as str (and
|
||||
None for an absent key). pass decode_responses=False at construction if you need raw
|
||||
bytes. counters (incr/decr/exists/ttl) always return int regardless of this flag.
|
||||
|
||||
errors (FAIL LOUD — unlike the mongo lib's swallow-and-default):
|
||||
every wrapped method catches the driver's RedisError, logs it via
|
||||
getLogger(__name__), and re-raises. a None/[]/{} return is only ever a real result
|
||||
(absent key, empty hash) — never a swallowed failure. for anything not wrapped
|
||||
(pipelines, pub/sub, scan, Lua, ...) use the raw `.client` escape hatch, which
|
||||
exposes the full redis.asyncio.Redis surface and raises untouched.
|
||||
|
||||
notes:
|
||||
- import is `redis_store`; the repo/distribution is `redis` (the driver owns the
|
||||
`redis` import name, so the package can't also be `redis`)
|
||||
- pub/sub and pipeline helpers are intentionally not wrapped yet — use `.client`
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
import redis.asyncio as redis
|
||||
from redis.exceptions import RedisError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RedisStore:
|
||||
"""async redis wrapper; one client per process, attach as app.kv"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str = "localhost",
|
||||
port: int = 6379,
|
||||
db: int = 0,
|
||||
password: Optional[str] = None,
|
||||
*,
|
||||
max_connections: int = 10,
|
||||
pool_timeout: float = 30.0,
|
||||
decode_responses: bool = True,
|
||||
**pool_kwargs,
|
||||
):
|
||||
"""build the (not-yet-connected) pool + client; no I/O happens here
|
||||
|
||||
host/port/db/password/max_connections are injected by the caller. extra
|
||||
pool_kwargs pass through to the connection pool (socket_timeout,
|
||||
socket_connect_timeout, ssl, etc). decode_responses=True returns str; set it
|
||||
False for raw bytes.
|
||||
|
||||
a BlockingConnectionPool is used so that, under concurrency exceeding
|
||||
max_connections, callers WAIT up to pool_timeout seconds for a free connection
|
||||
(matching motor/mongo's blocking pool) rather than the plain pool's behavior of
|
||||
raising MaxConnectionsError immediately. pool_timeout=None waits forever.
|
||||
"""
|
||||
self._pool = redis.BlockingConnectionPool(
|
||||
host=host,
|
||||
port=port,
|
||||
db=db,
|
||||
password=password,
|
||||
max_connections=max_connections,
|
||||
timeout=pool_timeout,
|
||||
decode_responses=decode_responses,
|
||||
**pool_kwargs,
|
||||
)
|
||||
self._client = redis.Redis(connection_pool=self._pool)
|
||||
|
||||
async def connect(self) -> "RedisStore":
|
||||
"""open + validate the connection with a ping; fail loud on bad config
|
||||
|
||||
returns self so callers can write
|
||||
`kv = await RedisStore(...).connect()`.
|
||||
"""
|
||||
try:
|
||||
await self._client.ping()
|
||||
except RedisError:
|
||||
log.exception("redis.connect() ping failed")
|
||||
raise
|
||||
return self
|
||||
|
||||
async def close(self) -> None:
|
||||
"""close the client and disconnect the pool on shutdown"""
|
||||
try:
|
||||
await self._client.aclose()
|
||||
await self._pool.disconnect()
|
||||
except RedisError:
|
||||
log.exception("redis.close()")
|
||||
raise
|
||||
|
||||
async def __aenter__(self) -> "RedisStore":
|
||||
return await self.connect()
|
||||
|
||||
async def __aexit__(self, exc_type, exc, tb) -> None:
|
||||
await self.close()
|
||||
|
||||
@property
|
||||
def client(self) -> redis.Redis:
|
||||
"""raw redis.asyncio.Redis escape hatch; full driver surface, raises
|
||||
|
||||
use for anything not wrapped: pipelines, pub/sub, scan, Lua, etc.
|
||||
"""
|
||||
return self._client
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# key / value
|
||||
|
||||
async def get(self, key: str) -> Optional[str]:
|
||||
"""return the string value at key, or None if the key is absent"""
|
||||
try:
|
||||
return await self._client.get(key)
|
||||
except RedisError:
|
||||
log.exception("redis.get(%s)", key)
|
||||
raise
|
||||
|
||||
async def set(self, key: str, value, ex: Optional[int] = None) -> bool:
|
||||
"""set key to value, optionally with a ttl of ex seconds; returns the ack"""
|
||||
try:
|
||||
return bool(await self._client.set(key, value, ex=ex))
|
||||
except RedisError:
|
||||
log.exception("redis.set(%s)", key)
|
||||
raise
|
||||
|
||||
async def delete(self, *keys: str) -> int:
|
||||
"""delete one or more keys, returning how many existed and were removed"""
|
||||
try:
|
||||
return await self._client.delete(*keys)
|
||||
except RedisError:
|
||||
log.exception("redis.delete(%s)", keys)
|
||||
raise
|
||||
|
||||
async def exists(self, *keys: str) -> int:
|
||||
"""return how many of the given keys exist (counts duplicates)"""
|
||||
try:
|
||||
return await self._client.exists(*keys)
|
||||
except RedisError:
|
||||
log.exception("redis.exists(%s)", keys)
|
||||
raise
|
||||
|
||||
async def incr(self, key: str, amount: int = 1) -> int:
|
||||
"""atomically increment key by amount (default 1), returning the new value"""
|
||||
try:
|
||||
return await self._client.incrby(key, amount)
|
||||
except RedisError:
|
||||
log.exception("redis.incr(%s)", key)
|
||||
raise
|
||||
|
||||
async def decr(self, key: str, amount: int = 1) -> int:
|
||||
"""atomically decrement key by amount (default 1), returning the new value"""
|
||||
try:
|
||||
return await self._client.decrby(key, amount)
|
||||
except RedisError:
|
||||
log.exception("redis.decr(%s)", key)
|
||||
raise
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# hash
|
||||
|
||||
async def hget(self, name: str, field: str) -> Optional[str]:
|
||||
"""return the value of field in hash name, or None if either is absent"""
|
||||
try:
|
||||
return await self._client.hget(name, field)
|
||||
except RedisError:
|
||||
log.exception("redis.hget(%s, %s)", name, field)
|
||||
raise
|
||||
|
||||
async def hset(self, name: str, key: Optional[str] = None, value=None, mapping: Optional[dict] = None) -> int:
|
||||
"""set field(s) in hash name; returns the count of newly-added fields
|
||||
|
||||
pass a single field via key/value, or several via mapping={field: value}.
|
||||
"""
|
||||
try:
|
||||
return await self._client.hset(name, key=key, value=value, mapping=mapping)
|
||||
except RedisError:
|
||||
log.exception("redis.hset(%s)", name)
|
||||
raise
|
||||
|
||||
async def hgetall(self, name: str) -> dict:
|
||||
"""return the whole hash as a dict; empty dict if the hash does not exist"""
|
||||
try:
|
||||
return await self._client.hgetall(name)
|
||||
except RedisError:
|
||||
log.exception("redis.hgetall(%s)", name)
|
||||
raise
|
||||
|
||||
async def hdel(self, name: str, *fields: str) -> int:
|
||||
"""delete fields from hash name, returning how many were removed"""
|
||||
try:
|
||||
return await self._client.hdel(name, *fields)
|
||||
except RedisError:
|
||||
log.exception("redis.hdel(%s)", name)
|
||||
raise
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# expiry / ttl
|
||||
|
||||
async def expire(self, key: str, seconds: int) -> bool:
|
||||
"""set a ttl of seconds on key; returns False if the key does not exist"""
|
||||
try:
|
||||
return bool(await self._client.expire(key, seconds))
|
||||
except RedisError:
|
||||
log.exception("redis.expire(%s)", key)
|
||||
raise
|
||||
|
||||
async def ttl(self, key: str) -> int:
|
||||
"""return the remaining ttl in seconds
|
||||
|
||||
the driver's sentinels pass through: -1 means the key exists but has no expiry,
|
||||
-2 means the key does not exist.
|
||||
"""
|
||||
try:
|
||||
return await self._client.ttl(key)
|
||||
except RedisError:
|
||||
log.exception("redis.ttl(%s)", key)
|
||||
raise
|
||||
Loading…
Reference in New Issue
Block a user