add package: pyproject + source

src/ layout package making the repo pip-installable
(git+ssh@vX.Y.Z). Mongo class with both usage patterns (object +
module proxy), wrapped methods log-and-swallow, raw collection escape
hatch raises.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
disqualifier 2026-06-22 21:26:40 -04:00
parent b79f965d07
commit 3922295f2c
3 changed files with 532 additions and 0 deletions

17
pyproject.toml Normal file
View File

@ -0,0 +1,17 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "mongo"
version = "0.1.0"
description = "async mongodb wrapper over motor with a raw escape hatch"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"motor>=3.0",
"pymongo>=4.0",
]
[tool.setuptools.packages.find]
where = ["src"]

14
src/mongo/__init__.py Normal file
View File

@ -0,0 +1,14 @@
from .mongo import Mongo, init, instance
__all__ = ["Mongo", "init", "instance"]
def __getattr__(name: str):
"""proxy bare package attribute access to the default instance (PEP 562)
lets `import mongo; await mongo.get_documents(...)` work after init().
`from mongo import func` still won't see this (resolved before init)
"""
if not name.startswith("_") and hasattr(Mongo, name):
return getattr(instance(), name)
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

501
src/mongo/mongo.py Normal file
View File

@ -0,0 +1,501 @@
"""
async mongodb wrapper over motor
object (preferred), one client per process:
from mongo import Mongo
bot.db = Mongo(conn_string, database)
await bot.db.get_documents("users", {"active": True})
await bot.db.close() # on shutdown
module proxy (back-compat), arm once then call bare:
import mongo # not `from mongo import ...`
mongo.init(conn_string, database)
await mongo.get_documents("users", {"active": True})
errors:
wrapped methods log and swallow, returning a safe default
(False / [] / {} / 0 / None). .collection(name) / db[name] return
the raw motor collection: full driver surface, raises, nothing swallowed.
notes:
- the proxy needs `import mongo`; `from mongo import func` resolves before init
- find_one_and_update returns the after-image by default
- bulk_write takes caller-built pymongo ops (UpdateOne/DeleteOne/...)
"""
import logging
from typing import Any, List, Optional
from pymongo import ReturnDocument
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection
log = logging.getLogger(__name__)
class Mongo:
"""async mongodb wrapper; one client per process, attach to bot as bot.db"""
def __init__(self, connection_string: str, database: str):
self._client = AsyncIOMotorClient(connection_string)
self._db = self._client[database]
def __getitem__(self, collection: str) -> AsyncIOMotorCollection:
"""raw collection access via subscript: bot.db['users'].aggregate(...)"""
return self._db[collection]
def collection(self, name: str) -> AsyncIOMotorCollection:
"""raw motor collection escape hatch; full driver surface, raises on error"""
return self._db[name]
@property
def database(self):
"""raw motor database handle"""
return self._db
async def ping(self) -> bool:
"""health check against the server"""
try:
await self._client.admin.command("ping")
return True
except Exception:
log.exception("db.ping()")
return False
async def close(self) -> None:
"""close the client pool on shutdown"""
self._client.close()
# -------------------------------------------------------------------------
# collection / index management
async def create_collection(self, collection: str, indexes=None) -> bool:
"""create a collection, optionally seeding an index"""
try:
await self._db.create_collection(collection)
if indexes:
await self._db[collection].create_index(indexes)
log.info(f"created indexes for {collection}")
return True
except Exception:
log.exception(f"db.create_collection() for {collection}")
return False
async def create_index(self, collection: str, index, **kwargs) -> bool:
"""create an index; kwargs pass through (unique=True, name=..., etc)"""
try:
await self._db[collection].create_index(index, **kwargs)
log.info(f"created index for {collection}")
return True
except Exception:
log.exception(f"db.create_index() for {collection}")
return False
async def drop_collection(self, collection: str) -> bool:
"""drop a collection entirely"""
try:
await self._db.drop_collection(collection)
return True
except Exception:
log.exception(f"db.drop_collection() for {collection}")
return False
async def check_collection_exists(self, collection: str) -> bool:
"""return whether a collection exists"""
try:
return collection in await self._db.list_collection_names()
except Exception:
log.exception(f"db.check_collection_exists() for {collection}")
return False
async def list_collections(self) -> List[str]:
"""return all collection names in the database"""
try:
return await self._db.list_collection_names()
except Exception:
log.exception("db.list_collections()")
return []
# -------------------------------------------------------------------------
# create
async def create_document(self, collection: str, document: dict) -> bool:
"""insert a single document"""
try:
await self._db[collection].insert_one(document)
return True
except Exception:
log.exception(f"db.create_document() on {collection}")
return False
async def create_documents(
self, collection: str, documents: List[dict], ordered: bool = True
) -> int:
"""insert many documents, returning the inserted count"""
try:
result = await self._db[collection].insert_many(documents, ordered=ordered)
return len(result.inserted_ids)
except Exception:
log.exception(f"db.create_documents() on {collection}")
return 0
# -------------------------------------------------------------------------
# read
async def get_document(
self, collection: str, target: dict, fields: Optional[dict] = None
) -> Optional[dict]:
"""return a single document with optional field projection, or None"""
try:
return await self._db[collection].find_one(target, fields)
except Exception:
log.exception(f"db.get_document() on {collection}")
return None
async def get_documents(
self, collection: str, target: dict, fields: Optional[dict] = None
) -> List[dict]:
"""return all matching documents"""
try:
cursor = self._db[collection].find(target, fields)
return [doc async for doc in cursor]
except Exception:
log.exception(f"db.get_documents() on {collection}")
return []
async def get_documents_sorted(
self, collection: str, target: dict, sort_key: str, sort_order: int = 1,
fields: Optional[dict] = None, limit: int = 0, skip: int = 0,
) -> List[dict]:
"""return matching documents sorted by key; limit=0 means no limit"""
try:
cursor = (
self._db[collection]
.find(target, fields)
.sort(sort_key, sort_order)
.skip(skip)
.limit(limit)
)
return [doc async for doc in cursor]
except Exception:
log.exception(f"db.get_documents_sorted() on {collection}")
return []
async def get_document_hashmap(self, collection: str, target: dict, key: str) -> dict:
"""return matching documents keyed into a dict by the given field"""
try:
cursor = self._db[collection].find(target)
return {doc[key]: doc async for doc in cursor}
except Exception:
log.exception(f"db.get_document_hashmap() on {collection}")
return {}
async def get_document_fields(
self, collection: str, target: dict, key: str, fields: Optional[dict] = None
) -> List:
"""return a flat list of one field's value across matching documents"""
try:
cursor = self._db[collection].find(target, fields)
return [doc[key] async for doc in cursor]
except Exception:
log.exception(f"db.get_document_fields() on {collection}")
return []
async def count_documents(self, collection: str, target: dict) -> int:
"""count documents matching the filter"""
try:
return await self._db[collection].count_documents(target)
except Exception:
log.exception(f"db.count_documents() on {collection}")
return 0
async def count_value_in_array(self, collection: str, array_field: str, value: str) -> int:
"""count documents whose array field contains value"""
try:
return await self._db[collection].count_documents({array_field: value})
except Exception:
log.exception(f"db.count_value_in_array() on {collection}")
return 0
async def exists_in_array(self, collection: str, array_field: str, value: str) -> bool:
"""return whether any document's array field contains value"""
try:
doc = await self._db[collection].find_one({array_field: value}, {"_id": 1})
return doc is not None
except Exception:
log.exception(f"db.exists_in_array() on {collection}")
return False
async def distinct(self, collection: str, field: str, target: Optional[dict] = None) -> List:
"""return the distinct values for a field across matching documents"""
try:
return await self._db[collection].distinct(field, target or {})
except Exception:
log.exception(f"db.distinct() on {collection}")
return []
async def run_aggregation(
self, collection: str, pipeline: list, match_filter: Optional[dict] = None
) -> List[dict]:
"""run an aggregation pipeline, optionally prepending a $match stage"""
try:
stages = [{"$match": match_filter}, *pipeline] if match_filter else pipeline
return await self._db[collection].aggregate(stages).to_list(length=None)
except Exception:
log.exception(f"db.run_aggregation() on {collection}")
return []
async def search_documents(
self, collection: str, search_query: dict, sort_key: str, sort_order: int = 1
) -> List[dict]:
"""find documents by filter and sort them by key"""
try:
cursor = self._db[collection].find(search_query).sort(sort_key, sort_order)
return [doc async for doc in cursor]
except Exception:
log.exception(f"db.search_documents() on {collection}")
return []
async def search_indexes(self, collection: str, search_term: str) -> List[dict]:
"""run a $text search and return results ordered by relevance score"""
try:
cursor = self._db[collection].find(
{"$text": {"$search": search_term}},
{"score": {"$meta": "textScore"}},
).sort([("score", {"$meta": "textScore"})])
return [doc async for doc in cursor]
except Exception:
log.exception(f"db.search_indexes() on {collection}")
return []
# -------------------------------------------------------------------------
# update
async def update_document(
self, collection: str, target: dict, document: dict, do_upsert: bool = False
) -> bool:
"""replace a whole document, optionally upserting"""
try:
response = await self._db[collection].replace_one(target, document, upsert=do_upsert)
return bool(response.matched_count or response.upserted_id)
except Exception:
log.exception(f"db.update_document() on {collection}")
return False
async def update_documents(self, collection: str, target: dict, values: dict) -> int:
"""update_many with a raw update doc, returning modified count"""
try:
response = await self._db[collection].update_many(target, values)
return response.modified_count
except Exception:
log.exception(f"db.update_documents() on {collection}")
return 0
async def update_document_field(self, collection: str, target: dict, updates: dict) -> bool:
"""$set one or more fields on a single document"""
try:
response = await self._db[collection].update_one(target, {"$set": updates})
return response.modified_count > 0
except Exception:
log.exception(f"db.update_document_field() on {collection}")
return False
async def update_document_operator(
self, collection: str, target: dict, update_query: dict
) -> bool:
"""apply raw update operators ($set/$inc/$unset/...) to a single document"""
try:
response = await self._db[collection].update_one(target, update_query)
return response.modified_count > 0
except Exception:
log.exception(f"db.update_document_operator() on {collection}")
return False
async def upsert_document_field(self, collection: str, target: dict, updates: dict) -> bool:
"""$set fields on a single document, creating it if absent"""
try:
response = await self._db[collection].update_one(
target, {"$set": updates}, upsert=True
)
return bool(response.modified_count or response.upserted_id)
except Exception:
log.exception(f"db.upsert_document_field() on {collection}")
return False
async def update_documents_pipeline(
self, collection: str, target: dict, pipeline: list
) -> int:
"""update_many using an aggregation-pipeline update"""
try:
response = await self._db[collection].update_many(target, pipeline)
return response.modified_count
except Exception:
log.exception(f"db.update_documents_pipeline() on {collection}")
return 0
async def document_push_array(
self, collection: str, target: dict, array: str, value: Any
) -> bool:
"""$push a value onto an array field"""
try:
response = await self._db[collection].update_one(target, {"$push": {array: value}})
return response.modified_count > 0
except Exception:
log.exception(f"db.document_push_array() on {collection}")
return False
async def document_push_and_set(
self, collection: str, target: dict, array: str, value: Any,
field_to_set: str, set_value: Any,
) -> bool:
"""$push to an array and $set a field in one update"""
try:
response = await self._db[collection].update_one(
target,
{"$push": {array: value}, "$set": {field_to_set: set_value}},
)
return response.modified_count > 0
except Exception:
log.exception(f"db.document_push_and_set() on {collection}")
return False
async def document_pop_array(
self, collection: str, target: dict, array: str, value: Any
) -> bool:
"""$pull a value from an array field"""
try:
response = await self._db[collection].update_one(target, {"$pull": {array: value}})
return response.modified_count > 0
except Exception:
log.exception(f"db.document_pop_array() on {collection}")
return False
async def find_one_and_update(
self, collection: str, target: dict, update: dict,
return_after: bool = True, do_upsert: bool = False, fields: Optional[dict] = None,
) -> Optional[dict]:
"""atomically update one doc and return it (after-image by default)"""
try:
return await self._db[collection].find_one_and_update(
target, update,
projection=fields,
upsert=do_upsert,
return_document=ReturnDocument.AFTER if return_after else ReturnDocument.BEFORE,
)
except Exception:
log.exception(f"db.find_one_and_update() on {collection}")
return None
async def find_one_and_replace(
self, collection: str, target: dict, document: dict,
return_after: bool = True, do_upsert: bool = False, fields: Optional[dict] = None,
) -> Optional[dict]:
"""atomically replace one doc and return it (after-image by default)"""
try:
return await self._db[collection].find_one_and_replace(
target, document,
projection=fields,
upsert=do_upsert,
return_document=ReturnDocument.AFTER if return_after else ReturnDocument.BEFORE,
)
except Exception:
log.exception(f"db.find_one_and_replace() on {collection}")
return None
async def find_one_and_delete(
self, collection: str, target: dict, fields: Optional[dict] = None
) -> Optional[dict]:
"""atomically delete one doc and return it"""
try:
return await self._db[collection].find_one_and_delete(target, projection=fields)
except Exception:
log.exception(f"db.find_one_and_delete() on {collection}")
return None
# -------------------------------------------------------------------------
# delete
async def delete_document(self, collection: str, target: dict) -> int:
"""delete a single document, returning deleted count (0 or 1)"""
try:
response = await self._db[collection].delete_one(target)
return response.deleted_count
except Exception:
log.exception(f"db.delete_document() on {collection}")
return 0
async def delete_documents(self, collection: str, target: dict) -> int:
"""delete all matching documents, returning deleted count"""
try:
response = await self._db[collection].delete_many(target)
return response.deleted_count
except Exception:
log.exception(f"db.delete_documents() on {collection}")
return 0
# -------------------------------------------------------------------------
# bulk / checks
async def bulk_write(self, collection: str, operations: list, ordered: bool = True) -> dict:
"""
run a batch of pymongo write ops (InsertOne/UpdateOne/DeleteOne/...)
caller builds the ops from pymongo, e.g.:
from pymongo import UpdateOne
ops = [UpdateOne({'_id': i}, {'$set': {...}}, upsert=True) for i in ids]
returns a summary dict of counts, or an empty dict on failure
"""
try:
result = await self._db[collection].bulk_write(operations, ordered=ordered)
return {
"inserted": result.inserted_count,
"matched": result.matched_count,
"modified": result.modified_count,
"deleted": result.deleted_count,
"upserted": result.upserted_count,
}
except Exception:
log.exception(f"db.bulk_write() on {collection}")
return {}
async def document_check_multi(
self, collection: str, target: dict, checks: List[str], value: str
) -> bool:
"""
return whether a doc matching target has value in any of the given arrays
target is a normal filter dict (merged into the query), not a bare _id
"""
try:
query = {
**target,
"$or": [{check: {"$elemMatch": {"$eq": value}}} for check in checks],
}
return await self._db[collection].find_one(query) is not None
except Exception:
log.exception(f"db.document_check_multi() on {collection}")
return False
# -----------------------------------------------------------------------------
# backwards-compat module proxy
# - lets legacy call sites keep using `await mongo.get_documents(...)`
# without an object, after a one-time `mongo.init(conn, db)`
# - works with `import mongo; mongo.func(...)` (resolved at call time)
# - does NOT work with `from mongo import func` (resolved at import,
# before init runs) — switch those sites to `import mongo`
_default: Optional[Mongo] = None
def init(connection_string: str, database: str) -> Mongo:
"""arm the module-level default instance and return it"""
global _default
_default = Mongo(connection_string, database)
return _default
def instance() -> Mongo:
"""return the default instance, raising if init() has not run"""
if _default is None:
raise RuntimeError("mongo not initialized; call mongo.init(conn, db) first")
return _default