Source code for process.util

import hashlib
import io
import logging
import os
from contextlib import contextmanager
from textwrap import fill

import ijson
import simplejson as json
from django.conf import settings
from django.db import IntegrityError, connections, transaction
from yapw.clients import AsyncConsumer, Blocking
from yapw.decorators import decorate
from yapw.methods import add_callback_threadsafe, nack

from process.exceptions import AlreadyExists, InvalidFormError
from process.models import Collection, CollectionFile, CollectionNote, ProcessingStep, Record

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Keyword arguments shared by the yapw clients created in get_publisher() and consume().
YAPW_KWARGS = {
    "url": settings.RABBIT_URL,
    "exchange": settings.RABBIT_EXCHANGE_NAME,
    "prefetch_count": 20,
}
# URL template of an extension's metadata file. The placeholder is filled with an extension's name, like "lots".
EXTENSION_URL = "https://raw.githubusercontent.com/open-contracting-extensions/ocds_{}_extension/master/extension.json"


def wrap(string):
    """
    Format a long string as a help message, and return it.

    Each newline-separated paragraph is wrapped on its own to a width of 78 characters, so that the
    line breaks already present in ``string`` are kept.
    """
    wrapped = []
    for paragraph in string.split("\n"):
        wrapped.append(fill(paragraph, width=78, replace_whitespace=False))
    return "\n".join(wrapped)
def walk(paths):
    """
    Yield the path of each file found in ``paths``.

    A path that is a regular file is yielded as-is. Any other path is walked recursively with
    ``os.walk``, yielding every file whose name doesn't start with a period.
    """
    for path in paths:
        if os.path.isfile(path):
            yield path
            continue
        for root, _, names in os.walk(path):
            # Hidden files are skipped only when found while walking, not when given explicitly.
            yield from (os.path.join(root, name) for name in names if not name.startswith("."))
@contextmanager
def get_publisher():
    """Yield a blocking yapw client configured with ``YAPW_KWARGS``, and close it on exit."""
    publisher = Blocking(**YAPW_KWARGS)
    try:
        yield publisher
    finally:
        # Close the client whether or not the ``with`` body raised.
        publisher.close()
def consume(*args, **kwargs):
    """
    Create an asynchronous yapw consumer, and start it.

    The positional and keyword arguments are passed to ``AsyncConsumer``, together with ``YAPW_KWARGS``.
    """
    consumer = AsyncConsumer(*args, **kwargs, **YAPW_KWARGS)
    consumer.start()
def decorator(decode, callback, state, channel, method, properties, body):
    """
    Close the database connections opened by the callback, before returning.

    If the callback raises an exception, shut down the client in the main thread, without acknowledgment.
    For some exceptions, assume that the same message was delivered twice, log an error, and nack the message.
    """

    def errback(exception):
        # The first group of errors should only occur if the RabbitMQ and/or PostgreSQL connection is lost.
        # A transaction can't span both systems, so a row can be inserted and then the ack can fail.
        #
        # That said, the frequency of these errors is monitored via Sentry, to ensure that they are caused by
        # the above and not by an error in logic. Their number should not exceed the prefetch count.
        #
        # InvalidFormError is included, as it may be for a "unique_together" error, which is an integrity error.
        #
        # Collection.DoesNotExist should only occur in the wiper worker due to a duplicate message. It can also
        # occur in the finisher worker if the worker was stopped, and the wiper ran before the finisher.
        if isinstance(exception, AlreadyExists | InvalidFormError | IntegrityError | Collection.DoesNotExist):
            logger.exception("%s maybe caused by duplicate message %r, skipping", type(exception).__name__, body)
            nack(state, channel, method.delivery_tag, requeue=False)
            return

        # This error should never occur under normal operations. However, such messages interrupt processing,
        # so they are nack'ed.
        if isinstance(exception, CollectionFile.DoesNotExist | Record.DoesNotExist):
            logger.exception("Unprocessable message %r, skipping", body)
            nack(state, channel, method.delivery_tag, requeue=False)
            return

        # Anything else is unexpected: schedule the client's interrupt on its connection.
        logger.exception("Unhandled exception when consuming %r, shutting down gracefully", body)
        add_callback_threadsafe(state.connection, state.interrupt)

    def finalback():
        for connection in connections.all():
            connection.close()

    decorate(decode, callback, state, channel, method, properties, body, errback, finalback)
def get_or_create(model, data):
    """
    Get or create a Data or PackageData instance.

    The instance is looked up by the MD5 hex digest of ``data``, serialized as compact JSON with sorted keys.

    :param model: the model class, whose manager is queried by ``hash_md5``
    :param data: the JSON-serializable data to look up or store
    :returns: the existing or newly created instance
    """
    serialized = json.dumps(data, separators=(",", ":"), sort_keys=True, use_decimal=True)
    # MD5 is only a content fingerprint here. Declaring it as non-security use keeps this working on builds
    # that restrict MD5 (e.g. FIPS mode), and yields the same digest.
    hash_md5 = hashlib.md5(serialized.encode("utf-8"), usedforsecurity=False).hexdigest()

    try:
        # Another transaction is needed here, otherwise a parent transaction catches the integrity error.
        with transaction.atomic():
            obj, _created = model.objects.get_or_create(hash_md5=hash_md5, defaults={"data": data})
    # If another transaction in another thread COMMITs the same data after the SELECT, but before the INSERT.
    except IntegrityError:
        obj = model.objects.get(hash_md5=hash_md5)

    return obj
def create_note(collection, code, note, **kwargs):
    """
    Save a CollectionNote for the collection.

    If ``note`` is a list, its items are joined with newlines. Any ``kwargs`` are passed to ``CollectionNote``.
    """
    text = "\n".join(note) if isinstance(note, list) else note
    collection_note = CollectionNote(collection=collection, code=code, note=text, **kwargs)
    collection_note.save()
def create_step(name, collection_id, **kwargs):
    """Save a ProcessingStep with the given name and collection ID. Any ``kwargs`` are passed to the model."""
    step = ProcessingStep(name=name, collection_id=collection_id, **kwargs)
    step.save()
@contextmanager
def deleting_step(*args, **kwargs):
    """
    Delete the named step and run any finish callback only if successful or if the error is expected.

    The arguments are passed to ``delete_step``. If the error is expected, it is also passed to ``delete_step``
    as ``exception``, and is then re-raised. Any other error propagates without deleting the step.
    """
    expected_errors = (
        # See the errback() function in the decorator() function.
        AlreadyExists,
        InvalidFormError,
        IntegrityError,
        # See the try/except block in the callback() function of the file_worker worker.
        FileNotFoundError,
        ijson.common.IncompleteJSONError,
    )

    try:
        yield
    # Delete the step so that the collection is completable, only if the error was expected.
    except expected_errors as exception:
        delete_step(*args, **kwargs, exception=exception)
        raise

    # Reached only if the body raised no exception.
    delete_step(*args, **kwargs)
def delete_step(name, finish=None, finish_args=(), exception=None, **kwargs):
    """
    Delete the processing steps matching the name and filters, then call any finish callback.

    ``kwargs`` are used as filters, and must include ``collection_id``, ``collection_file_id`` and/or ``ocid``.
    If no step is deleted, a warning is logged. If ``finish`` is set, it is called with ``finish_args`` as
    positional arguments and with ``exception`` as a keyword argument, whether or not a step was deleted.
    """
    matching_steps = ProcessingStep.objects.filter(name=name, **kwargs)
    deleted_count, _ = matching_steps.delete()
    if not deleted_count:
        logger.warning("No such processing step found: %s: %s", name, kwargs)

    if finish:
        finish(*finish_args, exception=exception)
@contextmanager
def create_logger_note(collection, name):
    """
    Capture the named logger's messages of level WARNING or above, and save them as a note on the collection.

    While the context is active, a handler writing to an in-memory buffer is attached to the logger. The handler
    is always detached on exit. If the body completes without an exception and anything was captured, a
    warning-level note is created via ``create_note``.

    :param collection: the collection to which to add the note
    :param name: the name of the logger to capture
    """
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setLevel(logging.WARNING)

    # Named to avoid shadowing the module-level ``logger``.
    captured_logger = logging.getLogger(name)
    captured_logger.addHandler(handler)
    try:
        yield
    finally:
        # Detach the handler even if the body raised. Otherwise, handlers accumulate on the logger.
        captured_logger.removeHandler(handler)

    # Not reached if the body raised an exception, as it propagates from the ``try`` statement.
    if note := stream.getvalue():
        create_note(collection, CollectionNote.Level.WARNING, note)
def get_extensions(package):
    """
    Return the package's extensions as a frozenset of strings.

    If the package's "extensions" value isn't a list, the frozenset is empty. Non-string items are ignored.
    If the lots extension is present, the submission terms extension is added.
    """
    declared = package.get("extensions")
    if not isinstance(declared, list):
        return frozenset()

    urls = {url for url in declared if isinstance(url, str)}
    # The master version of the lots extension depends on OCDS 1.2 or the submission terms extension.
    if EXTENSION_URL.format("lots") in urls:
        urls.add(EXTENSION_URL.format("submissionTerms"))
    return frozenset(urls)