From 237e3ff765db8a4f0bef2e0bd4248ccd773b6906 Mon Sep 17 00:00:00 2001 From: disqualifier Date: Mon, 29 Jun 2026 22:48:15 -0400 Subject: [PATCH] add package: pyproject + src (kv/hash/ttl surface, fail-loud, blocking pool) Signed-off-by: disqualifier --- pyproject.toml | 15 +++ src/redis_store/__init__.py | 3 + src/redis_store/redis_store.py | 233 +++++++++++++++++++++++++++++++++ 3 files changed, 251 insertions(+) create mode 100644 pyproject.toml create mode 100644 src/redis_store/__init__.py create mode 100644 src/redis_store/redis_store.py diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..cc5dcd5 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,15 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "redis_store" +version = "0.1.0" +description = "async redis wrapper over redis-py asyncio with a raw escape hatch, fail-loud and config-free" +requires-python = ">=3.10" +dependencies = [ + "redis>=5", +] + +[tool.hatch.build.targets.wheel] +packages = ["src/redis_store"] diff --git a/src/redis_store/__init__.py b/src/redis_store/__init__.py new file mode 100644 index 0000000..640ca30 --- /dev/null +++ b/src/redis_store/__init__.py @@ -0,0 +1,3 @@ +from .redis_store import RedisStore + +__all__ = ["RedisStore"] diff --git a/src/redis_store/redis_store.py b/src/redis_store/redis_store.py new file mode 100644 index 0000000..cb5ec5d --- /dev/null +++ b/src/redis_store/redis_store.py @@ -0,0 +1,233 @@ +""" +async redis wrapper over redis-py's asyncio client + +object pattern (one client per process), attach to the app: + from redis_store import RedisStore + app.kv = await RedisStore(host="localhost", port=6379, db=0).connect() + await app.kv.set("user:1:name", "ada", ex=3600) + name = await app.kv.get("user:1:name") # "ada" + await app.kv.close() # on shutdown + +context manager: + async with RedisStore(host="localhost") as kv: + await kv.incr("hits") + +lifecycle: + construction is sync and opens no socket (the pool connects lazily). connect() + issues a ping() so a bad host/port/auth fails loud immediately rather than on the + first real op, and returns self for the one-liner above. close() tears the client + and pool down (await client.aclose() + pool.disconnect()). + +type contract: + decode_responses=True by default, so keys and string values come back as str (and + None for an absent key). pass decode_responses=False at construction if you need raw + bytes. counters (incr/decr/exists/ttl) always return int regardless of this flag. + +errors (FAIL LOUD — unlike the mongo lib's swallow-and-default): + every wrapped method catches the driver's RedisError, logs it via + getLogger(__name__), and re-raises. a None/[]/{} return is only ever a real result + (absent key, empty hash) — never a swallowed failure. for anything not wrapped + (pipelines, pub/sub, scan, Lua, ...) use the raw `.client` escape hatch, which + exposes the full redis.asyncio.Redis surface and raises untouched. + +notes: + - import is `redis_store`; the repo/distribution is `redis` (the driver owns the + `redis` import name, so the package can't also be `redis`) + - pub/sub and pipeline helpers are intentionally not wrapped yet — use `.client` +""" + +import logging +from typing import Optional + +import redis.asyncio as redis +from redis.exceptions import RedisError + +log = logging.getLogger(__name__) + + +class RedisStore: + """async redis wrapper; one client per process, attach as app.kv""" + + def __init__( + self, + host: str = "localhost", + port: int = 6379, + db: int = 0, + password: Optional[str] = None, + *, + max_connections: int = 10, + pool_timeout: float = 30.0, + decode_responses: bool = True, + **pool_kwargs, + ): + """build the (not-yet-connected) pool + client; no I/O happens here + + host/port/db/password/max_connections are injected by the caller. extra + pool_kwargs pass through to the connection pool (socket_timeout, + socket_connect_timeout, ssl, etc). decode_responses=True returns str; set it + False for raw bytes. + + a BlockingConnectionPool is used so that, under concurrency exceeding + max_connections, callers WAIT up to pool_timeout seconds for a free connection + (matching motor/mongo's blocking pool) rather than the plain pool's behavior of + raising MaxConnectionsError immediately. pool_timeout=None waits forever. + """ + self._pool = redis.BlockingConnectionPool( + host=host, + port=port, + db=db, + password=password, + max_connections=max_connections, + timeout=pool_timeout, + decode_responses=decode_responses, + **pool_kwargs, + ) + self._client = redis.Redis(connection_pool=self._pool) + + async def connect(self) -> "RedisStore": + """open + validate the connection with a ping; fail loud on bad config + + returns self so callers can write + `kv = await RedisStore(...).connect()`. + """ + try: + await self._client.ping() + except RedisError: + log.exception("redis.connect() ping failed") + raise + return self + + async def close(self) -> None: + """close the client and disconnect the pool on shutdown""" + try: + await self._client.aclose() + await self._pool.disconnect() + except RedisError: + log.exception("redis.close()") + raise + + async def __aenter__(self) -> "RedisStore": + return await self.connect() + + async def __aexit__(self, exc_type, exc, tb) -> None: + await self.close() + + @property + def client(self) -> redis.Redis: + """raw redis.asyncio.Redis escape hatch; full driver surface, raises + + use for anything not wrapped: pipelines, pub/sub, scan, Lua, etc. + """ + return self._client + + # ------------------------------------------------------------------------- + # key / value + + async def get(self, key: str) -> Optional[str]: + """return the string value at key, or None if the key is absent""" + try: + return await self._client.get(key) + except RedisError: + log.exception("redis.get(%s)", key) + raise + + async def set(self, key: str, value, ex: Optional[int] = None) -> bool: + """set key to value, optionally with a ttl of ex seconds; returns the ack""" + try: + return bool(await self._client.set(key, value, ex=ex)) + except RedisError: + log.exception("redis.set(%s)", key) + raise + + async def delete(self, *keys: str) -> int: + """delete one or more keys, returning how many existed and were removed""" + try: + return await self._client.delete(*keys) + except RedisError: + log.exception("redis.delete(%s)", keys) + raise + + async def exists(self, *keys: str) -> int: + """return how many of the given keys exist (counts duplicates)""" + try: + return await self._client.exists(*keys) + except RedisError: + log.exception("redis.exists(%s)", keys) + raise + + async def incr(self, key: str, amount: int = 1) -> int: + """atomically increment key by amount (default 1), returning the new value""" + try: + return await self._client.incrby(key, amount) + except RedisError: + log.exception("redis.incr(%s)", key) + raise + + async def decr(self, key: str, amount: int = 1) -> int: + """atomically decrement key by amount (default 1), returning the new value""" + try: + return await self._client.decrby(key, amount) + except RedisError: + log.exception("redis.decr(%s)", key) + raise + + # ------------------------------------------------------------------------- + # hash + + async def hget(self, name: str, field: str) -> Optional[str]: + """return the value of field in hash name, or None if either is absent""" + try: + return await self._client.hget(name, field) + except RedisError: + log.exception("redis.hget(%s, %s)", name, field) + raise + + async def hset(self, name: str, key: Optional[str] = None, value=None, mapping: Optional[dict] = None) -> int: + """set field(s) in hash name; returns the count of newly-added fields + + pass a single field via key/value, or several via mapping={field: value}. + """ + try: + return await self._client.hset(name, key=key, value=value, mapping=mapping) + except RedisError: + log.exception("redis.hset(%s)", name) + raise + + async def hgetall(self, name: str) -> dict: + """return the whole hash as a dict; empty dict if the hash does not exist""" + try: + return await self._client.hgetall(name) + except RedisError: + log.exception("redis.hgetall(%s)", name) + raise + + async def hdel(self, name: str, *fields: str) -> int: + """delete fields from hash name, returning how many were removed""" + try: + return await self._client.hdel(name, *fields) + except RedisError: + log.exception("redis.hdel(%s)", name) + raise + + # ------------------------------------------------------------------------- + # expiry / ttl + + async def expire(self, key: str, seconds: int) -> bool: + """set a ttl of seconds on key; returns False if the key does not exist""" + try: + return bool(await self._client.expire(key, seconds)) + except RedisError: + log.exception("redis.expire(%s)", key) + raise + + async def ttl(self, key: str) -> int: + """return the remaining ttl in seconds + + the driver's sentinels pass through: -1 means the key exists but has no expiry, + -2 means the key does not exist. + """ + try: + return await self._client.ttl(key) + except RedisError: + log.exception("redis.ttl(%s)", key) + raise