#-------------------------------------------------------------------------------
# Name:        downloadProcess.py
#
# Purpose (createXLS):
#     Discover ONC data available for download and save the data product
#     download input to a spreadsheet, which can be used with
#     updateDownloadProcess.py to create a download process log file. That log
#     file is used to request data products from the dataProductDelivery
#     service and to track the download process via runDownloadProcess.py.
#
# Purpose (updateFromXLS):
#     Take a list of root locations, device categories and date ranges and
#     generate all of the unique requests, storing them in a process log JSON
#     file. Once the process log file has been generated, it can be manually
#     updated to include new root location codes, device category codes and
#     time ranges, and this script rerun to update the download processes.
#     The download processes can be executed, and the generated files
#     downloaded, using the runDownloadProcess.py script.
#
# Purpose (download):
#     Read data product requests from a JSON file, run them, download the
#     files, and save the progress back to the file. The process runs on
#     multiple threads for running and polling request status while
#     simultaneously downloading files on a pool of different threads. The
#     process log file can be manually updated to include new locations,
#     device categories and date ranges; runDownloadProcessLogUpdate.py can
#     then be run to create new download processes, which will be picked up
#     the next time this script is run.
#
# Author:      ryanross
#
# Created:     05/27/2017
# Copyright:   (c) Ocean Networks Canada 2017
# Licence:     None
# Requires:    onc.py
#              openpyxl
#-------------------------------------------------------------------------------
import sys
import getopt
import time
from time import strftime
from datetime import datetime
import os
from os.path import isfile
from os.path import basename
from os.path import splitext
from os.path import dirname
import errno
import itertools
import threading
from queue import Queue
from cgi import logfile  # NOTE(review): looks like an accidental IDE auto-import; `cgi` is removed in Python 3.13 — confirm and remove if unused
from openpyxl import load_workbook
from openpyxl import Workbook
from openpyxl.worksheet.table import Table, TableStyleInfo
from onc.onc import ONC
from ctypes.wintypes import LONG  # NOTE(review): Windows-only and apparently unused — confirm and remove

if sys.version_info.major == 2:
    from V2 import util
else:
    from onc.V3 import util


class createXLS:
    '''Discovers ONC data available for download and writes the data product
    download definitions to an Excel spreadsheet (By Location / By Device /
    Configuration tabs) for later use by the updateFromXLS / download steps.'''

    token = None                   # -t, --token               Personal token, obtained from the 'Web Services API' tab at http://dmas.uvic.ca/Profile
    append = False                 # -a, --append              Indicates if the file should be appended to if it already exists
    downloadMethod = 'byLocation'  # -m, --downloadMethod      How data is downloaded: byLocation, byDevice, or all
    file = None                    # -f, --file                The destination Excel spreadsheet file name
    downloadFolder = None          # -o, --outPath             The folder that file(s) will be saved to
    locationCode = None            # -l, --locationCode        Parent location code used to search for instruments
    deviceCategoryCode = None      # -c, --deviceCategoryCode  Device category code of the instrument to download data from
    deviceCode = None              # -d, --deviceCode          Specific device code to download data from
    propertyCode = None            # -v, --propertyCode        Property code to download data for
    dataProductCode = None         # -p, --dataProductCode     Data product code to download
    extension = None               # -x, --extension           File extension of the data product to download
    begin = None                   # -b, --begin               Beginning date/time of the data to download
    end = None                     # -e, --end                 Ending date/time of the data to download
    logFile = None                 # File the data product request configurations are saved to and progress is tracked in
    splitDateRange = None          # How requests are split up by date range. Valid: all, year, month, week, day
    dayOfWeek = None               # Day of week to start a weekly request from (only used if splitDateRange=week). Valid: MO, TU, WE, TH, FR, SA, SU
    validateRequests = None        # Whether a service call validates request parameters (deployment date, properties) before a new request is added
    runThreads = None              # Size of the queue / number of threads used for running data product requests
    downloadThreads = None         # Number of threads used for downloading data
    downloadQueueSize = None       # Size of the download queue; 'unlimited' can grow to any size (concurrency is bounded by downloadThreads)
    beginRun = None                # Time to begin adding run tasks to the queue
    endRun = None                  # Time to end adding run tasks to the queue
    isProduction = True            # Indicates if the ONC Production Server will be used
    showInfo = False               # Indicates if verbose messaging will be used

    # A list of all of the default data product options. This list was manually defined by
    # using https://wiki.oceannetworks.ca/display/DP/Data+Products+Home.
    # This will eventually be replaced with service calls.
    defaultDPO = [{"dataProductCode":"TSSD", "extension":"csv",  "options":"dpo_qualityControl=1,dpo_dataGaps=1,dpo_resample=none"},  #https://wiki.oceannetworks.ca/display/DP/1
                  {"dataProductCode":"TSSD", "extension":"odv",  "options":"dpo_qualityControl=1,dpo_dataGaps=1,dpo_resample=none"},  #https://wiki.oceannetworks.ca/display/DP/1
                  {"dataProductCode":"TSSD", "extension":"mat",  "options":"dpo_qualityControl=1,dpo_dataGaps=1,dpo_resample=none"},  #https://wiki.oceannetworks.ca/display/DP/1
                  {"dataProductCode":"TSSD", "extension":"txt",  "options":"dpo_qualityControl=1,dpo_dataGaps=1,dpo_resample=none"},  #https://wiki.oceannetworks.ca/display/DP/1
                  {"dataProductCode":"TSSD", "extension":"json", "options":"dpo_qualityControl=1,dpo_dataGaps=1,dpo_resample=none,dpo_jsonOutputEncoding=ONC"},  #https://wiki.oceannetworks.ca/display/DP/1
                  {"dataProductCode":"AD",   "extension":"mp3",  "options":"dpo_hydrophoneDataDiversionMode="},
                  ]

    def __init__(self, token, downloadFolder, locationCode=None, deviceCategoryCode=None,
                 deviceCode=None, dataProductCode=None, extension=None, begin=None, end=None,
                 append=False, qa=False, downloadMethod='byLocation'):
        '''Store the discovery filters and load the default configuration values.'''
        self.token = token
        self.downloadMethod = downloadMethod
        self.append = append
        self.downloadFolder = downloadFolder
        self.locationCode = locationCode
        self.deviceCategoryCode = deviceCategoryCode
        self.deviceCode = deviceCode
        self.dataProductCode = dataProductCode
        self.extension = extension
        self.begin = begin
        self.end = end
        self.qa = qa
        # Fix: the qa flag was previously stored but never applied, so qa=True
        # still sent every request to the production server.
        self.isProduction = not qa
        self.getConfigurations()

    def main(self, argv):
        '''Command-line entry point: parse arguments, load defaults, build the workbook.'''
        self.getParameters(argv)
        self.getConfigurations()
        self.createWorkbook(self.file)

    def createWorkbook(self, file=None):
        '''Build (or append to) the Excel workbook holding the download definitions.

        Creates the By Location and/or By Device discovery sheets depending on
        downloadMethod, plus a Configuration sheet for new workbooks, then saves
        the file, creating its parent folder if needed.'''
        # An instance of the ONC class used to perform all of the web service calls.
        onc = ONC(self.token, self.isProduction, self.showInfo)
        if not file:
            file = '{}/download.xlsx'.format(self.downloadFolder)
        print(file)

        # Check to see if the file is open before proceeding to processing.
        if isfile(file):
            try:
                f = open(file, 'r+')
                f.close()
            except IOError as e:
                print('Error opening file {}. Please close and retry: {}'.format(file, e))
                exit(-1)

        if self.append and isfile(file):
            wb = load_workbook(file)
        else:
            wb = Workbook()
            wsSheet = wb['Sheet']  # drop the default sheet created by Workbook()
            wb.remove(wsSheet)

        style = TableStyleInfo(name="TableStyleMedium16", showFirstColumn=False,
                               showLastColumn=False, showRowStripes=True, showColumnStripes=False)

        if self.downloadMethod in ['all', 'byLocation']:
            wsByLocation = self.createByLocationsWS(wb, onc)
            self.createTable(wsByLocation, 'ByLocation', style)
        if self.downloadMethod in ['all', 'byDevice']:
            wsByDevice = self.createByDeviceWS(wb, onc)
            self.createTable(wsByDevice, 'ByDevice', style)
        if not self.append:
            wsConfiguration = self.createConfigWS(wb)
            self.createTable(wsConfiguration, 'Configuration', style)

        print('Saving {}'.format(file))
        path = os.path.dirname(file)
        if not os.path.isdir(path):
            try:
                os.makedirs(path)
            except OSError as exc:
                # Fix: was `os.path.isdir(self.outPath)` — self.outPath does not
                # exist on this class and raised NameError/AttributeError here.
                if exc.errno == errno.EEXIST and os.path.isdir(path):
                    pass  # another process created it between the check and makedirs
                else:
                    raise
        try:
            wb.save(file)
        except Exception as e:
            # Fix: a generic Exception has no guaranteed errno attribute;
            # use getattr so non-OS errors don't raise AttributeError here.
            if getattr(e, 'errno', None) == errno.EACCES:  # errno 13: file locked/open elsewhere
                print('Error saving file. Please close and retry: {}'.format(e))
            else:
                print('Error saving file: {}'.format(e))
        print('Done!')

    def getParameters(self, argv):
        '''Parse the command-line options into instance attributes; print usage and exit on error.'''
        try:
            # Fix: the optstring previously declared `a:` and `q:` (flags taking
            # arguments) and bare `e` (argument-taking option declared as a flag).
            opts, args = getopt.getopt(argv, "ht:am:qf:o:l:c:d:v:p:x:b:e:",
                                       ["token=", "append", "downloadMethod=", "qa", "file=",
                                        "outPath=", "locationCode=", "deviceCategoryCode=",
                                        "deviceCode=", "propertyCode=", "dataProductCode=",
                                        "extension=", "begin=", "end="])
        except getopt.GetoptError:
            self.usage()
            sys.exit(2)
        if (len(opts) == 0):
            self.usage()
            sys.exit()
        for opt, arg in opts:
            if opt == '-h':
                self.usage()
                sys.exit()
            elif opt in ("-a", "--append"):
                self.append = True
            elif opt in ("-m", "--downloadMethod"):
                self.downloadMethod = arg
            elif opt in ("-q", "--qa"):
                self.isProduction = False
            elif opt in ("-f", "--file"):
                self.file = arg
            elif opt in ("-t", "--token"):
                self.token = arg
            elif opt in ("-o", "--outPath"):
                self.downloadFolder = arg
            elif opt in ("-l", "--locationCode"):
                self.locationCode = arg
            elif opt in ("-c", "--deviceCategoryCode"):
                self.deviceCategoryCode = arg
            elif opt in ("-d", "--deviceCode"):
                self.deviceCode = arg
            elif opt in ("-v", "--propertyCode"):
                self.propertyCode = arg
            elif opt in ("-p", "--dataProductCode"):
                self.dataProductCode = arg
            elif opt in ("-x", "--extension"):
                self.extension = arg
            elif opt in ("-b", "--begin"):
                self.begin = arg
            elif opt in ("-e", "--end"):
                self.end = arg

    def getConfigurations(self):
        '''Load the default Configuration-sheet values.'''
        self.logFile = '{}/download.json'.format(self.downloadFolder)
        self.splitDateRange = 'all'
        self.dayOfWeek = 'SU'
        self.validateRequests = 'FALSE'
        self.runThreads = 4
        self.downloadThreads = 10
        self.downloadQueueSize = 'unlimited'
        self.beginRun = '00:00:00'
        self.endRun = '00:00:00'

    def createConfigWS(self, wb):
        '''Create the Configuration worksheet holding the parameters consumed by the
        updateFromXLS/download steps; returns the worksheet.'''
        wsConfiguration = wb.create_sheet('Configuration')
        wsConfiguration.append(['parameter', 'value', 'description'])
        wsConfiguration.append(['token', self.token, "Your personal token can be obtained from the 'Web Services API' tab at http://dmas.uvic.ca/Profile"])
        wsConfiguration.append(['downloadFolder', self.downloadFolder, 'The folder that the data files will be saved to'])
        wsConfiguration.append(['logFile', self.logFile, 'The configuration file used to create download definitions and track progress'])
        wsConfiguration.append(['splitDateRange', self.splitDateRange, 'Used to determine how the requests are split up by date range./nValid: all, year, month, week, day'])
        wsConfiguration.append(['dayOfWeek', self.dayOfWeek, 'MO, TU, WE, TH, FR, SA, SU - Only used if requestTimeSpan=week'])
        wsConfiguration.append(['validateRequests', self.validateRequests, 'Used to determine if the a service call is made to validate the parameters of the request (deployment date, properties) before the new request is added'])
        wsConfiguration.append(['runThreads', self.runThreads, 'The size of the queue and number of threads that can be used for running data product requests'])
        wsConfiguration.append(['downloadThreads', self.downloadThreads, 'The number of threads that can be used for downloading data on'])
        wsConfiguration.append(['downloadQueueSize', self.downloadQueueSize, 'The size of the queue for downloading files. unlimited can grow to any size, however, the number of concurrent requests is determined by the number of download threads'])
        wsConfiguration.append(['beginRun', self.beginRun, 'Time to begin adding run tasks to the queue'])
        wsConfiguration.append(['endRun', self.endRun, 'Time to end adding run tasks to the queue'])
        # beginRun/endRun live in rows 11 and 12 of the value column.
        wsConfiguration['B11'].number_format = 'HH:MM:SS'
        wsConfiguration['B12'].number_format = 'HH:MM:SS'
        return wsConfiguration

    def createByLocationsWS(self, wb, onc):
        '''Populate the "By Location" worksheet: one row per
        (location, device category, deployment, data product) combination that
        overlaps the requested time range. Returns the worksheet.'''
        if self.append and 'By Location' in wb:
            wsByLocation = wb['By Location']
        else:
            wsByLocation = wb.create_sheet('By Location')
            # Create the column names.
            wsByLocation.append(['locationCode', 'locationName', 'deviceCategoryCode',
                                 'deviceCategoryName', 'propertyCode', 'download', 'begin', 'end',
                                 'deviceCode', 'latitude', 'longitude', 'depth', 'dataProductCode',
                                 'dataProductName', 'extension', 'data product options',
                                 'helpDocument', 'Data Search', 'Device Listing'])
        print('Getting Locations')
        locParms = {'deploymentBegin': self.begin, 'deploymentEnd': self.end}
        if self.locationCode:
            locParms['locationCode'] = self.locationCode
            locParms['includeChildren'] = 'true'
        if self.deviceCode:
            locParms['deviceCode'] = self.deviceCode
        if self.deviceCategoryCode:
            locParms['deviceCategoryCode'] = self.deviceCategoryCode
        if self.propertyCode:
            locParms['propertyCode'] = self.propertyCode
        if self.dataProductCode:
            locParms['dataProductCode'] = self.dataProductCode

        datetimeFormatZ = '%Y-%m-%dT%H:%M:%S.%fZ'
        dtBegin = datetime.strptime(self.begin, datetimeFormatZ)
        dtEnd = datetime.strptime(self.end, datetimeFormatZ)

        foundLocations = []
        for l in onc.getLocations(locParms):
            lCode = l['locationCode']
            lName = l['locationName']
            lLink = l['dataSearchURL']
            dcLink = '{0}&deviceCategory={1}'.format(lLink, self.deviceCategoryCode)
            # Ensure that duplicate locations are not processed (JIRA DMAS-39346).
            # NOTE(review): `break` aborts the whole loop at the first duplicate;
            # if duplicates can appear mid-list this should probably be `continue`
            # — confirm the service's ordering guarantee before changing.
            if lCode in foundLocations:
                break
            foundLocations.append(lCode)

            dcParms = {'locationCode': lCode}
            if self.deviceCategoryCode:
                dcParms['deviceCategoryCode'] = self.deviceCategoryCode
            if self.propertyCode:
                dcParms['propertyCode'] = self.propertyCode
            for dc in onc.getDeviceCategories(dcParms):
                dcCode = dc['deviceCategoryCode']
                dcName = dc['deviceCategoryName']

                devParms = {'locationCode': lCode, 'deviceCategoryCode': dcCode}
                if self.deviceCode:
                    devParms['deviceCode'] = self.deviceCode
                devices = onc.getDevices(devParms)

                depParms = {'locationCode': lCode, 'deviceCategoryCode': dcCode,
                            'begin': self.begin, 'end': self.end}
                if self.deviceCode:
                    depParms['deviceCode'] = self.deviceCode
                deployments = []
                for deployment in onc.getDeployments(depParms):
                    depBegin = deployment['begin']
                    depEnd = deployment['end']
                    deviceCode = deployment['deviceCode']
                    lat = deployment['lat']
                    lon = deployment['lon']
                    depth = deployment['depth']
                    dtDepBegin = datetime.strptime(depBegin, datetimeFormatZ)
                    if depEnd:
                        dtDepEnd = datetime.strptime(depEnd, datetimeFormatZ)
                    else:
                        dtDepEnd = datetime.now()  # deployment still active
                    # Keep only deployments overlapping the request window,
                    # clamped to that window.
                    if max(dtBegin, dtDepBegin) < min(dtEnd, dtDepEnd):
                        if (dtBegin >= dtDepBegin):
                            dtDepBegin = dtBegin
                        if (dtEnd <= dtDepEnd):
                            dtDepEnd = dtEnd
                        deviceLink = ''
                        for d in devices:
                            if d['deviceCode'] == deviceCode:
                                deviceLink = d['deviceLink']
                                break
                        deployments.append({'begin': dtDepBegin, 'end': dtDepEnd,
                                            'deviceCode': deviceCode, 'deviceLink': deviceLink,
                                            'lat': lat, 'lon': lon, 'depth': depth})
                sortedDeployments = sorted(deployments, key=lambda k: k['begin'])

                dcParms = {'locationCode': lCode, 'deviceCategoryCode': dcCode}
                if self.dataProductCode:
                    dcParms['dataProductCode'] = self.dataProductCode
                if self.extension:
                    dcParms['extension'] = self.extension
                dataProducts = onc.getDataProducts(dcParms)

                if (len(dataProducts) >= 1 and len(sortedDeployments) >= 1):
                    print('  {} - {} - {}'.format(lCode, lName, dcName))
                    print('   Deployments:')
                    for dp in dataProducts:
                        defaultOptions = [dpo['options'] for dpo in self.defaultDPO
                                          if dpo['dataProductCode'] == dp['dataProductCode']
                                          and dpo['extension'] == dp['extension']]
                        if (len(defaultOptions) > 0):
                            dataProductOptions = defaultOptions[0]
                        else:
                            dataProductOptions = ''
                        for dep in sortedDeployments:
                            print("    {} from {} to {}".format(dep['deviceCode'], dep['begin'], dep['end']))
                            wsByLocation.append([lCode,                    # locationCode
                                                 lName,                    # locationName - context only
                                                 dcCode,                   # deviceCategoryCode
                                                 dcName,                   # deviceCategoryName - context only
                                                 self.propertyCode,        # propertyCode
                                                 'Y',                      # download
                                                 dep['begin'],             # begin
                                                 dep['end'],               # end
                                                 dep['deviceCode'],        # deviceCode
                                                 dep['lat'],               # latitude
                                                 dep['lon'],               # longitude
                                                 dep['depth'],             # depth
                                                 dp['dataProductCode'],    # dataProductCode
                                                 dp['dataProductName'],    # dataProductName
                                                 dp['extension'],          # extension
                                                 dataProductOptions,       # data product options, e.g. 'dpo_qualityControl=1,dpo_resample=none'
                                                 dp['helpDocument'],       # helpDocument
                                                 dcLink,                   # Data Search
                                                 dep['deviceLink']])       # Device Listing
        # Make the helpDocument, Data Search and Device Listing columns hyperlinks.
        rows = [r for r in wsByLocation.rows]
        for col in [c for c in wsByLocation.columns
                    if c[0].value in ['helpDocument', 'Data Search', 'Device Listing']]:
            for i in range(1, len(rows)):
                col[i].style = 'Hyperlink'
                col[i].hyperlink = col[i].value
        return wsByLocation

    def createByDeviceWS(self, wb, onc):
        '''Populate the "By Device" worksheet: one row per
        (device, deployment, data product) combination that overlaps the
        requested time range. Returns the worksheet.'''
        if self.append and 'By Device' in wb:
            wsByDevice = wb['By Device']
        else:
            wsByDevice = wb.create_sheet('By Device')
            wsByDevice.append(['deviceCode', 'deviceName', 'propertyCode', 'download', 'begin',
                               'end', 'locationCode', 'latitude', 'longitude', 'depth',
                               'dataProductCode', 'dataProductName', 'extension',
                               'data product options', 'helpDocument', 'Device Listing'])
        print('Getting Devices')
        if self.locationCode:
            locParms = {'locationCode': self.locationCode, 'includeChildren': 'true',
                        'deploymentBegin': self.begin, 'deploymentEnd': self.end}
            if self.deviceCode:
                locParms['deviceCode'] = self.deviceCode
            if self.deviceCategoryCode:
                locParms['deviceCategoryCode'] = self.deviceCategoryCode
            if self.propertyCode:
                locParms['propertyCode'] = self.propertyCode
            if self.dataProductCode:
                locParms['dataProductCode'] = self.dataProductCode
            lCodes = [l['locationCode'] for l in onc.getLocations(locParms)]
        else:
            lCodes = [None]  # no location filter: one pass with no locationCode

        datetimeFormatZ = '%Y-%m-%dT%H:%M:%S.%fZ'
        dtBegin = datetime.strptime(self.begin, datetimeFormatZ)
        dtEnd = datetime.strptime(self.end, datetimeFormatZ)

        for lCode in lCodes:
            devParms = {'deploymentBegin': self.begin, 'deploymentEnd': self.end}
            if self.deviceCode:
                devParms['deviceCode'] = self.deviceCode
            if lCode:
                devParms['locationCode'] = lCode
            if self.deviceCategoryCode:
                devParms['deviceCategoryCode'] = self.deviceCategoryCode
            if self.propertyCode:
                devParms['propertyCode'] = self.propertyCode
            if self.dataProductCode:
                devParms['dataProductCode'] = self.dataProductCode

            devices = onc.getDevices(devParms)
            if (len(devices) == 0):
                print("No Devices match filter criteria {}".format(devParms))
            for d in devices:
                dCode = d['deviceCode']
                dName = d['deviceName']
                dLink = d['deviceLink']
                depParams = {'deviceCode': dCode, 'begin': self.begin, 'end': self.end}
                if lCode:
                    depParams['locationCode'] = lCode
                if self.deviceCategoryCode:
                    depParams['deviceCategoryCode'] = self.deviceCategoryCode
                if self.propertyCode:
                    depParams['propertyCode'] = self.propertyCode
                deployments = []
                for deployment in onc.getDeployments(depParams):
                    depBegin = deployment['begin']
                    depEnd = deployment['end']
                    locationCode = deployment['locationCode']
                    lat = deployment['lat']
                    lon = deployment['lon']
                    depth = deployment['depth']
                    dtDepBegin = datetime.strptime(depBegin, datetimeFormatZ)
                    if depEnd:
                        dtDepEnd = datetime.strptime(depEnd, datetimeFormatZ)
                    else:
                        dtDepEnd = datetime.now()  # deployment still active
                    # Keep only deployments overlapping the request window,
                    # clamped to that window.
                    if max(dtBegin, dtDepBegin) < min(dtEnd, dtDepEnd):
                        if (dtBegin >= dtDepBegin):
                            dtDepBegin = dtBegin
                        if (dtEnd <= dtDepEnd):
                            dtDepEnd = dtEnd
                        deployments.append({'begin': dtDepBegin, 'end': dtDepEnd,
                                            'locationCode': locationCode,
                                            'lat': lat, 'lon': lon, 'depth': depth})
                sortedDeployments = sorted(deployments, key=lambda k: k['begin'])

                dcParms = {'deviceCode': dCode}
                if self.dataProductCode:
                    dcParms['dataProductCode'] = self.dataProductCode
                if self.extension:
                    dcParms['extension'] = self.extension
                dataProducts = onc.getDataProducts(dcParms)

                if (len(dataProducts) >= 1 and len(sortedDeployments) >= 1):
                    print('  {} - {}'.format(dCode, dName))
                    print('   Deployments:')
                    for dp in dataProducts:
                        defaultOptions = [dpo['options'] for dpo in self.defaultDPO
                                          if dpo['dataProductCode'] == dp['dataProductCode']
                                          and dpo['extension'] == dp['extension']]
                        if (len(defaultOptions) > 0):
                            dataProductOptions = defaultOptions[0]
                        else:
                            dataProductOptions = ''
                        for dep in sortedDeployments:
                            print("    {} from {} to {}".format(dep['locationCode'], dep['begin'], dep['end']))
                            wsByDevice.append([dCode,                   # deviceCode
                                               dName,                   # deviceName - context only
                                               self.propertyCode,       # propertyCode
                                               'Y',                     # download
                                               dep['begin'],            # begin
                                               dep['end'],              # end
                                               dep['locationCode'],     # locationCode
                                               dep['lat'],              # latitude
                                               dep['lon'],              # longitude
                                               dep['depth'],            # depth
                                               dp['dataProductCode'],   # dataProductCode
                                               dp['dataProductName'],   # dataProductName
                                               dp['extension'],         # extension
                                               dataProductOptions,      # data product options
                                               dp['helpDocument'],      # helpDocument
                                               dLink])                  # Device Listing
        # Make the helpDocument and Device Listing columns hyperlinks.
        rows = [r for r in wsByDevice.rows]
        for col in [c for c in wsByDevice.columns
                    if c[0].value in ['helpDocument', 'Device Listing']]:
            for i in range(1, len(rows)):
                col[i].style = 'Hyperlink'
                col[i].hyperlink = col[i].value
        return wsByDevice

    def createTable(self, ws, displayName, style):
        '''Wrap the worksheet's populated range in an Excel table with the given
        display name and style (updating the range if the table already exists),
        then auto-fit the column widths.'''
        cols = [c for c in ws.columns]
        rows = [r for r in ws.rows]
        rowCount = len(rows)
        if rowCount == 1:
            # Header only: make a one-empty-row table so a user can populate it manually.
            rowCount = 2
        tabs = [t for t in ws._tables if t.displayName == displayName]
        if len(tabs) >= 1:
            tab = tabs[0]
            tab.ref = "A1:{0}{1}".format(cols[len(cols) - 1][0].column, rowCount)
        else:
            tab = Table(displayName=displayName,
                        ref="A1:{0}{1}".format(cols[len(cols) - 1][0].column, rowCount))
            tab.tableStyleInfo = style
            ws.add_table(tab)
        self.fitColumns(ws, 2)

    def fitColumns(self, ws, pad):
        '''Resize each column to the width of its longest cell value plus pad characters.'''
        dims = {}
        for row in ws.rows:
            for cell in row:
                if cell.value:
                    dims[cell.column] = max((dims.get(cell.column, 0), len(str(cell.value))))
        for col, value in dims.items():
            ws.column_dimensions[col].width = value + pad

    def usage(self):
        '''Print the command-line help for the createXLS command.'''
        print('\nUsage:')
        print(' downloadONC.py createXLS -t -o -b -e [options]')
        print('\nDescription:')
        print(' Creates an Excel spreadsheet from discovery service filters. Optional input parameters')
        print(' are used to filter the ONC discovery services to discover ONC Data available for')
        print(' download as data products. Once a spreadsheet is generated, the user can run the')
        print(' download command to download the files.')
        print('\nRequired:')
        print(' -t, --token                User authentication token.')
        print('                            Obtain token from http://dmas.uvic.ca/Profile.')
        print(' -o, --outPath              Path of the output folder.')
        print(' -b, --begin                Beginning date of request.')
        print(' -e, --end                  Ending date of request.')
        print('\nOptions:')
        print(' -h, --help                 Show Help.')
        print(' -a, --append               Append existing XLS spreadsheet.')
        print(' -m, --downloadMethod       The method for downloading data. Data can be')
        print('                            downloaded by location or device. If excluded,')
        print('                            the byLocation method will be used.')
        print(' -q, --qa                   Use the QA Server. For Internal Use ONLY.')
        print(' Filters:')
        print(' -l, --locationCode         Root location code filter.')
        print(' -c, --deviceCategoryCode   Device category code filter.')
        print(' -d, --deviceCode           Device code filter.')
        print(' -v, --propertyCode         Property code filter.')
        print(' -p, --dataProductCode      Data product code filter.')
        print(' -x, --extension            Data product extension filter.')
class process():
    '''Reads the download definition Excel spreadsheet (By Location / By Device /
    Configuration tabs), validates each definition against the ONC discovery
    services, and creates/updates the JSON process log consumed by the
    runDownloadProcess.py script.'''

    file = None          # -f or --file  The Excel definition spreadsheet to read
    isProduction = True  # Indicates if web service requests are run in production (True) or QA (False)
    showInfo = False     # Used for process debugging. Default is False (off)

    def __init__(self, file=None, production=True):
        self.file = file
        self.isProduction = production

    def main(self, argv):
        '''Command-line entry point: parse arguments then update the process log.'''
        self.getParameters(argv)
        self.updateFile(self.file)

    @staticmethod
    def _columnIndexes(ws):
        '''Map the worksheet's header-row cell values to zero-based column indexes,
        stopping at the first empty header cell.'''
        cols = {}
        c = 0
        for cell in ws[1]:
            if not cell.value:
                break
            cols[cell.value] = c
            c += 1
        return cols

    @staticmethod
    def _toOncTimestamp(value):
        '''Format a worksheet datetime cell value as an ONC ISO timestamp
        (millisecond precision with a Z suffix); returns None for an empty cell
        so the caller's completeness check can skip the row instead of crashing.'''
        if not value:
            return None
        return value.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'

    @staticmethod
    def _applyOptions(p, dpo):
        '''Parse a comma-separated "key=value" data product option string
        (e.g. "dpo_qualityControl=1,dpo_resample=none") into dict p,
        converting numeric values to int.'''
        for o in dpo.split(','):
            d = o.split('=')
            key = d[0]
            val = d[1]
            if util.isNumber(val):
                val = int(val)
            p[key] = val

    def updateFile(self, file):
        '''Read definitions and configurations from the spreadsheet, validate
        them against the discovery services, split the date ranges, and save the
        resulting unique data product requests to the process log JSON file.'''
        if file and not isfile(file):
            # Fix: previously formatted self.file, which is None when a file
            # argument is passed in directly.
            print('Error - Download Excel spreadsheet {} does not exist!'.format(file))
            exit(-1)

        dataProducts = []
        # Fix: configs was only defined inside the 'Configuration' branch but is
        # read unconditionally below; initialize it up front.
        configs = {}
        print('Reading definitions from {}'.format(file))
        wb = load_workbook(file)

        if 'Configuration' in wb:
            wsConfiguration = wb['Configuration']
            colsConfiguration = self._columnIndexes(wsConfiguration)
            for row in wsConfiguration.iter_rows(min_row=2, max_col=4, max_row=1000):
                parameter = row[colsConfiguration['parameter']].value
                value = row[colsConfiguration['value']].value
                if (not parameter):
                    break
                configs[parameter] = value

        if 'By Location' in wb:
            wsByLocation = wb['By Location']
            colsByLocation = self._columnIndexes(wsByLocation)
            for row in wsByLocation.iter_rows(min_row=2, max_col=16, max_row=1000):
                locationCode = row[colsByLocation['locationCode']].value
                if (not locationCode):
                    break  # first empty row terminates the table
                download = row[colsByLocation['download']].value
                if (download == 'Y'):
                    deviceCategoryCode = row[colsByLocation['deviceCategoryCode']].value
                    dataProductCode = row[colsByLocation['dataProductCode']].value
                    extension = row[colsByLocation['extension']].value
                    dpo = row[colsByLocation['data product options']].value
                    begin = self._toOncTimestamp(row[colsByLocation['begin']].value)
                    end = self._toOncTimestamp(row[colsByLocation['end']].value)
                    p = {}
                    if locationCode:
                        p['locationCode'] = locationCode
                    if deviceCategoryCode:
                        p['deviceCategoryCode'] = deviceCategoryCode
                    if not dataProductCode or not extension or not begin or not end:
                        continue  # incomplete definition row — skip it
                    p['dataProductCode'] = dataProductCode
                    p['extension'] = extension
                    p['begin'] = begin
                    p['end'] = end
                    if dpo:
                        self._applyOptions(p, dpo)
                    dataProducts.append(p)

        if 'By Device' in wb:
            wsByDevice = wb['By Device']
            colsByDevice = self._columnIndexes(wsByDevice)
            for row in wsByDevice.iter_rows(min_row=2, max_col=16, max_row=1000):
                deviceCode = row[colsByDevice['deviceCode']].value
                if (not deviceCode):
                    break  # first empty row terminates the table
                download = row[colsByDevice['download']].value
                if (download == 'Y'):
                    propertyCode = row[colsByDevice['propertyCode']].value
                    dataProductCode = row[colsByDevice['dataProductCode']].value
                    extension = row[colsByDevice['extension']].value
                    dpo = row[colsByDevice['data product options']].value
                    begin = self._toOncTimestamp(row[colsByDevice['begin']].value)
                    end = self._toOncTimestamp(row[colsByDevice['end']].value)
                    p = {}
                    if deviceCode:
                        p['deviceCode'] = deviceCode
                    if propertyCode:
                        p['propertyCode'] = propertyCode
                    if not dataProductCode or not extension or not begin or not end:
                        continue  # incomplete definition row — skip it
                    p['dataProductCode'] = dataProductCode
                    p['extension'] = extension
                    p['begin'] = begin
                    p['end'] = end
                    if dpo:
                        self._applyOptions(p, dpo)
                    dataProducts.append(p)

        token = configs['token']
        downloadFolder = configs['downloadFolder']
        logFile = configs['logFile']
        runThreads = configs['runThreads']
        downloadThreads = configs['downloadThreads']
        downloadQueueSize = configs['downloadQueueSize']
        splitDateRange = configs['splitDateRange']
        dayOfWeek = configs['dayOfWeek']
        beginRun = configs['beginRun']
        endRun = configs['endRun']

        # Create the directory structure if it doesn't already exist.
        # Fix: the existence test used os.path.isfile on a directory path.
        if (not os.path.isdir(downloadFolder)):
            try:
                os.makedirs(downloadFolder)
            except OSError as exc:
                if exc.errno == errno.EEXIST and os.path.isdir(downloadFolder):
                    pass  # created concurrently — fine
                else:
                    raise

        # An instance of the ONC class used to perform all of the web service calls.
        onc = ONC(token, self.isProduction, self.showInfo, downloadFolder)
        # The Oceans 2.0 web service url used to request the data products;
        # stored as part of the 'requestUrl' in the list of data product requests.
        url = '{}api/dataProductDelivery'.format(onc.baseUrl)

        # If the process log json file already exists, read it in as a dictionary,
        # otherwise create a new process log with default values and save it.
        if (isfile(logFile)):
            processLog = util.readJSONFile(logFile)
        else:
            if isinstance(beginRun, datetime):
                beginRun = beginRun.strftime("%H:%M:%S")
            if isinstance(endRun, datetime):
                endRun = endRun.strftime("%H:%M:%S")
            processLog = {"token": token,
                          "downloadFolder": downloadFolder,
                          "runThreads": runThreads,
                          "downloadThreads": downloadThreads,
                          "downloadQueueSize": downloadQueueSize,
                          "splitDateRange": splitDateRange,
                          "dayOfWeek": dayOfWeek,
                          "beginRun": beginRun,
                          "endRun": endRun
                          }
            util.writeJSONFile(logFile, processLog)

        # Create the new Data Product Requests and add them to the process log.
        if 'dataProductRequests' in processLog:
            dataProductRequests = processLog['dataProductRequests']
        else:
            dataProductRequests = []
        print('Updating configurations')
        for d in dataProducts:
            isLocation = False
            isDevice = False
            dpParms = {'dataProductCode': d['dataProductCode'], 'extension': d['extension']}
            if 'propertyCode' in d:
                dpParms['propertyCode'] = d['propertyCode']
            # Determine if the request is by location or device.
            if 'locationCode' in d:
                dpParms['locationCode'] = d['locationCode']
                dpParms['deviceCategoryCode'] = d['deviceCategoryCode']
                isLocation = True
            if 'deviceCode' in d:
                dpParms['deviceCode'] = d['deviceCode']
                isDevice = True
            # Verify that the data product extension is valid for the
            # location/device category or device.
            if (len(onc.getDataProducts(dpParms)) <= 0):
                print('Invalid code combination: {}'.format(dpParms))
                continue
            if isLocation:
                # Verify that instruments are deployed for the deviceCategory
                # and location during the requested time.
                locParms = {'locationCode': d['locationCode'],
                            'deviceCategoryCode': d['deviceCategoryCode'],
                            'deploymentBegin': d['begin'],
                            'deploymentEnd': d['end']}
                if (len(onc.getLocations(locParms)) < 1):
                    print('No instruments deployed for: {}'.format(locParms))
                    continue
            if isDevice:
                # Verify that the device was deployed during the requested time.
                devParms = {'deviceCode': d['deviceCode'],
                            'deploymentBegin': d['begin'],
                            'deploymentEnd': d['end']}
                if (len(onc.getDevices(devParms)) < 1):
                    print('Device is not deployed for: {}'.format(devParms))
                    continue

            begin = datetime.strptime(d['begin'], '%Y-%m-%dT%H:%M:%S.%fZ')
            end = datetime.strptime(d['end'], '%Y-%m-%dT%H:%M:%S.%fZ')
            if (splitDateRange == 'day'):
                dateRanges = util.daterangeByDay(begin, end)
            elif (splitDateRange == 'week'):
                # NOTE(review): MO..SU are not defined in this module — presumably
                # dateutil.relativedelta weekday constants imported elsewhere; confirm.
                if dayOfWeek == 'MO':
                    dateRanges = util.daterangeByWeek(begin, end, MO)
                elif dayOfWeek == 'TU':
                    dateRanges = util.daterangeByWeek(begin, end, TU)
                elif dayOfWeek == 'WE':
                    dateRanges = util.daterangeByWeek(begin, end, WE)
                elif dayOfWeek == 'TH':
                    dateRanges = util.daterangeByWeek(begin, end, TH)
                elif dayOfWeek == 'FR':
                    dateRanges = util.daterangeByWeek(begin, end, FR)
                elif dayOfWeek == 'SA':
                    dateRanges = util.daterangeByWeek(begin, end, SA)
                else:
                    dateRanges = util.daterangeByWeek(begin, end, SU)
            elif (splitDateRange == 'month'):
                dateRanges = util.daterangeByMonth(begin, end)
            elif (splitDateRange == 'year'):
                dateRanges = util.daterangeByYear(begin, end)
            else:
                dateRanges = [{'begin': d['begin'], 'end': d['end']}]

            for dr in dateRanges:
                parameters = {'method': 'request', 'token': token}
                rp = d.copy()
                rp['begin'] = dr['begin']
                rp['end'] = dr['end']
                parameters.update(rp)
                requestUrl = '{}?{}'.format(url, ('&'.join(['{}={}'.format(i[0], i[1])
                                                            for i in parameters.items()])))
                # Only add requests that are not already in the process log.
                if len([r for r in dataProductRequests if (r['requestUrl'] == requestUrl)]) == 0:
                    dataProductRequest = {}
                    dataProductRequest['requestUrl'] = requestUrl
                    dataProductRequest['parameters'] = parameters
                    dataProductRequest['requestId'] = 0
                    print('  Adding Request: {}'.format(rp))
                    dataProductRequests.append(dataProductRequest)

        processLog['dataProductRequests'] = dataProductRequests
        # Update the run statistics to include the total number of requests.
        processLog['totalRequests'] = len(dataProductRequests)
        # Save the updated process log dictionary to the process log json file.
        print('Saving {}'.format(logFile))
        util.writeJSONFile(logFile, processLog)
        print('Done!')

    def getParameters(self, argv):
        '''Parse the command-line options; print usage and exit on error.'''
        try:
            opts, args = getopt.getopt(argv, "hf:q", ["file=", 'qa'])
        except getopt.GetoptError:
            self.usage()
            sys.exit(2)
        if (len(opts) == 0):
            self.usage()
            sys.exit()
        for opt, arg in opts:
            if opt == '-h':
                self.usage()
                sys.exit()
            elif opt in ("-f", "--file"):
                self.file = arg
            elif opt in ("-q", "--qa"):
                self.isProduction = False

    def usage(self):
        '''Print the command-line help for the updateFromXLS command.'''
        print('\nUsage:')
        print(' downloadONC.py updateFromXLS -f ')
        print('\nDescription:')
        print(' Creates a JSON configuration file from an Excel spreadsheet.')
        print(' The Excel spreadsheet contains a tab for location level data')
        print(' product definitions, a tab for device level data product definitions')
        print(' and a tab for configurations. Configurations include information')
        print(' needed to run the data product delivery requests, such as the')
        print(' user authentication token, output location, number of threads')
        print(' and how to split up the requests for improved performance.')
        print('\nRequired:')
        print(' -f, --file   The Excel spreadsheet file name')
        print('\nOptions:')
        print(' -h, --help   Show Help')
        print(' -q, --qa     Use the QA Server. For Internal Use ONLY.')
For Internal Use ONLY.') class download(): lock = threading.Lock() # Used to prevent process log file read/write and print message contention ''' Read the configurations ''' file = None #-f, --file The data product definition json file name datetimeFormat = '%Y-%m-%dT%H:%M:%S.%f' # The date/time format used for all data product requests and time reporting # Example: 2017-03-29T16:55:07.647Z processLog = None # which is a json file that contains the definitions needed to create all of the data product requests and # the status of each of the 3 steps # 1 - Request the data product, # 2 - Run the data product request # 3 - download the data product files. qRun = None qDownload = None token = None # The token used to run all of the requests is stored in the process log json file. isProduction = True # Indicates if web service requests are run in production (True) or QA (False) showInfo = False # Used for process debugging. Default if False (off) downloadFolder = None # The folder that the files will be written to beginRun = None # The Run worker process will only add items to the queue after the beginRun time endRun = None # The Run worker process will stop adding items to the queue when the endRun time has passed def __init__(self, file=None, production=True): self.file=file self.isProduction=production def main(self, argv): self.getParameters(argv) self.execute(self.file) def getParameters(self, argv): try: opts, args = getopt.getopt(argv,"hf:q",["file=","qa"]) except getopt.GetoptError: self.usage() sys.exit(2) if (len(opts)==0): self.usage() sys.exit() for opt, arg in opts: if opt == '-h': self.usage() sys.exit() elif opt in ("-f", "--file"): self.file = arg elif opt in ("-q", "--qa"): self.isProduction = False def execute(self, xlsFile): if (not isfile(xlsFile)): print('Warning: {} does not exist. 
Please run downloadONC.py createXLS to generate.'.format(file)) exit(-1) wb = load_workbook(xlsFile) if 'Configuration' in wb: wsConfiguration = wb['Configuration'] ''' Get the column indexes from the Configuration worksheet''' colsConfiguration = {} c = 0 for cell in wsConfiguration[1]: val = cell.value if not val: break colsConfiguration[val] = c c+=1 configs = {} for row in wsConfiguration.iter_rows(min_row=2, max_col=4, max_row=1000): parameter = row[colsConfiguration['parameter']].value value = row[colsConfiguration['value']].value if (not parameter): break configs[parameter] = value self.file = configs['logFile'] # if (not isfile(self.file)): p = process(None,self.isProduction) p.updateFile(xlsFile) ''' Read the process log file ''' print('Reading configurations from {}'.format(self.file)) self.processLog = util.readJSONFile(self.file) self.token = self.processLog['token'] self.downloadFolder = self.processLog['downloadFolder'] runThreads = self.processLog['runThreads'] # Used to define the number of threads that the data product run requests can run on, and the size of Queue. Default is 5. Set in the process log file downloadThreads = self.processLog['downloadThreads'] # Used to define the number of threads that the file downloads can run on. Default is 10 downloadQueueSize = self.processLog['downloadQueueSize'] # Default is 'unlimited' self.beginRun = datetime.strptime(self.processLog['beginRun'],'%H:%M:%S') self.endRun = datetime.strptime(self.processLog['endRun'],'%H:%M:%S') ''' Create the Queues to handle the run processes and file download processes ''' self.qRun = Queue(runThreads) # The Data Product Run Request Queue. # This will allow only 5 (as set by the runThreads) run requests to be active on # the Task Server at a time. 
This will prevent this process from filling up the Task Server Queue if util.isNumber(downloadQueueSize): self.qDownload = Queue(downloadQueueSize) # File download Queue, is used to manage all of the file downloads else: self.qDownload = Queue() ''' Initialize the threads for running the data product run/status polling requests on. ''' for i in range(runThreads): t = threading.Thread(target=self.runWorker) t.daemon = True # Thread dies when main thread (only non-daemon thread) exits. t.start() ''' Initialize the threads for running the file download requests on. ''' for i in range(downloadThreads): t = threading.Thread(target=self.downloadWorker) t.daemon = True # Thread dies when main thread (only non-daemon thread) exits. t.start() ''' Instantiate an ONC object to perform all of the web service requests in the main thread. ''' onc = ONC(self.token,self.isProduction,self.showInfo,self.downloadFolder) ''' Request all of the data products that have not been requested yet. ''' print('Creating data product requests for all new data product definitions') for r in [dpr for dpr in self.processLog['dataProductRequests'] if dpr['requestId']==0]: dpRequest = onc.requestDataProduct(r['parameters'],True) if (dpRequest): if 'dpRequestId' in dpRequest: r['requestId'] = dpRequest['dpRequestId'] else: r['requestId'] = -1 r['requestInfo'] = dpRequest else: print(r['requestUrl']) ''' Update the statistics in the process log file. ''' self.processLog["totalRequested"] = len([dpr for dpr in self.processLog['dataProductRequests'] if dpr['requestId']>0]) self.processLog["totalRequestErrors"] = len([dpr for dpr in self.processLog['dataProductRequests'] if dpr['requestId']<0]) ''' Update the process log file with the latest batch of requests. ''' util.writeJSONFile(self.file,self.processLog) ''' Add all of the files that have not been downloaded to the Download Queue. Files may not have been downloaded for a number of reasons, including a previous process was unexpectedly stopped. 
''' print('Adding files that have not been downloaded, to the file download queue') for dpRequest in [dpr for dpr in self.processLog['dataProductRequests'] if 'runs' in dpr]: for file in list(itertools.chain(*[[f for f in r['files'] if 'downloaded' in f and not f['downloaded']] for r in dpRequest['runs'] if 'files' in r])): self.qDownload.put(file) ''' Add all of the data product requests to the Run Queue, that have a requestId but no runs, or have runs but no files. ''' print('Running all new data product requests') for dpRequest in [dpr for dpr in self.processLog['dataProductRequests'] if dpr['requestId']>0]: if ('runs' not in dpRequest): # Run if the request has not been run self.qRun.put(dpRequest) elif (len([r for r in dpRequest['runs'] if 'files' not in r])>0): # Run if the request has been run but no files have been saved (likely caused by a crash) self.qRun.put(dpRequest) elif (len([f for f in list(itertools.chain(*[r['files'] for r in dpRequest['runs'] if 'files' in r])) if 'downloaded' not in f])>0): # Run if the request has been run but did not complete before it timed out self.qRun.put(dpRequest) elif (len([f for f in list(itertools.chain(*[r['files'] for r in dpRequest['runs'] if 'files' in r])) if 'status' in f and f['status'] == 'running'])>0): # Run if the request has been run but did not complete before it timed out self.qRun.put(dpRequest) self.qRun.join() # Block until all Run tasks are done. print('Done running request!') self.qDownload.join() # Block until all of the Download tasks are done. ''' Update the statistics in the process log file to include the total number of request that have been run and the total number of files that have been downloaded. 
''' l = util.readJSONFile(self.file) l["totalRequestsRun"] = len([dpr for dpr in l['dataProductRequests'] if 'runs' in dpr]) # l["totalDownloadedFiles"] = len([f for f in list(itertools.chain(*[r['files'] for r in list(itertools.chain(*itertools.chain(*[[dpr['runs'] for dpr in dp['dataProductRequests'] if 'runs' in dpr] for dp in l['downloadProcesses']])))])) if 'downloaded' in f and f['downloaded']]) l["totalDownloadedFiles"] = len([f for f in list(itertools.chain(*[r['files'] for r in list(itertools.chain(*[dpr['runs'] for dpr in l['dataProductRequests'] if 'runs' in dpr])) if 'files' in r])) if 'downloaded' in f and f['downloaded']]) util.writeJSONFile(self.file,l) print('Done downloading files!') ''' Method for processing the next run request in the Run Queue. ''' def runWorker(self): start = False end = False while True: now = datetime.now() runStartTime = now.replace(hour=self.beginRun.hour,minute=self.beginRun.minute,second=self.beginRun.second,microsecond=self.beginRun.microsecond) runEndTime = now.replace(hour=self.endRun.hour,minute=self.endRun.minute,second=self.endRun.second,microsecond=self.endRun.microsecond) if not start: start = runStartTime < now if start: end = runEndTime > now else: end = False if start and not end: item = self.qRun.get() self.run(item) self.qRun.task_done() else: delta = runStartTime - now print('sleeping for {} seconds. Will resume at {}'.format(delta.total_seconds(),runStartTime.strftime('%H:%M:%S.%f'))) time.sleep(delta.total_seconds()) ''' Method for processing the next file in the Download Queue. ''' def downloadWorker(self): while True: item = self.qDownload.get() self.download(item) self.qDownload.task_done() ''' Method for running a data product request and polling the web service until it complete. 
''' def run(self, item): requestId = item['requestId'] if (len([r for r in item if 'run' in item])<=0): item['runs'] = [] ''' Instantiate an ONC class to handle the web service requests ''' o = ONC(self.token,self.isProduction,self.showInfo,self.downloadFolder) runIds = o.runDataProduct(requestId) if (runIds): for runId in runIds: dt = datetime.now().strftime(self.datetimeFormat)[:-3]+'Z' if('locationCode' in item['parameters'] and 'deviceCategoryCode' in item['parameters']): downloadPath = '{}/{}/{}'.format(self.processLog['downloadFolder'],item['parameters']['locationCode'],item['parameters']['deviceCategoryCode']) elif ('deviceCode' in item['parameters']): downloadPath = '{}/{}'.format(self.processLog['downloadFolder'],item['parameters']['deviceCode']) else: print('{} - Error running Run Id: {}. Invalid parameters'.format(requestId,runId)) continue run = {"runId":runId, "runDate":dt, "downloadPath":downloadPath} item['runs'].append(run) self.saveRunInfo(requestId, item['runs']) ''' Initiate the download for all of the runs ''' for runId in [r['runId'] for r in item['runs']]: ''' Update the path to the new location ''' o.outPath = downloadPath fileCount = 0 # Number of data product files produced by the request. 0 is unknown. TODO: need to get this from request info... if ('requestedInfo' in item # Time in seconds between polling. This is pulled from the time estimate of the request. TODO: need to get this from the request info.... and item['requestInfo'] and 'estimatedProcessingTime' in item['requestInfo'] and util.isNumber(item['requestInfo']['estimatedProcessingTime'])): estimatedProcessingTime = item['requestInfo']['estimatedProcessingTime'] else: estimatedProcessingTime = 2 maxRetries = 1000 # Determines how many times it will poll the service before it stops downloadResultsOnly = True # Indicates that the files will not be downloaded by downloadDataProduct() method, but will only return information about the file, including url and file name. 
includeMetadataFile = False # Determines if the associated metadata file will be requested as well as the data file(s). multiThreadMessages = True # Determines if progress messages will be displayed. Multiple download requests can be running on different threads # and simultaneously printing to the console creating a confused output. # The False option, locks the thread during print and prefixes each message with the runId. ''' Run the download process to poll the service until it has finished running and return a list of all of the file definitions ''' files = o.downloadDataProduct(runId,fileCount,estimatedProcessingTime,maxRetries,downloadResultsOnly,includeMetadataFile,multiThreadMessages) ''' Save the file info to the process log ''' run['files'] = files self.saveRunInfo(requestId, item['runs']) ''' Add all of the files to be downloaded to the Download Queue ''' for f in files: if ('file' in f): self.qDownload.put(f) # Add the file to the download queue so that it can be downloaded on a separate thread ''' Method for downloading a file to disk from a url. 
''' def download(self, item): if ('file' in item): url = item['url'] file = item['file'] o = ONC(self.token,self.isProduction,self.showInfo) downloadResult = o.downloadFile(url,file,True) if ('downloaded' in downloadResult): item['downloaded'] = downloadResult['downloaded'] if ('downloadingTime' in downloadResult): item['downloadingTime'] = downloadResult['downloadingTime'] if ('message' in downloadResult): item['message'] = downloadResult['message'] self.saveFileInfo(item) ''' Method for saving information about the request runs, for a request Id, to the process log ''' def saveRunInfo(self, requestId, runs): with self.lock: l = util.readJSONFile(self.file) dpr = [dpr for dpr in l['dataProductRequests'] if dpr['requestId'] == requestId][0] dpr['runs']= runs l["totalRequestsRun"] = len([dpr for dpr in l['dataProductRequests'] if 'runs' in dpr]) util.writeJSONFile(self.file,l) ''' Method for saving file information and download status to the process log ''' def saveFileInfo(self, item): with self.lock: l = util.readJSONFile(self.file) for dpRequest in [dpr for dpr in l['dataProductRequests'] if 'runs' in dpr]: for file in list(itertools.chain(*[[f for f in r['files'] if f['url'] == item['url'] and f['file'] == item['file']] for r in dpRequest['runs'] if 'files' in r])): file['downloaded']=item['downloaded'] file['downloadingTime']=item['downloadingTime'] if len(item['message']) > 0: for message in item['message']: file['message'].append(message) util.writeJSONFile(self.file,l) def usage(self): print('\nUsage:') print(' downloadONC.py download -f ') print('\nDescription:') print(' Executes the download process. It reads data product request definitions') print(' from the input JSON file, runs them, downloads the files and saves the') print(' progress back to the JSON file. 
If the JSON configuration file does not') print(' exist, it will create one at the location defined in the Excel spreadsheet') print('\nRequired:') print(' -f, --file The Excel spreadsheet file name') print('\nOptions:') print(' -h, --help Show Help') print(' -q, --qa Use the QA Server. For Internal Use ONLY.') def usage(): print('\nUsage:') print(' downloadONC.py [options]') print('\nCommands:') print(' createXLS Creates an Excel spreadsheet from discovery service filters.') #print(' updateFromXLS Creates a JSON configuration file from an Excel spreadsheet.') print(' download Executes the download process.') print('\nOptions:') print(' -h, --help Show Help.') print(' -m, --downloadMethod The method for downloading data. Data can be') print(' downloaded by location or device. If excluded,') print(' the byLocation method will be used.') print(' -a, --append Append existing XLS spreadsheet.') print(' -f, --file XLS File name.') print(' -t, --token User authentication token.') print(' Obtain token from http://data.oceannetworks.ca/Profile.') print(' -o, --outPath Path of the output folder.') print(' -b, --begin Beginning date of request.') print(' -e, --end Ending date of request.') print(' -l, --locationCode Root location code filter.') print(' -c, --deviceCategoryCode Device category code filter.') print(' -d, --deviceCode Device code filter.') print(' -v, --propertyCode Property code filter.') print(' -p, --dataProductCode Data product code filter.') print(' -x, --extension Data product extension filter.') print(' -q, --qa Use the QA Server. For Internal Use ONLY.') if __name__ == '__main__': if len(sys.argv) >= 2: command = sys.argv[1] if (command in ['-h','--help']): usage() elif (command == 'createXLS'): x = createXLS(None,None) x.main(sys.argv[2:]) # elif (command == 'updateFromXLS'): # p = process() # p.main(sys.argv[2:]) elif (command == 'download'): d = download() d.main(sys.argv[2:]) else: usage() else: usage()