Skip to content

Commit

Permalink
Merge branch 'peru-oefa' of github.com:openaq/openaq-fetch into peru-…
Browse files Browse the repository at this point in the history
…oefa
  • Loading branch information
caparker committed Feb 29, 2024
2 parents ba98dff + 20a6a4b commit e61c07b
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 12,233 deletions.
25 changes: 0 additions & 25 deletions src/adapters/DELETEME.js

This file was deleted.

158 changes: 89 additions & 69 deletions src/adapters/peru.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,34 @@
*/

import got from 'got';

import { DateTime } from 'luxon';
import log from '../lib/logger.js';

import {
FetchError,
AUTHENTICATION_ERROR,
DATA_PARSE_ERROR,
} from '../lib/errors.js';

export const name = 'peru';

const gotExtended = got.extend({
retry: { limit: 3 },
timeout: { request: 120000 },
});
// Available parameters for the pollutants we are interested in.
const pollutants = [
const pollutants = [ // all available parameters
'pm10',
'pm25',
'so2',
// 'h2s',
'co',
'no2',
// 'pbar',
// 'pp',
// 'temp',
// 'hr',
// 'ws',
// 'wd',
// 'rs',
];

export async function fetchData(source, cb) {
export async function fetchData (source, cb) {
try {
// because we have do not have the ability to query the api
// to see how many stations we have we will create them this way
Expand All @@ -32,95 +40,107 @@ export async function fetchData(source, cb) {
let stationIds = [...Array(n).keys()].map(i => i + 1);
log.debug(`Fetching data for station ids up to ${n}`);

// we should migrate to using from/to to be consistent with our other services
// once we make those changes this will act as a default
if(!source.from) {
source.from = DateTime.utc().toISODate();
}
if(!source.to) {
source.to = DateTime.utc().toISODate();
}

const postResponses = stationIds.map((id) =>
createRequests(id, source)
);
const postResponses = stationIds.map(id =>createRequest(id, source));

const results = await Promise.all(postResponses);

let allMeasurements = [];
log.debug('Processing results...');

results.forEach((result, index) => {
if (result !== null) {
log.debug(`Formatting data for station ID: ${stationIds[index]}`);
const measurements = formatData(result.lastDataObject);
results.forEach((result) => {
if (result) {
const measurements = formatData(result);
allMeasurements = allMeasurements.concat(measurements);
} else {
log.warn(`No data received for station ID: ${stationIds[index]}`);
}
});

log.debug('All measurements compiled.', allMeasurements.length);
log.debug('All measurements:', allMeasurements);
cb(null, { name: 'unused', measurements: allMeasurements });
} catch (error) {
log.error('Error in fetchData:', error.message);
cb(error);
}
}

function formatData(data) {
function formatData (data) {
const measurements = [];
const latitude = parseFloat(data.coordinates.latitude);
const longitude = parseFloat(data.coordinates.longitude);

const { coordinates, date } = data.lastDataObject;
const formattedDate = date.replace(' ', 'T').replace(' UTC', 'Z');
const dateLuxon = DateTime.fromISO(formattedDate);

pollutants.forEach((pollutant) => {
if (data[pollutant] !== null) {
const measurement = {
date: {
utc: DateTime.fromISO(data.date).toUTC().toFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"),
local: DateTime.fromISO(data.date).setZone('America/Lima').toFormat("yyyy-MM-dd'T'HH:mm:ssZZ"),
},
location: data.station,
city: data.province,
coordinates: { latitude, longitude },
parameter: pollutant,
value: parseFloat(data[pollutant]),
unit: 'µg/m³',
averagingPeriod: { unit: 'minutes', value: 5 },
attribution: [
{ name: 'OEFA', url: 'https://www.gob.pe/oefa' },
],
};
measurements.push(measurement);
if (data.lastDataObject.hasOwnProperty(pollutant)) {
const value = data.lastDataObject[pollutant];
if (value !== null) {
measurements.push({
date: {
utc: dateLuxon.toUTC().toISO({ suppressMilliseconds: true}),
local: dateLuxon.setZone('America/Lima').toISO({ suppressMilliseconds: true}),
},
location: data.lastDataObject.station,
city: data.lastDataObject.district,
coordinates: {
latitude: parseFloat(coordinates.latitude),
longitude: parseFloat(coordinates.longitude),
},
parameter: pollutant,
value: parseFloat(value),
unit: 'µg/m³',
averagingPeriod: { unit: 'minutes', value: 5 },
attribution: [{ name: 'OEFA', url: 'https://www.gob.pe/oefa' }],
});
}
}
});

return measurements;
}

async function createRequests(idStation, source) {
const body = {
user: source.user,
password: source.password,
startDate: source.from,
endDate: source.to,
idStation: idStation.toString()
};

try {
log.debug(`Sending request for station ID: ${idStation} (${source.from} - ${source.to})to ${source.url}`);
const response = await gotExtended.post(source.url, {
json: body,
responseType: 'json',
});
const data = response.body.data;
if (data && data.length > 0) {
log.debug(`Data received for station ID: ${idStation}`);
return { idStation, lastDataObject: data[data.length - 1] };
} else {
log.warn(`No data found for station ID: ${idStation}`);
return null;
}
} catch (error) {
log.error(`Error for station ID ${idStation}: ${error.response?.body || error.message}`);
return null;
}

async function createRequest(idStation, source) {
const body = {
user: source.user,
password: source.password,
startDate: source.from,
endDate: source.to,
idStation: idStation.toString()
};

try {
log.debug(`Sending request for station ID: ${idStation} (${source.from} - ${source.to}) to ${source.url}`);
const response = await got.post(source.url, {
json: body,
responseType: 'json',
});

//console.log(response)
// Check if response body 'status' is not "1"; ie user or password is incorrect
if (response.body.status !== "1") {
throw new FetchError(AUTHENTICATION_ERROR, source, response.body.message);
//throw new Error(`API Error for station ID ${idStation}: ${response.body.message || 'Unknown error'}`);
}

if (!response.body.data || response.body.data.length === 0) {
log.debug(`No data for station ID ${idStation}`);
return null;
} else {
return {
idStation,
lastDataObject: response.body.data[response.body.data.length - 1],
};
}
} catch (error) {
log.error(
`Request failed for station ID ${idStation}:`,
error.response ? error.response.body : error.message
);
throw error;
}
}
1 change: 1 addition & 0 deletions src/lib/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export const ADAPTER_NOT_FOUND = Symbol('Adapter not found');
export const ADAPTER_NAME_INVALID = Symbol('Adapter name invalid');
export const DATA_URL_ERROR = Symbol('Source data url error');
export const DATA_PARSE_ERROR = Symbol('Source data parsing error');
export const AUTHENTICATION_ERROR = Symbol('User could not be authenticated');

export const STREAM_END = Symbol('End stream');

Expand Down
Loading

0 comments on commit e61c07b

Please sign in to comment.