首页 > 解决方案 > 不存在同名列时出现歧义列错误

问题描述

sqlalchemy尝试Document.document_status在我的模型中调用混合列表达式时,我遇到了不明确的列名错误:

class Document(db.Model):
    """A specific Customer Transaction Document.

    Each Transaction may contain one or more document.
    A document is associated with one or more Services
    representing the services performed on the physical
    document.
    """

    transaction_id = db.Column(db.String, db.ForeignKey('transaction.id'), primary_key=True, nullable=False)
    transaction = db.relationship('Transaction', backref=db.backref('documents', lazy=True))

    document_id = db.Column(db.Integer, primary_key=True, nullable=False, autoincrement=False, default=1)

    type_id = db.Column(db.Integer, db.ForeignKey('document_type.id'), nullable=False)
    type = db.relationship('DocumentType')

    locale_id = db.Column(db.String, db.ForeignKey('locale.id'), nullable=True)
    locale = db.relationship('Locale')

    country_of_use = db.Column(db.String, nullable=False)
    rush_service = db.Column(db.Boolean, default=False)
    deadline = db.Column(db.DateTime, nullable=True)

    notification_thread = db.Column(db.String, nullable=True)

    services = db.relationship("Service", secondary=service_association_table)

    @hybrid_property
    def tracking_code(self):
        return f"{self.transaction_id}.{self.document_id}"

    @tracking_code.expression
    def tracking_code(cls):
        return ColumnOperators.concat(ColumnOperators.concat(cls.transaction_id, "."), cast(cls.document_id, db.String))

    @hybrid_property
    def document_status(self):
        if hasattr(self, 'scans') and len(self.scans) > 0:
            return self.scans[0].step.status
        else:
            return "Pending..."

    @document_status.expression
    def document_status(cls):
        # code = select([Step.status]).where(
        #     Step.id == select([Scan.step_id])
        #     .where(and_(cls.document_id == Scan.document_id,
        #                 cls.transaction_id == Scan.transaction_id))
        #     .order_by(Scan.timestamp)
        #     .limit(1)).label("status")
        # print(code.status)
        # return code
        # return Step.status
        q = (select([Scan.step_id]).
             where(and_(Scan.transaction_id == cls.transaction_id,
                        Scan.document_id == cls.document_id)).order_by(Scan.timestamp).
             limit(1).
             label("id"))
        return select([Step.status]).where(Step.id == q)
        # return select([Step.status]).where(
        #     Step.id == q).limit(1)

    def __repr__(self):
        return f'<Document {self.transaction_id}.{self.document_id}>'

    def to_dict(self):
        data = {
            'id': self.document_id,
            'type': self.type.type,
            'locale': self.locale.locale,
            'country_of_use': self.country_of_use,
            'rush_service': self.rush_service,
            'deadline': self.deadline,
            '_links': {
                'self': url_for('api.get_document', document_id=self.document_id, transaction_id=self.transaction_id),
                'transaction': url_for('api.get_transaction', id=self.transaction_id),
                'type': url_for('api.get_document_type', id=self.type_id),
                'locale': url_for('api.get_locale', id=self.locale_id)
            }
        }
        return data

我的其他联接表都不包含任何 astatus属性。如果信息有帮助,我可以提供其他模型。

sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) ambiguous column name: status
    [SQL: SELECT anon_1.document_transaction_id AS anon_1_document_transaction_id, anon_1.document_document_id AS anon_1_document_document_id, anon_1.document_type_id AS anon_1_document_type_id, anon_1.document_locale_id AS anon_1_document_locale_id, anon_1.document_country_of_use AS anon_1_document_country_of_use, anon_1.document_rush_service AS anon_1_document_rush_service, anon_1.document_deadline AS anon_1_document_deadline, anon_1.document_notification_thread AS anon_1_document_notification_thread, anon_1.status AS anon_1_status, anon_1.step_status AS anon_1_step_status, anon_1.anon_2 AS anon_1_anon_2, anon_1.step_id AS anon_1_step_id, anon_1.step_notification_id AS anon_1_step_notification_id, anon_1.step_service_id AS anon_1_step_service_id, service_1.id AS service_1_id, service_1.name AS service_1_name, service_1.price AS service_1_price, service_1.min_turnaround AS service_1_min_turnaround, service_1.max_turnaround AS service_1_max_turnaround 
    FROM (SELECT document.transaction_id AS document_transaction_id, document.document_id AS document_document_id, document.type_id AS document_type_id, document.locale_id AS document_locale_id, document.country_of_use AS document_country_of_use, document.rush_service AS document_rush_service, document.deadline AS document_deadline, document.notification_thread AS document_notification_thread, status AS status, step.status AS step_status, step.id = (SELECT scan.step_id 
    FROM scan 
    WHERE scan.transaction_id = document.transaction_id AND scan.document_id = document.document_id ORDER BY scan.timestamp
     LIMIT ? OFFSET ?) AS anon_2, step.id AS step_id, step.notification_id AS step_notification_id, step.service_id AS step_service_id 
    FROM document, (SELECT step.status AS status 
    FROM step 
    WHERE step.id = (SELECT scan.step_id 
    FROM scan, document 
    WHERE scan.transaction_id = document.transaction_id AND scan.document_id = document.document_id ORDER BY scan.timestamp
     LIMIT ? OFFSET ?)), step ORDER BY (SELECT step.status 
    FROM step 
    WHERE step.id = (SELECT scan.step_id 
    FROM scan, document 
    WHERE scan.transaction_id = document.transaction_id AND scan.document_id = document.document_id ORDER BY scan.timestamp
     LIMIT ? OFFSET ?)) DESC
     LIMIT ? OFFSET ?) AS anon_1 LEFT OUTER JOIN (service_association AS service_association_1 JOIN service AS service_1 ON service_1.id = service_association_1.service_id) ON anon_1.document_document_id = service_association_1.document_id AND anon_1.document_transaction_id = service_association_1.transaction_id ORDER BY (SELECT document.transaction_id AS document_transaction_id, document.document_id AS document_document_id, document.type_id AS document_type_id, document.locale_id AS document_locale_id, document.country_of_use AS document_country_of_use, document.rush_service AS document_rush_service, document.deadline AS document_deadline, document.notification_thread AS document_notification_thread, status AS status, step.status AS step_status, step.id = (SELECT scan.step_id 
    FROM scan 
    WHERE scan.transaction_id = document.transaction_id AND scan.document_id = document.document_id ORDER BY scan.timestamp
     LIMIT ? OFFSET ?) AS anon_2, step.id AS step_id, step.notification_id AS step_notification_id, step.service_id AS step_service_id 
    FROM document, (SELECT step.status AS status 
    FROM step 
    WHERE step.id = (SELECT scan.step_id 
    FROM scan, document 
    WHERE scan.transaction_id = document.transaction_id AND scan.document_id = document.document_id ORDER BY scan.timestamp
     LIMIT ? OFFSET ?)), step ORDER BY (SELECT step.status 
    FROM step 
    WHERE step.id = (SELECT scan.step_id 
    FROM scan, document 
    WHERE scan.transaction_id = document.transaction_id AND scan.document_id = document.document_id ORDER BY scan.timestamp
     LIMIT ? OFFSET ?)) DESC
     LIMIT ? OFFSET ?) DESC]
    [parameters: (1, 0, 1, 0, 1, 0, 20, 0, 1, 0, 1, 0, 1, 0, 20, 0)]
    (Background on this error at: http://sqlalche.me/e/e3q8)

我注意到 SQLAlchemy 似乎在 anon_1 上创建了一个状态属性。这似乎是问题的根源,但我不明白为什么。

标签: pythonsqlalchemyambiguous

解决方案


看起来嵌套查询正在造成问题。我最终用select第二个where子句替换了内部:

 @status.expression
    def status(cls):
        return = (select([Step.status]).
                  where(Scan.step_id == Step.id).
                  where(and_(Scan.transaction_id == cls.transaction_id,
                             Scan.document_id == cls.document_id)).
                  order_by(Scan.timestamp).
                  limit(1).
                  label("abcdefg"))

推荐阅读