最近把组内的一个项目对接钉钉审批接口,通过python3.6。
废话不多说了,上代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
|
import requests import json import time from dingtalk.crypto import DingTalkCrypto from django.conf import settings # settings.BASE_DIR class Crypto( object ): def __init__( self , token): # 随便填的字符串 self .token = token # 自己生成的43位随机字符串 self .aes_key = settings.DINGDING.get( "DINGTALK_AES_TOKEN" ) # 钉钉企业ID self .corp_id = settings.DINGDING.get( "CorpId" ) # print ( "corp_id:" , self .corp_id) self .nonce = settings.DINGDING.get( "nonce" ) self .crypto = DingTalkCrypto( token = self .nonce, encoding_aes_key = self .aes_key, corpid_or_suitekey = self .corp_id ) def encrypt_success( self ): # 返回加密success result = self .crypto.encrypt_message( msg = "success" , nonce = self .nonce, timestamp = int (time.time() * 1000 ) ) return result class DING( object ): def __init__( self , approve_process): self .AgentId = settings.DINGDING.get( "AgentId" ) self .AppKey = settings.DINGDING.get( "AppKey" ) self .AppSecret = settings.DINGDING.get( "AppSecret" ) self .dingding_url = settings.DINGDING.get( "URL" ) self .process_code = settings.DINGDING.get( "APPROVE_PROCESS" ).get(approve_process)[ 'process_code' ] self .aes_key = settings.DINGDING.get( "DINGTALK_AES_TOKEN" ) self .nonce = settings.DINGDING.get( "nonce" ) def get_token( self ): ''' 获取钉钉的token :return: 钉钉token ''' url = self .dingding_url + '/gettoken?appkey={}&appsecret={}' . format ( self .AppKey, self .AppSecret) req = requests.get(url) req = json.loads(req.text) return req[ 'access_token' ] # def createCallbackDd(): # ''' # 注册钉钉回调函数 # :return: # ''' # url = 'https://oapi.dingtalk.com/call_back/register_call_back?access_token=' + self.getToken() # data = { # "call_back_tag": ["bpms_task_change", "bpms_instance_change"], #这两个回调种类是审批的 # "token": TOKEN, #自定义的字符串 # "aes_key": AES_KEY, #自定义的43位字符串,密钥 # "url": URL #回调地址 # } # requests.post(url, data=json.dumps(data)) # return ('OK') def create_process( self , originator_user_id, dept_id, form_component_value_vo, approvers, cc_list, has_cc = 0 ): ''' 创建钉钉审批 approvers为list 元素为钉钉userid cc_list同理 ''' url = self .dingding_url + '/topapi/processinstance/create?access_token=' + self .get_token() print ( "form_component_value_vo:" , form_component_value_vo) if has_cc = = 0 : data = { 'agent_id' : self .AgentId, 'process_code' : self .process_code, #工单id 'originator_user_id' : originator_user_id, 'dept_id' : dept_id, #创建人的钉钉部门id 'form_component_values' : str (form_component_value_vo), #钉钉后台配置的需要填写的字段, 'approvers' : approvers, 'cc_list' : cc_list, 'cc_position' : 'START_FINISH' # 发起和完成时与抄送 } else : data = { 'agent_id' : self .AgentId, 'process_code' : self .process_code, #工单id 'originator_user_id' : originator_user_id, #创建人的钉钉userid 'dept_id' : dept_id, #创建人的钉钉部门id 'form_component_values' : str (form_component_value_vo), #钉钉后台配置的需要填写的字段, 'approvers' : approvers, } print ( "dingding_utils:" , data) response = requests.post(url, data = data) return response.json() def get_status( self , process_instance_id): url = self .dingding_url + '/topapi/processinstance/get?access_token=' + self .get_token() data = { "process_instance_id" : process_instance_id } response = requests.post(url, data = data) return response.json() def register_callback( self , call_back_url): # 注册回调 url = self .dingding_url + '/call_back/register_call_back?access_token=' + self .get_token() print ( "self.get_token():" , self .get_token()) data = { "call_back_tag" : [ 'bpms_task_change' , 'bpms_instance_change' ], "token" : self .nonce, "aes_key" : self .aes_key, "url" : call_back_url, } response = requests.post(url, data = json.dumps(data)) return response.json() def get_callback( self ): url = self .dingding_url + '/call_back/get_call_back?access_token=' + self .get_token() req = requests.get(url) req = json.loads(req.text) return req def create_process_approver_v2( self , originator_user_id, dept_id, form_component_value_vo, approvers, cc_list): ''' 创建钉钉审批 ''' url = self .dingding_url + '/topapi/processinstance/create?access_token=' + self .get_token() data = { 'agent_id' : self .AgentId, 'process_code' : self .process_code, 'originator_user_id' : originator_user_id, 'dept_id' : dept_id, 'form_component_values' : str (form_component_value_vo), 'approvers_v2' : json.dumps(approvers) } if cc_list: data[ 'cc_list' ] = cc_list data[ 'cc_position' ] = 'FINISH' response = requests.post(url, data = data) return response.json() def create_process_approver_v2_test( self , originator_user_id, dept_id, form_component_value_vo): ''' 创建钉钉审批 ''' url = self .dingding_url + '/topapi/processinstance/create?access_token=' + self .get_token() data = { 'agent_id' : self .AgentId, 'process_code' : self .process_code, 'originator_user_id' : originator_user_id, 'dept_id' : dept_id, 'form_component_values' : str (form_component_value_vo), 'approvers_v2' : json.dumps([ { "task_action_type" : "NONE" , "user_ids" : [ "dingding_id" ], # 单独审批人 }, { "task_action_type" : "OR" , "user_ids" : [ "dingding_id1" , "dingding_id2" ], # 或签 }, { "task_action_type" : "AND" , "user_ids" : [ "dingding_id1" , "dingding_id2" ], # 会签 } ]) } response = requests.post(url, data = data) return response.json() if __name__ = = "__main__" : import django, os, sys sys.path.append( 'xxxxxx' ) # 项目路径 os.environ[ 'DJANGO_SETTINGS_MODULE' ] = 'xx.settings' # print("settings.DINGDING", settings.DINGDING) ding = DING( "create_xx" ) # print(ding.get_token()) # info = [{'name': '单行输入框','value': 'testixxxxxxxx'}] # # print(ding.create_process('11', 11, info)) a = [ { 'name' : "输入框1" , 'value' : "value1" }, { 'name' : "输入框2" , 'value' : "value2" }, ] # print(ding.create_process_test('11', 11, a)) # print(ding.create_process_approver_v2_test('11', 11, a)) # print(ding.create_process_test2()) # print(ding.get_status('xxx')) print (ding.get_status( 'xx' )) # # 验证 回调 # a = ding.get_token() # print(a) # c = Crypto(a) # print(c.encrypt_success()) # 注册回调 # print(ding.register_callback("http://xxxx.vaiwan.com/xxx")) # print(ding.get_callback()) |
说明:
1 Crypto类用于对接钉钉回调用的。一个公司只有一个corpId,并且一个corpid只能注册一个回调地址。我司有公共组注册好了回调。只要接入公司内的回调即可。所以我实际没有使用到Crypto。
2 在钉钉管理后台中创建应用后会有这三个东西:AgentId、AppKey,AppSecret 。在创建钉钉审批流程,可以从审批流程浏览器中获取到APPROVE_PROCESS。别忘啦给这个流程审批接口权限。这些官方文档有说。
3 配置setting变量:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
DINGDING = { "AgentId" : 123 , "AppKey" : "xx" , "AppSecret" : "xx" , "URL" : "https://oapi.dingtalk.com" , "APPROVE_PROCESS" : { # process_code "create_xx" : { "process_code" : "abc" , # 审批流程的id }, "DINGTALK_AES_TOKEN" : "abc" , "nonce" : "abc" , "CorpId" : "abc" , } |
4 接口形式创建的审批流程,与钉钉管理后台创建的流程有一些不同:
1 不能在不同的审批环节设置不同的抄送人
2 不能审批流程前后有相同的人,不能自动显示成 “自动同意”(管理后台设置成去重后,但是接口指定审批人场景,不支持)
5 其他如:审批内容、或签,会签代码里都有示例。
到此这篇关于python公司内项目对接钉钉审批流程的实现的文章就介绍到这了,更多相关python对接钉钉审批内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://www.cnblogs.com/jiangbo2018/p/13531665.html