
How can I make web resources available offline?


There is a folder on my Linux PC which contains a website (web pages etc.). The web pages and other accompanying files in the folder use CDNs to pull in resources like jQuery, DataTables, etc.

I want to make these resources available offline. I know I could manually search all the files for occurrences of "http", download the files from those URLs into a local folder, and change the source paths accordingly, but there are too many files for that to be practical. Is there a better, more elegant way of doing this? Thanks in advance.
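
For context, the kind of manual scan I mean would look roughly like this (just a sketch; the folder name and URL pattern are illustrative):

    import re
    import pathlib

    # List every http(s) URL referenced by files under the site folder
    # ("my-site" is a placeholder path)
    URL_RE = re.compile(r'https?://[^\s"\'<>)]+')

    for path in pathlib.Path("my-site").glob("**/*"):
        if path.is_file():
            text = path.read_text(encoding="utf-8", errors="ignore")
            for url in URL_RE.findall(text):
                print(path, "->", url)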


Solution

  • I made a Python script to do the job:

    import re
    import os
    import aiohttp
    import asyncio
    import pathlib
    import string
    import random
    
    import chardet
    
    # Detect the encoding of a byte sequence with chardet and decode it to str
    def decode_bytes(byte_sequence):
        result = chardet.detect(byte_sequence)
        encoding = result['encoding']
        return byte_sequence.decode(encoding)
    
    # Rough pattern for matching http/https URLs in the source files
    VALID_URL_REGEX = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
    
    # Downloader. resp.status == 200 is lazily used as the success criterion; it has
    # logical gaps, so you may want to add other checks as well.
    async def download_file(session, url, local_path):
        async with session.get(url, allow_redirects=True, ssl=False) as resp:
            if resp.status == 200:
                print("Content path is "+str(local_path))
                with open(local_path, "wb") as f:
                    while True:
                        chunk = await resp.content.read(4196)
                        if not chunk:
                            break
                        # chunk is already bytes, so it can be written directly
                        f.write(chunk)

    # Maps an already-downloaded URL to its local path, so each URL is fetched only once
    downloaded_urls = {}

    async def process_file(file_path, session):
        print("File during Read "+str(file_path))
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            contents = f.read()
            try:
                contents = decode_bytes(contents)
            except UnicodeDecodeError as e:
                # To avoid Type error
                print(f"Error decoding file {file_path}: {e}")
                return
            urls = re.findall(VALID_URL_REGEX, contents)
            try:
                for url in urls:
                    file_name = url.split("/")[-1]
                    if len(file_name) == 0:
                        continue
                    if url in downloaded_urls:
                        # Reuse the copy that was already fetched for this URL
                        local_path = downloaded_urls[url]
                    else:
                        # Random prefix avoids clashes between identical file names from different URLs
                        res = ''.join(random.choices(string.ascii_uppercase + string.digits, k=5))
                        local_path = os.path.join("downloaded", res + file_name)
                        if not os.path.exists(local_path):
                            await download_file(session, url, local_path)
                        # Remember the local copy to avoid redownloading
                        downloaded_urls[url] = local_path
                    contents = contents.replace(url, local_path)
            except Exception as e:
                print(f"Error processing URLs in {file_path}: {e}")
        print("File during write "+str(file_path))
        with open(file_path, "w", encoding="utf-8", errors="ignore") as f:
            f.write(contents)
    
    async def process_directory(directory):
        if not os.path.exists("downloaded"):
            os.makedirs("downloaded")
        conn = aiohttp.TCPConnector(limit=2200, limit_per_host=20, ttl_dns_cache=22)
        async with aiohttp.ClientSession(connector=conn) as session:
            tasks = []
            try:
                for filepath in pathlib.Path(directory).glob('**/*'):
                    fp = filepath.absolute()
                    # Skip plain-text docs; everything else is scanned for URLs
                    if str(fp).endswith(".md") or str(fp).endswith(".txt"):
                        continue
                    if os.path.isfile(fp):
                        tasks.append(process_file(fp, session))
            except Exception as e:
                print(f"Error walking {directory}: {e}")
            await asyncio.gather(*tasks)
    
    if __name__ == '__main__':
        directory = input("Enter root directory")
        asyncio.run(process_directory(directory))
    

    I will also try a "substitution" module and update the answer accordingly.
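
    For a non-interactive run, the coroutine can also be invoked directly instead of prompting with input() (a minimal sketch, assuming the functions above are in the same file; the directory path is only a placeholder):

        import asyncio

        # "/home/user/mysite" is a placeholder for the website folder
        asyncio.run(process_directory("/home/user/mysite"))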