使用flask從零構建自動化運維平臺系列二
阿新 • • 發佈:2019-01-09
文章目錄
寫程式碼也是一種藝術,結構層次感一定要好,這樣做出來的才是一個好作品。
程式碼管理
git
目錄結構
SmartOps
├── app
│   ├── factory.py
│   ├── __init__.py
│   ├── models.py
│   ├── server.py
│   ├── soapi
│   ├── static
│   └── templates
├── config.py
├── manage.py
├── migrations
├── requirements.txt
├── tests
└── venv
目錄結構用途說明
目錄 | 用途 |
---|---|
app | 存放flask應用程式碼 |
migrations | 資料庫遷移指令碼 |
tests | 單元測試 |
venv | 虛擬環境 |
目錄檔案說明
檔案 | 用途 |
---|---|
manage.py | 啟動程式以及其他的程式任務。 |
config.py | 儲存配置 |
requirements.txt | python依賴包 |
配置檔案
配置檔案決定程式做出什麼樣的行為,比較常用的是開發環境、測試環境、正式環境三種,下面是個例子:
import logging
import os
from datetime import timedelta
# Environment name -> dotted import path of the matching configuration class.
# "default" points at the production settings.
CONFIG = dict(
    development="config.DevelopmentConfig",
    testing="config.TestingConfig",
    production="config.ProductionConfig",
    default="config.ProductionConfig",
)

# Key used by the User model to sign and verify auth tokens.
USER_SECRET_KEY = 'asdfoasdjgio'

# When True, User.check_auth accepts every request without checking the token.
VERIFY_DEBUG = True
class BaseConfig(object):
    """Settings shared by every environment.

    The environment-specific classes subclass this one and override or add
    values; anything not overridden there keeps the value defined here.
    """

    DEBUG = False
    TESTING = False

    SECURITY_PASSWORD_HASH = 'pbkdf2_sha512'
    SECURITY_TRACKABLE = True

    # Built with implicit string concatenation. A backslash continuation
    # *inside* the literal would keep the stray " +" (and any leading
    # whitespace of the next line) as part of the log format.
    LOGGING_FORMAT = (
        "[%(asctime)s] [%(funcName)-30s] "
        "[%(levelname)-6s] %(message)s"
    )
    LOGGING_LOCATION = 'web.log'
    LOGGING_LEVEL = logging.DEBUG

    SECURITY_TOKEN_MAX_AGE = 60 * 30  # 1800; presumably seconds -- TODO confirm
    SECURITY_CONFIRMABLE = False
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    CACHE_TYPE = 'simple'
    # NOTE(review): hard-coded salt; should be supplied per deployment.
    SECURITY_PASSWORD_SALT = 'super-secret-stuff-here'

    COMPRESS_MIMETYPES = [
        'text/html',
        'text/css',
        'text/xml',
        'application/json',
        'application/javascript',
    ]
    # NOTE(review): CSRF protection is switched off for every environment.
    WTF_CSRF_ENABLED = False
    COMPRESS_LEVEL = 6
    COMPRESS_MIN_SIZE = 500

    # Change it based on your admin user, should ideally read from DB.
    ADMIN_USER = 'admin'
    ADMIN_PASSWORD = 'admin'

    JWT_EXPIRES = timedelta(minutes=10)
class DevelopmentConfig(BaseConfig):
    """Settings for development mode: DEBUG on, SQLite file beside this module."""

    TESTING = False
    DEBUG = True

    # Absolute path of the directory that contains this configuration module.
    BASEDIR = os.path.abspath(os.path.dirname(__file__))
    SQLALCHEMY_DATABASE_URI = 'sqlite:///{}'.format(os.path.join(BASEDIR, 'app.db'))
    SQLALCHEMY_TRACK_MODIFICATIONS = True

    JWT_SECRET_KEY = 'another_super_awesome_secret_stuff_yo.'
    SECRET_KEY = 'not-so-super-secret'
class ProductionConfig(BaseConfig):
    """Settings for production mode.

    ``SECRET_KEY`` and ``JWT_SECRET_KEY`` are taken from environment
    variables of the same name when those are set, so real deployments do
    not have to rely on secrets committed to source control. The original
    literals remain as fallbacks so existing setups keep working unchanged.
    """

    DEBUG = False
    TESTING = False

    # Absolute path of the directory that contains this configuration module.
    BASEDIR = os.path.abspath(os.path.dirname(__file__))
    SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(BASEDIR, 'app.db')
    SQLALCHEMY_TRACK_MODIFICATIONS = True

    SECRET_KEY = os.environ.get('SECRET_KEY', 'Super-awesome-secret-stuff')
    JWT_SECRET_KEY = os.environ.get(
        'JWT_SECRET_KEY', 'another_super_awesome_secret_stuff_yo.')
class TestingConfig(BaseConfig):
    """Settings for test mode: TESTING on, SQLite URI without a file path."""

    TESTING = True
    DEBUG = False

    SQLALCHEMY_DATABASE_URI = 'sqlite://'

    JWT_SECRET_KEY = 'another_super_awesome_secret_stuff_yo.'
    SECRET_KEY = '792842bc-c4df-4de1-9177-d5207bd9faa6'
使用工廠來建立app
app/__init__.py
import os
from flask import Flask
from config import CONFIG
from flask_sqlalchemy import SQLAlchemy
# Shared SQLAlchemy handle; it is bound to a concrete app inside create_app().
db = SQLAlchemy()


def create_app(config_name='default'):
    """Build a Flask application configured for ``config_name``.

    Args:
        config_name: Key of ``config.CONFIG`` ("development", "testing",
            "production" or "default"). Defaults to "default" so that the
            "default" entry of ``CONFIG`` is actually reachable.

    Returns:
        The Flask application with its configuration loaded and the shared
        ``db`` extension initialised against it.

    Raises:
        KeyError: If ``config_name`` is not a key of ``CONFIG``.
    """
    app = Flask(__name__)
    # CONFIG values are dotted import paths, which from_object() accepts.
    app.config.from_object(CONFIG[config_name])
    db.init_app(app)
    return app
使用manage來管理
from app import create_app, db
from app.models import User
from flask_script import Manager, Shell
from flask_migrate import Migrate, MigrateCommand
from flask_jsonrpc import JSONRPC
import os
import importlib
import sys

# When this file is run as a script it is loaded under the name ``__main__``.
# app/soapi/user.py does ``from manage import jsonrpc``; without this alias
# that import would execute this file a second time as a separate ``manage``
# module, and the RPC methods would be registered on a different
# JSONRPC/Flask pair than the one ``manager`` serves.
sys.modules.setdefault('manage', sys.modules[__name__])

app = create_app(os.getenv('FLASK_CONFIG') or 'development')
jsonrpc = JSONRPC(app, '/api', enable_web_browsable_api=True,
                  auth_backend=User.authenticate)
manager = Manager(app)
migrate = Migrate(app, db)


def make_shell_context():
    """Return the names pre-loaded into ``python manage.py shell``."""
    return dict(app=app, db=db, User=User)


manager.add_command("shell", Shell(make_context=make_shell_context))
manager.add_command('db', MigrateCommand)

# Importing the module registers its JSON-RPC methods, so it has to happen
# after ``jsonrpc`` exists. ``import app.soapi.user`` is deliberately avoided:
# that statement rebinds the global name ``app`` to the ``app`` package,
# replacing the Flask instance that make_shell_context() hands out.
importlib.import_module('app.soapi.user')

if __name__ == '__main__':
    manager.run()
使用manage建立資料庫
初始化
python manage.py db init
建立歷史版本
python manage.py db migrate -m 'first'
建立資料庫
python manage.py db upgrade
jsonrpc模組化
app/soapi/user.py
from flask import Blueprint
# Blueprint under which the user-related JSON-RPC methods are grouped.
mod = Blueprint('user', __name__)
# The JSONRPC instance is created in manage.py, which in turn imports this
# module after creating it (a circular import), so this import must stay
# below the Blueprint definition and cannot run before manage.py has
# defined `jsonrpc`.
from manage import jsonrpc
jsonrpc.register_blueprint(mod)
from app.models import User
from app import db
@jsonrpc.method('user.register(username=str,password=str)')
def user_register(username, password):
    """Create an account for ``username`` unless the name is already taken.

    Returns:
        A dict with ``status`` 0 and a success message once the new user
        has been committed, or ``status`` 1 and an error message when a
        user with that name already exists.
    """
    already_taken = User.query.filter_by(username=username).first()
    if already_taken:
        return {'status': 1, 'message': u'使用者已存在'}

    new_user = User(username=username)
    new_user.password(password)  # stores the hash, not the plaintext
    db.session.add(new_user)
    db.session.commit()
    return {'status': 0, 'message': u'註冊成功'}
@jsonrpc.method('user.verify(username=str,password=str)')
def user_verify(username, password):
    """Check a username/password pair and hand out an auth token.

    Returns:
        A dict with ``status`` 0, a welcome message and a ``token`` when the
        password matches; otherwise ``status`` 1 with a message saying
        whether the user name is unknown or the password is wrong.
    """
    account = User.query.filter_by(username=username).first()
    if not account:
        return {'status': 1, 'message': u'使用者名稱不存在'}
    if not account.verify_password(password):
        return {'status': 1, 'message': u'密碼錯誤'}

    auth_token = account.generate_auth_token()
    return {'status': 0, 'message': u'歡迎%s' % username, 'token': auth_token}
然後在manage中import過來
import app.soapi.user
資料模型拆分
app/models.py
from itsdangerous import BadSignature, SignatureExpired, TimedJSONWebSignatureSerializer as Serializer
import werkzeug
from functools import wraps
from flask_jsonrpc import InvalidCredentialsError, InvalidParamsError
from app import db
from config import USER_SECRET_KEY, VERIFY_DEBUG
class User(db.Model):
    """Account record with password hashing and signed-token authentication.

    Besides the persisted columns, the class provides the helpers used as
    the JSON-RPC authentication backend: ``authenticate`` wraps an RPC
    method and ``check_auth`` validates the token supplied with a call.
    """

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    password_hash = db.Column(db.String(164))

    def __init__(self, username):
        self.username = username

    def __repr__(self):
        return '<User %r>' % self.username

    def password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        # Import the submodule explicitly: a bare ``import werkzeug`` does
        # not guarantee that ``werkzeug.security`` has been loaded.
        from werkzeug.security import generate_password_hash

        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        """Compare a plaintext password with the stored hash.

        Returns:
            True when ``password`` matches the stored hash; False when it
            does not match or when no hash has been stored.
        """
        if not self.password_hash:
            return False
        from werkzeug.security import check_password_hash

        return check_password_hash(self.password_hash, password)

    def generate_auth_token(self, expiration=600):
        """Return a signed token (as ``str``) carrying this user's id.

        Args:
            expiration: Passed to the serializer as ``expires_in``.
        """
        s = Serializer(USER_SECRET_KEY, expires_in=expiration)
        return s.dumps({'id': self.id}).decode()

    @staticmethod
    def verify_auth_token(token):
        """Return the user a token was issued for.

        Returns:
            The matching ``User``, or None when the token has expired, has
            a bad signature, or refers to an id that is not in the database.
        """
        s = Serializer(USER_SECRET_KEY)
        try:
            data = s.loads(token)
        except (SignatureExpired, BadSignature):
            # Expired and tampered tokens are both treated as "no user".
            return None
        return User.query.get(data['id'])

    @staticmethod
    def authenticate(f, f_check_auth):
        """Wrap ``f`` so that it only runs for calls carrying a valid token.

        The token is taken from the ``token`` keyword argument when present,
        otherwise from the first positional argument. It is removed from the
        arguments before ``f`` is called.

        Args:
            f: The function to protect.
            f_check_auth: Callable receiving the token and returning a
                truthy value when the call is allowed.

        Returns:
            The wrapped function.

        Raises:
            InvalidParamsError: (from the wrapper) when the call carries no
                token at all.
            InvalidCredentialsError: (from the wrapper) when
                ``f_check_auth`` rejects the token.
        """
        @wraps(f)
        def _f(*args, **kwargs):
            # The check function takes a single token, so exactly one value
            # is consumed regardless of how the caller supplied it.
            if 'token' in kwargs:
                token = kwargs.pop('token')
            elif args:
                token, args = args[0], args[1:]
            else:
                raise InvalidParamsError('Authenticated methods require at least '
                                         '[token] or {token: } arguments')
            if not f_check_auth(token):
                raise InvalidCredentialsError()
            return f(*args, **kwargs)
        return _f

    @staticmethod
    def check_auth(token):
        """Return True when ``token`` is acceptable, False otherwise."""
        # With VERIFY_DEBUG enabled, token verification is skipped entirely.
        if VERIFY_DEBUG:
            return True
        return User.verify_auth_token(token) is not None