搞技术的大都比较纯粹,比较实在,除了工资之外基本就没有别的收入了(少部分人能接外包赚外块)。或许是迫于生活的压力,或许是不甘于固定的工资,或许是出于技术人骨子里的好奇,亦或是这几年关于理财投资的大力宣传、门槛降低,理财越来越被我们所接受,并开始尝试股票、基金、P2P、XX宝等各种理财产品,本文所讲与P2P有关,但不打广告,只讲技术,顺便说明:投资有风险,理财需谨慎,我们赚钱不容易,不能给打了水漂。
某公司的理财产品有如下特点:
有一部分用户(行话叫牛)就靠平台活动或高息的时候借钱加杠杆投资,需要还钱的时候通过债权转让平台转让标还借款,通过买入和卖出时的利率差获得额外收益。这中间比较关键的一点就是转出时的利率,利率低收益就高(但太低就没有人接手了,转不出去还不了借款就要支付高额罚金),利率又跟当天待还的金额和已成交的金额有直接关系,那么如果能及时获取这两个数据就大概知道自己标多少利率能转手成功了。
我们接下来的技术实现就主要跟获取这两个数据,以及如何及时的展示数据有关。
只是为了技术研究,没有商用,代码和架构以实现需求为目的,未做优化,且非专业开发,凑合看
翻了一遍平台官网发现有个页面直接展示了转让标的详细信息,无需登录,且是通过ajax方式异步加载的json字符串(但是json字符串里套了一堆的html代码,不知道咋设计的)的方式渲染页面的,那抓取工作简单多了,写了个抓取脚本,流程为:访问页面接口 --> 取到数据 --> 简单处理 --> 录入数据库,抓取脚本直接放在计划任务里每三分钟执行一次,脚本内容如下:
import re
import time
import datetime
import requests
import pymysql
# 连接mysql数据库
db = pymysql.connect("127.0.0.1","root","passwd","pzp")
cursor = db.cursor()
for i in range(1, 9999):
data = {
"RepaymentTypeId": 0,
"pagesize": 1,
"pageindex": i,
"type": 1,
"status": 2,
"startDeadLine": 0,
"endDeadLine": 0,
"rate": 0,
"beginRate": 0,
"endRate": 0,
"strkey": None,
"orderby": 15,
"unitStart": 0,
"unitEnd": 0
}
try:
r = requests.post('https://www.tuandai.com/pages/zSharePlan/getZXList', data=data).json()
if r.get('code') == 0:
html = r.get('data').get('projectListHtml')
dr = re.compile(r'<[^>]+>', re.S)
dd = dr.sub('', html).split()
# 截取单号,只取字符串中的数字
order_num = ''.join(re.compile('\d+').findall(dd[0]))
# 查询mysql数据库
cursor.execute("select order_num from tdw_zx_done where order_num = %s" %order_num)
# 获取到查询结果
ex = cursor.fetchone()
# 判断单号是否已经记录过
if ex is None:
# 如果单号没有记录过,则计数器置为0
x = 0
publish_time = datetime.datetime.strptime(
'20%s-%s-%s %s:%s:%s' %(order_num[0:2],order_num[2:4],order_num[4:6],order_num[6:8],order_num[8:10],order_num[10:12]),
'%Y-%m-%d %H:%M:%S'
)
# ‘元’单位都替换成‘万元’,并去掉汉字
a = dd[1].split(':')[1]
if '元' in a:
money = int(''.join(re.compile('\d+').findall(a))) / 10000
else:
money = a.replace('万','')
# 取利率
rate = dd[4].replace('%','')
# 计算还款日期,借款日期 + 借款时间
days = ''.join(re.compile('\d+').findall(dd[6]))
repay = (publish_time + datetime.timedelta(days=int(days))).strftime('%Y-%m-%d')
print(publish_time, order_num, money, rate, days, repay)
# 往数据库里插入数据并提交
sql = "INSERT INTO tdw_zx_done VALUES('%s', %s, %s, %s, %s, '%s')" %(publish_time, order_num, money, rate, days, repay)
cursor.execute(sql)
db.commit()
else:
# 如果单号已经记录过,则计数器加1
x += 1
# 如果单号已录入过数据库,则返回
print('单号已录入:%s' %str(order_num))
# 判断如果有连续200个单号都已经录入过数据库,则跳出循环
if x == 200:
break
time.sleep(0.02)
else:
print(r)
except Exception as e:
print(e)
db.close()
上边已经获取到了原始数据,接下来需要对原始数据进行清洗,取自己需要的今日待还及实时成交,并写入缓存,写入缓存的目的是公众号并发查询的情况下,直接去缓存取数据,减小对数据库的压力,这个脚本程序也放在计划任务里每分钟执行
import os
import sys
import json
import datetime
from decimal import Decimal
os.chdir(sys.path[0])
from connection import rediscon, mysqlcon
mc, rc = mysqlcon().cursor(), rediscon()
def cache_now_data():
today = datetime.date.today().strftime('%Y-%m-%d')
tj, th, = '', ''
try:
# 成交数据统计
mc.execute("select count(1),sum(money) from tdw_zx_done where DATE(publish_time)='%s';" %today)
tj = mc.fetchone()
tjie = '借款人数:%s 借款金额:%s' %(tj[0], tj[1])
# 待还数据统计
mc.execute("select count(1),sum(money) from tdw_zx_done where repay='%s';" %today)
th = mc.fetchone()
thuan = '待还人数:%s 待还金额:%s' %(th[0], th[1])
# 完成还款预估时间
if th[0]:
tomorror_date = datetime.date.today() + datetime.timedelta(days=1)
tomorror_time_str = tomorror_date.strftime('%Y-%m-%d 00:00:00')
tomorror_time_format = datetime.datetime.strptime(tomorror_time_str, '%Y-%m-%d %H:%M:%S')
last_hour = (tomorror_time_format - datetime.datetime.now()).seconds / 60 / 60
avg_hour_money = round(Decimal(th[1] - tj[1]) / Decimal(last_hour), 4)
avg_hour_money = avg_hour_money if avg_hour_money > 0 else 0
else:
avg_hour_money = '无今日待还数据,无法计算'
# 按小时统计详情
mc.execute("select Hour(publish_time) as Hour,count(1),sum(money) from tdw_zx_done where DATE(publish_time) ='%s' group by Hour;" %today)
tdetail = mc.fetchall()
dl = '小时 | 成交额(万元)\n'
for i in tdetail:
dl += str(i[0]) + ' | ' + str(i[2]) + '\n'
except Exception as e:
print('数据库操作异常:%s' %e)
try:
key = datetime.datetime.now().strftime('%Y%m%d%H%M')
val = {"daihuan":str(th[1]),"chengjiao":str(tj[1]),"avg_hour_money":str(avg_hour_money)}
print(rc.set('tdw_zx_now_'+key, json.dumps(val), ex=7200))
except Exception as e:
print('缓存操作异常:%s' %e)
if __name__ == '__main__':
cache_now_data()
我想看数据的时候如何去看呢?去服务器上执行下脚本这方式太low了吧,借助微信机器人,就像你跟朋友聊天一样,发消息“最新数据”,那他就立即回复最新消息给你,这个方式看起来不错,实现完成后有几个朋友觉得不错,也想看数据,那我干脆将这些需要看数据的朋友都拉倒一个群里吧,回复消息群里所有用户都看得到很方便了,技术实现主要借助了itchat
模块(itchat主要通过网页版微信接口处理数据,网页版微信很多用户无法登陆了,也就没有办法使用itchat),代码如下
import itchat
from tdw_data_statistics_now import get_now_data
# 处理好友消息
@itchat.msg_register(itchat.content.TEXT)
def text_reply(msg):
if msg['Text'].startswith('命令:'):
message = msg['Content'].split('命令:')[1]
msg.user.send('正在处理你的命令:%s' %(message))
if '最新数据' == msg['Text']:
message = get_now_data()
msg.user.send(message)
# 处理群聊消息
@itchat.msg_register(itchat.content.TEXT, isGroupChat=True)
def text_reply(msg):
#itchat.send(u'@%s\u2005I received: %s' % (msg['ActualNickName'], msg['Content']), msg['FromUserName'])
if '最新数据' == msg['Text']:
message = get_now_data()
msg.user.send(message)
#message = getMessage()
#itchat.send(message, toUserName='filehelper')
itchat.auto_login(True, enableCmdQR=True)
itchat.run(True)
设想一个应用场景,如果有很多人需要这个数据怎么处理呢?让他们都加一下我的微信或把他们都给加到一个群里固然可以,只是不够优雅,这里想到了微信公众号,当用户关注公众号后,回复“最新数据”可把最新数据自动回复给用户是不是就优雅很多了。然后就写了个机器人自动处理,主要借助werobot
模块实现。
微信公众号可以配置为开发者模式,也就是开发者可以提供一个http接口,公众号会把收到的所有消息发送给开发者提供的接口,服务器接收到数据后判断数据类型,对数据做处理,这里需要用到web服务,所以引入Django
from django.urls import path
from werobot.contrib.django import make_view
from official.robot import robot
urlpatterns = [
path('robot/', make_view(robot)),
]
# cat official/robot.py
import re
from werobot import WeRoBot
from werobot.replies import ImageReply
from official.backends.get_tdw_data import get_now_data
robot = WeRoBot(enable_session=False,
token='',
APP_ID='',
APP_SECRET='')
# 新用户关注自动回复
@robot.subscribe
def subscribe(message):
return '''来了?坐,好戏马上开始
回复[最新数据]获取最新数据更新'''
# 联系二维码,用户输入"联系"关键字回复作者二维码
@robot.filter(re.compile(".*?联系.*?"))
def contact(message):
return ImageReply(
message=message,
media_id="DBO8qVu-8bwNF9O7o8wCyRs4awfVTjA_WuPoLkj33B1C8ZX9JVdmw30zZo9l8ovx"
)
# 处理文本消息
@robot.text
def hello(message):
try:
msg = message.content.strip().lower()
if re.compile("最新.*").match(msg):
tdw_zx_now_data = get_now_data()
if tdw_zx_now_data:
return tdw_zx_now_data
else:
return '暂时无法获取数据,请稍后再试'
else:
return '请输入[ 最新数据 ]获取智享数据更新\n\n如有疑问可以回复[ 联系作者 ]与我联系'
except Exception as e:
print(e)
import json
import datetime
from django_redis import get_redis_connection
cache = get_redis_connection('default')
def get_now_data():
now_time = datetime.datetime.now()
prev_time = now_time - datetime.timedelta(minutes=1)
# 直接去缓存取数据
redis_value = cache.get('tdw_zx_now_'+prev_time.strftime('%Y%m%d%H%M'))
if redis_value:
jsondata = json.loads(redis_value.decode())
daihuan = '今日待还金额:%s万元' %jsondata.get('daihuan')
chengjiao = '当前已成交金额:%s万元' %jsondata.get('chengjiao')
avg_hour_money = '完成还款预估每小时需要成交:%s万元' %jsondata.get('avg_hour_money')
return '''%s\n%s\n%s\n%s''' %(prev_time.strftime('%Y-%m-%d %H:%M'), daihuan, chengjiao, avg_hour_money)
else:
return None