Попытка поймать ошибку целостности с помощью SQLAlchemy

У меня проблемы с попыткой поймать ошибку. Я использую Pyramid / SQLAlchemy и сделал форму регистрации с электронной почтой в качестве первичного ключа. Проблема заключается в том, когда вводится дублирующее письмо, оно вызывает IntegrityError, поэтому я пытаюсь поймать эту ошибку и предоставить сообщение, но независимо от того, что я делаю, я не могу ее уловить, ошибка продолжает появляться.

try: new_user = Users(email, firstname, lastname, password) DBSession.add(new_user) return HTTPFound(location = request.route_url('new')) except IntegrityError: message1 = "Yikes! Your email already exists in our system. Did you forget your password?" 

Я получаю то же сообщение, когда пытался, except exc.SQLAlchemyError (хотя я хочу поймать определенные ошибки, а не одеяло). Я также попробовал exc.IntegrityError но не повезло (хотя оно существует в API).

Что-то не так с моим синтаксисом Python, или есть что-то, что мне нужно делать в SQLAlchemy, чтобы поймать его?


Я не знаю, как решить эту проблему, но у меня есть несколько идей о том, что может вызвать проблему. Возможно, оператор try не терпит неудачу, но преуспевает, потому что SQLAlchemy поднимает само исключение, а Pyramid генерирует представление, поэтому except IntegrityError: никогда не активируется. Или, скорее всего, я полностью ошибаюсь в этой ошибке.

  • Несколько доменов и поддоменов в одном экземпляре Pyramid
  • SQLAlchemy: создать преднамеренно пустой запрос?
  • SQLalchemy AttributeError: объект 'str' не имеет атрибута '_sa_instance_state'
  • SQLAlchemy JSON как blob / text
  • SQLalchemy указывает, какой индекс использовать
  • не удалось создать автоинкрементный первичный ключ с фляжкой-sqlalchemy
  • python sqlalchemy + программа postgresql зависает
  • Запрос фильтра SQLAlchemy связанным объектом
  • 5 Solutions collect form web for “Попытка поймать ошибку целостности с помощью SQLAlchemy”

    В Pyramid, если вы сконфигурировали свой сеанс (который для вас делает автоматически), чтобы использовать ZopeTransactionExtension , тогда сеанс не очищается и не будет выполняться до тех пор, пока выполнение не будет выполнено. Если вы хотите поймать какие-либо ошибки SQL самостоятельно в своем представлении, вам нужно заставить flush отправить SQL в движок. DBSession.flush() должен сделать это после add(...) .

    Обновить

    Я обновляю этот ответ на примере точки сохранения только потому, что очень мало примеров того, как это сделать с пакетом транзакций.

     def create_unique_object(db, max_attempts=3): while True: sp = transaction.savepoint() try: obj = MyObject() obj.identifier = uuid.uuid4().hex db.add(obj) db.flush() except IntegrityError: sp.rollback() max_attempts -= 1 if max_attempts < 1: raise else: return obj obj = create_unique_object(DBSession) 

    Обратите внимание, что даже это подвержено дублированию между транзакциями, если не используется блокировка на уровне таблицы, но она по крайней мере показывает, как использовать точку сохранения.

    Что вам нужно сделать, это уловить общее исключение и вывести его класс; то вы можете сделать исключение более конкретным.

     except Exception as ex: print ex.__class__ 

    Не может быть никаких операций с базой данных до DBSession.commit() пор, пока DBSession.commit() поэтому IntegrityError будет поднят позже в стеке после того, как код контроллера, у которого есть try/except , уже вернулся.

    Изменить: отредактированный ответ выше – лучший способ сделать это, используя откат.

    Если вы хотите обрабатывать транзакции в середине приложения пирамиды или что-то, где автоматическая фиксация транзакции выполняется в конце последовательности, нет никакой магии, которая должна произойти.

    Не забудьте запустить новую транзакцию, если предыдущая транзакция завершилась неудачно.

    Как это:

     def my_view(request): ... # Do things if success: try: instance = self._instance(**data) DBSession.add(instance) transaction.commit() return {'success': True} except IntegrityError as e: # <--- Oh no! Duplicate unique key transaction.abort() transaction.begin() # <--- Start new transaction return {'success': False} 

    Обратите внимание, что вызов .commit () в успешной транзакции прекрасен, поэтому нет необходимости запускать новую транзакцию после успешного вызова.

    Вам нужно только прервать транзакцию и запустить новую, если транзакция находится в состоянии отказа.

    (Если транзакция не была такой poc, вы могли бы использовать точку сохранения и вернуться назад к точке сохранения, а не начинать новую транзакцию, к сожалению, это невозможно, поскольку попытка коммита делает недействительным известную предыдущую точку сохранения. (edit: <— Оказывается, я ошибаюсь в этом …)

    Вот как я это делаю.

     from contextlib import( contextmanager, ) @contextmanager def session_scope(): """Provide a transactional scope around a series of operations.""" session = Session() try: yield session session.commit() except: session.rollback() raise finally: session.close() def create_user(email, firstname, lastname, password): new_user = Users(email, firstname, lastname, password) try: with session_scope() as session: session.add(new_user) except sqlalchemy.exc.IntegrityError as e: pass 

    http://docs.sqlalchemy.org/en/latest/orm/session_basics.html#when-do-i-construct-a-session-when-do-i-commit-it-and-when-do-i- закрой его

    Python - лучший язык программирования в мире.