From 3922295f2c82edd52be937ccf9d14293a1fffa91 Mon Sep 17 00:00:00 2001 From: disqualifier Date: Mon, 22 Jun 2026 21:26:40 -0400 Subject: [PATCH] add package: pyproject + source src/ layout package making the repo pip-installable (git+ssh@vX.Y.Z). Mongo class with both usage patterns (object + module proxy), wrapped methods log-and-swallow, raw collection escape hatch raises. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: disqualifier --- pyproject.toml | 17 ++ src/mongo/__init__.py | 14 ++ src/mongo/mongo.py | 501 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 532 insertions(+) create mode 100644 pyproject.toml create mode 100644 src/mongo/__init__.py create mode 100644 src/mongo/mongo.py diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..d64a6a3 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,17 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "mongo" +version = "0.1.0" +description = "async mongodb wrapper over motor with a raw escape hatch" +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "motor>=3.0", + "pymongo>=4.0", +] + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/src/mongo/__init__.py b/src/mongo/__init__.py new file mode 100644 index 0000000..fa500e0 --- /dev/null +++ b/src/mongo/__init__.py @@ -0,0 +1,14 @@ +from .mongo import Mongo, init, instance + +__all__ = ["Mongo", "init", "instance"] + + +def __getattr__(name: str): + """proxy bare package attribute access to the default instance (PEP 562) + + lets `import mongo; await mongo.get_documents(...)` work after init(). + `from mongo import func` still won't see this (resolved before init) + """ + if not name.startswith("_") and hasattr(Mongo, name): + return getattr(instance(), name) + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/mongo/mongo.py b/src/mongo/mongo.py new file mode 100644 index 0000000..24baeba --- /dev/null +++ b/src/mongo/mongo.py @@ -0,0 +1,501 @@ +""" +async mongodb wrapper over motor + +object (preferred), one client per process: + from mongo import Mongo + bot.db = Mongo(conn_string, database) + await bot.db.get_documents("users", {"active": True}) + await bot.db.close() # on shutdown + +module proxy (back-compat), arm once then call bare: + import mongo # not `from mongo import ...` + mongo.init(conn_string, database) + await mongo.get_documents("users", {"active": True}) + +errors: + wrapped methods log and swallow, returning a safe default + (False / [] / {} / 0 / None). .collection(name) / db[name] return + the raw motor collection: full driver surface, raises, nothing swallowed. + +notes: + - the proxy needs `import mongo`; `from mongo import func` resolves before init + - find_one_and_update returns the after-image by default + - bulk_write takes caller-built pymongo ops (UpdateOne/DeleteOne/...) +""" + +import logging +from typing import Any, List, Optional + +from pymongo import ReturnDocument +from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection + +log = logging.getLogger(__name__) + + +class Mongo: + """async mongodb wrapper; one client per process, attach to bot as bot.db""" + + def __init__(self, connection_string: str, database: str): + self._client = AsyncIOMotorClient(connection_string) + self._db = self._client[database] + + def __getitem__(self, collection: str) -> AsyncIOMotorCollection: + """raw collection access via subscript: bot.db['users'].aggregate(...)""" + return self._db[collection] + + def collection(self, name: str) -> AsyncIOMotorCollection: + """raw motor collection escape hatch; full driver surface, raises on error""" + return self._db[name] + + @property + def database(self): + """raw motor database handle""" + return self._db + + async def ping(self) -> bool: + """health check against the server""" + try: + await self._client.admin.command("ping") + return True + except Exception: + log.exception("db.ping()") + return False + + async def close(self) -> None: + """close the client pool on shutdown""" + self._client.close() + + # ------------------------------------------------------------------------- + # collection / index management + + async def create_collection(self, collection: str, indexes=None) -> bool: + """create a collection, optionally seeding an index""" + try: + await self._db.create_collection(collection) + if indexes: + await self._db[collection].create_index(indexes) + log.info(f"created indexes for {collection}") + return True + except Exception: + log.exception(f"db.create_collection() for {collection}") + return False + + async def create_index(self, collection: str, index, **kwargs) -> bool: + """create an index; kwargs pass through (unique=True, name=..., etc)""" + try: + await self._db[collection].create_index(index, **kwargs) + log.info(f"created index for {collection}") + return True + except Exception: + log.exception(f"db.create_index() for {collection}") + return False + + async def drop_collection(self, collection: str) -> bool: + """drop a collection entirely""" + try: + await self._db.drop_collection(collection) + return True + except Exception: + log.exception(f"db.drop_collection() for {collection}") + return False + + async def check_collection_exists(self, collection: str) -> bool: + """return whether a collection exists""" + try: + return collection in await self._db.list_collection_names() + except Exception: + log.exception(f"db.check_collection_exists() for {collection}") + return False + + async def list_collections(self) -> List[str]: + """return all collection names in the database""" + try: + return await self._db.list_collection_names() + except Exception: + log.exception("db.list_collections()") + return [] + + # ------------------------------------------------------------------------- + # create + + async def create_document(self, collection: str, document: dict) -> bool: + """insert a single document""" + try: + await self._db[collection].insert_one(document) + return True + except Exception: + log.exception(f"db.create_document() on {collection}") + return False + + async def create_documents( + self, collection: str, documents: List[dict], ordered: bool = True + ) -> int: + """insert many documents, returning the inserted count""" + try: + result = await self._db[collection].insert_many(documents, ordered=ordered) + return len(result.inserted_ids) + except Exception: + log.exception(f"db.create_documents() on {collection}") + return 0 + + # ------------------------------------------------------------------------- + # read + + async def get_document( + self, collection: str, target: dict, fields: Optional[dict] = None + ) -> Optional[dict]: + """return a single document with optional field projection, or None""" + try: + return await self._db[collection].find_one(target, fields) + except Exception: + log.exception(f"db.get_document() on {collection}") + return None + + async def get_documents( + self, collection: str, target: dict, fields: Optional[dict] = None + ) -> List[dict]: + """return all matching documents""" + try: + cursor = self._db[collection].find(target, fields) + return [doc async for doc in cursor] + except Exception: + log.exception(f"db.get_documents() on {collection}") + return [] + + async def get_documents_sorted( + self, collection: str, target: dict, sort_key: str, sort_order: int = 1, + fields: Optional[dict] = None, limit: int = 0, skip: int = 0, + ) -> List[dict]: + """return matching documents sorted by key; limit=0 means no limit""" + try: + cursor = ( + self._db[collection] + .find(target, fields) + .sort(sort_key, sort_order) + .skip(skip) + .limit(limit) + ) + return [doc async for doc in cursor] + except Exception: + log.exception(f"db.get_documents_sorted() on {collection}") + return [] + + async def get_document_hashmap(self, collection: str, target: dict, key: str) -> dict: + """return matching documents keyed into a dict by the given field""" + try: + cursor = self._db[collection].find(target) + return {doc[key]: doc async for doc in cursor} + except Exception: + log.exception(f"db.get_document_hashmap() on {collection}") + return {} + + async def get_document_fields( + self, collection: str, target: dict, key: str, fields: Optional[dict] = None + ) -> List: + """return a flat list of one field's value across matching documents""" + try: + cursor = self._db[collection].find(target, fields) + return [doc[key] async for doc in cursor] + except Exception: + log.exception(f"db.get_document_fields() on {collection}") + return [] + + async def count_documents(self, collection: str, target: dict) -> int: + """count documents matching the filter""" + try: + return await self._db[collection].count_documents(target) + except Exception: + log.exception(f"db.count_documents() on {collection}") + return 0 + + async def count_value_in_array(self, collection: str, array_field: str, value: str) -> int: + """count documents whose array field contains value""" + try: + return await self._db[collection].count_documents({array_field: value}) + except Exception: + log.exception(f"db.count_value_in_array() on {collection}") + return 0 + + async def exists_in_array(self, collection: str, array_field: str, value: str) -> bool: + """return whether any document's array field contains value""" + try: + doc = await self._db[collection].find_one({array_field: value}, {"_id": 1}) + return doc is not None + except Exception: + log.exception(f"db.exists_in_array() on {collection}") + return False + + async def distinct(self, collection: str, field: str, target: Optional[dict] = None) -> List: + """return the distinct values for a field across matching documents""" + try: + return await self._db[collection].distinct(field, target or {}) + except Exception: + log.exception(f"db.distinct() on {collection}") + return [] + + async def run_aggregation( + self, collection: str, pipeline: list, match_filter: Optional[dict] = None + ) -> List[dict]: + """run an aggregation pipeline, optionally prepending a $match stage""" + try: + stages = [{"$match": match_filter}, *pipeline] if match_filter else pipeline + return await self._db[collection].aggregate(stages).to_list(length=None) + except Exception: + log.exception(f"db.run_aggregation() on {collection}") + return [] + + async def search_documents( + self, collection: str, search_query: dict, sort_key: str, sort_order: int = 1 + ) -> List[dict]: + """find documents by filter and sort them by key""" + try: + cursor = self._db[collection].find(search_query).sort(sort_key, sort_order) + return [doc async for doc in cursor] + except Exception: + log.exception(f"db.search_documents() on {collection}") + return [] + + async def search_indexes(self, collection: str, search_term: str) -> List[dict]: + """run a $text search and return results ordered by relevance score""" + try: + cursor = self._db[collection].find( + {"$text": {"$search": search_term}}, + {"score": {"$meta": "textScore"}}, + ).sort([("score", {"$meta": "textScore"})]) + return [doc async for doc in cursor] + except Exception: + log.exception(f"db.search_indexes() on {collection}") + return [] + + # ------------------------------------------------------------------------- + # update + + async def update_document( + self, collection: str, target: dict, document: dict, do_upsert: bool = False + ) -> bool: + """replace a whole document, optionally upserting""" + try: + response = await self._db[collection].replace_one(target, document, upsert=do_upsert) + return bool(response.matched_count or response.upserted_id) + except Exception: + log.exception(f"db.update_document() on {collection}") + return False + + async def update_documents(self, collection: str, target: dict, values: dict) -> int: + """update_many with a raw update doc, returning modified count""" + try: + response = await self._db[collection].update_many(target, values) + return response.modified_count + except Exception: + log.exception(f"db.update_documents() on {collection}") + return 0 + + async def update_document_field(self, collection: str, target: dict, updates: dict) -> bool: + """$set one or more fields on a single document""" + try: + response = await self._db[collection].update_one(target, {"$set": updates}) + return response.modified_count > 0 + except Exception: + log.exception(f"db.update_document_field() on {collection}") + return False + + async def update_document_operator( + self, collection: str, target: dict, update_query: dict + ) -> bool: + """apply raw update operators ($set/$inc/$unset/...) to a single document""" + try: + response = await self._db[collection].update_one(target, update_query) + return response.modified_count > 0 + except Exception: + log.exception(f"db.update_document_operator() on {collection}") + return False + + async def upsert_document_field(self, collection: str, target: dict, updates: dict) -> bool: + """$set fields on a single document, creating it if absent""" + try: + response = await self._db[collection].update_one( + target, {"$set": updates}, upsert=True + ) + return bool(response.modified_count or response.upserted_id) + except Exception: + log.exception(f"db.upsert_document_field() on {collection}") + return False + + async def update_documents_pipeline( + self, collection: str, target: dict, pipeline: list + ) -> int: + """update_many using an aggregation-pipeline update""" + try: + response = await self._db[collection].update_many(target, pipeline) + return response.modified_count + except Exception: + log.exception(f"db.update_documents_pipeline() on {collection}") + return 0 + + async def document_push_array( + self, collection: str, target: dict, array: str, value: Any + ) -> bool: + """$push a value onto an array field""" + try: + response = await self._db[collection].update_one(target, {"$push": {array: value}}) + return response.modified_count > 0 + except Exception: + log.exception(f"db.document_push_array() on {collection}") + return False + + async def document_push_and_set( + self, collection: str, target: dict, array: str, value: Any, + field_to_set: str, set_value: Any, + ) -> bool: + """$push to an array and $set a field in one update""" + try: + response = await self._db[collection].update_one( + target, + {"$push": {array: value}, "$set": {field_to_set: set_value}}, + ) + return response.modified_count > 0 + except Exception: + log.exception(f"db.document_push_and_set() on {collection}") + return False + + async def document_pop_array( + self, collection: str, target: dict, array: str, value: Any + ) -> bool: + """$pull a value from an array field""" + try: + response = await self._db[collection].update_one(target, {"$pull": {array: value}}) + return response.modified_count > 0 + except Exception: + log.exception(f"db.document_pop_array() on {collection}") + return False + + async def find_one_and_update( + self, collection: str, target: dict, update: dict, + return_after: bool = True, do_upsert: bool = False, fields: Optional[dict] = None, + ) -> Optional[dict]: + """atomically update one doc and return it (after-image by default)""" + try: + return await self._db[collection].find_one_and_update( + target, update, + projection=fields, + upsert=do_upsert, + return_document=ReturnDocument.AFTER if return_after else ReturnDocument.BEFORE, + ) + except Exception: + log.exception(f"db.find_one_and_update() on {collection}") + return None + + async def find_one_and_replace( + self, collection: str, target: dict, document: dict, + return_after: bool = True, do_upsert: bool = False, fields: Optional[dict] = None, + ) -> Optional[dict]: + """atomically replace one doc and return it (after-image by default)""" + try: + return await self._db[collection].find_one_and_replace( + target, document, + projection=fields, + upsert=do_upsert, + return_document=ReturnDocument.AFTER if return_after else ReturnDocument.BEFORE, + ) + except Exception: + log.exception(f"db.find_one_and_replace() on {collection}") + return None + + async def find_one_and_delete( + self, collection: str, target: dict, fields: Optional[dict] = None + ) -> Optional[dict]: + """atomically delete one doc and return it""" + try: + return await self._db[collection].find_one_and_delete(target, projection=fields) + except Exception: + log.exception(f"db.find_one_and_delete() on {collection}") + return None + + # ------------------------------------------------------------------------- + # delete + + async def delete_document(self, collection: str, target: dict) -> int: + """delete a single document, returning deleted count (0 or 1)""" + try: + response = await self._db[collection].delete_one(target) + return response.deleted_count + except Exception: + log.exception(f"db.delete_document() on {collection}") + return 0 + + async def delete_documents(self, collection: str, target: dict) -> int: + """delete all matching documents, returning deleted count""" + try: + response = await self._db[collection].delete_many(target) + return response.deleted_count + except Exception: + log.exception(f"db.delete_documents() on {collection}") + return 0 + + # ------------------------------------------------------------------------- + # bulk / checks + + async def bulk_write(self, collection: str, operations: list, ordered: bool = True) -> dict: + """ + run a batch of pymongo write ops (InsertOne/UpdateOne/DeleteOne/...) + + caller builds the ops from pymongo, e.g.: + from pymongo import UpdateOne + ops = [UpdateOne({'_id': i}, {'$set': {...}}, upsert=True) for i in ids] + returns a summary dict of counts, or an empty dict on failure + """ + try: + result = await self._db[collection].bulk_write(operations, ordered=ordered) + return { + "inserted": result.inserted_count, + "matched": result.matched_count, + "modified": result.modified_count, + "deleted": result.deleted_count, + "upserted": result.upserted_count, + } + except Exception: + log.exception(f"db.bulk_write() on {collection}") + return {} + + async def document_check_multi( + self, collection: str, target: dict, checks: List[str], value: str + ) -> bool: + """ + return whether a doc matching target has value in any of the given arrays + + target is a normal filter dict (merged into the query), not a bare _id + """ + try: + query = { + **target, + "$or": [{check: {"$elemMatch": {"$eq": value}}} for check in checks], + } + return await self._db[collection].find_one(query) is not None + except Exception: + log.exception(f"db.document_check_multi() on {collection}") + return False + + +# ----------------------------------------------------------------------------- +# backwards-compat module proxy +# - lets legacy call sites keep using `await mongo.get_documents(...)` +# without an object, after a one-time `mongo.init(conn, db)` +# - works with `import mongo; mongo.func(...)` (resolved at call time) +# - does NOT work with `from mongo import func` (resolved at import, +# before init runs) — switch those sites to `import mongo` + +_default: Optional[Mongo] = None + + +def init(connection_string: str, database: str) -> Mongo: + """arm the module-level default instance and return it""" + global _default + _default = Mongo(connection_string, database) + return _default + + +def instance() -> Mongo: + """return the default instance, raising if init() has not run""" + if _default is None: + raise RuntimeError("mongo not initialized; call mongo.init(conn, db) first") + return _default