#!/usr/bin/env python3
import sys
import os
import numpy as np
import cv2
import time
import ctypes
import multiprocessing
from astropy.coordinates import EarthLocation
from astropy.time import Time
from astropy.io import fits
import astropy.units as u
from stvid.utils import get_sunset_and_sunrise
import logging
import configparser
import argparse
import zwoasi as asi


# Capture images from a Raspberry Pi camera
def capture_pi(image_queue, z1, t1, z2, t2, nx, ny, nz, tend, device_id, live, cfg):
    from picamerax.array import PiRGBArray
    from picamerax import PiCamera

    # Initialization
    first = True
    slow_CPU = False
    camera_type = "PI"

    # Initialize PiCamera device
    camera = PiCamera(sensor_mode=2)
    camera.resolution = (nx, ny)

    # Turn off anything automatic
    camera.exposure_mode = 'off'
    camera.awb_mode = 'off'

    # ISO needs to be 0, otherwise analog and digital gain won't work
    camera.iso = 0

    # Set the camera settings
    camera.framerate = cfg.getfloat(camera_type, 'framerate')
    camera.awb_gains = (cfg.getfloat(camera_type, 'awb_gain_red'),
                        cfg.getfloat(camera_type, 'awb_gain_blue'))
    camera.analog_gain = cfg.getfloat(camera_type, 'analog_gain')
    camera.digital_gain = cfg.getfloat(camera_type, 'digital_gain')
    camera.shutter_speed = cfg.getint(camera_type, 'exposure')

    rawCapture = PiRGBArray(camera, size=(nx, ny))

    # Allow the camera to warm up
    time.sleep(0.1)

    try:
        # Loop until reaching end time
        while float(time.time()) < tend:
            # Wait for a capture buffer to become available
            if image_queue.qsize() > 1:
                logger.warning("Acquiring data faster than your CPU can process")
                slow_CPU = True
            while image_queue.qsize() > 1:
                time.sleep(0.1)
            if slow_CPU:
                lost_video = time.time() - t
                logger.info("Waited %.3fs for available capture buffer" % lost_video)
                slow_CPU = False

            # Get frames
            i = 0
            for frameA in camera.capture_continuous(rawCapture,
                                                    format="bgr",
                                                    use_video_port=True):
                # Store start time
                t0 = float(time.time())

                # Grab the raw NumPy array representing the image
                frame = frameA.array

                # Compute mid time
                t = (float(time.time()) + t0) / 2.0

                # Skip lost frames
                if frame is not None:
                    # Convert image to grayscale
                    z = np.asarray(cv2.cvtColor(frame,
                                                cv2.COLOR_BGR2GRAY)).astype(np.uint8)

                    # Optionally rotate the frame by 2 * 90 degrees
                    # z = np.rot90(z, 2)

                    # Display frame
                    if live is True:
                        cv2.imshow("Capture", z)
                        cv2.waitKey(1)

                    # Store results
                    if first:
                        z1[i] = z
                        t1[i] = t
                    else:
                        z2[i] = z
                        t2[i] = t

                # Clear the stream in preparation for the next frame
                rawCapture.truncate(0)

                # Count up to nz frames, then break out of the for loop
                i += 1
                if i >= nz:
                    break

            if first:
                buf = 1
            else:
                buf = 2
            image_queue.put(buf)
            logger.debug("Captured z%d" % buf)

            # Swap flag
            first = not first
        reason = "Session complete"
    except KeyboardInterrupt:
        print()
        reason = "Keyboard interrupt"
    except ValueError as e:
        logger.error("%s" % e)
        reason = "Wrong image dimensions? Fix nx, ny in config."
    finally:
        # End capture
        logger.info("Capture: %s - Exiting" % reason)
        camera.close()
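
# Note on buffering (applies to all capture functions in this script): z1/t1
# and z2/t2 are shared-memory arrays allocated by the main process. Each pass
# fills one buffer with nz grayscale frames and their mid-exposure timestamps,
# puts the index of the finished buffer (1 or 2) on image_queue, then switches
# to the other buffer. In this preview script the queue size is only used to
# throttle capture when frames arrive faster than they are handled; a separate
# consumer process (hypothetical here, not part of this script) could drain it
# along the lines of:
#
#     buf = image_queue.get()                    # blocks until a buffer is full
#     z, t = (z1, t1) if buf == 1 else (z2, t2)
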
# Capture images from a cv2 (V4L2/DirectShow) device
def capture_cv2(image_queue, z1, t1, z2, t2, nx, ny, nz, tend, device_id, live):
    # Initialization
    first = True
    slow_CPU = False

    # Initialize cv2 device
    device = cv2.VideoCapture(device_id)

    # Set properties
    device.set(cv2.CAP_PROP_FRAME_WIDTH, nx)
    device.set(cv2.CAP_PROP_FRAME_HEIGHT, ny)

    try:
        # Loop until reaching end time
        while float(time.time()) < tend:
            # Wait for a capture buffer to become available
            if image_queue.qsize() > 1:
                logger.warning("Acquiring data faster than your CPU can process")
                slow_CPU = True
            while image_queue.qsize() > 1:
                time.sleep(0.1)
            if slow_CPU:
                lost_video = time.time() - t
                logger.info("Waited %.3fs for available capture buffer" % lost_video)
                slow_CPU = False

            # Get frames
            for i in range(nz):
                # Store start time
                t0 = float(time.time())

                # Get frame
                res, frame = device.read()

                # Compute mid time
                t = (float(time.time()) + t0) / 2.0

                # Skip lost frames
                if res is True:
                    # Convert image to grayscale
                    z = np.asarray(cv2.cvtColor(frame,
                                                cv2.COLOR_BGR2GRAY)).astype(np.uint8)

                    # Display frame
                    if live is True:
                        cv2.imshow("Capture", z)
                        cv2.waitKey(1)

                    # Store results
                    if first:
                        z1[i] = z
                        t1[i] = t
                    else:
                        z2[i] = z
                        t2[i] = t

            if first:
                buf = 1
            else:
                buf = 2
            image_queue.put(buf)
            logger.debug("Captured z%d" % buf)

            # Swap flag
            first = not first
        reason = "Session complete"
    except KeyboardInterrupt:
        print()
        reason = "Keyboard interrupt"
    except ValueError as e:
        logger.error("%s" % e)
        reason = "Wrong image dimensions? Fix nx, ny in config."
    finally:
        # End capture
        logger.info("Capture: %s - Exiting" % reason)
        device.release()
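
# Note: capture_cv2 requests an nx x ny frame size via CAP_PROP_FRAME_WIDTH /
# CAP_PROP_FRAME_HEIGHT, but many drivers silently fall back to the nearest
# supported resolution. A sketch (not part of this script) for checking what
# was actually negotiated before trusting nx, ny from the config:
#
#     actual_nx = int(device.get(cv2.CAP_PROP_FRAME_WIDTH))
#     actual_ny = int(device.get(cv2.CAP_PROP_FRAME_HEIGHT))
#     if (actual_nx, actual_ny) != (nx, ny):
#         logger.warning("Camera delivers %dx%d, config expects %dx%d"
#                        % (actual_nx, actual_ny, nx, ny))
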
# Capture images from a ZWO ASI camera
def capture_asi(image_queue, z1, t1, z2, t2, nx, ny, nz, tend, device_id, live, cfg):
    first = True        # Array flag
    slow_CPU = False    # Performance issue flag

    camera_type = "ASI"
    gain = cfg.getint(camera_type, 'gain')
    maxgain = cfg.getint(camera_type, 'maxgain')
    autogain = cfg.getboolean(camera_type, 'autogain')
    exposure = cfg.getint(camera_type, 'exposure')
    binning = cfg.getint(camera_type, 'bin')
    brightness = cfg.getint(camera_type, 'brightness')
    bandwidth = cfg.getint(camera_type, 'bandwidth')
    high_speed = cfg.getint(camera_type, 'high_speed')
    hardware_bin = cfg.getint(camera_type, 'hardware_bin')
    sdk = cfg.get(camera_type, 'sdk')
    try:
        software_bin = cfg.getint(camera_type, 'software_bin')
    except configparser.Error:
        software_bin = 0

    # Initialize device
    asi.init(sdk)

    num_cameras = asi.get_num_cameras()
    if num_cameras == 0:
        logger.error("No ZWOASI cameras found")
        raise ValueError("No ZWOASI cameras found")

    cameras_found = asi.list_cameras()  # Model names of the connected cameras

    if num_cameras == 1:
        device_id = 0
        logger.info("Found one camera: %s" % cameras_found[0])
    else:
        logger.info("Found %d ZWOASI cameras" % num_cameras)
        for n in range(num_cameras):
            logger.info("    %d: %s" % (n, cameras_found[n]))
        logger.info("Using #%d: %s" % (device_id, cameras_found[device_id]))

    camera = asi.Camera(device_id)
    camera_info = camera.get_camera_property()
    logger.debug("ASI Camera info:")
    for (key, value) in camera_info.items():
        logger.debug("  %s : %s" % (key, value))

    camera.set_control_value(asi.ASI_BANDWIDTHOVERLOAD, bandwidth)
    camera.disable_dark_subtract()
    camera.set_control_value(asi.ASI_GAIN, gain, auto=autogain)
    camera.set_control_value(asi.ASI_EXPOSURE, exposure, auto=False)
    camera.set_control_value(asi.ASI_AUTO_MAX_GAIN, maxgain)
    camera.set_control_value(asi.ASI_AUTO_MAX_BRIGHTNESS, 20)
    camera.set_control_value(asi.ASI_WB_B, 99)
    camera.set_control_value(asi.ASI_WB_R, 75)
    camera.set_control_value(asi.ASI_GAMMA, 50)
    camera.set_control_value(asi.ASI_BRIGHTNESS, brightness)
    camera.set_control_value(asi.ASI_FLIP, 0)
    try:
        camera.set_control_value(asi.ASI_HIGH_SPEED_MODE, high_speed)
    except Exception:
        pass
    try:
        camera.set_control_value(asi.ASI_HARDWARE_BIN, hardware_bin)
    except Exception:
        pass
    camera.set_roi(bins=binning)
    camera.start_video_capture()
    camera.set_image_type(asi.ASI_IMG_RAW8)

    try:
        # Fix autogain: keep grabbing frames until the reported gain settles
        if autogain:
            while True:
                # Get frame
                z = camera.capture_video_frame()

                # Break on no change in gain
                settings = camera.get_control_values()
                if gain == settings["Gain"]:
                    break
                gain = settings["Gain"]
                camera.set_control_value(asi.ASI_GAIN, gain, auto=autogain)

        # Loop until reaching end time
        while float(time.time()) < tend:
            # Wait for a capture buffer to become available
            if image_queue.qsize() > 1:
                logger.warning("Acquiring data faster than your CPU can process")
                slow_CPU = True
            while image_queue.qsize() > 1:
                time.sleep(0.1)
            if slow_CPU:
                lost_video = time.time() - t
                logger.info("Waited %.3fs for available capture buffer" % lost_video)
                slow_CPU = False

            # Get settings
            settings = camera.get_control_values()
            gain = settings["Gain"]
            temp = settings["Temperature"] / 10.0
            logger.info("Capturing frame with gain %d, temperature %.1f" % (gain, temp))

            # Set gain
            if autogain:
                camera.set_control_value(asi.ASI_GAIN, gain, auto=autogain)

            # Get frames
            for i in range(nz):
                # Store start time
                t0 = float(time.time())

                # Get frame
                z = camera.capture_video_frame()

                # Apply software binning
                if software_bin > 1:
                    my, mx = z.shape
                    z = cv2.resize(z, (mx // software_bin, my // software_bin))

                # Compute mid time
                t = (float(time.time()) + t0) / 2.0

                # Display frame
                if live is True:
                    cv2.imshow("Capture", z)
                    cv2.waitKey(1)

                # Store results
                if first:
                    z1[i] = z
                    t1[i] = t
                else:
                    z2[i] = z
                    t2[i] = t

            if first:
                buf = 1
            else:
                buf = 2
            image_queue.put(buf)
            logger.debug("Captured buffer %d (%dx%dx%d)" % (buf, nx, ny, nz))

            # Swap flag
            first = not first
        reason = "Session complete"
    except KeyboardInterrupt:
        print()
        reason = "Keyboard interrupt"
    except ValueError as e:
        logger.error("%s" % e)
        reason = "Wrong image dimensions? Fix nx, ny in config."
    except MemoryError as e:
        logger.error("Capture: Memory error %s" % e)
        reason = "Memory error"
    finally:
        # End capture
        logger.info("Capture: %s - Exiting" % reason)
        camera.stop_video_capture()
        camera.close()
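
# For reference, the configuration sections and keys read by this script. The
# values below are illustrative placeholders only, not recommended settings:
#
#     [Common]
#     observations_path = ./obs
#     observer_lat = 52.0
#     observer_lon = 4.0
#     observer_height = 10
#
#     [Camera]
#     camera_type = ASI          # PI, CV2 or ASI
#
#     [ASI]
#     device_id = 0
#     nx = 1544
#     ny = 1040
#     nframes = 100
#     gain = 150
#     maxgain = 300
#     autogain = True
#     exposure = 100000
#     bin = 1
#     brightness = 10
#     bandwidth = 90
#     high_speed = 1
#     hardware_bin = 0
#     software_bin = 0           # optional; defaults to 0 when missing
#     sdk = /path/to/libASICamera2.so
#
# A [PI] or [CV2] section needs the keys its capture function reads
# (framerate, awb_gain_red/blue, analog_gain, digital_gain, exposure for PI;
# device_id, nx, ny, nframes for every camera type).
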
# Main function
if __name__ == '__main__':
    # Read command line options
    conf_parser = argparse.ArgumentParser(description='Preview live video frames.')
    conf_parser.add_argument('-c', '--conf_file',
                             help="Specify configuration file. If no file" +
                             " is specified 'configuration.ini' is used.",
                             metavar="FILE")
    args = conf_parser.parse_args()

    # Process command line options and parse configuration
    cfg = configparser.ConfigParser(inline_comment_prefixes=('#', ';'))
    conf_file = args.conf_file if args.conf_file else "configuration.ini"
    result = cfg.read([conf_file])
    if not result:
        print("Could not read config file: %s\nExiting..." % conf_file)
        sys.exit()

    # Set up logging
    logFormatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] " +
                                     "[%(levelname)-5.5s] %(message)s")
    logger = logging.getLogger()

    # Generate directory
    path = os.path.abspath(cfg.get('Common', 'observations_path'))
    if not os.path.exists(path):
        try:
            os.makedirs(path)
        except PermissionError:
            logger.error("Can not create observations_path: %s" % path)
            sys.exit()

    fileHandler = logging.FileHandler(os.path.join(path, "preview.log"))
    fileHandler.setFormatter(logFormatter)
    logger.addHandler(fileHandler)
    consoleHandler = logging.StreamHandler(sys.stdout)
    consoleHandler.setFormatter(logFormatter)
    logger.addHandler(consoleHandler)
    logger.setLevel(logging.DEBUG)
    logger.info("Using config: %s" % conf_file)

    # Testing mode
    test = True
    testing = True
    test_duration = 3600
    logger.info("Preview duration: %ds" % test_duration)

    # Live mode
    live = True

    # Get camera type
    camera_type = cfg.get('Camera', 'camera_type')

    # Get device id
    device_id = cfg.getint(camera_type, 'device_id')

    # Current time
    tnow = Time.now()

    # Set location
    loc = EarthLocation(lat=cfg.getfloat('Common', 'observer_lat')*u.deg,
                        lon=cfg.getfloat('Common', 'observer_lon')*u.deg,
                        height=cfg.getfloat('Common', 'observer_height')*u.m)

    tend = tnow + test_duration*u.s
    logger.info("Starting data acquisition")
    logger.info("Acquisition will end after " + tend.isot)

    # Get settings
    nx = cfg.getint(camera_type, 'nx')
    ny = cfg.getint(camera_type, 'ny')
    nz = cfg.getint(camera_type, 'nframes')

    # Initialize shared-memory arrays (double buffers for frames and timestamps)
    z1base = multiprocessing.Array(ctypes.c_uint8, nx*ny*nz)
    z1 = np.ctypeslib.as_array(z1base.get_obj()).reshape(nz, ny, nx)
    t1base = multiprocessing.Array(ctypes.c_double, nz)
    t1 = np.ctypeslib.as_array(t1base.get_obj())
    z2base = multiprocessing.Array(ctypes.c_uint8, nx*ny*nz)
    z2 = np.ctypeslib.as_array(z2base.get_obj()).reshape(nz, ny, nx)
    t2base = multiprocessing.Array(ctypes.c_double, nz)
    t2 = np.ctypeslib.as_array(t2base.get_obj())

    image_queue = multiprocessing.Queue()

    # Set processes
    if camera_type == "PI":
        pcapture = multiprocessing.Process(target=capture_pi,
                                           args=(image_queue, z1, t1, z2, t2,
                                                 nx, ny, nz, tend.unix,
                                                 device_id, live, cfg))
    elif camera_type == "CV2":
        pcapture = multiprocessing.Process(target=capture_cv2,
                                           args=(image_queue, z1, t1, z2, t2,
                                                 nx, ny, nz, tend.unix,
                                                 device_id, live))
    elif camera_type == "ASI":
        pcapture = multiprocessing.Process(target=capture_asi,
                                           args=(image_queue, z1, t1, z2, t2,
                                                 nx, ny, nz, tend.unix,
                                                 device_id, live, cfg))
    else:
        logger.error("Unknown camera_type: %s" % camera_type)
        sys.exit()

    # Start
    pcapture.start()

    # End
    try:
        pcapture.join()
    except (KeyboardInterrupt, ValueError):
        time.sleep(0.1)  # Allow a little time for a graceful exit
    except MemoryError as e:
        logger.error("Memory error %s" % e)
    finally:
        pcapture.terminate()

    # Release device
    if live is True:
        cv2.destroyAllWindows()
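
# Example invocation (the script name below is assumed; it is not fixed by the
# code above, although the log file is written as preview.log):
#
#     python3 preview.py -c configuration.ini
#
# With no -c option the script falls back to 'configuration.ini' in the
# current working directory.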