网络编程
位置:首页>> 网络编程>> Python编程>> Django实现支付宝付款和微信支付的示例代码

Django实现支付宝付款和微信支付的示例代码

作者:Fighting蔚  发布时间:2021-01-31 10:16:29 

标签:Django,支付宝支付,微信支付

支付宝支付和微信支付是当今互联网产品常用的功能,我使用Django Rest Framework实现了网页上支付宝支付和微信支付的一个通用服务,提供rpc接口给其他服务,包括获取支付宝支付页面url的rpc接口、支付宝支付成功异步回调http接口、获取微信支付二维码rpc接口、主动查询微信订单是否支付的rpc接口等。

支付宝网站支付需要蚂蚁金服开放平台账号,创建应用、配置密钥等步骤请参考:蚂蚁金服支付宝电脑网站支付快速接入

微信网站支付需要到微信支付官网注册服务商账号。

目录结构如下:

Django实现支付宝付款和微信支付的示例代码

1、models.py


from django.db import models
from django.contrib.postgres.fields import ArrayField

# Create your models here.
class BaseModel(models.Model):
    """Abstract base model with the audit columns shared by the payment tables.

    Declares creation/modification timestamps (maintained by Django through
    ``auto_now_add`` / ``auto_now``), the integer IDs of the creating and
    last-modifying users, and an ``is_active`` flag that defaults to ``True``.
    """

    created_time = models.DateTimeField(
        auto_now_add=True,
        verbose_name="创建时间",
    )
    updated_time = models.DateTimeField(
        auto_now=True,
        verbose_name="修改时间",
    )
    created_by = models.IntegerField(verbose_name="创建人ID")
    updated_by = models.IntegerField(verbose_name="修改人ID")
    is_active = models.BooleanField(
        default=True,
        verbose_name='是否正常',
    )

    class Meta:
        # No table is created for this class; concrete models inherit its columns.
        abstract = True

class Alipay(BaseModel):
    """An order paid through Alipay.

    ``out_trade_no`` is the caller's unique order number. ``pay_nos`` collects
    every payment ID that has been issued for that order. ``trade_no`` and
    ``pay_time`` stay at their defaults (empty string / NULL) until they are
    filled in by the payment notification handler.
    """

    subject = models.CharField(max_length=256, verbose_name="订单标题")
    out_trade_no = models.CharField(max_length=64, unique=True, verbose_name="唯一订单号")
    trade_no = models.CharField(default="", max_length=64, verbose_name="支付宝系统中的交易流水号")
    total_amount = models.DecimalField(max_digits=11, decimal_places=2, verbose_name="订单的资金总额")
    return_url = models.CharField(max_length=500, verbose_name="支付完成同步跳转地址")
    notify_url = models.CharField(max_length=500, verbose_name="支付完成异步通知rpc地址")
    pay_time = models.DateTimeField(null=True, blank=True, verbose_name="支付时间")
    # The default must be a callable: a literal ``[]`` would be one list object
    # shared by every instance (Django's system checks warn about this).
    pay_nos = ArrayField(models.CharField(max_length=100), default=list, verbose_name='同一订单的支付ID数组')

    class Meta:
        verbose_name = "阿里支付"
        verbose_name_plural = verbose_name
        ordering = ('-created_time',)

class Wxorder(BaseModel):
    """A business order to be paid through WeChat Pay.

    ``total_fee`` is an integer amount in fen (see its verbose name).
    ``transaction_id`` and ``pay_time`` keep their defaults (empty string /
    NULL) until a successful payment is recorded.
    """

    body = models.CharField(
        max_length=256,
        verbose_name="商品描述",
    )
    out_trade_no = models.CharField(
        max_length=64,
        unique=True,
        verbose_name="订单号",
    )
    transaction_id = models.CharField(
        default="",
        max_length=64,
        verbose_name="微信支付订单号",
    )
    total_fee = models.BigIntegerField(verbose_name="订单的资金总额,单位为分")
    product_id = models.CharField(
        max_length=16,
        verbose_name="商品ID",
    )
    notify_url = models.CharField(
        max_length=500,
        verbose_name="支付完成通知url",
    )
    pay_time = models.DateTimeField(
        null=True,
        blank=True,
        verbose_name="支付时间",
    )

    class Meta:
        verbose_name = "微信订单"
        verbose_name_plural = verbose_name
        ordering = ('-created_time',)

class Wxpay(BaseModel):
    """One WeChat Pay payment attempt (QR code) issued for an order.

    ``out_trade_no`` is the business order number, ``pay_no`` the unique
    number of this particular attempt, ``code_url`` the QR-code address and
    ``nonce_str`` the random string stored with it. All four are nullable.
    """

    out_trade_no = models.CharField(
        null=True,
        blank=True,
        max_length=64,
        verbose_name="订单号",
    )
    pay_no = models.CharField(
        null=True,
        blank=True,
        max_length=64,
        unique=True,
        verbose_name="支付唯一订单号",
    )
    code_url = models.CharField(
        null=True,
        blank=True,
        max_length=100,
        verbose_name="二维码地址",
    )
    nonce_str = models.CharField(
        null=True,
        blank=True,
        max_length=32,
        verbose_name="随机字符串",
    )

    class Meta:
        verbose_name = "微信支付"
        verbose_name_plural = verbose_name
        ordering = ('-created_time',)

2、serializers.py:


from django.conf import settings
from rest_framework import serializers
from pay.models import Alipay, Wxpay

class BaseSerializer(serializers.ModelSerializer):
    """Common serializer base: exposes the audit fields as read-only.

    Timestamps are rendered with ``settings.DATETIME_FORMAT``. Subclasses
    supply the concrete model through their own ``Meta``.
    """

    created_time = serializers.DateTimeField(
        format=settings.DATETIME_FORMAT,
        read_only=True,
    )
    updated_time = serializers.DateTimeField(
        format=settings.DATETIME_FORMAT,
        read_only=True,
    )
    is_active = serializers.BooleanField(read_only=True)

    class Meta:
        # Placeholder only; every concrete serializer overrides ``model``.
        model = None

class AlipaySerializer(BaseSerializer):
    """Serializer exposing every field of the ``Alipay`` model."""

    class Meta:
        model = Alipay
        fields = "__all__"

class WxpaySerializer(BaseSerializer):
    """Serializer exposing every field of the ``Wxpay`` (WeChat Pay) model."""

    class Meta:
        model = Wxpay
        fields = "__all__"

3、views.py:


# -*- coding=utf-8 -*-
# Create your views here.
import time
import dicttoxml
from jsonrpc import jsonrpc_method
from rest_framework.decorators import list_route
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet
from rest_framework.views import APIView
from rest_framework_xml.parsers import XMLParser
from rest_framework_xml.renderers import XMLRenderer
from tokenauth.decorators import is_login
from pay import utils
from pay.weixin_pay import WeiXinPay, UnifiedOrderPay, OrderQuery
from pay.UUIDTools import UUIDTools
from pay.models import Alipay, Wxpay, Wxorder
from pay.serializers import AlipaySerializer, WxpaySerializer
from pay.utils import UnActiveModelMixin
from pay.alipay import AliPay
from PAY_SERVICE.settings.base import APPID, PRIVATE_KEY_PATH, \
ALI_PUB_KEY_PATH, ALIPAY_CALLBACK_URL, \
WXAPPID, WX_PAY_KEY, WX_MCH_ID, WXPAY_CALLBACK_URL

# URL handed to the AliPay client as ``app_notify_url`` (the asynchronous
# notification endpoint). Plain concatenation: assumes ALIPAY_CALLBACK_URL
# already ends with "/" -- TODO confirm in settings.
NOTIFY_URL = ALIPAY_CALLBACK_URL + 'api/v1.0/pay/alipay/notify/'

class AlipayViewSet(ModelViewSet):
    """Endpoints for active ``Alipay`` records plus Alipay's asynchronous notify callback."""

    queryset = Alipay.objects.filter(is_active=True)
    serializer_class = AlipaySerializer

    @list_route(methods=['post'])
    def notify(self, request):
        """Handle the asynchronous payment notification Alipay POSTs to ``notify_url``.

        The notification is processed only when it references a known order
        that has no ``trade_no`` yet, the amount and ``app_id`` match, and the
        signature verifies. A verified notification whose ``trade_status`` is
        not a paid state is acknowledged without touching the order.

        :param request: DRF request whose ``data`` holds the notification fields.
        :return: ``Response("success")`` when the payment was recorded and the
            trading service acknowledged it (or the notification needs no
            action); ``Response("failed")`` in every other case.
        """
        # Imported locally because this module has no file-level import of the
        # Django settings object that the RPC call below needs.
        from django.conf import settings

        processed_dict = {key: value for key, value in request.data.items()}
        app_id = processed_dict.get('app_id')
        pay_no = processed_dict.get('out_trade_no')
        trade_no = processed_dict.get('trade_no')
        total_amount = processed_dict.get('total_amount')
        trade_status = processed_dict.get('trade_status')
        pay_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())

        if not pay_no:
            return Response("failed")

        # The notification carries the per-attempt payment ID, which is stored
        # in the order's ``pay_nos`` array rather than in ``out_trade_no``.
        alipay = Alipay.objects.filter(pay_nos__contains=[pay_no]).values().first()
        if alipay is None:
            return Response("failed")
        if str(alipay.get('total_amount')) != str(total_amount):
            return Response("failed")
        if app_id != APPID:
            return Response("failed")
        # NOTE(review): once trade_no is stored, every retry is answered with
        # "failed", even if the trading-service call below did not succeed the
        # first time -- confirm this is the intended retry behaviour.
        if alipay.get('trade_no') != "":
            return Response("failed")

        # The signature itself is not part of the signed content.
        sign = processed_dict.pop('sign', None)
        if not sign:
            return Response("failed")

        ali_pay = AliPay(
            appid=APPID,
            app_notify_url=NOTIFY_URL,
            app_private_key_path=PRIVATE_KEY_PATH,
            alipay_public_key_path=ALI_PUB_KEY_PATH,
            debug=True,  # defaults to False
            return_url=alipay.get('return_url')
        )

        if ali_pay.verify(processed_dict, sign) is not True:
            return Response("failed")

        # Alipay also notifies for non-paid states (e.g. a closed trade); only
        # these two states mean the buyer has actually paid.
        if trade_status not in ('TRADE_SUCCESS', 'TRADE_FINISHED'):
            return Response("success")

        Alipay.objects.filter(pk=alipay.get('id')).update(pay_time=pay_time, trade_no=trade_no)
        ret = utils.request_thrift('TradingManager', 'notify',
                                   settings.TRADING_RPC_IP, int(settings.TRADING_RPC_PORT),
                                   alipay.get('out_trade_no'), str(pay_time))
        if ret == "success":
            return Response("success")
        # Always answer with a Response: a view returning None makes DRF fail.
        return Response("failed")

class WxpayViewSet(ModelViewSet):
    """Endpoints for active ``Wxpay`` records; requests and responses use XML."""

    serializer_class = WxpaySerializer
    queryset = Wxpay.objects.filter(is_active=True)
    renderer_classes = (XMLRenderer,)
    parser_classes = (XMLParser,)

@jsonrpc_method('pay.get_alipay_url')
def get_alipay_url(request, subject, out_trade_no, total_amount, return_url, notify_url, user_id):
    """Return the Alipay payment-page URL for an order (JSON-RPC ``pay.get_alipay_url``).

    The first request for an ``out_trade_no`` creates the ``Alipay`` record and
    uses the order number itself as payment ID. Later requests for the same
    order generate a fresh payment ID and append it to ``pay_nos``.

    :param request: JSON-RPC request object (unused).
    :param subject: order title shown to the payer.
    :param out_trade_no: caller's unique order number.
    :param total_amount: order amount; formatted with two decimals before signing.
    :param return_url: URL the browser is redirected to after payment.
    :param notify_url: caller's own notification address, stored on the record.
    :param user_id: ID stored as ``created_by`` / ``updated_by``.
    :return: full gateway URL including the signed query string.
    """
    # One query is enough: fetch the model instance directly instead of
    # reading ``values()`` first and then loading the same row again by pk.
    alipay = Alipay.objects.filter(out_trade_no=out_trade_no).first()
    if alipay is not None:
        pay_no = UUIDTools.datetime_random()
        alipay.pay_nos.append(pay_no)
        alipay.save()
    else:
        pay_no = out_trade_no
        Alipay.objects.create(
            subject=subject,
            out_trade_no=out_trade_no,
            total_amount=total_amount,
            return_url=return_url,
            notify_url=notify_url,
            pay_nos=[pay_no],
            created_by=user_id,
            updated_by=user_id,
        )

    ali_pay = AliPay(
        appid=APPID,
        app_notify_url=NOTIFY_URL,
        app_private_key_path=PRIVATE_KEY_PATH,
        alipay_public_key_path=ALI_PUB_KEY_PATH,
        debug=True,  # defaults to False
        return_url=return_url
    )

    total_amount = "%.2f" % float(total_amount)
    signed_query = ali_pay.direct_pay(
        subject=subject,
        out_trade_no=pay_no,
        total_amount=total_amount
    )
    # Sandbox gateway:
    # alipay_url = "https://openapi.alipaydev.com/gateway.do?{data}".format(data=signed_query)
    # Production gateway:
    alipay_url = "https://openapi.alipay.com/gateway.do?{data}".format(data=signed_query)
    return alipay_url

@jsonrpc_method('pay.get_wxpay_url')
def get_wxpay_url(request, out_trade_no, body, total_fee, notify_url, product_id, user_id):
    """Return a WeChat Pay QR-code URL for an order (JSON-RPC ``pay.get_wxpay_url``).

    Creates the ``Wxorder`` on first use, places a unified order under a newly
    generated payment number and stores the attempt as a ``Wxpay`` row.

    :param request: JSON-RPC request object (unused).
    :param out_trade_no: caller's unique order number.
    :param body: product description.
    :param total_fee: amount stored on the order and sent to WeChat.
    :param notify_url: caller's own notification address, stored on the order.
    :param product_id: product identifier stored on the order.
    :param user_id: ID stored as ``created_by`` / ``updated_by``.
    :return: the ``code_url`` from WeChat, or ``None`` when the unified order
        request did not succeed.
    """
    if not Wxorder.objects.filter(out_trade_no=out_trade_no).exists():
        Wxorder.objects.create(
            out_trade_no=out_trade_no,
            body=body,
            total_fee=total_fee,
            notify_url=notify_url,
            product_id=product_id,
            created_by=user_id,
            updated_by=user_id
        )

    pay_no = UUIDTools.datetime_random()
    pay = UnifiedOrderPay(WXAPPID, WX_MCH_ID, WX_PAY_KEY)
    # The host part of the callback URL is sent as ``spbill_create_ip``.
    # NOTE(review): WX_NOTIFY_URL is neither defined nor imported in the
    # visible part of this module -- it must exist at module level, otherwise
    # this line raises NameError. Confirm where it is meant to come from.
    response = pay.post(body, pay_no, total_fee,
                        WXPAY_CALLBACK_URL.split('://')[1].split(':')[0], WX_NOTIFY_URL)
    # ``.get`` keeps a malformed reply (missing keys) from raising KeyError.
    if not response:
        return None
    if response.get("return_code") != "SUCCESS" or response.get("result_code") != "SUCCESS":
        return None

    Wxpay.objects.create(
        out_trade_no=out_trade_no,
        pay_no=pay_no,
        code_url=response.get('code_url'),
        nonce_str=response.get('nonce_str'),
        created_by=user_id,
        updated_by=user_id
    )
    return response.get('code_url')

@jsonrpc_method('pay.wx_order_query')
def wx_order_query(request, out_trade_no):
    """Ask WeChat whether any payment attempt of an order succeeded (JSON-RPC ``pay.wx_order_query``).

    Every ``Wxpay`` attempt recorded for the order is queried in turn. On the
    first one whose ``trade_state`` is ``SUCCESS`` the ``Wxorder`` is updated
    with the payment time and WeChat transaction ID.

    :param request: JSON-RPC request object (unused).
    :param out_trade_no: caller's unique order number.
    :return: ``{"success": True, "pay_time": <time_end as sent by WeChat>}``
        when a paid attempt was found, otherwise ``{"success": False}``.
    """
    pay = OrderQuery(WXAPPID, WX_MCH_ID, WX_PAY_KEY)
    # Only the payment number of each attempt is needed for the query.
    pay_nos = Wxpay.objects.filter(out_trade_no=out_trade_no).values_list('pay_no', flat=True)
    for pay_no in pay_nos:
        response = pay.post(pay_no)
        # ``.get`` keeps a malformed reply (missing keys) from raising KeyError.
        if not response:
            continue
        if response.get("return_code") != "SUCCESS" or response.get("result_code") != "SUCCESS":
            continue
        if response.get("trade_state") != "SUCCESS":
            continue

        # Payment succeeded.
        pay_time = response["time_end"]
        transaction_id = response["transaction_id"]
        Wxorder.objects.filter(out_trade_no=out_trade_no).update(
            # WeChat sends yyyyMMddHHmmss; store it in the usual readable form.
            pay_time=time.strftime("%Y-%m-%d %H:%M:%S",
                                   time.strptime(pay_time, "%Y%m%d%H%M%S")),
            transaction_id=transaction_id
        )
        return {"success": True, "pay_time": pay_time}
    return {"success": False}

4、alipay.py:


# -*- coding: utf-8 -*-
# pip install pycryptodome

from datetime import datetime
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA256
from base64 import b64encode, b64decode
from urllib.parse import quote_plus
from urllib.parse import urlparse, parse_qs
from urllib.request import urlopen
from base64 import decodebytes, encodebytes

import json

class AliPay(object):
    """Small Alipay client: builds signed page-pay requests and verifies notifications.

    Requests are signed with the application's RSA private key (SHA-256,
    ``sign_type`` ``RSA2``); notifications are verified with Alipay's public key.
    """

    def __init__(self, appid, app_notify_url, app_private_key_path,
                 alipay_public_key_path, return_url, debug=False):
        """Load both keys from disk and remember the request settings.

        :param appid: Alipay application ID.
        :param app_notify_url: asynchronous notification URL sent with requests.
        :param app_private_key_path: path of the application's private key file.
        :param alipay_public_key_path: path of Alipay's public key file.
        :param return_url: default URL the payer is redirected to after paying.
        :param debug: ``True`` selects the sandbox gateway address.
        """
        self.appid = appid
        self.app_notify_url = app_notify_url
        self.app_private_key_path = app_private_key_path
        self.app_private_key = None
        self.return_url = return_url
        with open(self.app_private_key_path) as fp:
            self.app_private_key = RSA.import_key(fp.read())

        self.alipay_public_key_path = alipay_public_key_path
        with open(self.alipay_public_key_path) as fp:
            self.alipay_public_key = RSA.import_key(fp.read())

        if debug is True:
            self.__gateway = "https://openapi.alipaydev.com/gateway.do"
        else:
            self.__gateway = "https://openapi.alipay.com/gateway.do"

    def direct_pay(self, subject, out_trade_no, total_amount, return_url=None, **kwargs):
        """Build the signed query string for ``alipay.trade.page.pay``.

        :param subject: order title.
        :param out_trade_no: merchant order number.
        :param total_amount: order amount.
        :param return_url: per-call redirect URL; falls back to the URL given
            to the constructor when omitted.
        :param kwargs: extra ``biz_content`` entries.
        :return: URL-encoded, signed query string (without the gateway prefix).
        """
        biz_content = {
            "subject": subject,
            "out_trade_no": out_trade_no,
            "total_amount": total_amount,
            "product_code": "FAST_INSTANT_TRADE_PAY",
            # "qr_pay_mode":4
        }
        biz_content.update(kwargs)

        # Honour an explicitly passed return_url instead of silently ignoring it.
        effective_return_url = self.return_url if return_url is None else return_url
        data = self.build_body("alipay.trade.page.pay", biz_content, effective_return_url)
        return self.sign_data(data)

    def build_body(self, method, biz_content, return_url=None):
        """Assemble the common request parameters for ``method``.

        ``notify_url`` and ``return_url`` are added only when ``return_url``
        is not ``None``.
        """
        data = {
            "app_id": self.appid,
            "method": method,
            "charset": "utf-8",
            "sign_type": "RSA2",
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "version": "1.0",
            "biz_content": biz_content
        }

        if return_url is not None:
            data["notify_url"] = self.app_notify_url
            data["return_url"] = return_url

        return data

    def sign_data(self, data):
        """Sign ``data`` and return it as a URL-encoded query string ending in ``&sign=...``.

        Any existing ``sign`` entry is removed from ``data`` first.
        """
        data.pop("sign", None)
        # Key-sorted pairs; the signature covers the raw (unquoted) values.
        ordered_items = self.ordered_data(data)
        unsigned_string = "&".join("{0}={1}".format(k, v) for k, v in ordered_items)
        sign = self.sign(unsigned_string.encode("utf-8"))
        # str() so that non-string values format the same way as in the signed text.
        quoted_string = "&".join("{0}={1}".format(k, quote_plus(str(v))) for k, v in ordered_items)

        # Final order-info string.
        return quoted_string + "&sign=" + quote_plus(sign)

    def ordered_data(self, data):
        """Return ``data`` as a key-sorted list of ``(key, value)`` pairs.

        Dict values are dumped to compact JSON. ``data`` itself is left untouched.
        """
        items = []
        for key, value in data.items():
            if isinstance(value, dict):
                value = json.dumps(value, separators=(',', ':'))
            items.append((key, value))
        return sorted(items)

    def sign(self, unsigned_string):
        """Return the base64 signature (no line breaks) of the given bytes."""
        key = self.app_private_key
        signer = PKCS1_v1_5.new(key)
        signature = signer.sign(SHA256.new(unsigned_string))
        # base64-encode, convert to text and strip the newlines encodebytes adds.
        return encodebytes(signature).decode("utf8").replace("\n", "")

    def _verify(self, raw_content, signature):
        """Check ``signature`` (base64 text) against ``raw_content`` with Alipay's public key."""
        if not signature:
            return False
        try:
            raw_signature = decodebytes(signature.encode("utf8"))
        except ValueError:
            # Not valid base64, so it cannot be a valid signature.
            return False
        signer = PKCS1_v1_5.new(self.alipay_public_key)
        digest = SHA256.new()
        digest.update(raw_content.encode("utf8"))
        return bool(signer.verify(digest, raw_signature))

    def verify(self, data, signature):
        """Verify a notification's parameters against its signature.

        ``sign`` and ``sign_type`` are excluded from the signed content.
        ``data`` is not modified.

        :param data: notification parameters.
        :param signature: base64 signature taken from the ``sign`` parameter.
        :return: ``True`` when the signature matches, else ``False``.
        """
        signed_fields = {k: v for k, v in data.items() if k not in ("sign", "sign_type")}
        unsigned_items = self.ordered_data(signed_fields)
        message = "&".join(u"{}={}".format(k, v) for k, v in unsigned_items)
        return self._verify(message, signature)

if __name__ == "__main__":
    # Manual smoke test: build a sandbox payment URL for a sample order.
    demo_client = AliPay(
        appid="2016081500252338",
        app_notify_url="http://projectsedus.com/",
        app_private_key_path="keys/private_2048.txt",
        # Alipay's public key (used to verify Alipay's callbacks), not your own public key.
        alipay_public_key_path="keys/alipay_key_2048.txt",
        debug=True,  # defaults to False
        return_url="http://192.168.247.129:8000/",
    )

    signed_query = demo_client.direct_pay(
        subject="测试订单",
        out_trade_no="20170202126666",
        total_amount=1000,
    )
    print("https://openapi.alipaydev.com/gateway.do?{data}".format(data=signed_query))

5、weixin_pay.py(views.py 中通过 pay.weixin_pay 导入):


# -*- coding=utf-8 -*-

import time
import json
import hashlib
import requests

from pay.utils import (smart_str, dict_to_xml, calculate_sign, random_str,
     post_xml, xml_to_dict, validate_post_xml, format_url)

# from local_settings import appid, mch_id, api_key

# WeChat OAuth2 endpoint templates. The "%s" placeholder is filled with the
# query string produced by format_url() in JsAPIOrderPay.
OAUTH2_AUTHORIZE_URL = "https://open.weixin.qq.com/connect/oauth2/authorize?%s"
OAUTH2_ACCESS_TOKEN_URL = "https://api.weixin.qq.com/sns/oauth2/access_token?%s"

class WeiXinPay(object):
    """Base class for WeChat Pay API calls: collects parameters, signs and posts them as XML."""

    # Chinese descriptions of WeChat Pay error codes, keyed by upper-case code.
    _ERROR_DESC = {
        "SYSTEMERROR": u"接口后台错误",
        "INVALID_TRANSACTIONID": u"无效 transaction_id",
        "PARAM_ERROR": u"提交参数错误",
        "ORDERPAID": u"订单已支付",
        "OUT_TRADE_NO_USED": u"商户订单号重复",
        "NOAUTH": u"商户无权限",
        "NOTENOUGH": u"余额不足",
        "NOTSUPORTCARD": u"不支持卡类型",
        "ORDERCLOSED": u"订单已关闭",
        "BANKERROR": u"银行系统异常",
        "REFUND_FEE_INVALID": u"退款金额大于支付金额",
        "ORDERNOTEXIST": u"订单不存在",
    }

    def __init__(self, appid, mch_id, api_key):
        """Remember the merchant credentials.

        :param appid: WeChat official-account application ID.
        :param mch_id: merchant ID.
        :param api_key: merchant payment key used for signing.
        """
        self.appid = appid
        self.mch_id = mch_id
        self.api_key = api_key
        # Sent with every request.
        self.common_params = {
            "appid": self.appid,
            "mch_id": self.mch_id,
        }
        self.params = {}
        self.url = ""
        self.trade_type = ""

    def set_params(self, **kwargs):
        """Replace the request parameters with ``kwargs`` (stringified).

        Also adds a fresh 32-character ``nonce_str``, the ``trade_type`` when
        one is set, and the common ``appid`` / ``mch_id`` entries.
        """
        self.params = {}
        for (k, v) in kwargs.items():
            self.params[k] = smart_str(v)

        self.params["nonce_str"] = random_str(32)
        if self.trade_type:
            self.params["trade_type"] = self.trade_type
        self.params.update(self.common_params)

    def post_xml(self):
        """Sign the current parameters, POST them to ``self.url`` and parse the reply.

        :return: the ``(sign, params)`` pair produced by ``xml_to_dict``.
        """
        sign = calculate_sign(self.params, self.api_key)
        xml = dict_to_xml(self.params, sign)
        response = post_xml(self.url, xml)
        return xml_to_dict(response.text)

    def valiate_xml(self, xml):
        """Validate an XML message against this merchant's credentials.

        The misspelled name is kept for existing callers; ``validate_xml`` is
        the same method under the correct spelling.
        """
        return validate_post_xml(xml, self.appid, self.mch_id, self.api_key)

    validate_xml = valiate_xml

    def get_error_code_desc(self, error_code):
        """Return the Chinese description of a WeChat Pay error code.

        Matching ignores case and surrounding whitespace. Unknown, empty or
        ``None`` codes yield the generic "unknown error" text.
        """
        normalized = (error_code or "").strip().upper()
        return self._ERROR_DESC.get(normalized, u"未知错误")

class UnifiedOrderPay(WeiXinPay):
    """Places a unified (pre-pay) order; the trade type defaults to ``NATIVE``."""

    def __init__(self, appid, mch_id, api_key):
        super().__init__(appid, mch_id, api_key)
        self.trade_type = "NATIVE"
        self.url = "https://api.mch.weixin.qq.com/pay/unifiedorder"

    def post(self, body, out_trade_no, total_fee, spbill_create_ip, notify_url, **kwargs):
        """Submit the order and return the parsed reply parameters.

        Extra keyword arguments are sent as additional request fields and
        override the named ones on a key clash.
        """
        order_fields = dict(
            body=body,
            out_trade_no=out_trade_no,
            total_fee=total_fee,
            spbill_create_ip=spbill_create_ip,
            notify_url=notify_url,
        )
        order_fields.update(**kwargs)
        self.set_params(**order_fields)
        # post_xml() yields (sign, params); callers only need the params.
        parsed = self.post_xml()
        return parsed[1]

class OrderQuery(WeiXinPay):
    """Queries the state of an order by merchant order number."""

    def __init__(self, appid, mch_id, api_key):
        super().__init__(appid, mch_id, api_key)
        self.url = "https://api.mch.weixin.qq.com/pay/orderquery"

    def post(self, out_trade_no):
        """Query ``out_trade_no`` and return the parsed reply parameters."""
        self.set_params(out_trade_no=out_trade_no)
        # post_xml() yields (sign, params); callers only need the params.
        parsed = self.post_xml()
        return parsed[1]

class JsAPIOrderPay(UnifiedOrderPay):
    """Unified order for JSAPI payments started from a page inside WeChat."""

    def __init__(self, appid, mch_id, api_key, app_secret):
        """Same as the base class, plus the app secret used for the OAuth2 exchange."""
        super(JsAPIOrderPay, self).__init__(appid, mch_id, api_key)
        self.app_secret = app_secret
        self.trade_type = "JSAPI"

    def create_oauth_url_for_code(self, redirect_uri):
        """Return the OAuth2 authorize URL that sends the user back with a ``code``.

        NOTE(review): ``redirect_uri`` is inserted as given (format_url does
        no URL-encoding) -- confirm callers pass an already encoded value.
        """
        url_params = {
            "appid": self.appid,
            "redirect_uri": redirect_uri,  # usually the current page
            "response_type": "code",
            "scope": "snsapi_base",
            # format_url sorts keys, so "state" -- and with it the
            # "#wechat_redirect" fragment -- ends up last in the URL.
            "state": "STATE#wechat_redirect"
        }
        url = format_url(url_params)
        return OAUTH2_AUTHORIZE_URL % url

    def _create_oauth_url_for_openid(self, code):
        """Return the URL that exchanges an OAuth2 ``code`` for token information."""
        url_params = {
            "appid": self.appid,
            "secret": self.app_secret,
            "code": code,
            "grant_type": "authorization_code",
        }
        url = format_url(url_params)
        return OAUTH2_ACCESS_TOKEN_URL % url

    def _get_oauth_info(self, code):
        """Fetch the OAuth2 information for ``code``.

        Returns the decoded JSON reply as a dict, or ``None`` when the HTTP
        response is not OK.
        """
        url = self._create_oauth_url_for_openid(code)
        response = requests.get(url)
        return response.json() if response else None

    def _get_openid(self, code):
        """Return the payer's ``openid`` for ``code``, or ``None`` when unavailable."""
        oauth_info = self._get_oauth_info(code)
        if oauth_info:
            return oauth_info.get("openid", None)
        return None

    def _get_json_js_api_params(self, prepay_id):
        """Build the signed parameter dict for the front-end JS payment call."""
        js_params = {
            "appId": self.appid,
            "timeStamp": "%d" % time.time(),
            "nonceStr": random_str(32),
            "package": "prepay_id=%s" % prepay_id,
            "signType": "MD5",
        }
        js_params["paySign"] = calculate_sign(js_params, self.api_key)
        return js_params

    def post(self, body, out_trade_no, total_fee, spbill_create_ip, notify_url, code):
        """Place a JSAPI unified order and return the JS payment parameters.

        :param code: OAuth2 code used to look up the payer's openid.
        :return: dict for the front-end payment call, or ``None`` when the
            code is empty, the openid cannot be resolved, or the order reply
            carries no ``prepay_id``.
        """
        if not code:
            return None
        open_id = self._get_openid(code)
        if not open_id:
            return None

        # The unified-order field is named "openid"; the extra keyword is
        # sent verbatim as an XML element, so it must be spelled that way.
        unified_order = super(JsAPIOrderPay, self).post(
            body, out_trade_no, total_fee, spbill_create_ip, notify_url, openid=open_id)
        if not unified_order:
            return None

        prepay_id = unified_order.get("prepay_id", None)
        if not prepay_id:
            return None
        return self._get_json_js_api_params(prepay_id)

6、utils.py:


# -*- coding=utf-8 -*-
import hashlib
import re
import types
from random import Random
import requests
import thriftpy

from django.conf import settings
from django.core.exceptions import FieldDoesNotExist
from django.db import models
from django.db.models.fields.reverse_related import ForeignObjectRel
from rest_framework.pagination import PageNumberPagination
from thriftpy.rpc import make_client

from pay.exception_handler import ForeignObjectRelDeleteError, ModelDontHaveIsActiveFiled, logger

def smart_str(s, encoding='utf-8', strings_only=False, errors='strict'):
    """Return ``s`` as text (``str``).

    :param s: value to convert.
    :param encoding: codec used to decode ``bytes`` input.
    :param strings_only: when true, ``None`` and integers are returned unchanged.
    :param errors: error handler used when decoding ``bytes`` input.
    :return: ``s`` itself when it already is ``str``; decoded text for
        ``bytes``; ``str(s)`` for anything else.
    """
    if strings_only and (s is None or isinstance(s, int)):
        return s
    if isinstance(s, str):
        return s
    if isinstance(s, (bytes, bytearray)):
        # str(b"...") would yield the "b'...'" repr rather than the text.
        return bytes(s).decode(encoding, errors)
    return str(s)

def format_url(params, api_key=None):
    """Join ``params`` into ``key=value`` pairs separated by ``&``, sorted by key.

    Values are stringified but not URL-encoded. When ``api_key`` is truthy,
    ``&key=<api_key>`` is appended.
    """
    pairs = []
    for name in sorted(params):
        pairs.append('%s=%s' % (name, smart_str(params[name])))
    query = "&".join(pairs)
    if not api_key:
        return query
    return '%s&key=%s' % (query, api_key)

def calculate_sign(params, api_key):
    """Return the upper-case MD5 hex signature of ``params``.

    The signed text is the key-sorted parameter string with the API key
    appended, as built by ``format_url``.
    """
    signing_string = format_url(params, api_key)
    digest = hashlib.md5(signing_string.encode('utf-8'))
    return digest.hexdigest().upper()

def dict_to_xml(params, sign):
    """Serialize ``params`` and ``sign`` into a flat ``<xml>...</xml>`` message.

    All-digit values are written bare; everything else is wrapped in CDATA.
    Non-string values are stringified first.

    :param params: mapping of element name to value.
    :param sign: signature, appended as the final ``<sign>`` element.
    :return: the XML document as a single string.
    """
    parts = ["<xml>"]
    for key, value in params.items():
        text = str(value)
        if text.isdigit():
            parts.append('<%s>%s</%s>' % (key, text, key))
        else:
            # A literal "]]>" would close the CDATA section early; split it
            # across two sections so the value cannot inject markup.
            safe_text = text.replace("]]>", "]]]]><![CDATA[>")
            parts.append('<%s><![CDATA[%s]]></%s>' % (key, safe_text, key))
    parts.append('<sign><![CDATA[%s]]></sign></xml>' % sign)
    return ''.join(parts)

# One flat element: <key>value</key>. The value is matched lazily so the
# first matching closing tag ends it, and it may be empty.
_XML_FIELD_RE = re.compile(r"<(?P<key>[^<>]+)>(?P<value>.*?)</(?P=key)>")
_XML_CDATA_RE = re.compile(r"<!\[CDATA\[(?P<inner_val>.+)\]\]>")


def xml_to_dict(xml):
    """Parse a flat ``<xml>...</xml>`` message into ``(sign, params)``.

    :param xml: message text; surrounding whitespace is ignored.
    :return: ``(None, None)`` when the text is empty or not wrapped in
        ``<xml>``/``</xml>``. Otherwise the value of the ``sign`` element (or
        ``None``) and a dict of all other non-empty elements, with CDATA
        wrappers removed.
    """
    if not xml:
        return None, None
    xml = xml.strip()
    # Both the opening and the closing tag must be present.
    if xml[:5].upper() != "<XML>" or xml[-6:].upper() != "</XML>":
        return None, None

    result = {}
    sign = None
    content = ''.join(xml[5:-6].strip().split('\n'))

    # finditer skips whitespace between elements and keeps going after an
    # empty element instead of ending the parse early.
    for field in _XML_FIELD_RE.finditer(content):
        key = field.group("key").strip()
        value = field.group("value").strip()
        if not value or value == "<![CDATA[]]>":
            continue
        cdata = _XML_CDATA_RE.match(value)
        if cdata:
            value = cdata.group("inner_val").strip()
        if key == "sign":
            sign = value
        else:
            result[key] = value

    return sign, result

def validate_post_xml(xml, appid, mch_id, api_key):
    """Validate an incoming WeChat Pay XML message.

    The message must parse, carry a signature that matches the one computed
    locally from its parameters and ``api_key``, and belong to the given
    ``appid`` / ``mch_id``.

    :return: the parsed parameters on success, otherwise ``None``.
    """
    sign, params = xml_to_dict(xml)
    if (not sign) or (not params):
        return None

    expected_sign = calculate_sign(params, api_key)
    if sign != expected_sign:
        return None

    # .get(): a message lacking these fields is invalid, not a KeyError.
    if params.get("appid") != appid or params.get("mch_id") != mch_id:
        return None

    return params

def random_str(randomlength=8):
    """Return a random string of lower-case letters and digits.

    The value is used as a signing nonce, so characters are drawn from the
    operating system's secure random source rather than the Mersenne Twister.

    :param randomlength: number of characters to generate.
    :return: string of exactly ``randomlength`` characters ("" for 0).
    """
    import secrets

    chars = 'abcdefghijklmnopqrstuvwxyz0123456789'
    return "".join(secrets.choice(chars) for _ in range(randomlength))

def post_xml(url, xml, timeout=10):
    """POST ``xml`` (UTF-8 encoded) to ``url`` and return the ``requests`` response.

    TLS certificates are verified (the ``requests`` default); the payment API
    must not be called with verification switched off.

    :param url: endpoint to post to.
    :param xml: request body as text.
    :param timeout: seconds to wait before giving up, so a stalled connection
        cannot block the caller forever.
    """
    return requests.post(url, data=xml.encode('utf-8'), timeout=timeout)

class UnActiveModelMixin(object):
    """View mixin that soft-deletes: objects are flagged ``is_active=False`` instead of removed.

    Objects reached through reverse relations are deactivated as well, which
    requires the related models to have an ``is_active`` field.
    """

    def perform_destroy(self, instance):
        """Deactivate ``instance`` and the objects that point to it.

        :raises ForeignObjectRelDeleteError: when a single related object
            (non-manager accessor) is still active.
        :raises ModelDontHaveIsActiveFiled: when a related model has no
            ``is_active`` field.
        """
        reverse_relations = [
            field for field in instance._meta.get_fields()
            if isinstance(field, ForeignObjectRel)
        ]

        for relation in reverse_relations:
            link = relation.get_accessor_name()
            manager = getattr(instance, link, None)
            if not manager:
                continue
            if isinstance(manager, models.Model):
                # The accessor returned a single related object, not a manager.
                if hasattr(manager, 'is_active') and manager.is_active:
                    # NOTE(review): the related object is deactivated and the
                    # error is raised anyway, as in the original -- confirm
                    # that both are intended.
                    manager.is_active = False
                    manager.save()
                    raise ForeignObjectRelDeleteError(u'{} 上有关联数据'.format(link))
            else:
                if not manager.count():
                    continue
                try:
                    manager.model._meta.get_field('is_active')
                    manager.filter(is_active=True).update(is_active=False)
                except FieldDoesNotExist as ex:
                    # Models reached by the cascade are expected to have an
                    # is_active field; otherwise the program logic is wrong.
                    # assumes ``logger`` is a standard logging.Logger, where
                    # ``warn`` is a deprecated alias of ``warning`` -- TODO confirm
                    logger.warning(ex)
                    # ``manager.model`` is the model class itself, so its own
                    # __name__ is wanted (``__class__`` would be the metaclass).
                    raise ModelDontHaveIsActiveFiled(
                        '{}.{} 没有is_active字段, 请检查程序逻辑'.format(
                            manager.model.__module__,
                            manager.model.__name__
                        )) from ex
        instance.is_active = False
        instance.save()

    def get_queryset(self):
        """Return only the active rows of the view's queryset."""
        return self.queryset.filter(is_active=True)

class StandardResultsSetPagination(PageNumberPagination):
    """Page-number pagination whose page size is taken from the ``size`` query parameter."""

    page_size_query_param = 'size'

来源:https://www.cnblogs.com/victorwu/p/8505592.html

0
投稿

猜你喜欢

手机版 网络编程 asp之家 www.aspxhome.com