import numpy as np
import tensorflow as tf
from tensorflow.contrib.framework.python.ops import add_arg_scope, arg_scope
import math
from matplotlib import pyplot as plt
import tensorflow.contrib.distributions as tfd
[docs]def model_arg_scope(**kwargs):
"""Create new counter and apply arg scope to all arg scoped nn
operations."""
counters = {}
return arg_scope(
[conv2d, deconv2d, residual_block, dense, activate], counters=counters, **kwargs
)
[docs]def make_model(name, template, **kwargs):
"""Create model with fixed kwargs."""
run = lambda *args, **kw: template(
*args, **dict((k, v) for kws in (kw, kwargs) for k, v in kws.items())
)
if tf.executing_eagerly():
return tf.make_template(name, run)
return tf.make_template(name, run, unique_name=name)
[docs]def int_shape(x):
""" short for x.shape.as_list() """
return x.shape.as_list()
[docs]def get_name(layer_name, counters):
""" utlity for keeping track of layer names """
if not layer_name in counters:
counters[layer_name] = 0
name = layer_name + "_" + str(counters[layer_name])
counters[layer_name] += 1
return name
[docs]def apply_partwise(input_, func):
"""
Applies function func on all parts separately.
Parts are in channel 3.
The input is reshaped to map the parts to the batch axis and then the function is applied
Parameters
----------
input_ : tensor
[b, h, w, parts, features]
func : callable
a NN function to apply to each part individually
Returns
-------
[b, out_h, out_w, parts, out_features]
"""
b, h, w, parts, f = input_.shape.as_list()
# transpose [b, h, w, part, features] --> [part, b, h, w, features]
perm = [3, 0, 1, 2, 4]
x = tf.transpose(input_, perm=perm)
# reshape [part, b, h, w, features] --> [part * b, h, w, features]
x = tf.reshape(x, (b * parts, h, w, f))
y = func(x)
_, h_out, w_out, c_out = y.shape.as_list()
# reshape [part * b, h_out, w_out, c_out] --> [part, b, h_out, w_out, c_out]
out = tf.reshape(y, (parts, b, h_out, w_out, c_out))
# transpose back [part, b, h_out, w_out, c_out] --> [b, h_out, w_out, part, c_out]
inv_perm = [1, 2, 3, 0, 4]
out = tf.transpose(out, perm=inv_perm)
return out
@add_arg_scope
def partwise_conv2d(
x,
num_filters,
filter_size=[3, 3],
stride=[1, 1],
pad="SAME",
init_scale=1.0,
counters={},
init=False,
initdist="uniform",
**kwargs
):
"""
input: [b, h, w, parts, features]
Each part (channel 3) has is its own bias and scale
Uses 3D convolution internally to prevent tf.transpose
Examples
--------
import tensorflow as tf
tf.enable_eager_execution()
from pylab import *
from skimage import data
import numpy as np
import math
im = data.astronaut()
im = im.astype(np.float32) / 255
H, W, D = im.shape
b = 1
parts = 5
out_features = 1
features = tf.reshape(im, (b, H, W, 1, D))
features = tf.concat([features] * parts, axis=3)
out = partwise_conv2d(features, out_features, init=False, part_wise=True, initdist="debug")
fig, ax = plt.subplots(1, 1, figsize=(20, 20))
a = np.hstack([np.squeeze(out[..., p, :]) for p in range(parts)])
ax.imshow(a, cmap=plt.cm.gray) # this should render the astronaut in 5 different shades of gray
"""
num_filters = int(num_filters)
name = get_name("conv2d", counters)
with tf.variable_scope(name):
in_channels = x.shape.as_list()[4]
in_parts = x.shape.as_list()[3]
fan_in = in_channels * filter_size[0] * filter_size[1]
stdv = math.sqrt(1.0 / fan_in)
part_stdv = math.sqrt(1.0 / in_parts)
if initdist == "uniform":
V_initializer = tf.random_uniform_initializer(minval=-stdv, maxval=stdv)
b_initializer = tf.random_uniform_initializer(minval=-stdv, maxval=stdv)
v_part_initializer = tf.random_uniform_initializer(
minval=-part_stdv, maxval=part_stdv
)
b_part_initializer = tf.random_uniform_initializer(
minval=-part_stdv, maxval=part_stdv
)
elif initdist == "normal":
V_initializer = tf.random_normal_initializer(stddev=stdv)
b_initializer = tf.random_normal_initializer(stddev=stdv)
v_part_initializer = tf.random_normal_initializer(stddev=part_stdv)
b_part_initializer = tf.random_normal_initializer(stddev=part_stdv)
elif initdist == "debug":
pass
else:
raise ValueError(initdist)
if not initdist == "debug":
V = tf.get_variable(
"V",
filter_size + [1, in_channels, num_filters],
initializer=V_initializer,
dtype=tf.float32,
)
b = tf.get_variable(
"b",
[1, 1, 1, 1, num_filters],
initializer=b_initializer,
dtype=tf.float32,
)
V_part = tf.get_variable(
"V_part",
[1, 1, 1, in_parts, 1],
initializer=v_part_initializer,
dtype=tf.float32,
)
b_part = tf.get_variable(
"b_part",
[1, 1, 1, in_parts, 1],
initializer=b_part_initializer,
dtype=tf.float32,
)
else:
V = (
tf.reshape([1.0, 2.0, 1.0], (3, 1, 1, 1, 1))
/ 4
* tf.reshape([1.0, 2.0, 1.0], (1, 3, 1, 1, 1))
/ 4
* tf.reshape([1.0, 1.0, 0.0], (1, 1, 1, 3, 1))
/ 2
)
b = tf.zeros([1, 1, 1, 1, num_filters], dtype=tf.float32)
V_part = tf.reshape(
tf.cast(tf.linspace(0.0, 1.0, in_parts), dtype=tf.float32),
[1, 1, 1, in_parts, 1],
)
b_part = tf.zeros([1, 1, 1, in_parts, 1], dtype=tf.float32)
x = tf.nn.conv3d(x, V, strides=[1] + stride + [1] + [1], padding="SAME")
x *= V_part
x += b + b_part
return x
def _conv2d(
x,
num_filters,
filter_size=[3, 3],
stride=[1, 1],
pad="SAME",
init_scale=1.0,
counters={},
init=False,
**kwargs
):
num_filters = int(num_filters)
strides = [1] + stride + [1]
name = get_name("conv2d", counters)
initdist = "uniform"
with tf.variable_scope(name):
in_channels = int(x.get_shape()[-1])
fan_in = in_channels * filter_size[0] * filter_size[1]
stdv = math.sqrt(1.0 / fan_in)
if initdist == "uniform":
V_initializer = tf.random_uniform_initializer(minval=-stdv, maxval=stdv)
b_initializer = tf.random_uniform_initializer(minval=-stdv, maxval=stdv)
elif initdist == "normal":
V_initializer = tf.random_normal_initializer(stddev=stdv)
b_initializer = tf.random_normal_initializer(stddev=stdv)
else:
raise ValueError(initdist)
V = tf.get_variable(
"V",
filter_size + [in_channels, num_filters],
initializer=V_initializer,
dtype=tf.float32,
)
b = tf.get_variable(
"b", [num_filters], initializer=b_initializer, dtype=tf.float32
)
if init:
tmp = tf.nn.conv2d(x, V, [1] + stride + [1], pad) + tf.reshape(
b, [1, 1, 1, num_filters]
)
mean, var = tf.nn.moments(tmp, [0, 1, 2])
scaler = 1.0 / tf.sqrt(var + 1e-6)
V = tf.assign(V, V * scaler)
b = tf.assign(b, -mean * scaler)
x = tf.nn.conv2d(x, V, [1] + stride + [1], pad) + tf.reshape(
b, [1, 1, 1, num_filters]
)
return x
@add_arg_scope
def conv2d(
x,
num_filters,
filter_size=[3, 3],
stride=[1, 1],
pad="SAME",
init_scale=1.0,
counters={},
init=False,
part_wise=False,
coords=False,
**kwargs
):
"""
A 2D convolution.
Parameters
----------
x: tensor
input tensor [N, H, W, C]
num_filters: int
number of feature maps
filter_size: list of `ints`
filter size in x, y
stride: list of `ints`
stride in x, y
pad: A `string` from: `"SAME", "VALID"`.
The type of padding algorithm to use.
option 'SAME'
init_scale
counters
init
part_wise: bool
if True, the input has to be [N, H, W, Parts, C]. The convolution will get an additional scale and bias per part
coords: bool
if True, will use coordConv (2018ACS_liuIntriguingFailingConvolutionalNeuralNetworks)
**kwargs
Returns
-------
tensor
convolved input
"""
if coords:
x = add_coordinates(x)
if part_wise:
out = partwise_conv2d(
x,
num_filters,
filter_size=filter_size,
stride=stride,
pad=pad,
init_scale=init_scale,
counters=counters,
init=init,
**kwargs
)
else:
out = _conv2d(
x,
num_filters,
filter_size=filter_size,
stride=stride,
pad=pad,
init_scale=init_scale,
counters=counters,
init=init,
**kwargs
)
return out
@add_arg_scope
def dense(x, num_units, init_scale=1.0, counters={}, init=False, **kwargs):
""" fully connected layer """
name = get_name("dense", counters)
initdist = "uniform"
with tf.variable_scope(name):
in_channels = int(x.get_shape()[-1])
fan_in = in_channels
stdv = math.sqrt(1.0 / fan_in)
if initdist == "uniform":
V_initializer = tf.random_uniform_initializer(minval=-stdv, maxval=stdv)
b_initializer = tf.random_uniform_initializer(minval=-stdv, maxval=stdv)
elif initdist == "normal":
V_initializer = tf.random_normal_initializer(stddev=stdv)
b_initializer = tf.random_normal_initializer(stddev=stdv)
else:
raise ValueError(initdist)
V = tf.get_variable(
"V", [in_channels, num_units], initializer=V_initializer, dtype=tf.float32
)
b = tf.get_variable(
"b", [num_units], initializer=b_initializer, dtype=tf.float32
)
if init:
tmp = tf.matmul(x, V) + tf.reshape(b, [1, num_units])
mean, var = tf.nn.moments(tmp, [0])
scaler = 1.0 / tf.sqrt(var + 1e-6)
V = tf.assign(V, V * scaler)
b = tf.assign(b, -mean * scaler)
x = tf.matmul(x, V) + tf.reshape(b, [1, num_units])
return x
@add_arg_scope
def activate(x, activation, **kwargs):
"""
Activation unit
Parameters
----------
x: tensor
input tensor
activation:
A `string` from: `"elu", "relu", "leaky_relu", "softplus"`.
kwargs
Returns
-------
"""
if activation == None:
return x
elif activation == "elu":
return tf.nn.elu(x)
elif activation == "relu":
return tf.nn.relu(x)
elif activation == "leaky_relu":
return tf.nn.leaky_relu(x)
elif activation == "softplus":
return tf.nn.softplus(x)
else:
raise NotImplemented(activation)
[docs]def nin(x, num_units):
""" a network in network layer (1x1 CONV) """
return conv2d(x, num_units, filter_size=[1, 1])
[docs]def downsample(x, num_units):
"""
Downsampling by stride 2 convolution
equivalent to
x = conv2d(x, num_units, stride = [2, 2])
Parameters
----------
x: tensor
input
num_units:
number of feature map in the output
Returns
-------
"""
return conv2d(x, num_units, stride=[2, 2])
[docs]def upsample(x, num_units, method="subpixel"):
"""
2D upsampling layer.
Parameters
----------
x: tensor
input
num_units:
number of feature maps in the output
method:
upsampling method. A `string` from: `"conv_transposed", "nearest_neighbor", "linear", "subpixel"`
Subpixel means that every upsampled pixel gets its own filter.
Returns
-------
upsampled input
"""
xs = x.shape.as_list()
if method == "conv_transposed":
return deconv2d(x, num_units, stride=[2, 2])
elif method == "subpixel":
x = conv2d(x, 4 * num_units)
x = tf.depth_to_space(x, 2)
return x
elif method == "nearest_neighbor":
bs, h, w, c = x.shape.as_list()
x = tf.image.resize_images(
x, [2 * h, 2 * w], tf.image.ResizeMethod.NEAREST_NEIGHBOR
)
return x
elif method == "linear":
bs, h, w, c = xs[:4]
x = tf.image.resize_images(x, [2 * h, 2 * w], tf.image.ResizeMethod.BILINEAR)
return x
else:
raise NotImplemented(method)
@add_arg_scope
def partwise_deconv2d(
x,
num_filters,
filter_size=[3, 3],
stride=[1, 1],
pad="SAME",
init_scale=1.0,
counters={},
init=False,
**kwargs
):
""" transposed convolutional layer """
num_filters = int(num_filters)
name = get_name("deconv2d", counters)
xs = x.shape.as_list()
strides = [1] + stride + [1]
in_parts = xs[3]
part_stdv = math.sqrt(1.0 / in_parts)
v_part_initializer = tf.random_uniform_initializer(
minval=-part_stdv, maxval=part_stdv
)
b_part_initializer = tf.random_uniform_initializer(
minval=-part_stdv, maxval=part_stdv
)
if pad == "SAME":
target_shape = [
xs[0] * in_parts,
xs[1] * stride[0],
xs[2] * stride[1],
num_filters,
]
else:
target_shape = [
xs[0] * in_parts,
xs[1] * stride[0] + filter_size[0] - 1,
xs[2] * stride[1] + filter_size[1] - 1,
num_filters,
]
with tf.variable_scope(name):
V = tf.get_variable(
"V",
filter_size + [num_filters, xs[-1]],
tf.float32,
tf.random_normal_initializer(0, 0.05),
)
g = tf.get_variable(
"g",
[num_filters],
dtype=tf.float32,
initializer=tf.constant_initializer(1.0),
)
b = tf.get_variable(
"b",
[num_filters],
dtype=tf.float32,
initializer=tf.constant_initializer(0.0),
)
V_part = tf.get_variable(
"V_part", [in_parts], initializer=v_part_initializer, dtype=tf.float32
)
b_part = tf.get_variable(
"b_part", [in_parts], initializer=b_part_initializer, dtype=tf.float32
)
V_norm = tf.nn.l2_normalize(V, [0, 1, 3])
def part_conv_func(x_):
x_ = tf.nn.conv2d_transpose(
x_, V_norm, target_shape, [1] + stride + [1], pad
)
x_ = tf.reshape(g, [1, 1, 1, num_filters]) * x_ + tf.reshape(
b, [1, 1, 1, num_filters]
)
return x_
if init:
mean, var = tf.nn.moments(x, [0, 1, 2])
g = tf.assign(g, init_scale / tf.sqrt(var + 1e-10))
b = tf.assign(b, -mean * g)
x = apply_partwise(x, part_conv_func)
x = x * tf.reshape(V_part, (1, 1, 1, in_parts, 1)) + tf.reshape(
b_part, (1, 1, 1, in_parts, 1)
)
return x
@add_arg_scope
def _deconv2d(
x,
num_filters,
filter_size=[3, 3],
stride=[1, 1],
pad="SAME",
init_scale=1.0,
counters={},
init=False,
**kwargs
):
""" transposed convolutional layer """
num_filters = int(num_filters)
name = get_name("deconv2d", counters)
xs = x.shape.as_list()
strides = [1] + stride + [1]
if pad == "SAME":
target_shape = [xs[0], xs[1] * stride[0], xs[2] * stride[1], num_filters]
else:
target_shape = [
xs[0],
xs[1] * stride[0] + filter_size[0] - 1,
xs[2] * stride[1] + filter_size[1] - 1,
num_filters,
]
with tf.variable_scope(name):
V = tf.get_variable(
"V",
filter_size + [num_filters, xs[-1]],
tf.float32,
tf.random_normal_initializer(0, 0.05),
)
g = tf.get_variable(
"g",
[num_filters],
dtype=tf.float32,
initializer=tf.constant_initializer(1.0),
)
b = tf.get_variable(
"b",
[num_filters],
dtype=tf.float32,
initializer=tf.constant_initializer(0.0),
)
V_norm = tf.nn.l2_normalize(V, [0, 1, 3])
x = tf.nn.conv2d_transpose(x, V_norm, target_shape, [1] + stride + [1], pad)
if init:
mean, var = tf.nn.moments(x, [0, 1, 2])
g = tf.assign(g, init_scale / tf.sqrt(var + 1e-10))
b = tf.assign(b, -mean * g)
x = tf.reshape(g, [1, 1, 1, num_filters]) * x + tf.reshape(
b, [1, 1, 1, num_filters]
)
return x
@add_arg_scope
def deconv2d(
x,
num_filters,
filter_size=[3, 3],
stride=[1, 1],
pad="SAME",
init_scale=1.0,
counters={},
init=False,
part_wise=False,
coords=False,
**kwargs
):
"""
coords: if True, will use coordConv (2018ACS_liuIntriguingFailingConvolutionalNeuralNetworks)
"""
if coords:
x = add_coordinates(x)
if part_wise:
out = partwise_deconv2d(
x,
num_filters,
filter_size=filter_size,
stride=stride,
pad=pad,
init_scale=init_scale,
counters=counters,
init=init,
**kwargs
)
else:
out = _deconv2d(
x,
num_filters,
filter_size=filter_size,
stride=stride,
pad=pad,
init_scale=init_scale,
counters=counters,
init=init,
**kwargs
)
return out
@add_arg_scope
def residual_block(x, skipin=None, conv=conv2d, init=False, dropout_p=0.0, **kwargs):
"""
slight variation of original residual block.
Parameters
----------
x: tensor
Incoming tensor
skipin: tensor
Incomming tensor from skip connection, if any
conv: callable
which convolution function to use for the resodual
init
dropout_p: float
dropout probability, if any
kwargs
Returns
-------
output : x + residual
Examples
"""
xs = int_shape(x)
num_filters = xs[-1]
residual = x
if skipin is not None:
skipin = nin(activate(skipin), num_filters)
residual = tf.concat([residual, skipin], axis=-1)
residual = activate(residual)
residual = tf.nn.dropout(residual, keep_prob=1.0 - dropout_p)
residual = conv(residual, num_filters)
return x + residual
[docs]def flatten(x):
"""
returns a flat version of x --> [N, -1]
Parameters
----------
x: tensor
Returns
-------
"""
_shape = x.shape.as_list()
return tf.reshape(x, (_shape[0], -1))
[docs]def mask2rgb(mask):
"""
Convert tensor with masks [N, H, W, C] to an RGB tensor [N, H, W, 3]
using argmax over channels.
Parameters
----------
mask: ndarray
an array of shape [N, H, W, C]
Returns:
RGB visualization in shape [N, H, W, 3]
-------
"""
n_parts = mask.shape.as_list()[3]
maxmask = tf.argmax(mask, axis=3)
hotmask = tf.one_hot(maxmask, depth=n_parts)
hotmask = tf.expand_dims(hotmask, 4)
colors = make_mask_colors(n_parts)
colors = (colors - 0.5) * 2
colors = tf.to_float(colors)
colors = tf.expand_dims(colors, 0)
colors = tf.expand_dims(colors, 0)
colors = tf.expand_dims(colors, 0)
vis_mask = hotmask * colors
vis_mask = tf.reduce_sum(vis_mask, axis=3)
return vis_mask
[docs]def np_one_hot(targets, n_classes):
"""
numpy equivalent of tf.one_hot
returns targets as one hot matrix
Parameters
----------
targets: ndarray
array of target classes
n_classes: int
how many classes there are overall
Returns: ndarray
one-hot array with shape [n, n_classes]
-------
"""
res = np.eye(n_classes)[np.array(targets).reshape(-1)]
return res.reshape(list(targets.shape) + [n_classes])
[docs]def np_to_float(x):
""" cast x to float32 """
return x.astype(np.float32)
[docs]def np_mask2rgb(mask):
"""
numpy equivalent of @mask2rgb
convert tensor with masks [N, H, W, C] to an RGB tensor [N, H, W, 3]
using argmax over channels.
Parameters
----------
mask: ndarray
an array of shape [N, H, W, C]
Returns:
RGB visualization in shape [N, H, W, 3]
-------
"""
n_parts = mask.shape[3]
maxmask = np.argmax(mask, axis=3)
hotmask = np_one_hot(maxmask, n_classes=n_parts)
hotmask = np.expand_dims(hotmask, 4)
colors = make_mask_colors(n_parts)
colors = (colors - 0.5) * 2
colors = np_to_float(colors)
colors = np.expand_dims(colors, 0)
colors = np.expand_dims(colors, 0)
colors = np.expand_dims(colors, 0)
vis_mask = hotmask * colors
vis_mask = np.sum(vis_mask, axis=3)
return vis_mask
[docs]def make_mask_colors(n_parts, cmap=plt.cm.inferno):
"""
make a color array using the specified colormap for n_parts classes
Parameters
----------
n_parts: int
how many classes there are in the mask
cmap:
matplotlib colormap handle
Returns
-------
colors: ndarray
an array with shape [n_parts, 3] representing colors in the range [0, 1].
"""
colors = cmap(np.linspace(0, 1, n_parts), alpha=False, bytes=False)[:, :3]
return colors
[docs]def hourglass_model(
x,
config,
extra_resnets,
n_out=3,
activation="relu",
upsample_method="subpixel",
coords=False,
):
"""
A U-net or hourglass style image-to-image model with skip-connections
Parameters
----------
x : tensor
input tensor to unet
config : list
a list of ints specifying the number of feature maps on each scale of the unet in the downsampling path
for the upsampling path, the list will be reversed
For example [32, 64] will use 32 channels on scale 0 (without downsampling) and 64 channels on scale 1
once downsampled).
extra_resnets : int
how many extra res blocks to use at the bottleneck
n_out : int
number of final output feature maps of the unet. 3 for RGB
activation : str
a string specifying the activation function to use. See @activate for options.
upsample_method : list of str or str
a str specifying the upsampling method or a list of str specifying the upsampling method for each scale individually.
See @upsample for possible options.
coords : True
if coord conv should be used.
Returns
-------
Examples
--------
tf.enable_eager_execution()
x = tf.ones((1, 128, 128, 3))
config = [32, 64]
extra_resnets = 0
upsample_method = "subpixel"
activation = "leaky_relu"
coords = False
unet = make_model("unet", hourglass_model, config=config, extra_resnets= extra_resnets, upsample_method=upsample_method, activation=activation)
y = unet(x)
# plotting the output should look random because we did not train anything
im = np.concatenate([x, y], axis=1)
plt.imshow(np.squeeze(im))
"""
with model_arg_scope(activation=activation, coords=coords):
hs = list()
h = conv2d(x, config[0])
h = residual_block(h)
for nf in config[1:]:
h = downsample(h, nf)
h = residual_block(h)
hs.append(h)
for _ in range(extra_resnets):
h = residual_block(h)
for i, nf in enumerate(config[-2::-1]):
h = residual_block(h, skipin=hs[-(i + 1)])
h = upsample(h, nf, method=upsample_method)
h = residual_block(h)
h = conv2d(h, n_out)
return h
[docs]def make_ema(init_value, value, decay=0.99):
"""
apply exponential moving average to variable
Parameters
----------
init_value: float
initial value for moving average variable
value: variable
tf variable to apply update ops on
decay: float
decay parameter
Returns
-------
avg_value : variable with exponential moving average
update_ema: tensorflow update operation for exponential moving average
Examples
--------
# usage within edflow Trainer.make_loss_ops. Apply EMA to discriminator accuracy
avg_acc, update_ema = make_ema(0.5, dis_accuracy, decay)
self.update_ops.append(update_ema)
self.log_ops["dis_acc"] = avg_acc
"""
decay = tf.constant(decay, dtype=tf.float32)
avg_value = tf.Variable(init_value, dtype=tf.float32, trainable=False)
update_ema = tf.assign(
avg_value, decay * avg_value + (1.0 - decay) * tf.cast(value, tf.float32)
)
return avg_value, update_ema
[docs]def add_coordinates(input_tensor, with_r=False):
"""
Given an input_tensor, adds 2 channelw ith x and y coordinates to the feature maps.
This was introduced in coordConv (2018ACS_liuIntriguingFailingConvolutionalNeuralNetworks).
Parameters
----------
input_tensor: tensor
Tensor of shape [N, H, W, C]
with_r : bool
if True, euclidian radius will also be added as channel
Returns
ret : input_tensor concatenated with x and y coordinates and maybe euclidian distance.
-------
"""
assert len(input_tensor.shape.as_list()) == 4
bs = tf.shape(input_tensor)[0]
x_dim, y_dim = input_tensor.shape.as_list()[1:3]
xx_ones = tf.ones([bs, x_dim], dtype=tf.int32)
xx_ones = tf.expand_dims(xx_ones, -1)
xx_range = tf.tile(tf.expand_dims(tf.range(y_dim), 0), [bs, 1])
xx_range = tf.expand_dims(xx_range, 1)
xx_channel = tf.matmul(xx_ones, xx_range)
xx_channel = tf.expand_dims(xx_channel, -1)
yy_ones = tf.ones([bs, y_dim], dtype=tf.int32)
yy_ones = tf.expand_dims(yy_ones, 1)
yy_range = tf.tile(tf.expand_dims(tf.range(x_dim), 0), [bs, 1])
yy_range = tf.expand_dims(yy_range, -1)
yy_channel = tf.matmul(yy_range, yy_ones)
yy_channel = tf.expand_dims(yy_channel, -1)
xx_channel = tf.cast(xx_channel, "float32") / max(1, x_dim - 1)
yy_channel = tf.cast(yy_channel, "float32") / max(1, y_dim - 1)
xx_channel = xx_channel * 2 - 1
yy_channel = yy_channel * 2 - 1
ret = tf.concat([input_tensor, xx_channel, yy_channel], axis=-1)
if with_r:
rr = tf.sqrt(tf.square(xx_channel) + tf.square(yy_channel))
ret = tf.concat([ret, rr], axis=-1)
return ret
[docs]def probs_to_mu_L(
probs, scaling_factor, inv=True
): # todo maybe exponential map induces to much certainty ! low values basically ignored and only high values count!
"""Calculate mean and covariance (cholesky decomposition of covariance) for each channel of probs
tensor of keypoint probabilites [bn, h, w, n_kp]
mean calculated on a grid of scale [-1, 1]
Parameters
----------
probs: tensor
tensor of shape [b, h, w, k] where each channel along axis 3 is interpreted as an unnormalized probability density.
scaling_factor : tensor
tensor of shape [b, 1, 1, k] representing normalizing the normalizing constant of the density
inv: bool
if True, returns covariance matrix of density. Else returns inverse of covariance matrix aka precision matrix
Returns
-------
mu : tensor
tensor of shape [b, k, 2] representing partwise mean coordinates of x and y for each item in the batch
L : tensor
tensor of shape [b, k, 2, 2] representing partwise cholesky decomposition of covariance
matrix for each item in the batch.
Example
-------
.. code-block:: python
from matplotlib import pyplot as plt
tf.enable_eager_execution()
import numpy as np
import tensorflow.contrib.distributions as tfd
_means = [-0.5, 0, 0.5]
means = tf.ones((3, 1, 2), dtype=tf.float32) * np.array(_means).reshape((3, 1, 1))
means = tf.concat([means, means, means[::-1, ...]], axis=1)
means = tf.reshape(means, (-1, 2))
var_ = 0.1
rho = 0.5
cov = [[var_, rho * var_], [rho * var_, var_]]
scale = tf.cholesky(cov)
scale = tf.stack([scale] * 3, axis=0)
scale = tf.stack([scale] * 3, axis=0)
scale = tf.reshape(scale, (-1, 2, 2))
mvn = tfd.MultivariateNormalTriL(
loc=means,
scale_tril=scale)
h = 100
w = 100
y_t = tf.tile(tf.reshape(tf.linspace(-1., 1., h), [h, 1]), [1, w])
x_t = tf.tile(tf.reshape(tf.linspace(-1., 1., w), [1, w]), [h, 1])
y_t = tf.expand_dims(y_t, axis=-1)
x_t = tf.expand_dims(x_t, axis=-1)
meshgrid = tf.concat([y_t, x_t], axis=-1)
meshgrid = tf.expand_dims(meshgrid, 0)
meshgrid = tf.expand_dims(meshgrid, 3) # 1, h, w, 1, 2
blob = mvn.prob(meshgrid)
blob = tf.reshape(blob, (100, 100, 3, 3))
blob = tf.transpose(blob, perm=[2, 0, 1, 3])
norm_const = np.sum(blob, axis=(1, 2), keepdims=True)
mu, L = nn.probs_to_mu_L(blob / norm_const, 1, inv=False)
bn, h, w, nk = blob.get_shape().as_list()
estimated_blob = nn.tf_hm(h, w, mu, L)
fig, ax = plt.subplots(2, 3, figsize=(9, 6))
for b in range(len(_means)):
ax[0, b].imshow(np.squeeze(blob[b, ...]))
ax[0, b].set_title("target_blobs")
ax[0, b].set_axis_off()
for b in range(len(_means)):
ax[1, b].imshow(np.squeeze(estimated_blob[b, ...]))
ax[1, b].set_title("estimated_blobs")
ax[1, b].set_axis_off()
"""
(
bn,
h,
w,
nk,
) = (
probs.get_shape().as_list()
) # todo instead of calulating sequrity measure from amplitude one could alternativly calculate it by letting the network predict a extra paremeter also one could do
y_t = tf.tile(tf.reshape(tf.linspace(-1.0, 1.0, h), [h, 1]), [1, w])
x_t = tf.tile(tf.reshape(tf.linspace(-1.0, 1.0, w), [1, w]), [h, 1])
y_t = tf.expand_dims(y_t, axis=-1)
x_t = tf.expand_dims(x_t, axis=-1)
meshgrid = tf.concat([y_t, x_t], axis=-1)
mu = tf.einsum("ijl,aijk->akl", meshgrid, probs)
mu_out_prod = tf.einsum(
"akm,akn->akmn", mu, mu
) # todo incosisntent ordereing of mu! compare with cross_V2
mesh_out_prod = tf.einsum(
"ijm,ijn->ijmn", meshgrid, meshgrid
) # todo efficient (expand_dims)
stddev = tf.einsum("ijmn,aijk->akmn", mesh_out_prod, probs) - mu_out_prod
a_sq = stddev[:, :, 0, 0]
a_b = stddev[:, :, 0, 1]
b_sq_add_c_sq = stddev[:, :, 1, 1]
eps = 1e-12 # todo clean magic
a = tf.sqrt(
a_sq + eps
) # Σ = L L^T Prec = Σ^-1 = L^T^-1 * L^-1 ->looking for L^-1 but first L = [[a, 0], [b, c]
b = a_b / (a + eps)
c = tf.sqrt(b_sq_add_c_sq - b ** 2 + eps)
z = tf.zeros_like(a)
if inv:
det = tf.expand_dims(tf.expand_dims(a * c, axis=-1), axis=-1)
row_1 = tf.expand_dims(
tf.concat(
[tf.expand_dims(c, axis=-1), tf.expand_dims(z, axis=-1)], axis=-1
),
axis=-2,
)
row_2 = tf.expand_dims(
tf.concat(
[tf.expand_dims(-b, axis=-1), tf.expand_dims(a, axis=-1)], axis=-1
),
axis=-2,
)
L_inv = (
scaling_factor / (det + eps) * tf.concat([row_1, row_2], axis=-2)
) # L^⁻1 = 1/(ac)* [[c, 0], [-b, a]
return mu, L_inv
else:
row_1 = tf.expand_dims(
tf.concat(
[tf.expand_dims(a, axis=-1), tf.expand_dims(z, axis=-1)], axis=-1
),
axis=-2,
)
row_2 = tf.expand_dims(
tf.concat(
[tf.expand_dims(b, axis=-1), tf.expand_dims(c, axis=-1)], axis=-1
),
axis=-2,
)
L = scaling_factor * tf.concat([row_1, row_2], axis=-2) # just L
return mu, L
[docs]def probs_to_mu_sigma(probs):
"""Calculate mean and covariance matrix for each channel of spatial probability maps
Mean and covariance are caluclated on a grid of scale [-1, 1]
Parameters
----------
probs: tensor
tensor of shape [N, H, W, C] where each channel along axis 3 is interpreted as a probability density.
Returns
-------
mu : tensor
tensor of shape [N, C, 2] representing partwise mean coordinates of x and y for each item in the batch
sigma : tensor
tensor of shape [N, C, 2, 2] representing covariance matrix matrix for each item in the batch
Example
-------
mu, sigma = nn.probs_to_mu_sigma(spatial_probability_maps)
"""
bn, h, w, nk = probs.get_shape().as_list()
y_t = tf.tile(tf.reshape(tf.linspace(-1.0, 1.0, h), [h, 1]), [1, w])
x_t = tf.tile(tf.reshape(tf.linspace(-1.0, 1.0, w), [1, w]), [h, 1])
y_t = tf.expand_dims(y_t, axis=-1)
x_t = tf.expand_dims(x_t, axis=-1)
meshgrid = tf.concat([y_t, x_t], axis=-1)
mu = tf.einsum("ijl,aijk->akl", meshgrid, probs)
mu_out_prod = tf.einsum("akm,akn->akmn", mu, mu)
mesh_out_prod = tf.einsum("ijm,ijn->ijmn", meshgrid, meshgrid)
sigma = tf.einsum("ijmn,aijk->akmn", mesh_out_prod, probs) - mu_out_prod
return mu, sigma
[docs]def tf_hm(h, w, mu, L):
"""
Returns Gaussian densitiy function based on μ and L for each batch index and part
L is the cholesky decomposition of the covariance matrix : Σ = L L^T
Parameters
----------
h : int
heigh ot output map
w : int
width of output map
mu : tensor
mean of gaussian part and batch item. Shape [b, p, 2]. Mean in range [-1, 1] with respect to height and width
L : tensor
cholesky decomposition of covariance matrix for each batch item and part. Shape [b, p, 2, 2]
order:
Returns
-------
density : tensor
gaussian blob for each part and batch idx. Shape [b, h, w, p]
Example
-------
.. code-block:: python
from matplotlib import pyplot as plt
tf.enable_eager_execution()
import numpy as np
import tensorflow as tf
import tensorflow.contrib.distributions as tfd
# create Target Blobs
_means = [-0.5, 0, 0.5]
means = tf.ones((3, 1, 2), dtype=tf.float32) * np.array(_means).reshape((3, 1, 1))
means = tf.concat([means, means, means[::-1, ...]], axis=1)
means = tf.reshape(means, (-1, 2))
var_ = 0.1
rho = 0.5
cov = [[var_, rho * var_],
[rho * var_, var_]]
scale = tf.cholesky(cov)
scale = tf.stack([scale] * 3, axis=0)
scale = tf.stack([scale] * 3, axis=0)
scale = tf.reshape(scale, (-1, 2, 2))
mvn = tfd.MultivariateNormalTriL(
loc=means,
scale_tril=scale)
h = 100
w = 100
y_t = tf.tile(tf.reshape(tf.linspace(-1., 1., h), [h, 1]), [1, w])
x_t = tf.tile(tf.reshape(tf.linspace(-1., 1., w), [1, w]), [h, 1])
y_t = tf.expand_dims(y_t, axis=-1)
x_t = tf.expand_dims(x_t, axis=-1)
meshgrid = tf.concat([y_t, x_t], axis=-1)
meshgrid = tf.expand_dims(meshgrid, 0)
meshgrid = tf.expand_dims(meshgrid, 3) # 1, h, w, 1, 2
blob = mvn.prob(meshgrid)
blob = tf.reshape(blob, (100, 100, 3, 3))
blob = tf.transpose(blob, perm=[2, 0, 1, 3])
# Estimate mean and L
norm_const = np.sum(blob, axis=(1, 2), keepdims=True)
mu, L = nn.probs_to_mu_L(blob / norm_const, 1, inv=False)
bn, h, w, nk = blob.get_shape().as_list()
# Estimate blob based on mu and L
estimated_blob = nn.tf_hm(h, w, mu, L)
# plot
fig, ax = plt.subplots(2, 3, figsize=(9, 6))
for b in range(len(_means)):
ax[0, b].imshow(np.squeeze(blob[b, ...]))
ax[0, b].set_title("target_blobs")
ax[0, b].set_axis_off()
for b in range(len(_means)):
ax[1, b].imshow(np.squeeze(estimated_blob[b, ...]))
ax[1, b].set_title("estimated_blobs")
ax[1, b].set_axis_off()
"""
assert len(mu.get_shape().as_list()) == 3
assert len(L.get_shape().as_list()) == 4
assert mu.get_shape().as_list()[-1] == 2
assert L.get_shape().as_list()[-1] == 2
assert L.get_shape().as_list()[-2] == 2
b, p, _ = mu.get_shape().as_list()
mu = tf.reshape(mu, (b * p, 2))
L = tf.reshape(L, (b * p, 2, 2))
mvn = tfd.MultivariateNormalTriL(loc=mu, scale_tril=L)
y_t = tf.tile(tf.reshape(tf.linspace(-1.0, 1.0, h), [h, 1]), [1, w])
x_t = tf.tile(tf.reshape(tf.linspace(-1.0, 1.0, w), [1, w]), [h, 1])
y_t = tf.expand_dims(y_t, axis=-1)
x_t = tf.expand_dims(x_t, axis=-1)
meshgrid = tf.concat([y_t, x_t], axis=-1)
meshgrid = tf.expand_dims(meshgrid, 0)
meshgrid = tf.expand_dims(meshgrid, 3) # 1, h, w, 1, 2
probs = mvn.prob(meshgrid)
probs = tf.reshape(probs, (h, w, b, p))
probs = tf.transpose(probs, perm=[2, 0, 1, 3]) # move part axis to the back
return probs
if __name__ == "__main__":
tf.enable_eager_execution()
x = tf.ones((1, 128, 128, 3))
config = [32, 64]
extra_resnets = 0
upsample_method = "subpixel"
activation = "leaky_relu"
coords = False
unet = make_model(
"unet",
hourglass_model,
config=config,
extra_resnets=extra_resnets,
upsample_method=upsample_method,
activation=activation,
)
y = unet(x)
im = np.concatenate([x, y], axis=1)
plt.imshow(np.squeeze(im))