Sipping from the twitter stream

I’ve been playing around with connecting to twitter’s streaming API, and displaying a live stream of tweets returned.
To do this, I was originally using DotNetOpenAuth, along with the HttpClient, which worked fine for the sample stream, but would return authentication errors for the filtered stream. I looked at the HTML message in fiddler2, and the oauth parameters weren’t ordered, which twitter requires. Instead, I’m using TwitterDoodle, which uses HttpClient.
The C#5/.NET4.5 async/await code is quite elegant – it’s not CPU intensive so it’s fine running on the UI thread, without blocking. My first instinct prior to C#5 would have been to use RX for this, but now if I’m doing something simple I’d stick to async/await, only using RX if doing something more complex like buffering or batching.

    private async void ProcessTweets()
    {
      using (var t = new TwitterConnector())
      {
        var response = await t.GetSampleFirehoseConnection();
        var res = await response.Content.ReadAsStreamAsync();
        using (var streamReader = new StreamReader(resEncoding.UTF8))
        {
          // streamReader.EndOfStream can block, so instead check for null
          while (!cts.IsCancellationRequested)
          {
            var r = await streamReader.ReadLineAsync();
            if (r == null) { return; }
            ProcessTweetText(r);
          }
        }
      }
    }
 
    private void ProcessTweetText(string r)
    {
      if (!string.IsNullOrEmpty(r))
      {
        var tweetJToken = JsonConvert.DeserializeObject<dynamic>(r);
        var tweetObj = tweetJToken["text"];
        if (tweetObj != null)
        {
          var tweetText = tweetObj.ToString();
          viewModel.Items.Add(tweetText);
        }
      }
    }

The equivalent F# async code obviously looks quite similar, with the added goodness of Type Providers. Time dependent, I am planning to do some more analysis of tweets which would be a good fit for F#.

type tweetProvider = JsonProvider<"SampleTweet.json"SampleList=true>
 
type MainWindowViewModel() =
  inherit ViewModelBase()
  let items = new ObservableCollection<string>()
  member x.Items
    with get () = items
  member x.ProcessTweet tweet =
    let tweetParsed = tweetProvider.Parse(tweet)
    match tweetParsed.Text with
    | Some(v->  x.Items.Add v
    | None -> ()
  member x.ProcessTweets =
    let a = async {
      use t = new TwitterConnector()
      let! response = t.GetSampleFirehoseConnection() |> Async.AwaitTask
      let! result = response.Content.ReadAsStreamAsync() |> Async.AwaitTask
      use streamReader = new StreamReader(resultEncoding.UTF8)
      let rec processTweetsAsync (s:StreamReader=
        async {
          let! r = s.ReadLineAsync() |> Async.AwaitTask
          // streamReader.EndOfStream can block, so instead check for null
          if r <> null then
            x.ProcessTweet r
            return! processTweetsAsync s
        }
      do! processTweetsAsync streamReader
    }
    a |> Async.StartImmediate
  member x.GoCommand = 
    new RelayCommand ((fun canExecute -> true), 
      (fun action -> x.ProcessTweets))

Labels: , ,