{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Chapter 12 – Distributed TensorFlow**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_This notebook contains all the sample code and solutions to the exercises in chapter 12._"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First, let's make sure this notebook works well in both python 2 and 3, import a few common modules, ensure MatplotLib plots figures inline and prepare a function to save the figures:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# To support both python 2 and python 3\n",
    "from __future__ import division, print_function, unicode_literals\n",
    "\n",
    "# Common imports\n",
    "import numpy as np\n",
    "import os\n",
    "\n",
    "# to make this notebook's output stable across runs\n",
    "def reset_graph(seed=42):\n",
    "    tf.reset_default_graph()\n",
    "    tf.set_random_seed(seed)\n",
    "    np.random.seed(seed)\n",
    "\n",
    "# To plot pretty figures\n",
    "%matplotlib inline\n",
    "import matplotlib\n",
    "import matplotlib.pyplot as plt\n",
    "plt.rcParams['axes.labelsize'] = 14\n",
    "plt.rcParams['xtick.labelsize'] = 12\n",
    "plt.rcParams['ytick.labelsize'] = 12\n",
    "\n",
    "# Where to save the figures\n",
    "PROJECT_ROOT_DIR = \".\"\n",
    "CHAPTER_ID = \"distributed\"\n",
    "\n",
    "def save_fig(fig_id, tight_layout=True):\n",
    "    path = os.path.join(PROJECT_ROOT_DIR, \"images\", CHAPTER_ID, fig_id + \".png\")\n",
    "    print(\"Saving figure\", fig_id)\n",
    "    if tight_layout:\n",
    "        plt.tight_layout()\n",
    "    plt.savefig(path, format='png', dpi=300)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Local server"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "c = tf.constant(\"Hello distributed TensorFlow!\")\n",
    "server = tf.train.Server.create_local_server()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "b'Hello distributed TensorFlow!'\n"
     ]
    }
   ],
   "source": [
    "with tf.Session(server.target) as sess:\n",
    "    print(sess.run(c))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "cluster_spec = tf.train.ClusterSpec({\n",
    "    \"ps\": [\n",
    "        \"127.0.0.1:2221\",  # /job:ps/task:0\n",
    "        \"127.0.0.1:2222\",  # /job:ps/task:1\n",
    "    ],\n",
    "    \"worker\": [\n",
    "        \"127.0.0.1:2223\",  # /job:worker/task:0\n",
    "        \"127.0.0.1:2224\",  # /job:worker/task:1\n",
    "        \"127.0.0.1:2225\",  # /job:worker/task:2\n",
    "    ]})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "task_ps0 = tf.train.Server(cluster_spec, job_name=\"ps\", task_index=0)\n",
    "task_ps1 = tf.train.Server(cluster_spec, job_name=\"ps\", task_index=1)\n",
    "task_worker0 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=0)\n",
    "task_worker1 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=1)\n",
    "task_worker2 = tf.train.Server(cluster_spec, job_name=\"worker\", task_index=2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Pinning operations across devices and servers"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "reset_graph()\n",
    "\n",
    "with tf.device(\"/job:ps\"):\n",
    "    a = tf.Variable(1.0, name=\"a\")\n",
    "\n",
    "with tf.device(\"/job:worker\"):\n",
    "    b = a + 2\n",
    "\n",
    "with tf.device(\"/job:worker/task:1\"):\n",
    "    c = a + b"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "4.0\n"
     ]
    }
   ],
   "source": [
    "with tf.Session(\"grpc://127.0.0.1:2221\") as sess:\n",
    "    sess.run(a.initializer)\n",
    "    print(c.eval())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "reset_graph()\n",
    "\n",
    "with tf.device(tf.train.replica_device_setter(\n",
    "        ps_tasks=2,\n",
    "        ps_device=\"/job:ps\",\n",
    "        worker_device=\"/job:worker\")):\n",
    "    v1 = tf.Variable(1.0, name=\"v1\")  # pinned to /job:ps/task:0 (defaults to /cpu:0)\n",
    "    v2 = tf.Variable(2.0, name=\"v2\")  # pinned to /job:ps/task:1 (defaults to /cpu:0)\n",
    "    v3 = tf.Variable(3.0, name=\"v3\")  # pinned to /job:ps/task:0 (defaults to /cpu:0)\n",
    "    s = v1 + v2            # pinned to /job:worker (defaults to task:0/cpu:0)\n",
    "    with tf.device(\"/task:1\"):\n",
    "        p1 = 2 * s         # pinned to /job:worker/task:1 (defaults to /cpu:0)\n",
    "        with tf.device(\"/cpu:0\"):\n",
    "            p2 = 3 * s     # pinned to /job:worker/task:1/cpu:0\n",
    "\n",
    "config = tf.ConfigProto()\n",
    "config.log_device_placement = True\n",
    "\n",
    "with tf.Session(\"grpc://127.0.0.1:2221\", config=config) as sess:\n",
    "    v1.initializer.run()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Readers – the old way"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "reset_graph()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[1.0, 6, 44]\n"
     ]
    }
   ],
   "source": [
    "default1 = tf.constant([5.])\n",
    "default2 = tf.constant([6])\n",
    "default3 = tf.constant([7])\n",
    "dec = tf.decode_csv(tf.constant(\"1.,,44\"),\n",
    "                    record_defaults=[default1, default2, default3])\n",
    "with tf.Session() as sess:\n",
    "    print(sess.run(dec))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "No more files to read\n",
      "[array([[ 4.,  5.],\n",
      "       [ 1., -1.]], dtype=float32), array([1, 0], dtype=int32)]\n",
      "[array([[7., 8.]], dtype=float32), array([0], dtype=int32)]\n",
      "No more training instances\n"
     ]
    }
   ],
   "source": [
    "reset_graph()\n",
    "\n",
    "test_csv = open(\"my_test.csv\", \"w\")\n",
    "test_csv.write(\"x1, x2 , target\\n\")\n",
    "test_csv.write(\"1.,, 0\\n\")\n",
    "test_csv.write(\"4., 5. , 1\\n\")\n",
    "test_csv.write(\"7., 8. , 0\\n\")\n",
    "test_csv.close()\n",
    "\n",
    "filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
    "filename = tf.placeholder(tf.string)\n",
    "enqueue_filename = filename_queue.enqueue([filename])\n",
    "close_filename_queue = filename_queue.close()\n",
    "\n",
    "reader = tf.TextLineReader(skip_header_lines=1)\n",
    "key, value = reader.read(filename_queue)\n",
    "\n",
    "x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n",
    "features = tf.stack([x1, x2])\n",
    "\n",
    "instance_queue = tf.RandomShuffleQueue(\n",
    "    capacity=10, min_after_dequeue=2,\n",
    "    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
    "    name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
    "enqueue_instance = instance_queue.enqueue([features, target])\n",
    "close_instance_queue = instance_queue.close()\n",
    "\n",
    "minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
    "\n",
    "with tf.Session() as sess:\n",
    "    sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
    "    sess.run(close_filename_queue)\n",
    "    try:\n",
    "        while True:\n",
    "            sess.run(enqueue_instance)\n",
    "    except tf.errors.OutOfRangeError as ex:\n",
    "        print(\"No more files to read\")\n",
    "    sess.run(close_instance_queue)\n",
    "    try:\n",
    "        while True:\n",
    "            print(sess.run([minibatch_instances, minibatch_targets]))\n",
    "    except tf.errors.OutOfRangeError as ex:\n",
    "        print(\"No more training instances\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "#coord = tf.train.Coordinator()\n",
    "#threads = tf.train.start_queue_runners(coord=coord)\n",
    "#filename_queue = tf.train.string_input_producer([\"test.csv\"])\n",
    "#coord.request_stop()\n",
    "#coord.join(threads)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Queue runners and coordinators"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[array([[ 7.,  8.],\n",
      "       [ 1., -1.]], dtype=float32), array([0, 0], dtype=int32)]\n",
      "[array([[4., 5.]], dtype=float32), array([1], dtype=int32)]\n",
      "No more training instances\n"
     ]
    }
   ],
   "source": [
    "reset_graph()\n",
    "\n",
    "filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
    "filename = tf.placeholder(tf.string)\n",
    "enqueue_filename = filename_queue.enqueue([filename])\n",
    "close_filename_queue = filename_queue.close()\n",
    "\n",
    "reader = tf.TextLineReader(skip_header_lines=1)\n",
    "key, value = reader.read(filename_queue)\n",
    "\n",
    "x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n",
    "features = tf.stack([x1, x2])\n",
    "\n",
    "instance_queue = tf.RandomShuffleQueue(\n",
    "    capacity=10, min_after_dequeue=2,\n",
    "    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
    "    name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
    "enqueue_instance = instance_queue.enqueue([features, target])\n",
    "close_instance_queue = instance_queue.close()\n",
    "\n",
    "minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
    "\n",
    "n_threads = 5\n",
    "queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * n_threads)\n",
    "coord = tf.train.Coordinator()\n",
    "\n",
    "with tf.Session() as sess:\n",
    "    sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
    "    sess.run(close_filename_queue)\n",
    "    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)\n",
    "    try:\n",
    "        while True:\n",
    "            print(sess.run([minibatch_instances, minibatch_targets]))\n",
    "    except tf.errors.OutOfRangeError as ex:\n",
    "        print(\"No more training instances\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[array([[ 4.,  5.],\n",
      "       [ 1., -1.]], dtype=float32), array([1, 0], dtype=int32)]\n",
      "[array([[7., 8.]], dtype=float32), array([0], dtype=int32)]\n",
      "No more training instances\n"
     ]
    }
   ],
   "source": [
    "reset_graph()\n",
    "\n",
    "def read_and_push_instance(filename_queue, instance_queue):\n",
    "    reader = tf.TextLineReader(skip_header_lines=1)\n",
    "    key, value = reader.read(filename_queue)\n",
    "    x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])\n",
    "    features = tf.stack([x1, x2])\n",
    "    enqueue_instance = instance_queue.enqueue([features, target])\n",
    "    return enqueue_instance\n",
    "\n",
    "filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])\n",
    "filename = tf.placeholder(tf.string)\n",
    "enqueue_filename = filename_queue.enqueue([filename])\n",
    "close_filename_queue = filename_queue.close()\n",
    "\n",
    "instance_queue = tf.RandomShuffleQueue(\n",
    "    capacity=10, min_after_dequeue=2,\n",
    "    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],\n",
    "    name=\"instance_q\", shared_name=\"shared_instance_q\")\n",
    "\n",
    "minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)\n",
    "\n",
    "read_and_enqueue_ops = [read_and_push_instance(filename_queue, instance_queue) for i in range(5)]\n",
    "queue_runner = tf.train.QueueRunner(instance_queue, read_and_enqueue_ops)\n",
    "\n",
    "with tf.Session() as sess:\n",
    "    sess.run(enqueue_filename, feed_dict={filename: \"my_test.csv\"})\n",
    "    sess.run(close_filename_queue)\n",
    "    coord = tf.train.Coordinator()\n",
    "    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)\n",
    "    try:\n",
    "        while True:\n",
    "            print(sess.run([minibatch_instances, minibatch_targets]))\n",
    "    except tf.errors.OutOfRangeError as ex:\n",
    "        print(\"No more training instances\")\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setting a timeout"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2.0\n",
      "6.0\n",
      "3.0\n",
      "4.0\n",
      "Timed out while dequeuing\n"
     ]
    }
   ],
   "source": [
    "reset_graph()\n",
    "\n",
    "q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[()])\n",
    "v = tf.placeholder(tf.float32)\n",
    "enqueue = q.enqueue([v])\n",
    "dequeue = q.dequeue()\n",
    "output = dequeue + 1\n",
    "\n",
    "config = tf.ConfigProto()\n",
    "config.operation_timeout_in_ms = 1000\n",
    "\n",
    "with tf.Session(config=config) as sess:\n",
    "    sess.run(enqueue, feed_dict={v: 1.0})\n",
    "    sess.run(enqueue, feed_dict={v: 2.0})\n",
    "    sess.run(enqueue, feed_dict={v: 3.0})\n",
    "    print(sess.run(output))\n",
    "    print(sess.run(output, feed_dict={dequeue: 5}))\n",
    "    print(sess.run(output))\n",
    "    print(sess.run(output))\n",
    "    try:\n",
    "        print(sess.run(output))\n",
    "    except tf.errors.DeadlineExceededError as ex:\n",
    "        print(\"Timed out while dequeuing\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Data API"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The Data API, introduced in TensorFlow 1.4, makes reading data efficiently much easier."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.reset_default_graph()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's start with a simple dataset composed of three times the integers 0 to 9, in batches of 7:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))\n",
    "dataset = dataset.repeat(3).batch(7)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The first line creates a dataset containing the integers 0 through 9. The second line creates a new dataset based on the first one, repeating its elements three times and creating batches of 7 elements. As you can see, we start with a source dataset, then we chain calls to various methods to apply transformations to the data."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we create a one-shot-iterator to go through this dataset just once, and we call its `get_next()` method to get a tensor that represents the next element."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "iterator = dataset.make_one_shot_iterator()\n",
    "next_element = iterator.get_next()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's repeatedly evaluate `next_element` to go through the dataset. When there are not more elements, we get an `OutOfRangeError`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[0 1 2 3 4 5 6]\n",
      "[7 8 9 0 1 2 3]\n",
      "[4 5 6 7 8 9 0]\n",
      "[1 2 3 4 5 6 7]\n",
      "[8 9]\n",
      "Done\n"
     ]
    }
   ],
   "source": [
    "with tf.Session() as sess:\n",
    "    try:\n",
    "        while True:\n",
    "            print(next_element.eval())\n",
    "    except tf.errors.OutOfRangeError:\n",
    "        print(\"Done\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Great! It worked fine."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that, as always, a tensor is only evaluated once each time we run the graph (`sess.run()`): so even if we evaluate multiple tensors that all depend on `next_element`, it is only evaluated once. This is true as well if we ask for `next_element` to be evaluated twice in just one run:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[array([0, 1, 2, 3, 4, 5, 6]), array([0, 1, 2, 3, 4, 5, 6])]\n",
      "[array([7, 8, 9, 0, 1, 2, 3]), array([7, 8, 9, 0, 1, 2, 3])]\n",
      "[array([4, 5, 6, 7, 8, 9, 0]), array([4, 5, 6, 7, 8, 9, 0])]\n",
      "[array([1, 2, 3, 4, 5, 6, 7]), array([1, 2, 3, 4, 5, 6, 7])]\n",
      "[array([8, 9]), array([8, 9])]\n",
      "Done\n"
     ]
    }
   ],
   "source": [
    "with tf.Session() as sess:\n",
    "    try:\n",
    "        while True:\n",
    "            print(sess.run([next_element, next_element]))\n",
    "    except tf.errors.OutOfRangeError:\n",
    "        print(\"Done\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `interleave()` method is powerful but a bit tricky to grasp at first. The easiest way to understand it is to look at an example:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.reset_default_graph()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [],
   "source": [
    "dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))\n",
    "dataset = dataset.repeat(3).batch(7)\n",
    "dataset = dataset.interleave(\n",
    "    lambda v: tf.data.Dataset.from_tensor_slices(v),\n",
    "    cycle_length=3,\n",
    "    block_length=2)\n",
    "iterator = dataset.make_one_shot_iterator()\n",
    "next_element = iterator.get_next()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0,1,7,8,4,5,2,3,9,0,6,7,4,5,1,2,8,9,6,3,0,1,2,8,9,3,4,5,6,7,Done\n"
     ]
    }
   ],
   "source": [
    "with tf.Session() as sess:\n",
    "    try:\n",
    "        while True:\n",
    "            print(next_element.eval(), end=\",\")\n",
    "    except tf.errors.OutOfRangeError:\n",
    "        print(\"Done\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because `cycle_length=3`, the new dataset starts by pulling 3 elements from the previous dataset: that's `[0,1,2,3,4,5,6]`, `[7,8,9,0,1,2,3]` and `[4,5,6,7,8,9,0]`. Then it calls the lambda function we gave it to create one dataset for each of the elements. Since we use `Dataset.from_tensor_slices()`, each dataset is going to return its elements one by one. Next, it pulls two items (since `block_length=2`) from each of these three datasets, and it iterates until all three datasets are out of items: 0,1 (from 1st), 7,8 (from 2nd), 4,5 (from 3rd), 2,3 (from 1st), 9,0 (from 2nd), and so on until 8,9 (from 3rd), 6 (from 1st), 3 (from 2nd), 0 (from 3rd). Next it tries to pull the next 3 elements from the original dataset, but there are just two left: `[1,2,3,4,5,6,7]` and `[8,9]`. Again, it creates datasets from these elements, and it pulls two items from each until both datasets are out of items: 1,2 (from 1st), 8,9 (from 2nd), 3,4 (from 1st), 5,6 (from 1st), 7 (from 1st). Notice that there's no interleaving at the end since the arrays do not have the same length."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Readers – the new way"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Instead of using a source dataset based on `from_tensor_slices()` or `from_tensor()`, we can use a reader dataset. It handles most of the complexity for us (e.g., threads):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.reset_default_graph()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [],
   "source": [
    "filenames = [\"my_test.csv\"]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [],
   "source": [
    "dataset = tf.data.TextLineDataset(filenames)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We still need to tell it how to decode each line:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [],
   "source": [
    "def decode_csv_line(line):\n",
    "    x1, x2, y = tf.decode_csv(\n",
    "        line, record_defaults=[[-1.], [-1.], [-1.]])\n",
    "    X = tf.stack([x1, x2])\n",
    "    return X, y"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we can apply this decoding function to each element in the dataset using `map()`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [],
   "source": [
    "dataset = dataset.skip(1).map(decode_csv_line)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, let's create a one-shot iterator:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [],
   "source": [
    "it = dataset.make_one_shot_iterator()\n",
    "X, y = it.get_next()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[ 1. -1.] 0.0\n",
      "[4. 5.] 1.0\n",
      "[7. 8.] 0.0\n",
      "Done\n"
     ]
    }
   ],
   "source": [
    "with tf.Session() as sess:\n",
    "    try:\n",
    "        while True:\n",
    "            X_val, y_val = sess.run([X, y])\n",
    "            print(X_val, y_val)\n",
    "    except tf.errors.OutOfRangeError as ex:\n",
    "        print(\"Done\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "# Exercise solutions"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Coming soon**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  },
  "nav_menu": {},
  "toc": {
   "navigate_menu": true,
   "number_sections": true,
   "sideBar": true,
   "threshold": 6,
   "toc_cell": false,
   "toc_section_display": "block",
   "toc_window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}