1. 程式人生 > 其它 >用手機寫程式碼:基於 Serverless 的線上程式設計能力探索

用手機寫程式碼:基於 Serverless 的線上程式設計能力探索

簡介:Serverless 架構的按量付費模式,可以在保證線上程式設計功能效能的前提下,進一步降低成本。本文將會以阿里雲函式計算為例,通過 Serverless 架構實現一個 Python 語言的線上程式設計功能,並對該功能進一步的優化,使其更加貼近本地原生代碼執行體驗。

隨著電腦科學與技術的發展,越來越多的人開始接觸程式設計,也有越來越多的線上程式設計平臺誕生。以 Python 語言的線上程式設計平臺為例,大致可以分為兩類:

  • 一類是 OJ 型別的,即線上評測的程式設計平臺,這類的平臺特點是阻塞型別的執行,即使用者需要一次性將程式碼和標準輸入內容提交,當程式執行完成會一次性將結果返回;
  • 另一類則是學習、工具類的線上程式設計平臺,例如 Anycodes 線上程式設計等網站,這一類平臺的特點是非阻塞型別的執行,即使用者可以實時看到程式碼執行的結果,以及可以實時內容進行內容的輸入。

但是,無論是那種型別的線上程式設計平臺,其背後的核心模組( “程式碼執行器”或“判題機”)都是極具有研究價值,一方面,這類網站通常情況下都需要比要嚴格的“安全機制”,例如程式會不會有惡意程式碼,出現死迴圈、破壞計算機系統等,程式是否需要隔離執行,執行時是否會獲取到其他人提交的程式碼等;

另一方面,這類平臺通常情況下都會對資源消耗比較大,尤其是比賽來臨時,更是需要突然間對相關機器進行擴容,必要時需要大規模叢集來進行應對。同時這類網站通常情況下也都有一個比較大的特點,那就是觸發式,即每個程式碼執行前後實際上並沒有非常緊密的前後文關係等。

隨著 Serverless 架構的不斷髮展,很多人發現 Serverless 架構的請求級隔離和極致彈性等特性可以解決傳統線上程式設計平臺所遇到的安全問題和資源消耗問題,Serverless 架構的按量付費模式,可以在保證線上程式設計功能效能的前提下,進一步降低成本。

所以,通過 Serverless 架構實現線上程式設計功能的開發就逐漸的被更多人所關注和研究。本文將會以阿里雲函式計算為例,通過 Serverless 架構實現一個 Python 語言的線上程式設計功能,並對該功能進一步的優化,使其更加貼近本地原生代碼執行體驗。

線上程式設計功能開發

一個比較簡單的、典型的線上程式設計功能,線上執行模組通常情況下是需要以下幾個能力:

  • 線上執行程式碼
  • 使用者可以輸入內容
  • 可以返回結果(標準輸出、標準錯誤等)

除了線上程式設計所需要實現的功能之外,線上程式設計在 Serverless 架構下,所需要實現的業務邏輯,也僅僅被收斂到關注程式碼執行模組即可:獲取客戶端傳送的程式資訊(包括程式碼、標準輸入等),將程式碼快取到本地,執行程式碼,獲取結果,但會給客戶端,整個架構的流程簡圖為:

關於執行程式碼部分,可以通過 Python 語言的 subprocess 依賴中的 Popen() 方法實現,在使用 Popen() 方法時,有幾個比較重要的概念,需要明確:
  • subprocess.PIPE:一個可以被用於 Popen 的stdin 、stdout 和 stderr 3 個引數的特殊值,表示需要建立一個新的管道;
  • subprocess.STDOUT:一個可以被用於 Popen 的 stderr 引數的輸出值,表示子程式的標準錯誤匯合到標準輸出;

所以,當我們想要實現可以:

進行標準輸入(stdin),獲取標準輸出(stdout)以及標準錯誤(stderr)的功能

可以簡化程式碼實現為:

除程式碼執行部分之外,在 Serverless 架構下,獲取到使用者程式碼並將其儲存過程中,需要額外注意函式例項中目錄的讀寫許可權。通常情況下,在函式計算中,如果不進行硬碟掛載,只有/tmp/目錄是有可寫入許可權的。所以在該專案中,我們將使用者傳遞到服務端的程式碼進行臨時儲存時,需要將其寫入臨時目錄/tmp/,在臨時儲存程式碼的時候,還需要額外考慮例項複用的情況,所以此時,可以為臨時程式碼提供臨時的檔名,例如:

# -*- coding: utf-8 -*-

import randomrandom

Str = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))

path = "/tmp/%s"% randomStr(5)

完整的程式碼實現為:

# -*- coding: utf-8 -*-

import json

import uuid

import random

import subprocess

# 隨機字串

randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))

# Response

class Response:

def __init__(self, start_response, response, errorCode=None):

self.start = start_response

responseBody = {

'Error': {"Code": errorCode, "Message": response},

} if errorCode else {

'Response': response

}

# 預設增加uuid,便於後期定位

responseBody['ResponseId'] = str(uuid.uuid1())

self.response = json.dumps(responseBody)

def __iter__(self):

status = '200'

response_headers = [('Content-type', 'application/json; charset=UTF-8')]

self.start(status, response_headers)

yield self.response.encode("utf-8")

def WriteCode(code, fileName):

try:

with open(fileName, "w") as f:

f.write(code)

return True

except Exception as e:

print(e)

return False

def RunCode(fileName, input_data=""):

child = subprocess.Popen("python %s" % (fileName),

stdin=subprocess.PIPE,

stdout=subprocess.PIPE,

stderr=subprocess.STDOUT,

shell=True)

output = child.communicate(input=input_data.encode("utf-8"))

return output[0].decode("utf-8")

def handler(environ, start_response):

try:

request_body_size = int(environ.get('CONTENT_LENGTH', 0))

except (ValueError):

request_body_size = 0

requestBody = json.loads(environ['wsgi.input'].read(request_body_size).decode("utf-8"))

code = requestBody.get("code", None)

inputData = requestBody.get("input", "")

fileName = "/tmp/" + randomStr(5)

responseData = RunCode(fileName, inputData) if code and WriteCode(code, fileName) else "Error"

return Response(start_response, {"result": responseData})

完成核心的業務邏輯編寫之後,我們可以將程式碼部署到阿里雲函式計算中。部署完成之後,我們可以獲得到介面的臨時測試地址。通過 PostMan 對該介面進行測試,以 Python 語言的輸出語句為例:

print('HELLO WORLD')

可以看到,當我們通過 POST 方法,攜帶程式碼等作為引數,發起請求後,獲得到的響應為:

我們通過響應結果,可以看到,系統是可以正常輸出我們的預期結果:“HELLO WORLD” 至此我們完成了標準輸出功能的測試,接下來我們對標準錯誤等功能進行測試,此時我們將剛剛的輸出程式碼進行破壞:

print('HELLO WORLD)

使用同樣的方法,再次進行程式碼執行,可以看到結果:

結果中,我們可以看到 Python 的報錯資訊,是符合我們的預期的,至此完成了線上程式設計功能的標準錯誤功能的測試,接下來,我們進行標準輸入功能的測試,由於我們使用的 subprocess.Popen() 方法,是一種阻塞方法,所以此時我們需要將程式碼和標準輸入內容一同放到服務端。測試的程式碼為:

tempInput = input('please input: ')

print('Output: ', tempInput)

測試的標準輸入內容為:“serverless devs”。

當我們使用同樣的方法,發起請求之後,我們可以看到:

系統是正常輸出預期的結果。至此我們完成了一個非常簡單的線上程式設計服務的介面。該介面目前只是初級版本,僅用於學習使用,其具有極大的優化空間:
  • 超時時間的處理
  • 程式碼執行完成,可以進行清理

當然,通過這個介面也可以看到這樣一個問題:那就是程式碼執行過程中是阻塞的,我們沒辦法進行持續性的輸入,也沒有辦法實時輸出,即使需要輸入內容也是需要將程式碼和輸入內容一併傳送到服務端。這種模式和目前市面上常見的 OJ 模式很類似,但是就單純的線上程式設計而言,還需要進一步對專案優化,使其可以通過非阻塞方法,實現程式碼的執行,並且可以持續性的進行輸入操作,持續性的進行內容輸出。

更貼近“本地”的程式碼執行器

我們以一段程式碼為例:

import time

print("hello world")

time.sleep(10)

tempInput = input("please: ")

print("Input data: ", tempInput)

當我們在本地的執行這段 Python 程式碼時,整體的使用者側的實際表現是:

  • 系統輸出 hello world
  • 系統等待 10 秒
  • 系統提醒我們 please,我們此時可以輸入一個字串
  • 系統輸出 Input data 以及我們剛剛輸入的字串

但是,這段程式碼如果應用於傳統 OJ 或者剛剛我們所實現的線上程式設計系統中,表現則大不相同:

  • 程式碼與我們要輸入內容一同傳給系統
  • 系統等待 10 秒
  • 輸出 hello world、please,以及最後輸 Input data 和我們輸入的內容

可以看到,OJ 模式上的線上程式設計功能和本地是有非常大的差距的,至少在體驗層面,這個差距是比較大的。為了減少這種體驗不統一的問題,我們可以將上上述的架構進一步升級,通過函式的非同步觸發,以及 Python 語言的 pexpect.spawn() 方法實現一款更貼近本地體驗的線上程式設計功能:

在整個專案中,包括了兩個函式,兩個儲存桶:
  • 業務邏輯函式:該函式的主要操作是業務邏輯,包括建立程式碼執行的任務(通過物件儲存觸發器進行非同步函式執行),以及獲取函式輸出結果以及對任務函式的標準輸入進行相關操作等;
  • 執行器函式:該函式的主要作用是執行使用者的函式程式碼,這部分是通過物件儲存觸發,通過下載程式碼、執行程式碼、獲取輸入、輸出結果等;程式碼獲取從程式碼儲存桶,輸出結果和獲取輸入從業務儲存桶;
  • 程式碼儲存桶:該儲存桶的作用是儲存程式碼,當用戶發起執行程式碼的請求, 業務邏輯函式收到使用者程式碼後,會將程式碼儲存到該儲存桶,再由該儲存桶處罰非同步任務;
  • 業務儲存桶:該儲存桶的作用是中間量的輸出,主要包括輸出內容的快取、輸入內容的快取;該部分資料可以通過物件儲存的本身特性進行生命週期的制定;

為了讓程式碼線上執行起來,更加貼近本地體驗,該方案的程式碼分為兩個函式,分別進行業務邏輯處理和線上程式設計核心功能。

其中業務邏輯處理函式,主要是:

  • 獲取使用者的程式碼資訊,生成程式碼執行 ID,並將程式碼存到物件儲存,非同步觸發線上程式設計函式的執行,返回生成程式碼執行 ID;
  • 獲取使用者的輸入資訊和程式碼執行 ID,並將內容儲存到對應的物件儲存中;
  • 獲取程式碼的輸出結果,根據使用者指定的程式碼執行 ID,將執行結果從物件儲存中讀取出來,並返回給使用者;

整體的業務邏輯為:

實現的程式碼為:

# -*- coding: utf-8 -*-

import os

import oss2

import json

import uuid

import random

# 基本配置資訊

AccessKey = {

"id": os.environ.get('AccessKeyId'),

"secret": os.environ.get('AccessKeySecret')

}

OSSCodeConf = {

'endPoint': os.environ.get('OSSConfEndPoint'),

'bucketName': os.environ.get('OSSConfBucketCodeName'),

'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))

}

OSSTargetConf = {

'endPoint': os.environ.get('OSSConfEndPoint'),

'bucketName': os.environ.get('OSSConfBucketTargetName'),

'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))

}

# 獲取獲取/上傳檔案到OSS的臨時地址

auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])

codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])

targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])

# 隨機字串

randomStr = lambda num=5: "".join(random.sample('abcdefghijklmnopqrstuvwxyz', num))

# Response

class Response:

def __init__(self, start_response, response, errorCode=None):

self.start = start_response

responseBody = {

'Error': {"Code": errorCode, "Message": response},

} if errorCode else {

'Response': response

}

# 預設增加uuid,便於後期定位

responseBody['ResponseId'] = str(uuid.uuid1())

self.response = json.dumps(responseBody)

def __iter__(self):

status = '200'

response_headers = [('Content-type', 'application/json; charset=UTF-8')]

self.start(status, response_headers)

yield self.response.encode("utf-8")

def handler(environ, start_response):

try:

request_body_size = int(environ.get('CONTENT_LENGTH', 0))

except (ValueError):

request_body_size = 0

requestBody = json.loads(environ['wsgi.input'].read(request_body_size).decode("utf-8"))

reqType = requestBody.get("type", None)

if reqType == "run":

# 執行程式碼

code = requestBody.get("code", None)

runId = randomStr(10)

codeBucket.put_object(runId, code.encode("utf-8"))

responseData = runId

elif reqType == "input":

# 輸入內容

inputData = requestBody.get("input", None)

runId = requestBody.get("id", None)

targetBucket.put_object(runId + "-input", inputData.encode("utf-8"))

responseData = 'ok'

elif reqType == "output":

# 獲取結果

runId = requestBody.get("id", None)

targetBucket.get_object_to_file(runId + "-output", '/tmp/' + runId)

with open('/tmp/' + runId) as f:

responseData = f.read()

else:

responseData = "Error"

return Response(start_response, {"result": responseData})

執行器函式,主要是通過程式碼儲存桶觸發,從而進行程式碼執行的模組,這一部分主要包括:

  • 從儲存桶獲取程式碼,並通過 pexpect.spawn() 進行程式碼執行;
  • 通過 pexpect.spawn().read_nonblocking() 非阻塞的獲取間斷性的執行結果,並寫入到物件儲存;
  • 通過 pexpect.spawn().sendline() 進行內容輸入;

整體流程為:

程式碼實現為:

# -*- coding: utf-8 -*-

import os

import re

import oss2

import json

import time

import pexpect

# 基本配置資訊

AccessKey = {

"id": os.environ.get('AccessKeyId'),

"secret": os.environ.get('AccessKeySecret')

}

OSSCodeConf = {

'endPoint': os.environ.get('OSSConfEndPoint'),

'bucketName': os.environ.get('OSSConfBucketCodeName'),

'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))

}

OSSTargetConf = {

'endPoint': os.environ.get('OSSConfEndPoint'),

'bucketName': os.environ.get('OSSConfBucketTargetName'),

'objectSignUrlTimeOut': int(os.environ.get('OSSConfObjectSignUrlTimeOut'))

}

# 獲取獲取/上傳檔案到OSS的臨時地址

auth = oss2.Auth(AccessKey['id'], AccessKey['secret'])

codeBucket = oss2.Bucket(auth, OSSCodeConf['endPoint'], OSSCodeConf['bucketName'])

targetBucket = oss2.Bucket(auth, OSSTargetConf['endPoint'], OSSTargetConf['bucketName'])

def handler(event, context):

event = json.loads(event.decode("utf-8"))

for eveEvent in event["events"]:

# 獲取object

code = eveEvent["oss"]["object"]["key"]

localFileName = "/tmp/" + event["events"][0]["oss"]["object"]["eTag"]

# 下載程式碼

codeBucket.get_object_to_file(code, localFileName)

# 執行程式碼

foo = pexpect.spawn('python %s' % localFileName)

outputData = ""

startTime = time.time()

# timeout可以通過檔名來進行識別

try:

timeout = int(re.findall("timeout(.*?)s", code)[0])

except:

timeout = 60

while (time.time() - startTime) / 1000 <= timeout:

try:

tempOutput = foo.read_nonblocking(size=999999, timeout=0.01)

tempOutput = tempOutput.decode("utf-8", "ignore")

if len(str(tempOutput)) > 0:

outputData = outputData + tempOutput

# 輸出資料存入oss

targetBucket.put_object(code + "-output", outputData.encode("utf-8"))

except Exception as e:

print("Error: ", e)

# 有輸入請求被阻塞

if str(e) == "Timeout exceeded.":

try:

# 從oss讀取資料

targetBucket.get_object_to_file(code + "-input", localFileName + "-input")

targetBucket.delete_object(code + "-input")

with open(localFileName + "-input") as f:

inputData = f.read()

if inputData:

foo.sendline(inputData)

except:

pass

# 程式執行完成輸出

elif "End Of File (EOF)" in str(e):

targetBucket.put_object(code + "-output", outputData.encode("utf-8"))

return True

# 程式丟擲異常

else:

outputData = outputData + "\n\nException: %s" % str(e)

targetBucket.put_object(code + "-output", outputData.encode("utf-8"))

return False

當我們完成核心的業務邏輯編寫之後,我們可以將專案部署到線上。

專案部署完成之後,和上文的測試方法一樣,在這裡也通過 PostMan 對介面進行測試。此時,我們需要設定一個覆蓋能較全的測試程式碼,包括輸出列印、輸入、一些 sleep() 等方法:

當我們通過 PostMan 發起請求執行這段程式碼之後,我們可以看到系統為我們返回了預期的程式碼執行 ID:

我們可以看到系統會返回給我們一個程式碼執行 ID,該執行 ID 將會作為我們整個請求任務的 ID,此時,我們可以通過獲取輸出結果的介面,來獲取結果:

由於程式碼中有:

time.sleep(10)

所以,迅速獲得結果的時候是看不到後半部分的輸出結果,我們可以設定一個輪訓任務,不斷通過該 ID 對介面進行重新整理:

可以看到,10 秒鐘後,程式碼執行到了輸入部分:

tempInput = input('please: ')

此時,我們再通過輸入介面,進行輸入操作:

完成之後,我們可以看到輸入成功(result: ok)的結果,此時我們繼續重新整理之前獲取結果部分的請求:

可以看到,我們已經獲得到了所有結果的輸出。

相對於上文的線上程式設計功能,這種“更貼近本地的程式碼執行器“變得複雜了很多,但是在實際使用的過程中,卻可以更好的模擬出本地執行程式碼時的一些現象,例如程式碼的休眠、阻塞、內容的輸出等。

總結

無論是簡單的線上程式碼執行器部分,還是更貼近“本地”的程式碼執行器部分,這篇文章在所應用的內容是相對廣泛的。通過這篇文章你可以看到:

  • HTTP 觸發器的基本使用方法;物件儲存觸發器的基本使用方;
  • 函式計算元件、物件儲存元件的基本使用方法,元件間依賴的實現方法;

同時,通過這篇文章,也可以從一個側面看到這樣一個常見問題的簡單解答:我有一個專案,我是每個介面一個函式,還是多個介面複用一個函式?

針對這個問題,其實最主要的是看業務本身的訴求,如果多個介面表達的含義是一致的,或者是同類的,類似的,並且多個介面的資源消耗是類似的,那麼放在一個函式中來通過不同的路徑進行區分是完全可以的;如果出現資源消耗差距較大,或者函式型別、規模、類別區別過大的時候,將多個介面放在多個函式下也是沒有問題的。

本文實際上是拋磚引玉,無論是 OJ 系統的“判題機”部分,還是線上程式設計工具的“執行器部分”,都可以很好的和 Serverless 架構有著比較有趣的結合點。這種結合點不僅僅可以解決傳統線上程式設計所頭疼的事情(安全問題,資源消耗問題,併發問題,流量不穩定問題),更可以將 Serverless 的價值在一個新的領域發揮出來。

原文連結
本文為阿里雲原創內容,未經允許不得轉載。