
Counting Road Vehicles with OpenCV

Today we are going to look at how to count road traffic using computer vision.


In this tutorial we will use only Python and OpenCV, relying on a background subtraction algorithm to do very simple motion detection.

We will cover the following four topics:

1. The main idea behind background subtraction algorithms for object detection.

2. OpenCV image filters.

3. Object detection using contours.

4. Building a structure for further data processing.

Background Subtraction Algorithms


There are many different background subtraction algorithms, but the main idea behind all of them is simple.

Suppose we have a video of a room, and on some frames there are no people or pets. Those frames are essentially static, and we call them the background (background_layer). So to get the objects moving in the video, all we need to do is subtract the background from the current frame.

Because of lighting changes, objects moved by people, or people and pets that are constantly in motion, we may never get such a static frame. In that case we select a number of frames from the video, and if the great majority of them agree on the value of a given pixel, that pixel is treated as part of the background_layer.
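
To make the "current frame minus background" idea concrete, here is a minimal sketch using a plain absolute difference and a fixed threshold. The file names are hypothetical; the article itself uses the MOG algorithm described next.

import cv2

# a frame captured while the road was empty, and the frame to analyse (hypothetical files)
background = cv2.imread("empty_road.png", cv2.IMREAD_GRAYSCALE)
frame = cv2.imread("frame_0001.png", cv2.IMREAD_GRAYSCALE)

# absolute per-pixel difference between the current frame and the background
diff = cv2.absdiff(frame, background)

# pixels that changed significantly are treated as foreground
_, fg_mask = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
cv2.imwrite("naive_fg_mask.png", fg_mask)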

We will use the MOG algorithm for background subtraction.

[Image: original frame]

The code looks like this:

import os
import logging
import logging.handlers
import random

import numpy as np
import skvideo.io
import cv2
import matplotlib.pyplot as plt

import utils
# without this some strange errors happen
cv2.ocl.setUseOpenCL(False)
random.seed(123)

# ============================================================================
IMAGE_DIR = "./out"
VIDEO_SOURCE = "input.mp4"
SHAPE = (720, 1280)  # HxW
# ============================================================================

def train_bg_subtractor(inst, cap, num=500):
  '''
    BG subtractor needs to process some amount of frames to start giving results.
  '''
  print('Training BG Subtractor...')
  i = 0
  for frame in cap:
    inst.apply(frame, None, 0.001)
    i += 1
    if i >= num:
      return cap

def main():
  log = logging.getLogger("main")

  # creating MOG2 bg subtractor with 500 frames in cache
  # and shadow detection
  bg_subtractor = cv2.createBackgroundSubtractorMOG2(
    history=500, detectShadows=True)

  # Set up image source
  # You can also use cv2.VideoCapture; for some reason it did not work for me
  cap = skvideo.io.vreader(VIDEO_SOURCE)

  # skipping 500 frames to train the bg subtractor
  train_bg_subtractor(bg_subtractor, cap, num=500)

  frame_number = -1
  for frame in cap:
    if not frame.any():
      log.error("Frame capture failed, stopping...")
      break

    frame_number += 1
    utils.save_frame(frame, "./out/frame_%04d.png" % frame_number)

    fg_mask = bg_subtractor.apply(frame, None, 0.001)
    utils.save_frame(fg_mask, "./out/fg_mask_%04d.png" % frame_number)
# ============================================================================

if __name__ == "__main__":
  log = utils.init_logging()

  if not os.path.exists(IMAGE_DIR):
    log.debug("Creating image directory `%s`...",IMAGE_DIR)
    os.makedirs(IMAGE_DIR)

  main()

After processing we get the following foreground image:

[Image: foreground mask after background subtraction]

We can see that there is some noise in the foreground mask, which can be removed with standard filtering techniques.

Filtering

For our case we will need the following filtering operations: Threshold, Erode, Dilate, Opening, and Closing.

First we apply "Closing" to remove gaps inside regions, then "Opening" to remove isolated noise pixels, and finally "Dilate" to thicken the objects and merge nearby blobs. The code is as follows:

def filter_mask(img):
  kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
  # Fill any small holes
  closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
  # Remove noise
  opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)
  # Dilate to merge adjacent blobs
  dilation = cv2.dilate(opening, kernel, iterations=2)
  # threshold to drop weak (shadow) pixels
  dilation[dilation < 240] = 0
  return dilation
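
As a rough sketch (reusing the names from the first script above), the filter slots into the capture loop right after the background subtractor:

frame_number = -1
for frame in cap:
  frame_number += 1
  fg_mask = bg_subtractor.apply(frame, None, 0.001)
  # closing -> opening -> dilation -> threshold, as defined above
  fg_mask = filter_mask(fg_mask)
  utils.save_frame(fg_mask, "./out/fg_mask_%04d.png" % frame_number)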

The filtered foreground looks like this:

[Image: foreground mask after filtering]

Object Detection with Contours

We will use the cv2.findContours function to detect the contours. The parameters we can use are:

cv2.RETR_EXTERNAL ------ retrieve only the external (outer) contours.

cv2.CHAIN_APPROX_TC89_L1 ------ use the Teh-Chin chain approximation algorithm (faster).

The code is as follows:

def get_centroid(x, y, w, h):
  x1 = int(w / 2)
  y1 = int(h / 2)
  cx = x + x1
  cy = y + y1
  return (cx, cy)

def detect_vehicles(fg_mask, min_contour_width=35, min_contour_height=35):
  matches = []
  # finding external contours (the [-2:] slice works with both OpenCV 3 and 4)
  contours, hierarchy = cv2.findContours(
    fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)[-2:]
  # filtering by width and height
  for (i, contour) in enumerate(contours):
    (x, y, w, h) = cv2.boundingRect(contour)
    contour_valid = (w >= min_contour_width) and (
      h >= min_contour_height)
    if not contour_valid:
      continue
    # getting center of the bounding box
    centroid = get_centroid(x, y, w, h)
    matches.append(((x, y, w, h), centroid))
  return matches
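
As a quick check, the helper can be run on the filtered mask from the previous section and the results drawn back onto the frame. This is just a sketch reusing the names from the snippets above, not the final pipeline:

matches = detect_vehicles(fg_mask)  # fg_mask: the filtered foreground mask from above
for (x, y, w, h), (cx, cy) in matches:
  # draw each bounding box and its centroid on the original frame for inspection
  cv2.rectangle(frame, (x, y), (x + w - 1, y + h - 1), (255, 0, 0), 1)
  cv2.circle(frame, (cx, cy), 2, (0, 0, 255), -1)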

Building a Data Processing Framework

We all know that in ML and CV there is no single algorithm that can handle every problem, and even if such an algorithm existed we would not use it, because it would be hard to make it efficient at scale. For example, a few years ago Netflix offered a three-million-dollar prize for the best movie recommendation algorithm. One team managed it, but their recommendation algorithm could not run at scale, which made it practically useless to the company. Netflix still paid them one million dollars.

Next, let's build a framework for our problem that will make the data processing more convenient.

class PipelineRunner(object):
  '''
    Very simple pipeline.

    Just run the given processors in order, passing the context from one to
    another.

    You can also set the log level for the processors.
  '''
  def __init__(self, pipeline=None, log_level=logging.DEBUG):
    self.pipeline = pipeline or []
    self.context = {}
    self.log = logging.getLogger(self.__class__.__name__)
    self.log.setLevel(log_level)
    self.log_level = log_level
    self.set_log_level()

  def set_context(self, data):
    self.context = data

  def add(self, processor):
    if not isinstance(processor, PipelineProcessor):
      raise Exception(
        'Processor should be an instance of PipelineProcessor.')
    processor.log.setLevel(self.log_level)
    self.pipeline.append(processor)

  def remove(self, name):
    for i, p in enumerate(self.pipeline):
      if p.__class__.__name__ == name:
        del self.pipeline[i]
        return True
    return False

  def set_log_level(self):
    for p in self.pipeline:
      p.log.setLevel(self.log_level)

  def run(self):
    for p in self.pipeline:
      self.context = p(self.context)
    self.log.debug("Frame #%d processed.", self.context['frame_number'])
    return self.context

class PipelineProcessor(object):
  '''
    Base class for processors.
  '''
  def __init__(self):
    self.log = logging.getLogger(self.__class__.__name__)

First we define an ordered list of processors. Each processor does part of the job, and running them in sequence produces the final result (see the short sketch below).
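
Here is a minimal usage sketch of the runner, an assumption about the wiring rather than the project's final main(). `cap` is the frame iterator from the first script, and `processors` stands for the list of PipelineProcessor instances defined below (ContourDetection, VehicleCounter, Visualizer, CsvWriter):

processors = []  # fill with the processor instances defined in the rest of the article
runner = PipelineRunner(pipeline=processors)

frame_number = -1
for frame in cap:
  frame_number += 1
  # every processor reads from and writes to this shared context dict
  runner.set_context({
    'frame': frame,
    'frame_number': frame_number,
  })
  context = runner.run()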

We start with the contour detection processor. It simply combines the background subtraction, filtering, and contour detection steps described above, as shown below:

class ContourDetection(PipelineProcessor):
  '''
    Detecting moving objects.

    Purpose of this processor is to subtract the background, get moving
    objects, detect them with the cv2.findContours method, and then filter
    them by width and height.

    bg_subtractor - background subtractor instance.
    min_contour_width - min bounding rectangle width.
    min_contour_height - min bounding rectangle height.
    save_image - if True will save detected objects mask to file.
    image_dir - where to save images (must exist).
  '''

  def __init__(self, bg_subtractor, min_contour_width=35, min_contour_height=35, save_image=False, image_dir='images'):
    super(ContourDetection, self).__init__()

    self.bg_subtractor = bg_subtractor
    self.min_contour_width = min_contour_width
    self.min_contour_height = min_contour_height
    self.save_image = save_image
    self.image_dir = image_dir

  def filter_mask(self, img, a=None):
    '''
      These filters are hand-picked based on visual tests.
    '''
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
    # Fill any small holes
    closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
    # Remove noise
    opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)
    # Dilate to merge adjacent blobs
    dilation = cv2.dilate(opening, kernel, iterations=2)
    return dilation

  def detect_vehicles(self, fg_mask, context):
    matches = []
    # finding external contours (the [-2:] slice works with both OpenCV 3 and 4)
    contours, hierarchy = cv2.findContours(
      fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)[-2:]
    for (i, contour) in enumerate(contours):
      (x, y, w, h) = cv2.boundingRect(contour)
      contour_valid = (w >= self.min_contour_width) and (
        h >= self.min_contour_height)
      if not contour_valid:
        continue
      centroid = utils.get_centroid(x, y, w, h)
      matches.append(((x, y, w, h), centroid))
    return matches

  def __call__(self, context):
    frame = context['frame'].copy()
    frame_number = context['frame_number']

    fg_mask = self.bg_subtractor.apply(frame, None, 0.001)
    # just thresholding values (drops the gray shadow pixels produced by MOG2)
    fg_mask[fg_mask < 240] = 0
    fg_mask = self.filter_mask(fg_mask, frame_number)

    if self.save_image:
      utils.save_frame(fg_mask, self.image_dir +
               "/mask_%04d.png" % frame_number, flip=False)

    context['objects'] = self.detect_vehicles(fg_mask, context)
    context['fg_mask'] = fg_mask

    return context

Now let's create a processor that matches the objects detected on different frames, builds paths, and counts the vehicles that reach the exit zone. The code is shown below:

class VehicleCounter(PipelineProcessor):
  '''
    Counting vehicles that entered the exit zone.

    Purpose of this class: based on the detected objects and a local cache,
    build object paths and count those that entered the exit zone defined by
    the exit masks.

    exit_masks - list of the exit masks.
    path_size - max number of points in a path.
    max_dst - max distance between two points.
  '''

  def __init__(self, exit_masks=[], path_size=10, max_dst=30, x_weight=1.0, y_weight=1.0):
    super(VehicleCounter, self).__init__()

    self.exit_masks = exit_masks

    self.vehicle_count = 0
    self.path_size = path_size
    self.pathes = []
    self.max_dst = max_dst
    self.x_weight = x_weight
    self.y_weight = y_weight

  def check_exit(self,point):
    for exit_mask in self.exit_masks:
      try:
        if exit_mask[point[1]][point[0]] == 255:
          return True
      except:
        return True
    return False

  def __call__(self,context):
    objects = context['objects']
    context['exit_masks'] = self.exit_masks
    context['pathes'] = self.pathes
    context['vehicle_count'] = self.vehicle_count
    if not objects:
      return context

    points = np.array(objects)[:,0:2]
    points = points.tolist()

    # add new points if pathes is empty
    if not self.pathes:
      for match in points:
        self.pathes.append([match])

    else:
      # link new points with old pathes based on minimum distance between
      # points
      new_pathes = []

      for path in self.pathes:
        _min = 999999
        _match = None
        for p in points:
          if len(path) == 1:
            # distance from last point to current
            d = utils.distance(p[0],path[-1][0])
          else:
            # based on 2 prev points predict next point and calculate
            # distance from predicted next point to current
            xn = 2 * path[-1][0][0] - path[-2][0][0]
            yn = 2 * path[-1][0][1] - path[-2][0][1]
            d = utils.distance(
              p[0],(xn,yn),x_weight=self.x_weight,y_weight=self.y_weight
            )

          if d < _min:
            _min = d
            _match = p

        if _match and _min <= self.max_dst:
          points.remove(_match)
          path.append(_match)
          new_pathes.append(path)

        # do not drop path if current frame has no matches
        if _match is None:
          new_pathes.append(path)

      self.pathes = new_pathes

      # add new pathes
      if len(points):
        for p in points:
          # do not add points that already should be counted
          if self.check_exit(p[1]):
            continue
          self.pathes.append([p])

    # save only last N points in path
    for i,_ in enumerate(self.pathes):
      self.pathes[i] = self.pathes[i][self.path_size * -1:]

    # count vehicles and drop counted pathes:
    new_pathes = []
    for i,path in enumerate(self.pathes):
      d = path[-2:]

      if (
        # need at least two points to count
        len(d) >= 2 and
        # prev point not in exit zone
        not self.check_exit(d[0][1]) and
        # current point in exit zone
        self.check_exit(d[1][1]) and
        # path length is bigger than the minimum
        self.path_size <= len(path)
      ):
        self.vehicle_count += 1
      else:
        # prevent linking with a path that is already in the exit zone
        add = True
        for p in path:
          if self.check_exit(p[1]):
            add = False
            break
        if add:
          new_pathes.append(path)

    self.pathes = new_pathes

    context['pathes'] = self.pathes
    context['objects'] = objects
    context['vehicle_count'] = self.vehicle_count

    self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)

    return context

The code above is a bit complex, so let's go through it one part at a time.

[Image: frame with the exit zones highlighted in green]

The green regions in the image above are the exit zones. This is where we count vehicles, and a vehicle is only counted once it has moved through more than three points.

We use masks for this, because it is much more efficient and simpler than vector-based algorithms: a simple "bitwise AND" with the mask is enough to select the points that fall inside an exit zone. The masks are set up as follows:

EXIT_PTS = np.array([
  [[732, 720], [732, 590], [1280, 500], [1280, 720]],
  [[0, 400], [645, 400], [645, 0], [0, 0]]
])

base = np.zeros(SHAPE + (3,), dtype='uint8')
exit_mask = cv2.fillPoly(base, EXIT_PTS, (255, 255, 255))[:, :, 0]
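
To make the lookup explicit: a point lies inside an exit zone exactly when the mask value at that pixel is 255, which is what check_exit above does. A tiny illustrative helper (the name is hypothetical):

def point_in_exit_zone(exit_mask, cx, cy):
  # note the row-first indexing: mask[row][column] == mask[y][x]
  return exit_mask[cy][cx] == 255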

Now let's link the detected points together.

On the first frame we simply add every detected point as a new path.

After that, if len(path) == 1, for each path we look for the newly detected object whose distance to the last point of the path is smallest.

If len(path) > 1, we take the last two points of the path, predict a new point on the same line (linear extrapolation: x_new = 2 * x_last - x_prev, and likewise for y), and find the minimum distance between this predicted point and the current point.

The point with the minimum distance is appended to the end of the current path and removed from the list. If any points remain afterwards, we add them as new paths. In this process we also limit the number of points stored in each path.

new_pathes = []
 for path in self.pathes:
   _min = 999999
   _match = None
   for p in points:
     if len(path) == 1:
       # distance from last point to current
       d = utils.distance(p[0],path[-1][0])
     else:
       # based on 2 prev points predict next point and calculate
       # distance from predicted next point to current
       xn = 2 * path[-1][0][0] - path[-2][0][0]
       yn = 2 * path[-1][0][1] - path[-2][0][1]
        d = utils.distance(
          p[0], (xn, yn), x_weight=self.x_weight, y_weight=self.y_weight
        )
 
     if d < _min:
       _min = d
       _match = p
 
   if _match and _min <= self.max_dst:
     points.remove(_match)
     path.append(_match)
     new_pathes.append(path)
 
   # do not drop path if current frame has no matches
   if _match is None:
     new_pathes.append(path)
 
 self.pathes = new_pathes
 
 # add new pathes
 if len(points):
   for p in points:
     # do not add points that already should be counted
     if self.check_exit(p[1]):
       continue
     self.pathes.append([p])
 
 # save only last N points in path
 for i,_ in enumerate(self.pathes):
   self.pathes[i] = self.pathes[i][self.path_size * -1:]

Now we try to count the vehicles entering the exit zone. To do so, we take the last two points of each path, check that the latest one lies in the exit zone while the previous one does not, and also check that len(path) is greater than the limit.

# count vehicles and drop counted pathes:
  new_pathes = []
  for i,path in enumerate(self.pathes):
    d = path[-2:]
    if (
       # need at least two points to count
       len(d) >= 2 and
       # prev point not in exit zone
       not self.check_exit(d[0][1]) and
       # current point in exit zone
       self.check_exit(d[1][1]) and
       # path length is bigger than the minimum
      self.path_size <= len(path)
    ):
      self.vehicle_count += 1
    else:
       # prevent linking with a path that is already in the exit zone
      add = True
      for p in path:
        if self.check_exit(p[1]):
          add = False
          break
      if add:
        new_pathes.append(path)
  self.pathes = new_pathes
  
  context['pathes'] = self.pathes
  context['objects'] = objects
  context['vehicle_count'] = self.vehicle_count 
  self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)
  return context

The last two processors are a CSV writer, which produces the report CSV file, and a visualizer used for debugging and for producing nice pictures.

import csv  # used by CsvWriter; add this to the imports at the top of the script

class CsvWriter(PipelineProcessor):
  def __init__(self, path, name, start_time=0, fps=15):
    super(CsvWriter, self).__init__()

    self.fp = open(os.path.join(path, name), 'w')
    self.writer = csv.DictWriter(self.fp, fieldnames=['time', 'vehicles'])
    self.writer.writeheader()
    self.start_time = start_time
    self.fps = fps
    self.path = path
    self.name = name
    self.prev = None

  def __call__(self, context):
    frame_number = context['frame_number']
    count = _count = context['vehicle_count']
    if self.prev:
      _count = count - self.prev
    time = ((self.start_time + int(frame_number / self.fps)) * 100
        + int(100.0 / self.fps) * (frame_number % self.fps))
    self.writer.writerow({'time': time, 'vehicles': _count})
    self.prev = count
    return context

class Visualizer(PipelineProcessor):
  def __init__(self, save_image=True, image_dir='images'):
    super(Visualizer, self).__init__()

    self.save_image = save_image
    self.image_dir = image_dir

  def check_exit(self, point, exit_masks=[]):
    for exit_mask in exit_masks:
      if exit_mask[point[1]][point[0]] == 255:
        return True
    return False

  def draw_pathes(self, img, pathes):
    if not img.any():
      return
    for i, path in enumerate(pathes):
      path = np.array(path)[:, 1].tolist()
      for point in path:
        cv2.circle(img, tuple(point), 2, CAR_COLOURS[0], -1)
        cv2.polylines(img, [np.int32(path)], False, CAR_COLOURS[0], 1)
    return img

  def draw_boxes(self, img, pathes, exit_masks=[]):
    for (i, match) in enumerate(pathes):
      contour, centroid = match[-1][:2]
      if self.check_exit(centroid, exit_masks):
        continue
      x, y, w, h = contour
      cv2.rectangle(img, (x, y), (x + w - 1, y + h - 1), BOUNDING_BOX_COLOUR, 1)
      cv2.circle(img, tuple(centroid), 2, CENTROID_COLOUR, -1)
    return img

  def draw_ui(self, img, vehicle_count, exit_masks=[]):
    # this just adds the green exit-zone mask with some opacity to the image
    for exit_mask in exit_masks:
      _img = np.zeros(img.shape, img.dtype)
      _img[:, :] = EXIT_COLOR
      mask = cv2.bitwise_and(_img, _img, mask=exit_mask)
      cv2.addWeighted(mask, 1, img, 1, 0, img)
    # drawing the top block with the counts
    cv2.rectangle(img, (0, 0), (img.shape[1], 50), (0, 0, 0), cv2.FILLED)
    cv2.putText(img, ("Vehicles passed: {total} ".format(total=vehicle_count)), (30, 30),
          cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1)
    return img

  def __call__(self, context):
    frame = context['frame'].copy()
    frame_number = context['frame_number']
    pathes = context['pathes']
    exit_masks = context['exit_masks']
    vehicle_count = context['vehicle_count']

    frame = self.draw_ui(frame, vehicle_count, exit_masks)
    frame = self.draw_pathes(frame, pathes)
    frame = self.draw_boxes(frame, pathes, exit_masks)

    utils.save_frame(frame, self.image_dir +
             "/processed_%04d.png" % frame_number)

    return context
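
The Visualizer also relies on a few colour constants (BOUNDING_BOX_COLOUR, CENTROID_COLOUR, CAR_COLOURS, EXIT_COLOR) that are not defined in the snippets shown here. The values below are illustrative BGR choices, not necessarily the ones used in the original project:

BOUNDING_BOX_COLOUR = (255, 0, 0)  # blue boxes around detected vehicles
CENTROID_COLOUR = (0, 0, 255)      # red dot at the centre of each box
CAR_COLOURS = [(0, 0, 255)]        # colours used when drawing paths
EXIT_COLOR = (66, 183, 42)         # green overlay for the exit zones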

Conclusion

As you can see, this is not as hard as many people think. However, if you run the script you will find that the solution is far from ideal: it has problems with overlapping foreground objects, and it does not classify vehicles by type. Still, when the camera is well positioned, for example directly above the road, the algorithm achieves good accuracy.
