uv
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/0.6.9/install.ps1 | iex"
uv sync  # ensure the environment matches the project's declared dependencies
uv sync --group GROUP  # also sync the dependency group GROUP (use --only-group GROUP to sync just that group)
uv pip install -e .  # install package in development (editable) mode
uv pip install -e ".[dev]"  # with development dependencies
uv pip install -e ".[viz,ml]"  # with visualization and ML extras
uv pip install -r requirements.txt  # install from requirements.txt
pyproject.toml
pyproject.toml
# This is a sample pyproject.toml configuration for a Python project
[build-system]
# The [project] table below (PEP 621 metadata) is only understood by
# setuptools 61.0 and newer, so the minimum version must be at least 61.
requires = ["setuptools>=61", "wheel"]
build-backend = "setuptools.build_meta"
# Core package metadata.
[project]
name = "toolkit"
version = "0.0.1"
# Runtime dependencies, installed together with the package.
dependencies = [
"loguru",
"click>=8.0.0",
]
authors = [{ name = "Wei Yang", email = "x@xxx.com" }]
description = "A Python package"
readme = "README.md"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
# Console command `toolkit` -> the `cli` object in the `toolkit.cli.commands` module.
[project.scripts]
toolkit = "toolkit.cli.commands:cli"
# Extras: installed only when requested by name.
[project.optional-dependencies]
dev = [
"pytest",
"pytest-cov",
"mock",
] # pip install ".[dev]"
[project.urls]
Repository = "https://github.com/weiyang2048/toolkit"
# Empty `find` table: let setuptools discover packages with its default settings.
[tool.setuptools.packages]
find = {}
# Non-default files to ship; the "*" key applies the patterns to every package.
[tool.setuptools.package-data]
"*" = ["*.sql", "*.yaml", "*.yml", "*.py"]
# Settings for the black code formatter.
[tool.black]
line-length = 130
target-version = ['py36', 'py37', 'py38', 'py39', 'py310', 'py311', 'py312']
# Regular expression selecting the files to format (.py and .pyi).
include = '\.pyi?$'
Resources
# importlib.resources
from importlib import resources

# Read a data file that ships inside the installed package.
# The encoding is given explicitly so the result does not depend on the locale.
text = resources.files('toolkit.queries').joinpath('toy.sql').read_text(encoding="utf-8")
Command Line Interface
# argparse
# Usage: python my_script 314 --conf_url conf/problem.yaml -v
import argparse

parser = argparse.ArgumentParser(description='Description of your program')
# Optional positional argument (nargs="?") converted to int.
parser.add_argument('pos_arg1', nargs="?", type=int,
                    help='Description of your program')
parser.add_argument("--conf_url", type=str, default="conf/problem.yaml")
parser.add_argument("-v", "--verbose", action="store_true")  # flag
args = parser.parse_args()
# Parsed values are plain attributes on the namespace.
args.pos_arg1, args.conf_url
# click
import click


@click.group()
def cli():
    pass


@cli.command()
# Prompting is an option feature; click.argument() does not accept ``prompt``.
@click.option('--name', prompt="What is the name?", type=str)
def greet(name):
    """Greet someone by name."""
    # click.echo() takes no styling keywords; click.secho() is echo + style.
    click.secho(f"Hi, {name}!", fg="green", bg="black", bold=True)


@cli.command()
@click.option('--count', default=1, help='Number of times.')
@click.option('--verbose', '-v', is_flag=True, help='Verbose mode.')
def repeat(count, verbose):
    """Repeat a message."""
    for _ in range(count):
        if verbose:
            click.secho("Repeating!", fg="red", bg="black", bold=True)
        else:
            click.echo("Repeating!")


if __name__ == "__main__":
    cli()
# 📦 loguru
import sys

from loguru import logger

# Configure loguru
logger.remove()  # Remove default handler

# General console handler: everything except the custom DATA / SNOW levels.
logger.add(
    sys.stderr,
    format=("{time:MM-DD HH:mm} | {name}:{function} [{line}] | {level} {level.icon} | {message}"),
    colorize=True,
    filter=lambda record: record["level"].name not in ["DATA", "SNOW"],
)  # Log to console with custom format

# INFO-only handler. INFO records also pass the filter of the handler above,
# so they are written by both handlers.
logger.add(
    sys.stderr,
    format="{time:MM-DD HH:mm} | {level: <8} | {name}:{function}:{line} - {message}",
    level="INFO",
    colorize=True,
    filter=lambda record: record["level"].name == "INFO",
)

# The DATA level does not exist by default, so create it before a handler uses it.
# The level color is given as loguru color markup.
logger.level("DATA", no=15, color="<blue>")
logger.add(
    sys.stderr,
    level="DATA",
    format="{time:MM-DD HH:mm} | {level: <8} | {name}:{function}:{line} - {message}",
    colorize=True,
    filter=lambda record: record["level"].name == "DATA",
)

logger.opt(colors=True).info("Hello World")
Inspect 📦
# getsource / getmembers / isfunction / isclass / ismodule
from inspect import getsource, getmembers, isfunction, isclass, ismodule

import numpy as np

# getmembers() yields (name, value) pairs; keep only the names.
functions = [name for name, _ in getmembers(np, isfunction)]
classes = [name for name, _ in getmembers(np, isclass)]
modules = [name for name, _ in getmembers(np, ismodule)]
print(getsource(np.eye))
# get variable name
import inspect


def retrieve_name_up(var):
    """Return the names bound to ``var`` two frames up (the caller's caller).

    Matching is by identity (``is``), so every name in that frame that refers
    to the very same object is returned.
    """
    callers_local_vars = inspect.currentframe().f_back.f_back.f_locals.items()
    return [var_name for var_name, var_val in callers_local_vars if var_val is var]


def retrieve_name(var):
    """Return the names bound to ``var`` in the direct caller's frame."""
    callers_local_vars = inspect.currentframe().f_back.f_locals.items()
    return [var_name for var_name, var_val in callers_local_vars if var_val is var]


def f(x):
    """Show the name of the argument inside ``f`` and in the frame that called ``f``."""
    print(retrieve_name(x))
    print(retrieve_name_up(x))


temp = 1
f(temp)
# ['x']
# ['temp']
command line
papermill "notebooks/2.0 model_destructions.ipynb" -p my_param "Fancy_value" -p version 0.1 "papermill.ipynb"
Instructions
In the Jupyter notebook, tag a cell with `parameters`.
# trange / set_description / tqdm
from tqdm import trange, tqdm
from time import sleep

# trange(n) is a progress bar over range(n).
for i in trange(100):
    sleep(0.01)

# Wrap any iterable and update the bar's label while iterating.
text = ""
pbar = tqdm(["a", "b", "c", "d"])
for char in pbar:
    sleep(0.25)
    text = text + char
    pbar.set_description(f"Processing {char}")

# Manual updates against a known total; the with-block closes the bar.
with tqdm(total=100) as pbar:
    for i in range(10):
        sleep(0.1)
        pbar.update(10)
# Parallel
from joblib import Parallel, delayed
import time

V = range(10)


def process(i):
    """Simulate a slow task (5 second sleep) and return the square of ``i``."""
    time.sleep(5)
    return i * i


time_init = time.time()
# n_jobs=-1 is passed to joblib so it picks the worker count itself.
results = Parallel(n_jobs=-1)(
    delayed(process)(i) for i in V)
print(time.time() - time_init, " secs")
# async
import asyncio


async def my_job():
    """Return the sum of 1000 draws from ``np.random.rand()``."""
    s = 0
    for _ in range(1000):
        s += np.random.rand()
    return s
my_jobs = (my_job() for i in range(1000))
# asyncio.run(my_coroutine())
print(await my_job())
print(await asyncio.gather(*my_jobs))
loop
loop = asyncio.new_event_loop()
loop.run_until_complete(asyncio.gather(*my_jobs))
loop.close()
# Semaphore
async def gather_with_concurrency(*coros, n=11):
    """Await all ``coros`` with at most ``n`` of them running at the same time.

    Returns the list of results in the order the coroutines were passed,
    as ``asyncio.gather`` does.
    """
    semaphore = asyncio.Semaphore(n)

    async def sem_coro(coro):
        # Each coroutine must hold a semaphore slot while it runs.
        async with semaphore:
            return await coro

    return await asyncio.gather(*(sem_coro(c) for c in coros))
async def main():
coro1 = my_coroutine()
coro2 = my_coroutine()
coro3 = my_coroutine()
results = await gather_with_concurrency(coro1, coro2, coro3, n=2)
print(results)
# asyncio.run(main())
await main() # for jupyter
Basics
Data Types
Strings
f-string
# These examples use str.format(); f-strings accept the same format specs.
# right align, fill spaces, type = string
"{name:>10s}".format(**{"name":"32W West"})  # '  32W West' (padded to width 10)
# num to percentage, round to 2 decimal places
"{pct:.2%}".format(pct=1.12125)
f"{8:b}" # '1000'
f"{8:d}" # '8'
# Creating / Adding / Deleting
x = dict()  # create an empty dictionary
# list out keys and values
y = {"apple": "red", "orange": "yellow"}
# comprehension
x = {word: len(word) for word in y.keys()}
x["pear"] = 4  # add a new key-value pair
# Methods
x.values()  # values
x.keys()  # keys
x.items()  # key, value pairs
x.get("pineapple")
# missing key -> returns None, no error
x["pineapple"]
# missing key -> raises KeyError
# for / if / break
for i in (x * x for x in range(5)):
    if i == 1:
        continue  # skip the rest of this iteration
    if i == 4:
        pass  # do nothing, fall through to print
    if i == 9:
        break  # leave the loop entirely
    print(i)
# prints 0 and 4
# while / try except
try:
    i = 5
    while i > -15:
        # The division raises once i reaches 0, which ends the loop.
        print(f"1/{i} is {1 / i}")
        i -= 1
except ZeroDivisionError as e:
    print(e)
# 1/5 is 0.2
# 1/4 is 0.25
# 1/3 is 0.3333333333333333
# 1/2 is 0.5
# 1/1 is 1.0
# division by zero
# construction
def generator(start, stop):
    """Yield every value from ``start`` through ``stop`` inclusive, in steps of 1."""
    while start <= stop:
        yield start
        start += 1


for counter in generator(3, 4):
    print(counter)
// Generator function: yields start, start + 1, ... up to and including stop.
function* generator(start, stop) {
  for (; start <= stop; start++) {
    yield start;
  }
}

// Consume the generator with for...of and log each value.
for (const value of generator(3, 4)) {
  console.log(value);
}
# product
from itertools import product

A = [1, 2]
B = [0, 1]
# Cartesian product: every pair (a, b) with a from A and b from B.
len(list(product(A, B)))  # 4
# groupby
from itertools import groupby

first_ = lambda x: x[0]
names = ['Alan', 'Adam', 'Wes', 'Will', 'Albert', 'Steven']
# groupby only merges consecutive items with the same key, so the input is
# not sorted here and "A" shows up as two separate groups.
for letter, group in groupby(names, first_):
    print(letter, list(group))
# A ['Alan', 'Adam']
# W ['Wes', 'Will']
# A ['Albert']
# S ['Steven']
# Combinations / Permutations
from itertools import combinations, permutations

A = [1, 2, 33]
# combinations ignore order (3 pairs); permutations keep it (6 ordered pairs).
list(combinations(A, 2)), list(permutations(A, 2))
Functions
# arbitrary arguments
def Calc(currency, *rates):
    """Print ``currency`` multiplied by each of the positional ``rates``."""
    for rate in rates:
        print(currency * rate)


Calc(1, 0.1, 0.2)  # 0.1 0.2
Calc(1, 0.1)  # 0.1
# arbitrary keyword args
x = dict(A="a", B="b")


def lower(**kwargs):
    """Print each keyword name followed by its value, one item per line."""
    for key, value in kwargs.items():
        print(key)
        print(value)


lower(**x)  # A a B b
lower(Z="z", X="x")  # Z z X x
# size / delete
# `bucket` and `directory_name` are expected to be defined beforehand.
blobs = bucket.list_blobs(
    prefix=directory_name + "/folder/some_prefix_pattern")
for blob in blobs:
    # Delete every listed blob whose size is at most 714, printing the size first.
    if blob.size <= 714:
        print(blob.size)
        blob.delete()
S3
# Boto3 S3 Client: upload_file
import boto3

# Create an S3 client
s3 = boto3.client('s3')


# Upload a file to S3
def upload_to_s3(file_name, bucket, object_name=None):
    """Upload ``file_name`` to ``bucket`` and print the outcome.

    ``object_name`` defaults to ``file_name``. Failures are reported with a
    printed message instead of being raised.
    """
    if object_name is None:
        object_name = file_name
    try:
        s3.upload_file(file_name, bucket, object_name)
        print(f"📤 File {file_name} uploaded to {bucket}/{object_name}")
    except Exception as e:
        print(f"❌ Failed to upload {file_name} to {bucket}/{object_name}: {e}")


# Example usage
upload_to_s3('my_file.txt', 'my_bucket')
# Boto3 S3 Resource: download_file
# Create an S3 resource
s3_resource = boto3.resource('s3')


# Download a file from S3
def download_from_s3(bucket, object_name, file_name=None):
    """Download ``object_name`` from ``bucket`` and print the outcome.

    ``file_name`` (the local destination) defaults to ``object_name``.
    Failures are reported with a printed message instead of being raised.
    """
    if file_name is None:
        file_name = object_name
    try:
        s3_resource.Bucket(bucket).download_file(object_name, file_name)
        print(f"📥 File {object_name} downloaded from {bucket} to {file_name}")
    except Exception as e:
        print(f"❌ Failed to download {object_name} from {bucket}: {e}")


# Example usage
download_from_s3('my_bucket', 'my_file.txt')