Hi @lvbin
Thanks for your repyly. To avoid broken links I directly post the two files into this posting here.
Place both files inside “ArduCAM_USB_Camera_Shield-master/Nvidia_Jetson/Python/Streaming_demo” so download from the arducam shield repo at github, extract … and as said place the two files there.
MultiCam.py:
"""
Multi-Camera-Snapshot class
"""
import sys
import os
import time
from ImageConvert import convert_image
import arducam_config_parser
import ArducamSDK
import threading
import signal
arducam_api_mutex = threading.Lock()
use_arducam_api_mutex = False
class singleCam(threading.Thread): #Class for the Multiple camera thread
def __init__(self,idx):
self.idx = idx
self.handle = {}
self.cfg = {}
self.running = False
self.save_flag = False
self.save_raw = True
self.color_mode = None
self.cam_id = None
self.img = None
self.Width = None
self.Height = None
self.capture_mutex = threading.Lock()
self.camera_name = "camera"
self.captures_done = 0
self.captures_todo = 0
#threads
self.capture_single_image_thread = None
#Method for the USB shield configuration
def configBoard(self,config):
ArducamSDK.Py_ArduCam_setboardConfig(self.handle, config.params[0], \
config.params[1], config.params[2], config.params[3], \
config.params[4:config.params_length])
pass
#Method for the Initial setup of the camera
def camera_initFromFile(self,fileName):
config = arducam_config_parser.LoadConfigFile(fileName)
camera_parameter = config.camera_param.getdict()
self.Width = camera_parameter["WIDTH"]
self.Height = camera_parameter["HEIGHT"]
BitWidth = camera_parameter["BIT_WIDTH"]
ByteLength = 1
if BitWidth > 8 and BitWidth <= 16:
ByteLength = 2
self.save_raw = True
FmtMode = camera_parameter["FORMAT"][0]
self.color_mode = camera_parameter["FORMAT"][1]
#print("color mode",self.color_mode)
# Camera is operated in I2C mode
I2CMode = camera_parameter["I2C_MODE"]
I2cAddr = camera_parameter["I2C_ADDR"]
TransLvl = camera_parameter["TRANS_LVL"]
# Dictionary to store the configuration data
self.cfg = {"u32CameraType":0x00,
"u32Width":self.Width,
"u32Height":self.Height,
"usbType":0,
"u8PixelBytes":ByteLength,
"u16Vid":0,
"u32Size":0,
"u8PixelBits":BitWidth,
"u32I2cAddr":I2cAddr,
"emI2cMode":I2CMode,
"emImageFmtMode":FmtMode,
"u32TransLvl":TransLvl }
# Open the available camera in the USB bus interface with the index
ret,self.handle,rtn_cfg = ArducamSDK.Py_ArduCam_open(self.cfg, self.idx)
#ret,handle,rtn_cfg = ArducamSDK.Py_ArduCam_autoopen(cfg)
#print(rtn_cfg)
#If opened successfully, send the configuration information to the USB board
if ret == 0:
#ArducamSDK.Py_ArduCam_writeReg_8_8(handle,0x46,3,0x00)
usb_version = rtn_cfg['usbType']
configs = config.configs
#print(configs)
configs_length = config.configs_length
#print(configs_length)
for i in range(configs_length):
type = configs[i].type
if ((type >> 16) & 0xFF) != 0 and ((type >> 16) & 0xFF) != usb_version:
continue
if type & 0xFFFF == arducam_config_parser.CONFIG_TYPE_REG:
ArducamSDK.Py_ArduCam_writeSensorReg(self.handle, configs[i].params[0], configs[i].params[1])
elif type & 0xFFFF == arducam_config_parser.CONFIG_TYPE_DELAY:
time.sleep(float(configs[i].params[0])/1000)
elif type & 0xFFFF == arducam_config_parser.CONFIG_TYPE_VRCMD:
self.configBoard(configs[i])
#brightness
#0x3012 = "Coarse Integration Time"
ArducamSDK.Py_ArduCam_writeSensorReg(self.handle, 0x3012, 1000)
ArducamSDK.Py_ArduCam_registerCtrls(self.handle, config.controls, config.controls_length)
#ArducamSDK.Py_ArduCam_setCtrl(self.handle, "setFramerate", 5)
rtn_val,self.cam_id = ArducamSDK.Py_ArduCam_readUserData(self.handle,0x400-16, 16)
self.cam_id = convert_serial(self.cam_id) #str(self.cam_id)
return True
else:
print("Failed to open camera #%i. rtn_val = %i" % (self.idx, ret))
return False
pass
def capture_single_image(self):
global use_arducam_api_mutex, arducam_api_mutex
#trigger the external trigger via software
ArducamSDK.Py_ArduCam_softTrigger(self.handle)
missed_time = 0
while ArducamSDK.Py_ArduCam_isFrameReady(self.handle) != 1:
#print("Camera #%i: Py_ArduCam_isFrameReady ... waiting." % (self.idx))
time.sleep(0.005)
missed_time += 0.005
#if missed_time > 0:
# print("Camera #%i: Waited %f seconds for isFrameReady()." % (self.idx, missed_time))
if use_arducam_api_mutex:
arducam_api_mutex.acquire()
rtn_val,data,rtn_cfg = ArducamSDK.Py_ArduCam_getSingleFrame(self.handle, 1000)
if rtn_val != 0:
print("Camera #%i: Error capturing single image, rtn_val = %i" % (self.idx, rtn_val))
self.running = False
if use_arducam_api_mutex:
arducam_api_mutex.release()
datasize = rtn_cfg['u32Size']
if datasize == 0:
print("Camera #%i: Data length zero" % (self.idx))
return
self.img = convert_image(data,rtn_cfg, self.color_mode)
self.captures_done += 1
def capture_single_image_thread_func(self):
while self.running == True:
if self.captures_todo > self.captures_done:
print("Camera #%i: Received command to capture image." % (self.idx))
time0 = time.time()
self.capture_single_image()
time1 = time.time()
print("Camera #%i: Image capture required %f seconds.." % (self.idx, (time1 - time0)))
def capture_start(self):
if self.running == True:
return
print("capture start")
self.running = True
# start thread
#if self.capture_single_image_thread is not None:
self.capture_single_image_thread.start()
def capture_end(self):
self.running = False
# kill thread
if self.capture_single_image_thread is not None and self.capture_single_image_thread.is_alive():
self.capture_single_image_thread.join()
print("Camera #%i. Single image thread killed successfully." % (self.idx))
def close_cam(self):
rtn_val = ArducamSDK.Py_ArduCam_close(self.handle)
if rtn_val == 0:
print(self.cam_id[:12],"-device close success!")
else:
print(self.cam_id[:12],"-device close fail!")
class MultiCam():
cams = []
cam_ok = []
cam_count = 0
config_file_name = ""
def __init__(self) -> None:
pass
def set_config(self, config):
self.config_file_name = config
def scan_cameras(self):
print("Scanning for cameras...")
# Scan for the number of camera are available on the USB bus interface
self.cam_count, handles, serial_numbers = ArducamSDK.Py_ArduCam_scan()
# TODO: umwandeln in "add_camera(...)" und dann hier fuer jede gefundene
# camera "add_camera()" nutzen
# Creating the objects for the camera's available
self.cam_ok = [False for i in range(self.cam_count)]
self.cams = [None for i in range(self.cam_count)]
for i in range(self.cam_count):
self.cams[i] = singleCam(handles[i])
print("Scanned for cameras... %i found." % (self.cam_count))
# Sleep time of atleast one second is required between scan and open of the camera
time.sleep(1)
def initialize_camera(self, cam, cam_name):
if cam.camera_initFromFile(self.config_file_name):
cam.camera_name = cam_name
print("camera '%s' initialized. Serial='%s'" % (cam.camera_name, cam.cam_id))
else:
print("camera '%s' failed to initialize." % (cam.camera_name))
return False
# Set the mode of operation - in our case it is continuous mode for the streaming purposes
rtn_val = ArducamSDK.Py_ArduCam_setMode(cam.handle, ArducamSDK.EXTERNAL_TRIGGER_MODE)
#ArducamSDK.Py_ArduCam_setMode(cam.handle,ArducamSDK.CONTINUOUS_MODE)
# Define the thread
cam.capture_single_image_thread = threading.Thread(target=cam.capture_single_image_thread_func)
return True
def initialize_cameras(self):
# Check if initialization setup is successful before starting the capture thread
for i in range(self.cam_count):
cam = self.cams[i]
if not self.initialize_camera(cam, "Kamera " + str(i+1)):
#break as soon as one camera fails
break
# wait a bit before giving back control to others
time.sleep(1)
def start_cameras(self):
if self.cam_count == 0:
return False
# Start Threads
for cam in self.cams:
if cam.running:
cam.capture_end()
cam.capture_start()
#capture an image from all cams - one after another
def capture_single_shot(self):
for cam in self.cams:
restart = cam.running
if restart:
cam.capture_end()
#actually fetch the image of this cam
cam.capture_single_image()
if restart:
cam.capture_start()
def close_cameras(self):
for i in range(self.cam_count):
self.cams[i].capture_end()
self.cams[i].close_cam()
def convert_serial(byteArray):
return (chr(byteArray[0]) + chr(byteArray[1]) + chr(byteArray[2]) + chr(byteArray[3])
+ "-" + chr(byteArray[4]) + chr(byteArray[5]) + chr(byteArray[6]) + chr(byteArray[7])
+ "-" + chr(byteArray[8]) + chr(byteArray[9]) + chr(byteArray[10]) + chr(byteArray[11])
)
def sigint_handler(signum, frame):
#global running,handle
#running = False
exit()
signal.signal(signal.SIGINT, sigint_handler)
#signal.signal(signal.SIGHUP, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
if __name__ == "__main__":
config_file_name = "/home/nano/Tests/Multicam/ArduCAM_USB_Camera_Shield-master/Config/USB2.0_UC-391_Rev.E/DVP/MT9J003/MT9J003_RAW_8b_1920x1080_20fps.cfg"
if len(sys.argv) > 1:
if os.path.exists(sys.argv[1]):
config_file_name = sys.argv[1]
else:
print("Config file does not exist.")
mc = MultiCam()
mc.set_config(config_file_name)
mc.scan_cameras()
mc.initialize_cameras()
mc.start_cameras()
print ("Ready to fetch data ...")
mc.close_cameras()
print("Done.")
MultiCam_TestWindow.py
import sys
import MultiCam
import cv2
import time
if __name__ == "__main__":
config_file_name = "/home/nano/Tests/Multicam/ArduCAM_USB_Camera_Shield-master/Config/USB2.0_UC-391_Rev.E/DVP/MT9J003/MT9J003_RAW_8b_1920x1080_20fps.cfg"
if len(sys.argv) > 1:
if os.path.exists(sys.argv[1]):
config_file_name = sys.argv[1]
else:
print("Config file does not exist.")
mc = MultiCam.MultiCam()
mc.set_config(config_file_name)
mc.scan_cameras()
mc.initialize_cameras()
mc.start_cameras()
print ("Starting loop")
# Exception handling for the termination of the program
try:
exit_loop = False
captures_done = 0
captures_todo = 0
next_snapshot_interval = 1000 #ms
next_snapshot_time = time.time() #now
snapshot_request_time = None
while(not exit_loop and mc.cam_count > 0):
time.sleep(0.10)
#check if one camera is still running/snapshotting
all_done_snapshotting = True
for cam in mc.cams:
if cam.captures_todo > cam.captures_done:
all_done_snapshotting = False
break
#current capture done?
if all_done_snapshotting and captures_todo > captures_done:
for cam in mc.cams:
image = cv2.resize(cam.img, (640,480), interpolation = cv2.INTER_LINEAR)
cv2.imshow(cam.camera_name, image)
cv2.waitKey(10)
captures_done += 1
print("Capture done by all cameras in %f seconds." % ((time.time() - snapshot_request_time)))
#time for a new snapshot?
if all_done_snapshotting and next_snapshot_time <= time.time():
print("Time for a new snapshot request...")
captures_todo += 1
for cam in mc.cams:
cam.captures_todo += 1
snapshot_request_time = time.time()
#print("Tick ... time=%f all_done_snapshotting=%s" % (time.time(), done_string))
#cv2.waitKey(1)
#cv2.imwrite('images1/camera1.jpg',cam1.img)
# Kill the threads and exit the program when Keyboard interrupt happens
except KeyboardInterrupt:
print('Exception called')
mc.close_cameras()
cv2.destroyAllWindows()
print("Done.")
- attach 4 cameras on the same hub (so they share a “USB2 lane”).
- open “MultiCam_TestWindow.py” and adjust the path to the config file (and save…)
- execute “MultiCam_TestWindow.py”
- wait for it to “stuck” in one of the loops, might happen right on the first - or maybe on the second or third (seems like “collisions” - like in non-thread-safe environments)
Open “MultiCam.py” and set use_arducam_api_mutex = False
to True
so it places a mutex around ArducamSDK.Py_ArduCam_getSingleFrame(self.handle, 1000)
.
If you need further information, do not hesitate to ask.