首页 > 解决方案 > 有没有办法一次运行多次事务?

问题描述

首先,我想提前道歉。我几个月前才开始学习这个,所以我需要完全分解的东西。我有一个使用 python 和 datajoint 的项目(使 sql 代码更短),我需要创建一个至少有 7 个机场、不同飞机等等的机场。然后我需要用乘客预订填充表格。这是我到目前为止所拥有的。

        @schema 
        class Seat(dj.Lookup):
            definition = """
            aircraft_seat : varchar(25)
            """
            contents = [["F_Airbus_1A"],["F_Airbus_1B"],["F_Airbus_2A"],["F_Airbus_2B"],["F_Airbus_3A"], 
            ["F_Airbus_3B"],["F_Airbus_4A"],["F_Airbus_4B"],["F_Airbus_5A"],["F_Airbus_5B"], 
            ["B_Airbus_6A"],["B_Airbus_6B"],["B_Airbus_6C"],["B_Airbus_6D"],["B_Airbus_7A"], 
            ["B_Airbus_7B"],["B_Airbus_7C"],["B_Airbus_7D"],["B_Airbus_8A"],["B_Airbus_8B"], 
            ["B_Airbus_8C"],["B_Airbus_8D"],["B_Airbus_9A"],["B_Airbus_9B"],

这继续让我在每架飞机上总共有 144 个座位。

        @schema
        class Flight(dj.Manual):
            definition = """
            flight_no  : int
            ---
            economy_price : decimal(6,2)
            departure : datetime
            arrival : datetime 
            ---
            origin_code : int
            dest_code : int
            """ 
        @schema
        class Passenger(dj.Manual):
            definition = """
            passenger_id : int
            ---
            full_name : varchar(40)
            ssn : varchar(20)
            """
        @schema
        class Reservation(dj.Manual):
            definition = """
            -> Flight
            -> Seat
            ---
            -> Passenger
    
            """

然后我填充航班和乘客:

        Flight.insert((dict(flight_no = i,
                    economy_price = round(random.randint(100, 1000), 2), 
                    departure = faker.date_time_this_month(),
                    arrival = faker.date_time_this_month(),
                    origin_code = random.randint(1,7),
                    dest_code = random.randint(1,7)))
             for i in range(315))
        Passenger.insert(((dict(passenger_id=i, full_name=faker.name(), 
                   ssn = faker.ssn()))
             for i in range(10000)), skip_duplicates = True)

最后我创建了交易:

        def reserve(passenger_id, origin_code, dest_code, departure):
            with dj.conn().transaction:
             available_seats = ((Seat * Flight - Reservation) & Passenger & 
             {'passenger_id':passenger_id}).fetch(as_dict=True)
        try:
            choice = random.choice(available_seats)
        except IndexError:
            raise IndexError(f'Sorry, no seats available for {departure}')
        name = (Passenger & {'passenger_id': passenger_id}).fetch1('full_name')
        print('Success. Reserving seat {aircraft_seat} at ticket_price {economy_price} for 
        {name}'.format(name=name, **choice))
        Reservation.insert1(dict(choice, passenger_id=passenger_id), ignore_extra_fields=True)
  
     
       reserve(random.randint(1,1000), random.randint(1,7), 
       random.randint(1,7),random.choice('departure'))
       
       Output[]: Success. Reserving seat E_Yak242_24A at ticket_price 410.00 for Cynthia Erickson

       Reservation()
       Output[]: flight_no      aircraft_seat      passenger_id

             66           B_Yak242_7A           441

所以我需要每天有 10.5 趟航班,飞机至少 75% 满员,这让我需要超过 30000 次预订。有没有办法一次做到 50 次?我一直在寻找答案,但未能找到解决方案。谢谢你。

标签: pythonsqljupyter-notebookfakerdatajoint

解决方案


DataJoint 的维护者之一。首先,我要感谢您试用 DataJoint;好奇你是怎么知道这个项目的。

预先警告,这将是一个很长的帖子,但我觉得这是一个澄清一些事情的好机会。关于有问题的问题,不确定我是否完全理解您的问题的性质,但让我关注几点。我建议在确定如何最好地处理您的案件之前完整阅读此答案。

TL;DR:计算表是你的朋友。

多线程

由于它已经出现在评论中,因此值得指出的是,截至2021-01-06,DataJoint并不是完全线程安全的(至少从共享连接的角度来看)。不幸的是,这主要是由于 PyMySQL 的一个长期问题,它是 DataJoint 的主要依赖项。也就是说,如果您在每个线程或进程上启动新连接,您不应该遇到任何问题。但是,这是一种昂贵的解决方法,并且不能与事务结合使用,因为它们要求在单个连接中执行操作。说到那个...

计算表和作业预留

计算表是您在上述解决方案尝试中的一个明显遗漏。计算表提供了一种机制,可以将其实体与上游父表中的实体关联起来,并在插入之前进行额外处理(在make计算表类的方法中定义),可以通过调用为每个新条目populate调用方法的方法来调用它make. 对您的方法的调用make是受事务约束的,并且应该实现您正在寻找的内容。有关其使用的更多详细信息,请参阅文档中的此处

此外,为了获得额外的性能提升,还有另一个称为 Job Reservation 的功能,它提供了一种将多个工作人员集中在一起populate以有组织的分布式方式处理大型数据集(使用)的方法。我觉得这里不需要,但值得一提,最终取决于您如何查看下面的结果。您可以在我们的文档中找到有关此功能的更多信息。

架构设计

根据我对您最初设计的理解,我有一些建议,我们可以如何改进数据流以提高清晰度和性能,并提供具体示例来说明我们如何使用计算表的强大功能。在我的本地设置上运行如下图所示,我能够在29m54 秒内处理您的30k预订需求,其中包含2种不同的飞机型号、7个机场、10k可能的乘客、550 个可用航班。至少 75% 的座位容量没有得到验证,只是因为我还没有看到你尝试过这个,但如果你看到我是如何分配座位的,你会注意到它几乎就在那里。:)

免责声明:我应该注意到,下面的设计仍然是对协调适当旅行预订的实际挑战的过度简化。采取了相当多的假设,主要是为了教育的利益,而不是提交一个完整的、即插即用的解决方案。因此,我明确选择避免使用longblob以下解决方案,以便更容易理解。实际上,一个适当的解决方案可能包括更高级的主题以进一步提高性能,例如longblob,_update等。

也就是说,让我们从考虑以下内容开始:

import datajoint as dj # conda install -c conda-forge datajoint or pip install datajoint
import random
from faker import Faker # pip install Faker
faker = Faker()
Faker.seed(0) # Pin down randomizer between runs

schema = dj.Schema('commercial_airtravel') # instantiate a workable database

@schema 
class Plane(dj.Lookup):
    definition = """
    # Defines manufacturable plane model types
    plane_type    : varchar(25) # Name of plane model
    ---
    plane_rows    : int         # Number of rows in plane model i.e. range(1, plane_rows + 1)
    plane_columns : int         # Number of columns in plane model; to extract letter we will need these indices
    """
    contents = [('B_Airbus', 37, 4), ('F_Airbus', 40, 5)] # Since new entries to this table should happen infrequently, this is a good candidate for a Lookup table

@schema 
class Airport(dj.Lookup):
    definition = """
    # Defines airport locations that can serve as origin or destination
    airport_code : int         # Airport's unique identifier
    ---
    airport_city : varchar(25) # Airport's city
    """
    contents = [(i, faker.city()) for i in range(1, 8)] # Also a good candidate for Lookup table

@schema 
class Passenger(dj.Manual):
    definition = """
    # Defines users who have registered accounts with airline i.e. passenger
    passenger_id : serial      # Passenger's unique identifier; serial simply means an auto-incremented, unsigned bigint
    ---
    full_name    : varchar(40) # Passenger's full name
    ssn          : varchar(20) # Passenger's Social Security Number
    """

Passenger.insert((dict(full_name=faker.name(),
                       ssn = faker.ssn()) for _ in range(10000))) # Insert a random set of passengers

@schema
class Flight(dj.Manual):
    definition = """
    # Defines specific planes assigned to a route
    flight_id            : serial                      # Flight's unique identifier
    ---
    -> Plane                                           # Flight's plane model specs; this will simply create a relation to Plane table but not have the constraint of uniqueness
    flight_economy_price : decimal(6,2)                # Flight's fare price
    flight_departure     : datetime                    # Flight's departure time
    flight_arrival       : datetime                    # Flight's arrival time
    -> Airport.proj(flight_origin_code='airport_code') # Flight's origin; by using proj in this way we may rename the relation in this table
    -> Airport.proj(flight_dest_code='airport_code')   # Flight's destination
    """

plane_types = Plane().fetch('plane_type') # Fetch available plane model types
Flight.insert((dict(plane_type = random.choice(plane_types),
                    flight_economy_price = round(random.randint(100, 1000), 2), 
                    flight_departure = faker.date_time_this_month(),
                    flight_arrival = faker.date_time_this_month(),
                    flight_origin_code = random.randint(1, 7),
                    flight_dest_code = random.randint(1, 7))
               for _ in range(550))) # Insert a random set of flights; for simplicity we are not verifying that flight_departure < flight_arrival

@schema
class BookingRequest(dj.Manual):
    definition = """
    # Defines one-way booking requests initiated by passengers
    booking_id : serial                                # Booking Request's unique identifier
    ---
    -> Passenger                                       # Passenger who made request
    -> Airport.proj(flight_origin_code='airport_code') # Booking Request's desired origin
    -> Airport.proj(flight_dest_code='airport_code')   # Booking Request's desired destination
    """

BookingRequest.insert((dict(passenger_id = random.randint(1, 10000),
                            flight_origin_code = random.randint(1, 7),
                            flight_dest_code = random.randint(1, 7))
                       for i in range(30000))) # Insert a random set of booking requests

@schema
class Reservation(dj.Computed):
    definition = """
    # Defines booked reservations 
    -> BookingRequest              # Association to booking request
    ---
    flight_id        : int         # Flight's unique identifier
    reservation_seat : varchar(25) # Reservation's assigned seat
    """
    def make(self, key):
        # Determine booking request's details
        full_name, flight_origin_code, flight_dest_code = (BookingRequest * Passenger & key).fetch1('full_name',
                                                                                                    'flight_origin_code',
                                                                                                    'flight_dest_code')
        # Determine possible flights to satisfy booking
        possible_flights = (Flight * Plane *
                            Airport.proj(flight_dest_city='airport_city',
                                         flight_dest_code='airport_code') &
                            dict(flight_origin_code=flight_origin_code,
                                 flight_dest_code=flight_dest_code)).fetch('flight_id',
                                                                           'plane_rows',
                                                                           'plane_columns',
                                                                           'flight_economy_price',
                                                                           'flight_dest_city',
                                                                           as_dict=True)
        # Iterate until we find a vacant flight and extract details
        for flight_meta in possible_flights:
            # Determine seat capacity
            all_seats = set((f'{r}{l}' for rows, letters in zip(*[[[n if i==0 else chr(n + 64) 
                                                                    for n in range(1, el + 1)]]
                                                                  for i, el in enumerate((flight_meta['plane_rows'],
                                                                                          flight_meta['plane_columns']))])
                             for r in rows
                             for l in letters))
            # Determine unavailable seats
            taken_seats = set((Reservation & dict(flight_id=flight_meta['flight_id'])).fetch('reservation_seat'))
            try:
                # Randomly choose one of the available seats
                reserved_seat = random.choice(list(all_seats - taken_seats))
                # You may uncomment the below line if you wish to print the success message per processed record
                # print(f'Success. Reserving seat {reserved_seat} at ticket_price {flight_meta["flight_economy_price"]} for {full_name}.')
                # Insert new reservation
                self.insert1(dict(key, flight_id=flight_meta['flight_id'], reservation_seat=reserved_seat))
                return
            except IndexError:
                pass
        raise IndexError(f'Sorry, no seats available departing to {flight_meta["flight_dest_city"]}')
    
Reservation.populate(display_progress=True) # This is how we process new booking requests to assign a reservation; you may invoke this as often as necessary

语法和约定

最后,在您提供的代码中提供一些小的反馈。关于表定义,您应该只---在定义中使用一次,以明确区分主键属性和辅助属性(参见您的Flight表)。出乎意料的是,这并没有在您的情况下引发错误,但应该这样做。我将提出一个问题,因为这似乎是一个错误。

虽然transaction暴露在 上dj.conn(),但很少需要直接调用它。DataJoint 提供了在内部处理此问题的好处,以减少用户对此的管理开销。但是,如果极端情况需要,该选项仍然可用。对于您的情况,我会避免直接调用它,而是建议使用计算(或导入)表。


推荐阅读