Commit eaa4040b authored by renzo

New anp and shn + script to recover anp and shn data

parent 2ba88b2d
......@@ -2,26 +2,11 @@
import requests
import argparse
import os
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from datetime import timedelta, datetime
import zipfile
def process_url(url_path_pair):
url, destination = url_path_pair
destination_folder = destination[:-len(destination.split("/")[-1])]
if destination_folder and not os.path.exists(destination_folder):
os.makedirs(destination_folder)
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(destination, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024 * 1024):
if chunk:
f.write(chunk)
import metadata
import utils
def unzip(dir_name):
......@@ -39,65 +24,46 @@ def unzip(dir_name):
os.remove(item)
# Selenium is used to simulate the sequence of steps needed to download the files from the site.
# Ideally a web service would be consumed, but that option does not appear to be available
def download_anp(tmp_path):
destination_folder = tmp_path + "/" + datetime.strftime(datetime.now(), '%Y%m%d') + "/"
if destination_folder and not os.path.exists(destination_folder):
os.makedirs(destination_folder)
# Log in
options = Options()
options.headless = True
driver = webdriver.Firefox(options=options)
#driver = webdriver.PhantomJS()
driver.get("http://mareografo.anp.com.uy/")
wait = WebDriverWait(driver, 15)
driver.implicitly_wait(20)
driver.find_element(By.ID, "username").send_keys("imfia")
driver.find_element(By.ID, "passwd").send_keys("imfia")
time.sleep(5)
driver.find_element(By.CSS_SELECTOR, ".boton:nth-child(5)").click()
# For each station, download all the variables
estaciones = ["271", "272", "273"]
variables = ["Nivel de Agua (Med)", "Nivel de Agua (Max)", "Nivel de Agua (Min)"]
link = "http://mareografo.anp.com.uy/index.php?option=datos&estacion=ID&task=showfilter"
for estacion in estaciones:
cur_link = link.replace("ID", estacion)
driver.get(cur_link)
time.sleep(5)
# Select the dates
fecha_inicio = driver.find_element(By.ID, "showdia")
fecha_inicio.clear()
fecha_inicio.click()
start_date = datetime.strftime(datetime.now() - timedelta(4), '%d/%m/%Y')
fecha_inicio.send_keys(start_date)
fecha_fin = driver.find_element(By.ID, "showdiafin")
fecha_fin.clear()
fecha_fin.click()
end_date = datetime.strftime(datetime.now() - timedelta(1), '%d/%m/%Y')
fecha_fin.send_keys(end_date)
# Select the variables
dropdown = driver.find_element(By.ID, "disponibles")
for variable in variables:
dropdown.find_element(By.XPATH, "//option[. = '" + variable + "']").click()
driver.find_element(By.ID, "add_ch").click()
driver.find_element(By.NAME, "accion").click()
download_button = driver.find_element(By.CSS_SELECTOR, "a.btn:nth-child(3)")
# Perform the download
process_url((download_button.get_attribute("href"), destination_folder + download_button.text))
unzip(destination_folder)
# date is a datetime; the files are downloaded into tmp_path/YYYYmmdd/
def download_anp(tmp_path, date):
# Creates destination directory if it doesn't exist
destination_dir = tmp_path + "/" + datetime.strftime(date, '%Y%m%d') + "/"
if destination_dir and not os.path.exists(destination_dir):
os.makedirs(destination_dir)
session = requests.Session()
# Login
data = {'username':'imfia','passwd':'imfia','option':'login','Submit':'Access'}
session.post("http://mareografo.anp.com.uy/index.php", data=data)
# Ask server to generate files
estacion_ids = ["271", "272", "273"]
estacion_names = ["Isla de Flores", "Muelle fluvial", "La Paloma"]
start_date = date - timedelta(4)
final_date = date - timedelta(1)
data = {"showdia": datetime.strftime(start_date, '%d/%m/%Y'),
"showdiafin": datetime.strftime(final_date, '%d/%m/%Y'),
"exportype": "2",
"zip": "1", "accion": "Exportar+datos", "disponibles": "4",
"chanex[]": ["2", "1", "3"]}
for i in range(0, 3):
params = {"option":"datos","task":"showfilter","estacion": estacion_ids[i]}
session.post("http://mareografo.anp.com.uy/index.php", data=data, params=params)
# Download and unzip files
url = "http://mareografo.anp.com.uy/cache/" + estacion_names[i] + "_" + datetime.strftime(start_date, '%d-%m-%Y') \
+ "_" + datetime.strftime(final_date + timedelta(1), '%d-%m-%Y') + ".csv.zip"
utils.process_url((url, destination_dir + "anp_download.zip"))
unzip(destination_dir)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('tmp_path', type=str,
help='Base folder where the project downloads are stored')
parser.add_argument('--date', type=metadata.date_type, default=datetime.now(),
help=metadata.date_help_msg)
args = parser.parse_args()
download_anp(args.tmp_path)
download_anp(args.tmp_path, args.date)
......@@ -20,7 +20,7 @@ def download_copernicus(tmp_path):
+ out_name + " --user rfing --pwd RodrigoCMEMS2018"
call = call.replace("DATE-MIN", (date.today() - timedelta(days=22)).strftime("%Y-%m-%d"))
call = call.replace("DATE-MAX", (date.today() + timedelta(days=9)).strftime("%Y-%m-%d"))
call = call.replace("DATE-MAX", (date.today() + timedelta(days=9)).strp("%Y-%m-%d"))
subprocess.run(call, shell=True, check=True, stdout=sys.stdout, stderr=sys.stderr)
# This check exists because motuclient sometimes returns 0 even when an error occurred
if not os.path.isfile(download_dir + "/" + out_name):
......
......@@ -3,31 +3,30 @@ import multiprocessing
import argparse
import os
import datetime
import metadata
import utils
def download_hycom(tmp_path, parallel_downloads=5, day=""):
def download_hycom(tmp_path, date, parallel_downloads=5):
if day == "today" or not day:
day = datetime.date.today().strftime("%Y%m%d")
elif day == "yesterday":
day = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y%m%d")
destination_folder = tmp_path + "/" + day
if destination_folder and not os.path.exists(destination_folder):
os.makedirs(destination_folder)
formatted_date = date.strftime("%Y%m%d")
destination_dir = tmp_path + "/" + formatted_date
if destination_dir and not os.path.exists(destination_dir):
os.makedirs(destination_dir)
# Build the download queue with tuples of the form (url, destination_path)
download_queue = []
for ending in ["prog", "diag"]:
for i in range(0, 49, 1):
url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/rtofs/prod/rtofs." \
+ day + "/rtofs_glo_2ds_n" + str(i).zfill(3) + "_1hrly_" + ending + ".nc"
destination_file = destination_folder + "/" + url.split("/")[-1]
+ formatted_date + "/rtofs_glo_2ds_n" + str(i).zfill(3) + "_1hrly_" + ending + ".nc"
destination_file = destination_dir + "/" + url.split("/")[-1]
download_queue.append((url, destination_file))
for i in range(0, 73, 1):
url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/rtofs/prod/rtofs." \
+ day + "/rtofs_glo_2ds_f" + str(i).zfill(3) + "_1hrly_" + ending + ".nc"
destination_file = destination_folder + "/" + url.split("/")[-1]
+ formatted_date + "/rtofs_glo_2ds_f" + str(i).zfill(3) + "_1hrly_" + ending + ".nc"
destination_file = destination_dir + "/" + url.split("/")[-1]
download_queue.append((url, destination_file))
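# Illustrative queue entry (assuming formatted_date = "20190927", i = 0, ending = "prog"):
#   ("https://nomads.ncep.noaa.gov/pub/data/nccf/com/rtofs/prod/rtofs.20190927/rtofs_glo_2ds_n000_1hrly_prog.nc",
#    destination_dir + "/rtofs_glo_2ds_n000_1hrly_prog.nc")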
# Download using multiple processes
......@@ -41,14 +40,13 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('tmp_path', type=str,
help='Base folder where the project downloads are stored')
parser.add_argument('--day', type=str, choices=["today", "yesterday"])
parser.add_argument('--date', type=metadata.date_type, default=datetime.now(),
help=metadata.date_help_msg)
parser.add_argument('--parallel', type=int,
help='Enter the number of parallel downloads')
help='The number of parallel downloads')
args = parser.parse_args()
if args.parallel is not None:
download_hycom(args.tmp_path, day=args.day)
elif args.day is None:
download_hycom(args.tmp_path, args.parallel)
download_hycom(args.tmp_path, args.date)
else:
download_hycom(args.tmp_path, args.parallel, args.day)
\ No newline at end of file
download_hycom(args.tmp_path, args.date, args.parallel)
......@@ -2,15 +2,15 @@
import multiprocessing
import argparse
import os
from datetime import timedelta, date
from datetime import datetime
import metadata
import utils
def download_noaa(tmp_path, parallel_downloads=5, day="today"):
if day == "today":
date_formatted = date.today().strftime("%Y%m%d")
else:
date_formatted = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
def download_noaa(tmp_path, date, parallel_downloads=5):
date_formatted = date.strftime("%Y%m%d")
destination_folder = tmp_path + "/" + date_formatted
if destination_folder and not os.path.exists(destination_folder):
......@@ -51,16 +51,14 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('tmp_path', type=str,
help='Base folder where the project downloads are stored')
parser.add_argument('--day', type=str, choices=["today", "yesterday"],
                    help='The day for which the downloads will be performed')
parser.add_argument('--date', type=metadata.date_type, default=datetime.now(),
help=metadata.date_help_msg)
parser.add_argument('--parallel', type=int,
help='The number of parallel downloads')
args = parser.parse_args()
if args.parallel is not None:
download_noaa(args.tmp_path, day=args.day)
elif args.day is None:
download_noaa(args.tmp_path, args.parallel)
download_noaa(args.tmp_path, args.date)
else:
download_noaa(args.tmp_path, args.parallel, args.day)
download_noaa(args.tmp_path, args.date, args.parallel)
# -*- coding: utf-8 -*-
import pickle
import requests
import argparse
import os
import csv
from lxml.html import parse
from datetime import datetime
def sortable_time(time_str):
hour, minute = [int(t) for t in time_str.split(":")]
return hour, minute
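# e.g. sortable_time("5:30") -> (5, 30), so sorted(times, key=sortable_time)
# orders the "H:MM" strings numerically by hour and minute.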
def dump_data(path, data):
with open(path, 'wb') as handle:
pickle.dump(dict(data), handle, protocol=pickle.HIGHEST_PROTOCOL)
def read_data(path):
data = {}
if os.path.isfile(path):
with open(path, 'rb') as handle:
data = pickle.load(handle)
return data
# Returns a dict with the table values.
# Values are accessed as data[date][time][location]
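# Illustrative access pattern (hypothetical values):
#   data = parse_html("/tmp")
#   data["27/09/2019"]["05:00"]["Buenos Aires"]  # -> "0.85"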
def parse_html(path, url="http://www.hidro.gob.ar/oceanografia/alturashorarias.asp"):
path_to_html = path + "/html.data"
r = requests.get(url)
with open(path_to_html, 'wb') as f:
f.write(r.content)
page = parse(path_to_html)
table = page.xpath("//*/text()[normalize-space(.)='Alturas Horarias de los mareógrafos']/../../..")[0]
header = [elem.text_content() for elem in table[1].findall("tr")[0]][2:]
dates = [x.split(" ")[1][:-2].strip() for x in header]
timestamps = [x.split(" ")[3][:-2].strip() for x in header]
locations = [row.findall("td")[1].text_content().split("(")[0].strip() for row in table[2].findall("tr")]
data = {}
rows = table[2].findall("tr")
for col_number, cur_date in enumerate(dates):
if cur_date not in data:
data[cur_date] = {}
cur_time = timestamps[col_number]
data[cur_date][cur_time] = {}
for j, location in enumerate(locations):
row_values = rows[j].findall("td")[2:]
data[cur_date][cur_time][location] = row_values[col_number].text_content().strip().replace(",", ".")
os.remove(path_to_html)
return data
# Dumps the hidro.gob.ar tide-gauge table into a CSV.
# For each distinct date found in the table, creates a folder if it does not exist.
# Inside the folder, dumps the table into a CSV.
# If the folder already exists, the CSV values are updated,
# keeping the columns that are not part of the downloaded table.
def download_shn(tmp_path, url="http://www.hidro.gob.ar/oceanografia/alturashorarias.asp"):
# Ideally a web service would be consumed, but none is available
data = parse_html(tmp_path, url)
for cur_date in data:
reversed_date = "".join(cur_date.split("/")[::-1]).replace("/", "")
destination_folder = tmp_path + "/" + reversed_date
if not os.path.exists(destination_folder):
os.makedirs(destination_folder)
# For each timestamp, update the last-modified time
metadata_path = destination_folder + "/timestamps.pickle"
last_update = read_data(metadata_path)
timestamps = data[cur_date].keys()
for timestamp in timestamps:
last_update[timestamp] = datetime.now().strftime("%d/%m/%Y (%H:%M)")
# Update the table
data_path = destination_folder + "/data.pickle"
cur_data = read_data(data_path)
for time in data[cur_date]:
cur_data[time] = data[cur_date][time]
# Save the data to disk
dump_data(data_path, cur_data)
dump_data(metadata_path, last_update)
# Build lists representing the CSV columns
new_timestamps = sorted(last_update.keys(), key=sortable_time)
columns = []
_, locations = list(data[cur_date].items())[0]
locations = locations.keys()
for time in new_timestamps:
new_column = [cur_data[time][location] for location in locations]
columns.append(new_column)
# Convert the columns to rows
rows = list(zip(locations, *columns))
# Write the CSV
with open(destination_folder + "/data.csv", 'w') as csvfile:
writer = csv.writer(csvfile, delimiter=';', dialect='excel')
writer.writerow([""] + [last_update[t] for t in new_timestamps])
writer.writerow([""] + [cur_date] * len(new_timestamps))
writer.writerow([""] + new_timestamps)
writer.writerows(rows)
import metadata
def download_shn(tmp_path, date):
destination_dir = tmp_path + "/" + datetime.strftime(date, '%Y%m%d') + "/"
if destination_dir and not os.path.exists(destination_dir):
os.makedirs(destination_dir)
# Download data from hidro API
ids = ["MDPL", "STER", "SCLE", "OYAR", "ATAL", "LPLA", "PNOR", "BSAS", "SFER"]
names = ['Mar del Plata', 'Santa Teresita', 'San Clemente', 'Oyarvide', 'Atalaya',
'La Plata', 'Pilote Norden', 'Buenos Aires', 'San Fernando']
rows = []
get_formatted_date = datetime.strftime(date, '%Y-%m-%d')
for i, id in enumerate(ids):
readings = []
data = requests.get(
"http://www.hidro.gob.ar/Api/v1/AlturasHorarias/ValoresGrafico/" + id + "/" + get_formatted_date + "T11:55").json()
readings += [x for x in data["lecturas"] if get_formatted_date in x["fecha"]]
data = requests.get(
"http://www.hidro.gob.ar/Api/v1/AlturasHorarias/ValoresGrafico/" + id + "/" + get_formatted_date + "T23:55").json()
readings += [x for x in data["lecturas"] if get_formatted_date in x["fecha"] and x not in readings]
rows.append([names[i]] + ['{0:.2f}'.format(x["altura"]) if x["altura"] else "F/S" for x in readings])
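# "F/S" (presumably "fuera de servicio") marks readings whose "altura" is missing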
timestamps = [x["fecha"].split("T")[1][:5] for x in readings[:24]]
# Write the CSV; the first two header rows both carry the date
with open(destination_dir + "/data.csv", 'w') as csvfile:
writer = csv.writer(csvfile, delimiter=';', dialect='excel')
writer.writerow([""] + len(timestamps) * [datetime.strftime(date, '%d/%m/%Y')])
writer.writerow([""] + len(timestamps) * [datetime.strftime(date, '%d/%m/%Y')])
writer.writerow([""] + timestamps)
writer.writerows(rows)
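# Illustrative layout of the resulting data.csv (hypothetical values):
#   ;27/09/2019;27/09/2019;...
#   ;27/09/2019;27/09/2019;...
#   ;00:00;01:00;...
#   Mar del Plata;0.85;0.92;...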
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('tmp_path', type=str,
help='Base folder where the project downloads are stored')
parser.add_argument('--date', type=metadata.date_type, default=datetime.now(),
help=metadata.date_help_msg)
args = parser.parse_args()
download_shn(args.tmp_path)
download_shn(args.tmp_path, args.date)
......@@ -4,14 +4,14 @@ import os
import pickle
import argparse
import datetime
from datetime import datetime, date
import logging
import sys
import time
import mail
import metadata
import process.process as exec
from mail import mail
log_handle = 0
......@@ -24,7 +24,7 @@ def try_execute(max_tries, waiting_time, f, args):
return
except Exception as e:
time.sleep(waiting_time)
logging.exception(datetime.datetime.now())
logging.exception(datetime.now())
tries += 1
raise RuntimeError
......@@ -60,8 +60,8 @@ def stop_logging(source):
def disable_same_calls(args):
lock_name = "".join([str(args[arg]) for arg in args])
lock_name = lock_name.replace("/", "").replace(" ", "") + ".lock"
lock_name = "".join([str(args[arg]) for arg in args if arg != "date"])
lock_name = lock_name.replace("/", "").replace(" ", "") + datetime.strftime(args["date"], '%Y%m%d') + ".lock"
lock_path = metadata.tmp_path + "/" + lock_name
lock_handle = open(lock_path, "w")
fcntl.flock(lock_handle, fcntl.LOCK_EX)
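# LOCK_EX blocks here until any other run holding the same lock file (same
# arguments and same date) releases it, so identical calls cannot overlap.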
......@@ -82,7 +82,7 @@ def log_time(source):
error_stamps = {}
else:
error_stamps = dict(pickle.load(handle_state))
error_stamps[source] = datetime.date.today()
error_stamps[source] = date.today()
pickle.dump(error_stamps, handle_state, protocol=pickle.HIGHEST_PROTOCOL)
fcntl.flock(handle_state, fcntl.LOCK_UN)
......@@ -94,36 +94,20 @@ def clean_log(source):
handle_state.seek(0)
error_stamps = dict(pickle.load(handle_state))
if source in error_stamps:
day_diff = (datetime.date.today() - error_stamps[source]).days
day_diff = (date.today() - error_stamps[source]).days
if day_diff >= metadata.logs_expiration:
f = open(metadata.logs_path + "/" + source + ".log", 'r+')
f.truncate(0)
fcntl.flock(handle_state, fcntl.LOCK_UN)
if __name__ == "__main__":
# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("source", type=str, help="Fuente de datos", choices=metadata.sources)
parser.add_argument('--only_download', action='store_true')
parser.add_argument('--only_process', action='store_true')
for arg in metadata.args:
final_arg = "--" + arg
if "choices" in metadata.args[arg]:
parser.add_argument(final_arg, type=metadata.args[arg]["type"], help=metadata.args[arg]["help"],
choices=metadata.args[arg]["choices"])
else:
parser.add_argument(final_arg, type=metadata.args[arg]["type"], help=metadata.args[arg]["help"])
args = vars(parser.parse_args())
def download_and_process(args):
# Try to execute downloading and processing scripts
try:
lock_handle = disable_same_calls(args)
source = args["source"]
start_logging(source)
print("testMain", flush=True)
if not args["only_process"]:
downloader_path = os.path.abspath(inspect.getfile(getattr(metadata, source)["download"]))
source_config = getattr(metadata, source)
downloader_args = get_downloader_args(source_config["download"], args)
try_execute(source_config["max_tries"], source_config["wait"], source_config["download"], downloader_args)
......@@ -131,8 +115,24 @@ if __name__ == "__main__":
exec.process(source, args)
except Exception as e:
log_time(source)
logging.exception(str(datetime.datetime.now()) + "\n" + repr(e))
logging.exception(str(datetime.now()) + "\n" + repr(e))
stop_logging(source)
mail.send(source)
finally:
enable_same_calls(lock_handle)
def parse_arguments():
# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("source", type=str, help="Fuente de datos", choices=metadata.sources)
parser.add_argument('--only_download', action='store_true')
parser.add_argument('--only_process', action='store_true')
for arg in metadata.args:
final_arg = "--" + arg
parser.add_argument(final_arg, **metadata.args[arg])
args = vars(parser.parse_args())
return args
if __name__ == "__main__":
args = parse_arguments()
download_and_process(args)
\ No newline at end of file
from datetime import datetime, timedelta
from pathlib import Path
from anp import download_anp
......@@ -6,9 +7,12 @@ from hycom import download_hycom
from noaa import download_noaa
from shn import download_shn
import argparse
# 1 - GENERAL CONFIGURATION
sources = ["shn", "anp", "hycom", "noaa", "copernicus"]
logs_expiration = 7 # Measured in days
parallel_downloads = 5
# 1.1 - MESSAGES
processing_error_msg = "The processing script failed; check the log file"
......@@ -20,19 +24,38 @@ matlab_scripts = root_path + "/process/matlab_scripts"
logs_path = root_path + "/logs"
tmp_path = root_path + "/tmp"
output_root_path = "/export/flyma/PronosticoRPFM/DATOS"
# Debug paths
# root_path = "/home/renzo/Desktop/automatizacion-de-bajadas"
# nctoolbox_path = "dw"
# matlab_scripts = "/home/renzo/Desktop/automatizacion-de-bajadas/process/matlab_scripts"
# logs_path = root_path + "/logs"
# tmp_path = root_path + "/tmp"
# output_root_path = root_path + "/output"
# 2 - MAIN SCRIPT ARGUMENTS
args = {}
# 2.1 CUSTOM TYPES USED IN ARGUMENTS
date_help_msg = "The date can be given as today, yesterday, " \
                "or any other date in %Y%m%d format"
def date_type(date_str):
if date_str == "today":
date = datetime.now()
elif date_str == "yesterday":
date = (datetime.today() - timedelta(days=1))
else:
try:
date = datetime.strptime(date_str, '%Y%m%d')
except ValueError:
raise argparse.ArgumentTypeError(date_help_msg)
return date
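# Illustrative behaviour:
#   date_type("today")     -> the current datetime
#   date_type("20190927")  -> datetime(2019, 9, 27, 0, 0)
#   date_type("27/09/19")  -> raises argparse.ArgumentTypeError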
args["day"] = {}
args["day"]["help"] = "Día para la descarga"
args["day"]["type"] = str
args["day"]["choices"] = ["today", "yesterday"]
# 2.2 ARGUMENTS SPECIFICATION
args["parallel_downloads"] = {}
args["parallel_downloads"]["help"] = "Ingrese la cantidad de descargas paralelas"
args["parallel_downloads"]["type"] = int
args = {}
args["date"] = {}
args["date"]["help"] = date_help_msg
args["date"]["type"] = date_type
args["date"]["default"] = default=datetime.now()
# 3 - PER-SOURCE CONFIGURATION
shn = {"max_tries": 45, "wait": 60}
......
# import requests
#
# shots_url = ''
#
# # request the URL and parse the JSON
# response = requests.get(shots_url)
# response.raise_for_status() # raise exception if invalid response
# shots = response.json()
import os
import subprocess
import shutil
import re
from shutil import copy
import metadata
def get_signature(script_name):
    script_path = metadata.matlab_scripts + "/" + script_name + ".m"
    regex = r"function.*=.*" + script_name + ".*[(](.*)[)]"
    signature = None
    with open(script_path, 'r') as f:
        for line in f:
            match = re.match(regex, line)
            if match:
                signature = match.group(1)
                break
    # Fail loudly instead of hitting an unbound variable when no signature matches
    if signature is None:
        raise RuntimeError("No function signature found in " + script_path)
    return [x.strip() for x in signature.split(",")]
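# Illustrative behaviour: for a hypothetical script whose first line is
#   function out = process_anp(tmp_path, output_path)
# get_signature("process_anp") returns ["tmp_path", "output_path"].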
def create_matlab_call(script_name, source, optional_args):
# Get Matlab script signature
signature = get_signature(script_name)
# Get Matlab call arguments that match signature
args = ""
for param in signature:
if param in optional_args and optional_args[param]:
args += " " + optional_args[param]
elif hasattr(metadata, source) and param in getattr(metadata, source):
args += " " + getattr(metadata, source)[param]
elif hasattr(metadata, param):
args += " " + getattr(metadata, param)
else:
raise RuntimeError("El script de Matlab " + script_name + " recibe los parámetros " + str(signature)
+ ", pero " + param + " no se ha proporcionado al llamar al script, ni se encuentra"
"en el archivo de configuración del proyecto")
return "matlab -nodisplay -nosplash -r \"try, " + script_name + args \
+ ", catch me, fprintf('%s / %s\\n',me.identifier,me.message), exit(1), end, exit(0)\""
def copy_mats(paths, dir, filenames, patterns):
found = False