maaash.jp

what I create

Adobe Stratusサンプルを読み解く

Stratus Sample Application
がコードといっしょに公開されているので読んでみる。

■NetConnectionでAdobeの提供しているrtmfpサーバstratusにつなぐ
予めDeveloperKeyを手に入れておく。

[as]netConnection = new NetConnection();
netConnection.addEventListener(NetStatusEvent.NET_STATUS, netConnectionHandler);
netConnection.connect(“rtmfp://stratus.adobe.com/” + DeveloperKey);
[/as]

NetStatusEventの NetConnection.Connect.Sucess になると、
NetConnection.nearID がとれる
この64桁のIDが、他のPeerから自分につなぐために、
相手が知る必要のある情報

webサービスに登録しておいて、
ユーザー名と照合できるようにしておく
#これがHttpIdManager.asのお仕事

なんだろサンプルでは4つのNetStream使ってるけど、
お互いの2つだけでいけそうだなぁ。

■音声/映像のやり取り
ユーザー名知ってる人に電話かけようかなと思ったら、
webサービスに照合して、相手の64桁のIDをもらい、
それをNetStreamに渡す(identity)

[as]incomingStream = new NetStream(netConnection, identity);
incomingStream.play(“media-callee”);
[/as]

playすれば音が鳴るし、VideoにattachNetStreamすれば相手の映像が見える
出す方は、DIRECT_CONNECTIONS設定してpublishする

[as]
outgoingStream = new NetStream(netConnection, NetStream.DIRECT_CONNECTIONS);
outgoingStream.publish(“media-caller”);[/as]

publish,playに渡す引数は、電話かける方はmedia-caller,受ける方はmedia-calleeとしてるけどなんでもよさそう。
ルールを決めておけばよい。

↓clientにonPeerConnectって関数を入れておくと、
誰かPeerがつないできたときに、farIDが取れる。
onPeerConnectがtrueを返せば接続ok、
falseを返せば接続ngッぽいので、
非接続側でもう一度webサービスにfarIDを照合して誰が電話かけてきたのか着信履歴表示や着信拒否ができそうだね

[as]
var o:Object = new Object
o.onPeerConnect = function(caller:NetStream):Boolean {
status(“Callee connecting to media stream: ” + caller.farID + “\n”);
return true;
}
outgoingStream.client = o;
[/as]

■RPC
NetStream.sendを使えばPeer2PeerのRPCも簡単。

[as]
outgoingStream.send(“onIm”, userNameInput.text, msg);[/as]

受け側のNetStream.clientにぶらさがってる関数が呼び出される

[as]i.onIm = function(name:String, text:String):void {
textOutput.text += name + “: ” + text + “\n”;
}[/as]

■帯域
NetStream.info.audioBytesPerSecond, NetSteam.info.videoBytesPerSecond,
とか使うとどれくらい帯域使ってるのか見えるようだ。
おー droppedFrames ってプロパティとかもある。

■結論
AIR1.5でもいけるようだし、こりゃ機能的にはSkypeレベルのものが簡単につくれそうだ。
あとは映像や音声の品質がどれくらい出るか。

Adobeの中の人のarticleもおもしろい

Stratus service for developing end-to-end applications using RTMFP in Flash Player
・Nat越えはUDPで。
・ファイアウォールやSymmetric Natだった場合用に、TURN proxy (Traversal Using Relays around NAT)を使うようにFlash Playerを設定することも可能だとか。でもローカルのmms.cfgファイルにTURNプロキシのIPを入れたり、ってそんなめんどくさいことできないわ。Nat越えできない人たちはとりあえず切り捨てそうだね。

Lingua::JA::Yomi 日本語読みモジュールをつくった

英語を手軽に日本語にしたいと思ってつくった。

今あるモジュールだと、Lingua::JA::Kanaっていうのがあってローマ字→ひらがな変換はできる。

1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env perl

use strict;
use warnings;
use Lingua::JA::Kana;
use utf8;
use Encode;

my $hiragana = romaji2hiragana(‘aerosmith’);  
print ‘hiragana: ‘.Encode::encode(‘utf8′,$hiragana).”\n”;  
\# hiragana: あえろsみth  

でもローマ字にあてはまらないのは上記のように残念な感じになるので、
Lingua::JA::Yomi ってのをつくった。
http://coderepos.org/share/browser/lang/perl/Lingua-JA-Yomi/trunk

1
2
3
4
5
6
7
8
9
10
#!/usr/bin/env perl  
use strict;
use warnings;
use Test::More qw/no_plan/;
use utf8;

use Lingua::JA::Yomi;

my $converter = new Lingua::JA::Yomi;
is( $converter->convert(‘aerosmith’), ‘エアロウスミス’,'aerosmith’);

ルー語インスパイアなので辞書は
Bilingual Emacspeak Project
のを使わせていただいております。

今、あの紫の本を読んでるので、再帰処理で少しずつ変換していくとこを実装するのがたのしかったー

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# pass in utf8 flagged string  
sub convert {
my ($self, $roman, $remainder) = @_;
$remainder ||= ”;  
print “[convert]roman: $roman remainder: $remainder\n” if $self->debug;

return if ! $roman;

$roman = uc $roman;

if ( $roman =~ /^([^A-Z]+)(.*)/ ) {
\# preserve symbols  
return $2 ? ( $1 . $self->convert($2) ) : $1;
}
elsif ( exists $self->dic->{$roman} ) {
print “[convert]found: $roman, “.Encode::encode(‘utf8′,$self->dic->{$roman}).”\n” if $self->debug;  
if ( ! $remainder ) {
return $self->dic->{$roman};
}
else {
return $self->dic->{$roman} . $self->convert( $remainder );
}
}
else {
my $last\_of\_roman = chop( $roman );
return $self->convert( $roman, $last\_of\_roman . ($remainder || ”) );  
}
}

perl って何回まで再帰できるんだろ

FlashによるP2Pセッション確立用サーバStratus

って理解でいいのかな。

Adobe MAX 2008 US での発表つづき
Stratusってなんだろって探すとある

Stratus

http://labs.adobe.com/wiki/index.php/Stratus

Flash Player 10 and Adobe AIR 1.5 introduce a new communications protocol called the Real-Time Media Flow Protocol (RTMFP). The most important features of RTMFP include low latency, end-to-end peering capability, security and scalability. These properties make RTMFP especially well suited for developing real-time collaboration applications by not only providing superior user experience but also reducing cost for operators.

In order to use RTMFP, Flash Player endpoints must connect to an RTMFP-capable server, such as the Adobe Stratus service. Stratus is a Beta hosted rendezvous service that aids establishing communications between Flash Player endpoints. Unlike Flash Media Server, Stratus does not support media relay, shared objects, scripting, etc. So by using Stratus, you can only develop applications where Flash Player endpoints are directly communicating with each other.

Stratus:FAQ

How will I make a P2P connection through Flash Player 10 or Adobe AIR 1.5? Connections from a SWF through an RTMFP-capable server will be assigned a temporary unique ID that is infeasible to guess or forge. Other SWFs connected to the same server can subscribe to the streams and events from that ID once the broadcasting SWF agrees to the connection.

クライアントに接続する際に、StratusがIDをくれる。
xmppのJIDにあたるものを毎回つくるんだ。

ストリーミングするならFMS、P2PするならStratus
ってことなのかな
FMSでサポートするのかと思ってたけどでっかいから分けたのかなぁ

Can I get started with RTMFP now? Stratus will allow you to begin developing RTMFP-enabled applications. It will be available on Adobe Labs before the end of the year.

楽しみ

C#とアンマネージドC++のDLL間で文字列の配列をやり取りする

マネージド環境(C#)からアンマネージド環境になんか渡す時にマーシャリングというのが自動で起きて、
変数の相互型変換をしてくれるようだ。
名前がわかりにくすぎる。

C#からC++に文字列の配列を渡すとこ

C#でC++dllの関数使うよって宣言
[csharp][DllImport(“pcp.dll”)]
private static extern int call_array([MarshalAs(UnmanagedType.LPArray, ArraySubType = UnmanagedType.LPWStr, SizeParamIndex = 1)]string[] files, int length);
[/csharp]

C#側コード
[csharp]string[] files = new string[2];
files[0] = “日本語1″;
files[1] = “日本語2″;
int ret = call_array(files,2);
return ret;[/csharp]

C++側
[cpp]int call_array( wchar_t* files, int length ) {
setlocale(LC_ALL,”japanese”);
for ( int i=0; i<length; i++ ) {
wprintf(L”%d: %s\n”,i,
(files+i));
}
return 10;
}[/cpp]
defファイルに書いとく。

C++からC#のコールバックに文字列の配列を渡す

C#のコールバック
[csharp]public static int onReceivedFiles(IntPtr ptr, int nVal) {
IntPtr pptr;
string[] files = new string[nVal];
for( int n = 0; n < nVal; n++ ) {
// スタックに積まれて来るのはポインタのみなので
pptr = Marshal.ReadIntPtr( ptr, n * 4 );
files[n] = Marshal.PtrToStringAuto( pptr );
}
return 1;
}[/csharp]

コールバック関数を渡す先のC++側関数とdelegate
[cpp][DllImport(“pcp.dll”)]
private static extern int start(StringArrayCallback cb);
public delegate int StringArrayCallback(IntPtr ptr, int nVal);
[/cpp]

C++にコールバック関数を渡す
[cpp]StringArrayCallback cb = new StringArrayCallback(onReceivedFiles);
int ret = start(cb);[/cpp]

C++からC#のコールバック関数を呼ぶ
[cpp]std::wstring message1 = L”日本語1″;
std::wstring message2
= L”日本語2″;
wchar_t* messages_[2] = { (wchar_t*)message1_.c_str(), (wchar_t*)message2_.c_str() };
int ret = callback(messages_, sizeof(messages_)/sizeof(messages[0]));
[/cpp]

ExternalInterfaceで送れるデータの限界&全ブラウザ対応のブラクラができた

最初はブラクラなんてつくる気無かったんですが。

今度ExternalInterfaceを使って最大kBオーダーのテキストをjavascript→Flashに渡すようなアプリをつくるかもしれなくて、
ExternalInterface.addCallbackの限界を試したかったんです。

その結果は、IE7、FF2ともに 33,554,432[Byte]まではokだった。
その2倍はNG。IE7,FF2ともにメモリが不足しています、っていってjsがエラーる。

へー。

そしたらExternalInterface.callでFlash→javascriptにどんだけ渡せるのか試したくなる。

16,777,216[Byte]まではok。その2倍はIE7,FF2ともにNG。Flash側でメモリが足りなくなるようで、
IE7,FF2ともに終了。

へーへー。

Opera9.*,Safari3(Windows)も死亡
macは試してない

しかし、意外とたくさん送れるもんだなぁ。

検証コードはこんな

actionscript3
[as]package {
import flash.display.Sprite;
import flash.system.Security;
import flash.external.ExternalInterface;
import flash.utils.setTimeout;
public class RPCClient extends Sprite{
private var debug :Boolean = true;
Security.allowDomain(‘*’);

public function RPCClient(){
ExternalInterface.addCallback(“xi_send”, xi_send);
var message :String = “a”;
setTimeout( function() :void {
message = message + message;
ExternalInterface.call(“show_length_of(‘”+message+”‘)”);
setTimeout( arguments.callee, 50 );
}, 50 );
}

private function xi_send( st :String ) :void {
logger(“[xi_send]len: “+st.length);
}

private function logger(… args):void{
if(!debug){ return; }
log.apply(null,(new Array(“[RPCClient]“,this)).concat(args)); // to firebug
//ExternalInterface.call(“alert(‘”+args+”‘)”);
}
}
}[/as]

javascript
require prototype.js
[js]RPC = new function() {
var clientid = “externalRPCClient”;
return {
client : null,
init : function() {
var so = new SWFObject( ‘/swf/RPCClient.swf’, clientid, 1, 1, “9″, “#FFFFFF” );
so.addParam(“allowScriptAccess”, “always”);
so.addParam(“align”, “middle”);
so.addParam(“wmode”,”transparent”);
so.addVariable( “key”, “whatever” );
so.write( “rpc_container” );
},
start : function() {
this.client = navigator.userAgent.match(/MSIE/) ? window[clientid] : document[clientid];
if ( !this.client || typeof(this.client.xi_send)!=”function” ) {
console.log(“[start]not ready”);
return;
}
var sending_message = “a”;
new (function() {
sending_message = sending_message + sending_message;
this.client.xi_send( sending_message );
setTimeout( arguments.callee.bind(this), 50 );
}.bind(this));
}
};
}
document.observe(“dom:loaded”, RPC.init.bind(RPC));

function show_length_of( message ) {
alert(“[show_length_of]“+message.length);
//console.log(“[show_length_of]“+message.length);
}
[/js]

デモ
※注意。ブラウザが強制終了します

http://maaash.jp/lab/crasher/

speex+portaudioを使ったechoアプリを書いてみた

マイク → speexエンコーダ → speexデコーダ → スピーカー
だけの。
speexエンコーダ、speexデコーダ無しだと問題ないところ、ありにすると、
なんか高音のノイズが入るのはなぜ?

speex-devのMLに質問中。。。

コードはこんな
[cpp]

include “stdafx.h”

include

include

include #include

include

define USE_SPEEX

define SAMPLE_RATE (44100)

define FRAMES_PER_BUFFER (256)

define NUM_SECONDS (10)

//#define NUM_CHANNELS (2)

define NUM_CHANNELS (1) // try mono first

/* #define DITHER_FLAG (paDitherOff) */

define DITHER_FLAG (0) /**/

/* Select sample format. */

define PA_SAMPLE_TYPE paFloat32

typedef float SAMPLE;

define SAMPLE_SILENCE (0.0f)

define PRINTF_S_FORMAT “%.8f”

typedef struct {
int frameIndex; /* Index into sample array. */
int maxFrameIndex;
std::queue
qu_microphone;
std::queue qu_speex;
std::queue
qu_speaker;
} paTestData;

// speex globals
SpeexBits encoder_bits;
SpeexBits decoder_bits;
void encoder_state;
void
decoder_state;
unsigned int speex_frame_size; // 320 or 640 or ..
unsigned int speex_packet_size = 74; // 74 or something

/ This routine will be called by the PortAudio engine when audio is needed.
It may be called at interrupt level on some machines so don’t do anything
that could mess up the system like calling malloc() or free().
/
static int echo( const void inputBuffer,
void
outputBuffer,
unsigned long framesPerBuffer,
const PaStreamCallbackTimeInfo timeInfo,
PaStreamCallbackFlags statusFlags,
void
userData
)
{
paTestData *data = (paTestData*)userData;
const SAMPLE *rptr = (const SAMPLE*)inputBuffer;
SAMPLE *wptr = (SAMPLE*)outputBuffer;

ifdef USE_SPEEX

std::queue & qu_microphone = data->qu_microphone;
std::queue& qu_speex = data->qu_speex;
std::queue
& qu_speaker = data->qu_speaker;

else

std::queue & qu_microphone = data->qu_microphone;
//std::queue& qu_speex = data->qu_speex;
std::queue
& qu_speaker = data->qu_microphone; // point at the same queue, to feed the speaker what we got from the microphone

endif

long framesToCalc;
long i;
int finished;
unsigned long framesLeft = data->maxFrameIndex – data->frameIndex;

if( framesLeft < framesPerBuffer ) {
framesToCalc = framesLeft;
finished = paComplete;
}
else {
framesToCalc = framesPerBuffer;
finished = paContinue;
}

// process input from mic
if( rptr == 0 ) {
}
else {
for( i=0; i<framesToCalc; i++ ) {
qu_microphone.push( rptr++ );
if ( NUM_CHANNELS == 2 ) {
qu_microphone.push(
rptr++ );
}
}
}

ifdef USE_SPEEX

// speex encoder
while ( qu_microphone.size() > speex_frame_size ) {
float input[1000];
char encoded[1000];
for ( unsigned int i=0; i<speex_frame_size ; i++ ) {
input[i] = qu_microphone.front(); qu_microphone.pop();
}
speex_bits_reset( &encoder_bits );
speex_encode( encoder_state, input, &encoder_bits );
speex_packet_size = speex_bits_write( &encoder_bits, encoded, 1000 );
for ( unsigned int i=0; i<speex_packet_size; i++ ) {
qu_speex.push( encoded[i] );
}
}

// speex decoder
while ( qu_speex.size() >= speex_packet_size ) {
float output[1000];
char encoded[1000];
int nbBytes = speex_packet_size;
for ( int i=0; i<nbbytes ; i++ ) {
encoded[i] = qu_speex.front(); qu_speex.pop();
}
speex_bits_read_from( &decoder_bits, encoded, nbBytes );
speex_decode( decoder_state, &decoder_bits, output );
for ( unsigned int i=0; i<speex_frame_size; i++ ) {
qu_speaker.push( output[i] );
}
}

endif

// pop to speaker
for( i=0; i 0 ) {
wptr++ = qu_speaker.front(); qu_speaker.pop();
}
else {
wptr++ = SAMPLE_SILENCE;
}
if( NUM_CHANNELS == 2 ) {
if ( qu_speaker.size() > 0 ) {
wptr++ = qu_speaker.front(); qu_speaker.pop();
}
else {
wptr++ = SAMPLE_SILENCE;
}
}
}

data->frameIndex += framesToCalc;
return finished;
}

/**********************/
int main(void)
{
PaStreamParameters inputParameters,
outputParameters;
PaStream
stream;
PaError err = paNoError;
int totalFrames;

std::cout < < “patest_record.c”;

paTestData data;
data.maxFrameIndex = totalFrames = NUM_SECONDS * SAMPLE_RATE; /* Record for a few seconds. */
data.frameIndex = 0;
int numSamples = totalFrames * NUM_CHANNELS;
int numBytes = numSamples * sizeof(SAMPLE);
int tmp;

// encoder
encoder_state = speex_encoder_init( speex_lib_get_mode(SPEEX_MODEID_UWB) );

tmp=0;
speex_encoder_ctl(encoder_state, SPEEX_SET_VBR, &tmp);
tmp=8;
speex_encoder_ctl(encoder_state, SPEEX_SET_QUALITY, &tmp); // 8: 27,800[bps]
tmp=3;
speex_encoder_ctl(encoder_state, SPEEX_SET_COMPLEXITY, &tmp);
// 320[samples] = 20ms? according to http://www.speex.org/docs/manual/speex-manual/node7.html
speex_encoder_ctl( encoder_state, SPEEX_GET_FRAME_SIZE, &speex_frame_size );
speex_bits_init( &encoder_bits );

// decoder
decoder_state = speex_decoder_init( speex_lib_get_mode(SPEEX_MODEID_UWB) );

SpeexCallback callback;
callback.callback_id = SPEEX_INBAND_CHAR;
callback.func = speex_std_char_handler;
callback.data = stderr;
speex_decoder_ctl(decoder_state, SPEEX_SET_HANDLER, &callback);
tmp=1;
speex_decoder_ctl(decoder_state, SPEEX_SET_ENH, &tmp);
speex_bits_init( &decoder_bits );

err = Pa_Initialize();
if( err != paNoError ) goto done;

inputParameters.device = Pa_GetDefaultInputDevice(); /* default input device */
if (inputParameters.device == paNoDevice) {
fprintf(stderr,“Error: No default input device.\n”);
goto done;
}
inputParameters.channelCount = NUM_CHANNELS; /* stereo input */
inputParameters.sampleFormat = PA_SAMPLE_TYPE;
inputParameters.suggestedLatency = Pa_GetDeviceInfo( inputParameters.device )->defaultLowInputLatency;
inputParameters.hostApiSpecificStreamInfo = 0;

outputParameters.device = Pa_GetDefaultOutputDevice(); /* default output device */
if (outputParameters.device == paNoDevice) {
fprintf(stderr,”Error: No default output device.\n”);
goto done;
}
outputParameters.channelCount = NUM_CHANNELS; /* stereo output */
outputParameters.sampleFormat = PA_SAMPLE_TYPE;
outputParameters.suggestedLatency = Pa_GetDeviceInfo( outputParameters.device )->defaultLowOutputLatency;
outputParameters.hostApiSpecificStreamInfo = 0;

/* start echo ——————————————– */
err = Pa_OpenStream(
&stream,
&inputParameters,
&outputParameters,
SAMPLE_RATE,
FRAMES_PER_BUFFER,
paClipOff, /* we won’t output out of range samples so don’t bother clipping them */
echo,
&data );
if( err != paNoError ) goto done;

err = Pa_StartStream( stream );
if( err != paNoError ) goto done;
printf(“Now recording!!\n”); fflush(stdout);

while( ( err = Pa_IsStreamActive( stream ) ) == 1 )
{
Pa_Sleep(1000);
printf(“index = %d\n”, data.frameIndex ); fflush(stdout);
}
if( err < 0 ) goto done;

err = Pa_CloseStream( stream );
if( err != paNoError ) goto done;

done:
// destruct speex
speex_encoder_destroy( encoder_state );
speex_bits_destroy( &encoder_bits );
speex_decoder_destroy( decoder_state );
speex_bits_destroy( &decoder_bits );

Pa_Terminate();
if( err != paNoError )
{
fprintf( stderr, “An error occured while using the portaudio stream\n” );
fprintf( stderr, “Error number: %d\n”, err );
fprintf( stderr, “Error message: %s\n”, Pa_GetErrorText( err ) );
err = 1; /* Always return 0 or 1, but no other return codes. */
}
return err;
}

[/cpp]

libjingleでportaudioを使ったmediaengineをつくった

置いておこう。

portaudioが、マイクからデータが取れたとき、スピーカーに音を流せるとき、にcallbackを呼んでくれる形なので、
・マイクから音をとってくるところ
・スピーカーに音を流すところ
にバッファを持っている。

覚えたてのC++のSTLの明示的cast、queueを使っています。
C++おもろいなー

libjingleが確保してくれたP2Pセッションに音声データを流すときに、
“` network_interface_->SendPacket(const void, size_t);[/code]
ってのを呼ぶんだけど、
queueをconst void
に変換するとこが気持ち悪い。こんなもんなのかなぁ

音声は生のwavを送っているので、遅延は
DirectAudioを使ってたらそこで50msくらい(らしい?)
+バッファ分の遅延、
+portaudioの遅延
+通信の遅延
くらいなのかな。なんか体感かなり遅延してるのは後で調べる。

portaudiomediaengine.cc
[cpp]

include “talk/third_party/mediastreamer/mediastream.h”

include

include

include “talk/base/logging.h”

include “talk/base/thread.h”

include “talk/session/phone/codec.h”

include “talk/session/phone/portaudiomediaengine.h”

include

include

include

include

include

include “portaudio.h”

include

using namespace cricket;

static int playbackCallback( const void inputBuffer,
void
outputBuffer,
unsigned long framesPerBuffer,
const PaStreamCallbackTimeInfo timeInfo,
PaStreamCallbackFlags statusFlags,
void
userData
)
{
PortAudioMediaChannel* channel = static_cast (userData);
if ( inputBuffer!=0 ) {
channel->saveFromMicrophone( static_cast(inputBuffer), framesPerBuffer );
}
channel->pushToSpeaker( static_cast(outputBuffer), framesPerBuffer );
channel->SignalReadFromMicEvent( channel );
return paContinue;
}

PortAudioMediaChannel::PortAudioMediaChannel(PortAudioMediaEngine* eng) :
pt(-1),
audio_stream_(0),
engine
(eng)
{
PaStreamParameters inputParameters;
inputParameters.device = Pa_GetDefaultInputDevice(); /* default input device */
if (inputParameters.device == paNoDevice) {
fprintf(stderr,”Error: No default input device.\n”);
//goto done;
}
inputParameters.channelCount = 2; /* stereo input */
inputParameters.sampleFormat = PA_SAMPLE_TYPE;
inputParameters.suggestedLatency = Pa_GetDeviceInfo( inputParameters.device )->defaultLowInputLatency;
inputParameters.hostApiSpecificStreamInfo = 0;

PaStreamParameters outputParameters;
outputParameters.device = Pa_GetDefaultOutputDevice(); /* default output device */
if (outputParameters.device == paNoDevice) {
fprintf(stderr,”Error: No default output device.\n”);
//goto done;
}
outputParameters.channelCount = 2; /* stereo output */
outputParameters.sampleFormat = PA_SAMPLE_TYPE;
outputParameters.suggestedLatency = Pa_GetDeviceInfo( outputParameters.device )->defaultLowOutputLatency;
outputParameters.hostApiSpecificStreamInfo = 0;

SignalReadFromMicEvent.connect(this, &PortAudioMediaChannel::OnReadFromMic);

// Initialize PortAudio
int err = Pa_OpenStream(
&stream_,
&inputParameters,
&outputParameters,
SAMPLE_RATE, // 44100
paFramesPerBufferUnspecified, // FRAMES_PER_BUFFER
paClipOff,
playbackCallback,
this
);
if (err != paNoError)
fprintf(stderr, “Error creating a PortAudio stream: %s\n”, Pa_GetErrorText(err));

}

PortAudioMediaChannel::~PortAudioMediaChannel() {
if (stream_) {
Pa_CloseStream(stream_);
}
}

void PortAudioMediaChannel::SetCodecs(const std::vector &codecs) {
bool first = true;
std::vector
::const_iterator i;

for (i = codecs.begin(); i < codecs.end(); i++) {
if (!engine->FindCodec(*i))
continue;
if (first) {
LOG(LS_INFO) < < “Using ” << i->name < < “/” << i->clockrate;
pt
= i->id;
first = false;
}
}

if (first) {
// We’re being asked to set an empty list of codecs. This will only happen when
// working with a buggy client; let’s try PCMU.
LOG(LS_WARNING) < < “Received empty list of codces; using PCMU/8000”;
}
}

// マイクから新しいデータがとれたら、リモートに送る
void PortAudioMediaChannel::OnReadFromMic( PortAudioMediaChannel channel )
{
//char
buf[max_size];
int size = PORTAUDIO_PACKET_LENGTH;
float buff[PORTAUDIO_PACKET_LENGTH/4];
int len;
talk_base::CritScope cs(&crit_microphone);

int i=0;
for ( i=0; i<size/sizeof(float) && !qu_microphone.empty(); i++ ) {
buff[i] = qu_microphone.front();
qu_microphone.pop();
}
if ( network_interface_ && !mute ) {
const void *buf = static_cast(buff);
network_interface_->SendPacket( buff, i * sizeof(float) );
}
char dateStr [9];
strtime( dateStr );
std::cout < < “[” << dateStr << “][OnReadFromMic]qu_microphone.size(): ” << qu_microphone.size() << std::endl;
}

// 届いたパケットをスピーカーのバッファに送る
void PortAudioMediaChannel::OnPacketReceived( const void data, int len ) {
const float
reader = static_cast(data);
talk_base::CritScope cs(&crit_speaker);

LOG(INFO) < < “[PMC]OnPacketReceived data: ” << std::setprecision(3) << reader[0] << “ len: ” << len << “qu_speaker.size(): ” << qu_speaker.size();

for ( int i=0; i<len/4; i++ ) {
qu_speaker.push( *reader );
reader++;
}
}

void PortAudioMediaChannel::SetPlayout(bool playout) {

if (!stream_)
return;

if (play && !playout) {
if ( Pa_IsStreamActive(stream_) ) {
int err = Pa_StopStream(stream_);
if (err != paNoError) {
fprintf(stderr, “Error stopping PortAudio stream: %s\n”, Pa_GetErrorText(err));
LOG(LS_INFO) << “Error stopping PortAudio stream: %s\n”, Pa_GetErrorText(err);
return;
}
}
play
= false;
}
else if (!play && playout) {
if ( !Pa_IsStreamActive(stream_) ) {
int err = Pa_StartStream(stream_);
if (err != paNoError) {
fprintf(stderr, “Error starting PortAudio stream: %s\n”, Pa_GetErrorText(err));
LOG(LS_INFO) << “Error starting PortAudio stream: %s\n”, Pa_GetErrorText(err);
return;
}
}
play
= true;
}

}

void PortAudioMediaChannel::SetSend(bool send) {
mute_ = !send;
}

int PortAudioMediaChannel::GetOutputLevel() {
return 1;
}

// bufにlen分だけ書き込む => 音が出る
// リモートから届いたのを書き込む
void PortAudioMediaChannel::pushToSpeaker( float* buf, int len ) {
talk_base::CritScope cs(&crit_speaker);
int i=0;

// 遅延が大きかったら古いの消す
while ( qu_speaker.size() > (MAX_SPEAKER_QUEUE_SIZE) ) {
qu_speaker.pop();
}

for( i=0; i < len && qu_speaker.size() > 0; i++ ) {
buf++ = qu_speaker.front();
qu_speaker.pop();
if( NUM_CHANNELS == 2 ){
buf++ = qu_speaker.front();
qu_speaker.pop();
}
}
bool is_empty = false;
for( i=i; i < len; i++ ) {
is_empty = true;
buf++ = SAMPLE_SILENCE;
if( NUM_CHANNELS == 2 ){
buf++ = SAMPLE_SILENCE;
}
}
char dateStr [9];
_strtime( dateStr );
if ( is_empty ) {
std::cout << “[” << dateStr << “][pushToSpeaker]empty!!” << std::endl;
}
std::cout << “[” << dateStr << “][pushToSpeaker]qu_speaker.size(): ” << qu_speaker.size() << std::endl;
}

// マイクから来たのをためとく
void PortAudioMediaChannel::saveFromMicrophone( const float* buf, int len ) {
talk_base::CritScope cs(&crit_microphone);

for( int i=0; i<len; i++ ) {
qu_microphone.push( buf++ );
if( NUM_CHANNELS == 2 ){
qu_microphone.push(
buf++ );
}
}
char dateStr [9];
_strtime( dateStr );
std::cout << “[” << dateStr << “][saveFromMicrophone]qu_microphone.size(): ” << qu_microphone.size() << std::endl;
}

[/cpp]

portaudiomediaengine.h
[cpp]
/
* Jingle call example
* Copyright 2004–2005, Google Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
/

// PortAudioMediaEngine is a Linphone implementation of MediaEngine

ifndef TALK_SESSION_PHONE_PORTAUDIOMEDIAENGINE_H_

define TALK_SESSION_PHONE_PORTAUDIOMEDIAENGINE_H_

include “talk/third_party/mediastreamer/mediastream.h”

include “talk/base/asyncsocket.h”

include “talk/base/scoped_ptr.h”

include “talk/session/phone/mediaengine.h”

include “talk/base/criticalsection.h”

include “portaudio.h”

include

include

// Engine settings

define ENGINE_BUFFER_SIZE 2048

// PortAudio settings

define PA_SAMPLE_TYPE (paFloat32) // 32 bit floating point output

typedef float SAMPLE;

define SAMPLE_RATE (44100)

//#define SAMPLE_RATE (11025)
//#define FRAMES_PER_BUFFER (1024)

define FRAMES_PER_BUFFER (256)

define SAMPLE_SILENCE (0.0f)

define NUM_CHANNELS (2)

//#define NUM_CHANNELS (1)

//#define PORTAUDIO_PACKET_LENGTH (2048)

define PORTAUDIO_PACKET_LENGTH (40960) // 一度に送る音声パケットのサイズ,これなら聞ける

define MAX_ALLOWED_LATENCY (0.5) // 許容する音声の最大遅延[s]

define MAX_SPEAKER_QUEUE_SIZE (MAX_ALLOWED_LATENCY * SAMPLE_RATE * NUM_CHANNELS) // キューの最大サイズ。これを超えたら先頭のは破棄する

ifndef M_PI

define M_PI (3.14159265)

endif

namespace cricket {

define TABLE_SIZE (200)

typedef struct
{
float sine[TABLE_SIZE];
int left_phase;
int right_phase;
char message[20];
}
paTestData;

typedef struct {
std::queue
qu;
}
paTestData2;

class PortAudioMediaEngine;

class PortAudioMediaChannel : public MediaChannel {
public:
PortAudioMediaChannel(PortAudioMediaEngine *eng);
virtual ~PortAudioMediaChannel();

virtual void SetCodecs(const std::vector &codecs);
virtual void OnPacketReceived(const void *data, int len);

virtual void SetPlayout(bool playout);
virtual void SetSend(bool send);

virtual int GetOutputLevel();
bool mute() {return mute_;}

virtual void StartMediaMonitor(VoiceChannel * voice_channel, uint32 cms) {}
virtual void StopMediaMonitor() {}

// portaudio
void pushToSpeaker( float, int );
void saveFromMicrophone( const float
, int );
sigslot::signal1 SignalReadFromMicEvent; // ready to read

paTestData data;
std::queue
qu_sine;

paTestData2 data2;
int totalFrames;

protected:
// portaudio
void readBuffer(float*, float**, float*, float*, float*, int);
void writeBuffer(float*, float*, float*, float*, float, int);

private:
PortAudioMediaEngine engine_;
AudioStream
audio_stream_;
talk_base::scoped_ptr socket_;
void OnReadFromMic(PortAudioMediaChannel*);

int pt;
bool mute
;
bool play_;

talk_base::CriticalSection crit_speaker;
talk_base::CriticalSection crit_microphone;

// portaudio
PaStream* stream_;
std::queue
qu_microphone; // ローカルのマイク用
std::queue
qu_speaker; // リモートから届いたのをためておく

};

class PortAudioMediaEngine : public MediaEngine {
public:
PortAudioMediaEngine();
~PortAudioMediaEngine();
virtual bool Init();
virtual void Terminate();

virtual MediaChannel *CreateChannel();

virtual int SetAudioOptions(int options);
virtual int SetSoundDevices(int wave_in_device, int wave_out_device);

virtual float GetCurrentQuality();
virtual int GetInputLevel();

virtual std::vector<codec , std::allocator > codecs() {return codecs_;}
virtual bool FindCodec(const Codec&);

private:
std::vector<codec , std::allocator > codecs_;
};

} // namespace cricket

endif // TALK_SESSION_PHONE_PORTAUDIOMEDIAENGINE_H_

[/cpp]

やりたいのは電話でなく一方向のストリーミングなので、
そろそろ自前のxmppサーバをたてて、
libjingleのxmppclientをいじることになるのかな。
あとspeexコーデックをいれてみたい。

mysql/tritonnのinsertベンチマーク

tritonn使おうかな。

これがいい
Tritonnを使うとMySQLはどう変わるの?

従いまして、MySQLにこのパッチを適用した場合、変わるのは基本的にはMyISAMのFULLTEXTインデックスの振る舞いのみで、後はこれまでと同様となっています。またこの図をご覧頂くとお分かりになるかと思いますが、MySQLへアクセスするアプリケーションから見ると、MySQLの上位層がSennaを隠蔽しているため、アプリケーション側で特にSennaを意識する必要はありません。

MATCH/AGAINSTを使ったSELECTでちょっぱやになる代わりに、

気になるのは、
INSERT時にどれだけ遅くなるのか?
実データに近い環境で計測してみる

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
test table<br />
+--------------+---------------------------+------+-----+---------------------+----------------+<br />
| Field        | Type                      | Null | Key | Default             | Extra          |<br />
+--------------+---------------------------+------+-----+---------------------+----------------+<br />
| id           | int(10) unsigned          | NO   | PRI | NULL                | auto_increment |<br />
中略<br />
| title        | varchar(255)              | NO   |     |                     |                |<br />
| description  | text                      | YES  |     | NULL                |                |<br />
中略<br />
+--------------+---------------------------+------+-----+---------------------+----------------+<br />
14 rows in set (0.00 sec)</p>
<p>mysql> select count(*) from test;<br />
+----------+<br />
| count(*) |<br />
+----------+<br />
|    16648 |<br />
+----------+<br />
1 row in set (0.00 sec)</p>
<p>santrini% mysqldump testdb test -u root -p "--where=id >=19000" > mysqldump_test_over_19000.log<br />
このあとinsert文だけにしておく</p>
<p>mysql> delete from test where id>=19000;<br />
Query OK, 887 rows affected (0.90 sec)</p>
<p>santrini% /usr/sbin/mysqld --version<br />
/usr/sbin/mysqld  Ver 5.0.51a-9+lenny2-log for debian-linux-gnu on x86_64 ((Debian))</p>
<p>santrini% cat bench.sh<br />
mysql testdb -u root --password=*********** < mysqldump_test_over_19000.log</p>
<p>santrini% /usr/bin/time ./bench.sh<br />
0.02user 0.01system 0:00.19elapsed 16%CPU (0avgtext+0avgdata 0maxresident)k<br />

time/insert = 0.19[s] / 887[rows] = 0.000214[s]

mysqldをシンボリックリンクでtritonnのmysqldに向けて再起動
FULLTEXT INDEXはデフォルトのNGRAMを使用

`
santrini% /usr/sbin/mysqld –version
/usr/sbin/mysqld Ver 5.0.51a-modified for redhat-linux-gnu on x86_64 (MySQL Community Server (GPL) (portions © Tritonn Project))

mysql> delete from test where id>=19000;
Query OK, 887 rows affected (0.07 sec)

mysql> create fulltext index title_description on test(title,description);
Query OK, 15761 rows affected (7.23 sec)
Records: 15761 Duplicates: 0 Warnings: 0

mysql> show index from test;
+——-+————+——————-+————–+————-+———–+————-+———-+——–+——+————+———+
| Table | Non_unique | Key_name | Seq_in_index | Column_name | Collation | Cardinality | Sub_part | Packed | Null | Index_type | Comment |
+——-+————+——————-+————–+————-+———–+————-+———-+——–+——+————+———+
| test | 0 | PRIMARY | 1 | id | A | 16648 | NULL | NULL | | BTREE | |
中略
| test | 1 | title_description | 1 | title | NULL | 1 | NULL | NULL | | FULLTEXT | |
| test | 1 | title_description | 2 | description | NULL | 1 | NULL | NULL | YES | FULLTEXT | |
+——-+————+——————-+————–+————-+———–+————-+———-+——–+——+————+———+
6 rows in set (0.00 sec)

santrini% /usr/bin/time ./bench.sh
0.02user 0.01system 0:15.89elapsed 0%CPU (0avgtext+0avgdata 0maxresident)k`

time/insert = 15.89[s] / 887[rows] = 0.0175[s]

ベンチマーク方法自信無い。
前のはINDEX張ってないので目的は比較ではないのだが、目標スペックがないのでなんとも。
まー使ってみましょう。

INSERT時にインデクサ動かしてるんだろうなぁ。
クライアントへはすぐに制御を返して、裏で非同期でインデクサを動かすオプション、があってもいいのかも、と思った。
INSERT直後にSELECTされたら見つからないかも、っていうのを許容できる人向けに。