Python

Package Management

venv | uv

pip / venv
python -m venv .env_name
.env_name\Scripts\activate

#pip install -r requirements.txt
#pip install ipykernel --trusted-host=pypi.python.org --trusted-host=pypi.org --trusted-host=files.pythonhosted.org
#pip show AutomationBots # show package details
requirements.txt # pip install -r "..\..\z. Documents\0. Templates\requirements.txt" -e . ## gcp google-cloud-bigquery google-cloud-storage google-cloud-bigquery-storage pandas-gbq db-dtypes ## scientific computing pandas scipy scikit-learn ## Visualization dataframe_image matplotlib plotly seaborn Sweetviz plottable ## jupyter ipykernel ipywidgets nbformat>=4.2.0 Jinja2
uv
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/0.6.9/install.ps1 | iex"
uv sync # ensure all packages are matched
uv sync --group <group-name> # also install the named dependency group (use --only-group <group-name> to sync only that group)
uv pip install -e .  # Install package in development mode
uv pip install -e ".[dev]"  # With development dependencies
uv pip install -e ".[viz,ml]"  # With visualization and ML extras
uv pip install -r requirements.txt # install from requirements.txt
      
pyproject.toml
pyproject.toml
# This is a sample pyproject.toml configuration for a Python project

[build-system]
requires = ["setuptools>=42", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "toolkit"
version = "0.0.1"
dependencies = [
    "loguru",
    "click>=8.0.0",
]
authors = [{ name = "Wei Yang", email = "x@xxx.com" }]
description = "A Python package"
readme = "README.md"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]

[project.scripts]
toolkit = "toolkit.cli.commands:cli"

[project.optional-dependencies]
dev = [
    "pytest",
    "pytest-cov",
    "mock",
] # pip install ".[dev]"

[project.urls]
Repository = "https://github.com/weiyang2048/toolkit"

[tool.setuptools.packages]
find = {}

[tool.setuptools.package-data]
"*" = ["*.sql", "*.yaml", "*.yml", "*.py"]

[tool.black]
line-length = 130
target-version = ['py36', 'py37', 'py38', 'py39', 'py310', 'py311', 'py312']
include = '\.pyi?$'

Resources

importlib.resourcesfrom importlib import resources

text = resources.files('toolkit.queries').joinpath('toy.sql').read_text()

Command Line Interface

argparse# python my_script 314 --conf_url conf/problem.yaml -v
import argparse
parser = argparse.ArgumentParser(description='Description of your program')

parser.add_argument('pos_arg1', nargs="?", type=int,
                     help='Description of your program')
parser.add_argument("--conf_url", type=str, default="conf/problem.yaml")
parser.add_argument("-v", "--verbose", action="store_true") # flag
args = parser.parse_args()
args.pos_arg1,args.conf_url

clickimport click

# Root command group; the sub-commands below register on it via @cli.command().
@click.group()
def cli():
    # Intentionally empty: the group has no behaviour of its own.
    pass

# NOTE: click.argument() does not accept ``prompt`` (only options can prompt),
# so NAME is a plain required positional argument.
@cli.command()
@click.argument('name', type=str)
def greet(name):
    """Greet someone by name."""
    # click.echo() has no styling parameters; click.secho() combines
    # click.style() and click.echo() and accepts fg/bg/bold.
    click.secho(f"Hi, {name}!", fg="green", bg="black", bold=True)

@cli.command()
@click.option('--count', default=1, help='Number of times.')
@click.option('--verbose', '-v', is_flag=True, help='Verbose mode.')
def repeat(count, verbose):
    """Repeat a message."""
    for _ in range(count):
        if verbose:
            # Styled output needs click.secho(); click.echo() rejects
            # fg/bg/bold with a TypeError.
            click.secho("Repeating!", fg="red", bg="black", bold=True)
        else:
            click.echo("Repeating!")

if __name__ == "__main__":
    cli()

python script.py --help
python script.py greet Wei
python script.py repeat --count 3 -v
        

Logging & Debugging

Complexipy

complexipy 🔗 complexipy . --failed

Logging

colorama 📦import numpy as np
from colorama import Fore, Style, Back
        
ipdbimport ipdb
ipdb.set_trace()
# pdb++ (pip install pdbpp)
# "pdb++" is not an importable module name; once pdbpp is installed it is
# used through the regular ``pdb`` module.
import pdb
pdb.set_trace()
ipdb++import ipdb++
ipdb++.set_trace()
# Cycle through colorama foreground/background pairs, one per second, each
# printed with a randomly chosen style.
import time

fores = [
    Fore.RED, Fore.GREEN, Fore.YELLOW, Fore.BLUE, Fore.MAGENTA, Fore.CYAN,
    Fore.WHITE, Fore.LIGHTRED_EX, Fore.LIGHTGREEN_EX, Fore.LIGHTYELLOW_EX,
    Fore.LIGHTBLUE_EX, Fore.LIGHTMAGENTA_EX, Fore.LIGHTCYAN_EX,
    Fore.LIGHTWHITE_EX,
]
backs = [
    Back.LIGHTRED_EX, Back.LIGHTGREEN_EX, Back.LIGHTYELLOW_EX,
    Back.LIGHTBLUE_EX, Back.LIGHTMAGENTA_EX, Back.LIGHTCYAN_EX,
    Back.LIGHTWHITE_EX, Back.RED, Back.GREEN, Back.YELLOW, Back.BLUE,
    Back.MAGENTA, Back.CYAN, Back.WHITE,
]
styles = [Style.BRIGHT, Style.DIM, Style.NORMAL, Style.RESET_ALL]

for i in range(14):
    time.sleep(1)
    STYLE = np.random.choice(styles)
    # "\r" returns the cursor to the start of the line so each entry
    # overwrites the previous one.
    print(STYLE + f"{fores[i]}{backs[i]} {i:>4}", end="\r\r")
📦loguruimport sys
from loguru import logger

# Configure loguru
logger.remove()  # Remove default handler

# Console sink with a custom format; records whose level is named DATA or
# SNOW are filtered out of this sink.
logger.add(
    sys.stderr,
    format=("{time:MM-DD HH:mm} | {name}:{function} [{line}] | {level} {level.icon} | {message}"),
    colorize=True,
    filter=lambda record: record["level"].name not in ["DATA", "SNOW"],
)  # Log to console with custom format

# Second console sink that only lets INFO records through.
# NOTE(review): the sink above does not exclude INFO, so INFO messages look
# like they are written by both sinks - confirm this duplication is intended.
logger.add(
    sys.stderr,
    format="{time:MM-DD HH:mm} | {level: <8} | {name}:{function}:{line} - {message}",
    level="INFO",
    colorize=True,
    filter=lambda record: record["level"].name == "INFO",
)
# The DATA level does not exist by default, so it is registered before a sink uses it.
# NOTE(review): loguru's examples pass colors as markup tags such as "<blue>" - confirm "blue" is accepted.
logger.level("DATA", no=15, color="blue")
# Dedicated console sink for DATA records only.
logger.add(
    sys.stderr,
    level="DATA",
    format="{time:MM-DD HH:mm} | {level: <8} | {name}:{function}:{line} - {message}",
    colorize=True,
    filter=lambda record: record["level"].name == "DATA",
)

logger.opt(colors=True).info("Hello World")
    

Inspect 📦

getsourcegetmembersisfunctionisclassismodulefrom inspect import getsource, getmembers, isfunction, isclass, ismodule

import numpy as np
functions = [i[0] for i in getmembers(np, isfunction)]
classes = [i[0] for i in getmembers(np, isclass)]
modules = [i[0] for i in getmembers(np, ismodule)]
print(getsource(np.eye))
get variable nameimport inspect

def retrieve_name_up(var):
    """Return the names bound to ``var`` two frames above this function.

    The locals of the caller's caller are scanned and every name whose value
    is the very same object as ``var`` (identity, not equality) is collected.
    """
    grandparent_frame = inspect.currentframe().f_back.f_back
    matches = []
    for name, value in grandparent_frame.f_locals.items():
        if value is var:
            matches.append(name)
    return matches


def retrieve_name(var):
    """Return the names bound to ``var`` in the direct caller's locals.

    Matching is by identity (``is``), so every local name of the caller that
    refers to the same object as ``var`` is returned.
    """
    caller_frame = inspect.currentframe().f_back
    matches = []
    for name, value in caller_frame.f_locals.items():
        if value is var:
            matches.append(name)
    return matches

def f(x):
    # Demo of the two helpers: retrieve_name looks one frame up (this
    # function's own locals), retrieve_name_up looks two frames up (the
    # locals of whoever called f).
    print(retrieve_name(x))
    print(retrieve_name_up(x))

temp = 1
f(temp)
['x'] ['temp']

tests 📦

unittestpython -m unittest
pytest# pip install pytest
pytest -v
pytest --cov=my_package tests/
        

Jupyter

Magic Commands

autoreload%load_ext autoreload
%autoreload 2   
whowhos%who # list all variables
%whos # with details
timeit only once%%timeit -r1 
📦memory_profiler
%load_ext memory_profiler

%%memit
s = []
for i in range(100000): s+=[i]

papermill 🔗

command linepapermill "notebooks/2.0 model_destructions.ipynb" -p  my_param "Fancy_value" -p version 0.1 "papermill.ipynb" 
Instructions: in the Jupyter notebook, tag a cell with the tag "parameters".
parametersmy_param="example value"
version=0.1
        
python functionimport papermill as pm

pm.execute_notebook(
   'path/to/input.ipynb',
   'path/to/output.ipynb',
   parameters=dict(alpha=0.6, ratio=0.1)
)

System & Utilities

System

syssubprocessimport sys
import subprocess
try:
    import plottable
except ImportError:
    # Only a missing package should trigger the install; a bare ``except``
    # would also swallow KeyboardInterrupt and unrelated errors.
    # sys.executable makes pip run under the interpreter executing this code.
    subprocess.call([sys.executable, "-m", "pip", "install", "plottable"])
    
os.systemos.system("rm -r some_old_dir")

os.system("mkdir some_new_dir")

Files & Directory

osimport os

os.getcwd() # /content
os.listdir("/content")
os.chdir("../") # parent
os.makedirs(os.path.join('..', 'data'), exist_ok=True)
os.walkfor root, dirs, files in os.walk("."):
    print(root, dirs, files)
pathfrom os.path import basename, exists
from urllib.request import urlretrieve
filename = basename(url)
if not exists(filename):
    urlretrieve(url, filename)
shutilfrom shutil import copy
copy("source_url", "destination_url")

Meta 📃

Scope

globalsdf = globals()[f"df_{unique_name}"]

Eval, Exec, Compile

evaleval("[0,1]")+[2]
execexec("""import matplotlib.pyplot as plt
plt.plot([1,2])""")
compilecode = compile("print(1)", "test.py", "exec")
exec(code)

tqdm & time & Parallel & Concurrent

trangeset_descriptiontqdmfrom tqdm import trange, tqdm
from time import sleep

for i in trange(100):
    sleep(0.01)

text = ""
pbar = tqdm(["a", "b", "c", "d"])
for char in pbar:
    sleep(0.25)
    text = text + char
    pbar.set_description("Processing %s" %char)


pbar = tqdm(total=100)
for i in range(10):
    sleep(0.1)
    pbar.update(10)
pbar.close()
perf_countersleepimport time
t0 = time.perf_counter()
time.sleep(5)
t1 = time.perf_counter()
t1 - t0 # 5.014447865999955
Parallelfrom joblib import Parallel, delayed
import time
V = range(10)

def process(i):
    time.sleep(5)
    return i * i

time_init = time.time()
results = Parallel(n_jobs=-1)(
        delayed(process)(i) for i in V)
print(time.time()-time_init, " secs")
asyncimport asyncio

async def my_job():
    """Add up 1000 draws from ``np.random.rand()`` and return the total."""
    total = 0
    draws_left = 1000
    while draws_left:
        total += np.random.rand()
        draws_left -= 1
    return total

my_jobs = (my_job() for i in range(1000))
# asyncio.run(my_job())  # outside Jupyter, run the coroutine with asyncio.run
print(await my_job())


print(await asyncio.gather(*my_jobs))
loop
loop = asyncio.new_event_loop()
loop.run_until_complete(asyncio.gather(*my_jobs))
loop.close()

# Semaphore
async def gather_with_concurrency(*coros, n=11):
    """Await all ``coros`` concurrently, with at most ``n`` running at once.

    Args:
        *coros: Awaitables to run.
        n: Maximum number of awaitables allowed inside the semaphore at the
            same time.

    Returns:
        A list of results in the same order as ``coros``.
    """
    semaphore = asyncio.Semaphore(n)

    async def sem_coro(coro):
        # Each awaitable has to acquire a semaphore slot before it is awaited.
        async with semaphore:
            return await coro

    return await asyncio.gather(*(sem_coro(c) for c in coros))

async def main():
    """Run three ``my_job`` coroutines with at most two in flight, then print the results.

    ``my_job`` is used because ``my_coroutine`` is not defined anywhere in
    this file, so calling it raises ``NameError``.
    """
    jobs = [my_job() for _ in range(3)]

    results = await gather_with_concurrency(*jobs, n=2)
    print(results)

# asyncio.run(main())
await main() # for jupyter

Basics

Data Types

Strings

f-string
# right align, fill spaces, type = string
"{name:>10s}".format(**{"name":"32W West"})
# num to percentage, round to 2 decimal places
"{pct:.2%}".format(pct=1.12125)

f"{8:b}" # '1000'
f"{8:d}" # '8'

string functionschr(ord("a"))
# 'a'

ord('A')
# 65

ascii("🔗")
#\\U0001f517 
String Methods"1".isdigit() # True
"1".isnumeric() # True
"1".isalpha() # False
"112".count("1") # 2
"123".find("2",2)  # -1
"aBc".upper() # ABC
"Straße".lower() # straße
array & stringtemp="test.email+alex@leetcode.com".split("@")
temp[0]=temp[0].replace(".","").split("+")[0]
"@".join(temp)
# 'testemail@leetcode.com'

'   spacious   '.strip()
# 'spacious'

Dictionary

CreatingAddingDeletingx = dict() # create an empty dictionary

# list out keys and values
y = {"apple": "red", "orange" : "yellow"}

# comprehension
x = {word : len(word) for word in y.keys()}

x["pear"] = 4 # add a new key-value pair
            
Methodsx.values()
x.keys() # keys
x.items() # key, value pairs

x.get("pineapple")
# if NOTHING , NO ERROR

x["pineapple"]
# if NOTHING , ERROR
collections.Counterfrom collections import Counter
Counter([1,2,1,1,2,1,3])
# Counter({1: 4, 2: 2, 3: 1})
defaultdict
from collections import defaultdict
D = defaultdict(list)
D[1] # []
D = defaultdict(set)
D[1] # set()

Loops and Iterables

forifbreakfor i in (x*x for x in range(5)):
    if i == 1:  continue
    if i == 4:  pass
    if i == 9:  break
    print(i)
# while | try except
try:
    i = 5
    while i > -15:
        # Dividing by i raises ZeroDivisionError once the countdown hits 0,
        # which ends the loop through the except clause.
        print(f"1/{i} is {1 / i}")
        i -= 1
except ZeroDivisionError:
    print("Can't divide by 0")
# 1/5 is 0.2
# 1/4 is 0.25
# 1/3 is 0.3333333333333333
# 1/2 is 0.5
# 1/1 is 1.0
# Can't divide by 0
iterables
filter(key, iterable)

enumerate([1,2,3], start=-3)

zip([1,2],[1,1])

a=[1,2,3]; b = reversed(a)
[-1, -3, -5, -7, -9] [0, 1, 2, 3, 4] [(-3, 1), (-2, 2), (-1, 3)] [(1, 1), (2, 1)] [3, 2, 1]
constructiondef generator(start,stop):
    while (start<=stop):
        yield start
        start+=1

for counter in generator(3,4):
    print(counter)
function* generator(start,stop){ while (start<=stop){ yield start; start++; } } for (let counter of generator(3,4)){ console.log(counter); }
productfrom itertools import product

A = [1,2]
B = [0,1]

len(list(product(A,B))) # 4
groupbyfrom itertools import groupby

first_ = lambda x: x[0]
# groupby only merges *consecutive* items with equal keys, so the two separate
# runs of "A" names come out as two groups.
names = ['Alan', 'Adam', 'Wes', 'Will', 'Albert', 'Steven']
# The loop variable is called ``group`` so it does not overwrite ``names``.
for letter, group in groupby(names, first_):
    print(letter, list(group))
# A ['Alan', 'Adam']
# W ['Wes', 'Will']
# A ['Albert']
# S ['Steven']
CombinationsPermutationsfrom itertools import combinations, permutations

A = [1,2,33];
list(combinations(A,2)),list(permutations(A,2))

Functions

arbitrary argumentsdef Calc(currency,*rates):
    for i in rates:
        print(currency*i)

Calc(1,0.1,0.2) # 0.1 0.2
Calc(1,0.1) # 0.1
arbitrary keyword argsx = dict(A = "a", B ="b" )
def lower(**kwargs):
    """Print every keyword argument: the name on one line, its value on the next."""
    for key, value in kwargs.items():
        print(key)
        print(value)
lower(**x) # AaBb
lower(Z="z", X="x") # ZzXx
map
list(map(int, vector))
reduce
from functools import reduce
reduce(lambda x,y : x+y, range(101))
# 5050
partialfrom functools import partial
area = lambda w,h : w*h
area_w4 = partial(area, w=4)
area_w4(h=5) # 20
# Decorator
def functor(f):
    """Decorate ``f`` so "before" and "after" are printed around each call.

    Args:
        f: The callable to wrap.

    Returns:
        A wrapper that forwards all positional and keyword arguments to ``f``
        and returns whatever ``f`` returns.
    """
    from functools import wraps  # local import: this snippet has no import block

    @wraps(f)  # keep f's __name__ / __doc__ on the wrapper
    def g(*args, **kwargs):
        print("before")
        result = f(*args, **kwargs)
        print("after")
        return result
    return g

@functor
def h():
    print("now")

h()
before now after

Class

# class | dunder | add | str | repr
class ComplexNumber:
    """Small complex-number type used to demonstrate dunder methods.

    Attributes are ``real`` and ``imaginary``; ``+`` and ``*`` return new
    ``ComplexNumber`` instances.
    """

    def __init__(self, real, imaginary=0):
        self.real = real
        self.imaginary = imaginary

    def __add__(self, no):
        # (a + bi) + (c + di) = (a + c) + (b + d)i
        return ComplexNumber(self.real + no.real,
                             self.imaginary + no.imaginary)

    def __mul__(self, no):
        # (a + bi)(c + di) = (ac - bd) + (ad + bc)i
        return ComplexNumber(
            self.real * no.real - self.imaginary * no.imaginary,
            self.real * no.imaginary + self.imaginary * no.real,
        )

    def __str__(self):
        # A negative imaginary part is rendered as "a - bi" rather than "a + -bi".
        if self.imaginary < 0:
            return '{} - {}i'.format(self.real, abs(self.imaginary))
        else:
            return '{} + {}i'.format(self.real, self.imaginary)

    def __repr__(self):
        return str(self)
Subclass# real number class
class RealNumber(ComplexNumber):
    """A ComplexNumber whose imaginary part is always initialised to 0."""

    def __init__(self, real):
        self.real, self.imaginary = real, 0
# add_to_class
def add_to_class(Class):
    """Return a decorator that attaches the decorated object to ``Class``.

    The object is stored on ``Class`` under its own ``__name__``.

    Args:
        Class: The class that receives the new attribute.

    Returns:
        A decorator that registers the object and hands it back unchanged.
    """
    def wrapper(obj):
        setattr(Class, obj.__name__, obj)
        # Return the object so the decorated module-level name is not
        # rebound to None.
        return obj
    return wrapper

@add_to_class(ComplexNumber)
def angle(self):
    """Return the argument (phase) of the number in radians.

    ``np.arctan2`` replaces ``np.arctan(imaginary / real)``: it does not
    divide by ``real`` (so ``real == 0`` is fine) and it takes the signs of
    both parts into account, giving the correct quadrant when ``real`` is
    negative.
    """
    return np.arctan2(self.imaginary, self.real)
dataclassfrom dataclasses import dataclass, field

@dataclass
class UpperComplexNumber:
    """Complex number whose imaginary part is validated on every assignment.

    ``imag`` is both a dataclass field and a property: the generated
    ``__init__`` assigns ``self.imag``, which goes through the setter below.

    Raises:
        TypeError: If ``imag`` is not supplied to the constructor.
        ValueError: If a negative ``imag`` is assigned.
    """
    real : float
    imag : float
    # The default factory builds a list, so the field is annotated as a list.
    c : list = field(default_factory=lambda : [0,0])

    @property # getter
    def imag(self):
        return self._imag

    @imag.setter # setter
    def imag(self, imag):
        # The property shares its name with the field, so @dataclass picks up
        # the property object itself as the field's default. Receiving it
        # here means the caller omitted ``imag``.
        if isinstance(imag, property):
            raise TypeError("__init__() missing required argument: 'imag'")
        if imag < 0:
            raise ValueError("imaginary part must be positive")
        self._imag = imag

Cloud

GCS

Clientget_bucketfrom google.cloud import storage
client = storage.Client()
bucket_name = 'project'
bucket = client.get_bucket(bucket_name)
# blob | download_to_filename
def download_from_gcs(client, bucket_name, blob_folder_name,
                      file_name, header=0):
    """Download ``blob_folder_name/file_name`` from a GCS bucket to ``file_name``.

    Args:
        client: Object exposing ``get_bucket`` (presumably a
            ``google.cloud.storage.Client`` - confirm against callers).
        bucket_name: Name of the bucket to read from.
        blob_folder_name: Folder prefix of the blob inside the bucket.
        file_name: Blob file name; also used as the local download path.
        header: Passed through to the pandas reader.

    Returns:
        A DataFrame when the file extension is ``csv`` or ``xlsx`` (the local
        copy is deleted after reading). ``None`` for any other extension, in
        which case the downloaded file stays on disk.
    """
    # Local imports: the snippet relies on these names but never imports them.
    import logging
    import os

    blob_path = blob_folder_name + "/" + file_name
    bucket = client.get_bucket(bucket_name)
    bucket.blob(blob_path).download_to_filename(file_name)
    logging.info(Fore.GREEN+"File {} downloaded from {}.".format(
        file_name, blob_path)+Style.RESET_ALL)

    extension = file_name.split(".")[-1]
    if extension not in ("csv", "xlsx"):
        return None

    import pandas as pd  # only needed for tabular files

    reader = pd.read_csv if extension == "csv" else pd.read_excel
    try:
        df = reader(file_name, header=header)
    finally:
        # Remove the temporary local copy even when parsing fails.
        os.remove(file_name)
    logging.info(Fore.GREEN + "DataFrame returned" + Style.RESET_ALL)
    return df
sizedeleteblobs = bucket.list_blobs(
    prefix=directory_name+"/folder/some_prefix_pattern")

for blob in blobs:
    if blob.size <= 714:
        print(blob.size)
        blob.delete()

S3

Boto3 S3 Clientupload_fileimport boto3

# Create an S3 client
s3 = boto3.client('s3')

# Upload a file to S3
def upload_to_s3(file_name, bucket, object_name=None):
    """Upload ``file_name`` to ``bucket`` using the module-level ``s3`` client.

    Args:
        file_name: Path of the local file to upload.
        bucket: Name of the destination bucket.
        object_name: Key to store the file under; defaults to ``file_name``.

    The outcome (success or the caught exception) is reported with ``print``;
    nothing is returned or re-raised.
    """
    # Fall back to the local file name when no explicit key is given.
    object_name = file_name if object_name is None else object_name
    try:
        s3.upload_file(file_name, bucket, object_name)
        print(f"📤 File {file_name} uploaded to {bucket}/{object_name}")
    except Exception as err:
        print(f"❌ Failed to upload {file_name} to {bucket}/{object_name}: {err}")

# Example usage
upload_to_s3('my_file.txt', 'my_bucket')
Boto3 S3 Resourcedownload_file# Create an S3 resource
s3_resource = boto3.resource('s3')

# Download a file from S3
def download_from_s3(bucket, object_name, file_name=None):
    """Download ``object_name`` from ``bucket`` using the module-level ``s3_resource``.

    Args:
        bucket: Name of the source bucket.
        object_name: Key of the object to download.
        file_name: Local destination path; defaults to ``object_name``.

    The outcome (success or the caught exception) is reported with ``print``;
    nothing is returned or re-raised.
    """
    # Fall back to the object key when no local path is given.
    file_name = object_name if file_name is None else file_name
    try:
        s3_resource.Bucket(bucket).download_file(object_name, file_name)
        print(f"📥 File {object_name} downloaded from {bucket} to {file_name}")
    except Exception as err:
        print(f"❌ Failed to download {object_name} from {bucket}: {err}")

# Example usage
download_from_s3('my_bucket', 'my_file.txt')