import errno
import json
import os
import sys
import time
from contextlib import closing

import requests
def downloadDataProductIndex(self,
                             runId,
                             url,
                             parameters,
                             indx=1,
                             fileCount=1,
                             maxRetries=100,
                             estimatedProcessingTime=1,
                             multiThreadMessages=False):
    """Poll the dataProductDelivery service and download one result file.

    The service is requested repeatedly with ``requests.get(url,
    params=parameters, stream=True)`` until it returns a file, reports a
    terminal condition, or the retry limit is reached.

    Args:
        runId: The ID of the run process to download the files for. RunIds
            are returned from the dataProductDelivery run method. Only used
            here to prefix messages when ``multiThreadMessages`` is true.
        url: The full URL of the dataProductDelivery service.
        parameters: The http request parameters for method, token, runId
            and index (a mapping).
        indx: The index of the file to be downloaded. Data files have an
            index of 1 or higher. The metadata has an index of 'meta'.
        fileCount: The actual or estimated file count, which is returned
            from the dataProductDelivery request method. Used in progress
            messages only.
        maxRetries: Maximum number of consecutive polls before giving up,
            so that a hung process on the Task server cannot hang this
            process. The counter restarts whenever the service reports a
            new status message.
        estimatedProcessingTime: When greater than 1, the initial delay
            between polls is half of this value; otherwise 2 seconds.
        multiThreadMessages: When true, messages are prefixed with the run
            id and printed while holding ``lock``; progress dots and some
            single-thread messages are suppressed.

    Returns:
        A dict with ``'url'``, ``'message'`` (list of messages) and
        ``'status'`` (``'running'``, ``'complete'`` or ``'error'``), plus
        ``'size'``, ``'file'``, ``'index'`` and ``'downloaded'`` when a
        file response was received. ``None`` when the service answers 404
        (index beyond the end of the results).

    Raises:
        SystemExit: If the user interrupts an in-progress download; the
            partially written file is removed first.

    NOTE(review): ``util`` and ``lock`` are not defined in the visible part
    of this file; they are assumed to be provided at module level.
    """
    defaultSleepTime = 2
    query = '&'.join('{}={}'.format(key, value) for key, value in parameters.items())
    requestUrl = '{}?{}'.format(url, query)
    downloadResult = {"url": requestUrl}
    tryCount = 0
    lastMessage = None
    if estimatedProcessingTime > 1:
        sleepTime = estimatedProcessingTime * 0.5
    else:
        sleepTime = defaultSleepTime
    downloadResult['message'] = []

    def announce(text, plain=None):
        # Multi-threaded callers get run-id-prefixed lines printed under the
        # lock; otherwise print the plain variant (defaults to ``text``).
        if multiThreadMessages:
            with lock:
                print('{}: {}'.format(runId, text))
        else:
            print(text if plain is None else plain)

    while True:
        tryCount += 1
        downloadResult['status'] = 'running'
        if tryCount >= maxRetries:
            # Record the give-up reason in the result as well as printing it.
            msg = 'Maximum number of retries ({}) exceeded'.format(maxRetries)
            announce(msg)
            downloadResult['message'].append(msg)
            break

        # closing() releases the connection on every exit path (break,
        # exception, sys.exit), so no explicit close() calls are needed.
        with closing(requests.get(url, params=parameters, stream=True)) as streamResponse:
            code = streamResponse.status_code

            if code == 200:  # OK - the stream contains the file
                headers = streamResponse.headers
                if 'Content-Disposition' not in headers:
                    print('Error: Invalid Header')
                    break
                filename = headers['Content-Disposition'].split('filename=')[1]
                if 'Content-Length' in headers:
                    size = headers['Content-Length']
                    downloadResult['size'] = float(size)
                else:
                    size = 0
                filePath = '{}/{}'.format(self.outPath, filename)
                downloadResult['file'] = filePath
                downloadResult['index'] = indx
                try:
                    if indx == 1:
                        print('')
                    if not os.path.isfile(filePath):
                        # Create the directory structure if it doesn't already exist
                        try:
                            os.makedirs(self.outPath)
                        except OSError as exc:
                            if not (exc.errno == errno.EEXIST and os.path.isdir(self.outPath)):
                                raise
                        print(" Downloading {}/{} '{}' ({})".format(indx, fileCount, filename, util.convertSize(float(size))))
                        with open(filePath, 'wb') as handle:
                            try:
                                for block in streamResponse.iter_content(1024):
                                    handle.write(block)
                            except KeyboardInterrupt:
                                print('Process interupted: Deleting {}'.format(filePath))
                                # Close before removing so the partial file can be deleted.
                                handle.close()
                                os.remove(filePath)
                                sys.exit(-1)
                    elif fileCount == 0:
                        print(" Skipping {} '{}': File Already Exists".format(indx, filename))
                    else:
                        print(" Skipping {}/{} '{}': File Already Exists".format(indx, fileCount, filename))
                    downloadResult['downloaded'] = True
                    downloadResult['status'] = 'complete'
                except Exception:
                    # Exception (not a bare except) so that the SystemExit raised
                    # above and any KeyboardInterrupt are not swallowed here.
                    msg = 'Error streaming response.'
                    print(msg)
                    downloadResult['message'].append(msg)
                    downloadResult['status'] = 'error'
                break

            elif code == 202:  # Accepted - Result is not complete -> Retry
                payload = json.loads(util.toString(streamResponse.content))
                if len(payload) >= 1:
                    msg = payload['message']
                    if msg != lastMessage:  # display a new message if it has changed
                        util.printWithEnd('\n {}'.format(msg))
                        sys.stdout.flush()
                        downloadResult['message'].append(msg)
                        lastMessage = msg
                        tryCount = 0
                    elif not multiThreadMessages:
                        # Add a dot to indicate that the same message is still being received
                        util.printWithEnd('.')
                        sys.stdout.flush()
                elif not multiThreadMessages:
                    print('Retrying...')

            elif code == 204:  # No Content - No Data found
                body = util.toString(streamResponse.content)
                if body != '':
                    payload = json.loads(body)
                    msg = ' {} [{}]'.format(payload['message'], code)
                else:
                    msg = 'No Data found'
                announce(msg, plain='\n{}'.format(msg))
                downloadResult['message'].append(msg)
                downloadResult['status'] = 'complete'
                break

            elif streamResponse.ok:  # Any other successful status -> note it and poll again
                msg = 'HTTP Status: {}'.format(code)
                announce(msg)
                downloadResult['message'].append(msg)

            elif code == 400:  # Error occurred
                if self.showInfo:
                    print(' HTTP Status: {}'.format(code))
                payload = json.loads(util.toString(streamResponse.content))
                # Default keeps msg bound even if the 'errors' list is empty.
                msg = 'Error occurred processing data product request'
                if 'errors' in payload:
                    # NOTE(review): original indentation was lost; the print call is
                    # kept inside the loop, in statement order - confirm.
                    for e in payload['errors']:
                        msg = e['errorMessage']
                        util.printErrorMesasge(streamResponse, parameters)
                elif 'message' in payload:
                    msg = ' {} [{}]'.format(payload['message'], code)
                    if not multiThreadMessages:
                        print('\n{}'.format(msg))
                else:
                    announce(msg)
                downloadResult['status'] = 'error'
                downloadResult['message'].append(msg)
                break

            elif code == 404:  # Not Found - Beyond End of Index - Index # > Results Count
                downloadResult = None
                break

            elif code == 410:  # Gone - file does not exist on the FTP server. It may not have been transferred yet
                payload = json.loads(util.toString(streamResponse.content))
                if len(payload) >= 1:
                    msg = payload['message']
                    if msg != lastMessage:
                        if multiThreadMessages:
                            announce('Waiting... {}'.format(msg))
                        else:
                            util.printWithEnd('\n Waiting... {}'.format(msg))
                            sys.stdout.flush()
                        lastMessage = msg
                        tryCount = 0
                    elif not multiThreadMessages:
                        util.printWithEnd('.','')
                        sys.stdout.flush()
                else:
                    announce('Running... Writing File', plain='\nRunning... Writing File.')

            elif code == 500:  # Internal Server Error occurred
                msg = util.printErrorMesasge(streamResponse, parameters)
                if self.showInfo:
                    print(' URL: {}'.format(streamResponse.url))
                downloadResult['status'] = 'error'
                downloadResult['message'].append(msg)
                break

            else:  # Any other failure status is terminal
                # Default keeps msg bound if the body cannot be parsed.
                msg = 'HTTP Status: {}'.format(code)
                try:
                    payload = json.loads(util.toString(streamResponse.content))
                    if 'errors' in payload:
                        # NOTE(review): kept inside the loop, as above - confirm.
                        for e in payload['errors']:
                            msg = e['errorMessage']
                            util.printErrorMesasge(streamResponse, parameters)
                    elif 'message' in payload:
                        msg = payload['message']
                        announce('{} [{}]'.format(msg, code),
                                 plain='\n {} [{}]'.format(msg, code))
                    downloadResult['status'] = 'error'
                    downloadResult['message'].append(msg)
                except Exception:
                    util.printErrorMesasge(streamResponse, parameters)
                    # Despite the wording, the loop is exited right after this message.
                    announce('{} Retrying...'.format(msg))
                break

        # Early in a polling run, shorten a long initial delay towards the default.
        if (tryCount <= 5) and (sleepTime > defaultSleepTime):
            sleepTime = sleepTime * 0.5
        time.sleep(sleepTime)

    return downloadResult