// Mirror of https://github.com/HSLdevcom/digitransit-ui
// Synced 2025-07-05 16:30:37 +02:00
// 172 lines, 5.1 KiB, JavaScript
import ceil from 'lodash/ceil';
|
|
import { DateTime } from 'luxon';
|
|
import { parseFeedMQTT } from './gtfsRtParser';
|
|
import { convertTo24HourFormat } from './timeUtils';
|
|
import { DATE_FORMAT_MQTT } from '../constants';
|
|
|
|
// Modes whose name in the realtime topic is returned unchanged by getMode.
const standardModes = ['bus', 'tram', 'ferry'];

/**
 * Maps a mode name taken from a realtime topic to the mode name used in the
 * parsed vehicle message.
 *
 * @param {string} mode mode name as it appears in the topic
 * @returns {string} `mode` itself for 'bus', 'tram' and 'ferry', 'rail' for
 *   'train', 'subway' for 'metro', and 'bus' for anything else
 */
const getMode = mode => {
  if (standardModes.includes(mode)) {
    return mode;
  }
  switch (mode) {
    case 'train':
      return 'rail';
    case 'metro':
      return 'subway';
    default:
      // bus mode should be used as fallback if mode is not one of the standard modes
      return 'bus';
  }
};
|
|
|
|
/**
 * getTopic
 * Returns a MQTT topic to be subscribed to.
 *
 * Every option that is missing (falsy) is replaced with the single-level
 * wildcard '+', and the resulting parts are handed to
 * `settings.mqttTopicResolver`, whose return value is returned as is.
 *
 * @param {Object} options route, direction, geoHash, tripId, headsign,
 *   tripStartTime and feedId are read; all of them are optional
 * @param {Object} settings must provide `mqttTopicResolver`; `feedId` is used
 *   when the option itself has no feedId
 * @returns {*} whatever `settings.mqttTopicResolver` returns for the parts
 */
function getTopic(options, settings) {
  const route = options.route || '+';
  // NOTE(review): a numeric direction of 0 is falsy and therefore becomes the
  // wildcard here — confirm callers never pass 0 as a concrete direction.
  const direction = options.direction ? parseInt(options.direction, 10) : '+';
  const geoHash = options.geoHash || ['+', '+', '+', '+'];
  const tripId = options.tripId || '+';
  // headsigns with / cause problems
  const headsign =
    options.headsign && !options.headsign.includes('/')
      ? options.headsign
      : '+';
  const tripStartTime = options.tripStartTime
    ? convertTo24HourFormat(options.tripStartTime)
    : '+';
  const feedId = options.feedId || settings.feedId || '+';

  // Called as a method so the resolver keeps `settings` as its `this`.
  return settings.mqttTopicResolver(
    route,
    direction,
    tripStartTime,
    headsign,
    feedId,
    tripId,
    geoHash,
  );
}
|
|
|
|
/**
 * Change times from 24 hour system to 29 hour system, and removes ':'.
 *
 * A start time longer than four characters whose hour part is below 5 gets 24
 * added to the hour (e.g. '04:30' becomes '2830'); any other non-empty value
 * only loses its colons. A missing or empty start time is returned unchanged
 * instead of being dereferenced.
 *
 * @param {string|undefined} startTime start time segment of the topic
 * @returns {string|undefined} converted start time
 */
function formatTripStartTime(startTime) {
  if (!startTime) {
    return startTime;
  }
  const hours = parseInt(startTime.substring(0, 2), 10);
  if (startTime.length > 4 && hours < 5) {
    return `${hours + 24}${startTime.substring(3)}`;
  }
  return startTime.replace(/:/g, '');
}

// parse HSL mqtt
/**
 * Builds a vehicle message from an HSL MQTT topic and its payload.
 *
 * The topic is split on '/' and the segments at positions 6 (mode), 8 (vehicle
 * id), 9 (line), 10 (direction), 12 (start time) and 13 (next stop) are used.
 * The position data is read from the `VP` property of the payload.
 *
 * @param {string} topic topic the message was received on
 * @param {Uint8Array|Object} message payload; a Uint8Array is passed to
 *   JSON.parse, anything else is expected to already be the parsed object
 * @param {string} defaultFeedId feed id used to prefix id, route and next_stop
 * @returns {Object|undefined} the vehicle message, or undefined when the
 *   payload has no `VP`, `VP.lat` or `VP.long` is falsy, or `VP.seq` is
 *   defined and not 1
 * @throws {SyntaxError} when a Uint8Array payload is not valid JSON
 */
export function parseMessage(topic, message, defaultFeedId) {
  // The holes skip the topic segments that are not needed.
  const [, , , , , , mode, , id, line, dir, , startTime, nextStop] =
    topic.split('/');

  // JSON.parse stringifies its argument, so this relies on the byte array's
  // own toString (presumably a Buffer coming from the mqtt client — confirm).
  const parsedMessage =
    message instanceof Uint8Array ? JSON.parse(message).VP : message.VP;

  if (
    !parsedMessage ||
    !parsedMessage.lat ||
    !parsedMessage.long ||
    // seq is used for hsl metro carriage sequence
    (parsedMessage.seq !== undefined && parsedMessage.seq !== 1)
  ) {
    return undefined;
  }

  return {
    id: `${defaultFeedId}_${id}`,
    route: `${defaultFeedId}:${line}`,
    direction: parseInt(dir, 10) - 1,
    tripStartTime: formatTripStartTime(startTime),
    operatingDay:
      parsedMessage.oday && parsedMessage.oday !== 'XXX'
        ? parsedMessage.oday
        : DateTime.now().toFormat(DATE_FORMAT_MQTT),
    mode: getMode(mode),
    next_stop: `${defaultFeedId}:${nextStop}`,
    timestamp: parsedMessage.tsi,
    lat: ceil(parsedMessage.lat, 5),
    long: ceil(parsedMessage.long, 5),
    shortName: parsedMessage.desi,
    heading: parsedMessage.hdg,
    headsign: undefined, // in HSL data headsign from realtime data does not always match gtfs data
  };
}
|
|
|
|
/**
 * Replaces the topics an existing client is subscribed to.
 *
 * Unsubscribes `settings.oldTopics` (when it is a non-empty array), builds one
 * topic per entry of `settings.options` with getTopic, dispatches
 * 'RealTimeClientNewTopics' with the new topics and finally subscribes the
 * client to them.
 *
 * @param {Object} settings `client`, `oldTopics` and `options` are read here;
 *   the whole object is also passed on to getTopic
 * @param {Object} actionContext object whose `dispatch` receives the new topics
 * @returns {undefined}
 */
export function changeTopics(settings, actionContext) {
  const { client, oldTopics } = settings;
  // Same fallback as startMqttClient: without options a single all-wildcard
  // topic is used instead of failing after the old topics are already gone.
  const options = settings.options || [{}];

  if (Array.isArray(oldTopics) && oldTopics.length > 0) {
    client.unsubscribe(oldTopics);
  }

  // Stays undefined unless at least one option names a route.
  let topicsByRoute;
  const topics = [];
  options.forEach(option => {
    const topicString = getTopic(option, settings);
    if (option.route) {
      topicsByRoute = topicsByRoute || {};
      topicsByRoute[option.route] = topicString;
    }
    topics.push(topicString);
  });

  // set new topic to store
  actionContext.dispatch('RealTimeClientNewTopics', { topics, topicsByRoute });
  client.subscribe(topics);
}
|
|
|
|
/**
 * Registers the handlers shared by both client flavours: subscribe to the
 * topics on every 'connect' event and forward 'message' events to `onMessage`.
 *
 * @param {Object} client connected mqtt client
 * @param {Array} topics topics to subscribe to
 * @param {Function} onMessage handler called with (topic, message)
 * @returns {{client: Object, topics: Array}} the client with its topics
 */
function attachClientHandlers(client, topics, onMessage) {
  client.on('connect', () => client.subscribe(topics));
  client.on('message', onMessage);
  return { client, topics };
}

/**
 * Loads the mqtt library on demand, connects to `settings.mqtt` and dispatches
 * 'RealTimeClientMessage' for incoming messages.
 *
 * With `settings.gtfsrt` set, the './gtfsrt' bindings are loaded as well, the
 * connection is opened with `settings.credentials` (an empty object when
 * undefined) and every message parsed by parseFeedMQTT is dispatched
 * separately. Otherwise each message is run through parseMessage and its
 * result is dispatched.
 *
 * @param {Object} settings `mqtt`, `options`, `gtfsrt`, `credentials` and
 *   `feedId` are read here; the whole object is also passed to getTopic
 * @param {Object} actionContext object whose `dispatch` receives the messages
 * @returns {Promise<{client: Object, topics: Array}>} resolves once the client
 *   has been created and its handlers are registered
 */
export function startMqttClient(settings, actionContext) {
  // Without options a single all-wildcard topic is subscribed to.
  const options = settings.options || [{}];
  const topics = options.map(option => getTopic(option, settings));

  return import(/* webpackChunkName: "mqtt" */ 'mqtt').then(mqtt => {
    if (settings.gtfsrt) {
      return import(/* webpackChunkName: "gtfsrt" */ './gtfsrt').then(
        bindings => {
          const feedReader = bindings.FeedMessage.read;
          const credentials =
            settings.credentials !== undefined ? settings.credentials : {};
          const client = mqtt.default.connect(settings.mqtt, credentials);

          return attachClientHandlers(client, topics, (topic, messages) => {
            parseFeedMQTT(feedReader, messages, topic).forEach(message => {
              actionContext.dispatch('RealTimeClientMessage', message);
            });
          });
        },
      );
    }

    // NOTE(review): credentials are only passed on the gtfsrt path — confirm
    // that this is intentional.
    const client = mqtt.default.connect(settings.mqtt);
    return attachClientHandlers(client, topics, (topic, message) =>
      actionContext.dispatch(
        'RealTimeClientMessage',
        parseMessage(topic, message, settings.feedId),
      ),
    );
  });
}
|