在使用Django開發(fā)web應(yīng)用的時候,很多場景都會有需要微信相關(guān)功能的介入,最近我們公司在使用python的Django框架配合國產(chǎn)數(shù)據(jù)庫OceanBase數(shù)據(jù)庫進(jìn)行開發(fā)互聯(lián)網(wǎng)應(yīng)急指揮系統(tǒng)的時候,就用到了微信通知,在發(fā)生輿情事件的時候通過微信公眾號,通知對應(yīng)人員有新的事件發(fā)生,或者提醒相關(guān)人員對應(yīng)事件的進(jìn)度情況
想要使用微信的信息推送,就需要提前做好一些準(zhǔn)備,因為發(fā)送微信模板消息,您需要使用微信公眾平臺 API。
需要獲取微信公眾平臺的 API 密鑰,并且配置好相應(yīng)的回調(diào) URL。
需要根據(jù)微信公眾平臺的要求,選擇合適的模板,并填寫好模板內(nèi)容。
根據(jù)微信公眾平臺的 API 文檔,構(gòu)造合適的請求參數(shù),發(fā)送請求。
對于推送消息的響應(yīng)結(jié)果,需要進(jìn)行適當(dāng)?shù)奶幚砗徒馕?,以便于判斷推送消息是否成功?/p>
廢話直接不多說,直接貼代碼
import copy import json import redis import requests from django.conf import settings from api.models import DvadminSystemUsers class WeChat: def __init__(self, app_id=settings.WX_APP_ID, secret=settings.WX_APP_SECRET, template=None): self.app_id = app_id self.secret = secret self.template = { "touser": "", "template_id": "", "url": "", "data": { "first": { "value": "", }, "keyword1": { "value": "", }, "keyword2": { "value": "", }, "remark": { "value": "", }, } } if template: self.template = template self.access_token = None self.req_list = list() self.user_list = list() self.data_list = list() def _get_token(self, force_update=False): key_name = 'wechat_token_{}'.format(self.app_id) if force_update: self.access_token = None else: self.access_token = get_data(key_name) if not self.access_token: url = "https://api.weixin.qq.com/cgi-bin/token?" payload = { 'grant_type': 'client_credential', 'appid': self.app_id, 'secret': self.secret, } response = requests.get(url, params=payload, timeout=50) access_token = response.json().get("access_token") if access_token: set_data(key_name, access_token, ex=7100) self.access_token = get_data(key_name) def make_data_list(self): user_openid_list = DvadminSystemUsers.objects.filter( id__in=self.user_list ).exclude( openId='' ).exclude( openId__isnull=True ).values_list('openid', flat=True) for openid in user_openid_list: user_template = copy.deepcopy(self.template) user_template['touser'] = openid self.data_list.append(user_template) def post_data(self): # 獲取 token self._get_token() url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={}".format(self.access_token) # 準(zhǔn)備數(shù)據(jù) if self.user_list: self.make_data_list() # 發(fā)送請求 for data in self.data_list: json_template = json.dumps(data) res = requests.post(url, data=json_template) print('post_data', res.text) def get_redis(): """ 連接redis """ redis = redis.Redis(host='localhost', port=6379, db=0) return redis def set_data(name, value, **kwargs): # 將數(shù)據(jù)存入redis緩存 redis = get_redis() value = json.dumps(value) redis.set(name, value, **kwargs) def get_data(name): # 取出數(shù)據(jù) redis = get_redis() value = redis.get(name) if value: value = json.loads(value) return value
調(diào)用微信推送類