1. 程式人生 > >使用flask從零構建自動化運維平臺系列二

使用flask從零構建自動化運維平臺系列二

文章目錄


寫程式碼也是一種藝術,結構層次感一定要好,這樣做出來的才是一個好作品。

程式碼管理

git

目錄結構

SmartOps
├── app
│   ├── factory.py
│   ├── __init__.py
│   ├── models.py
│   ├── server.py
│   ├── soapi
│   ├── static
│   └── templates
├── config.py
├── manage.py
├── migrations
├── requirements.txt
├── tests
└── venv

目錄結構用途說明

目錄 用途
app 存放flask應用程式碼
migrations 資料庫遷移指令碼
tests 單元測試
venv 虛擬環境

目錄檔案說明

檔案 用途
manage.py 啟動程式以及其他的程式任務。
config.py 儲存配置
requirements.txt python依賴包

配置檔案

配置檔案決定程式會有什麼樣的行為。比較常用的是開發環境、測試環境、正式環境三種配置,下面是個例子:

config.py

import logging
import os
from datetime import timedelta

# Maps an environment name to the import path of its settings class.
# The strings are resolved by ``app.config.from_object`` in the app factory.
CONFIG = {
    "development": "config.DevelopmentConfig",
    "testing": "config.TestingConfig",
    "production": "config.ProductionConfig",
    "default": "config.ProductionConfig",
}

# Key used to sign the user auth tokens.
# NOTE(review): hard-coded secret committed with the source -- load it from
# the environment or a secrets store before deploying.
USER_SECRET_KEY = 'asdfoasdjgio'

# NOTE(review): while this is True the token check in the user model is
# bypassed for every caller; it must be False outside local development.
VERIFY_DEBUG = True


class BaseConfig(object):
    """Base class for default set of configs."""

    DEBUG = False
    TESTING = False

    SECURITY_PASSWORD_HASH = 'pbkdf2_sha512'
    SECURITY_TRACKABLE = True
    SECURITY_TOKEN_MAX_AGE = 60 * 30  # seconds
    SECURITY_CONFIRMABLE = False
    SECURITY_PASSWORD_SALT = 'super-secret-stuff-here'

    # The original literal contained a stray "+\" left over from a broken
    # line continuation; adjacent literals are joined by the parser instead.
    LOGGING_FORMAT = (
        "[%(asctime)s] [%(funcName)-30s] "
        "[%(levelname)-6s] %(message)s"
    )
    LOGGING_LOCATION = 'web.log'
    LOGGING_LEVEL = logging.DEBUG

    SQLALCHEMY_TRACK_MODIFICATIONS = False
    CACHE_TYPE = 'simple'
    WTF_CSRF_ENABLED = False

    COMPRESS_MIMETYPES = [
        'text/html',
        'text/css',
        'text/xml',
        'application/json',
        'application/javascript',
    ]
    COMPRESS_LEVEL = 6
    COMPRESS_MIN_SIZE = 500

    # Change it based on your admin user, should ideally read from DB.
    ADMIN_USER = 'admin'
    ADMIN_PASSWORD = 'admin'

    JWT_EXPIRES = timedelta(minutes=10)


class DevelopmentConfig(BaseConfig):
    """Default set of configurations for development mode."""

    DEBUG = True
    TESTING = False
    # Directory containing this file; the SQLite database lives next to it.
    BASEDIR = os.path.abspath(os.path.dirname(__file__))
    SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(BASEDIR, 'app.db')
    SECRET_KEY = 'not-so-super-secret'
    JWT_SECRET_KEY = 'another_super_awesome_secret_stuff_yo.'
    SQLALCHEMY_TRACK_MODIFICATIONS = True


class ProductionConfig(BaseConfig):
    """Default set of configurations for prod mode."""

    DEBUG = False
    TESTING = False
    # Directory containing this file; the SQLite database lives next to it.
    BASEDIR = os.path.abspath(os.path.dirname(__file__))
    SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(BASEDIR, 'app.db')
    SECRET_KEY = 'Super-awesome-secret-stuff'
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    JWT_SECRET_KEY = 'another_super_awesome_secret_stuff_yo.'


class TestingConfig(BaseConfig):
    """Default set of configurations for test mode."""

    DEBUG = False
    TESTING = True
    # Bare 'sqlite://' selects an in-memory database.
    SQLALCHEMY_DATABASE_URI = 'sqlite://'
    SECRET_KEY = '792842bc-c4df-4de1-9177-d5207bd9faa6'
    JWT_SECRET_KEY = 'another_super_awesome_secret_stuff_yo.'

使用工廠來建立app

app/__init__.py

import os

from flask import Flask
from config import CONFIG
from flask_sqlalchemy import SQLAlchemy

# Shared SQLAlchemy handle, created here without an application;
# create_app() binds it to the Flask app through db.init_app().
db = SQLAlchemy()


def create_app(config_name):
    """Build a Flask application for the named configuration.

    ``config_name`` must be a key of ``CONFIG``; the mapped settings object
    is loaded into the application and the module-level ``db`` handle is
    initialised against it. An unknown name raises ``KeyError``.

    Returns the configured Flask application.
    """
    flask_app = Flask(__name__)

    # CONFIG maps the name to an import path that from_object() resolves.
    settings_path = CONFIG[config_name]
    flask_app.config.from_object(settings_path)

    db.init_app(flask_app)
    return flask_app

使用manage來管理

manage.py

from app import create_app, db
from app.models import User
from flask_script import Manager, Shell
from flask_migrate import Migrate, MigrateCommand
from flask_jsonrpc import JSONRPC
import os

# Configuration is chosen by the FLASK_CONFIG environment variable;
# 'development' is used when it is unset or empty.
app = create_app(os.environ.get('FLASK_CONFIG') or 'development')

# JSON-RPC site served under /api, with User.authenticate as auth backend.
jsonrpc = JSONRPC(
    app,
    '/api',
    enable_web_browsable_api=True,
    auth_backend=User.authenticate,
)

# Command-line manager and database-migration support for the application.
manager = Manager(app)
migrate = Migrate(app, db)


def make_shell_context():
    """Return the names made available inside the ``shell`` command."""
    return {'app': app, 'db': db, 'User': User}


manager.add_command("shell", Shell(make_context=make_shell_context))
manager.add_command('db', MigrateCommand)

# Load the JSON-RPC method module purely for its registration side effects.
# A plain ``import app.soapi.user`` would rebind the module-level name ``app``
# to the ``app`` package, replacing the Flask application object that
# make_shell_context() exposes; import_module() loads it without binding.
# NOTE(review): app/soapi/user.py does ``from manage import jsonrpc``; when
# this file runs as a script it is named ``__main__``, so that import appears
# to execute this file a second time under the name ``manage`` -- confirm the
# methods end up registered on the JSONRPC instance that is actually served.
import importlib

importlib.import_module('app.soapi.user')

if __name__ == '__main__':
    manager.run()

使用manage建立資料庫

初始化

python manage.py db init

建立歷史版本

python manage.py db migrate -m 'first'

建立資料庫

python manage.py db upgrade

jsonrpc模組化

app/soapi/user.py

from flask import Blueprint

# Blueprint that groups the user-related JSON-RPC methods of this module.
mod = Blueprint('user', __name__)
# ``jsonrpc`` is created in manage.py, which in turn imports this module,
# so the two modules import each other.
# NOTE(review): the statement order below looks load-order sensitive because
# of that cycle -- confirm before regrouping these imports at the top.
from manage import jsonrpc
jsonrpc.register_blueprint(mod)
from app.models import User
from app import db


@jsonrpc.method('user.register(username=str,password=str)')
def user_register(username, password):
    """Create a user account unless the username is already taken.

    Returns a dict with ``status`` 0 and a success message when the account
    was stored, or ``status`` 1 and an error message when a user with the
    same name already exists.
    """
    existing = User.query.filter_by(username=username).first()
    if existing:
        return {'status': 1, 'message': u'使用者已存在'}

    new_user = User(username=username)
    # User.password() stores a hash of the plain-text password.
    new_user.password(password)
    db.session.add(new_user)
    db.session.commit()
    return {'status': 0, 'message': u'註冊成功'}


@jsonrpc.method('user.verify(username=str,password=str)')
def user_verify(username, password):
    """Check a username/password pair and issue an auth token on success.

    Returns a dict with ``status`` 0, a welcome message and ``token`` when
    the credentials match; otherwise ``status`` 1 with a message saying
    whether the username is unknown or the password is wrong.
    """
    account = User.query.filter_by(username=username).first()
    if not account:
        return {'status': 1, 'message': u'使用者名稱不存在'}

    if not account.verify_password(password):
        return {'status': 1, 'message': u'密碼錯誤'}

    auth_token = account.generate_auth_token()
    return {'status': 0, 'message': u'歡迎%s' % username, 'token': auth_token}

然後在manage中import過來

import app.soapi.user

資料模型拆分

app/models.py

from functools import wraps

import werkzeug
import werkzeug.security
from flask_jsonrpc import InvalidCredentialsError, InvalidParamsError
from itsdangerous import BadSignature, SignatureExpired, TimedJSONWebSignatureSerializer as Serializer

from app import db
from config import USER_SECRET_KEY, VERIFY_DEBUG


class User(db.Model):
    """User account with a hashed password and signed auth tokens.

    Besides the table columns, the class carries the helpers used for
    JSON-RPC authentication: ``authenticate`` wraps a method so that it
    requires a token, and ``check_auth`` decides whether a token is valid.
    """

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    password_hash = db.Column(db.String(164))

    def __init__(self, username):
        self.username = username

    def __repr__(self):
        return '<User %r>' % self.username

    def password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = werkzeug.security.generate_password_hash(password)

    def verify_password(self, password):
        """Compare a plain-text password with the stored hash.

        Returns the result of ``check_password_hash``, or ``None`` when no
        hash has been stored for this user.
        """
        if self.password_hash:
            return werkzeug.security.check_password_hash(self.password_hash, password)
        return None

    def generate_auth_token(self, expiration=600):
        """Return a signed token string carrying this user's id.

        ``expiration`` is passed to the serializer as ``expires_in``.
        """
        # NOTE(review): TimedJSONWebSignatureSerializer is, as far as I know,
        # no longer shipped by recent itsdangerous releases -- confirm the
        # pinned version in requirements.txt.
        s = Serializer(USER_SECRET_KEY, expires_in=expiration)
        return s.dumps({'id': self.id}).decode()

    @staticmethod
    def verify_auth_token(token):
        """Return the user a token was issued for, or ``None``.

        ``None`` is returned when the token has expired or its signature is
        invalid; otherwise the result of looking up the embedded id.
        """
        s = Serializer(USER_SECRET_KEY)
        try:
            data = s.loads(token)
        except (SignatureExpired, BadSignature):
            # Expired tokens and tampered/garbage tokens are treated alike.
            return None
        return User.query.get(data['id'])

    @staticmethod
    def authenticate(f, f_check_auth):
        """Wrap ``f`` so that it only runs for callers presenting a token.

        The token is taken from the ``token`` keyword argument, or from the
        first positional argument when no such keyword is given, and passed
        to ``f_check_auth``. On success the token is removed from the
        arguments before ``f`` is called.

        The original code passed the first two positional arguments to
        ``f_check_auth`` and stripped both, which does not match the
        single-token ``check_auth`` below nor the error message's promise
        of ``[token]``; positional tokens could therefore never succeed.

        Raises:
            InvalidParamsError: no token was supplied at all.
            InvalidCredentialsError: ``f_check_auth`` rejected the token.
        """
        @wraps(f)
        def _f(*args, **kwargs):
            if 'token' in kwargs:
                is_auth = f_check_auth(kwargs['token'])
                if is_auth:
                    kwargs.pop('token')
            elif args:
                is_auth = f_check_auth(args[0])
                if is_auth:
                    # Drop the token so f() only sees its own parameters.
                    args = args[1:]
            else:
                raise InvalidParamsError('Authenticated methods require at least '
                                         '[token] or {token: } arguments')
            if not is_auth:
                raise InvalidCredentialsError()
            return f(*args, **kwargs)

        return _f

    @staticmethod
    def check_auth(token):
        """Return ``True`` when ``token`` identifies an existing user.

        While ``VERIFY_DEBUG`` is enabled no verification takes place and
        every token is accepted.
        """
        # Debug mode: token authentication is skipped entirely.
        if VERIFY_DEBUG:
            return True
        return User.verify_auth_token(token) is not None