首页 > 技术文章 > mysql 读写分离(手动和自动方法)

wjun0 2019-11-19 15:13 原文

使用sqlalchemy 使mysq自动读写分离:

代码如下:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy, SignallingSession, get_state
from sqlalchemy import orm

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@192.168.105.134:3306/test30'  # 设置数据库连接地址
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False  # 是否追踪数据库变化(触发某些钩子函数), 开启后效率会变
app.config['SQLALCHEMY_ECHO'] = True  # 开启后, 控制台会打印底层执行的SQL语句

app.config['SQLALCHEMY_BINDS'] = {  # get_engine的bind参数为该配置的键
    'master': 'mysql://root:mysql@192.168.105.134:3306/test30',
    'slave': 'mysql://root:mysql@192.168.105.134:8306/test30'
}


class RoutingSession(SignallingSession):
    def get_bind(self, mapper=None, clause=None):
        """当进行数据操作时, 会调用该方法来获取进行该操作的数据库引擎(连接)"""

        state = get_state(self.app)
        if self._flushing:  # 增删改操作, 使用主库
            print('使用主库')
            return state.db.get_engine(self.app, bind='master')
        else:  # 读操作, 使用从库
            print('使用从库')
            return state.db.get_engine(self.app, bind='slave')


class RoutingSQLAlchemy(SQLAlchemy):
    def create_session(self, options):
        return orm.sessionmaker(class_=RoutingSession, db=self, **options)


# 初始化组件(建立数据库连接)
db = RoutingSQLAlchemy(app)


# ORM  类->表  类属性->字段  对象->记录
class User(db.Model):
    __tablename__ = "t_user"  # 设置表名, 默认为类名的小写
    id = db.Column(db.Integer, primary_key=True)  # 主键
    name = db.Column(db.String(20), unique=True, nullable=False)  # 设置唯一&非空约束
    age = db.Column(db.Integer, default=10, index=True)  # 设置默认值约束&建立索引


@app.route('/')
def index():  
    # 增加数据  进行检验是否读写分离
    # 1.创建模型对象
    user1 = User(name='zs', age=20)
    db.session.add(user1)
    db.session.commit()

    print('-' * 30)
    # 查询数据  
    print(User.query.all())
    return "index"


if __name__ == '__main__':
    db.drop_all()  # 删除所有继承自db.Model的表
    db.create_all()  # 创建所有继承自db.Model的表
    app.run(debug=True)

 

手动时mysql读写分离:

1,修改源码:routing_sqlalchemy.py

import random
from flask_sqlalchemy import SQLAlchemy, BaseQuery, Model, SignallingSession, get_state
from sqlalchemy import orm


class Config:
    SQLALCHEMY_BINDS = {
        "bj_m1": 'mysql://root:mysql@192.168.105.134:3306/test30',
        "bj_m2": 'mysql://root:mysql@192.168.105.134:3306/test30',
        "bj_s1": 'mysql://root:mysql@192.168.105.134:8306/test30',
        "bj_s2": 'mysql://root:mysql@192.168.105.134:8306/test30',
    }
    SQLALCHEMY_CLUSTER = {
        "masters": ["bj_m1", "bj_m2"],
        "slaves": ['bj_s1', 'bj_s2'],
        "default": 'bj_m1'
    }
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_ECHO = False


class RoutingSession(SignallingSession):
    def __init__(self, db, autocommit=False, autoflush=True, **options):
        SignallingSession.__init__(self, db, autocommit=autocommit, autoflush=autoflush, **options)
        self.default_key = db.default_key
        self.master_keys = db.master_keys if len(db.master_keys) else self.default_key
        self.slave_keys = db.slave_keys if len(db.slave_keys) else self.default_key
        self.bind_key = None


    def get_bind(self, mapper=None, clause=None):
        """获取会话使用的数据库连接engine"""
        state = get_state(self.app)

        if self.bind_key:
            # 指定
            print('Using DB bind: _name={}'.format(self.bind_key))
            return state.db.get_engine(self.app, bind=self.bind_key)
        else:
            # 默认数据库
            print('Using default DB bind: _name={}'.format(self.default_key))
            return state.db.get_engine(self.app, bind=self.default_key)

    def set_to_write(self):
        """使用写数据库"""
        self.bind_key = random.choice(self.master_keys)

    def set_to_read(self):
        """使用读数据库"""
        self.bind_key = random.choice(self.slave_keys)


class RoutingSQLAlchemy(SQLAlchemy):
    def init_app(self, app):
        config_binds = app.config.get("SQLALCHEMY_BINDS")
        if not config_binds:
            raise RuntimeError('Missing SQLALCHEMY_BINDS config')

        cluster = app.config.get("SQLALCHEMY_CLUSTER")
        if not cluster:
            raise RuntimeError('Missing SQLALCHEMY_CLUSTER config')

        default_key = cluster.get('default')
        if not default_key:
            raise KeyError("deafult is not in SQLALCHEMY_CLUSTER")

        # 生成并保存数据库引擎
        self.master_keys = cluster.get("masters") or []
        self.slave_keys = cluster.get("slaves") or []
        self.default_key = default_key

        super(RoutingSQLAlchemy, self).init_app(app)


    def create_session(self, options):
        return orm.sessionmaker(class_=RoutingSession, db=self, **options)
View Code

2, __init__.py下导入routing_sqlalchemy.py 中的类创建 db

from .routing_sqlalchemy import RoutingSQLAlchemy

db = RoutingSQLAlchemy()

3,定义装饰器类,实现读写分离

import functools

from user_select import db

def set_read_db(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        db.session().set_to_read()
        return f(*args, **kwargs)
    
    return wrapper


def set_write_db(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        db.session().set_to_write()
        return f(*args, **kwargs)

    return wrapper

4,使用装饰器方式,调用读写分离,如果用装饰器,都使用主库进行读写

from flask import Flask
from user_select import db
from user_select.routing_sqlalchemy import Config
from user_select.decorator import set_read_db, set_write_db


app = Flask(__name__)
app.config.from_object(Config)

# 初始化数据库连接对象
db.init_app(app)


# 建立映射模型  类->表  类属性->字段  对象->记录
class User(db.Model):
   __tablename__ = 't_user'  # 设置表名  表名默认为类名小写
   id = db.Column(db.Integer, primary_key=True)  # 主键  默认主键自增
   name = db.Column(db.String(20), unique=True)  # 设置唯一
   age = db.Column(db.Integer)


@app.route('/')
@set_write_db
def index():
    """增加数据"""
    user1 = User(name='lisi', age=20)
    db.session.add(user1)
    db.session.commit()

    return "index"


@app.route('/demo1')
# @set_read_db
def demo1():
    users = User.query.all()
    print(users)
    return 'demo1'


if __name__ == '__main__':
    # db.drop_all()  # 删除所有继承自db.Model的表
    # db.create_all()  # 创建所有继承自db.Model的表
    app.run(debug=True)

 

 

 

 

 

 

推荐阅读