A Python script that downloads a user's Instagram images and videos, including gallery posts that contain several photos or videos, and saves them to a folder.
How it works:
Log in to Instagram using Selenium and navigate to the profile
Check whether the Instagram profile exists and whether it is private
Gather the URLs of the images and videos
Use threads and multiprocessing to improve execution speed
My code:
from pathlib import Path
import requests
import time
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from multiprocessing.dummy import Pool
import urllib.parse
import re
from concurrent.futures import ThreadPoolExecutor
from typing import *
# Optional path to the chromedriver executable. When it is left as None,
# InstagramPV calls webdriver.Chrome() without an explicit path.
chromedriver_path = None
class PrivateException(Exception):
    """Signal that a profile is private and not followed by the viewer."""
class InstagramPV:
    """
    Download the photos and videos of one Instagram profile.

    Selenium logs in and scrolls the profile page while post links are
    collected; a ``requests`` session that reuses the browser cookies then
    fetches the post metadata and the media files. Instances are context
    managers: leaving the ``with`` block closes the session and the browser.
    """

    def __init__(self, username: str, password: str, folder: Path, search_name: str):
        """
        Store the credentials and start a Chrome webdriver.

        :param username: username
        :param password: password
        :param folder: folder name
        :param search_name: the name what will search
        """
        self.username = username
        self.password = password
        self.folder = folder
        self.http_base = requests.Session()
        self._search_name = search_name
        self.links: List[str] = []
        self.pictures: List[str] = []
        self.videos: List[str] = []
        self.url: str = 'https://www.instagram.com/{name}/'
        self.posts: int = 0
        if chromedriver_path is not None:
            self.driver = webdriver.Chrome(chromedriver_path)
        else:
            self.driver = webdriver.Chrome()

    @property
    def name(self) -> str:
        """
        Return the profile name, extracting it from <search_name> if that is a URL.

        The first path segment of an ``http://`` or ``https://`` URL is taken
        as the profile name and stored back into ``_search_name``; any other
        value is returned unchanged.

        :return: The name of the Profile
        :raises ValueError: if a URL without a profile path segment was given
        """
        found = re.search(r'https?://\S+', self._search_name)
        if found is None:
            return self._search_name
        path = urllib.parse.urlparse(found.group()).path
        segments = [part for part in path.split('/') if part]
        if not segments:
            raise ValueError(f'[!] No profile name in {found.group()!r}')
        self._search_name = segments[0]
        return self._search_name

    def __enter__(self):
        return self

    def check_availability(self) -> None:
        """
        Checking Status code, Taking number of posts, Privacy and followed by viewer

        :raises PrivateException: if the Profile is private and not following by viewer
        :return: None
        """
        search = self.http_base.get(self.url.format(name=self.name), params={'__a': 1})
        search.raise_for_status()
        user = search.json()['graphql']['user']
        self.posts = user['edge_owner_to_timeline_media']['count']
        if user.get('is_private') and not user.get('followed_by_viewer'):
            raise PrivateException('[!] Account is private')

    def control(self) -> None:
        """Create the download folder (and missing parents) if it does not exist."""
        self.folder.mkdir(parents=True, exist_ok=True)

    def login(self) -> None:
        """
        Log in to Instagram, then collect and resolve the profile's post links.

        After the login form is submitted the browser cookies are copied into
        the requests session, the profile is checked, opened and scrolled.

        :raises ValueError: if the invalid-credentials element is present
        :raises PrivateException: if the profile is private and not followed
        :return: None
        """
        # Local import: the file only imports NoSuchElementException at the top.
        from selenium.common.exceptions import TimeoutException

        self.driver.get('https://www.instagram.com/accounts/login')
        WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, 'form')))
        self.driver.find_element(By.NAME, 'username').send_keys(self.username)
        self.driver.find_element(By.NAME, 'password').send_keys(self.password)
        self.driver.find_element(By.TAG_NAME, 'form').submit()

        # Check for invalid credentials. find_elements returns an empty list
        # when nothing matches, so no exception handling is needed.
        # NOTE(review): this runs right after submit; the error element may
        # not have been rendered yet - confirm whether a wait is needed.
        if self.driver.find_elements(By.CLASS_NAME, 'eiCW-'):
            raise ValueError('[!] Invalid Credentials')

        # Close the notifications dialog when it shows up. WebDriverWait
        # raises TimeoutException (not NoSuchElementException) on timeout.
        try:
            not_now = WebDriverWait(self.driver, 20).until(
                EC.presence_of_element_located((By.XPATH, '//button[text()="Not Now"]')))
        except (TimeoutException, NoSuchElementException):
            pass
        else:
            not_now.click()

        # Reuse the browser cookies for the plain HTTP requests.
        cookies = {
            cookie['name']: cookie['value']
            for cookie in self.driver.get_cookies()
        }
        self.http_base.cookies.update(cookies)

        self.check_availability()
        self.driver.get(self.url.format(name=self.name))
        self.scroll_down()

    def get_href(self) -> None:
        """Append every not-yet-collected post link (href with a ``p`` path part) to ``links``."""
        known = set(self.links)
        for elem in self.driver.find_elements(By.XPATH, '//a[@href]'):
            href = elem.get_attribute('href')
            if 'p' in href.split('/') and href not in known:
                known.add(href)
                self.links.append(href)

    def scroll_down(self, max_idle_rounds: int = 5) -> None:
        """
        Taking hrefs while scrolling down, then hand the links to <submit_links>

        :param max_idle_rounds: consecutive scrolls without a new link after
            which scrolling stops even if fewer links than ``posts`` were
            found; this keeps the loop from running forever
        :return: None
        """
        idle_rounds = 0
        while len(set(self.links)) < self.posts and idle_rounds < max_idle_rounds:
            before = len(set(self.links))
            self.get_href()
            time.sleep(1)
            self.driver.execute_script('window.scrollTo(0, document.body.scrollHeight);')
            time.sleep(1)
            idle_rounds = 0 if len(set(self.links)) > before else idle_rounds + 1
        self.submit_links()

    def submit_links(self) -> None:
        """Gathering Images and Videos and pass to function <fetch_url> Using ThreadPoolExecutor"""
        self.control()
        links = list(dict.fromkeys(self.links))
        print('[!] Ready for video - images'.title())
        print(f'[*] extracting {len(links)} posts , please wait...'.title())
        new_links = [urllib.parse.urljoin(link, '?__a=1') for link in links]
        with ThreadPoolExecutor(max_workers=8) as executor:
            futures = {executor.submit(self.fetch_url, link): link for link in new_links}
        # All futures are finished once the executor block exits; report the
        # failures instead of silently dropping them.
        for future, link in futures.items():
            error = future.exception()
            if error is not None:
                print(f'[!] Could not extract {link}: {error!r}')

    def fetch_url(self, url: str) -> None:
        """
        This function extracts images and videos

        Gallery posts contribute one entry per child (video url or display
        url). A single post contributes its display url and, when it is a
        video, its video url as well.

        :param url: Taking the url
        :return None
        """
        response = self.http_base.get(url)
        response.raise_for_status()
        media = response.json()['graphql']['shortcode_media']
        try:
            # Only the gallery lookup is guarded, so a malformed child raises
            # instead of being mistaken for a single post.
            nodes = [edge['node'] for edge in media['edge_sidecar_to_children']['edges']]
        except KeyError:
            # Unique photo or video
            self.pictures.append(media['display_url'])
            if media['is_video']:
                self.videos.append(media['video_url'])
            return
        for node in nodes:
            if node['is_video']:
                self.videos.append(node['video_url'])
            else:
                self.pictures.append(node['display_url'])

    def _save(self, link: str, target: Path) -> None:
        """Download <link> and write it to <target>; report and skip on HTTP errors."""
        try:
            response = self.http_base.get(link)
            response.raise_for_status()
        except requests.RequestException as error:
            print(f'[!] Could not download {link}: {error}')
            return
        # The file is created only after the download succeeded.
        target.write_bytes(InstagramPV.content_of_url(response))

    def download_video(self, new_videos: Tuple[int, str]) -> None:
        """
        Saving the video content

        :param new_videos: Tuple[int,str]
        :return: None
        """
        number, link = new_videos
        self._save(link, self.folder / f'Video{number}.mp4')

    def images_download(self, new_pictures: Tuple[int, str]) -> None:
        """
        Saving the picture content

        :param new_pictures: Tuple[int, str]
        :return: None
        """
        number, link = new_pictures
        self._save(link, self.folder / f'Image{number}.jpg')

    def downloading_video_images(self) -> None:
        """Saving Images and Videos with a pool of 8 worker threads (multiprocessing.dummy)"""
        print('[*] ready for saving images and videos!'.title())
        picture_data = list(enumerate(set(self.pictures)))
        video_data = list(enumerate(set(self.videos)))
        with Pool(8) as pool:
            pool.map(self.images_download, picture_data)
            pool.map(self.download_video, video_data)
        print('[+] Done')

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            self.http_base.close()
        finally:
            # quit() ends the whole driver session; close() only closes the window.
            self.driver.quit()

    @staticmethod
    def content_of_url(req: 'requests.Response') -> bytes:
        """
        :param req: response object whose body is wanted
        :return: Content of Url
        """
        return req.content
def main():
    """Log in, collect the media of one profile and download it into a folder."""
    # Placeholders: fill in the credentials, the profile name (or profile
    # URL) to search for and the destination folder before running.
    username = ''
    password = ''
    name = ''
    folder = Path('')
    with InstagramPV(username, password, folder, name) as pv:
        pv.login()
        pv.downloading_video_images()
# Run the downloader only when the file is executed as a script.
if __name__ == '__main__':
    main()
My previous review, for comparison: Instagram Bot, selenium, web scraping