Source code for django_fabric.notifications

# -*- coding: utf8 -*-
import json
import re
import socket
import time

import requests
from fabric import colors


[docs]class Notifier(object): NICK = 'django-fabric' def message_payload(self): return {} def notification_message_context(self, instance): return { 'url': self.urls[instance], 'status': self.status_code } def pre_deploy_notify(self, instance): self.send_notification('Deploying %(url)s' % self.notification_message_context(instance)) def post_deploy_notify(self, instance): self.send_notification('Finished deploying %(url)s it responds with status %(status)s' % self.notification_message_context(instance)) def send_notification(self, message): raise NotImplemented
[docs]class IrcNotifyMixin(Notifier): SERVER = 'irc.freenode.org' PORT = 6667 ROOMS = [] TIMEOUT = 25 def post_deploy_notify(self, instance): pass def send_notification(self, message): irc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) irc.settimeout(self.TIMEOUT) irc.connect((self.SERVER, self.PORT)) irc.send("USER %s %s %s :django-fabric\n" % ((self.NICK,) * 3)) irc.send("NICK %s\n" % self.NICK) start = time.time() while (time.time() - start) < self.TIMEOUT: irc_message = irc.recv(2048).strip('\n\r') pong = re.compile(r'^PING\s*:\s*(.*)$').findall(irc_message) if pong: irc.send("PONG %s\n" % pong) if re.findall(' 00[1-4] %s' % self.NICK, irc_message): for room in self.ROOMS: irc.send("JOIN %s\n" % room) irc.send("PRIVMSG %s :%s\n" % (room, message)) irc.send("PART %s\n" % room) break irc.send("QUIT\n") irc.close()
[docs]class SlackNotifyMixin(Notifier): CHANNEL = '#general' def send_notification(self, message): payload = { 'channel': self.CHANNEL, 'username': self.NICK, 'text': message } data = {'payload': json.dumps(payload)} if requests.post(self.URL, data=data).status_code != requests.codes.ok: print(colors.yellow('Could not notify Slack'))
[docs]class HipChatNotifyMixin(Notifier): URL = 'https://api.hipchat.com/v2/room/%s/notification?auth_token=%s' COLOR = 'yellow' NOTIFY = False def send_notification(self, message): payload = { 'message': message, 'color': self.COLOR, 'notify': self.NOTIFY } url = self.URL % (self.ROOM, self.HIPCHAT_TOKEN) data = {'payload': json.dumps(payload)} if requests.post(url, data=data).status_code != requests.codes.ok: print(colors.yellow('Could not notify HipChat'))