微信对接中的3个token

编程

1.小程序access_token

  • APP_ID: 小程序
  • APP_SECRET: 小程序

def get_wx_access_token(ttl_timeout=60, timeout=30):

# 1.从缓存中获取

access_token_ttl = ttl_key(WX_ACCESS_TOKEN)

if access_token_ttl and access_token_ttl > ttl_timeout:

access_token = get_access_token_wx()

logger.info("access_token:{} ttl:{} > {}s".format(

access_token, access_token_ttl, ttl_timeout))

return access_token, None

# 2.请求微信服务器获取最新的access_token

params = {

"grant_type": "client_credential",

"appid": settings.APP_ID,

"secret": settings.APP_SECRET,

}

url = settings.WX_API_TOKEN

logger.info("params:{}".format(params))

try:

resp = requests.get(params=params, url=url, timeout=timeout)

data = resp.json()

except Exception as e:

msg = "{}".format(e)

logger.error(msg, exc_info=True)

return None, msg

if "errcode" in data or "errmsg" in data:

logger.error("data:{}".format(data))

return None, data["errcode"]

access_token = data["access_token"]

expires_in = data["expires_in"]

set_access_token_wx(access_token, expires_in)

logger.info("set_access_token_wx ok:{}".format(access_token, expires_in))

return data["access_token"], None

2.订阅号access_token

# 订阅号使用

def _post(url=None, json_data=None):

access_token_subscribe = cm.get_access_token_wx_subscribe()

if not access_token_subscribe:

client = WeChatClient(settings.WECHAT_APPID, settings.WECHAT_SECRET)

access_token_subscribe = client.access_token

timestamp = time.time()

expires_at = client.expires_at - timestamp

logger.info(

"new access_token_subscribe:{} expires_at:{}".format(

access_token_subscribe, expires_at))

cm.set_access_token_wx_subscribe(

access_token_subscribe, expires_at)

else:

logger.info("cache access_token_subscribe:{}".format(

access_token_subscribe))

params = {

"access_token": access_token_subscribe,

}

logger.debug("url:{} params:{} json:{}".format(

url, params, json_data))

try:

resp = requests.post(url, params=params, json=json_data)

resp = resp.json()

except Exception as e:

logger.error("e:{}".format(e))

resp = None

return Response(make_response(data=resp, err_msg="{}".format(e)))

if "errcode" in resp or "errmsg" in resp:

return Response(make_response(data=resp))

logger.info("resp:{}".format(resp))

return Response(make_response(data=resp))

3.订阅号js ticket

class SignatureView(generics.GenericAPIView):

permission_classes = ()

authentication_classes = ()

serializer_class = serializers.SignatureSerializer

def post(self, request, *args, **kwargs):

"""

微信JSAPI签名

---

serializer: serializers.SignatureSerializer

"""

serializer = serializers.SignatureSerializer(data=request.data)

if serializer.is_valid():

url = serializer.validated_data["url"]

noncestr = "{0}{1}".format(datetime.datetime.now().strftime(

"%Y%m%d%H%M%S"), random.randint(1000, 10000))

timestamp = int(time.time())

# url = url.strip("#")

url = url.split("#")[0]

logger.debug(

"APPID:{} SECRET:{} url:{} noncestr:{} timestamp:{}".format(

settings.WECHAT_APPID, settings.WECHAT_SECRET, url,

noncestr, timestamp)

)

client = WeChatClient(settings.WECHAT_APPID,

settings.WECHAT_SECRET)

ticket = cm.get_ticket_wx()

if not ticket:

ticket = client.jsapi.get_ticket()

logger.info("new ticket:{}".format(ticket))

cm.set_ticket_wx(ticket, ticket["expires_in"])

else:

logger.info("cache ticket:{}".format(ticket))

signature = client.jsapi.get_jsapi_signature(

noncestr,

ticket["ticket"],

timestamp,

url

)

data = {

"noncestr": noncestr,

"timestamp": timestamp,

"url": url,

"signature": signature,

}

return Response(make_response(data), status=status.HTTP_200_OK)

return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

wechatpy 源码走读

wechatpy 订阅号access_token

1.获取access_token

from wechatpy.session.memorystorage import MemoryStorage

class BaseWeChatClient(object):

# 抽象方法

def fetch_access_token(self):

raise NotImplementedError()

@property

def access_token(self):

""" WeChat access token """

access_token = self.session.get(self.access_token_key) # session 基本使用MemoryStorage存储

if access_token:

if not self.expires_at:

# user provided access_token, just return it

return access_token

timestamp = time.time()

if self.expires_at - timestamp > 60:

return access_token # 未过期直接返回

self.fetch_access_token()

return self.session.get(self.access_token_key)

2.刷新

以上是 微信对接中的3个token 的全部内容, 来源链接: utcz.com/z/511483.html

回到顶部