from xml.dom.minidom import parseString
from org.maemo.hermes.engine.friend import Friend
-# httplib.HTTPSConnection.debuglevel = 1
-
class LinkedInApi():
"""LinkedIn API for Hermes.
Copyright (c) Fredrik Wendt <fredrik@wendt.se> 2010.
Released under the Artistic Licence."""
-
+ GCONF_API_KEY = '/apps/maemo/hermes/linkedin_api_key'
+ GCONF_API_SECRET = '/apps/maemo/hermes/linkedin_key_secret'
+ GCONF_ACCESS_TOKEN = '/apps/maemo/hermes/linkedin_access_token'
+
LI_SERVER = "api.linkedin.com"
LI_API_URL = "https://api.linkedin.com"
LI_CONN_API_URL = LI_API_URL + "/v1/people/~/connections"
# -----------------------------------------------------------------------
- def __init__(self):
+ def __init__(self, gconf=None):
"""Initialize the LinkedIn service, finding LinkedIn API keys in gconf and
having a gui_callback available."""
- self._gc = gnome.gconf.client_get_default()
+ if gconf: self._gc = gconf
+ else: self._gc = gnome.gconf.client_get_default()
- # -- Check the environment is going to work...
- # FIXME: duplication
- if (self._gc.get_string('/desktop/gnome/url-handlers/http/command') == 'epiphany %s'):
- raise Exception('Browser in gconf invalid (see NB#136012). Installation error.')
-
- api_key = self._gc.get_string('/apps/maemo/hermes/linkedin_api_key')
- secret_key = self._gc.get_string('/apps/maemo/hermes/linkedin_key_secret')
+ api_key = self._gc.get_string(LinkedInApi.GCONF_API_KEY)
+ secret_key = self._gc.get_string(LinkedInApi.GCONF_API_SECRET)
+ self.api_key = api_key
if api_key is None or secret_key is None:
raise Exception('No LinkedIn application keys found. Installation error.')
self.consumer = oauth.OAuthConsumer(api_key, secret_key)
self.sig_method = oauth.OAuthSignatureMethod_HMAC_SHA1()
- # IMPROVEMENT: verify that the access_token is valid for at least another hour
- self._verify_access_token()
-
+ self._verify_browser_command()
+
+ # -----------------------------------------------------------------------
def authenticate(self, need_auth, block_for_auth):
need_auth()
token = self._get_request_token()
url = self._get_authorize_url(token)
verifier = block_for_auth(url)
- self._verify_verifier(verifier)
+ self._verify_verifier(token, verifier)
# -----------------------------------------------------------------------
# -----------------------------------------------------------------------
- def _verify_verifier(self, verifier):
- try:
- self.access_token = self._get_access_token(self.request_token, verifier)
- self._store_access_token_in_gconf()
- except:
- raise Exception("authorization failed, try again")
-
-
- # -----------------------------------------------------------------------
- def _verify_access_token(self):
- return True
-
-
- # -----------------------------------------------------------------------
- def _store_access_token_in_gconf(self, token_str):
- self._gc.set_string('/apps/maemo/hermes/linkedin_access_token', token_str)
-
-
- # -----------------------------------------------------------------------
- def _get_access_token_from_gconf(self):
- token_str = self._gc.get_string('/apps/maemo/hermes/linkedin_access_token')
- return oauth.OAuthToken.from_string(token_str)
+ def get_friend_details(self, url, header_value):
+ oauth_request = oauth.OAuthRequest.from_consumer_and_token(self.consumer, token=self.access_token, http_url=url)
+ oauth_request.sign_request(self.sig_method, self.consumer, self.access_token)
+ headers = oauth_request.to_header()
+ headers[u'x-li-auth-token'] = header_value
+ connection = httplib.HTTPConnection("api.linkedin.com")
+ connection.request(oauth_request.http_method, url, headers=headers)
+ data = connection.getresponse().read()
+ return data
+
# -----------------------------------------------------------------------
def _make_api_request(self, url):
- print "_make_api_request", url
oauth_request = oauth.OAuthRequest.from_consumer_and_token(self.consumer, token=self.access_token, http_url=url)
oauth_request.sign_request(self.sig_method, self.consumer, self.access_token)
connection = httplib.HTTPSConnection(self.LI_SERVER)
try:
connection.request(oauth_request.http_method, url, headers=oauth_request.to_header())
- return connection.getresponse().read()
+ xml = connection.getresponse().read()
+ return xml
except:
raise Exception("Failed to contact LinkedIn at " + url)
# -----------------------------------------------------------------------
def _parse_dom(self, dom):
- print "parse_dom", dom
def get_first_tag(node, tagName):
tags = node.getElementsByTagName(tagName)
if tags and len(tags) > 0:
# look for errors
errors = dom.getElementsByTagName('error')
if (len(errors) > 0):
- print "Error" # FIXME: handle this better
- return []
+ details = ""
+ try:
+ details = " (" + extract(errors[0], "message") + ")"
+ except:
+ pass
+ raise Exception("LinkedIn communication errors detected" + details)
friends = []
people = dom.getElementsByTagName('person')
friends.append(friend)
except:
- pass
+ pass
+
+ return friends
+
+ # -----------------------------------------------------------------------
+ def _get_request_token(self):
+ """Get a request token from LinkedIn"""
+
+ oauth_consumer_key = self.api_key
+ oauth_request = oauth.OAuthRequest.from_consumer_and_token(self.consumer, callback="oob", http_url=self.REQUEST_TOKEN_URL)
+ oauth_request.sign_request(self.sig_method, self.consumer, None)
+
+ connection = httplib.HTTPSConnection(self.LI_SERVER)
+ connection.request(oauth_request.http_method, self.REQUEST_TOKEN_URL, headers=oauth_request.to_header())
+ response = connection.getresponse().read()
+
+ token = oauth.OAuthToken.from_string(response)
+ return token
# -----------------------------------------------------------------------
+ def _get_authorize_url(self, token):
+ """The URL that the user should browse to, in order to authorize the
+ application's request to access data"""
+
+ oauth_request = oauth.OAuthRequest.from_token_and_callback(token=token, http_url=self.AUTHORIZE_URL)
+ return oauth_request.to_url()
+
+
+ # -----------------------------------------------------------------------
def _get_access_token(self, token, verifier):
- """If the verifier (which was displayed in the browser window) is valid,
- then an access token is returned which should be used to access data on the service."""
+ """If the verifier (which was displayed in the browser window) is
+ valid, then an access token is returned which should be used to
+ access data on the service."""
oauth_request = oauth.OAuthRequest.from_consumer_and_token(self.consumer, token=token, verifier=verifier, http_url=self.ACCESS_TOKEN_URL)
oauth_request.sign_request(self.sig_method, self.consumer, token)
connection.request(oauth_request.http_method, self.ACCESS_TOKEN_URL, headers=oauth_request.to_header())
response = connection.getresponse()
token_str = response.read()
+ if "ouath_problem" in token_str:
+ raise Exception("Authorization failure - failed to get access token")
self._store_access_token_in_gconf(token_str)
return oauth.OAuthToken.from_string(token_str)
# -----------------------------------------------------------------------
- def _get_authorize_url(self, token):
- """The URL that the user should browse to, in order to authorize the application to acess data"""
-
- oauth_request = oauth.OAuthRequest.from_token_and_callback(token=token, http_url=self.AUTHORIZE_URL)
- return oauth_request.to_url()
+ def _verify_verifier(self, request_token, verifier):
+ try:
+ self.access_token = self._get_access_token(request_token, verifier)
+ except Exception, e:
+ import traceback
+ traceback.print_exc()
+ raise Exception("LinkedIn authorization failed, try again (" + e + ")")
# -----------------------------------------------------------------------
- def _get_request_token(self):
- """Get a request token from LinkedIn"""
-
- oauth_consumer_key = self.api_key
- oauth_request = oauth.OAuthRequest.from_consumer_and_token(self.consumer, callback="oob", http_url=self.REQUEST_TOKEN_URL)
- oauth_request.sign_request(self.sig_method, self.consumer, None)
+ def _verify_browser_command(self):
+ # -- Check the environment is going to work...
+ # FIXME: duplication
+ if (self._gc.get_string('/desktop/gnome/url-handlers/http/command') == 'epiphany %s'):
+ raise Exception('Browser in gconf invalid (see NB#136012). Installation error.')
+
+
+ # -----------------------------------------------------------------------
+ def _store_access_token_in_gconf(self, token_str):
+ self._gc.set_string(LinkedInApi.GCONF_ACCESS_TOKEN, token_str)
- connection = httplib.HTTPSConnection(self.LI_SERVER)
- connection.request(oauth_request.http_method, self.REQUEST_TOKEN_URL, headers=oauth_request.to_header())
- response = self.connection.getresponse().read()
- token = oauth.OAuthToken.from_string(response)
- return token
+ # -----------------------------------------------------------------------
+ def _get_access_token_from_gconf(self):
+ """Returns an oauth.OAuthToken, or None if the gconf value is empty"""
+
+ token_str = self._gc.get_string(LinkedInApi.GCONF_ACCESS_TOKEN)
+ if not token_str:
+ return None
+ if "oauth_problem" in token_str:
+ self._store_access_token_in_gconf("")
+ raise Exception("Authorization failure - access token reported OAuth problem")
+ return oauth.OAuthToken.from_string(token_str)
+
--- /dev/null
+from org.maemo.hermes.engine.linkedin.service import Service
+from org.maemo.hermes.engine.linkedin.api import LinkedInApi
+from org.maemo.hermes.engine.names import canonical
+from org.maemo.hermes.engine.friend import Friend
+import unittest
+
+class FakeGConf(dict):
+ def __init__(self, props=None):
+ dict.__init__(self)
+ if props:
+ for key in props:
+ self.set_string(key, props[key])
+ def get_string(self, key):
+ try: return self.__getitem__(key)
+ except: return None
+ def set_string(self, key, value):
+ self.__setitem__(key, value)
+
+class TestLinkedInApi(unittest.TestCase):
+
+ def setUp(self):
+ self.gconf = FakeGConf({
+ LinkedInApi.GCONF_API_KEY:'1et4G-VtmtqNfY7gF8PHtxMOf0KNWl9ericlTEtdKJeoA4ubk4wEQwf8lSL8AnYE',
+ LinkedInApi.GCONF_API_SECRET:'uk--OtmWcxER-Yh6Py5p0VeLPNlDJSMaXj1xfHILoFzrK7fM9eepNo5RbwGdkRo_'})
+ self.testee = LinkedInApi(self.gconf)
+
+
+ def test_get_request_token(self):
+ token = self.testee._get_request_token()
+
+ assert token is not None
+ assert token.callback_confirmed == "true"
+ assert token.key is not None
+ assert token.secret is not None
+
+
+ def test_get_authorize_url(self):
+ request_token = self.testee._get_request_token()
+ url = self.testee._get_authorize_url(request_token)
+
+ assert url is not None
+ assert "linkedin.com" in url
+
+
+ def test_verify_verifier_opens_browser(self):
+ self.verifier = "IllegalVerifier"
+ authenticate_threw_exception = False
+
+ try:
+ #if True:
+ self.testee.authenticate(self._need_auth_cb, self._block_for_auth)
+ assert False # should always go to except clause
+ except:
+ authenticate_threw_exception = True
+
+ assert self.need_auth_called
+ assert self.block_for_auth_called
+ assert "linkedin.com" in self.block_for_auth_url
+ assert authenticate_threw_exception
+
+
+ def _need_auth_cb(self):
+ self.need_auth_called = True
+
+
+ def _block_for_auth(self, url):
+ self.block_for_auth_called = True
+ self.block_for_auth_url = url
+ return self.verifier
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+from org.maemo.hermes.engine.linkedin.service import Service
+from org.maemo.hermes.engine.linkedin.api import LinkedInApi
+from org.maemo.hermes.engine.names import canonical
+from org.maemo.hermes.engine.friend import Friend
+import unittest, sys
+
+def log(msg):
+ sys.__stderr__.write(msg + "\n")
+
+class FakeGConf(dict):
+ def __init__(self, props=None):
+ dict.__init__(self)
+ if props:
+ for key in props:
+ self.set_string(key, props[key])
+ def get_string(self, key):
+ try: return self.__getitem__(key)
+ except: return None
+ def set_string(self, key, value):
+ log("gconf %s = %s" % (key, value))
+ self.__setitem__(key, value)
+
+class TestLinkedInApi(unittest.TestCase):
+
+ def setUp(self):
+ self.gconf = FakeGConf({
+ LinkedInApi.GCONF_API_KEY:'1et4G-VtmtqNfY7gF8PHtxMOf0KNWl9ericlTEtdKJeoA4ubk4wEQwf8lSL8AnYE',
+ LinkedInApi.GCONF_API_SECRET:'uk--OtmWcxER-Yh6Py5p0VeLPNlDJSMaXj1xfHILoFzrK7fM9eepNo5RbwGdkRo_',
+ LinkedInApi.GCONF_ACCESS_TOKEN:None })
+ self.testee = LinkedInApi(self.gconf)
+
+ # uncomment to increase verbosity on HTTP level
+ #httplib.HTTPSConnection.debuglevel = 1
+ #httplib.HTTPConnection.debuglevel = 1
+
+
+ def test_typical_flow(self):
+ # uncomment next line if you need to authorize ...
+ #self.testee.authenticate(self._need_auth_cb, self._block_for_auth)
+ # ... or uncomment the following four lines - replacing the oauth_token string with whatever you got
+ #self.gconf.set_string(
+ # LinkedInApi.GCONF_ACCESS_TOKEN,
+ # 'oauth_token=f89c2b7b-1c12-4f83-a469-838e78901716&oauth_token_secret=60f817af-6437-4015-962f-cc3aefee0264')
+ #self.testee = LinkedInApi(self.gconf)
+
+ # uncomment to see the raw xml feed for all connections
+ #xml = self.testee._make_api_request(LinkedInApi.LI_CONN_API_URL)
+ #open("/tmp/linkdata", "w").write(xml)
+
+ # uncomment to see the raw xml feed for a connection (maemohermes)
+ #xml = self.testee.get_friend_details('http://api.linkedin.com/v1/people/V8pEL-382L:full', 'name:eaU4')
+ #open("/tmp/linkdata2", "w").write(xml)
+
+ # uncomment if you want to inspect the friend objects returned
+ friends = self.testee.get_friends()
+ for f in friends:
+ photo = f.get_photo_url()
+ if photo: print photo
+
+
+ def _need_auth_cb(self):
+ self.need_auth_called = True
+
+
+ def _block_for_auth(self, url):
+ import os, time
+ file = "/tmp/verifier"
+ verifier = None
+ msg = "Now open a browser to " + url + " and put the verification code into " + file + " like 'echo 01234 > " + file + "'"
+ log(msg)
+ while True:
+ if os.path.exists(file):
+ verifier = open(file).read().strip()
+ os.remove(file)
+ break
+ time.sleep(2)
+ log("verifier: <" + verifier + ">")
+ return verifier
+
+
+if __name__ == '__main__':
+ unittest.main()
+++ /dev/null
-from org.maemo.hermes.engine.facebook.service import Service
-from org.maemo.hermes.engine.names import canonical
-import unittest
-
-
-class FakeContact():
- id_counter = 0
- def __init__(self, name, addr, id=None):
- self.name = name;
- self.urls = addr
- self.id = id or FakeContact.id_counter
- FakeContact.id_counter = FakeContact.id_counter + 1;
- def get_urls(self):
- return self.urls
- def get_name(self):
- return self.name
- def get_identifiers(self):
- return [canonical(self.name)]
-
-class TestFacebookService(unittest.TestCase):
-
- def setUp(self):
- self.testee = Service(None)
-
- def test_main_flow_one_match_by_url_one_by_name(self):
- # arrange
- self.existing_address = 'http://www.facebook.com/profile.php?id=123456'
- self.existing_contact = FakeContact("Facebook Person", [self.existing_address])
- existing_fake = {'uid':'123456','name':'Name Doesnt Match but URL Does'}
-
- self.other_address = 'http://twitter.com/not/correct/site'
- self.other_contact = FakeContact("Twitter Person", [self.other_address])
- other_fake = {'uid':'123','name':self.other_contact.get_name()}
-
- self.none_contact = FakeContact("No URLson", [])
-
- fake_data = [existing_fake, other_fake]
- self._fake_server_response(fake_data)
-
- # act
- self._run_service([self.existing_contact, self.other_contact, self.none_contact])
-
- # assert
- friends = self.testee.get_friends()
- contacts = self.testee.get_contacts_with_match()
- assert len(friends) == 2
- assert len(contacts) == 2
- assert self.other_contact in contacts
- assert self.existing_contact in contacts
- assert self.none_contact not in contacts
-
-
- def test_name_collision_avoided_by_previous_matching(self):
- contact_do_match = FakeContact("Same Name", ["http://www.facebook.com/profile.php?id=123"], 1);
- contact_no_match = FakeContact("Same Name", [None], 2)
-
- data = [{'uid':'123','name':'Same Name'}]
- self._fake_server_response(data)
-
- self._run_service([contact_no_match, contact_do_match])
-
- assert len(self.testee.get_unmatched_friends()) == 0
- matchers = self.testee.get_contacts_with_match().keys()
- assert len(matchers) == 1
- assert matchers[0].id == 1
-
- def test_name_collision_avoided_only_one_person_matched(self):
- contact_do_match = FakeContact("Same Name", ["http://twitter.com/same_name"]);
- contact_no_match = FakeContact("Same Name", [None])
-
- data = [{'uid':'123','name':'Same Name'}]
- self._fake_server_response(data)
-
- self._run_service([contact_no_match, contact_do_match])
-
- matchers = self.testee.get_contacts_with_match().keys()
- assert len(matchers) == 1
- assert len(self.testee.get_unmatched_friends()) == 0
-
-
- def _run_service(self, contacts):
- for contact in contacts:
- self.testee.pre_process_contact(contact)
- self.testee.process_friends()
- for contact in contacts:
- self.testee.process_contact(contact)
-
- def _fake_server_response(self, data):
- self.testee._get_friends_data = self._get_friends_data
- self._server_response = data
-
- def _get_friends_data(self):
- return self._server_response
-
-
-
-if __name__ == '__main__':
- unittest.main()
+++ /dev/null
-from org.maemo.hermes.engine.gravatar.service import Service
-from org.maemo.hermes.engine.friend import Friend
-import unittest
-
-class FakeContact():
- def __init__(self, addr):
- self.urls = addr
- def get_emails(self):
- return self.urls
- def get_name(self):
- return self.urls[0]
-
-class TestGravatarService(unittest.TestCase):
-
- def setUp(self):
- self._setUp('maemohermes@wendt.se', 'b14ec179822b')
-
- def test_main_flow(self):
- self.testee.pre_process_contact(self.existing_contact)
- self.testee.pre_process_contact(self.missing_contact)
- self.testee.process_friends()
- self.testee.process_contact(self.existing_contact)
- self.testee.process_contact(self.missing_contact)
-
- friends = self.testee.get_friends()
- contacts = self.testee.get_contacts_with_match()
- assert len(friends) == 1
- assert len(contacts) == 1
- assert self.missing_contact not in contacts.keys()
- assert self.existing_contact in contacts.keys()
- friend = friends[0]
- assert friend.get_name() == self.existing_contact.get_name()
- print friend.get_photo_url()
-
-
- def test_that_a_person_with_two_addresses_and_one_gravatar_works(self):
- self._fake_server_response({self.missing_address: None, self.existing_address: "http://url.to.img/"})
-
- self.testee.pre_process_contact(self.multiple_contact)
- self.testee.process_friends()
- self.testee.process_contact(self.multiple_contact)
-
- friends = self.testee.get_friends()
- contacts = self.testee.get_contacts_with_match()
- assert len(friends) == 1
- assert len(contacts) == 1
- assert self.multiple_contact in contacts
- assert self.missing_contact not in contacts
- assert self.existing_contact not in contacts.keys()
- assert friends[0].get_name() == self.existing_contact.get_name()
-
-
- def _fake_server_response(self, map):
- self.testee._get_hash_info_from_server = self._get_hash_info_from_server
- # in real results the addresses hashed, so we'll add that here, and keep originals for easier debugging/inspection
- for key in map.keys():
- hash = self.testee._get_hash_for_address(key)
- map[hash] = map[key]
- self._server_response = map
-
- def _get_hash_info_from_server(self, args, url):
- self._server_args = args
- self._server_url = url
- return self._server_response
-
- def _setUp(self, email, key):
- self.testee = Service(email, key)
-
- self.existing_address = 'fredrik@wendt.se'
- self.existing_contact = FakeContact([self.existing_address])
- self.existing_friend = Friend("Fredrik Wendt")
-
- self.missing_address = 'will_not_ever_exist_i_truly_hope_at_least@wendt.se'
- self.missing_contact = FakeContact([self.missing_address])
- self.missing_friend = Friend("Unknown Person")
-
- self.multiple_contact = FakeContact([self.existing_address, self.missing_address])
- self.multiple_friend = Friend("Another Person")
-
-
-if __name__ == '__main__':
- unittest.main()
+++ /dev/null
-from org.maemo.hermes.engine.linkedin.service import Service
-from org.maemo.hermes.engine.names import canonical
-from org.maemo.hermes.engine.friend import Friend
-import unittest
-
-
-class FakeContact():
- id_counter = 0
- def __init__(self, name, addr, id=None):
- self.name = name;
- self.urls = addr
- self.id = id or FakeContact.id_counter
- FakeContact.id_counter = FakeContact.id_counter + 1;
- def get_urls(self):
- return self.urls
- def get_name(self):
- return self.name
- def get_identifiers(self):
- return [canonical(self.name)]
-
-class FakeLinkedInApi():
- def get_friends(self):
- return self._friends
- def set_friends(self, data):
- self._friends = data
-
-class UrlFriend(Friend):
- def __init__(self, name, url):
- Friend.__init__(self, name)
- self.add_url(url)
-
-class TestLinkedInService(unittest.TestCase):
-
- def setUp(self):
- self.linkedInApi = FakeLinkedInApi()
- self.testee = Service(self.linkedInApi)
-
- def test_main_flow_one_match_by_url_one_by_name(self):
- known_url = "http://linkedin/id=1"
- contact_by_url = FakeContact("By Url", [known_url], 1)
- friend_by_url = UrlFriend("Unimportant", known_url)
-
- known_name = "By Name"
- contact_by_name = FakeContact(known_name, [], 2)
- friend_by_name = Friend(known_name)
-
- self._fake_server_response([friend_by_name, friend_by_url])
-
- self._run_service([contact_by_name, contact_by_url])
-
- assert len(self.testee.get_unmatched_friends()) == 0
- matchers = self.testee.get_contacts_with_match()
- assert contact_by_name in matchers
- assert contact_by_url in matchers
- assert known_url in matchers[contact_by_url].get_urls()
-
- def _run_service(self, contacts):
- for contact in contacts:
- self.testee.pre_process_contact(contact)
- self.testee.process_friends()
- for contact in contacts:
- self.testee.process_contact(contact)
-
- def _fake_server_response(self, data):
- self.linkedInApi.set_friends(data)
-
-
-if __name__ == '__main__':
- unittest.main()
+++ /dev/null
-import unittest
-from org.maemo.hermes.engine.syncjob import SyncJob
-
-class FakeContact():
- def __init__(self):
- pass
-
-class FakeService():
- def __init__(self):
- self.contacts_preprocessed = []
- self.contacts_processed = []
- def get_name(self):
- return "fake service"
- def pre_process_contact(self, contact):
- self.contacts_preprocessed.append(contact)
- def process_friends(self):
- pass
- def process_contact(self, contact):
- self.contacts_processed.append(contact)
-
-
-class TestSyncJob(unittest.TestCase):
-
- def setUp(self):
- self.service = FakeService()
- self.services = [self.service]
- self.contact = FakeContact()
- self.contacts = [self.contact]
- self.testee = SyncJob(self.services, self.contacts)
-
- def test_main_flow(self):
- self.testee.run()
- assert self.contact in self.service.contacts_preprocessed
- assert self.contact in self.service.contacts_processed
-
-if __name__ == '__main__':
- unittest.main()
+++ /dev/null
-from org.maemo.hermes.engine.twitter.service import Service
-from org.maemo.hermes.engine.names import canonical
-import unittest
-
-class FakeContact():
- def __init__(self, name, addr):
- self.name = name
- self.urls = addr
- def get_urls(self):
- return self.urls
- def get_name(self):
- return self.name
- def get_identifiers(self):
- return [canonical(self.name)]
-
-class FakeTweeter():
- def __init__(self, name, screen_name, props=None):
- self._props = {}
- self["name"] = name
- self["screen_name"] = screen_name
- if props:
- for key in props:
- self[key] = props[key]
- def __setitem__(self, key, value):
- self._props[key] = value
- def __getattr__(self, key):
- try:
- return self._props[key]
- except:
- return ""
-
-
-class TestTwitterService(unittest.TestCase):
-
- def setUp(self):
- self.testee = Service(None)
-
-
- def test_main_flow(self):
- twitterName = "Twitter Name"
- self._fake_server([FakeTweeter(twitterName, "wendtse", {"url":"http://wendt.se"})])
- contact = FakeContact("Fredrik Wendt", ["http://twitter.com/wendtse"])
-
- self._exercise_service([contact])
-
- assert len(self.testee.get_unmatched_friends()) == 0
- matchers = self.testee.get_contacts_with_match()
- assert contact in matchers
- assert twitterName == matchers[contact].get_name()
-
-
- def test_name_collision_avoided_by_previous_matching(self):
- twitterName = "Same Name"
- known_tweeter = FakeTweeter(twitterName, "samename")
- other_tweeter = FakeTweeter(twitterName, "otherscreenname")
- self._fake_server([other_tweeter, known_tweeter])
- contact = FakeContact(twitterName, [])
-
- self._exercise_service([contact])
-
- assert len(self.testee.get_unmatched_friends()) == 1
- matchers = self.testee.get_contacts_with_match()
- assert contact in matchers
-
-
- def test_name_collision_avoided_only_one_person_matched(self):
- twitter_name = "Same Name"
- screen_name = "samename"
- known_tweeter = FakeTweeter(twitter_name, screen_name)
- other_tweeter = FakeTweeter(twitter_name, "otherscreenname")
- self._fake_server([other_tweeter, known_tweeter])
- contact = FakeContact(twitter_name, ["http://twitter.com/" + screen_name])
-
- self._exercise_service([contact])
-
- assert len(self.testee.get_unmatched_friends()) == 1
- matchers = self.testee.get_contacts_with_match()
- assert contact in matchers
- assert screen_name == matchers[contact].get_nickname()
-
-
- def _exercise_service(self, contacts):
- for contact in contacts:
- self.testee.pre_process_contact(contact)
- self.testee.process_friends()
- for contact in contacts:
- self.testee.process_contact(contact)
-
- def _fake_server(self, data):
- self._fake_data = data
- self.testee._get_tweeters = self._get_tweeters
-
- def _get_tweeters(self):
- return self._fake_data
-
-if __name__ == '__main__':
- unittest.main()
--- /dev/null
+from org.maemo.hermes.engine.facebook.service import Service
+from org.maemo.hermes.engine.names import canonical
+import unittest
+
+
+class FakeContact():
+ id_counter = 0
+ def __init__(self, name, addr, id=None):
+ self.name = name;
+ self.urls = addr
+ self.id = id or FakeContact.id_counter
+ FakeContact.id_counter = FakeContact.id_counter + 1;
+ def get_urls(self):
+ return self.urls
+ def get_name(self):
+ return self.name
+ def get_identifiers(self):
+ return [canonical(self.name)]
+
+class TestFacebookService(unittest.TestCase):
+
+ def setUp(self):
+ self.testee = Service(None)
+
+ def test_main_flow_one_match_by_url_one_by_name(self):
+ # arrange
+ self.existing_address = 'http://www.facebook.com/profile.php?id=123456'
+ self.existing_contact = FakeContact("Facebook Person", [self.existing_address])
+ existing_fake = {'uid':'123456','name':'Name Doesnt Match but URL Does'}
+
+ self.other_address = 'http://twitter.com/not/correct/site'
+ self.other_contact = FakeContact("Twitter Person", [self.other_address])
+ other_fake = {'uid':'123','name':self.other_contact.get_name()}
+
+ self.none_contact = FakeContact("No URLson", [])
+
+ fake_data = [existing_fake, other_fake]
+ self._fake_server_response(fake_data)
+
+ # act
+ self._run_service([self.existing_contact, self.other_contact, self.none_contact])
+
+ # assert
+ friends = self.testee.get_friends()
+ contacts = self.testee.get_contacts_with_match()
+ assert len(friends) == 2
+ assert len(contacts) == 2
+ assert self.other_contact in contacts
+ assert self.existing_contact in contacts
+ assert self.none_contact not in contacts
+
+
+ def test_name_collision_avoided_by_previous_matching(self):
+ contact_do_match = FakeContact("Same Name", ["http://www.facebook.com/profile.php?id=123"], 1);
+ contact_no_match = FakeContact("Same Name", [None], 2)
+
+ data = [{'uid':'123','name':'Same Name'}]
+ self._fake_server_response(data)
+
+ self._run_service([contact_no_match, contact_do_match])
+
+ assert len(self.testee.get_unmatched_friends()) == 0
+ matchers = self.testee.get_contacts_with_match().keys()
+ assert len(matchers) == 1
+ assert matchers[0].id == 1
+
+ def test_name_collision_avoided_only_one_person_matched(self):
+ contact_do_match = FakeContact("Same Name", ["http://twitter.com/same_name"]);
+ contact_no_match = FakeContact("Same Name", [None])
+
+ data = [{'uid':'123','name':'Same Name'}]
+ self._fake_server_response(data)
+
+ self._run_service([contact_no_match, contact_do_match])
+
+ matchers = self.testee.get_contacts_with_match().keys()
+ assert len(matchers) == 1
+ assert len(self.testee.get_unmatched_friends()) == 0
+
+
+ def _run_service(self, contacts):
+ for contact in contacts:
+ self.testee.pre_process_contact(contact)
+ self.testee.process_friends()
+ for contact in contacts:
+ self.testee.process_contact(contact)
+
+ def _fake_server_response(self, data):
+ self.testee._get_friends_data = self._get_friends_data
+ self._server_response = data
+
+ def _get_friends_data(self):
+ return self._server_response
+
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+from org.maemo.hermes.engine.gravatar.service import Service
+from org.maemo.hermes.engine.friend import Friend
+import unittest
+
+class FakeContact():
+ def __init__(self, addr):
+ self.urls = addr
+ def get_emails(self):
+ return self.urls
+ def get_name(self):
+ return self.urls[0]
+
+class TestGravatarService(unittest.TestCase):
+
+ def setUp(self):
+ self._setUp('maemohermes@wendt.se', 'b14ec179822b')
+
+ def test_main_flow(self):
+ self.testee.pre_process_contact(self.existing_contact)
+ self.testee.pre_process_contact(self.missing_contact)
+ self.testee.process_friends()
+ self.testee.process_contact(self.existing_contact)
+ self.testee.process_contact(self.missing_contact)
+
+ friends = self.testee.get_friends()
+ contacts = self.testee.get_contacts_with_match()
+ assert len(friends) == 1
+ assert len(contacts) == 1
+ assert self.missing_contact not in contacts.keys()
+ assert self.existing_contact in contacts.keys()
+ friend = friends[0]
+ assert friend.get_name() == self.existing_contact.get_name()
+ print friend.get_photo_url()
+
+
+ def test_that_a_person_with_two_addresses_and_one_gravatar_works(self):
+ self._fake_server_response({self.missing_address: None, self.existing_address: "http://url.to.img/"})
+
+ self.testee.pre_process_contact(self.multiple_contact)
+ self.testee.process_friends()
+ self.testee.process_contact(self.multiple_contact)
+
+ friends = self.testee.get_friends()
+ contacts = self.testee.get_contacts_with_match()
+ assert len(friends) == 1
+ assert len(contacts) == 1
+ assert self.multiple_contact in contacts
+ assert self.missing_contact not in contacts
+ assert self.existing_contact not in contacts.keys()
+ assert friends[0].get_name() == self.existing_contact.get_name()
+
+
+ def _fake_server_response(self, map):
+ self.testee._get_hash_info_from_server = self._get_hash_info_from_server
+ # in real results the addresses hashed, so we'll add that here, and keep originals for easier debugging/inspection
+ for key in map.keys():
+ hash = self.testee._get_hash_for_address(key)
+ map[hash] = map[key]
+ self._server_response = map
+
+ def _get_hash_info_from_server(self, args, url):
+ self._server_args = args
+ self._server_url = url
+ return self._server_response
+
+ def _setUp(self, email, key):
+ self.testee = Service(email, key)
+
+ self.existing_address = 'fredrik@wendt.se'
+ self.existing_contact = FakeContact([self.existing_address])
+ self.existing_friend = Friend("Fredrik Wendt")
+
+ self.missing_address = 'will_not_ever_exist_i_truly_hope_at_least@wendt.se'
+ self.missing_contact = FakeContact([self.missing_address])
+ self.missing_friend = Friend("Unknown Person")
+
+ self.multiple_contact = FakeContact([self.existing_address, self.missing_address])
+ self.multiple_friend = Friend("Another Person")
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+from org.maemo.hermes.engine.linkedin.service import Service
+from org.maemo.hermes.engine.names import canonical
+from org.maemo.hermes.engine.friend import Friend
+import unittest
+
+
+class FakeContact():
+ id_counter = 0
+ def __init__(self, name, addr, id=None):
+ self.name = name;
+ self.urls = addr
+ self.id = id or FakeContact.id_counter
+ FakeContact.id_counter = FakeContact.id_counter + 1;
+ def get_urls(self):
+ return self.urls
+ def get_name(self):
+ return self.name
+ def get_identifiers(self):
+ return [canonical(self.name)]
+
+class FakeLinkedInApi():
+ def get_friends(self):
+ return self._friends
+ def set_friends(self, data):
+ self._friends = data
+
+class UrlFriend(Friend):
+ def __init__(self, name, url):
+ Friend.__init__(self, name)
+ self.add_url(url)
+
+class TestLinkedInService(unittest.TestCase):
+
+ def setUp(self):
+ self.linkedInApi = FakeLinkedInApi()
+ self.testee = Service(self.linkedInApi)
+
+ def test_main_flow_one_match_by_url_one_by_name(self):
+ known_url = "http://linkedin/id=1"
+ contact_by_url = FakeContact("By Url", [known_url], 1)
+ friend_by_url = UrlFriend("Unimportant", known_url)
+
+ known_name = "By Name"
+ contact_by_name = FakeContact(known_name, [], 2)
+ friend_by_name = Friend(known_name)
+
+ self._fake_server_response([friend_by_name, friend_by_url])
+
+ self._run_service([contact_by_name, contact_by_url])
+
+ assert len(self.testee.get_unmatched_friends()) == 0
+ matchers = self.testee.get_contacts_with_match()
+ assert contact_by_name in matchers
+ assert contact_by_url in matchers
+ assert known_url in matchers[contact_by_url].get_urls()
+
+ def _run_service(self, contacts):
+ for contact in contacts:
+ self.testee.pre_process_contact(contact)
+ self.testee.process_friends()
+ for contact in contacts:
+ self.testee.process_contact(contact)
+
+ def _fake_server_response(self, data):
+ self.linkedInApi.set_friends(data)
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+import unittest
+from org.maemo.hermes.engine.syncjob import SyncJob
+
+class FakeContact():
+ def __init__(self):
+ pass
+
+class FakeService():
+ def __init__(self):
+ self.contacts_preprocessed = []
+ self.contacts_processed = []
+ def get_name(self):
+ return "fake service"
+ def pre_process_contact(self, contact):
+ self.contacts_preprocessed.append(contact)
+ def process_friends(self):
+ pass
+ def process_contact(self, contact):
+ self.contacts_processed.append(contact)
+
+
+class TestSyncJob(unittest.TestCase):
+
+ def setUp(self):
+ self.service = FakeService()
+ self.services = [self.service]
+ self.contact = FakeContact()
+ self.contacts = [self.contact]
+ self.testee = SyncJob(self.services, self.contacts)
+
+ def test_main_flow(self):
+ self.testee.run()
+ assert self.contact in self.service.contacts_preprocessed
+ assert self.contact in self.service.contacts_processed
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+from org.maemo.hermes.engine.twitter.service import Service
+from org.maemo.hermes.engine.names import canonical
+import unittest
+
+class FakeContact():
+ def __init__(self, name, addr):
+ self.name = name
+ self.urls = addr
+ def get_urls(self):
+ return self.urls
+ def get_name(self):
+ return self.name
+ def get_identifiers(self):
+ return [canonical(self.name)]
+
+class FakeTweeter():
+ def __init__(self, name, screen_name, props=None):
+ self._props = {}
+ self["name"] = name
+ self["screen_name"] = screen_name
+ if props:
+ for key in props:
+ self[key] = props[key]
+ def __setitem__(self, key, value):
+ self._props[key] = value
+ def __getattr__(self, key):
+ try:
+ return self._props[key]
+ except:
+ return ""
+
+
+class TestTwitterService(unittest.TestCase):
+
+ def setUp(self):
+ self.testee = Service(None)
+
+
+ def test_main_flow(self):
+ twitterName = "Twitter Name"
+ self._fake_server([FakeTweeter(twitterName, "wendtse", {"url":"http://wendt.se"})])
+ contact = FakeContact("Fredrik Wendt", ["http://twitter.com/wendtse"])
+
+ self._exercise_service([contact])
+
+ assert len(self.testee.get_unmatched_friends()) == 0
+ matchers = self.testee.get_contacts_with_match()
+ assert contact in matchers
+ assert twitterName == matchers[contact].get_name()
+
+
+ def test_name_collision_avoided_by_previous_matching(self):
+ twitterName = "Same Name"
+ known_tweeter = FakeTweeter(twitterName, "samename")
+ other_tweeter = FakeTweeter(twitterName, "otherscreenname")
+ self._fake_server([other_tweeter, known_tweeter])
+ contact = FakeContact(twitterName, [])
+
+ self._exercise_service([contact])
+
+ assert len(self.testee.get_unmatched_friends()) == 1
+ matchers = self.testee.get_contacts_with_match()
+ assert contact in matchers
+
+
+ def test_name_collision_avoided_only_one_person_matched(self):
+ twitter_name = "Same Name"
+ screen_name = "samename"
+ known_tweeter = FakeTweeter(twitter_name, screen_name)
+ other_tweeter = FakeTweeter(twitter_name, "otherscreenname")
+ self._fake_server([other_tweeter, known_tweeter])
+ contact = FakeContact(twitter_name, ["http://twitter.com/" + screen_name])
+
+ self._exercise_service([contact])
+
+ assert len(self.testee.get_unmatched_friends()) == 1
+ matchers = self.testee.get_contacts_with_match()
+ assert contact in matchers
+ assert screen_name == matchers[contact].get_nickname()
+
+
+ def _exercise_service(self, contacts):
+ for contact in contacts:
+ self.testee.pre_process_contact(contact)
+ self.testee.process_friends()
+ for contact in contacts:
+ self.testee.process_contact(contact)
+
+ def _fake_server(self, data):
+ self._fake_data = data
+ self.testee._get_tweeters = self._get_tweeters
+
+ def _get_tweeters(self):
+ return self._fake_data
+
+if __name__ == '__main__':
+ unittest.main()