This commit is contained in:
wxchen
2022-12-29 23:08:25 +08:00
commit 21ad625896
42 changed files with 2336 additions and 0 deletions

60
README.md Normal file

@@ -0,0 +1,60 @@
<br />
<p align="center">
<img src="https://github.com/vocdex/vocdex.github.io/blob/master/assets/img/icon.png" width="150" title="hover text">
</p>
# DIGIT
This codebase allows you to:
- Collect image frames from DIGIT and annotate circles in each frame.
- Save the annotated frame values into a csv file.
- Train a baseline MLP model for RGB to Normal mapping.
- Generate depth maps in real-time using a fast Poisson Solver.
- Estimate 2D object pose using PCA and OpenCV built-in algorithms.
Currently, labeling circles is done manually for each sensor; annotating 30 images can take up to an hour.
This codebase includes a script that cuts the manual labeling and model training process down to about 15 minutes (roughly 4x faster).
This project is set up so that it is easy to later create your own ROS packages for processing tactile data in your applications.
## Visualization
### Estimating object pose by fitting an ellipse (PCA and OpenCV):
<br />
<p align="center">
<img src="https://github.com/vocdex/digit-depth/blob/main/assets/depthPCA.gif" width="400" title="depth">
</p>
### Depth image point cloud :
<br />
<p align="center">
<img src="https://github.com/vocdex/digit-depth/blob/main/assets/point-cloud.gif" width="400" title="point-cloud">
</p>
### Marker movement tracking (useful for force direction and magnitude estimation):
<br />
<p align="center">
<img src="https://github.com/vocdex/digit-depth/blob/main/assets/markers.gif" width="400" title="marker">
</p>
## TODO
- Add a Pix2Pix model to generate depth maps from RGB images.
- Add an LSTM model for predicting slip from collected video frames.
- Add a baseline ResNet based model for estimating total normal force magnitude.
## Usage
Change the **gel_height, gel_width, mm_to_pixel, base_img_path, and sensor.serial_num** values in the `rgb_to_normal.yaml` file in the `config` folder.
- `pip install . `
- `cd scripts`
- `python record.py` : Press SPACEBAR to start recording.
- `python label_data.py` : Press LEFTMOUSE to label center and RIGHTMOUSE to label circumference.
- `python create_image_dataset.py` : Create a dataset of images and save it to a csv file.
- `python train_mlp.py` : Train an MLP model for RGB to Normal mapping.
The trained color2normal model will be saved to a separate "models" folder in the same directory as this file.
For ROS, you can use the following command to run the node:
```bash
python scripts/ros/depth_value_pub.py
python scripts/ros/digit_image_pub.py
```
depth_value_pub.py publishes the maximum depth (deformation) value over the entire image when an object is pressed into the gel. Accuracy depends on your MLP depth model.
digit_image_pub.py publishes compressed RGB images from DIGIT.
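
If you prefer to run the RGB → normal → depth pipeline offline (without ROS), the sketch below strings together the helpers from this repo. It is a minimal sketch, not the canonical entry point: the checkpoint and image paths are placeholders, and the gel dimensions and 0.0237 m max depth simply mirror the defaults used in `scripts/depth.py`; adapt them to your sensor.
```python
import cv2
import torch

from digit_depth.third_party import geom_utils
from digit_depth.train.prepost_mlp import preproc_mlp, post_proc_mlp

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Placeholder paths: point these at your trained checkpoint and a frame from record.py.
model = torch.load("models/color2normal.ckpt").to(device)
model.eval()

frame = cv2.imread("images/0000.png")               # BGR frame captured with record.py
features = preproc_mlp(frame)                       # (H*W, 5) tensor of X, Y, R, G, B
normals = model(features).detach().cpu().numpy()    # per-pixel surface normals
normal_img, _ = post_proc_mlp(normals)              # (3, 320, 240) normal map

# Convert normals to depth gradients, then integrate with the fast Poisson solver.
gradx, grady = geom_utils._normal_to_grad_depth(
    img_normal=normal_img, gel_width=0.02, gel_height=0.03, bg_mask=None)
depth = geom_utils._integrate_grad_depth(
    gradx, grady, boundary=None, bg_mask=None, max_depth=0.0237)
print(depth.shape, float(depth.max()))
```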
### Please star this repo if you like it!
### Feel free to post an issue and create PRs.

BIN
assets/depthPCA.gif Normal file

Binary file not shown.

Size: 1.8 MiB

BIN
assets/icon.png Normal file

Binary file not shown.

Size: 8.4 KiB

BIN
assets/markers.gif Normal file

Binary file not shown.

Size: 5.8 MiB

BIN
assets/point-cloud.gif Normal file

Binary file not shown.

Size: 976 KiB

1
assets/readme.txt Normal file

@@ -0,0 +1 @@
This folder contains images for visualization of tactile-related tasks

32
config/digit.yaml Normal file

@@ -0,0 +1,32 @@
model_path: "models/checkpoints/mlp_depth.ckpt"
visualize:
depth:
enabled: True
compute_type: "cuda"
point3d:
enabled: True
compute_type: "cuda"
sensor:
mesh_name: adigit.STL
serial_num: "D20001"
T_cam_offset: [[2.22e-16, 2.22e-16, -1.00e+00, 0.00e+00],
[-1.00e+00, 0.00e+00, -2.22e-16, 0.00e+00],
[0.00e+00, 1.00e+00, 2.22e-16, 1.50e-02],
[0.00e+00, 0.00e+00, 0.00e+00, 1.00e+00]]
P: [[2.30940108e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
[0.00000000e+00, 1.73205081e+00, 0.00000000e+00, 0.00000000e+00],
[0.00000000e+00, 0.00000000e+00, -1.04081633e+00, -2.04081633e-03],
[0.00000000e+00, 0.00000000e+00, -1.00000000e+00, 0.00000000e+00]]
view_params:
fov: 60
front: [ -0.1, 0.1, 0.1 ]
lookat: [-0.001, -0.01, 0.01 ]
up: [ 0.04, -0.05, 0.190 ]
zoom: 2.5
z_near: 0.001
z_far: 0.05
gel_width: 0.01835 # gel width (y-axis) in meters //original: 0.02
gel_height: 0.02490 # gel height (x-axis) in meters //original: 0.03

40
config/rgb_to_normal.yaml Normal file

@@ -0,0 +1,40 @@
prefix: ""
random_seed: 0
save_csv: False
visualize:
normals: True
points3d: True
dataloader:
batch_size: 1
shuffle: False
num_workers: 8
annot_flag: True
annot_file: "/home/shuk/digit-depth/csv/annotate.csv"
dataset:
dataset_type: 'imgs'
save_dataset: True
sensor:
mesh_name: adigit.STL
serial_num: "D00001"
T_cam_offset: [ [ 2.22e-16, 2.22e-16, -1.00e+00, 0.00e+00 ],
[ -1.00e+00, 0.00e+00, -2.22e-16, 0.00e+00 ],
[ 0.00e+00, 1.00e+00, 2.22e-16, 1.50e-02 ],
[ 0.00e+00, 0.00e+00, 0.00e+00, 1.00e+00 ] ]
P: [ [ 2.30940108e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00 ],
[ 0.00000000e+00, 1.73205081e+00, 0.00000000e+00, 0.00000000e+00 ],
[ 0.00000000e+00, 0.00000000e+00, -1.04081633e+00, -2.04081633e-03 ],
[ 0.00000000e+00, 0.00000000e+00, -1.00000000e+00, 0.00000000e+00 ] ]
z_near: 0.001
z_far: 0.05
gel_width: 0.02 # "D00001" 0.01835 # gel width (y-axis) in meters //original: 0.02
gel_height: 0.03 # "D00001"0.02490 # gel height (x-axis) in meters //original: 0.03
base_path: "/home/shuk/digit-depth"
model_path: "/home/shuk/digits2/tactile-in-hand/inhandpy/local/Mike/mike.ckpt"
base_img_path: "/home/shuk/digit-depth/images/0014.png"
mm_to_pixel: 21.09

4
digit_depth/__init__.py Normal file

@@ -0,0 +1,4 @@
from .digit import *
from .third_party import *
from .train import *
from .handlers import *


@@ -0,0 +1,97 @@
"""
combine_A_and_B.py
Preprocessing for pix2pix
Copyright (c) 2016, Phillip Isola and Jun-Yan Zhu
All rights reserved.
BSD License
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
"""
import argparse
import os
from multiprocessing import Pool
import cv2
import numpy as np
def image_write(path_A, path_B, path_AB):
im_A = cv2.imread(
path_A, 1
) # python2: cv2.CV_LOAD_IMAGE_COLOR; python3: cv2.IMREAD_COLOR
im_B = cv2.imread(
path_B, 1
) # python2: cv2.CV_LOAD_IMAGE_COLOR; python3: cv2.IMREAD_COLOR
# im_A = cv2.transpose(im_A)
# im_B = cv2.transpose(im_B)
im_AB = np.concatenate([im_A, im_B], 1)
cv2.imwrite(path_AB, im_AB)
parser = argparse.ArgumentParser("create image pairs")
parser.add_argument("--fold_A", dest="fold_A", help="input directory for image A", type=str, default="../dataset/50kshoes_edges")
parser.add_argument("--fold_B", dest="fold_B", help="input directory for image B", type=str, default="../dataset/50kshoes_jpg")
parser.add_argument("--fold_AB", dest="fold_AB", help="output directory", type=str, default="../dataset/test_AB")
parser.add_argument("--num_imgs", dest="num_imgs", help="number of images", type=int, default=1000000)
parser.add_argument("--use_AB", dest="use_AB", help="if true: (0001_A, 0001_B) to (0001_AB)", action="store_true")
parser.add_argument("--no_multiprocessing", dest="no_multiprocessing", help="If used, chooses single CPU execution instead of parallel execution", action="store_true", default=False,
)
args = parser.parse_args()
for arg in vars(args):
print("[%s] = " % arg, getattr(args, arg))
splits = os.listdir(args.fold_A)
if not args.no_multiprocessing:
pool = Pool()
for sp in splits:
img_fold_A = os.path.join(args.fold_A, sp)
img_fold_B = os.path.join(args.fold_B, sp)
img_list = os.listdir(img_fold_A)
if args.use_AB:
img_list = [img_path for img_path in img_list if "_A." in img_path]
num_imgs = min(args.num_imgs, len(img_list))
print("split = %s, use %d/%d images" % (sp, num_imgs, len(img_list)))
img_fold_AB = os.path.join(args.fold_AB, sp)
if not os.path.isdir(img_fold_AB):
os.makedirs(img_fold_AB)
print("split = %s, number of images = %d" % (sp, num_imgs))
for n in range(num_imgs):
name_A = img_list[n]
path_A = os.path.join(img_fold_A, name_A)
if args.use_AB:
name_B = name_A.replace("_A.", "_B.")
else:
name_B = name_A
path_B = os.path.join(img_fold_B, name_B)
if os.path.isfile(path_A) and os.path.isfile(path_B):
name_AB = name_A
if args.use_AB:
name_AB = name_AB.replace("_A.", ".") # remove _A
path_AB = os.path.join(img_fold_AB, name_AB)
if not args.no_multiprocessing:
pool.apply_async(image_write, args=(path_A, path_B, path_AB))
else:
im_A = cv2.imread(path_A, 1) # python2: cv2.CV_LOAD_IMAGE_COLOR; python3: cv2.IMREAD_COLOR
im_B = cv2.imread(path_B, 1) # python2: cv2.CV_LOAD_IMAGE_COLOR; python3: cv2.IMREAD_COLOR
im_AB = np.concatenate([im_A, im_B], 1)
cv2.imwrite(path_AB, im_AB)
if not args.no_multiprocessing:
pool.close()
pool.join()


@@ -0,0 +1,109 @@
import glob
import numpy as np
import pandas as pd
from PIL import Image
from sklearn.model_selection import train_test_split
def create_pixel_csv(img_dir: str, save_dir: str, img_type: str):
"""
Creates a CSV file with the pixel coordinates of the image
:param img_dir: image directory containing the images
:param save_dir: directory for saving the CSV file per image
:param img_type: color or normal
:return: saves a CSV file per image
"""
imgs = sorted(glob.glob(f"{img_dir}/*.png"))
print(f"Found {len(imgs)} {img_type} images")
for idx, img_path in enumerate(imgs):
img = Image.open(img_path)
img_np = np.array(img)
xy_coords = np.flip(
np.column_stack(np.where(np.all(img_np >= 0, axis=2))), axis=1
)
rgb = np.reshape(img_np, (np.prod(img_np.shape[:2]), 3))
# Add pixel numbers in front
pixel_numbers = np.expand_dims(np.arange(1, xy_coords.shape[0] + 1), axis=1)
value = np.hstack([pixel_numbers, xy_coords, rgb])
# Properly save as CSV
if img_type == "color":
df = pd.DataFrame(value, columns=["pixel_number", "X", "Y", "R", "G", "B"])
del df["pixel_number"]
df.to_csv(f"{save_dir}/image_{idx}.csv", sep=",", index=False)
elif img_type == "normal":
df = pd.DataFrame(value, columns=["pixel_number", "X", "Y", "Nx", "Ny", "Nz"])
del df["pixel_number"]
df.to_csv(f"{save_dir}/image_{idx}.csv", sep=",", index=False)
print(f"{img_type} image CSV written for image {idx}")
def combine_csv(csv_dir: str, img_type: str):
"""
Combines all the CSV files in the directory into one CSV file for later use in training
:param csv_dir: directory containing the CSV files
:param img_type: color or normal
"""
csv_files=(glob.glob(f"{csv_dir}/*.csv"))
print(f"Found {len(csv_files)} {img_type} CSV files")
df = pd.concat([pd.read_csv(f, sep=",") for f in (glob.glob(f"{csv_dir}/*.csv"))])
df.to_csv(f"{csv_dir}/combined.csv", sep=",", index=False)
print(f"Combined CSV written for {img_type} images")
check_nans(f"{csv_dir}/combined.csv")
print("----------------------------------------------------")
def create_train_test_csv(save_dir: str, normal_path: str, color_path: str):
"""
Builds the cleaned dataset by keeping a 4% sample of the zero (background) rows,
then splits it into train and test sets (80%/20%).
:param save_dir: path for train_test_split folder
:param normal_path: path to combined normal CSV file
:param color_path: path to combined color CSV file
"""
seed=42
rgb_df = pd.read_csv(color_path, sep=",")
normal_df = pd.read_csv(normal_path, sep=",")
rgb_df['Nx'] = normal_df['Nx']
rgb_df['Ny'] = normal_df['Ny']
rgb_df['Nz'] = normal_df['Nz']
zeros_df = rgb_df[(rgb_df['Nx'] == 127) & (rgb_df['Ny'] == 127) ^ (rgb_df['Nz'] == 127)]
print(f"Found {len(zeros_df)} zeros in the dataset")
non_zeros_df = rgb_df[(rgb_df['Nx'] != 127) | (rgb_df['Ny'] != 127) & (rgb_df['Nz'] != 127)]
print(f"Found {len(non_zeros_df)} non-zeros in the dataset")
zeros_df.to_csv(f"{save_dir}/zeros.csv", sep=",", index=False)
non_zeros_df.to_csv(f"{save_dir}/non_zeros.csv", sep=",", index=False)
print("----------------------------------------------------")
# sampling 4% of zeros
clean_df = zeros_df.sample(frac=0.04, random_state=seed)
clean_df = pd.concat([clean_df, non_zeros_df])
clean_df.to_csv(f"{save_dir}/clean-data.csv", sep=",", index=False)
print(f"Clean data CSV of {len(clean_df)} written")
# split into train and test
train_df,test_df=train_test_split(clean_df,test_size=0.2,random_state=seed)
train_df.to_csv(f"{save_dir}/train.csv", sep=",", index=False)
test_df.to_csv(f"{save_dir}/test.csv", sep=",", index=False)
print(f"Train set of size {len(train_df)} and Test set of size {len(test_df)} written")
def check_nans(csv_path: str):
"""
Checks if there are any NaNs in the CSV file
:param csv_path: path to the CSV file
:return: True if there are no NaNs, False otherwise
"""
df = pd.read_csv(csv_path, sep=",")
if df.isnull().values.any():
nans=df.isnull().sum().sum()
print(f"Found {nans} NaNs in {csv_path}")
print("Replacing NaNs with mean")
df.fillna(df.mean(), inplace=True)
df.to_csv(csv_path, sep=",", index=False)
print(f"NaNs replaced in {csv_path}")
else:
print("No NaNs found.Perfect!")


@@ -0,0 +1,22 @@
"""
Data loader for the color-normal datasets
"""
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from digit_depth.dataio.digit_dataset import DigitRealImageAnnotDataset
def data_loader(dir_dataset, params):
"""A data loader for the color-normal datasets
Args:
dir_dataset: path to the dataset
params: a dict of parameters
"""
transform = transforms.Compose([transforms.ToTensor()])
dataset = DigitRealImageAnnotDataset( dir_dataset=dir_dataset, annot_file=params.annot_file,
transform=transform, annot_flag=params.annot_flag)
dataloader = DataLoader(dataset, batch_size=params.batch_size, shuffle=params.shuffle,
num_workers=params.num_workers)
return dataloader, dataset


@@ -0,0 +1,43 @@
import glob
import pandas as pd
import torch
from PIL import Image
from torch.utils.data import Dataset
class DigitRealImageAnnotDataset(Dataset):
def __init__( self, dir_dataset, annot_file, transform=None, annot_flag=True, img_type="png" ):
self.dir_dataset = dir_dataset
print(f"Loading dataset from {dir_dataset}")
self.transform = transform
self.annot_flag = annot_flag
# a list of image paths sorted. dir_dataset is the root dir of the datasets (color)
self.img_files = sorted(glob.glob(f"{self.dir_dataset}/*.{img_type}"))
print(f"Found {len(self.img_files)} images")
if self.annot_flag:
self.annot_dataframe = pd.read_csv(annot_file, sep=",")
def __getitem__(self, idx):
"""Returns a tuple of (img, annot) where annot is a tensor of shape (3,1)"""
# read in image
img = Image.open(self.img_files[idx])
img = self.transform(img)
img = img.permute(0, 2, 1) # (3,240,320) -> (3,320,240)
# read in region annotations
if self.annot_flag:
img_name = self.img_files[idx]
row_filter = self.annot_dataframe["img_names"] == img_name
region_attr = self.annot_dataframe.loc[
row_filter, ["center_x", "center_y", "radius"]
]
annot = (torch.tensor(region_attr.values, dtype=torch.int32) if (len(region_attr) > 0) else torch.tensor([]))
data = img
if self.annot_flag:
data = (img, annot)
return data
def __len__(self):
return len(self.img_files)


@@ -0,0 +1,47 @@
"""
Script for generating sphere ground truth normal images.
"""
import math
import numpy as np
def generate_sphere_gt_normals(img_mask, center_x, center_y, radius):
"""
Generates sphere ground truth normal images for an image.
Args:
img_mask: a numpy array of shape [H, W, 3]
center_x: x coordinate of the center of the sphere
center_y: y coordinate of the center of the sphere
radius: the radius of the sphere
Returns:
img_normal: a numpy array of shape [H, W, 3]
"""
img_normal = np.zeros(img_mask.shape, dtype="float64")
for y in range(img_mask.shape[0]):
for x in range(img_mask.shape[1]):
img_normal[y, x, 0] = 0.0
img_normal[y, x, 1] = 0.0
img_normal[y, x, 2] = 1.0
if np.sum(img_mask[y, x, :]) > 0:
dist = np.sqrt((x - center_x) ** 2 + (y - center_y) ** 2)
ang_xz = math.acos(dist / radius)
ang_xy = math.atan2(y - center_y, x - center_x)
nx = math.cos(ang_xz) * math.cos(ang_xy)
ny = math.cos(ang_xz) * math.sin(ang_xy)
nz = math.sin(ang_xz)
img_normal[y, x, 0] = nx
img_normal[y, x, 1] = -ny
img_normal[y, x, 2] = nz
norm_val = np.linalg.norm(img_normal[y, x, :])
img_normal[y, x, :] = img_normal[y, x, :] / norm_val
# img_normal between [-1., 1.], converting to [0., 1.]
img_normal = (img_normal + 1.0) * 0.5
return img_normal
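
A hedged usage sketch with a synthetic frame and mask; in the real pipeline the mask comes from the annotated circle and the radius from the 6 mm bearing, as in `scripts/create_image_dataset.py`.
```python
import cv2
import numpy as np

# Stand-in 240x320 frame with a contact at (160, 120); mask radius 40 px,
# sphere radius ~63 px (6 mm bearing at ~21 px/mm).
img = np.full((240, 320, 3), 255, dtype=np.uint8)
fg_mask = np.zeros(img.shape[:2], dtype="uint8")
fg_mask = cv2.circle(fg_mask, (160, 120), 40, 255, -1)
img_mask = cv2.bitwise_and(img, img, mask=fg_mask)

img_normal = generate_sphere_gt_normals(img_mask, center_x=160, center_y=120, radius=63)
# (240, 320, 3) array in [0, 1]; background pixels encode the flat normal (0.5, 0.5, 1.0).
```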


@@ -0,0 +1 @@
from .digit_sensor import *


@@ -0,0 +1,37 @@
import time
from digit_interface import Digit
from digit_interface.digit import DigitDefaults
class DigitSensor:
def __init__(self, fps: int, resolution: str, serial_num: str):
self.fps = fps
self.resolution = resolution
self.serial_num = serial_num
def __call__(self, *args, **kwargs):
"""Calls the digit sensor."""
digit = self.setup_digit()
return digit
def __str__(self):
return f"DigitSensor(fps={self.fps}, resolution={self.resolution}, serial_num={self.serial_num})"
def setup_digit(self,):
"Sets up the digit sensor and returns it."
digit = Digit(self.serial_num)
digit.connect()
rgb_list = [(15, 0, 0), (0, 15, 0), (0, 0, 15)]
for rgb in rgb_list:
digit.set_intensity_rgb(*rgb)
time.sleep(1)
digit.set_intensity(15)
resolution = DigitDefaults.STREAMS[self.resolution]
digit.set_resolution(resolution)
fps = Digit.STREAMS[self.resolution]["fps"][str(self.fps) + "fps"]
digit.set_fps(fps)
return digit
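
A short usage sketch; the serial number below is a placeholder, so use the one printed on your DIGIT (it also appears in the config files).
```python
from digit_depth.digit import DigitSensor

digit_sensor = DigitSensor(fps=30, resolution="QVGA", serial_num="D00001")
digit = digit_sensor()        # connects, cycles the LEDs, sets resolution and fps
frame = digit.get_frame()     # BGR numpy image, e.g. for record.py or the depth scripts
print(frame.shape)
```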


@@ -0,0 +1 @@
from .image import ImageHandler


@@ -0,0 +1,38 @@
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
import cv2
import numpy as np
from PIL import Image
from torchvision import transforms
class ImageHandler:
def __init__(self, img_path, convert="RGB"):
self.img = Image.open(img_path).convert(convert)
self.convert = convert
def tensor_to_PIL(self, img_tensor):
img_tensor = img_tensor.squeeze_(0)
return transforms.ToPILImage()(img_tensor).convert(self.convert)
@property
def tensor(self):
return transforms.ToTensor()(self.img).unsqueeze_(0)
@property
def image(self):
return self.img
@property
def nparray(self):
return np.array(self.img)
@staticmethod
def save(file_name, img):
if isinstance(img, Image.Image):
# this is a PIL image
img.save(file_name)
else:
# cv2 image
cv2.imwrite(file_name, img)

4
digit_depth/third_party/__init__.py vendored Normal file

@@ -0,0 +1,4 @@
from .data_utils import *
from .geom_utils import *
from .vis_utils import *
from .poisson import *

30
digit_depth/third_party/data_utils.py vendored Normal file

@@ -0,0 +1,30 @@
import numpy as np
import torch
"""
dataloader, logger helpers
"""
def pandas_col_to_numpy(df_col):
df_col = df_col.apply(lambda x: np.fromstring(x.replace("\n", "").replace("[", "").replace("]", "").replace("  ", " "), sep=", "))
df_col = np.stack(df_col)
return df_col
def pandas_string_to_numpy(arr_str):
arr_npy = np.fromstring(arr_str.replace("\n", "").replace("[", "").replace("]", "").replace("  ", " "), sep=", ")
return arr_npy
def interpolate_img(img, rows, cols):
"""
img: C x H x W
"""
img = torch.nn.functional.interpolate(img, size=cols)
img = torch.nn.functional.interpolate(img.permute(0, 2, 1), size=rows)
img = img.permute(0, 2, 1)
return img
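
A quick shape check for `interpolate_img`, matching how `create_image_dataset.py` downsamples frames:
```python
import torch

img = torch.rand(3, 320, 240)                  # (C, H, W) DIGIT-sized image
img_small = interpolate_img(img, rows=160, cols=120)
print(img_small.shape)                         # torch.Size([3, 160, 120])
```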

531
digit_depth/third_party/geom_utils.py vendored Normal file

@@ -0,0 +1,531 @@
# Copyright (c) Facebook, Inc. and its affiliates.
import copy
import numpy as np
import open3d as o3d
import torch
from scipy import ndimage
from digit_depth.third_party.poisson import poisson_reconstruct
from digit_depth.third_party.vis_utils import visualize_inlier_outlier
"""
Common functions
"""
def flip(x):
return torch.flip(x, dims=[0])
def min_clip(x, min_val):
return torch.max(x, min_val)
def max_clip(x, max_val):
return torch.min(x, max_val)
def normalize(x, min_val, max_val):
return (x - torch.min(x)) * (max_val - min_val) / (
torch.max(x) - torch.min(x)
) + min_val
def mask_background(x, bg_mask, bg_val=0.0):
if bg_mask is not None:
x[bg_mask] = bg_val
return x
def remove_background_pts(pts, bg_mask=None):
if bg_mask is not None:
fg_mask_pts = ~bg_mask.view(-1)
points3d_x = pts[0, fg_mask_pts].view(1, -1)
points3d_y = pts[1, fg_mask_pts].view(1, -1)
points3d_z = pts[2, fg_mask_pts].view(1, -1)
pts_fg = torch.cat((points3d_x, points3d_y, points3d_z), dim=0)
else:
pts_fg = pts
return pts_fg
"""
3D transform helper functions
"""
def Rt_to_T(R=None, t=None, device=None):
"""
:param R: rotation, (B, 3, 3) or (3, 3)
:param t: translation, (B, 3) or (3)
:return: T, (B, 4, 4) or (4, 4)
"""
T = torch.eye(4, device=device)
if (len(R.shape) > 2) & (len(t.shape) > 1): # batch version
B = R.shape[0]
T = T.repeat(B, 1, 1)
if R is not None:
T[:, 0:3, 0:3] = R
if t is not None:
T[:, 0:3, -1] = t
else:
if R is not None:
T[0:3, 0:3] = R
if t is not None:
T[0:3, -1] = t
return T
def transform_pts3d(T, pts):
"""
T: 4x4
pts: 3xN
returns 3xN
"""
D, N = pts.shape
if D == 3:
pts_tf = (
torch.cat((pts, torch.ones(1, N)), dim=0)
if torch.is_tensor(pts)
else np.concatenate((pts, torch.ones(1, N)), axis=0)
)
pts_tf = torch.matmul(T, pts_tf) if torch.is_tensor(pts) else np.matmul(T, pts_tf)
pts_tf = pts_tf[0:3, :]
return pts_tf
"""
3D-2D projection / conversion functions
OpenGL transforms reference: http://www.songho.ca/opengl/gl_transform.html
"""
def _vectorize_pixel_coords(rows, cols, device=None):
y_range = torch.arange(rows, device=device)
x_range = torch.arange(cols, device=device)
grid_x, grid_y = torch.meshgrid(x_range, y_range)
pixel_pos = torch.stack([grid_x.reshape(-1), grid_y.reshape(-1)], dim=0) # 2 x N
return pixel_pos
def _clip_to_pixel(clip_pos, img_shape, params):
H, W = img_shape
# clip -> ndc coords
x_ndc = clip_pos[0, :] / clip_pos[3, :]
y_ndc = clip_pos[1, :] / clip_pos[3, :]
# z_ndc = clip_pos[2, :] / clip_pos[3, :]
# ndc -> pixel coords
x_pix = (W - 1) / 2 * (x_ndc + 1) # [-1, 1] -> [0, W-1]
y_pix = (H - 1) / 2 * (y_ndc + 1) # [-1, 1] -> [0, H-1]
# z_pix = (f-n) / 2 * z_ndc + (f+n) / 2
pixel_pos = torch.stack((x_pix, y_pix), dim=0)
return pixel_pos
def _pixel_to_clip(pix_pos, depth_map, params):
"""
:param pix_pos: position in pixel space, (2, N)
:param depth_map: depth map, (H, W)
:return: clip_pos position in clip space, (4, N)
"""
x_pix = pix_pos[0, :]
y_pix = pix_pos[1, :]
H, W = depth_map.shape
f = params.z_far
n = params.z_near
# pixel -> ndc coords
x_ndc = 2 / (W - 1) * x_pix - 1 # [0, W-1] -> [-1, 1]
y_ndc = 2 / (H - 1) * y_pix - 1 # [0, H-1] -> [-1, 1]
z_buf = depth_map[y_pix, x_pix]
# ndc -> clip coords
z_eye = -z_buf
w_c = -z_eye
x_c = x_ndc * w_c
y_c = y_ndc * w_c
z_c = -(f + n) / (f - n) * z_eye - 2 * f * n / (f - n) * 1.0
clip_pos = torch.stack([x_c, y_c, z_c, w_c], dim=0)
return clip_pos
def _clip_to_eye(clip_pos, P):
P_inv = torch.inverse(P)
eye_pos = torch.matmul(P_inv, clip_pos)
return eye_pos
def _eye_to_clip(eye_pos, P):
clip_pos = torch.matmul(P, eye_pos)
return clip_pos
def _eye_to_world(eye_pos, V):
V_inv = torch.inverse(V)
world_pos = torch.matmul(V_inv, eye_pos)
world_pos = world_pos / world_pos[3, :]
return world_pos
def _world_to_eye(world_pos, V):
eye_pos = torch.matmul(V, world_pos)
return eye_pos
def _world_to_object(world_pos, M):
M_inv = torch.inverse(M)
obj_pos = torch.matmul(M_inv, world_pos)
obj_pos = obj_pos / obj_pos[3, :]
return obj_pos
def _object_to_world(obj_pos, M):
world_pos = torch.matmul(M, obj_pos)
world_pos = world_pos / world_pos[3, :]
return world_pos
def depth_to_pts3d(depth, P, V, params=None, ordered_pts=False):
"""
:param depth: depth map, (C, H, W) or (H, W)
:param P: projection matrix, (4, 4)
:param V: view matrix, (4, 4)
:return: world_pos position in 3d world coordinates, (3, H, W) or (3, N)
"""
assert 2 <= len(depth.shape) <= 3
assert P.shape == (4, 4)
assert V.shape == (4, 4)
depth_map = depth.squeeze(0) if (len(depth.shape) == 3) else depth
H, W = depth_map.shape
pixel_pos = _vectorize_pixel_coords(rows=H, cols=W)
clip_pos = _pixel_to_clip(pixel_pos, depth_map, params)
eye_pos = _clip_to_eye(clip_pos, P)
world_pos = _eye_to_world(eye_pos, V)
world_pos = world_pos[0:3, :] / world_pos[3, :]
if ordered_pts:
H, W = depth_map.shape
world_pos = world_pos.reshape(world_pos.shape[0], H, W)
return world_pos
"""
Optical flow functions
"""
def analytic_flow(img1, depth1, P, V1, V2, M1, M2, gel_depth, params):
C, H, W = img1.shape
depth_map = depth1.squeeze(0) if (len(depth1.shape) == 3) else depth1
pixel_pos = _vectorize_pixel_coords(rows=H, cols=W, device=img1.device)
clip_pos = _pixel_to_clip(pixel_pos, depth_map, params)
eye_pos = _clip_to_eye(clip_pos, P)
world_pos = _eye_to_world(eye_pos, V1)
obj_pos = _world_to_object(world_pos, M1)
world_pos = _object_to_world(obj_pos, M2)
eye_pos = _world_to_eye(world_pos, V2)
clip_pos = _eye_to_clip(eye_pos, P)
pixel_pos_proj = _clip_to_pixel(clip_pos, (H, W), params)
pixel_flow = pixel_pos - pixel_pos_proj
flow_map = pixel_flow.reshape(pixel_flow.shape[0], H, W)
# mask out background gel pixels
mask_idxs = depth_map >= gel_depth
flow_map[:, mask_idxs] = 0.0
return flow_map
"""
Normal to depth conversion / integration functions
"""
def _preproc_depth(img_depth, bg_mask=None):
if bg_mask is not None:
img_depth = mask_background(img_depth, bg_mask=bg_mask, bg_val=0.0)
return img_depth
def _preproc_normal(img_normal, bg_mask=None):
"""
img_normal: lies in range [0, 1]
"""
# 0.5 corresponds to 0
img_normal = img_normal - 0.5
# normalize
img_normal = img_normal / torch.linalg.norm(img_normal, dim=0)
# set background to have only z normals (flat, facing camera)
if bg_mask is not None:
img_normal[0:2, bg_mask] = 0.0
img_normal[2, bg_mask] = 1.0
return img_normal
def _depth_to_grad_depth(img_depth, bg_mask=None):
gradx = ndimage.sobel(img_depth.cpu().detach().numpy(), axis=1, mode="constant")
grady = ndimage.sobel(img_depth.cpu().detach().numpy(), axis=0, mode="constant")
gradx = torch.FloatTensor(gradx, device=img_depth.device)
grady = torch.FloatTensor(grady, device=img_depth.device)
if bg_mask is not None:
gradx = mask_background(gradx, bg_mask=bg_mask, bg_val=0.0)
grady = mask_background(grady, bg_mask=bg_mask, bg_val=0.0)
return gradx, grady
def _normal_to_grad_depth(img_normal, gel_width=1.0, gel_height=1.0, bg_mask=None):
# Ref: https://stackoverflow.com/questions/34644101/calculate-surface-normals-from-depth-image-using-neighboring-pixels-cross-produc/34644939#34644939
EPS = 1e-1
nz = torch.max(torch.tensor(EPS), img_normal[2, :])
dzdx = -(img_normal[0, :] / nz).squeeze()
dzdy = -(img_normal[1, :] / nz).squeeze()
# taking out negative sign as we are computing gradient of depth not z
# since z is pointed towards sensor, increase in z corresponds to decrease in depth
# i.e., dz/dx = -ddepth/dx
ddepthdx = -dzdx
ddepthdy = -dzdy
# sim: pixel axis v points in opposite dxn of camera axis y
ddepthdu = ddepthdx
ddepthdv = -ddepthdy
gradx = ddepthdu # cols
grady = ddepthdv # rows
# convert units from pixel to meters
C, H, W = img_normal.shape
gradx = gradx * (gel_width / W)
grady = grady * (gel_height / H)
if bg_mask is not None:
gradx = mask_background(gradx, bg_mask=bg_mask, bg_val=0.0)
grady = mask_background(grady, bg_mask=bg_mask, bg_val=0.0)
return gradx, grady
def _integrate_grad_depth(gradx, grady, boundary=None, bg_mask=None, max_depth=0.0):
if boundary is None:
boundary = torch.zeros((gradx.shape[0], gradx.shape[1]))
img_depth_recon = poisson_reconstruct(
grady.cpu().detach().numpy(),
gradx.cpu().detach().numpy(),
boundary.cpu().detach().numpy(),
)
img_depth_recon = torch.tensor(
img_depth_recon, device=gradx.device, dtype=torch.float32
)
if bg_mask is not None:
img_depth_recon = mask_background(img_depth_recon, bg_mask)
# after integration, img_depth_recon lies between 0. (bdry) and a -ve val (obj depth)
# rescale to make max depth as gel depth and obj depth as +ve values
img_depth_recon = max_clip(img_depth_recon, max_val=torch.tensor(0.0)) + max_depth
return img_depth_recon
def depth_to_depth(img_depth, bg_mask=None, boundary=None, params=None):
# preproc depth img
img_depth = _preproc_depth(img_depth=img_depth.squeeze(), bg_mask=bg_mask)
# get grad depth
gradx, grady = _depth_to_grad_depth(img_depth=img_depth.squeeze(), bg_mask=bg_mask)
# integrate grad depth
img_depth_recon = _integrate_grad_depth(
gradx, grady, boundary=boundary, bg_mask=bg_mask
)
return img_depth_recon
def normal_to_depth(
img_normal,
bg_mask=None,
boundary=None,
gel_width=0.02,
gel_height=0.03,
max_depth=0.02,
):
# preproc normal img
img_normal = _preproc_normal(img_normal=img_normal, bg_mask=bg_mask)
# get grad depth
gradx, grady = _normal_to_grad_depth(
img_normal=img_normal,
gel_width=gel_width,
gel_height=gel_height,
bg_mask=bg_mask,
)
# integrate grad depth
img_depth_recon = _integrate_grad_depth(
gradx, grady, boundary=boundary, bg_mask=bg_mask, max_depth=max_depth
)
return img_depth_recon
"""
3D registration functions
"""
def _fpfh(pcd, normals):
pcd.normals = o3d.utility.Vector3dVector(normals)
pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
pcd, o3d.geometry.KDTreeSearchParamHybrid(radius=0.3, max_nn=64)
)
return pcd_fpfh
def _fast_global_registration(source, target, source_fpfh, target_fpfh):
distance_threshold = 0.01
result = o3d.pipelines.registration.registration_fast_based_on_feature_matching(
source,
target,
source_fpfh,
target_fpfh,
o3d.pipelines.registration.FastGlobalRegistrationOption(
maximum_correspondence_distance=distance_threshold
),
)
transformation = result.transformation
metrics = [result.fitness, result.inlier_rmse, result.correspondence_set]
return transformation, metrics
def fgr(source, target, src_normals, tgt_normals):
source_fpfh = _fpfh(source, src_normals)
target_fpfh = _fpfh(target, tgt_normals)
transformation, metrics = _fast_global_registration(
source=source, target=target, source_fpfh=source_fpfh, target_fpfh=target_fpfh
)
return transformation, metrics
def icp(source, target, T_init=np.eye(4), mcd=0.1, max_iter=50, type="point_to_plane"):
if type == "point_to_point":
result = o3d.pipelines.registration.registration_icp(
source=source,
target=target,
max_correspondence_distance=mcd,
init=T_init,
estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint(),
criteria=o3d.pipelines.registration.ICPConvergenceCriteria(
max_iteration=max_iter
),
)
else:
result = o3d.pipelines.registration.registration_icp(
source=source,
target=target,
max_correspondence_distance=mcd,
init=T_init,
estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPlane(),
criteria=o3d.pipelines.registration.ICPConvergenceCriteria(
max_iteration=max_iter
),
)
transformation = result.transformation
metrics = [result.fitness, result.inlier_rmse, result.correspondence_set]
return transformation, metrics
"""
Open3D helper functions
"""
def remove_outlier_pts(points3d, nb_neighbors=20, std_ratio=10.0, vis=False):
points3d_np = (
points3d.cpu().detach().numpy() if torch.is_tensor(points3d) else points3d
)
cloud = o3d.geometry.PointCloud()
cloud.points = copy.deepcopy(o3d.utility.Vector3dVector(points3d_np.transpose()))
cloud_filt, ind_filt = cloud.remove_statistical_outlier(
nb_neighbors=nb_neighbors, std_ratio=std_ratio
)
if vis:
visualize_inlier_outlier(cloud, ind_filt)
points3d_filt = np.asarray(cloud_filt.points).transpose()
points3d_filt = (
torch.tensor(points3d_filt) if torch.is_tensor(points3d) else points3d_filt
)
return points3d_filt
def init_points_to_clouds(clouds, points3d, colors=None):
for idx, cloud in enumerate(clouds):
points3d_np = (
points3d[idx].cpu().detach().numpy()
if torch.is_tensor(points3d[idx])
else points3d[idx]
)
cloud.points = copy.deepcopy(
o3d.utility.Vector3dVector(points3d_np.transpose())
)
if colors is not None:
cloud.paint_uniform_color(colors[idx])
return clouds

81
digit_depth/third_party/poisson.py vendored Normal file

@@ -0,0 +1,81 @@
"""
poisson_reconstruct.py
Fast Poisson Reconstruction in Python
Copyright (c) 2014 Jack Doerner
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import copy
import math
import numpy
import scipy
import scipy.fftpack
def poisson_reconstruct(grady, gradx, boundarysrc):
# Thanks to Dr. Ramesh Raskar for providing the original matlab code from which this is derived
# Dr. Raskar's version is available here: http://web.media.mit.edu/~raskar/photo/code.pdf
# Laplacian
gyy = grady[1:, :-1] - grady[:-1, :-1]
gxx = gradx[:-1, 1:] - gradx[:-1, :-1]
f = numpy.zeros(boundarysrc.shape)
f[:-1, 1:] += gxx
f[1:, :-1] += gyy
# Boundary image
boundary = copy.deepcopy(boundarysrc) # .copy()
boundary[1:-1, 1:-1] = 0
# Subtract boundary contribution
f_bp = (
-4 * boundary[1:-1, 1:-1]
+ boundary[1:-1, 2:]
+ boundary[1:-1, 0:-2]
+ boundary[2:, 1:-1]
+ boundary[0:-2, 1:-1]
)
f = f[1:-1, 1:-1] - f_bp
# Discrete Sine Transform
tt = scipy.fftpack.dst(f, norm="ortho")
fsin = scipy.fftpack.dst(tt.T, norm="ortho").T
# Eigenvalues
(x, y) = numpy.meshgrid(
range(1, f.shape[1] + 1), range(1, f.shape[0] + 1), copy=True
)
denom = (2 * numpy.cos(math.pi * x / (f.shape[1] + 2)) - 2) + (
2 * numpy.cos(math.pi * y / (f.shape[0] + 2)) - 2
)
f = fsin / denom
# Inverse Discrete Sine Transform
tt = scipy.fftpack.idst(f, norm="ortho")
img_tt = scipy.fftpack.idst(tt.T, norm="ortho").T
# New center + old boundary
result = copy.deepcopy(boundary)
result[1:-1, 1:-1] = img_tt
return result
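
A hedged sanity check for the solver using a synthetic smooth surface (no sensor data involved); the residual should be small when the gradients and boundary are consistent with each other.
```python
import numpy as np

H, W = 120, 160
y, x = np.mgrid[0:H, 0:W]
depth = np.exp(-((x - W / 2) ** 2 + (y - H / 2) ** 2) / (2 * 20.0 ** 2))  # synthetic bump

grady, gradx = np.gradient(depth)                         # d(depth)/dy, d(depth)/dx
recon = poisson_reconstruct(grady, gradx, depth.copy())   # boundary taken from the true surface

print(np.abs(recon - depth).max())                        # small reconstruction residual
```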

372
digit_depth/third_party/vis_utils.py vendored Normal file

@@ -0,0 +1,372 @@
# Copyright (c) Facebook, Inc. and its affiliates.
import copy
import logging
import time
import types
import cv2
import matplotlib.pyplot as plt
import numpy as np
import open3d as o3d
import torch
from attrdict import AttrDict
from matplotlib.patches import Circle, Rectangle
log = logging.getLogger(__name__)
"""
Open3d visualization functions
"""
class Visualizer3d:
def __init__(self, base_path="", view_params=None, tsleep=0.01):
self.vis = o3d.visualization.VisualizerWithKeyCallback()
self.vis.create_window(top=0, left=750, width=1080, height=1080)
self.tsleep = tsleep
self.base_path = base_path
self.view_params = AttrDict(
{
"fov": 0,
"front": [0.0, 0.0, 0.0],
"lookat": [0.0, 0.0, 0.0],
"up": [0.0, 0.0, 0.0],
"zoom": 0.5,
}
)
if view_params is not None:
self.view_params = view_params
self._init_options()
def _init_options(self):
opt = self.vis.get_render_option()
opt.show_coordinate_frame = True
opt.background_color = np.asarray([0.6, 0.6, 0.6])
# pause option
self.paused = types.SimpleNamespace()
self.paused.value = False
self.vis.register_key_action_callback(
ord("P"),
lambda a, b, c: b == 1
or setattr(self.paused, "value", not self.paused.value),
)
def _init_geom_cloud(self):
return o3d.geometry.PointCloud()
def _init_geom_frame(self, frame_size=0.01, frame_origin=[0, 0, 0]):
return o3d.geometry.TriangleMesh.create_coordinate_frame(
size=frame_size, origin=frame_origin
)
def _init_geom_mesh(self, mesh_name, color=None, wireframe=False):
mesh = o3d.io.read_triangle_mesh(
f"{self.base_path}/local/resources/meshes/{mesh_name}"
)
mesh.compute_vertex_normals()
if wireframe:
mesh = o3d.geometry.LineSet.create_from_triangle_mesh(mesh)
if color is not None:
mesh.paint_uniform_color(color)
return mesh
def init_geometry(
self,
geom_type,
num_items=1,
sizes=None,
file_names=None,
colors=None,
wireframes=None,
):
geom_list = []
for i in range(0, num_items):
if geom_type == "cloud":
geom = self._init_geom_cloud()
elif geom_type == "frame":
frame_size = sizes[i] if sizes is not None else 0.001
geom = self._init_geom_frame(frame_size=frame_size)
elif geom_type == "mesh":
color = colors[i] if colors is not None else None
wireframe = wireframes[i] if wireframes is not None else False
geom = self._init_geom_mesh(file_names[i], color, wireframe)
else:
log.error(
f"[Visualizer3d::init_geometry] geom_type {geom_type} not found."
)
geom_list.append(geom)
return geom_list
def add_geometry(self, geom_list):
if geom_list is None:
return
for geom in geom_list:
self.vis.add_geometry(geom)
def remove_geometry(self, geom_list, reset_bounding_box=False):
if geom_list is None:
return
for geom in geom_list:
self.vis.remove_geometry(geom, reset_bounding_box=reset_bounding_box)
def update_geometry(self, geom_list):
for geom in geom_list:
self.vis.update_geometry(geom)
def set_view(self):
ctr = self.vis.get_view_control()
ctr.change_field_of_view(self.view_params.fov)
ctr.set_front(self.view_params.front)
ctr.set_lookat(self.view_params.lookat)
ctr.set_up(self.view_params.up)
ctr.set_zoom(self.view_params.zoom)
def set_view_cam(self, T):
ctr = self.vis.get_view_control()
cam = ctr.convert_to_pinhole_camera_parameters()
cam.extrinsic = T
ctr.convert_from_pinhole_camera_parameters(cam)
def set_zoom(self):
ctr = self.vis.get_view_control()
ctr.set_zoom(1.5)
def rotate_view(self):
ctr = self.vis.get_view_control()
ctr.rotate(10.0, -0.0)
def pan_scene(self, max=300):
for i in range(0, max):
self.rotate_view()
self.render()
def render(self, T=None):
if T is not None:
self.set_view_cam(T)
else:
self.set_view()
self.vis.poll_events()
self.vis.update_renderer()
time.sleep(self.tsleep)
def transform_geometry_absolute(self, transform_list, geom_list):
for idx, geom in enumerate(geom_list):
T = transform_list[idx]
geom.transform(T)
def transform_geometry_relative(
self, transform_prev_list, transform_curr_list, geom_list
):
for idx, geom in enumerate(geom_list):
T_prev = transform_prev_list[idx]
T_curr = transform_curr_list[idx]
# a. rotate R1^{-1}*R2 about center t1
geom.rotate(
torch.matmul(torch.inverse(T_prev[0:3, 0:3]), T_curr[0:3, 0:3]),
center=(T_prev[0, -1], T_prev[1, -1], T_prev[2, -1]),
)
# b. translate by t2 - t1
geom.translate(
(
T_curr[0, -1] - T_prev[0, -1],
T_curr[1, -1] - T_prev[1, -1],
T_curr[2, -1] - T_prev[2, -1],
)
)
def clear_geometries(self):
self.vis.clear_geometries()
def destroy(self):
self.vis.destroy_window()
def visualize_registration(source, target, transformation, vis3d=None, colors=None):
source_copy = copy.deepcopy(source)
target_copy = copy.deepcopy(target)
source_copy.transform(transformation)
clouds = [target_copy, source_copy]
if colors is not None:
clouds[0].paint_uniform_color(colors[1])
clouds[1].paint_uniform_color(colors[0])
vis3d.add_geometry(clouds)
vis3d.render()
vis3d.remove_geometry(clouds)
def visualize_geometries_o3d(
vis3d, clouds=None, frames=None, meshes=None, transforms=None
):
if meshes is not None:
meshes = [copy.deepcopy(mesh) for mesh in meshes]
if transforms is not None:
vis3d.transform_geometry_absolute(transforms, meshes)
if frames is not None:
frames = [copy.deepcopy(frame) for frame in frames]
if transforms is not None:
vis3d.transform_geometry_absolute(transforms, frames)
if clouds is not None:
vis3d.add_geometry(clouds)
if meshes is not None:
vis3d.add_geometry(meshes)
if frames is not None:
vis3d.add_geometry(frames)
vis3d.render()
if clouds is not None:
vis3d.remove_geometry(clouds)
if meshes is not None:
vis3d.remove_geometry(meshes)
if frames is not None:
vis3d.remove_geometry(frames)
def visualize_inlier_outlier(cloud, ind):
inlier_cloud = cloud.select_by_index(ind)
outlier_cloud = cloud.select_by_index(ind, invert=True)
print("Showing outliers (red) and inliers (gray): ")
outlier_cloud.paint_uniform_color([1, 0, 0])
inlier_cloud.paint_uniform_color([0.8, 0.8, 0.8])
o3d.visualization.draw_geometries(
[inlier_cloud, outlier_cloud],
zoom=0.3412,
front=[0.4257, -0.2125, -0.8795],
lookat=[2.6172, 2.0475, 1.532],
up=[-0.0694, -0.9768, 0.2024],
)
"""
Optical flow visualization functions
"""
def flow_to_color(flow_uv, cvt=cv2.COLOR_HSV2BGR):
hsv = np.zeros((flow_uv.shape[0], flow_uv.shape[1], 3), dtype=np.uint8)
hsv[:, :, 0] = 255
hsv[:, :, 1] = 255
mag, ang = cv2.cartToPolar(flow_uv[..., 0], flow_uv[..., 1])
hsv[..., 0] = ang * 180 / np.pi / 2
hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
flow_color = cv2.cvtColor(hsv, cvt)
return flow_color
def flow_to_arrows(img, flow, step=8):
img = copy.deepcopy(img)
# img = (255 * img).astype(np.uint8)
h, w = img.shape[:2]
y, x = np.mgrid[step / 2 : h : step, step / 2 : w : step].reshape(2, -1).astype(int)
fx, fy = 5.0 * flow[y, x].T
lines = np.vstack([x, y, x + fx, y + fy]).T.reshape(-1, 2, 2)
lines = np.int32(lines + 0.5)
cv2.polylines(img, lines, 0, color=(0, 255, 0), thickness=1)
for (x1, y1), (x2, y2) in lines:
cv2.circle(img, (x1, y1), 1, (0, 255, 0), -1)
return img
def depth_to_color(depth):
gray = (
np.clip((depth - depth.min()) / (depth.max() - depth.min()), 0, 1) * 255
).astype(np.uint8)
return cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
def visualize_flow_cv2(
img1, img2, flow_arrow=None, flow_color=None, win_size=(360, 360)
):
img_disp1 = np.concatenate([img1, img2], axis=1)
cv2.namedWindow("img1, img2", cv2.WINDOW_NORMAL)
cv2.resizeWindow("img1, img2", 2 * win_size[0], win_size[1])
cv2.imshow("img1, img2", img_disp1)
if flow_arrow is not None:
cv2.namedWindow("flow_arrow", cv2.WINDOW_NORMAL)
cv2.resizeWindow("flow_arrow", win_size[0], win_size[1])
cv2.imshow("flow_arrow", flow_arrow)
if flow_color is not None:
cv2.namedWindow("flow_color", cv2.WINDOW_NORMAL)
cv2.resizeWindow("flow_color", win_size[0], win_size[1])
cv2.imshow("flow_color", flow_color)
cv2.waitKey(300)
"""
General visualization functions
"""
def draw_rectangle(
center_x,
center_y,
size_x,
size_y,
ang=0.0,
edgecolor="dimgray",
facecolor=None,
linewidth=2,
):
R = np.array([[np.cos(ang), -np.sin(ang)], [np.sin(ang), np.cos(ang)]])
offset = np.matmul(R, np.array([[0.5 * size_x], [0.5 * size_y]]))
anchor_x = center_x - offset[0]
anchor_y = center_y - offset[1]
rect = Rectangle(
(anchor_x, anchor_y),
size_x,
size_y,
angle=(np.rad2deg(ang)),
facecolor=facecolor,
edgecolor=edgecolor,
linewidth=linewidth,
)
plt.gca().add_patch(rect)
def draw_circle(center_x, center_y, radius):
circle = Circle((center_x, center_y), color="dimgray", radius=radius)
plt.gca().add_patch(circle)
def visualize_imgs(fig, axs, img_list, titles=None, cmap=None):
for idx, img in enumerate(img_list):
if img is None:
continue
im = axs[idx].imshow(img, cmap=cmap)
if cmap is not None:
fig.colorbar(im, ax=axs[idx])
if titles is not None:
axs[idx].set_title(titles[idx])


@@ -0,0 +1,2 @@
from .color2normal_dataset import Color2NormalDataset
from .mlp_model import MLP


@@ -0,0 +1,21 @@
import pandas as pd
import torch
from torch.utils.data import Dataset
class Color2NormalDataset(Dataset):
def __init__(self, csv):
self.data = pd.read_csv(csv)
def __len__(self):
return self.data['X'].count()
def __getitem__(self,idx):
x = self.data['X'][idx]/120
y = self.data['Y'][idx]/160
r = self.data['R'][idx]/255
g = self.data['G'][idx]/255
b = self.data['B'][idx]/255
nx = self.data['Nx'][idx]/255
ny = self.data['Ny'][idx]/255
nz= self.data['Nz'][idx]/255
return torch.tensor((x, y, r, g, b), dtype=torch.float32), torch.tensor((nx, ny, nz), dtype=torch.float32)
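
A hedged loading sketch; the CSV path is a placeholder, and `train.csv` is the file produced by `create_train_test_csv`.
```python
from torch.utils.data import DataLoader

dataset = Color2NormalDataset("datasets/train_test_split/train.csv")
loader = DataLoader(dataset, batch_size=256, shuffle=True)
features, targets = next(iter(loader))
print(features.shape, targets.shape)   # (256, 5) inputs and (256, 3) normal targets, given >= 256 rows
```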


@@ -0,0 +1,27 @@
import torch.nn as nn
import torch.nn.functional as F
class MLP(nn.Module):
dropout_p = 0.05
def __init__(
self, input_size=5, output_size=3, hidden_size=32):
super().__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.fc2 = nn.Linear(hidden_size, hidden_size)
self.fc3 = nn.Linear(hidden_size, hidden_size)
self.fc4 = nn.Linear(hidden_size, output_size)
self.drop = nn.Dropout(p=self.dropout_p)
def forward(self, x):
x = F.relu(self.fc1(x))
x = self.drop(x)
x = F.relu(self.fc2(x))
x = self.drop(x)
x = self.fc3(x)
x = self.drop(x)
x = self.fc4(x)
return x
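
For reference, the default input/output shapes; the five input features match `Color2NormalDataset`.
```python
import torch

model = MLP()                 # 5 -> 32 -> 32 -> 32 -> 3 with dropout p=0.05
batch = torch.rand(8, 5)      # rows of (X, Y, R, G, B)
normals = model(batch)
print(normals.shape)          # torch.Size([8, 3])
```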


@@ -0,0 +1,49 @@
import cv2
import torch
import numpy as np
import pandas as pd
import copy
seed = 42
torch.manual_seed(seed)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
def preproc_mlp(image) -> torch.Tensor:
""" Preprocess image for input to model.
Args: image: OpenCV image in BGR format
Return: tensor of shape (R*C,5) where R=320 and C=240 for DIGIT images
5-columns are: X,Y,R,G,B
"""
img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
img = img / 255.0
xy_coords = np.flip(np.column_stack(np.where(np.all(img >= 0, axis=2))), axis=1)
rgb = np.reshape(img, (np.prod(img.shape[:2]), 3))
pixel_numbers = np.expand_dims(np.arange(1, xy_coords.shape[0] + 1), axis=1)
value_base = np.hstack([pixel_numbers, xy_coords, rgb])
df_base = pd.DataFrame(value_base, columns=['pixel_number', 'X', 'Y', 'R', 'G', 'B'])
df_base['X'] = df_base['X'] / 240
df_base['Y'] = df_base['Y'] / 320
del df_base['pixel_number']
test_tensor = torch.tensor(df_base[['X', 'Y', 'R', 'G', 'B']].values, dtype=torch.float32).to(device)
return test_tensor
def post_proc_mlp(model_output: torch.Tensor):
""" Postprocess model output to get normal map.
Args: model_output: per-pixel normal predictions of shape (320*240, 3)
Return: a (3, 320, 240) torch tensor and a (320, 240, 3) copy of the normal image
"""
test_np = model_output.reshape(320, 240, 3)
normal = copy.deepcopy(test_np) # surface normal image
test_np = torch.tensor(test_np,
dtype=torch.float32) # convert to torch tensor for later processing in gradient computation
test_np = test_np.permute(2, 0, 1) # swap axes to (3,320,240)
return test_np, normal

25
requirements.txt Normal file

@@ -0,0 +1,25 @@
certifi==2022.6.15
digit-interface==0.2.1
numpy==1.23.1
opencv-python~=4.5.3.56
pip==22.1.2
PyQt5==5.15.7
PyQt5-Qt5==5.15.2
PyQt5-sip==12.11.0
pyudev==0.23.2
setuptools==61.2.0
six==1.16.0
wheel==0.37.1
torch~=1.12.0
pandas~=1.4.3
pillow~=9.2.0
torchvision~=0.13.0
scipy~=1.9.0
open3d~=0.15.2
matplotlib~=3.5.2
attrdict~=2.0.1
hydra-core~=1.2  # config framework used by @hydra.main (not the unrelated "hydra" package)
imageio~=2.21.0
scikit-learn~=1.1.1
wandb~=0.13.0
tqdm~=4.64.0

0
scripts/__init__.py Normal file


@@ -0,0 +1,103 @@
"""
Creates color+normal datasets based on annotation file.
The datasets can be used to train MLP, CycleGAN, Pix2Pix models.
"""
import os
import cv2
import hydra
import imageio
import numpy as np
import torch
from digit_depth.dataio.create_csv import (combine_csv, create_pixel_csv,
create_train_test_csv)
from digit_depth.dataio.data_loader import data_loader
from digit_depth.dataio.generate_sphere_gt_normals import generate_sphere_gt_normals
from digit_depth.third_party import data_utils
base_path = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
@hydra.main(config_path=f"{base_path}/config", config_name="rgb_to_normal.yaml", version_base=None)
def main(cfg):
normal_dataloader, normal_dataset = data_loader(
dir_dataset=os.path.join(base_path, "images"), params=cfg.dataloader
)
dirs = [
f"{base_path}/datasets/A/imgs",
f"{base_path}/datasets/B/imgs",
f"{base_path}/datasets/A/csv",
f"{base_path}/datasets/B/csv",
f"{base_path}/datasets/train_test_split",
]
for dir in dirs:
print(f"Creating directory: {dir}")
os.makedirs(f"{dir}", exist_ok=True)
# iterate over images
img_idx = 0
radius_bearing = np.int32(0.5 * 6.0 * cfg.mm_to_pixel)
while img_idx < len(normal_dataset):
# read img + annotations
data = normal_dataset[img_idx]
if cfg.dataloader.annot_flag:
img, annot = data
if annot.shape[0] == 0:
img_idx = img_idx + 1
continue
else:
img = data
# get annotation circle params
if cfg.dataloader.annot_flag:
annot_np = annot.cpu().detach().numpy()
center_y, center_x, radius_annot = (
annot_np[0][1],
annot_np[0][0],
annot_np[0][2],
)
else:
center_y, center_x, radius_annot = 0, 0, 0
img_color_np = (img.permute(2, 1, 0).cpu().detach().numpy()) # (3,320,240) -> (240,320,3)
# apply foreground mask
fg_mask = np.zeros(img_color_np.shape[:2], dtype="uint8")
fg_mask = cv2.circle(fg_mask, (center_x, center_y), radius_annot, 255, -1)
# 1. rgb -> normal (generate gt surface normals)
img_mask = cv2.bitwise_and(img_color_np, img_color_np, mask=fg_mask)
img_normal_np = generate_sphere_gt_normals(
img_mask, center_x, center_y, radius=radius_bearing
)
# 2. downsample and convert to NumPy: (320,240,3) -> (160,120,3)
img_normal_np = data_utils.interpolate_img(
img=torch.tensor(img_normal_np).permute(2, 0, 1), rows=160, cols=120)
img_normal_np = img_normal_np.permute(1, 2, 0).cpu().detach().numpy()
img_color_ds = data_utils.interpolate_img(
img=torch.tensor(img_color_np).permute(2, 0, 1), rows=160, cols=120)
img_color_np = img_color_ds.permute(1, 2, 0).cpu().detach().numpy()
# 3. save csv files for color and normal images
if cfg.dataset.save_dataset:
imageio.imwrite(
f"{dirs[0]}/{img_idx:04d}.png", (img_color_np * 255).astype(np.uint8)
)
imageio.imwrite(f"{dirs[1]}/{img_idx:04d}.png", (img_normal_np*255).astype(np.uint8))
print(f"Saved image {img_idx:04d}")
img_idx += 1
# post-process CSV files and create train/test split
create_pixel_csv( img_dir=dirs[0], save_dir=dirs[2], img_type="color")
create_pixel_csv(img_dir=dirs[1], save_dir=dirs[3], img_type="normal")
combine_csv(dirs[2],img_type="color")
combine_csv(dirs[3],img_type="normal")
create_train_test_csv(color_path=f'{dirs[2]}/combined.csv',
normal_path=f'{dirs[3]}/combined.csv',
save_dir=dirs[4])
if __name__ == "__main__":
main()

84
scripts/depth.py Normal file

@@ -0,0 +1,84 @@
""" Publishes a ROS topic with name /depth/compressed and shows the image on OpenCV window.
Issues: rqt_image_view is not showing the image due to some data conversion issues but OpenCV is showing the image."""
import os
import cv2
import hydra
import rospy
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
from sensor_msgs.msg import CompressedImage
from digit_depth.third_party import geom_utils
from digit_depth.digit import DigitSensor
from digit_depth.train import MLP
from digit_depth.train.prepost_mlp import *
seed = 42
torch.manual_seed(seed)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
base_path = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
class ImageFeature:
def __init__(self):
# topic where we publish
self.image_pub = rospy.Publisher("/depth/compressed",
CompressedImage, queue_size=10)
self.br = CvBridge()
@hydra.main(config_path="/home/shuk/digit-depth/config", config_name="rgb_to_normal.yaml", version_base=None)
def show_depth(cfg):
model = torch.load(cfg.model_path).to(device)
model.eval()
ic = ImageFeature()
br = CvBridge()
rospy.init_node('depth_node', anonymous=True)
# base image depth map
base_img = cv2.imread(cfg.base_img_path)
base_img = preproc_mlp(base_img)
base_img_proc = model(base_img).cpu().detach().numpy()
base_img_proc, normal_base = post_proc_mlp(base_img_proc)
# get gradx and grady
gradx_base, grady_base = geom_utils._normal_to_grad_depth(img_normal=base_img_proc, gel_width=cfg.sensor.gel_width,
gel_height=cfg.sensor.gel_height, bg_mask=None)
# reconstruct depth
img_depth_base = geom_utils._integrate_grad_depth(gradx_base, grady_base, boundary=None, bg_mask=None,
max_depth=0.0237)
img_depth_base = img_depth_base.detach().cpu().numpy() # final depth image for base image
# setup digit sensor
digit = DigitSensor(30, "QVGA", cfg.sensor.serial_num)
digit_call = digit()
while not rospy.is_shutdown():
frame = digit_call.get_frame()
img_np = preproc_mlp(frame)
img_np = model(img_np).detach().cpu().numpy()
img_np, normal_img = post_proc_mlp(img_np)
# get gradx and grady
gradx_img, grady_img = geom_utils._normal_to_grad_depth(img_normal=img_np, gel_width=cfg.sensor.gel_width,
gel_height=cfg.sensor.gel_height,bg_mask=None)
# reconstruct depth
img_depth = geom_utils._integrate_grad_depth(gradx_img, grady_img, boundary=None, bg_mask=None,max_depth=0.0237)
img_depth = img_depth.detach().cpu().numpy() # final depth image for current image
# get depth difference
depth_diff = (img_depth - img_depth_base)*500
# cv2.imshow("depth", depth_diff)
img_depth[img_depth == 0.0237] = 0
img_depth[img_depth != 0] = (img_depth[img_depth != 0]-0.0237)*(-1)
img_depth = img_depth*1000
cv2.imshow("depth", depth_diff)
msg = br.cv2_to_compressed_imgmsg(img_depth, "png")
ic.image_pub.publish(msg)
now = rospy.get_rostime()
rospy.loginfo("published depth image at {}".format(now))
cv2.waitKey(1)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cv2.destroyAllWindows()
if __name__ == "__main__":
rospy.loginfo("starting...")
show_depth()

69
scripts/label_data.py Normal file

@@ -0,0 +1,69 @@
"""
Labels images for training MLP depth reconstruction model.
Specify the image folder containing the circle images and csv folder to store the labels ( img_names, center_x, center_y, radius ).
The image datasets should include the rolling of a sphere with a known radius.
Directions:
-- Click left mouse button to select the center of the sphere.
-- Click right mouse button to select the circumference of the sphere.
-- Double click ESC to move to the next image.
"""
import argparse
import csv
import glob
import math
import os
import cv2
base_path = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
def click_and_store(event, x, y, flags, param):
global count
global center_x, center_y, circumference_x, circumference_y, radii
if event == cv2.EVENT_LBUTTONDOWN:
center_x = x
center_y = y
print("center_x: ", x)
print("center_y: ", y)
cv2.circle(image, (x, y), 3, (0, 0, 255), -1)
cv2.imshow("image", image)
elif event == cv2.EVENT_RBUTTONDOWN:
circumference_x = x
circumference_y = y
print("circumference_x: ", x)
print("circumference_y: ", y)
cv2.circle(image, (x, y), 3, (0, 0, 255), -1)
cv2.imshow("image", image)
radius = math.sqrt(
(center_x - circumference_x) ** 2 + (center_y - circumference_y) ** 2
)
print("radius: ", int(radius))
radii.append(int(radius))
with open(filename, "a") as csvfile:
writer = csv.writer(csvfile)
print("Writing>>")
count += 1
writer.writerow([img_name, center_x, center_y, int(radius)])
if __name__ == "__main__":
argparser = argparse.ArgumentParser()
argparser.add_argument("--folder", type=str, default="images", help="folder containing images")
argparser.add_argument("--csv", type=str, default=f"{base_path}/csv/annotate.csv", help="csv file to store results")
args = argparser.parse_args()
filename = args.csv
img_folder = os.path.join(base_path, args.folder)
img_files = sorted(glob.glob(f"{img_folder}/*.png"))
os.makedirs(os.path.join(base_path, "csv"), exist_ok=True)
center_x, center_y, circumference_x, circumference_y, radii = [], [], [], [], []
count = 0
for img in img_files:
image = cv2.imread(img)
img_name = img
cv2.imshow("image", image)
cv2.setMouseCallback("image", click_and_store, image)
cv2.waitKey(0)
cv2.destroyAllWindows()

65
scripts/point_cloud.py Normal file

@@ -0,0 +1,65 @@
""" Publishes a ROS topic with name /depth/compressed and shows the image on OpenCV window.
Issues: rqt_image_view is not showing the image due to some data conversion issues but OpenCV is showing the image."""
import os
import hydra
import open3d as o3d
from digit_depth.third_party import geom_utils
from digit_depth.digit import DigitSensor
from digit_depth.train import MLP
from digit_depth.train.prepost_mlp import *
from attrdict import AttrDict
from digit_depth.third_party import vis_utils
seed = 42
torch.manual_seed(seed)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
base_path = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
@hydra.main(config_path="/home/shuk/digit-depth/config", config_name="rgb_to_normal.yaml", version_base=None)
def show_point_cloud(cfg):
view_params = AttrDict({'fov': 60, 'front': [-0.1, 0.1, 0.1], 'lookat': [
-0.001, -0.01, 0.01], 'up': [0.04, -0.05, 0.190], 'zoom': 2.5})
vis3d = vis_utils.Visualizer3d(base_path=base_path, view_params=view_params)
# projection params
T_cam_offset = torch.tensor(cfg.sensor.T_cam_offset)
proj_mat = torch.tensor(cfg.sensor.P)
model = torch.load(cfg.model_path).to(device)
model.eval()
# base image depth map
base_img = cv2.imread(cfg.base_img_path)
base_img = preproc_mlp(base_img)
base_img_proc = model(base_img).cpu().detach().numpy()
base_img_proc, normal_base = post_proc_mlp(base_img_proc)
# get gradx and grady
gradx_base, grady_base = geom_utils._normal_to_grad_depth(img_normal=base_img_proc, gel_width=cfg.sensor.gel_width,
gel_height=cfg.sensor.gel_height, bg_mask=None)
# reconstruct depth
img_depth_base = geom_utils._integrate_grad_depth(gradx_base, grady_base, boundary=None, bg_mask=None,
max_depth=0.0237)
img_depth_base = img_depth_base.detach().cpu().numpy() # final depth image for base image
# setup digit sensor
digit = DigitSensor(30, "QVGA", cfg.sensor.serial_num)
digit_call = digit()
while True:
frame = digit_call.get_frame()
img_np = preproc_mlp(frame)
img_np = model(img_np).detach().cpu().numpy()
img_np, normal_img = post_proc_mlp(img_np)
# get gradx and grady
gradx_img, grady_img = geom_utils._normal_to_grad_depth(img_normal=img_np, gel_width=cfg.sensor.gel_width,
gel_height=cfg.sensor.gel_height,bg_mask=None)
# reconstruct depth
img_depth = geom_utils._integrate_grad_depth(gradx_img, grady_img, boundary=None, bg_mask=None, max_depth=0.0237)
view_mat = torch.eye(4) # torch.inverse(T_cam_offset)
# Project depth to 3D
points3d = geom_utils.depth_to_pts3d(depth=img_depth, P=proj_mat, V=view_mat, params=cfg.sensor)
points3d = geom_utils.remove_background_pts(points3d, bg_mask=None)
cloud = o3d.geometry.PointCloud()
clouds = geom_utils.init_points_to_clouds(clouds=[copy.deepcopy(cloud)], points3d=[points3d])
vis_utils.visualize_geometries_o3d(vis3d=vis3d, clouds=clouds)
if __name__ == "__main__":
show_point_cloud()
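The depth reconstruction in this script (and in the ROS nodes below) boils down to integrating a gradient field derived from the predicted surface normals. As a rough, self-contained illustration of that idea, here is a Frankot-Chellappa style FFT integrator; it is not the project's geom_utils implementation, which may differ in boundary handling and scaling:

```python
import numpy as np

def integrate_gradients_fft(gx, gy):
    """Integrate a depth-gradient field (dz/dx, dz/dy) into a depth map, up to an offset."""
    H, W = gx.shape
    wx = 2 * np.pi * np.fft.fftfreq(W)  # angular frequencies along x (columns)
    wy = 2 * np.pi * np.fft.fftfreq(H)  # angular frequencies along y (rows)
    u, v = np.meshgrid(wx, wy)
    Gx, Gy = np.fft.fft2(gx), np.fft.fft2(gy)
    denom = u ** 2 + v ** 2
    denom[0, 0] = 1.0                   # avoid division by zero at the DC term
    Z = (-1j * u * Gx - 1j * v * Gy) / denom
    Z[0, 0] = 0.0                       # absolute depth offset is unobservable
    return np.real(np.fft.ifft2(Z))

if __name__ == "__main__":
    # sanity check on a synthetic paraboloid
    y, x = np.mgrid[0:64, 0:64]
    z = ((x - 32.0) ** 2 + (y - 32.0) ** 2) / 512.0
    gy_num, gx_num = np.gradient(z)
    z_rec = integrate_gradients_fft(gx_num, gy_num)
    err = (z - z.mean()) - (z_rec - z_rec.mean())
    print("RMS reconstruction error:", float(np.sqrt((err ** 2).mean())))
```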

49
scripts/record.py Normal file
View File

@@ -0,0 +1,49 @@
"""
Script for capturing individual frames while the camera output is displayed.
-- Press SPACEBAR to capture
-- Press ESC to terminate the program.
"""
import argparse
import os
import os.path
import cv2
from digit_depth.digit.digit_sensor import DigitSensor
base_path = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
def record_frame(digit_sensor, dir_name: str):
img_counter = len(os.listdir(dir_name))
digit_call = digit_sensor()
while True:
frame = digit_call.get_frame()
cv2.imshow("Capture Frame", frame)
k = cv2.waitKey(1)
if k % 256 == 27:
# ESC hit
print("Escape hit, closing...")
break
elif k % 256 == 32:
# SPACEBAR hit
img_name = "{}/{:0>4}.png".format(dir_name, img_counter)
cv2.imwrite(img_name, frame)
print("{} written!".format(img_name))
img_counter += 1
cv2.destroyAllWindows()
if __name__ == "__main__":
argparser = argparse.ArgumentParser()
argparser.add_argument("--fps", type=int, default=30, help="Frames per second. Max:60 on QVGA")
argparser.add_argument("--resolution", type=str, default="QVGA", help="QVGA, VGA")
argparser.add_argument("--serial_num", type=str, default="D00001", help="Serial number of DIGIT")
args = argparser.parse_args()
if not os.path.exists(os.path.join(base_path, "images")):
os.makedirs(os.path.join(base_path, "images"), exist_ok=True)
print("Directory {} created for saving images".format(os.path.join(base_path, "images")))
digit = DigitSensor(args.fps, args.resolution, args.serial_num)
record_frame(digit, os.path.join(base_path, "images"))

54
scripts/ros/depth_value_pub.py Normal file
View File

@@ -0,0 +1,54 @@
""" Node to publish max depth value when gel is deformed """
import os
import numpy as np
import torch
import hydra
import rospy
from std_msgs.msg import Float32
from digit_depth.third_party import geom_utils
from digit_depth.digit.digit_sensor import DigitSensor
from digit_depth.train.mlp_model import MLP
from digit_depth.train.prepost_mlp import *
seed = 42
torch.manual_seed(seed)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
BASE_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
@hydra.main(config_path=f"{BASE_PATH}/config", config_name="rgb_to_normal.yaml", version_base=None)
def print_depth(cfg):
model = torch.load(cfg.model_path).to(device)
model.eval()
# setup digit sensor
digit = DigitSensor(30, "QVGA", cfg.sensor.serial_num)
digit_call = digit()
pub = rospy.Publisher('chatter', Float32, queue_size=1)
rospy.init_node('depth', anonymous=True)
try:
while not rospy.is_shutdown():
frame = digit_call.get_frame()
img_np = preproc_mlp(frame)
img_np = model(img_np).detach().cpu().numpy()
img_np, normal_img = post_proc_mlp(img_np)
# get gradx and grady
gradx_img, grady_img = geom_utils._normal_to_grad_depth(img_normal=img_np, gel_width=cfg.sensor.gel_width,
gel_height=cfg.sensor.gel_height,bg_mask=None)
# reconstruct depth
img_depth = geom_utils._integrate_grad_depth(gradx_img, grady_img, boundary=None, bg_mask=None,max_depth=0.0237)
img_depth = img_depth.detach().cpu().numpy().flatten()
# maximum deformation corresponds to the minimum value in the depth image
max_depth = np.min(img_depth)
rospy.loginfo(f"max:{max_depth}")
img_depth_calibrated = np.abs((max_depth - 0.02362))
# publish max depth value
pub.publish(Float32(img_depth_calibrated*10000)) # convert to mm
except KeyboardInterrupt:
print("Shutting down")
digit_call.disconnect()
if __name__ == "__main__":
rospy.loginfo("starting...")
print_depth()
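On the receiving side, the Float32 values published on the `chatter` topic can be consumed with a plain rospy subscriber. A minimal sketch (node name and callback are illustrative, not part of the repo):

```python
import rospy
from std_msgs.msg import Float32

def on_depth(msg):
    # value as published by the node above (already scaled there)
    rospy.loginfo("max deformation: %f", msg.data)

if __name__ == "__main__":
    rospy.init_node("depth_listener", anonymous=True)  # hypothetical node name
    rospy.Subscriber("chatter", Float32, on_depth)
    rospy.spin()
```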

52
scripts/ros/digit_image_pub.py Normal file
View File

@@ -0,0 +1,52 @@
""" ROS image publisher for DIGIT sensor """
import argparse
# OpenCV
import cv2
from cv_bridge import CvBridge
# ROS libraries
import rospy
# ROS messages
from sensor_msgs.msg import CompressedImage
from digit_depth.digit.digit_sensor import DigitSensor
class ImageFeature:
def __init__(self):
# topic where we publish
self.image_pub = rospy.Publisher("/output/image_raw/compressed",
CompressedImage, queue_size=10)
self.br = CvBridge()
def rgb_pub(digit_sensor: DigitSensor):
# Initialize the publisher wrapper and the ROS node
ic = ImageFeature()
rospy.init_node('image_feature', anonymous=True)
digit_call = digit_sensor()
while True:
frame = digit_call.get_frame()
msg = ic.br.cv2_to_compressed_imgmsg(frame, "png")
ic.image_pub.publish(msg)
rospy.loginfo("published ...")
if cv2.waitKey(1) == 27:
break
if __name__ == "__main__":
argparser = argparse.ArgumentParser()
argparser.add_argument("--fps", type=int, default=30)
argparser.add_argument("--resolution", type=str, default="QVGA")
argparser.add_argument("--serial_num", type=str, default="D00001")
args, unknown = argparser.parse_known_args()
digit = DigitSensor(args.fps, args.resolution, args.serial_num)
rgb_pub(digit)
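The compressed stream on `/output/image_raw/compressed` can be decoded back into an OpenCV image with cv_bridge. A minimal subscriber sketch (node and window names are illustrative):

```python
import cv2
import rospy
from cv_bridge import CvBridge
from sensor_msgs.msg import CompressedImage

bridge = CvBridge()

def on_image(msg):
    frame = bridge.compressed_imgmsg_to_cv2(msg)  # decode the PNG-compressed payload
    cv2.imshow("digit", frame)
    cv2.waitKey(1)

if __name__ == "__main__":
    rospy.init_node("digit_image_listener", anonymous=True)
    rospy.Subscriber("/output/image_raw/compressed", CompressedImage, on_image)
    rospy.spin()
```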

112
scripts/train_mlp.py Normal file
View File

@@ -0,0 +1,112 @@
import argparse
import os
import torch
import torch.nn as nn
import wandb
from torch.utils.data import DataLoader
from tqdm import tqdm
from digit_depth.train import MLP, Color2NormalDataset
seed = 42
torch.manual_seed(seed)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
base_path = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
def train(train_loader, epochs, lr):
model = MLP().to(device)
wandb.init(project="MLP", name="Color 2 Normal model train")
wandb.watch(model, log_freq=100)
model.train()
learning_rate = lr
# Loss and optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
num_epochs = epochs
avg_loss=0.0
loss_record=[]
cnt=0
total_step = len(train_loader)
for epoch in tqdm(range(1, 1 + num_epochs)):
for i, (data, labels) in enumerate(train_loader):
# Move tensors to the configured device
data = data.to(device)
labels = labels.to(device)
outputs = model(data)
loss = criterion(outputs, labels)
avg_loss += loss.item()
optimizer.zero_grad()
loss.backward()
optimizer.step()
cnt+=1
if (i + 1) % 1 == 0:
print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
.format(epoch, num_epochs, i + 1, total_step, loss.item()))
loss_record.append(loss.item())
# wandb.log({"Mini-batch loss": loss})
# wandb.log({'Running test loss': avg_loss / cnt})
os.makedirs(f"{base_path}/models", exist_ok=True)
print(f"Saving model to {base_path}/models/")
torch.save(model, f"{base_path}/models/mlp.ckpt")
def test(test_loader, criterion):
model = torch.load(f"{base_path}/models/mlp.ckpt").to(device)
model.eval()
wandb.init(project="MLP", name="Color 2 Normal model test")
wandb.watch(model, log_freq=100)
avg_loss = 0.0
cnt = 0
with torch.no_grad():
for idx, (data, labels) in enumerate(test_loader):
data = data.to(device)
labels = labels.to(device)
outputs = model(data)
loss = criterion(outputs, labels)
avg_loss += loss.item()
cnt=cnt+1
# wandb.log({"Mini-batch test loss": loss})
avg_loss = avg_loss / cnt
print("Test loss: {:.4f}".format(avg_loss))
# wandb.log({'Average Test loss': avg_loss})
def main():
argparser = argparse.ArgumentParser()
argparser.add_argument('--mode', type=str, default='train', help='train or test')
argparser.add_argument('--batch_size', type=int, default=10000, help='batch size')
argparser.add_argument('--learning_rate', type=float, default=0.001, help='learning rate')
argparser.add_argument('--epochs', type=int, default=2, help='epochs')
argparser.add_argument('--train_path', type=str, default=f'{base_path}/datasets/train_test_split/train.csv',
help='data path')
argparser.add_argument('--test_path', type=str, default=f'{base_path}/datasets/train_test_split/test.csv',
help='test data path')
option = argparser.parse_args()
if option.mode == "train":
train_set = Color2NormalDataset(option.train_path)
train_loader = DataLoader(train_set, batch_size=option.batch_size, shuffle=True)
print("Training set size: ", len(train_set))
train(train_loader, option.epochs,option.learning_rate)
elif option.mode == "test":
test_set = Color2NormalDataset(option.test_path)
test_loader = DataLoader(test_set, batch_size=option.batch_size, shuffle=True)
criterion = nn.MSELoss()
test(test_loader, criterion)
if __name__ == "__main__":
main()
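Once trained, the checkpoint saved above can be reloaded for single-sample inference. The sketch below assumes the default save location and a 5-dimensional input row (the exact feature layout, e.g. normalized RGB plus pixel coordinates, is an assumption here):

```python
import torch
from digit_depth.train import MLP  # noqa: F401  (class definition is needed to unpickle the checkpoint)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = torch.load("models/mlp.ckpt", map_location=device)
model.eval()

with torch.no_grad():
    sample = torch.rand(1, 5, device=device)  # one dummy 5-dim input row
    normal = model(sample)

print(normal.shape)  # expected: torch.Size([1, 3]), a per-pixel surface normal
```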

17
setup.py Normal file
View File

@@ -0,0 +1,17 @@
from setuptools import setup, find_packages
setup(
name="digit-depth",
version="0.1",
description="Digit Depth Reconstruction",
url="https://github.com/vocdex/digit-depth",
author="Shukrullo Nazirjonov",
author_email="nazirjonovsh2000@gmail.com",
license="MIT",
install_requires=['numpy', 'opencv-python', 'torch'],
packages=find_packages(),
zip_safe=False,
extras_require={
'testing': ['pytest'],
},
)

0
tests/__init__.py Normal file
View File

15
tests/test_digit.py Normal file
View File

@@ -0,0 +1,15 @@
import unittest
from digit_interface import Digit
from digit_depth import DigitSensor
class TestDigit(unittest.TestCase):
def test_digit_sensor(self):
fps = 30
resolution = "QVGA"
serial_num = "12345"
digit_sensor = DigitSensor(fps, resolution, serial_num)
digit = digit_sensor()
self.assertIsInstance(digit, Digit)

16
tests/test_handlers.py Normal file
View File

@@ -0,0 +1,16 @@
import unittest
import torch
from PIL import Image
from digit_depth.handlers import image
class Handler(unittest.TestCase):
"""Test for various data handlers"""
def test_tensor_to_PIL(self):
instance = image.ImageHandler(Image.open("/home/shuk/digit-depth/images/0001.png"), "RGB")
tensor = torch.randn(1, 3, 224, 224)
pil_image = instance.tensor_to_PIL()
self.assertEqual(pil_image.size, (224, 224))
if __name__ == '__main__':
unittest.main()

26
tests/test_train.py Normal file
View File

@@ -0,0 +1,26 @@
import os
import unittest
import torch
from digit_depth.train import MLP, Color2NormalDataset
base_path = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
class Train(unittest.TestCase):
def test_shape(self):
model = MLP()
x = torch.randn(1, 5)
y = model(x)
self.assertEqual(torch.Size([1, 3]), y.size())
def test_dataset(self):
dataset = Color2NormalDataset(f'{base_path}/datasets/train_test_split/train.csv')
x, y = dataset[0]
self.assertEqual(torch.Size([5]), x.size())
self.assertEqual(torch.Size([3]), y.size())
self.assertLessEqual(x.max(), 1)
self.assertGreaterEqual(x.min(), 0)
if __name__ == '__main__':
unittest.main()
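The tests can be run with pytest, which is available through the optional `testing` extra declared in setup.py. Note that test_handlers.py points at an absolute image path and test_digit.py expects a connected sensor, so some tests may only pass on a machine with the matching setup:

```bash
pip install ".[testing]"
pytest tests/
```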