Attachment 'active_directory.py'
Download 1 import ldap
2 from struct import pack
3 from binascii import hexlify
4
5 class AmbiguousResult(Exception):
6 pass
7
8 class DoesNotExist(Exception):
9 pass
10
11 class ADConnection:
12 """
13 Creates a connection to the Active Directory, and provides methods to search.
14
15 Author: Matt Magin
16 Contact: matt.azmoo@gmail.com
17
18 Args:
19 domain: String, Name of the active directory domain (eg, mydomain.com)
20 username: String, Username to connect with.
21 password: String, Duh.
22 server: String, AD Server to connect to. TODO: Find the most relevant ad server automatically.
23 port: Int, Port to connect to AD. Almost always 389.
24 timeout: Connection timeout. 3 by default, if it's not set then it will never time out. FAIL.
25 """
26 def __init__(self, domain, username, password, server, port=389, timeout=3):
27 self.username = username
28 self.password = password
29 self.domain = domain
30 self.ldap_conn = ldap.initialize('ldap://%s:%d/' % (server, port))
31 self.ldap_conn.set_option(ldap.OPT_NETWORK_TIMEOUT, timeout)
32 self.ldap_conn.simple_bind_s('%s@%s' % (username, domain), password)
33
34 def search_domain(self, ldap_filter):
35 """ Args:
36 ldap_filter: Filter in LDAP format (ugh!), eg, '(samAccountName=mmagin)'
37 """
38 dn = ','.join('DC=%s' % s for s in self.domain.split('.'))
39 return ADObjectList(self.ldap_conn.search_ext_s(dn, ldap.SCOPE_SUBTREE, ldap_filter), self)
40
41 def search_confined(self, dn, ldap_filter, scope=ldap.SCOPE_SUBTREE):
42 """ Args:
43 dn: Distinguished Name of base container, eg, 'CN=Users,DC=cmvhodom,DC=cmv'
44 ldap_filter: Filter in LDAP format (ugh!), eg, '(samAccountName=mmagin)'
45 scope: Scope to search, options are:
46 ldap.SCOPE_BASE to search only within the object defined by the dn
47 ldap.SCOPE_ONELEVEL to search within the object and first-level children of the dn
48 ldap.SCOPE_SUBTREE (default) to search within the object and all children of the dn
49 """
50 return ADObjectList(self.ldap_conn.search_ext_s(dn, scope, ldap_filter), self)
51
52 def get_single(self, ldap_filter):
53 res = self.search_domain(ldap_filter)
54 if len(res) > 1:
55 raise AmbiguousResult
56 else:
57 try:
58 return res[0]
59 except IndexError:
60 raise DoesNotExist
61
62 def get_user(self, name):
63 return self.get_single('(&(samAccountName=%s)(objectClass=user))' % name)
64
65 def get_group(self, name):
66 return self.get_single('(&(samAccountName=%s)(objectClass=group))' % name)
67
68 class ADObjectList(list):
69 """ Takes a List of LDAP Result objects and converts to a list of ADObjects."""
70 def __init__(self, ldap_result, ad_conn):
71 self.ad_conn = ad_conn
72 for r in ldap_result:
73 try:
74 self.append(ADObject(r, ad_conn))
75 except AttributeError:
76 continue
77
78 class ADObject:
79 """ Converts an LDAP Result Object to a format easier to use."""
80 def __init__(self, ldap_object, ad_conn):
81 self.ad_conn = ad_conn
82 for k in ldap_object[1].keys():
83 setattr(self, k, ldap_object[1][k])
84
85 def get_name(self):
86 if hasattr(self, 'sAMAccountName'):
87 return ','.join(self.sAMAccountName)
88 else:
89 return ''
90
91 def get_dn(self):
92 if hasattr(self, 'distinguishedName'):
93 return ','.join(self.distinguishedName)
94 else:
95 return ''
96
97 def get_primary_group(self):
98 """ Gets the primary group for a user. We need this because the primary group
99 is not included in the "memberOf" attribute."""
100 # Domain RID (relative identifier) is all of the user's SID (security identifier)
101 # except the last 4 bytes. The RID of the primary group is the 4 byte binary representation
102 # of the primaryGroupID (int) attribute. Append the two to get the SID of the primary group.
103 group_sid = self.objectSid[0][:-4] + pack('i', int(self.primaryGroupID[0]))
104 # Convert it to hex so we can work with it. Binary data FTL.
105 hex_sid = hexlify(group_sid)
106 # Create an octet string from the hex representation. It's just the hex with a \ every 2 chars.
107 octet_string = ''.join(["\%s" % hex_sid[i:i+2] for i in range(0, len(hex_sid), 2)])
108 # Search the domain using the octet string and return the result. WIN!
109 grp = self.ad_conn.get_single(r'(objectSid=%s)' % octet_string)
110 return grp.name[0]
111
112 def get_groups(self):
113 groups = []
114 try:
115 groups.append(self.get_primary_group())
116 groups = groups + [dn.split(',')[0].split("=")[1] for dn in self.memberOf]
117 except AttributeError:
118 pass
119 return groups
120
Attached Files
To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily.You are not allowed to attach a file to this page.