Revision 774 (by ahitrov, 2019/04/22 15:18:57) |
Search through go-based JSON proxy
|
package sphinx::Keeper;
use strict;
use warnings 'all';
use base qw(Contenido::Keeper);
use Contenido::Globals;
use LWP::UserAgent;
use JSON::XS;
use Data::Dumper;
my $ua = LWP::UserAgent->new;
$ua->timeout(1);
######################
# Отправить объект в поиск:
# Вызов: $keeper->set_search( $object );
# Объект должен обязательно иметь метод
# ->get_search_data,
# возвращающий структуру:
# {
# id => $object->id,
# class => $object->class,
# name => $object_name,
# text => $search_text
# }
# Кроме того, метод чувствителен к полю status и предполагает,
# что status=1 - документ активен; status=0 - документ не активен.
##########################################################################
sub send
{
my $self = shift;
my $doc = shift;
return undef unless ref $doc && $doc->id;
my ($object) = $keeper->get_documents(
class => 'sphinx::Search',
object_id => $doc->id,
object_class => $doc->class,
);
if ( $doc->status == 1 ) {
my $data = $doc->get_search_data;
return undef unless $data;
unless ( ref $object ) {
$object = sphinx::Search->new( $keeper );
$object->status( 1 );
$object->is_deleted( 0 );
$object->object_id( $doc->id );
$object->object_class( $doc->class );
$object->name( $data->{name} );
$object->search( $data->{text} );
$object->store;
} else {
if ( $data->{name} ne $object->name || $data->{text} ne $object->search || $object->is_deleted || $object->status <= 0 ) {
$object->status( 1 );
$object->is_deleted( 0 );
$object->name( $data->{name} );
$object->search( $data->{text} );
$object->store;
}
}
} else {
if ( ref $object ) {
$object->status( 0 );
$object->is_deleted( 1 );
$object->store;
}
}
}
# Методы поиска
####################################################################
sub proxy {
my $self = shift;
my $text = shift;
return unless $text;
my $opts = shift // {};
my $db_table = delete $opts->{db_table} || $self->state->table_name;
my $uri = URI->new( 'http://'.$self->state->db_host.':'.$self->state->db_port.'/search/'.$db_table );
$uri->query_param( search => $text );
if ( exists $opts->{object_class} ) {
if ( ref $opts->{object_class} eq 'ARRAY' ) {
$uri->query_param( class => $opts->{object_class} );
} else {
$uri->query_param( class => $opts->{object_class} );
}
}
if ( exists $opts->{order_by} ) {
my @Orders;
if ( ref $opts->{order_by} eq 'ARRAY' ) {
foreach my $order ( @{$opts->{order_by}} ) {
if ( ref $order eq 'ARRAY' && scalar @$order > 0 && scalar @$order <= 2 ) {
if ( scalar @$order == 2 && lc($order->[1]) eq 'desc' ) {
push @Orders, { field => $order->[0], reverse => 'desc' };
} else {
push @Orders, { field => $order->[0] };
}
} elsif ( $order && !ref $order ) {
my @parts = split /\s*,\s+/, $order;
if ( scalar @parts == 2 && lc($parts[1]) eq 'desc' ) {
push @Orders, { field => $parts[0], reverse => 'desc' };
} else {
push @Orders, { field => $parts[0] };
}
}
}
} else {
my @orders = split /\s*,\s+/, $opts->{order_by};
foreach my $order ( @orders ) {
if ( $order =~ /^([\w\.]+)\s+(asc|desc)/i ) {
push @Orders, { field => $1, reverse => $2 };
} else {
push @Orders, { field => $order };
}
}
}
my $i = 1;
foreach my $order ( @Orders ) {
$uri->query_param( "order_$i" => $order->{field} );
if ( exists $order->{reverse} && lc($order->{reverse}) eq 'desc' ) {
$uri->query_param( "sort_$i" => 'desc' );
}
last if ++$i > 5;
}
}
if ( exists $opts->{limit} ) {
$uri->query_param( limit => $opts->{limit} );
}
if ( exists $opts->{offset} ) {
$uri->query_param( offset => $opts->{offset} );
}
warn "Ask sphinx: $uri\n" if $DEBUG;
my $res = $ua->get($uri);
if ($res->is_success) {
my $body = $res->decoded_content;
warn "Response: [$body]\n" if $DEBUG;
my $result = decode_json $body;
return $result;
} else {
warn "SPHINX PROXY error! Code: ".$res->code.". Status: ".$res->status_line."\n";
}
}
sub search {
my $self = shift;
my $text = shift;
return unless $text;
my (%opts) = @_;
my $result;
my $db_table = delete $opts{db_table} || $self->state->table_name;
my @wheres = ("MATCH(?)");
my @values = ($text);
my @orders;
my $count = delete $opts{count};
my $limit = delete $opts{limit};
return if $limit && ($limit =~ /\D/ || $limit < 0);
my $offset = delete $opts{offset};
return if $offset && ($offset =~ /\D/ || $offset < 0);
my $no_limit = delete $opts{no_limit};
unless ( $no_limit ) {
$limit ||= 1000;
}
if ( $count ) {
$limit = 0;
$offset = 0;
}
my $return_value = delete $opts{return_value} || 'array_ref';
my $hash_by = delete $opts{hash_by} || 'object_id';
if ( exists $opts{object_class} ) {
push @wheres, "object_class in (".join( ',', map { '?' } ref $opts{object_class} eq 'ARRAY' ? @{$opts{object_class}} : ($opts{object_class}) ).")";
push @values, ref $opts{object_class} ? @{$opts{object_class}} : $opts{object_class};
}
if ( exists $opts{order_by} ) {
push @orders, $opts{order_by};
} elsif ( !$count ) {
push @orders, 'weight desc, last_edited desc';
}
my $query = "select ".($count ? 'count(*) as cnt' : '*, weight() as weight')." from $db_table where ".join( ' and ', @wheres ).(@orders ? " order by ".join(', ', @orders) : '');
if ( $limit && $offset ) {
$query .= " limit $offset, $limit ";
} elsif ( $limit ) {
$query .= " limit 0, $limit ";
}
warn "SEARCH QUERY: $query\n" if $DEBUG;
warn "SEARCH VALUES: ".Dumper( \@values ) if $DEBUG;
my $sth = $self->SQL->prepare( $query );
if ( $sth->execute( @values ) ) {
if ( $count ) {
$result = $sth->fetchrow_arrayref;
warn "COUNT: ". Dumper( $result ) if $DEBUG;
$result = $result->[0];
} else {
$result = [];
while ( my $row = $sth->fetchrow_hashref ) {
push @$result, $row;
}
}
} else {
warn "ERROR in statement: ".$sth->errstr."\n";
warn "SEARCH QUERY: $query\n";
warn "SEARCH VALUES: ".Dumper( \@values );
}
$sth->finish;
return $result;
}
sub stemmed {
my $self = shift;
my $db_table = $self->state->table_name_stemmed;
return $self->search( @_, db_table => $db_table );
}
# МЕТОДЫ ДОСТУПА К СОЕДИНЕНИЯМ С БАЗОЙ УМНЫЕ
####################################################################
# получение соединения с базой или установка нового если его не было
sub SQL {
my $self = shift;
return ($self->connect_check() ? $self->{SQL} : undef);
}
# -------------------------------------------------------------------------------------------------
# Открываем соединение с базой данных
# -------------------------------------------------------------------------------------------------
sub connect {
my $self = shift;
#соединение уже есть
if ($self->is_connected) {
} else {
unless ($self->{SQL} = $self->db_connect) {
warn "Не могу соединиться с базой данных";
die;
}
$self->{SQL}->do("SET NAMES '".$self->state->db_client_encoding."'") if ($self->state->db_client_encoding);
}
$self->{_connect_ok} = 1;
return 1;
}
#проверка соединения с базой кеширующая состояние соединения
sub connect_check {
my $self = shift;
return 1 if ($self->{_connect_ok});
if ($self->is_connected) {
$self->{_connect_ok} = 1;
return 1;
} else {
if ($self->connect) {
return 1;
} else {
#сюда по логике попадать не должно так как die вылететь должен
warn "Connect failed\n";
return 0;
}
}
}
sub db_connect {
my $self = shift;
my $dbh = DBI->connect('DBI:mysql:host='.$self->{db_host}.';port='.$self->{db_port}.';mysql_enable_utf8=1')
|| die "Contenido Error: Не могу соединиться с Sphinx базой данных\n";
$dbh->{mysql_auto_reconnect} = 1;
# $dbh->{'AutoCommit'} = 1;
return $dbh;
}
sub is_connected {
my $self = shift;
if ( ref $self->{SQL} and $self->{SQL}->can('ping') and $self->{SQL}->ping() ) {
$self->{_connect_ok} = 1;
return 1;
} else {
$self->{_connect_ok} = 0;
return 0;
}
# warn 'Check if MySQL DB connected: '.(ref $self && exists $self->{SQL} && ref $self->{SQL} ? 1 : 0 ) if $DEBUG;
# return ( ref($self) && exists $self->{SQL} && ref $self->{SQL} );
}
1;