1. 程式人生 > >藍鯨平臺Celery後臺任務進度獲取程式碼

藍鯨平臺Celery後臺任務進度獲取程式碼

celery_tasks.py

# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 藍鯨智雲(BlueKing) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.

celery 任務示例

本地啟動celery命令: python  manage.py  celery  worker  --settings=settings
週期性任務還需要啟動celery排程命令:python  manage.py  celerybeat --settings=settings
"""
import datetime
import json
import traceback
import urllib2
import base64
from celery.task import task
import time
from celery.schedules import crontab
from celery.task import periodic_task
from common.log import logger

from home_application.dce import DCE_MAIN_IP, ADMIN_NAME, ADMIN_PWD, check_deployment_podcnt


@task(bind=True)
def update_deployment(self, app_name, deployment_name, replicas, labels, image, container_port, cpu, memory,
                      health_check_url, namespace='default'):
    # 縮replicas為replicas-1,空出一個ip
    post_url = 'http://' + DCE_MAIN_IP + '/apis/apps/v1/namespaces/' + namespace + '/deployments/' + deployment_name
    data = {"metadata": {"annotations": {"kubernetes.io/change-cause": "update replica"}},
            "spec": {"replicas": replicas - 1}}
    req = urllib2.Request(post_url, json.dumps(data))
    req.add_header('Content-Type', 'application/strategic-merge-patch+json')
    req.add_header("Authorization", "Basic " + base64.b64encode(ADMIN_NAME + ':' + ADMIN_PWD))
    req.get_method = lambda: 'PATCH'
    print(data)
    try:
        response = urllib2.urlopen(req)
        if response.code == 200:
            # result=json.loads(response.readline())
            print('replicas->replicas-1')
    except:
        print('error para:url:' + post_url)
        print('error data:' + json.dumps(data))
        traceback.print_exc()
        return 'RequestError'

    # 等待縮replicas完畢
    i = 0
    while check_deployment_podcnt(deployment_name, namespace) != replicas - 1 and i < 100:
        i += 5
        self.update_state(state='PROGRESS', meta={'i': i})
        time.sleep(5)
        print('sleep 5')

    labels = {} if not labels else labels  # 防止為None
    post_url = 'http://' + DCE_MAIN_IP + '/apis/apps/v1beta1/namespaces/' + namespace + '/deployments/' + deployment_name
    data = {
        "apiVersion": "apps/v1beta1",
        "kind": "Deployment",
        "metadata": {
            "name": deployment_name,
            "labels": {
                "dce.daocloud.io/app": app_name,
                "dce.daocloud.io/component": deployment_name
            }
        },
        "spec": {
            "terminationGracePeriodSeconds": 66,
            "selector": {
                "matchLabels": {
                    "dce.daocloud.io/component": deployment_name}},
            "revisionHistoryLimit": 10,
            "template": {
                "spec": {
                    "volumes": [
                        {"name": deployment_name + "-logs",
                         "hostPath":
                             {"path": "/logs/" + deployment_name,
                              "type": ""}}],
                    "containers": [
                        {
                            "image": image,
                            "name": deployment_name,
                            "livenessProbe":  # kubernetes認為該pod是存活的,不存活則需要重啟
                                {"httpGet":
                                     {"path": health_check_url,
                                      "port": container_port,
                                      "scheme": "HTTP"},
                                 "initialDelaySeconds": 60,
                                 # equals to the maximum startup time of the application + couple of seconds
                                 "timeoutSeconds": 5,
                                 "successThreshold": 1,
                                 "failureThreshold": 5},
                            "readinessProbe":  # kubernetes認為該pod是啟動成功的
                                {"httpGet":
                                     {"path": health_check_url,
                                      "port": container_port,
                                      "scheme": "HTTP", },
                                 "initialDelaySeconds": 60,  # equals to minimum startup time of the application
                                 "timeoutSeconds": 5,
                                 "successThreshold": 1,
                                 "failureThreshold": 5},
                            "resources": {"limits":
                                              {"cpu": cpu,
                                               "memory": memory}},
                            "ports": [
                                {
                                    "containerPort": container_port
                                }
                            ],
                            "volumeMounts": [
                                {"name": deployment_name + "-logs",
                                 "mountPath": "/home/tomcat/apache-tomcat-9.0.8/logs"}]
                        }
                    ],
                    "affinity": {
                        "nodeAffinity": {
                            "preferredDuringSchedulingIgnoredDuringExecution": [
                                {"weight": 1,
                                 "preference":
                                     {"matchExpressions": [
                                         {"key": "as.stat",
                                          "operator": "In",
                                          "values":
                                              ["active"]}
                                     ]}
                                 }]
                        },
                        "podAntiAffinity": {
                            "requiredDuringSchedulingIgnoredDuringExecution": [
                                {"labelSelector":
                                    {"matchExpressions": [
                                        {"key": "name", "operator": "In", "values": [deployment_name]}
                                    ]}
                                    ,
                                    "topologyKey": "kubernetes.io/hostname"}
                            ]}}
                },
                "metadata": {
                    "name": deployment_name,
                    "labels": {
                        "dce.daocloud.io/app": app_name,
                        "dce.daocloud.io/component": deployment_name
                    }
                }
            },
            "replicas": replicas - 1,
            "strategy":
                {"rollingUpdate":  # 由於replicas為3,則整個升級,pod個數在2-3個之間
                     {"maxSurge": 0,  # 滾動升級時會先啟動0個pod
                      "maxUnavailable": 1}}  # 滾動升級時允許的最大Unavailable的pod個數
        }
    }
    data['metadata']['labels'].update(labels)
    data['spec']['template']["metadata"]["labels"].update(labels)
    req = urllib2.Request(post_url, json.dumps(data))
    req.add_header('Content-Type', 'application/json')
    req.add_header("Authorization", "Basic " + base64.b64encode(ADMIN_NAME + ':' + ADMIN_PWD))
    req.get_method = lambda: 'PUT'
    print(data)
    try:
        response = urllib2.urlopen(req)
        if response.code == 200:
            # result=json.loads(response.readline())
            print('Created')
        else:
            return 'RequestError'
    except:
        print('error para:url:' + post_url)
        print('error data:' + json.dumps(data))
        traceback.print_exc()
        return 'RequestError'

    # 等待update完畢
    while check_deployment_podcnt(deployment_name, namespace) != replicas - 1 and i < 100:
        i += 5
        self.update_state(state='PROGRESS', meta={'i': i})
        time.sleep(5)
        print('sleep 5')
    # 縮replicas為replicas-1,空出一個ip
    post_url = 'http://' + DCE_MAIN_IP + '/apis/apps/v1/namespaces/' + namespace + '/deployments/' + deployment_name
    data = {"metadata": {"annotations": {"kubernetes.io/change-cause": "update replica"}},
            "spec": {"replicas": replicas}}
    req = urllib2.Request(post_url, json.dumps(data))
    req.add_header('Content-Type', 'application/strategic-merge-patch+json')
    req.add_header("Authorization", "Basic " + base64.b64encode(ADMIN_NAME + ':' + ADMIN_PWD))
    req.get_method = lambda: 'PATCH'

    print(data)
    try:
        response = urllib2.urlopen(req)
        if response.code == 200:
            # result=json.loads(response.readline())
            print('replicas-1->replicas')
            return 'Created'
    except:
        print('error para:url:' + post_url)
        print('error data:' + json.dumps(data))
        traceback.print_exc()
        return 'RequestError'

def deploy_image_dce(proj_code, category, name, module_id, ver, urlpath, instance_num, node_port, cpu, memory):
	async_task_id = '0'
 	if result == 'RequestError':
        return '{"code":"ERROR","info":"read_namespaced_deployment:RequestError"}'
    elif result == 'Find':
        # ---------------------------------------------Celery後臺任務---------------------------------------------------
        async_result = update_deployment.delay(proj_code, deployment_name, replicas, labels, image, container_port, cpu,
                                               memory, urlpath, namespace)
        async_task_id = async_result.id
        # ---------------------------------------------Celery後臺任務---------------------------------------------------
    DCE_MAIN_IP = '1.1.1.1'
    return '{"code":"OK","async_task_id":"' + str(async_task_id) + '","info":"http://' + DCE_MAIN_IP + ':' + str(
        node_port) + (urlpath if urlpath else '/') + '"}'

def task_status(request):
    """
    後臺任務狀態查詢
    :param request:
    :return:
    """
    the_task_id = request.GET.get('the_task_id')
    the_task = update_deployment.AsyncResult(the_task_id)
    print_util('狀態==>', "任務:{0} 當前的 state 為:{1}".format(the_task_id, the_task.state))
    if the_task.state == 'PROGRESS':
        resp = {'state': 'progress', 'progress': the_task.info.get('i', 0)}
    elif the_task.state == 'SUCCESS':
        resp = {'state': "success", 'progress': 100}
    elif the_task.state == 'PENDING':  # 任務處於排隊之中
        resp = {'state': 'waiting', 'progress': 0}
    else:
        resp = {'state': the_task.state, 'progress': the_task.info.get('i', 0)}
    return JsonResponse(resp)

deploy.html

<script type="text/javascript">

    function commit(func,func_name) {
        $("#myModalLabel").text(func_name);
        $("#myModalLabelBody").text("是否 "+func_name);
        $("#btn_submit").attr("onclick","commitYes('"+func+"','"+func_name+"');");
        $('#myModal').modal({backdrop: 'static',keyboard: false});
    }

    function commitYes(func,func_name){
        var form_data = new FormData();
        form_data.append('csrfmiddlewaretoken',$("[name='csrfmiddlewaretoken']").val());
        form_data.append('ver',$(".fun_deploy").parent().parent().children("td").eq(1).html());
        form_data.append('module',$(".fun_deploy").parent().parent().children("td").eq(3).html());
        $.ajax({
            url:'${SITE_URL}deploy_image/',
            type:'POST',
            data: form_data,
            processData: false,  // tell jquery not to process the data
            contentType: false, // tell jquery not to set contentType
            success: function(result) {
                the_task_id = result.async_task_id;
                //alert(the_task_id);
                if (the_task_id!=="0"){
                    $('#progressModal').modal({backdrop: 'static',keyboard: false});
                    $("#bar_show_area").append("<div class='task_div'>任務:"+the_task_id+"<span id='"+the_task_id+"_span' style='float:right'></span><div id='"+the_task_id+"'></div><div>"); //進度條繫結區域
                    // 建立進度條物件
                    var nanobar = new Nanobar({target:document.getElementById(the_task_id)});
                    update_progress(the_task_id,nanobar);
                }
                if (result.code==="OK") {
                    alert(result.info);
                    window.open(result.info, 'newwindow');
                }
                else if(result.code==="WARNING" && result.info==='no plan') {
                    alert('當前不在釋出時間視窗,禁止釋出');
                }
                else {
                    alert(result.info);
                }
            },
             error: function () {
               alert('釋出失敗!請聯絡系統管理員');
             }
        });
    }

    //更新進度條的函式
    function update_progress(the_task_id,nanobar){
      $.get("${SITE_URL}task_status/?the_task_id="+the_task_id,function(data){
        percent = parseInt(data['progress']);
        // alert(percent);
        nanobar.go(percent);
        // if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS'){//state == 'success'
        if (data['state'] === 'success'){
          //alert("任務完成");
          // $("#"+the_task_id).append('完成');
          $("#"+the_task_id+"_span").empty();
          $("#"+the_task_id+"_span").append(percent+'%'+'完成');

          $("#"+the_task_id).parent().css("background","#def0d8");

        }
        else{
          $("#"+the_task_id+"_span").empty();
          $("#"+the_task_id+"_span").append(percent+'%');
          setTimeout(function(){
            update_progress(the_task_id,nanobar);
          },1000);
        }
      });
    }
</script>

<!--進度模態框-->
<div class="modal fade" id="progressModal" tabindex="-1" role="dialog" aria-labelledby="myModalLabel">
    <div class="modal-dialog" role="document">
        <div class="modal-content">
            <div class="modal-header">
                <button type="button" class="close" data-dismiss="modal" aria-label="Close"><span aria-hidden="true">&times;</span></button>
            </div>
            <div class="modal-body">
                <div id="bar_show_area"></div>
             </div>
         </div>
    </div>
</div>