
I am trying to scrape concurrently with the selenium and multiprocessing modules. Below is roughly my approach:

  • create a queue with a number of webdriver instances equal to the number of workers
  • create a pool of workers
  • each worker pulls a webdriver instance from the queue
  • when the function terminates, the webdriver instance is put back on the queue

Here is the code:

#!/usr/bin/env python
# encoding: utf-8

import time
import codecs
from selenium import webdriver
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from multiprocessing import Pool
from Queue import Queue


def download_and_save(link_tuple):
    link_id, link = link_tuple
    print link_id
    w = q.get()
    w.get(link)
    with codecs.open('%s.html' % link_id, 'w', encoding='utf-8') as f:
        f.write(w.page_source)
    time.sleep(10)
    q.put(w)


def main(num_processes):
    links = [
        'http://openjurist.org/743/f2d/273/zwiener-v-commissioner-of-internal-revenue',
        'http://www.oyez.org/advocates/z/l/lonny_f_zwiener',
        'http://www.texasbar.com/attorneys/member.cfm?id=21191',
        'https://www.courtlistener.com/opinion/441662/lonny-f-zwiener-and-ardith-e-zwiener-v-commissione/cited-by',
        'https://www.courtlistener.com/opinion/441662/lonny-f-zwiener-and-ardith-e-zwiener-v-commissione/authorities/',
        'http://www.myheritage.com/names/lonny_zwiene',
        'https://law.resource.org/pub/us/case/reporter/F2/743/743.F2d.273.84-4068.htm',
        'http://www.ancestry.com/1940-census/usa/Texas/Lonny-F-Zwiener_5bbff',
        'http://search.ancestry.com/cgi-bin/sse.dll?gl=34&rank=1&new=1&so=3&MSAV=0&msT=1&gss=ms_f-34&gl=bmd_death&rank=1&new=1&so=1&MSAV=0&msT=1&gss=ms_f-2_s&gsfn=Lonny&gsln=Zwiener&msypn__ftp=T',
        'http://www.mocavo.com/Lonny-F-Zwiener-Fredericksburg-Gillespie-Texas-1940-United-States-Census/0798164756456805432',
        'http://www.taftlaw.com/attorneys/635-mark-s-yuric'
    ]
    n = len(links)
    link_tuples = [(link_id, link) for link_id, link in zip(xrange(n), links)]
    pool = Pool(num_processes)
    pool.map(download_and_save, link_tuples)


if __name__ == '__main__':
    num_processes = 2
    q = Queue()
    dcap = dict(DesiredCapabilities.PHANTOMJS)
    dcap["phantomjs.page.settings.userAgent"] = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36"
    )
    for i in range(num_processes):
        w = webdriver.PhantomJS(desired_capabilities=dcap)
        q.put(w)
    main(num_processes)

This script runs, but the saved HTML files are either duplicated or missing.

  • I think the webdriver itself only acts as a single static instance and therefore doesn't spawn new instances to match the threads. At least with PhantomJS, this is how it appears to be. Commented May 14, 2015 at 6:32
  • @jimtollan, thank you for the comment. Do you know if there is a way to instantiate multiple instances of webdriver? Commented May 15, 2015 at 8:01
  • I've rather crudely gone down the route of spawning new instances of a self-contained application, rather than handling multiple threads, to overcome this obstacle. It's not pretty and I'm sure there's a better way; however, my use case at the time was under time pressure and was only going to be in use for a few days, so I took a pragmatic approach. Commented May 15, 2015 at 8:15
  • @jimtollan, kind of a hack, but it seems to work. Thanks for the help. Commented May 16, 2015 at 22:35
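
For reference, here is a minimal sketch of the "one driver per worker" idea discussed in the comments, assuming PhantomJS is on the PATH. Instead of sharing driver instances through a queue, each Pool worker process creates its own driver in an initializer (init_worker, the module-level driver variable, and the placeholder URLs are illustrative names, not from the original code):

from multiprocessing import Pool
from selenium import webdriver
import codecs

driver = None  # one webdriver per worker process, created by init_worker

def init_worker():
    # runs once in each worker process when the pool starts
    global driver
    driver = webdriver.PhantomJS()

def download_and_save(link_tuple):
    link_id, link = link_tuple
    driver.get(link)
    with codecs.open('%s.html' % link_id, 'w', encoding='utf-8') as f:
        f.write(driver.page_source)

if __name__ == '__main__':
    links = ['http://example.com/a', 'http://example.com/b']  # placeholder URLs
    pool = Pool(2, initializer=init_worker)
    pool.map(download_and_save, list(enumerate(links)))
    pool.close()
    pool.join()
    # note: this sketch does not explicitly quit the PhantomJS processes

Because every worker owns its driver, no cross-process sharing of the driver object is needed at all.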

1 Answer


Here is a different approach that I've had success with: you keep your workers in __main__, and the workers pull their work from task_q.

import multiprocessing
import traceback

class scrapeWorker(multiprocessing.Process):
    def __init__(self, worker_num, task_q, result_q):
        super(scrapeWorker, self).__init__()
        self.worker_num = worker_num
        self.task_q = task_q
        self.result_q = result_q

        self.scraper = my_scraper_class() # this contains driver code, methods, etc.

    def handleWork(self, work):
        assert isinstance(work, (tuple, list)), "work should be a tuple or list. found {}".format(type(work))
        assert len(work) == 2, "len(work) != 2. found {}".format(work)
        assert isinstance(work[1], dict), "work[1] should be a dict. found {}".format(type(work[1]))

        # do the work
        result = getattr( self.scraper, work[0] )( **work[1] )

        self.result_q.put( result )

    # worker.run() is actually called via worker.start()
    def run(self):
        try:
            self.scraper.startDriving()

            while True:
                work = self.task_q.get()

                if work == 'KILL':
                    self.scraper.driver.quit()
                    break

                self.handleWork( work )
        except:
            print traceback.format_exc()

            raise

if __name__ == "__main__":
    num_workers = 4

    manager = multiprocessing.Manager()
    task_q = manager.Queue()
    result_q = manager.Queue()

    workers = []
    for worker_num in xrange(num_workers):
        worker = scrapeWorker(worker_num, task_q, result_q)
        worker.start()
        workers.append( worker )

    # you decide what job_stuff is
    # work == [ 'method_name', {'kw_1': val_1, ...} ]
    for work in job_stuff:
        task_q.put( work )

    results = []
    while len(results) < len(job_stuff):
        results.append( result_q.get() )

    for worker in workers:
        task_q.put( "KILL" )

    for worker in workers:
        worker.join()

    print "finished!"

