SQLAlchemy session management in a long-running process
- by codeape
Scenario:
A .NET-based application server (Wonderware IAS/System Platform) hosts automation objects that communicate with various equipment on the factory floor.
CPython is hosted inside this application server (using Python for .NET).
The automation objects have scripting functionality built-in (using a custom, .NET-based language). These scripts call Python functions.
The Python functions are part of a system to track Work-In-Progress on the factory floor. The purpose of the system is to track the produced widgets along the process, ensure that the widgets go through the process in the correct order, and check that certain conditions are met along the way. The widget production history and widget state are stored in a relational database; this is where SQLAlchemy plays its part.
For example, when a widget passes a scanner, the automation software triggers the following script (written in the application server's custom scripting language):
' widget_id and scanner_id provided by automation object
' ExecFunction() takes care of calling a CPython function
retval = ExecFunction("WidgetScanned", widget_id, scanner_id);
' if the python function raises an Exception, ErrorOccured will be true
' in this case, any errors should cause the production line to stop.
if (retval.ErrorOccured) then
    ProductionLine.Running = False;
    InformationBoard.DisplayText = "ERROR: " + retval.Exception.Message;
    InformationBoard.SoundAlarm = True;
end if;
The script calls the WidgetScanned python function:
# pywip/functions.py
from pywip.database import session
from pywip.model import Widget, WidgetHistoryItem
from pywip import validation, StatusMessage
from datetime import datetime
def WidgetScanned(widget_id, scanner_id):
    widget = session.query(Widget).get(widget_id)
    validation.validate_widget_passed_scanner(widget, scanner_id) # raises exception on error
    widget.history.append(WidgetHistoryItem(timestamp=datetime.now(), action=u"SCANNED", scanner_id=scanner_id))
    widget.last_scanner = scanner_id
    widget.last_update = datetime.now()
    return StatusMessage("OK")
# ... there are a dozen similar functions
My question is: How do I best manage SQLAlchemy sessions in this scenario? The application server is a long-running process, typically running months between restarts. The application server is single-threaded.
Currently, I do it the following way:
I apply a decorator to the functions I make available to the application server:
# pywip/iasfunctions.py
from pywip import functions
from pywip.database import session
def ias_session_handling(func):
    def _ias_session_handling(*args, **kwargs):
        try:
            retval = func(*args, **kwargs)
            session.commit()
            return retval
        except:
            # roll back on any error, then let the application server see it
            session.rollback()
            raise
    return _ias_session_handling
# ... actually I populate this module with decorated versions of all the functions in pywip.functions dynamically
WidgetScanned = ias_session_handling(functions.WidgetScanned)
Question: Is the decorator above suitable for handling sessions in a long-running process? Should I call session.remove()?
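For concreteness, the session.remove() variant asked about above could be sketched roughly as follows. This is only an illustration under assumptions, not a recommendation: FakeSession and make_session_handler are hypothetical names introduced so the sketch runs without a database, and the stub only records which calls were made. With a real scoped_session, remove() closes the current session and discards it, so the next access creates a new one.

```python
import functools


class FakeSession:
    """Hypothetical stand-in for the scoped session; it only records calls."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")

    def remove(self):
        self.calls.append("remove")


def make_session_handler(session):
    """Build a decorator bound to the given session (hypothetical helper)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                retval = func(*args, **kwargs)
                session.commit()
                return retval
            except Exception:
                session.rollback()
                raise
            finally:
                # Runs after both commit and rollback, so every call from
                # the application server starts with a fresh session.
                session.remove()
        return wrapper
    return decorator


fake = FakeSession()
handler = make_session_handler(fake)


@handler
def ok_call():
    return "OK"


@handler
def failing_call():
    raise ValueError("widget skipped a scanner")


result = ok_call()
calls_after_ok = list(fake.calls)

try:
    failing_call()
except ValueError:
    pass
calls_after_failure = fake.calls[len(calls_after_ok):]
```

In this sketch the successful call records a commit followed by a remove, and the failing call records a rollback followed by a remove, while the exception still propagates to the caller.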
The SQLAlchemy session object is a scoped session:
# pywip/database.py
from sqlalchemy.orm import scoped_session, sessionmaker
session = scoped_session(sessionmaker())
I want to keep the session management out of the basic functions, for two reasons:
1. There is another family of functions, sequence functions. The sequence functions call several of the basic functions. One sequence function should equal one database transaction.
2. I need to be able to use the library from other environments:
   a) From a TurboGears web application. In that case, session management is done by TurboGears.
   b) From an IPython shell. In that case, commit/rollback will be explicit.
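The first reason can be illustrated with a small sketch: if the basic functions never commit, a sequence function that calls several of them gets exactly one commit (or one rollback) at the boundary where the decorator is applied. Everything named here (RecordingSession, transactional, scan_widget, move_widget, scan_and_move) is hypothetical and only stands in for the real session and library functions.

```python
import functools


class RecordingSession:
    """Hypothetical stand-in for a session; tracks pending and committed items."""

    def __init__(self):
        self.pending = []
        self.committed = []
        self.commit_count = 0

    def add(self, item):
        self.pending.append(item)

    def commit(self):
        self.committed.extend(self.pending)
        self.pending = []
        self.commit_count += 1

    def rollback(self):
        self.pending = []


session = RecordingSession()


def transactional(func):
    """Commit once after func succeeds, roll back if it raises."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            retval = func(*args, **kwargs)
            session.commit()
            return retval
        except Exception:
            session.rollback()
            raise
    return wrapper


# Basic functions: they touch the session but never commit or roll back.
def scan_widget(widget_id):
    session.add(("SCANNED", widget_id))


def move_widget(widget_id, station):
    if station is None:
        raise ValueError("unknown station")
    session.add(("MOVED", widget_id, station))


# Sequence function: several basic functions, still without a commit.
def scan_and_move(widget_id, station):
    scan_widget(widget_id)
    move_widget(widget_id, station)


# Only the version exposed to the application server is decorated.
ias_scan_and_move = transactional(scan_and_move)

ias_scan_and_move(1, "A")      # both changes committed together
try:
    ias_scan_and_move(2, None)  # second step fails, first step is rolled back
except ValueError:
    pass
```

The undecorated scan_and_move stays usable from environments that manage the transaction themselves, which is the situation described for TurboGears and the IPython shell.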
(I am truly sorry for the long question. But I felt I needed to explain the scenario. Perhaps not necessary?)