Skip to content

Commit

Permalink
Updated import and improved error handling for ARPALAZIO (#1106)
Browse files Browse the repository at this point in the history
* Updated cheerio import and improved error handling in fetchData function

* Better error handling for fetchStream

---------

Co-authored-by: Gabriel Fosse <kraken+majesticio@users.noreply.github.com>
  • Loading branch information
majesticio and Gabriel Fosse authored Apr 11, 2024
1 parent 06388db commit 08d9354
Showing 1 changed file with 61 additions and 69 deletions.
130 changes: 61 additions & 69 deletions src/adapters/arpalazio.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
'use strict';

import { REQUEST_TIMEOUT } from '../lib/constants.js';
import {
FetchError,
DATA_URL_ERROR,
DATA_PARSE_ERROR,
} from '../lib/errors.js';
import { FetchError, DATA_PARSE_ERROR } from '../lib/errors.js';
import log from '../lib/logger.js';
import { acceptableParameters, convertUnits } from '../lib/utils.js';
import got from 'got';
import sj from 'scramjet';
import cheerio from 'cheerio';
import { load } from 'cheerio';
import difference from 'lodash/difference.js';
import { DateTime } from 'luxon';

Expand All @@ -35,62 +31,59 @@ const hourlyParameters = difference(

export const name = 'arpalazio';


export async function fetchData (source, cb) {
export async function fetchData(source, cb) {
try {
const stream = await fetchStream(source, cb);
const measurements = await stream.toArray();
cb(null, { name: stream.name, measurements });
} catch (e) {
cb({ message: `fetchData error: ${e.message}` });
log.error(`fetchData error: ${e.message}`);
cb(e);
}
}


async function fetchStream(source, cb) {

const body = await client({ url: source.url, responseType: 'text' });

let $;
async function fetchStream(source) {
try {
$ = cheerio.load(body);
} catch (e) {
throw new FetchError(DATA_PARSE_ERROR, source, e);
const body = await client({
url: source.url,
responseType: 'text',
});
let $ = load(body);
const provinces = $('#provincia option')
.filter((i, el) => Number($(el).attr('value')) >= 0)
.map((i, el) => ({
id: $(el).attr('value'),
name: $(el).text(),
}))
.get();

const out = new MultiStream();
for (const province of provinces) {
const provinceHourlyURL = `${baseUrl}${provinceQueryPath}?provincia=${province.id}&dati=${hourlyAvgParam}`;
const provinceDailyURL = `${baseUrl}${provinceQueryPath}?provincia=${province.id}&dati=${dailyAvgParam}`;

out.add(
await handleProvince(
province.name,
provinceHourlyURL,
hourlyAvgPeriod,
source
)
);
out.add(
await handleProvince(
province.name,
provinceDailyURL,
dailyAvgPeriod,
source
)
);
}

return out.mux();
} catch (error) {
throw new FetchError(DATA_PARSE_ERROR, source, error);
}

const provinces = $('#provincia option')
.filter(function (i, el) {
return Number($(this).attr('value')) >= 0;
})
.map(function (i, el) {
return { id: $(this).attr('value'), name: $(this).text() };
})
.get();

const out = new MultiStream();
provinces.forEach(async (province) => {
const provinceHourlyURL = `${baseUrl}${provinceQueryPath}?provincia=${province.id}&dati=${hourlyAvgParam}`;
const provinceDailyURL = `${baseUrl}${provinceQueryPath}?provincia=${province.id}&dati=${dailyAvgParam}`;

out.add(
await handleProvince(
province.name,
provinceHourlyURL,
hourlyAvgPeriod,
source
)
);
out.add(
await handleProvince(
province.name,
provinceDailyURL,
dailyAvgPeriod,
source
)
);
});

return out.mux();
}

const handleProvince = async function (
Expand All @@ -99,30 +92,29 @@ const handleProvince = async function (
averagingPeriod,
source
) {

const body = await client({ url, responseType: 'text' });

const $ = cheerio.load(body);
const pollutantURLs = $('a')
.map(function () {
const pollutant = $(this).text().toLowerCase().replace('.', '');
const currentParameters = getParameters(averagingPeriod);
if (currentParameters.indexOf(pollutant) >= 0) {
const href = $(this).attr('href');
return `${baseUrl}${href}`;
} else {
return null;
}
})
.get();
const $ = load(body);
const pollutantURLs = $('a')
.map(function () {
const pollutant = $(this).text().toLowerCase().replace('.', '');
const currentParameters = getParameters(averagingPeriod);
if (currentParameters.indexOf(pollutant) >= 0) {
const href = $(this).attr('href');
return `${baseUrl}${href}`;
} else {
return null;
}
})
.get();

const arrayOfPromises = pollutantURLs.map((dataUrl) =>
getStream(name, dataUrl, averagingPeriod, source, url)
);

return new MultiStream(
await Promise.all(arrayOfPromises).catch((err) => {
log.verbose(`Promise error ${err}`);
log.error(`Promise error ${err}`);
return arrayOfPromises;
})
).mux();
Expand Down Expand Up @@ -151,7 +143,7 @@ export const getStream = function (
/[\w]{2}_([\w.]{2,})_([\d]{4})(?:_gg)?.txt/
);
if (!match || match.length < 2) {
log.verbose(`Failed to match url ${url}`);
log.error(`Failed to match url ${url}`);
}
const parameter = match[1].toLowerCase().replace('.', '');
const year = match[2];
Expand All @@ -161,7 +153,7 @@ export const getStream = function (
const fewDaysAgo = +parseFloat(
DateTime.local().setZone(timezone).minus({ days: 4 }).ordinal
);
log.verbose(`Fetching data from ${url}`);
log.debug(`Fetching data from ${url}`);

const stations = {};
return StringStream.from(getter.stream(url))
Expand Down

0 comments on commit 08d9354

Please sign in to comment.