diff --git a/cmd/check.go b/cmd/check.go index 63d222e..42e676b 100644 --- a/cmd/check.go +++ b/cmd/check.go @@ -22,14 +22,6 @@ var checkBoxCmd = &cobra.Command{ Args: BoxIdValidator, RunE: func(cmd *cobra.Command, args []string) error { cmd.SilenceUsage = true - _, err := CheckBoxes(args, defaultConf) - if err != nil { - return err - } - if shouldNotify { - // TODO - } - - return nil + return checkAndNotify(args, defaultConf) }, } diff --git a/cmd/jobs.go b/cmd/jobs.go deleted file mode 100644 index b9ca5e0..0000000 --- a/cmd/jobs.go +++ /dev/null @@ -1,65 +0,0 @@ -package cmd - -import ( - "../core" - log "github.com/sirupsen/logrus" -) - -func CheckBoxes(boxIds []string, defaultConf *core.NotifyConfig) ([]core.CheckResult, error) { - log.Debug("Checking notifications for ", len(boxIds), " box(es)") - - // TODO: return a map of Box: []Notification instead? - results := []core.CheckResult{} - for _, boxId := range boxIds { - r, err := checkBox(boxId, defaultConf) - if err != nil { - return nil, err - } - - if r != nil { - results = append(results, r...) - } - } - - return results, nil -} - -func checkBox(boxId string, defaultConf *core.NotifyConfig) ([]core.CheckResult, error) { - boxLogger := log.WithFields(log.Fields{"boxId": boxId}) - boxLogger.Info("checking box for due notifications") - - // get box data - box, err := core.Osem.GetBox(boxId) - if err != nil { - boxLogger.Error(err) - return nil, err - } - - // if box has no notify config, we use the defaultConf - if box.NotifyConf == nil { - box.NotifyConf = defaultConf - } - - // run checks - results, err2 := box.RunChecks() - if err2 != nil { - boxLogger.Error("could not run checks on box: ", err2) - return results, err2 - } - - for _, r := range results { - resultLog := boxLogger.WithFields(log.Fields{ - "status": r.Status, - "event": r.Event, - "value": r.Value, - "target": r.Target, - }) - if r.Status == core.CheckOk { - resultLog.Debug(r) - } else { - resultLog.Warn(r) - } - } - - return results, nil -} diff --git a/cmd/root.go b/cmd/root.go index 0f7b34f..e744edb 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -2,15 +2,34 @@ package cmd import ( "os" + "strings" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/spf13/viper" ) var rootCmd = &cobra.Command{ Use: "osem_notify", Long: "Run healthchecks and send notifications for boxes on opensensemap.org", - PersistentPreRun: func(cmd *cobra.Command, args []string) { + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + // set up config environment FIXME: cannot open / write file?! + viper.SetConfigType("json") + viper.SetConfigFile(".osem_notify") + viper.AddConfigPath("$HOME") + viper.AddConfigPath(".") + // // If a config file is found, read it in. + // if _, err := os.Stat(path.Join(os.Getenv("HOME"), ".osem_notify.yml")); err == nil { + // err := viper.ReadInConfig() + // if err != nil { + // fmt.Println("Error when reading config file:", err) + // } + // } + viper.SetEnvPrefix("osem_notify") + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_")) + viper.AutomaticEnv() + + // set up logger log.SetOutput(os.Stdout) switch logLevel { case "debug": @@ -26,6 +45,8 @@ var rootCmd = &cobra.Command{ case "json": log.SetFormatter(&log.JSONFormatter{}) } + + return nil }, Run: func(cmd *cobra.Command, args []string) { cmd.Help() @@ -33,17 +54,15 @@ var rootCmd = &cobra.Command{ } var ( - shouldNotify bool - defaultConfig string - logLevel string - logFormat string + shouldNotify bool + logLevel string + logFormat string ) func init() { rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "", "info", "log level, can be one of debug, info, warn, error") rootCmd.PersistentFlags().StringVarP(&logFormat, "log-format", "", "plain", "log format, can be plain or json") rootCmd.PersistentFlags().BoolVarP(&shouldNotify, "notify", "n", false, "if set, will send out notifications.\nOtherwise results are printed to stdout only") - rootCmd.PersistentFlags().StringVarP(&defaultConfig, "conf-default", "c", "", "default JSON config to use for event checking") } func Execute() { diff --git a/cmd/args.go b/cmd/shared.go similarity index 63% rename from cmd/args.go rename to cmd/shared.go index d50e14a..d2e3724 100644 --- a/cmd/args.go +++ b/cmd/shared.go @@ -4,10 +4,38 @@ import ( "fmt" "regexp" - "../core" "github.com/spf13/cobra" + + "../core" ) +/** + * shared functionality between watch and check + */ + +// TODO: actually to be read from arg / file + +var defaultConf = &core.NotifyConfig{ + Notifications: core.TransportConfig{ + Transport: "email", + Options: core.EmailNotifier{ + []string{"test@nroo.de"}, + "notify@nroo.de", + }, + }, + Events: []core.NotifyEvent{ + core.NotifyEvent{ + Type: "measurement_age", + Target: "all", + Threshold: "15m", + }, + core.NotifyEvent{ + Type: "measurement_suspicious", + Target: "all", + }, + }, +} + func isValidBoxId(boxId string) bool { // boxIds are UUIDs r := regexp.MustCompile("^[0-9a-fA-F]{24}$") @@ -26,17 +54,16 @@ func BoxIdValidator(cmd *cobra.Command, args []string) error { return nil } -// TODO: actually to be read from arg / file -var defaultConf = &core.NotifyConfig{ - Events: []core.NotifyEvent{ - core.NotifyEvent{ - Type: "measurement_age", - Target: "all", - Threshold: "15m", - }, - core.NotifyEvent{ - Type: "measurement_suspicious", - Target: "all", - }, - }, +func checkAndNotify(boxIds []string, defaultNotifyConf *core.NotifyConfig) error { + results, err := core.CheckBoxes(boxIds, defaultConf) + if err != nil { + return err + } + + results.Log() + + if shouldNotify { + return results.SendNotifications() + } + return nil } diff --git a/cmd/watch.go b/cmd/watch.go index d8b31fe..807eb1b 100644 --- a/cmd/watch.go +++ b/cmd/watch.go @@ -4,8 +4,6 @@ import ( "time" "github.com/spf13/cobra" - - "../core" ) var watchInterval int @@ -22,9 +20,6 @@ var watchCmd = &cobra.Command{ Aliases: []string{"serve"}, Short: "Watch boxes for events at an interval", Long: "Watch boxes for events at an interval", - PersistentPreRun: func(cmd *cobra.Command, args []string) { - ticker = time.NewTicker(time.Duration(watchInterval) * time.Second).C - }, } var watchBoxesCmd = &cobra.Command{ @@ -32,47 +27,21 @@ var watchBoxesCmd = &cobra.Command{ Short: "watch a list of box IDs for events", Long: "specify box IDs to watch them for events", Args: BoxIdValidator, + PreRun: func(cmd *cobra.Command, args []string) { + ticker = time.NewTicker(time.Duration(watchInterval) * time.Second).C + }, RunE: func(cmd *cobra.Command, args []string) error { cmd.SilenceUsage = true - exec := func() error { - results, err := CheckBoxes(args, defaultConf) - if err != nil { - return err - } - - results, err = filterFromCache(results) - if err != nil { - return err - } - - if shouldNotify { - // TODO - } - return nil - } - - err := exec() + err := checkAndNotify(args, defaultConf) if err != nil { return err } for { <-ticker - err = exec() + err = checkAndNotify(args, defaultConf) if err != nil { return err } } }, } - -func filterFromCache(results []core.CheckResult) ([]core.CheckResult, error) { - // get results from cache. they are indexed by ______ - - // filter, so that only changed result.Status remain - - // extract additional results with Status ERR from cache with time.Since(lastNotifyDate) > thresh - - // upate cache set lastNotifyDate to Now() - - return results, nil -} diff --git a/core/Box.go b/core/Box.go index 4aa3c8d..c8d96fe 100644 --- a/core/Box.go +++ b/core/Box.go @@ -8,14 +8,23 @@ import ( const ( CheckOk = "OK" - CheckErr = "ERROR" - eventMeasurementAge = "measurement_age" // errors if age of last measurement is higher than a duration - eventMeasurementValMin = "measurement_min" // errors if value of last measurement is lower than threshold - eventMeasurementValMax = "measurement_max" // errors if value of last measurement is higher than threshold - eventMeasurementValSuspicious = "measurement_suspicious" // checks value of last measurement against a blacklist of values - eventTargetAll = "all" // if event.Target is this value, all sensors will be checked + CheckErr = "FAILED" + eventMeasurementAge = "measurement_age" + eventMeasurementValMin = "measurement_min" + eventMeasurementValMax = "measurement_max" + eventMeasurementValSuspicious = "measurement_suspicious" + eventTargetAll = "all" // if event.Target is this value, all sensors will be checked ) +type checkType = struct{ description string } + +var checkTypes = map[string]checkType{ + eventMeasurementAge: checkType{"No measurement from %s since %s"}, + eventMeasurementValMin: checkType{"Sensor %s reads low value of %s"}, + eventMeasurementValMax: checkType{"Sensor %s reads high value of %s"}, + eventMeasurementValSuspicious: checkType{"Sensor %s reads presumably faulty value of %s"}, +} + type SuspiciousValue struct { sensor string val float64 @@ -28,29 +37,25 @@ var suspiciousVals = map[SuspiciousValue]bool{ SuspiciousValue{sensor: "SDS 011", val: 0.0}: true, } -type CheckResult struct { - Status string - Event string - Target string - Value string -} - -func (r CheckResult) String() string { - return fmt.Sprintf("check %s on sensor %s: %s with value %s\n", r.Event, r.Target, r.Status, r.Value) -} - type NotifyEvent struct { Type string `json:"type"` Target string `json:"target"` Threshold string `json:"threshold"` } +type TransportConfig struct { + Transport string `json:"transport"` + Options interface{} `json:"options"` +} + type NotifyConfig struct { - Events []NotifyEvent `json:"events"` + Notifications TransportConfig `json:"notifications"` + Events []NotifyEvent `json:"events"` } type Box struct { Id string `json:"_id"` + Name string `json:"name"` Sensors []struct { Id string `json:"_id"` Type string `json:"sensorType"` @@ -59,7 +64,7 @@ type Box struct { Date time.Time `json:"createdAt"` } `json:"lastMeasurement"` } `json:"sensors"` - NotifyConf *NotifyConfig `json:"notify"` + NotifyConf *NotifyConfig `json:"healthcheck"` } func (box Box) RunChecks() ([]CheckResult, error) { @@ -89,10 +94,11 @@ func (box Box) RunChecks() ([]CheckResult, error) { } results = append(results, CheckResult{ - Event: event.Type, - Target: s.Id, - Value: s.LastMeasurement.Date.String(), - Status: status, + Threshold: event.Threshold, + Event: event.Type, + Target: s.Id, + Value: s.LastMeasurement.Date.String(), + Status: status, }) case eventMeasurementValMin, eventMeasurementValMax: @@ -111,10 +117,11 @@ func (box Box) RunChecks() ([]CheckResult, error) { } results = append(results, CheckResult{ - Event: event.Type, - Target: s.Id, - Value: s.LastMeasurement.Value, - Status: status, + Threshold: event.Threshold, + Event: event.Type, + Target: s.Id, + Value: s.LastMeasurement.Value, + Status: status, }) case eventMeasurementValSuspicious: @@ -132,10 +139,11 @@ func (box Box) RunChecks() ([]CheckResult, error) { } results = append(results, CheckResult{ - Event: event.Type, - Target: s.Id, - Value: s.LastMeasurement.Value, - Status: status, + Threshold: event.Threshold, + Event: event.Type, + Target: s.Id, + Value: s.LastMeasurement.Value, + Status: status, }) } } @@ -144,3 +152,17 @@ func (box Box) RunChecks() ([]CheckResult, error) { // must return ALL events to enable Notifier to clear previous notifications return results, nil } + +func (box Box) GetNotifier() (AbstractNotifier, error) { + transport := box.NotifyConf.Notifications.Transport + if transport == "" { + return nil, fmt.Errorf("No notification transport provided") + } + + notifier := notifiers[transport] + if notifier == nil { + return nil, fmt.Errorf("%s is not a supported notification transport", transport) + } + + return notifier.New(box.NotifyConf.Notifications.Options) +} diff --git a/core/BoxCheckResults.go b/core/BoxCheckResults.go new file mode 100644 index 0000000..41642ae --- /dev/null +++ b/core/BoxCheckResults.go @@ -0,0 +1,182 @@ +package core + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + + log "github.com/sirupsen/logrus" + "github.com/spf13/viper" +) + +type CheckResult struct { + Status string + Event string + Target string + Value string + Threshold string +} + +func (r CheckResult) EventID() string { + s := fmt.Sprintf("%s%s%s", r.Event, r.Target, r.Threshold) + hasher := sha256.New() + hasher.Write([]byte(s)) + return hex.EncodeToString(hasher.Sum(nil)) +} + +func (r CheckResult) String() string { + if r.Status == CheckOk { + return fmt.Sprintf("%s %s (on sensor %s with value %s)\n", r.Event, r.Status, r.Target, r.Value) + } else { + return fmt.Sprintf("%s: "+checkTypes[r.Event].description+"\n", r.Status, r.Target, r.Value) + } +} + +type BoxCheckResults map[*Box][]CheckResult + +func (results BoxCheckResults) Size() int { + size := 0 + for _, boxResults := range results { + size += len(boxResults) + } + return size +} + +func (results BoxCheckResults) Log() { + for box, boxResults := range results { + boxLog := log.WithFields(log.Fields{ + "boxId": box.Id, + }) + countErr := 0 + for _, r := range boxResults { + resultLog := boxLog.WithFields(log.Fields{ + "status": r.Status, + "event": r.Event, + "value": r.Value, + "target": r.Target, + }) + if r.Status == CheckOk { + resultLog.Debugf("%s: %s", box.Name, r) + } else { + resultLog.Warnf("%s: %s", box.Name, r) + countErr++ + } + } + if countErr == 0 { + boxLog.Infof("%s: all is fine!", box.Name) + } + } +} + +func (results BoxCheckResults) SendNotifications() error { + results = results.FilterChangedFromCache(false) + + n := results.Size() + if n == 0 { + log.Info("No notifications due.") + return nil + } else { + log.Infof("Notifying for %v checks turned bad in total...", results.Size()) + } + + for box, resultsDue := range results { + if len(resultsDue) == 0 { + continue + } + + transport := box.NotifyConf.Notifications.Transport + notifyLog := log.WithFields(log.Fields{ + "boxId": box.Id, + "transport": transport, + }) + + notifier, err2 := box.GetNotifier() + if err2 != nil { + notifyLog.Error(err2) + return err2 + } + notification := notifier.ComposeNotification(box, resultsDue) + err3 := notifier.Submit(notification) + if err3 != nil { + notifyLog.Error(err3) + return err3 + } + notifyLog.Infof("Sent notification for %s via %s with %v new issues", box.Name, transport, len(resultsDue)) + } + + return nil +} + +func (results BoxCheckResults) FilterChangedFromCache(keepOk bool) BoxCheckResults { + remaining := BoxCheckResults{} + + for box, boxResults := range results { + // get results from cache. they are indexed by an event ID per boxId + // filter, so that only changed result.Status remain + remaining[box] = []CheckResult{} + for _, result := range boxResults { + cached := viper.GetStringMap(fmt.Sprintf("watchcache.%s.%s", box.Id, result.EventID())) + if result.Status != cached["laststatus"] { + if result.Status != CheckOk || keepOk { + remaining[box] = append(remaining[box], result) + } + } + } + + // TODO: reminder functionality: extract additional results with Status ERR + // from cache with time.Since(lastNotifyDate) > remindAfter. + // would require to serialize the full result.. + } + + // upate cache, setting lastNotifyDate to Now() + for box, boxResults := range results { + for _, result := range boxResults { + // FIXME: somehow this is not persisted? + key := fmt.Sprintf("watchcache.%s.%s", box.Id, result.EventID()) + viper.Set(key+".laststatus", result.Status) + } + } + + return remaining +} + +func CheckBoxes(boxIds []string, defaultConf *NotifyConfig) (BoxCheckResults, error) { + log.Debug("Checking notifications for ", len(boxIds), " box(es)") + + results := BoxCheckResults{} + for _, boxId := range boxIds { + box, res, err := checkBox(boxId, defaultConf) + if err != nil { + return nil, err + } + results[box] = res + } + + return results, nil +} + +func checkBox(boxId string, defaultConf *NotifyConfig) (*Box, []CheckResult, error) { + boxLogger := log.WithFields(log.Fields{"boxId": boxId}) + boxLogger.Info("checking box for events") + + // get box data + box, err := Osem.GetBox(boxId) + if err != nil { + boxLogger.Error(err) + return nil, nil, err + } + + // if box has no notify config, we use the defaultConf + if box.NotifyConf == nil { + box.NotifyConf = defaultConf + } + + // run checks + results, err2 := box.RunChecks() + if err2 != nil { + boxLogger.Error("could not run checks on box: ", err2) + return box, results, err2 + } + + return box, results, nil +} diff --git a/core/OsemClient.go b/core/OsemClient.go index b5425e2..c8bae5e 100644 --- a/core/OsemClient.go +++ b/core/OsemClient.go @@ -22,10 +22,10 @@ func NewOsemClient(client *http.Client) *OsemClient { } } -func (client *OsemClient) GetBox(boxId string) (Box, error) { - box := Box{} - fail := OsemError{} - client.sling.New().Path("boxes/").Path(boxId).Receive(&box, &fail) +func (client *OsemClient) GetBox(boxId string) (*Box, error) { + box := &Box{} + fail := &OsemError{} + client.sling.New().Path("boxes/").Path(boxId).Receive(box, fail) if fail.Message != "" { return box, errors.New("could not fetch box: " + fail.Message) } diff --git a/core/notifiers.go b/core/notifiers.go index 0186055..a0abe94 100644 --- a/core/notifiers.go +++ b/core/notifiers.go @@ -1,14 +1,81 @@ package core +import ( + "errors" + "fmt" + "net/smtp" + "strings" + "time" +) + +var notifiers = map[string]AbstractNotifier{ + "email": EmailNotifier{}, +} + type AbstractNotifier interface { - GetName() string - SetupTransport(config interface{}) error - AddNotifications(notifications []Notification) error - SendNotifications() error + New(config interface{}) (AbstractNotifier, error) + ComposeNotification(box *Box, checks []CheckResult) Notification + Submit(notification Notification) error } type Notification struct { + body string + subject string +} + +type EmailNotifier struct { + Recipients []string + FromAddress string +} + +func (n EmailNotifier) New(config interface{}) (AbstractNotifier, error) { + res, ok := config.(EmailNotifier) + + if !ok || res.Recipients == nil || res.FromAddress == "" { + return nil, errors.New("Invalid EmailNotifier options") + } + + return EmailNotifier{ + Recipients: res.Recipients, + FromAddress: res.FromAddress, + }, nil +} + +func (n EmailNotifier) ComposeNotification(box *Box, checks []CheckResult) Notification { + resultTexts := []string{} + for _, check := range checks { + resultTexts = append(resultTexts, check.String()) + } + + return Notification{ + subject: fmt.Sprintf("Issues with your box \"%s\" on opensensemap.org!", box.Name), + body: fmt.Sprintf("A check at %s identified the following issue(s) with your box %s:\n\n%s\n\nYou may visit https://opensensemap.org/explore/%s for more details.\n\n--\nSent automatically by osem_notify (https://github.com/noerw/osem_notify)", + time.Now().Round(time.Minute), box.Name, strings.Join(resultTexts, "\n"), box.Id), + } } -// TODO: multiple transports? one transport per event? (??) -type SlackConfig struct{} +func (n EmailNotifier) Submit(notification Notification) error { + // Set up authentication information. TODO: load from config + auth := smtp.PlainAuth( + "", + "USERNAME", + "PASSWORD", + "SERVER", + ) + + fromAddress := "EXAMPLE@EXAMPLE.COM" + from := fmt.Sprintf("openSenseMap Notifier <%s>", fromAddress) + body := fmt.Sprintf("From: %s\nSubject: %s\nContent-Type: text/plain; charset=\"utf-8\"\n\n%s", from, notification.subject, notification.body) + + // Connect to the server, authenticate, set the sender and recipient, + // and send the email all in one step. + err := smtp.SendMail( + "smtp.gmx.de:25", + auth, + fromAddress, + n.Recipients, + []byte(body), + ) + + return err +}