首页 > 解决方案 > Python中接口和类之间的UML聚合

问题描述

我想从我写的一个小程序创建一个 UML。到目前为止,我只为 Java 程序制作了 UML 图,它对我来说是为 python 绘制一个非常新的。问题是,有一个 Writer - “接口”,例如一个从 abc.ABCMeta 继承并实现抽象方法 write() 的类。该接口在两个类中实现。一个数据库类和一个 CSVWriter 类。构造函数,例如另一个名为 DataCollector 的类的init () 方法,将一个已实现 Writer 接口的类作为参数。然后实例 CSVWriter 或数据库将作为实例变量存储在 DataCollector 对象中。

我如何在 UML 中显示这种关系?Python 并没有真正的接口。对我来说,它似乎只是继承自“接口”。我尝试了一个 UML,并将 DataCollector 与 WriterInterface 聚合在一起。可以在接口和类之间使用聚合,还是必须在实现接口的类和类 DataCollector 之间绘制聚合?

到目前为止我是这样画的:

在此处输入图像描述

UML 基于的代码:

import os
import abc
import Adafruit_DHT
import smbus
import csv
import RPi.GPIO as GPIO
import time
import datetime
import mh_z19
#import psycopg2

import config as cfg


I2C_SETTINGS = {
    "DEVICE": 0x23,
    "POWER_DOWN": 0x00,
    "POWER_ON": 0x01,
    "RESET": 0x07,
    "RESOLUTION": {"ONE_TIME_HIGH_RES_MODE_1": 0x20},
    "BUS": 1,
}

CSV_HEADERS = [
    "timestamp",
    "light",
    "humidity",
    "temperature",
    "co2",
    "occupancy",
    "motion_count",
]


class SensorConnector:
    def __init__(self, pin_pir, pin_dht22, i2c_settings):
        self.pin_pir = pin_pir
        self.pin_dht22 = pin_dht22
        self.dht = Adafruit_DHT.DHT22
        self.i2c_settings = i2c_settings
        self.bus = smbus.SMBus(i2c_settings["BUS"])
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.pin_pir, GPIO.IN)

    def convert_to_number(self, data):
        "Simple Function to convert 2 bytes of data into a decimal number"
        result = (data[1] + (256 * data[0])) / 1.2
        return result

    def read_light(self, resolution):
        "Read data from I2C Interface"
        data = self.bus.read_i2c_block_data(self.i2c_settings["DEVICE"], resolution)
        return self.convert_to_number(data)

    def read_temp_hum(self):
        hum, temp = Adafruit_DHT.read_retry(self.dht, self.pin_dht22)
        return hum, temp

    def read_sensors(self):
        data = {}
        data["light"] = self.read_light(
            self.i2c_settings["RESOLUTION"]["ONE_TIME_HIGH_RES_MODE_1"]
        )
        data["humidity"], data["temperature"] = self.read_temp_hum()
        data["co2"] = mh_z19.read()["co2"]
        return data


class WriterInterface(metaclass=abc.ABCMeta):
    @classmethod
    def __subclasshook__(cls, subclass):
        return hasattr(subclass, "write") and callable(subclass.write) or NotImplemented

    @abc.abstractmethod
    def write(self, data):
        raise NotImplementedError


class CSVWriter(WriterInterface):
    def __init__(self, file, headers):
        file_exists = os.path.isfile(file)
        self.csv_file = open(file, "a")
        self.writer = csv.DictWriter(
            self.csv_file, delimiter=",", lineterminator="\n", fieldnames=headers
        )
        if not file_exists:
            self.writer.writeheader()
            print("initialized csv_file")

    def write(self, data):
        self.writer.writerow(data)
        self.csv_file.flush()

class Database(WriterInterface):
    def __init__(self):
        self.con = psycopg2.connect(
            host=cfg.postgres["host"],
            dbname=cfg.postgres["db"],
            user=cfg.postgres["user"],
            password=cfg.postgres["passwd"],
        )

    def write(self, data):
        if None not in data.values():
            try:
                cur = self.con.cursor()
                cur.execute(
                    """insert into "sensordata" (created_time, temperature, humidity, \
                    light, occupancy, people_count) values (%s, %s, %s, %s, %s, %s);""",
                    (
                        data["timestamp"],
                        data["temperature"],
                        data["humidity"],
                        data["light"],
                        data["occupancy"],
                        data["motion_count"],
                    ),
                )
                self.con.commit()
                cur.close()
            except Exception as e:
                print(e)
        else:
            print(f"problem with data:\n{data}")


class DataCollector:
    def __init__(self, s_connector, writer):
        self.s_connector = s_connector
        self.writer = writer
        self.ts_last_motion = None
        self.occupied_state = False
        self.motion_count = 0

    def motion_handler(self, channel):
        print("motion detected")
        self.ts_last_motion = datetime.datetime.now()
        if self.occupied_state == False:
            self.occupied_state = True
        self.motion_count += 1

    def collect_data(self):
        GPIO.add_event_detect(
            self.s_connector.pin_pir, GPIO.RISING, callback=self.motion_handler
        )
        while True:
            now = datetime.datetime.now()
            timedelta_since_last_motion = (
                (now - self.ts_last_motion) if self.ts_last_motion else 0
            )
            if (
                self.occupied_state == True
                and timedelta_since_last_motion.seconds > 900
            ):
                print("Room vacant at: ")
                print(time.strftime("%H"), ":", time.strftime("%M"))
                self.occupied_state = False

            data = {
                **self.s_connector.read_sensors(),
                "occupancy": self.occupied_state,
                "timestamp": datetime.datetime.now(),
                "motion_count": self.motion_count,
            }

            print(data)
            self.writer.write(data)
            time.sleep(60)


if __name__ == "__main__":

    s_connector = SensorConnector(pin_pir=17, pin_dht22=4, i2c_settings=I2C_SETTINGS)
    csv_writer = CSVWriter('/path/to/data.csv', CSV_HEADERS)
    #writer = Database()
    dc = DataCollector(s_connector, csv_writer)
    dc.collect_data()

标签: pythonoopinterfaceumlclass-diagram

解决方案


可以在接口和类之间使用聚合

是的,这是正确的方法

我是否必须在实现接口的类和类 DataCollector 之间绘制聚合

不,有几个原因:

  • 接口的目标是隐藏有效的实现,与每个实现类的关系揭示具有所有相关后果的实现

  • 实现该接口的新类可以稍后出现,您不希望每次出现时都将修改 DataCollector 的关系添加到新的

  • DataCollector 只有一个 writer,如果你有几个关系,他们需要是独占的,这是一个没有用的复杂方法


推荐阅读